一、整體架構(gòu)和核心概念
作用:異步、消峰晋修、解耦
二、使用springboot收發(fā)消息(Demo)
1. 引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
2. 配置rabbitmq
- 創(chuàng)建虛擬主機(jī)(Virtual host):
/wlhost
- 創(chuàng)建隊(duì)列(queue):
wl.queue
- 將
wl.queue
綁定在/wlhost
上(用可以控制此虛擬主機(jī)的用戶創(chuàng)建該隊(duì)列即可)
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
3. 發(fā)送消息代碼
@SpringBootTest
public class SpringBootAmqpTest {
@Resource
private AmqpTemplate amqpTemplate;
@Test
void testSendMsg() {
amqpTemplate.convertAndSend("wl.queue", "hello amqp!");
}
}
4. 接收消息代碼
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "wl.queue")
public void receivedMsg(String msg) {
log.info("收到了消息:{}", msg);
}
}
三烟阐、work模型(能者多勞)
1. 配置prefetch(處理完成才會接收消息,未處理完不會接收消息)
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
listener:
simple:
prefetch: 1
2. 發(fā)送消息代碼
@Test
void testSendWorkMsg() {
for (int i = 1; i <= 50; i++) {
amqpTemplate.convertAndSend("work.queue", "hello worker, message_" + i);
}
}
3. 接收消息代碼
@RabbitListener(queues = "work.queue")
public void worker1ReceivedMsg(String msg) throws InterruptedException {
log.info("收到了消息:{}", msg);
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void worker2ReceivedMsg(String msg) throws InterruptedException {
log.warn("收到了消息:{}", msg);
Thread.sleep(200);
}
四紊扬、四種交換機(jī)
4.1 fanout交換機(jī)
fanout exchange
會將收到的消息廣播到每一個(gè)跟其綁定的queue
蜒茄,因此也叫廣播模式
所有的微服務(wù)以及對應(yīng)的集群節(jié)點(diǎn)都會收到fanout exchange
交換機(jī)跟其綁定的queue
的消息
1. 配置fanout exchange
添加
wl.fanout
交換機(jī),并為其添加兩個(gè)隊(duì)列fanout.queue1
餐屎,fanout.queue2
2. 發(fā)送消息代碼
@Test
void fanoutSendMsg() {
String exchangeName = "wl.fanout";
amqpTemplate.convertAndSend(exchangeName, "", "hello everyone!");
}
3. 接收消息代碼
@RabbitListener(queues = "fanout.queue1")
public void fanout1ReceivedMsg(String msg) {
log.info("消費(fèi)者1收到了消息:{}", msg);
}
@RabbitListener(queues = "fanout.queue2")
public void fanout2ReceivedMsg(String msg) {
log.info("消費(fèi)者2收到了消息:{}", msg);
}
4.2 direct交換機(jī)
direct exchange
會將收到的消息根據(jù)路由規(guī)則到指定queue
檀葛,因此也叫定向路由
- 每一個(gè)
queue
都與Exchange
設(shè)置一個(gè)BindingKey
- 發(fā)布者發(fā)送消息時(shí),指定消息的
BindingKey
Exchange
將消息路由到BindingKey
與消息BindingKey
一致的隊(duì)列若所有的
BindingKey
均一致腹缩,direct exchange
也能實(shí)現(xiàn)fanout exchange
的廣播效果
1. 配置direct exchange
添加
wl.direct
交換機(jī)屿聋,并為其添加兩個(gè)隊(duì)列direct.queue1
,direct.queue2
2. 發(fā)送消息代碼
@Test
void directSendMsg() {
String exchangeName = "wl.direct";
amqpTemplate.convertAndSend(exchangeName, "red", "hello red!");
}
3. 接收消息代碼
@RabbitListener(queues = "direct.queue1")
public void direct1ReceivedMsg(String msg) {
log.info("消費(fèi)者1可以收到routingKey為blue和red的消息:{}", msg);
}
@RabbitListener(queues = "direct.queue2")
public void direct2ReceivedMsg(String msg) {
log.info("消費(fèi)者2可以收到routingKey為yellow和red的消息:{}", msg);
}
4. 將發(fā)送消息
routingKey
改為yellow
@Test
void directSendMsg() {
String exchangeName = "wl.direct";
amqpTemplate.convertAndSend(exchangeName, "yellow", "hello red!");
}
4.3 topic交換機(jī)(推薦使用)
topic exchange
與direct exchange
類似藏鹊,區(qū)別咋子與routingKey
可以使多個(gè)單詞的列表润讥,并以.
分割
queue
與exchange
指定routingKey
時(shí)可以使用通配符
- “#”:代指0個(gè)或多個(gè)單詞
- “*”:代指1個(gè)單詞
1. 配置topic exchange
添加
wl.topic
交換機(jī),并為其添加兩個(gè)隊(duì)列topic.queue1
盘寡,topic.queue2
2. 發(fā)送消息代碼
@Test
void topicSendMsg() {
String exchangeName = "wl.topic";
amqpTemplate.convertAndSend(exchangeName, "china.weather", "這是中國天氣消息!");
}
3. 接收消息代碼
@RabbitListener(queues = "topic.queue1")
public void topic1ReceivedMsg(String msg) {
log.info("消費(fèi)者1可以收到routingKey含weather的消息:{}", msg);
}
@RabbitListener(queues = "topic.queue2")
public void topic2ReceivedMsg(String msg) {
log.info("消費(fèi)者2可以收到routingKey含china的消息:{}", msg);
}
4.4 headers交換機(jī)
不依賴路由鍵匹配規(guī)則路由消息楚殿。是根據(jù)發(fā)送消息內(nèi)容中的headers屬性進(jìn)行匹配。性能差竿痰,基本用不到
五脆粥、創(chuàng)建(聲明)隊(duì)列和交換機(jī)
1. 使用注解創(chuàng)建(推薦使用)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void direct1ReceivedMsg(String msg) {
log.info("消費(fèi)者1可以收到routingKey為blue和red的消息:{}", msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void direct2ReceivedMsg(String msg) {
log.info("消費(fèi)者2可以收到routingKey為yellow和red的消息:{}", msg);
}
2. 使用Bean注入創(chuàng)建
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public FanoutExchange fanoutExchange() {
// ExchangeBuilder.fanoutExchange("wl.fanout").build();
return new FanoutExchange("wl.fanout");
}
@Bean
public Queue queue() {
// return QueueBuilder.durable("fanout.queue1").build();
return new Queue("fanout.queue1");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(fanoutExchange());
}
}
六、消息轉(zhuǎn)換器
發(fā)送消息和接收消息方都需要
引入依賴
影涉,注入消息轉(zhuǎn)換器
否則會使用jdk
自帶的序列化方式ObjectOutputStream
1. 引入依賴
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2. 注入消息轉(zhuǎn)換器
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class SpringbootConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootConsumerApplication.class, args);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3. 發(fā)送消息代碼
@Test
void testSendObjectMsg() {
String exchangeName = "wl.object.exchange";
Student student = Student.builder()
.studentId(111)
.studentName("張三")
.gender("男")
.age(24)
.tel("13265659898")
.clazzId(4)
.build();
amqpTemplate.convertAndSend(exchangeName, "", student);
}
4. 接收消息代碼
@RabbitListener(queues = "wl.object.queue")
public void objectReceivedMsg(Map<String, String> map) {
log.warn("收到了消息:{}", map);
}
七冠绢、消息可靠性問題
7.1 生產(chǎn)者重連(rabbitmq服務(wù)器宕機(jī),重連)
解決消息發(fā)送給服務(wù)器是丟失問題
當(dāng)網(wǎng)絡(luò)不穩(wěn)定時(shí)常潮,利用重試機(jī)制可以提高消息發(fā)送的成功率,不過SpirngAMQP的重試機(jī)制是
阻塞式
的楷力,也就是多次重試等待的過程中喊式,當(dāng)前線程是被阻塞的孵户,會影響業(yè)務(wù)性能如果對于 業(yè)務(wù)性能有要求,建議
禁用
重試機(jī)制岔留。如一定要用夏哭,請?jiān)O(shè)置合理的等待時(shí)長
和重試次數(shù)
,當(dāng)然也可以考慮用異步
線程來執(zhí)行發(fā)送消息的代碼献联。
rabbitmq:
connection-timeout: 1s #設(shè)置mq的連接超時(shí)時(shí)間
template:
retry:
enabled: true #開啟超時(shí)重試機(jī)制
initial-interval: 1000ms #失敗后的初始等待時(shí)間
multiplier: 1 #失敗后下次的等待時(shí)長倍數(shù)竖配,下次等待時(shí)長 = initial-interval * multiplier
7.2 生產(chǎn)者確認(rèn)機(jī)制
rabbitMQ有
publisher return
和publisher confirm
兩種確認(rèn)機(jī)制。開啟這兩種確認(rèn)及之后里逆,MQ成功收到消息后會返回確認(rèn)消息給生產(chǎn)者进胯。返回的結(jié)果有以下幾種情況:
exchange
存在,routingKey
不存在原押,MQ返回ack胁镐,但會通過publisher return
返回路由異常,告知投遞成功exchange
诸衔、routingKey
均存在盯漂,臨時(shí)消息入隊(duì)成功,返回ack
笨农,告知投遞成功exchange
就缆、routingKey
均存在,持久消息入隊(duì)成功且完成持久化谒亦,返回ack
竭宰,告知投遞成功- 其他情況都會返回
nack
,告知投遞失敗
publisher-returns:默認(rèn)為false诊霹,不開啟publisher return機(jī)制
publisher-confirm-type:默認(rèn)為none羞延,publisher confirm機(jī)制
- none:關(guān)閉confirm
- simple:同步阻塞等待mq的回執(zhí)消息
- correlated:異步回調(diào)返回mq的回執(zhí)消息
1. 配置生產(chǎn)者確認(rèn)機(jī)制
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
connection-timeout: 1s #設(shè)置mq的連接超時(shí)時(shí)間
publisher-returns: true #開啟publisher return機(jī)制
#none:關(guān)閉confirm,simple:同步阻塞等待mq的回執(zhí)消息脾还,correlated:異步回調(diào)返回mq的回執(zhí)消息
publisher-confirm-type: correlated #publisher confirm機(jī)制伴箩;
2. 設(shè)置publisher return
@Configuration
@Slf4j
public class MqConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public RabbitTemplate rabbitTemplate() {
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("收到消息的return callback,exchange:{}鄙漏,routingKey:{}嗤谚,msg:{},replyCode:{}怔蚌,replyText:{}",
returnedMessage.getMessage(),
returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText()
);
});
return rabbitTemplate;
}
}
3. 發(fā)送消息代碼巩步,設(shè)置publisher confirm
@Test
void publisherConfirmSendMsg() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable throwable) {
log.error("消息回調(diào)失敗", throwable);
}
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
log.info("收到消息回執(zhí)");
if (confirm.isAck()) {
log.info("消息發(fā)送成功,收到ack");
} else {
log.error("消息發(fā)送失敗桦踊,收到nack椅野,原因:{}", confirm.getReason());
}
}
});
String exchangeName = "wl.topic";
// rabbitTemplate.convertAndSend(exchangeName, "aaa.bbb", "這是中國天氣消息!", correlationData);
rabbitTemplate.convertAndSend(exchangeName, "china.weather", "這是中國天氣消息!", correlationData);
}
若將
exchangeName
修改為不存在的名稱wls.topic
若將
routingKey
修改為不存在的名稱aaa.bbb
7.3 消費(fèi)者確認(rèn)機(jī)制
當(dāng)開啟消費(fèi)者確認(rèn)機(jī)制
Consumer Acknowledgement
后,消費(fèi)者處理消息結(jié)束后,會向MQ發(fā)送一個(gè)回執(zhí)竟闪,告知rabbitmq
消息處理的狀態(tài)离福。回執(zhí)有三種可選值:
- ack:成功處理消息炼蛤,
rabbitmq
從queue
中刪除該消息- nack:處理消息失敗妖爷,
rabbitmq
需要再次投遞消息- reject:處理消息失敗并拒絕該消息,
rabbitmq
從queue
中刪除該消息
針對
rabbitmq
的消費(fèi)者確認(rèn)機(jī)制理朋,SpringAMQP
已經(jīng)實(shí)現(xiàn)了消息確認(rèn)功能絮识,并允許我們通過配置文件的方式選擇ack處理方式:
- none:不處理。消息投遞給消費(fèi)者后立即返回
ack
嗽上,消息從mq刪除次舌。不建議使用- manual:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用
api
炸裆,發(fā)送ack
或reject
垃它,存在業(yè)務(wù)入侵,但更靈活- auto:自動(dòng)模式烹看。利用
AOP
對接收消息的處理邏輯做環(huán)繞增強(qiáng),業(yè)務(wù)正常執(zhí)行自動(dòng)返回ack
国拇,推薦使用
當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
- 業(yè)務(wù)異常惯殊,例如
RuntimeException
酱吝,自動(dòng)返回nack
- 消息處理或校驗(yàn)異常,例如
MessageCoversionException
土思,自動(dòng)返回reject
1. 配置消費(fèi)者確認(rèn)機(jī)制
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
listener:
simple:
acknowledge-mode: auto #none:不處理务热;manual:手動(dòng)模式;auto:自動(dòng)模式
2. 配置消費(fèi)者重試機(jī)制
此時(shí)己儒,若拋出
RuntimeException
崎岂,mq的消息沒有出隊(duì),會無限重新投遞消息闪湾,給mq服務(wù)器帶來不必要的壓力冲甘,影響mq的性能,利用Spring
的retry
機(jī)制途样,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試江醇。配置如下:
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true
max-attempts: 3 #最大重試次數(shù)
stateless: true #true無狀態(tài),false有狀態(tài)何暇。如果業(yè)務(wù)中包含事務(wù)陶夜,這里改為false
注:重試次數(shù)耗盡之后,不管消息是否消費(fèi)成功裆站,消息都會出隊(duì)条辟,會導(dǎo)致消息丟失
3. 配置失敗消息處理策略
在開啟重試模式后黔夭,重試次數(shù)耗盡,如果消息依然失敗捂贿,則需要有
MessageRecoverer
接口來處理纠修,三種實(shí)現(xiàn):
RejectAndDontRequeueRecoverer
:重試耗盡后,直接reject
厂僧,丟棄消息。默認(rèn)方式ImmediateRequeueMessageRecoverer
:重試耗盡后了牛,返回nack
颜屠,消息重新入隊(duì)RepublishMessageRecoverer
:重試耗盡后,將失敗的消息投遞到指定的交換機(jī)
配置失敗消息處理策略信息
@Configuration
// 只有spring.rabbitmq.listener.simple.retry.enabled = true時(shí)鹰祸,此配置才會生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorMqConfig {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.exchange");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(DirectExchange errorExchange, Queue errorQueue) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); // error是routingKey
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}
生產(chǎn)者代碼
@Test
void testSendObjectMsg() {
String exchangeName = "wl.object.exchange";
Student student = Student.builder()
.studentId(111)
.studentName("張三")
.gender("男")
.age(24)
.tel("13265659898")
.clazzId(4)
.build();
rabbitTemplate.convertAndSend(exchangeName, "", student);
}
消費(fèi)者代碼
@RabbitListener(queues = "wl.object.queue")
public void objectReceivedMsg(Map<String, String> map) {
log.warn("收到了消息:{}", map);
// throw new RuntimeException("拋出業(yè)務(wù)異常");
throw new MessageConversionException("拋出業(yè)務(wù)異常");
}
4. 業(yè)務(wù)冪等性
方案一:唯一消息id
- 為每一條消息生成一個(gè)唯一的id甫窟,與消息一起發(fā)送給消費(fèi)者
- 消費(fèi)者收到消息處理完業(yè)務(wù)后,將消息id保存到數(shù)據(jù)庫
- 如果下次收到相同的消息蛙婴,去數(shù)據(jù)庫查詢判斷是否存在粗井,存在則不處理本次消息
開啟默認(rèn)的messageId,converter.setCreateMessageIds(true);
@SpringBootApplication
public class SpringbootConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootConsumerApplication.class, args);
}
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true);
return converter;
}
}
此id為
UUID.randomUUID().toString()
街图,可根據(jù)自己的需求更改成其他id浇衬,如分布式id、雪花算法id等
if (this.createMessageIds && messageProperties.getMessageId() == null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
方案二:結(jié)合業(yè)務(wù)邏輯
- 比如更新訂單狀態(tài)(未支付=>已支付)餐济,可以先查詢當(dāng)前訂單是否為已支付耘擂,若為已支付,不處理本次消息
八絮姆、數(shù)據(jù)持久化
消息有兩種類型醉冤,臨時(shí)消息
non-persistent
和持久化消息persistent
,通過springAMQP發(fā)送的消息默認(rèn)都是持久化的篙悯。 發(fā)送消息時(shí)可以指定消息類型蚁阳,deliver mode = 1
是臨時(shí)消息,deliver mode = 2
是持久化消息
臨時(shí)消息存在內(nèi)存中鸽照,持久化消息存在磁盤上螺捐,當(dāng)內(nèi)存消息滿時(shí),會觸發(fā)pageout
移宅,此時(shí)mq是阻塞的归粉,持久化消息不會觸發(fā)pageout
mq的消息存在內(nèi)存中,易丟失漏峰,需要采用數(shù)據(jù)持久化和改變隊(duì)列模式(
lazy queue
)來解決消息丟失和阻塞問題
lazy queue
優(yōu)化了消息寫入磁盤的效率糠悼。保證隊(duì)列和消息都持久化,再使用Lazy Queue
隊(duì)列模式保證消息的可靠性浅乔,但內(nèi)存只會保存最近的消息
聲明Lazy Queue
的兩種方式代碼
@Bean
public Queue queue() {
return QueueBuilder.durable("fanout.queue1").lazy().build();
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"},
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void direct1ReceivedMsg(String msg) {
log.info("消費(fèi)者1可以收到routingKey為blue和red的消息:{}", msg);
}
消息持久化代碼
@Test
void persistentSendMsg() {
Message message = MessageBuilder
.withBody("hello world".getBytes())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
String exchangeName = "wl.topic";
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
九倔喂、死信交換機(jī)
當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí)铝条,就會成為死信
dead letter
- 消費(fèi)者使用
basic.reject
或basic.nack
聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false- 消息是一個(gè)過期消息(達(dá)到了隊(duì)列或消息本身設(shè)置的過期時(shí)間)席噩,超時(shí)無人消費(fèi)
- 要投遞的隊(duì)列消息堆積滿了班缰,最早的消息可能成為死信
如果隊(duì)列通過
dead-letter-exchange
屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會投遞到這個(gè)交換機(jī)中悼枢,這個(gè)交換機(jī)被稱為死信交換機(jī)(dead letter exchange
埠忘,簡稱DLX)死信交換機(jī)可以實(shí)現(xiàn)延遲消息的效果,但官網(wǎng)提供了延遲消息的插件馒索,更方便實(shí)用
十莹妒、延遲消息
1. 安裝插件 rabbitmq_delayed_message_exchange,安裝對應(yīng)mq的版本即可绰上,將其放到plugins目錄下執(zhí)行以下命令旨怠,并重啟mq服務(wù)。官網(wǎng)地址:https://www.rabbitmq.com/community-plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2. 生產(chǎn)者代碼蜈块,設(shè)置messageProperties
的delay
即可鉴腻,單位ms
@Test
void testSendDelayMsg() {
String exchangeName = "wl.delay.exchange";
Student student = Student.builder()
.studentId(111)
.studentName("張三")
.gender("男")
.age(24)
.tel("13265659898")
.clazzId(4)
.build();
// rabbitTemplate.convertAndSend(exchangeName, "delayMsg", student, message -> {
// message.getMessageProperties().setDelay(10000);
// return message;
// });
rabbitTemplate.convertAndSend(exchangeName, "delayMsg", student, getMessagePostProcessor(10000));
log.info("發(fā)送延遲消息:{}", student);
}
public MessagePostProcessor getMessagePostProcessor(Integer delayTime) {
return message -> {
message.getMessageProperties().setDelay(delayTime);
return message;
};
}
3. 消費(fèi)者代碼,將交換機(jī)的delayed
屬性置true
即可
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "wl.delay.direct", type = ExchangeTypes.DIRECT, delayed = "true"),
key = {"delayMsg"}
))
public void delayReceivedMsg(Map<String, String> map) {
log.info("收到延遲消息:{}", map);
}