01. RabbitMQ基礎(chǔ)使用

一、概述

RabbitMQ,俗稱“兔子MQ”(可見其輕巧损姜,敏捷),是目前非常熱門的一款開源消息中間件狸相,不管是互聯(lián)網(wǎng)行業(yè)還是傳統(tǒng)行業(yè)都廣泛使用(最早是為了解決電信行業(yè)系統(tǒng)之間的可靠通信而設(shè)計(jì))薛匪。

  • 高可靠性捐川、易擴(kuò)展脓鹃、高可用、功能豐富等
  • 支持大多數(shù)(甚至冷門)的編程語言客戶端古沥。
  • RabbitMQ遵循AMQP協(xié)議瘸右,自身采用Erlang(一種由愛立信開發(fā)的通用面向并發(fā)編程的語言)編寫。
  • RabbitMQ也支持MQTT(物聯(lián)網(wǎng)領(lǐng)域常用)等其他協(xié)議岩齿。

RabbitMQ具有很強(qiáng)大的插件擴(kuò)展能力太颤,官方和社區(qū)提供了非常豐富的插件可供選擇:

https://www.rabbitmq.com/community-plugins.html

1.1 邏輯架構(gòu)

通過前面的學(xué)習(xí),可以知道盹沈,rabbitMQ有三大核心:交換器(exchange)龄章、綁定關(guān)系(bindings)隊(duì)列(queue)乞封;

生產(chǎn)者在發(fā)送消息的時(shí)候做裙,需要(通過routingKey)指定具體發(fā)送到哪個(gè)broker的哪個(gè)虛擬主機(jī)的哪個(gè)交換器里,再通過bindingKey指定綁定關(guān)系(間接確定實(shí)際存儲的隊(duì)列queue)肃晚。

image

1.2 交換器類型

RabbitMQ常用的交換器類型有:fanout锚贱、directtopic关串、headers四種拧廊。

1.2.1 Fanout

類似于廣播监徘。交換器接收到消息之后,無腦給下屬的每個(gè)queue都發(fā)送一份吧碾,無需考慮交換鍵凰盔、綁定鍵。

image

1.2.2 Direct

direct類型的交換器路由規(guī)則很簡單倦春,它會把消息路由到那些BindingKey和RoutingKey完全匹配的隊(duì)列中廊蜒,如下圖:

image

1.2.3 Topic

相當(dāng)于在Direct的基礎(chǔ)上,支持類似正則的模式匹配溅漾。

topic類型的交換器在direct匹配規(guī)則上進(jìn)行了擴(kuò)展山叮,也是將消息路由到BindingKeyRoutingKey相匹配的隊(duì)列中,這里的匹配規(guī)則稍微不同添履,具體有如下約定:

  • BindingKeyRoutingKey一樣都是由.分隔的字符串屁倔;BindingKey中可以存在兩種特殊字符*#用于模糊匹配;

  • *用于匹配一個(gè)單詞暮胧,#用于匹配多個(gè)單詞(可以是0個(gè))锐借;

image

1.2.4 Headers(不推薦)

headers類型的交換器不依賴于路由鍵的匹配規(guī)則來路由信息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配往衷。headers類型的交換器性能很差钞翔,不實(shí)用,了解即可席舍。

1.3 數(shù)據(jù)存儲

存儲機(jī)制

RabbitMQ消息有兩種類型:持久化消息和非持久化消息布轿。這兩種消息都會被寫入磁盤。

  • 持久化消息在到達(dá)隊(duì)列時(shí)寫入磁盤来颤,同時(shí)會內(nèi)存中保存一份備份汰扭,當(dāng)內(nèi)存吃緊時(shí),消息從內(nèi)存中清除福铅。這會提高一定的性能萝毛。
  • 非持久化消息一般只存于內(nèi)存中,當(dāng)內(nèi)存壓力大時(shí)數(shù)據(jù)刷盤(內(nèi)存->硬盤)處理滑黔,以節(jié)省內(nèi)存空間笆包。

RabbitMQ存儲層包含兩個(gè)部分:隊(duì)列索引和消息存儲。

每個(gè)隊(duì)列都有索引(各自擁有獨(dú)立的索引文件)略荡;而消息的持久化文件則會被當(dāng)前虛擬機(jī)下的所有交換器中的所有隊(duì)列共同使用庵佣。

image

1.3.1 隊(duì)列索引 rabbit_queue_index

索引,維護(hù)隊(duì)列的落盤消息的信息撞芍,如存儲地點(diǎn)秧了、是否已被給消費(fèi)者接收、是否已被消費(fèi)者ack等序无。

每個(gè)隊(duì)列都有相對應(yīng)的索引验毡。

