第四十六章:SpringBoot & RabbitMQ完成消息延遲消費

2018-3-1SpringBoot官方發(fā)版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0來構(gòu)建,JDK最低支持也從原來的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性請查看官方文檔睬澡。

本章目標

基于SpringBoot整合RabbitMQ完成消息延遲消費。

免費教程專題

恒宇少年在博客整理三套免費學習教程專題艳汽,由于文章偏多特意添加了閱讀指南猴贰,新文章以及之前的文章都會在專題內(nèi)陸續(xù)填充对雪,希望可以幫助大家解惑更多知識點河狐。

構(gòu)建項目

注意前言

由于SpringBoot的內(nèi)置掃描機制,我們?nèi)绻蛔詣优渲脪呙杪窂缴罚埍3窒旅?code>rabbitmq-common模塊內(nèi)的配置可以被SpringBoot掃描到馋艺,否則不會自動創(chuàng)建隊列,控制臺會輸出404的錯誤信息迈套。

SpringBoot 企業(yè)級核心技術(shù)學習專題


專題 專題名稱 專題描述
001 Spring Boot 核心技術(shù) 講解SpringBoot一些企業(yè)級層面的核心組件
002 Spring Boot 核心技術(shù)章節(jié)源碼 Spring Boot 核心技術(shù)簡書每一篇文章碼云對應(yīng)源碼
003 Spring Cloud 核心技術(shù) 對Spring Cloud核心技術(shù)全面講解
004 Spring Cloud 核心技術(shù)章節(jié)源碼 Spring Cloud 核心技術(shù)簡書每一篇文章對應(yīng)源碼
005 QueryDSL 核心技術(shù) 全面講解QueryDSL核心技術(shù)以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技術(shù) 全面講解SpringDataJPA核心技術(shù)
007 SpringBoot核心技術(shù)學習目錄 SpringBoot系統(tǒng)的學習目錄捐祠,敬請關(guān)注點贊!桑李!!

我們本章采用2.0.0.RELEASE版本的SpringBoot踱蛀,添加相關(guān)的依賴如下所示:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
        <!--rabbbitMQ相關(guān)依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web相關(guān)依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依賴-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--spring boot tester-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--fast json依賴-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
    </dependencies>
......

我們?nèi)匀徊捎枚嗄K的方式來測試隊列的Provider以及Consumer

隊列公共模塊

我們先來創(chuàng)建一個名為rabbitmq-common公共依賴模塊(Create New Maven Module)
在公共模塊內(nèi)添加一個QueueEnum隊列枚舉配置贵白,該枚舉內(nèi)配置隊列的Exchange率拒、QueueNameRouteKey等相關(guān)內(nèi)容禁荒,如下所示:

package com.hengyu.rabbitmq.lazy.enums;

import lombok.Getter;

/**
 * 消息隊列枚舉配置
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:33
 * 簡書:http://www.reibang.com/u/092df3f77bca
 * ================================
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知隊列
     */
    MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
    /**
     * 消息通知ttl隊列
     */
    MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
    /**
     * 交換名稱
     */
    private String exchange;
    /**
     * 隊列名稱
     */
    private String name;
    /**
     * 路由鍵
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}

可以看到MESSAGE_QUEUE隊列配置跟我們之前章節(jié)的配置一樣猬膨,而我們另外新創(chuàng)建了一個后綴為ttl的消息隊列配置。我們采用的這種方式是RabbitMQ消息隊列其中一種的延遲消費模塊呛伴,通過配置隊列消息過期后轉(zhuǎn)發(fā)的形式勃痴。

這種模式比較簡單,我們需要將消息先發(fā)送到ttl延遲隊列內(nèi)热康,當消息到達過期時間后會自動轉(zhuǎn)發(fā)到ttl隊列內(nèi)配置的轉(zhuǎn)發(fā)Exchange以及RouteKey綁定的隊列內(nèi)完成消息消費沛申。

下面我們來模擬消息通知的延遲消費場景,先來創(chuàng)建一個名為MessageRabbitMqConfiguration的隊列配置類姐军,該配置類內(nèi)添加消息通知隊列配置以及消息通過延遲隊列配置污它,如下所示:

/**
 * 消息通知 - 消息隊列配置信息
 *
 * @author:恒宇少年 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:32
 * 簡書:http://www.reibang.com/u/092df3f77bca
 * ================================
 */
