流程是這樣的登夫,訂閱者很钓,發(fā)送消息到test交換機(jī)香府,通過route key 分發(fā)到綁定的隊(duì)列,這里涉及到交換機(jī)的類型码倦,可以看我上一篇文章企孩。如果沒有匹配到這個(gè)routeKey就默認(rèn)發(fā)送到AE交換機(jī)(fanout模式),這個(gè)交換機(jī)要設(shè)置
internal:true
意為內(nèi)部交換機(jī) 袁稽。AE交換機(jī)再把錯(cuò)誤的消息勿璃,發(fā)送到其綁定的隊(duì)列中,如果test交換機(jī),發(fā)送消息被匹配到的隊(duì)里中补疑,而處理該隊(duì)列的訂閱者闻葵,拒絕了或者超時(shí)了處理,test交換機(jī)就將該消息發(fā)送到就死信交換機(jī)癣丧,然后到死信隊(duì)列中
一槽畔、 進(jìn)入死信隊(duì)列(進(jìn)入死信的三種方式)
- 1.消息被拒絕(basic.reject or basic.nack)并且requeue=false
- 2.消息TTL過期
- 3.隊(duì)列達(dá)到最大長度
代碼演示
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
- true 發(fā)送給下一個(gè)消費(fèi)者
- false 誰都不接受,從隊(duì)列中刪除
Rabbit設(shè)置
- 1.設(shè)置AE交換機(jī) 設(shè)置為內(nèi)部交換機(jī)胁编,模式為
fanout
當(dāng)發(fā)送到正常交換機(jī)消息厢钧,沒有被匹配到route key的消息對(duì)進(jìn)到改交換機(jī)
FanoutExchange fanoutExchange=new FanoutExchange("alter");
fanoutExchange.setInternal(true);//設(shè)置為內(nèi)部交換機(jī),作為處理了非法的消息,無法匹配到route key的消息
- 為AE交換機(jī)綁定隊(duì)列 `alter_message`
- 2.設(shè)置處理正常的交換機(jī)
test
綁定參數(shù),設(shè)置沒有匹配 route key
的消息發(fā)送到AE交換機(jī) alternate-exchange
-
3.添加正常的隊(duì)列
hello 測試處理正常邏輯
task_queue 模擬被拒絕的消息
添加超時(shí)時(shí)間和死信交換機(jī)和rk
x-dead-letter-exchange: dead_letter_exchange
x-dead-letter-routing-key: task_queue.fail
x-message-ttl: 600
-
4.設(shè)置死信交換機(jī)
dead_letter_exchange
- 另外創(chuàng)建死信隊(duì)列
dead
- 綁定 route key
task_queue.fail
- 另外創(chuàng)建死信隊(duì)列
死信交換機(jī)
模擬死信隊(duì)列
代碼實(shí)例 Python
import pika
#認(rèn)證嬉橙,生產(chǎn)者
credentials = pika.PlainCredentials('guest', 'guest')
#鏈接rabbit服務(wù)器(localhost是本機(jī)早直,如果是其他服務(wù)器請修改為ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,'/',credentials))
#通過tcp協(xié)議獲取一個(gè)連接
channel = connection.channel()
#聲明一個(gè)對(duì)下列和賈環(huán)加
#channel.queue_declare(queue='hello')
#被hello接受了
channel.basic_publish(exchange='test',
routing_key='hello',
body='Hello World!')
#發(fā)送了一個(gè)沒有匹配的消息,匹配到了alter_message
channel.basic_publish(exchange='test',
routing_key='hello12312',
body='Hello World!')
#模擬一條雖然能被匹配到市框,但是無法消費(fèi)的消息霞扬,然后被發(fā)送到死信隊(duì)列消息
channel.basic_publish(exchange='test',
routing_key='task_queue',
body='Hello World!')
-
正常隊(duì)列
-
沒有匹配到的到
-
被拒絕或者超時(shí)進(jìn)入私信隊(duì)列的
使用代碼去創(chuàng)建隊(duì)列和交換機(jī) Java
@Bean
public ConnectionFactory connectionFactory() throws Exception {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("liuxin");
connectionFactory.setPassword("930914lx");
connectionFactory.setVirtualHost("az");
connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置回調(diào)
Channel channel = connectionFactory.createConnection().createChannel(false);
//String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
Map<String, Object> arguments = new HashMap<>();
arguments.put("internal",true);
//String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
//設(shè)置AE交換機(jī)
channel.exchangeDeclare("alter", "fanout", false, false, false, arguments);
channel.queueDeclare("alter_message", false, false, false, null);
channel.queueBind("alter_message", "alter", "");
//聲明死信交換機(jī)并綁定
channel.exchangeDeclare("dead_letter_exchange", "direct", false, false, null);
channel.queueDeclare("dead", false, false, false, null);
channel.queueBind("dead", "dead_letter_exchange", "task_queue.fail");
arguments = new HashMap<>();
arguments.put("alternate-exchange", "alter");//指定AE交換機(jī)
channel.exchangeDeclare("test", "direct", false, false, arguments);
//聲明接受正式的隊(duì)列,不需要參數(shù)
channel.queueDeclare("hello", false, false, false, null);
channel.queueBind("hello", "test", "hello");
arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "task_queue.fail");
arguments.put("x-message-ttl",6000);//6s沒有被處理枫振,就死了
//設(shè)置測試死信隊(duì)列的task_queue喻圃,推送該隊(duì)列里面,被拒絕會(huì)到dead_letter_exchange粪滤,并最終到dead斧拍,routeKey,task_queue.fail 為并設(shè)置死信隊(duì)列參數(shù)
channel.queueDeclare("task_queue", false, false, false, arguments);
channel.queueBind("task_queue", "test", "task_queue");
return connectionFactory;
}
/**
* 接受消息的監(jiān)聽杖小,這個(gè)監(jiān)聽客戶交易流水的消息
* 針對(duì)消費(fèi)者配置
*
* @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, PayMentConsumeImpl transactionConsume) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.addQueueNames("hello");
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(8);
container.setConcurrentConsumers(4);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn),當(dāng)設(shè)置了此模式肆汹,必須返回ACK,否則會(huì)進(jìn)入死信隊(duì)列
container.setMessageListener(transactionConsume);
container.setPrefetchCount(1000);
return container;
}