Alternate Exchange
Rabbitmq自己擴(kuò)展的功能熙宇,不是AMQP協(xié)議定義的。
Alternate Exchange
屬性的作用呼盆,創(chuàng)建Exchange
指定該Exchange
的Alternate Exchange
屬性擂达,發(fā)送消息的時(shí)候根據(jù)route key并沒(méi)有把消息路由到隊(duì)列中去,這就會(huì)將此消息路由到Alternate Exchange
屬性指定的Exchange
上了出刷。
自動(dòng)聲明帶有Alternate Exchange的Exchange,
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public Exchange exchange(){
Map<String,Object> argsMap = new HashMap<>();
argsMap.put("alternate-exchange","zhihao.miao.exchange.order");
return new TopicExchange("zhihao.miao.exchange.pay",true,false,argsMap);
}
@Bean
public Binding binding(){
return new Binding("zhihao.miao.pay",Binding.DestinationType.QUEUE,"zhihao.miao.exchange.pay","zhihao.miao.pay.*",new HashMap<>());
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
應(yīng)用啟動(dòng)類
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
//使得客戶端第一次連接rabbitmq
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
啟動(dòng)應(yīng)用啟動(dòng)類之后生成一個(gè)帶有alternate-exchange
屬性的Exchange
坯辩。
zhihao.miao.exchange.pay
是個(gè)包含alternate-exchange
屬性的topic
類型的exchange
(route key是zhihao.miao.pay.*
馁龟,隊(duì)列名是zhihao.miao.pay
),alternate-exchange
屬性指定的是fanout
類型的exchange,exchange的名稱是zhihao.miao.exchange.order
(綁定到zhihao.miao.order
隊(duì)列)
如果正確的路由(符合zhihao.miao.pay.*
)規(guī)則漆魔,則zhihao.miao.pay
隊(duì)列接收到消息坷檩。如果是不正確的路由(不符合zhihao.miao.pay.*
)規(guī)則,則路由到zhihao.miao.exchange.pay
Exchange指定的alternate-exchange
屬性的Exchange中有送。
測(cè)試
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
啟動(dòng)應(yīng)用類:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
byte[] body = "hello,zhihao.miao".getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
rabbitTemplate.send("zhihao.miao.exchange.pay","zhihao.miao.pay.aaa",message);
TimeUnit.SECONDS.sleep(3);
context.close();
}
}
此時(shí)發(fā)送消息到名為zhihao.miao.exchange.pay
的Exchange
淌喻,而Route key
是zhihao.miao.pay.aaa
僧家,所以能正確地路由到zhihao.miao.pay
隊(duì)列中雀摘。
當(dāng)指定的Route key
不能正確的路由的時(shí)候,則直接發(fā)送到名為zhihao.miao.exchange.order
的Exchange
八拱,而因?yàn)槲覀兌x的Exchange
類型是fanout
類型阵赠,所以就能路由到zhihao.miao.order
隊(duì)列中了涯塔。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
byte[] body = "hello,zhihao.miao".getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
//此時(shí)route不到,那么就路由到alternate-exchange的屬性配置的exchage
rabbitTemplate.send("zhihao.miao.exchange.pay","hehe.zhihao.miao",message);
TimeUnit.SECONDS.sleep(3);
context.close();
}
}
一般alternate-exchange
屬性的值最好是fanout
類型的exchange
清蚀,否則還會(huì)根據(jù)route key
與alternate-exchange
屬性的exchange
進(jìn)行匹配再去路由匕荸。而如果指定了fanout
類型的exchange
,不需要去匹配routekey
枷邪。
alternate-exchange配置的Exchange也不能正確路由
示列說(shuō)明
創(chuàng)建了一個(gè)topic
類型的Exchange帶有alternate-exchange
屬性榛搔,其alternate-exchange
的exchange
也是topic
類型的exchange
,如果消息的route key
既不能,這個(gè)消息就會(huì)丟失东揣〖螅可以觸發(fā)publish confirm
機(jī)制,表示這個(gè)消息沒(méi)有確認(rèn)嘶卧。
配置:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
}
正常路由到Exchange名為head.info路由的隊(duì)列中尔觉。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
byte[] body = "hello,zhihao.miao".getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
//正確路由到header.info隊(duì)列
rabbitTemplate.send("head.info","head.info.a",message);
TimeUnit.SECONDS.sleep(3);
context.close();
}
}
路由到Exchange名為head.info指定的alternate-exchange配置的head.error所路由的隊(duì)列中。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
byte[] body = "hello,zhihao.miao".getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
//正確路由到header.info隊(duì)列
rabbitTemplate.send("head.info","head.error.a",message);
TimeUnit.SECONDS.sleep(3);
context.close();
}
}
二者都不符合則消息丟失芥吟,可以使用publish confirm來(lái)做生產(chǎn)端的消息確認(rèn),因?yàn)橄](méi)有正確路由到隊(duì)列侦铜,所以觸發(fā)了return method。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
System.out.println(rabbitTemplate);
byte[] body = "hello".getBytes();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
//正確路由到header.info隊(duì)列
rabbitTemplate.send("head.info","header.debug.a",message);
TimeUnit.SECONDS.sleep(30);
context.close();
}
}
配置:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
factory.setPublisherReturns(true);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("===========消息無(wú)法被路由=========");
System.out.println("replyCode: "+replyCode);
System.out.println("replyText: "+replyText);
System.out.println("exchange: "+exchange);
System.out.println("routingKey: "+routingKey);
});
return rabbitTemplate;
}
}
總結(jié)
- 建議
Alternate Exchange
的類型是fanout
钟鸵,防止出現(xiàn)路由失敗钉稍。
fanout exchange
一般不需要指定Alternate Exchange
屬性。 - 如果一個(gè)
Exchange
指定了Alternate Exchange
棺耍,那就意味著嫁盲,當(dāng)Exchange
和Alternate Exchange
都無(wú)法路由的時(shí)候,才會(huì)觸發(fā)return method
烈掠。