如何才能讓Spring Boot與RabbitMQ結(jié)合實現(xiàn)延遲隊列

顧名思義录煤,延遲隊列就是進(jìn)入該隊列的消息會被延遲消費的隊列。而一般的隊列荞胡,消息一旦入隊了之后就會被消費者馬上消費妈踊。

延遲隊列能做什么?

延遲隊列多用于需要延遲工作的場景泪漂。最常見的是以下兩種場景:

延遲消費廊营。比如:

用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態(tài)萝勤,如果訂單仍未支付則需要及時地關(guān)閉訂單露筒。

用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況敌卓,如果發(fā)現(xiàn)用戶活躍度較低邀窃,則發(fā)送郵件或者短信來提醒用戶使用。

延遲重試。比如消費者從隊列里消費消息時失敗了瞬捕,但是想要延遲一段時間后自動重試。

如果不使用延遲隊列舵抹,那么我們只能通過一個輪詢掃描程序去完成肪虎。這種方案既不優(yōu)雅,也不方便做成統(tǒng)一的服務(wù)便于開發(fā)人員使用惧蛹。但是使用延遲隊列的話扇救,我們就可以輕而易舉地完成。

如何實現(xiàn)香嗓?

別急迅腔,在下文中,我們將詳細(xì)介紹如何利用 Spring Boot 加 RabbitMQ 來實現(xiàn)延遲隊列靠娱。

實現(xiàn)思路

在介紹具體的實現(xiàn)思路之前沧烈,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions像云,另一個是Dead Letter Exchanges锌雀。

Time-To-Live Extensions

RabbitMQ允許我們?yōu)橄⒒蛘哧犃性O(shè)置TTL(time to live),也就是過期時間迅诬。TTL表明了一條消息可在隊列中存活的最大時間腋逆,單位為毫秒。也就是說侈贷,當(dāng)某條消息被設(shè)置了TTL或者當(dāng)某條消息進(jìn)入了設(shè)置了TTL的隊列時惩歉,這條消息會在經(jīng)過TTL秒后“死亡”,成為Dead Letter俏蛮。如果既配置了消息的TTL撑蚌,又配置了隊列的TTL,那么較小的那個值會被取用嫁蛇。更多資料請查閱 官方文檔 锨并。

Dead Letter Exchange

剛才提到了,被設(shè)置了TTL的消息在過期后會成為Dead Letter睬棚。其實在RabbitMQ中第煮,一共有三種消息的“死亡”形式:

消息被拒絕。通過調(diào)用basic.reject或者basic.nack并且設(shè)置的requeue參數(shù)為false抑党。

消息因為設(shè)置了TTL而過期包警。

消息進(jìn)入了一條已經(jīng)達(dá)到最大長度的隊列。

如果隊列設(shè)置了Dead Letter Exchange(DLX)底靠,那么這些Dead Letter就會被重新publish到Dead Letter Exchange害晦,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱 官方文檔 。

流程圖

聰明的你肯定已經(jīng)想到了壹瘟,如何將RabbitMQ的TTL和DLX特性結(jié)合在一起鲫剿,實現(xiàn)一個延遲隊列。

針對于上述的延遲隊列的兩個場景稻轨,我們分別有以下兩種流程圖:

延遲消費

延遲消費是延遲隊列最為常用的使用模式灵莲。如下圖所示,生產(chǎn)者產(chǎn)生的消息首先會進(jìn)入緩沖隊列(圖中紅色隊列)殴俱。通過RabbitMQ提供的TTL擴展政冻,這些消息會被設(shè)置過期時間,也就是延遲消費的時間线欲。等消息過期之后明场,這些消息會通過配置好的DLX轉(zhuǎn)發(fā)到實際消費隊列(圖中藍(lán)色隊列),以此達(dá)到延遲消費的效果李丰。

延遲重試

延遲重試本質(zhì)上也是延遲消費的一種苦锨,但是這種模式的結(jié)構(gòu)與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹嫌套。

