Consume Rebalance in RocketMQ & Kafka(上)

今天的計(jì)劃看下兩個(gè)消息中間件RocketMQ和Kafka的Rebalance方式- -

首先說下Rebalance是做啥...為啥需要rebalance并介紹一些參與rebalance的基本概念~

Kafka(RocketMQ)在Broker中會(huì)將一個(gè)topic劃分為多個(gè)Partition(ConsumeQueue), 消息在生產(chǎn)后會(huì)被投遞到某個(gè)Partition(ConsumeQueue)中.(PS: partition可以被分配不同broker)

而對(duì)于消費(fèi)者,為了解決一條消息如何消費(fèi)的問題, 引入了ConsumeGroup并將Consumer分配到某個(gè)consumeGroup中, MQ在處理消息時(shí)會(huì)保證消息肥惭,在一個(gè)Group中只會(huì)被一個(gè)Consumer消費(fèi)(In ClusterMode也是正常大家使用的方式).(所以N臺(tái)機(jī)器如果在Group中只會(huì)被一個(gè)Consumer收到)

所以趴捅,MQ需要

  • 將Partition(ConsumeQueue)分配給Consumer
  • 并保證一個(gè)Partition(ConsumeQueue)只會(huì)被分配給一個(gè)Group中的一個(gè)Consumer(這樣就做到一個(gè)消息只能被Group中一個(gè)Consumer消費(fèi)了)
  • 一個(gè)Consumer可以消費(fèi)多個(gè)ConsumeQueue
  • 在Consume變化時(shí)重新分配保證保證ConsumeQueue都有被處理
  • 在Partition數(shù)量變化時(shí)重新分配保證Consume
  • 同樣在Broker部分掛機(jī)的情況下分配過程保證正確

而上面的過程就是今天要討論的Rebalance

RocketMQ

粗看流程

RocketMQ的Rebalance邏輯實(shí)際是發(fā)生在Consume客戶端的(當(dāng)然也必須會(huì)從Broker或Nameserver獲取一些信息), 處理思路簡單說是這樣:(注意RocketMQ里的ConsumeQueue可以理解為Kafka的Partition,所以這節(jié)里都用ConsumeQueue表述)

reblance核心邏輯可以參看RebalanceImpl#topicSubscribeInfoTable

  • 首先這個(gè)rebalance過程是被觸發(fā)在每個(gè)consumer上
  • 在客戶端獲取到當(dāng)前topic在所有broker所有ConsumeQueue
  • 在客戶端獲取到所有當(dāng)前ConsumeGroup的Consumer列表(也就是知道和自己在一個(gè)Group的其他兄弟姐妹)
  • 客戶端觸發(fā)Rebalance時(shí)會(huì)對(duì)所有ConsumeQueue基于QueueID(每個(gè)ConsumeQueue的固定的屬性)進(jìn)行排序, 對(duì)所有的ConsumeID也進(jìn)行排序(每個(gè)Consumer的標(biāo)示), 將排序結(jié)果.
  • 然后使用當(dāng)前的分配策略進(jìn)行分配河咽,分配結(jié)果就是分配給當(dāng)前Consumer的ConsumeQueue列表
  • 因?yàn)槊總€(gè)Consumer都會(huì)運(yùn)行分配所以最終結(jié)果是所有consumer都各自拿到屬于自己的ConsumeQueue

觸發(fā)條件這個(gè)rebalance的條件有:

  • 每20s定時(shí)刷新(準(zhǔn)確說上次刷新后等20s, @see RebalanceService#run
  • 收到Broker告知的Consume變化通知時(shí)@see ClientRemotingProcessor#notifyConsumerIdsChanged
  • 每次Client啟動(dòng)時(shí)@see DefaultMQPushConsumerImpl#start

上面簡單的描述了分配過程,不過我們接下來會(huì)看下各個(gè)細(xì)節(jié)~

獲取所有ConsumeQueue信息

這里我們從設(shè)置開始一起走到獲取~

  • ConsumeQueue會(huì)在創(chuàng)建Topic時(shí)指定Topic里Queue的數(shù)量(細(xì)化說有Write和Read這里不展開- -)赋元,最終創(chuàng)建結(jié)果會(huì)被存儲(chǔ)到NameServer上(就如名字所說保存一些元數(shù)據(jù)的server)
  • 所以Consume會(huì)直接從NameServer獲取關(guān)于當(dāng)前有多少Q(mào)ueue的信息
  • 獲取Queue數(shù)量是從本地的``獲取的代碼位于@see MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)這個(gè)函數(shù)入口有些多但情況就是會(huì)從從NameServer獲取并在變化時(shí)更新在客戶端緩存的對(duì)特定topic的Routine信息(這包括所有的broker忘蟹,每個(gè)broker編號(hào)queueId為0->x)
  • 有個(gè)入口需要關(guān)注就是MQClientInstance#startScheduledTask會(huì)每10s刷主動(dòng)刷一次
  • 然后實(shí)際就從RebalanceImpl#topicSubscribeInfoTable這個(gè)緩存字段獲取來rebalance了

