【RabbitMQ】一文帶你搞定RabbitMQ延遲隊(duì)列

預(yù)計(jì)閱讀:10分鐘

一、說明

在上一篇中,介紹了RabbitMQ中的死信隊(duì)列是什么恢总,何時(shí)使用以及如何使用RabbitMQ的死信隊(duì)列。相信通過上一篇的學(xué)習(xí)睬愤,對(duì)于死信隊(duì)列已經(jīng)有了更多的了解,這一篇的內(nèi)容也跟死信隊(duì)列息息相關(guān)纹安,如果你還不了解死信隊(duì)列尤辱,那么建議你先進(jìn)行上一篇文章的閱讀。

這一篇里厢岂,我們將繼續(xù)介紹RabbitMQ的高級(jí)特性光督,通過本篇的學(xué)習(xí),你將收獲:

  1. 什么是延時(shí)隊(duì)列
  2. 延時(shí)隊(duì)列使用場(chǎng)景
  3. RabbitMQ中的TTL
  4. 如何利用RabbitMQ來實(shí)現(xiàn)延時(shí)隊(duì)列

二塔粒、本文大綱

以下是本文大綱:

1.png

本文閱讀前结借,需要對(duì)RabbitMQ以及死信隊(duì)列有一個(gè)簡(jiǎn)單的了解。

三卒茬、什么是延時(shí)隊(duì)列

延時(shí)隊(duì)列船老,首先,它是一種隊(duì)列圃酵,隊(duì)列意味著內(nèi)部的元素是有序的柳畔,元素出隊(duì)和入隊(duì)是有方向性的,元素從一端進(jìn)入郭赐,從另一端取出薪韩。

其次,延時(shí)隊(duì)列捌锭,最重要的特性就體現(xiàn)在它的延時(shí)屬性上俘陷,跟普通的隊(duì)列不一樣的是,普通隊(duì)列中的元素總是等著希望被早點(diǎn)取出處理观谦,而延時(shí)隊(duì)列中的元素則是希望被在指定時(shí)間得到取出和處理拉盾,所以延時(shí)隊(duì)列中的元素是都是帶時(shí)間屬性的,通常來說是需要被處理的消息或者任務(wù)坎匿。

簡(jiǎn)單來說盾剩,延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列。

四替蔬、延時(shí)隊(duì)列使用場(chǎng)景

那么什么時(shí)候需要用延時(shí)隊(duì)列呢告私?考慮一下以下場(chǎng)景:

  1. 訂單在十分鐘之內(nèi)未支付則自動(dòng)取消。
  2. 新創(chuàng)建的店鋪承桥,如果在十天內(nèi)都沒有上傳過商品驻粟,則自動(dòng)發(fā)送消息提醒。
  3. 賬單在一周內(nèi)未支付,則自動(dòng)結(jié)算蜀撑。
  4. 用戶注冊(cè)成功后挤巡,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。
  5. 用戶發(fā)起退款酷麦,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員矿卑。
  6. 預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議沃饶。

這些場(chǎng)景都有一個(gè)特點(diǎn)母廷,需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件糊肤,在十分鐘之后檢查該訂單支付狀態(tài)琴昆,然后將未支付的訂單進(jìn)行關(guān)閉;發(fā)生店鋪創(chuàng)建事件馆揉,十天后檢查該店鋪上新商品數(shù)业舍,然后通知上新數(shù)為0的商戶;發(fā)生賬單生成事件升酣,檢查賬單支付狀態(tài)舷暮,然后自動(dòng)結(jié)算未支付的賬單;發(fā)生新用戶注冊(cè)事件拗踢,三天后檢查新注冊(cè)用戶的活動(dòng)數(shù)據(jù)脚牍,然后通知沒有任何活動(dòng)記錄的用戶;發(fā)生退款事件巢墅,在三天之后檢查該訂單是否已被處理诸狭,如仍未被處理,則發(fā)送消息給相關(guān)運(yùn)營人員君纫;發(fā)生預(yù)定會(huì)議事件驯遇,判斷離會(huì)議開始是否只有十分鐘了,如果是蓄髓,則通知各個(gè)與會(huì)人員叉庐。