索引使用順序的段文件來存儲衡创,后綴為.idx,文件名從0開始累加晶通,每個(gè)段文件中包含固定的segment_entry_count條記錄璃氢,默認(rèn)值是16384.

每個(gè)index從磁盤中讀取消息的時(shí)候,至少要在內(nèi)存中維護(hù)一個(gè)段文件狮辽。

消息(包括消息頭一也、消息體、屬性)可以直接存儲在index中喉脖,也可以存儲在store中椰苟。最佳的方式是較小的消息存在index中,而較大的消息存在store中树叽。這個(gè)消息大小的界定可以通過queue_index_embed_msgs_below來配置舆蝴,默認(rèn)值為4096B。當(dāng)一個(gè)消息小于設(shè)定的大小閾值時(shí)题诵,就可以存儲在index中洁仗,這樣性能上可以得到優(yōu)化。一個(gè)完整的消息大小小于這個(gè)值性锭,就放到索引中赠潦,否則放到持久化消息文件中。

1.3.2 消息存儲 rabbit_msg_store

消息以鍵值對的形式存儲到文件中草冈,一個(gè)虛擬主機(jī)上的所有隊(duì)列使用同一塊存儲她奥,每個(gè)節(jié)點(diǎn)只有一個(gè)。

存儲分為持久化存儲(msg_store_persistent)和短暫存儲(msg_store_transient)疲陕。持久化存

儲的內(nèi)容在broker重啟后不會丟失方淤,短暫存儲的內(nèi)容在broker重啟后丟失钉赁。

store使用文件來存儲蹄殃,后綴為.rdq,經(jīng)過store處理的所有消息都會以追加的方式寫入到該文件中你踩,當(dāng)該文件的大小超過指定的限制(file_size_limit)后诅岩,將會關(guān)閉該文件并創(chuàng)建一個(gè)新的文件以供新的消息寫入。文件名從0開始進(jìn)行累加带膜。在進(jìn)行消息的存儲時(shí)吩谦,RabbitMQ會在ETS(Erlang TermStorage)表中記錄消息在文件中的位置映射和文件的相關(guān)信息。

1.3.3 消息讀取與刪除

  • 讀取

讀取消息時(shí)膝藕,先根據(jù)消息的ID(msg_id)找到對應(yīng)存儲的文件式廷,如果文件存在并且未被鎖住,則直接打開文件芭挽,從指定位置讀取消息內(nèi)容滑废。如果文件不存在或者被鎖住了蝗肪,則發(fā)送請求由store進(jìn)行處理。

  • 刪除

刪除消息時(shí)蠕趁,只是從ETS表刪除指定消息的相關(guān)信息薛闪,同時(shí)更新消息對應(yīng)的存儲文件和相關(guān)信息。

在執(zhí)行消息刪除操作時(shí)俺陋,并不立即對文件中的消息進(jìn)行刪除豁延,也就是說消息依然在文件中,僅僅是標(biāo)記為垃圾數(shù)據(jù)而已腊状。當(dāng)一個(gè)文件中都是垃圾數(shù)據(jù)時(shí)可以將這個(gè)文件刪除诱咏。當(dāng)檢測到前后兩個(gè)文件中的有效數(shù)據(jù)可以合并成一個(gè)文件,并且所有的垃圾數(shù)據(jù)的大小和所有文件(至少有3個(gè)文件存在的情況下)的數(shù)據(jù)大小的比值超過設(shè)置的閾值garbage_fraction(默認(rèn)值0.5)時(shí)缴挖,才會觸發(fā)垃圾回收胰苏,將這兩個(gè)文件合并,執(zhí)行合并的兩個(gè)文件一定是邏輯上相鄰的兩個(gè)文件醇疼。

合并邏輯:

  • 鎖定這兩個(gè)文件

  • 先整理前面的文件的有效數(shù)據(jù)硕并,再整理后面的文件的有效數(shù)據(jù)

  • 將后面文件的有效數(shù)據(jù)寫入到前面的文件中

  • 更新消息在ETS表中的記錄

  • 刪除后面文件

image

1.3.4 隊(duì)列結(jié)構(gòu)

通常隊(duì)列由rabbit_amqqueue_processbacking_queue這兩部分組成,

image
  • rabbit_amqqueue_process負(fù)責(zé)協(xié)議相關(guān)的消息處理秧荆,即接收生產(chǎn)者發(fā)布的消息倔毙、向消費(fèi)者交付消息、處理消息的確認(rèn)(包括生產(chǎn)端的confirm和消費(fèi)端的ack)等乙濒。

  • backing_queue是消息存儲的具體形式和引擎陕赃,并向rabbit_amqqueue_process提供相關(guān)的接口以供調(diào)用。