如下圖所示逆屡,消費者發(fā)現(xiàn)該消息處理出現(xiàn)了異常,比如是因為網(wǎng)絡(luò)波動引起的異常踱讨。那么如果不等待一段時間魏蔗,直接就重試的話,很可能會導(dǎo)致在這期間內(nèi)一直無法成功痹筛,造成一定的資源浪費莺治。那么我們可以將其先放在緩沖隊列中(圖中紅色隊列),等消息經(jīng)過一段的延遲時間后再次進(jìn)入實際消費隊列中(圖中藍(lán)色隊列)帚稠,此時由于已經(jīng)過了“較長”的時間了谣旁,異常的一些波動通常已經(jīng)恢復(fù),這些消息可以被正常地消費滋早。

代碼實現(xiàn)

接下來我們將介紹如何在Spring Boot中實現(xiàn)基于RabbitMQ的延遲隊列榄审。我們假設(shè)讀者已經(jīng)擁有了Spring Boot與RabbitMQ的基本知識。如果想快速了解Spring Boot的相關(guān)基礎(chǔ)知識杆麸,可以參考我之前寫的一篇文章搁进。

初始化工程

首先我們在Intellij中創(chuàng)建一個Spring Boot工程,并且添加 spring-boot-starter-amqp 擴展昔头。

配置隊列

從上述的流程圖中我們可以看到饼问,一個延遲隊列的實現(xiàn),需要一個緩沖隊列以及一個實際的消費隊列揭斧。又由于在RabbitMQ中莱革,我們擁有兩種消息過期的配置方式,所以在代碼中,我們一共配置了三條隊列:

1.delay_queue_per_message_ttl:TTL配置在消息上的緩沖隊列盅视。

2.delay_queue_per_queue_ttl:TTL配置在隊列上的緩沖隊列捐名。

3.delay_process_queue:實際消費隊列。

我們通過Java Config的方式將上述的隊列配置為Bean闹击。由于我們添加了 spring-boot-starter-amqp 擴展桐筏,Spring Boot在啟動時會根據(jù)我們的配置自動創(chuàng)建這些隊列。為了方便接下來的測試拇砰,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個,且過期的消息都會通過DLX轉(zhuǎn)發(fā)到delay_process_queue狰腌。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

@BeanQueuedelayQueuePerMessageTTL(){returnQueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX除破,dead letter發(fā)送到的exchange.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key.build();}

其中, x-dead-letter-exchange 聲明了隊列里的死信轉(zhuǎn)發(fā)到的DLX名稱琼腔, x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時攜帶的routing-key名稱瑰枫。

delay_queue_per_queue_ttl

類似地,delay_queue_per_queue_ttl的配置代碼:

@BeanQueuedelayQueuePerQueueTTL(){returnQueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key.withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設(shè)置隊列的過期時間.build();}

delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個x-message-ttl 丹莲,該配置用來設(shè)置隊列的過期時間光坝。

delay_process_queue

delay_process_queue的配置最為簡單:

@BeanQueuedelayProcessQueue(){returnQueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build();}

配置Exchange

配置DLX

首先,我們需要配置DLX甥材,代碼如下:

@BeanDirectExchangedelayExchange(){returnnew DirectExchange(DELAY_EXCHANGE_NAME);}

然后再將該DLX綁定到實際消費隊列即delay_process_queue上盯另。這樣所有的死信都會通過DLX被轉(zhuǎn)發(fā)到delay_process_queue:

@BeanBindingdlxBinding(Queue delayProcessQueue, DirectExchange delayExchange){returnBindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME);}

配置延遲重試所需的Exchange

從延遲重試的流程圖中我們可以看到,消息處理失敗之后洲赵,我們需要將消息轉(zhuǎn)發(fā)到緩沖隊列鸳惯,所以緩沖隊列也需要綁定一個Exchange。 在本例中叠萍,我們將delay_process_per_queue_ttl作為延遲重試?yán)锏木彌_隊列 芝发。具體代碼是如何配置的,這里就不贅述了苛谷,大家可以查閱我 Github 中的代碼辅鲸。