看起來似乎使用定時(shí)任務(wù),一直輪詢數(shù)據(jù)会喝,每秒查一次陡叠,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎肢执?如果數(shù)據(jù)量比較少枉阵,確實(shí)可以這樣做,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求预茄,如果對(duì)于時(shí)間不是嚴(yán)格限制兴溜,而是寬松意義上的一周,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個(gè)可行的方案拙徽。但對(duì)于數(shù)據(jù)量比較大刨沦,并且時(shí)效性較強(qiáng)的場(chǎng)景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“膘怕,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多想诅,活動(dòng)期間甚至?xí)_(dá)到百萬甚至千萬級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的岛心,很可能在一秒內(nèi)無法完成所有訂單的檢查侧蘸,同時(shí)會(huì)給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下鹉梨。

更重要的一點(diǎn)是,不穿稳!優(yōu)存皂!雅!

沒錯(cuò)逢艘,作為一名有追求的程序員旦袋,始終應(yīng)該追求更優(yōu)雅的架構(gòu)和更優(yōu)雅的代碼風(fēng)格,寫代碼要像寫詩一樣優(yōu)美它改“淘校【滑稽】

這時(shí)候,延時(shí)隊(duì)列就可以閃亮登場(chǎng)了央拖,以上場(chǎng)景祭阀,正是延時(shí)隊(duì)列的用武之地。

既然延時(shí)隊(duì)列可以解決很多特定場(chǎng)景下鲜戒,帶時(shí)間屬性的任務(wù)需求专控,那么如何構(gòu)造一個(gè)延時(shí)隊(duì)列呢?接下來遏餐,本文將介紹如何用RabbitMQ來實(shí)現(xiàn)延時(shí)隊(duì)列伦腐。

五、RabbitMQ中的TTL

在介紹延時(shí)隊(duì)列之前失都,還需要先介紹一下RabbitMQ中的一個(gè)高級(jí)特性——TTL(Time To Live)柏蘑。

TTL是什么呢?TTL是RabbitMQ中一個(gè)消息或者隊(duì)列的屬性粹庞,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間咳焚,單位是毫秒。換句話說信粮,如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列黔攒,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒有被消費(fèi),則會(huì)成為“死信”(至于什么是死信,請(qǐng)翻看上一篇)督惰。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL不傅,那么較小的那個(gè)值將會(huì)被使用。

那么赏胚,如何設(shè)置這個(gè)TTL值呢访娶?有兩種方式,第一種是在創(chuàng)建隊(duì)列的時(shí)候設(shè)置隊(duì)列的“x-message-ttl”屬性觉阅,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);` 

這樣所有被投遞到該隊(duì)列的消息都最多不會(huì)存活超過6s崖疤。

另一種方式便是針對(duì)每條消息設(shè)置TTL,代碼如下:


AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());` 

這樣這條消息的過期時(shí)間也被設(shè)置成了6s典勇。

但這兩種方式是有區(qū)別的劫哼,如果設(shè)置了隊(duì)列的TTL屬性,那么一旦消息過期割笙,就會(huì)被隊(duì)列丟棄权烧,而第二種方式,消息即使過期伤溉,也不一定會(huì)被馬上丟棄般码,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況乱顾,則已過期的消息也許還能存活較長(zhǎng)時(shí)間板祝。

另外,還需要注意的一點(diǎn)是走净,如果不設(shè)置TTL券时,表示消息永遠(yuǎn)不會(huì)過期,如果將TTL設(shè)置為0温技,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者革为,否則該消息將會(huì)被丟棄。

六舵鳞、如何利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列

前一篇里介紹了如果設(shè)置死信隊(duì)列震檩,前文中又介紹了TTL,至此蜓堕,利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩大要素已經(jīng)集齊抛虏,接下來只需要將它們進(jìn)行調(diào)和,再加入一點(diǎn)點(diǎn)調(diào)味料套才,延時(shí)隊(duì)列就可以新鮮出爐了迂猴。

想想看,延時(shí)隊(duì)列背伴,不就是想要消息延遲多久被處理嗎沸毁,TTL則剛好能讓消息在延遲多久之后成為死信峰髓,另一方面,成為死信的消息都會(huì)被投遞到死信隊(duì)列里息尺,這樣只需要消費(fèi)者一直消費(fèi)死信隊(duì)列里的消息就萬事大吉了携兵,因?yàn)槔锩娴南⒍际窍M涣⒓刺幚淼南ⅰ?/p>

從下圖可以大致看出消息的流向:

生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據(jù)需要延時(shí)時(shí)間的不同搂誉,利用不同的routingkey將消息路由到不同的延時(shí)隊(duì)列徐紧,每個(gè)隊(duì)列都設(shè)置了不同的TTL屬性,并綁定在同一個(gè)死信交換機(jī)中炭懊,消息過期后并级,根據(jù)routingkey的不同,又會(huì)被路由到不同的死信隊(duì)列中侮腹,消費(fèi)者只需要監(jiān)聽對(duì)應(yīng)的死信隊(duì)列進(jìn)行處理即可嘲碧。

