圖片看不到可以看我的CSDN的博客:
https://blog.csdn.net/u013332124/article/details/83548706
一穗酥、GroupCoordinator 概念
GroupCoordinator是運(yùn)行在Broker上的一個(gè)服務(wù),用來(lái)管理Consumer Group的member和各個(gè)partition的消費(fèi)進(jìn)度,這里的member指的是我們的KafkaConsumer實(shí)例神年。
broker在啟動(dòng)的時(shí)候會(huì)啟動(dòng)一個(gè)GroupCoordinator實(shí)例。一個(gè)集群可能有多個(gè)broker芭梯,那么怎么確定一個(gè)新的Consumer要和哪個(gè)broker上的GroupCoordinator交互呢暖璧?
這就和kafka上的一個(gè)內(nèi)部使用的topic __consumer_offsets
有關(guān)系了。
__consumer_offsets
consumer_offsets是kafka內(nèi)部使用的一個(gè)topic琳袄,專(zhuān)門(mén)用來(lái)存儲(chǔ)group具體消費(fèi)的情況江场,默認(rèn)情況下,這個(gè)topic有50個(gè)partition窖逗,每個(gè)partition有3個(gè)副本址否。我們進(jìn)入某個(gè)broker的日志目錄,一般都能看到該topic對(duì)應(yīng)的partition目錄碎紊,如下圖:
Consumer如何找到對(duì)應(yīng)的GroupCoordinator
__consumer_offsets的會(huì)分布在各個(gè)broker佑附,當(dāng)一個(gè)新的Consumer要尋找和它交互的GroupCoordinator時(shí),需要先對(duì)它的GroupId進(jìn)行hash仗考,然后取模__consumer_offsets的partition數(shù)量音同,最后得到的值就是對(duì)應(yīng)partition,那么這個(gè)partition的leader所在的broker就是我們要交互的那個(gè)broker了秃嗜。獲取partition公式如下:
abs(GroupId.hashCode()) % NumPartitions
NumPartitions為_(kāi)_consumer_offsets的數(shù)量权均。GroupId為初始化Consumer時(shí)指定的groupId。
舉個(gè)例子锅锨,假設(shè)一個(gè)GroupId計(jì)算出來(lái)的hashcode是5螺句,之后取模50得到5。那么partition-5的leader所在的broker就是我們要找的那個(gè)節(jié)點(diǎn)橡类。這個(gè)Consumer后面都會(huì)直接和該broker上的GroupCoordinator交互蛇尚。
二、Consumer加入Group流程
Consumer在拉取數(shù)據(jù)之前顾画,必須加入某個(gè)group取劫,在consumer加入到group的整個(gè)流程中匆笤,主要涉及到了3種請(qǐng)求:
- GROUP_COORDINATOR請(qǐng)求
- JOIN_GROUP請(qǐng)求
- SYNC_GROUP請(qǐng)求
GROUP_COORDINATOR請(qǐng)求
前面我們知道了通過(guò)__consumer_offsets和對(duì)應(yīng)的公式可以算出要和哪臺(tái)broker上的GroupCoordinator做交互,但是我們并不知道__consumer_offsets的各個(gè)partition位于哪些broker上谱邪。比如我們通過(guò)公式算出了要和__consumer_offsets的partition-5所在的broker做交互炮捧,但是我們不知道它的partition-5的leader在哪個(gè)broker上。因此我們需要先往集群的一個(gè)broker發(fā)送一個(gè)GROUP_COORDINATOR請(qǐng)求來(lái)獲取對(duì)應(yīng)的brokerId惦银。
要往哪個(gè)broker發(fā)送GROUP_COORDINATOR請(qǐng)求也不是隨機(jī)選擇的咆课,kafka會(huì)默認(rèn)選擇一個(gè)當(dāng)前連接數(shù)最少的broker來(lái)發(fā)送該請(qǐng)求。這個(gè)連接數(shù)是指inFightRequest扯俱,也就是當(dāng)前客戶(hù)端發(fā)往broker還未返回的那些連接數(shù)量书蚪。
broker處理:
kafka的broker接收到GROUP_COORDINATOR請(qǐng)求后,會(huì)通過(guò)公式abs(GroupId.hashCode()) % NumPartitions
算出對(duì)應(yīng)的partition迅栅,然后搜索__consumer_offsets的metadata殊校,找到該partition leader所在的brokerId,最后返回給客戶(hù)端读存。
這里要注意一點(diǎn):
- 如果__consumer_offsets被刪除了或者還未創(chuàng)建为流,broker找不到對(duì)應(yīng)的metadata時(shí),會(huì)自動(dòng)創(chuàng)建一個(gè)新的名為_(kāi)_consumer_offsets的topic然后再查找對(duì)應(yīng)的brokerId让簿。
JOIN_GROUP請(qǐng)求
找到要交互的broker后敬察,客戶(hù)端就會(huì)往該broker發(fā)送 JOIN_GROUP請(qǐng)求了。
JOIN_GROUP請(qǐng)求主要是讓Consumer加入到指定的group中尔当,broker上的GroupCoordinator服務(wù)會(huì)管理group的各個(gè)Consumer静汤。
broker收到 JOIN_GROUP請(qǐng)求后,讓目標(biāo)group進(jìn)入 PreparingRebalance狀態(tài)居凶,等待一段時(shí)間后,返回一些信息藤抡,這些信息包括Consumer在group中對(duì)應(yīng)的memberId以及該group的leaderId侠碧、generationId(每次reblance都會(huì)+1)等等,如果對(duì)應(yīng)consumer是leader缠黍,那么還會(huì)將當(dāng)期組中所有的members信息返回給leader用于后面讓leader來(lái)分配各個(gè)member要消費(fèi)的partition(第一個(gè)加入該group的consumer就是該group的leader)弄兜。
Consumer收到broker返回的信息后,如果沒(méi)有錯(cuò)誤則表示已經(jīng)加入到該Group中了瓷式。接著繼續(xù)發(fā)送SYNC_GROUP請(qǐng)求替饿。
SYNC_GROUP請(qǐng)求
前面的JOIN_GROUP請(qǐng)求只是加入目標(biāo)group,還沒(méi)有真正的分配partiton贸典。SYNC_GROUP請(qǐng)求就是用于獲取consumer要消費(fèi)哪些partition用的视卢。
Consumer根據(jù)前面JOIN_GROUP請(qǐng)求的返回值,會(huì)判斷自己是否是leader廊驼,如果是leader据过,就直接獲取group中的所有members然后使用PartitionAssignor的實(shí)現(xiàn)類(lèi)來(lái)為group中的各個(gè)Consumer分配要消費(fèi)哪些partition惋砂,PartitionAssignor的默認(rèn)實(shí)現(xiàn)是RangeAssignor,也可以通過(guò)配置partition.assignment.strategy
來(lái)指定不同的分配策略绳锅。最后leader把分配好的信息封裝成SYNC_GROUP請(qǐng)求發(fā)送給broker西饵。
如果consumer是follower,就直接發(fā)送一個(gè)SYNC_GROUP請(qǐng)求給broker鳞芙。
broker收到SYNC_GROUP請(qǐng)求后眷柔,根據(jù)group中的leader給的分配信息在內(nèi)存中給每個(gè)member分配對(duì)應(yīng)的partiton,然后將這些信息返回給對(duì)應(yīng)的consumer原朝。
最后驯嘱,各個(gè)group中的consumer就得到了自己要消費(fèi)的partition,就可以開(kāi)始拉取數(shù)據(jù)了竿拆。
三宙拉、Group的狀態(tài)變更
對(duì)于一個(gè) Consumer Group,它會(huì)有五種狀態(tài):Dead丙笋、Empty谢澈、AwaitingSync、PreparingRebalance御板、Stable锥忿。狀態(tài)間的變更關(guān)系如下圖所示:
[圖片上傳失敗...(image-f69012-1542368543646)]
Empty狀態(tài)
Empty狀態(tài)表示該Consumer Group中沒(méi)有任何member。新建的Group都是處于這個(gè)狀態(tài)怠肋,它可能轉(zhuǎn)化為以下兩種狀態(tài)
- PreparingRebalance:如果有新的member加入敬鬓,狀態(tài)就會(huì)變更成PreparingRebalance,等待partition-rebalance開(kāi)始笙各。
- Dead:如果該group被移除掉钉答,狀態(tài)就會(huì)變成Dead。
PreparingRebalance狀態(tài)
PreparingRebalance狀態(tài)表示該Group正在等待partition-rebalance開(kāi)始杈抢。這個(gè)狀態(tài)存在的目的主要是為了等待所有的member都加入到該Group中了数尿,然后開(kāi)始進(jìn)行partition rebalance(也就是進(jìn)入AwaitingSync狀態(tài))。這樣就可以盡量保證在進(jìn)行partition reblance時(shí)惶楼,group中的member不會(huì)發(fā)送變動(dòng)右蹦。它可能轉(zhuǎn)化為以下三種狀態(tài):
- Dead:如果該group被移除掉,狀態(tài)就會(huì)變成Dead
- AwaitingSync:第一個(gè)發(fā)送PreparingRebalance請(qǐng)求的Consumer返回后歼捐,group的狀態(tài)就會(huì)變成AwaitingSync何陆,等待重新分配partition
- Empty:group中最后一個(gè)member離開(kāi)了,group重新變?yōu)镋mpty狀態(tài)
AwaitingSync狀態(tài)
AwaitingSync狀態(tài)表示該Group正在等待重新分配partition的結(jié)果豹储,partiton的分配是由member的leader來(lái)進(jìn)行的贷盲,等leader發(fā)來(lái)SYNC_GROUP請(qǐng)求,GroupCoordinator知道partiton的分配情況了剥扣,Group狀態(tài)就會(huì)變成Stable晃洒。它可能轉(zhuǎn)化為以下三種狀態(tài):
- Dead:如果該group被移除掉慨灭,狀態(tài)就會(huì)變成Dead
- PreparingRebalance:如果有新的member加入或者舊成員離開(kāi),狀態(tài)會(huì)重新變回PreparingRebalance球及,等待新的一輪partition分配
- Stable:partition分配完成氧骤,進(jìn)入Stable狀態(tài)
Stable狀態(tài)
Stable狀態(tài)表示目前Group已經(jīng)給各個(gè)Consumer分配好各自要消費(fèi)的partition了。只要Group沒(méi)有發(fā)生成員變動(dòng)或者member要消費(fèi)的元數(shù)據(jù)沒(méi)發(fā)送變動(dòng)(比如某topic的partition數(shù)量變更)吃引,狀態(tài)就會(huì)一直維持在Stable筹陵。它也可能轉(zhuǎn)化為以下兩種狀態(tài):
- Dead:如果該group被移除掉,狀態(tài)就會(huì)變成Dead
- PreparingRebalance:如果有新的member加入或者舊成員離開(kāi)镊尺,狀態(tài)會(huì)重新變回PreparingRebalance朦佩,等待新的一輪partition分配
Dead狀態(tài)
Dead狀態(tài)表示該Group已經(jīng)被移除掉了。如果__consumer_offsets的partition分布發(fā)生變動(dòng)庐氮,就會(huì)導(dǎo)致Group可能不屬于該broker上的GroupCoordinator管理语稠,GroupCoordinator就會(huì)移除Group。
正常consumer加入group中的狀態(tài)變動(dòng)情況
Empty —> PreparingRebalance —> AwaitingSync —> Stable
當(dāng)一個(gè)consumer發(fā)送 JOIN_GROUP請(qǐng)求要求加入一個(gè)新的group時(shí)弄砍,GroupCoordinator發(fā)現(xiàn)之前沒(méi)有這個(gè)group仙畦,就會(huì)新建一個(gè)group,此時(shí)該group的狀態(tài)為Empty音婶。之后由于有新成員加入慨畸,狀態(tài)迅速轉(zhuǎn)變?yōu)?strong>PreparingRebalance。另外衣式,GroupCoordinator收到JOIN_GROUP請(qǐng)求后會(huì)等待一段時(shí)間再返回寸士,讓該Group在PreparingRebalance狀態(tài)等待一定時(shí)間,以確保該加入的member都加入了碴卧。
PreparingRebalance再返回JOIN_GROUP請(qǐng)求后弱卡,就會(huì)把Group的狀態(tài)置為AwaitingSync。 Consumer收到響應(yīng)后住册,會(huì)再發(fā)送SYNC_GROUP請(qǐng)求等待partition分配完成婶博。如果該Consumer是leader,則該Consumer會(huì)在本地進(jìn)行partition的分配界弧,然后把partition的分配結(jié)果隨著SYNC_GROUP請(qǐng)求一起上報(bào)給GroupCoordinator。之后GroupCoordinator收到leader發(fā)送過(guò)來(lái)的分配情況搭综,就會(huì)將狀態(tài)置為Stable垢箕,之后將這些信息作為SYNC_GROUP請(qǐng)求的響應(yīng)發(fā)送給各個(gè)Consumer,各個(gè)Consumer就都得到了自己要消費(fèi)的partition兑巾。
四条获、Consumer心跳機(jī)制
Consumer在加入Group后,會(huì)開(kāi)啟一個(gè)線程蒋歌,不斷的向GroupCoordinator發(fā)送心跳請(qǐng)求帅掘,報(bào)告自己還活著委煤。GroupCoordinator會(huì)管理group中所有Consumer的心跳,如果發(fā)現(xiàn)有一個(gè)Consumer超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳過(guò)來(lái)修档,GroupCoordinator會(huì)認(rèn)為這個(gè)Consumer已經(jīng)離開(kāi)group碧绞。這時(shí)GroupCoordinator會(huì)將該group的狀態(tài)重新置為PreparingRebalance,開(kāi)啟新一輪的partition分配吱窝。
心跳的發(fā)送頻率和consumer的配置
heartbeat.interval.ms
有關(guān)讥邻,默認(rèn)是3000,也就是每3s發(fā)送一次心跳院峡。GroupCoordinator判斷member是否過(guò)期和consumer的配置
session.timeout.ms
有關(guān)兴使,默認(rèn)為10000,也就是超過(guò)10s沒(méi)收到心跳請(qǐng)求照激,就移除該member发魄。
Group處于Stable狀態(tài)下,新加入一個(gè)Consumer
如果目前group已經(jīng)處于stable狀態(tài)了(各個(gè)consumer都在消費(fèi)了)俩垃,又新加入了一個(gè)Consumer励幼,那么狀態(tài)會(huì)怎么變更呢?
首先吆寨,新的Consumer會(huì)發(fā)送一個(gè) JOIN_GROUP請(qǐng)求給GroupCoordinator赏淌,GroupCoordinator收到請(qǐng)求后發(fā)現(xiàn)這是一個(gè)新的member,就會(huì)將group的狀態(tài)置為PreparingRebalance,然后等待其他member也發(fā)送 JOIN_GROUP請(qǐng)求啄清。
那么其他正在消費(fèi)的consumer怎么知道要重新分配partition呢六水?這個(gè)就和心跳機(jī)制有關(guān)系了。Consumer發(fā)送心跳給GroupCoordinator的時(shí)候辣卒,如果GroupCoordinator發(fā)現(xiàn)此刻group的狀態(tài)是PreparingRebalance掷贾,就會(huì)告訴Consumer需要重新分配partition了,各個(gè)Consumer收到消息后就開(kāi)始重新發(fā)送JOIN_GROUP請(qǐng)求荣茫。
Consumer離開(kāi)可能引發(fā)的Group狀態(tài)變更
當(dāng)Consumer超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳想帅,GroupCoordinator會(huì)認(rèn)為該Consumer已經(jīng)離開(kāi)group。此時(shí)GroupCoordinator會(huì)將該group的狀態(tài)置為PreparingRebalance啡莉,等待新一輪的parition分配港准。
五、 __consumer_offsets topic中的消息
在之前老的版本中咧欣,consumer消費(fèi)的offset情況是存儲(chǔ)在zookeeper中的浅缸,但是kafka對(duì)zk的依賴(lài)性很強(qiáng),當(dāng)consumer的數(shù)量不斷增多魄咕,zk的負(fù)擔(dān)也會(huì)越來(lái)越大衩椒,最終可能會(huì)影響到zk的正常運(yùn)行。因此,在后面的版本中毛萌,kafka設(shè)計(jì)者將consumer的commitedoffset寫(xiě)到內(nèi)部的一個(gè)topic中苟弛,也就是__consumer_offsets。
__consumer_offsets存儲(chǔ)的兩種消息類(lèi)型
除了存儲(chǔ)Consumer Group的commitedOffset外阁将, __consumer_offsets中其他還存儲(chǔ)了另外一種消息類(lèi)型:GroupCoordinator管理的元數(shù)據(jù)信息膏秫,這些元數(shù)據(jù)包括GroupCoordinator管理的所有Group的信息,比如Group的狀態(tài)冀痕,Group的leader信息荔睹,以及Group的各個(gè)member信息,但不包括Group中各個(gè)topic-partition的commitedOffset言蛇。kafka通過(guò)消息key的不同來(lái)區(qū)分兩種消息類(lèi)型
- commitedOffset 消息類(lèi)型
它的key是 GroupId+topic+partition僻他,value是對(duì)應(yīng)的offset。
- GroupCoordinator 消息類(lèi)型
它的key是 GroupId腊尚,value是對(duì)應(yīng)的元數(shù)據(jù)信息
__consumer_offsets 消息的加載和寫(xiě)入
數(shù)據(jù)加載
[圖片上傳失敗...(image-14e902-1542368543647)]
為了讀取更快吨拗,無(wú)論是commitedOffset,還是GroupCoordinator的元數(shù)據(jù)婿斥,都會(huì)從 __consumer_offsets中加載出來(lái)緩存起來(lái)劝篷。這些數(shù)據(jù)在broker啟動(dòng)的時(shí)候加載的。
一個(gè)broker在啟動(dòng)的時(shí)候民宿,并不會(huì)知道自己機(jī)器上的那些partition是leader還是replica娇妓,所以無(wú)法立即從 __consumer_offsets中加載數(shù)據(jù)。如果這時(shí)有Consumer來(lái)拉取offset活鹰,kafka就會(huì)拋出一個(gè)異常給Consumer哈恰,Consumer等待會(huì)等待若干時(shí)間后再次請(qǐng)求。
在broker啟動(dòng)后志群,Controller會(huì)感應(yīng)到有新的broker啟動(dòng)着绷,然后知道這個(gè)broker上的partition哪些是leader,哪些是replica锌云,之后發(fā)送一個(gè)LEADER_AND_ISR請(qǐng)求給該broker荠医。該Broker收到這個(gè)請(qǐng)求,解析附帶的請(qǐng)求數(shù)據(jù)桑涎,就可以知道自己機(jī)器上的parition哪些是leader彬向,哪些是replica了。接著攻冷,如果發(fā)現(xiàn)這些partition有__consumer_offsets的話(huà)娃胆,就開(kāi)始讀取__consumer_offsets的數(shù)據(jù)并加載的內(nèi)存中。
加載的流程也很簡(jiǎn)單讲衫,kafka會(huì)預(yù)先申請(qǐng)5M的內(nèi)存空間缕棵,然后從目標(biāo)partition的第一條offset開(kāi)始讀取,直到讀取到最后一個(gè)offset為止涉兽。由于kafka會(huì)定時(shí)對(duì) __consumer_offsets進(jìn)行compact招驴,因此__consumer_offsets的partition大小一般也不會(huì)太大。
commitedOffset消息寫(xiě)入
當(dāng)有Consumer提交OFFSET_COMMIT請(qǐng)求時(shí)枷畏,就會(huì)往__consumer_offsets的對(duì)應(yīng)partition寫(xiě)入消息了别厘。由于kafka對(duì)這個(gè)topic開(kāi)始了消息壓縮(Compact),因此隨著時(shí)間的流逝拥诡,相同的key触趴,舊的記錄會(huì)被清除掉,只剩下最新的那個(gè)key
GroupCoordinator元數(shù)據(jù)寫(xiě)入
在Group狀態(tài)變?yōu)镾table后渴肉,GroupCoordinator會(huì)將當(dāng)前Group的相關(guān)元數(shù)據(jù)寫(xiě)入對(duì)應(yīng)partiiton冗懦。當(dāng)有成員離開(kāi)Group,Group狀態(tài)變成Empty的時(shí)候仇祭,也會(huì)寫(xiě)一條消息到partition披蕉。