一添寺、RabbitMQ的特點
RabbitMQ是一款使用Erlang語言開發(fā)的,實現(xiàn)AMQP(高級消息隊列協(xié)議)的開源消息中間件兼丰。首先要知道一些RabbitMQ的特點柔纵,官網(wǎng)可查:
- 可靠性:支持持久化氯窍,傳輸確認饲常,發(fā)布確認等保證了MQ的可靠性。
- 靈活的分發(fā)消息策略:這應該是RabbitMQ的一大特點狼讨。在消息進入MQ前由Exchange(交換機)進行路由消息贝淤。
- 分發(fā)消息策略有:簡單模式、工作隊列模式政供、發(fā)布訂閱模式播聪、路由模式、通配符模式布隔。
- 支持集群:多臺RabbitMQ服務器可以組成一個集群离陶,形成一個邏輯Broker。
- 多種協(xié)議:RabbitMQ支持多種消息隊列協(xié)議衅檀,比如 STOMP枕磁、MQTT 等等。
- 支持多種語言客戶端:RabbitMQ幾乎支持所有常用編程語言术吝,包括 Java计济、.NET、Ruby 等等排苍。
- 可視化管理界面:RabbitMQ提供了一個易用的用戶界面沦寂,使得用戶可以監(jiān)控和管理消息 Broker。
- 插件機制:RabbitMQ提供了許多插件淘衙,可以通過插件進行擴展传藏,也可以編寫自己的插件。
二彤守、AMQP
AMQP模型
消息(message)被發(fā)布者(publisher)發(fā)送給交換機(exchange)毯侦,然后交換機將收到的消息根據(jù)路由規(guī)則分發(fā)給綁定的隊列(queue)。最后AMQP代理會將消息投遞給訂閱了此隊列的消費者具垫,或者消費者按照需求自行獲取侈离。
消息確認
從安全角度考慮,網(wǎng)絡是不可靠的筝蚕,接收消息的應用也有可能在處理消息的時候失敗卦碾。基于此原因起宽,AMQP模塊包含了一個消息確認(message acknowledgements)的概念:當消息從隊列投遞給消費者的時候洲胖,消費者服務器需要返回一個ack(確認信息),當broker收到了確認才會將該消息刪除坯沪;消息確認可以是自動的绿映,也可以是由消費端手動確認。此外也支持生產(chǎn)端向broker發(fā)送消息得到broker的ack腐晾,從而針對做出響應邏輯叉弦。
AMQP是一個可編程的協(xié)議
某種意義上說AMQP的實體和路由規(guī)則是由應用本身定義的,而不是由消息代理定義赴魁。包括像聲明隊列和交換機卸奉,定義他們之間的綁定,訂閱隊列等等關于協(xié)議本身的操作颖御。但是需要注意雙方定義的沖突榄棵,否則會暴露出配置錯誤的問題。
三潘拱、RabbitMQ安裝
Windows10安裝
步驟
- 到erlang官網(wǎng)下載win10版安裝包疹鳄。下載完成后傻瓜式安裝。
- 配置erlang環(huán)境變量
cmd輸入erl驗證安裝是否成功芦岂,如下成功瘪弓;ctrl+c退出
- 傻瓜式安裝RabbitMQ服務。
在RabbitMQ的gitHub項目中禽最,下載window版本的服務端安裝包 - 進入安裝目錄腺怯,sbin目錄下袱饭,執(zhí)行:rabbitmq-plugins enable rabbitmq_management 命令安裝管理頁面的插件
- 雙擊rabbitmq-server.bat啟動腳本或者在sbin目錄下執(zhí)行:rabbitmq-server start,然后打開服務管理可以看到RabbitMQ正在運行呛占。
- 打開瀏覽器輸入http://localhost:15672虑乖,賬號密碼默認是:guest/guest
四、Spring整合AMQP
官方中文文檔
GitHup翻譯文檔
GitHub - rockit-ba/spring-rabbit-: spring AMQP 實現(xiàn): spring rabbit 官方中文文檔翻譯
Spring AMQP主要對象類及作用
- Queue:對應RabbitMQ中Queue
- AmqpTemplate:接口晾虑,用于向RabbitMQ發(fā)送和接收Message
- RabbitTemplate:AmqpTemplate的實現(xiàn)類
- @RabbitListener:指定消息接收方疹味,可以配置在類和方法上
- @RabbitHandler:指定消息接收方,只能配置在方法上帜篇,可以與@RabbitListener一起使用
- Message:對RabbitMQ消息的封裝
- Exchange:對RabbitMQ的Exchange的封裝糙捺,子類有TopicExchange、FanoutExchange和DirectExchange等
- Binding:將一個Queue綁定到某個Exchange笙隙,本身只是一個聲明洪灯,并不做實際綁定操作
- AmqpAdmin:接口,用于Exchange和Queue的管理逃沿,比如創(chuàng)建/刪除/綁定等婴渡,自動檢查Binding類并完成綁定操作
- RabbitAdmin:AmqpAdmin的實現(xiàn)類
- ConnectionFactory:創(chuàng)建Connection的工廠類,RabbitMQ也有一個名為ConnectionFactory的類但二者沒有繼承關系凯亮,Spring ConnectionFactory可以認為是對RabbitMQ ConnectionFactory的封裝
- CachingConnectionFactory:Spring ConnectionFactory的實現(xiàn)類边臼,可以用于緩存Channel和Connection
- Connection:Spring中用于創(chuàng)建Channel的連接類,RabbitMQ也有一個名為Connection的類假消,但二者沒有繼承關系柠并,Spring Connection是對RabbitMQ Connection的封裝
- SimpleConnection:Spring Connection的實現(xiàn)類,將實際工作代理給RabbitMQ的Connection類
- MessageListenerContainer:接口富拗,消費端負責與RabbitMQ服務器保持連接并將Message傳遞給實際的@RabbitListener/@RabbitHandler處理
- RabbitListenerContainerFactory:接口臼予,用于創(chuàng)建MessageListenerContainer
- SimpleMessageListenerContainer:MessageListenerContainer的實現(xiàn)類
- SimpleRabbitListenerContainerFactory:RabbitListenerContainerFactory的實現(xiàn)類
- RabbitProperties:用于配置Spring AMQP的Property類
Spring AMQP主要參數(shù)
基礎信息
參數(shù) | 默認值 | 說明 |
---|---|---|
spring.rabbitmq.host | localhost | 主機 |
spring.rabbitmq.port | 5672 | 端口 |
spring.rabbitmq.username | guest | 用戶名 |
spring.rabbitmq.password | guest | 密碼 |
spring.rabbitmq.virtual-host | / | 虛擬主機 |
spring.rabbitmq.addresses | server的地址列表(以逗號分隔),配置了該項將忽略spring.rabbitmq.host和spring.rabbitmq.port | |
spring.rabbitmq.requested-heartbeat | 請求心跳超時時間,0表示不指定啃沪;如果后面沒加時間單位默認為秒 | |
spring.rabbitmq.publisher-confirm-type | none | 發(fā)布確認類型粘拾,none、correlated创千、simple該配置只管有無投遞到exchange缰雇,而不管有無發(fā)送到隊列當中 |
spring.rabbitmq.publisher-returns | false | 是否啟用發(fā)布返回 |
spring.rabbitmq.connection-timeout | 連接超時時間,0表示永不超時 |
緩存cache
參數(shù) | 默認值 | 說明 |
---|---|---|
spring.rabbitmq.cache.channel.checkout-timeout | 如果已達到channel緩存大小追驴,等待獲取channel的時間械哟。 如果為0,則始終創(chuàng)建一個新channel殿雪。 | |
spring.rabbitmq.cache.channel.size | 緩存中保持的channel數(shù)量 | |
spring.rabbitmq.cache.connection.size | 緩存的connection數(shù)暇咆,只有是CONNECTION模式時生效 | |
spring.rabbitmq.cache.connection.mode | channel | 連接工廠緩存模式 |
Listener
參數(shù) | 默認值 | 說明 |
---|---|---|
spring.rabbitmq.listener.type | simple | 容器類型,simple或direct |
spring.rabbitmq.listener.simple.auto-startup | true | 應用啟動時是否啟動容器 |
spring.rabbitmq.listener.simple.acknowledge-mode | auto | 消息確認方式,none爸业、manual和auto |
spring.rabbitmq.listener.simple.concurrency | listener最小消費者數(shù) | |
spring.rabbitmq.listener.simple.max-concurrency | listener最大消費者數(shù) | |
spring.rabbitmq.listener.simple.prefetch | 一個消費者最多可處理的nack消息數(shù)量 | |
spring.rabbitmq.listener.simple.default-requeue-rejected | true | 被拒絕的消息是否重新入隊 |
spring.rabbitmq.listener.simple.missing-queues-fatal | true | 如果容器聲明的隊列不可用其骄,是否失敗沃呢;或如果在運行時刪除一個或多個隊列年栓,是否停止容器 |
spring.rabbitmq.listener.simple.idle-event-interval | 空閑容器事件應多久發(fā)布一次 | |
spring.rabbitmq.listener.simple.retry.enabled | false | 是否開啟消費者重試 |
spring.rabbitmq.listener.simple.retry.max-attempts | 3 | 最大重試次數(shù) |
spring.rabbitmq.listener.simple.retry.max-interval | 10000ms | 最大重試間隔 |
spring.rabbitmq.listener.simple.retry.initial-interval | 1000ms | 第一次和第二次嘗試發(fā)送消息的時間間隔 |
spring.rabbitmq.listener.simple.retry.multiplier | 1.0 | 應用于前一個重試間隔的乘數(shù) |
spring.rabbitmq.listener.simple.retry.stateless | true | 重試是無狀態(tài)還是有狀態(tài) |
spring.rabbitmq.listener.direct.consumers-per-queue | 每個隊列消費者數(shù)量 |
Template
參數(shù) | 默認值 | 說明 |
---|---|---|
spring.rabbitmq.template.mandatory | false | 消息在沒有被隊列接收時是否退回,與spring.rabbitmq.publisher-returns類似薄霜, 該配置優(yōu)先級高于spring.rabbitmq.publisher-returns |
spring.rabbitmq.template.receive-timeout | receive() 操作的超時時間 | |
spring.rabbitmq.template.reply-timeout | sendAndReceive() 操作的超時時間 | |
spring.rabbitmq.template.retry.enabled | false | 發(fā)送消息是否重試 |
spring.rabbitmq.template.retry.max-attempts | 3.0 | 發(fā)送消息最大重試次數(shù) |
spring.rabbitmq.template.retry.initial-interval | 1000ms | 第一次和第二次嘗試發(fā)送消息的時間間隔 |
spring.rabbitmq.template.retry.multiplier | 1.0 | 應用于前一個重試間隔的乘數(shù) |
spring.rabbitmq.template.retry.max-interval | 10000ms | 最大重試間隔 |
五、Springboot整合AMQP
5.1 消費端監(jiān)聽相關注解
@RabbitListener
可以作用在類或方法上纸兔,設置監(jiān)聽的隊列惰瓜。 如果未設置containerFactory(),則使用默認容器工廠汉矿。
內(nèi)置許多屬性提供綁定隊列的關系崎坊。
- 作用在方法上:表明該方法監(jiān)聽某個隊列
- 作用在類上:需配合使用@RabbitHandler,監(jiān)聽隊列會調用@RabbitHandler注釋的方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "directQueue-Two", durable = "false"),
exchange = @Exchange(value = "MqSendService-One", type = "direct", durable = "false"),
key = "One"),
ackMode = "MANUAL"
)
public void tsJucDirectMsgTwo(@Header Message data, Channel channel){
}
注意事項
必須指定監(jiān)聽的隊列洲拇。建議聲明指定綁定交換器和隊列奈揍,保持和生產(chǎn)端一致
方式一:只聲明監(jiān)聽隊列(不推薦)
@RabbitListener(queues = "directQueue-One")
該方式消費者會默認監(jiān)聽這個隊列,如果rabbit服務端broker內(nèi)不存在該隊列赋续,則會一直報錯男翰。
方式二:保持和生產(chǎn)端同步,指定綁定關系
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "directQueue-One",type = "direct"),
exchange = @Exchange(value = "MqSendService-One"),
key = "One"
))
該方式如果broker內(nèi)還未存在指定隊列纽乱,則會直接創(chuàng)建指定的Exchange和Queue蛾绎。
無隊列情況出現(xiàn)場景:
- 在生產(chǎn)端聲明了但還未發(fā)送消息情況,因為若只在生產(chǎn)端聲明鸦列,但還未發(fā)送過消息租冠,就不會創(chuàng)建對應的Exchange和Queue。
- broker中的隊列被刪除
@RabbitHandler
@RabbitListener 標注在類上面表示當有收到消息的時候薯嗤,就交給 @RabbitHandler 的方法處理顽爹;
具體使用哪個方法處理,根據(jù) MessageConverter 轉換后的參數(shù)類型
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}
@Payload
可以獲取消息中的 body 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body) {
System.out.println("body:"+body);
}
@Header骆姐,@Headers
可以獲得消息中的 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}
快速入門
Gitee項目:Ahang/ts-rabbitmq (gitee.com)
六镜粤、RabbitMQ結構介紹
隊列,交換機和綁定統(tǒng)稱為AMQP實體(AMQP entities)
6.1 成員
ConnectionFactory诲锹、Connection
ConnectionFactory繁仁、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象归园。Connection是publisher/consumer 和 broker 之間的 TCP 連接黄虱,它封裝了socket協(xié)議相關部分邏輯。ConnectionFactory為Connection的制造工廠庸诱。
Channel
如果每一次訪問 RabbitMQ 都建立一個 Connection捻浦,在消息量大的時候建立 TCP Connection 的開銷將是巨大的晤揣,效率也較低。
Channel 是在 connection 內(nèi)部建立的邏輯連接朱灿,如果應用程序支持多線程昧识,通常每個 thread 創(chuàng)建單獨的 channel 進行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識別 channel盗扒,所以 channel 之間是完全隔離的跪楞。
Channel 作為輕量級的Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷
Producer(生產(chǎn)者)
生產(chǎn)消息的一方,通過信道向指定交換機發(fā)送消息侣灶;
生產(chǎn)者可以在發(fā)送消息前聲明Exchange甸祭、Queue以及對應關系;聲明后發(fā)送消息如果無相關成員則會按照聲明情況創(chuàng)建對應的Exchange和Queue褥影。
若不聲明直接發(fā)送則會按照默認規(guī)則發(fā)送池户。
Consumer(消費者)
消費消息的一方,通過監(jiān)聽指定隊列來消費消息凡怎;
消費者同樣可以聲明Exchange校焦、Queue以及對應關系,聲明后如果監(jiān)聽發(fā)現(xiàn)不存在監(jiān)聽隊列统倒,則會按照聲明創(chuàng)建對應的Exchange和Queue寨典。
Exchange(交換機)
用于接受、分配消息檐薯,存在多種不同類型的交換機處理特定需求凝赛;
不做存儲,消息會存儲在隊列中坛缕;交換機只是進行消息的接收墓猎、轉發(fā)、分配赚楚。
Queue(隊列)
用于存儲生產(chǎn)者的消息
RoutingKey(路由鍵)
用于生產(chǎn)者者指定的消息路由鍵規(guī)則毙沾;
是為了匹配交換機上的綁定路由鍵,從而找到要發(fā)送的隊列宠页。
//會去名為“Topic-Ex”的交換機匹配“One.Two.Three”的綁定路由鍵
rabbitTemplate.send("Topic-Ex","One.Two.Three",msg);
BindingKey(綁定鍵)
用于把交換器的消息綁定到隊列上左胞;
是在配置時指定交換機和隊列的綁定路由鍵,是為了去匹配生產(chǎn)者發(fā)送消息指定的路由鍵举户;每個交換機和隊列之間都會有一個對應的綁定路由烤宙,首先消息發(fā)送到指定交換機,再根據(jù)發(fā)送的路由規(guī)則匹配事先設置的綁定路由鍵俭嘁,匹配到對應的綁定路由則代表消息找到對應的隊列躺枕。
//生產(chǎn)方通過配置類指定綁定關系
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(topicQueue2()) //綁定隊列
.to(topicExchange()) //隊列綁定到哪個交換器
.with("*"); //綁定路由key,必須指定
}
//消費方監(jiān)聽聲明交換機和隊列關系,應當與上方保持一致,否則會創(chuàng)建新的
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topicQueue-One", durable = "false"),
exchange = @Exchange(value = "Topic-Ex", type = "topic", durable = "false"),
key = "*"))
public void tsTopicMsg(Message data, Channel channel) {
String str = new String(data.getBody());
System.out.println(str + "-----:" + seq);
seq.incrementAndGet();
}
虛擬主機
每個Rabbit都能創(chuàng)建很多vhost拐云,我們稱之為虛擬主機罢猪,每個虛擬主機其實都是mini版的RabbitMQ,擁有自己的隊列叉瘩,交換器和綁定膳帕,擁有自己的權限機制。
出于多租戶和安全因素設計的薇缅,把 AMQP 的基本組件劃分到一個虛擬的分組中危彩,類似于網(wǎng)絡中的 namespace 概念(或RocketMQ的Group)。
當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時泳桦,可以劃分出多個 vhost恬砂,每個用戶在自己的 vhost 創(chuàng)建 exchange/queue 等。
vhost特性
- RabbitMQ默認的vhost是“/”開箱即用蓬痒;
- 多個vhost是隔離的,多個vhost無法通訊漆羔,并且不用擔心命名沖突(隊列和交換器和綁定)梧奢,實現(xiàn)了多層分離;
- 創(chuàng)建用戶的時候必須指定vhost演痒;
vhost操作
可以通過rabbitmqctl工具命令
- 創(chuàng)建
rabbitmqctl add_vhost[vhost_name]
- 刪除vhost
rabbitmqctl delete_vhost[vhost_name]
- 查看所有的vhost
rabbitmqctl list_vhosts
交換機類型
多消費者情況
當一個隊列被多個消費者監(jiān)聽亲轨,那么消息將被均勻分配到消費者,且如果某條消息阻塞不會將其他消息發(fā)到另一個空閑的消費者鸟顺,消息的分配在一開始就固定了惦蚊。
Direct類型(默認,匹配發(fā)送)
它會把消息路由到那些binding key與routing key完全匹配的Queue中讯嫂。
它是一個一對一的模型蹦锋,一條消息一定會被發(fā)到指定的一個隊列(完全匹配)。
配置代碼
@Configuration
public class RabbitDirectConfig {
@Bean
public Queue directQueue(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("directQueue-One",false,false,false,null);
}
@Bean
public Queue directQueue2(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("directQueue-Two",false,false,false,null);
}
@Bean
public DirectExchange directExchange(){
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
return new DirectExchange("MqSendService-One",false,false,null);
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(directQueue()) //綁定隊列
.to(directExchange()) //隊列綁定到哪個交換器
.with("One"); //綁定路由key,必須指定
}
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(directQueue2()) //綁定隊列
.to(directExchange()) //隊列綁定到哪個交換器
.with("Two"); //綁定路由key,必須指定
}
}
Topic類型(拓展匹配發(fā)送)
它是Direct類型的一種擴展欧芽,提供靈活的匹配規(guī)則莉掂。
- routing key為一個句點號 " . " 分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如"One.Two"
- binding key與routing key一樣也是句點號 " . " 分隔的字符串
- binding key中可以存在兩種特殊字符 " * " 與 " # " 千扔,用于做模糊匹配曲楚,其中“*”用于匹配一個單詞讯柔,“#”用于匹配多個單詞(可以是零個)
配置代碼
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTopicConfig {
@Bean
public Queue topicQueue(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("topicQueue-One",false,false,false,null);
}
@Bean
public Queue topicQueue2(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("topicQueue-Two",false,false,false,null);
}
@Bean
public TopicExchange topicExchange(){
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
return new TopicExchange("Topic-Ex",false,false,null);
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(topicQueue()) //綁定隊列
.to(topicExchange()) //隊列綁定到哪個交換器
.with("*.Two.*"); //路由key,必須指定
}
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(topicQueue2()) //綁定隊列
.to(topicExchange()) //隊列綁定到哪個交換器
.with("#"); //路由key,必須指定
}
}
Fanout 類型(廣播發(fā)送)
它會把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。
它是一種一對多的類型推溃,無法指定Binding Key,發(fā)送的一條消息會被發(fā)到綁定的所有隊列朴乖。
配置代碼
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitFanoutConfig {
@Bean
public Queue fanoutQueue(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("fanoutQueue-One",false,false,false,null);
}
@Bean
public Queue fanoutQueue2(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("fanoutQueue-Two",false,false,false,null);
}
@Bean
public FanoutExchange fanoutExchange(){
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
return new FanoutExchange("Fanout-Ex",false,false,null);
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(fanoutQueue()) //綁定隊列
.to(fanoutExchange()); //隊列綁定到哪個交換器
}
@Bean
public Binding bingExchange2(){
return BindingBuilder.bind(fanoutQueue()) //綁定隊列
.to(fanoutExchange()); //隊列綁定到哪個交換器
}
}
Headers(鍵值對匹配,不常用)
headers類型的Exchange不依賴于routing key與binding key的匹配規(guī)則來路由消息植阴,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對喷鸽;當消息發(fā)送到Exchange時众雷,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對做祝;如果完全匹配則消息會路由到該Queue砾省,否則不會路由到該Queue。
該類型不常用混槐,暫不提供代碼编兄。
Message(消息)
當執(zhí)行諸如 basicPublish() 之類的操作時,內(nèi)容作為字節(jié)數(shù)組參數(shù)傳遞声登,而其他屬性作為單獨的參數(shù)傳入狠鸳。
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
...
}
MessageProperties 接口定義了幾個常見的屬性,例如“messageId”悯嗓、“timestamp”碰煌、“contentType”等等。 還可以通過調用 setHeader(String key, Object value) 方法擴展這些屬性绅作。
消息序列化
自定義的要作為消息object發(fā)送的類一定要實現(xiàn)Serializable接口,否則將收到IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads蛾派。
從版本開始 1.5.7, 1.6.11, 1.7.4俄认, 和 2.0.0,如果消息正文是序列化的 Serializable Java對象洪乍,執(zhí)行時不再反序列化(默認)眯杏, 這是為了防止不安全的反序列化。 默認情況下壳澳,僅 java.util 和 java.lang類反序列化岂贩。
要恢復以前的行為,可以通過調用添加允許的類/包模式 Message.addAllowedListPatterns(…)巷波。
//通配符
Message.addAllowedListPatterns("com.zh.*.class");
//單個
Message.addAllowedListPatterns(User.class.getName());
@org.junit.jupiter.api.Test
public void test() {
NoMessage hello = new NoMessage("hello");
SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
Message message = simpleMessageConverter.toMessage(hello, new MessageProperties());
log.info("添加白名單之前---{}",message);
Message.addAllowedListPatterns(NoMessage.class.getName());
log.info("NoMessage 全限定名:{}",NoMessage.class.getName());
log.info("添加白名單之后---{}",message);
}
輸出:
添加白名單之前---(Body:'[B@6fc3e1a4(byte[89])' MessageProperties
NoMessage 全限定名:com.rabbit.producer.NoMessage
添加白名單之后---(Body:'NoMessage(content=hello)'
Queue(隊列)
構建者創(chuàng)建
@Bean
public Queue directQueue(){
//需要的屬性可以通過構建者不斷添加
Queue queue = QueueBuilder.durable("dis").autoDelete().ttl(100).build();
return queue;
}
構造方法new
@Bean
public DirectExchange directExchange(){
Map<String, Object> args = new HashMap<>(3);
//聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", "dead_exchange");
//聲明當前隊列的死信路由 key
args.put("x-dead-letter-routing-key", "dead");
//聲明隊列的 TTL
args.put("x-message-ttl", 10000);
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
return new DirectExchange("MqSendService-One",false,false,args);
}
七萎津、特性功能
Prefetch count(消息分配)
如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者抹镊。這樣并不好锉屈,因為如果每個消息的處理時間不同,就有可能會導致某些消費者一直在忙垮耳,而另外一些消費者很快就處理完手頭工作并一直空閑的情況颈渊。
我們可以通過設置prefetchCount來表示該消費者在每次在該隊列只能處理幾個消息遂黍,比如我們設置prefetchCount=1,則該消費者每次在同一隊列只能消費一條消息俊嗽,消息未處理完不會被分配該隊列其他消息雾家。這樣就達到能者多勞的效果。
rabbitmq:
addresses: 127.0.0.1
cache:
channel:
size: 25
# 指定消費端消息確認方式
listener:
simple:
# 消費端最小并發(fā)數(shù)
concurrency: 1
# 消費端最大并發(fā)數(shù)
max-concurrency: 5
# 一次處理的消息數(shù)量
prefetch: 2
# 手動應答
acknowledge-mode: manual
QOS預取值(設置未確認消息緩沖區(qū)大猩芑怼)
介紹
這是RabbitMQ的一種保護機制芯咧。防止當消息激增的時候,海量的消息進入consumer而引發(fā)consumer宕機妹田。
該值定義通道上允許的未確認消息的最大數(shù)量唬党,這是為了防止Unacked消息緩沖區(qū)存在過多的Unacked消息。
一旦數(shù)量達到配置的數(shù)量鬼佣,RabbitMQ 將停止在通道上傳遞更多消息驶拱。
除非至少有一個未處理的消息被確認,例如晶衷,假設在通道上有未確認的消息 5蓝纲、6、7晌纫,8税迷,并且通道的預取計數(shù)設置為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何消息锹漱,除非至少有一個未應答的消息被 ack箭养。比方說 tag=6 這個消息剛剛被確認 ACK,RabbitMQ 將會感知這個情況到并再發(fā)送一條消息哥牍。
代碼實現(xiàn)
這個可以通過設置消息分配數(shù)目達到效果毕泌。
listener:
simple:
# 消費端最小并發(fā)數(shù)
concurrency: 1
# 消費端最大并發(fā)數(shù)
max-concurrency: 5
# 一次處理的消息數(shù)量
prefetch: 2
# 手動應答
acknowledge-mode: manual
緩沖區(qū)大小
- min = concurrency * prefetch * 節(jié)點數(shù)量
- max = max-concurrency * prefetch * 節(jié)點數(shù)量
當 unacked_msg_count < min 隊列不會阻塞。但需要及時處理unacked的消息嗅辣。 - unacked_msg_count >= min 可能會出現(xiàn)堵塞撼泛。 - unacked_msg_count >= max 隊列一定阻塞。
死信隊列
RabbitMQ的死信隊列不像RocketMQ一樣時原本就存在的澡谭,它需要我們自己設置一個交換機然后綁定隊列愿题,我們在語義上將其用作為存放無法消費的消息的隊列。
RabbitMQ的死信是通過為普通隊列設置死信參數(shù)蛙奖,當該隊列出現(xiàn)無法消費的消息潘酗,就會將這些消息轉移到設置的死信隊列中。
死信消息產(chǎn)生原因
- 消息 TTL 過期
- 隊列達到最大長度(隊列滿了雁仲,無法再添加數(shù)據(jù)到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false
RabbitMQ中的TTL
TTL 是 RabbitMQ 中一個消息或者隊列的屬性崎脉,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒伯顶。
換句話說囚灼,如果一條消息設置了 TTL 屬性或者進入了設置 TTL 屬性的隊列骆膝,那么這條消息如果在 TTL 設置的時間內(nèi)沒有被消費,則會成為"死信"灶体。如果同時配置了隊列的 TTL 和消息的TTL阅签,那么 較小 的那個值將會被使用,有兩種方式設置 TTL蝎抽。
設置TTL的方式
消息設置TTL
Message msg = new Message(s.getBytes(StandardCharsets.UTF_8));
//參數(shù)四 MessagePostProcessor:用于在執(zhí)行消息轉換后添加/修改標頭或屬性政钟。
//它還可以用于在偵聽器容器和AmqpTemplate接收消息時修改入站消息。
rabbitTemplate.convertAndSend("MqSendService-One","One",msg,correlationData->{
correlationData.getMessageProperties().setExpiration("1000");
return correlationData;
});
//也可在創(chuàng)建消息時指定
msg.getMessageProperties().setExpiration("1000");
隊列設置TTL
@Bean
public DirectExchange directExchange(){
Map<String, Object> args = new HashMap<>(3);
//聲明隊列的 TTL
args.put("x-message-ttl", 10000);
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
return new DirectExchange("MqSendService-One",false,false,args);
}
@Bean
public Queue directQueue(){
//需要的屬性可以通過構建者不斷添加
Queue queue = QueueBuilder.noDurable("TTL_Queue").ttl(100).build();
return queue;
}
二者的區(qū)別
如果設置了隊列的 TTL 屬性樟结,那么一旦消息過期养交,就會被隊列丟棄(如果配置了死信隊列被丟到死信隊列中),
而消息設置TTL方式瓢宦,消息即使過期碎连,也不一定會被馬上丟棄,因為因為 RabbitMQ 只會檢查第一個消息是否過期驮履,如果過期則丟到死信隊列鱼辙,如果第一個消息的延時時長很長,而第二個消息的延時時長很短玫镐,第二個消息并不會優(yōu)先得到執(zhí)行倒戏。
另外,還需要注意的一點是恐似,如果 不設置 TTL杜跷,表示消息永遠不會過期,如果將 TTL 設置為 0矫夷,則表示除非此時可以直接投遞該消息到消費者葱椭,否則該消息將會被丟棄。
代碼實現(xiàn)
- 1.語義聲明死信交換機
@Bean
public DirectExchange deadExchange(){
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
return new DirectExchange("Dead_Exchange",false,false,null);
}
- 2.聲明死信隊列口四,并建立綁定關系
@Bean
public Queue directQueue(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("Dead_Queue",false,false,false,null);
}
- 3.為正常隊列設置死信參數(shù)(重點)
@Bean
public Queue directQueue(){
Map<String, Object> args = new HashMap<>(3);
//聲明當前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", "dead_exchange");
//聲明當前隊列的死信路由 key
args.put("x-dead-letter-routing-key", "dead");
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("directQueue-One",false,false,false,args);
}
@Bean
public Queue directQueue2(){
Queue queue = QueueBuilder
.durable("dis")
.autoDelete()
.ttl(100)
.deadLetterExchange("Dead_Exchange") //設置死信交換機參數(shù)
.deadLetterRoutingKey("Dead") //設置死信隊列的路由key
.build();
return queue;
}
延遲隊列
利用死信隊列達到
RabbitMQ的延遲隊列可以通過設置TTL的時間再配合設置死信隊列的參數(shù)達到。
例:創(chuàng)建一個隊列并設置TTL時間秦陋,但無人監(jiān)聽消費蔓彩,那么當TTL時間達到,該消息就會進入死信隊列驳概,這時設置一個監(jiān)聽死信隊列的消 費者赤嚼,從而達到延遲消費的效果。
利用官網(wǎng)延遲隊列插件達到
優(yōu)先級隊列
介紹
RabbitMQ支持為隊列設置優(yōu)先級顺又,從而達到優(yōu)先級高的隊列中消息被優(yōu)先消費更卒。
實現(xiàn)代碼
@Bean
public Queue directQueue2() {
//設置隊列優(yōu)先級
//args.put("x-max-priority",5)
Queue queue = QueueBuilder
//持久化并設置隊列名
.durable("dis")
//開啟隊列優(yōu)先級,并設置優(yōu)先級數(shù)
.maxPriority(5)
.build();
return queue;
}
惰性隊列
介紹
默認情況下稚照,當生產(chǎn)者將消息發(fā)送到 RabbitMQ 的時候蹂空,隊列中的消息會盡可能的存儲在內(nèi)存之中俯萌,這樣可以更加快速的將消息發(fā)送給消費者。即使是持久化的消息上枕,在被寫入磁盤的同時也會在內(nèi)存中駐留一份備份咐熙。
惰性隊列會盡可能的將消息存入磁盤中,而在消費者消費到相應的消息時才會被加載到內(nèi)存中辨萍,它的一個重要的設計目標是 支持更多的消息存儲棋恼。當消費者由于各種各樣的原因(比如消費者下線、宕機亦或者是由于維護而關閉等)而致使長時間內(nèi)不能消費消息造成堆積時锈玉,惰性隊列就很有必要了爪飘。
代碼實現(xiàn)
隊列存在兩種模式:default 和 lazy。lazy即為惰性隊列模式拉背。
@Bean
public Queue directQueue2() {
//設置惰性隊列
//args.put("x-queue-mode", "lazy");
Queue queue = QueueBuilder
//持久化并設置隊列名
.durable("dis")
//設為惰性隊列
.lazy()
.build();
return queue;
}
災難防護
Message acknowledgment(消息確認)
介紹
從安全角度考慮师崎,網(wǎng)絡是不可靠的,接收消息的應用也有可能在處理消息的時候失敗去团÷盏基于此原因,AMQP模塊包含了一個消息確認(message acknowledgements)的概念:當消息從隊列投遞給消費者的時候土陪,消費者服務器需要返回一個ack(確認信息)昼汗,當broker收到了確認才會將該消息刪除;消息確認可以是自動的鬼雀,也可以是由消費端手動確認顷窒。此外也支持生產(chǎn)端向broker發(fā)送消息得到broker的ack,從而針對做出響應邏輯源哩。
發(fā)布端消息確認(發(fā)布確認)
確認模式
-
NONE
- 禁用發(fā)布確認模式鞋吉,是默認值
-
CORRELATED
- 發(fā)布消息成功到交換器后會觸發(fā)回調方法
-
SIMPLE
- 經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會觸發(fā)回調方法励烦;
- 其二在發(fā)布消息成功后使用 rabbitTemplate 調用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點返回發(fā)送結果谓着,根據(jù)返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel坛掠,則接下來無法發(fā)送消息到 broker赊锚。
快速入門
- 1.配置文件設置發(fā)布確認方式
spring:
application:
name: produer-mq-7001
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
# 發(fā)布確認方式,默認NONE
publisher-confirm-type: correlated
- 2.配置RabbitTemplate
由于發(fā)布確認需要設置回調屉栓,但是Spring默認是單例的舷蒲,如果直接注入RabbitTemplate,那么在設置發(fā)布確認回調時友多,會被認為是重新設置回調方法牲平;而一個RabbitTemplate只能有初始的一個發(fā)布確認回調。
public class RabbitTemplate extends RabbitAccessor implements ... {
...
public void setConfirmCallback(ConfirmCallback confirmCallback) {
Assert.state(this.confirmCallback == null || this.confirmCallback.equals(confirmCallback),
"Only one ConfirmCallback is supported by each RabbitTemplate");
this.confirmCallback = confirmCallback;
}
...
}
public abstract class Assert {
public Assert() {
}
public static void state(boolean expression, String message) {
if (!expression) {
throw new IllegalStateException(message);
}
}
...
}
解決方式:
- 1域滥、使用多例纵柿,可以達到不同的消息發(fā)布使用不同的確認回調(違背單例)
@Bean
@Scope("prototype")
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
- 2蜈抓、使用單例,在初始時即配置確認回調(僅能有一個確認回調)
@Bean
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (!b){
ReturnedMessage dataReturned = correlationData.getReturned();
String str = new String(dataReturned.getMessage().getBody());
System.out.println(str);
log.error("消息發(fā)送失敗藐窄,請重試");
return;
}
}
});
return rabbitTemplate;
}
@Autowired
private RabbitTemplate rabbitTemplate;
//依賴注入 rabbitTemplate 之后再設置它的回調對象
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (!b){
ReturnedMessage dataReturned = correlationData.getReturned();
String str = new String(dataReturned.getMessage().getBody());
System.out.println(str);
log.error("消息發(fā)送失敗资昧,請重試");
return;
}
}
});
}
回退消息
在僅開啟了生產(chǎn)者確認機制的情況下,交換機接收到消息后荆忍,會直接給消息生產(chǎn)者發(fā)送確認消息格带,如果發(fā)現(xiàn)該消息不可路由,那么消息會被直接丟棄刹枉,此時生產(chǎn)者是不知道消息被丟棄這個事件的叽唱。
此時通過設置 mandatory 參數(shù)可以在當消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者,需搭配使用 ReturnsCallback
@Bean
public RabbitTemplate getRabbitTemplate(){
//若使用confirm-callback或return-callback微宝,必須要配置publisherConfirms或publisherReturns為true
//每個rabbitTemplate只能有一個confirm-callback和return-callback棺亭,如果這里配置了,那么寫生產(chǎn)者的時候不能再寫confirm-callback和return-callback
//使用return-callback時必須設置mandatory為true蟋软,或者在配置中設置mandatory-expression的值為true
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//true:交換機無法將消息進行路由時镶摘,會將該消息返回給生產(chǎn)者
//false:如果發(fā)現(xiàn)消息無法進行路由,則直接丟棄;默認false
rabbitTemplate.setMandatory(true);
//將對象序列化為json串
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
/**
* 如果消息沒有到exchange,則confirm回調,ack=false
* 如果消息到達exchange,則confirm回調,ack=true
* exchange到queue成功,則不回調return
* exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}else{
log.info("消息發(fā)送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
消費端消息確認
消息端確認模式
NONE:不確認岳守,即監(jiān)聽器監(jiān)聽到消息后直接確認
MANUAL:手動確認凄敢,需要消費端手動回復確認
-
AUTO:容器將根據(jù)監(jiān)聽器是正常返回還是拋出異常來發(fā)出 ack/nack,注意與NONE區(qū)分
- Spring 默認requeue-rejected配置為true湿痢,所以在消費消息發(fā)生異常后該消息會重新入隊涝缝。并且若存在消費集群,會將某個消費端Nack的消息交給其他消費者譬重。
消息確認實現(xiàn)方式
方式一:配置文件
spring:
application:
name: consumer-mq-7100
rabbitmq:
addresses: 127.0.0.1
cache:
channel:
size: 25
# 指定消費端消息確認方式
listener:
simple:
acknowledge-mode: manual
方式二:@RabbitListener 指定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "directQueue-One", durable = "false"),
exchange = @Exchange(value = "MqSendService-One", type = "direct", durable = "false"),
key = "One"),
ackMode = "MANUAL") //指定消費端消息確認方式
public void tsAckDirectMsg(Message data, Channel channel) throws IOException {
String str = new String(data.getBody());
System.out.println(str + "-----:" + seq);
System.out.println();
seq.incrementAndGet();
System.out.println(data.getMessageProperties().getDeliveryTag());
System.out.println(channel.getChannelNumber());
channel.basicAck(data.getMessageProperties().getDeliveryTag(),false);
}
channel.basicAck() 方法
參數(shù):
- 1拒逮、long deliveryTag
- 消息的索引。通常設為 data.getMessageProperties().getDeliveryTag()臀规。
- 每個消息在一個channel中都有唯一的一個deliveryTag滩援,每次發(fā)送一條,deliveryTag都會+1塔嬉,從0開始計數(shù)玩徊;確認消息傳入的deliveryTag需保證和渠道內(nèi)的一致,否則無法確認邑遏,該消息會被設置為 ready 狀態(tài)。
注意:當deliveryTag被固定一個數(shù)字m時恰矩,當m > deliveryTag就會換個渠道重新監(jiān)聽消費记盒。
無法確認的消息(deliveryTag不匹配,通道已關閉外傅,連接已關閉或 TCP 連接丟失)會重新入隊纪吮,被設為 ready 狀態(tài)俩檬,如果存在其他消費者,會將消息發(fā)送 給其他消費者碾盟,否則反復嘗試僅存消費者棚辽。但沒進行確認的消息會被設為 Unacked。
- 2冰肴、boolean multiple
- 是否批量確認屈藐。
- 當設為true時,會批量確認deliveryTag小于傳入deliveryTag參數(shù)的消息熙尉。
channel.basicNack() 方法
參數(shù)多了一個 boolean requeue 是否重新入隊联逻,前兩個參數(shù)同上。
Message durability(消息持久化)
默認情況下 RabbitMQ 退出或由于某種原因崩潰時检痰,它忽視隊列和消息包归。
隊列的持久化
在聲明隊列的時候設置持久化為 true。
需要注意的就是如果之前聲明的隊列不是持久化的铅歼,需要把原先隊列先刪除公壤,或者重新創(chuàng)建一個持久化的隊列,不然就會出現(xiàn)錯誤椎椰。
@Bean
public Queue directQueue(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
return new Queue("directQueue-One",true,false,false,null);
}
交換機的持久化厦幅,同上。
備用交換機
前言
有了消息回退的功能我們可以感知到消息的投遞情況俭识,但是對于這些無法路由到的消息我們可能只能做一個記錄的功能慨削,然后再手動處理;并且消息回退會增加生產(chǎn)者的復雜性套媚;那么現(xiàn)在如何想要實現(xiàn)不增加生產(chǎn)者的復雜性缚态,并保證消息不丟失呢?因為消息是不可達的堤瘤,所以顯然無法通過死信隊列機制實現(xiàn)玫芦。所以通過這種備用交換機的機制可以實現(xiàn)。
實現(xiàn)原理
它是通過在聲明交換機的時候本辐,為該交換機設置一個備用的交換機桥帆;當主交換機接收一條消息不可達后,會將該消息轉發(fā)到備用交換機慎皱,它在將這些消息發(fā)到自己綁定的隊列老虫,一般備用交換機的類型都設置為 Fanout(廣播類型)。這樣我們可以統(tǒng)一設置一個消費者監(jiān)聽該交換機下的隊列對其進行統(tǒng)一處理茫多。
實現(xiàn)代碼
mandatory 參數(shù)與備份交換機可以一起使用的時候祈匙,如果兩者同時開啟,誰優(yōu)先級高,經(jīng)測試備份交換機優(yōu)先級高
@Configuration
public class RabbitDirectConfig {
@Bean
public Queue alternateQueue(){
//參數(shù)介紹
//1.隊列名 2.是否持久化 3.是否獨占 4.自動刪除 5.其他參數(shù)
Queue queue = QueueBuilder.durable("alternateQueue")
.autoDelete()
.build();
return queue;
}
@Bean
public FanoutExchange alternateExchange(){
return new FanoutExchange("Alternate_Exchange",true,false,null);
}
@Bean
public DirectExchange directExchange(){
// ExchangeBuilder exchange = ExchangeBuilder.directExchange("MqSendService-One")
// .durable(false)
// .autoDelete()
// .withArgument("alternate-exchange", "Alternate_Exchange");
//參數(shù)介紹
//1.交換器名 2.是否持久化 3.自動刪除 4.其他參數(shù)
Map<String,Object> args = new HashMap<>(3);
args.put("alternate-exchange","Alternate_Exchange");
return new DirectExchange("MqSendService-One",false,false,args);
}
@Bean
public Binding bingAlternateExchange(){
return BindingBuilder.bind(alternateQueue()) //綁定隊列
.to(alternateExchange()); //隊列綁定到哪個交換器
}
@Bean
public Binding bingExchange(){
return BindingBuilder.bind(directQueue()) //綁定隊列
.to(directExchange()) //隊列綁定到哪個交換器
.with("One"); //路由key,必須指定
}
}
參考:
https://blog.csdn.net/AhangA/article/details/121641034
https://blog.csdn.net/zhongxu_yuan/article/details/124462229