[Kafka 101-6] “茴字有五種寫法”之 offset 的提交

[Kafka 101 - 5] 圖文并茂地介紹 offset 概念 中我們介紹了消息的 offset 和消費(fèi)者的 offset含衔,并且提到消費(fèi)者的 offset 的維護(hù)方式是消費(fèi)者自己提交到一個特殊 topic癣漆,聽起來似乎很簡單岳颇,但實(shí)際這個提交 offset 的過程也有點(diǎn)內(nèi)容,所以本文來學(xué)習(xí) Kafka 中消費(fèi)者提交 offset 的五種方式胶滋。

需要明確的一點(diǎn)是睡汹,即使 Kafka 中維護(hù)了消費(fèi)者的 offset,消費(fèi)者仍然有可能重復(fù)消費(fèi)或者少消費(fèi)數(shù)據(jù)的相嵌,如果想要保證消費(fèi)數(shù)據(jù)的完全準(zhǔn)確腿时,不丟不重,即所謂的“Exactly Once”饭宾,需要使用別的機(jī)制來保證批糟,比如 Flink 的 checkpoint 機(jī)制,但這些機(jī)制也需要能夠正確的理解 offset 的提交捏雌。

內(nèi)容提要:

  1. 環(huán)境說明
  2. 自動提交
  3. 同步提交
  4. 異步提交
  5. 同步&異步結(jié)合
  6. 提交指定 offset
  7. 總結(jié)

1. 環(huán)境說明

操作系統(tǒng):MacOS/Linux
Kafka:本地安裝了社區(qū)版的 2.3.1 版本,運(yùn)行在 9092 端口

Kafka 的安裝使用可以參考:[Kafka 101-1] Kafka安裝使用
消費(fèi)者 Java API 的基本使用可以參考:[Kafka 101-3] 使用Java API消費(fèi)數(shù)據(jù)實(shí)戰(zhàn)

2. 自動提交

這是 offset 提交的默認(rèn)方式笆搓。

消費(fèi)者有一個參數(shù)叫做 enable.auto.commit 性湿,表示是否啟用 offset 的自動提交,默認(rèn)值為 true满败,并且還有一個配套的參數(shù)叫做 auto.commit.interval.ms肤频,表示自動提交 offset 的時間間隔,默認(rèn)值是 5000算墨,即 5s 提交自動提交一次 offset宵荒,還記得消費(fèi)者的 poll 循環(huán)嗎?長這個樣子:

while (true) {
  // consumer 是一個 KafkaConsumer對象
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 用 println 模擬數(shù)據(jù)數(shù)據(jù)過程
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
}

每當(dāng)調(diào)用 KafkaConsumer 對象的 poll 方法時净嘀,它會去檢查距離上一次提交 offset 是否已經(jīng)過去 5s报咳,如果夠 5s 的話,則會幫助你把這次 poll 操作消費(fèi)的最大 offset 提交給 Kafka broker挖藏。

注意:如果在自動提交了 offset 之后 3s 的時候程序因?yàn)槟撤N原因掛掉了暑刃,并且在這 3s 期間消費(fèi)了 10000 條數(shù)據(jù),那么當(dāng)程序重啟后膜眠,這 10000 條數(shù)據(jù)會被再次消費(fèi)岩臣,因?yàn)?Kafka 中記錄的是最后一次提交的 offset,程序重啟后會從這個 offset 開始繼續(xù)消費(fèi)宵膨,所以架谎,自動提交有重復(fù)消費(fèi)數(shù)據(jù)的風(fēng)險,而且我們完全無法控制辟躏,看起來不是很棒谷扣,所以在重視數(shù)據(jù)準(zhǔn)確性的場景中,都不會采用自動提交的方式捎琐。

下文中的方法都需要在構(gòu)造 KafkaConsumer 對象的時候傳入 auto.commit.offset 參數(shù)抑钟,并且設(shè)置為 false

3. 同步提交

直接上代碼:

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 用 println 模擬數(shù)據(jù)處理過程
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
  // 處理完畢后提交 offset
  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    // commitSync 會一直嘗試提交直到成功或者遇到不可恢復(fù)的錯誤
    log.error("commit faile", e)
  }
}

可以看到野哭,同步提交 offset 的方法叫做 commitSync()在塔,這個方法會提交最近一次調(diào)用 poll 所消費(fèi)到的最大 offset,這上面的例子中拨黔,我們是把 commitSync() 放在了數(shù)據(jù)處理之后蛔溃,如果程序在數(shù)據(jù)處理過程中掛掉,這時已經(jīng)處理了一部分?jǐn)?shù)據(jù)(比如寫到了MySQL 中),那么重啟后會從上一次提交的 offset 繼續(xù)消費(fèi)贺待,之前已經(jīng)寫入 MySQL 的數(shù)據(jù)會被再處理一次徽曲,因此MySQL 中的數(shù)據(jù)會重復(fù)。

