源碼:https://github.com/ltoddy/rabbitmq-tutorial
路由
本章節(jié)教程重點介紹的內容
在之前的教程中摘完,我們構建了一個簡單的日志系統 我們能夠將日志消息廣播給許多接收者说订。
在本教程中吼渡,我們將添加一個功能 - 我們將只能訂閱一部分消息法瑟。例如译断,我們只能將重要的錯誤消息引導到日志文件(以節(jié)省磁盤空間)萧恕,同時仍然能夠在控制臺上打印所有日志消息漾狼。
綁定
在前面的例子中,我們已經創(chuàng)建了綁定。您可能會回想一下代碼:
channel.queue_bind(exchange=EXCHANGE_NAME,
queue=queue_name)
綁定是交換和隊列之間的關系师溅。這可以簡單地理解為: the queue is interested in messages from this exchange.
綁定可以使用額外的routing_key參數茅信。為了避免與basic_publish參數混淆,我們將其稱為綁定鍵墓臭。這就是我們如何使用一個鍵創(chuàng)建一個綁定:
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
綁定鍵的含義取決于交換類型蘸鲸。我們之前使用的 fanout 交換簡單地忽略了它的價值。
直接交換
我們之前教程的日志記錄系統將所有消息廣播給所有消費者窿锉。我們希望將其擴展為允許根據其進行嚴格的過濾消息酌摇。
例如,我們可能希望將嚴重錯誤的日志消息寫入磁盤嗡载,而不會寫入警告或信息日志消息窑多。
我們正在使用fanout交換,這不會給我們太多的靈活性 - 它只能無意識地播放洼滚。
我們將使用direct交換埂息。direct交換背后的路由算法很簡單 - 消息進入隊列,其綁定密鑰與消息的路由密鑰完全匹配遥巴。
為了說明這一點千康,請考慮以下設置:
在這個設置中,我們可以看到有兩個隊列綁定的直接交換機X. 第一個隊列用綁定鍵orange綁定铲掐,第二個隊列有兩個綁定拾弃,一個綁定鍵為black,另一個為green摆霉。
在這種設置中豪椿,使用路由鍵orange發(fā)布到交換機的消息 將被路由到隊列Q1。帶有black或gree路由鍵的消息將進入Q2携栋。所有其他消息將被丟棄砂碉。
多個綁定
使用相同的綁定密鑰綁定多個隊列是完全合法的。在我們的例子中刻两,我們可以使用綁定鍵black添加X和Q1之間的綁定。
在這種情況下滴某,direct交換就像fanout一樣磅摹,并將消息廣播到所有匹配的隊列。帶有路由鍵black的消息將傳送到Q1和Q2霎奢。
發(fā)出日志
我們將使用這個模型用于我們的日志系統户誓。取而代之的fanout,我們將消息發(fā)送到direct交換幕侠。我們將提供嚴格的日志作為路由鍵(routing key)帝美。
這樣接收腳本將能夠選擇想要接收的消息。我們先關注發(fā)出日志的實現晤硕。
像往常一樣悼潭,我們需要首先創(chuàng)建一個交換:
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
我們準備發(fā)送一條消息:
channel.basic_publish(exchange='direct_logs',
routing_key='',
body=message)
為了簡化事情庇忌,我們將假設“severity”可以是'info','warning'舰褪,'error'之一皆疹。
訂閱
接收郵件的方式與上一個教程中的一樣,只有一個例外 - 我們將為每個我們感興趣的嚴重程度創(chuàng)建一個新綁定占拍。
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
把它放在一起
emit_log_direct.py的代碼:
#!/usr/bin/env python
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.args[1:] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py的代碼:
#!/usr/bin/env python
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(cb, method, properities, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
如果只想保存'warning'和'error'(而不是'info')將消息記錄到文件中略就,只需打開一個控制臺并輸入:
python receive_logs_direct.py warning error > logs_from_rabbit.log
如果您希望在屏幕上看到所有日志消息,請打開一個新終端并執(zhí)行以下操作:
python receive_logs_direct.py info warning error
例如晃酒,要輸出error日志消息表牢,只需輸入:
python emit_log_direct.py error "Run. Run. Or it will explode."