Consumer的負(fù)載均衡
RocketMQ在消費(fèi)端的負(fù)載均衡如下圖所示,各個(gè)partition均勻分布在各個(gè)consumer上遗菠,如下圖所示:
所有consumer依次消費(fèi)每一個(gè)partition,如果partition數(shù)量不是consumer的整數(shù)倍维费,則排在前面的consumer會(huì)消費(fèi)更多的partition种蝶,下面可以看看consumer的實(shí)現(xiàn)。
Rebalance的實(shí)現(xiàn)
RocketMQ的consumer負(fù)載均衡依賴于RebalanceImpl類靡羡,以push的方式為例系洛,在DefaultMQPushConsumerImpl為例,其中包含RebalancePushImpl:
RebalanceImpl負(fù)責(zé)消費(fèi)端的負(fù)載均衡略步,其中的doRebalance方法:
我們?cè)龠M(jìn)入到rebalanceImpl的doRebalance方法描扯,其中調(diào)用了rebalanceByTopic,我們看看rebalanceByTopic中的邏輯:
可以看到,其主體邏輯比較簡(jiǎn)單:
- 先獲取topic下的MessageQueue,一個(gè)MessageQueue實(shí)際上就是一個(gè)partition
- 然后獲取當(dāng)前topic和group的client id坛吁,即當(dāng)前group中消費(fèi)此topic的客戶端
- 隨后對(duì)partition和client id做排序
- 然后調(diào)用strategy獲取當(dāng)前客戶端需要消費(fèi)的partition
- 最后更新訂閱
因此舷夺,負(fù)載均衡的主體算法在AllocateMessageQueueStrategy中實(shí)現(xiàn),通過DefaultMQPushConsumer的默認(rèn)構(gòu)造器我們可以看到,默認(rèn)使用的AllocateMessageQueueStrategy是AllocateMessageQueueAveragely實(shí)現(xiàn)類:
找到具體的實(shí)現(xiàn)類后,我們可以看到默認(rèn)使用的負(fù)載均衡算法:
公式寫的非常繞,代幾個(gè)數(shù)進(jìn)去算一下就知道蜂桶,默認(rèn)情況下,rocketmq使用的是連續(xù)分配的方式也切,示意圖如下:
AllocateMessageQueueStrategy提供了多個(gè)實(shí)現(xiàn):
- AllocateMessageQueueAveragely是前面講的默認(rèn)方式
- AllocateMessageQueueAveragelyByCircle則是本文最前面的示意圖扑媚,每個(gè)消費(fèi)者依次消費(fèi)一個(gè)partition
- AllocateMessageQueueConsistentHash使用的是一致性hash算法
- AllocateMachineRoomNearby是通過就近元?jiǎng)t腰湾,離的近的消費(fèi)
- AllocateMessageQueueByConfig是通過配置的方式
在不同的情況下,我們可以選擇使用不同的負(fù)載均衡實(shí)現(xiàn)方式疆股。
對(duì)于特定場(chǎng)景费坊,甚至可以自己實(shí)現(xiàn)負(fù)載均衡策略,比如我們的應(yīng)用需要消費(fèi)非常多個(gè)topic押桃,而每個(gè)topic的partition不一定剛才都是機(jī)器 數(shù)量的整數(shù)倍葵萎,這個(gè)時(shí)候,按照rocketmq提供的負(fù)載均衡策略唱凯,排在前面的consumer消費(fèi)的partition數(shù)量會(huì)多于后面的consumer羡忘,當(dāng)topic非常多時(shí),這就導(dǎo)致排在前面的consumer消費(fèi)的partition比后面的consumer要多很多磕昼,造成集群中不同機(jī)器的水位相差非常大卷雕,這種場(chǎng)景下就知道自己實(shí)現(xiàn)負(fù)載均衡策略
何時(shí)重新Rebalace
這里先要介紹一個(gè)類MQClientInstance,此類在DefaultMQPushConsumerImpl的start方法中有如下代碼:
這里的mQClientFactory的實(shí)現(xiàn)類實(shí)際上就是一個(gè)MQClientInstance票从,進(jìn)入到MQClientInstance類的構(gòu)造器中漫雕,我們可以看到它創(chuàng)建了一個(gè)RebalanceService對(duì)象,代碼如下:
我們一級(jí)級(jí)的看下去峰鄙,在RebalanceService的run方法中浸间,可以看到,默認(rèn)每20s調(diào)一次doRebalance:
而在父類ServiceThread中吟榴,我們可以看到run方法的調(diào)用方式魁蒜,實(shí)際上是創(chuàng)建了一個(gè)線程:
因此,當(dāng)一個(gè)consumer出現(xiàn)宕機(jī)后吩翻,默認(rèn)最多20s兜看,其它機(jī)器將重新消費(fèi)已宕機(jī)的機(jī)器消費(fèi)的partition,同樣當(dāng)有新的consumer連接上后狭瞎,20s內(nèi)也會(huì)完成rebalance使得新的consumer有機(jī)會(huì)消費(fèi)partition