如何用 RabbitMQ 實(shí)現(xiàn)延時(shí)隊(duì)列

本人所在的公司在生產(chǎn)中使用 RabbitMQ 作為消息中間件丑搔,并在業(yè)務(wù)中用到了 RabbitMQ 的延時(shí)隊(duì)列功能。RabbitMQ 原生并不直接支持消息的延時(shí)發(fā)送,但是通過安裝插件就可以做到這一點(diǎn)抡笼。RabbitMQ 可以方便集成到 Spring 框架中,利用插件跛梗,也可以很方便地使用延時(shí)隊(duì)列,實(shí)現(xiàn)延時(shí)處理某些業(yè)務(wù)邏輯的功能芥挣。因?yàn)榕渲煤褪褂煤?jiǎn)單,在保證生產(chǎn)環(huán)境中不會(huì)有巨大量的消息積壓的情況下耻台,還是有一定的適用場(chǎng)景的空免。

當(dāng)然,RabbitMQ 的這個(gè)插件雖然是官方插件粘我,但是并不在團(tuán)隊(duì)的主要維護(hù)列表上鼓蜒,雖然官方說明這個(gè)插件有穩(wěn)定運(yùn)行的能力,但是仍有諸多缺點(diǎn)征字。例如都弹,延時(shí)消息不能積壓到百萬級(jí)別以上,否則就會(huì)出現(xiàn)性能問題匙姜;另外畅厢,延時(shí)消息雖然會(huì)被持久化,但是數(shù)據(jù)庫是單點(diǎn)的氮昧,在分布式系統(tǒng)中框杜,單點(diǎn)總是不好的。

內(nèi)容簡(jiǎn)介

本文首先介紹消息隊(duì)列以及 RabbitMQ 中的一些基礎(chǔ)概念袖肥,然后介紹兩種可以通過 RabbitMQ 實(shí)現(xiàn)延時(shí)隊(duì)列的方案咪辱。延時(shí)隊(duì)列是指,消息在發(fā)布到隊(duì)列后不立即被消費(fèi)椎组,而是經(jīng)過一個(gè)固定的時(shí)間后再被發(fā)送給消費(fèi)者消費(fèi)油狂。

需要特別指出的是,本文會(huì)特別注明這兩種實(shí)現(xiàn)延時(shí)隊(duì)列的方案的限制和不足寸癌,且本文中提供的方案應(yīng)該作為一種思路被看待专筷,類似“延時(shí)隊(duì)列”的實(shí)現(xiàn)也可以通過其他中間件完成,例如 redis蒸苇,或者甚至通過數(shù)據(jù)庫加定時(shí)任務(wù)來實(shí)現(xiàn)磷蛹,一切技術(shù)方案都有其適用場(chǎng)景、優(yōu)勢(shì)和劣勢(shì)溪烤,需要根據(jù)實(shí)際情況做取舍味咳。

基礎(chǔ)知識(shí)

首先來復(fù)習(xí)一下消息隊(duì)列以及 RabbitMQ 相關(guān)的基礎(chǔ)知識(shí),如果你對(duì)這部分內(nèi)容已經(jīng)很熟悉了檬嘀,可以直接跳到“用 RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方案”一節(jié)繼續(xù)閱讀莺葫。

消息隊(duì)列核心角色

消息生產(chǎn)和消費(fèi)流程
  • publisher / producer:消息生產(chǎn)者
  • broker:消息隊(duì)列服務(wù)器實(shí)體
    • exchange:消息交換機(jī),指定消息按什么規(guī)則路由到哪個(gè)隊(duì)列(航空公司)
    • binding:exchange 和 queue 按照路由規(guī)則綁定起來(航線)
    • routing key:binding 的屬性枪眉,某些 exchange 類型下的消息路由的規(guī)則
    • queue:隊(duì)列是消息的載體捺檬,每條消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列(目的地)
  • consumer:消息消費(fèi)者

生產(chǎn)者不直接把消息發(fā)送給消費(fèi)者,而是發(fā)送給 exchange贸铜,達(dá)到生產(chǎn)者和消費(fèi)者的解耦堡纬。

RabbitMQ 中主要的 exchange 類型

1. fanout

忽略 routing key聂受,消息會(huì)被發(fā)送到所有綁定到該 exchange 的所有隊(duì)列。

fanout exchange

2. direct

消息會(huì)被發(fā)送到綁定到該 exchange 且 routing key 與其一致的隊(duì)列烤镐。

