一.概述
本文介紹Springboot如何集成RabbitMQ
本文內(nèi)容較多辜王,開始為最簡單的自動確認類的消息發(fā)送和消費嫉称,后續(xù)有發(fā)送方確認和接收方確認機制示例
關于RabbitMQ安裝就不說了恳邀,網(wǎng)上有很多
關于RabbitMQ的一些基礎概念和知識請查看前幾篇文章
二.RabbitMQ管理后臺
安裝好RabbitMQ并啟用管理后臺寒匙,訪問localhost:15672凿傅,輸入默認的用戶密碼guest/guest得到如下界面
三.SpringBoot集成
生產(chǎn)者集成
1.創(chuàng)建項目
我們首先創(chuàng)建一個Springboot項目demo-rabbitmq-producer
項目的
pom.xml
文件中引入依賴:
<!--springboot整合rabbitMQ只需引入amqp起步依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
完整的pom.xml
文件如下(注意demo-rabbitmq-common是上面項目截圖的自己定義的模塊項目萌京,里面只有一個User實體類泊交,為了后續(xù)測試發(fā)送實體類消息增加的):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-producer</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>demo-rabbit-mq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot整合rabbitMQ只需引入amqp起步依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--demo-rabbitmq-common-->
<dependency>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
在application.yml
文件中進行配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虛擬主機乳讥,可以不設置使用server默認host
注意
:要保證guest用戶對虛擬主機有讀寫操作權(quán)限柱查,具體去RabbitMQ后臺管理系統(tǒng)配置
2.配置序列化策略
RabbitMQ序列化的選擇可以是jdk序列化,hessian云石,jackson唉工,protobuf等
而對于Java應用默認的序列化采用的是jdk序列化
SimpleMessageConverter對于要發(fā)送的消息體body為字節(jié)數(shù)組時,不進行處理汹忠。
消息本身假設是String淋硝,則將String轉(zhuǎn)成字節(jié)數(shù)組,假設是Java對象宽菜,則使用jdk序列化將消息轉(zhuǎn)成字節(jié)數(shù)組谣膳,轉(zhuǎn)出來的結(jié)果較大,含class類名铅乡、類對應方法等信息继谚,因此性能較差。
hessian阵幸、protobuf等都是基于壓縮反復字段的思想花履,降低數(shù)據(jù)傳輸量以提高性能。
jackson是以json表示來數(shù)據(jù)傳輸挚赊,性能優(yōu)于jdk序列化
所以使用RabbitMq作為中間件時诡壁,數(shù)據(jù)量比較大,此時就要考慮使用類似Jackson2JsonMessageConverter荠割、hessian等序列化形式欢峰,以此提高性能。
在生產(chǎn)者端增加RabbitMQConfig.java
:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/13/20 4:31 PM
* Description:開啟消息發(fā)送消息確認
*/
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 設置序列化策略
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3.聲明交換機涨共、隊列信息
需要注意的是纽帖,雖然官方推薦在生產(chǎn)者和消費者都增加交換機、隊列和綁定關系聲明举反,但個人更推薦生產(chǎn)環(huán)境通過管理后臺創(chuàng)建交換機懊直、隊列和聲明綁定關系
,一方面可以防止因為開發(fā)人員的代碼錯誤引發(fā)不必要的問題火鼻,另一方面也可以防止每次啟動創(chuàng)建上述這些東西
而如果我們無論在生產(chǎn)者端還是在消費者端進行聲明交換機室囊、隊列、將交換機和隊列進行綁定魁索,都會自動向MQ申請上述聲明融撞,MQ中如果有相同配置的聲明則自動返回成功,如果沒有則新建一個
本例為了方便粗蔚,暫時只在生產(chǎn)者端進行聲明尝偎,當?shù)谝淮伟l(fā)送消息的時候會去MQ服務器申請聲明信息
首先我們來一個直連交換機的例子:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: DirectRabbitConfig
* Author: TP
* Date: 12/13/20 12:01 PM
* Description: 直連交換機Config
*/
@Configuration
public class DirectRabbitConfig {
/**
* 定義隊列,名字為testDirectQueue
*/
@Bean
public Queue testDirectQueue() {
// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在致扯,暫存隊列:當前連接有效
// exclusive:默認也是false肤寝,只能被當前創(chuàng)建的連接使用,而且當連接關閉后隊列即被刪除抖僵。此參考優(yōu)先級高于durable
// autoDelete:是否自動刪除鲤看,當沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除耍群。
// return new Queue("testDirectQueue", true, true, false);
//一般設置一下隊列的持久化就好,其余兩個就是默認false
return new Queue("testDirectQueue", true);
}
/**
* 定義Direct類型交換機义桂,名字為testDirectExchange
*/
@Bean
DirectExchange testDirectExchange() {
return new DirectExchange("testDirectExchange", true, false);
}
/**
* 將隊列和交換機綁定,并設置路由鍵:testDirectRouting
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("testDirectRouting");
}
}
4.發(fā)送消息
package com.tp.demo.rabbitmq.producer.controller.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: SendDirectMessageController
* Author: TP
* Date: 12/13/20 12:11 PM
* Description:直連交換機消息發(fā)送Controller
*/
@RestController
public class SendDirectMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String message = "你好蹈垢,我是直連交換機過來的一條消息";
//將消息攜帶綁定鍵值:testDirectRouting 發(fā)送到交換機testDirectExchange
rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", message);
return "ok";
}
}
好了慷吊,發(fā)送方就是這么簡單,我們可以往MQ發(fā)送一條消息了
觀察管理后臺耘婚,發(fā)現(xiàn)隊列中已經(jīng)有消息了:
消費者集成
1.創(chuàng)建項目
新建一個Springboot項目:demo-rabbitmq-consumer
項目的pom.xml
文件中引入依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot整合rabbitMQ只需引入amqp起步依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--demo-rabbitmq-common-->
<dependency>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml文件中做如下配置:
server:
port: 7268
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虛擬主機罢浇,可以不設置使用server默認host
注意:生產(chǎn)者和消費者的虛擬主機要對應
2.配置序列化策略
因為生產(chǎn)者我們用了Jackson2JsonMessageConverter
進行序列化,所以消費者端我們也使用Jackson2JsonMessageConverter
進行反序列化
新建一個配置類RabbitMQConfig
如下:
package com.tp.demo.rabbitmq.consumer.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/14/20 9:03 AM
* Description:消息消費者配置
*/
@Configuration
public class RabbitMQConfig {
/**
* 消息消費者配置JSON反序列化使用Jackson2JsonMessageConverter沐祷,與消息生產(chǎn)者保持一致
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
3.配置監(jiān)聽器
消費者端是通過監(jiān)聽器監(jiān)聽消息的嚷闭,我們配置一個監(jiān)聽器用于接收上面的隊列消息:
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: DirectListener
* Author: TP
* Date: 12/13/20 12:26 PM
* Description:直連交換機消費者監(jiān)聽器
* 直連交換機默認輪詢所有消費者
* 如果我們定義了多個消費者監(jiān)聽了同一個隊列,會以輪詢的方式消費赖临,且不存在重復消費
*/
@Component
public class DirectListener {
@RabbitListener(queues = "testDirectQueue")
public void onMessage(String message) {
System.out.println("DirectListener收到消息:" + message);
}
}
擴展:
- Spring對amqp的支持很靈活胞锰,在消費者端我們可以使用
org.springframework.amqp.core.Message
對象統(tǒng)一接收消息,也可以使用你喜歡的任意類型進行接收兢榨,但要保證發(fā)送時候和接收時候的對象類型要保持一致嗅榕。(例如String
、Map
吵聪、JavaBean
都可以) - 注解@RabbitListener可以定義在方法上凌那,也可以定義在類上,用以聲明一個消息監(jiān)聽器吟逝。
-- 如果定義在類上帽蝶,需要配合@RabbitHandler
標注在對應的方法上,指明具體使用哪個方法做監(jiān)聽
-- 如果定義在方法上块攒,則可以省略@RabbitHandler
- 如果我們定義了多個相同配置的消息監(jiān)聽器励稳,消費者會輪詢消費,且不會重復消費
啟動消費者項目囱井,在控制臺會得到如下輸出:
Connected to the target VM, address: '127.0.0.1:52798', transport: 'socket'
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.1)
2020-12-14 15:29:50.092 INFO 98654 --- [ main] .t.d.r.c.DemoRabbitmqConsumerApplication : Starting DemoRabbitmqConsumerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 98654 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-consumer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
2020-12-14 15:29:50.094 INFO 98654 --- [ main] .t.d.r.c.DemoRabbitmqConsumerApplication : No active profile set, falling back to default profiles: default
2020-12-14 15:29:50.805 INFO 98654 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 7268 (http)
2020-12-14 15:29:50.812 INFO 98654 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-12-14 15:29:50.812 INFO 98654 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-14 15:29:50.861 INFO 98654 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-12-14 15:29:50.861 INFO 98654 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 735 ms
2020-12-14 15:29:51.184 INFO 98654 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-14 15:29:51.396 INFO 98654 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 7268 (http) with context path ''
2020-12-14 15:29:51.398 INFO 98654 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2020-12-14 15:29:51.425 INFO 98654 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#37c36608:0/SimpleConnection@7e62cfa3 [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 52811]
DirectListener收到消息:你好驹尼,我是直連交換機過來的一條消息
2020-12-14 15:29:51.555 INFO 98654 --- [ main] .t.d.r.c.DemoRabbitmqConsumerApplication : Started DemoRabbitmqConsumerApplication in 1.777 seconds (JVM running for 2.34)
由此看到,消息被正確消費了庞呕,又由于我們采用的是RabbitMQ的默認消息確認機制:自動確認新翎,所以此條消息會被RabbitMQ從隊列中移除:
上述示例是直連交換機的演示,關于扇形交換機和主題交換機,這里分別給出消息生產(chǎn)者和消費者的代碼料祠,就不進行演示和后臺截圖了骆捧,相信大家自己將下面代碼copy到自己項目里澎羞,操作一下就能實現(xiàn)
生產(chǎn)者端扇形交換機聲明:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: FanoutRabbitConfig
* Author: TP
* Date: 12/13/20 3:58 PM
* Description:
*/
@Configuration
public class FanoutRabbitConfig {
/**
* 創(chuàng)建三個隊列 :fanout.A fanout.B fanout.C
* 將三個隊列都綁定在交換機fanoutExchange上
* 因為是扇型交換機髓绽,路由鍵無需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
生產(chǎn)者端主題交換機聲明:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: TopicRabbitConfig
* Author: TP
* Date: 12/13/20 12:52 PM
* Description:主題交換機Config
*/
@Configuration
public class TopicRabbitConfig {
// 綁定鍵
private final static String man = "topic.man";
private final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
// 將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
// 這樣只要是消息攜帶的路由鍵是topic.man,才會分發(fā)到該隊列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
// 將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#
// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
生產(chǎn)者端發(fā)送消息
package com.tp.demo.rabbitmq.producer.controller.simple;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: SendFanoutMessageController
* Author: TP
* Date: 12/13/20 4:02 PM
* Description:扇形交換機消息發(fā)送Controller
*/
@RestController
public class SendFanoutMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() throws JsonProcessingException {
String message = "message: testFanoutMessage...";
for (int i = 0; i < 200; i++) {
rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}
return "ok";
}
}
package com.tp.demo.rabbitmq.producer.controller.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: SendTopicMessageController
* Author: TP
* Date: 12/13/20 12:56 PM
* Description:主題交換機消息發(fā)送Controller
*/
@RestController
public class SendTopicMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String message = "message: MAN ";
rabbitTemplate.convertAndSend("topicExchange", "topic.man", message);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String message = "message: woman will all in";
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", message);
return "ok";
}
}
消費者端消息監(jiān)聽器:
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: FanoutListenerA
* Author: TP
* Date: 12/13/20 4:07 PM
* Description:
*/
@Component
public class FanoutListenerA {
@RabbitListener(queues = "fanout.A")
public void onMessage(String message) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("FanoutListenerA收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: FanoutListenerC
* Author: TP
* Date: 12/13/20 4:08 PM
* Description:
*/
@Component
public class FanoutListenerB {
@RabbitListener(queues = "fanout.B")
public void onMessage(String message) {
System.out.println("FanoutListenerB收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: FanoutListenerC
* Author: TP
* Date: 12/13/20 4:08 PM
* Description:
*/
@Component
public class FanoutListenerC {
@RabbitListener(queues = "fanout.C")
public void onMessage(String message) {
System.out.println("FanoutListenerC收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: TopicListener1
* Author: TP
* Date: 12/13/20 2:32 PM
* Description:
*/
@Component
public class TopicListener1 {
@RabbitListener(queues = "topic.man")
public void onMessage(String message) {
System.out.println("TopicListener1收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: TopicListener
* Author: TP
* Date: 12/13/20 2:32 PM
* Description:
*/
@Component
public class TopicListener2 {
@RabbitListener(queues = "topic.woman")
public void process(String message) {
System.out.println("TopicListener2收到消息:" + message);
}
}
三.消息可靠投遞:消息確認機制
RabbitMQ中生產(chǎn)者端和消費者端都有消息確認機制妆绞,已盡量避免消息的丟失
消息生產(chǎn)者投遞確認
====================================================
生產(chǎn)者發(fā)送確認機制共有2種回調(diào):ConfirmCallback顺呕、ReturnCallback
兩種回調(diào)函數(shù)都是在什么情況會觸發(fā)呢?
總體來說括饶,推送消息存在以下四種情況:
①消息推送到server株茶,但是在server里找不到交換機
②消息推送到server,找到交換機了图焰,但是沒找到隊列
③消息推送到sever启盛,交換機和隊列啥都沒找到
④消息推送成功
====================================================
①這種情況觸發(fā)的是 ConfirmCallback回調(diào)函數(shù)
②這種情況觸發(fā)的是 ConfirmCallback和ReturnCallback兩個回調(diào)函數(shù)
③這種情況觸發(fā)的是 ConfirmCallback回調(diào)函數(shù)
④這種情況觸發(fā)的是 ConfirmCallback回調(diào)函數(shù)
====================================================
我們可以在回調(diào)函數(shù)根據(jù)需求做對應的擴展或者業(yè)務數(shù)據(jù)處理
為了支持生產(chǎn)者消息投遞確認,我們需要作如下內(nèi)容:
- 在
application.yml
中增加如下配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虛擬主機技羔,可以不設置使用server默認host
publisher-confirm-type: correlated #確認消息已發(fā)送到交換機(Exchange)
publisher-returns: true #確認消息已發(fā)送到隊列(Queue)
- 對之前生產(chǎn)者端的
RabbitMQConfig.java
進行改造如下:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/13/20 4:31 PM
* Description:開啟消息發(fā)送消息確認
*/
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 設置開啟Mandatory僵闯,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強制調(diào)用回調(diào)函數(shù)
rabbitTemplate.setMandatory(true);
// 設置發(fā)送方確認
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("ConfirmCallback >>> " + "相關數(shù)據(jù):" + correlationData);
System.out.println("ConfirmCallback >>> " + "確認情況:" + ack);
System.out.println("ConfirmCallback >>> " + "原因:" + cause);
});
rabbitTemplate.setReturnsCallback(e -> {
System.out.println("ReturnCallback >>> " + "消息:" + e.getMessage());
System.out.println("ReturnCallback >>> " + "回應碼:" + e.getReplyCode());
System.out.println("ReturnCallback >>> " + "回應信息:" + e.getReplyText());
System.out.println("ReturnCallback >>> " + "交換機:" + e.getExchange());
System.out.println("ReturnCallback >>> " + "路由鍵:" + e.getRoutingKey());
});
// 設置序列化策略
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
為了方便藤滥,我們封裝了一個消息發(fā)送工具:
package com.tp.demo.rabbitmq.producer.sender;
import com.tp.demo.rabbitmq.producer.utils.RandomUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* FileName: RabbitSender
* Author: TP
* Date: 12/14/20 9:16 AM
* Description:封裝一個RabbitMQ發(fā)送消息對象鳖粟,方便使用
* 當然,你也可以直接使用RabbitTemplate
*/
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//發(fā)送消息方法調(diào)用: 構(gòu)建Message消息
public void convertAndSend(String exchange, String routingKey, Object message) {
// 時間戳+6位隨機字符保證全局唯一
// 用于ack保證唯一一條消息(在做補償策略的時候拙绊,必須保證這是全局唯一的消息)
// 在消費方可以通過message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")獲取到該CorrelationData
CorrelationData correlationData = new CorrelationData(RandomUtils.UUID());
// 發(fā)送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}
這個方法中向图,每次發(fā)送消息的時候會生成一個全局唯一標識放入CorrelationData
,CorrelationData
我們也可以封裝業(yè)務ID信息(把send方法增加個參數(shù)标沪,在發(fā)送消息的時候指定)榄攀,這樣我們就可以在消息發(fā)送失敗的時候,根據(jù)業(yè)務ID進行自己的補償機制
我們發(fā)送一個不存在的交換機進行測試:
package com.tp.demo.rabbitmq.producer.controller.simple;
import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: TestSendFailCallbackController
* Author: TP
* Date: 12/14/20 10:22 AM
* Description:
*/
@RestController
public class TestSendFailCallbackController {
@Autowired
RabbitSender rabbitSender;
@GetMapping("/sendMessageFail")
public String sendDirectMessage() {
String message = "Hello金句,This will fail...";
//將消息攜帶綁定鍵值:testDirectRouting 發(fā)送到交換機testDirectExchange
rabbitSender.convertAndSend("none_exchange", "testDirectRouting", message);
return "ok";
}
}
查看控制臺輸出:
Connected to the target VM, address: '127.0.0.1:53731', transport: 'socket'
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.1)
2020-12-14 16:01:15.954 INFO 5063 --- [ main] .t.d.r.p.DemoRabbitMqProducerApplication : Starting DemoRabbitMqProducerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 5063 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-producer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
2020-12-14 16:01:15.957 INFO 5063 --- [ main] .t.d.r.p.DemoRabbitMqProducerApplication : No active profile set, falling back to default profiles: default
2020-12-14 16:01:16.791 INFO 5063 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 7269 (http)
2020-12-14 16:01:16.800 INFO 5063 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-12-14 16:01:16.800 INFO 5063 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-14 16:01:16.854 INFO 5063 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-12-14 16:01:16.855 INFO 5063 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 862 ms
2020-12-14 16:01:17.204 INFO 5063 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-14 16:01:17.362 INFO 5063 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 7269 (http) with context path ''
2020-12-14 16:01:17.369 INFO 5063 --- [ main] .t.d.r.p.DemoRabbitMqProducerApplication : Started DemoRabbitMqProducerApplication in 1.72 seconds (JVM running for 2.142)
2020-12-14 16:02:26.993 INFO 5063 --- [nio-7269-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-12-14 16:02:26.993 INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-12-14 16:02:26.994 INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2020-12-14 16:02:27.074 INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2020-12-14 16:02:27.100 INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7bab5898:0/SimpleConnection@1759112d [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 53771]
2020-12-14 16:02:27.139 ERROR 5063 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
ConfirmCallback >>> 相關數(shù)據(jù):CorrelationData [id=20201214160227064961897]
ConfirmCallback >>> 確認情況:false
ConfirmCallback >>> 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
可以看到CorrelationData中取到了我們生成的UUID檩赢,我們可以根據(jù)這個UUID做自己的要業(yè)務補償
順便貼一下自定義的RandomUtils
:
package com.tp.demo.rabbitmq.producer.utils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
/**
* FileName: RandomUtils
* Author: TP
* Date: 12/14/20 9:36 AM
* Description:
*/
public class RandomUtils {
public synchronized static String UUID() {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
String current = LocalDateTime.now().format(dtf);
return current + getRandomString(6);
}
public synchronized static String UUID(String prefix) {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
String current = LocalDateTime.now().format(dtf);
return prefix + current + getRandomString(6);
}
private static String getRandomString(int length) {
StringBuffer sb = new StringBuffer();
if (length > 0) {
for (int i = 0; i < length; i++) {
sb.append(new Random().nextInt(10));
}
return sb.toString();
}
return null;
}
}
到此,消息生產(chǎn)者確認機制就算完成了
消息消費者投遞確認
重頭戲來了趴梢,消息消費者端RabbitMQ默認是自動確認的漠畜,只要消息發(fā)送到了消費者,則認為消息已被消費坞靶,消息在RabbitMQ服務器會被移除憔狞,這顯然在生產(chǎn)環(huán)境上是極度危險的,所以我們都會設置消息消費者端的消費確認為手動彰阴,具體步驟如下:
-
改造消息消費者端的
RabbitMQConfig.java
:
package com.tp.demo.rabbitmq.consumer.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/14/20 9:03 AM
* Description:消息消費者配置
*/
@Configuration
public class RabbitMQConfig {
/**
* 消息消費者配置JSON反序列化使用Jackson2JsonMessageConverter瘾敢,與消息生產(chǎn)者保持一致
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
注意:對于沒有自己聲明上述SimpleRabbitListenerContainerFactory
的同學,可以在yml中直接配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虛擬主機,可以不設置使用server默認host
listener:
simple:
acknowledge-mode: manual
而我們自己聲明了SimpleRabbitListenerContainerFactory
簇抵,這時如果在yml中增加上述配置是無效的
- 監(jiān)聽器內(nèi)進行手動消息確認
package com.tp.demo.rabbitmq.consumer.listener.ack;
import com.rabbitmq.client.Channel;
import com.tp.demo.rabbitmq.common.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* FileName: DirectAckListener
* Author: TP
* Date: 12/13/20 7:02 PM
* Description:
* 注解@RabbitListener可以定義在方法上也可以定義在類上
* -- 如果定義在類上庆杜,需要配合@RabbitHandler標注在方法上,指明具體使用哪個方法做監(jiān)聽
* -- 如果定義在方法上碟摆,則可以省略@RabbitHandler
*/
@Component
public class DirectAckListener {
@RabbitListener(queues = "testDirectAckQueue")
public void process(Message message, Channel channel) throws IOException {
System.out.println("接收到消息總體內(nèi)容:" + message);
System.out.println("實際消息內(nèi)容:" + new String(message.getBody()));
// TODO 業(yè)務邏輯
// 1.獲取message中的body晃财,解析消息內(nèi)容
// 2.其他業(yè)務邏輯......
// 回執(zhí)情形1:消費成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 回執(zhí)情形2:消費成功消費處理失敗,重新放入隊列(一定要慎用典蜕,防止造成無限返回隊列->消費者->返回隊列.....造成消息積壓)
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 回執(zhí)情形3:消費處理失敗断盛,拒絕接收(可以指定是否重新放入隊列,如果消息不重新放入隊列愉舔,RabbitMQ服務端會將消息移除)
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 如果你很懶钢猛,不想從message中獲取body再自己反序列化為想要的實體bean怎么辦?
* Spring對rabbitMQ的集成允許我們直接使用bean接收轩缤,如下:直接可以用形參封裝
* 擴展:我們可以在生產(chǎn)者端發(fā)送任意類型的消息命迈,并且在消費者端直接用形參封裝,但你必須保證用的是同一種數(shù)據(jù)類型
* 注意:如果想測試這種快捷方式火的,請將注解注釋放開壶愤,并將上面的process全部注釋掉
*/
// @RabbitListener(queues = "testDirectAckQueue")
public void process(User user, Message message, Channel channel) throws IOException {
System.out.println(user);
// TODO 業(yè)務邏輯
// ......
// 回執(zhí)情形1:進行消息回執(zhí)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// 回執(zhí)情形2:消費成功消費處理失敗,重新放入隊列(一定要慎用卫玖,防止造成無限返回隊列->消費者->返回隊列.....造成消息積壓)
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 回執(zhí)情形3:消費處理失敗公你,拒絕接收(可以指定是否重新放入隊列,如果消息不重新放入隊列假瞬,RabbitMQ服務端會將消息移除)
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
當我們設置了消息手動確認陕靠,如果消息到了消費者而消費者一直不確認,在RabbitMQ中這條消息將會一直處于unacked
待確認狀態(tài)脱茉,直到消費者與RabbitMQ斷開連接剪芥,這條消息又會重新變成ready
狀態(tài),消費者重啟后會重新消費消息琴许,對于消費者手動確認税肪,其回執(zhí)方式有3種,詳見上述代碼的注釋榜田,這里就不再說明了
測試:
發(fā)送一條需要手動確認的消息如下:
package com.tp.demo.rabbitmq.producer.controller.ack;
import com.tp.demo.rabbitmq.common.entity.User;
import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
* FileName: SendDirectMessageController
* Author: TP
* Date: 12/13/20 12:11 PM
* Description:直連交換機消息發(fā)送Controller
*/
@RestController
public class SendDirectAckMessageController {
@Autowired
RabbitSender rabbitSender;
@GetMapping("/sendDirectMessageAck")
public String sendDirectMessage() {
User user = new User();
user.setId(1);
user.setUserName("TP");
user.setPassWord("pwd123456");
user.setAge(18);
user.setCreateTime(LocalDateTime.now());
rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", user);
return "ok";
}
@GetMapping("/sendDirectMessageAck2")
public String sendDirectMessage2() {
for (int i = 20; i <= 30; i++) {
rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", "我是一條需要確認的消息");
}
return "ok";
}
}
我們在消費者端加上debug益兄,讓消息先不走回執(zhí),觀察效果:
放開debu后:
控制臺輸出:
接收到消息總體內(nèi)容:(Body:'{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}' MessageProperties [headers={spring_listener_return_correlation=97fb83de-fd43-4529-9d24-abef559a9fcb, spring_returned_message_correlation=20201214163516705225241, __TypeId__=com.tp.demo.rabbitmq.common.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=testDirectAckExchange, receivedRoutingKey=testDirectAckRouting, deliveryTag=2, consumerTag=amq.ctag-F5bUnBjCf-umNuRMKLLUAg, consumerQueue=testDirectAckQueue])
實際消息內(nèi)容:{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}
其他2種回執(zhí)箭券,請自行測試
本示例發(fā)送實體類消息用的User類如下:
package com.tp.demo.rabbitmq.common.entity;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import java.time.LocalDateTime;
/**
* FileName: User
* Author: TP
* Date: 12/14/20 11:33 AM
* Description:
*/
@Data
public class User {
private Integer id;
private String userName;
private String passWord;
private Integer age;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime createTime;
}
至此净捅,Springboot整合RabbitMQ完畢
問題
鄙人在測試中遇到如下問題:消費者端已經(jīng)開啟了消費手動確認,如果我們發(fā)送一條消息辩块,消息內(nèi)容為一個JavaBean
蛔六,如果在消費者端監(jiān)聽器進行反序列化消息內(nèi)容到Message
參數(shù)時失敗拋出異常了荆永,則RabbitMQ會直接將消息移除,而不會將這條消息標記為unacked
国章,這會導致消息丟失
為什么會這樣我也不知道具钥,如果有高人看到此篇文章并對這種情形有理解,請留言液兽,不勝感激骂删!
當然了,我們可以轉(zhuǎn)JSON發(fā)送String消息抵碟,然后自己接收后再解析桃漾,也可以生產(chǎn)者和消費者引用maven私服內(nèi)的同一個jar包坏匪,同一個實體類不會出現(xiàn)反序列化失敗的問題拟逮,不理解的是如果Message就是封裝失敗了,為什么會將這條消息移除呢适滓,而不是標記為未確認呢敦迄??凭迹?
后續(xù)
針對上面的問題罚屋,通過艱難的Spring-amqp源碼debug,定位了問題所在:
消費端監(jiān)聽器如果封裝參數(shù)失敗會拋出:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
嗅绸,這個異常被認為為fatal異常脾猛,也就是致命異常
圖中可以看出,這種情形下鱼鸠,Spring-amqp在進行nack的時候猛拴,是否requeue最終為false,所以不會重新放入隊列中
那么針對這種情形蚀狰,我們怎么解決呢愉昆?
我們可以使用死信隊列,這種情況消息會被認為是死信并發(fā)送到死信隊列里(如果已經(jīng)配置)
并且強烈推薦生產(chǎn)環(huán)境為自己的所有業(yè)務隊列配置上死信隊列麻蹋,能保證消息的可靠性跛溉,通過死信隊列進行業(yè)務補償。