如果消息投遞的目的隊(duì)列是空的颁股,并且有消費(fèi)者訂閱了這個(gè)隊(duì)列么库,那么該消息會直接發(fā)送給消費(fèi)者,不會經(jīng)過隊(duì)列這一步甘有。當(dāng)消息無法直接投遞給消費(fèi)者時(shí)诉儒,需要暫時(shí)將消息存入隊(duì)列,以便重新投遞亏掀。

rabbit_variable_queue.erl源碼中定義了RabbitMQ隊(duì)列的4種狀態(tài):

  • alpha:消息索引和消息內(nèi)容都存內(nèi)存忱反,最耗內(nèi)存,很少消耗CPU滤愕;
  • beta:消息索引存內(nèi)存温算,消息內(nèi)存存磁盤;
  • gama:消息索引內(nèi)存和磁盤都有间影,消息內(nèi)容存磁盤注竿;
  • delta:消息索引和內(nèi)容都存磁盤,基本不消耗內(nèi)存,消耗更多CPU和I/O操作

消息存入隊(duì)列后巩割,不是固定不變的胰丁,它會隨著系統(tǒng)的負(fù)載在隊(duì)列中不斷流動(dòng),消息的狀態(tài)會不斷發(fā)生變化喂分。

持久化的消息锦庸,索引和內(nèi)容都必須先保存在磁盤上,才會處于上述狀態(tài)中的一種蒲祈,側(cè)面說明甘萧,gama狀態(tài)是持久化消息才會有的狀態(tài)。

在運(yùn)行時(shí)梆掸,RabbitMQ會根據(jù)消息傳遞的速度定期計(jì)算一個(gè)當(dāng)前內(nèi)存中能夠保存的最大消息數(shù)量(target_ram_count)扬卷,如果alpha狀態(tài)的消息數(shù)量大于此值,則會引起消息的狀態(tài)轉(zhuǎn)換酸钦,多余的消息可能會轉(zhuǎn)換到beta怪得、gama或者delta狀態(tài)。區(qū)分這4種狀態(tài)的主要作用是滿足不同的內(nèi)存和CPU需求卑硫。

對于普通沒有設(shè)置優(yōu)先級和鏡像的隊(duì)列來說徒恋,backing_queue的默認(rèn)實(shí)現(xiàn)是rabbit_variable_queue,其內(nèi)部通過5個(gè)子隊(duì)列Q1欢伏、Q2入挣、delta、Q3硝拧、Q4來體現(xiàn)消息的各個(gè)狀態(tài)径筏。

消息的流動(dòng)如圖所示:

image
  • 消息初始存放在Q1

  • 當(dāng)Q4內(nèi)存充足時(shí)障陶,會直接從Q1推送到Q4滋恬,再由消費(fèi)者消費(fèi)掉;

  • 當(dāng)Q4內(nèi)存占用過大時(shí)抱究,Q1的消息會推送到Q2恢氯,由Q2推送到Q3,再由Q3推動(dòng)到Q4媳维,最后被消費(fèi)掉酿雪;

  • 當(dāng)Q3Q4都內(nèi)存占用過大時(shí)侄刽,消息就會由Q1Q2推送到delta朋凉,硬盤直接存儲起來州丹。等Q3Q4的內(nèi)存占用降低之后,再推送到Q3墓毒,最后由Q3推送到Q4吓揪,被消費(fèi)掉。

    即:消息最后都會交由Q4傳遞到消費(fèi)者所计。

消費(fèi)者獲取消息也會引起消息的狀態(tài)轉(zhuǎn)換柠辞。

當(dāng)消費(fèi)者獲取消息時(shí)

  • 首先會從Q4中獲取消息,如果獲取成功則返回主胧。
  • 如果Q4為空叭首,則嘗試從Q3中獲取消息,系統(tǒng)首先會判斷Q3是否為空踪栋,如果為空則返回隊(duì)列為空焙格,即此時(shí)隊(duì)列中無消息。
  • 如果Q3不為空夷都,則取出Q3中的消息眷唉;進(jìn)而再判斷此時(shí)Q3和Delta中的長度,如果都為空囤官,則可以認(rèn)為 Q2冬阳、Delta、 Q3党饮、Q4 全部為空咒彤,此時(shí)將Q1中的消息直接轉(zhuǎn)移至Q4,下次直接從Q4中獲取消息谆奥。
  • 如果Q3讀取消息之后為空粹污,Delta不為空,則將Delta的消息轉(zhuǎn)移至Q3中捏检,下次可以直接從Q3中獲取消息荞驴。

