消息隊(duì)列中間件 RabbitMQ 詳細(xì)介紹——安裝與基本應(yīng)用(Python)

RabbitMQ 是當(dāng)前最流行的消息中間件(Message Broker)之一题造,支持多種消息協(xié)議(如 AMQP厌衙、MQTT)憾赁。
同時(shí)它也是一個(gè)輕量級(jí)的非常易于部署的開源軟件娱颊,可以運(yùn)行在當(dāng)前大多數(shù)操作系統(tǒng)及云端環(huán)境中,也能夠部署在分布式的集群環(huán)境里以達(dá)到高可用髓介、可伸縮的需求惕鼓。
此外,RabbitMQ 還為目前主流的編程語(yǔ)言提供了豐富的開發(fā)工具版保。

一呜笑、軟件安裝

可以進(jìn)入 官方下載界面 閱讀針對(duì)自己操作系統(tǒng)版本的安裝手冊(cè),根據(jù)需求選擇適合的安裝方式彻犁。
Windows 系統(tǒng)可以直接在該頁(yè)面中獲取二進(jìn)制安裝包(還需要安裝 Erlang 環(huán)境)叫胁,Linux 系統(tǒng)也可以根據(jù)發(fā)行版的不同添加特定的軟件鏡像源。

我這里是 Ubuntu 19.04汞幢,沒有特別的需求驼鹅,所以直接從系統(tǒng)默認(rèn)的軟件鏡像源里下載安裝,命令如下:
$ sudo apt-get install rabbitmq-server

安裝完成以后森篷,運(yùn)行 systemctl status rabbitmq-server 命令查看 RabbitMQ 服務(wù)的運(yùn)行狀態(tài):

$ systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
   Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
   Active: active (running) since Fri 2019-07-26 01:03:27 CST; 2min 55s ago
 Main PID: 770 (beam.smp)
   Status: "Initialized"
    Tasks: 85 (limit: 2302)
   Memory: 85.8M
   CGroup: /system.slice/rabbitmq-server.service
           ├─ 741 /bin/sh /usr/sbin/rabbitmq-server
           ├─ 770 /usr/lib/erlang/erts-10.2.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/ebin -noshell -noinput -s rabbit boot -sname rabbit@server1 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/var/log/rabbitmq" -rabbit lager_default_file "/var/log/rabbitmq/rabbit@server1.log" -rabbit lager_upgrade_file "/var/log/rabbitmq/rabbit@server1_upgrade.log" -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@server1-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@server1" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672
           ├─1243 erl_child_setup 65536
           ├─1286 inet_gethost 4
           └─1287 inet_gethost 4

7月 26 01:02:44 server1 systemd[1]: Starting RabbitMQ Messaging Server...
7月 26 01:03:27 server1 systemd[1]: rabbitmq-server.service: Supervising process 770 which is not our child. We'll most likely not notice when it exits.
7月 26 01:03:27 server1 systemd[1]: Started RabbitMQ Messaging Server.
Web Admin

RabbitMQ 還提供了可以遠(yuǎn)程訪問(wèn)的 Web 管理與監(jiān)控工具输钩,默認(rèn)以插件的形式安裝到系統(tǒng)中,需要使用 rabbitmq-plugins 命令開啟仲智。
具體命令如下:
$ sudo rabbitmq-plugins enable rabbitmq_management

RabbitMQ 默認(rèn)創(chuàng)建了一個(gè)用戶名密碼分別為 guest/guest 的用戶买乃,只是該用戶只允許本地登錄。(我這里是遠(yuǎn)程钓辆。剪验。肴焊。)
如果需要遠(yuǎn)程訪問(wèn) Web 控制臺(tái),可以通過(guò) rabbitmqctl 命令創(chuàng)建一個(gè)新的管理賬戶:
$ sudo rabbitmqctl add_user <username> <password>

此時(shí)新創(chuàng)建的賬戶仍無(wú)法登錄功戚,還需要為其分配用戶角色以及對(duì) vhost 的管理權(quán)限搓萧,命令如下:

