RabbitMQ 延時隊列

在上一篇文章中碟嘴,我們學習了死信隊列的相關內(nèi)容,文章最后我們提到力九,超時消息結合死信隊列也可以實現(xiàn)一個延時隊列耍铜。大致的流程是這樣的,如果正常業(yè)務隊列中的消息設置了過期時間跌前,并在消息過期后棕兼,讓消息流入一個死信隊列,然后消費者監(jiān)聽這個死信隊列抵乓,實現(xiàn)消息的延時處理伴挚,這樣就可以實現(xiàn)一個簡單的延時隊列。

上邊描述的延時隊列灾炭,其實是存在一些問題的茎芋,應付簡單的場景還行,如果需要獲得更加完善的功能體驗蜈出,可以選擇使用 RabbitMQ 提供的延時消息插件田弥。下邊我們分別了解兩種實現(xiàn)方式。

一铡原、準備工作

創(chuàng)建 SpringBoot 項目偷厦,添加依賴以及連接配置:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties配置 RabbitMQ 服務的相關連接信息:

server.port=8083
# rabbitmq 相關配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

二、延時隊列的簡單實現(xiàn)

首先創(chuàng)建過期消息的交換機燕刻、隊列只泼,以及死信交換機、隊列卵洗,并完成綁定配置请唱,讓過期消息可以流入死信隊列:

@Configuration
public class RabbitMQConfig {
    // 創(chuàng)建過期消息的交換機
    @Bean
    DirectExchange ttlExchange() {
        return new DirectExchange("ttl.exchange", true, false);
    }

    // 創(chuàng)建死信交換機
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange", true, false);
    }

    // 創(chuàng)建有過期時間的消息隊列,并配置死信隊列
    @Bean
    Queue ttlQueue() {
        HashMap<String, Object> args = new HashMap<>();
        // 設置隊列中消息的過期時間过蹂,單位毫秒
        args.put("x-message-ttl", 10 * 60 * 1000);
        // 設置死信交換機
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        // 設置死信交換機綁定隊列的routingKey
        args.put("x-dead-letter-routing-key", "dead.letter");
        return new Queue("ttl.queue", true, false, false, args);
    }

    // 創(chuàng)建死信隊列
    @Bean
    Queue deadLetterQueue() {
        return new Queue("dead.letter.queue", true);
    }
    
    @Bean
    Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
    }
}

核心的內(nèi)容就是上邊的配置類了十绑,接下來就是發(fā)送消息到業(yè)務消息隊列,并只給死信隊列指定消費者榴啸,這樣發(fā)送的消息在正常業(yè)務隊列過期后孽惰,最終會流入死信隊列,進而被消費掉鸥印。生產(chǎn)者和消費者的代碼很簡單:

@Service
public class DelayedMessageSendService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message);
        logger.info("發(fā)送的業(yè)務消息:" + message);
    }
}
@Service
public class DelayedMessageReceiveService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @RabbitListener(queues = "dead.letter.queue")
    public void receive(String message) {
        logger.info("收到的延時消息:" + message);
    }
}

如果我們有其它不同時間的延時業(yè)務需求勋功,就需要在配置類添加更多的和ttl.queue類似配置的過期消息隊列,如果新的延時業(yè)務需求太多库说,新消息隊列的數(shù)量將不可控狂鞋。

那如果不給隊列配置消息的過期時間,而是在發(fā)送消息時單獨給每條消息配置呢潜的?這樣想想好像解決上邊的問題骚揍,實現(xiàn)的代碼如下:

@Service
public class DelayedMessageSendService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(String message, Integer delay) {
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 設置消息的過期時間,單位毫秒
                message.getMessageProperties().setExpiration(String.valueOf(delay));
                return message;
            }
        };
        rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message, processor);
        System.out.println("發(fā)送的業(yè)務消息:" + message);
    }
}

消費者的代碼還是上邊的。

我們發(fā)送兩條消息測試一下:

delayedMessageSendService.send("hello world", 30000);
delayedMessageSendService.send("hello rabbitmq", 10000);

仔細觀察上邊的運行結果信不,按照預期hello rabbitmq應該在10秒后先被消費嘲叔,然而由于我們先發(fā)送的hello world消息設置的過期時間為30秒,導致hello rabbitmq被阻塞抽活,直到30秒后陸續(xù)被消費掉硫戈。問題很明顯了,后發(fā)送消息的過期時間必須大于大于前邊已經(jīng)發(fā)送消息的過期時間下硕,這樣才能保證延時隊列正常工作丁逝,但實際使用中幾乎不能保證的。

可以看到梭姓,我們簡單實現(xiàn)的延時隊列雖然可用霜幼,但還是存在問題的。使用 RabbitMQ 延時消息插件誉尖,就不存在這些問題了罪既,可用性更高!

三释牺、RabbitMQ 延時消息插件

1萝衩、插件安裝

RabbitMQ 默認是沒有內(nèi)置延時消息插件的,需要我們單獨下載安裝没咙。下載地址入口:
https://www.rabbitmq.com/community-plugins.html

