死信隊(duì)列可以實(shí)現(xiàn)消息在未被正常消費(fèi)的場(chǎng)景下醉者,對(duì)這些消息進(jìn)行其他處理,保證消息不會(huì)被丟棄烂瘫。
# 概念:
-
消息會(huì)變成死信消息的場(chǎng)景:
- 消息被
(basic.reject() or basic.nack()) and requeue = false
黔夭,即消息被消費(fèi)者拒絕簽收,并且重新入隊(duì)為false溯祸。
1.1 有一種場(chǎng)景需要注意下:消費(fèi)者設(shè)置了自動(dòng)ACK肢专,當(dāng)重復(fù)投遞次數(shù)達(dá)到了設(shè)置的最大retry次數(shù)之后,消息也會(huì)投遞到死信隊(duì)列焦辅,但是內(nèi)部的原理還是調(diào)用了nack
/reject
博杖。 - 消息過(guò)期,過(guò)了TTL存活時(shí)間筷登。
- 隊(duì)列設(shè)置了
x-max-length
最大消息數(shù)量且當(dāng)前隊(duì)列中的消息已經(jīng)達(dá)到了這個(gè)數(shù)量剃根,再次投遞,消息將被擠掉前方,被擠掉的是最靠近被消費(fèi)那一端的消息狈醉。
- 消息被
-
代碼編寫(xiě)流程是:
- 有一個(gè)(n個(gè))正常業(yè)務(wù)的Exchange,比如為
user-exchange
惠险。 - 有一個(gè)(n個(gè))正常業(yè)務(wù)的Queue舔糖,比如為
user-queue
。(因?yàn)樵撽?duì)列需要綁定死信交換機(jī)莺匠,所以需要加倆參數(shù):死信交換機(jī):x-dead-letter-exchange
金吗,死信消息路由鍵:x-dead-letter-routing-key
) - 進(jìn)行正常業(yè)務(wù)的交換機(jī)和隊(duì)列綁定。
- 定義一個(gè)死信交換機(jī),比如為
common-dead-letter-exchange
摇庙。 - 將正常業(yè)務(wù)的隊(duì)列綁定到死信交換機(jī)(隊(duì)列設(shè)置了
x-dead-letter-exchange
即會(huì)自動(dòng)綁定)旱物。 - 定義死信隊(duì)列
user-dead-letter-queue
用于接收死信消息,綁定死信交換機(jī)卫袒。
- 有一個(gè)(n個(gè))正常業(yè)務(wù)的Exchange,比如為
-
業(yè)務(wù)流程是:
- 正常業(yè)務(wù)消息被投遞到正常業(yè)務(wù)的Exchange宵呛,該Exchange根據(jù)路由鍵將消息路由到綁定的正常隊(duì)列。
- 正常業(yè)務(wù)隊(duì)列中的消息變成了死信消息之后夕凝,會(huì)被自動(dòng)投遞到該隊(duì)列綁定的死信交換機(jī)上(并帶上配置的路由鍵宝穗,如果沒(méi)有指定死信消息的路由鍵,則默認(rèn)繼承該消息在正常業(yè)務(wù)時(shí)設(shè)定的路由鍵)码秉。
- 死信交換機(jī)收到消息后逮矛,將消息根據(jù)路由規(guī)則路由到指定的死信隊(duì)列。
- 消息到達(dá)死信隊(duì)列后转砖,可監(jiān)聽(tīng)該死信隊(duì)列须鼎,處理死信消息。
死信交換機(jī)
府蔗、死信隊(duì)列
也是普通的交換機(jī)和隊(duì)列晋控,只不過(guò)是我們?nèi)藶榈膶⒛硞€(gè)交換機(jī)和隊(duì)列來(lái)處理死信消息。流程圖
# 代碼實(shí)現(xiàn)
- 配置
spring:
application:
name: learn-rabbitmq
rabbitmq:
host: localhost
port: 5672
username: futao
password: 123456789
virtual-host: deadletter-vh
connection-timeout: 15000
# 發(fā)送確認(rèn)
publisher-confirms: true
# 路由失敗回調(diào)
publisher-returns: true
template:
# 必須設(shè)置成true 消息路由失敗通知監(jiān)聽(tīng)者姓赤,而不是將消息丟棄
mandatory: true
listener:
simple:
# 每次從RabbitMQ獲取的消息數(shù)量
prefetch: 1
default-requeue-rejected: false
# 每個(gè)隊(duì)列啟動(dòng)的消費(fèi)者數(shù)量
concurrency: 1
# 每個(gè)隊(duì)列最大的消費(fèi)者數(shù)量
max-concurrency: 1
# 簽收模式為手動(dòng)簽收-那么需要在代碼中手動(dòng)ACK
acknowledge-mode: manual
app:
rabbitmq:
# 隊(duì)列定義
queue:
# 正常業(yè)務(wù)隊(duì)列
user: user-queue
# 死信隊(duì)列
user-dead-letter: user-dead-letter-queue
# 交換機(jī)定義
exchange:
# 正常業(yè)務(wù)交換機(jī)
user: user-exchange
# 死信交換機(jī)
common-dead-letter: common-dead-letter-exchange
- 隊(duì)列赡译、交換機(jī)定義與綁定。
/**
* 隊(duì)列與交換機(jī)定義與綁定
*
* @author futao
* @date 2020/4/7.
*/
@Configuration
public class Declare {
/**
* 用戶隊(duì)列
*
* @param userQueueName 用戶隊(duì)列名
* @return
*/
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//聲明該隊(duì)列的死信消息發(fā)送到的 交換機(jī) (隊(duì)列添加了這個(gè)參數(shù)之后會(huì)自動(dòng)與該交換機(jī)綁定不铆,并設(shè)置路由鍵蝌焚,不需要開(kāi)發(fā)者手動(dòng)設(shè)置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//聲明該隊(duì)列死信消息在交換機(jī)的 路由鍵
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
.build();
}
/**
* 用戶交換機(jī)
*
* @param userExchangeName 用戶交換機(jī)名
* @return
*/
@Bean
public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
return ExchangeBuilder
.topicExchange(userExchangeName)
.durable(true)
.build();
}
/**
* 用戶隊(duì)列與交換機(jī)綁定
*
* @param userQueue 用戶隊(duì)列名
* @param userExchange 用戶交換機(jī)名
* @return
*/
@Bean
public Binding userBinding(Queue userQueue, Exchange userExchange) {
return BindingBuilder
.bind(userQueue)
.to(userExchange)
.with("user.*")
.noargs();
}
/**
* 死信交換機(jī)
*
* @param commonDeadLetterExchange 通用死信交換機(jī)名
* @return
*/
@Bean
public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return ExchangeBuilder
.topicExchange(commonDeadLetterExchange)
.durable(true)
.build();
}
/**
* 用戶隊(duì)列的死信消息 路由的隊(duì)列
* 用戶隊(duì)列user-queue的死信投遞到死信交換機(jī)`common-dead-letter-exchange`后再投遞到該隊(duì)列
* 用這個(gè)隊(duì)列來(lái)接收user-queue的死信消息
*
* @return
*/
@Bean
public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
return QueueBuilder
.durable(userDeadLetterQueue)
.build();
}
/**
* 死信隊(duì)列綁定死信交換機(jī)
*
* @param userDeadLetterQueue user-queue對(duì)應(yīng)的死信隊(duì)列
* @param commonDeadLetterExchange 通用死信交換機(jī)
* @return
*/
@Bean
public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
return BindingBuilder
.bind(userDeadLetterQueue)
.to(commonDeadLetterExchange)
.with("user-dead-letter-routing-key")
.noargs();
}
}
- 定義好之后啟動(dòng)程序,springboot會(huì)讀取Spring容器中類(lèi)型為Queue和Exchange的bean進(jìn)行隊(duì)列和交換機(jī)的初始化與綁定狂男。當(dāng)然也可以自己在RabbitMQ的管理后臺(tái)進(jìn)行手動(dòng)創(chuàng)建與綁定综看。
-
查看管理后臺(tái)
交換機(jī)
隊(duì)列
隊(duì)列與死信交換機(jī)
# 測(cè)試
- 消息生產(chǎn)者
/**
* @author futao
* @date 2020/4/7.
*/
@Component
public class DeadLetterSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${app.rabbitmq.exchange.user}")
private String userExchange;
public void send() {
User user = User.builder()
.userName("天文")
.address("浙江杭州")
.birthday(LocalDate.now(ZoneOffset.ofHours(8)))
.build();
rabbitTemplate.convertAndSend(userExchange, "user.abc", user);
}
}
1. 場(chǎng)景1.1
消息被(basic.reject() or basic.nack()) and requeue = false品腹,即消息被消費(fèi)者拒絕或者nack岖食,并且重新入隊(duì)為false。
nack()與reject()的區(qū)別是:reject()不支持批量拒絕舞吭,而nack()可以.
- 消費(fèi)者代碼
/**
* @author futao
* @date 2020/4/9.
*/
@Slf4j
@Component
public class Consumer {
/**
* 正常用戶隊(duì)列消息監(jiān)聽(tīng)消費(fèi)者
*
* @param user
* @param message
* @param channel
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user}")
public void userConsumer(User user, Message message, Channel channel) {
log.info("正常用戶業(yè)務(wù)監(jiān)聽(tīng):接收到消息:[{}]", JSON.toJSONString(user));
try {
//參數(shù)為:消息的DeliveryTag泡垃,是否批量拒絕,是否重新入隊(duì)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("拒絕簽收...消息的路由鍵為:[{}]", message.getMessageProperties().getReceivedRoutingKey());
} catch (IOException e) {
log.error("消息拒絕簽收失敗", e);
}
}
/**
* @param user
* @param message
* @param channel
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
public void userDeadLetterConsumer(User user, Message message, Channel channel) {
log.info("接收到死信消息:[{}]", JSON.toJSONString(user));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("死信隊(duì)列簽收消息....消息路由鍵為:[{}]", message.getMessageProperties().getReceivedRoutingKey());
} catch (IOException e) {
log.error("死信隊(duì)列消息簽收失敗", e);
}
}
}
- 可以看到羡鸥,正常消息被NACK之后最后到了死信隊(duì)列蔑穴,且路由鍵發(fā)生了變化。
死信消息
1. 場(chǎng)景1.2
消費(fèi)者設(shè)置了自動(dòng)簽收惧浴,當(dāng)重復(fù)投遞次數(shù)達(dá)到了設(shè)置的最大retry次數(shù)之后存和,消息也會(huì)投遞到死信隊(duì)列,但是內(nèi)部的原理還是調(diào)用了
nack
/reject
。
- application.yml中需要更改一些配置
spring:
application:
name: learn-rabbitmq
rabbitmq:
listener:
simple:
# 每次從RabbitMQ獲取的消息數(shù)量
prefetch: 1
default-requeue-rejected: false
# 每個(gè)隊(duì)列啟動(dòng)的消費(fèi)者數(shù)量
concurrency: 1
# 每個(gè)隊(duì)列最大的消費(fèi)者數(shù)量
max-concurrency: 1
# 自動(dòng)簽收
acknowledge-mode: auto
retry:
enabled: true
# 第一次嘗試時(shí)間間隔
initial-interval: 10S
# 兩次嘗試之間的最長(zhǎng)持續(xù)時(shí)間捐腿。
max-interval: 10S
# 最大重試次數(shù)(=第一次正常投遞1+重試次數(shù)4)
max-attempts: 5
# 上一次重試時(shí)間的乘數(shù)
multiplier: 1.0
- 消費(fèi)者代碼
/**
* @author futao
* @date 2020/4/9.
*/
@Slf4j
@Configuration
public class AutoAckConsumer {
/**
* 正常用戶隊(duì)列消息監(jiān)聽(tīng)消費(fèi)者
*
* @param user
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user}")
public void userConsumer(User user) {
log.info("正常用戶業(yè)務(wù)監(jiān)聽(tīng):接收到消息:[{}]", JSON.toJSONString(user));
throw new RuntimeException("模擬發(fā)生異常");
}
/**
* @param user
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
public void userDeadLetterConsumer(User user) {
log.info("接收到死信消息并自動(dòng)簽收:[{}]", JSON.toJSONString(user));
}
}
-
測(cè)試結(jié)果:
image.png
image.png 從測(cè)試結(jié)果可以看出纵朋,消息如果未被正常消費(fèi),則進(jìn)行重試茄袖,如果最終還未被正常消費(fèi)操软,則會(huì)被投遞到死信隊(duì)列。
initial-interval
,max-interval
這兩個(gè)參數(shù)啥作用不知道宪祥,現(xiàn)在測(cè)試的結(jié)果是一直都會(huì)取最短的那個(gè)時(shí)間作為下次投遞時(shí)間...
2. 測(cè)試場(chǎng)景 2
消息過(guò)期聂薪,過(guò)了TTL存活時(shí)間。
- 需要修改隊(duì)列定義蝗羊,設(shè)置隊(duì)列消息的過(guò)期時(shí)間
x-message-ttl
.
/**
* 用戶隊(duì)列
*
* @param userQueueName 用戶隊(duì)列名
* @return
*/
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//聲明該隊(duì)列的死信消息發(fā)送到的 交換機(jī) (隊(duì)列添加了這個(gè)參數(shù)之后會(huì)自動(dòng)與該交換機(jī)綁定藏澳,并設(shè)置路由鍵,不需要開(kāi)發(fā)者手動(dòng)設(shè)置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//聲明該隊(duì)列死信消息在交換機(jī)的 路由鍵
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
//該隊(duì)列的消息的過(guò)期時(shí)間-超過(guò)這個(gè)時(shí)間還未被消費(fèi)則路由到死信隊(duì)列
.withArgument("x-message-ttl", 5000)
.build();
}
-
把
user-queue
的消費(fèi)者注釋?zhuān)瓜o(wú)法被消費(fèi)肘交,直到消息在隊(duì)列中的時(shí)間達(dá)到設(shè)定的存活時(shí)間笆载。
ttl -
根據(jù)日志可以看到,消息在5S后會(huì)被投遞到死信隊(duì)列涯呻。
image.png 注意:可以給隊(duì)列設(shè)置消息過(guò)期時(shí)間凉驻,那么所有投遞到這個(gè)隊(duì)列的消息都自動(dòng)具有這個(gè)屬性。還可以在消息投遞之前复罐,給每條消息設(shè)定指定的過(guò)期時(shí)間涝登。(當(dāng)兩者都設(shè)置了,則默認(rèn)取較短的值)
下面測(cè)試給每條消息設(shè)置指定的過(guò)期時(shí)間:
- 修改消息生產(chǎn)者:
/**
* @author futao
* @date 2020/4/7.
*/
@Slf4j
@Component
public class DeadLetterSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${app.rabbitmq.exchange.user}")
private String userExchange;
public void send(String exp) {
User user = User.builder()
.userName("天文")
.address("浙江杭州")
.birthday(LocalDate.now(ZoneOffset.ofHours(8)))
.build();
log.info("消息投遞...指定的存活時(shí)長(zhǎng)為:[{}]ms", exp);
rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//為每條消息設(shè)定過(guò)期時(shí)間
messageProperties.setExpiration(exp);
return message;
}
});
}
}
- 從測(cè)試結(jié)果可以看出效诅,每條消息都在指定的時(shí)間投遞到了死信隊(duì)列胀滚。
【坑】重點(diǎn)注意!!!:RabbitMQ對(duì)于消息過(guò)期的檢測(cè):只會(huì)檢測(cè)最近將要被消費(fèi)的那條消息是否到達(dá)了過(guò)期時(shí)間,不會(huì)檢測(cè)非末端消息是否過(guò)期乱投。造成的問(wèn)題是:非末端消息已經(jīng)過(guò)期了咽笼,但是因?yàn)槟┒讼⑦€未過(guò)期,非末端消息處于阻塞狀態(tài)戚炫,所以非末端消息不會(huì)被檢測(cè)到已經(jīng)過(guò)期剑刑。使業(yè)務(wù)產(chǎn)生與預(yù)期嚴(yán)重不一致的結(jié)果。
-
對(duì)上面的問(wèn)題進(jìn)行測(cè)試:(第一條消息的過(guò)期時(shí)間設(shè)置成10S双肤,第二條消息設(shè)置成5S)
image.png - 從測(cè)試結(jié)果可以看出施掏,id為1的消息存活時(shí)長(zhǎng)為10S,id為2的消息存活時(shí)間為5S茅糜。但是只有當(dāng)?shù)谝粭l消息(id=1)過(guò)期之后七芭,id=2的消息到達(dá)隊(duì)列末端,才會(huì)被檢測(cè)到已經(jīng)過(guò)期蔑赘。
3. 測(cè)試場(chǎng)景3
隊(duì)列設(shè)置了
x-max-length
最大消息數(shù)量且當(dāng)前隊(duì)列中的消息已經(jīng)達(dá)到了這個(gè)數(shù)量狸驳,再次投遞预明,消息將被擠掉,被擠掉的是最靠近被消費(fèi)那一端的消息耙箍。
- 修改隊(duì)列定義
/**
* 用戶隊(duì)列
*
* @param userQueueName 用戶隊(duì)列名
* @return
*/
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//聲明該隊(duì)列的死信消息發(fā)送到的 交換機(jī) (隊(duì)列添加了這個(gè)參數(shù)之后會(huì)自動(dòng)與該交換機(jī)綁定贮庞,并設(shè)置路由鍵,不需要開(kāi)發(fā)者手動(dòng)設(shè)置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//聲明該隊(duì)列死信消息在交換機(jī)的 路由鍵
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
//隊(duì)列最大消息數(shù)量
.withArgument("x-max-length", 2)
.build();
}
-
向隊(duì)列中投遞消息
image.png -
從結(jié)果可以看出究西,當(dāng)投遞第3條消息的時(shí)候窗慎,RabbitMQ會(huì)把在最靠經(jīng)被消費(fèi)那一端的消息移出隊(duì)列,并投遞到死信隊(duì)列卤材。
image.png
隊(duì)列中將始終保持最多兩個(gè)消息粘秆。
# 其他:
-
Queue的可配置項(xiàng)可在RabbitMQ的管理后臺(tái)查看:
image.png - 源碼:https://github.com/FutaoSmile/springboot-learn-integration/tree/master/springboot-learn-rabbitmq
# 相關(guān):
SpringBoot RabbitMQ實(shí)現(xiàn)消息可靠投遞
# TODO:
- 消費(fèi)端限流保護(hù)
- 延遲隊(duì)列