kafka exactly once 批處理

簡(jiǎn)介

這幾天率寡,有個(gè)大兄弟問(wèn)框弛,如何實(shí)現(xiàn) kafka 多線程批消費(fèi)貌踏,目標(biāo):

  1. 確保 exactly once 語(yǔ)義
  2. 數(shù)據(jù)不丟失
  3. 支持定時(shí)同步州泊,如15分鐘/30分鐘等
  4. 支持多線程(kafka 實(shí)際上不支持丧蘸,但是可以通過(guò)多個(gè) groupid+offset 控制實(shí)現(xiàn))

方案

  1. 上述目標(biāo)1、2、3力喷,簡(jiǎn)單實(shí)現(xiàn)可以通過(guò)存儲(chǔ)消費(fèi)對(duì)應(yīng)的offset來(lái)處理刽漂,也就是進(jìn)程啟動(dòng)時(shí)通過(guò) seek 賦值,指定 partition 從哪個(gè) offset 開(kāi)始消費(fèi)弟孟,此處使用zk存儲(chǔ)
  2. 對(duì)于目標(biāo)4贝咙,實(shí)現(xiàn)邏輯簡(jiǎn)單來(lái)說(shuō):
    a. 找到 offset begin 與 offset end
    b. 切分 offset 區(qū)間段,例如切分為5個(gè):[0,5),[5,10),[10,15),[15,20),[20,23)
    c. 根據(jù)上述起5個(gè)線程拂募,消費(fèi)指定區(qū)間的數(shù)據(jù)庭猩,由于 kafka 同個(gè)groupid,并行消費(fèi)的話陈症,會(huì)發(fā)生 rebalance蔼水,類(lèi)似這樣的日志,具體原理就不分析了(詳細(xì)看下groupid)


    image.png

    d. 避免這個(gè)問(wèn)題录肯,我們可以模擬多 groupid 消費(fèi)趴腋,一個(gè) groupid 消費(fèi)一段【不用原生API消費(fèi)的話,可以直接用 spark struct streaming论咏,支持類(lèi)該模式】

核心代碼

  1. 第一步优炬,獲取此時(shí) kafka topic 信息,如拿到 topic partition 相關(guān)信息
consumer.subscribe(Collections.singletonList(kafkaTopic));
//        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
consumer.poll(Duration.ofSeconds(5));

// 獲取offset信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(kafkaTopic);
List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
    TopicPartition partition = new TopicPartition(kafkaTopic, partitionInfo.partition());
    topicPartitions.add(partition);
}

Map<TopicPartition, Long> topicPartitionMap = consumer.endOffsets(topicPartitions, Duration.ofSeconds(MAX_TIME_AWAIT_END_OFFSET, 0));
Map<Integer, Long> partitionOffsetMap = new ConcurrentHashMap<>();
topicPartitionMap.forEach((key, value) -> partitionOffsetMap.put(key.partition(), value > 0 ? value - 1 : 0L));
LOG.info("end offsets: {}", partitionOffsetMap.toString());

  1. 第二步厅贪,找到緩存在 zk 的 partition offset 信息蠢护,并設(shè)置 partition 的 metadata
if (! isConsumeUserOffset) {
    for (TopicPartition partition : topicPartitions) {
        Long persistOffset = getPersistOffset(partition.partition());
        if (persistOffset <= 0) {
            persistOffset = consumer.position(partition);
        }
        LOG.info("get persist offset, partition num {}: {}", partition.partition(), persistOffset);
        // 啟動(dòng)時(shí)使用上次offset,消費(fèi)下一次养涮,所以 +1
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(persistOffset + 1);
        consumer.seek(partition, offsetAndMetadata);
    }
} else {
    if (autoOffsetReset.equals("earliest")) {
        LOG.info("auto.offset.reset={}", autoOffsetReset);
        consumer.seekToBeginning(topicPartitions);
    }
    if (autoOffsetReset.equals("latest")) {
        LOG.info("auto.offset.reset={}", autoOffsetReset);
        consumer.seekToEnd(topicPartitions);
    }
    if (autoOffsetReset.equals("None")) {
        // topic各分區(qū)都存在已提交的offset時(shí)葵硕,從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset单寂,則拋出異常, 忽略不用
        LOG.info("auto.offset.reset={}, it has so many problem, do nothing!", autoOffsetReset);
    }
}
  1. 第三部吐辙,按批消費(fèi)宣决,并存儲(chǔ)該批次每個(gè) partition 最后一個(gè) offset
