RabbitMQ入門與使用篇(轉(zhuǎn))

介紹

RabbitMQ是一個(gè)由erlang開發(fā)的基于AMQP(Advanced Message Queue)協(xié)議的開源實(shí)現(xiàn)沉噩。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性柱蟀、擴(kuò)展性川蒙、高可用性等方面都非常的優(yōu)秀。是當(dāng)前最主流的消息中間件之一长已。

RabbitMQ的官方

概念:

Brocker:消息隊(duì)列服務(wù)器實(shí)體畜眨。

Exchange:消息交換機(jī),指定消息按什么規(guī)則术瓮,路由到哪個(gè)隊(duì)列康聂。

Queue:消息隊(duì)列,每個(gè)消息都會(huì)被投入到一個(gè)或者多個(gè)隊(duì)列里胞四。

Binding:綁定恬汁,它的作用是把exchange和queue按照路由規(guī)則binding起來。

Routing Key:路由關(guān)鍵字辜伟,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞氓侧。

Vhost:虛擬主機(jī)脊另,一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不用用戶的權(quán)限分離约巷。

Producer:消息生產(chǎn)者尝蠕,就是投遞消息的程序。

Consumer:消息消費(fèi)者载庭,就是接受消息的程序看彼。

Channel:消息通道,在客戶端的每個(gè)連接里囚聚,可建立多個(gè)channel靖榕,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。

消息隊(duì)列的使用過程大概如下:

消息接收

客戶端連接到消息隊(duì)列服務(wù)器顽铸,打開一個(gè)channel茁计。

客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性谓松。

客戶端聲明一個(gè)queue星压,并設(shè)置相關(guān)屬性。

客戶端使用routing key鬼譬,在exchange和queue之間建立好綁定關(guān)系娜膘。

消息發(fā)布

客戶端投遞消息到exchange。

exchange接收到消息后优质,就根據(jù)消息的key和已經(jīng)設(shè)置的binding竣贪,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里巩螃。

AMQP 里主要要說兩個(gè)組件:

Exchange 和 Queue

綠色的 X 就是 Exchange 演怎,紅色的是 Queue ,這兩者都在 Server 端避乏,又稱作 Broker

這部分是 RabbitMQ 實(shí)現(xiàn)的爷耀,而藍(lán)色的則是客戶端,通常有 Producer 和 Consumer 兩種類型拍皮。

Exchange通常分為四種:

fanout:該類型路由規(guī)則非常簡(jiǎn)單歹叮,會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中,相當(dāng)于廣播功能

direct:該類型路由規(guī)則會(huì)將消息路由到binding key與routing key完全匹配的Queue中

topic:與direct類型相似春缕,只是規(guī)則沒有那么嚴(yán)格盗胀,可以模糊匹配和多條件匹配

headers:該類型不依賴于routing key與binding key的匹配規(guī)則來路由消息艘蹋,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配

使用場(chǎng)景

官方介紹

下載與安裝

下載

rabbitmq

erlang

安裝

先安裝erlang

然后再安裝rabbitmq

管理工具

參考官方文檔

操作起來很簡(jiǎn)單锄贼,只需要在DOS下面,進(jìn)入安裝目錄(安裝路徑\RabbitMQ Server\rabbitmq_server-3.2.2\sbin)執(zhí)行如下命令就可以成功安裝女阀。

rabbitmq-pluginsenable rabbitmq_management? ?

可以通過訪問:http://localhost:15672進(jìn)行測(cè)試宅荤,默認(rèn)的登陸賬號(hào)為:guest屑迂,密碼為:guest。

其他配置

1. 安裝完以后erlang需要手動(dòng)設(shè)置ERLANG_HOME 的系統(tǒng)變量冯键。

setERLANG_HOME=F:\Program Files\erl9.0#環(huán)境變量`path`里加入:%ERLANG_HOME%\bin#環(huán)境變量`path`里加入: 安裝路徑\RabbitMQ Server\rabbitmq_server-3.6.10\sbin

2.激活Rabbit MQ’s Management Plugin

使用Rabbit MQ 管理插件惹盼,可以更好的可視化方式查看Rabbit MQ 服務(wù)器實(shí)例的狀態(tài),你可以在命令行中使用下面的命令激活惫确。

rabbitmq-plugins.batenable? rabbitmq_management

3.創(chuàng)建管理用戶

rabbitmqctl.bat add_user sa 123456

4. 設(shè)置管理員

rabbitmqctl.bat set_user_tags sa administrator

5.設(shè)置權(quán)限

rabbitmqctl.bat set_permissions -p / sa".*"".*"".*"

6. 其他命令