定義消費者

我們創(chuàng)建一個最簡單的消費者ProcessReceiver,這個消費者監(jiān)聽delay_process_queue隊列腹殿,對于接受到的消息独悴,他會:

1.如果消息里的消息體不等于FAIL_MESSAGE,那么他會輸出消息體赫蛇。

2.如果消息里的消息體恰好是FAIL_MESSAGE绵患,那么他會模擬拋出異常,然后將該消息重定向到緩沖隊列(對應(yīng)延遲重試場景)悟耘。

另外落蝙,我們還需要新建一個監(jiān)聽容器用于存放消費者,代碼如下:

@BeanSimpleMessageListenerContainerprocessContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽delay_process_queuecontainer.setMessageListener(new MessageListenerAdapter(processReceiver));returncontainer;}

至此,我們前置的配置代碼已經(jīng)全部編寫完成筏勒,接下來我們需要編寫測試用例來測試我們的延遲隊列移迫。

編寫測試用例

延遲消費場景

首先我們編寫用于測試TTL設(shè)置在消息上的測試代碼。

我們借助 spring-rabbit 包下提供的RabbitTemplate類來發(fā)送消息管行。由于我們添加了 spring-boot-starter-amqp 擴展厨埋,Spring Boot會在初始化時自動地將RabbitTemplate當(dāng)成bean加載到容器中。

解決了消息的發(fā)送問題捐顷,那么又該如何為每個消息設(shè)置TTL呢荡陷?這里我們需要借助MessagePostProcessor。MessagePostProcessor通常用來設(shè)置消息的Header以及消息的屬性迅涮。我們新建一個ExpirationMessagePostProcessor類來負(fù)責(zé)設(shè)置消息的TTL屬性:

/*** 設(shè)置消息的失效時間*/public class ExpirationMessagePostProcessorimplements MessagePostProcessor{ private final Long ttl; // 毫秒public ExpirationMessagePostProcessor(Long ttl){ this.ttl = ttl;} @Overridepublic Message postProcessMessage(Message message)throws AmqpException {message.getMessageProperties().setExpiration(ttl.toString()); // 設(shè)置per-message的失效時間returnmessage;}}

然后在調(diào)用RabbitTemplate的convertAndSend方法時废赞,傳入ExpirationMessagePostPorcessor即可。我們向緩沖隊列中發(fā)送3條消息叮姑,過期時間依次為1秒唉地,2秒和3秒。具體的代碼如下所示:

@Testpublic voidtestDelayQueuePerMessageTTL()throws InterruptedException {ProcessReceiver.latch = new CountDownLatch(3);for(int i = 1; i <= 3; i++) { long expiration = i * 1000;rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,(Object) ("Message From delay_queue_per_message_ttl with expiration "+ expiration), new ExpirationMessagePostProcessor(expiration));}ProcessReceiver.latch.await();}

細(xì)心的朋友一定會問传透,為什么要在代碼中加一個CountDownLatch呢耘沼?這是因為如果沒有l(wèi)atch阻塞住測試方法的話,測試用例會直接結(jié)束朱盐,程序退出群嗤,我們就看不到消息被延遲消費的表現(xiàn)了。

那么類似地托享,測試TTL設(shè)置在隊列上的代碼如下:

@Testpublic voidtestDelayQueuePerQueueTTL()throws InterruptedException {ProcessReceiver.latch = new CountDownLatch(3);for(int i = 1; i <= 3; i++) {rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,"Message From delay_queue_per_queue_ttl with expiration "+ QueueConfig.QUEUE_EXPIRATION);}ProcessReceiver.latch.await();}

我們向緩沖隊列中發(fā)送3條消息骚烧。理論上這3條消息會在4秒后同時過期。

延遲重試場景

我們同樣還需測試延遲重試場景闰围。