在將消息從Delta轉(zhuǎn)移到Q3的過程中,是按照索引分段讀取的:首先讀取某一段贯城,然后判斷讀取的消息的個(gè)數(shù)與Delta中消息的個(gè)數(shù)是否相等熊楼,如果相等,則可以判定此時(shí)Delta中己無消息能犯,則直接將Q2和剛讀取到的消息一并放入到Q3中鲫骗,如果不相等,僅將此次讀取到的消息轉(zhuǎn)移到Q3踩晶。

這里就有兩處疑問:

  • 第一個(gè)疑問是:為什么Q3為空則可以認(rèn)定整個(gè)隊(duì)列為空执泰?

    • 試想一下,如果Q3為空渡蜻,Delta不為空术吝,那么在Q3取出最后一條消息的時(shí)候计济,Delta 上的消息就會被轉(zhuǎn)移到Q3這樣與 Q3 為空矛盾;
    • 如果Delta 為空且Q2不為空排苍,則在Q3取出最后一條消息時(shí)會將Q2的消息并入到Q3中沦寂,這樣也與Q3為空矛盾;
    • 在Q3取出最后一條消息之后淘衙,如果Q2传藏、Delta、Q3都為空彤守,且Q1不為空時(shí)毯侦,則Q1的消息會\被轉(zhuǎn)移到Q4,這與Q4為空矛盾遗增。
  • 為什么Q3和Delta都為空時(shí)叫惊,則可以認(rèn)為 Q2、Delta做修、Q3霍狰、Q4全部為空?

    其實(shí)針對第一個(gè)問題的論述也解釋了這個(gè)問題饰及,通常在負(fù)載正常時(shí)蔗坯,如果消費(fèi)速度大于生產(chǎn)速度,對于不需要保證可靠不丟失的消息來說燎含,極有可能只會處于alpha狀態(tài)宾濒。

對于持久化消息,它一定會進(jìn)入gamma狀態(tài)屏箍,在開啟publisher confirm機(jī)制時(shí)绘梦,只有到了gamma 狀態(tài)時(shí)才會確認(rèn)該消息己被接收,若消息消費(fèi)速度足夠快赴魁、內(nèi)存也充足卸奉,這些消息也不會繼續(xù)走到下一個(gè)狀態(tài)。

為什么消息的堆積導(dǎo)致性能下降颖御?

在系統(tǒng)負(fù)載較高時(shí)榄棵,消息若不能很快被消費(fèi)掉,這些消息就會進(jìn)入到很深的隊(duì)列中去潘拱,這樣會增加處理每個(gè)消息的平均開銷疹鳄。

會花費(fèi)更多的性能維護(hù)消息的狀態(tài)流轉(zhuǎn)

因?yàn)橐ǜ嗟臅r(shí)間和資源處理“堆積”的消息,如此用來處理新流入的消息的能力就會降低芦岂,使得后流入的消息又被積壓到很深的隊(duì)列中瘪弓,繼續(xù)增大處理每個(gè)消息的平均開銷,繼而情況變得越來越惡化盔腔,使得系統(tǒng)的處理能力大大降低杠茬。

應(yīng)對這一問題一般有3種措施:

  • 增加prefetch_count的值月褥,即一次發(fā)送多條消息給消費(fèi)者弛随,加快消息被消費(fèi)的速度瓢喉。
  • 采用multiple ack,降低處理 ack 帶來的開銷
  • 流量控制

二舀透、基本命令

# 前臺啟動(dòng)Erlang VM和RabbitMQ 
rabbitmq-server 
# 后臺啟動(dòng) 
rabbitmq-server -detached 
# 停止RabbitMQ和Erlang VM 
rabbitmqctl stop 
# 查看所有隊(duì)列 
rabbitmqctl list_queues 
# 查看所有虛擬主機(jī) 
rabbitmqctl list_vhosts 
# 在Erlang VM運(yùn)行的情況下啟動(dòng)RabbitMQ應(yīng)用 
rabbitmqctl start_app rabbitmqctl stop_app 
# 查看節(jié)點(diǎn)狀態(tài) 
rabbitmqctl status 
# 查看所有可用的插件 
rabbitmq-plugins list 
# 啟用插件 
rabbitmq-plugins enable <plugin-name> 
# 停用插件
rabbitmq-plugins disable <plugin-name> 
# 添加用戶 
rabbitmqctl add_user username password 
# 列出所有用戶: 
rabbitmqctl list_users 
# 刪除用戶: 
rabbitmqctl delete_user username 
# 清除用戶權(quán)限: 
rabbitmqctl clear_permissions -p vhostpath username 
# 列出用戶權(quán)限: 
rabbitmqctl list_user_permissions username 
# 修改密碼: 
rabbitmqctl change_password username newpassword 
# 設(shè)置用戶權(quán)限: 
rabbitmqctl set_permissions -p vhostpath username "^$" ".*" ".*" 
# 創(chuàng)建虛擬主機(jī): 
rabbitmqctl add_vhost vhostpath #
# 列出所以虛擬主機(jī): 
rabbitmqctl list_vhosts 
# 列出虛擬主機(jī)上的所有權(quán)限: 
rabbitmqctl list_permissions -p vhostpath 
# 刪除虛擬主機(jī): 
rabbitmqctl delete_vhost vhost vhostpath 
# 移除所有數(shù)據(jù)栓票,要在 rabbitmqctl stop_app 之后使用: 
rabbitmqctl reset

