預(yù)計(jì)閱讀:10分鐘
一、說明
在上一篇中,介紹了RabbitMQ中的死信隊(duì)列是什么恢总,何時(shí)使用以及如何使用RabbitMQ的死信隊(duì)列。相信通過上一篇的學(xué)習(xí)睬愤,對(duì)于死信隊(duì)列已經(jīng)有了更多的了解,這一篇的內(nèi)容也跟死信隊(duì)列息息相關(guān)纹安,如果你還不了解死信隊(duì)列尤辱,那么建議你先進(jìn)行上一篇文章的閱讀。
這一篇里厢岂,我們將繼續(xù)介紹RabbitMQ的高級(jí)特性光督,通過本篇的學(xué)習(xí),你將收獲:
- 什么是延時(shí)隊(duì)列
- 延時(shí)隊(duì)列使用場(chǎng)景
- RabbitMQ中的TTL
- 如何利用RabbitMQ來實(shí)現(xiàn)延時(shí)隊(duì)列
二塔粒、本文大綱
以下是本文大綱:
本文閱讀前结借,需要對(duì)RabbitMQ以及死信隊(duì)列有一個(gè)簡(jiǎn)單的了解。
三卒茬、什么是延時(shí)隊(duì)列
延時(shí)隊(duì)列
船老,首先,它是一種隊(duì)列圃酵,隊(duì)列意味著內(nèi)部的元素是有序
的柳畔,元素出隊(duì)和入隊(duì)是有方向性的,元素從一端進(jìn)入郭赐,從另一端取出薪韩。
其次,延時(shí)隊(duì)列
捌锭,最重要的特性就體現(xiàn)在它的延時(shí)
屬性上俘陷,跟普通的隊(duì)列不一樣的是,普通隊(duì)列中的元素總是等著希望被早點(diǎn)取出處理观谦,而延時(shí)隊(duì)列中的元素則是希望被在指定時(shí)間得到取出和處理
拉盾,所以延時(shí)隊(duì)列中的元素是都是帶時(shí)間屬性的,通常來說是需要被處理的消息或者任務(wù)坎匿。
簡(jiǎn)單來說盾剩,延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列。
四替蔬、延時(shí)隊(duì)列使用場(chǎng)景
那么什么時(shí)候需要用延時(shí)隊(duì)列呢告私?考慮一下以下場(chǎng)景:
- 訂單在十分鐘之內(nèi)未支付則自動(dòng)取消。
- 新創(chuàng)建的店鋪承桥,如果在十天內(nèi)都沒有上傳過商品驻粟,則自動(dòng)發(fā)送消息提醒。
- 賬單在一周內(nèi)未支付,則自動(dòng)結(jié)算蜀撑。
- 用戶注冊(cè)成功后挤巡,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。
- 用戶發(fā)起退款酷麦,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員矿卑。
- 預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議沃饶。
這些場(chǎng)景都有一個(gè)特點(diǎn)母廷,需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件糊肤,在十分鐘之后檢查該訂單支付狀態(tài)琴昆,然后將未支付的訂單進(jìn)行關(guān)閉;發(fā)生店鋪創(chuàng)建事件馆揉,十天后檢查該店鋪上新商品數(shù)业舍,然后通知上新數(shù)為0的商戶;發(fā)生賬單生成事件升酣,檢查賬單支付狀態(tài)舷暮,然后自動(dòng)結(jié)算未支付的賬單;發(fā)生新用戶注冊(cè)事件拗踢,三天后檢查新注冊(cè)用戶的活動(dòng)數(shù)據(jù)脚牍,然后通知沒有任何活動(dòng)記錄的用戶;發(fā)生退款事件巢墅,在三天之后檢查該訂單是否已被處理诸狭,如仍未被處理,則發(fā)送消息給相關(guān)運(yùn)營人員君纫;發(fā)生預(yù)定會(huì)議事件驯遇,判斷離會(huì)議開始是否只有十分鐘了,如果是蓄髓,則通知各個(gè)與會(huì)人員叉庐。
看起來似乎使用定時(shí)任務(wù),一直輪詢數(shù)據(jù)会喝,每秒查一次陡叠,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎肢执?如果數(shù)據(jù)量比較少枉阵,確實(shí)可以這樣做,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求预茄,如果對(duì)于時(shí)間不是嚴(yán)格限制兴溜,而是寬松意義上的一周,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個(gè)可行的方案拙徽。但對(duì)于數(shù)據(jù)量比較大刨沦,并且時(shí)效性較強(qiáng)的場(chǎng)景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“膘怕,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多想诅,活動(dòng)期間甚至?xí)_(dá)到百萬甚至千萬級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的岛心,很可能在一秒內(nèi)無法完成所有訂單的檢查侧蘸,同時(shí)會(huì)給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下鹉梨。
更重要的一點(diǎn)是,不穿稳!優(yōu)存皂!雅!
沒錯(cuò)逢艘,作為一名有追求的程序員旦袋,始終應(yīng)該追求更優(yōu)雅的架構(gòu)和更優(yōu)雅的代碼風(fēng)格,寫代碼要像寫詩一樣優(yōu)美它改“淘校【滑稽】
這時(shí)候,延時(shí)隊(duì)列就可以閃亮登場(chǎng)了央拖,以上場(chǎng)景祭阀,正是延時(shí)隊(duì)列的用武之地。
既然延時(shí)隊(duì)列
可以解決很多特定場(chǎng)景下鲜戒,帶時(shí)間屬性的任務(wù)需求专控,那么如何構(gòu)造一個(gè)延時(shí)隊(duì)列呢?接下來遏餐,本文將介紹如何用RabbitMQ來實(shí)現(xiàn)延時(shí)隊(duì)列伦腐。
五、RabbitMQ中的TTL
在介紹延時(shí)隊(duì)列之前失都,還需要先介紹一下RabbitMQ中的一個(gè)高級(jí)特性——TTL(Time To Live)
柏蘑。
TTL
是什么呢?TTL
是RabbitMQ中一個(gè)消息或者隊(duì)列的屬性粹庞,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間
咳焚,單位是毫秒。換句話說信粮,如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列黔攒,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒有被消費(fèi),則會(huì)成為“死信”(至于什么是死信,請(qǐng)翻看上一篇)督惰。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL不傅,那么較小的那個(gè)值將會(huì)被使用。
那么赏胚,如何設(shè)置這個(gè)TTL值呢访娶?有兩種方式,第一種是在創(chuàng)建隊(duì)列的時(shí)候設(shè)置隊(duì)列的“x-message-ttl”屬性觉阅,如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);`
這樣所有被投遞到該隊(duì)列的消息都最多不會(huì)存活超過6s崖疤。
另一種方式便是針對(duì)每條消息設(shè)置TTL,代碼如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());`
這樣這條消息的過期時(shí)間也被設(shè)置成了6s典勇。
但這兩種方式是有區(qū)別的劫哼,如果設(shè)置了隊(duì)列的TTL屬性,那么一旦消息過期割笙,就會(huì)被隊(duì)列丟棄权烧,而第二種方式,消息即使過期伤溉,也不一定會(huì)被馬上丟棄般码,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況乱顾,則已過期的消息也許還能存活較長(zhǎng)時(shí)間板祝。
另外,還需要注意的一點(diǎn)是走净,如果不設(shè)置TTL券时,表示消息永遠(yuǎn)不會(huì)過期,如果將TTL設(shè)置為0温技,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者革为,否則該消息將會(huì)被丟棄。
六舵鳞、如何利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列
前一篇里介紹了如果設(shè)置死信隊(duì)列震檩,前文中又介紹了TTL,至此蜓堕,利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩大要素已經(jīng)集齊抛虏,接下來只需要將它們進(jìn)行調(diào)和,再加入一點(diǎn)點(diǎn)調(diào)味料套才,延時(shí)隊(duì)列就可以新鮮出爐了迂猴。
想想看,延時(shí)隊(duì)列
背伴,不就是想要消息延遲多久被處理嗎沸毁,TTL則剛好能讓消息在延遲多久之后成為死信峰髓,另一方面,成為死信的消息都會(huì)被投遞到死信隊(duì)列里息尺,這樣只需要消費(fèi)者一直消費(fèi)死信隊(duì)列里的消息就萬事大吉了携兵,因?yàn)槔锩娴南⒍际窍M涣⒓刺幚淼南ⅰ?/p>
從下圖可以大致看出消息的流向:
生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據(jù)需要延時(shí)時(shí)間的不同搂誉,利用不同的routingkey將消息路由到不同的延時(shí)隊(duì)列徐紧,每個(gè)隊(duì)列都設(shè)置了不同的TTL屬性,并綁定在同一個(gè)死信交換機(jī)中炭懊,消息過期后并级,根據(jù)routingkey的不同,又會(huì)被路由到不同的死信隊(duì)列中侮腹,消費(fèi)者只需要監(jiān)聽對(duì)應(yīng)的死信隊(duì)列進(jìn)行處理即可嘲碧。
下面來看代碼:
先聲明交換機(jī)、隊(duì)列以及他們的綁定關(guān)系:
@Configuration
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";
public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";
public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";
// 聲明延時(shí)Exchange
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明延時(shí)隊(duì)列A 延時(shí)10s
// 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 6000);
return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
}
// 聲明延時(shí)隊(duì)列B 延時(shí) 60s
// 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("delayQueueB")
public Queue delayQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 60000);
return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
}
// 聲明死信隊(duì)列A 用于接收延時(shí)10s處理的消息
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 聲明死信隊(duì)列B 用于接收延時(shí)60s處理的消息
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 聲明延時(shí)隊(duì)列A綁定關(guān)系
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
}
// 聲明業(yè)務(wù)隊(duì)列B綁定關(guān)系
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
}
// 聲明死信隊(duì)列A綁定關(guān)系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 聲明死信隊(duì)列B綁定關(guān)系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}`
接下來父阻,創(chuàng)建兩個(gè)消費(fèi)者呀潭,分別對(duì)兩個(gè)死信隊(duì)列的消息進(jìn)行消費(fèi):
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{},死信隊(duì)列A收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{},死信隊(duì)列B收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}`
然后是消息的生產(chǎn)者:
@Component
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg, DelayTypeEnum type){
switch (type){
case DELAY_10s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
break;
case DELAY_60s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
break;
}
}
}
接下來,我們暴露一個(gè)web接口來生產(chǎn)消息:
@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {
@Autowired
private DelayMessageSender sender;
@RequestMapping("sendmsg")
public void sendMsg(String msg, Integer delayType){
log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求至非,msg:{},delayType:{}", new Date(), msg, delayType);
sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType)));
}
}`
準(zhǔn)備就緒,啟動(dòng)糠聪!
打開rabbitMQ的管理后臺(tái)荒椭,可以看到我們剛才創(chuàng)建的交換機(jī)和隊(duì)列信息:
接下來,我們來發(fā)送幾條消息舰蟆,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1 http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2
日志如下:
2019-07-28 16:02:19.813 INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:02:19 CST 2019,收到請(qǐng)求趣惠,msg:testMsg1,delayType:1
2019-07-28 16:02:19.815 INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started
2019-07-28 16:02:25.829 INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:02:25 CST 2019,死信隊(duì)列A收到消息:testMsg1
2019-07-28 16:02:41.326 INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:02:41 CST 2019,收到請(qǐng)求,msg:testMsg2,delayType:2
2019-07-28 16:03:41.329 INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:03:41 CST 2019,死信隊(duì)列B收到消息:testMsg2`
第一條消息在6s后變成了死信消息身害,然后被消費(fèi)者消費(fèi)掉味悄,第二條消息在60s之后變成了死信消息,然后被消費(fèi)掉塌鸯,這樣侍瑟,一個(gè)還算ok的延時(shí)隊(duì)列就打造完成了。
不過丙猬,等等涨颜,如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求茧球,就要新增一個(gè)隊(duì)列庭瑰,這里只有6s和60s兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理抢埋,那么就需要增加TTL為一個(gè)小時(shí)的隊(duì)列弹灭,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景督暂,豈不是要增加無數(shù)個(gè)隊(duì)列才能滿足需求?穷吮?
嗯逻翁,仔細(xì)想想,事情并不簡(jiǎn)單酒来。
七卢未、RabbitMQ延時(shí)隊(duì)列優(yōu)化
顯然,需要一種更通用的方案才能滿足需求堰汉,那么就只能將TTL設(shè)置在消息屬性里了辽社。我們來試一試。
增加一個(gè)延時(shí)隊(duì)列翘鸭,用于接收設(shè)置為任意延時(shí)時(shí)長(zhǎng)的消息滴铅,增加一個(gè)相應(yīng)的死信隊(duì)列和routingkey:
@Configuration
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";
public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";
public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";
// 聲明延時(shí)Exchange
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 聲明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 聲明延時(shí)隊(duì)列C 不設(shè)置TTL
// 并綁定到對(duì)應(yīng)的死信交換機(jī)
@Bean("delayQueueC")
public Queue delayQueueC(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
}
// 聲明死信隊(duì)列C 用于接收延時(shí)任意時(shí)長(zhǎng)處理的消息
@Bean("deadLetterQueueC")
public Queue deadLetterQueueC(){
return new Queue(DEAD_LETTER_QUEUEC_NAME);
}
// 聲明延時(shí)列C綁定關(guān)系
@Bean
public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
}
// 聲明死信隊(duì)列C綁定關(guān)系
@Bean
public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
}
}
增加一個(gè)死信隊(duì)列C的消費(fèi)者:
@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{},死信隊(duì)列C收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
再次啟動(dòng)!然后訪問:http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000 來生產(chǎn)消息就乓,注意這里的單位是毫秒汉匙。
2019-07-28 16:45:07.033 INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:45:07 CST 2019,收到請(qǐng)求,msg:testMsg1,delayTime:5000
2019-07-28 16:45:11.694 INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:45:11 CST 2019,收到請(qǐng)求生蚁,msg:testMsg2,delayTime:5000
2019-07-28 16:45:12.048 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:45:12 CST 2019,死信隊(duì)列C收到消息:testMsg1
2019-07-28 16:45:16.709 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:45:16 CST 2019,死信隊(duì)列C收到消息:testMsg2`
看起來似乎沒什么問題噩翠,但不要高興的太早,在最開始的時(shí)候邦投,就介紹過伤锚,如果使用在消息屬性上設(shè)置TTL的方式,消息可能并不會(huì)按時(shí)“死亡“志衣,因?yàn)镽abbitMQ只會(huì)檢查第一個(gè)消息是否過期屯援,如果過期則丟到死信隊(duì)列,索引如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng)念脯,而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短狞洋,則第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。
實(shí)驗(yàn)一下:
2019-07-28 16:49:02.957 INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:49:02 CST 2019,收到請(qǐng)求绿店,msg:longDelayedMsg,delayTime:20000
2019-07-28 16:49:10.671 INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:49:10 CST 2019,收到請(qǐng)求吉懊,msg:shortDelayedMsg,delayTime:2000
2019-07-28 16:49:22.969 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:longDelayedMsg
2019-07-28 16:49:22.970 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:shortDelayedMsg`
我們先發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為20s的消息,然后發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為2s的消息假勿,結(jié)果顯示惕它,第二個(gè)消息會(huì)在等第一個(gè)消息成為死信后才會(huì)“死亡“。
0|*1*****八废登、利用RabbitMQ插件實(shí)現(xiàn)延遲隊(duì)列
上文中提到的問題淹魄,確實(shí)是一個(gè)硬傷,如果不能實(shí)現(xiàn)在消息粒度上添加TTL堡距,并使其在設(shè)置的TTL時(shí)間及時(shí)死亡甲锡,就無法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列兆蕉。
那如何解決這個(gè)問題呢?不要慌缤沦,安裝一個(gè)插件即可:https://www.rabbitmq.com/community-plugins.html 告匠,下載rabbitmq_delayed_message_exchange插件遥诉,然后解壓放置到RabbitMQ的插件目錄。
接下來,進(jìn)入RabbitMQ的安裝目錄下的sbin目錄丈冬,執(zhí)行下面命令讓該插件生效初坠,然后重啟RabbitMQ炸站。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后闹伪,我們?cè)俾暶鲙讉€(gè)Bean:
@Configuration
public class DelayedRabbitMQConfig {
public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";
@Bean
public Queue immediateQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
controller層再添加一個(gè)入口:
@RequestMapping("delayMsg2")
public void delayMsg2(String msg, Integer delayTime) {
log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求,msg:{},delayTime:{}", new Date(), msg, delayTime);
sender.sendDelayMsg(msg, delayTime);
}`
消息生產(chǎn)者的代碼也需要修改:
public void sendDelayMsg(String msg, Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
最后届巩,再創(chuàng)建一個(gè)消費(fèi)者:
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("當(dāng)前時(shí)間:{},延時(shí)隊(duì)列收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
一切準(zhǔn)備就緒硅瞧,啟動(dòng)!然后分別訪問以下鏈接:
http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000
http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000`
日志如下:
2019-07-28 17:28:13.729 INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 17:28:13 CST 2019,收到請(qǐng)求恕汇,msg:msg1,delayTime:20000
2019-07-28 17:28:20.607 INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 17:28:20 CST 2019,收到請(qǐng)求腕唧,msg:msg2,delayTime:2000
2019-07-28 17:28:22.624 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 17:28:22 CST 2019,延時(shí)隊(duì)列收到消息:msg2
2019-07-28 17:28:33.751 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 17:28:33 CST 2019,延時(shí)隊(duì)列收到消息:msg1`
第二個(gè)消息被先消費(fèi)掉了,符合預(yù)期瘾英。至此枣接,RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的部分就完結(jié)了。
九缺谴、總結(jié)
延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用月腋,使用RabbitMQ來實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ的特性,如:消息可靠發(fā)送瓣赂、消息可靠投遞、死信隊(duì)列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄片拍。另外煌集,通過RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問題捌省,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失苫纤。
當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇纲缓,比如利用Java的DelayQueu卷拘,利用Redis的zset,利用Quartz或者利用kafka的時(shí)間輪祝高,這些方式各有特點(diǎn)栗弟,但就像爐石傳說一般,這些知識(shí)就好比手里的卡牌工闺,知道的越多乍赫,可以用的卡牌也就越多瓣蛀,遇到問題便能游刃有余,所以需要大量的知識(shí)儲(chǔ)備和經(jīng)驗(yàn)積累才能打造出更出色的卡牌組合雷厂,讓自己解決問題的能力得到更好的提升惋增。
但另一方面,隨著時(shí)間的流逝和閱歷的增長(zhǎng)改鲫,越來越感覺到自己的能力有限诈皿,無法獨(dú)自面對(duì)紛繁復(fù)雜且多變的業(yè)務(wù)需求,在很多方面需要其他人的協(xié)助才能很好的完成任務(wù)像棘。也知道聞道有先后稽亏,術(shù)業(yè)有專攻,不會(huì)再狂妄自大讲弄,覺得自己能把所有事情都搞定措左,也將重心慢慢轉(zhuǎn)移到研究如何有效的進(jìn)行團(tuán)隊(duì)合作上來,我相信一個(gè)高度協(xié)調(diào)的團(tuán)隊(duì)永遠(yuǎn)比一個(gè)人戰(zhàn)斗要更有價(jià)值避除。
花了一個(gè)周末的時(shí)間完成了這篇文章怎披,文中所有的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查閱瓶摆,希望能對(duì)你有幫助凉逛,如果有錯(cuò)誤的地方,歡迎指正群井,也歡迎關(guān)注我的公眾號(hào)進(jìn)行留言交流状飞。