#查詢用戶:? ? rabbitmqctl.bat list_users#查詢vhosts:? ? rabbitmqctl.bat list_vhosts#啟動(dòng)RabbitMQ服務(wù):? ? net stop RabbitMQ&& net start RabbitMQ? ?

以上這些手报,賬號(hào)、vhost改化、權(quán)限掩蛤、作用域等基本就設(shè)置完了。

基于.net使用

RabbitMQ.Client是RabbiMQ 官方提供的的客戶端

EasyNetQ是基于RabbitMQ.Client 基礎(chǔ)上封裝的開源客戶端,使用非常方便

以下操作RabbitMQ的代碼例子陈肛,都是基于EasyNetQ的使用和再封裝揍鸟,在文章底部有demo例子的源碼下載地址

創(chuàng)建 IBus

/// /// 消息服務(wù)器連接器/// publicclassBusBuilder{publicstaticIBusCreateMessageBus(){// 消息服務(wù)器連接字符串// var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];stringconnString="host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456";if(connString==null||connString==string.Empty)thrownewException("messageserver connection string is missing or empty");returnRabbitHutch.CreateBus(connString);}}

Fanout Exchange

所有發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange 綁定(Binding)的所有Queue上。

Fanout Exchange 不需要處理RouteKey 句旱。只需要簡(jiǎn)單的將隊(duì)列綁定到exchange 上阳藻。這樣發(fā)送到exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。類似子網(wǎng)廣播谈撒,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息腥泥。 所以,F(xiàn)anout Exchange 轉(zhuǎn)發(fā)消息是最快的啃匿。

/// ///? 消息消耗(fanout)/// /// 消息類型/// 回調(diào)/// 交換器名/// 隊(duì)列名/// 路由名publicstaticvoidFanoutConsume(Actionhandler,stringexChangeName="fanout_mq",stringqueueName="fanout_queue_default",stringroutingKey="")whereT:class{varbus=BusBuilder.CreateMessageBus();varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Fanout);varqueue=CreateQueue(adbus,queueName);adbus.Bind(exchange,queue,routingKey);adbus.Consume(queue,registration=>{registration.Add((message,info)=>{handler(message.Body);});});}/// /// 消息上報(bào)(fanout)/// /// 消息類型/// 主題名/// 消息命名/// 錯(cuò)誤信息/// publicstaticboolFanoutPush(Tt,outstringmsg,stringexChangeName="fanout_mq",stringroutingKey="")whereT:class{msg=string.Empty;try{using(varbus=BusBuilder.CreateMessageBus()){varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Fanout);adbus.Publish(exchange,routingKey,false,newMessage(t));returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}

所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定的Queue道川。

Direct模式,可以使用RabbitMQ自帶的Exchange:default Exchange 立宜。所以不需要將Exchange進(jìn)行任何綁定(binding)操作 冒萄。消息傳遞時(shí),RouteKey必須完全匹配橙数,才會(huì)被隊(duì)列接收尊流,否則該消息會(huì)被拋棄。

/// /// 消息發(fā)送(direct)/// /// 消息類型/// 發(fā)送到的隊(duì)列/// 發(fā)送內(nèi)容publicstaticvoidDirectSend(stringqueue,Tmessage)whereT:class{using(varbus=BusBuilder.CreateMessageBus()){bus.Send(queue,message);}}/// /// 消息接收(direct)/// /// 消息類型/// 接收的隊(duì)列/// 回調(diào)操作/// 錯(cuò)誤信息/// publicstaticboolDirectReceive(stringqueue,Actioncallback,outstringmsg)whereT:class{msg=string.Empty;try{varbus=BusBuilder.CreateMessageBus();bus.Receive(queue,callback);}catch(Exceptionex){msg=ex.ToString();returnfalse;}returntrue;}/// /// 消息發(fā)送/// /// /// /// /// /// /// /// publicstaticboolDirectPush(Tt,outstringmsg,stringexChangeName="direct_mq",stringroutingKey="direct_rout_default")whereT:class{msg=string.Empty;try{using(varbus=BusBuilder.CreateMessageBus()){varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Direct);adbus.Publish(exchange,routingKey,false,newMessage(t));returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}/// /// 消息接收///? /// /// 消息類型/// 回調(diào)/// 交換器名/// 隊(duì)列名/// 路由名publicstaticboolDirectConsume(Actionhandler,outstringmsg,stringexChangeName="direct_mq",stringqueueName="direct_queue_default",stringroutingKey="direct_rout_default")whereT:class{msg=string.Empty;try{varbus=BusBuilder.CreateMessageBus();varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Direct);varqueue=CreateQueue(adbus,queueName);adbus.Bind(exchange,queue,routingKey);adbus.Consume(queue,registration=>{registration.Add((message,info)=>{handler(message.Body);});});}catch(Exceptionex){msg=ex.ToString();returnfalse;}returntrue;}