下面來看代碼:

先聲明交換機(jī)、隊(duì)列以及他們的綁定關(guān)系:


@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";
    public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";
    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";

    // 聲明延時(shí)Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明延時(shí)隊(duì)列A 延時(shí)10s
    // 并綁定到對(duì)應(yīng)的死信交換機(jī)
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當(dāng)前隊(duì)列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        // x-message-ttl  聲明隊(duì)列的TTL
        args.put("x-message-ttl", 6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    // 聲明延時(shí)隊(duì)列B 延時(shí) 60s
    // 并綁定到對(duì)應(yīng)的死信交換機(jī)
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當(dāng)前隊(duì)列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        // x-message-ttl  聲明隊(duì)列的TTL
        args.put("x-message-ttl", 60000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }

    // 聲明死信隊(duì)列A 用于接收延時(shí)10s處理的消息
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 聲明死信隊(duì)列B 用于接收延時(shí)60s處理的消息
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 聲明延時(shí)隊(duì)列A綁定關(guān)系
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }

    // 聲明業(yè)務(wù)隊(duì)列B綁定關(guān)系
    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }

    // 聲明死信隊(duì)列A綁定關(guān)系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 聲明死信隊(duì)列B綁定關(guān)系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}` 

接下來父阻,創(chuàng)建兩個(gè)消費(fèi)者呀潭,分別對(duì)兩個(gè)死信隊(duì)列的消息進(jìn)行消費(fèi):


@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當(dāng)前時(shí)間:{},死信隊(duì)列A收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當(dāng)前時(shí)間:{},死信隊(duì)列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}` 

然后是消息的生產(chǎn)者:


@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }
    }
}

接下來,我們暴露一個(gè)web接口來生產(chǎn)消息:

@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg, Integer delayType){
        log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求至非,msg:{},delayType:{}", new Date(), msg, delayType);
        sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType)));
    }
}` 

準(zhǔn)備就緒,啟動(dòng)糠聪!

打開rabbitMQ的管理后臺(tái)荒椭,可以看到我們剛才創(chuàng)建的交換機(jī)和隊(duì)列信息:

接下來,我們來發(fā)送幾條消息舰蟆,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1 http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2

日志如下:


2019-07-28 16:02:19.813  INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 16:02:19 CST 2019,收到請(qǐng)求趣惠,msg:testMsg1,delayType:1
2019-07-28 16:02:19.815  INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started
2019-07-28 16:02:25.829  INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 16:02:25 CST 2019,死信隊(duì)列A收到消息:testMsg1
2019-07-28 16:02:41.326  INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 16:02:41 CST 2019,收到請(qǐng)求,msg:testMsg2,delayType:2
2019-07-28 16:03:41.329  INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 16:03:41 CST 2019,死信隊(duì)列B收到消息:testMsg2` 

第一條消息在6s后變成了死信消息身害,然后被消費(fèi)者消費(fèi)掉味悄,第二條消息在60s之后變成了死信消息,然后被消費(fèi)掉塌鸯,這樣侍瑟,一個(gè)還算ok的延時(shí)隊(duì)列就打造完成了。

不過丙猬,等等涨颜,如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求茧球,就要新增一個(gè)隊(duì)列庭瑰,這里只有6s和60s兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理抢埋,那么就需要增加TTL為一個(gè)小時(shí)的隊(duì)列弹灭,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景督暂,豈不是要增加無數(shù)個(gè)隊(duì)列才能滿足需求?穷吮?

嗯逻翁,仔細(xì)想想,事情并不簡(jiǎn)單酒来。

七卢未、RabbitMQ延時(shí)隊(duì)列優(yōu)化

顯然,需要一種更通用的方案才能滿足需求堰汉,那么就只能將TTL設(shè)置在消息屬性里了辽社。我們來試一試。

增加一個(gè)延時(shí)隊(duì)列翘鸭,用于接收設(shè)置為任意延時(shí)時(shí)長(zhǎng)的消息滴铅,增加一個(gè)相應(yīng)的死信隊(duì)列和routingkey:

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";
    public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";
    public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";

    // 聲明延時(shí)Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明延時(shí)隊(duì)列C 不設(shè)置TTL
    // 并綁定到對(duì)應(yīng)的死信交換機(jī)
    @Bean("delayQueueC")
    public Queue delayQueueC(){
        Map<String, Object> args = new HashMap<>(3);
        // x-dead-letter-exchange    這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當(dāng)前隊(duì)列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }

    // 聲明死信隊(duì)列C 用于接收延時(shí)任意時(shí)長(zhǎng)處理的消息
    @Bean("deadLetterQueueC")
    public Queue deadLetterQueueC(){
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }

    // 聲明延時(shí)列C綁定關(guān)系
    @Bean
    public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    // 聲明死信隊(duì)列C綁定關(guān)系
    @Bean
    public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }
}

增加一個(gè)死信隊(duì)列C的消費(fèi)者:

@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("當(dāng)前時(shí)間:{},死信隊(duì)列C收到消息:{}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

再次啟動(dòng)!然后訪問:http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000 來生產(chǎn)消息就乓,注意這里的單位是毫秒汉匙。

2019-07-28 16:45:07.033  INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 16:45:07 CST 2019,收到請(qǐng)求,msg:testMsg1,delayTime:5000
2019-07-28 16:45:11.694  INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 16:45:11 CST 2019,收到請(qǐng)求生蚁,msg:testMsg2,delayTime:5000
2019-07-28 16:45:12.048  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 16:45:12 CST 2019,死信隊(duì)列C收到消息:testMsg1
2019-07-28 16:45:16.709  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 16:45:16 CST 2019,死信隊(duì)列C收到消息:testMsg2` 