@Configuration
public class MessageRabbitMqConfiguration {
    /**
     * 消息中心實際消費隊列交換配置
     *
     * @return
     */
    @Bean
    DirectExchange messageDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心延遲消費交換配置
     *
     * @return
     */
    @Bean
    DirectExchange messageTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心實際消費隊列配置
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
    }


    /**
     * 消息中心TTL隊列
     *
     * @return
     */
    @Bean
    Queue messageTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
                // 配置到期后轉(zhuǎn)發(fā)的交換
                .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
                // 配置到期后轉(zhuǎn)發(fā)的路由鍵
                .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
                .build();
    }

    /**
     * 消息中心實際消息交換與隊列綁定
     *
     * @param messageDirect 消息中心交換配置
     * @param messageQueue  消息中心隊列
     * @return
     */
    @Bean
    Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
        return BindingBuilder
                .bind(messageQueue)
                .to(messageDirect)
                .with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
    }

    /**
     * 消息中心TTL綁定實際消息中心實際消費交換機
     *
     * @param messageTtlQueue
     * @param messageTtlDirect
     * @return
     */
    @Bean
    public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
        return BindingBuilder
                .bind(messageTtlQueue)
                .to(messageTtlDirect)
                .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
    }
}

我們聲明了消息通知隊列的相關(guān)ExchangeQueueBinding等配置衫贬,將message.center.create隊列通過路由鍵message.center.create綁定到了message.center.direct交換上德澈。

除此之外,我們還添加了消息通知延遲隊列Exchange固惯、Queue梆造、Binding等配置,將message.center.create.ttl隊列通過message.center.create.ttl路由鍵綁定到了message.center.topic.ttl交換上葬毫。

我們仔細來看看messageTtlQueue延遲隊列的配置镇辉,跟messageQueue隊列配置不同的地方這里多出了x-dead-letter-exchangex-dead-letter-routing-key兩個參數(shù)贴捡,而這兩個參數(shù)就是配置延遲隊列過期后轉(zhuǎn)發(fā)的Exchange忽肛、RouteKey,只要在創(chuàng)建隊列時對應(yīng)添加了這兩個參數(shù)烂斋,在RabbitMQ管理平臺看到的隊列配置就不僅是單純的Direct類型的隊列類型屹逛,如下圖所示:

隊列類型差異

在上圖內(nèi)我們可以看到message.center.create.ttl隊列多出了DLXDLK的配置汛骂,這就是RabbitMQ內(nèi)死信交換的標志罕模。
滿足死信交換的條件,在官方文檔中表示:

Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false,
The TTL for the message expires; or
The queue length limit is exceeded.

  • 該消息被拒絕(basic.reject或 basic.nack)帘瞭,requeue = false
  • 消息的TTL過期
  • 隊列長度限制已超出
    官方文檔地址

我們需要滿足上面的其中一種方式就可以了淑掌,我們采用滿足第二個條件,采用過期的方式蝶念。

隊列消息提供者

我們再來創(chuàng)建一個名為rabbitmq-lazy-provider的模塊(Create New Maven Module)抛腕,并且在pom.xml配置文件內(nèi)添加rabbitmq-common模塊的依賴,如下所示:

<!--添加公共模塊依賴-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>

配置隊列

resource下創(chuàng)建一個名為application.yml的配置文件媒殉,在該配置文件內(nèi)添加如下配置信息:

spring:
  #rabbitmq消息隊列配置信息
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /hengboy
    publisher-confirms: true

消息提供者類

接下來我們來創(chuàng)建名為MessageProvider消息提供者類担敌,用來發(fā)送消息內(nèi)容到消息通知延遲隊列,代碼如下所示:

/**
 * 消息通知 - 提供者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:40
 * 簡書:http://www.reibang.com/u/092df3f77bca
 * ================================
 */
@Component
public class MessageProvider {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    /**
     * RabbitMQ 模版消息實現(xiàn)類
     */
    @Autowired
    private AmqpTemplate rabbitMqTemplate;

    /**
     * 發(fā)送延遲消息
     *
     * @param messageContent 消息內(nèi)容
     * @param exchange       隊列交換
     * @param routerKey      隊列交換綁定的路由鍵
     * @param delayTimes     延遲時長适袜,單位:毫秒
     */
    public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
        if (!StringUtils.isEmpty(exchange)) {
            logger.info("延遲:{}毫秒寫入消息隊列:{}柄错,消息內(nèi)容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
            // 執(zhí)行發(fā)送消息到指定隊列
            rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
                // 設(shè)置延遲毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            });
        } else {
            logger.error("未找到隊列消息:{},所屬的交換機", exchange);
        }
    }
}

由于我們在 pom.xml配置文件內(nèi)添加了RabbitMQ相關(guān)的依賴并且在上面application.yml文件內(nèi)添加了對應(yīng)的配置苦酱,SpringBoot為我們自動實例化了AmqpTemplate售貌,該實例可以發(fā)送任何類型的消息到指定隊列。
我們采用convertAndSend方法疫萤,將消息內(nèi)容發(fā)送到指定Exchange颂跨、RouterKey隊列,并且通過setExpiration方法設(shè)置過期時間扯饶,單位:毫秒恒削。

編寫發(fā)送測試

我們在test目錄下創(chuàng)建一個測試類池颈,如下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
    /**
     * 消息隊列提供者
     */
    @Autowired
    private MessageProvider messageProvider;

    /**
     * 測試延遲消息消費
     */
    @Test
    public void testLazy() {
        // 測試延遲10秒
        messageProvider.sendMessage("測試延遲消費,寫入時間:" + new Date(),
                QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
                QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
                10000);
    }
}

注意:@SpringBootTest注解內(nèi)添加了classes入口類的配置,因為我們是模塊創(chuàng)建的項目并不是默認創(chuàng)建的SpringBoot項目钓丰,這里需要配置入口程序類才可以運行測試躯砰。

在測試類我們注入了MessageProvider消息提供者,調(diào)用sendMessage方法發(fā)送消息到消息通知延遲隊列携丁,并且設(shè)置延遲的時間為10秒琢歇,這里衡量發(fā)送到指定隊列的標準是要看MessageRabbitMqConfiguration配置類內(nèi)的相關(guān)Binding配置,通過Exchange梦鉴、RouterKey值進行發(fā)送到指定的隊列李茫。

到目前為止我們的rabbitmq-lazy-provider消息提供模塊已經(jīng)編寫完成了,下面我們來看看消息消費者模塊肥橙。

隊列消息消費者

我們再來創(chuàng)建一個名為rabbitmq-lazy-consumer的模塊(Create New Maven Module)魄宏,同樣需要在pom.xml配置文件內(nèi)添加rabbitmq-common模塊的依賴,如下所示:

<!--添加公共模塊依賴-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>

當然同樣需要在resource下創(chuàng)建application.yml并添加消息隊列的相關(guān)配置存筏,代碼就不貼出來了宠互,可以直接從rabbitmq-lazy-provider模塊中復(fù)制application.yml文件到當前模塊內(nèi)。

消息消費者類

接下來創(chuàng)建一個名為MessageConsumer的消費者類方篮,該類需要監(jiān)聽消息通知隊列名秀,代碼如下所示:

/**
 * 消息通知 - 消費者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午5:00
 * 簡書:http://www.reibang.com/u/092df3f77bca
 * ================================
 */
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @RabbitHandler
    public void handler(String content) {
        logger.info("消費內(nèi)容:{}", content);
    }
}

