RabbitMQ 是當(dāng)前最流行的消息中間件(Message Broker)之一题造,支持多種消息協(xié)議(如 AMQP厌衙、MQTT)憾赁。
同時(shí)它也是一個(gè)輕量級(jí)的非常易于部署的開源軟件娱颊,可以運(yùn)行在當(dāng)前大多數(shù)操作系統(tǒng)及云端環(huán)境中,也能夠部署在分布式的集群環(huán)境里以達(dá)到高可用髓介、可伸縮的需求惕鼓。
此外,RabbitMQ 還為目前主流的編程語(yǔ)言提供了豐富的開發(fā)工具版保。
一呜笑、軟件安裝
可以進(jìn)入 官方下載界面 閱讀針對(duì)自己操作系統(tǒng)版本的安裝手冊(cè),根據(jù)需求選擇適合的安裝方式彻犁。
Windows 系統(tǒng)可以直接在該頁(yè)面中獲取二進(jìn)制安裝包(還需要安裝 Erlang 環(huán)境)叫胁,Linux 系統(tǒng)也可以根據(jù)發(fā)行版的不同添加特定的軟件鏡像源。
我這里是 Ubuntu 19.04汞幢,沒有特別的需求驼鹅,所以直接從系統(tǒng)默認(rèn)的軟件鏡像源里下載安裝,命令如下:
$ sudo apt-get install rabbitmq-server
安裝完成以后森篷,運(yùn)行 systemctl status rabbitmq-server
命令查看 RabbitMQ 服務(wù)的運(yùn)行狀態(tài):
$ systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: active (running) since Fri 2019-07-26 01:03:27 CST; 2min 55s ago
Main PID: 770 (beam.smp)
Status: "Initialized"
Tasks: 85 (limit: 2302)
Memory: 85.8M
CGroup: /system.slice/rabbitmq-server.service
├─ 741 /bin/sh /usr/sbin/rabbitmq-server
├─ 770 /usr/lib/erlang/erts-10.2.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/ebin -noshell -noinput -s rabbit boot -sname rabbit@server1 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/var/log/rabbitmq" -rabbit lager_default_file "/var/log/rabbitmq/rabbit@server1.log" -rabbit lager_upgrade_file "/var/log/rabbitmq/rabbit@server1_upgrade.log" -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@server1-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@server1" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672
├─1243 erl_child_setup 65536
├─1286 inet_gethost 4
└─1287 inet_gethost 4
7月 26 01:02:44 server1 systemd[1]: Starting RabbitMQ Messaging Server...
7月 26 01:03:27 server1 systemd[1]: rabbitmq-server.service: Supervising process 770 which is not our child. We'll most likely not notice when it exits.
7月 26 01:03:27 server1 systemd[1]: Started RabbitMQ Messaging Server.
Web Admin
RabbitMQ 還提供了可以遠(yuǎn)程訪問(wèn)的 Web 管理與監(jiān)控工具输钩,默認(rèn)以插件的形式安裝到系統(tǒng)中,需要使用 rabbitmq-plugins
命令開啟仲智。
具體命令如下:
$ sudo rabbitmq-plugins enable rabbitmq_management
RabbitMQ 默認(rèn)創(chuàng)建了一個(gè)用戶名密碼分別為 guest/guest
的用戶买乃,只是該用戶只允許本地登錄。(我這里是遠(yuǎn)程钓辆。剪验。肴焊。)
如果需要遠(yuǎn)程訪問(wèn) Web 控制臺(tái),可以通過(guò) rabbitmqctl
命令創(chuàng)建一個(gè)新的管理賬戶:
$ sudo rabbitmqctl add_user <username> <password>
此時(shí)新創(chuàng)建的賬戶仍無(wú)法登錄功戚,還需要為其分配用戶角色以及對(duì) vhost 的管理權(quán)限搓萧,命令如下:
$ sudo rabbitmqctl set_user_tags <username> administrator
$ sudo rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"
權(quán)限設(shè)置完畢后建芙,即可用之前指定的用戶名密碼遠(yuǎn)程登錄 Web 管理系統(tǒng)领虹,界面如下圖:
Web 形式的后臺(tái)界面為管理工作與監(jiān)控需求提供了便捷的接口赐写,同時(shí)大部分管理操作也可直接通過(guò) rabbitmqctl
命令完成,具體可參考該命令的幫助信息:
$ sudo rabbitmqctl
Usage:
rabbitmqctl [-n <node>] [-l] [-q] <command> [<command options>]
...
Commands:
add_user <username> <password>
add_vhost <vhost>
authenticate_user <username> <password>
await_online_nodes <count> [-t <timeout>]
cancel_sync_queue [-p <vhost>] queue
change_cluster_node_type <disc|ram>
change_password <username> <password>
...
二乘粒、架構(gòu)解析
RabbitMQ 是一種高性能豌注、穩(wěn)定、可伸縮(集群部署)的消息中間件谓厘,由 Erlang 語(yǔ)言編寫幌羞。
Erlang 是一種函數(shù)式編程語(yǔ)言寸谜,專注于分布式竟稳、高容錯(cuò)的軟件類實(shí)時(shí)系統(tǒng)等應(yīng)用場(chǎng)景。它通過(guò)輕量級(jí)的進(jìn)程設(shè)計(jì)以及進(jìn)程之間的消息通信熊痴,提供了一個(gè)高層次的不需要共享狀態(tài)的并發(fā)模型他爸。
RabbitMQ 集群通過(guò) Erlang VM 原生的 IPC (inter-process communication) 機(jī)制完成跨節(jié)點(diǎn)的消息通信。
松耦合架構(gòu)
對(duì)于傳統(tǒng)的應(yīng)用架構(gòu)果善,比如一個(gè) Web 應(yīng)用的登錄程序诊笤,往往需要對(duì)后端的數(shù)據(jù)庫(kù)表格進(jìn)行多項(xiàng)實(shí)時(shí)的寫入操作。而當(dāng)用戶的訪問(wèn)量大增時(shí)巾陕,此時(shí)的表格更新操作很容易成為瓶頸并影響到整體的響應(yīng)速度讨跟。
相對(duì)于登錄程序直接更新表格數(shù)據(jù)的緊耦合架構(gòu),可以將前端的請(qǐng)求數(shù)據(jù)推送到基于消息的中間件或者某個(gè)中心化的消息隊(duì)列應(yīng)用鄙煤,再通過(guò)中間件分發(fā)消息到多個(gè)消費(fèi)者(Consumer)應(yīng)用晾匠,由消費(fèi)者獨(dú)立、異步地完成最終的數(shù)據(jù)庫(kù)更新操作梯刚。
基于消息的中間件對(duì)于創(chuàng)建數(shù)據(jù)導(dǎo)向的靈活的應(yīng)用架構(gòu)有非常大的優(yōu)勢(shì)凉馆。RabbitMQ 支持的松耦合設(shè)計(jì)可以使應(yīng)用不再受類似于數(shù)據(jù)庫(kù)寫操作的性能限制。
同時(shí)這種架構(gòu)也非常易于橫向擴(kuò)展亡资,可以在添加作用于相同數(shù)據(jù)的應(yīng)用實(shí)例時(shí)不影響現(xiàn)有的核心功能澜共。
三、消息應(yīng)用示例代碼
下文中將使用 Python 語(yǔ)言及其 RabbitMQ 客戶端 Pika
創(chuàng)建 5 個(gè)基本的消息應(yīng)用锥腻,結(jié)構(gòu)由簡(jiǎn)單到復(fù)雜嗦董,源代碼均參考自官網(wǎng) RabbitMQ Tutorials 。
安裝 pika 庫(kù):pip install pika
Hello World
該應(yīng)用的結(jié)構(gòu)示意圖如下:
由 P (Producer) 發(fā)送一條消息到隊(duì)列(Queue)瘦黑,再由隊(duì)列轉(zhuǎn)發(fā)消息到 C (Consumer) 京革。
發(fā)送端代碼 send.py
如下:
#!/usr/bin/env python
import pika
# 初始化與 RabbitMQ 服務(wù)器的連接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 隊(duì)列聲明
channel.queue_declare(queue='hello')
# 發(fā)送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
接收端 reveive.py
代碼如下:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 接收到消息后觸發(fā)的回調(diào)函數(shù)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消費(fèi)者聲明與消息監(jiān)聽
channel.basic_consume(
queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
測(cè)試
首先運(yùn)行 4 次發(fā)送程序:
$ python send.py
[x] Sent 'Hello World'
$ python send.py
[x] Sent 'Hello World'
$ python send.py
[x] Sent 'Hello World'
$ python send.py
[x] Sent 'Hello World'
從 Web 管理界面中可以看到销睁,此時(shí)隊(duì)列中緩存了 4 條消息。
運(yùn)行接收端程序:
$ python receive.py
[x] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World'
[x] Received b'Hello World'
[x] Received b'Hello World'
[x] Received b'Hello World'
發(fā)送端連續(xù) 4 次發(fā)送的消息被接收端收取存崖,隊(duì)列中緩存的消息被清空冻记。同時(shí)接收端保持運(yùn)行狀態(tài)等待新的消息被轉(zhuǎn)發(fā)給自己。
消息隊(duì)列一直處于等待生產(chǎn)者發(fā)送消息和將收到或緩存的消息轉(zhuǎn)發(fā)給消費(fèi)者的狀態(tài)来惧。如未有消費(fèi)者及時(shí)接收和處理被轉(zhuǎn)發(fā)的消息冗栗,則這部分消息緩存在隊(duì)列中等待進(jìn)一步操作。
Work Queue
結(jié)構(gòu)示意圖:
本例中將創(chuàng)建一個(gè) Work Queue 用來(lái)將消耗時(shí)間長(zhǎng)的任務(wù)以輪詢的方式分發(fā)給多個(gè)消費(fèi)者處理供搀。
生產(chǎn)者源代碼 new_task.py
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
消費(fèi)者源代碼 worker.py
:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag) # Message acknowledgment
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
Message acknowledgment
消費(fèi)者在處理接收到的任務(wù)或消息時(shí)有可能會(huì)消耗比較多的時(shí)間隅居,在此過(guò)程中,如消費(fèi)者端出現(xiàn)軟硬件故障葛虐,則會(huì)出現(xiàn)消息丟失的情況胎源。
RabbitMQ 支持 Message acknowledgment 。即消費(fèi)者在接收和處理完一個(gè)特定的消息后會(huì)向 RabbitMQ 返回一個(gè)應(yīng)答(ack)屿脐,說(shuō)明該消息可以從隊(duì)列中移除涕蚤。
如果消費(fèi)者在返回應(yīng)答之前丟失與隊(duì)列的連接,則 RabbitMQ 判定對(duì)應(yīng)的消息未由消費(fèi)者完全處理的诵,會(huì)將該消息保留在隊(duì)列中并重新分發(fā)給其他在線的消費(fèi)者万栅。
Message durability
消息應(yīng)答的機(jī)制可以確保即使消費(fèi)者宕機(jī)的情況下任務(wù)仍不會(huì)丟失。但是當(dāng) RabbitMQ 服務(wù)本身出現(xiàn)故障時(shí)西疤,隊(duì)列以及隊(duì)列中緩存的消息仍舊會(huì)被清理掉烦粒。
為了保證 RabbitMQ 中隊(duì)列以及消息的持久化,首先需要在生產(chǎn)者和消費(fèi)者代碼中同時(shí)聲明隊(duì)列為 durable
:
channel.queue_declare(queue='task_queue', durable=True)
此外還需要將生產(chǎn)者代碼中的 delivery_mode
屬性設(shè)置為 2 確保消息的持久化:
properties=pika.BasicProperties(delivery_mode=2,)
測(cè)試
打開兩個(gè)命令行終端代赁,分別運(yùn)行 worker.py
程序:
# Shell 1
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
# Shell 2
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
打開另一個(gè)終端窗口運(yùn)行 new_task.py
程序發(fā)送 4 條消息:
# Shell 3
$ python new_task.py First Message
[x] Sent 'First Message'
$ python new_task.py Second Message
[x] Sent 'Second Message'
$ python new_task.py Third Message
[x] Sent 'Third Message'
$ python new_task.py Forth Message
[x] Sent 'Forth Message'
最終兩個(gè)消費(fèi)者分別接收到隊(duì)列分發(fā)的兩條消息:
# Shell 1
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
[x] Received b'First Message'
[x] Done
[x] Received b'Third Message'
[x] Done
# Shell 2
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
[x] Received b'Second Message'
[x] Done
[x] Received b'Forth Message'
[x] Done
Fair dispatch
當(dāng) RabbitMQ 以輪詢的方式(即平均分配)將隊(duì)列中的消息轉(zhuǎn)發(fā)給多個(gè)消費(fèi)者時(shí)扰她,如果這些消費(fèi)者接收到的任務(wù)繁重程度差異很大,則會(huì)導(dǎo)致某些消費(fèi)者端任務(wù)的積壓芭碍。
為了避免這種情況發(fā)生徒役,可以使用 basic_qos
方法設(shè)置 prefetch
的值,如 worker.py
程序中的以下代碼:
channel.basic_qos(prefetch_count=1)
豁跑。
該行代碼可以確保同一個(gè)消費(fèi)者在任意時(shí)間點(diǎn)最多只接受 1 個(gè)任務(wù)分配給自己廉涕。即如果某個(gè)消費(fèi)者當(dāng)前有未處理完的消息,則不再接收新的消息直到當(dāng)前的任務(wù)處理完艇拍。
Publish/Subscribe
結(jié)構(gòu)示意圖:
Exchange
在之前的示例中狐蜕,用到了消息隊(duì)列模型中的以下幾個(gè)組件:
- producer :生產(chǎn)者,即發(fā)送消息的應(yīng)用
- queue :隊(duì)列卸夕,即存儲(chǔ)消息的緩存
- consumer :消費(fèi)者层释,即接收消息的應(yīng)用
實(shí)際上在 RabbitMQ 的消息模型中,生產(chǎn)者從來(lái)不會(huì)將消息直接發(fā)送到隊(duì)列中快集,而是將消息發(fā)送給一個(gè)名為 exchange 的組件贡羔。
exchange 的一端用來(lái)接收生產(chǎn)者發(fā)送的消息廉白,一端用來(lái)將消息推送到隊(duì)列中。它通過(guò) exchange type 中的定義判斷特定的消息是該推送給某個(gè)對(duì)應(yīng)的隊(duì)列乖寒,還是將其廣播給多個(gè)隊(duì)列猴蹂,又或者直接丟棄。
RabbitMQ 主要提供了 4 種 exchange 類型:direct
楣嘁、topic
磅轻、headers
和 fanout
。
本例中使用 fanout
逐虚,即 exchange 會(huì)將接收到的消息以廣播的形式發(fā)送給所有關(guān)聯(lián)的隊(duì)列聋溜,再由隊(duì)列傳遞給消費(fèi)者處理。
源代碼(emit_log.py
)如下:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
receive_logs.py
:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
receive_logs.py
文件中有一行 result = channel.queue_declare(queue='', exclusive=True)
代碼叭爱,用來(lái)聲明一個(gè)臨時(shí)隊(duì)列( queue=''
沒有指定名稱撮躁,因此會(huì)由 RabbitMQ 設(shè)置隨機(jī)的名稱),同時(shí) exclusive=True
設(shè)置該隊(duì)列在消費(fèi)者斷開連接后自行刪除买雾。
測(cè)試
同時(shí)打開兩個(gè)命令行窗口分別運(yùn)行 receive_logs.py
文件:
# Shell 1
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
# Shell 2
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
再打開第三個(gè)終端執(zhí)行 emit_log.py
命令 4 次:
# Shell 3
$ python emit_log.py
[x] Sent 'info: Hello World!'
$ python emit_log.py
[x] Sent 'info: Hello World!'
$ python emit_log.py
[x] Sent 'info: Hello World!'
$ python emit_log.py
[x] Sent 'info: Hello World!'
此時(shí)之前運(yùn)行的兩個(gè) receive 程序同時(shí)收到發(fā)送的 4 條消息:
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
Routing
結(jié)構(gòu)示意圖:
與上一個(gè)例子中以廣播的形式轉(zhuǎn)發(fā)消息不同把曼,本例中允許消費(fèi)者通過(guò)隊(duì)列有選擇地訂閱生產(chǎn)者發(fā)送的部分消息。
Binding 和 Direct exchange
在 RabbitMQ 中凝果,binding 代表 exchange 與隊(duì)列的對(duì)應(yīng)關(guān)系祝迂,即隊(duì)列會(huì)根據(jù) binding 的設(shè)置對(duì) exchange 轉(zhuǎn)發(fā)的消息有選擇性地接收睦尽。
因此 binding 的最終效果也依賴于 exchange 的類型器净。比如之前用到的 fanout
類型,由于是廣播的形式(轉(zhuǎn)發(fā)給所有關(guān)聯(lián)的隊(duì)列)并不需要選擇的動(dòng)作当凡,則 binding 的值被忽略山害。
但是對(duì)于 direct
類型的 exchange ,則可以通過(guò) binding 對(duì)消息進(jìn)行篩選沿量。在 direct exchange 下浪慌,只有當(dāng)隊(duì)列的 binding_key
與消息的 routing_key
一致時(shí),隊(duì)列才會(huì)收到 exchange 轉(zhuǎn)發(fā)的消息朴则。
emit_log_direct.py
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
測(cè)試
首先運(yùn)行 receive_logs_direct.py
程序并指定參數(shù)為 error
(即只接收標(biāo)記為“error”的消息):
# Shell 1
$ python receive_logs_direct.py error
[*] Waiting for logs. To exit press CTRL+C
打開另一終端同樣運(yùn)行 receive_logs_direct.py
程序并指定參數(shù)為 info warning
(即接收標(biāo)記為 info
或 warning
的消息):
# Shell 2
$ python receive_logs_direct.py info warning
[*] Waiting for logs. To exit press CTRL+C
打開第三個(gè)終端并運(yùn)行 emit_log_direct.py
程序發(fā)送 4 條日志消息:
# Shell 3
$ python emit_log_direct.py error "This is an error"
[x] Sent 'error':'This is an error'
$ python emit_log_direct.py info "Hi, I am an info"
[x] Sent 'info':'Hi, I am an info'
$ python emit_log_direct.py warning "Yeah, it's a warning"
[x] Sent 'warning':"Yeah, it's a warning"
$ python emit_log_direct.py error "Hi, it's an error again"
[x] Sent 'error':"Hi, it's an error again"
此時(shí) Shell 1 中只接收到了標(biāo)記為 error
的消息:
$ python receive_logs_direct.py error
[*] Waiting for logs. To exit press CTRL+C
[x] 'error':b'This is an error'
[x] 'error':b"Hi, it's an error again"
而 Shell 2 中接收到了標(biāo)記為 info
和 warning
的消息:
$ python receive_logs_direct.py info warning
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':b'Hi, I am an info'
[x] 'warning':b"Yeah, it's a warning"
Topics
結(jié)構(gòu)示意圖:
direct
類型的 exchange 雖然可以根據(jù)消息的 routing_key
以及隊(duì)列的 binding_key
有選擇性的推送消息到隊(duì)列权纤,但是并不適合更復(fù)雜的場(chǎng)景。
而 topic
類型的 exchange 與 direct
類型邏輯上大致相同乌妒,只是 topic
類型的 exchange 并沒有一個(gè)明確的 routing_key
汹想,而是由幾個(gè)點(diǎn)號(hào)(.
)分隔的單詞(如 lazy.orange.cat
)進(jìn)行定義。
與之對(duì)應(yīng)的 binding_key
也需要遵循同樣的形式撤蚊,只不過(guò) binding_key
額外支持兩個(gè)特殊含義的字符:
- 星號(hào)(
*
)可以表示某一個(gè)任意的單詞 - 井號(hào)(
#
)可以表示任意 0 個(gè)或多個(gè)單詞
因此對(duì)于上圖(Topics)中的情形古掏,routing_key
為 quick.orange.rabbit
的消息會(huì)被轉(zhuǎn)發(fā)給 Q1 和 Q2 隊(duì)列,quick.orange.fox
則只會(huì)轉(zhuǎn)發(fā)給 Q1 隊(duì)列侦啸,lazy.orange.male.rabbit
被轉(zhuǎn)發(fā)給 Q2 隊(duì)列槽唾。
emit_log_topic
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
receive_logs_topic.py
:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
測(cè)試
先運(yùn)行接收端程序(Shell 1 和 Shell 2)丧枪,再運(yùn)行發(fā)送端(Shell 3),效果如下:
# Shell 3
$ python emit_log_topic.py "kern.warning" "A kernel warning message"
[x] Sent 'kern.warning':'A kernel warning message'
$ python emit_log_topic.py "network.critical" "A critical network error"
[x] Sent 'network.critical':'A critical network error'
$ python emit_log_topic.py "kern.critical" "A critical kernel error"
[x] Sent 'kern.critical':'A critical kernel error'
# Shell 1
$ python receive_logs_topic.py "kern.*"
[*] Waiting for logs. To exit press CTRL+C
[x] 'kern.warning':b'A kernel warning message'
[x] 'kern.critical':b'A critical kernel error'
# Shell 2
$ python receive_logs_topic.py "*.critical"
[*] Waiting for logs. To exit press CTRL+C
[x] 'network.critical':b'A critical network error'
[x] 'kern.critical':b'A critical kernel error'