問題:...等等一會(huì)兒看

獲取Group中的其他Consume信息

這里我們倒過來從獲取走到數(shù)據(jù)來源..

  • group信息是保存在broker上的每次分配都會(huì)從broker拉取@see MQClientAPIImpl#getConsumerIdListByGroup
  • 這里獲取Group中其他ConsumerId請(qǐng)求的是任意一個(gè)brokerGroup(一個(gè)topic創(chuàng)建時(shí)可以指定多個(gè)brokerName, 一個(gè)brokerName下可以有多臺(tái)brokerNode,為了便于理解這里把相同brokerName的多個(gè)brokerNode假裝叫做brokerGroup)
  • 之后會(huì)從brokerGroup中選擇一臺(tái)機(jī)器獲取, 獲取會(huì)優(yōu)先獲取Master節(jié)點(diǎn)搁凸, 如果Master沒有會(huì)亂獲取一臺(tái)(實(shí)現(xiàn)是hashmap里iter的第一個(gè)- -因?yàn)楹竺婵吹揭驗(yàn)槊颗_(tái)consumer都連接所有broker所以理論上可以亂選)
  • 在broker上每個(gè)Group現(xiàn)在的consumer信息是保存在內(nèi)存中的ConsumerManager#consumerTable一個(gè)Map
  • 而更新的地方只有一個(gè)ClientManageProcessor#heartBeat也就是收到client心跳信息的時(shí)候
  • 好了, 我們必須看下心跳咋上報(bào)的MQClientInstance#sendHeartbeatToAllBroker, 可以看到果然媚值,只要當(dāng)前client有consumer信息(即不是純粹的produer角色的client)就會(huì)像所有brokerNode上報(bào)心跳, 注意不是brokerGroup是所有node(這里感覺建立心跳連接有些多???不過目前這模式好像沒太好優(yōu)化方法 再想想- -)

好了這里畫個(gè)圖總結(jié)下~

所有Conume都會(huì)向所有broker建立連接并心跳上報(bào),所以所以任意一臺(tái)broker都有當(dāng)前group的所有節(jié)點(diǎn)信息(正常情況), 客戶端想要獲取當(dāng)前group的所有consumer信息直接亂選一臺(tái)活著的獲取就好了

幾個(gè)內(nèi)置的分配策略

首先前面說過分配前提是已經(jīng)獲取到所有可用Queue和所有當(dāng)前Group的Consumer护糖,并都做了排序褥芒,各個(gè)Consumer各自執(zhí)行分配, 分配邏輯實(shí)現(xiàn)是AllocateMessageQueueStrategy的幾個(gè)實(shí)現(xiàn)

  • AllocateMessageQueueAveragely: 平均分配
  • AllocateMessageQueueByMachineRoom: 看注釋是什么alipay邏輯機(jī)房的邏輯???主要是對(duì)brokerName基于邏輯機(jī)房進(jìn)行了篩選- -?不過能否用怎么用就...
  • AllocateMessageQueueAveragelyByCircle: 環(huán)形分配
  • AllocateMessageQueueByConfig: 配死(或通過其他機(jī)制動(dòng)起來?)

總結(jié)下最后結(jié)果就是能獲取到屬于當(dāng)前consumer的ConsumeQueue(代碼里叫MessageQueue)

標(biāo)記已Queue已被占用

上面我們看到整個(gè)標(biāo)記過程都是在consumer本地就完成了,各個(gè)consumer間通過排序+一個(gè)一致的算法就完成了分配锰扶,并沒有和其他consumer的交互献酗。

然而這是有問題的,因?yàn)閞ebalance是各自執(zhí)行坷牛,不排除某個(gè)時(shí)刻兩個(gè)同一個(gè)Group的兩個(gè)Consumer都懟到一個(gè)Queue上罕偎,而這個(gè)從設(shè)計(jì)上是絕對(duì)不允許的,所以這里需要一個(gè)機(jī)制保證永遠(yuǎn)不會(huì)出現(xiàn)同Group兩個(gè)Consume懟到一個(gè)Queue上京闰。

RocketMQ目前是選擇在Broker上維護(hù)一個(gè)LockMap來實(shí)現(xiàn)(后面會(huì)討論這個(gè)也許有問題??)

RebalanceImpl#updateProcessQueueTableInRebalance中, 如果是新分配的Queue, 會(huì)調(diào)用this.lock(mq)

