在2018-3-1
日SpringBoot
官方發(fā)版了2.0.0.RELEASE
最新版本,新版本完全基于Spring5.0
來構(gòu)建,JDK
最低支持也從原來的1.6
也改成了1.8
,不再兼容1.8
以下的版本,更多新特性請查看官方文檔睬澡。
本章目標
基于SpringBoot
整合RabbitMQ
完成消息延遲消費。
免費教程專題
恒宇少年在博客整理三套免費學習教程專題
艳汽,由于文章偏多
特意添加了閱讀指南
猴贰,新文章以及之前的文章都會在專題內(nèi)陸續(xù)填充
对雪,希望可以幫助大家解惑更多知識點河狐。
構(gòu)建項目
注意前言
由于
SpringBoot
的內(nèi)置掃描機制,我們?nèi)绻蛔詣优渲脪呙杪窂缴罚埍3窒旅?code>rabbitmq-common模塊內(nèi)的配置可以被SpringBoot
掃描到馋艺,否則不會自動創(chuàng)建隊列,控制臺會輸出404的錯誤信息迈套。
SpringBoot 企業(yè)級核心技術(shù)學習專題
專題 | 專題名稱 | 專題描述 |
---|---|---|
001 | Spring Boot 核心技術(shù) | 講解SpringBoot一些企業(yè)級層面的核心組件 |
002 | Spring Boot 核心技術(shù)章節(jié)源碼 | Spring Boot 核心技術(shù)簡書每一篇文章碼云對應(yīng)源碼 |
003 | Spring Cloud 核心技術(shù) | 對Spring Cloud核心技術(shù)全面講解 |
004 | Spring Cloud 核心技術(shù)章節(jié)源碼 | Spring Cloud 核心技術(shù)簡書每一篇文章對應(yīng)源碼 |
005 | QueryDSL 核心技術(shù) | 全面講解QueryDSL核心技術(shù)以及基于SpringBoot整合SpringDataJPA |
006 | SpringDataJPA 核心技術(shù) | 全面講解SpringDataJPA核心技術(shù) |
007 | SpringBoot核心技術(shù)學習目錄 | SpringBoot系統(tǒng)的學習目錄捐祠,敬請關(guān)注點贊!桑李!! |
我們本章采用2.0.0.RELEASE
版本的SpringBoot
踱蛀,添加相關(guān)的依賴如下所示:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
<!--rabbbitMQ相關(guān)依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web相關(guān)依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依賴-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--spring boot tester-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--fast json依賴-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
</dependencies>
......
我們?nèi)匀徊捎枚嗄K的方式來測試隊列的Provider
以及Consumer
。
隊列公共模塊
我們先來創(chuàng)建一個名為rabbitmq-common
公共依賴模塊(Create New Maven Module)
在公共模塊內(nèi)添加一個QueueEnum
隊列枚舉配置贵白,該枚舉內(nèi)配置隊列的Exchange
率拒、QueueName
、RouteKey
等相關(guān)內(nèi)容禁荒,如下所示:
package com.hengyu.rabbitmq.lazy.enums;
import lombok.Getter;
/**
* 消息隊列枚舉配置
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:33
* 簡書:http://www.reibang.com/u/092df3f77bca
* ================================
*/
@Getter
public enum QueueEnum {
/**
* 消息通知隊列
*/
MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
/**
* 消息通知ttl隊列
*/
MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
/**
* 交換名稱
*/
private String exchange;
/**
* 隊列名稱
*/
private String name;
/**
* 路由鍵
*/
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}
可以看到MESSAGE_QUEUE
隊列配置跟我們之前章節(jié)的配置一樣猬膨,而我們另外新創(chuàng)建了一個后綴為ttl
的消息隊列配置。我們采用的這種方式是RabbitMQ
消息隊列其中一種的延遲消費模塊呛伴,通過配置隊列消息過期后轉(zhuǎn)發(fā)的形式勃痴。
這種模式比較簡單,我們需要將消息先發(fā)送到
ttl
延遲隊列內(nèi)热康,當消息到達過期時間后會自動轉(zhuǎn)發(fā)到ttl
隊列內(nèi)配置的轉(zhuǎn)發(fā)Exchange
以及RouteKey
綁定的隊列內(nèi)完成消息消費沛申。
下面我們來模擬消息通知
的延遲消費場景,先來創(chuàng)建一個名為MessageRabbitMqConfiguration
的隊列配置類姐军,該配置類內(nèi)添加消息通知隊列
配置以及消息通過延遲隊列
配置污它,如下所示:
/**
* 消息通知 - 消息隊列配置信息
*
* @author:恒宇少年 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:32
* 簡書:http://www.reibang.com/u/092df3f77bca
* ================================
*/
@Configuration
public class MessageRabbitMqConfiguration {
/**
* 消息中心實際消費隊列交換配置
*
* @return
*/
@Bean
DirectExchange messageDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
.durable(true)
.build();
}
/**
* 消息中心延遲消費交換配置
*
* @return
*/
@Bean
DirectExchange messageTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
.durable(true)
.build();
}
/**
* 消息中心實際消費隊列配置
*
* @return
*/
@Bean
public Queue messageQueue() {
return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
}
/**
* 消息中心TTL隊列
*
* @return
*/
@Bean
Queue messageTtlQueue() {
return QueueBuilder
.durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
// 配置到期后轉(zhuǎn)發(fā)的交換
.withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
// 配置到期后轉(zhuǎn)發(fā)的路由鍵
.withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
.build();
}
/**
* 消息中心實際消息交換與隊列綁定
*
* @param messageDirect 消息中心交換配置
* @param messageQueue 消息中心隊列
* @return
*/
@Bean
Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
return BindingBuilder
.bind(messageQueue)
.to(messageDirect)
.with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
}
/**
* 消息中心TTL綁定實際消息中心實際消費交換機
*
* @param messageTtlQueue
* @param messageTtlDirect
* @return
*/
@Bean
public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
return BindingBuilder
.bind(messageTtlQueue)
.to(messageTtlDirect)
.with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
}
}
我們聲明了消息通知隊列
的相關(guān)Exchange
、Queue
、Binding
等配置衫贬,將message.center.create
隊列通過路由鍵message.center.create
綁定到了message.center.direct
交換上德澈。
除此之外,我們還添加了消息通知延遲隊列
的Exchange
固惯、Queue
梆造、Binding
等配置,將message.center.create.ttl
隊列通過message.center.create.ttl
路由鍵綁定到了message.center.topic.ttl
交換上葬毫。
我們仔細來看看messageTtlQueue
延遲隊列的配置镇辉,跟messageQueue
隊列配置不同的地方這里多出了x-dead-letter-exchange
、x-dead-letter-routing-key
兩個參數(shù)贴捡,而這兩個參數(shù)就是配置延遲隊列過期后轉(zhuǎn)發(fā)的Exchange
忽肛、RouteKey
,只要在創(chuàng)建隊列時對應(yīng)添加了這兩個參數(shù)烂斋,在RabbitMQ
管理平臺看到的隊列配置就不僅是單純的Direct
類型的隊列類型屹逛,如下圖所示:
在上圖內(nèi)我們可以看到message.center.create.ttl
隊列多出了DLX
、DLK
的配置汛骂,這就是RabbitMQ
內(nèi)死信交換
的標志罕模。
滿足死信交換
的條件,在官方文檔中表示:
Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:
The message is rejected (basic.reject or basic.nack) with requeue=false,
The TTL for the message expires; or
The queue length limit is exceeded.
- 該消息被拒絕(basic.reject或 basic.nack)帘瞭,requeue = false
- 消息的TTL過期
- 隊列長度限制已超出
官方文檔地址
我們需要滿足上面的其中一種方式就可以了淑掌,我們采用滿足第二個條件,采用過期的方式蝶念。
隊列消息提供者
我們再來創(chuàng)建一個名為rabbitmq-lazy-provider
的模塊(Create New Maven Module)抛腕,并且在pom.xml
配置文件內(nèi)添加rabbitmq-common
模塊的依賴,如下所示:
<!--添加公共模塊依賴-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
配置隊列
在resource
下創(chuàng)建一個名為application.yml
的配置文件媒殉,在該配置文件內(nèi)添加如下配置信息:
spring:
#rabbitmq消息隊列配置信息
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /hengboy
publisher-confirms: true
消息提供者類
接下來我們來創(chuàng)建名為MessageProvider
消息提供者類担敌,用來發(fā)送消息內(nèi)容到消息通知延遲隊列,代碼如下所示:
/**
* 消息通知 - 提供者
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:40
* 簡書:http://www.reibang.com/u/092df3f77bca
* ================================
*/
@Component
public class MessageProvider {
/**
* logger instance
*/
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
/**
* RabbitMQ 模版消息實現(xiàn)類
*/
@Autowired
private AmqpTemplate rabbitMqTemplate;
/**
* 發(fā)送延遲消息
*
* @param messageContent 消息內(nèi)容
* @param exchange 隊列交換
* @param routerKey 隊列交換綁定的路由鍵
* @param delayTimes 延遲時長适袜,單位:毫秒
*/
public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
if (!StringUtils.isEmpty(exchange)) {
logger.info("延遲:{}毫秒寫入消息隊列:{}柄错,消息內(nèi)容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
// 執(zhí)行發(fā)送消息到指定隊列
rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
// 設(shè)置延遲毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
} else {
logger.error("未找到隊列消息:{},所屬的交換機", exchange);
}
}
}
由于我們在 pom.xml
配置文件內(nèi)添加了RabbitMQ
相關(guān)的依賴并且在上面application.yml
文件內(nèi)添加了對應(yīng)的配置苦酱,SpringBoot
為我們自動實例化了AmqpTemplate
售貌,該實例可以發(fā)送任何類型的消息到指定隊列。
我們采用convertAndSend
方法疫萤,將消息內(nèi)容發(fā)送到指定Exchange
颂跨、RouterKey
隊列,并且通過setExpiration
方法設(shè)置過期時間扯饶,單位:毫秒恒削。
編寫發(fā)送測試
我們在test
目錄下創(chuàng)建一個測試類池颈,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
/**
* 消息隊列提供者
*/
@Autowired
private MessageProvider messageProvider;
/**
* 測試延遲消息消費
*/
@Test
public void testLazy() {
// 測試延遲10秒
messageProvider.sendMessage("測試延遲消費,寫入時間:" + new Date(),
QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
10000);
}
}
注意:
@SpringBootTest
注解內(nèi)添加了classes
入口類的配置,因為我們是模塊創(chuàng)建的項目并不是默認創(chuàng)建的SpringBoot
項目钓丰,這里需要配置入口程序類才可以運行測試躯砰。
在測試類我們注入了MessageProvider
消息提供者,調(diào)用sendMessage
方法發(fā)送消息到消息通知延遲隊列
携丁,并且設(shè)置延遲的時間為10秒
琢歇,這里衡量發(fā)送到指定隊列的標準是要看MessageRabbitMqConfiguration
配置類內(nèi)的相關(guān)Binding
配置,通過Exchange
梦鉴、RouterKey
值進行發(fā)送到指定的隊列李茫。
到目前為止我們的rabbitmq-lazy-provider
消息提供模塊已經(jīng)編寫完成了,下面我們來看看消息消費者模塊肥橙。
隊列消息消費者
我們再來創(chuàng)建一個名為rabbitmq-lazy-consumer
的模塊(Create New Maven Module)魄宏,同樣需要在pom.xml
配置文件內(nèi)添加rabbitmq-common
模塊的依賴,如下所示:
<!--添加公共模塊依賴-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
當然同樣需要在resource
下創(chuàng)建application.yml
并添加消息隊列的相關(guān)配置存筏,代碼就不貼出來了宠互,可以直接從rabbitmq-lazy-provider
模塊中復(fù)制application.yml
文件到當前模塊內(nèi)。
消息消費者類
接下來創(chuàng)建一個名為MessageConsumer
的消費者類方篮,該類需要監(jiān)聽消息通知隊列
名秀,代碼如下所示:
/**
* 消息通知 - 消費者
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午5:00
* 簡書:http://www.reibang.com/u/092df3f77bca
* ================================
*/
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
/**
* logger instance
*/
static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@RabbitHandler
public void handler(String content) {
logger.info("消費內(nèi)容:{}", content);
}
}
在@RabbitListener
注解內(nèi)配置了監(jiān)聽的隊列励负,這里配置內(nèi)容是QueueEnum
枚舉內(nèi)的queueName
屬性值藕溅,當然如果你采用常量的方式在注解屬性上是直接可以使用的,枚舉不支持這種配置继榆,這里只能把QueueName
字符串配置到queues
屬性上了巾表。
由于我們在消息發(fā)送時采用字符串的形式發(fā)送消息內(nèi)容,這里在@RabbitHandler
處理方法的參數(shù)內(nèi)要保持數(shù)據(jù)類型一致略吨!
消費者入口類
我們?yōu)橄M者模塊添加一個入口程序類集币,用于啟動消費者,代碼如下所示:
/**
* 【第四十六章:SpringBoot & RabbitMQ完成消息延遲消費】
* 隊列消費者模塊 - 入口程序類
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:55
* 簡書:http://www.reibang.com/u/092df3f77bca
* ================================
*/
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
}
}
測試
我們的代碼已經(jīng)編寫完畢翠忠,下面來測試下是否完成了我們預(yù)想的效果鞠苟,步驟如下所示:
1. 啟動消費者模塊
2. 執(zhí)行RabbitMqLazyProviderApplicationTests.testLazy()方法進行發(fā)送消息到通知延遲隊列
3. 查看消費者模塊控制臺輸出內(nèi)容
我們可以在消費者模塊控制臺看到輸出內(nèi)容:
2018-03-04 10:10:34.765 INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer : 消費內(nèi)容:測試延遲消費,寫入時間:Sun Mar 04 10:10:24 CST 2018
我們在提供者測試方法發(fā)送消息的時間為10:10:24
,而真正消費的時間則為10:10:34
秽之,與我們預(yù)計的一樣当娱,消息延遲了10秒
后去執(zhí)行消費。
總結(jié)
終上所述我們完成了消息隊列的延遲消費
考榨,采用死信
方式跨细,通過消息過期方式觸發(fā),在實際項目研發(fā)過程中河质,延遲消費還是很有必要的冀惭,可以省去一些定時任務(wù)的配置震叙。
本章源碼已經(jīng)上傳到碼云:
SpringBoot配套源碼地址:https://gitee.com/hengboy/spring-boot-chapter
SpringCloud配套源碼地址:https://gitee.com/hengboy/spring-cloud-chapter