Topic Exchange

消息發(fā)布(Publish)

要使用主題發(fā)布灯帮,只需使用帶有主題的重載的Publish方法:

varbus=RabbitHutch.CreateBus(...);bus.Publish(message,"X.A");

訂閱者可以通過指定要匹配的主題來過濾郵件崖技。

這些可以包括通配符:

*=>匹配一個(gè)字。

#=>匹配到零個(gè)或多個(gè)單詞钟哥。

所以發(fā)布的主題為“X.A.2”的消息將匹配“S祝”,“X.D宸。”吁恍,“* .A.*”,而不是“X.B. *”或“A”。

警告冀瓦,Publish只顧發(fā)送消息到隊(duì)列伴奥,但是不管有沒有消費(fèi)端訂閱,所以翼闽,發(fā)布之后拾徙,如果沒有消費(fèi)者,該消息將不會(huì)被消費(fèi)甚至丟失感局。

消息訂閱(Subscribe)

EasyNetQ提供了消息訂閱尼啡,當(dāng)調(diào)用Subscribe方法時(shí)候,EasyNetQ會(huì)創(chuàng)建一個(gè)用于接收消息的隊(duì)列询微,不過與消息發(fā)布不同的是玄叠,消息訂閱增加了一個(gè)參數(shù),subscribe_id.代碼如下:

bus.Subscribe("my_id",handler,x=>x.WithTopic("X.*"));

警告: 具有相同訂閱者但不同主題字符串的兩個(gè)單獨(dú)訂閱可能不會(huì)產(chǎn)生您期望的效果拓提。 subscriberId有效地標(biāo)識(shí)個(gè)體AMQP隊(duì)列读恃。 具有相同subscriptionId的兩個(gè)訂閱者將連接到相同的隊(duì)列,并且兩者都將添加自己的主題綁定代态。 所以寺惫,例如,如果你這樣做:

bus.Subscribe("my_id",handlerOfXDotStar,x=>x.WithTopic("X.*"));bus.Subscribe("my_id",handlerOfStarDotB,x=>x.WithTopic("*.B"));

匹配“x.”或“.B”的所有消息將被傳遞到“XXX_my_id”隊(duì)列蹦疑。 然后西雀,RabbitMQ將向兩個(gè)消費(fèi)者傳遞消息,其中handlerOfXDotStar和handlerOfStarDotB輪流獲取每條消息歉摧。

現(xiàn)在艇肴,如果你想要匹配多個(gè)主題(“X.”O(jiān)R“.B”),你可以使用另一個(gè)重載的訂閱方法叁温,它采用多個(gè)主題再悼,如下所示:

bus.Subscribe("my_id",handler,x=>x.WithTopic("X.*").WithTopic("*.B"));

/// /// 獲取主題 /// /// 主題內(nèi)容類型/// 訂閱者ID/// 消息接收響應(yīng)回調(diào)///? 訂閱主題集合publicstaticvoidTopicSubscribe(stringsubscriptionId,Actioncallback,paramsstring[]topics)whereT:class{varbus=BusBuilder.CreateMessageBus();bus.Subscribe(subscriptionId,callback,(config)=>{foreach(varitemintopics)config.WithTopic(item);});}/// /// 發(fā)布主題/// /// 主題內(nèi)容類型/// 主題名稱/// 主題內(nèi)容/// 錯(cuò)誤信息/// publicstaticboolTopicPublish(stringtopic,Tmessage,outstringmsg)whereT:class{msg=string.Empty;try{using(varbus=BusBuilder.CreateMessageBus()){bus.Publish(message,topic);returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}/// /// 發(fā)布主題/// /// /// 消息類型/// 消息內(nèi)容/// 主題名/// 錯(cuò)誤信息/// 交換器名/// publicstaticboolTopicSub(Tt,stringtopic,outstringmsg,stringexChangeName="topic_mq")whereT:class{msg=string.Empty;try{if(string.IsNullOrWhiteSpace(topic))thrownewException("推送主題不能為空");using(varbus=BusBuilder.CreateMessageBus()){varadbus=bus.Advanced;//var queue = adbus.QueueDeclare("user.notice.zhangsan");varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Topic);adbus.Publish(exchange,topic,false,newMessage(t));returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}/// /// 獲取主題 /// /// /// 消息類型/// 訂閱者ID/// 回調(diào)/// 交換器名/// 主題名publicstaticvoidTopicConsume(Actioncallback,stringexChangeName="topic_mq",stringsubscriptionId="topic_subid",paramsstring[]topics)whereT:class{varbus=BusBuilder.CreateMessageBus();varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Topic);varqueue=adbus.QueueDeclare(subscriptionId);foreach(varitemintopics)adbus.Bind(exchange,queue,item);adbus.Consume(queue,registration=>{registration.Add((message,info)=>{callback(message.Body);});});}? ?