看起來似乎沒什么問題噩翠,但不要高興的太早,在最開始的時(shí)候邦投,就介紹過伤锚,如果使用在消息屬性上設(shè)置TTL的方式,消息可能并不會(huì)按時(shí)“死亡“志衣,因?yàn)镽abbitMQ只會(huì)檢查第一個(gè)消息是否過期屯援,如果過期則丟到死信隊(duì)列,索引如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng)念脯,而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短狞洋,則第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。

實(shí)驗(yàn)一下:

2019-07-28 16:49:02.957  INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 16:49:02 CST 2019,收到請(qǐng)求绿店,msg:longDelayedMsg,delayTime:20000
2019-07-28 16:49:10.671  INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 16:49:10 CST 2019,收到請(qǐng)求吉懊,msg:shortDelayedMsg,delayTime:2000
2019-07-28 16:49:22.969  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:longDelayedMsg
2019-07-28 16:49:22.970  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:shortDelayedMsg` 

我們先發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為20s的消息,然后發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為2s的消息假勿,結(jié)果顯示惕它,第二個(gè)消息會(huì)在等第一個(gè)消息成為死信后才會(huì)“死亡“。

0|*1*****八废登、利用RabbitMQ插件實(shí)現(xiàn)延遲隊(duì)列

上文中提到的問題淹魄,確實(shí)是一個(gè)硬傷,如果不能實(shí)現(xiàn)在消息粒度上添加TTL堡距,并使其在設(shè)置的TTL時(shí)間及時(shí)死亡甲锡,就無法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列兆蕉。

那如何解決這個(gè)問題呢?不要慌缤沦,安裝一個(gè)插件即可:https://www.rabbitmq.com/community-plugins.html 告匠,下載rabbitmq_delayed_message_exchange插件遥诉,然后解壓放置到RabbitMQ的插件目錄。

接下來,進(jìn)入RabbitMQ的安裝目錄下的sbin目錄丈冬,執(zhí)行下面命令讓該插件生效初坠,然后重啟RabbitMQ炸站。


rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后闹伪,我們?cè)俾暶鲙讉€(gè)Bean:

@Configuration
public class DelayedRabbitMQConfig {
    public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
    public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";

    @Bean
    public Queue immediateQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

controller層再添加一個(gè)入口:

@RequestMapping("delayMsg2")
public void delayMsg2(String msg, Integer delayTime) {
    log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求,msg:{},delayTime:{}", new Date(), msg, delayTime);
    sender.sendDelayMsg(msg, delayTime);
}` 

消息生產(chǎn)者的代碼也需要修改:


public void sendDelayMsg(String msg, Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
        a.getMessageProperties().setDelay(delayTime);
        return a;
    });
}

最后届巩,再創(chuàng)建一個(gè)消費(fèi)者:


