Kafka offset管理

Kafka中的每個partition都由一系列有序的猜嘱、不可變的消息組成毅整,這些消息被連續(xù)的追加到partition中映胁。partition中的每個消息都有一個連續(xù)的序號预侯,用于partition唯一標(biāo)識一條消息致开。

Offset記錄著下一條將要發(fā)送給Consumer的消息的序號。

Offset從語義上來看擁有兩種:Current Offset和Committed Offset萎馅。

Current Offset

Current Offset保存在Consumer客戶端中双戳,它表示Consumer希望收到的下一條消息的序號。它僅僅在poll()方法中使用糜芳。例如飒货,Consumer第一次調(diào)用poll()方法后收到了20條消息魄衅,那么Current Offset就被設(shè)置為20。這樣Consumer下一次調(diào)用poll()方法時塘辅,Kafka就知道應(yīng)該從序號為21的消息開始讀取晃虫。這樣就能夠保證每次Consumer poll消息時,都能夠收到不重復(fù)的消息扣墩。

Committed Offset

Committed Offset保存在Broker上哲银,它表示Consumer已經(jīng)確認(rèn)消費過的消息的序號。主要通過commitSynccommitAsync
API來操作呻惕。舉個例子荆责,Consumer通過poll() 方法收到20條消息后,此時Current Offset就是20亚脆,經(jīng)過一系列的邏輯處理后做院,并沒有調(diào)用consumer.commitAsync()consumer.commitSync()來提交Committed Offset,那么此時Committed Offset依舊是0濒持。

Committed Offset主要用于Consumer Rebalance键耕。在Consumer Rebalance的過程中,一個partition被分配給了一個Consumer弥喉,那么這個Consumer該從什么位置開始消費消息呢郁竟?答案就是Committed Offset。另外由境,如果一個Consumer消費了5條消息(poll并且成功commitSync)之后宕機(jī)了棚亩,重新啟動之后它仍然能夠從第6條消息開始消費,因為Committed Offset已經(jīng)被Kafka記錄為5虏杰。

總結(jié)一下讥蟆,Current Offset是針對Consumer的poll過程的,它可以保證每次poll都返回不重復(fù)的消息纺阔;而Committed Offset是用于Consumer Rebalance過程的瘸彤,它能夠保證新的Consumer能夠從正確的位置開始消費一個partition,從而避免重復(fù)消費笛钝。

在Kafka 0.9前质况,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目錄中(zookeeper其實并不適合進(jìn)行大批量的讀寫操作,尤其是寫操作)玻靡。而在0.9之后结榄,所有的offset信息都保存在了Broker上的一個名為__consumer_offsets的topic中。

Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的囤捻。

Group Coordinator

Group Coordinator是運行在Kafka集群中每一個Broker內(nèi)的一個進(jìn)程臼朗。它主要負(fù)責(zé)Consumer Group的管理,Offset位移管理以及Consumer Rebalance

對于每一個Consumer Group视哑,Group Coordinator都會存儲以下信息:

  • 訂閱的topics列表
  • Consumer Group配置信息绣否,包括session timeout等
  • 組中每個Consumer的元數(shù)據(jù)。包括主機(jī)名挡毅,consumer id
  • 每個Group正在消費的topic partition的當(dāng)前offsets
  • Partition的ownership元數(shù)據(jù)蒜撮,包括consumer消費的partitions映射關(guān)系

Consumer Group如何確定自己的coordinator是誰呢? 簡單來說分為兩步:

  1. 確定Consumer Group offset信息將要寫入__consumers_offsets topic的哪個分區(qū)慷嗜。具體計算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % offsets.topic.num.partitions)  //offsets.topic.num.partitions默認(rèn)值為50淀弹。
  1. 該分區(qū)leader所在的broker就是被選定的coordinator

Offset存儲模型

由于一個partition只能固定的交給一個消費者組中的一個消費者消費,因此Kafka保存offset時并不直接為每個消費者保存庆械,而是以groupid-topic-partition -> offset的方式保存薇溃。如圖所示:


group-offset.png

Kafka在保存Offset的時候,實際上是將Consumer Group和partition對應(yīng)的offset以消息的方式保存在__consumers_offsets這個topic中缭乘。

__consumers_offsets默認(rèn)擁有50個partition沐序,可以通過

Math.abs(groupId.hashCode() % offsets.topic.num.partitions) 

的方式來查詢某個Consumer Group的offset信息保存在__consumers_offsets的哪個partition中。下圖展示了__consumers_offsets中保存的offset消息的格式:


__consumers_offsets.png
__consumers_offsets_data.png

如圖所示堕绩,一條offset消息的格式為groupid-topic-partition -> offset策幼。因此consumer poll消息時,已知groupid和topic奴紧,又通過Coordinator分配partition的方式獲得了對應(yīng)的partition特姐,自然能夠通過Coordinator查找__consumers_offsets的方式獲得最新的offset了。

