本篇主要介紹一下spring boot 整合 rabbit mq 的使用。
項目介紹
本篇文章的例子分別寫在兩個項目中:
- spring-boot-rabbitmq-producer:存放消息生產端 producer 相關類慌植,也就是消息發(fā)送端甚牲;
- spring-boot-rabbitmq-consumer:存放消息消費者 consumer 相關類,也就是消息接收端蝶柿;
項目已經上傳到github上:https://github.com/xsg1995/spring-boot-rabbitmq
pom.xml
pom文件引入依賴:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsg</groupId>
<artifactId>spring-boot-rabbitmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq-producer</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--spring boot test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
配置rabbit mq相關信息
spring:
rabbitmq:
host: localhost #rabbitmq服務地址
port: 5672 #rabbitmq通信端口
username: guest #rabbitmq用戶名
password: guest #rabbitmq密碼
以fanout分發(fā)策略分發(fā)消息的使用
- fanout:當指定fanout分發(fā)策略時丈钙,交換機不會處理路由key,交換機會將消息發(fā)送到所有綁定了在該交換機的隊列上交汤。如下圖所示:
以下代碼實現功能流程如下:
- 聲明一個名為 AFanoutExchange 的 FanoutExChange 類型的交換機雏赦;
- 聲明一個名稱為 AFanoutQueue 的 Queue 隊列,并綁定到 AFanoutExchange 交換機芙扎;
- 生產者發(fā)送消息到 AFanoutExchange 交換機星岗,AFanoutExchange 交換機將消息路由發(fā)送與其綁定的 Queue 上面;
- 消費端從 Queue 上面拉取到消息進行消費戒洼。
producer生產消息端配置exchange
在 RabbitMQExchangeConfig 類中配置exchange信息:
/**
* 以Fanout方式發(fā)送消息
* 定義一個Exchange交換機俏橘,發(fā)送的消息將通過該交換機轉發(fā)
* @return
*/
@Bean
public FanoutExchange AFanoutExchange() {
//傳入exchange交換機的名稱 AFanoutExchange
return new FanoutExchange(RabbitMQExchangeConstant.A_FANOUT_EXCHANGE);
}
在 RabbitMQExchangeConstant 中配置exchange 名稱:
/**
* 以fanout方法發(fā)送A信息的Exchange名稱
*/
public static final String A_FANOUT_EXCHANGE = "AFanoutExchange";
producer生產端配置發(fā)送消息類
在 ASender 類中主要定義消息發(fā)送的邏輯,也就是消息發(fā)送者:
/**
* 用于發(fā)送A消息的sender
*/
@Component
public class ASender {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 發(fā)送消息到 AFanoutExchange 交換機
*/
public void sendToAFanoutExchange(String msg) {
//要發(fā)送的信息拼上當前時間戳
String content = msg + "\t" + DateUtils.getDateTime();
//第一個參數表示Exchange交換機的名稱
//第二個參數表示路由Key圈浇,Fanout方式路由消息不會處理路由key
//第三個參數為要發(fā)送的消息
this.rabbitTemplate.convertAndSend(
RabbitMQExchangeConstant.A_FANOUT_EXCHANGE,
"",
content);
}
}
comsumer消費端配置exchange
在 RabbitMQExchangeConfig 類中配置exchange:
/**
* 以Fanout方式發(fā)送消息
* 定義一個Exchange交換機寥掐,從該交換機接收消息
* @return
*/
@Bean
public FanoutExchange AFanoutExchange() {
//傳入exchange交換機的名稱 AFanoutExchange
return new FanoutExchange(RabbitMQExchangeConstant.A_FANOUT_EXCHANGE);
}
RabbitMQExchangeConstant 中配置 exchange 的名稱:
/**
* 以fanout方法接收A信息的Exchange名稱
*/
public static final String A_FANOUT_EXCHANGE = "AFanoutExchange";
consumer消費端配置Queue
在RabbitMQQueueConfig 配置 queue:
/**
* 創(chuàng)建綁定到 AFanoutExchange 交換機的隊列
* @return
*/
@Bean
public Queue AFanoutQueue() {
//傳入隊列名稱
return new Queue(RabbitMQQueueConstant.A_FANOUT_QUEUE);
}
在 RabbitMQQueueConstant 中指定隊列名稱:
/**
* 指定綁定到 AFanoutExchange 交換機的隊列名稱,用于接收 A 類型的信息
*/
public static final String A_FANOUT_QUEUE = "AFanoutQueue";
consumer消費端配置bind信息
在 RabbitMQBindConfig 中配置bind信息:
/**
* 將 AFanoutQueue 隊列綁定到 AFanoutExchange 交換機上
* 用 AFanoutQueue 隊列接收 AFanoutExchange 發(fā)送過來的消息
* @param AFanoutQueue
* @param AFanoutExchange
* @return
*/
@Bean
public Binding bindAFanoutExchangeToAFanoutQueue(Queue AFanoutQueue, FanoutExchange AFanoutExchange) {
return BindingBuilder.bind(AFanoutQueue).to(AFanoutExchange);
}
consumer消費端接收消息類
在 AFanoutConsumer 類中主要定義接收消息邏輯磷蜀,也就是消息消費者:
/**
* 用于接收A消息的消費者consumer
*/
@Component
//表示監(jiān)聽名稱為 AFanoutQueue 的消息隊列
@RabbitListener(queues = {RabbitMQQueueConstant.A_FANOUT_QUEUE})
public class AFanoutConsumer {
/**
* 定義接收消息處理邏輯
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("AFanoutConsumer 接收到消息: " + content);
}
}
運行測試:
首先運行 consumer 端也就是 spring-boot-rabbitmq-consumer 項目中的啟動類啟動消費者服務召耘,然后運行 producer 端也就是 spring-boot-rabbitmq-producer 項目中的 ASenderTest 類,用來發(fā)送消息褐隆,內容如下:
/**
* ASender的測試類
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ASenderTest {
@Autowired
private ASender aSender;
/**
* 以 Fanout 方式發(fā)送消息測試用例
*/
@Test
public void sendToAFanoutExchangeTest() {
String msg = "Hello, I am A msg. — sendToAFanoutExchange";
this.aSender.sendToAFanoutExchange(msg);
}
}
運行結果污它,consumer接收到消息如下:
可以定義多個隊列綁定到同一個exchange,如果以fanout方式轉發(fā)消息妓灌,那么監(jiān)聽對應隊列的多個消費端都會收到消息轨蛤,如下所示:
以direct分發(fā)策略分發(fā)消息的使用
-
direct:當指定direct分發(fā)策略時,如果消息的路由key與隊列綁定的路由key相同時虫埂,交換器就會將消息發(fā)送到該隊列中祥山。例如發(fā)送消息是指定路由key為 rk1 ,那么如果隊列綁定的路由key也是 rk1掉伏,那么交換機會將消息發(fā)送到該隊列缝呕;
direct發(fā)送消息模式
以下代碼實現功能流程如下:
- 聲明一個名為 ADirectExchange 的 DirectExchange類型的交換機澳窑;
- 聲明一個名稱為 ADirectQueue的 Queue 隊列,并綁定到 ADirectExchange 交換機上供常,指定交換機與隊列之間的路由key為 A.rk1摊聋;
- 生產者發(fā)送消息到 ADirectExchange 交換機并指定發(fā)送的路由key為 A.rk1 ,ADirectExchange 交換機將消息路由發(fā)送與其綁定的 Queue 上面栈暇;
- 消費端從 Queue 上面拉取到消息進行消費麻裁。
producer生產消息端配置exchange
在 RabbitMQExchangeConfig 類中配置exchange信息:
/**
* 以direct方式發(fā)送消息
* 定義一個Exchange交換機,發(fā)送的消息將通過該交換機轉發(fā)
* @return
*/
@Bean
public DirectExchange ADirectExchange() {
//傳入exchange交換機的名稱 ADirectExchange
return new DirectExchange(RabbitMQExchangeConstant.A_DIRECT_EXCHANGE);
}
在 RabbitMQExchangeConstant 中配置exchange 名稱:
/**
* 以direct方法發(fā)送A信息的Exchange名稱
*/
public static final String A_DIRECT_EXCHANGE = "ADirectExchange";
producer生產端編寫發(fā)送消息邏輯
在 ASender 類中編寫消息發(fā)送的邏輯:
/**
* 發(fā)送消息到 ADirectExchange 交換機
*/
public void sendToADirectExchange(String msg, String routeKey) {
//要發(fā)送的信息拼上當前時間戳
String content = msg + "\t" + DateUtils.getDateTime();
//第一個參數表示Exchange交換機的名稱
//第二個參數表示路由Key源祈,direct方式路由消息時煎源,會將消息發(fā)送到綁定該路由key的隊列上
//第三個參數為要發(fā)送的消息
this.rabbitTemplate.convertAndSend(
RabbitMQExchangeConstant.A_FANOUT_EXCHANGE,
routeKey,
content);
}
comsumer消費端配置exchange
在 RabbitMQExchangeConfig 類中配置exchange:
/**
* 以direct方式發(fā)送消息
* 定義一個Exchange交換機,從該交換機接收消息
* @return
*/
@Bean
public DirectExchange ADirectExchange() {
//傳入exchange交換機的名稱 ADirectExchange
return new DirectExchange(RabbitMQExchangeConstant.A_DIRECT_EXCHANGE);
}
RabbitMQExchangeConstant 中配置 exchange 的名稱:
/**
* 以direct方法發(fā)送A信息的Exchange名稱
*/
public static final String A_DIRECT_EXCHANGE = "ADirectExchange";
consumer消費端配置Queue
在RabbitMQQueueConfig 配置 queue:
/**
* 創(chuàng)建綁定到 ADirectExchange 交換機的隊列
* @return
*/
@Bean
public Queue ADirectQueue() {
//傳入隊列名稱
return new Queue(RabbitMQQueueConstant.A_DIRECT_QUEUE);
}
在 RabbitMQQueueConstant 中指定隊列名稱:
/**
* 指定綁定到 ADirectExchange 交換機的隊列名稱香缺,用于接收 A 類型的信息
*/
public static final String A_DIRECT_QUEUE = "ADirectQueue";
consumer消費端配置bind信息
在 RabbitMQBindConfig 中配置bind信息:
/**
* 將 ADirectQueue 隊列綁定到 ADirectExchange交換機上
* 用 ADirectQueue 隊列接收 ADirectExchange 交換機發(fā)送過來的消息
* 指定路由key 為 A.rk1
* @param ADirectQueue
* @param ADirectExchange
* @return
*/
@Bean
public Binding bindADirectExchangeToADirectQueue(Queue ADirectQueue, DirectExchange ADirectExchange) {
//以 direct 方式接收消息需要指定路由key手销,也就是with傳入的參數
return BindingBuilder.bind(ADirectQueue).to(ADirectExchange).with(RabbitMQRoutKeyConstant.A_RK1);
}
在 RabbitMQRoutKeyConstant 中配置路由key的名稱:
/**
* 指定路由key 為 A.rk1, 表示接收路由key為 A.rk1 的消息
*/
public static final String A_RK1 = "A.rk1";
consumer消費端接收消息類
在 ADirectConsumer 類中主要定義接收消息邏輯图张,也就是消息消費者:
/**
* 用于接收A消息的消費者consumer
*/
@Component
//表示監(jiān)聽名稱為 ADirectQueue 的消息隊列
@RabbitListener(queues = {RabbitMQQueueConstant.A_DIRECT_QUEUE})
public class ADirectConsumer {
/**
* 定義接收消息處理邏輯
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("ADirectConsumer 接收到消息: " + content);
}
}
運行測試:
在 ASenderTest 中編寫測試用例:
/**
* 以 Direct 方式發(fā)送消息測試用例
*/
@Test
public void sendToADirectExchangeTest() {
String msg = "Hello, I am A msg. — sendToADirectExchange ";
//第一個參數為送的消息
//第二個參數為發(fā)送消息的路由key
this.aSender.sendToADirectExchange(msg, "A.rk1");
}
啟動 consumer 端也就是 spring-boot-rabbitmq-consumer 項目中的啟動類啟動消費者服務锋拖,然后運行 producer 端也就是 spring-boot-rabbitmq-producer 項目中的 ASenderTest 類的 sendToADirectExchangeTest 方法,用來發(fā)送消息祸轮,運行結果如下圖顯示:
拷貝 ADirectConsumer 類命名為 ADirectQueue2兽埃,也是接收 ADirectQueue 隊列的消息,內容如下:
/**
* 用于接收A消息的消費者consumer
*/
@Component
//表示監(jiān)聽名稱為 ADirectQueue 的消息隊列
@RabbitListener(queues = {RabbitMQQueueConstant.A_DIRECT_QUEUE})
public class ADirectConsumer2 {
/**
* 定義接收消息處理邏輯
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("ADirectConsumer2 接收到消息: " + content);
}
}
修改測試用例方法為發(fā)送10條消息:
/**
* 以 Direct 方式發(fā)送消息測試用例
*/
@Test
public void sendToADirectExchangeTest() {
for (int i = 0; i < 10; i++) {
String msg = "Hello, I am A msg. — sendToADirectExchange ";
//第一個參數為送的消息
//第二個參數為發(fā)送消息的路由key
this.aSender.sendToADirectExchange(msg, "A.rk1");
}
}
運行結果如下:
從上面結果可以看出适袜,如果以 direct 方式分發(fā)數據讲仰,并且有多個消費者同時消費同一個 queue 中的數據,那么那么消息發(fā)送會以輪詢的發(fā)送平均的發(fā)送到多個消費者端痪蝇。
以topic分發(fā)策略發(fā)送消息的使用
- topic:當指定topic分發(fā)策略時,交換器會通過模式匹配分發(fā)消息冕房,如果路由key與某個模式匹配時躏啰,交換機就會將消息發(fā)送到與該模式匹配的隊列中。例如某個隊列 queue 綁定的路由key的模式為 a.# 耙册,當 publisher 發(fā)送消息時给僵,如果指定發(fā)送的路由key為 a.b 或者是 a.c 時,該隊列將會收到路由器發(fā)送的消息详拙。
以下代碼實現功能流程如下:
- 聲明一個名為 ATopicExchange的 TopicExchange 類型的交換機帝际;
- 聲明一個名稱為 A_TopicQueue 的 Queue 隊列,并綁定到 ADirectExchange 交換機上饶辙,指定交換機與隊列之間的路由key的匹配模式為 A.#蹲诀;
- 生產者發(fā)送消息到 ATopicExchange 交換機并指定發(fā)送的路由key為 A.b ,ATopicExchange 交換機將消息路由發(fā)送與其綁定的 Queue 上面弃揽;
- 消費端從 Queue 上面拉取到消息進行消費脯爪。
producer生產消息端配置exchange
在 RabbitMQExchangeConfig 類中配置exchange信息:
/**
* 以topic方式發(fā)送消息
* 定義一個Exchange交換機则北,發(fā)送的消息將通過該交換機轉發(fā)
* @return
*/
@Bean
public TopicExchange ATopicExchange() {
//傳入exchange交換機的名稱 ATopicExchange
return new TopicExchange(RabbitMQExchangeConstant.A_TOPIC_EXCHANGE);
}
在 RabbitMQExchangeConstant 中配置exchange 名稱:
/**
* 以topic方法發(fā)送A信息的Exchange名稱
*/
public static final String A_TOPIC_EXCHANGE = "ATopicExchange";
producer生產端編寫發(fā)送消息邏輯
在 ASender 類中編寫消息發(fā)送的邏輯:
/**
* 發(fā)送消息到 ATopicExchange 交換機
*/
public void sendToATopicExchange(String msg, String routeKey) {
//要發(fā)送的信息拼上當前時間戳
String content = msg + "\t" + DateUtils.getDateTime();
//第一個參數表示Exchange交換機的名稱
//第二個參數表示路由Key,topic方式路由消息時痕慢,會將消息發(fā)送到匹配該路由key的隊列上
//第三個參數為要發(fā)送的消息
this.rabbitTemplate.convertAndSend(
RabbitMQExchangeConstant.A_TOPIC_EXCHANGE,
routeKey,
content);
}
comsumer消費端配置exchange
在 RabbitMQExchangeConfig 類中配置exchange:
/**
* 以topic方式發(fā)送消息
* 定義一個Exchange交換機尚揣,發(fā)送的消息將通過該交換機轉發(fā)
* @return
*/
@Bean
public TopicExchange ATopicExchange() {
//傳入exchange交換機的名稱 ATopicExchange
return new TopicExchange(RabbitMQExchangeConstant.A_TOPIC_EXCHANGE);
}
RabbitMQExchangeConstant 中配置 exchange 的名稱:
/**
* 以topic方法發(fā)送A信息的Exchange名稱
*/
public static final String A_TOPIC_EXCHANGE = "ATopicExchange";
consumer消費端配置Queue
在RabbitMQQueueConfig 配置 queue:
/**
* 創(chuàng)建綁定到 ATopicExchange 交換機的隊列
*
* @return
*/
@Bean
public Queue ATopicQueue() {
//傳入隊列名稱
return new Queue(RabbitMQQueueConstant.A_TOPIC_QUEUE);
}
在 RabbitMQQueueConstant 中指定隊列名稱:
/**
* 指定綁定到 ATopicExchange 交換機的隊列名稱,用于接收 A 類型的信息
*/
public static final String A_TOPIC_QUEUE = "A_TopicQueue";
consumer消費端配置bind信息
在 RabbitMQBindConfig 中配置bind信息:
/**
* 將 ATopicQueue 隊列綁定到 ATopicExchange
* 用 ATopicQueue 隊列接收 ATopicExchange 交換機發(fā)送過來的消息
* 指定路由key 為 A.#
* @param ATopicQueue
* @param ATopicExchange
* @return
*/
@Bean
public Binding bindATopicExchangeToA_BTopicQueue(Queue ATopicQueue, TopicExchange ATopicExchange) {
//以 direct 方式接收消息需要指定路由key掖举,也就是with傳入的參數
return BindingBuilder.bind(ATopicQueue).to(ATopicExchange).with(RabbitMQRoutKeyConstant.A_ALL);
}
在 RabbitMQRoutKeyConstant 中配置路由key的名稱:
/**
* 指定路由key 為 A.#匀借, 表示接收路由key為 A. 開頭的消息
*/
public static final String A_ALL = "A.#";
consumer消費端接收消息類
在 ATopicConsumer 類中主要定義接收消息邏輯,接收 A_BTopicQueue 隊列中的消息:
/**
* 用于接收A消息的消費者consumer
*/
@Component
//表示監(jiān)聽名稱為 A_TopicQueue 的消息隊列
@RabbitListener(queues = {RabbitMQQueueConstant.A_TOPIC_QUEUE})
public class ATopicConsumer {
/**
* 定義接收消息處理邏輯
* @param content
*/
@RabbitHandler
public void handler(String content) {
System.out.println("ATopicConsumer 接收到消息: " + content);
}
}
運行測試:
在 ASenderTest 中編寫測試用例:
/**
* 以 Topic 方式發(fā)送消息測試用例
*/
@Test
public void sendToATopicExchangeTest() {
String msg = "Hello, I am A.b msg. — sendToATopicExchange ";
//第一個參數為送的消息
//第二個參數為發(fā)送消息的路由key
this.aSender.sendToATopicExchange(msg, "A.b");
}
啟動 consumer 端也就是 spring-boot-rabbitmq-consumer 項目中的啟動類啟動消費者服務再登,然后運行 producer 端也就是 spring-boot-rabbitmq-producer 項目中的 ASenderTest 類的 sendToATopicExchangeTest方法有缆,用來發(fā)送消息,運行結果如下圖顯示:
至此俺叭,以 fanout恭取、direct、topic 方式發(fā)送與消費消息的例子都已經介紹完畢熄守。
關于 Exchange蜈垮、Queue 參數詳解可以參考:Spring boot集成RabbitMQ中Exchange與Queue參數詳解