此篇文章不僅僅在于分析RocketMQ的消息不均衡原因與解決思路, 更在于分析MQ消息不均衡的原因和解決思路, 只是拿RocketMQ作為例子.
從生產(chǎn)者角度考慮,是不是生產(chǎn)者發(fā)送消息的時候,只鐘愛某幾個Queue,只將消息發(fā)送到個別Queue中,才造成消息傾斜呢?
這種情況可能會發(fā)生,但概率比較低. 而且也不是生產(chǎn)者只鐘愛某幾個Queue, 多半情況是某些條件不滿足了, 才導致把消息發(fā)送給了某些Queue中. RocketMQ是按照Queue隊列輪詢的方式將消息發(fā)送出去的, 保證生產(chǎn)者(皇帝)能臨幸所有的嬪妃(指Queue), 當然RocketMQ也考慮了發(fā)送消息延遲的情況, 盡量將消息發(fā)送到消息延遲低的broker上. 即便發(fā)送順序消息, 基于key的哈希值, 也基本可以認為消息是均衡發(fā)送到各個Queue中的.
假如我們有2個broker,topic有4個隊列, 那么我們看下,作為生產(chǎn)者它拿到的隊列信息長啥樣.
因為每個broker有4個隊列, 生產(chǎn)者獲取到了8個隊列. 而且隊列的順序不是雜亂的, 是按照相同的broker,不同的queueId的順序排列的.
假如不存在消息發(fā)送延遲的情況, TOPIC-A有4個隊列, 則broker1和broker2各有4個隊列, 那么生產(chǎn)者發(fā)送消息的時候,先發(fā)送給broker1的4個隊列,然后再發(fā)送給broker2的4個隊列,之后再次發(fā)送給broker1的4個隊列,循環(huán)往復進行...
【正常情況】
如上圖所示,生產(chǎn)者依次將消息發(fā)送給broker1-0,broker1-1,broker1-2,broker1-3,broker2-0,broker2-1,broker2-2,broker2-3的Queue隊列中,循環(huán)往復進行...
【發(fā)送失敗情況】
假如向broker2的某個隊列發(fā)送消息的時候,發(fā)送失敗了,根據(jù)重試機制,那么生產(chǎn)者會從broker1中選擇一個隊列進行發(fā)送,如下圖所示. 這種情況,似乎會造成消息傾斜的可能,畢竟生產(chǎn)者把消息發(fā)送到了一個broker上,接下來講解消費端的時候再說如何應對這種情況.
如果只有一個broker2, 沒有broker1,即便發(fā)送消息給broker2失敗了,也只能從broker2中再選擇一個其他的Queue進行發(fā)送.
【延遲容錯情況】
生產(chǎn)者每次發(fā)送消息之后,都會記錄消息發(fā)送給每個broker的延遲情況. 假如向broker2的某個隊列發(fā)送消息的時候, 發(fā)現(xiàn)broker2的延遲比較大, 那么生產(chǎn)者會暫時舍棄broker2, 而是從broker1中選擇一個隊列進行發(fā)送, 和上圖一樣, 這種情況,似乎也會造成消息傾斜的可能,畢竟生產(chǎn)者把消息發(fā)送到了一個broker上,接下來講解消費端的時候再說如何應對這種情況.
【總結】
總之,由于生產(chǎn)者導致消息不均衡的概率很低, 相對于消費端, 可以忽略生產(chǎn)者導致消息不均衡的情況了. 可以認為生產(chǎn)者是均勻的將消息發(fā)送到不同的Queue中的.
在排查消息不均衡的情況時, 可以排查一下是否存消息發(fā)送失敗和發(fā)送延遲的情況. 而且即便由于這2個原因造成發(fā)送消息時出現(xiàn)消息發(fā)送不均衡的情況, 在消費端也可以避免.
生產(chǎn)者盡量會保證消息發(fā)送,從全局的Queue隊列來看都是均衡的, 消息會均衡的發(fā)送到每個Queue. 即便某個broker出現(xiàn)了問題, 也會保證消息發(fā)送到其他broker的隊列也是均衡的.
RocketMQ消息投遞的具體策略在 org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue 方法中. 生產(chǎn)者發(fā)送消息的時候, 就會調(diào)用到這個方法, 選擇對應的隊列.
我們假設生產(chǎn)者發(fā)送的消息都是均衡的發(fā)送到不同的Queue隊列中的. 而且我們也假設每個消費者的消費能力沒有太大差別.
接下來分析消費端如何造成消息不均衡的情況的.
說明一點的是,消息不均衡,即便增加消費者,也無法解決消息不均衡.
我親自經(jīng)歷的,以及我知道別人咨詢阿里的結論, 阿里工程師針對消息不均衡的情況,是讓你增加消費者的消費能力,其實是不對的.
假如我們有2個broker,topic有4個隊列, 有2個消費者,如下圖所示
如上圖,共有8個隊列,那么這8個隊列該如何分配給2個消費者呢?
常規(guī)的方案就是平均分配, 8個隊列,2個消費者, 那么每個消費者分配4個隊列, 如下圖所示
上面說的這個策略對應的類是AllocateMessageQueueAveragely
在RocketMQ的源碼中,默認就是指定了AllocateMessageQueueAveragely策略.
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
/**
* Queue allocation algorithm
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
}
看似完美,而且這也是RocketMQ消費端默認的分配隊列的情況. 然而它卻有著天生的缺陷. 假如此時生產(chǎn)者有4條消息需要發(fā)送, 按照輪詢的策略,生產(chǎn)者會把消息發(fā)送給broker1的queueId=0,1,2,3 這4個隊列, 意味著只有消費者1可以消費到消息, 而消費者2一直空閑. 過了一會, 生產(chǎn)者再發(fā)送4條消息, 會把消息發(fā)送給broker2的queueId=0,1,2,3 這4個隊列, 意味著只有消費者2可以消費到消息, 而消費者1變成了一直空閑. 就會造成一個消費者忙碌一個消費者空閑的情況, 間接出現(xiàn)了消息不均衡情況. 如果出現(xiàn)了上面說的生產(chǎn)者發(fā)送失敗或發(fā)送延遲的情況, 可能消息都會發(fā)送到broker1上, 造成消費者1一直忙碌, 而消費者2無所事事, 間接出現(xiàn)了消息不均衡情況.
AllocateMessageQueueAveragely策略, 從全局看的話, 隊列是平均分配給每個消費者的, 但是從局部看的話, 隊列沒有平均分配給每個消費者. 這就是關鍵所在.
其實生產(chǎn)者沒有過失, 在隊列可用,消息發(fā)送延遲時間可控的情況下, 生產(chǎn)者一直是致力于把消息均衡發(fā)送給每個隊列, 即便broker2已經(jīng)不可使用了,在局部看來, 生產(chǎn)者依然是將消息均衡的發(fā)送到broker1的每個隊列上. 之所以導致消息不均衡的罪魁禍首其實是消費端的分配隊列的策略. 按照上面的分配隊列策略, 是不完美的, 天生會造成消息不均衡. 只要生產(chǎn)者的生產(chǎn)消息的速度足夠快, 消費者消費消息的速度明顯跟不上, 這種消息不均衡的現(xiàn)象就會明顯出現(xiàn).
RocketMQ還提供了一個分配隊列的策略, 它是AllocateMessageQueueAveragelyByCircle , 這個策略是環(huán)形平均分配, 如下圖所示
就好像消費者1和消費者2輪流從8個隊列里面挨個拿禮物一樣, 不管從全局,還是局部來看, 隊列都是平均的分配給2個消費者. 即便broker2此時不可用, 消息都被發(fā)送到了broker1上, 依然是有2個消費者來消費消息的, 而不像之前的那樣, 只有一個消費者消費消息, 避免了部分消費者處于饑餓的狀態(tài), 間接避免了消息不均衡的問題.
RocketMQ中的RebalanceImpl類負責消費端的負載均衡, 用于給消費端分配對應的隊列,源碼如下
// 源碼位置 : org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
...
}
case CLUSTERING: {
if (mqSet != null && cidAll != null) {
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 調(diào)用具體的分配隊列策略
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) { }
...
}
break;
}
default:
break;
}
}
【說明】
1.在原生RocketMQ v4.5.2源碼中并沒有看到可以手動指定不同分配隊列策略的入口, 也許是我沒找到
2.如果使用的是商業(yè)版的RocketMQ, 那么使用的ons-client包, 在這個包的1.8.7.4.Final中,如果實例化Consumer的時候指定了AllocateMessageQueueAveragelyByCircle策略的話, 那么就可以使用它.
ons-client底層代碼,根據(jù)程序員的配置, 使用不同的分配隊列的策略,如下
而我們能做的,就是在實例化Consumer的時候,指定AVG_BY_CIRCLE屬性, 那么商業(yè)版RocketMQ底層就是使用AllocateMessageQueueAveragelyByCircle策略用來給每個消費者分配隊列.
讀者朋友只需要知道采用什么策略可以達到什么效果即可, 具體的實現(xiàn)與具體的版本和寫法有關,根據(jù)實際情況調(diào)整下即可.
3.當然RocketMQ還有其他的分配隊列的策略,這里就沒有介紹.
4.腳本工具, 由于工作中使用的是阿里云商業(yè)版的RocketMQ, 有時候需要模擬發(fā)送和接收消息,基于官方API文檔,寫了一個HTTP協(xié)議的Python版的生產(chǎn)者和消費者, 以及TCP協(xié)議的Java版的生產(chǎn)者和消費者, 其實沒有啥技術含量,就是把官網(wǎng)的API拿過來,改吧改吧而已.
雖然我們可以采取環(huán)形平均分配的策略, 但是還需要考慮一個問題,如下圖
當消費者數(shù)量和隊列數(shù)量不匹配的時候, 總會有一些消費者比其他消費者多分配幾個隊列, 一旦生產(chǎn)者生產(chǎn)的消息速度比較快, 而消費者消費消息的速度比較慢的話, 那么多分配的幾個隊列就會造成消息傾斜消息不均衡的悲慘狀況.
導致消費者消費慢的原因,比如RPC調(diào)用慢, 數(shù)據(jù)庫查詢慢, 硬件等原因
【總結】
第一點消費端需要采取合適的隊列分配策略,比如按照環(huán)形平均分配策略(AllocateMessageQueueAveragelyByCircle),
第二點消費者的數(shù)量和隊列的數(shù)量盡量保持公平平均分配, 前面這兩點就是在說, 不僅每個消費者要分配到相同數(shù)量的隊列, 而且這些隊列還需要按照環(huán)形的方式分配給消費者.
第三點就是消費者的消費速度不能太慢, 盡量把消息盡快消費掉, 而且消費者彼此之間消費能力差別不大.
歡迎溝通交流