direct exchange

3. topic

消息會(huì)被發(fā)送到綁定到該 exchange 且 routing key 與其匹配的隊(duì)列蛋济。

  • routing key 是一系列由點(diǎn)隔開的單詞
  • * 表示一個(gè)任意單詞
  • # 表示 0 個(gè)或多個(gè)單詞
topic exchange

例如上圖中,*.*.rabbit 表示任意種類的兔子炮叶,*.orange.* 表示任意橙色的東西碗旅。

Dead Letter Exchange (DLX)

在某些異常情況下,例如一個(gè)消息不能被路由到任何一個(gè)隊(duì)列镜悉,根據(jù)不同的策略祟辟,消息可以被返還給 publisher、丟棄或者加入 DLX侣肄。DLX 也是一個(gè)普通的 exchange旧困,在以下幾種情況下,一條消息會(huì)被重新發(fā)布到 DLX:

  • 消息被 consumer 拒收稼锅,并且 requeue 被設(shè)置為 false
  • 消息已經(jīng)超時(shí)了(TTL)
  • 隊(duì)列滿了吼具,無法接收新的消息

綁定到 DLX 的隊(duì)列(死信隊(duì)列)可用于存儲(chǔ)死信,這樣對(duì)于一些重要的消息即使消費(fèi)失敗也不會(huì)丟失矩距。

DLX 還可以提供延時(shí)重試的機(jī)制拗盒,因?yàn)橛械臅r(shí)候消息消費(fèi)失敗了(可能是業(yè)務(wù)系統(tǒng)網(wǎng)絡(luò)延時(shí)等原因造成的),但并不想馬上重試(例如將 requeue 設(shè)置為 true)锥债,而是隔一段時(shí)間后再重試陡蝇。

結(jié)合消息的 TTL,利用 DLX 還可以實(shí)現(xiàn)消息的延時(shí)消費(fèi)赞弥。

注:隊(duì)列可以設(shè)置其上消息的超時(shí)時(shí)間(message-ttl),也可以設(shè)置隊(duì)列自身的超時(shí)時(shí)間(expires)趣兄,只有消息本身超時(shí)才會(huì)變?yōu)樗佬耪雷螅?duì)列超時(shí)后,不會(huì)導(dǎo)致其上的消息變?yōu)樗佬拧?/strong>

用 RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方案

1. 隊(duì)列或者消息的 TTL 配合 DLX

實(shí)現(xiàn)流程如下圖所示:

DLX 實(shí)現(xiàn)延遲隊(duì)列

生產(chǎn)者將消息發(fā)布到“緩沖隊(duì)列”(消息先發(fā)送到 exchange艇潭,這個(gè) exchange 綁定了一個(gè)設(shè)置了 message-ttl 的隊(duì)列拼窥,該隊(duì)列沒有任何消費(fèi)者,因此消息會(huì)在一定時(shí)間后超時(shí))蹋凝。

消息超時(shí)后會(huì)被發(fā)布到“緩沖隊(duì)列”綁定的 DLX鲁纠,并被路由到對(duì)應(yīng)的“實(shí)際消費(fèi)隊(duì)列”,最終被消費(fèi)者消費(fèi)鳍寂。

隊(duì)列中消息的 TTL 可以設(shè)置在隊(duì)列上改含,有以下兩種方法:

  • 設(shè)置隊(duì)列的 policy(message-ttl)。如果需要修改迄汛,可以直接在服務(wù)上用命令修改捍壤,無需改動(dòng)業(yè)務(wù)代碼骤视,無需刪除原有隊(duì)列。官方文檔推薦用這種方法設(shè)置 TTL鹃觉。具體命令如下:
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues
  • 設(shè)置 optional queue arguments 的 x-message-ttl 值专酗,由于屬性是聲明隊(duì)列時(shí)設(shè)置的,因此如果需要修改盗扇,則必須刪除原來的隊(duì)列而建新的隊(duì)列祷肯,并且修改相應(yīng)的業(yè)務(wù)代碼。這種方法可能適用于聲明 auto-delete 為 true 的隊(duì)列疗隶,由業(yè)務(wù)代碼聲明佑笋,可以靈活設(shè)置想要的 TTL。示例代碼如下:
// 聲明隊(duì)列時(shí)設(shè)置 x-message-ttl 屬性為 60000 毫秒
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