@Testpublic voidtestFailMessage()throws InterruptedException {ProcessReceiver.latch = new CountDownLatch(6);for(int i = 1; i <= 3; i++) {rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);}ProcessReceiver.latch.await();}

我們向delay_process_queue發(fā)送3條會觸發(fā)FAIL的消息赃绊,理論上這3條消息會在4秒后自動重試。

查看測試結(jié)果

延遲消費場景

延遲消費的場景測試我們分為了TTL設(shè)置在消息上和TTL設(shè)置在隊列上兩種羡榴。首先碧查,我們先看一下TTL設(shè)置在消息上的測試結(jié)果:

從上圖中我們可以看到,ProcessReceiver分別經(jīng)過1秒校仑、2秒忠售、3秒收到消息。測試結(jié)果表明消息不僅被延遲消費了迄沫,而且每條消息的延遲時間是可以被個性化設(shè)置的稻扬。TTL設(shè)置在消息上的延遲消費場景測試成功。

然后羊瘩,TTL設(shè)置在隊列上的測試結(jié)果如下圖:

從上圖中我們可以看到泰佳,ProcessReceiver經(jīng)過了4秒的延遲之后盼砍,同時收到了3條消息。測試結(jié)果表明消息不僅被延遲消費了逝她,同時也證明了當(dāng)TTL設(shè)置在隊列上的時候浇坐,消息的過期時間是固定的。TTL設(shè)置在隊列上的延遲消費場景測試成功黔宛。

延遲重試場景

接下來近刘,我們再來看一下延遲重試的測試結(jié)果:

ProcessReceiver首先收到了3條會觸發(fā)FAIL的消息,然后將其移動到緩沖隊列之后臀晃,過了4秒觉渴,又收到了剛才的那3條消息。延遲重試場景測試成功徽惋。

想學(xué)習(xí)交流HashMap,nginx疆拘、dubbo、Spring MVC,分布式寂曹、高性能高可用、redis回右、jvm隆圆、多線程、netty翔烁、kafka渺氧、的對于課程有興趣加群:629740746同時也可以免費獲得下面分享的視頻資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蹬屹,隨后出現(xiàn)的幾起案子侣背,更是在濱河造成了極大的恐慌,老刑警劉巖慨默,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贩耐,死亡現(xiàn)場離奇詭異,居然都是意外死亡厦取,警方通過查閱死者的電腦和手機潮太,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來虾攻,“玉大人铡买,你說我怎么就攤上這事■浚” “怎么了奇钞?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長漂坏。 經(jīng)常有香客問我景埃,道長媒至,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任纠亚,我火速辦了婚禮塘慕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蒂胞。我一直安慰自己图呢,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布骗随。 她就那樣靜靜地躺著蛤织,像睡著了一般。 火紅的嫁衣襯著肌膚如雪鸿染。 梳的紋絲不亂的頭發(fā)上指蚜,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天,我揣著相機與錄音涨椒,去河邊找鬼摊鸡。 笑死,一個胖子當(dāng)著我的面吹牛蚕冬,可吹牛的內(nèi)容都是我干的免猾。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼囤热,長吁一口氣:“原來是場噩夢啊……” “哼猎提!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起旁蔼,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤锨苏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后棺聊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體伞租,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年限佩,在試婚紗的時候發(fā)現(xiàn)自己被綠了肯夏。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡犀暑,死狀恐怖驯击,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耐亏,我是刑警寧澤徊都,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站广辰,受9級特大地震影響暇矫,放射性物質(zhì)發(fā)生泄漏主之。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一李根、第九天 我趴在偏房一處隱蔽的房頂上張望槽奕。 院中可真熱鬧,春花似錦房轿、人聲如沸粤攒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夯接。三九已至,卻和暖如春纷妆,著一層夾襖步出監(jiān)牢的瞬間盔几,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工掩幢, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留逊拍,地道東北人。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓际邻,卻偏偏與公主長得像顺献,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子枯怖,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內(nèi)容