[TOC]
Kafka中的每個partition都由一系列有序的趟薄、不可變的消息組成,這些消息被連續(xù)的追加到partition中呈枉。partition中的每個消息都有一個連續(xù)的序號挂签,用于partition唯一標(biāo)識一條消息谈为。
Offset記錄著下一條將要發(fā)送給Consumer的消息的序號。
Offset從語義上來看擁有兩種:Current Offset和Committed Offset瓤逼。
Current Offset
Current Offset保存在Consumer客戶端中笼吟,它表示Consumer希望收到的下一條消息的序號。它僅僅在poll()方法中使用霸旗。例如贷帮,Consumer第一次調(diào)用poll()方法后收到了20條消息,那么Current Offset就被設(shè)置為20诱告。這樣Consumer下一次調(diào)用poll()方法時撵枢,Kafka就知道應(yīng)該從序號為21的消息開始讀取。這樣就能夠保證每次Consumer poll消息時精居,都能夠收到不重復(fù)的消息诲侮。
Committed Offset
Committed Offset保存在Broker上,它表示Consumer已經(jīng)確認(rèn)消費過的消息的序號箱蟆。主要通過commitSync和commitAsyncAPI來操作沟绪。舉個例子,Consumer通過poll() 方法收到20條消息后空猜,此時Current Offset就是20绽慈,經(jīng)過一系列的邏輯處理后,并沒有調(diào)用consumer.commitAsync()或consumer.commitSync()來提交Committed Offset辈毯,那么此時Committed Offset依舊是0坝疼。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的過程中谆沃,一個partition被分配給了一個Consumer钝凶,那么這個Consumer該從什么位置開始消費消息呢?答案就是Committed Offset唁影。另外耕陷,如果一個Consumer消費了5條消息(poll并且成功commitSync)之后宕機(jī)了,重新啟動之后它仍然能夠從第6條消息開始消費据沈,因為Committed Offset已經(jīng)被Kafka記錄為5哟沫。
總結(jié)一下,Current Offset是針對Consumer的poll過程的锌介,它可以保證每次poll都返回不重復(fù)的消息嗜诀;而Committed Offset是用于Consumer Rebalance過程的猾警,它能夠保證新的Consumer能夠從正確的位置開始消費一個partition,從而避免重復(fù)消費隆敢。
在Kafka 0.9前发皿,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目錄中(zookeeper其實并不適合進(jìn)行大批量的讀寫操作,尤其是寫操作)拂蝎。而在0.9之后雳窟,所有的offset信息都保存在了Broker上的一個名為__consumer_offsets的topic中。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的匣屡。
Group Coordinator
Group Coordinator是運行在Kafka集群中每一個Broker內(nèi)的一個進(jìn)程封救。它主要負(fù)責(zé)Consumer Group的管理,Offset位移管理以及Consumer Rebalance捣作。
對于每一個Consumer Group誉结,Group Coordinator都會存儲以下信息:
- 訂閱的topics列表
- Consumer Group配置信息,包括session timeout等
- 組中每個Consumer的元數(shù)據(jù)券躁。包括主機(jī)名惩坑,consumer id
- 每個Group正在消費的topic partition的當(dāng)前offsets
- Partition的ownership元數(shù)據(jù),包括consumer消費的partitions映射關(guān)系
Consumer Group如何確定自己的coordinator是誰呢也拜? 簡單來說分為兩步:
確定Consumer Group offset信息將要寫入__consumers_offsets topic的哪個分區(qū)以舒。具體計算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % offsets.topic.num.partitions) //offsets.topic.num.partitions默認(rèn)值為50。
該分區(qū)leader所在的broker就是被選定的coordinator
Offset存儲模型
由于一個partition只能固定的交給一個消費者組中的一個消費者消費慢哈,因此Kafka保存offset時并不直接為每個消費者保存蔓钟,而是以groupid-topic-partition -> offset的方式保存。如圖所示:
Kafka在保存Offset的時候卵贱,實際上是將Consumer Group和partition對應(yīng)的offset以消息的方式保存在__consumers_offsets這個topic中滥沫。
__consumers_offsets默認(rèn)擁有50個partition,可以通過
Math.abs(groupId.hashCode() % offsets.topic.num.partitions)
的方式來查詢某個Consumer Group的offset信息保存在__consumers_offsets的哪個partition中键俱。下圖展示了__consumers_offsets中保存的offset消息的格式:
如圖所示兰绣,一條offset消息的格式為groupid-topic-partition -> offset。因此consumer poll消息時编振,已知groupid和topic缀辩,又通過Coordinator分配partition的方式獲得了對應(yīng)的partition,自然能夠通過Coordinator查找__consumers_offsets的方式獲得最新的offset了踪央。
Offset查詢
前面我們已經(jīng)描述過offset的存儲模型臀玄,它是按照groupid-topic-partition -> offset的方式存儲的。然而Kafka只提供了根據(jù)offset讀取消息的模型杯瞻,并不支持根據(jù)key讀取消息的方式镐牺。那么Kafka是如何支持Offset的查詢呢?
答案就是Offsets Cache?颉!
如圖所示,Consumer提交offset時旗唁,Kafka Offset Manager會首先追加一條條新的commit消息到__consumers_offsets topic中畦浓,然后更新對應(yīng)的緩存。讀取offset時從緩存中讀取检疫,而不是直接讀取__consumers_offsets這個topic讶请。
Log Compaction
我們已經(jīng)知道,Kafka使用groupid-topic-partition -> offset*的消息格式屎媳,將Offset信息存儲在__consumers_offsets topic中夺溢。請看下面一個例子:
如圖,對于audit-consumer這個Consumer Group來說烛谊,上面的存儲了兩條具有相同key的記錄:PageViewEvent-0 -> 240和PageViewEvent-0 -> 323风响。事實上,這就是一種無用的冗余丹禀。因為對于一個partition來說状勤,我們實際上只需要它當(dāng)前最新的Offsets。
為了消除這樣的過期數(shù)據(jù)双泪,Kafka為__consumers_offsets topic設(shè)置了Log Compaction功能持搜。Log Compaction意味著對于有相同key的的不同value值,只保留最后一個版本焙矛。如果應(yīng)用只關(guān)心key對應(yīng)的最新value值葫盼,可以開啟Kafka的Log Compaction功能,Kafka會定期將相同key的消息進(jìn)行合并村斟,只保留最新的value值剪返。
在新建topic時添加log.cleanup.policy=compact參數(shù)就可以為topic開啟Log Compaction功能。
auto.offset.reset參數(shù)
auto.offset.reset表示如果Kafka中沒有存儲對應(yīng)的offset信息的話(有可能offset信息被刪除)邓梅,消費者從何處開始消費消息脱盲。它擁有三個可選值:
- earliest:從最早的offset開始消費
- latest:從最后的offset開始消費
- none:直接拋出exception給consumer
看一下下面兩個場景:
- Consumer消費了5條消息后宕機(jī)了,重啟之后它讀取到對應(yīng)的partition的Committed Offset為5日缨,因此會直接從第6條消息開始讀取钱反。此時完全依賴于Committed Offset機(jī)制,和auto.offset.reset配置完全無關(guān)匣距。
- 新建了一個新的Group面哥,并添加了一個Consumer,它訂閱了一個已經(jīng)存在的Topic毅待。此時Kafka中還沒有這個Consumer相應(yīng)的Offset信息尚卫,因此此時Kafka就會根據(jù)auto.offset.reset配置來決定這個Consumer從何處開始消費消息。