三、RabbitMq工作流程

3.1 生產(chǎn)者發(fā)送消息的流程

  • 生產(chǎn)者連接RabbitMQ愕够,建立TCP連接(Connection)走贪,開啟信道(Channel)

  • 生產(chǎn)者聲明一個(gè)Exchange(交換器),并設(shè)置相關(guān)屬性惑芭,比如交換器類型坠狡、是否持久化等

  • 生產(chǎn)者聲明一個(gè)Queue(隊(duì)列)并設(shè)置相關(guān)屬性,比如是否排他遂跟、是否持久化逃沿、是否自動(dòng)刪除等

  • 生產(chǎn)者通過 bindingKey (綁定Key)將交換器和隊(duì)列綁定( binding )起來

  • 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含 routingKey (路由鍵)幻锁、交換器等信息

  • 相應(yīng)的交換器根據(jù)接收到的 routingKey 查找相匹配的隊(duì)列凯亮。

  • 如果找到,則將從生產(chǎn)者發(fā)送過來的消息存入相應(yīng)的隊(duì)列中哄尔。

  • 如果沒有找到假消,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者

  • 關(guān)閉信道、關(guān)閉連接岭接。

3.2 消費(fèi)者接收消息的過程

  • 消費(fèi)者連接到RabbitMQ Broker 富拗,建立一個(gè)連接(Connection ) ,開啟一個(gè)信道(Channel) 鸣戴。
  • 消費(fèi)者向RabbitMQ Broker 請求消費(fèi)相應(yīng)隊(duì)列中的消息啃沪,可能會設(shè)置相應(yīng)的回調(diào)函數(shù)以及做一些準(zhǔn)備工作
  • 等待 RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊(duì)列中的消息, 消費(fèi)者接收消息葵擎。
  • 消費(fèi)者確認(rèn)(ack) 接收到的消息谅阿。
  • RabbitMQ 從隊(duì)列中刪除相應(yīng)己經(jīng)被確認(rèn)的消息。
  • 關(guān)閉信道酬滤、關(guān)閉連接签餐。

3.3 代碼演示

3.3.0 常量

public class Demo01Constrant {

    public static final String queueName ="zephyrQueue.biz";

    public static final String exName="zephyrEx.biz";

    public static final String routingKey="hello.world";
}

3.3.1 生產(chǎn)者:

public class HelloProducer {


    public static void main(String[] args) throws IOException, TimeoutException {
        /** 0. 初始化*/
        // 獲取連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)名 hostname
        factory.setHost("192.168.11.60");
        // 設(shè)置虛擬主機(jī)名稱  /在url中的轉(zhuǎn)義字符是 %2f
        factory.setVirtualHost("/");
        // 用戶名
        factory.setUsername("root");
        // 密碼
        factory.setPassword("123456");
        // amqp的端口號
        factory.setPort(5672);

        /** 1. 建立連接*/
        // 建立TCP連接
        Connection connection = factory.newConnection();
        // 獲取通道
        Channel channel = connection.createChannel();

        /**2. 聲明消息隊(duì)列 */
        // 消息隊(duì)列名稱
        // 是否是持久化的
        // 是否是排他的
        // 是否是自動(dòng)刪除的
        // 消息隊(duì)列的屬性信息。使用默認(rèn)值盯串;
        channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);

        /** 3. 聲明交換器 */
        // 交換器的名稱
        // 交換器的類型
        // 交換器是否是持久化的
        // 交換器是否是自動(dòng)刪除的
        // 交換器的屬性map集合
        channel.exchangeDeclare(Demo01Constrant.exName, BuiltinExchangeType.DIRECT, false, false, null);

        /**4. 交換器和消息隊(duì)列綁定氯檐,并指定路由鍵*/
        channel.queueBind(Demo01Constrant.queueName, Demo01Constrant.exName, Demo01Constrant.routingKey);

        /**5. 發(fā)送消息*/
        // 交換器的名字
        // 該消息的路由鍵
        // 該消息的屬性BasicProperties對象
        // 消息的字節(jié)數(shù)組
        channel.basicPublish(Demo01Constrant.exName, Demo01Constrant.routingKey, null, "hello world message".getBytes());

