前言
消息隊列是分布式系統(tǒng)中重要的組件屯阀,主要解決應(yīng)用耦合,異步消息轴术,流量削鋒等問題难衰。目前在生產(chǎn)環(huán)境,使用較多的消息隊列有ActiveMQ逗栽,RabbitMQ盖袭,ZeroMQ,Kafka彼宠,MetaMQ鳄虱,RocketMQ等。由于每個消息隊列都有它的優(yōu)勢和劣勢凭峡,我們公司對于不同的場景使用了不同類型的消息隊列醇蝴。對于RocketMQ消費端存在消息消費失敗的情況,通常有兩種方式想罕,一種是consumer端知道怎么處理悠栓,另一種是consumer不能處理(broker處理)霉涨,本文對后一種情況進行介紹,consumer獲取到消息但不能正常處理(ack)惭适,接下來這個消費失敗的消息在Broker里面如何存儲和重新讓consumer消費笙瑟,針對這個流程做了深入的分析。本文中的P代表producer癞志,C代表consumer往枷,本文的consumeQueue對應(yīng)前面的topic下面的隊列。
目錄
- RocketMQ的消費與存儲結(jié)構(gòu)
- RocketMQ的消費失敗消息處理邏輯
- Broker端處理失敗消息任務(wù)的啟動
- Consumer發(fā)回消費失敗消息流程
- Broker寫發(fā)回失敗消息的流程
RocketMQ的消費與存儲結(jié)構(gòu)
正常情況下凄杯,P發(fā)送消息到broker错洁,消息內(nèi)容寫到commitlog,消息內(nèi)容在commitlog的位置信息(索引)寫到consumerQueue戒突,C讀取consumerQueue的內(nèi)容消費消息屯碴。
RocketMq的存儲結(jié)構(gòu):
本文的內(nèi)容涉及上面的消費隊列服務(wù)(consumerQueue,%RETRY%groupName屬于consumerQueue)膊存,定時消息服務(wù)(SCHEDULE_TOPIC_XXXX)兩個模塊导而,C與broker的的消息消費只涉及到consumerQueue,定時消息服務(wù)只在broker內(nèi)部起作用隔崎。
RocketMQ的消費失敗消息處理邏輯
consumer消費失敗消息處理流程圖如下:
在下面的代碼和流程分析中請結(jié)合這個圖進行分析今艺。
其中SCHEDULE_TOPIC_XXXX和%RETRY%groupName的queue都存儲在目錄 ~/store/consumequeue 里面:
ll ~/store/consumequeue 如下:
ll ~/store/consumequeue/SCHEDULE_TOPIC_XXXX 如下:
從上圖可以看出SCHEDULE_TOPIC_XXXX的隊列名稱是從2開始到17,對應(yīng)的delayLevel為3到18爵卒,3對應(yīng)10s虚缎,18對應(yīng)2h,在類MessageStoreConfig中這樣定義延時時間:String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"钓株。SCHEDULE_TOPIC_XXXX這個topic只對內(nèi)部使用遥巴,對于consumer只能消費到retry隊列的數(shù)據(jù)。
consumer消費失敗的消息發(fā)回broker后總是先寫到SCHEDULE_TOPIC_XXXX里面享幽,然后schedule service讀取SCHEDULE_TOPIC_XXXX里面的數(shù)據(jù)寫到retry隊列,consumer消費retry隊列的數(shù)據(jù)拾弃,這樣就完成了一個循環(huán)值桩,從這個過程也能看到,一個消費失敗的消息體每次發(fā)回broker需要在commitLog里面存儲兩份(topic為SCHEDULE_TOPIC_XXXX的一份這個主要是為schedule service控制延時用的豪椿,topic為%RETRY%groupName的一份)奔坟。
當(dāng)我們想查看現(xiàn)在的延時消息數(shù)量,我們可以查看SCHEDULE_TOPIC_XXXX的offset來得知搭盾,使用CLI Admin Tool工具輸入命令“sh mqadmin brokerStatus”查看處理進度咳秉。如下圖:
其中每行為一個隊列,圖中第一列為隊列的名稱鸯隅,圖中第二列參數(shù)為當(dāng)前隊列處理的offset澜建,圖中第三列為當(dāng)前隊列最大存儲的offset向挖。通過第三列和第二列的值相減能得出當(dāng)前的隊列的消息數(shù)量。
Broker端處理失敗消息任務(wù)的啟動
ScheduleMessageService根據(jù)messageDelayLevel維護了每個延遲level對應(yīng)的隊列編號炕舵,以及每個隊列編號對應(yīng)的offset何之。在start方法里面會啟動18個timerTask(DeliverDelayedMessageTimerTask),每個對應(yīng)一個level咽筋,初始o(jì)ffset為0溶推。然后就是定時任務(wù)讀取SCHEDULE_TOPIC_XXXX隊列里面的消息進行判斷,如果消息的delayLevel對應(yīng)的時間滿足重新消費奸攻,那么就會忘consumeQueue里面寫這個消息蒜危,等待C重新來消費。
Consumer發(fā)回消費失敗消息流程
在ConsumeRequest的run方法里面也就是業(yè)務(wù)端處理消息的線程里面睹耐,對于status是非success的交給ConsumeMessageConcurrentlyService(本文只討論并行消費的模式辐赞,串行模式類似)的sendMessageBack方法處理,這個方法主要設(shè)置delayLevel(context.getDelayLevelWhenNextConsume())疏橄,然后傳遞給DefaultMQPushConsumerImpl.sendMessageBack找到對應(yīng)的消息來源queue占拍,把這個消息發(fā)送到這個queue里面,也就是說消費失敗的消息發(fā)回broker還是會在之前的那個queue里面捎迫。發(fā)回broker后本地再過5秒重試消費一次晃酒,如果這次成功,下次就不再消費窄绒。
上面流程的類圖:
在ConsumeRequest的run方法里面會調(diào)用ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this)來處理消費結(jié)果狀態(tài)贝次,在cluster(集群模式)下設(shè)置新的消息delayLevle值然后把失敗的消息發(fā)回Broker,廣播模式不發(fā)回彰导。注意ConsumeConcurrentlyContext的delayLevelWhenNextConsume屬性說明-1直接放到死信隊列蛔翅,0又broker每次對重試消費次數(shù)加1來控制重試策略,大于0由consumer控制重試消費策略(在listener的consumeMessage方法里面有個context:context.setDelayLevelWhenNextConsume(4)設(shè)置為1分鐘延時消費)位谋,默認(rèn)值為0山析。
Broker寫發(fā)回失敗消息的流程
broker端收到消費失敗消息后通過consumerSendMsgBack(P發(fā)送的消息不由這個處理,區(qū)分通過消息頭的type)方法設(shè)置當(dāng)前消息的delayTimeLevel掏父,這里計算delayTimeLevel笋轨,第一次重試默認(rèn)consumer發(fā)回為0,延遲為延遲等級為0+3=3赊淑;如果第一次不為0表明是consumer控制的情況爵政,直接取出delayTimeLevel,也就是和ConsumeConcurrentlyContext(consumer端控制)的delayLevelWhenNextConsume配置一致陶缺。設(shè)置好delayLevelTime后就交給DefaultMessageStore的putMessage方法钾挟,DefaultMessageStore的putMessage方法通過Commitlog的putMessage來寫入文件,這里需要重點關(guān)注的是在這個方法里面通過msg.getDelayTimeLevel() > 0這個條件饱岸,修改當(dāng)前消息topic為SCHEDULE_TOPIC_XXXX掺出,原來的topic保留在property里面徽千,在ScheduleMessageService里面判斷消息滿足條件后會把消息的topic改為真實的topic,通常是retry蛛砰,接著寫到consumeQueue里面,C對于%RETRY%consumerGroup這個topic在程序里面默認(rèn)是訂閱的不需用戶指定罐栈,然后隊列Id的計算方式為queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),即msg.getDelayTimeLevel()-1泥畅,和前面的截圖2到17編號一致荠诬,然后消息體寫到commitlog文件和索引寫到SCHEDULE_TOPIC_XXXX隊列。類圖如下:
SendMessageProcessor處理遠程發(fā)來的消息位仁,包括P和C的柑贞,方法里面通過RequestCode.CONSUMER_SEND_MSG_BACK來判斷是不是重試發(fā)回的消息。然后會判斷這個消息對應(yīng)的topic為%RETRY%consumerGroup的是否創(chuàng)建過聂抢,沒有則創(chuàng)建钧嘶;接下來的處理就和上面的流程圖一樣了。
總結(jié)
本文圍繞consumer端消費失敗后RocketMQ各個模塊的處理邏輯進行了源碼的深入分析琳疏。相信有了以上的知識學(xué)習(xí)和實踐之后有决,當(dāng)業(yè)務(wù)應(yīng)用遇到了類似的問題就可以胸有成竹的應(yīng)對了。