概述
????RabbitMQ是目前非常熱門的一款消息中間件钝域,不管是互聯(lián)網(wǎng)行業(yè)還是傳統(tǒng)行業(yè)都在大量地使用 。 RabbitMQ 憑借其高可靠屠橄、易擴(kuò)展座硕、高可用及豐富的功能特性受到越來越多企業(yè)的青睞
-
RabbitMQ的起源
????RabbitMQ是采用Erlang語言實(shí)現(xiàn)AMQP (Advanced Message Queuing Protocol趾浅,高級消息隊(duì)列協(xié)議)的消息中間件愕提,它最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息
????在此之前潮孽,有一些消息中間件的商業(yè)實(shí)現(xiàn),比如微軟的MSMQ(MicroSoftMessageQueue)筷黔、IBM的WebSphere 等往史。 由于高昂的價(jià)格,一般只應(yīng)用于大型組織機(jī)構(gòu)佛舱,它們需要可靠性椎例、解藕及實(shí)時(shí)消息通信的功能。由于商業(yè)壁壘请祖,商業(yè)MQ供應(yīng)商想要解決應(yīng)用互通的問題订歪,而不是去創(chuàng)建標(biāo)準(zhǔn)來實(shí)現(xiàn)不同的MQ產(chǎn)品間的互通,或者允許應(yīng)用程序更改MQ平臺
????為了打破這個(gè)壁壘肆捕,同時(shí)為了能夠讓消息在各個(gè)消息隊(duì)列平臺間互融互通刷晋, JMS CJava Message Service) 應(yīng)運(yùn)而生 。 JMS試圖通過提供公共JavaAPI的方式,隱藏單獨(dú)MQ產(chǎn)品供應(yīng)商提供的實(shí)際接口眼虱,從而跨越了壁壘喻奥,以及解決了互通問題。從技術(shù)上講捏悬,Java應(yīng)用程序只需針對JMS API編程撞蚕,選擇合適的MQ驅(qū)動(dòng)即可,JMS會打理好其他部分 过牙。 ActiveMQ就是JMS的一種實(shí)現(xiàn) 甥厦。 不過嘗試使用單獨(dú)標(biāo)準(zhǔn)化接口來膠合眾多不同的接口,最終會暴露出問題寇钉,使得應(yīng)用程序變得更加脆弱刀疙。 所以急需一種新的消息通信標(biāo)準(zhǔn)化方案
????在2006年6月,由Cisco摧莽、 Redhat庙洼、iMatix等聯(lián)合制定了AMQP的公開標(biāo)準(zhǔn),由此AMQP登上了歷史的舞臺 镊辕。 它是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)油够,以解決眾多消息中間件的需求和拓?fù)浣Y(jié)構(gòu)問題 。 它為面向消息的中間件設(shè)計(jì)征懈,基于此協(xié)議的客戶端與消息中間件可傳遞消息石咬,并不受產(chǎn)品、開發(fā)語言等條件的限制
????RabbitMQ 最初版本實(shí)現(xiàn)了AMQP的一個(gè)關(guān)鍵特性:使用協(xié)議本身就可以對隊(duì)列和交換器 (Exchange) 這樣的資源進(jìn)行配置 卖哎。 對于商業(yè)MQ供應(yīng)商來說鬼悠,資源配置需要通過管理終端的特定工具才能完成 。RabbitMQ的資源配置能力使其成為構(gòu)建分布式應(yīng)用的最完美的通信總線亏娜,特別有助于充分利用基于云的資源和進(jìn)行快速開發(fā) -
RabbitMQ特性
1.可靠性
RabbitMQ使用一些機(jī)制來保證可靠性焕窝, 如持久化、傳輸確認(rèn)及發(fā)布確認(rèn)等维贺,在后續(xù)的文章中我們將深入講述如何保證RabbitMQ消息的可靠性
2.靈活的路由
在消息進(jìn)入隊(duì)列之前它掂,通過交換器來路由消息
對于典型的路由功能,RabbitMQ己經(jīng)提供了一些內(nèi)置的交換器來實(shí)現(xiàn)溯泣。針對更復(fù)雜的路由功能虐秋,可以將多個(gè)交換器綁定在一起,也可以通過插件機(jī)制來實(shí)現(xiàn)自己的交換器
3.擴(kuò)展性
多個(gè)RabbitMQ節(jié)點(diǎn)可以組成一個(gè)集群垃沦,也可以根據(jù)實(shí)際業(yè)務(wù)情況動(dòng)態(tài)地?cái)U(kuò)展 集群中節(jié)點(diǎn)
4.高可用
隊(duì)列可以在集群中的機(jī)器上設(shè)置鏡像客给,使得在部分節(jié)點(diǎn)出現(xiàn)問題的情況下隊(duì)列仍然可用
5.多種協(xié)議
RabbitMQ除了原生支持AMQP協(xié)議,還支持STOMP肢簿, MQTT等多種消息中間件協(xié)議
6.多語言客戶端
RabbitMQ幾乎支持所有常用語言靶剑,比如 Java蜻拨、 Python、 Ruby抬虽、 PHP官觅、 C#、 JavaScript等
7.管理界面
RabbitMQ提供了一個(gè)易用的用戶界面阐污,使得用戶可以監(jiān)控和管理消息休涤、集群中的節(jié)點(diǎn)等。
8.插件機(jī)制
RabbitMQ 提供了許多插件 笛辟, 以實(shí)現(xiàn)從多方面進(jìn)行擴(kuò)展功氨,當(dāng)然也可以編寫自己的插件。
RabbitMQ的基本模型
????所有MQ產(chǎn)品從模型抽象上來說都是一樣的過程:生產(chǎn)者(producer)創(chuàng)建消息手幢,然后發(fā)布到隊(duì)列(queue)中捷凄,最后將消息發(fā)送到監(jiān)聽的消費(fèi)者,消費(fèi)者(consumer)來消費(fèi)某個(gè)隊(duì)列中的消息
上面只是一個(gè)宏觀上抽象的描述围来,而具體到RabbitMQ則有更詳細(xì)的內(nèi)部結(jié)構(gòu):
想要理解RabbitMQ跺涤,我們需要認(rèn)識一些基本概念:
-
1.消息(Message)
消息由標(biāo)簽(label)和消息體(payload)組成。
標(biāo)簽由一系列的可選屬性組成监透,這些屬性包括routing-key(路由鍵)桶错、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲)等
消息體一般是一個(gè)帶有業(yè)務(wù)邏輯結(jié)構(gòu)的數(shù)據(jù)胀蛮,比如一個(gè)JSON字符串院刁,當(dāng)然也可以進(jìn)一步對這個(gè)消息體進(jìn)行序列化操作 -
2.生產(chǎn)者(Publisher/Producer)
創(chuàng)建消息的一方稱為生產(chǎn)者,生產(chǎn)者把消息交由RabbitMQ粪狼,RabbitMQ之后會根據(jù)標(biāo)簽把消息發(fā)送給感興趣的消費(fèi)者 (Consumer) -
3.交換器(Exchange)
用來接收生產(chǎn)者發(fā)送的消息并負(fù)責(zé)將這些消息路由給服務(wù)器中的隊(duì)列退腥,如果路由不到,則返回給生產(chǎn)者再榄,或直接丟棄(視具體配置而定) -
4.綁定(Binding)
RabbitMQ中通過綁定將交換器Exchange與隊(duì)列Queue關(guān)聯(lián)起來狡刘,在綁定的時(shí)候一般會指定一個(gè)綁定鍵 (BindingKey),這樣 RabbitMQ就知道如何正確地將消息路由到隊(duì)列了困鸥,如下圖:
生產(chǎn)者將消息發(fā)送給交換器時(shí)嗅蔬,需要一個(gè)RoutingKey,當(dāng)BindingKey和RoutingKey相匹配時(shí)窝革, 消息會被路由到對應(yīng)的隊(duì)列中购城。在綁定多個(gè)隊(duì)列到同一個(gè)交換器的時(shí)候吕座, 這些綁定允許使用相同的BindingKey虐译。BindingKey 并不是在所有的情況下都生效,它依賴于交換器類型 吴趴,比如fanout類型的交換器就會無視 BindingKey漆诽,而是將消息路由到所有綁定到該交換器的隊(duì)列中。
需要注意的是:在某些情形下 , RoutingKey 與 BindingKey 可以看作同 一個(gè)東西 厢拭。例如如下代碼片段
/**
* FileName: MessageProducer
* Author: TP
* Date: 2019-06-07 18:56
* Description:消息生產(chǎn)者
*/
public class MessageProducer {
//交換器名稱
private static final String EXCHANGE_NAME = "exchange_demo";
//路由鍵
private static final String ROUTING_KEY = "routingKey_demo";
//隊(duì)列名稱
private static final String QUEUE_NAME = "queue_demo";
//IP地址
private static final String IP_ADDRESS = "127.0.0.1";
//端口(RabbitMQ服務(wù)端口默認(rèn)為5672)
private static final int PORT = 5672;
//用戶名
private static final String USER_NAME = "admin";
//密碼
private static final String PASS_WORD = "admin";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASS_WORD);
//獲取連接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建信道
Channel channel = connection.createChannel();
//創(chuàng)建一個(gè)類型為direct兰英、持久化的、非自動(dòng)刪除的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//創(chuàng)建一個(gè)持久化供鸠、非排他畦贸、非自動(dòng)刪除的隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//將交換機(jī)與隊(duì)列通過路由鍵綁定(綁定key和路由key使用同一個(gè))
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//發(fā)送一條持久化的消息:HELLO Rabbit MQ
String message = "HELLO Rabbit MQ";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
//關(guān)閉資源
channel.close();
connection.close();
}
}
以上代碼聲明了一個(gè)direct類型的交換器,然后將交換器和隊(duì)列綁定起來楞捂。channel.queueBind方法中本該使用BindingKey薄坏,卻和channel.basicPublish方法同樣使用了RoutingKey,這樣做的潛臺詞是:這里的RoutingKey和BindingKey是同一個(gè)東西寨闹。在direct交換器類型下胶坠, RoutingKey和BindingKey需要完全匹配才能使用,所以上面代碼中采用了此種寫法會顯得方便許多繁堡。
所以在很多人的代碼中可能習(xí)慣性地將BindingKey寫成RoutingKey沈善,尤其是在使用 direct類型的交換器的時(shí)候。
但是在topic交換器類型下椭蹄,RoutingKey和BindingKey之間需要做模糊匹配闻牡,兩者并不是相同的。
-
5.路由鍵(RoutingKey)
生產(chǎn)者將消息發(fā)給交換器的時(shí)候塑娇,一般會指定一個(gè)RoutingKey澈侠,用來指定這個(gè)消息的路由規(guī)則,而這個(gè) RoutingKey需要與交換器類型和綁定鍵 (BindingKey) 聯(lián)合使用才能最終生效埋酬。
在交換器類型和綁定鍵 (BindingKey) 固定的情況下哨啃,生產(chǎn)者可以在發(fā)送消息給交換器時(shí), 通過指定 RoutingKey來決定消息流向哪里写妥。 -
6.隊(duì)列(Queue)
消息隊(duì)列用來保存消息直到發(fā)送給消費(fèi)者拳球。它是消息的容器,也是消息的終點(diǎn)珍特。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列祝峻。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走扎筒。 -
7.連接(Connection)
網(wǎng)絡(luò)連接莱找,比如一個(gè)TCP連接。 -
8.信道(Channel)
多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道嗜桌。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接奥溺,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息骨宠、訂閱隊(duì)列還是接收消息浮定,這些動(dòng)作都是通過信道完成相满。因?yàn)閷τ诓僮飨到y(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念桦卒,以復(fù)用一條 TCP 連接立美。 -
9.消費(fèi)者(Consumer)
消費(fèi)消息的一方稱為消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序方灾。 -
10.虛擬主機(jī)(Virtual Host)
虛擬主機(jī)建蹄,表示一批交換器、消息隊(duì)列和相關(guān)對象裕偿。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域躲撰。每個(gè) vhost 本質(zhì)上就是一個(gè)mini版的RabbitMQ服務(wù)器,擁有自己的隊(duì)列击费、交換器拢蛋、綁定和權(quán)限機(jī)制。vhost是AMQP概念的基礎(chǔ)蔫巩,必須在連接時(shí)指定谆棱,RabbitMQ 默認(rèn)的vhost是 / -
11.服務(wù)節(jié)點(diǎn)(Broker)
對于RabbitMQ來說,一個(gè)RabbitMQBroker可以簡單地看作一個(gè)RabbitMQ服務(wù)節(jié)點(diǎn)圆仔, 或者RabbitMQ服務(wù)實(shí)例垃瞧。 大多數(shù)情況下也可以將一個(gè)RabbitMQ Broker看作一臺RabbitMQ服務(wù)器 。
整個(gè)消息隊(duì)列的運(yùn)轉(zhuǎn)過程大致如下:
交換器(Exchange)類型
RabbitMQ 常用的交換器類型有 fanout坪郭、 direct刹前、 topic单雾、 headers 這四種 齿诉。 AMQP 協(xié)議里還提
到另外兩種類型: System 和自定義橄霉,這里不予描述。對于這四種類型下面一一闡述沪曙。
-
fanout
這種類型會把所有發(fā)送到該交換器的消息路由到所有與該交換器綁定的隊(duì)列中奕污,很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息液走,fanout 類型轉(zhuǎn)發(fā)消息是最快的碳默。
-
direct
direct類型的交換器路由規(guī)則也很簡單,它會把消息路由到那些 BindingKey和 RoutingKey 完全匹配的隊(duì)列中缘眶。
以下圖為例:交換器的類型為direct嘱根,如果我們發(fā)送一條消息,并在發(fā)送消息的時(shí)候設(shè)置路由鍵為" warning"巷懈,則消息會路由到Queuel和Queue2
如果在發(fā)送消息的時(shí)候設(shè)置路由鍵為"info" 或者 "debug"该抒,消息只會路由到Queue2。 如果以其他的路由鍵發(fā)送消息砸喻,則消息不會路由到這兩個(gè)隊(duì)列中柔逼。
-
topic
前面講到direct類型的交換器路由規(guī)則是完全匹配BindingKey和RoutingKey,但是這種嚴(yán)格的匹配方式在很多情況下不能滿足實(shí)際業(yè)務(wù)的需求割岛。 topic類型的交換器在匹配規(guī)則上進(jìn)行了擴(kuò)展愉适,它與direct類型的交換器相似,也是將消息路由到BindingKey和RoutingKey相匹配的隊(duì)列中癣漆,但這里的匹配規(guī)則有些不同维咸,它約定:
????1.RoutingKey為一個(gè)點(diǎn)號"."分隔的字符串(被點(diǎn)號"."分隔開的每一段獨(dú)立的字符串稱為一個(gè)單詞
????????如"com.rabbitmq.client","java.util.concurrent"惠爽、"com.hidden.client"
????2. BindingKey和RoutingKey一樣也是點(diǎn)號"."分隔的字符串
????3. BindingKey中可以存在兩種特殊字符串""和"#"用于做模糊匹配
????????其中""用于匹配一個(gè)單詞癌蓖,“#”用于匹配多個(gè)單詞(可以是0個(gè))
例如:
結(jié)果:
路由鍵為"com.rabbitmq.client" 的消息會同時(shí)路由到Queuel和Queue2
路由鍵為"com.hidden.client" 的消息只會路由到Queue2中
路由鍵為"com.hidden.demo" 的消息只會路由到Queue2中
路由鍵為 "java.rabbitmq.demo" 的消息只會路由到Queuel中
路由鍵為" java.util.concurrent" 的消息將會被丟棄或者返回給生產(chǎn)者(需要設(shè)置mandatory參數(shù)) ,因?yàn)樗鼪]有匹配任何路由鍵 headers
headers 類型的交換器不依賴于路由鍵的匹配規(guī)則來路由消息婚肆,而是根據(jù)發(fā)送的消息內(nèi)容中
的headers屬性進(jìn)行匹配租副。在綁定隊(duì)列和交換器時(shí)制定一組鍵值對,當(dāng)發(fā)送消息到交換器時(shí)较性,RabbitMQ 會獲取到該消息的headers(也是一個(gè)鍵值對的形式) 用僧,對比其中的鍵值對是否完全匹配隊(duì)列和交換器綁定時(shí)指定的鍵值對,如果完全匹配則消息會路由到該隊(duì)列赞咙,否則不會路由到該隊(duì)列责循。headers 類型的交換器性能會很差,而且也不實(shí)用攀操,基本上不會看到它的存在院仿。
RabbitMQ的運(yùn)轉(zhuǎn)流程
生產(chǎn)者端
- 生產(chǎn)者連接到RabbitMQ Broker,建立一個(gè)連接(Connection)速和,開啟一個(gè)信道 (Channel)
- 生產(chǎn)者聲明一個(gè)交換器歹垫,并設(shè)置相關(guān)屬性,比如交換機(jī)類型颠放、是否持久化等
- 生產(chǎn)者聲明一個(gè)隊(duì)列井設(shè)置相關(guān)屬性县钥,比如是否排他、是否持久化慈迈、是否自動(dòng)刪除等
- 生產(chǎn)者通過路由鍵將交換器和隊(duì)列綁定起來
- 生產(chǎn)者發(fā)送消息至 RabbitMQ Broker若贮,其中包含路由鍵、交換器等信息
- 相應(yīng)的交換器根據(jù)接收到的路由鍵查找相匹配的隊(duì)列
- 如果找到痒留,則將從 生產(chǎn)者發(fā)送過來的消息存入相應(yīng)的隊(duì)列中
- 如果沒有找到谴麦,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
- 關(guān)閉信道
- 關(guān)閉連接
消費(fèi)者端
- 消費(fèi)者連接到 RabbitMQ Broker,建立一個(gè)連接(Connection)伸头,開啟一個(gè)信道(Channel)匾效。
- 消費(fèi)者向RabbitMQ Broker請求消費(fèi)相應(yīng)隊(duì)列中的消息,可能會設(shè)置相應(yīng)的回調(diào)函數(shù)恤磷, 以及做一些準(zhǔn)備工作
- 等待 RabbitMQ Broker回應(yīng)并投遞相應(yīng)隊(duì)列中的消息
- 消費(fèi)者確認(rèn) (ack) 接收到的消息
- RabbitMQ從隊(duì)列中刪除相應(yīng)己經(jīng)被確認(rèn)的消息
- 關(guān)閉信道
- 關(guān)閉連接
在接下來的文章中我們將講解RabbitMQ的安裝與演示Java客戶端的使用