什么是定時消息和延遲消息?
- 定時消息:Producer 將消息發(fā)送到 MQ 服務端撩匕,但并不期望這條消息立馬投遞鹰晨,而是推遲到在當前時間點之后的某一個時間投遞到 Consumer 進行消費,該消息即定時消息止毕。
- 延遲消息:Producer 將消息發(fā)送到 MQ 服務端模蜡,但并不期望這條消息立馬投遞,而是延遲一定時間后才投遞到 Consumer 進行消費扁凛,該消息即延時消息忍疾。
定時消息與延遲消息在代碼配置上存在一些差異,但是最終達到的效果相同:消息在發(fā)送到 MQ 服務端后并不會立馬投遞谨朝,而是根據(jù)消息中的屬性延遲固定時間后才投遞給消費者卤妒。
目前業(yè)界MQ對定時消息和延遲消息的支持情況
上圖是阿里云上對業(yè)界MQ功能的對比,其中開源產(chǎn)品中只有阿里的RocketMQ支持延遲消息字币,且是固定的18個Level则披。
固定Level的含義是延遲是特定級別的,比如支持3秒洗出、5秒的Level士复,那么用戶只能發(fā)送3秒延遲或者5秒延遲,不能發(fā)送8秒延遲的消息翩活。
消息隊列RocketMQ的阿里云版本(收費版本)才支持到精確到秒級別的延遲消息(沒有特定Level的限制)阱洪。
上圖是CMQ中對MQ功能的對比便贵,其中標明騰訊的CMQ支持延遲消息,但是沒有具體寫明支持到什么精度冗荸,支持任意時間還是特定的Level嫉沽。
通過騰訊云上CMQ的API文檔可以看到有一個秒級別的delaySeconds,應該是支持任意級別的延遲俏竞,即和收費版本的RocketMQ一致。
總結(jié)
- 開源版本中堂竟,只有RocketMQ支持延遲消息魂毁,且只支持18個特定級別的延遲
- 付費版本中,阿里云和騰訊云上的MQ產(chǎn)品都支持精度為秒級別的延遲消息
(真是有錢能使鬼推磨啊出嘹,有錢就能發(fā)任意延遲的消息了席楚,沒錢最多只能發(fā)特定Level了)
任意延遲的消息難點在哪里?
開源版本沒有支持任意延遲的消息,我想可能有以下幾個原因:
- 任意延遲的消息的需求不強烈
- 可能是一個比較有技術含量的點,不愿意開源
需求不強
對支持任意延遲的需求確實不強祭犯,因為:
- 延遲并不是MQ場景的核心功能壤蚜,業(yè)務單獨做一個替代方案的成本不大
- 業(yè)務上一般對延遲的需求都是固定的,比如下單后半小時check是否付款枯怖,發(fā)貨后7天check是否收貨
在我司,MQ上線一年多后才有業(yè)務方希望我能支持延遲消息,且不要求任意延遲抛寝,只要求和RocketMQ開源版本一致,支持一些業(yè)務上的級別即可曙旭。
不愿意開源
為了差異化(好在云上賣錢)盗舰,只能降開源版本的功能進行閹割,所以開源版本的RocketMQ變成了只支持特定Level的延遲桂躏。
難點在哪里钻趋?
既然業(yè)務有需求,我們肯定也要去支持剂习。
首先蛮位,我們先劃清楚定義和邊界:
在我們的系統(tǒng)范圍內(nèi),支持任意延遲的消息指的是:
- 精度支持到秒級別
- 最大支持30天的延遲
本著對自己的高要求鳞绕,我們并不滿足于開源RocketMQ的18個Level的方案土至。那么,如果我們自己要去實現(xiàn)一個支持任意延遲的消息隊列猾昆,難點在哪里呢陶因?
- 排序
- 消息存儲
首先,支持任意延遲意味著消息是需要在服務端進行排序的垂蜗。
比如用戶先發(fā)了一條延遲1分鐘的消息楷扬,一秒后發(fā)了一條延遲3秒的消息解幽,顯然延遲3秒的消息需要先被投遞出去。那么服務端在收到消息后需要對消息進行排序后再投遞出去烘苹。
在MQ中躲株,為了保證可靠性,消息是需要落盤的镣衡,且對性能和延遲的要求霜定,決定了在服務端對消息進行排序是完全不可接受的。
其次廊鸥,目前MQ的方案中都是基于WAL的方式實現(xiàn)的(RocketMQ望浩、Kafka),日志文件會被過期刪除惰说,一般會保留最近一段時間的數(shù)據(jù)磨德。
支持任意級別的延遲,那么需要保存最近30天的消息吆视。
阿里內(nèi)部 1000+ 核心應用使用典挑,每天流轉(zhuǎn)幾千億條消息,經(jīng)過雙11交易啦吧、商品等核心鏈路真實場景的驗證您觉,穩(wěn)定可靠。
考慮一下一天幾千億的消息授滓,保存30天的話需要堆多少服務器顾犹,顯然是無法做到的。
知己知彼
雖然決定自己做褒墨,但是依舊需要先了解開源的實現(xiàn)炫刷,那么就只能看看RocketMQ開源版本中,支持18個Level是怎么實現(xiàn)的郁妈,希望能從中得到一些靈感浑玛。
上圖是通過RocketMQ源碼分析后簡化一個實現(xiàn)原理方案示意圖。
分為兩個部分:
- 消息的寫入
- 消息的Schedule
消息寫入中:
- 在寫入CommitLog之前噩咪,如果是延遲消息顾彰,替換掉消息的Topic和queueId(被替換為延遲消息特定的Topic,queueId則為延遲級別對應的id)
- 消息寫入CommitLog之后胃碾,提交dispatchRequest到DispatchService
- 因為在第①步中Topic和QueueId被替換了涨享,所以寫入的ConsumeQueue實際上非真正消息應該所屬的ConsumeQueue,而是寫入到ScheduledConsumeQueue中(這個特定的Queue存放不會被消費)
Schedule過程中:
- 給每個Level設置定時器仆百,從ScheduledConsumeQueue中讀取信息
- 如果ScheduledConsumeQueue中的元素已近到時厕隧,那么從CommitLog中讀取消息內(nèi)容,恢復成正常的消息內(nèi)容寫入CommitLog
- 寫入CommitLog后提交dispatchRequest給DispatchService
- 因為在寫入CommitLog前已經(jīng)恢復了Topic等屬性,所以此時DispatchService會將消息投遞到正確的ConsumeQueue中
回顧一下這個方案吁讨,最大的優(yōu)點就是沒有了排序:
- 先發(fā)一條level是5s的消息髓迎,再發(fā)一條level是3s的消息,因為他們會屬于不同的ScheduleQueue所以投遞順序能保持正確
- 如果先后發(fā)兩條level相同的消息建丧,那么他們的處于同一個ConsumeQueue且保持發(fā)送順序
- 因為level數(shù)固定排龄,每個level的有自己獨立的定時器,開銷也不會很大
- ScheduledConsumeQueue其實是一個普通的ConsumeQueue翎朱,所以可靠性等都可以按照原系統(tǒng)的M-S結(jié)構等得到保障
但是這個方案也有一些問題:
- 固定了Level橄维,不夠靈活,最多只能支持18個Level
- 業(yè)務是會變的拴曲,但是Level需要提前劃分争舞,不支持修改
- 如果要支持30天的延遲,CommitLog的量會很大疗韵,這塊怎么處理沒有看到
站在巨人的肩膀上
總結(jié)RocketMQ的方案,通過劃分Level的方式侄非,將排序操作轉(zhuǎn)換為了O(1)的ConsumeQueue 的append操作蕉汪。
我們?nèi)ブС秩我庋舆t的消息,必然也需要通過類似的方式避免掉排序逞怨。
此時我們想到了TimeWheel:《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility 》
Netty中也是用TimeWheel來優(yōu)化I/O超時的操作者疤。
TimeWheel
TimeWheel的大致原理如下:
- 箭頭按照一定方向固定頻率移動(如手表指針),每一次跳動稱為一個tick叠赦。ticksPerWheel表示一個定時輪上的tick數(shù)驹马。
如每次tick為1秒,ticksPerWheel為60除秀,那么這就和現(xiàn)實中的秒針走動完全一致糯累。
TimeWheel應用到延遲消息中
無論定時消息還是延遲消息,最終都是投遞后延遲一段時間對用戶可見册踩。
假設這個延遲時間為X秒泳姐,那么X%(ticksPerWheel * tick)可以計算出X所屬的TimeWheel中位置。
這里存在一個問題暂吉,以上圖為例胖秒,TimeWheel的size為8,那么延遲1秒和9秒的消息都處在一個鏈表中慕的。如果用戶先發(fā)了延遲9秒的消息再發(fā)了延遲1秒的消息阎肝,他們在一個鏈表中所以延遲1秒的消息會需要等待延遲9秒的消息先投遞。顯然這是不能接受的肮街,那么如何解決這個問題风题?
排序
顯然,如果對TimeWheel一個tick中的元素進行排序顯然就解決了上面的問題。但是顯而易見的是排序是不可能的俯邓。
擴大時間輪
最直觀的方式骡楼,我們能不能通過擴大時間輪的方式避免延遲9和延遲1落到一個tick位置上?
假設支持30天稽鞭,精度為1秒鸟整,那么ticksPerWheel=30 * 24 * 60 * 60,這樣每一個tick上的延遲都是一致的朦蕴,不存在上述的問題(類似于將RocketMQ的Level提升到了30 * 24 * 60 * 60個)篮条。但是TimeWheel需要被加載到內(nèi)存操作,這顯然是無法接受的吩抓。
多級時間輪
單個TimeWheel無法支持涉茧,那么能否顯示中的時針、分針的形式疹娶,構建多級時間輪來解決呢伴栓?
多級時間輪解決了上述的問題,但是又引入了新的問題:
- 在整點(tick指向0的位置)需要加載大量的數(shù)據(jù)會導致延遲雨饺,比如第二個時間輪到整點需要加載未來一天的數(shù)據(jù)
- 時間輪需要載入到內(nèi)存钳垮,這個開銷是不可接受的
延遲加載
多級定時輪的問題在于需要加載大量數(shù)據(jù)到內(nèi)存,那么能否優(yōu)化一下將這里的數(shù)據(jù)延遲加載到內(nèi)存來解決內(nèi)存開銷的問題呢额港?
在多級定時輪的方案中饺窿,顯然對于未來一小時或者未來一天的數(shù)據(jù)可以不加載到內(nèi)存,而可以只加載延遲時間臨近的消息移斩。
進一步優(yōu)化肚医,可以將數(shù)據(jù)按照固定延遲間隔劃分,那么每次加載的數(shù)據(jù)量是大致相同的向瓷,不會出tick約大的定時輪需要加載越多的數(shù)據(jù)肠套,那么方案如下:
基于上述的方案,那么TimeWheel中存儲未來30分鐘需要投遞的消息的索引猖任,索引為一個long型糠排,那么數(shù)據(jù)量為:30 * 60 * 8 * TPS,相對來說內(nèi)存開銷是可以接受的超升,比如TPS為1w那么大概開銷為200M+入宦。
之后的數(shù)據(jù)按照每30分鐘一個塊的形式寫入文件,那么每個整點時的操作就是計算一下將30分鐘的消息Hash到對應的TimeWheel上室琢,那么排序問題就解決了乾闰。
到此為止就只剩下一個問題,如何保存30天的數(shù)據(jù)盈滴?
CommitLog保存超長延遲的數(shù)據(jù)
CommitLog是有時效性的涯肩,比如在我們只保存最近7天的消息轿钠,過期數(shù)據(jù)將被刪除。對于延遲消息病苗,可能需要30天之后投遞疗垛,顯然是不能被刪除的。
那么我們怎么保存延遲消息呢硫朦?
直觀的方法就是將延遲消息從CommitLog中剝離出來贷腕,獨立存儲以保存更長的時間。
通過DispatchService將WAL中的延遲消息寫入到獨立的文件中咬展。這些文件按照延遲時間組成一個鏈表泽裳。
鏈表長度為最大延遲時間/每個文件保存的時間長度。
那么WAL可以按照正常的策略進行過期刪除破婆,Delay Msg File則在一個文件投遞完之后進行刪除涮总。
唯一的問題是這里會有Delay Msg File帶來的隨機寫問題,但是這個對系統(tǒng)整體性能不會有很大影響祷舀,在可接受范圍內(nèi)瀑梗。
BOUNS
結(jié)合TimeWheel和CommitLog保存超長延遲數(shù)據(jù)的方案,加上一些優(yōu)化手段裳扯,基本就完成了支持任意延遲時間的方案:
- 消息寫入WAL
- Dispatcher處理延遲消息
- 延遲消息一定時間的直接寫入TimeWheel
- 延遲超過一定時間寫入DelayMessageStorage
- DelayMessageStorage對DelayMsgFile構建一層索引抛丽,這樣在映射到TimeWheel時只需要做一次Hash操作
- 通過TimeWheel將消息投遞到ConsumeQueue中完成對Consumer的可見
通過這個方案解決了最初提出來的任意延遲消息的兩個難點:
- 消息的排序問題
- 超長延遲消息的存儲問題
最后
本文從延遲消息的概念出發(fā),了解業(yè)界的支持情況嚎朽,確定延遲消息的難點和支持邊界铺纽,最后通過一步步推導完成了一個相對來說從內(nèi)存開銷和性能上都可以滿足期望的方案柬帕。