Offset查詢

前面我們已經(jīng)描述過offset的存儲模型黍氮,它是按照groupid-topic-partition -> offset的方式存儲的唐含。然而Kafka只提供了根據(jù)offset讀取消息的模型,并不支持根據(jù)key讀取消息的方式沫浆。那么Kafka是如何支持Offset的查詢呢捷枯?

答案就是Offsets Cache!专执!

Offsets Cache.JPG

如圖所示淮捆,Consumer提交offset時,Kafka Offset Manager會首先追加一條條新的commit消息到__consumers_offsets topic中本股,然后更新對應(yīng)的緩存攀痊。讀取offset時從緩存中讀取,而不是直接讀取__consumers_offsets這個topic拄显。

Log Compaction

我們已經(jīng)知道蚕苇,Kafka使用groupid-topic-partition -> offset*的消息格式,將Offset信息存儲在__consumers_offsets topic中凿叠。請看下面一個例子:

__consumers_offsets.JPG

如圖,對于audit-consumer這個Consumer Group來說,上面的存儲了兩條具有相同key的記錄:PageViewEvent-0 -> 240PageViewEvent-0 -> 323盒件。事實上蹬碧,這就是一種無用的冗余。因為對于一個partition來說炒刁,我們實際上只需要它當(dāng)前最新的Offsets恩沽。因此這條舊的PageViewEvent-0 -> 240記錄事實上是無用的。

為了消除這樣的過期數(shù)據(jù)翔始,Kafka為__consumers_offsets topic設(shè)置了Log Compaction功能罗心。Log Compaction意味著對于有相同key的的不同value值,只保留最后一個版本城瞎。如果應(yīng)用只關(guān)心key對應(yīng)的最新value值渤闷,可以開啟Kafka的Log Compaction功能,Kafka會定期將相同key的消息進(jìn)行合并脖镀,只保留最新的value值飒箭。

這張圖片生動的闡述了Log Compaction的過程:


Log Compaction.JPG

下圖闡釋了__consumers_offsets topic中的數(shù)據(jù)在Log Compaction下的變化:


Log Compaction for __consumers_offsets.JPG

在新建topic時添加log.cleanup.policy=compact參數(shù)就可以為topic開啟Log Compaction功能。

auto.offset.reset參數(shù)

auto.offset.reset表示如果Kafka中沒有存儲對應(yīng)的offset信息的話(有可能offset信息被刪除)蜒灰,消費者從何處開始消費消息弦蹂。它擁有三個可選值:

  • earliest:從最早的offset開始消費
  • latest:從最后的offset開始消費
  • none:直接拋出exception給consumer

看一下下面兩個場景:

  1. Consumer消費了5條消息后宕機(jī)了,重啟之后它讀取到對應(yīng)的partition的Committed Offset為5强窖,因此會直接從第6條消息開始讀取凸椿。此時完全依賴于Committed Offset機(jī)制,和auto.offset.reset配置完全無關(guān)翅溺。

  2. 新建了一個新的Group脑漫,并添加了一個Consumer,它訂閱了一個已經(jīng)存在的Topic未巫。此時Kafka中還沒有這個Consumer相應(yīng)的Offset信息窿撬,因此此時Kafka就會根據(jù)auto.offset.reset配置來決定這個Consumer從何處開始消費消息。

參考文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末叙凡,一起剝皮案震驚了整個濱河市劈伴,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌握爷,老刑警劉巖跛璧,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異新啼,居然都是意外死亡追城,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門燥撞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來座柱,“玉大人迷帜,你說我怎么就攤上這事∩矗” “怎么了戏锹?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長火诸。 經(jīng)常有香客問我锦针,道長,這世上最難降的妖魔是什么置蜀? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任奈搜,我火速辦了婚禮,結(jié)果婚禮上盯荤,老公的妹妹穿的比我還像新娘馋吗。我一直安慰自己,他們只是感情好廷雅,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布耗美。 她就那樣靜靜地躺著,像睡著了一般航缀。 火紅的嫁衣襯著肌膚如雪商架。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天芥玉,我揣著相機(jī)與錄音蛇摸,去河邊找鬼。 笑死灿巧,一個胖子當(dāng)著我的面吹牛赶袄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播抠藕,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼饿肺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了盾似?” 一聲冷哼從身側(cè)響起敬辣,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎零院,沒想到半個月后溉跃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡告抄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年撰茎,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片打洼。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡龄糊,死狀恐怖逆粹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情绎签,我是刑警寧澤枯饿,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站诡必,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏搔扁。R本人自食惡果不足惜爸舒,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望稿蹲。 院中可真熱鬧扭勉,春花似錦、人聲如沸苛聘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽设哗。三九已至唱捣,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間网梢,已是汗流浹背震缭。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留战虏,地道東北人拣宰。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像烦感,于是被迫代替她去往敵國和親巡社。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容