for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
         if (isOrder && !this.lock(mq)) { // !!! here
             log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
               continue;
         }

繼續(xù)往下跟代碼(為了避免太長這里不貼代碼了)颜及,會(huì)發(fā)現(xiàn)lock會(huì)向masterNode(brokerId=0)的節(jié)點(diǎn)發(fā)LockBatchRequestBody(只有Master?Master掛了的話- -?)

最后在masterNode內(nèi)存中會(huì)通過RebalanceLockManager#mqLockTable實(shí)現(xiàn)加鎖占用(帶超時(shí)默認(rèn)1分鐘超時(shí)類似租期), 如果master時(shí)這lock信息會(huì)丟失掉?當(dāng)依賴定時(shí)rebalance可以恢復(fù),不過那次rebalance如果有沖突之類的情況發(fā)生的話...? 好吧 后面再來看這些check特殊場(chǎng)景

如果加鎖失敗(別人已經(jīng)占用或者鎖請(qǐng)求失敗)會(huì)不對(duì)這個(gè)queue不做處理蹂楣。俏站。然后等下次rebalance, 再來看別人是否釋放鎖和masterbroker是否恢復(fù)...

同樣在RebalanceImpl#updateProcessQueueTableInRebalance會(huì)將無需處理的隊(duì)列從當(dāng)前處理中remove掉~這部分邏輯跟下去位于RebalancePushImpl#removeUnnecessaryMessageQueue,會(huì)等待當(dāng)前正在執(zhí)行的消費(fèi)并等processQueue處理干凈才嘗試向maserNode發(fā)起unlock(看代碼這里好像沒處理masterBroker網(wǎng)路不通的情況如果unlock不成功直接算成功了???)

好了如果按照預(yù)期正常unlock痊土,其他consumer可以lock并開始消費(fèi)乾翔,或者等20s下次rebalance可以開始消費(fèi)(如果本次因次序沒競爭lock上)

Challenge

最后,我們來假設(shè)些場(chǎng)景施戴,看看能否正常work

1. Consume加入
  • 假設(shè)開始有1個(gè)aconsumer消費(fèi)3個(gè)隊(duì)列q1``q2``q3反浓,啟動(dòng)后rebalance消費(fèi)3個(gè)q并在lockMap中都占有
  • bconsumer這時(shí)加入,b自己啟動(dòng)觸發(fā)自己rebalance, a收到b加入的change事件后開始rebalace,
  • b獲得q3, 所以嘗試lock赞哗,但a還占有著lock失敗暫時(shí)不去消費(fèi)q3
  • a獲取q1, q2, 所以removeq3雷则,并在處理當(dāng)前消息等一會(huì)unlock
  • 定時(shí)rebalance運(yùn)行, b成功lockq3并開始消費(fèi)

整體看沒啥問題,雖然新加入的consumer要等一陣才能接手消費(fèi)(有間隙消費(fèi)小lag), 另外那個(gè)等一會(huì)兒unlock特殊情況下一會(huì)兒小概率會(huì)有問題

2. Consume離開(斷線)
  • 假設(shè)兩個(gè)consumera處理q1,q2, b處理q3
  • b因?yàn)榫W(wǎng)絡(luò)原因斷線肪笋,broker發(fā)出change事件觸發(fā)在線的a進(jìn)行rebalance
  • a這時(shí)會(huì)分配接管q1``q2``q3, 對(duì)新加入的q3進(jìn)行l(wèi)ock, 然后發(fā)現(xiàn)是lock不了的因?yàn)?code>b已經(jīng)在lock了
  • 這時(shí)候需要等60s后的rebalance, a才有機(jī)會(huì)解盤q3的消息

感覺這部分等鎖超時(shí)有些無奈- -月劈,消費(fèi)不會(huì)亂但會(huì)有消費(fèi)lag增加

3. Consume同時(shí)并發(fā)加入

  • 假設(shè)開始只有一個(gè)consumera處理q1``q2``q3
  • 之后consumerbc“同時(shí)”加入
  • 首先各自啟動(dòng)后自己rebalance,blockq2, clockq3可能會(huì)失敗
  • 然后a收到change消息開始rebalance, 這時(shí)可能看到bc 也可能只看到b,rebalance處理change是使用wakeup不會(huì)重復(fù)喚醒(已醒著不會(huì)再來一次),所以本次rebalance是有可能認(rèn)為只有b只unlockq3..不過沒關(guān)系還有下次20s的rebalance那次還是可以懟正
  • 同理b,c靠20s運(yùn)行一次的rebalance也是可以懟正

所以我們看到可以保證消費(fèi)不會(huì)亂藤乙,不過代價(jià)是要過一陣新加入的consume才能真正開始接手消費(fèi)(間隙小lag)

4. Topic調(diào)整Queue數(shù)量

