背景
何為延遲隊(duì)列?
顧名思義惰瓜,延遲隊(duì)列就是進(jìn)入該隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列。而一般的隊(duì)列汉矿,消息一旦入隊(duì)了之后就會(huì)被消費(fèi)者馬上消費(fèi)崎坊。
延遲隊(duì)列能做什么?
延遲隊(duì)列多用于需要延遲工作的場(chǎng)景负甸。最常見(jiàn)的是以下兩種場(chǎng)景:
- 延遲消費(fèi)流强。比如:
- 用戶生成訂單之后,需要過(guò)一段時(shí)間校驗(yàn)訂單的支付狀態(tài)呻待,如果訂單仍未支付則需要及時(shí)地關(guān)閉訂單打月。
- 用戶注冊(cè)成功之后,需要過(guò)一段時(shí)間比如一周后校驗(yàn)用戶的使用情況蚕捉,如果發(fā)現(xiàn)用戶活躍度較低奏篙,則發(fā)送郵件或者短信來(lái)提醒用戶使用。
- 延遲重試。比如消費(fèi)者從隊(duì)列里消費(fèi)消息時(shí)失敗了秘通,但是想要延遲一段時(shí)間后自動(dòng)重試为严。
如果不使用延遲隊(duì)列,那么我們只能通過(guò)一個(gè)輪詢掃描程序去完成肺稀。這種方案既不優(yōu)雅第股,也不方便做成統(tǒng)一的服務(wù)便于開(kāi)發(fā)人員使用。但是使用延遲隊(duì)列的話话原,我們就可以輕而易舉地完成夕吻。
如何實(shí)現(xiàn)?
別急繁仁,在下文中涉馅,我們將詳細(xì)介紹如何利用Spring Boot加RabbitMQ來(lái)實(shí)現(xiàn)延遲隊(duì)列。
本文出現(xiàn)的示例代碼都已push到Github倉(cāng)庫(kù)中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue
實(shí)現(xiàn)思路
在介紹具體的實(shí)現(xiàn)思路之前黄虱,我們先來(lái)介紹一下RabbitMQ的兩個(gè)特性稚矿,一個(gè)是Time-To-Live Extensions,另一個(gè)是Dead Letter Exchanges捻浦。
Time-To-Live Extensions
RabbitMQ允許我們?yōu)橄⒒蛘哧?duì)列設(shè)置TTL(time to live)晤揣,也就是過(guò)期時(shí)間。TTL表明了一條消息可在隊(duì)列中存活的最大時(shí)間默勾,單位為毫秒碉渡。也就是說(shuō),當(dāng)某條消息被設(shè)置了TTL或者當(dāng)某條消息進(jìn)入了設(shè)置了TTL的隊(duì)列時(shí)母剥,這條消息會(huì)在經(jīng)過(guò)TTL秒后“死亡”,成為Dead Letter形导。如果既配置了消息的TTL环疼,又配置了隊(duì)列的TTL,那么較小的那個(gè)值會(huì)被取用朵耕。更多資料請(qǐng)查閱官方文檔炫隶。
Dead Letter Exchange
剛才提到了,被設(shè)置了TTL的消息在過(guò)期后會(huì)成為Dead Letter阎曹。其實(shí)在RabbitMQ中伪阶,一共有三種消息的“死亡”形式:
- 消息被拒絕。通過(guò)調(diào)用basic.reject或者basic.nack并且設(shè)置的requeue參數(shù)為false处嫌。
- 消息因?yàn)樵O(shè)置了TTL而過(guò)期栅贴。
- 消息進(jìn)入了一條已經(jīng)達(dá)到最大長(zhǎng)度的隊(duì)列。
如果隊(duì)列設(shè)置了Dead Letter Exchange(DLX)熏迹,那么這些Dead Letter就會(huì)被重新publish到Dead Letter Exchange檐薯,通過(guò)Dead Letter Exchange路由到其他隊(duì)列。更多資料請(qǐng)查閱官方文檔。
流程圖
聰明的你肯定已經(jīng)想到了坛缕,如何將RabbitMQ的TTL和DLX特性結(jié)合在一起墓猎,實(shí)現(xiàn)一個(gè)延遲隊(duì)列。
針對(duì)于上述的延遲隊(duì)列的兩個(gè)場(chǎng)景赚楚,我們分別有以下兩種流程圖:
延遲消費(fèi)
延遲消費(fèi)是延遲隊(duì)列最為常用的使用模式毙沾。如下圖所示,生產(chǎn)者產(chǎn)生的消息首先會(huì)進(jìn)入緩沖隊(duì)列(圖中紅色隊(duì)列)宠页。通過(guò)RabbitMQ提供的TTL擴(kuò)展左胞,這些消息會(huì)被設(shè)置過(guò)期時(shí)間,也就是延遲消費(fèi)的時(shí)間勇皇。等消息過(guò)期之后罩句,這些消息會(huì)通過(guò)配置好的DLX轉(zhuǎn)發(fā)到實(shí)際消費(fèi)隊(duì)列(圖中藍(lán)色隊(duì)列),以此達(dá)到延遲消費(fèi)的效果敛摘。
延遲重試
延遲重試本質(zhì)上也是延遲消費(fèi)的一種门烂,但是這種模式的結(jié)構(gòu)與普通的延遲消費(fèi)的流程圖較為不同,所以單獨(dú)拎出來(lái)介紹兄淫。
如下圖所示屯远,消費(fèi)者發(fā)現(xiàn)該消息處理出現(xiàn)了異常,比如是因?yàn)榫W(wǎng)絡(luò)波動(dòng)引起的異常捕虽。那么如果不等待一段時(shí)間慨丐,直接就重試的話,很可能會(huì)導(dǎo)致在這期間內(nèi)一直無(wú)法成功泄私,造成一定的資源浪費(fèi)房揭。那么我們可以將其先放在緩沖隊(duì)列中(圖中紅色隊(duì)列),等消息經(jīng)過(guò)一段的延遲時(shí)間后再次進(jìn)入實(shí)際消費(fèi)隊(duì)列中(圖中藍(lán)色隊(duì)列)晌端,此時(shí)由于已經(jīng)過(guò)了“較長(zhǎng)”的時(shí)間了捅暴,異常的一些波動(dòng)通常已經(jīng)恢復(fù),這些消息可以被正常地消費(fèi)咧纠。
代碼實(shí)現(xiàn)
接下來(lái)我們將介紹如何在Spring Boot中實(shí)現(xiàn)基于RabbitMQ的延遲隊(duì)列蓬痒。我們假設(shè)讀者已經(jīng)擁有了Spring Boot與RabbitMQ的基本知識(shí)。如果想快速了解Spring Boot的相關(guān)基礎(chǔ)知識(shí)漆羔,可以參考我之前寫的一篇文章梧奢。
初始化工程
首先我們?cè)贗ntellij中創(chuàng)建一個(gè)Spring Boot工程,并且添加spring-boot-starter-amqp
擴(kuò)展演痒。
配置隊(duì)列
從上述的流程圖中我們可以看到亲轨,一個(gè)延遲隊(duì)列的實(shí)現(xiàn),需要一個(gè)緩沖隊(duì)列以及一個(gè)實(shí)際的消費(fèi)隊(duì)列嫡霞。又由于在RabbitMQ中瓶埋,我們擁有兩種消息過(guò)期的配置方式,所以在代碼中,我們一共配置了三條隊(duì)列:
- delay_queue_per_message_ttl:TTL配置在消息上的緩沖隊(duì)列养筒。
- delay_queue_per_queue_ttl:TTL配置在隊(duì)列上的緩沖隊(duì)列曾撤。
- delay_process_queue:實(shí)際消費(fèi)隊(duì)列。
我們通過(guò)Java Config的方式將上述的隊(duì)列配置為Bean晕粪。由于我們添加了spring-boot-starter-amqp
擴(kuò)展挤悉,Spring Boot在啟動(dòng)時(shí)會(huì)根據(jù)我們的配置自動(dòng)創(chuàng)建這些隊(duì)列。為了方便接下來(lái)的測(cè)試巫湘,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個(gè)装悲,且過(guò)期的消息都會(huì)通過(guò)DLX轉(zhuǎn)發(fā)到delay_process_queue。
delay_queue_per_message_ttl
首先介紹delay_queue_per_message_ttl的配置代碼:
@Bean
Queue delayQueuePerMessageTTL() {
return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX尚氛,dead letter發(fā)送到的exchange
.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
.build();
}
其中诀诊,x-dead-letter-exchange
聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱,x-dead-letter-routing-key
聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的routing-key名稱阅嘶。
delay_queue_per_queue_ttl
類似地属瓣,delay_queue_per_queue_ttl的配置代碼:
@Bean
Queue delayQueuePerQueueTTL() {
return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
.withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
.withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設(shè)置隊(duì)列的過(guò)期時(shí)間
.build();
}
delay_queue_per_queue_ttl隊(duì)列的配置比delay_queue_per_message_ttl隊(duì)列的配置多了一個(gè)x-message-ttl
,該配置用來(lái)設(shè)置隊(duì)列的過(guò)期時(shí)間讯柔。
delay_process_queue
delay_process_queue的配置最為簡(jiǎn)單:
@Bean
Queue delayProcessQueue() {
return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
.build();
}
配置Exchange
配置DLX
首先抡蛙,我們需要配置DLX,代碼如下:
@Bean
DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
然后再將該DLX綁定到實(shí)際消費(fèi)隊(duì)列即delay_process_queue上魂迄。這樣所有的死信都會(huì)通過(guò)DLX被轉(zhuǎn)發(fā)到delay_process_queue:
@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayProcessQueue)
.to(delayExchange)
.with(DELAY_PROCESS_QUEUE_NAME);
}
配置延遲重試所需的Exchange
從延遲重試的流程圖中我們可以看到粗截,消息處理失敗之后,我們需要將消息轉(zhuǎn)發(fā)到緩沖隊(duì)列捣炬,所以緩沖隊(duì)列也需要綁定一個(gè)Exchange熊昌。在本例中,我們將delay_process_per_queue_ttl作為延遲重試?yán)锏木彌_隊(duì)列湿酸。具體代碼是如何配置的浴捆,這里就不贅述了,大家可以查閱我Github中的代碼稿械。
定義消費(fèi)者
我們創(chuàng)建一個(gè)最簡(jiǎn)單的消費(fèi)者ProcessReceiver,這個(gè)消費(fèi)者監(jiān)聽(tīng)delay_process_queue隊(duì)列冲粤,對(duì)于接受到的消息美莫,他會(huì):
- 如果消息里的消息體不等于FAIL_MESSAGE,那么他會(huì)輸出消息體梯捕。
- 如果消息里的消息體恰好是FAIL_MESSAGE厢呵,那么他會(huì)模擬拋出異常,然后將該消息重定向到緩沖隊(duì)列(對(duì)應(yīng)延遲重試場(chǎng)景)傀顾。
另外襟铭,我們還需要新建一個(gè)監(jiān)聽(tīng)容器用于存放消費(fèi)者,代碼如下:
@Bean
SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽(tīng)delay_process_queue
container.setMessageListener(new MessageListenerAdapter(processReceiver));
return container;
}
至此,我們前置的配置代碼已經(jīng)全部編寫完成寒砖,接下來(lái)我們需要編寫測(cè)試用例來(lái)測(cè)試我們的延遲隊(duì)列赐劣。
編寫測(cè)試用例
延遲消費(fèi)場(chǎng)景
首先我們編寫用于測(cè)試TTL設(shè)置在消息上的測(cè)試代碼。
我們借助spring-rabbit
包下提供的RabbitTemplate類來(lái)發(fā)送消息哩都。由于我們添加了spring-boot-starter-amqp
擴(kuò)展魁兼,Spring Boot會(huì)在初始化時(shí)自動(dòng)地將RabbitTemplate當(dāng)成bean加載到容器中。
解決了消息的發(fā)送問(wèn)題漠嵌,那么又該如何為每個(gè)消息設(shè)置TTL呢咐汞?這里我們需要借助MessagePostProcessor。MessagePostProcessor通常用來(lái)設(shè)置消息的Header以及消息的屬性儒鹿。我們新建一個(gè)ExpirationMessagePostProcessor類來(lái)負(fù)責(zé)設(shè)置消息的TTL屬性:
/**
* 設(shè)置消息的失效時(shí)間
*/
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
private final Long ttl; // 毫秒
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties()
.setExpiration(ttl.toString()); // 設(shè)置per-message的失效時(shí)間
return message;
}
}
然后在調(diào)用RabbitTemplate的convertAndSend方法時(shí)化撕,傳入ExpirationMessagePostPorcessor即可。我們向緩沖隊(duì)列中發(fā)送3條消息约炎,過(guò)期時(shí)間依次為1秒植阴,2秒和3秒。具體的代碼如下所示:
@Test
public void testDelayQueuePerMessageTTL() throws InterruptedException {
ProcessReceiver.latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
long expiration = i * 1000;
rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
(Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
}
ProcessReceiver.latch.await();
}
細(xì)心的朋友一定會(huì)問(wèn)章钾,為什么要在代碼中加一個(gè)CountDownLatch呢墙贱?這是因?yàn)槿绻麤](méi)有l(wèi)atch阻塞住測(cè)試方法的話,測(cè)試用例會(huì)直接結(jié)束贱傀,程序退出惨撇,我們就看不到消息被延遲消費(fèi)的表現(xiàn)了。
那么類似地府寒,測(cè)試TTL設(shè)置在隊(duì)列上的代碼如下:
@Test
public void testDelayQueuePerQueueTTL() throws InterruptedException {
ProcessReceiver.latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
"Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
}
ProcessReceiver.latch.await();
}
我們向緩沖隊(duì)列中發(fā)送3條消息魁衙。理論上這3條消息會(huì)在4秒后同時(shí)過(guò)期。
延遲重試場(chǎng)景
我們同樣還需測(cè)試延遲重試場(chǎng)景株搔。
@Test
public void testFailMessage() throws InterruptedException {
ProcessReceiver.latch = new CountDownLatch(6);
for (int i = 1; i <= 3; i++) {
rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
}
ProcessReceiver.latch.await();
}
我們向delay_process_queue發(fā)送3條會(huì)觸發(fā)FAIL的消息剖淀,理論上這3條消息會(huì)在4秒后自動(dòng)重試。
查看測(cè)試結(jié)果
延遲消費(fèi)場(chǎng)景
延遲消費(fèi)的場(chǎng)景測(cè)試我們分為了TTL設(shè)置在消息上和TTL設(shè)置在隊(duì)列上兩種纤房。首先纵隔,我們先看一下TTL設(shè)置在消息上的測(cè)試結(jié)果:
從上圖中我們可以看到,ProcessReceiver分別經(jīng)過(guò)1秒炮姨、2秒捌刮、3秒收到消息。測(cè)試結(jié)果表明消息不僅被延遲消費(fèi)了舒岸,而且每條消息的延遲時(shí)間是可以被個(gè)性化設(shè)置的绅作。TTL設(shè)置在消息上的延遲消費(fèi)場(chǎng)景測(cè)試成功。
然后蛾派,TTL設(shè)置在隊(duì)列上的測(cè)試結(jié)果如下圖:
從上圖中我們可以看到俄认,ProcessReceiver經(jīng)過(guò)了4秒的延遲之后个少,同時(shí)收到了3條消息。測(cè)試結(jié)果表明消息不僅被延遲消費(fèi)了眯杏,同時(shí)也證明了當(dāng)TTL設(shè)置在隊(duì)列上的時(shí)候夜焦,消息的過(guò)期時(shí)間是固定的。TTL設(shè)置在隊(duì)列上的延遲消費(fèi)場(chǎng)景測(cè)試成功役拴。
延遲重試場(chǎng)景
接下來(lái)糊探,我們?cè)賮?lái)看一下延遲重試的測(cè)試結(jié)果:
ProcessReceiver首先收到了3條會(huì)觸發(fā)FAIL的消息,然后將其移動(dòng)到緩沖隊(duì)列之后河闰,過(guò)了4秒科平,又收到了剛才的那3條消息。延遲重試場(chǎng)景測(cè)試成功姜性。
總結(jié)
本文首先介紹了延遲隊(duì)列的概念以及用途瞪慧,并且通過(guò)代碼詳細(xì)講解了如何通過(guò)Spring Boot和RabbitMQ實(shí)現(xiàn)一個(gè)延遲隊(duì)列。希望本文能夠?qū)Υ蠹移綍r(shí)的學(xué)習(xí)和工作能有所啟發(fā)和幫助部念。有什么意見(jiàn)或者問(wèn)題歡迎在評(píng)論下方留言弃酌,謝謝!
本文首發(fā)于http://kissyu.org/2017/11/18/Spring%20Boot%E4%B8%8ERabbitMQ%E7%BB%93%E5%90%88%E5%AE%9E%E7%8E%B0%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97/
歡迎評(píng)論和轉(zhuǎn)載儡炼!