?
場景
????延時消息即消息發(fā)送后并不立即對消費者可見隅茎,而是在用戶指定的時間投遞給消費者干发。比如我們現(xiàn)在發(fā)送一條延時1分鐘的消息实胸,消息發(fā)送后立即發(fā)送給服務器,但是服務器在1分鐘后才將該消息交給消費者姻檀。
????這種延時消息有一些什么應用場景呢命满?比如在電商網站上我們購物的時候,下單后我們沒有立即支付绣版,這個時候界面上往往會提醒你如果xx分鐘還未支付訂單將被取消。對于這么一個功能如果不使用延時消息歼疮,那我們就需要使用類似定時任務的功能杂抽,比如每分鐘我們跑一個定時任務對訂單表進行掃描,將未支付訂單掃出韩脏,如果從下單時間到現(xiàn)在已經超過了45分鐘則將該訂單取消缩麸。但是定時掃描有一個問題是效率不高,如果訂單很多將會嚴重的影響db的性能赡矢。如果使用延時消息就沒有這樣的問題了杭朱,只需要發(fā)送一條延時xx分鐘的的延時消息即可,在消息里攜帶有訂單號吹散,xx分鐘后消費者收到該消息檢查對應訂單狀態(tài)做出對應處理弧械,這種方式將大大減輕對db的壓力,實現(xiàn)起來也更優(yōu)雅空民。
????上面描述的是一種延時時間固定的場景刃唐,還有一些是要指定時間執(zhí)行。比如買了一張一周后北京去東京的機票界轩,那么在乘機時間到來之前可能要發(fā)送數(shù)次提醒的短信給用戶画饥,那么我們也可以在用戶下單后發(fā)送一條延時消息,延時到乘機時間之前發(fā)送浊猾。
需求
????有了場景抖甘,我們首先來分析一下需求:
延時時間是不固定的,比如我們無法預測用戶訂未來多久的機票葫慎,所以我們不能僅僅提供幾種不同延時單位的延時功能衔彻。
延時時間精確在秒這個級別就可以了,不需要精確到1秒以內幅疼。
最大的延時時間應該有個度米奸。比如最大延時1年或2年(可能有同學問難道不能提供任意最大延時時間么?任意最大延時時間會增加系統(tǒng)的實現(xiàn)的復雜度爽篷,而在實際中并沒有什么用處悴晰,一般我們都盡量不推薦延時太久的消息,因為系統(tǒng)在不斷地演變,比如當前設計的時候消息是延時兩年铡溪,但是兩年后系統(tǒng)早已大變樣了漂辐,兩年前的消息都不一定有人記得,更別人說兼容兩年前的消息格式了)棕硫。
有了上面的限定髓涯,我們來討論一下延時消息的設計。
設計
????延時說白了就是一個定時任務的功能哈扮,指定一個未來的時間執(zhí)行消息投遞的任務纬纪,時間到了再將消息投遞出去。
? ? 如果遇到定時任務的場景往往會有這么幾個方案來考慮:
優(yōu)先級隊列(堆) 比如JAVA里的ScheduledThreadPoolExecutor滑肉。定時任務都丟到一個優(yōu)先級隊列里包各,按照到期時間進行排序,線程池從隊列里取任務出來執(zhí)行靶庙,算法復雜度是O(logN)问畅。?
掃描 所有任務都放到一個List里,然后一個死循環(huán)六荒,比如每100ms執(zhí)行一次护姆,掃描List里所有任務,當某任務到期后取出執(zhí)行掏击。這種方式實現(xiàn)簡單卵皂,算法復雜度是O(N),如果任務太多的話效率會很低铐料,適合任務比較少的場景渐裂。
hash wheel 按照任務的到期時間將任務放到一個刻度盤里,比如未來1秒的放到位置1钠惩,未來2秒的放到位置2柒凉,依次類推。每次刻度盤轉一個刻度篓跛,轉到該可讀則將該刻度上所有任務執(zhí)行膝捞,算法復雜度是O(1)。
????上面這三種方式都是基于內存的數(shù)據(jù)結構愧沟,也就是我們得將所有任務都放到內存里蔬咬,如果用在延時消息上,顯然是不現(xiàn)實的沐寺,實際上也是沒有必要的林艘。如果這個消息是幾個小時后需要投遞,我們?yōu)槭裁葱枰F(xiàn)在就將其加載進來一直占著內存呢混坞?看起來我們只需要提前一段時間加載未來某段時間需要投遞的消息即可狐援。比如我們將消息按照一個小時為一個段钢坦,每次只加載一個段的消息到內存里。其實我們可以用一個很形象的比如來描述這種結構:兩層時間輪(hash wheel)啥酱。第一層hash wheel位于磁盤上爹凹,精度較粗,每個小時為1個刻度镶殷。第二層hash wheel位于內存里禾酱,只包含第一層hash wheel一個刻度的數(shù)據(jù),精度為1秒绘趋。
????但是我們怎么去加載這些需要的消息將其組織為第一層hash wheel呢颤陶?消息接收后存儲到一個順序的log文件,消息接收的順序和消息的延時時間之間是沒有任何關系的陷遮。比如現(xiàn)在收到了一條消息指郁,是1個小時后需要投遞,稍后收到一條消息可能是5分鐘之后投遞拷呆。我們加載時候是按照延時時間進行加載的,比如我們需要加載未來一個小時需要投遞的消息:
?
????比如上圖所示疫粥,3 seconds是最近要投遞的消息茬斧,然后是5minutes,而排在最頭上的是1個小時后要投遞的梗逮。我們不可能每次要預加載的時候都從頭掃描一遍项秉,然后將需要的消息加載。
????怎么辦呢慷彤?對于需要快速查找娄蔼,我們肯定會想到建立索引。那么我們只需要按照我們的預加載的時間段劃分索引即可了底哗,比如我們建立2019021813, 2019021814...這樣的索引文件岁诉,文件里每一個entry就是一個索引,該索引包含以下結構:
index:
????schedule time: int64
offset: int64
offset是指向前面log的偏移跋选,而schedule time是消息的到期時間涕癣。這樣我們每次只需要加載一個段(比如2019021813)的索引到內存就行了,內存中的hashwheel根據(jù)schedule time決定到期時間前标,到期后根據(jù)offset讀取到消息內容將消息投遞出去坠韩。
這個存儲結構到這里基本上就ok了,但是存在一個落地實施的問題(磁盤的空間是有限的):如果一開始收到一條消息是6個月之后投遞的炼列,后面收到了一些一個小時內投遞的只搁,實際上只要消息投遞后我們就可以將消息刪除了,這樣可以大大的節(jié)約內存空間俭尖,但是因為log的頭部有一條6個月之后的消息氢惋,所以我們還不能將該log刪除掉,也就是至少6個月我們不能刪除消息,除非我們按照消息來刪除明肮,也就是將6個月后的消息保留下來菱农,而一個小時內已經投遞了的消息刪除掉(一種compact機制),但是這種實現(xiàn)就變得很復雜柿估。
????其實換個方式就簡單了循未,在前面我們按照每個時間段建立索引文件,那么如果我們不僅僅建立索引呢秫舌?也就是索引文件里不僅僅是索引的妖,而是包括完整的消息:消息收到后先進入一個按照接收順序的log(qmq里稱之為message log),然后回放該log足陨,按照log里每條消息的投遞時間將消息放到對應的時間段上(qmq里稱之為schedule log)嫂粟,這樣只要回放完成后message log里的消息就可以刪除了,message log只需要保留很少的內容墨缘,而schedule log是按照投遞時間段來組織的星虹,已經投遞過的時間段也可以立即刪除了。通過這種變化我們順利的解決了磁盤占用問題镊讼,另外還有一個副產品:讀寫分離宽涌。我們可以將延時消息里的message log放到小容量高性能的SSD里,提高消息發(fā)送的吞吐量和延時蝶棋,而將schedule log放到大容量低成本的HDD里卸亮,可以支撐時間更久的延時消息(下圖即延時消息的存儲結構):
?
其他細節(jié)
1. Server重啟如何發(fā)現(xiàn)未投遞消息
????在這里還有一些具體實現(xiàn)細節(jié)需要處理。雖然我們按照每個時間單位重新組織了消息(schedule log)玩裙,但是在該時間段內的消息并不是按照投遞時間排序的兼贸。比如每個小時為一個時間段,那么可能第59分鐘的消息排在最前面吃溅,而幾秒內需要投遞的排在最后面溶诞,那如果某個時間段內的消息正在投遞時應用突然掛掉了,那么再次恢復的時候我們并不能準確的知道消息投遞到哪兒了罕偎。所以我們增加了一個dispatch log很澄,dispatch log在消息投遞完成后寫入,dispatch log里每一個entry記錄的是schedule log里的offset颜及,表示該offset的消息已經投遞甩苛,當應用重啟后我們會對比schedule log和dispatch log,將未投遞的消息找出來重新加載投遞俏站,dispatch log相當于一個位圖數(shù)據(jù)結構讯蒲。
2. 正在加載某個時間段內的消息過程中又來了屬于該時間段內消息如何處理,會不會重復加載
????在我們決定加載某個時間段消息時(正在加載的時間段稱之為current loading segment)肄扎,我們首先會取得該時間段文件的最大offset墨林,然后加載只會加載這個offset范圍內的消息(qmq內稱之為loading offset)赁酝,而加載過程中如果又來了該時間段內消息,那這個消息的offset也是>loading offset:
if( message.offset in current loading segment && message.offset > loading offset){
????add to memory hash wheel
}
3. 加載一個時間段內的消息是不是需要占用太多的內存
????實際上我們并不會將schedule log里完整的消息加載到內存旭等,只會加載索引到內存酌呆,根據(jù)前面的介紹,每個索引是16個字節(jié)(實際大小可以參照代碼搔耕,略有出入)隙袁。假設我們使用1G內存加載一個小時索引的話,則可以裝載1G/16B = (1024M * 1024K * 1024B)/(16B) = 67108864 條消息索引弃榨。則每秒qps可以達到18641(67108864 / 60 / 60)菩收。如果我們想每秒達到10萬qps,每個小時一個刻度則需要5493MB鲸睛,如果覺得內存占用過高娜饵,則可以相應的縮小時間段大小,比如10分鐘一個時間段官辈,則10萬qps只需要占用915MB內存箱舞。通過計算可知這種設計方式還是在合理的范圍內的。
免費學習視頻歡迎關注云圖智聯(lián):https://e.yuntuzhilian.com/