消息在發(fā)布時(shí)也可以單獨(dú)設(shè)置 TTL抽减。如果此時(shí)隊(duì)列也設(shè)置了 TTL允青,則取兩者較小的值起作用。示例代碼如下:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("60000")
    .build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

然而卵沉,根據(jù)官網(wǎng)文檔的描述:

Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).

超時(shí)的消息只有到達(dá)了隊(duì)列的頭部才會(huì)真正被移除或變?yōu)樗佬拧?/p>

舉例來說颠锉,如果隊(duì)列設(shè)置了消息的超時(shí)時(shí)間為 10 秒,一個(gè)自身設(shè)置了超時(shí)時(shí)間為 5 秒的消息并不一定能保證其被加入隊(duì)列 5 秒后被移除史汗。因?yàn)檫@條消息之前可能已經(jīng)有很多消息插入隊(duì)列了琼掠,這些消息的超時(shí)時(shí)間都是 10 秒,那么這條消息就必須等前面的消息都被移除后才能被移除停撞。因此瓷蛙,即使單獨(dú)設(shè)置消息的超時(shí)時(shí)間,其行為也非常不可控戈毒,無法實(shí)現(xiàn)所謂任意時(shí)長(zhǎng)的延時(shí)效果艰猬。

消息超時(shí)變?yōu)樗佬藕螅瑫?huì)被發(fā)布到隊(duì)列配置的 DLX埋市,DLX 也是一個(gè)再普通不過的 exchange冠桃,發(fā)布到 DLX 的消息同樣會(huì)被路由到綁定到 DLX 的符合路由條件的隊(duì)列,這樣就實(shí)現(xiàn)了消息的延時(shí)消費(fèi)道宅。配置隊(duì)列的 DLX 同樣有兩種方式:

  • 設(shè)置隊(duì)列的 policy(dead-letter-exchange)食听。具體命令如下:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
  • 設(shè)置 optional queue arguments 的 x-dead-letter-exchange 值。示例代碼如下:
// 聲明一個(gè) direct exchange
channel.exchangeDeclare("some.exchange.name", "direct");
// 聲明一個(gè)隊(duì)列污茵,將前面聲明的 exchange 設(shè)置為其 DLX
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

緩沖隊(duì)列可以設(shè)置 x-dead-letter-routing-key 屬性樱报,如果不設(shè)置,消息進(jìn)入 DLX 后泞当,將仍用消息自身的 routing key 路由迹蛤。

限制:

  • 由于延時(shí)消息實(shí)際存放在隊(duì)列中,因此延時(shí)不易設(shè)置過大,否則可能會(huì)導(dǎo)致大量消息積壓在隊(duì)列中笤受,占用 broker 資源穷缤。
  • 不支持發(fā)送任意 TTL 的消息,TTL 必須與延時(shí)隊(duì)列綁定箩兽。

2. 使用 rabbitmq-delayed-message-exchange 插件

RabbitMQ v3.5.8 及以后版本支持該插件津肛,首先來看一下官方文檔上的一句話:

This plugin is considered to be experimental yet fairly stable and potential suitable for production use as long as the user is aware of its limitations.

該插件是實(shí)驗(yàn)性質(zhì)的,然而其也相當(dāng)穩(wěn)定并具有在生產(chǎn)中使用的潛質(zhì)汗贫,前提是使用者清楚它的局限性身坐。

rabbitmq-delayed-message-exchange 插件會(huì)將消息持久化到數(shù)據(jù)庫(Mnesia),再利用調(diào)度器調(diào)度消息的發(fā)布落包。因此延遲消息并不直接發(fā)布到任何 exchange部蛇,因此也不會(huì)直接保存到任何隊(duì)列里,而是先保存在磁盤上咐蝇,到了具體的需要發(fā)布的時(shí)間再發(fā)布涯鲁,因此消息在發(fā)布時(shí)會(huì)對(duì)于實(shí)際的發(fā)布時(shí)間有一定的延遲。