將下載好的插件放到 RabbitMQ 安裝目錄下的plugins目錄,然后進入sbin目錄千劈,我安裝的 Windows 版的 RabbitMQ祭刚,執(zhí)行rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange命令來安裝插件:


安裝好后,重啟服務就可以使用了墙牌。

如果使用原生 Java client 操作延時消息插件涡驮,可以參考:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2、用法介紹

首先創(chuàng)建延時消息交換機以及隊列喜滨,創(chuàng)建交換機和之前有所不同捉捅,需要使用CustomExchange

@Configuration
public class DelayedRabbitMQConfig {
    @Bean
    CustomExchange delayedExchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    Queue delayedQueue() {
        return new Queue("delayed.queue", true);
    }

    @Bean
    Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed").noargs();
    }
}

然后創(chuàng)建消費者,監(jiān)聽delayed.queue

@Service
public class DelayedMessageReceiveService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @RabbitListener(queues = "delayed.queue")
    public void receive2(String message) {
        logger.info("收到的延時消息:" + message);
    }
}

發(fā)送消息時指定消息要延時處理的時間:

@Service
public class DelayedMessageSendService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * @param message
     * @param delay   延時時間虽风,單位毫秒
     */
    public void send2(String message, Integer delay) {
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 設置消息延時處理的時間
                message.getMessageProperties().setDelay(delay);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed", message, processor);
        logger.info("發(fā)送的延時消息:" + message);
    }
}

然后發(fā)送幾條消息棒口,測試下效果:

delayedMessageSendService.send2("hello world", 30000);
delayedMessageSendService.send2("hello rabbitmq", 10000);
delayedMessageSendService.send2("hello kitty", 20000);

最終的效果如下,符合我們的預期:


通過官方插件來實現(xiàn)延時消息隊列還是很簡單的辜膝。

使用延時隊列可以完成很多常見的需求无牵,比如預約商品開售前提醒用戶購買、下單后一定時間未支付則取消訂單厂抖、收到商品后一定時間還沒確認收貨則系統(tǒng)自動確認收貨等茎毁。

關于 RabbitMQ 延時隊列的內(nèi)容就介紹到這里了。

本文完忱辅!

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末七蜘,一起剝皮案震驚了整個濱河市谭溉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌橡卤,老刑警劉巖扮念,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蒜魄,居然都是意外死亡扔亥,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進店門谈为,熙熙樓的掌柜王于貴愁眉苦臉地迎上來旅挤,“玉大人,你說我怎么就攤上這事伞鲫≌城眩” “怎么了?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵秕脓,是天一觀的道長柒瓣。 經(jīng)常有香客問我,道長吠架,這世上最難降的妖魔是什么芙贫? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮傍药,結果婚禮上磺平,老公的妹妹穿的比我還像新娘。我一直安慰自己拐辽,他們只是感情好拣挪,可當我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著俱诸,像睡著了一般菠劝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上睁搭,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天赶诊,我揣著相機與錄音,去河邊找鬼介袜。 笑死甫何,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的遇伞。 我是一名探鬼主播辙喂,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了巍耗?” 一聲冷哼從身側響起秋麸,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎炬太,沒想到半個月后灸蟆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡亲族,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年炒考,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片霎迫。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡斋枢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出知给,到底是詐尸還是另有隱情瓤帚,我是刑警寧澤,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布涩赢,位于F島的核電站戈次,受9級特大地震影響,放射性物質發(fā)生泄漏筒扒。R本人自食惡果不足惜怯邪,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望花墩。 院中可真熱鬧擎颖,春花似錦、人聲如沸观游。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽懂缕。三九已至,卻和暖如春王凑,著一層夾襖步出監(jiān)牢的瞬間搪柑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工索烹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留工碾,地道東北人。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓百姓,卻偏偏與公主長得像渊额,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 延時隊列我們可以簡單粗暴的理解它為延時發(fā)送消息的隊列 那延時隊列的應用場景有哪些呢旬迹,比如訂單在一段時間內(nèi)未支付則取...
    這是一個假的程序員閱讀 388評論 0 0
  • 延時隊列在實際業(yè)務場景中可能會用到延時消息發(fā)送火惊,例如支付場景,準時支付奔垦、超過未支付將執(zhí)行不同的方案屹耐,其中超時未支付...
    禪兜閱讀 4,529評論 0 1
  • rabbitmq延時隊列(實現(xiàn)定時任務) 場景 比如未付款訂單,超過一定時間后椿猎,系統(tǒng)自動取消訂單并釋放占有物品惶岭。 ...
    緩慢移動的蝸牛閱讀 651評論 0 0
  • 什么是延時隊列 延遲隊列首先它是一個隊列,作為隊列它的第一個特征是有序的犯眠,而之所以它被稱為延時隊列它還有一個更重要...
    一個菜鳥JAVA閱讀 4,944評論 0 1
  • 在RabbitMQ中,隊列支持下面幾個屬性.x-message-ttl: 10000 表示隊列中的消息只能存活10...
    江河湖海琴瑟琵琶閱讀 2,063評論 0 0