$ sudo rabbitmqctl set_user_tags <username> administrator
$ sudo rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"

權(quán)限設(shè)置完畢后建芙,即可用之前指定的用戶名密碼遠(yuǎn)程登錄 Web 管理系統(tǒng)领虹,界面如下圖:


Web Admin

Web 形式的后臺(tái)界面為管理工作與監(jiān)控需求提供了便捷的接口赐写,同時(shí)大部分管理操作也可直接通過(guò) rabbitmqctl 命令完成,具體可參考該命令的幫助信息:

$ sudo rabbitmqctl

Usage:
rabbitmqctl [-n <node>] [-l] [-q] <command> [<command options>]
...
Commands:
    add_user <username> <password>
    add_vhost <vhost>
    authenticate_user <username> <password>
    await_online_nodes <count> [-t <timeout>]
    cancel_sync_queue [-p <vhost>] queue
    change_cluster_node_type <disc|ram>
    change_password <username> <password>
...

二乘粒、架構(gòu)解析

RabbitMQ 是一種高性能豌注、穩(wěn)定、可伸縮(集群部署)的消息中間件谓厘,由 Erlang 語(yǔ)言編寫幌羞。

Erlang 是一種函數(shù)式編程語(yǔ)言寸谜,專注于分布式竟稳、高容錯(cuò)的軟件類實(shí)時(shí)系統(tǒng)等應(yīng)用場(chǎng)景。它通過(guò)輕量級(jí)的進(jìn)程設(shè)計(jì)以及進(jìn)程之間的消息通信熊痴,提供了一個(gè)高層次的不需要共享狀態(tài)的并發(fā)模型他爸。
RabbitMQ 集群通過(guò) Erlang VM 原生的 IPC (inter-process communication) 機(jī)制完成跨節(jié)點(diǎn)的消息通信。

松耦合架構(gòu)

對(duì)于傳統(tǒng)的應(yīng)用架構(gòu)果善,比如一個(gè) Web 應(yīng)用的登錄程序诊笤,往往需要對(duì)后端的數(shù)據(jù)庫(kù)表格進(jìn)行多項(xiàng)實(shí)時(shí)的寫入操作。而當(dāng)用戶的訪問(wèn)量大增時(shí)巾陕,此時(shí)的表格更新操作很容易成為瓶頸并影響到整體的響應(yīng)速度讨跟。

相對(duì)于登錄程序直接更新表格數(shù)據(jù)的緊耦合架構(gòu),可以將前端的請(qǐng)求數(shù)據(jù)推送到基于消息的中間件或者某個(gè)中心化的消息隊(duì)列應(yīng)用鄙煤,再通過(guò)中間件分發(fā)消息到多個(gè)消費(fèi)者(Consumer)應(yīng)用晾匠,由消費(fèi)者獨(dú)立、異步地完成最終的數(shù)據(jù)庫(kù)更新操作梯刚。

image.png

基于消息的中間件對(duì)于創(chuàng)建數(shù)據(jù)導(dǎo)向的靈活的應(yīng)用架構(gòu)有非常大的優(yōu)勢(shì)凉馆。RabbitMQ 支持的松耦合設(shè)計(jì)可以使應(yīng)用不再受類似于數(shù)據(jù)庫(kù)寫操作的性能限制。
同時(shí)這種架構(gòu)也非常易于橫向擴(kuò)展亡资,可以在添加作用于相同數(shù)據(jù)的應(yīng)用實(shí)例時(shí)不影響現(xiàn)有的核心功能澜共。
tightly coupled application

loosely coupled application

三、消息應(yīng)用示例代碼

下文中將使用 Python 語(yǔ)言及其 RabbitMQ 客戶端 Pika 創(chuàng)建 5 個(gè)基本的消息應(yīng)用锥腻,結(jié)構(gòu)由簡(jiǎn)單到復(fù)雜嗦董,源代碼均參考自官網(wǎng) RabbitMQ Tutorials
安裝 pika 庫(kù):pip install pika