限制:

  • 持久化的消息數(shù)據(jù)庫是單點(diǎn)的有序,一旦某個(gè)節(jié)點(diǎn)的丟失或者禁用該節(jié)點(diǎn)上的 rabbitmq-delayed-message-exchange 插件會(huì)導(dǎo)致這個(gè)節(jié)點(diǎn)上的所有延遲消息都丟失掉抹腿。
  • 消息只會(huì)被嘗試發(fā)布一次,如果發(fā)布消息時(shí)沒有可以消費(fèi)的隊(duì)列存在旭寿,消息也不會(huì)被退回原發(fā)布者警绩,因?yàn)椴荒鼙WC此時(shí)原發(fā)布者還“活著”。
  • 同時(shí)存在的延遲消息總數(shù)不能超過百萬級(jí)盅称。具體參見 issue 72肩祥。其實(shí)說白了,我們不應(yīng)該期待把 RabbitMQ 當(dāng)做數(shù)據(jù)庫來使用缩膝,每一個(gè)技術(shù)都有其適用條件混狠,如果有大量延遲消息需要持久化,完全可以考慮用其他方法解決問題疾层。

插件的安裝和使用方法參見官方文檔:RabbitMQ plugins将饺。

那么我們?cè)撊绾问褂眠@個(gè)插件實(shí)現(xiàn)延遲隊(duì)列呢?具體步驟如下:

  • 使用 exchange 的擴(kuò)展屬性云芦,將 exchange 設(shè)置為”延遲“的俯逾,代碼如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
  • 發(fā)布消息時(shí)設(shè)置消息 header 的 x-delayed 屬性贸桶,可以看到插件支持任意時(shí)長(zhǎng)的消息延時(shí)舅逸。代碼如下:
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

參考資料

  1. RabbitMQ 官方文檔
  2. rabbitmq-delayed-message-exchange

實(shí)驗(yàn)代碼

所有實(shí)驗(yàn)代碼參見:github 源碼注意皇筛,不同框架有不同的集成 RabbitMQ 的方案琉历,請(qǐng)參閱相關(guān)文檔,請(qǐng)不要在生產(chǎn)環(huán)境中直接使用實(shí)驗(yàn)代碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末旗笔,一起剝皮案震驚了整個(gè)濱河市彪置,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蝇恶,老刑警劉巖拳魁,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異撮弧,居然都是意外死亡潘懊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門贿衍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來授舟,“玉大人,你說我怎么就攤上這事贸辈∈褪鳎” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵擎淤,是天一觀的道長(zhǎng)奢啥。 經(jīng)常有香客問我,道長(zhǎng)揉燃,這世上最難降的妖魔是什么扫尺? 我笑而不...
    開封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮炊汤,結(jié)果婚禮上正驻,老公的妹妹穿的比我還像新娘。我一直安慰自己抢腐,他們只是感情好姑曙,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著迈倍,像睡著了一般伤靠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上啼染,一...
    開封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天宴合,我揣著相機(jī)與錄音,去河邊找鬼迹鹅。 笑死卦洽,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的斜棚。 我是一名探鬼主播阀蒂,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼该窗,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了蚤霞?” 一聲冷哼從身側(cè)響起酗失,我...
    開封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎昧绣,沒想到半個(gè)月后规肴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡夜畴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年奏纪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斩启。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡序调,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出兔簇,到底是詐尸還是另有隱情发绢,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布垄琐,位于F島的核電站边酒,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏狸窘。R本人自食惡果不足惜墩朦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望翻擒。 院中可真熱鬧氓涣,春花似錦、人聲如沸陋气。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽巩趁。三九已至痒玩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間议慰,已是汗流浹背蠢古。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留别凹,地道東北人草讶。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像番川,于是被迫代替她去往敵國(guó)和親到涂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容

  • 什么叫消息隊(duì)列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)颁督。消息可以非常簡(jiǎn)單践啄,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,340評(píng)論 0 1
  • 四種交換機(jī)類型 fanout ? 它會(huì)將發(fā)送到該交換機(jī)的消息路由到所有與該交互機(jī)存在binding的隊(duì)列中沉御,無視R...
    TomDwan閱讀 579評(píng)論 0 0
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,899評(píng)論 2 11
  • 1.vhost(虛擬主機(jī)) 虛擬主機(jī)相當(dāng)于一個(gè)隔離的空間屿讽,多個(gè)虛擬主機(jī)可以對(duì)不同的用戶,不同的作用分割開來 2.P...
    MR_Hanjc閱讀 784評(píng)論 0 0
  • 又是一副臨摹Billy Showell的吠裆,很喜歡這種細(xì)致的風(fēng)格伐谈,不知道是否因?yàn)閷W(xué)生物的關(guān)系,這類風(fēng)格的畫我看著特別...
    畫師喬子閱讀 979評(píng)論 1 6