@[toc]
今天這篇文章比較簡單,來和小伙伴們分享一下 RabbitMQ 的七種消息傳遞形式。一起來看看铲汪。
大部分情況下熊尉,我們可能都是在 Spring Boot 或者 Spring Cloud 環(huán)境下使用 RabbitMQ,因此本文我也主要從這兩個方面來和大家分享 RabbitMQ 的用法掌腰。
1. RabbitMQ 架構(gòu)簡介
一圖勝千言狰住,如下:
這張圖中涉及到如下一些概念:
- 生產(chǎn)者(Publisher):發(fā)布消息到 RabbitMQ 中的交換機(Exchange)上。
- 交換機(Exchange):和生產(chǎn)者建立連接并接收生產(chǎn)者的消息齿梁。
- 消費者(Consumer):監(jiān)聽 RabbitMQ 中的 Queue 中的消息催植。
- 隊列(Queue):Exchange 將消息分發(fā)到指定的 Queue,Queue 和消費者進(jìn)行交互士飒。
- 路由(Routes):交換機轉(zhuǎn)發(fā)消息到隊列的規(guī)則查邢。
2. 準(zhǔn)備工作
大家知道,RabbitMQ 是 AMQP 陣營里的產(chǎn)品酵幕,Spring Boot 為 AMQP 提供了自動化配置依賴 spring-boot-starter-amqp扰藕,因此首先創(chuàng)建 Spring Boot 項目并添加該依賴,如下:
項目創(chuàng)建成功后芳撒,在 application.properties 中配置 RabbitMQ 的基本連接信息邓深,如下:
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
接下來進(jìn)行 RabbitMQ 配置,在 RabbitMQ 中笔刹,所有的消息生產(chǎn)者提交的消息都會交由 Exchange 進(jìn)行再分配芥备,Exchange 會根據(jù)不同的策略將消息分發(fā)到不同的 Queue 中。
RabbitMQ 官網(wǎng)介紹了如下幾種消息分發(fā)的形式:
這里給出了七種舌菜,其中第七種是消息確認(rèn)萌壳,消息確認(rèn)這塊松哥之前發(fā)過相關(guān)的文章,傳送門:
所以這里我主要和大家介紹前六種消息收發(fā)方式。
3. 消息收發(fā)
3.1 Hello World
咦爱咬?這個咋沒有交換機尺借?這個其實是默認(rèn)的交換機,我們需要提供一個生產(chǎn)者一個隊列以及一個消費者精拟。消息傳播圖如下:
來看看代碼實現(xiàn):
先來看看隊列的定義:
@Configuration
public class HelloWorldConfig {
public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";
@Bean
Queue queue1() {
return new Queue(HELLO_WORLD_QUEUE_NAME);
}
}
再來看看消息消費者的定義:
@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("msg = " + msg);
}
}
消息發(fā)送:
@SpringBootTest
class RabbitmqdemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
}
}
這個時候使用的其實是默認(rèn)的直連交換機(DirectExchange)燎斩,DirectExchange 的路由策略是將消息隊列綁定到一個 DirectExchange 上,當(dāng)一條消息到達(dá) DirectExchange 時會被轉(zhuǎn)發(fā)到與該條消息 routing key
相同的 Queue 上蜂绎,例如消息隊列名為 “hello-queue”栅表,則 routingkey 為 “hello-queue” 的消息會被該消息隊列接收。
3.2 Work queues
這種情況是這樣的:
一個生產(chǎn)者师枣,一個默認(rèn)的交換機(DirectExchange)谨读,一個隊列,兩個消費者坛吁,如下圖:
一個隊列對應(yīng)了多個消費者劳殖,默認(rèn)情況下,由隊列對消息進(jìn)行平均分配拨脉,消息會被分到不同的消費者手中哆姻。消費者可以配置各自的并發(fā)能力,進(jìn)而提高消息的消費能力玫膀,也可以配置手動 ack矛缨,來決定是否要消費某一條消息。
先來看并發(fā)能力的配置帖旨,如下:
@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("receive = " + msg);
}
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
public void receive2(String msg) {
System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
}
}
可以看到箕昭,第二個消費者我配置了 concurrency 為 10,此時解阅,對于第二個消費者落竹,將會同時存在 10 個子線程去消費消息。
啟動項目货抄,在 RabbitMQ 后臺也可以看到一共有 11 個消費者述召。
此時,如果生產(chǎn)者發(fā)送 10 條消息蟹地,就會一下都被消費掉积暖。
消息發(fā)送方式如下:
@SpringBootTest
class RabbitmqdemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
}
}
}
消息消費日志如下:
可以看到,消息都被第一個消費者消費了怪与。但是小伙伴們需要注意夺刑,事情并不總是這樣(多試幾次就可以看到差異),消息也有可能被第一個消費者消費(只是由于第二個消費者有十個線程一起開動分别,所以第二個消費者消費的消息占比更大)遍愿。
當(dāng)然消息消費者也可以開啟手動 ack,這樣可以自行決定是否消費 RabbitMQ 發(fā)來的消息茎杂,配置手動 ack 的方式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消費代碼如下:
@Component
public class HelloWorldConsumer {
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(Message message,Channel channel) throws IOException {
System.out.println("receive="+message.getPayload());
channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
}
@RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
public void receive2(Message message, Channel channel) throws IOException {
System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());
channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
}
}
此時第二個消費者拒絕了所有消息错览,第一個消費者消費了所有消息。
這就是 Work queues 這種情況煌往。
3.3 Publish/Subscrite
再來看發(fā)布訂閱模式倾哺,這種情況是這樣:
一個生產(chǎn)者,多個消費者刽脖,每一個消費者都有自己的一個隊列羞海,生產(chǎn)者沒有將消息直接發(fā)送到隊列,而是發(fā)送到了交換機曲管,每個隊列綁定交換機却邓,生產(chǎn)者發(fā)送的消息經(jīng)過交換機,到達(dá)隊列院水,實現(xiàn)一個消息被多個消費者獲取的目的腊徙。需要注意的是简十,如果將消息發(fā)送到一個沒有隊列綁定的 Exchange上面,那么該消息將會丟失撬腾,這是因為在 RabbitMQ 中 Exchange 不具備存儲消息的能力螟蝙,只有隊列具備存儲消息的能力,如下圖:
這種情況下民傻,我們有四種交換機可供選擇胰默,分別是:
- Direct
- Fanout
- Topic
- Header
我分別來給大家舉一個簡單例子看下。
3.3.1 Direct
DirectExchange 的路由策略是將消息隊列綁定到一個 DirectExchange 上漓踢,當(dāng)一條消息到達(dá) DirectExchange 時會被轉(zhuǎn)發(fā)到與該條消息 routing key 相同的 Queue 上牵署,例如消息隊列名為 “hello-queue”,則 routingkey 為 “hello-queue” 的消息會被該消息隊列接收喧半。DirectExchange 的配置如下:
@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME = "javaboy-direct";
@Bean
Queue queue() {
return new Queue("hello-queue");
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECTNAME, true, false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange()).with("direct");
}
}
- 首先提供一個消息隊列Queue奴迅,然后創(chuàng)建一個DirectExchange對象,三個參數(shù)分別是名字薯酝,重啟后是否依然有效以及長期未用時是否刪除半沽。
- 創(chuàng)建一個Binding對象將Exchange和Queue綁定在一起。
- DirectExchange和Binding兩個Bean的配置可以省略掉吴菠,即如果使用DirectExchange者填,可以只配置一個Queue的實例即可。
再來看看消費者:
@Component
public class DirectReceiver {
@RabbitListener(queues = "hello-queue")
public void handler1(String msg) {
System.out.println("DirectReceiver:" + msg);
}
}
通過 @RabbitListener 注解指定一個方法是一個消息消費方法做葵,方法參數(shù)就是所接收到的消息占哟。然后在單元測試類中注入一個 RabbitTemplate 對象來進(jìn)行消息發(fā)送,如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void directTest() {
rabbitTemplate.convertAndSend("hello-queue", "hello direct!");
}
}
最終執(zhí)行結(jié)果如下:
3.3.2 Fanout
FanoutExchange 的數(shù)據(jù)交換策略是把所有到達(dá) FanoutExchange 的消息轉(zhuǎn)發(fā)給所有與它綁定的 Queue 上酿矢,在這種策略中榨乎,routingkey 將不起任何作用,F(xiàn)anoutExchange 配置方式如下:
@Configuration
public class RabbitFanoutConfig {
public final static String FANOUTNAME = "sang-fanout";
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME, true, false);
}
@Bean
Queue queueOne() {
return new Queue("queue-one");
}
@Bean
Queue queueTwo() {
return new Queue("queue-two");
}
@Bean
Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}
在這里首先創(chuàng)建 FanoutExchange瘫筐,參數(shù)含義與創(chuàng)建 DirectExchange 參數(shù)含義一致蜜暑,然后創(chuàng)建兩個 Queue,再將這兩個 Queue 都綁定到 FanoutExchange 上策肝。接下來創(chuàng)建兩個消費者肛捍,如下:
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handler1(String message) {
System.out.println("FanoutReceiver:handler1:" + message);
}
@RabbitListener(queues = "queue-two")
public void handler2(String message) {
System.out.println("FanoutReceiver:handler2:" + message);
}
}
兩個消費者分別消費兩個消息隊列中的消息,然后在單元測試中發(fā)送消息之众,如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void fanoutTest() {
rabbitTemplate
.convertAndSend(RabbitFanoutConfig.FANOUTNAME,
null, "hello fanout!");
}
}
注意這里發(fā)送消息時不需要 routingkey
拙毫,指定 exchange
即可,routingkey
可以直接傳一個 null
棺禾。
最終執(zhí)行日志如下:
3.3.3 Topic
TopicExchange 是比較復(fù)雜但是也比較靈活的一種路由策略缀蹄,在 TopicExchange 中,Queue 通過 routingkey 綁定到 TopicExchange 上,當(dāng)消息到達(dá) TopicExchange 后缺前,TopicExchange 根據(jù)消息的 routingkey 將消息路由到一個或者多個 Queue 上蛀醉。TopicExchange 配置如下:
@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "sang-topic";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
@Bean
Queue xiaomi() {
return new Queue("xiaomi");
}
@Bean
Queue huawei() {
return new Queue("huawei");
}
@Bean
Queue phone() {
return new Queue("phone");
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange())
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange())
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange())
.with("#.phone.#");
}
}
- 首先創(chuàng)建 TopicExchange,參數(shù)和前面的一致诡延。然后創(chuàng)建三個 Queue滞欠,第一個 Queue 用來存儲和 “xiaomi” 有關(guān)的消息,第二個 Queue 用來存儲和 “huawei” 有關(guān)的消息肆良,第三個 Queue 用來存儲和 “phone” 有關(guān)的消息。
- 將三個 Queue 分別綁定到 TopicExchange 上逸绎,第一個 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 開頭的惹恃,都將被路由到名稱為 “xiaomi” 的 Queue 上,第二個 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 開頭的棺牧,都將被路由到名稱為 “huawei” 的 Queue 上巫糙,第三個 Binding 中的 “#.phone.#” 則表示消息的 routingkey 中凡是包含 “phone” 的,都將被路由到名稱為 “phone” 的 Queue 上颊乘。
接下來針對三個 Queue 創(chuàng)建三個消費者参淹,如下:
@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handler1(String message) {
System.out.println("PhoneReceiver:" + message);
}
@RabbitListener(queues = "xiaomi")
public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message);
}
@RabbitListener(queues = "huawei")
public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message);
}
}
然后在單元測試中進(jìn)行消息的發(fā)送,如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void topicTest() {
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"xiaomi.news","小米新聞..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"huawei.news","華為新聞..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"xiaomi.phone","小米手機..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"huawei.phone","華為手機..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
"phone.news","手機新聞..");
}
}
根據(jù) RabbitTopicConfig 中的配置乏悄,第一條消息將被路由到名稱為 “xiaomi” 的 Queue 上浙值,第二條消息將被路由到名為 “huawei” 的 Queue 上,第三條消息將被路由到名為 “xiaomi” 以及名為 “phone” 的 Queue 上檩小,第四條消息將被路由到名為 “huawei” 以及名為 “phone” 的 Queue 上开呐,最后一條消息則將被路由到名為 “phone” 的 Queue 上。
3.3.4 Header
HeadersExchange 是一種使用較少的路由策略规求,HeadersExchange 會根據(jù)消息的 Header 將消息路由到不同的 Queue 上筐付,這種策略也和 routingkey無關(guān),配置如下:
@Configuration
public class RabbitHeaderConfig {
public final static String HEADERNAME = "javaboy-header";
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADERNAME, true, false);
}
@Bean
Queue queueName() {
return new Queue("name-queue");
}
@Bean
Queue queueAge() {
return new Queue("age-queue");
}
@Bean
Binding bindingName() {
Map<String, Object> map = new HashMap<>();
map.put("name", "sang");
return BindingBuilder.bind(queueName())
.to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge() {
return BindingBuilder.bind(queueAge())
.to(headersExchange()).where("age").exists();
}
}
這里的配置大部分和前面介紹的一樣阻肿,差別主要體現(xiàn)的 Binding 的配置上瓦戚,第一個 bindingName 方法中,whereAny 表示消息的 Header 中只要有一個 Header 匹配上 map 中的 key/value丛塌,就把該消息路由到名為 “name-queue” 的 Queue 上较解,這里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配姨伤。whereAny 和 whereAll 實際上對應(yīng)了一個名為 x-match 的屬性哨坪。bindingAge 中的配置則表示只要消息的 Header 中包含 age,不管 age 的值是多少乍楚,都將消息路由到名為 “age-queue” 的 Queue 上当编。
接下來創(chuàng)建兩個消息消費者:
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:name:"
+ new String(msg, 0, msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:age:"
+ new String(msg, 0, msg.length));
}
}
注意這里的參數(shù)用 byte 數(shù)組接收。然后在單元測試中創(chuàng)建消息的發(fā)送方法徒溪,這里消息的發(fā)送也和 routingkey 無關(guān)忿偷,如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void headerTest() {
Message nameMsg = MessageBuilder
.withBody("hello header! name-queue".getBytes())
.setHeader("name", "sang").build();
Message ageMsg = MessageBuilder
.withBody("hello header! age-queue".getBytes())
.setHeader("age", "99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
}
}
這里創(chuàng)建兩條消息金顿,兩條消息具有不同的 header,不同 header 的消息將被發(fā)到不同的 Queue 中去鲤桥。
最終執(zhí)行效果如下:
3.4 Routing
這種情況是這樣:
一個生產(chǎn)者揍拆,一個交換機,兩個隊列茶凳,兩個消費者嫂拴,生產(chǎn)者在創(chuàng)建 Exchange 后,根據(jù) RoutingKey 去綁定相應(yīng)的隊列贮喧,并且在發(fā)送消息時筒狠,指定消息的具體 RoutingKey 即可。
如下圖:
這個就是按照 routing key 去路由消息箱沦,我這里就不再舉例子了辩恼,大家可以參考 3.3.1 小結(jié)。
3.5 Topics
這種情況是這樣:
一個生產(chǎn)者谓形,一個交換機灶伊,兩個隊列,兩個消費者寒跳,生產(chǎn)者創(chuàng)建 Topic 的 Exchange 并且綁定到隊列中聘萨,這次綁定可以通過 *
和 #
關(guān)鍵字,對指定 RoutingKey
內(nèi)容冯袍,編寫時注意格式 xxx.xxx.xxx
去編寫匈挖。
如下圖:
這個我也就不舉例啦,前面 3.3.3 小節(jié)已經(jīng)舉過例子了康愤,不再贅述儡循。
3.6 RPC
RPC 這種消息收發(fā)形式,松哥前兩天剛剛寫了文章和大家介紹征冷,這里就不多說了择膝,傳送門:
3.7 Publisher Confirms
這種發(fā)送確認(rèn)松哥之前有寫過相關(guān)文章,傳送門:
4. 小結(jié)
好啦,今天這篇文章主要是和小伙伴們整理了 RabbitMQ 中消息收發(fā)的七種形式叔收,感興趣的小伙伴可以試試哦~
公眾號【江南一點雨】后臺回復(fù) rabbitmqdemo齿穗,獲取本文案例地址~