Hello World

該應(yīng)用的結(jié)構(gòu)示意圖如下:


helloworld

由 P (Producer) 發(fā)送一條消息到隊(duì)列(Queue)瘦黑,再由隊(duì)列轉(zhuǎn)發(fā)消息到 C (Consumer) 京革。

發(fā)送端代碼 send.py 如下:

#!/usr/bin/env python
import pika

# 初始化與 RabbitMQ 服務(wù)器的連接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 隊(duì)列聲明
channel.queue_declare(queue='hello')

# 發(fā)送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

接收端 reveive.py 代碼如下:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

# 接收到消息后觸發(fā)的回調(diào)函數(shù)
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 消費(fèi)者聲明與消息監(jiān)聽
channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
測(cè)試

首先運(yùn)行 4 次發(fā)送程序:

$ python send.py
 [x] Sent 'Hello World'
$ python send.py
 [x] Sent 'Hello World'
$ python send.py
 [x] Sent 'Hello World'
$ python send.py
 [x] Sent 'Hello World'

從 Web 管理界面中可以看到销睁,此時(shí)隊(duì)列中緩存了 4 條消息。


overview

運(yùn)行接收端程序:

$ python receive.py
 [x] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World'
 [x] Received b'Hello World'
 [x] Received b'Hello World'
 [x] Received b'Hello World'

發(fā)送端連續(xù) 4 次發(fā)送的消息被接收端收取存崖,隊(duì)列中緩存的消息被清空冻记。同時(shí)接收端保持運(yùn)行狀態(tài)等待新的消息被轉(zhuǎn)發(fā)給自己。


overview 2

消息隊(duì)列一直處于等待生產(chǎn)者發(fā)送消息和將收到或緩存的消息轉(zhuǎn)發(fā)給消費(fèi)者的狀態(tài)来惧。如未有消費(fèi)者及時(shí)接收和處理被轉(zhuǎn)發(fā)的消息冗栗,則這部分消息緩存在隊(duì)列中等待進(jìn)一步操作。

Work Queue

結(jié)構(gòu)示意圖:

Work Queue

本例中將創(chuàng)建一個(gè) Work Queue 用來(lái)將消耗時(shí)間長(zhǎng)的任務(wù)以輪詢的方式分發(fā)給多個(gè)消費(fèi)者處理供搀。

生產(chǎn)者源代碼 new_task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

消費(fèi)者源代碼 worker.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Message acknowledgment


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()
Message acknowledgment

消費(fèi)者在處理接收到的任務(wù)或消息時(shí)有可能會(huì)消耗比較多的時(shí)間隅居,在此過(guò)程中,如消費(fèi)者端出現(xiàn)軟硬件故障葛虐,則會(huì)出現(xiàn)消息丟失的情況胎源。

RabbitMQ 支持 Message acknowledgment 。即消費(fèi)者在接收和處理完一個(gè)特定的消息后會(huì)向 RabbitMQ 返回一個(gè)應(yīng)答(ack)屿脐,說(shuō)明該消息可以從隊(duì)列中移除涕蚤。
如果消費(fèi)者在返回應(yīng)答之前丟失與隊(duì)列的連接,則 RabbitMQ 判定對(duì)應(yīng)的消息未由消費(fèi)者完全處理的诵,會(huì)將該消息保留在隊(duì)列中并重新分發(fā)給其他在線的消費(fèi)者万栅。

Message durability

消息應(yīng)答的機(jī)制可以確保即使消費(fèi)者宕機(jī)的情況下任務(wù)仍不會(huì)丟失。但是當(dāng) RabbitMQ 服務(wù)本身出現(xiàn)故障時(shí)西疤,隊(duì)列以及隊(duì)列中緩存的消息仍舊會(huì)被清理掉烦粒。

為了保證 RabbitMQ 中隊(duì)列以及消息的持久化,首先需要在生產(chǎn)者和消費(fèi)者代碼中同時(shí)聲明隊(duì)列為 durable
channel.queue_declare(queue='task_queue', durable=True)