上面提到過Client每10s會(huì)從NameServer刷一次TopicRoutine(MQClientInstance#startScheduledTask), 所以Queue變化正常會(huì)在這里被收到并更新本地緩存猜揪。

然后,正常情況下等下次rebalance時(shí)就會(huì)用新的Queue信息進(jìn)行重新分配坛梁,然后基于上面說的lock和定期重rebalance規(guī)則而姐,最終可以保證ok且中途不亂

異常情況下,想到的幾個(gè)NameServer數(shù)據(jù)不一致或交換routine刷新和rebalance次序划咐,看好像最終也都能達(dá)到期望狀態(tài) - -

5. Broker掛了

上面提到過拴念,lock信息是放到每個(gè)BrokerGroup中的master(id0)上的,所以如果Master掛了的話褐缠,lock會(huì)用永遠(yuǎn)不成功政鼠,可以理解為新Consume無法加入,老Consume無法退出队魏,必須等待broker活過來公般,但之前在跑的的還可以正常運(yùn)行(只要?jiǎng)e離開了還沒加入然后broker掛了- -這種情況部分queue會(huì)有l(wèi)ag)

(PS背景介紹: 在rocketmq的設(shè)計(jì)里brokerGroup的master掛了group不可以寫入,但可以改寫其他brokerGroup來完成寫入HA,消費(fèi)者HA可以通過brokerGroup里的slave消費(fèi)之前堆在brokerGroup里的內(nèi)容)

broker活過來后,因?yàn)槭莾?nèi)存官帘,所以下次觸發(fā)rebalance會(huì)重新恢復(fù)lock的map蟹略。。

不過感覺有個(gè)極端情況遏佣。。就是master掛了揽浙,然后這時(shí)消費(fèi)者有變化或者隊(duì)列數(shù)目有調(diào)整状婶。。馅巷。因?yàn)閱?dòng)時(shí)內(nèi)存為空等于沒占鎖膛虫,而實(shí)際之前consumer已經(jīng)在跑,在還沒來得及rebalance就發(fā)生了變更钓猬,這時(shí)可能出現(xiàn)同group里兩個(gè)consumer同時(shí)消費(fèi)一個(gè)queue稍刀??敞曹?账月?

RocketMQ小結(jié)

RocketMQ在master不掛的情況下rebalance可以保證消費(fèi)不亂,雖然可能會(huì)有消息lag問題但感覺并不關(guān)鍵澳迫;而master掛且同時(shí)發(fā)生rebalance這個(gè)的確有些問題局齿。。此外rebalance完全由客戶端控制其他人有沒有用上相互之前并不知道橄登;并且各自拉namesrv可能會(huì)看到不一致的數(shù)據(jù)雖然最終通過定期重rebalance可以一致會(huì)導(dǎo)致不必要的rebalance的感覺- -

看似有些問題待解決抓歼,如果理解有誤歡迎討論~~哈哈哈

下面開始看下kafka0.9版本之后的方式,據(jù)說kafka在很久之前和rocketmq目前用的很像, 但后來改了..

Kafka

kafka還在看, 沒按照預(yù)期搞完拢锹,見下偏文章哈~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谣妻,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子卒稳,更是在濱河造成了極大的恐慌蹋半,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件充坑,死亡現(xiàn)場(chǎng)離奇詭異湃窍,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)匪傍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門您市,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人役衡,你說我怎么就攤上這事茵休。” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵榕莺,是天一觀的道長俐芯。 經(jīng)常有香客問我,道長钉鸯,這世上最難降的妖魔是什么吧史? 我笑而不...
    開封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮唠雕,結(jié)果婚禮上贸营,老公的妹妹穿的比我還像新娘。我一直安慰自己岩睁,他們只是感情好钞脂,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著捕儒,像睡著了一般冰啃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上刘莹,一...
    開封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天阎毅,我揣著相機(jī)與錄音,去河邊找鬼点弯。 笑死净薛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蒲拉。 我是一名探鬼主播肃拜,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼雌团!你這毒婦竟也來了燃领?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤锦援,失蹤者是張志新(化名)和其女友劉穎猛蔽,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體灵寺,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡曼库,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了略板。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毁枯。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖叮称,靈堂內(nèi)的尸體忽然破棺而出种玛,到底是詐尸還是另有隱情藐鹤,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布赂韵,位于F島的核電站娱节,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏祭示。R本人自食惡果不足惜肄满,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望质涛。 院中可真熱鬧稠歉,春花似錦、人聲如沸蹂窖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瞬测。三九已至,卻和暖如春纠炮,著一層夾襖步出監(jiān)牢的瞬間月趟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來泰國打工恢口, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留孝宗,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓耕肩,卻偏偏與公主長得像因妇,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子猿诸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容