介紹
RabbitMQ是一個(gè)由erlang開發(fā)的基于AMQP(Advanced Message Queue)協(xié)議的開源實(shí)現(xiàn)沉噩。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性柱蟀、擴(kuò)展性川蒙、高可用性等方面都非常的優(yōu)秀。是當(dāng)前最主流的消息中間件之一长已。
概念:
Brocker:消息隊(duì)列服務(wù)器實(shí)體畜眨。
Exchange:消息交換機(jī),指定消息按什么規(guī)則术瓮,路由到哪個(gè)隊(duì)列康聂。
Queue:消息隊(duì)列,每個(gè)消息都會(huì)被投入到一個(gè)或者多個(gè)隊(duì)列里胞四。
Binding:綁定恬汁,它的作用是把exchange和queue按照路由規(guī)則binding起來。
Routing Key:路由關(guān)鍵字辜伟,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞氓侧。
Vhost:虛擬主機(jī)脊另,一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不用用戶的權(quán)限分離约巷。
Producer:消息生產(chǎn)者尝蠕,就是投遞消息的程序。
Consumer:消息消費(fèi)者载庭,就是接受消息的程序看彼。
Channel:消息通道,在客戶端的每個(gè)連接里囚聚,可建立多個(gè)channel靖榕,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
消息隊(duì)列的使用過程大概如下:
消息接收
客戶端連接到消息隊(duì)列服務(wù)器顽铸,打開一個(gè)channel茁计。
客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性谓松。
客戶端聲明一個(gè)queue星压,并設(shè)置相關(guān)屬性。
客戶端使用routing key鬼譬,在exchange和queue之間建立好綁定關(guān)系娜膘。
消息發(fā)布
客戶端投遞消息到exchange。
exchange接收到消息后优质,就根據(jù)消息的key和已經(jīng)設(shè)置的binding竣贪,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里巩螃。
AMQP 里主要要說兩個(gè)組件:
Exchange 和 Queue
綠色的 X 就是 Exchange 演怎,紅色的是 Queue ,這兩者都在 Server 端避乏,又稱作 Broker
這部分是 RabbitMQ 實(shí)現(xiàn)的爷耀,而藍(lán)色的則是客戶端,通常有 Producer 和 Consumer 兩種類型拍皮。
Exchange通常分為四種:
fanout:該類型路由規(guī)則非常簡(jiǎn)單歹叮,會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中,相當(dāng)于廣播功能
direct:該類型路由規(guī)則會(huì)將消息路由到binding key與routing key完全匹配的Queue中
topic:與direct類型相似春缕,只是規(guī)則沒有那么嚴(yán)格盗胀,可以模糊匹配和多條件匹配
headers:該類型不依賴于routing key與binding key的匹配規(guī)則來路由消息艘蹋,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配
使用場(chǎng)景
下載與安裝
下載
安裝
先安裝erlang
然后再安裝rabbitmq
管理工具
參考官方文檔
操作起來很簡(jiǎn)單锄贼,只需要在DOS下面,進(jìn)入安裝目錄(安裝路徑\RabbitMQ Server\rabbitmq_server-3.2.2\sbin)執(zhí)行如下命令就可以成功安裝女阀。
rabbitmq-pluginsenable rabbitmq_management? ?
可以通過訪問:http://localhost:15672進(jìn)行測(cè)試宅荤,默認(rèn)的登陸賬號(hào)為:guest屑迂,密碼為:guest。
其他配置
1. 安裝完以后erlang需要手動(dòng)設(shè)置ERLANG_HOME 的系統(tǒng)變量冯键。
setERLANG_HOME=F:\Program Files\erl9.0#環(huán)境變量`path`里加入:%ERLANG_HOME%\bin#環(huán)境變量`path`里加入: 安裝路徑\RabbitMQ Server\rabbitmq_server-3.6.10\sbin
2.激活Rabbit MQ’s Management Plugin
使用Rabbit MQ 管理插件惹盼,可以更好的可視化方式查看Rabbit MQ 服務(wù)器實(shí)例的狀態(tài),你可以在命令行中使用下面的命令激活惫确。
rabbitmq-plugins.batenable? rabbitmq_management
3.創(chuàng)建管理用戶
rabbitmqctl.bat add_user sa 123456
4. 設(shè)置管理員
rabbitmqctl.bat set_user_tags sa administrator
5.設(shè)置權(quán)限
rabbitmqctl.bat set_permissions -p / sa".*"".*"".*"
6. 其他命令
#查詢用戶:? ? rabbitmqctl.bat list_users#查詢vhosts:? ? rabbitmqctl.bat list_vhosts#啟動(dòng)RabbitMQ服務(wù):? ? net stop RabbitMQ&& net start RabbitMQ? ?
以上這些手报,賬號(hào)、vhost改化、權(quán)限掩蛤、作用域等基本就設(shè)置完了。
基于.net使用
RabbitMQ.Client是RabbiMQ 官方提供的的客戶端
EasyNetQ是基于RabbitMQ.Client 基礎(chǔ)上封裝的開源客戶端,使用非常方便
以下操作RabbitMQ的代碼例子陈肛,都是基于EasyNetQ的使用和再封裝揍鸟,在文章底部有demo例子的源碼下載地址
創(chuàng)建 IBus
/// /// 消息服務(wù)器連接器/// publicclassBusBuilder{publicstaticIBusCreateMessageBus(){// 消息服務(wù)器連接字符串// var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];stringconnString="host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456";if(connString==null||connString==string.Empty)thrownewException("messageserver connection string is missing or empty");returnRabbitHutch.CreateBus(connString);}}
Fanout Exchange
所有發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange 綁定(Binding)的所有Queue上。
Fanout Exchange 不需要處理RouteKey 句旱。只需要簡(jiǎn)單的將隊(duì)列綁定到exchange 上阳藻。這樣發(fā)送到exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。類似子網(wǎng)廣播谈撒,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息腥泥。 所以,F(xiàn)anout Exchange 轉(zhuǎn)發(fā)消息是最快的啃匿。
/// ///? 消息消耗(fanout)/// /// 消息類型/// 回調(diào)/// 交換器名/// 隊(duì)列名/// 路由名publicstaticvoidFanoutConsume(Actionhandler,stringexChangeName="fanout_mq",stringqueueName="fanout_queue_default",stringroutingKey="")whereT:class{varbus=BusBuilder.CreateMessageBus();varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Fanout);varqueue=CreateQueue(adbus,queueName);adbus.Bind(exchange,queue,routingKey);adbus.Consume(queue,registration=>{registration.Add((message,info)=>{handler(message.Body);});});}/// /// 消息上報(bào)(fanout)/// /// 消息類型/// 主題名/// 消息命名/// 錯(cuò)誤信息/// publicstaticboolFanoutPush(Tt,outstringmsg,stringexChangeName="fanout_mq",stringroutingKey="")whereT:class{msg=string.Empty;try{using(varbus=BusBuilder.CreateMessageBus()){varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Fanout);adbus.Publish(exchange,routingKey,false,newMessage(t));returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定的Queue道川。
Direct模式,可以使用RabbitMQ自帶的Exchange:default Exchange 立宜。所以不需要將Exchange進(jìn)行任何綁定(binding)操作 冒萄。消息傳遞時(shí),RouteKey必須完全匹配橙数,才會(huì)被隊(duì)列接收尊流,否則該消息會(huì)被拋棄。
/// /// 消息發(fā)送(direct)/// /// 消息類型/// 發(fā)送到的隊(duì)列/// 發(fā)送內(nèi)容publicstaticvoidDirectSend(stringqueue,Tmessage)whereT:class{using(varbus=BusBuilder.CreateMessageBus()){bus.Send(queue,message);}}/// /// 消息接收(direct)/// /// 消息類型/// 接收的隊(duì)列/// 回調(diào)操作/// 錯(cuò)誤信息/// publicstaticboolDirectReceive(stringqueue,Actioncallback,outstringmsg)whereT:class{msg=string.Empty;try{varbus=BusBuilder.CreateMessageBus();bus.Receive(queue,callback);}catch(Exceptionex){msg=ex.ToString();returnfalse;}returntrue;}/// /// 消息發(fā)送/// /// /// /// /// /// /// /// publicstaticboolDirectPush(Tt,outstringmsg,stringexChangeName="direct_mq",stringroutingKey="direct_rout_default")whereT:class{msg=string.Empty;try{using(varbus=BusBuilder.CreateMessageBus()){varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Direct);adbus.Publish(exchange,routingKey,false,newMessage(t));returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}/// /// 消息接收///? /// /// 消息類型/// 回調(diào)/// 交換器名/// 隊(duì)列名/// 路由名publicstaticboolDirectConsume(Actionhandler,outstringmsg,stringexChangeName="direct_mq",stringqueueName="direct_queue_default",stringroutingKey="direct_rout_default")whereT:class{msg=string.Empty;try{varbus=BusBuilder.CreateMessageBus();varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Direct);varqueue=CreateQueue(adbus,queueName);adbus.Bind(exchange,queue,routingKey);adbus.Consume(queue,registration=>{registration.Add((message,info)=>{handler(message.Body);});});}catch(Exceptionex){msg=ex.ToString();returnfalse;}returntrue;}
Topic Exchange
消息發(fā)布(Publish)
要使用主題發(fā)布灯帮,只需使用帶有主題的重載的Publish方法:
varbus=RabbitHutch.CreateBus(...);bus.Publish(message,"X.A");
訂閱者可以通過指定要匹配的主題來過濾郵件崖技。
這些可以包括通配符:
*=>匹配一個(gè)字。
#=>匹配到零個(gè)或多個(gè)單詞钟哥。
所以發(fā)布的主題為“X.A.2”的消息將匹配“S祝”,“X.D宸。”吁恍,“* .A.*”,而不是“X.B. *”或“A”。
警告冀瓦,Publish只顧發(fā)送消息到隊(duì)列伴奥,但是不管有沒有消費(fèi)端訂閱,所以翼闽,發(fā)布之后拾徙,如果沒有消費(fèi)者,該消息將不會(huì)被消費(fèi)甚至丟失感局。
消息訂閱(Subscribe)
EasyNetQ提供了消息訂閱尼啡,當(dāng)調(diào)用Subscribe方法時(shí)候,EasyNetQ會(huì)創(chuàng)建一個(gè)用于接收消息的隊(duì)列询微,不過與消息發(fā)布不同的是玄叠,消息訂閱增加了一個(gè)參數(shù),subscribe_id.代碼如下:
bus.Subscribe("my_id",handler,x=>x.WithTopic("X.*"));
警告: 具有相同訂閱者但不同主題字符串的兩個(gè)單獨(dú)訂閱可能不會(huì)產(chǎn)生您期望的效果拓提。 subscriberId有效地標(biāo)識(shí)個(gè)體AMQP隊(duì)列读恃。 具有相同subscriptionId的兩個(gè)訂閱者將連接到相同的隊(duì)列,并且兩者都將添加自己的主題綁定代态。 所以寺惫,例如,如果你這樣做:
bus.Subscribe("my_id",handlerOfXDotStar,x=>x.WithTopic("X.*"));bus.Subscribe("my_id",handlerOfStarDotB,x=>x.WithTopic("*.B"));
匹配“x.”或“.B”的所有消息將被傳遞到“XXX_my_id”隊(duì)列蹦疑。 然后西雀,RabbitMQ將向兩個(gè)消費(fèi)者傳遞消息,其中handlerOfXDotStar和handlerOfStarDotB輪流獲取每條消息歉摧。
現(xiàn)在艇肴,如果你想要匹配多個(gè)主題(“X.”O(jiān)R“.B”),你可以使用另一個(gè)重載的訂閱方法叁温,它采用多個(gè)主題再悼,如下所示:
bus.Subscribe("my_id",handler,x=>x.WithTopic("X.*").WithTopic("*.B"));
/// /// 獲取主題 /// /// 主題內(nèi)容類型/// 訂閱者ID/// 消息接收響應(yīng)回調(diào)///? 訂閱主題集合publicstaticvoidTopicSubscribe(stringsubscriptionId,Actioncallback,paramsstring[]topics)whereT:class{varbus=BusBuilder.CreateMessageBus();bus.Subscribe(subscriptionId,callback,(config)=>{foreach(varitemintopics)config.WithTopic(item);});}/// /// 發(fā)布主題/// /// 主題內(nèi)容類型/// 主題名稱/// 主題內(nèi)容/// 錯(cuò)誤信息/// publicstaticboolTopicPublish(stringtopic,Tmessage,outstringmsg)whereT:class{msg=string.Empty;try{using(varbus=BusBuilder.CreateMessageBus()){bus.Publish(message,topic);returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}/// /// 發(fā)布主題/// /// /// 消息類型/// 消息內(nèi)容/// 主題名/// 錯(cuò)誤信息/// 交換器名/// publicstaticboolTopicSub(Tt,stringtopic,outstringmsg,stringexChangeName="topic_mq")whereT:class{msg=string.Empty;try{if(string.IsNullOrWhiteSpace(topic))thrownewException("推送主題不能為空");using(varbus=BusBuilder.CreateMessageBus()){varadbus=bus.Advanced;//var queue = adbus.QueueDeclare("user.notice.zhangsan");varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Topic);adbus.Publish(exchange,topic,false,newMessage(t));returntrue;}}catch(Exceptionex){msg=ex.ToString();returnfalse;}}/// /// 獲取主題 /// /// /// 消息類型/// 訂閱者ID/// 回調(diào)/// 交換器名/// 主題名publicstaticvoidTopicConsume(Actioncallback,stringexChangeName="topic_mq",stringsubscriptionId="topic_subid",paramsstring[]topics)whereT:class{varbus=BusBuilder.CreateMessageBus();varadbus=bus.Advanced;varexchange=adbus.ExchangeDeclare(exChangeName,ExchangeType.Topic);varqueue=adbus.QueueDeclare(subscriptionId);foreach(varitemintopics)adbus.Bind(exchange,queue,item);adbus.Consume(queue,registration=>{registration.Add((message,info)=>{callback(message.Body);});});}? ?
具體發(fā)布/訂閱消息的Demo和相關(guān)測(cè)試看源碼Demo
注意
當(dāng)在創(chuàng)建訂閱者去消費(fèi)隊(duì)列的時(shí)候
/// /// 獲取主題 /// /// publicstaticvoidGetSub(Ttopic,Actioncallback)whereT:class{using(varbus=BusBuilder.CreateMessageBus()){bus.Subscribe(topic.ToString(),callback,x=>x.WithTopic(topic.ToString()));}}
using里的對(duì)象在執(zhí)行完成后被回收了,導(dǎo)致剛連接上去就又?jǐn)嚅_了(剛開始寫的時(shí)候膝但,習(xí)慣性加using冲九,排查了好久才發(fā)現(xiàn),欲哭無淚)
源碼項(xiàng)目運(yùn)行前的準(zhǔn)備與確認(rèn):
到RabbitMQ管理后臺(tái)添加TestQueueVHost跟束,并且分配用戶權(quán)限莺奸,然后到RabbitMQHelper.BusBuilder類里配置RabbitMQ連接服務(wù)的相關(guān)信息host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456,(根據(jù)配置的內(nèi)容和用戶修改)
參考資料(鳴謝):
.NET操作RabbitMQ組件EasyNetQ使用中文簡(jiǎn)版文檔冀宴。
附:Demo源碼GitHub地址
歡迎到原文地址關(guān)注和交流