此外還需要將生產(chǎn)者代碼中的 delivery_mode 屬性設(shè)置為 2 確保消息的持久化:
properties=pika.BasicProperties(delivery_mode=2,)

測(cè)試

打開兩個(gè)命令行終端代赁,分別運(yùn)行 worker.py 程序:

# Shell 1
$ python worker.py
 [x] Waiting for messages. To exit press CTRL+C
# Shell 2
$ python worker.py
 [x] Waiting for messages. To exit press CTRL+C

打開另一個(gè)終端窗口運(yùn)行 new_task.py 程序發(fā)送 4 條消息:

# Shell 3
$ python new_task.py First Message
 [x] Sent 'First Message'
$ python new_task.py Second Message
 [x] Sent 'Second Message'
$ python new_task.py Third Message
 [x] Sent 'Third Message'
$ python new_task.py Forth Message
 [x] Sent 'Forth Message'

最終兩個(gè)消費(fèi)者分別接收到隊(duì)列分發(fā)的兩條消息:

# Shell 1
$ python worker.py
 [x] Waiting for messages. To exit press CTRL+C
 [x] Received b'First Message'
 [x] Done
 [x] Received b'Third Message'
 [x] Done
# Shell 2
$ python worker.py
 [x] Waiting for messages. To exit press CTRL+C
 [x] Received b'Second Message'
 [x] Done
 [x] Received b'Forth Message'
 [x] Done
Fair dispatch

當(dāng) RabbitMQ 以輪詢的方式(即平均分配)將隊(duì)列中的消息轉(zhuǎn)發(fā)給多個(gè)消費(fèi)者時(shí)扰她,如果這些消費(fèi)者接收到的任務(wù)繁重程度差異很大,則會(huì)導(dǎo)致某些消費(fèi)者端任務(wù)的積壓芭碍。
為了避免這種情況發(fā)生徒役,可以使用 basic_qos 方法設(shè)置 prefetch 的值,如 worker.py 程序中的以下代碼:
channel.basic_qos(prefetch_count=1) 豁跑。

該行代碼可以確保同一個(gè)消費(fèi)者在任意時(shí)間點(diǎn)最多只接受 1 個(gè)任務(wù)分配給自己廉涕。即如果某個(gè)消費(fèi)者當(dāng)前有未處理完的消息,則不再接收新的消息直到當(dāng)前的任務(wù)處理完艇拍。

Publish/Subscribe

結(jié)構(gòu)示意圖:


Publish-Subscribe
Exchange

在之前的示例中狐蜕,用到了消息隊(duì)列模型中的以下幾個(gè)組件:

  • producer :生產(chǎn)者,即發(fā)送消息的應(yīng)用
  • queue :隊(duì)列卸夕,即存儲(chǔ)消息的緩存
  • consumer :消費(fèi)者层释,即接收消息的應(yīng)用

實(shí)際上在 RabbitMQ 的消息模型中,生產(chǎn)者從來(lái)不會(huì)將消息直接發(fā)送到隊(duì)列中快集,而是將消息發(fā)送給一個(gè)名為 exchange 的組件贡羔。
exchange 的一端用來(lái)接收生產(chǎn)者發(fā)送的消息廉白,一端用來(lái)將消息推送到隊(duì)列中。它通過(guò) exchange type 中的定義判斷特定的消息是該推送給某個(gè)對(duì)應(yīng)的隊(duì)列乖寒,還是將其廣播給多個(gè)隊(duì)列猴蹂,又或者直接丟棄。

RabbitMQ 主要提供了 4 種 exchange 類型:direct楣嘁、topic磅轻、headersfanout
本例中使用 fanout逐虚,即 exchange 會(huì)將接收到的消息以廣播的形式發(fā)送給所有關(guān)聯(lián)的隊(duì)列聋溜,再由隊(duì)列傳遞給消費(fèi)者處理。

源代碼(emit_log.py)如下:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

receive_logs.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

