- 以下例子代碼可在github或者在gitee下載
github:代碼鏈接
gitee:代碼鏈接 - 如果對(duì)springboot 使用rabbitmq還不太熟悉的話可以看上一篇博文:springboot rabbitmq入門使用
RabbitMQ常用的交換器類型有fanout、direct、topic、headers這四種胚迫,其中headers實(shí)際很少用到谓厘。
fanout:把所有發(fā)送到該交換器的消息路由到所有與該交換器綁定的隊(duì)列中叮叹。
direct:匹配規(guī)則相對(duì)簡(jiǎn)單,把消息路由到交換機(jī)和路由鍵RoutingKey綁定的隊(duì)列中派阱。
topic:匹配規(guī)則靈活切诀,路由鍵RoutingKey可使用通配符" * " 和 “ # ”揩环,代表匹配一個(gè)單詞和任意單詞。
代碼目錄結(jié)構(gòu)如圖趾牧,分別演示三種交換機(jī)類型:
一检盼、fanout廣播消息模型
fanout廣播交換機(jī),當(dāng)有多個(gè)消息隊(duì)列需要監(jiān)聽(tīng)同個(gè)消息進(jìn)行不同的業(yè)務(wù)處理的時(shí)候翘单,那么可以采用廣播交換機(jī),一個(gè)廣播交換機(jī)蹦渣,多個(gè)消息隊(duì)列綁定該交換機(jī)哄芜,在發(fā)送消息的時(shí)候把消息發(fā)送到該交換機(jī)上,那么在監(jiān)聽(tīng)消息端多個(gè)消息隊(duì)列將監(jiān)聽(tīng)到該消息柬唯,如圖所示认臊。
(1)首先在RabbitmqConfig中創(chuàng)建兩個(gè)隊(duì)列,fanoutQueueOne和fanoutQueueTwo锄奢,交換機(jī)類型為FanoutExchange失晴,最后是兩個(gè)隊(duì)列與FanoutExchange交換機(jī)綁定起來(lái)剧腻。
@Slf4j
@Configuration
public class RabbitmqConfig {
/**
* 創(chuàng)建消息模型-fanoutExchange
*/
//廣播fanout消息模型-隊(duì)列1
@Bean
public Queue fanoutQueueOne(){
return new Queue(RabbitMqConstants.FANOUT_ONE_QUEUE,true);
}
//廣播fanout消息模型-隊(duì)列2
@Bean
public Queue fanoutQueueTwo(){
return new Queue(RabbitMqConstants.FANOUT_TWO_QUEUE,true);
}
//廣播fanout消息模型-創(chuàng)建交換機(jī)-fanoutExchange
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitMqConstants.FANOUT_EXCHANGE,true,false);
}
//廣播fanout消息模型-創(chuàng)建綁定1
@Bean
public Binding fanoutBindingOne(){
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
//廣播fanout消息模型-創(chuàng)建綁定2
@Bean
public Binding fanoutBindingTwo(){
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
}
RabbitMqConstants常量值如下:
@Data
public class RabbitMqConstants {
//廣播fanoutExchange消息模型
public static final String FANOUT_ONE_QUEUE = "mq.fanout.one.queue";
public static final String FANOUT_TWO_QUEUE = "mq.fanout.two.queue";
public static final String FANOUT_EXCHANGE = "mq.fanout.exchange";
}
(2)啟動(dòng)項(xiàng)目,訪問(wèn)http://127.0.0.1:15672/可以看到我們?cè)O(shè)置的隊(duì)列交換機(jī)以及綁定的路由相關(guān)信息:
(3)fanout廣播消息模型-生產(chǎn)者FanoutPublisher涂屁,RabbitMQ發(fā)送消息的操作組件RabbitTemplate設(shè)置fanout廣播交換機(jī)书在,最后發(fā)送消息。
@Slf4j
@Component
public class FanoutPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息
* @param order 訂單消息
*/
public void sendMsg(Order order){
try {
//設(shè)置廣播式交換機(jī)FanoutExchange
rabbitTemplate.setExchange(RabbitMqConstants.FANOUT_EXCHANGE);
//發(fā)送消息
rabbitTemplate.convertAndSend(order);
//打印日志
log.info("消息模型fanoutExchange-生產(chǎn)者-發(fā)送消息:{} ", order);
}catch (Exception e){
log.error("消息模型fanoutExchange-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常: ", order, e);
}
}
}
(4)fanout廣播消息模型-消費(fèi)者FanoutConsumer拆又,前面設(shè)置了兩個(gè)隊(duì)列儒旬,這里設(shè)置兩個(gè)隊(duì)列進(jìn)行監(jiān)聽(tīng)。
@Slf4j
@Component
public class FanoutConsumer {
/**
* 監(jiān)聽(tīng)并消費(fèi)隊(duì)列中的消息-fanoutExchange-one-這是第一條隊(duì)列對(duì)應(yīng)的消費(fèi)者
*/
@RabbitListener(queues = RabbitMqConstants.FANOUT_ONE_QUEUE,containerFactory = "singleListenerContainer")
public void consumeFanoutMsgOne(Order order){
try {
log.info("消息模型fanoutExchange-one-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)到消息:{} ",order);
}catch (Exception e){
log.error("消息模型-消費(fèi)者-發(fā)生異常:",e);
}
}
/**
* 監(jiān)聽(tīng)并消費(fèi)隊(duì)列中的消息-fanoutExchange-two-這是第二條隊(duì)列對(duì)應(yīng)的消費(fèi)者
*/
@RabbitListener(queues = RabbitMqConstants.FANOUT_TWO_QUEUE,containerFactory = "singleListenerContainer")
public void consumeFanoutMsgTwo(Order order){
try {
log.info("消息模型fanoutExchange-two-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)到消息:{} ",order);
}catch (Exception e){
log.error("消息模型-消費(fèi)者-發(fā)生異常:",e);
}
}
}
(5)最后調(diào)用test方法發(fā)送消息
@Test
public void testFanoutPublish() {
Order order = new Order();
order.setOrdernum("123456");
fanoutPublisher.sendMsg(order);
}
二帖族、direct直連傳輸消息模型
direct交換機(jī)相對(duì)嚴(yán)謹(jǐn)栈源,不像fanout廣播交換機(jī),direct交換機(jī)發(fā)送消息到消息隊(duì)列的時(shí)候有一個(gè)路由規(guī)則竖般,即路由鍵甚垦,這個(gè)路由鍵將指引交換機(jī)把消息指定到對(duì)應(yīng)的隊(duì)列之中進(jìn)行消費(fèi),在實(shí)際開(kāi)發(fā)中涣雕,direct交換機(jī)比較常用艰亮,當(dāng)有某個(gè)特定消息需要被某一個(gè)隊(duì)列進(jìn)行消費(fèi)處理的時(shí)候,可采用direct交換機(jī)胞谭。
(1)同樣在RabbitmqConfig 配置類中創(chuàng)建兩個(gè)隊(duì)列directQueueOne垃杖、directQueueTwo,由directExchange用分別用兩個(gè)路由鍵"mq.direct.routing.key.one"和"mq.direct.routing.key.two"綁定起來(lái)丈屹。
@Slf4j
@Configuration
public class RabbitmqConfig {
/**
* 創(chuàng)建消息模型-directExchange
*/
//直連傳輸direct消息模型-創(chuàng)建交換機(jī)-directExchange
@Bean
public DirectExchange directExchange(){
return new DirectExchange(RabbitMqConstants.DIRECT_EXCHANGE,true,false);
}
//直連傳輸direct消息模型-創(chuàng)建隊(duì)列1
@Bean
public Queue directQueueOne(){
return new Queue(RabbitMqConstants.DIRECT_ONE_QUEUE,true);
}
//直連傳輸direct消息模型-創(chuàng)建隊(duì)列2
@Bean
public Queue directQueueTwo(){
return new Queue(RabbitMqConstants.DIRECT_TWO_QUEUE,true);
}
//直連傳輸direct消息模型-創(chuàng)建綁定1
@Bean
public Binding directBindingOne(){
return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(RabbitMqConstants.DIRECT_ONE_ROUTING_KEY);
}
//直連傳輸direct消息模型-創(chuàng)建綁定2
@Bean
public Binding directBindingTwo(){
return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(RabbitMqConstants.DIRECT_TWO_ROUTING_KEY);
}
}
RabbitMqConstants常量值如下:
@Data
public class RabbitMqConstants {
//直連directExchange消息模型
public static final String DIRECT_ONE_QUEUE = "mq.direct.one.queue";
public static final String DIRECT_TWO_QUEUE = "mq.direct.two.queue";
public static final String DIRECT_ONE_ROUTING_KEY = "mq.direct.routing.key.one";
public static final String DIRECT_TWO_ROUTING_KEY = "mq.direct.routing.key.two";
public static final String DIRECT_EXCHANGE = "mq.direct.exchange";
}
(2)啟動(dòng)項(xiàng)目调俘,訪問(wèn)http://127.0.0.1:15672/可以看到我們?cè)O(shè)置的隊(duì)列交換機(jī)以及綁定的路由相關(guān)信息:
(3)directExchange直連傳輸消息模型-生產(chǎn)者DirectPublisher
@Slf4j
@Component
public class DirectPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息-基于DirectExchange消息模型-one
*/
public void sendMsgDirectOne(Order order){
try {
//設(shè)置交換機(jī)
rabbitTemplate.setExchange(RabbitMqConstants.DIRECT_EXCHANGE);
//設(shè)置路由1
rabbitTemplate.setRoutingKey(RabbitMqConstants.DIRECT_ONE_ROUTING_KEY);
//發(fā)送消息
rabbitTemplate.convertAndSend(order);
//打印日志
log.info("消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{} ",order);
}catch (Exception e){
log.error("消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ",order, e);
}
}
/**
* 發(fā)送消息-基于DirectExchange消息模型-two
*/
public void sendMsgDirectTwo(Order order){
try {
//設(shè)置交換機(jī)
rabbitTemplate.setExchange(RabbitMqConstants.DIRECT_EXCHANGE);
//設(shè)置路由2
rabbitTemplate.setRoutingKey(RabbitMqConstants.DIRECT_TWO_ROUTING_KEY);
//發(fā)送消息
rabbitTemplate.convertAndSend(order);
//打印日志
log.info("消息模型DirectExchange-two-生產(chǎn)者-發(fā)送消息:{} ",order);
}catch (Exception e){
log.error("消息模型DirectExchange-two-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ",order, e);
}
}
}
(4)directExchange直連傳輸消息模型-消費(fèi)者DirectConsumer
@Slf4j
@Component
public class DirectConsumer {
/** 這是第一個(gè)路由綁定的對(duì)應(yīng)隊(duì)列的消費(fèi)者方法
* 監(jiān)聽(tīng)并消費(fèi)隊(duì)列中的消息-directExchange-one
*/
@RabbitListener(queues = RabbitMqConstants.DIRECT_ONE_QUEUE,containerFactory = "singleListenerContainer")
public void consumeDirectMsgOne(Order order){
try {
//打印日志消息
log.info("消息模型directExchange-one-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)到消息:{} ",order);
}catch (Exception e){
log.error("消息模型directExchange-one-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)發(fā)生異常:",e);
}
}
/**
* 這是第二個(gè)路由綁定的對(duì)應(yīng)隊(duì)列的消費(fèi)者方法
* 監(jiān)聽(tīng)并消費(fèi)隊(duì)列中的消息-directExchange-two
*/
@RabbitListener(queues = RabbitMqConstants.DIRECT_TWO_QUEUE, containerFactory = "singleListenerContainer")
public void consumeDirectMsgTwo(Order order) {
try {
//打印日志消息
log.info("消息模型directExchange-two-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)到消息:{} ", order);
} catch (Exception e) {
log.error("消息模型directExchange-two-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)發(fā)生異常:", e);
}
}
}
(5)最后調(diào)用test方法發(fā)送消息
@Test
public void testDirectPublish() {
Order order1 = new Order();
order1.setOrdernum("one-123456");
Order order2 = new Order();
order2.setOrdernum("tow-123456");
directPublisher.sendMsgDirectOne(order1);
directPublisher.sendMsgDirectTwo(order2);
}
三、topic主題消息模型
topic交換機(jī)相對(duì)靈活旺垒,路由鍵規(guī)則有通配符" * " 和 " # "符號(hào)代替了一個(gè)單詞和零或者多個(gè)單詞彩库,例如當(dāng)路由鍵有用通配符" * "符號(hào)的時(shí)候,即有一個(gè)路由鍵為“mq.topic.routing.key.*”先蒋,那么在發(fā)送消息的時(shí)候骇钦,生產(chǎn)者設(shè)置了路由鍵為“mq.topic.routing.key.one”、“mq.topic.routing.key.two”竞漾、“mq.topic.routing.key.three”等等眯搭,都可以將該消息發(fā)送到topic交換機(jī)路由鍵為“mq.topic.routing.key.”綁定的消息隊(duì)列中,最終被監(jiān)聽(tīng)到业岁。
(1)同樣在RabbitmqConfig 配置類中創(chuàng)建兩個(gè)隊(duì)列topicQueueOne鳞仙、topicQueueTwo,由topicExchange用分別用兩個(gè)路由鍵"mq.topic.routing.key."和"mq.topic.routing.key.#"綁定起來(lái)笔时。
@Slf4j
@Configuration
public class RabbitmqConfig {
//主題topic消息模型-創(chuàng)建交換機(jī)-topicExchange
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(RabbitMqConstants.TOPIC_EXCHANGE,true,false);
}
//主題topic消息模型-創(chuàng)建隊(duì)列1
@Bean
public Queue topicQueueOne(){
return new Queue(RabbitMqConstants.TOPIC_ONE_QUEUE,true);
}
//主題topic消息模型-創(chuàng)建隊(duì)列2
@Bean
public Queue topicQueueTwo(){
return new Queue(RabbitMqConstants.TOPIC_TWO_QUEUE,true);
}
//主題topic消息模型-創(chuàng)建綁定-通配符為*的路由
@Bean
public Binding topicBindingOne(){
return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(RabbitMqConstants.TOPIC_ONE_ROUTING_KEY);
}
//主題topic消息模型-創(chuàng)建綁定-通配符為#的路由
@Bean
public Binding topicBindingTwo(){
return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(RabbitMqConstants.TOPIC_TWO_ROUTING_KEY);
}
}
RabbitMqConstants常量值如下:
@Data
public class RabbitMqConstants {
//主題topicExchange消息模型
public static final String TOPIC_ONE_QUEUE = "mq.topic.one.queue";
public static final String TOPIC_TWO_QUEUE = "mq.topic.two.queue";
public static final String TOPIC_ONE_ROUTING_KEY = "mq.topic.routing.key.*";
public static final String TOPIC_TWO_ROUTING_KEY = "mq.topic.routing.key.#";
public static final String TOPIC_EXCHANGE = "mq.topic.exchange";
}
(2)啟動(dòng)項(xiàng)目棍好,訪問(wèn)http://127.0.0.1:15672/可以看到我們?cè)O(shè)置的隊(duì)列交換機(jī)以及綁定的路由相關(guān)信息:
(3)topicExchange消息模型-生產(chǎn)者topicPublisher
@Slf4j
@Component
public class TopicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息-基于TopicExchange消息模型
*/
public void sendMsgTopic(Order order, String routingKey){
try {
//指定交換機(jī)
rabbitTemplate.setExchange(RabbitMqConstants.TOPIC_EXCHANGE);
//指定路由的實(shí)際取值,根據(jù)不同取值,RabbitMQ將自行進(jìn)行匹配通配符借笙,從而路由到不同的隊(duì)列中
rabbitTemplate.setRoutingKey(routingKey);
//發(fā)送消息
rabbitTemplate.convertAndSend(order);
//打印日志
log.info("消息模型TopicExchange-生產(chǎn)者-發(fā)送消息:{},路由:{} ", order, routingKey);
} catch (Exception e) {
log.error("消息模型TopicExchange-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ", order, e);
}
}
}
(4)topicExchange消息模型-消費(fèi)者topicConsumer
@Slf4j
@Component
public class TopicConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 監(jiān)聽(tīng)并消費(fèi)隊(duì)列中的消息-topicExchange-*通配符
*/
@RabbitListener(queues = RabbitMqConstants.TOPIC_ONE_QUEUE, containerFactory = "singleListenerContainer")
public void consumeTopicMsgOne(Order order) {
try {
log.info("消息模型topicExchange-*-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)到消息:{} ", order);
} catch (Exception e) {
log.error("消息模型topicExchange-*-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)發(fā)生異常:", e);
}
}
/**
* 監(jiān)聽(tīng)并消費(fèi)隊(duì)列中的消息-topicExchange-#通配符
*/
@RabbitListener(queues = RabbitMqConstants.TOPIC_TWO_QUEUE, containerFactory = "singleListenerContainer")
public void consumeTopicMsgTwo(Order order) {
try {
log.info("消息模型topicExchange-#-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)到消息:{} ", order);
} catch (Exception e) {
log.error("消息模型topicExchange-#-消費(fèi)者-監(jiān)聽(tīng)消費(fèi)發(fā)生異常:", e);
}
}
}
(5)最后調(diào)用test方法發(fā)送消息扒怖,路由鍵有:mq.topic.routing.key.java、mq.topic.routing.key.php.python业稼、mq.topic.routing.key盗痒。
@Test
public void testTopicPublish() {
//此時(shí)相當(dāng)于*,即java替代了*的位置
//當(dāng)然由于#表示任意單詞盼忌,因而也將路由到#表示的路由和對(duì)應(yīng)的隊(duì)列中
String routingKeyOne="mq.topic.routing.key.java";
//此時(shí)相當(dāng)于#:即 php.python 替代了#的位置
String routingKeyTwo="mq.topic.routing.key.php.python";
//此時(shí)相當(dāng)于#:即0個(gè)單詞
String routingKeyThree="mq.topic.routing.key";
Order order = new Order();
order.setOrdernum("123456");
topicPublisher.sendMsgTopic(order,routingKeyOne);
//topicPublisher.sendMsgTopic(order,routingKeyTwo);
//topicPublisher.sendMsgTopic(order,routingKeyThree);
}
上一篇博客:springboot rabbitmq入門使用
下一篇博客:springboot rabbitmq高可用消息確認(rèn)消費(fèi)實(shí)戰(zhàn)
參考資料:
《分布式中間件實(shí)戰(zhàn)》
《rabbitmq實(shí)戰(zhàn)指南》