while (! isEndToConsume) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    LOG.info("consumer records length: {}", records.count());
    List<String> writeRecords = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
        // 替換非法字符
        String value = record.value().replaceAll("\n|\t|\r", "\001");
        try {
            Map<String, Object> fieldMap = JSON.parseObject(value, new TypeReference<Map<String, Object>>() {
            });

            JSONObject jsonObject = new JSONObject(fieldMap);
            writeRecords.add(jsonObject.toJSONString());
            cachePartitionOffsetMap.put(record.partition(), record.offset());
            recordCount ++;

        } catch (Exception e) {
            // 非 JSON 數(shù)據(jù)時(shí)直接跳出
            LOG.info("record parse to json exception: {}, record: {}" , e.getMessage(), value);
//                    e.printStackTrace();
            continue;
        }
    }
 }
  1. 第四步,存儲(chǔ) partition offset 信息
// 判斷是否消費(fèi)到最后的offset
Map<Integer, Boolean> cacheHasConsumedPartitionMap = new ConcurrentHashMap<>();
if (! cachePartitionOffsetMap.isEmpty()) {
    LOG.info("cache partition offset map: {}", cachePartitionOffsetMap.toString());
    for(Integer partition : partitionOffsetMap.keySet()) {
        if (cachePartitionOffsetMap.containsKey(partition)) {
            long cacheOffset = cachePartitionOffsetMap.get(partition);
            // 存儲(chǔ)最新 offset昏苏,作為程序退出依據(jù)
            if (cacheOffset >= partitionOffsetMap.get(partition)) {
                cacheHasConsumedPartitionMap.put(partition, true);
            }
        }
    }
    // 每個(gè)批次都存起來(lái)
    cachePartitionOffsetMap.forEach((key, value) -> setPersistOffset(key, value));
}
// 退出條件
if (cacheHasConsumedPartitionMap.size() == partitionOffsetMap.size()) {
    isEndToConsume = true;
}
// 程序觸發(fā)提交
consumer.commitSync();

運(yùn)行

java -cp /data/shopee/logtohdfs/logtohdfs-1.0-SNAPSHOT-jar-with-dependencies.jar com.xxx.bigdata.kafka.KafkaReader xxx_json.properties xxx

image.png

image.png

image.png

每個(gè) partition 對(duì)應(yīng)的 offset 都超過(guò)圖一尊沸,程序開(kāi)始初獲取的offset,則退出這次調(diào)度

待完善

  1. 多線程按批處理(目標(biāo)4)贤惯,其實(shí)也就是洼专,基于獲取全局的 topic 信息,分拆生成多個(gè)新的 consumer孵构,暫不實(shí)現(xiàn)屁商。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市颈墅,隨后出現(xiàn)的幾起案子蜡镶,更是在濱河造成了極大的恐慌雾袱,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件官还,死亡現(xiàn)場(chǎng)離奇詭異芹橡,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)望伦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)林说,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人屯伞,你說(shuō)我怎么就攤上這事腿箩。” “怎么了愕掏?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵度秘,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我饵撑,道長(zhǎng)剑梳,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任滑潘,我火速辦了婚禮垢乙,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘语卤。我一直安慰自己追逮,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布粹舵。 她就那樣靜靜地躺著钮孵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪眼滤。 梳的紋絲不亂的頭發(fā)上巴席,一...
    開(kāi)封第一講書(shū)人閱讀 51,708評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音诅需,去河邊找鬼漾唉。 笑死,一個(gè)胖子當(dāng)著我的面吹牛堰塌,可吹牛的內(nèi)容都是我干的赵刑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼场刑,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼般此!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤恤煞,失蹤者是張志新(化名)和其女友劉穎屎勘,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體居扒,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡概漱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了喜喂。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓤摧。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖玉吁,靈堂內(nèi)的尸體忽然破棺而出照弥,到底是詐尸還是另有隱情,我是刑警寧澤进副,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布这揣,位于F島的核電站,受9級(jí)特大地震影響影斑,放射性物質(zhì)發(fā)生泄漏给赞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一矫户、第九天 我趴在偏房一處隱蔽的房頂上張望片迅。 院中可真熱鬧,春花似錦皆辽、人聲如沸柑蛇。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)耻台。三九已至,卻和暖如春空另,著一層夾襖步出監(jiān)牢的瞬間盆耽,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工痹换, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留征字,地道東北人都弹。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓娇豫,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親畅厢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子冯痢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355