Dead Letter Exchange
在隊(duì)列上指定一個(gè)Exchange,則在該隊(duì)列上發(fā)生如下情況,
1.消息被拒絕(basic.reject or basic.nack)系洛,且requeue=false
2.消息過(guò)期而被刪除(TTL)
3.消息數(shù)量超過(guò)隊(duì)列最大限制而被刪除
4.消息總大小超過(guò)隊(duì)列最大限制而被刪除
就會(huì)把該消息轉(zhuǎn)發(fā)到指定的這個(gè)exchange
同時(shí)也可以指定一個(gè)可選的x-dead-letter-routing-key
而晒,表示默認(rèn)的routing-key
,如果沒(méi)有指定,則使用消息的routeing-key
(也跟指定的exchange有關(guān)蜡娶,
如果是Fanout類型的exchange
混卵,則會(huì)轉(zhuǎn)發(fā)到所有綁定到該exchange
的所有隊(duì)列)。
拒絕消息或者nack
示列
定義一個(gè)隊(duì)列zhihao.miao.order
窖张,其有屬性x-dead-letter-exchange
是zhihao.miao.exchange.pay
幕随,往Exchange名為zhihao.miao.exchange.order
中發(fā)送消息。
往zhihao.miao.order
隊(duì)列中發(fā)送消息宿接,
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
byte[] body = "hello,zhihao.miao".getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
rabbitTemplate.send("zhihao.miao.exchange.order","zhihao.miao.order",message);
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
配置類:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
此時(shí)消息端拒絕消費(fèi)這個(gè)消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
配置類
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
container.setDefaultRequeueRejected(false);
//手動(dòng)確認(rèn)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("=====rece msg======");
System.out.println(new String(message.getBody()));
//執(zhí)行拒絕消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
});
return container;
}
}
因?yàn)槠渲付?code>x-dead-letter-exchange是zhihao.miao.exchange.pay
赘淮,所以會(huì)將消息轉(zhuǎn)發(fā)到zhihao.miao.exchange.pay
,而因?yàn)闆](méi)有指定x-dead-letter-routing-key
睦霎,所以會(huì)使用默認(rèn)的發(fā)送的消息的route key(zhihao.miao.order
)進(jìn)行路由梢卸,而我們zhihao.miao.exchange.pay
的路由信息如下,所以會(huì)將消息轉(zhuǎn)發(fā)到zhihao.miao.auto
隊(duì)列中去副女。
示列2
定義了隊(duì)列zhihao.miao.order
碑幅,不僅定義了x-dead-letter-exchange
屬性戴陡,也指定了x-dead-letter-routing-key
屬性
顯而易見(jiàn)當(dāng)拒絕了該消息的時(shí)候就會(huì)轉(zhuǎn)發(fā)到了zhihao.miao.exchange.pay
,而應(yīng)該該隊(duì)列指定了route key為zhihao.miao.pay
沟涨,所以轉(zhuǎn)發(fā)到了zhihao.miao.pay
隊(duì)列中去了恤批。
代碼很上面的一樣。
總結(jié)
上面的示列展示了當(dāng)定義隊(duì)列時(shí)指定了x-dead-letter-exchange
(x-dead-letter-routing-key
視情況而定)裹赴,并且消費(fèi)端執(zhí)行拒絕策略的時(shí)候?qū)⑾⒙酚傻街付ǖ腅xchange中去喜庞。我們知道還有二種情況會(huì)造成消息轉(zhuǎn)發(fā)到死信隊(duì)列。
一種是消息過(guò)期而被刪除棋返,可以使用這個(gè)方式使的rabbitmq實(shí)現(xiàn)延遲隊(duì)列的作用延都。還有一種就是消息數(shù)量超過(guò)隊(duì)列最大限制而被刪除或者消息總大小超過(guò)隊(duì)列最大限制而被刪除
參考資料
Dead Letter Exchanges
使用TTL(消息過(guò)期)來(lái)實(shí)現(xiàn)消息延遲