        /** 6. 關(guān)閉通道、連接*/
        channel.close();
        connection.close();
    }
}

3.3.2 簡單消費(fèi)者

通過拉取的方式消費(fèi):main方法執(zhí)行完畢就結(jié)束体捏,mq中有消息就讀取冠摄,沒消息就啥也不干:channel.basicGet

public class HelloGetConsumer {

    public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 指定協(xié)議: amqp://
        // 指定用戶名  root
        // 指定密碼   123456
        // 指定host   192.168.11.60
        // 指定端口號  5672
        // 指定虛擬主機(jī)  %2f
        factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");

        final Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        // 拉消息模式
        // 指定從哪個(gè)消費(fèi)者消費(fèi)消息
        // 指定是否自動(dòng)確認(rèn)消息  true表示自動(dòng)確認(rèn)
        final GetResponse getResponse = channel.basicGet(Demo01Constrant.queueName, true);
        // 獲取消息體  hello world 1
        final byte[] body = getResponse.getBody();
        System.out.println(new String(body));

        // 關(guān)閉信道糯崎、連接
        channel.close();
        connection.close();

    }
}

3.3.3 阻塞式消費(fèi)者

通過消息推送的方式消費(fèi):main方法執(zhí)行后,mq中有消息就讀取河泳,沒消息就阻塞:channel.basicConsume

public class HelloConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 確保MQ中有該隊(duì)列沃呢,如果沒有則創(chuàng)建
        channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);

        // 監(jiān)聽消息,一旦有消息推送過來拆挥,就調(diào)用第一個(gè)lambda表達(dá)式
        channel.basicConsume(Demo01Constrant.queueName, (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        }, (consumerTag) -> {});

        // channel.close();
        // connection.close();
    }
}

3.4 Connection和Channel的關(guān)系

3.4.1 創(chuàng)建連接 new Connection();

其源碼如下:

public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) throws IOException, TimeoutException {
    ...
    // 設(shè)置參數(shù)
    ConnectionParams params = params(executor);
    ...
    // 判斷自動(dòng)恢復(fù)薄霜,默認(rèn)true
    if (isAutomaticRecoveryEnabled()) {
        // 創(chuàng)建并返回可恢復(fù)連接對象
        AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
        conn.init();
        return conn;
    } else {
        // 獲取到所有地址并遍歷,連接上一個(gè)就直接返回對應(yīng)的連接纸兔; 
        List<Address> addrs = addressResolver.getAddresses();
        Exception lastException = null;
        for (Address addr : addrs) {
            try {
                FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                AMQConnection conn = createConnection(params, handler, metricsCollector);
                conn.start();
                this.metricsCollector.newConnection(conn);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;
            }
        }
        ...
        throw new IOException("failed to connect");
    }
}

其主要進(jìn)行了如下工作:

  • 調(diào)用param()方法惰瓜,進(jìn)行一些初始化的參數(shù)設(shè)置(用戶配置的線程池會在此處進(jìn)行配置);

    // 參數(shù)設(shè)置
    public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
        ConnectionParams result = new ConnectionParams();
                // 會在這里配置用戶設(shè)置的線程池
        result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
        // 一系列set
        result.setXxx(xx);
        ...
        return result;
    }
    
  • 判斷是否是自動(dòng)回復(fù)的連接(默認(rèn)true)

    • 若true汉矿,則在init()方法中崎坊,遍歷服務(wù)器地址,創(chuàng)建可恢復(fù)連接對象(RecoveryAwareAMQConnection)洲拇,并調(diào)用start()方法進(jìn)行連接奈揍。若成功,則直接返回連接對象.

      public void init() throws IOException, TimeoutException {
          // cf是RecoveryAwareAMQConnectionFactory對象
          this.delegate = this.cf.newConnection();
          this.addAutomaticRecoveryListener(delegate);
      }
      
      public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
          Exception lastException = null;
          List<Address> shuffled = shuffle(addressResolver.getAddresses());
      
          for (Address addr : shuffled) {
              try {
                  FrameHandler frameHandler = factory.create(addr, connectionName());
                  RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                  conn.start();
                  metricsCollector.newConnection(conn);
                  return conn;
              } catch (IOException e) {
                  lastException = e;
              } catch (TimeoutException te) {
                  lastException = te;
              }
          }
          ...
          throw new IOException("failed to connect");
      }
      
    • 若false呻待,則直接遍歷服務(wù)器地址打月,創(chuàng)建普通連接對象(AMQConnection),并調(diào)用start()方法蚕捉。若成功奏篙,則直接返回連接對象;

  • 而start()方法則進(jìn)行一些建立連接所必須的協(xié)議通信工作迫淹;

    // 協(xié)議通信
    public void start() throws IOException, TimeoutException {
      
        ...
          // 封裝StartOk(或SecureOk)報(bào)文對象(StartOk秘通、SecureOk都屬于AMQP的規(guī)范)
        Method method = (challenge == null)
                                    ? new AMQP.Connection.StartOk.Builder()
                                                .clientProperties(_clientProperties)
                                                .mechanism(sm.getName())
                                                .response(response)
                                                .build()
                                    : new AMQP.Connection.SecureOk.Builder().response(response).build();
        
        ...
          // 向服務(wù)端發(fā)送報(bào)文對象
        Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
    
        ...
          // 初始化channel管理器(_channelManager為空則不能創(chuàng)建channel)
        _channelManager = instantiateChannelManager(channelMax, threadFactory);
    
        ...
          // 封裝并發(fā)送TuneOk報(bào)文對象(優(yōu)化完畢)
        _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                            .channelMax(channelMax)
                            .frameMax(frameMax)
                            .heartbeat(heartbeat)
                            .build());
          // 封裝并發(fā)送Open報(bào)文對象
        _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                    .virtualHost(_virtualHost)
                                .build());
       ... 
    }
    

