1酷含、前言
對于所有的 MQ 來說汪茧,必問的一道面試題就是 RocketMQ 順序消息怎樣做?原理是什么什往?
首先我們要明確什么順序消費(fèi)慌闭,順序消費(fèi)的定義是什么躯舔?我所理解的順序消費(fèi),指的針對某一類消息丧失,比如都是訂單A 的消息來說惜互,它的消費(fèi)有先后順序琳拭,類似于 FIFO描验。假設(shè)訂單 A 有創(chuàng)建、付款膘流、完成這幾類消息,我們對于訂單 A 的消息耕魄,必須要滿足先消費(fèi)創(chuàng)建彭谁,其次是付款,最后是完成奄抽。
所以針對整個鏈路來說甩鳄,我們不僅需要塞的時(shí)候是有序的,消費(fèi)的時(shí)候也應(yīng)該做到有序妙啃。就算是以 FIFO 順序塞進(jìn)去,消費(fèi)如果使用多線程同時(shí)消費(fèi)同一個 ConsumerQueue 且同時(shí)能消費(fèi)多個消息馆匿,那必然做不到有序燥滑。接下來,會從 provider铭拧、consumer 兩個方面說明如何做到有序。
首先針對順序消息呕臂,生產(chǎn)者可以是多線程的肪跋,只要保證每個線程發(fā)的是不同類型的消息(如發(fā)生不同訂單的消息),那么在不同的分區(qū)就可以保證有序谜洽;
2、實(shí)現(xiàn)
針對 provider 來說阐虚,RocketMQ 提供了發(fā)送順序消息的方式,即 MessageQueueSelector:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
provider 在發(fā)送的時(shí)候贸宏,只要選擇消息發(fā)送到那個 ConsumerQueue 即可磕洪。比如訂單來說,使用訂單 id 作為 key 選擇隊(duì)列鲫咽,那么同一個訂單的消息必定能發(fā)送到同一個隊(duì)列谷异。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根據(jù)訂單id選擇發(fā)送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//訂單id
所以 provider 的順序發(fā)送異常簡單。
針對 consumer 來說箩绍,需要使用 MessageListenerOrderly 來消費(fèi)消息:
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每個queue有唯一的consume線程來消費(fèi), 訂單對每個queue(分區(qū))有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer 順序消費(fèi)的原理也很簡單尺上。消費(fèi)者消費(fèi)消息的時(shí)候,會有一個 PullMessageService 拉取線程(單線程)拉取消息卑吭,然后放入到 processQueue(每個消費(fèi)隊(duì)列對應(yīng)一個 processQueue) 中马绝,因?yàn)槭菃尉€程拉取的,對于同一個隊(duì)列的消息(雖然消費(fèi)者可以訂閱多個隊(duì)列富稻,但是對于同一個隊(duì)列是有序的)是有序的唉窃。在放入 processQueue 之后纹笼,會調(diào)用 ConsumeMessageConcurrentlyService 或 ConsumeMessageOrderlyService 來進(jìn)行消費(fèi),這里是調(diào)用 ConsumeMessageOrderlyService 進(jìn)行消費(fèi)。ConsumeMessageOrderlyService 在消費(fèi)的時(shí)候件已,會先獲取每一個 ConsumerQueue 的鎖元暴,然后從 processQueue 獲取消息消費(fèi),這也就意味著茉盏,對于每一個 ConsumerQueue 的消息來說,消費(fèi)的邏輯也是順序的铜秆。
3讶迁、缺點(diǎn)
- 發(fā)送順序消息無法利用集群 FailOver 特性(發(fā)送時(shí)已經(jīng)選定發(fā)到哪個隊(duì)列)
- 消費(fèi)順序消息的并行度依賴于隊(duì)列數(shù)量
- 隊(duì)列熱點(diǎn)問題,個別隊(duì)列由于哈希不均導(dǎo)致消息過多啸驯,消費(fèi)速度跟不上祟峦,產(chǎn)生消息堆積問題
- 遇到消息失敗的消息,無法跳過宅楞,當(dāng)前隊(duì)列消費(fèi)暫停
不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過Raft搓幌、Paxos之類的算法保證有可用的副本迅箩,或者通過其他高可用的存儲設(shè)備來存儲MessageQueue。
熱點(diǎn)問題好像沒有什么好的解決辦法饲趋,只能通過拆分MessageQueue和優(yōu)化路由方法來盡量均衡的將消息分配到不同的MessageQueue奕塑。
消費(fèi)并行度理論上不會有太大問題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整龄砰。
消費(fèi)失敗的無法跳過是不可避免的讨衣,因?yàn)樘^可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯誤的式镐。不過可以提供一些策略娘汞,由用戶根據(jù)錯誤類型來決定是否跳過,并且提供重試隊(duì)列之類的功能你弦,在跳過之后用戶可以在“其他”地方重新消費(fèi)到這條消息。
4扮叨、感悟
其實(shí)對于所謂的順序消費(fèi)來說领迈,本質(zhì)上是類似于一個狀態(tài)機(jī)的行為,比如一個訂單先創(chuàng)建狸捅,后付款、最后結(jié)束的行為磁浇,完全可以定義一個狀態(tài)朽褪,而且發(fā)生的順序是有先后的。所以完全不必要使用什么順序消費(fèi)衍锚,可以先創(chuàng)建嗤堰,把創(chuàng)建消息塞到 mq,從 mq 獲取到創(chuàng)建消息消費(fèi)踢匣,然后創(chuàng)建一個付款消息,再塞到 mq后专。然后從 mq 消費(fèi)付款消息输莺,然后標(biāo)識訂單結(jié)束裸诽。完全可以用一個狀態(tài)機(jī) + mq + db 來做建瘫,更加穩(wěn)定通用尸折。