@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveD(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("當(dāng)前時(shí)間:{},延時(shí)隊(duì)列收到消息:{}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

一切準(zhǔn)備就緒硅瞧,啟動(dòng)!然后分別訪問以下鏈接:


http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000
http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000` 

日志如下:

2019-07-28 17:28:13.729  INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 17:28:13 CST 2019,收到請(qǐng)求恕汇,msg:msg1,delayTime:20000
2019-07-28 17:28:20.607  INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController   : 當(dāng)前時(shí)間:Sun Jul 28 17:28:20 CST 2019,收到請(qǐng)求腕唧,msg:msg2,delayTime:2000
2019-07-28 17:28:22.624  INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 17:28:22 CST 2019,延時(shí)隊(duì)列收到消息:msg2
2019-07-28 17:28:33.751  INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當(dāng)前時(shí)間:Sun Jul 28 17:28:33 CST 2019,延時(shí)隊(duì)列收到消息:msg1` 

第二個(gè)消息被先消費(fèi)掉了,符合預(yù)期瘾英。至此枣接,RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的部分就完結(jié)了。

九缺谴、總結(jié)

延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用月腋,使用RabbitMQ來實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ的特性,如:消息可靠發(fā)送瓣赂、消息可靠投遞、死信隊(duì)列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄片拍。另外煌集,通過RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問題捌省,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失苫纤。

當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇纲缓,比如利用Java的DelayQueu卷拘,利用Redis的zset,利用Quartz或者利用kafka的時(shí)間輪祝高,這些方式各有特點(diǎn)栗弟,但就像爐石傳說一般,這些知識(shí)就好比手里的卡牌工闺,知道的越多乍赫,可以用的卡牌也就越多瓣蛀,遇到問題便能游刃有余,所以需要大量的知識(shí)儲(chǔ)備和經(jīng)驗(yàn)積累才能打造出更出色的卡牌組合雷厂,讓自己解決問題的能力得到更好的提升惋增。

但另一方面,隨著時(shí)間的流逝和閱歷的增長(zhǎng)改鲫,越來越感覺到自己的能力有限诈皿,無法獨(dú)自面對(duì)紛繁復(fù)雜且多變的業(yè)務(wù)需求,在很多方面需要其他人的協(xié)助才能很好的完成任務(wù)像棘。也知道聞道有先后稽亏,術(shù)業(yè)有專攻,不會(huì)再狂妄自大讲弄,覺得自己能把所有事情都搞定措左,也將重心慢慢轉(zhuǎn)移到研究如何有效的進(jìn)行團(tuán)隊(duì)合作上來,我相信一個(gè)高度協(xié)調(diào)的團(tuán)隊(duì)永遠(yuǎn)比一個(gè)人戰(zhàn)斗要更有價(jià)值避除。

花了一個(gè)周末的時(shí)間完成了這篇文章怎披,文中所有的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查閱瓶摆,希望能對(duì)你有幫助凉逛,如果有錯(cuò)誤的地方,歡迎指正群井,也歡迎關(guān)注我的公眾號(hào)進(jìn)行留言交流状飞。

https://www.cnblogs.com/mfrank/p/11260355.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市书斜,隨后出現(xiàn)的幾起案子诬辈,更是在濱河造成了極大的恐慌,老刑警劉巖荐吉,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件焙糟,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡样屠,警方通過查閱死者的電腦和手機(jī)穿撮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來痪欲,“玉大人悦穿,你說我怎么就攤上這事∫堤撸” “怎么了栗柒?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)知举。 經(jīng)常有香客問我傍衡,道長(zhǎng)深员,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任蛙埂,我火速辦了婚禮倦畅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘绣的。我一直安慰自己叠赐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布屡江。 她就那樣靜靜地躺著芭概,像睡著了一般。 火紅的嫁衣襯著肌膚如雪惩嘉。 梳的紋絲不亂的頭發(fā)上罢洲,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音文黎,去河邊找鬼惹苗。 笑死,一個(gè)胖子當(dāng)著我的面吹牛耸峭,可吹牛的內(nèi)容都是我干的桩蓉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼劳闹,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼院究!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起本涕,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤业汰,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后菩颖,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體样漆,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年位他,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片产场。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鹅髓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出京景,到底是詐尸還是另有隱情窿冯,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布确徙,位于F島的核電站醒串,受9級(jí)特大地震影響执桌,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜芜赌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一仰挣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧缠沈,春花似錦膘壶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至柬赐,卻和暖如春亡问,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背肛宋。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來泰國打工州藕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人悼吱。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓慎框,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親后添。 傳聞我的和親對(duì)象是個(gè)殘疾皇子笨枯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容