1.什么是死信(Dead Letter)屠尊?
官方文檔:https://www.rabbitmq.com/dlx.html
一個(gè)消息變成死信的條件有:
1.消費(fèi)被拒絕(basic.reject 或者 basic.nack)捶障,并且參數(shù) requeue = false 時(shí)
2.消息TTL(存活時(shí)間)過(guò)期
3.隊(duì)列達(dá)到最大長(zhǎng)度
rabbitMQ對(duì)于死信消息的處理是:如果配置了死信隊(duì)列蝗砾,成為死信的消息會(huì)被丟進(jìn)死信隊(duì)列,如果沒(méi)有則被丟棄溺忧。
2.配置死信隊(duì)列
死信隊(duì)列不是特殊的隊(duì)列憋飞,它只是綁定在了死信交換機(jī)上而已搭幻,并且死信交換機(jī)也不是特殊的交換機(jī),它也只是用來(lái)接收死信消息的
哦
一般分為以下步驟:
1.配置業(yè)務(wù)隊(duì)列,綁定到業(yè)務(wù)交換機(jī)上
2.為業(yè)務(wù)隊(duì)列配置死信交換機(jī)和路由
3.配置死信隊(duì)列券勺,綁定到死信交換機(jī)上
模擬其中一種條件來(lái)進(jìn)行實(shí)戰(zhàn)绪钥!
配置類(lèi):
@Configuration
public class RabbitMQConfig {
/**
* 業(yè)務(wù)交換機(jī)
*/
public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
/**
* 業(yè)務(wù)隊(duì)列
*/
public static final String BUSINESS_QUEUE_NAME = "business.queue";
/**
* 死信交換機(jī)
*/
public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
/**
* 死信隊(duì)列
*/
public static final String DEAD_QUEUE_NAME = "dead.queue";
public static final String DEAD_ROUTING_KEY = "dead.routing.key";
/**
* 聲明業(yè)務(wù)交換機(jī)
**/
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
/**
* 聲明業(yè)務(wù)隊(duì)列
*
**/
@Bean("businessQueue")
public Queue businessQueue(){
Map<String, Object> args = new HashMap<>(2);
//聲明綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//聲明死信路由key
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
}
/**
* 業(yè)務(wù)隊(duì)列綁定到業(yè)務(wù)交換機(jī)
*
**/
@Bean
public Binding businessBind(@Qualifier("businessQueue")Queue businessQueue,
@Qualifier("businessExchange")FanoutExchange businessExchange){
return BindingBuilder.bind(businessQueue).to(businessExchange);
}
/**
* 聲明死信交換機(jī)
**/
@Bean("deadExchange")
public DirectExchange deadExchange(){
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
/**
* 聲明死信隊(duì)列
*
**/
@Bean("deadQueue")
public Queue deadQueue(){
return new Queue(DEAD_QUEUE_NAME);
}
/**
* 死信隊(duì)列綁定到死信交換機(jī)
*
**/
@Bean
public Binding deadBind(@Qualifier("deadQueue")Queue deadQueue,
@Qualifier("deadExchange")DirectExchange deadExchange){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
}
}
exchange
businessExchange
deadExchange
業(yè)務(wù)隊(duì)列監(jiān)聽(tīng)
@Component
public class BusinessListener {
Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE_NAME)
public void receive(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
logger.info("收到消息:" + msg);
if (msg.contains("dead")){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
死信隊(duì)列監(jiān)聽(tīng)
@Component
public class DeadListener {
Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_NAME)
public void receive(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
logger.info("收到消息:" + msg);
}
}
寫(xiě)一個(gè)controller來(lái)生產(chǎn)消息,方便測(cè)試
@RestController
@RequestMapping("my")
public class MyController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("send")
public void send(@RequestParam("msg")String msg){
rabbitTemplate.convertSendAndReceive(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,"", msg);
}
}
發(fā)送一個(gè)消息
發(fā)送消息“msg”
結(jié)果
結(jié)果
發(fā)送一個(gè)死信消息
發(fā)送消息“deadmsg”
結(jié)果
結(jié)果