今天的計(jì)劃看下兩個(gè)消息中間件RocketMQ和Kafka的Rebalance
方式- -
首先說下Rebalance
是做啥...為啥需要rebalance并介紹一些參與rebalance的基本概念~
Kafka(RocketMQ)在Broker中會(huì)將一個(gè)topic劃分為多個(gè)Partition
(ConsumeQueue
), 消息在生產(chǎn)后會(huì)被投遞到某個(gè)Partition
(ConsumeQueue
)中.(PS: partition可以被分配不同broker)
而對(duì)于消費(fèi)者,為了解決一條消息如何消費(fèi)的問題, 引入了ConsumeGroup
并將Consumer
分配到某個(gè)consumeGroup中, MQ在處理消息時(shí)會(huì)保證消息肥惭,在一個(gè)Group中只會(huì)被一個(gè)Consumer
消費(fèi)(In ClusterMode也是正常大家使用的方式).(所以N臺(tái)機(jī)器如果在Group中只會(huì)被一個(gè)Consumer收到)
所以趴捅,MQ需要
- 將Partition(ConsumeQueue)分配給Consumer
- 并保證一個(gè)Partition(ConsumeQueue)只會(huì)被分配給一個(gè)Group中的一個(gè)Consumer(這樣就做到一個(gè)消息只能被Group中一個(gè)Consumer消費(fèi)了)
- 一個(gè)Consumer可以消費(fèi)多個(gè)ConsumeQueue
- 在Consume變化時(shí)重新分配保證保證ConsumeQueue都有被處理
- 在Partition數(shù)量變化時(shí)重新分配保證Consume
- 同樣在Broker部分掛機(jī)的情況下分配過程保證正確
而上面的過程就是今天要討論的Rebalance
RocketMQ
粗看流程
RocketMQ的Rebalance邏輯實(shí)際是發(fā)生在Consume客戶端的(當(dāng)然也必須會(huì)從Broker或Nameserver獲取一些信息), 處理思路簡單說是這樣:(注意RocketMQ里的ConsumeQueue可以理解為Kafka的Partition,所以這節(jié)里都用ConsumeQueue
表述)
reblance核心邏輯可以參看RebalanceImpl#topicSubscribeInfoTable
- 首先這個(gè)rebalance過程是被觸發(fā)在每個(gè)consumer上
- 在客戶端獲取到當(dāng)前topic在所有broker所有ConsumeQueue
- 在客戶端獲取到所有當(dāng)前ConsumeGroup的Consumer列表(也就是知道和自己在一個(gè)Group的其他兄弟姐妹)
- 客戶端觸發(fā)Rebalance時(shí)會(huì)對(duì)所有ConsumeQueue基于QueueID(每個(gè)ConsumeQueue的固定的屬性)進(jìn)行排序, 對(duì)所有的ConsumeID也進(jìn)行排序(每個(gè)Consumer的標(biāo)示), 將排序結(jié)果.
- 然后使用當(dāng)前的分配策略進(jìn)行分配河咽,分配結(jié)果就是分配給當(dāng)前Consumer的ConsumeQueue列表
- 因?yàn)槊總€(gè)Consumer都會(huì)運(yùn)行分配所以最終結(jié)果是所有consumer都各自拿到屬于自己的ConsumeQueue
觸發(fā)條件這個(gè)rebalance的條件有:
- 每20s定時(shí)刷新(準(zhǔn)確說上次刷新后等20s,
@see RebalanceService#run
- 收到Broker告知的Consume變化通知時(shí)
@see ClientRemotingProcessor#notifyConsumerIdsChanged
- 每次Client啟動(dòng)時(shí)
@see DefaultMQPushConsumerImpl#start
上面簡單的描述了分配過程,不過我們接下來會(huì)看下各個(gè)細(xì)節(jié)~
獲取所有ConsumeQueue信息
這里我們從設(shè)置開始一起走到獲取~
- ConsumeQueue會(huì)在創(chuàng)建Topic時(shí)指定Topic里Queue的數(shù)量(細(xì)化說有Write和Read這里不展開- -)赋元,最終創(chuàng)建結(jié)果會(huì)被存儲(chǔ)到
NameServer
上(就如名字所說保存一些元數(shù)據(jù)的server) - 所以Consume會(huì)直接從
NameServer
獲取關(guān)于當(dāng)前有多少Q(mào)ueue的信息 - 獲取Queue數(shù)量是從本地的``獲取的代碼位于
@see MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
這個(gè)函數(shù)入口有些多但情況就是會(huì)從從NameServer獲取并在變化時(shí)更新在客戶端緩存的對(duì)特定topic的Routine信息(這包括所有的broker忘蟹,每個(gè)broker編號(hào)queueId為0->x) - 有個(gè)入口需要關(guān)注就是
MQClientInstance#startScheduledTask
會(huì)每10s刷主動(dòng)刷一次 - 然后實(shí)際就從
RebalanceImpl#topicSubscribeInfoTable
這個(gè)緩存字段獲取來rebalance了
問題:...等等一會(huì)兒看
獲取Group中的其他Consume信息
這里我們倒過來從獲取走到數(shù)據(jù)來源..
- group信息是保存在broker上的每次分配都會(huì)從broker拉取
@see MQClientAPIImpl#getConsumerIdListByGroup
- 這里獲取Group中其他ConsumerId請(qǐng)求的是任意一個(gè)brokerGroup(一個(gè)topic創(chuàng)建時(shí)可以指定多個(gè)
brokerName
, 一個(gè)brokerName下可以有多臺(tái)brokerNode,為了便于理解這里把相同brokerName的多個(gè)brokerNode假裝叫做brokerGroup) - 之后會(huì)從brokerGroup中選擇一臺(tái)機(jī)器獲取, 獲取會(huì)優(yōu)先獲取Master節(jié)點(diǎn)搁凸, 如果Master沒有會(huì)亂獲取一臺(tái)(實(shí)現(xiàn)是hashmap里iter的第一個(gè)- -因?yàn)楹竺婵吹揭驗(yàn)槊颗_(tái)consumer都連接所有broker所以理論上可以亂選)
- 在broker上每個(gè)Group現(xiàn)在的consumer信息是保存在內(nèi)存中的
ConsumerManager#consumerTable
一個(gè)Map - 而更新的地方只有一個(gè)
ClientManageProcessor#heartBeat
也就是收到client心跳信息的時(shí)候 - 好了, 我們必須看下心跳咋上報(bào)的
MQClientInstance#sendHeartbeatToAllBroker
, 可以看到果然媚值,只要當(dāng)前client有consumer信息(即不是純粹的produer角色的client)就會(huì)像所有brokerNode上報(bào)心跳, 注意不是brokerGroup是所有node(這里感覺建立心跳連接有些多???不過目前這模式好像沒太好優(yōu)化方法 再想想- -)
好了這里畫個(gè)圖總結(jié)下~
所有Conume都會(huì)向所有broker建立連接并心跳上報(bào),所以所以任意一臺(tái)broker都有當(dāng)前group的所有節(jié)點(diǎn)信息(正常情況), 客戶端想要獲取當(dāng)前group的所有consumer信息直接亂選一臺(tái)活著的獲取就好了
幾個(gè)內(nèi)置的分配策略
首先前面說過分配前提是已經(jīng)獲取到所有可用Queue和所有當(dāng)前Group的Consumer护糖,并都做了排序褥芒,各個(gè)Consumer各自執(zhí)行分配, 分配邏輯實(shí)現(xiàn)是AllocateMessageQueueStrategy
的幾個(gè)實(shí)現(xiàn)
- AllocateMessageQueueAveragely: 平均分配
- AllocateMessageQueueByMachineRoom: 看注釋是什么alipay邏輯機(jī)房的邏輯???主要是對(duì)brokerName基于邏輯機(jī)房進(jìn)行了篩選- -?不過能否用怎么用就...
- AllocateMessageQueueAveragelyByCircle: 環(huán)形分配
- AllocateMessageQueueByConfig: 配死(或通過其他機(jī)制動(dòng)起來?)
總結(jié)下最后結(jié)果就是能獲取到屬于當(dāng)前consumer的ConsumeQueue(代碼里叫MessageQueue)
標(biāo)記已Queue已被占用
上面我們看到整個(gè)標(biāo)記過程都是在consumer本地就完成了,各個(gè)consumer間通過排序+一個(gè)一致的算法就完成了分配锰扶,并沒有和其他consumer的交互献酗。
然而這是有問題的,因?yàn)閞ebalance是各自執(zhí)行坷牛,不排除某個(gè)時(shí)刻兩個(gè)同一個(gè)Group的兩個(gè)Consumer都懟到一個(gè)Queue上罕偎,而這個(gè)從設(shè)計(jì)上是絕對(duì)不允許的,所以這里需要一個(gè)機(jī)制保證永遠(yuǎn)不會(huì)出現(xiàn)同Group兩個(gè)Consume懟到一個(gè)Queue上京闰。
RocketMQ目前是選擇在Broker上維護(hù)一個(gè)LockMap來實(shí)現(xiàn)(后面會(huì)討論這個(gè)也許有問題??)
在RebalanceImpl#updateProcessQueueTableInRebalance
中, 如果是新分配的Queue, 會(huì)調(diào)用this.lock(mq)
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) { // !!! here
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
繼續(xù)往下跟代碼(為了避免太長這里不貼代碼了)颜及,會(huì)發(fā)現(xiàn)lock會(huì)向masterNode(brokerId=0)的節(jié)點(diǎn)發(fā)LockBatchRequestBody(只有Master?Master掛了的話- -?)
最后在masterNode內(nèi)存中會(huì)通過RebalanceLockManager#mqLockTable
實(shí)現(xiàn)加鎖占用(帶超時(shí)默認(rèn)1分鐘超時(shí)類似租期), 如果master時(shí)這lock信息會(huì)丟失掉?當(dāng)依賴定時(shí)rebalance可以恢復(fù),不過那次rebalance如果有沖突之類的情況發(fā)生的話...? 好吧 后面再來看這些check特殊場(chǎng)景
如果加鎖失敗(別人已經(jīng)占用或者鎖請(qǐng)求失敗)會(huì)不對(duì)這個(gè)queue不做處理蹂楣。俏站。然后等下次rebalance, 再來看別人是否釋放鎖和masterbroker是否恢復(fù)...
同樣在RebalanceImpl#updateProcessQueueTableInRebalance
會(huì)將無需處理的隊(duì)列從當(dāng)前處理中remove掉~這部分邏輯跟下去位于RebalancePushImpl#removeUnnecessaryMessageQueue
,會(huì)等待當(dāng)前正在執(zhí)行的消費(fèi)并等processQueue處理干凈才嘗試向maserNode發(fā)起unlock(看代碼這里好像沒處理masterBroker網(wǎng)路不通的情況如果unlock不成功直接算成功了???)
好了如果按照預(yù)期正常unlock痊土,其他consumer可以lock并開始消費(fèi)乾翔,或者等20s下次rebalance可以開始消費(fèi)(如果本次因次序沒競爭lock上)
Challenge
最后,我們來假設(shè)些場(chǎng)景施戴,看看能否正常work
1. Consume加入
- 假設(shè)開始有1個(gè)
a
consumer消費(fèi)3個(gè)隊(duì)列q1``q2``q3
反浓,啟動(dòng)后rebalance消費(fèi)3個(gè)q并在lockMap中都占有 -
b
consumer這時(shí)加入,b
自己啟動(dòng)觸發(fā)自己rebalance,a
收到b
加入的change事件后開始rebalace, -
b
獲得q3
, 所以嘗試lock赞哗,但a還占有著lock失敗暫時(shí)不去消費(fèi)q3
-
a
獲取q1
,q2
, 所以removeq3
雷则,并在處理當(dāng)前消息等一會(huì)unlock - 定時(shí)rebalance運(yùn)行,
b
成功lockq3
并開始消費(fèi)
整體看沒啥問題,雖然新加入的consumer要等一陣才能接手消費(fèi)(有間隙消費(fèi)小lag), 另外那個(gè)等一會(huì)兒unlock特殊情況下一會(huì)兒小概率會(huì)有問題
2. Consume離開(斷線)
- 假設(shè)兩個(gè)consumer
a
處理q1
,q2
,b
處理q3
-
b
因?yàn)榫W(wǎng)絡(luò)原因斷線肪笋,broker發(fā)出change事件觸發(fā)在線的a
進(jìn)行rebalance -
a
這時(shí)會(huì)分配接管q1``q2``q3
, 對(duì)新加入的q3
進(jìn)行l(wèi)ock, 然后發(fā)現(xiàn)是lock不了的因?yàn)?code>b已經(jīng)在lock了 - 這時(shí)候需要等60s后的rebalance,
a
才有機(jī)會(huì)解盤q3
的消息
感覺這部分等鎖超時(shí)有些無奈- -月劈,消費(fèi)不會(huì)亂但會(huì)有消費(fèi)lag增加
3. Consume同時(shí)并發(fā)加入
- 假設(shè)開始只有一個(gè)consumer
a
處理q1``q2``q3
- 之后consumer
b
和c
“同時(shí)”加入 - 首先各自啟動(dòng)后自己rebalance,
b
lockq2
,c
lockq3
可能會(huì)失敗 - 然后
a
收到change消息開始rebalance, 這時(shí)可能看到b
和c
也可能只看到b
,rebalance處理change是使用wakeup不會(huì)重復(fù)喚醒(已醒著不會(huì)再來一次),所以本次rebalance是有可能認(rèn)為只有b
只unlockq3
..不過沒關(guān)系還有下次20s的rebalance那次還是可以懟正 - 同理
b
,c
靠20s運(yùn)行一次的rebalance也是可以懟正
所以我們看到可以保證消費(fèi)不會(huì)亂藤乙,不過代價(jià)是要過一陣新加入的consume才能真正開始接手消費(fèi)(間隙小lag)
4. Topic調(diào)整Queue數(shù)量
上面提到過Client每10s會(huì)從NameServer刷一次TopicRoutine(MQClientInstance#startScheduledTask
), 所以Queue變化正常會(huì)在這里被收到并更新本地緩存猜揪。
然后,正常情況下等下次rebalance時(shí)就會(huì)用新的Queue信息進(jìn)行重新分配坛梁,然后基于上面說的lock和定期重rebalance規(guī)則而姐,最終可以保證ok且中途不亂
異常情況下,想到的幾個(gè)NameServer數(shù)據(jù)不一致或交換routine刷新和rebalance次序划咐,看好像最終也都能達(dá)到期望狀態(tài) - -
5. Broker掛了
上面提到過拴念,lock信息是放到每個(gè)BrokerGroup中的master(id0)上的,所以如果Master掛了的話褐缠,lock會(huì)用永遠(yuǎn)不成功政鼠,可以理解為新Consume無法加入,老Consume無法退出队魏,必須等待broker活過來公般,但之前在跑的的還可以正常運(yùn)行(只要?jiǎng)e離開了還沒加入然后broker掛了- -這種情況部分queue會(huì)有l(wèi)ag)
(PS背景介紹: 在rocketmq的設(shè)計(jì)里brokerGroup的master掛了group不可以寫入,但可以改寫其他brokerGroup來完成寫入HA,消費(fèi)者HA可以通過brokerGroup里的slave消費(fèi)之前堆在brokerGroup里的內(nèi)容)
broker活過來后,因?yàn)槭莾?nèi)存官帘,所以下次觸發(fā)rebalance會(huì)重新恢復(fù)lock的map蟹略。。
不過感覺有個(gè)極端情況遏佣。。就是master掛了揽浙,然后這時(shí)消費(fèi)者有變化或者隊(duì)列數(shù)目有調(diào)整状婶。。馅巷。因?yàn)閱?dòng)時(shí)內(nèi)存為空等于沒占鎖膛虫,而實(shí)際之前consumer已經(jīng)在跑,在還沒來得及rebalance就發(fā)生了變更钓猬,這時(shí)可能出現(xiàn)同group里兩個(gè)consumer同時(shí)消費(fèi)一個(gè)queue稍刀??敞曹?账月?
RocketMQ小結(jié)
RocketMQ在master不掛的情況下rebalance可以保證消費(fèi)不亂,雖然可能會(huì)有消息lag問題但感覺并不關(guān)鍵澳迫;而master掛且同時(shí)發(fā)生rebalance這個(gè)的確有些問題局齿。。此外rebalance完全由客戶端控制其他人有沒有用上相互之前并不知道橄登;并且各自拉namesrv可能會(huì)看到不一致的數(shù)據(jù)雖然最終通過定期重rebalance可以一致會(huì)導(dǎo)致不必要的rebalance的感覺- -
看似有些問題待解決抓歼,如果理解有誤歡迎討論~~哈哈哈
下面開始看下kafka0.9版本之后的方式,據(jù)說kafka在很久之前和rocketmq目前用的很像, 但后來改了..
Kafka
kafka還在看, 沒按照預(yù)期搞完拢锹,見下偏文章哈~