簡(jiǎn)介
這幾天率寡,有個(gè)大兄弟問(wèn)框弛,如何實(shí)現(xiàn) kafka 多線程批消費(fèi)貌踏,目標(biāo):
- 確保 exactly once 語(yǔ)義
- 數(shù)據(jù)不丟失
- 支持定時(shí)同步州泊,如15分鐘/30分鐘等
- 支持多線程(kafka 實(shí)際上不支持丧蘸,但是可以通過(guò)多個(gè) groupid+offset 控制實(shí)現(xiàn))
方案
- 上述目標(biāo)1、2、3力喷,簡(jiǎn)單實(shí)現(xiàn)可以通過(guò)存儲(chǔ)消費(fèi)對(duì)應(yīng)的offset來(lái)處理刽漂,也就是進(jìn)程啟動(dòng)時(shí)通過(guò) seek 賦值,指定 partition 從哪個(gè) offset 開(kāi)始消費(fèi)弟孟,此處使用zk存儲(chǔ)
-
對(duì)于目標(biāo)4贝咙,實(shí)現(xiàn)邏輯簡(jiǎn)單來(lái)說(shuō):
a. 找到 offset begin 與 offset end
b. 切分 offset 區(qū)間段,例如切分為5個(gè):[0,5),[5,10),[10,15),[15,20),[20,23)
c. 根據(jù)上述起5個(gè)線程拂募,消費(fèi)指定區(qū)間的數(shù)據(jù)庭猩,由于 kafka 同個(gè)groupid,并行消費(fèi)的話陈症,會(huì)發(fā)生 rebalance蔼水,類(lèi)似這樣的日志,具體原理就不分析了(詳細(xì)看下groupid)
image.png
d. 避免這個(gè)問(wèn)題录肯,我們可以模擬多 groupid 消費(fèi)趴腋,一個(gè) groupid 消費(fèi)一段【不用原生API消費(fèi)的話,可以直接用 spark struct streaming论咏,支持類(lèi)該模式】
核心代碼
- 第一步优炬,獲取此時(shí) kafka topic 信息,如拿到 topic partition 相關(guān)信息
consumer.subscribe(Collections.singletonList(kafkaTopic));
// ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
consumer.poll(Duration.ofSeconds(5));
// 獲取offset信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(kafkaTopic);
List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartition partition = new TopicPartition(kafkaTopic, partitionInfo.partition());
topicPartitions.add(partition);
}
Map<TopicPartition, Long> topicPartitionMap = consumer.endOffsets(topicPartitions, Duration.ofSeconds(MAX_TIME_AWAIT_END_OFFSET, 0));
Map<Integer, Long> partitionOffsetMap = new ConcurrentHashMap<>();
topicPartitionMap.forEach((key, value) -> partitionOffsetMap.put(key.partition(), value > 0 ? value - 1 : 0L));
LOG.info("end offsets: {}", partitionOffsetMap.toString());
- 第二步厅贪,找到緩存在 zk 的 partition offset 信息蠢护,并設(shè)置 partition 的 metadata
if (! isConsumeUserOffset) {
for (TopicPartition partition : topicPartitions) {
Long persistOffset = getPersistOffset(partition.partition());
if (persistOffset <= 0) {
persistOffset = consumer.position(partition);
}
LOG.info("get persist offset, partition num {}: {}", partition.partition(), persistOffset);
// 啟動(dòng)時(shí)使用上次offset,消費(fèi)下一次养涮,所以 +1
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(persistOffset + 1);
consumer.seek(partition, offsetAndMetadata);
}
} else {
if (autoOffsetReset.equals("earliest")) {
LOG.info("auto.offset.reset={}", autoOffsetReset);
consumer.seekToBeginning(topicPartitions);
}
if (autoOffsetReset.equals("latest")) {
LOG.info("auto.offset.reset={}", autoOffsetReset);
consumer.seekToEnd(topicPartitions);
}
if (autoOffsetReset.equals("None")) {
// topic各分區(qū)都存在已提交的offset時(shí)葵硕,從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset单寂,則拋出異常, 忽略不用
LOG.info("auto.offset.reset={}, it has so many problem, do nothing!", autoOffsetReset);
}
}
- 第三部吐辙,按批消費(fèi)宣决,并存儲(chǔ)該批次每個(gè) partition 最后一個(gè) offset
while (! isEndToConsume) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
LOG.info("consumer records length: {}", records.count());
List<String> writeRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
// 替換非法字符
String value = record.value().replaceAll("\n|\t|\r", "\001");
try {
Map<String, Object> fieldMap = JSON.parseObject(value, new TypeReference<Map<String, Object>>() {
});
JSONObject jsonObject = new JSONObject(fieldMap);
writeRecords.add(jsonObject.toJSONString());
cachePartitionOffsetMap.put(record.partition(), record.offset());
recordCount ++;
} catch (Exception e) {
// 非 JSON 數(shù)據(jù)時(shí)直接跳出
LOG.info("record parse to json exception: {}, record: {}" , e.getMessage(), value);
// e.printStackTrace();
continue;
}
}
}
- 第四步,存儲(chǔ) partition offset 信息
// 判斷是否消費(fèi)到最后的offset
Map<Integer, Boolean> cacheHasConsumedPartitionMap = new ConcurrentHashMap<>();
if (! cachePartitionOffsetMap.isEmpty()) {
LOG.info("cache partition offset map: {}", cachePartitionOffsetMap.toString());
for(Integer partition : partitionOffsetMap.keySet()) {
if (cachePartitionOffsetMap.containsKey(partition)) {
long cacheOffset = cachePartitionOffsetMap.get(partition);
// 存儲(chǔ)最新 offset昏苏,作為程序退出依據(jù)
if (cacheOffset >= partitionOffsetMap.get(partition)) {
cacheHasConsumedPartitionMap.put(partition, true);
}
}
}
// 每個(gè)批次都存起來(lái)
cachePartitionOffsetMap.forEach((key, value) -> setPersistOffset(key, value));
}
// 退出條件
if (cacheHasConsumedPartitionMap.size() == partitionOffsetMap.size()) {
isEndToConsume = true;
}
// 程序觸發(fā)提交
consumer.commitSync();
運(yùn)行
java -cp /data/shopee/logtohdfs/logtohdfs-1.0-SNAPSHOT-jar-with-dependencies.jar com.xxx.bigdata.kafka.KafkaReader xxx_json.properties xxx
image.png
image.png
image.png
每個(gè) partition 對(duì)應(yīng)的 offset 都超過(guò)圖一尊沸,程序開(kāi)始初獲取的offset,則退出這次調(diào)度
待完善
- 多線程按批處理(目標(biāo)4)贤惯,其實(shí)也就是洼专,基于獲取全局的 topic 信息,分拆生成多個(gè)新的 consumer孵构,暫不實(shí)現(xiàn)屁商。