什么是延時消息
當(dāng)消息寫入Broker后,在指定的時長后才可被消費處理的消息天试,成為延時消息
采用RocketMQ的延時消息可以實現(xiàn)定時任務(wù)功能槐壳,而無需使用定時器。典型的應(yīng)用場景是喜每,電商交易中超時未支付關(guān)閉訂單的場景
在電商平臺中务唐,訂單創(chuàng)建時會發(fā)送一條延遲消息。這條消息將會在30分鐘后投遞給后臺業(yè)務(wù)系統(tǒng)(Consumer)带兜,后臺業(yè)務(wù)系統(tǒng)收到該消息后會判斷對應(yīng)的訂單是否已經(jīng)完成支付绍哎。如果未完成,則取消訂單鞋真,將商品再次放回到庫存崇堰;如果完成支付,則忽略
延時等級
延時消息的延遲時長不支持隨意時長的延遲,是通過特定的延遲等級來指定的海诲。延時等級定義在RocketMQ服務(wù)端的MessageStoreConfig類的如下變量中
即繁莹,若指定延時等級為3,則表示延時時長為10秒特幔,即延時等級是從1開始計數(shù)的
當(dāng)然咨演,如果需要自定義的延時等級,可以通過在Broker加載的配置中新增如下配置(例如下面增加了1天這個等級)蚯斯。配置文件在RocketMQ安裝目錄下的conf目錄中
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
延時消息實現(xiàn)原理
具體實現(xiàn)方案是:
修改消息
Producer將消息發(fā)送到Broker后薄风,Broker會首先將消息寫入到commitlog文件,然后需要將其分發(fā)到相應(yīng)的consumequeue拍嵌。不過遭赂,在分發(fā)之前,系統(tǒng)會先判斷消息中是否帶有延時等級横辆。若沒有撇他,則直接正常分發(fā);若有則需要經(jīng)歷一個復(fù)雜的過程:
- 修改消息的Topic為SCHEDULE_TOPIC_XXXX
- 根據(jù)延時等級狈蚤,在consumequeue目錄中SCHEDULE_TOPIC_XXXX主題下創(chuàng)建出相應(yīng)的queueId目錄與consumequeue文件
延遲等級delayLevel與queueId的對應(yīng)關(guān)系為queueId = delayLevel - 1
需要注意困肩,在創(chuàng)建queueId目錄時,并不是一次性將所有延遲等級對應(yīng)的目錄全部創(chuàng)建完畢脆侮,而是用到哪個延遲等級創(chuàng)建哪個目錄
- 修改消息索引單元內(nèi)容锌畸。索引單元的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。現(xiàn)修改為消息的投遞時間靖避。投遞時間是指該消息被重新修改為原Topic后再次被寫入到commitlog中的事件蹋绽。投遞時間 = 消息存儲時間 + 延遲等級時間。消息存儲時間指的是消息被發(fā)送到Broker時的時間戳
- 將消息索引寫入到SCHEDULE_TOPIC_XXXX主題下相應(yīng)的consumequeue中
SCHEDULE_TOPIC_XXXX目錄中各個延時等級Queue中的消息是如何排序的呢筋蓖?
是按照消息投遞時間排序的卸耘。一個Broker中同一等級的所有延時消息會被寫入到consumequeue目錄中SCHEDULE_TOPIC_XXXX目錄下相同Queue中。即一個Queue中消息投遞時間的延遲等級時間是相同的粘咖。那么投遞時間就取決于消息存儲時間了蚣抗。即按照消息被發(fā)送到Broker的時間進行排序的。
延遲投遞消息
Broker內(nèi)部有一個延遲消息服務(wù)類ScheuleMessageService瓮下,其會消費SCHEDULE_TOPIC_XXXX中的消息翰铡,即按照每條消息的投遞時間,將延時消息投遞到目標(biāo)Topic中讽坏。不過锭魔,在投遞之前會從commitlog中將原來寫入的消息再次讀出,并將其原來的延時等級設(shè)置為0路呜,即原消息變味了一條不延遲的普通消息迷捧。然后再次將消息投遞到目標(biāo)Topic中织咧。
ScheduleMessageService在Broker啟動時,會創(chuàng)建并啟動一個定時器漠秋,用于執(zhí)行相應(yīng)的定時任務(wù)笙蒙。系統(tǒng)會根據(jù)延時等級的個數(shù),定義響應(yīng)數(shù)量的TimerTask庆锦,每個TimerTask負責(zé)一個延遲等級消息的消費與投遞捅位。每個TimerTask都會檢測相應(yīng)Queue隊列的第一條消息是否到期。若第一條消息未到期搂抒,則后面的所有消息更不會到期艇搀;若第一條消息到期了,則將該消息投遞到目標(biāo)Topic求晶,即消費該消息
將消息重新寫入commitlog
延遲消息服務(wù)類ScheduleMessageService將延遲消息再次發(fā)送給commitlog焰雕,并再次形成新的消息索引條目,分發(fā)到相應(yīng)Queue
這其實就是一次普通消息發(fā)送誉帅。只不過這次的消息Producer是延遲消息服務(wù)類
代碼
定義DelayProducer類
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("TopicB", "someTag", body);
// 指定消息延遲等級為3級,即延遲10s
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
// 輸出消息被發(fā)送的時間
System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
System.out.println(" ," + sendResult);
}
producer.shutdown();