@RabbitListener注解內(nèi)配置了監(jiān)聽的隊列励负,這里配置內(nèi)容是QueueEnum枚舉內(nèi)的queueName屬性值藕溅,當然如果你采用常量的方式在注解屬性上是直接可以使用的,枚舉不支持這種配置继榆,這里只能把QueueName字符串配置到queues屬性上了巾表。
由于我們在消息發(fā)送時采用字符串的形式發(fā)送消息內(nèi)容,這里在@RabbitHandler處理方法的參數(shù)內(nèi)要保持數(shù)據(jù)類型一致略吨!

消費者入口類

我們?yōu)橄M者模塊添加一個入口程序類集币,用于啟動消費者,代碼如下所示:

/**
 * 【第四十六章:SpringBoot & RabbitMQ完成消息延遲消費】
 * 隊列消費者模塊 - 入口程序類
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:55
 * 簡書:http://www.reibang.com/u/092df3f77bca
 * ================================
 */
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
    }
}

測試

我們的代碼已經(jīng)編寫完畢翠忠,下面來測試下是否完成了我們預(yù)想的效果鞠苟,步驟如下所示:

1. 啟動消費者模塊
2. 執(zhí)行RabbitMqLazyProviderApplicationTests.testLazy()方法進行發(fā)送消息到通知延遲隊列
3. 查看消費者模塊控制臺輸出內(nèi)容

我們可以在消費者模塊控制臺看到輸出內(nèi)容:

2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消費內(nèi)容:測試延遲消費,寫入時間:Sun Mar 04 10:10:24 CST 2018

我們在提供者測試方法發(fā)送消息的時間為10:10:24,而真正消費的時間則為10:10:34秽之,與我們預(yù)計的一樣当娱,消息延遲了10秒后去執(zhí)行消費。

總結(jié)

終上所述我們完成了消息隊列的延遲消費考榨,采用死信方式跨细,通過消息過期方式觸發(fā),在實際項目研發(fā)過程中河质,延遲消費還是很有必要的冀惭,可以省去一些定時任務(wù)的配置震叙。

本章源碼已經(jīng)上傳到碼云:
SpringBoot配套源碼地址:https://gitee.com/hengboy/spring-boot-chapter
SpringCloud配套源碼地址:https://gitee.com/hengboy/spring-cloud-chapter

作者個人 博客
使用開源框架 ApiBoot 助你成為Api接口服務(wù)架構(gòu)師

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市散休,隨后出現(xiàn)的幾起案子媒楼,更是在濱河造成了極大的恐慌,老刑警劉巖戚丸,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件匣砖,死亡現(xiàn)場離奇詭異,居然都是意外死亡昏滴,警方通過查閱死者的電腦和手機猴鲫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谣殊,“玉大人拂共,你說我怎么就攤上這事∫黾福” “怎么了宜狐?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蛇捌。 經(jīng)常有香客問我抚恒,道長,這世上最難降的妖魔是什么络拌? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任俭驮,我火速辦了婚禮,結(jié)果婚禮上春贸,老公的妹妹穿的比我還像新娘混萝。我一直安慰自己,他們只是感情好萍恕,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布逸嘀。 她就那樣靜靜地躺著,像睡著了一般允粤。 火紅的嫁衣襯著肌膚如雪崭倘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天类垫,我揣著相機與錄音司光,去河邊找鬼。 笑死阔挠,一個胖子當著我的面吹牛飘庄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播购撼,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼跪削,長吁一口氣:“原來是場噩夢啊……” “哼谴仙!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起碾盐,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤晃跺,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后毫玖,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體掀虎,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年付枫,在試婚紗的時候發(fā)現(xiàn)自己被綠了烹玉。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡阐滩,死狀恐怖二打,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情掂榔,我是刑警寧澤继效,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站装获,受9級特大地震影響瑞信,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜穴豫,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一凡简、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧绩郎,春花似錦潘鲫、人聲如沸翁逞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挖函。三九已至状植,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間怨喘,已是汗流浹背津畸。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留必怜,地道東北人肉拓。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像梳庆,于是被迫代替她去往敵國和親暖途。 傳聞我的和親對象是個殘疾皇子卑惜,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容