那么如果我們把 commitSync 放在數(shù)據(jù)處理的代碼之前呢麸塞,答案是數(shù)據(jù)有可能丟失秃臣,因?yàn)檫@種情況下的問題是,可能已經(jīng)提交了 offset哪工,但是數(shù)據(jù)處理過程沒有完成奥此,故障重啟之后只能消費(fèi)到新數(shù)據(jù),重啟前沒有來得及處理的數(shù)據(jù)也處理不到了雁比。

所以稚虎,同步提交的方式,有可能造成數(shù)據(jù)重復(fù)偎捎,也有可能造成數(shù)據(jù)丟失蠢终,取決于 commitSync() 方法的位置

4. 異步提交

有個同步茴她,就有個異步寻拂,同步相比異步的不足是,提交 offset 的時候程序會阻塞住丈牢,限制了消費(fèi)吞吐量兜喻,因此就有了異步提交,上代碼:

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 用 println 模擬數(shù)據(jù)處理過程
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
  // 處理完畢后提交 offset
  consumer.commitAsync();
}

調(diào)用了 commitAsync 方法后赡麦,程序會繼續(xù)執(zhí)行朴皆,確實(shí)可以提高吞吐量,但是沒有只有好處沒有壞處的事泛粹,上面提到遂铡,提交 offset 是有可能失敗的,同步提交方法會一直嘗試提交晶姊,要么成功扒接,要么遇到不可恢復(fù)的錯誤拋異常,但是異步提交方法不會重試们衙,為什么不重試呢钾怔?因?yàn)槿绻怀晒鸵恢敝卦嚕锌赡芨乱淮蔚?offset 提交都已經(jīng)完成了蒙挑,如果重試成功了宗侦,反而會把更新的 offset 給覆蓋掉,造成重復(fù)忆蚀,所以干脆不重試矾利。

異步的方法一般都一個回調(diào)函數(shù)姑裂,這個也有:

consumer.commitAsyn(new OffsetCommitCallback() {
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
        if (e != null)
            log.error("Commit failed for offsets {}", offsets, e);
    }
})

如果想要在異步提交失敗的時候重試,《Kafka 權(quán)威指南》給了一種方法男旗,維護(hù)一個單調(diào)遞增的序列號舶斧,每提交一次,遞增一下這個序列號察皇,并把序列號傳給回調(diào)函數(shù)茴厉,當(dāng)在回調(diào)函數(shù)中發(fā)現(xiàn)失敗時,比較一下回調(diào)函數(shù)中的序列號和當(dāng)前序列號的大小什荣,如果當(dāng)前的序列號已經(jīng)比回調(diào)函數(shù)的序列號大矾缓,那就不用重新提交 offset 了,因?yàn)橐呀?jīng)有一個更新的 offset 在提交了溃睹。

我覺得按照這個說法而账,代碼可能長這個樣子(聲明:沒有驗(yàn)證過):

// 維護(hù)全局序列號
int global_seq = 0;
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
  // 異步提交 offset 時傳給回調(diào)函數(shù)
  consumer.commitAsync(new MyOffsetCommitCallback(global_seq++));
}
// 私有類實(shí)現(xiàn) OffsetCommitCallback 接口
private class MyOffsetCommitCallback implements OffsetCommitCallback {
  private int seq;
  // 回調(diào)函數(shù)初始化時記錄當(dāng)前序列號
  MyOffsetCommitCallback(int global_seq) {
    this.seq = global_seq;
  }
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
        if (exception != nul) {
      // 兩者相等表示還沒有更新的 offset 在提交胰坟,可以重試
      if (this.seq == global_seq) {
        // 在這里重試因篇,怎么重試你來想吧
      }
    }
  }
}

5. 同步&異步結(jié)合

其實(shí)偶爾有一兩次提交 offset 失敗問題不大,因?yàn)橹灰罄m(xù)有 offset 提交成功了笔横,之前的失敗可以忽略的竞滓。但是如果知道這是最后一次提交了,那么還是有必要確保這次提交能成功的吹缔。所以一種常用的提交 offset 的方式商佑,是同時使用同步提交和異步提交,它可以兼顧效率和可靠性(數(shù)據(jù)準(zhǔn)確性)厢塘,上代碼:

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
      System.out.printf("消息內(nèi)容為:%s\n", record.value());
        }
    // 即使失敗也不要緊茶没,要么有下一次異步的提交,要么有關(guān)閉前的最后一次提交
    consumer.commitAsync();
  }
