0. 基礎(chǔ)概念
消息隊(duì)列
消息隊(duì)列(message queue)是應(yīng)用程序間通信的一種方式上炎。應(yīng)用程序通過讀寫出入隊(duì)列的消息來(lái)通信蜈抓,而非直接調(diào)用彼此來(lái)通信(例如RPC遠(yuǎn)程進(jìn)程調(diào)用)檩禾。消息隊(duì)列也是分布式應(yīng)用間交換信息的一種技術(shù)壶硅。隊(duì)列中的消息是可以駐留在內(nèi)存或者磁盤上饱苟,直到存儲(chǔ)的消息被應(yīng)用讀取芙粱。通過消息隊(duì)列,應(yīng)用可以獨(dú)立的運(yùn)行,消息的生產(chǎn)者不需要知道消息的消費(fèi)者位置吭狡,消息的消費(fèi)者也不需要知道消息的生產(chǎn)者在哪里尖殃。
AMQP
AMQP(Advanced Message Queuing Protocol)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)划煮,為面向消息的中間件設(shè)計(jì)送丰。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無(wú)需知道消息使用者的存在弛秋,反之亦然器躏。
AMQP的主要特征是面向消息、隊(duì)列蟹略、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)登失、可靠性、安全挖炬。
rabbitmq
RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn)揽浙,服務(wù)器端用Erlang語(yǔ)言編寫,支持多種客戶端意敛,如:Python馅巷、Ruby、.NET草姻、Java钓猬、JMS、C撩独、PHP敞曹、ActionScript、XMPP综膀、STOMP等澳迫,支持AJAX。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息僧须,在易用性纲刀、擴(kuò)展性、高可用性等方面表現(xiàn)不俗担平。
可靠性(Reliability)
RabbitMQ 使用一些機(jī)制來(lái)保證可靠性示绊,如持久化、傳輸確認(rèn)暂论、發(fā)布確認(rèn)面褐。靈活的路由(Flexible Routing)
在消息進(jìn)入隊(duì)列之前,通過 Exchange 來(lái)路由消息的取胎。對(duì)于典型的路由功能展哭,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來(lái)實(shí)現(xiàn)湃窍。針對(duì)更復(fù)雜的路由功能,可以將多個(gè) Exchange 綁定在一起匪傍,也通過插件機(jī)制實(shí)現(xiàn)自己的Exchange 您市。消息集群(Clustering)
多個(gè) RabbitMQ 服務(wù)器可以組成一個(gè)集群,形成一個(gè)邏輯 Broker 役衡。高可用(Highly Available Queues)
隊(duì)列可以在集群中的機(jī)器上進(jìn)行鏡像茵休,使得在部分節(jié)點(diǎn)出問題的情況下隊(duì)列仍然可用。多種協(xié)議(Multi-protocol)
RabbitMQ 支持多種消息隊(duì)列協(xié)議手蝎,比如 STOMP榕莺、MQTT 等等。多語(yǔ)言客戶端(Many Clients)
RabbitMQ 幾乎支持所有常用語(yǔ)言棵介,比如 Java钉鸯、.NET、Ruby 等等邮辽。管理界面(Management UI)
RabbitMQ 提供了一個(gè)易用的用戶界面唠雕,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面。跟蹤機(jī)制(Tracing)
如果消息異常逆巍,RabbitMQ 提供了消息跟蹤機(jī)制及塘,使用者可以找出發(fā)生了什么。插件機(jī)制(Plugin System)
RabbitMQ 提供了許多插件锐极,來(lái)從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件芳肌。
下圖是一個(gè)簡(jiǎn)單的消息隊(duì)列圖灵再,P是生產(chǎn)者,C是消費(fèi)者
1. rabbitmq服務(wù)端的安裝
Ubuntu 系統(tǒng)安裝
apt-get install erlang
apt-get install rabbitmq-server
service rabbitmq-server start #啟動(dòng)rabbitmq-server服務(wù)
Centos 系統(tǒng)安裝
yum install erlang
yum install rabbitmq-server #啟動(dòng)rabbitmq-server服務(wù)
chkconfig rabbitmq-server on #添加到啟動(dòng)項(xiàng)
service rabbitmq-server start
rabbitmqctl status 查看rabbitmq-server的運(yùn)行狀態(tài)
2. 用戶管理及安裝插件
用戶管理
rabbitmqctl list_users #查看用戶
abbitmqctl add_user username password #添加用戶
rabbitmqctl delete_user username #刪除用戶
rabbitmqctl change_password username newpassword #用戶名修改密碼
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username "." "." ".*"
插件安裝
rabbitmq-plugins list #查看安裝的插件
rabbitmq-plugins enable rabbitmq_management #安裝web管理rabbitmq插件
重啟rabbitmq-server
訪問 http://localhost:15672 使用添加的用戶可以進(jìn)入web訪問
3. 生產(chǎn)者消費(fèi)者Python代碼驗(yàn)證
python 安裝rabbitmq庫(kù)
pip install pika
生產(chǎn)者
import pika
username = "root"
passwd = "password"
auth = pika.PlainCredentials(username, passwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.100', credentials=auth))
channel = s_conn.channel()
channel.queue_declare(queue='hello')
for i in range(1,10):
channel.basic_publish(exchange='', routing_key='hello', body='message '+ str(i))
print("[生產(chǎn)者] send 'hello" + 'message '+ str(i))
s_conn.close()
代碼解讀
pika.PlainCredentials
是一種身份證數(shù)據(jù)格式亿笤,將用戶名密碼包含在內(nèi)翎迁,作為pika.ConnectionParameters
的一個(gè)參數(shù),若不指定用戶名和密碼,默認(rèn)會(huì)使用guest/guest作為用戶名和密碼來(lái)使用净薛。pika.ConnectionParameters
有如下多個(gè)連接參數(shù)配置汪榔,包括用戶身份、主機(jī)肃拜、端口痴腌、加密、超時(shí)時(shí)間等燃领。
host=_DEFAULT,port=_DEFAULT,virtual_host=_DEFAULT,credentials=_DEFAULT,channel_max=_DEFAULT,frame_max=_DEFAULT,heartbeat=_DEFAULT,ssl=_DEFAULT,ssl_options=_DEFAULT,connection_attempts=_DEFAULT,retry_delay=_DEFAULT,socket_timeout=_DEFAULT,locale=_DEFAULT,backpressure_detection=_DEFAULT,blocked_connection_timeout=_DEFAULT,client_properties=_DEFAULT,tcp_options=_DEFAULT
pika.BlockingConnection
創(chuàng)建一個(gè)連接士聪,s_conn.channel()
創(chuàng)建一個(gè)頻道、queue_declare
指定一個(gè)隊(duì)列也可以配置是否需要數(shù)據(jù)持久猛蔽,basic_publish()
發(fā)送消息剥悟,exchange, routing_key, body,properties=None, mandatory=False, immediate=False
是basic_publish的參數(shù)灵寺,exchange是交換器,routing_key是轉(zhuǎn)發(fā)至哪一個(gè)隊(duì)列区岗,如果有多個(gè)queue略板,可以模糊匹配發(fā)送至匹配到的queue,body是發(fā)送的消息體,properties中可以配置數(shù)據(jù)持久化慈缔,properties=pika.BasicProperties(delivery_mode=2,)
消費(fèi)者
import pika
def callback(ch, method, properties, body):
print(" [消費(fèi)者] Received %r" % body)
#ch.basic_ack(delivery_tag = method.delivery_tag)
username = "root"
passwd = "password"
auth = pika.PlainCredentials(username, passwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.100', credentials=auth))
channel = s_conn.channel()
channel.queue_declare(queue='hello')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(consumer_callback=callback,queue="hello",no_ack=True)
channel.start_consuming()
代碼解讀
其他與生產(chǎn)者基本類似叮称,basic_consume
是從隊(duì)列中獲取數(shù)據(jù),callback是有消息后產(chǎn)生的回調(diào)函數(shù)胀糜,queue是隊(duì)列名稱颅拦,no_ack是獲取消息后是否發(fā)送ack回應(yīng),若為False則需要代碼回復(fù)ack教藻,若為True距帅,內(nèi)部封裝會(huì)自動(dòng)回復(fù)ack。basic_qos
就是RabbitMQ給消費(fèi)者發(fā)消息的時(shí)候檢測(cè)下消費(fèi)者里的消息數(shù)量括堤,如果超過指定值(比如1條)碌秸,就不給你發(fā)了。
4. 消息持久化
rabbitmq 數(shù)據(jù)持久化需要滿足三個(gè)條件
- 隊(duì)列的持久化
- 交換器的持久化
- 消息的持久化
若durable=False悄窃,rabbitmq重啟之后exchange讥电、queue和message都會(huì)被清除
若durable=True,rabbitmq重啟之后exchange和queue不會(huì)被清除,但是如果delivery_mode=1轧抗,數(shù)據(jù)依舊會(huì)被清除
只有當(dāng)durable=True恩敌,delivery_mode=2時(shí),才能實(shí)現(xiàn)消息的持久化横媚。
5 消息策略
rabbitmq消息基本原理
我們?cè)陂_篇的時(shí)候就留了一個(gè)坑纠炮,就是那個(gè)應(yīng)用結(jié)構(gòu)圖里面,消費(fèi)者Client A和消費(fèi)者Client B是如何知道我發(fā)送的消息是給Queue1還是給Queue2灯蝴,有沒有過這個(gè)問題恢口,那么我們就來(lái)解開這個(gè)面紗,看看到底是個(gè)什么構(gòu)造穷躁。首先明確一點(diǎn)就是生產(chǎn)者產(chǎn)生的消息并不是直接發(fā)送給消息隊(duì)列Queue的耕肩,而是要經(jīng)過Exchange(交換器),由Exchange再將消息路由到一個(gè)或多個(gè)Queue问潭,當(dāng)然這里還會(huì)對(duì)不符合路由規(guī)則的消息進(jìn)行丟棄掉猿诸,這里指的是后續(xù)要談到的Exchange Type。那么Exchange是怎樣將消息準(zhǔn)確的推送到對(duì)應(yīng)的Queue的呢睦授?那么這里的功勞最大的當(dāng)屬Binding两芳,RabbitMQ是通過Binding將Exchange和Queue鏈接在一起,這樣Exchange就知道如何將消息準(zhǔn)確的推送到Queue中去去枷。簡(jiǎn)單示意圖如下所示:
在綁定(Binding)Exchange和Queue的同時(shí)怖辆,一般會(huì)指定一個(gè)Binding Key是复,生產(chǎn)者將消息發(fā)送給Exchange的時(shí)候,一般會(huì)產(chǎn)生一個(gè)Routing Key竖螃,當(dāng)Routing Key和Binding Key對(duì)應(yīng)上的時(shí)候淑廊,消息就會(huì)發(fā)送到對(duì)應(yīng)的Queue中去。那么Exchange有四種類型特咆,不同的類型有著不同的策略季惩。也就是表明不同的類型將決定綁定的Queue不同,換言之就是說(shuō)生產(chǎn)者發(fā)送了一個(gè)消息腻格,Routing Key的規(guī)則是A画拾,那么生產(chǎn)者會(huì)將Routing Key=A的消息推送到Exchange中,這時(shí)候Exchange中會(huì)有自己的規(guī)則菜职,對(duì)應(yīng)的規(guī)則去篩選生產(chǎn)者發(fā)來(lái)的消息青抛,如果能夠?qū)?yīng)上Exchange的內(nèi)部規(guī)則就將消息推送到對(duì)應(yīng)的Queue中去。那么接下來(lái)就來(lái)詳細(xì)講解下Exchange里面類型酬核。
Exchange Type
我來(lái)用表格來(lái)描述下類型以及類型之間的區(qū)別蜜另。
- fanout
fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中嫡意。
上圖所示举瑰,生產(chǎn)者(P)生產(chǎn)消息1將消息1推送到Exchange,由于Exchange Type=fanout這時(shí)候會(huì)遵循fanout的規(guī)則將消息推送到所有與它綁定Queue蔬螟,也就是圖上的兩個(gè)Queue最后兩個(gè)消費(fèi)者消費(fèi)此迅。
- direct
direct類型的Exchange路由規(guī)則也很簡(jiǎn)單,它會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中
當(dāng)生產(chǎn)者(P)發(fā)送消息時(shí)Rotuing key=booking時(shí)旧巾,這時(shí)候?qū)⑾魉徒oExchange邮屁,Exchange獲取到生產(chǎn)者發(fā)送過來(lái)消息后,會(huì)根據(jù)自身的規(guī)則進(jìn)行與匹配相應(yīng)的Queue菠齿,這時(shí)發(fā)現(xiàn)Queue1和Queue2都符合,就會(huì)將消息傳送給這兩個(gè)隊(duì)列坐昙,如果我們以Rotuing key=create和Rotuing key=confirm發(fā)送消息時(shí)绳匀,這時(shí)消息只會(huì)被推送到Queue2隊(duì)列中,其他Routing Key的消息將會(huì)被丟棄炸客。
- topic
前面提到的direct規(guī)則是嚴(yán)格意義上的匹配疾棵,換言之Routing Key必須與Binding Key相匹配的時(shí)候才將消息傳送給Queue,那么topic這個(gè)規(guī)則就是模糊匹配痹仙,可以通過通配符滿足一部分規(guī)則就可以傳送是尔。它的約定是:
- routing key為一個(gè)句點(diǎn)號(hào)“. ”分隔的字符串(我們將被句點(diǎn)號(hào)“. ”分隔開的每一段獨(dú)立的字符串稱為一個(gè)單詞),如“stock.usd.nyse”开仰、“nyse.vmw”拟枚、“quick.orange.rabbit”
- binding key與routing key一樣也是句點(diǎn)號(hào)“. ”分隔的字符串
- binding key中可以存在兩種特殊字符“”與“#”薪铜,用于做模糊匹配,其中“”用于匹配一個(gè)單詞恩溅,“#”用于匹配多個(gè)單詞(可以是零個(gè))
當(dāng)生產(chǎn)者發(fā)送消息Routing Key=F.C.E的時(shí)候隔箍,這時(shí)候只滿足Queue1,所以會(huì)被路由到Queue中脚乡,如果Routing Key=A.C.E這時(shí)候會(huì)被同是路由到Queue1和Queue2中蜒滩,如果Routing Key=A.F.B時(shí),這里只會(huì)發(fā)送一條消息到Queue2中奶稠。
- headers
headers類型的Exchange不依賴于routing key與binding key的匹配規(guī)則來(lái)路由消息俯艰,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。
在綁定Queue與Exchange時(shí)指定一組鍵值對(duì)锌订;當(dāng)消息發(fā)送到Exchange時(shí)竹握,RabbitMQ會(huì)取到該消息的headers(也是一個(gè)鍵值對(duì)的形式),對(duì)比其中的鍵值對(duì)是否完全匹配Queue與Exchange綁定時(shí)指定的鍵值對(duì)瀑志;如果完全匹配則消息會(huì)路由到該Queue涩搓,否則不會(huì)路由到該Queue。
各種策略的使用教程可參考官網(wǎng)
各個(gè)平臺(tái)代碼實(shí)例可參考github