一、概述
RabbitMQ,俗稱“兔子MQ”(可見其輕巧损姜,敏捷),是目前非常熱門的一款開源消息中間件狸相,不管是互聯(lián)網(wǎng)行業(yè)還是傳統(tǒng)行業(yè)都廣泛使用(最早是為了解決電信行業(yè)系統(tǒng)之間的可靠通信而設(shè)計(jì))薛匪。
- 高可靠性捐川、易擴(kuò)展脓鹃、高可用、功能豐富等
- 支持大多數(shù)(甚至冷門)的編程語言客戶端古沥。
- RabbitMQ遵循
AMQP
協(xié)議瘸右,自身采用Erlang(一種由愛立信開發(fā)的通用面向并發(fā)編程的語言)編寫。 - RabbitMQ也支持
MQTT(物聯(lián)網(wǎng)領(lǐng)域常用)
等其他協(xié)議岩齿。
RabbitMQ具有很強(qiáng)大的插件擴(kuò)展能力太颤,官方和社區(qū)提供了非常豐富的插件可供選擇:
https://www.rabbitmq.com/community-plugins.html
1.1 邏輯架構(gòu)
通過前面的學(xué)習(xí),可以知道盹沈,rabbitMQ有三大核心:交換器(exchange)
龄章、綁定關(guān)系(bindings)
、隊(duì)列(queue)
乞封;
生產(chǎn)者在發(fā)送消息的時(shí)候做裙,需要(通過routingKey
)指定具體發(fā)送到哪個(gè)broker的哪個(gè)虛擬主機(jī)的哪個(gè)交換器里,再通過bindingKey
指定綁定關(guān)系(間接確定實(shí)際存儲的隊(duì)列queue)肃晚。
1.2 交換器類型
RabbitMQ常用的交換器類型有:fanout
锚贱、direct
、topic
关串、headers
四種拧廊。
1.2.1 Fanout
類似于廣播监徘。交換器接收到消息之后,無腦給下屬的每個(gè)queue都發(fā)送一份吧碾,無需考慮交換鍵凰盔、綁定鍵。
1.2.2 Direct
direct
類型的交換器路由規(guī)則很簡單倦春,它會把消息路由到那些BindingKey和RoutingKey完全匹配的隊(duì)列中廊蜒,如下圖:
1.2.3 Topic
相當(dāng)于在Direct的基礎(chǔ)上,支持類似正則的模式匹配溅漾。
topic
類型的交換器在direct
匹配規(guī)則上進(jìn)行了擴(kuò)展山叮,也是將消息路由到BindingKey
和RoutingKey
相匹配的隊(duì)列中,這里的匹配規(guī)則稍微不同添履,具體有如下約定:
BindingKey
和RoutingKey
一樣都是由.
分隔的字符串屁倔;BindingKey
中可以存在兩種特殊字符*
和#
用于模糊匹配;*
用于匹配一個(gè)單詞暮胧,#
用于匹配多個(gè)單詞(可以是0個(gè))锐借;
1.2.4 Headers(不推薦)
headers類型的交換器不依賴于路由鍵的匹配規(guī)則來路由信息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配往衷。headers類型的交換器性能很差钞翔,不實(shí)用,了解即可席舍。
1.3 數(shù)據(jù)存儲
存儲機(jī)制
RabbitMQ消息有兩種類型:持久化消息和非持久化消息布轿。這兩種消息都會被寫入磁盤。
- 持久化消息在到達(dá)隊(duì)列時(shí)寫入磁盤来颤,同時(shí)會內(nèi)存中保存一份備份汰扭,當(dāng)內(nèi)存吃緊時(shí),消息從內(nèi)存中清除福铅。這會提高一定的性能萝毛。
- 非持久化消息一般只存于內(nèi)存中,當(dāng)內(nèi)存壓力大時(shí)數(shù)據(jù)刷盤(內(nèi)存->硬盤)處理滑黔,以節(jié)省內(nèi)存空間笆包。
RabbitMQ存儲層包含兩個(gè)部分:隊(duì)列索引和消息存儲。
每個(gè)隊(duì)列都有索引(各自擁有獨(dú)立的索引文件)略荡;而消息的持久化文件則會被當(dāng)前虛擬機(jī)下的所有交換器中的所有隊(duì)列共同使用庵佣。
1.3.1 隊(duì)列索引 rabbit_queue_index
索引,維護(hù)隊(duì)列的落盤消息的信息撞芍,如存儲地點(diǎn)秧了、是否已被給消費(fèi)者接收、是否已被消費(fèi)者ack等序无。
每個(gè)隊(duì)列都有相對應(yīng)的索引验毡。
索引使用順序的段文件來存儲衡创,后綴為.idx
,文件名從0開始累加晶通,每個(gè)段文件中包含固定的segment_entry_count
條記錄璃氢,默認(rèn)值是16384
.
每個(gè)index從磁盤中讀取消息的時(shí)候,至少要在內(nèi)存中維護(hù)一個(gè)段文件狮辽。
消息(包括消息頭一也、消息體、屬性)可以直接存儲在index中喉脖,也可以存儲在store中椰苟。最佳的方式是較小的消息存在index中,而較大的消息存在store中树叽。這個(gè)消息大小的界定可以通過queue_index_embed_msgs_below
來配置舆蝴,默認(rèn)值為4096B。當(dāng)一個(gè)消息小于設(shè)定的大小閾值時(shí)题诵,就可以存儲在index中洁仗,這樣性能上可以得到優(yōu)化。一個(gè)完整的消息大小小于這個(gè)值性锭,就放到索引中赠潦,否則放到持久化消息文件中。
1.3.2 消息存儲 rabbit_msg_store
消息以鍵值對
的形式存儲到文件中草冈,一個(gè)虛擬主機(jī)上的所有隊(duì)列使用同一塊存儲她奥,每個(gè)節(jié)點(diǎn)只有一個(gè)。
存儲分為持久化存儲(msg_store_persistent
)和短暫存儲(msg_store_transient)疲陕。持久化存
儲的內(nèi)容在broker重啟后不會丟失方淤,短暫存儲的內(nèi)容在broker重啟后丟失钉赁。
store使用文件來存儲蹄殃,后綴為.rdq,經(jīng)過store處理的所有消息都會以追加的方式寫入到該文件中你踩,當(dāng)該文件的大小超過指定的限制(file_size_limit)后诅岩,將會關(guān)閉該文件并創(chuàng)建一個(gè)新的文件以供新的消息寫入。文件名從0開始進(jìn)行累加带膜。在進(jìn)行消息的存儲時(shí)吩谦,RabbitMQ會在ETS(Erlang TermStorage)表中記錄消息在文件中的位置映射和文件的相關(guān)信息。
1.3.3 消息讀取與刪除
- 讀取
讀取消息時(shí)膝藕,先根據(jù)消息的ID(msg_id)找到對應(yīng)存儲的文件式廷,如果文件存在并且未被鎖住,則直接打開文件芭挽,從指定位置讀取消息內(nèi)容滑废。如果文件不存在或者被鎖住了蝗肪,則發(fā)送請求由store進(jìn)行處理。
- 刪除
刪除消息時(shí)蠕趁,只是從ETS表刪除指定消息的相關(guān)信息薛闪,同時(shí)更新消息對應(yīng)的存儲文件和相關(guān)信息。
在執(zhí)行消息刪除操作時(shí)俺陋,并不立即對文件中的消息進(jìn)行刪除豁延,也就是說消息依然在文件中,僅僅是標(biāo)記為垃圾數(shù)據(jù)而已腊状。當(dāng)一個(gè)文件中都是垃圾數(shù)據(jù)時(shí)可以將這個(gè)文件刪除诱咏。當(dāng)檢測到前后兩個(gè)文件中的有效數(shù)據(jù)可以合并成一個(gè)文件,并且所有的垃圾數(shù)據(jù)的大小和所有文件(至少有3個(gè)文件存在的情況下)的數(shù)據(jù)大小的比值超過設(shè)置的閾值garbage_fraction(默認(rèn)值0.5)時(shí)缴挖,才會觸發(fā)垃圾回收胰苏,將這兩個(gè)文件合并,執(zhí)行合并的兩個(gè)文件一定是邏輯上相鄰的兩個(gè)文件醇疼。
合并邏輯:
鎖定這兩個(gè)文件
先整理前面的文件的有效數(shù)據(jù)硕并,再整理后面的文件的有效數(shù)據(jù)
將后面文件的有效數(shù)據(jù)寫入到前面的文件中
更新消息在ETS表中的記錄
刪除后面文件
1.3.4 隊(duì)列結(jié)構(gòu)
通常隊(duì)列由rabbit_amqqueue_process
和backing_queue
這兩部分組成,
rabbit_amqqueue_process
負(fù)責(zé)協(xié)議相關(guān)的消息處理秧荆,即接收生產(chǎn)者發(fā)布的消息倔毙、向消費(fèi)者交付消息、處理消息的確認(rèn)(包括生產(chǎn)端的confirm和消費(fèi)端的ack)等乙濒。backing_queue
是消息存儲的具體形式和引擎陕赃,并向rabbit_amqqueue_process
提供相關(guān)的接口以供調(diào)用。
如果消息投遞的目的隊(duì)列是空的颁股,并且有消費(fèi)者訂閱了這個(gè)隊(duì)列么库,那么該消息會直接發(fā)送給消費(fèi)者,不會經(jīng)過隊(duì)列這一步甘有。當(dāng)消息無法直接投遞給消費(fèi)者時(shí)诉儒,需要暫時(shí)將消息存入隊(duì)列,以便重新投遞亏掀。
rabbit_variable_queue.erl
源碼中定義了RabbitMQ隊(duì)列的4種狀態(tài):
- alpha:消息索引和消息內(nèi)容都存內(nèi)存忱反,最耗內(nèi)存,很少消耗CPU滤愕;
- beta:消息索引存內(nèi)存温算,消息內(nèi)存存磁盤;
- gama:消息索引內(nèi)存和磁盤都有间影,消息內(nèi)容存磁盤注竿;
- delta:消息索引和內(nèi)容都存磁盤,基本不消耗內(nèi)存,消耗更多CPU和I/O操作
消息存入隊(duì)列后巩割,不是固定不變的胰丁,它會隨著系統(tǒng)的負(fù)載在隊(duì)列中不斷流動(dòng),消息的狀態(tài)會不斷發(fā)生變化喂分。
持久化的消息锦庸,索引和內(nèi)容都必須先保存在磁盤上,才會處于上述狀態(tài)中的一種蒲祈,側(cè)面說明甘萧,gama狀態(tài)是持久化消息才會有的狀態(tài)。
在運(yùn)行時(shí)梆掸,RabbitMQ會根據(jù)消息傳遞的速度定期計(jì)算一個(gè)當(dāng)前內(nèi)存中能夠保存的最大消息數(shù)量(target_ram_count
)扬卷,如果alpha狀態(tài)的消息數(shù)量大于此值,則會引起消息的狀態(tài)轉(zhuǎn)換酸钦,多余的消息可能會轉(zhuǎn)換到beta怪得、gama或者delta狀態(tài)。區(qū)分這4種狀態(tài)的主要作用是滿足不同的內(nèi)存和CPU需求卑硫。
對于普通沒有設(shè)置優(yōu)先級和鏡像的隊(duì)列來說徒恋,backing_queue
的默認(rèn)實(shí)現(xiàn)是rabbit_variable_queue
,其內(nèi)部通過5個(gè)子隊(duì)列Q1欢伏、Q2入挣、delta、Q3硝拧、Q4來體現(xiàn)消息的各個(gè)狀態(tài)径筏。
消息的流動(dòng)如圖所示:
消息初始存放在
Q1
;當(dāng)
Q4
內(nèi)存充足時(shí)障陶,會直接從Q1
推送到Q4
滋恬,再由消費(fèi)者消費(fèi)掉;當(dāng)
Q4
內(nèi)存占用過大時(shí)抱究,Q1
的消息會推送到Q2
恢氯,由Q2
推送到Q3
,再由Q3
推動(dòng)到Q4
媳维,最后被消費(fèi)掉酿雪;-
當(dāng)
Q3
、Q4
都內(nèi)存占用過大時(shí)侄刽,消息就會由Q1
、Q2
推送到delta
朋凉,硬盤直接存儲起來州丹。等Q3
、Q4
的內(nèi)存占用降低之后,再推送到Q3
墓毒,最后由Q3
推送到Q4
吓揪,被消費(fèi)掉。即:消息最后都會交由Q4傳遞到消費(fèi)者所计。
消費(fèi)者獲取消息也會引起消息的狀態(tài)轉(zhuǎn)換柠辞。
當(dāng)消費(fèi)者獲取消息時(shí)
- 首先會從Q4中獲取消息,如果獲取成功則返回主胧。
- 如果Q4為空叭首,則嘗試從Q3中獲取消息,系統(tǒng)首先會判斷Q3是否為空踪栋,如果為空則返回隊(duì)列為空焙格,即此時(shí)隊(duì)列中無消息。
- 如果Q3不為空夷都,則取出Q3中的消息眷唉;進(jìn)而再判斷此時(shí)Q3和Delta中的長度,如果都為空囤官,則可以認(rèn)為 Q2冬阳、Delta、 Q3党饮、Q4 全部為空咒彤,此時(shí)將Q1中的消息直接轉(zhuǎn)移至Q4,下次直接從Q4中獲取消息谆奥。
- 如果Q3讀取消息之后為空粹污,Delta不為空,則將Delta的消息轉(zhuǎn)移至Q3中捏检,下次可以直接從Q3中獲取消息荞驴。
在將消息從Delta轉(zhuǎn)移到Q3的過程中,是按照索引分段讀取的:首先讀取某一段贯城,然后判斷讀取的消息的個(gè)數(shù)與Delta中消息的個(gè)數(shù)是否相等熊楼,如果相等,則可以判定此時(shí)Delta中己無消息能犯,則直接將Q2和剛讀取到的消息一并放入到Q3中鲫骗,如果不相等,僅將此次讀取到的消息轉(zhuǎn)移到Q3踩晶。
這里就有兩處疑問:
-
第一個(gè)疑問是:為什么Q3為空則可以認(rèn)定整個(gè)隊(duì)列為空执泰?
- 試想一下,如果Q3為空渡蜻,Delta不為空术吝,那么在Q3取出最后一條消息的時(shí)候计济,Delta 上的消息就會被轉(zhuǎn)移到Q3這樣與 Q3 為空矛盾;
- 如果Delta 為空且Q2不為空排苍,則在Q3取出最后一條消息時(shí)會將Q2的消息并入到Q3中沦寂,這樣也與Q3為空矛盾;
- 在Q3取出最后一條消息之后淘衙,如果Q2传藏、Delta、Q3都為空彤守,且Q1不為空時(shí)毯侦,則Q1的消息會\被轉(zhuǎn)移到Q4,這與Q4為空矛盾遗增。
-
為什么Q3和Delta都為空時(shí)叫惊,則可以認(rèn)為 Q2、Delta做修、Q3霍狰、Q4全部為空?
其實(shí)針對第一個(gè)問題的論述也解釋了這個(gè)問題饰及,通常在負(fù)載正常時(shí)蔗坯,如果消費(fèi)速度大于生產(chǎn)速度,對于不需要保證可靠不丟失的消息來說燎含,極有可能只會處于alpha狀態(tài)宾濒。
對于持久化消息,它一定會進(jìn)入gamma狀態(tài)屏箍,在開啟publisher confirm機(jī)制時(shí)绘梦,只有到了gamma 狀態(tài)時(shí)才會確認(rèn)該消息己被接收,若消息消費(fèi)速度足夠快赴魁、內(nèi)存也充足卸奉,這些消息也不會繼續(xù)走到下一個(gè)狀態(tài)。
為什么消息的堆積導(dǎo)致性能下降颖御?
在系統(tǒng)負(fù)載較高時(shí)榄棵,消息若不能很快被消費(fèi)掉,這些消息就會進(jìn)入到很深的隊(duì)列中去潘拱,這樣會增加處理每個(gè)消息的平均開銷疹鳄。
(會花費(fèi)更多的性能維護(hù)消息的狀態(tài)流轉(zhuǎn))
因?yàn)橐ǜ嗟臅r(shí)間和資源處理“堆積”的消息,如此用來處理新流入的消息的能力就會降低芦岂,使得后流入的消息又被積壓到很深的隊(duì)列中瘪弓,繼續(xù)增大處理每個(gè)消息的平均開銷,繼而情況變得越來越惡化盔腔,使得系統(tǒng)的處理能力大大降低杠茬。
應(yīng)對這一問題一般有3種措施:
- 增加prefetch_count的值月褥,即一次發(fā)送多條消息給消費(fèi)者弛随,加快消息被消費(fèi)的速度瓢喉。
- 采用multiple ack,降低處理 ack 帶來的開銷
- 流量控制
二舀透、基本命令
# 前臺啟動(dòng)Erlang VM和RabbitMQ
rabbitmq-server
# 后臺啟動(dòng)
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM
rabbitmqctl stop
# 查看所有隊(duì)列
rabbitmqctl list_queues
# 查看所有虛擬主機(jī)
rabbitmqctl list_vhosts
# 在Erlang VM運(yùn)行的情況下啟動(dòng)RabbitMQ應(yīng)用
rabbitmqctl start_app rabbitmqctl stop_app
# 查看節(jié)點(diǎn)狀態(tài)
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 啟用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用戶
rabbitmqctl add_user username password
# 列出所有用戶:
rabbitmqctl list_users
# 刪除用戶:
rabbitmqctl delete_user username
# 清除用戶權(quán)限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用戶權(quán)限:
rabbitmqctl list_user_permissions username
# 修改密碼:
rabbitmqctl change_password username newpassword
# 設(shè)置用戶權(quán)限:
rabbitmqctl set_permissions -p vhostpath username "^$" ".*" ".*"
# 創(chuàng)建虛擬主機(jī):
rabbitmqctl add_vhost vhostpath #
# 列出所以虛擬主機(jī):
rabbitmqctl list_vhosts
# 列出虛擬主機(jī)上的所有權(quán)限:
rabbitmqctl list_permissions -p vhostpath
# 刪除虛擬主機(jī):
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有數(shù)據(jù)栓票,要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset
三、RabbitMq工作流程
3.1 生產(chǎn)者發(fā)送消息的流程
生產(chǎn)者連接RabbitMQ愕够,建立TCP連接(Connection)走贪,開啟信道(Channel)
生產(chǎn)者聲明一個(gè)Exchange(交換器),并設(shè)置相關(guān)屬性惑芭,比如交換器類型坠狡、是否持久化等
生產(chǎn)者聲明一個(gè)Queue(隊(duì)列)并設(shè)置相關(guān)屬性,比如是否排他遂跟、是否持久化逃沿、是否自動(dòng)刪除等
生產(chǎn)者通過 bindingKey (綁定Key)將交換器和隊(duì)列綁定( binding )起來
生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含 routingKey (路由鍵)幻锁、交換器等信息
相應(yīng)的交換器根據(jù)接收到的 routingKey 查找相匹配的隊(duì)列凯亮。
如果找到,則將從生產(chǎn)者發(fā)送過來的消息存入相應(yīng)的隊(duì)列中哄尔。
如果沒有找到假消,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
關(guān)閉信道、關(guān)閉連接岭接。
3.2 消費(fèi)者接收消息的過程
- 消費(fèi)者連接到RabbitMQ Broker 富拗,建立一個(gè)連接(Connection ) ,開啟一個(gè)信道(Channel) 鸣戴。
- 消費(fèi)者向RabbitMQ Broker 請求消費(fèi)相應(yīng)隊(duì)列中的消息啃沪,可能會設(shè)置相應(yīng)的回調(diào)函數(shù)以及做一些準(zhǔn)備工作
- 等待 RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊(duì)列中的消息, 消費(fèi)者接收消息葵擎。
- 消費(fèi)者確認(rèn)(ack) 接收到的消息谅阿。
- RabbitMQ 從隊(duì)列中刪除相應(yīng)己經(jīng)被確認(rèn)的消息。
- 關(guān)閉信道酬滤、關(guān)閉連接签餐。
3.3 代碼演示
3.3.0 常量
public class Demo01Constrant {
public static final String queueName ="zephyrQueue.biz";
public static final String exName="zephyrEx.biz";
public static final String routingKey="hello.world";
}
3.3.1 生產(chǎn)者:
public class HelloProducer {
public static void main(String[] args) throws IOException, TimeoutException {
/** 0. 初始化*/
// 獲取連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置主機(jī)名 hostname
factory.setHost("192.168.11.60");
// 設(shè)置虛擬主機(jī)名稱 /在url中的轉(zhuǎn)義字符是 %2f
factory.setVirtualHost("/");
// 用戶名
factory.setUsername("root");
// 密碼
factory.setPassword("123456");
// amqp的端口號
factory.setPort(5672);
/** 1. 建立連接*/
// 建立TCP連接
Connection connection = factory.newConnection();
// 獲取通道
Channel channel = connection.createChannel();
/**2. 聲明消息隊(duì)列 */
// 消息隊(duì)列名稱
// 是否是持久化的
// 是否是排他的
// 是否是自動(dòng)刪除的
// 消息隊(duì)列的屬性信息。使用默認(rèn)值盯串;
channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);
/** 3. 聲明交換器 */
// 交換器的名稱
// 交換器的類型
// 交換器是否是持久化的
// 交換器是否是自動(dòng)刪除的
// 交換器的屬性map集合
channel.exchangeDeclare(Demo01Constrant.exName, BuiltinExchangeType.DIRECT, false, false, null);
/**4. 交換器和消息隊(duì)列綁定氯檐,并指定路由鍵*/
channel.queueBind(Demo01Constrant.queueName, Demo01Constrant.exName, Demo01Constrant.routingKey);
/**5. 發(fā)送消息*/
// 交換器的名字
// 該消息的路由鍵
// 該消息的屬性BasicProperties對象
// 消息的字節(jié)數(shù)組
channel.basicPublish(Demo01Constrant.exName, Demo01Constrant.routingKey, null, "hello world message".getBytes());
/** 6. 關(guān)閉通道、連接*/
channel.close();
connection.close();
}
}
3.3.2 簡單消費(fèi)者
通過拉取的方式消費(fèi):main方法執(zhí)行完畢就結(jié)束体捏,mq中有消息就讀取冠摄,沒消息就啥也不干:channel.basicGet
public class HelloGetConsumer {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 指定協(xié)議: amqp://
// 指定用戶名 root
// 指定密碼 123456
// 指定host 192.168.11.60
// 指定端口號 5672
// 指定虛擬主機(jī) %2f
factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 拉消息模式
// 指定從哪個(gè)消費(fèi)者消費(fèi)消息
// 指定是否自動(dòng)確認(rèn)消息 true表示自動(dòng)確認(rèn)
final GetResponse getResponse = channel.basicGet(Demo01Constrant.queueName, true);
// 獲取消息體 hello world 1
final byte[] body = getResponse.getBody();
System.out.println(new String(body));
// 關(guān)閉信道糯崎、連接
channel.close();
connection.close();
}
}
3.3.3 阻塞式消費(fèi)者
通過消息推送的方式消費(fèi):main方法執(zhí)行后,mq中有消息就讀取河泳,沒消息就阻塞:channel.basicConsume
public class HelloConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 確保MQ中有該隊(duì)列沃呢,如果沒有則創(chuàng)建
channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);
// 監(jiān)聽消息,一旦有消息推送過來拆挥,就調(diào)用第一個(gè)lambda表達(dá)式
channel.basicConsume(Demo01Constrant.queueName, (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
}, (consumerTag) -> {});
// channel.close();
// connection.close();
}
}
3.4 Connection和Channel的關(guān)系
3.4.1 創(chuàng)建連接 new Connection();
其源碼如下:
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) throws IOException, TimeoutException {
...
// 設(shè)置參數(shù)
ConnectionParams params = params(executor);
...
// 判斷自動(dòng)恢復(fù)薄霜,默認(rèn)true
if (isAutomaticRecoveryEnabled()) {
// 創(chuàng)建并返回可恢復(fù)連接對象
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
// 獲取到所有地址并遍歷,連接上一個(gè)就直接返回對應(yīng)的連接纸兔;
List<Address> addrs = addressResolver.getAddresses();
Exception lastException = null;
for (Address addr : addrs) {
try {
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
AMQConnection conn = createConnection(params, handler, metricsCollector);
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
...
throw new IOException("failed to connect");
}
}
其主要進(jìn)行了如下工作:
-
調(diào)用param()方法惰瓜,進(jìn)行一些初始化的參數(shù)設(shè)置(用戶配置的線程池會在此處進(jìn)行配置);
// 參數(shù)設(shè)置 public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { ConnectionParams result = new ConnectionParams(); // 會在這里配置用戶設(shè)置的線程池 result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor); // 一系列set result.setXxx(xx); ... return result; }
-
判斷是否是自動(dòng)回復(fù)的連接(默認(rèn)true)
-
若true汉矿,則在init()方法中崎坊,遍歷服務(wù)器地址,創(chuàng)建可恢復(fù)連接對象(RecoveryAwareAMQConnection)洲拇,并調(diào)用start()方法進(jìn)行連接奈揍。若成功,則直接返回連接對象.
public void init() throws IOException, TimeoutException { // cf是RecoveryAwareAMQConnectionFactory對象 this.delegate = this.cf.newConnection(); this.addAutomaticRecoveryListener(delegate); } public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException { Exception lastException = null; List<Address> shuffled = shuffle(addressResolver.getAddresses()); for (Address addr : shuffled) { try { FrameHandler frameHandler = factory.create(addr, connectionName()); RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector); conn.start(); metricsCollector.newConnection(conn); return conn; } catch (IOException e) { lastException = e; } catch (TimeoutException te) { lastException = te; } } ... throw new IOException("failed to connect"); }
若false呻待,則直接遍歷服務(wù)器地址打月,創(chuàng)建普通連接對象(AMQConnection),并調(diào)用start()方法蚕捉。若成功奏篙,則直接返回連接對象;
-
-
而start()方法則進(jìn)行一些建立連接所必須的協(xié)議通信工作迫淹;
// 協(xié)議通信 public void start() throws IOException, TimeoutException { ... // 封裝StartOk(或SecureOk)報(bào)文對象(StartOk秘通、SecureOk都屬于AMQP的規(guī)范) Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); ... // 向服務(wù)端發(fā)送報(bào)文對象 Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod(); ... // 初始化channel管理器(_channelManager為空則不能創(chuàng)建channel) _channelManager = instantiateChannelManager(channelMax, threadFactory); ... // 封裝并發(fā)送TuneOk報(bào)文對象(優(yōu)化完畢) _channel0.transmit(new AMQP.Connection.TuneOk.Builder() .channelMax(channelMax) .frameMax(frameMax) .heartbeat(heartbeat) .build()); // 封裝并發(fā)送Open報(bào)文對象 _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder() .virtualHost(_virtualHost) .build()); ... }
至此,connection就創(chuàng)建好了敛熬。
3.4.2 創(chuàng)建信道 new Channel();
-
獲取到在創(chuàng)建連接過程中的ChannelManager對象cm肺稀,再通過cm創(chuàng)建channel對象;
@Override public Channel createChannel() throws IOException { ensureIsOpen(); ChannelManager cm = _channelManager; if (cm == null) return null; Channel channel = cm.createChannel(this); metricsCollector.newChannel(channel); return channel; }
-
在synchronized鎖控制下应民,獲取channel編號话原,然后逐步完成channel對象創(chuàng)建;
public ChannelN createChannel(AMQConnection connection) throws IOException { ChannelN ch; synchronized (this.monitor) { // 獲取channel編號 int channelNumber = channelNumberAllocator.allocate(); if (channelNumber == -1) { return null; } else { // 傳入連接诲锹、編號繁仁,創(chuàng)建channel ch = addNewChannel(connection, channelNumber); } } ch.open(); // now that it's been safely added return ch; } private ChannelN addNewChannel(AMQConnection connection, int channelNumber) { ... ChannelN ch = instantiateChannel(connection, channelNumber, this.workService); _channelMap.put(ch.getChannelNumber(), ch); return ch; } protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { return new ChannelN(connection, channelNumber, workService, this.metricsCollector); } RecoveryAwareChannelManager.java @Override protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { return new RecoveryAwareChannelN(connection, channelNumber, workService, this.metricsCollector); }
至此,channel就創(chuàng)建好了归园。
3.4.3 綁定連接與信道 queueBind();
在調(diào)用了channel.queueDeclare
黄虱、channel.exchangeDeclare
之后,就需要調(diào)用queueBind()庸诱,綁定connection與channel:
queueBind()方法只是依據(jù)AMQP協(xié)議捻浦,嘗試發(fā)送Queue.Bind報(bào)文晤揣,并得到Queue.BindOk響應(yīng)
public Queue.BindOk queueBind(String queue, String exchange,
String routingKey, Map<String, Object> arguments)
throws IOException
{
validateQueueNameLength(queue);
return (Queue.BindOk)
// 封裝并發(fā)送Queue.Bind報(bào)文對象,返回的是Queue.BindOk對象朱灿,表示綁定成功
exnWrappingRpc(new Queue.Bind.Builder()
.queue(queue)
.exchange(exchange)
.routingKey(routingKey)
.arguments(arguments)
.build())
.getMethod();
}
通過exnWrappingRpc
方法一路追溯昧识,可以看到,需要發(fā)送的幀數(shù)據(jù)被存儲到中母剥,然后喚醒selector
對象滞诺,通過輪詢選擇器發(fā)送(Reactor模型)形导,進(jìn)而實(shí)現(xiàn)connection對象的復(fù)用环疼。
public AMQCommand exnWrappingRpc(Method m)
throws IOException
{
...
return privateRpc(m);
}
private AMQCommand privateRpc(Method m)
throws IOException, ShutdownSignalException
{
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k);
// 至此,請求發(fā)送完畢朵耕,調(diào)用getReply()方法休眠當(dāng)前線程炫隶,直到connection對象的讀線程返回響應(yīng)。
// At this point, the request method has been sent, and we
// should wait for the reply to arrive.
//
// Calling getReply() on the continuation puts us to sleep
// until the connection's reader-thread throws the reply over
// the fence or the RPC times out (if enabled)
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}
public void rpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}
public void quiescingRpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
enqueueRpc(k);
quiescingTransmit(m);
}
}
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
...
c.transmit(this);
}
}
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
...
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
...
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
// (通過connection對象)寫數(shù)據(jù)幀
public void writeFrame(Frame f) throws IOException {
_frameHandler.writeFrame(f);
_heartbeatSender.signalActivity();
}
SocketChannelFrameHandler.java
public void writeFrame(Frame frame) throws IOException {
state.write(frame);
}
public void write(Frame frame) throws IOException {
sendWriteRequest(new FrameWriteRequest(frame));
}
private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
boolean offered = this.writeQueue.offer(writeRequest);
if(offered) {
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
this.readSelectorState.selector.wakeup();
} else {
throw new IOException("Frame enqueuing failed");
}
...
}
readSelectorState.selector對應(yīng)的源碼如下:
package com.rabbitmq.client.impl.nio;
public class SelectorHolder {
final Selector selector;
final Set<SocketChannelRegistration> registrations = Collections
.newSetFromMap(new ConcurrentHashMap<SocketChannelRegistration, Boolean>());
SelectorHolder(Selector selector) {
this.selector = selector;
}
public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int operations) {
registrations.add(new SocketChannelRegistration(state, operations));
selector.wakeup();
}
}