RocketMQ消費失敗消息深入分析(consumer,broker的具體處理邏輯)

前言

消息隊列是分布式系統(tǒng)中重要的組件屯阀,主要解決應(yīng)用耦合,異步消息轴术,流量削鋒等問題难衰。目前在生產(chǎn)環(huán)境,使用較多的消息隊列有ActiveMQ逗栽,RabbitMQ盖袭,ZeroMQ,Kafka彼宠,MetaMQ鳄虱,RocketMQ等。由于每個消息隊列都有它的優(yōu)勢和劣勢凭峡,我們公司對于不同的場景使用了不同類型的消息隊列醇蝴。對于RocketMQ消費端存在消息消費失敗的情況,通常有兩種方式想罕,一種是consumer端知道怎么處理悠栓,另一種是consumer不能處理(broker處理)霉涨,本文對后一種情況進行介紹,consumer獲取到消息但不能正常處理(ack)惭适,接下來這個消費失敗的消息在Broker里面如何存儲和重新讓consumer消費笙瑟,針對這個流程做了深入的分析。本文中的P代表producer癞志,C代表consumer往枷,本文的consumeQueue對應(yīng)前面的topic下面的隊列。

目錄

  • RocketMQ的消費與存儲結(jié)構(gòu)
  • RocketMQ的消費失敗消息處理邏輯
  • Broker端處理失敗消息任務(wù)的啟動
  • Consumer發(fā)回消費失敗消息流程
  • Broker寫發(fā)回失敗消息的流程

RocketMQ的消費與存儲結(jié)構(gòu)

正常情況下凄杯,P發(fā)送消息到broker错洁,消息內(nèi)容寫到commitlog,消息內(nèi)容在commitlog的位置信息(索引)寫到consumerQueue戒突,C讀取consumerQueue的內(nèi)容消費消息屯碴。


發(fā)送消費.png

RocketMq的存儲結(jié)構(gòu):


存儲結(jié)構(gòu).png

本文的內(nèi)容涉及上面的消費隊列服務(wù)(consumerQueue,%RETRY%groupName屬于consumerQueue)膊存,定時消息服務(wù)(SCHEDULE_TOPIC_XXXX)兩個模塊导而,C與broker的的消息消費只涉及到consumerQueue,定時消息服務(wù)只在broker內(nèi)部起作用隔崎。

RocketMQ的消費失敗消息處理邏輯

consumer消費失敗消息處理流程圖如下:


rocketmq消費失敗發(fā)回broker.png

在下面的代碼和流程分析中請結(jié)合這個圖進行分析今艺。
其中SCHEDULE_TOPIC_XXXX和%RETRY%groupName的queue都存儲在目錄 ~/store/consumequeue 里面:
ll ~/store/consumequeue 如下:


schedule and retry store.png

ll ~/store/consumequeue/SCHEDULE_TOPIC_XXXX 如下:


schedule queue.png

從上圖可以看出SCHEDULE_TOPIC_XXXX的隊列名稱是從2開始到17,對應(yīng)的delayLevel為3到18爵卒,3對應(yīng)10s虚缎,18對應(yīng)2h,在類MessageStoreConfig中這樣定義延時時間:String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"钓株。SCHEDULE_TOPIC_XXXX這個topic只對內(nèi)部使用遥巴,對于consumer只能消費到retry隊列的數(shù)據(jù)。
consumer消費失敗的消息發(fā)回broker后總是先寫到SCHEDULE_TOPIC_XXXX里面享幽,然后schedule service讀取SCHEDULE_TOPIC_XXXX里面的數(shù)據(jù)寫到retry隊列,consumer消費retry隊列的數(shù)據(jù)拾弃,這樣就完成了一個循環(huán)值桩,從這個過程也能看到,一個消費失敗的消息體每次發(fā)回broker需要在commitLog里面存儲兩份(topic為SCHEDULE_TOPIC_XXXX的一份這個主要是為schedule service控制延時用的豪椿,topic為%RETRY%groupName的一份)奔坟。
當(dāng)我們想查看現(xiàn)在的延時消息數(shù)量,我們可以查看SCHEDULE_TOPIC_XXXX的offset來得知搭盾,使用CLI Admin Tool工具輸入命令“sh mqadmin brokerStatus”查看處理進度咳秉。如下圖:


shcedule offset.png

其中每行為一個隊列,圖中第一列為隊列的名稱鸯隅,圖中第二列參數(shù)為當(dāng)前隊列處理的offset澜建,圖中第三列為當(dāng)前隊列最大存儲的offset向挖。通過第三列和第二列的值相減能得出當(dāng)前的隊列的消息數(shù)量。

Broker端處理失敗消息任務(wù)的啟動

scheduleMessage類圖.png

ScheduleMessageService根據(jù)messageDelayLevel維護了每個延遲level對應(yīng)的隊列編號炕舵,以及每個隊列編號對應(yīng)的offset何之。在start方法里面會啟動18個timerTask(DeliverDelayedMessageTimerTask),每個對應(yīng)一個level咽筋,初始o(jì)ffset為0溶推。然后就是定時任務(wù)讀取SCHEDULE_TOPIC_XXXX隊列里面的消息進行判斷,如果消息的delayLevel對應(yīng)的時間滿足重新消費奸攻,那么就會忘consumeQueue里面寫這個消息蒜危,等待C重新來消費。

Consumer發(fā)回消費失敗消息流程

consumer發(fā)送消費失敗消息.png

