一、前言
發(fā)布訂閱模式族壳,即producer發(fā)送者,發(fā)布一個消息趣些,多個接收者都能獲取到同樣的消息仿荆。大致流程是,發(fā)送者將消息發(fā)送到指定的交換機,交換機將消息發(fā)布到綁定到該交換的所有隊列里拢操,接收者通過隊列獲取消息锦亦。
發(fā)布訂閱模式.png
二、發(fā)送者Producer
1.FanoutExchangeConfig.java
新建交換機庐冯,并將隊列A B C綁定到該交換機
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {
/**
* 新建交換機孽亲,并將隊列A B C綁定到該交換機
*/
public static final String FANOUT_EXCHANGE = "weather_fanout_exchange";
public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";
/**
* 創(chuàng)建A隊列
*/
@Bean
public Queue queueA(){
return new Queue(FANOUT_EXCHANGE_QUEUE_A);
}
/**
* 創(chuàng)建B隊列
*/
@Bean
public Queue queueB(){
return new Queue(FANOUT_EXCHANGE_QUEUE_B);
}
/**
* 創(chuàng)建C隊列
*/
@Bean
public Queue queueC(){
return new Queue(FANOUT_EXCHANGE_QUEUE_C);
}
/**
* 創(chuàng)建fanout交換機
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 將A隊列綁定到交換機上
*/
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
/**
* 將B隊列綁定到交換機上
*/
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
/**
* 將C隊列綁定到交換機上
*/
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
fanout交換機.png
2.發(fā)送消息到指定交換機
@RestController
@RequestMapping(value = "send")
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("fanout-exchange")
public void sendExchange(){
String msg = "send msg to fanout exchange" + new Date().toString();
rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, msg);
}
}
隊列上的消息.png
三、Consumer接收者
1.RabbitMqConfig.java
@Configuration
public class RabbitMqConfig {
public static final String FANOUT_EXCHANGE_QUEUE_A = "fanout_exchange_queue_a";
public static final String FANOUT_EXCHANGE_QUEUE_B = "fanout_exchange_queue_b";
public static final String FANOUT_EXCHANGE_QUEUE_C = "fanout_exchange_queue_c";
}
2.RabbitMqReceiver.java
@Component
public class RabbitMqReceiver {
private final static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);
@RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_A)
public void receiverQueueA(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueA 接收到消息為:"+msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_B)
public void receiverQueueB(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueB 接收到消息為:"+msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_EXCHANGE_QUEUE_C)
public void receiverQueueC(String msg, Channel channel, Message message) throws IOException {
logger.info("receiverQueueC 接收到消息為:"+msg);
}
}
reciver.png
從日志可以看出來展父,每個隊列都能收到相同的消息
以上返劲!