摘要:如果Consumer端消費(fèi)消息失敗举反,那么RocketMQ是如何對(duì)失敗的異常情況進(jìn)行處理?
前面兩篇RocketMQ消息消費(fèi)(一)/(二)篇赎离,主要從Push/Pull兩種消費(fèi)模式的簡(jiǎn)要流程涣澡、長(zhǎng)輪詢機(jī)制和Consumer端負(fù)載均衡這幾點(diǎn)內(nèi)容出發(fā)惠爽,介紹了RocketMQ消息消費(fèi)的正常流程和細(xì)節(jié)內(nèi)容,本篇內(nèi)容將主要介紹Consumer端消費(fèi)失敗的異常流程沥曹。
這里先回顧往期RocketMQ技術(shù)分享的篇幅:
(1)消息中間件—RocketMQ的RPC通信(一)
(2)消息中間件—RocketMQ的RPC通信(二)
(3)消息中間件—RocketMQ消息發(fā)送
(4)消息中間件—RocketMQ消息消費(fèi)(一)
(5)消息中間件—RocketMQ消息消費(fèi)(二)(push模式實(shí)現(xiàn))
一份名、其他MQ中間件消費(fèi)端可靠性的保障
在業(yè)務(wù)開(kāi)發(fā)中,大家一定都遇到過(guò)業(yè)務(wù)工程因?yàn)楦黝惍惓#赡苁菢I(yè)務(wù)工程本身的異常架专、JVM內(nèi)存異惩剑或者系統(tǒng)所在的虛擬機(jī)宕機(jī)等),而導(dǎo)致MQ中間件發(fā)送過(guò)來(lái)的業(yè)務(wù)消息消費(fèi)失敗而無(wú)法再次消費(fèi)該消息的情況部脚。目前想邦,很多MQ消息中間件都有相應(yīng)的機(jī)制和方法來(lái)保證Consumer端消費(fèi)消息的可靠性。下面先來(lái)看看RabbitMQ和Kafka這兩款MQ消息中間件是如何來(lái)保證消費(fèi)者端消息處理的可靠性的呢委刘?
1.1 簡(jiǎn)談RabbitMQ的手動(dòng)消息確認(rèn)ACK機(jī)制
RabbitMQ提供了消息確認(rèn)機(jī)制丧没。消費(fèi)者在訂閱隊(duì)列時(shí)鹰椒,可以在代碼中手動(dòng)設(shè)置autoAck參數(shù)為false,這時(shí)RabbitMQ會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號(hào)(即為顯式地調(diào)用channel.basicAck(envelope.getDeliveryTag(), false)方法)后才從集群中的內(nèi)存(或磁盤)節(jié)點(diǎn)上移除消息呕童,從而保證了這條消息不會(huì)因?yàn)橄M(fèi)失敗而導(dǎo)致丟失漆际。
1.2 簡(jiǎn)析Kafka消息消費(fèi)的手動(dòng)提交
在Kafka中,也可以采用上面那種的消費(fèi)后的確認(rèn)機(jī)制夺饲,通過(guò)在Consumer端設(shè)置“enable.auto.commit”屬性為false后奸汇,待業(yè)務(wù)工程正常處理完消費(fèi)后,在代碼中手動(dòng)調(diào)用KafkaConsumer實(shí)例的commitSync()方法提交(ps:這里指的是同步阻塞commit消費(fèi)的偏移量往声,等待Broker端的返回響應(yīng)擂找,需要注意Broker端在對(duì)commit請(qǐng)求做出響應(yīng)之前,消費(fèi)端會(huì)處于阻塞狀態(tài)浩销,從而限制消息的處理性能和整體吞吐量)贯涎,以確保消息能夠正常被消費(fèi)。如果在消費(fèi)過(guò)程中慢洋,消費(fèi)端突然Crash塘雳,這時(shí)候消費(fèi)偏移量沒(méi)有commit,等正称粘铮恢復(fù)后依然還會(huì)處理剛剛未commit的消息败明。
二、RocketMQ消費(fèi)失敗后的消費(fèi)重試機(jī)制
對(duì)比了另外兩款MQ中間件后斑芜,接下來(lái)進(jìn)入正題肩刃,主要來(lái)說(shuō)說(shuō)RocketMQ在消費(fèi)失敗后的是如何來(lái)保證消息消費(fèi)的可靠性?
2.1 重試隊(duì)列與死信隊(duì)列的概念
在介紹RocketMQ的消費(fèi)重試機(jī)制之前杏头,需要先來(lái)說(shuō)下“重試隊(duì)列”和“死信隊(duì)列”兩個(gè)概念盈包。
(1)重試隊(duì)列:如果Consumer端因?yàn)楦鞣N類型異常導(dǎo)致本次消費(fèi)失敗,為防止該消息丟失而需要將其重新回發(fā)給Broker端保存醇王,保存這種因?yàn)楫惓o(wú)法正常消費(fèi)而回發(fā)給MQ的消息隊(duì)列稱之為重試隊(duì)列呢燥。RocketMQ會(huì)為每個(gè)消費(fèi)組都設(shè)置一個(gè)Topic名稱為“%RETRY%+consumerGroup”的重試隊(duì)列(這里需要注意的是,這個(gè)Topic的重試隊(duì)列是針對(duì)消費(fèi)組寓娩,而不是針對(duì)每個(gè)Topic設(shè)置的)叛氨,用于暫時(shí)保存因?yàn)楦鞣N異常而導(dǎo)致Consumer端無(wú)法消費(fèi)的消息〖椋考慮到異衬海恢復(fù)起來(lái)需要一些時(shí)間,會(huì)為重試隊(duì)列設(shè)置多個(gè)重試級(jí)別焊夸,每個(gè)重試級(jí)別都有與之對(duì)應(yīng)的重新投遞延時(shí)仁连,重試次數(shù)越多投遞延時(shí)就越大。RocketMQ對(duì)于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊(duì)列中阱穗,后臺(tái)定時(shí)任務(wù)按照對(duì)應(yīng)的時(shí)間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊(duì)列中(具體細(xì)節(jié)后面會(huì)詳細(xì)闡述)饭冬。
(2)死信隊(duì)列:由于有些原因?qū)е翪onsumer端長(zhǎng)時(shí)間的無(wú)法正常消費(fèi)從Broker端Pull過(guò)來(lái)的業(yè)務(wù)消息使鹅,為了確保消息不會(huì)被無(wú)故的丟棄,那么超過(guò)配置的“最大重試消費(fèi)次數(shù)”后就會(huì)移入到這個(gè)死信隊(duì)列中昌抠。在RocketMQ中患朱,SubscriptionGroupConfig配置常量默認(rèn)地設(shè)置了兩個(gè)參數(shù),一個(gè)是retryQueueNums為1(重試隊(duì)列數(shù)量為1個(gè))炊苫,另外一個(gè)是retryMaxTimes為16(最大重試消費(fèi)的次數(shù)為16次)裁厅。Broker端通過(guò)校驗(yàn)判斷,如果超過(guò)了最大重試消費(fèi)次數(shù)則會(huì)將消息移至這里所說(shuō)的死信隊(duì)列侨艾。這里姐直,RocketMQ會(huì)為每個(gè)消費(fèi)組都設(shè)置一個(gè)Topic命名為“%DLQ%+consumerGroup"的死信隊(duì)列。一般在實(shí)際應(yīng)用中蒋畜,移入至死信隊(duì)列的消息,需要人工干預(yù)處理撞叽;
2.1 Consumer端回發(fā)消息至Broker端
在業(yè)務(wù)工程中的Consumer端(Push消費(fèi)模式下)姻成,如果消息能夠正常消費(fèi)需要在注冊(cè)的消息監(jiān)聽(tīng)回調(diào)方法中返回CONSUME_SUCCESS的消費(fèi)狀態(tài),否則因?yàn)楦黝惍惓OM(fèi)失敗則返回RECONSUME_LATER的消費(fèi)狀態(tài)愿棋。消費(fèi)狀態(tài)的枚舉類型如下所示:
public enum ConsumeConcurrentlyStatus {
//業(yè)務(wù)方消費(fèi)成功
CONSUME_SUCCESS,
//業(yè)務(wù)方消費(fèi)失敗科展,之后進(jìn)行重新嘗試消費(fèi)
RECONSUME_LATER;
}
如果業(yè)務(wù)工程對(duì)消息消費(fèi)失敗了,那么則會(huì)拋出異常并且返回這里的RECONSUME_LATER狀態(tài)糠雨。這里才睹,在消費(fèi)消息的服務(wù)線程—consumeMessageService中,將封裝好的消息消費(fèi)任務(wù)ConsumeRequest提交至線程池—consumeExecutor異步執(zhí)行甘邀。從消息消費(fèi)任務(wù)ConsumeRequest的run()方法中會(huì)執(zhí)行業(yè)務(wù)工程中注冊(cè)的消息監(jiān)聽(tīng)回調(diào)方法琅攘,并在processConsumeResult方法中根據(jù)業(yè)務(wù)工程返回的狀態(tài)(CONSUME_SUCCESS或者RECONSUME_LATER)進(jìn)行判斷和做對(duì)應(yīng)的處理(下面講的都是在消費(fèi)通信模式為集群模型下的,廣播模型下的比較簡(jiǎn)單就不再分析了)松邪。
(1)業(yè)務(wù)方正常消費(fèi)(CONSUME_SUCCESS):正常情況下坞琴,設(shè)置ackIndex的值為consumeRequest.getMsgs().size() - 1,因此后面的遍歷consumeRequest.getMsgs()消息集合條件不成立逗抑,不會(huì)調(diào)用回發(fā)消費(fèi)失敗消息至Broker端的方法—sendMessageBack(msg, context)剧辐。最后,更新消費(fèi)的偏移量邮府;
(2)業(yè)務(wù)方消費(fèi)失斢亍(RECONSUME_LATER):異常情況下,設(shè)置ackIndex的值為-1褂傀,這時(shí)就會(huì)進(jìn)入到遍歷consumeRequest.getMsgs()消息集合的for循環(huán)中忍啤,執(zhí)行回發(fā)消息的方法—sendMessageBack(msg, context)。這里紊服,首先會(huì)根據(jù)brokerName得到Broker端的地址信息檀轨,然后通過(guò)網(wǎng)絡(luò)通信的Remoting模塊發(fā)送RPC請(qǐng)求到指定的Broker上胸竞,如果上述過(guò)程失敗,則創(chuàng)建一條新的消息重新發(fā)送給Broker参萄,此時(shí)新消息的Topic為“%RETRY%+ConsumeGroupName”—重試隊(duì)列的主題卫枝。其中,在MQClientAPIImpl實(shí)例的consumerSendMessageBack()方法中封裝了ConsumerSendMsgBackRequestHeader的請(qǐng)求體讹挎,隨后完成回發(fā)消費(fèi)失敗消息的RPC通信請(qǐng)求(業(yè)務(wù)請(qǐng)求碼為:CONSUMER_SEND_MSG_BACK)校赤。倘若上面的回發(fā)消息流程失敗,則會(huì)延遲5S后重新在Consumer端進(jìn)行重新消費(fèi)筒溃。與正常消費(fèi)的情況一樣马篮,在最后更新消費(fèi)的偏移量;
2.3 Broker端對(duì)于回發(fā)消息處理的主要流程
Broker端收到這條Consumer端回發(fā)過(guò)來(lái)的消息后怜奖,通過(guò)業(yè)務(wù)請(qǐng)求碼(CONSUMER_SEND_MSG_BACK)匹配業(yè)務(wù)處理器—SendMessageProcessor來(lái)處理浑测。在完成一系列的前置校驗(yàn)(這里主要是“消費(fèi)分組是否存在”、“檢查Broker是否有寫(xiě)入權(quán)限”歪玲、“檢查重試隊(duì)列數(shù)是否大于0”等)后迁央,嘗試獲取重試隊(duì)列的TopicConfig對(duì)象(如果是第一次無(wú)法獲取到,則調(diào)用createTopicInSendMessageBackMethod()方法進(jìn)行創(chuàng)建)滥崩。根據(jù)回發(fā)過(guò)來(lái)的消息偏移量嘗試從commitlog日志文件中查詢消息內(nèi)容岖圈,若不存在則返回異常錯(cuò)誤。
然后钙皮,設(shè)置重試隊(duì)列的Topic—“%RETRY%+consumerGroup”至MessageExt的擴(kuò)展屬性“RETRY_TOPIC”中蜂科,并對(duì)根據(jù)延遲級(jí)別delayLevel和最大重試消費(fèi)次數(shù)maxReconsumeTimes進(jìn)行判斷,如果超過(guò)最大重試消費(fèi)次數(shù)(默認(rèn)16次)短条,則會(huì)創(chuàng)建死信隊(duì)列的TopicConfig對(duì)象(用于后面將回發(fā)過(guò)來(lái)的消息移入死信隊(duì)列)导匣。在構(gòu)建完成需要落盤的MessageExtBrokerInner對(duì)象后,調(diào)用“commitLog.putMessage(msg)”方法做消息持久化茸时。這里逐抑,需要注意的是,在putMessage(msg)的方法里會(huì)使用“SCHEDULE_TOPIC_XXXX”和對(duì)應(yīng)的延遲級(jí)別隊(duì)列Id分別替換MessageExtBrokerInner對(duì)象的Topic和QueueId屬性值屹蚊,并將原來(lái)設(shè)置的重試隊(duì)列主題(“%RETRY%+consumerGroup”)的Topic和QueueId屬性值做一個(gè)備份分別存入擴(kuò)展屬性properties的“REAL_TOPIC”和“REAL_QID”屬性中厕氨。看到這里也就大致明白了汹粤,回發(fā)給Broker端的消費(fèi)失敗的消息并非直接保存至重試隊(duì)列中命斧,而是會(huì)先存至Topic為“SCHEDULE_TOPIC_XXXX”的定時(shí)延遲隊(duì)列中。
疑問(wèn):上面說(shuō)了RocketMQ的重試隊(duì)列的Topic是“%RETRY%+consumerGroup”嘱兼,為啥這里要保存至Topic是“SCHEDULE_TOPIC_XXXX”的這個(gè)延遲隊(duì)列中呢国葬?
在源碼中搜索下關(guān)鍵字—“SCHEDULE_TOPIC_XXXX”,會(huì)發(fā)現(xiàn)Broker端還存在著一個(gè)后臺(tái)服務(wù)線程—ScheduleMessageService(通過(guò)消息存儲(chǔ)服務(wù)—DefaultMessageStore啟動(dòng)),通過(guò)查看源碼可以知道其中有一個(gè)DeliverDelayedMessageTimerTask定時(shí)任務(wù)線程會(huì)根據(jù)Topic(“SCHEDULE_TOPIC_XXXX”)與QueueId汇四,先查到邏輯消費(fèi)隊(duì)列ConsumeQueue接奈,然后根據(jù)偏移量,找到ConsumeQueue中的內(nèi)存映射對(duì)象通孽,從commitlog日志中找到消息對(duì)象MessageExt序宦,并做一個(gè)消息體的轉(zhuǎn)換(messageTimeup()方法,由定時(shí)延遲隊(duì)列消息轉(zhuǎn)化為重試隊(duì)列的消息)背苦,再次做持久化落盤互捌,這時(shí)候才會(huì)真正的保存至重試隊(duì)列中⌒屑粒看到這里就可以解釋上面的疑問(wèn)了秕噪,定時(shí)延遲隊(duì)列只是為了用于暫存的,然后延遲一段時(shí)間再將消息移入至重試隊(duì)列中厚宰。RocketMQ設(shè)定不同的延時(shí)級(jí)別delayLevel腌巾,并且與定時(shí)延遲隊(duì)列相對(duì)應(yīng),具體源碼如下:
//省略
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
/**
* 定時(shí)延時(shí)消息主題的隊(duì)列與延遲等級(jí)對(duì)應(yīng)關(guān)系
* @param delayLevel
* @return
*/
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
2.4 Consumer端消費(fèi)重試機(jī)制
每個(gè)Consumer實(shí)例在啟動(dòng)的時(shí)候就默認(rèn)訂閱了該消費(fèi)組的重試隊(duì)列主題铲觉,DefaultMQPushConsumerImpl的copySubscription()方法中的相關(guān)代碼如下:
private void copySubscription() throws MQClientException {
//省略其他代碼...
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING://如果消息消費(fèi)模式為集群模式壤躲,還需要為該消費(fèi)組對(duì)應(yīng)一個(gè)重試主題
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
//省略其他代碼...
}
因此,這里也就清楚了备燃,Consumer端會(huì)一直訂閱該重試隊(duì)列主題的消息,向Broker端發(fā)送如下的拉取消息的PullRequest請(qǐng)求凌唬,以嘗試重新再次消費(fèi)重試隊(duì)列中積壓的消息并齐。
PullRequest [consumerGroup=CID_JODIE_1, messageQueue=MessageQueue [topic=%RETRY%CID_JODIE_1, brokerName=HQSKCJJIDRRD6KC, queueId=0], nextOffset=51]
最后,給出一張RocketMQ消息重試機(jī)制的框圖(ps:這里只是描述了消息消費(fèi)失敗后重試?yán)〉牟糠种匾^(guò)程):
三客税、總結(jié)
RocketMQ的消息消費(fèi)(三)(消息消費(fèi)重試)篇幅就先分析到這里了况褪。關(guān)于RocketMQ消息消費(fèi)的內(nèi)容比較多也比較復(fù)雜,需要讀者結(jié)合源碼并多次debug(可以通過(guò)分別在Consumer端和Broker端的部分重要方法中打印重要對(duì)象中的各個(gè)屬性值的方式更耻,來(lái)仔細(xì)研究下其中的過(guò)程)测垛,才可以對(duì)其有一個(gè)較為深刻的理解。限于筆者的才疏學(xué)淺秧均,對(duì)本文內(nèi)容可能還有理解不到位的地方食侮,如有闡述不合理之處還望留言一起探討。
我的博客即將搬運(yùn)同步至騰訊云+社區(qū)目胡,邀請(qǐng)大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=f5z033h1gm2x