在ConsumeRequest的run方法里面也就是業(yè)務(wù)端處理消息的線程里面睹耐,對于status是非success的交給ConsumeMessageConcurrentlyService(本文只討論并行消費的模式辐赞,串行模式類似)的sendMessageBack方法處理,這個方法主要設(shè)置delayLevel(context.getDelayLevelWhenNextConsume())疏橄,然后傳遞給DefaultMQPushConsumerImpl.sendMessageBack找到對應(yīng)的消息來源queue占拍,把這個消息發(fā)送到這個queue里面,也就是說消費失敗的消息發(fā)回broker還是會在之前的那個queue里面捎迫。發(fā)回broker后本地再過5秒重試消費一次晃酒,如果這次成功,下次就不再消費窄绒。
上面流程的類圖:


consusmer發(fā)回broker失敗消息類圖.png

在ConsumeRequest的run方法里面會調(diào)用ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this)來處理消費結(jié)果狀態(tài)贝次,在cluster(集群模式)下設(shè)置新的消息delayLevle值然后把失敗的消息發(fā)回Broker,廣播模式不發(fā)回彰导。注意ConsumeConcurrentlyContext的delayLevelWhenNextConsume屬性說明-1直接放到死信隊列蛔翅,0又broker每次對重試消費次數(shù)加1來控制重試策略,大于0由consumer控制重試消費策略(在listener的consumeMessage方法里面有個context:context.setDelayLevelWhenNextConsume(4)設(shè)置為1分鐘延時消費)位谋,默認(rèn)值為0山析。

Broker寫發(fā)回失敗消息的流程

broker寫消費失敗的消息.png

broker端收到消費失敗消息后通過consumerSendMsgBack(P發(fā)送的消息不由這個處理,區(qū)分通過消息頭的type)方法設(shè)置當(dāng)前消息的delayTimeLevel掏父,這里計算delayTimeLevel笋轨,第一次重試默認(rèn)consumer發(fā)回為0,延遲為延遲等級為0+3=3赊淑;如果第一次不為0表明是consumer控制的情況爵政,直接取出delayTimeLevel,也就是和ConsumeConcurrentlyContext(consumer端控制)的delayLevelWhenNextConsume配置一致陶缺。設(shè)置好delayLevelTime后就交給DefaultMessageStore的putMessage方法钾挟,DefaultMessageStore的putMessage方法通過Commitlog的putMessage來寫入文件,這里需要重點關(guān)注的是在這個方法里面通過msg.getDelayTimeLevel() > 0這個條件饱岸,修改當(dāng)前消息topic為SCHEDULE_TOPIC_XXXX掺出,原來的topic保留在property里面徽千,在ScheduleMessageService里面判斷消息滿足條件后會把消息的topic改為真實的topic,通常是retry蛛砰,接著寫到consumeQueue里面,C對于%RETRY%consumerGroup這個topic在程序里面默認(rèn)是訂閱的不需用戶指定罐栈,然后隊列Id的計算方式為queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),即msg.getDelayTimeLevel()-1泥畅,和前面的截圖2到17編號一致荠诬,然后消息體寫到commitlog文件和索引寫到SCHEDULE_TOPIC_XXXX隊列。類圖如下:


broker處理發(fā)回失敗消息類圖.png

SendMessageProcessor處理遠程發(fā)來的消息位仁,包括P和C的柑贞,方法里面通過RequestCode.CONSUMER_SEND_MSG_BACK來判斷是不是重試發(fā)回的消息。然后會判斷這個消息對應(yīng)的topic為%RETRY%consumerGroup的是否創(chuàng)建過聂抢,沒有則創(chuàng)建钧嘶;接下來的處理就和上面的流程圖一樣了。

總結(jié)

本文圍繞consumer端消費失敗后RocketMQ各個模塊的處理邏輯進行了源碼的深入分析琳疏。相信有了以上的知識學(xué)習(xí)和實踐之后有决,當(dāng)業(yè)務(wù)應(yīng)用遇到了類似的問題就可以胸有成竹的應(yīng)對了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末空盼,一起剝皮案震驚了整個濱河市书幕,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌揽趾,老刑警劉巖台汇,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異篱瞎,居然都是意外死亡苟呐,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門俐筋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來牵素,“玉大人,你說我怎么就攤上這事澄者“蚀簦” “怎么了?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵闷哆,是天一觀的道長。 經(jīng)常有香客問我单起,道長抱怔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任嘀倒,我火速辦了婚禮屈留,結(jié)果婚禮上局冰,老公的妹妹穿的比我還像新娘。我一直安慰自己灌危,他們只是感情好康二,可當(dāng)我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著勇蝙,像睡著了一般沫勿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上味混,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天产雹,我揣著相機與錄音,去河邊找鬼翁锡。 笑死蔓挖,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的馆衔。 我是一名探鬼主播瘟判,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼角溃!你這毒婦竟也來了拷获?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤开镣,失蹤者是張志新(化名)和其女友劉穎刀诬,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體邪财,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡陕壹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了树埠。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片糠馆。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖怎憋,靈堂內(nèi)的尸體忽然破棺而出又碌,到底是詐尸還是另有隱情,我是刑警寧澤绊袋,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布毕匀,位于F島的核電站,受9級特大地震影響癌别,放射性物質(zhì)發(fā)生泄漏皂岔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一展姐、第九天 我趴在偏房一處隱蔽的房頂上張望躁垛。 院中可真熱鬧剖毯,春花似錦、人聲如沸教馆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽待侵。三九已至瘾腰,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間舒憾,已是汗流浹背镀钓。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留镀迂,地道東北人丁溅。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像探遵,于是被迫代替她去往敵國和親窟赏。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容