至此,connection就創(chuàng)建好了敛熬。

3.4.2 創(chuàng)建信道 new Channel();

  • 獲取到在創(chuàng)建連接過程中的ChannelManager對象cm肺稀,再通過cm創(chuàng)建channel對象;

    @Override
    public Channel createChannel() throws IOException {
        ensureIsOpen();
        ChannelManager cm = _channelManager;
        if (cm == null) return null;
        Channel channel = cm.createChannel(this);
        metricsCollector.newChannel(channel);
        return channel;
    }
    
  • 在synchronized鎖控制下应民,獲取channel編號话原,然后逐步完成channel對象創(chuàng)建;

    public ChannelN createChannel(AMQConnection connection) throws IOException {
      ChannelN ch;
      synchronized (this.monitor) {
        // 獲取channel編號
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
          return null;
        } else {
          // 傳入連接诲锹、編號繁仁,創(chuàng)建channel
          ch = addNewChannel(connection, channelNumber);
        }
      }
      ch.open(); // now that it's been safely added
      return ch;
    }
    
    private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
      ...
      ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
      _channelMap.put(ch.getChannelNumber(), ch);
      return ch;
    }
    
    protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
      return new ChannelN(connection, channelNumber, workService, this.metricsCollector);
    }
    
    
    RecoveryAwareChannelManager.java
    @Override
    protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
      return new RecoveryAwareChannelN(connection, channelNumber, workService, this.metricsCollector);
    }
    

至此,channel就創(chuàng)建好了归园。

3.4.3 綁定連接與信道 queueBind();

在調(diào)用了channel.queueDeclare黄虱、channel.exchangeDeclare之后,就需要調(diào)用queueBind()庸诱,綁定connection與channel:

queueBind()方法只是依據(jù)AMQP協(xié)議捻浦,嘗試發(fā)送Queue.Bind報(bào)文晤揣,并得到Queue.BindOk響應(yīng)

public Queue.BindOk queueBind(String queue, String exchange,
                              String routingKey, Map<String, Object> arguments)
  throws IOException
{
  validateQueueNameLength(queue);
  return (Queue.BindOk)
    // 封裝并發(fā)送Queue.Bind報(bào)文對象,返回的是Queue.BindOk對象朱灿,表示綁定成功
    exnWrappingRpc(new Queue.Bind.Builder()
                   .queue(queue)
                   .exchange(exchange)
                   .routingKey(routingKey)
                   .arguments(arguments)
                   .build())
    .getMethod();
}

通過exnWrappingRpc方法一路追溯昧识,可以看到,需要發(fā)送的幀數(shù)據(jù)被存儲到中母剥,然后喚醒selector對象滞诺,通過輪詢選擇器發(fā)送(Reactor模型)形导,進(jìn)而實(shí)現(xiàn)connection對象的復(fù)用环疼。

public AMQCommand exnWrappingRpc(Method m)
    throws IOException
{
    ...
    return privateRpc(m);
    
}

private AMQCommand privateRpc(Method m)
    throws IOException, ShutdownSignalException
{
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
    rpc(m, k);
    // 至此,請求發(fā)送完畢朵耕,調(diào)用getReply()方法休眠當(dāng)前線程炫隶,直到connection對象的讀線程返回響應(yīng)。
    // At this point, the request method has been sent, and we
    // should wait for the reply to arrive.
    //
    // Calling getReply() on the continuation puts us to sleep
    // until the connection's reader-thread throws the reply over
    // the fence or the RPC times out (if enabled)
    if(_rpcTimeout == NO_RPC_TIMEOUT) {
        return k.getReply();
    } else {
        try {
            return k.getReply(_rpcTimeout);
        } catch (TimeoutException e) {
            throw wrapTimeoutException(m, e);
        }
    }
}