receive_logs.py 文件中有一行 result = channel.queue_declare(queue='', exclusive=True) 代碼叭爱,用來(lái)聲明一個(gè)臨時(shí)隊(duì)列( queue='' 沒有指定名稱撮躁,因此會(huì)由 RabbitMQ 設(shè)置隨機(jī)的名稱),同時(shí) exclusive=True 設(shè)置該隊(duì)列在消費(fèi)者斷開連接后自行刪除买雾。

測(cè)試

同時(shí)打開兩個(gè)命令行窗口分別運(yùn)行 receive_logs.py 文件:

# Shell 1
$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
# Shell 2
$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C

再打開第三個(gè)終端執(zhí)行 emit_log.py 命令 4 次:

# Shell 3
$ python emit_log.py
 [x] Sent 'info: Hello World!'
$ python emit_log.py
 [x] Sent 'info: Hello World!'
$ python emit_log.py
 [x] Sent 'info: Hello World!'
$ python emit_log.py
 [x] Sent 'info: Hello World!'

此時(shí)之前運(yùn)行的兩個(gè) receive 程序同時(shí)收到發(fā)送的 4 條消息:

$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
Routing

結(jié)構(gòu)示意圖:

Routing

與上一個(gè)例子中以廣播的形式轉(zhuǎn)發(fā)消息不同把曼,本例中允許消費(fèi)者通過(guò)隊(duì)列有選擇地訂閱生產(chǎn)者發(fā)送的部分消息。

Binding 和 Direct exchange

在 RabbitMQ 中凝果,binding 代表 exchange 與隊(duì)列的對(duì)應(yīng)關(guān)系祝迂,即隊(duì)列會(huì)根據(jù) binding 的設(shè)置對(duì) exchange 轉(zhuǎn)發(fā)的消息有選擇性地接收睦尽。
因此 binding 的最終效果也依賴于 exchange 的類型器净。比如之前用到的 fanout 類型,由于是廣播的形式(轉(zhuǎn)發(fā)給所有關(guān)聯(lián)的隊(duì)列)并不需要選擇的動(dòng)作当凡,則 binding 的值被忽略山害。

但是對(duì)于 direct 類型的 exchange ,則可以通過(guò) binding 對(duì)消息進(jìn)行篩選沿量。在 direct exchange 下浪慌,只有當(dāng)隊(duì)列的 binding_key 與消息的 routing_key 一致時(shí),隊(duì)列才會(huì)收到 exchange 轉(zhuǎn)發(fā)的消息朴则。

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
測(cè)試

首先運(yùn)行 receive_logs_direct.py 程序并指定參數(shù)為 error(即只接收標(biāo)記為“error”的消息):

# Shell 1
$ python receive_logs_direct.py error
 [*] Waiting for logs. To exit press CTRL+C

打開另一終端同樣運(yùn)行 receive_logs_direct.py 程序并指定參數(shù)為 info warning(即接收標(biāo)記為 infowarning 的消息):

# Shell 2
$ python receive_logs_direct.py info warning
 [*] Waiting for logs. To exit press CTRL+C

打開第三個(gè)終端并運(yùn)行 emit_log_direct.py 程序發(fā)送 4 條日志消息:

# Shell 3
$ python emit_log_direct.py error "This is an error"
 [x] Sent 'error':'This is an error'
$ python emit_log_direct.py info "Hi, I am an info"
 [x] Sent 'info':'Hi, I am an info'
$ python emit_log_direct.py warning "Yeah, it's a warning"
 [x] Sent 'warning':"Yeah, it's a warning"
$ python emit_log_direct.py error "Hi, it's an error again"
 [x] Sent 'error':"Hi, it's an error again"

此時(shí) Shell 1 中只接收到了標(biāo)記為 error 的消息:

$ python receive_logs_direct.py error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':b'This is an error'
 [x] 'error':b"Hi, it's an error again"

而 Shell 2 中接收到了標(biāo)記為 infowarning 的消息:

$ python receive_logs_direct.py info warning
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':b'Hi, I am an info'
 [x] 'warning':b"Yeah, it's a warning"