// 捕獲處理不了的異常
} catch (Exception e) {
  log.error("Unexpected error", e);
} finally {
  try {
    // 關(guān)閉 consumer 前最后一次提交使用同步的方式晚碾,最大程度的確保成功
    consumer.commitSync();
  } finally {
    consumer.close();
  }
}

簡單的解釋一下代碼:poll 循環(huán)里用異步提交抓半,效率高,整個 poll 循環(huán)捕獲到異常之后在關(guān)閉前進(jìn)行一次同步提交格嘁,穩(wěn)妥笛求,保證最新的 offset能被提交(當(dāng)然,如果是不可恢復(fù)的異常糕簿,比如 Kafka 宕機(jī)探入,發(fā)生這種異常是無法提交成功的)。

6. 提交指定 offset

上面的這些提交方式懂诗,都是 poll 一次蜂嗽,提交一次,其實(shí)還可以 poll 一次殃恒,提交多次徒爹,比如每處理一條數(shù)據(jù)荚醒,提交一次,上代碼:

// 引入由 TopPartition 和 OffsetAndMetadata 組成的 Map 類型
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records)
  {
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
    // 更新當(dāng)前分區(qū)的 offset
    currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
    // 這里用了異步提交隆嗅,也可以用同步提交
    // consumer.commitSync(currentOffsets);
    consumer.commitAsync(currentOffsets, null);
  }
}

解釋一下:同步提交的方法和異步提交的方法都可以接受Map<TopicPartition, OffsetAndMetadata> 類型的參數(shù)界阁,提交這個參數(shù)指定的 offset,而不是通過 poll 方法消費(fèi)的最大 offset胖喳。

7. 總結(jié)

各種 offset 的提交方式和優(yōu)缺點(diǎn)總結(jié)如下:

提交方式 優(yōu)點(diǎn)&缺點(diǎn)
自動提交 可能有大量重復(fù)消費(fèi)泡躯,不受控制
同步提交 效率低,也有可能重復(fù)消費(fèi)丽焊, 但比自動提交少
異步提交 調(diào)用效率高
同步&異步結(jié)合 高吞吐量较剃,且可靠,可能有少量的重復(fù)消費(fèi)(推薦)
提交指定 offset 可以比其它方式更加頻繁的提交技健,仍然有可能重復(fù)

歡迎交流討論写穴,吐槽建議。

勤學(xué)似春起之苗雌贱,不見其增啊送,日有所長
輟學(xué)如磨刀之石,不見其損欣孤,日有所虧
關(guān)注【大數(shù)據(jù)學(xué)徒】馋没,用技術(shù)干貨助你日有所長

大數(shù)據(jù)學(xué)徒

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市降传,隨后出現(xiàn)的幾起案子篷朵,更是在濱河造成了極大的恐慌,老刑警劉巖婆排,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件声旺,死亡現(xiàn)場離奇詭異,居然都是意外死亡段只,警方通過查閱死者的電腦和手機(jī)腮猖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來翼悴,“玉大人缚够,你說我怎么就攤上這事○惺辏” “怎么了谍椅?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長古话。 經(jīng)常有香客問我雏吭,道長,這世上最難降的妖魔是什么陪踩? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任杖们,我火速辦了婚禮悉抵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘摘完。我一直安慰自己姥饰,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布孝治。 她就那樣靜靜地躺著列粪,像睡著了一般。 火紅的嫁衣襯著肌膚如雪谈飒。 梳的紋絲不亂的頭發(fā)上岂座,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天,我揣著相機(jī)與錄音杭措,去河邊找鬼费什。 笑死,一個胖子當(dāng)著我的面吹牛手素,可吹牛的內(nèi)容都是我干的鸳址。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼刑桑,長吁一口氣:“原來是場噩夢啊……” “哼氯质!你這毒婦竟也來了募舟?” 一聲冷哼從身側(cè)響起祠斧,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拱礁,沒想到半個月后琢锋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡呢灶,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年吴超,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鸯乃。...
    茶點(diǎn)故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡鲸阻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出缨睡,到底是詐尸還是另有隱情鸟悴,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布奖年,位于F島的核電站细诸,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏陋守。R本人自食惡果不足惜震贵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一利赋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧猩系,春花似錦媚送、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至幽纷,卻和暖如春式塌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背友浸。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工峰尝, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人收恢。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓武学,卻偏偏與公主長得像,于是被迫代替她去往敵國和親伦意。 傳聞我的和親對象是個殘疾皇子火窒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容