public void rpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}

public void quiescingRpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}

public void quiescingTransmit(Method m) throws IOException {
    synchronized (_channelMutex) {
        quiescingTransmit(new AMQCommand(m));
    }
}

public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        ...
        c.transmit(this);
    }
}


public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();
    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            ...
            connection.writeFrame(m.toFrame(channelNumber));
            connection.writeFrame(headerFrame);
            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                ...
                connection.writeFrame(frame);
            }
        } else {
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }
    connection.flush();
}

// (通過connection對象)寫數(shù)據(jù)幀
public void writeFrame(Frame f) throws IOException {
    _frameHandler.writeFrame(f);
    _heartbeatSender.signalActivity();
}

SocketChannelFrameHandler.java
public void writeFrame(Frame frame) throws IOException {
    state.write(frame);
}

public void write(Frame frame) throws IOException {
    sendWriteRequest(new FrameWriteRequest(frame));
}

private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
    boolean offered = this.writeQueue.offer(writeRequest);
    if(offered) {
        this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
        this.readSelectorState.selector.wakeup();
    } else {
        throw new IOException("Frame enqueuing failed");
    }
        ...
}

readSelectorState.selector對應(yīng)的源碼如下:

package com.rabbitmq.client.impl.nio;

public class SelectorHolder {

    final Selector selector;

    final Set<SocketChannelRegistration> registrations = Collections
        .newSetFromMap(new ConcurrentHashMap<SocketChannelRegistration, Boolean>());

    SelectorHolder(Selector selector) {
        this.selector = selector;
    }

    public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int operations) {
        registrations.add(new SocketChannelRegistration(state, operations));
        selector.wakeup();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末阎曹,一起剝皮案震驚了整個(gè)濱河市伪阶,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌处嫌,老刑警劉巖栅贴,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異熏迹,居然都是意外死亡檐薯,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門注暗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坛缕,“玉大人,你說我怎么就攤上這事捆昏∽” “怎么了?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵骗卜,是天一觀的道長宠页。 經(jīng)常有香客問我,道長寇仓,這世上最難降的妖魔是什么举户? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮焚刺,結(jié)果婚禮上敛摘,老公的妹妹穿的比我還像新娘。我一直安慰自己乳愉,他們只是感情好兄淫,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布屯远。 她就那樣靜靜地躺著,像睡著了一般捕虽。 火紅的嫁衣襯著肌膚如雪慨丐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天泄私,我揣著相機(jī)與錄音房揭,去河邊找鬼。 笑死晌端,一個(gè)胖子當(dāng)著我的面吹牛捅暴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播咧纠,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼蓬痒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了漆羔?” 一聲冷哼從身側(cè)響起梧奢,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎演痒,沒想到半個(gè)月后亲轨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鸟顺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年惦蚊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诊沪。...
    茶點(diǎn)故事閱讀 38,654評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡养筒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出端姚,到底是詐尸還是另有隱情晕粪,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布渐裸,位于F島的核電站巫湘,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏昏鹃。R本人自食惡果不足惜尚氛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望洞渤。 院中可真熱鬧阅嘶,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至魂迄,卻和暖如春粗截,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背捣炬。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工熊昌, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人湿酸。 一個(gè)月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓婿屹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親稿械。 傳聞我的和親對象是個(gè)殘疾皇子选泻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容

  • RabbitMQ 簡介 1. RabbitMQ 介紹 行業(yè)還是傳統(tǒng)行業(yè)都廣泛使用(最早是為了解決電信行業(yè)系統(tǒng)之間的...
    左師兄zuosx閱讀 281評論 0 1
  • RabbitMQ是采用Erlang語言實(shí)現(xiàn)AMQP(Advanced Message Queuing Protoc...
    陳晨_軟件五千言閱讀 2,049評論 0 5
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化美莫、事務(wù)、擁塞控...
    jiangmo閱讀 10,350評論 2 34
  • RabbitMQ詳解 本文地址:http://www.host900.com/index.php/articles...
    嘉加家佳七閱讀 2,506評論 0 9
  • 今天感恩節(jié)哎梯捕,感謝一直在我身邊的親朋好友厢呵。感恩相遇!感恩不離不棄傀顾。 中午開了第一次的黨會襟铭,身份的轉(zhuǎn)變要...
    迷月閃星情閱讀 10,559評論 0 11