Topics

結(jié)構(gòu)示意圖:


Topics

direct 類型的 exchange 雖然可以根據(jù)消息的 routing_key 以及隊(duì)列的 binding_key 有選擇性的推送消息到隊(duì)列权纤,但是并不適合更復(fù)雜的場(chǎng)景。
topic 類型的 exchange 與 direct 類型邏輯上大致相同乌妒,只是 topic 類型的 exchange 并沒有一個(gè)明確的 routing_key汹想,而是由幾個(gè)點(diǎn)號(hào)(.)分隔的單詞(如 lazy.orange.cat)進(jìn)行定義。
與之對(duì)應(yīng)的 binding_key 也需要遵循同樣的形式撤蚊,只不過(guò) binding_key 額外支持兩個(gè)特殊含義的字符:

  • 星號(hào)(*)可以表示某一個(gè)任意的單詞
  • 井號(hào)(#)可以表示任意 0 個(gè)或多個(gè)單詞

因此對(duì)于上圖(Topics)中的情形古掏,routing_keyquick.orange.rabbit 的消息會(huì)被轉(zhuǎn)發(fā)給 Q1 和 Q2 隊(duì)列,quick.orange.fox 則只會(huì)轉(zhuǎn)發(fā)給 Q1 隊(duì)列侦啸,lazy.orange.male.rabbit 被轉(zhuǎn)發(fā)給 Q2 隊(duì)列槽唾。

emit_log_topic

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

receive_logs_topic.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
測(cè)試

先運(yùn)行接收端程序(Shell 1 和 Shell 2)丧枪,再運(yùn)行發(fā)送端(Shell 3),效果如下:

# Shell 3
$ python emit_log_topic.py "kern.warning" "A kernel warning message"
 [x] Sent 'kern.warning':'A kernel warning message'
$ python emit_log_topic.py "network.critical" "A critical network error"
 [x] Sent 'network.critical':'A critical network error'
$ python emit_log_topic.py "kern.critical" "A critical kernel error"
 [x] Sent 'kern.critical':'A critical kernel error'
# Shell 1
$ python receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'kern.warning':b'A kernel warning message'
 [x] 'kern.critical':b'A critical kernel error'
# Shell 2
$ python receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'network.critical':b'A critical network error'
 [x] 'kern.critical':b'A critical kernel error'

參考資料

RabbitMQ Tutorials
RabbitMQ in Depth

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末庞萍,一起剝皮案震驚了整個(gè)濱河市拧烦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌钝计,老刑警劉巖屎篱,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異葵蒂,居然都是意外死亡交播,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門践付,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)秦士,“玉大人,你說(shuō)我怎么就攤上這事永高∷硗粒” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵命爬,是天一觀的道長(zhǎng)曹傀。 經(jīng)常有香客問(wèn)我,道長(zhǎng)饲宛,這世上最難降的妖魔是什么皆愉? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮艇抠,結(jié)果婚禮上幕庐,老公的妹妹穿的比我還像新娘。我一直安慰自己家淤,他們只是感情好异剥,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著絮重,像睡著了一般冤寿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上青伤,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天督怜,我揣著相機(jī)與錄音,去河邊找鬼潮模。 笑死亮蛔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的擎厢。 我是一名探鬼主播究流,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼辣吃,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了芬探?” 一聲冷哼從身側(cè)響起神得,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎偷仿,沒想到半個(gè)月后哩簿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡酝静,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年节榜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片别智。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宗苍,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出薄榛,到底是詐尸還是另有隱情讳窟,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布敞恋,位于F島的核電站丽啡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏硬猫。R本人自食惡果不足惜补箍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浦徊。 院中可真熱鬧馏予,春花似錦、人聲如沸盔性。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)冕香。三九已至,卻和暖如春后豫,著一層夾襖步出監(jiān)牢的瞬間悉尾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工挫酿, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留构眯,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓早龟,卻偏偏與公主長(zhǎng)得像惫霸,于是被迫代替她去往敵國(guó)和親猫缭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容