具體發(fā)布/訂閱消息的Demo和相關(guān)測(cè)試看源碼Demo

注意

當(dāng)在創(chuàng)建訂閱者去消費(fèi)隊(duì)列的時(shí)候

/// /// 獲取主題 /// /// publicstaticvoidGetSub(Ttopic,Actioncallback)whereT:class{using(varbus=BusBuilder.CreateMessageBus()){bus.Subscribe(topic.ToString(),callback,x=>x.WithTopic(topic.ToString()));}}

using里的對(duì)象在執(zhí)行完成后被回收了,導(dǎo)致剛連接上去就又?jǐn)嚅_了(剛開始寫的時(shí)候膝但,習(xí)慣性加using冲九,排查了好久才發(fā)現(xiàn),欲哭無淚)

源碼項(xiàng)目運(yùn)行前的準(zhǔn)備與確認(rèn):

到RabbitMQ管理后臺(tái)添加TestQueueVHost跟束,并且分配用戶權(quán)限莺奸,然后到RabbitMQHelper.BusBuilder類里配置RabbitMQ連接服務(wù)的相關(guān)信息host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456,(根據(jù)配置的內(nèi)容和用戶修改)

參考資料(鳴謝):

EasyNetQ-基于Topic的路由

.NET操作RabbitMQ組件EasyNetQ使用中文簡(jiǎn)版文檔冀宴。

RabbitMQ入門指南

附:Demo源碼GitHub地址


歡迎到原文地址關(guān)注和交流

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末灭贷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子略贮,更是在濱河造成了極大的恐慌甚疟,老刑警劉巖仗岖,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異古拴,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)真友,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門黄痪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人盔然,你說我怎么就攤上這事桅打。” “怎么了愈案?”我有些...
    開封第一講書人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵挺尾,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我站绪,道長(zhǎng)遭铺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任恢准,我火速辦了婚禮魂挂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘馁筐。我一直安慰自己涂召,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開白布敏沉。 她就那樣靜靜地躺著果正,像睡著了一般。 火紅的嫁衣襯著肌膚如雪盟迟。 梳的紋絲不亂的頭發(fā)上秋泳,一...
    開封第一講書人閱讀 49,842評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音攒菠,去河邊找鬼轮锥。 笑死,一個(gè)胖子當(dāng)著我的面吹牛要尔,可吹牛的內(nèi)容都是我干的舍杜。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼赵辕,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼既绩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起还惠,我...
    開封第一講書人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤饲握,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體救欧,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡衰粹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了笆怠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铝耻。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蹬刷,靈堂內(nèi)的尸體忽然破棺而出瓢捉,到底是詐尸還是另有隱情,我是刑警寧澤办成,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布泡态,位于F島的核電站,受9級(jí)特大地震影響迂卢,放射性物質(zhì)發(fā)生泄漏某弦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一而克、第九天 我趴在偏房一處隱蔽的房頂上張望刀崖。 院中可真熱鬧,春花似錦拍摇、人聲如沸亮钦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蜂莉。三九已至,卻和暖如春混卵,著一層夾襖步出監(jiān)牢的瞬間映穗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來泰國打工幕随, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蚁滋,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓赘淮,卻偏偏與公主長(zhǎng)得像辕录,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子梢卸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容

  • 介紹 RabbitMQ是一個(gè)由erlang開發(fā)的基于AMQP(Advanced Message Queue)協(xié)議的...
    SFLYQ閱讀 1,111評(píng)論 0 9
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器走诞。支持消息的持久化、事務(wù)蛤高、擁塞控...
    jiangmo閱讀 10,350評(píng)論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理蚣旱,服務(wù)發(fā)現(xiàn)碑幅,斷路器,智...
    卡卡羅2017閱讀 134,633評(píng)論 18 139
  • 轉(zhuǎn)載2017年11月01日 09:54:03 2595 RabbitMQ 即一個(gè)消息隊(duì)列塞绿,主要是用來實(shí)現(xiàn)應(yīng)用程序的...
    楊傳池chris閱讀 6,346評(píng)論 1 0
  • RabbitMQ 即一個(gè)消息隊(duì)列沟涨,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖异吻,消息分發(fā)的作用裹赴。 消息...
    彩虹之夢(mèng)閱讀 1,082評(píng)論 2 1