在 [Kafka 101 - 5] 圖文并茂地介紹 offset 概念 中我們介紹了消息的 offset 和消費(fèi)者的 offset含衔,并且提到消費(fèi)者的 offset 的維護(hù)方式是消費(fèi)者自己提交到一個特殊 topic癣漆,聽起來似乎很簡單岳颇,但實(shí)際這個提交 offset 的過程也有點(diǎn)內(nèi)容,所以本文來學(xué)習(xí) Kafka 中消費(fèi)者提交 offset 的五種方式胶滋。
需要明確的一點(diǎn)是睡汹,即使 Kafka 中維護(hù)了消費(fèi)者的 offset,消費(fèi)者仍然有可能重復(fù)消費(fèi)或者少消費(fèi)數(shù)據(jù)的相嵌,如果想要保證消費(fèi)數(shù)據(jù)的完全準(zhǔn)確腿时,不丟不重,即所謂的“Exactly Once”饭宾,需要使用別的機(jī)制來保證批糟,比如 Flink 的 checkpoint 機(jī)制,但這些機(jī)制也需要能夠正確的理解 offset 的提交捏雌。
內(nèi)容提要:
- 環(huán)境說明
- 自動提交
- 同步提交
- 異步提交
- 同步&異步結(jié)合
- 提交指定 offset
- 總結(jié)
1. 環(huán)境說明
操作系統(tǒng):MacOS/Linux
Kafka:本地安裝了社區(qū)版的 2.3.1 版本,運(yùn)行在 9092 端口
Kafka 的安裝使用可以參考:[Kafka 101-1] Kafka安裝使用
消費(fèi)者 Java API 的基本使用可以參考:[Kafka 101-3] 使用Java API消費(fèi)數(shù)據(jù)實(shí)戰(zhàn)
2. 自動提交
這是 offset 提交的默認(rèn)方式笆搓。
消費(fèi)者有一個參數(shù)叫做 enable.auto.commit
性湿,表示是否啟用 offset 的自動提交,默認(rèn)值為 true
满败,并且還有一個配套的參數(shù)叫做 auto.commit.interval.ms
肤频,表示自動提交 offset 的時間間隔,默認(rèn)值是 5000
算墨,即 5s 提交自動提交一次 offset宵荒,還記得消費(fèi)者的 poll 循環(huán)嗎?長這個樣子:
while (true) {
// consumer 是一個 KafkaConsumer對象
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 用 println 模擬數(shù)據(jù)數(shù)據(jù)過程
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
}
每當(dāng)調(diào)用 KafkaConsumer 對象的 poll
方法時净嘀,它會去檢查距離上一次提交 offset 是否已經(jīng)過去 5s报咳,如果夠 5s 的話,則會幫助你把這次 poll
操作消費(fèi)的最大 offset 提交給 Kafka broker挖藏。
注意:如果在自動提交了 offset 之后 3s 的時候程序因?yàn)槟撤N原因掛掉了暑刃,并且在這 3s 期間消費(fèi)了 10000 條數(shù)據(jù),那么當(dāng)程序重啟后膜眠,這 10000 條數(shù)據(jù)會被再次消費(fèi)岩臣,因?yàn)?Kafka 中記錄的是最后一次提交的 offset,程序重啟后會從這個 offset 開始繼續(xù)消費(fèi)宵膨,所以架谎,自動提交有重復(fù)消費(fèi)數(shù)據(jù)的風(fēng)險,而且我們完全無法控制辟躏,看起來不是很棒谷扣,所以在重視數(shù)據(jù)準(zhǔn)確性的場景中,都不會采用自動提交的方式捎琐。
下文中的方法都需要在構(gòu)造 KafkaConsumer 對象的時候傳入
auto.commit.offset
參數(shù)抑钟,并且設(shè)置為false
。
3. 同步提交
直接上代碼:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 用 println 模擬數(shù)據(jù)處理過程
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 處理完畢后提交 offset
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// commitSync 會一直嘗試提交直到成功或者遇到不可恢復(fù)的錯誤
log.error("commit faile", e)
}
}
可以看到野哭,同步提交 offset 的方法叫做 commitSync()
在塔,這個方法會提交最近一次調(diào)用 poll
所消費(fèi)到的最大 offset,這上面的例子中拨黔,我們是把 commitSync()
放在了數(shù)據(jù)處理之后蛔溃,如果程序在數(shù)據(jù)處理過程中掛掉,這時已經(jīng)處理了一部分?jǐn)?shù)據(jù)(比如寫到了MySQL 中),那么重啟后會從上一次提交的 offset 繼續(xù)消費(fèi)贺待,之前已經(jīng)寫入 MySQL 的數(shù)據(jù)會被再處理一次徽曲,因此MySQL 中的數(shù)據(jù)會重復(fù)。
那么如果我們把 commitSync
放在數(shù)據(jù)處理的代碼之前呢麸塞,答案是數(shù)據(jù)有可能丟失秃臣,因?yàn)檫@種情況下的問題是,可能已經(jīng)提交了 offset哪工,但是數(shù)據(jù)處理過程沒有完成奥此,故障重啟之后只能消費(fèi)到新數(shù)據(jù),重啟前沒有來得及處理的數(shù)據(jù)也處理不到了雁比。
所以稚虎,同步提交的方式,有可能造成數(shù)據(jù)重復(fù)偎捎,也有可能造成數(shù)據(jù)丟失蠢终,取決于 commitSync()
方法的位置。
4. 異步提交
有個同步茴她,就有個異步寻拂,同步相比異步的不足是,提交 offset 的時候程序會阻塞住丈牢,限制了消費(fèi)吞吐量兜喻,因此就有了異步提交,上代碼:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 用 println 模擬數(shù)據(jù)處理過程
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 處理完畢后提交 offset
consumer.commitAsync();
}
調(diào)用了 commitAsync
方法后赡麦,程序會繼續(xù)執(zhí)行朴皆,確實(shí)可以提高吞吐量,但是沒有只有好處沒有壞處的事泛粹,上面提到遂铡,提交 offset 是有可能失敗的,同步提交方法會一直嘗試提交晶姊,要么成功扒接,要么遇到不可恢復(fù)的錯誤拋異常,但是異步提交方法不會重試们衙,為什么不重試呢钾怔?因?yàn)槿绻怀晒鸵恢敝卦嚕锌赡芨乱淮蔚?offset 提交都已經(jīng)完成了蒙挑,如果重試成功了宗侦,反而會把更新的 offset 給覆蓋掉,造成重復(fù)忆蚀,所以干脆不重試矾利。
異步的方法一般都一個回調(diào)函數(shù)姑裂,這個也有:
consumer.commitAsyn(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
})
如果想要在異步提交失敗的時候重試,《Kafka 權(quán)威指南》給了一種方法男旗,維護(hù)一個單調(diào)遞增的序列號舶斧,每提交一次,遞增一下這個序列號察皇,并把序列號傳給回調(diào)函數(shù)茴厉,當(dāng)在回調(diào)函數(shù)中發(fā)現(xiàn)失敗時,比較一下回調(diào)函數(shù)中的序列號和當(dāng)前序列號的大小什荣,如果當(dāng)前的序列號已經(jīng)比回調(diào)函數(shù)的序列號大矾缓,那就不用重新提交 offset 了,因?yàn)橐呀?jīng)有一個更新的 offset 在提交了溃睹。
我覺得按照這個說法而账,代碼可能長這個樣子(聲明:沒有驗(yàn)證過):
// 維護(hù)全局序列號
int global_seq = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 異步提交 offset 時傳給回調(diào)函數(shù)
consumer.commitAsync(new MyOffsetCommitCallback(global_seq++));
}
// 私有類實(shí)現(xiàn) OffsetCommitCallback 接口
private class MyOffsetCommitCallback implements OffsetCommitCallback {
private int seq;
// 回調(diào)函數(shù)初始化時記錄當(dāng)前序列號
MyOffsetCommitCallback(int global_seq) {
this.seq = global_seq;
}
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (exception != nul) {
// 兩者相等表示還沒有更新的 offset 在提交胰坟,可以重試
if (this.seq == global_seq) {
// 在這里重試因篇,怎么重試你來想吧
}
}
}
}
5. 同步&異步結(jié)合
其實(shí)偶爾有一兩次提交 offset 失敗問題不大,因?yàn)橹灰罄m(xù)有 offset 提交成功了笔横,之前的失敗可以忽略的竞滓。但是如果知道這是最后一次提交了,那么還是有必要確保這次提交能成功的吹缔。所以一種常用的提交 offset 的方式商佑,是同時使用同步提交和異步提交,它可以兼顧效率和可靠性(數(shù)據(jù)準(zhǔn)確性)厢塘,上代碼:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 即使失敗也不要緊茶没,要么有下一次異步的提交,要么有關(guān)閉前的最后一次提交
consumer.commitAsync();
}
// 捕獲處理不了的異常
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// 關(guān)閉 consumer 前最后一次提交使用同步的方式晚碾,最大程度的確保成功
consumer.commitSync();
} finally {
consumer.close();
}
}
簡單的解釋一下代碼:poll
循環(huán)里用異步提交抓半,效率高,整個 poll
循環(huán)捕獲到異常之后在關(guān)閉前進(jìn)行一次同步提交格嘁,穩(wěn)妥笛求,保證最新的 offset能被提交(當(dāng)然,如果是不可恢復(fù)的異常糕簿,比如 Kafka 宕機(jī)探入,發(fā)生這種異常是無法提交成功的)。
6. 提交指定 offset
上面的這些提交方式懂诗,都是 poll
一次蜂嗽,提交一次,其實(shí)還可以 poll
一次殃恒,提交多次徒爹,比如每處理一條數(shù)據(jù)荚醒,提交一次,上代碼:
// 引入由 TopPartition 和 OffsetAndMetadata 組成的 Map 類型
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("消息內(nèi)容為:%s\n", record.value());
// 更新當(dāng)前分區(qū)的 offset
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
// 這里用了異步提交隆嗅,也可以用同步提交
// consumer.commitSync(currentOffsets);
consumer.commitAsync(currentOffsets, null);
}
}
解釋一下:同步提交的方法和異步提交的方法都可以接受Map<TopicPartition, OffsetAndMetadata>
類型的參數(shù)界阁,提交這個參數(shù)指定的 offset,而不是通過 poll
方法消費(fèi)的最大 offset胖喳。
7. 總結(jié)
各種 offset 的提交方式和優(yōu)缺點(diǎn)總結(jié)如下:
提交方式 | 優(yōu)點(diǎn)&缺點(diǎn) |
---|---|
自動提交 | 可能有大量重復(fù)消費(fèi)泡躯,不受控制 |
同步提交 | 效率低,也有可能重復(fù)消費(fèi)丽焊, 但比自動提交少 |
異步提交 | 調(diào)用效率高 |
同步&異步結(jié)合 | 高吞吐量较剃,且可靠,可能有少量的重復(fù)消費(fèi)(推薦) |
提交指定 offset | 可以比其它方式更加頻繁的提交技健,仍然有可能重復(fù) |
歡迎交流討論写穴,吐槽建議。
勤學(xué)似春起之苗雌贱,不見其增啊送,日有所長
輟學(xué)如磨刀之石,不見其損欣孤,日有所虧
關(guān)注【大數(shù)據(jù)學(xué)徒】馋没,用技術(shù)干貨助你日有所長