前段時間,一直有人問 spark streaming 偏移量問題。
什么是偏移量析藕?百度。
一個正陈咐穑或者線上的sparkstreaming執(zhí)行順序:
1.根據(jù)group獲取kafka當(dāng)前消費的位置
2.創(chuàng)建Dstream
3.rdd算子或者邏輯
4.將計算結(jié)果保存到數(shù)據(jù)庫中
5.提交偏移量
正常情況下噪径,spark streaming消費是沒辦法實現(xiàn)EOS柱恤。
為什么沒辦法實現(xiàn)EOS数初?
不管你先執(zhí)行第4步或者先執(zhí)行第5步,都是有問題的梗顺。先執(zhí)行第4步泡孩,那么執(zhí)行完第4步,程序掛了寺谤,偏移量沒提交仑鸥,下次程序重新啟動的時候,消息又要重新消費变屁,這是 at leastonce眼俊,至少一次消費,所以大部分情況下很多公司都是這樣的粟关,當(dāng)然可以通過數(shù)據(jù)庫來保證結(jié)果的冪等性(比如hbase疮胖,不管插入多少條,最終結(jié)果都是一條闷板,因為rowkey相同)澎灸。第5步先執(zhí)行就是扯淡了。遮晚。
那怎么才能保證消費的EOS呢性昭?
答:事務(wù)!
將第4部和第5部處在同一個事務(wù)中就能保證EOS县遣。如果將第4部保存在redis中糜颠,第5部偏移量保存到zk上或者hbase上這也是不行的,得自己實現(xiàn)分布式事務(wù)萧求。簡單點其兴,就是讓第4步和第5步保存在同一個數(shù)據(jù)庫中,然后這個數(shù)據(jù)庫帶事務(wù)功能就ok饭聚。
下面用mysql舉個例子忌警。
1.首先創(chuàng)建一張偏移量表:
CREATE TABLE `offset` (
`id` varchar(255) NOT NULL,
`topic` varchar(255) DEFAULT NULL,
`partition` int(11) DEFAULT NULL,
`offsets` bigint(255) DEFAULT NULL,
`group` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.定義一個抽象偏移量工具類
/**
* Created by zhuenjun on 2019/3/24.
為什么需要這個類?像我公司目前是把偏移量提交到原生的kafka內(nèi)部,kafka09版本是提交到zk上,
自從上了kafka2.0.1法绵,kafka自己都不把偏移量保存到zk上了箕速,那我還保存到zk上干嘛。朋譬。盐茎。
這個類的作用就是根據(jù)topic,group獲取外置的偏移量徙赢。
*/
public abstract class AbstractKafkaOffsetsTool implements Closeable {
final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String CLIENT_ID = "client-offsets";
String brokerServers;
public String topic;
public String group;
private KafkaConsumer<byte[], byte[]> kafkaConsumer;
public AbstractKafkaOffsetsTool(String brokerServers, String topic, String group) {
this.brokerServers = brokerServers;
this.topic = topic;
this.group = group;
}
/**
* 初始化的消費偏移量(不能直接拿來用)
*
* @return
*/
protected abstract Map<TopicPartition, Long> initConsumserOffsets(String topic, String group);
這個類的作用的話上面注釋有字柠。
3.獲取mysql中的消息偏移量
class MysqlOffsets extends SparkStreamingOffsets {
private static final String DRIVERCLASS = "com.mysql.jdbc.Driver";
private static final String URL = "jdbc:mysql://localhost:3306?serverTimezone=UTC&characterEncoding=utf-8";
private static final String USERNAME = "root";
private static final String PASSWORD = "root";
JdbcPool jdbcPool = new JdbcPool(
DRIVERCLASS,
URL,
USERNAME,
PASSWORD);
public MysqlOffsets(String group) {
super(group);
}
@Override
public Map<TopicPartition, Long> getConsumserOffsets(String brokerServer, String topic) {
AbstractKafkaOffsetsTool kafkOffsetsTool = new AbstractKafkaOffsetsTool(brokerServer, topic, group) {
private static final String sql = "select topic,`partition`,`offsets` from TEST.`OFFSET` where topic=? and `group`=?";
@Override
protected Map<TopicPartition, Long> initConsumserOffsets(String topic, String group) {
Map<TopicPartition, Long> topicPartitionLongMap = new HashMap<>();
jdbcPool.query(sql, new Object[]{topic, group}, result -> {
while (result.next()) {
topicPartitionLongMap.put(
new TopicPartition(
result.getString("topic"),
result.getInt("partition")
),
result.getLong("offsets"));
}
});
return topicPartitionLongMap;
}
};
Map<TopicPartition, Long> consumserOffsets = kafkOffsetsTool.getConsumserOffsets();
return consumserOffsets;
}
有人會在代碼中發(fā)現(xiàn),直接從mysql查出來就好,為何還要kafkOffsetsTool.getConsumserOffsets()這個方法狡赐?
直接從mysql拿出來的偏移量是不能直接用的窑业?為何?你的topic有可能會被刪除(或者刪除后又創(chuàng)建)枕屉,但是你的偏移量保存的地方卻在mysql常柄,這樣獲取到的偏移量消費不到消息可不行。所以這里還有個邏輯就是你獲取到的偏移量得和kafka內(nèi)部的topic的偏移量有個對比搀擂,這步很重要西潘,不然會有很多坑的!I谒獭喷市!
@Override
public void commitOffsets(OffsetRange[] offsetRangesResult, JavaInputDStream<ConsumerRecord<String, String>> directStream) throws Exception {
String upsertOffsetsSql = "INSERT INTO TEST.`OFFSET` (id,topic,`partition`,`offsets`,`group`)\n" +
"VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE offsets = ?";
Connection connection = jdbcPool.getConnection();
//設(shè)置false就是開啟事務(wù)
connection.setAutoCommit(false);
try {
//提交偏移量
JdbcUtils.executePre(connection, upsertOffsetsSql, new ArrayList() {{
for (OffsetRange offsetRange : offsetRangesResult) {
//id="group_topic_partition"
add(new Object[]{
StringUtils.join(new Object[]{group, offsetRange.topic(), offsetRange.partition()}, "_"),
offsetRange.topic(),
offsetRange.partition(),
offsetRange.untilOffset(),
group,
offsetRange.untilOffset()
});
}
}});
//修改結(jié)果
JdbcUtils.execute(connection, "INSERT INTO TEST.test(id,name) value('1','spark')");
//提交事務(wù)
connection.commit();
} catch (Exception e) {
//回滾
connection.rollback();
}
}
昨天還有個小伙伴問我,如果偏移量提交到mysql威恼,數(shù)據(jù)不是會膨脹的很厲害品姓。。mysql也是可以實現(xiàn)upsert的方式的沃测,通過ON DUPLICATE KEY UPDATE缭黔。上面還有一個spark的細節(jié),mysql偏移量的提交是在driver端蒂破,而計算結(jié)果通常在executor馏谨,所以這塊有個注意的地方就是需要把executor的計算結(jié)果匯集到driver端更新,我這里是方面寫代碼附迷,就偷懶了寫死了持久化邏輯惧互,也沒抽離啥的。喇伯。
最后一句話喊儡,剩余代碼自己腦補!
拒絕伸手黨5揪荨0隆!!4以摺淤毛!
拒絕伸手黨!K懔5偷!K蚕睢蔗蹋!
拒絕伸手黨!4蚜堋V砗肌!R镩佟胁孙!
重要的事情說3遍_刖搿3屏邸!當(dāng)然有什么不懂稠鼻,可以互相交流冈止。微信號 cto_zej,加我備注 大數(shù)據(jù)候齿。