RocketMQ應(yīng)用——延時消息

什么是延時消息

當(dāng)消息寫入Broker后,在指定的時長后才可被消費處理的消息天试,成為延時消息

采用RocketMQ的延時消息可以實現(xiàn)定時任務(wù)功能槐壳,而無需使用定時器。典型的應(yīng)用場景是喜每,電商交易中超時未支付關(guān)閉訂單的場景

在電商平臺中务唐,訂單創(chuàng)建時會發(fā)送一條延遲消息。這條消息將會在30分鐘后投遞給后臺業(yè)務(wù)系統(tǒng)(Consumer)带兜,后臺業(yè)務(wù)系統(tǒng)收到該消息后會判斷對應(yīng)的訂單是否已經(jīng)完成支付绍哎。如果未完成,則取消訂單鞋真,將商品再次放回到庫存崇堰;如果完成支付,則忽略

延時等級

延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的海诲。延時等級定義在RocketMQ服務(wù)端的MessageStoreConfig類的如下變量中

image.png

即繁莹,若指定延時等級為3,則表示延時時長為10秒特幔,即延時等級是從1開始計數(shù)的

當(dāng)然咨演,如果需要自定義的延時等級,可以通過在Broker加載的配置中新增如下配置(例如下面增加了1天這個等級)蚯斯。配置文件在RocketMQ安裝目錄下的conf目錄中

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

延時消息實現(xiàn)原理

image.png

具體實現(xiàn)方案是:

修改消息

image.png

Producer將消息發(fā)送到Broker后薄风,Broker會首先將消息寫入到commitlog文件,然后需要將其分發(fā)到相應(yīng)的consumequeue拍嵌。不過遭赂,在分發(fā)之前,系統(tǒng)會先判斷消息中是否帶有延時等級横辆。若沒有撇他,則直接正常分發(fā);若有則需要經(jīng)歷一個復(fù)雜的過程:

  • 修改消息的Topic為SCHEDULE_TOPIC_XXXX
  • 根據(jù)延時等級狈蚤,在consumequeue目錄中SCHEDULE_TOPIC_XXXX主題下創(chuàng)建出相應(yīng)的queueId目錄與consumequeue文件

延遲等級delayLevel與queueId的對應(yīng)關(guān)系為queueId = delayLevel - 1

需要注意困肩,在創(chuàng)建queueId目錄時,并不是一次性將所有延遲等級對應(yīng)的目錄全部創(chuàng)建完畢脆侮,而是用到哪個延遲等級創(chuàng)建哪個目錄

image.png
  • 修改消息索引單元內(nèi)容锌畸。索引單元的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。現(xiàn)修改為消息的投遞時間靖避。投遞時間是指該消息被重新修改為原Topic后再次被寫入到commitlog中的事件蹋绽。投遞時間 = 消息存儲時間 + 延遲等級時間。消息存儲時間指的是消息被發(fā)送到Broker時的時間戳
  • 將消息索引寫入到SCHEDULE_TOPIC_XXXX主題下相應(yīng)的consumequeue中

SCHEDULE_TOPIC_XXXX目錄中各個延時等級Queue中的消息是如何排序的呢筋蓖?

是按照消息投遞時間排序的卸耘。一個Broker中同一等級的所有延時消息會被寫入到consumequeue目錄中SCHEDULE_TOPIC_XXXX目錄下相同Queue中。即一個Queue中消息投遞時間的延遲等級時間是相同的粘咖。那么投遞時間就取決于消息存儲時間了蚣抗。即按照消息被發(fā)送到Broker的時間進行排序的。

延遲投遞消息

Broker內(nèi)部有一個延遲消息服務(wù)類ScheuleMessageService瓮下,其會消費SCHEDULE_TOPIC_XXXX中的消息翰铡,即按照每條消息的投遞時間,將延時消息投遞到目標(biāo)Topic中讽坏。不過锭魔,在投遞之前會從commitlog中將原來寫入的消息再次讀出,并將其原來的延時等級設(shè)置為0路呜,即原消息變味了一條不延遲的普通消息迷捧。然后再次將消息投遞到目標(biāo)Topic中织咧。

ScheduleMessageService在Broker啟動時,會創(chuàng)建并啟動一個定時器漠秋,用于執(zhí)行相應(yīng)的定時任務(wù)笙蒙。系統(tǒng)會根據(jù)延時等級的個數(shù),定義響應(yīng)數(shù)量的TimerTask庆锦,每個TimerTask負責(zé)一個延遲等級消息的消費與投遞捅位。每個TimerTask都會檢測相應(yīng)Queue隊列的第一條消息是否到期。若第一條消息未到期搂抒,則后面的所有消息更不會到期艇搀;若第一條消息到期了,則將該消息投遞到目標(biāo)Topic求晶,即消費該消息

將消息重新寫入commitlog

延遲消息服務(wù)類ScheduleMessageService將延遲消息再次發(fā)送給commitlog焰雕,并再次形成新的消息索引條目,分發(fā)到相應(yīng)Queue

這其實就是一次普通消息發(fā)送誉帅。只不過這次的消息Producer是延遲消息服務(wù)類

代碼

定義DelayProducer類

DefaultMQProducer producer = new DefaultMQProducer("pg"); 
producer.setNamesrvAddr("rocketmqOS:9876"); 
producer.start(); 
for (int i = 0; i < 10; i++) { 
    byte[] body = ("Hi," + i).getBytes(); 
    Message msg = new Message("TopicB", "someTag", body); 
    // 指定消息延遲等級為3級,即延遲10s 
    msg.setDelayTimeLevel(3); 
    SendResult sendResult = producer.send(msg); 
    // 輸出消息被發(fā)送的時間 
    System.out.print(new SimpleDateFormat("mm:ss").format(new Date())); 
    System.out.println(" ," + sendResult); 
}
producer.shutdown();

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末右莱,一起剝皮案震驚了整個濱河市蚜锨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌慢蜓,老刑警劉巖亚再,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異晨抡,居然都是意外死亡氛悬,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門耘柱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來如捅,“玉大人,你說我怎么就攤上這事调煎【登玻” “怎么了?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵士袄,是天一觀的道長悲关。 經(jīng)常有香客問我,道長娄柳,這世上最難降的妖魔是什么寓辱? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮赤拒,結(jié)果婚禮上秫筏,老公的妹妹穿的比我還像新娘诱鞠。我一直安慰自己,他們只是感情好跳昼,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布般甲。 她就那樣靜靜地躺著,像睡著了一般鹅颊。 火紅的嫁衣襯著肌膚如雪敷存。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天堪伍,我揣著相機與錄音锚烦,去河邊找鬼。 笑死帝雇,一個胖子當(dāng)著我的面吹牛涮俄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播尸闸,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼彻亲,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了吮廉?” 一聲冷哼從身側(cè)響起苞尝,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宦芦,沒想到半個月后宙址,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡调卑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年抡砂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恬涧。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡注益,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出溯捆,到底是詐尸還是另有隱情聊浅,我是刑警寧澤,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布现使,位于F島的核電站低匙,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏碳锈。R本人自食惡果不足惜顽冶,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望售碳。 院中可真熱鬧强重,春花似錦绞呈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至倘要,卻和暖如春圾亏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背封拧。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工志鹃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人泽西。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓曹铃,卻偏偏與公主長得像,于是被迫代替她去往敵國和親捧杉。 傳聞我的和親對象是個殘疾皇子陕见,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容