上節(jié)介紹了通過廣播簡單的將日志推送到所有的consumer。本節(jié)介紹通過定義規(guī)則眶熬,讓consumer有選擇的接受日志兆览。例如:只接收error級別的日志,不接受info級別的日志懦砂。
- Bindings
binding是關聯(lián) exchange 和 queue 的,也可以認為是此 queue 對此 exchange 的消息感興趣。
Bindings有另外一個參數(shù): routing_key孕惜,此參數(shù)依賴于 exchange type愧薛,如果是 fanout,則忽略此參數(shù)衫画。
channel.queue_bind(exchange=exchange_name
queue=queue_name,
routing_key='black')
- Direct exchange
之前的日志系統(tǒng)毫炉,是將消息全部廣播到所有的consumer,現(xiàn)在需要對于日志的嚴重性級別來做規(guī)則削罩。例如:需要將 critical error 級別的消息寫入磁盤瞄勾,而不寫入 warning 和 info 級別的日志消息。
fanout exchange弥激,僅僅是在廣播消息进陡,缺乏靈活性。使用 direct exchange代替微服,routing算法很簡單 -- 一個隊列中的消息準確匹配到 routing_key
的值趾疚。示例圖:
如圖,direct exchange "X" 和兩個隊列綁定以蕴。第一個隊列綁定orange糙麦,第二個隊列綁定black和green。if
消息的 routing_key 為orage丛肮,則exchange會將消息路由到隊列Q1赡磅,elif
消息的 routing_key 為black或green,則exchange會將消息路由到隊列Q2宝与,else
消息將會被丟棄焚廊。
- Multiple bindings
綁定多個隊列,類似 fanout exchange习劫,將消息推送到所有匹配的隊列咆瘟。routing_key
為black的消息將同時交付到Q1和Q2隊列。
- Emitting logs
- 創(chuàng)建exchange
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
- 發(fā)送消息
channel.basic_publish(exchange='direct_logs',
routing_key=severity, # severity可以是info榜聂、warning搞疗、error
body=message)
- Subscribing
與之前不同的是嗓蘑,為接受的日志的每個級別须肆,創(chuàng)建一個新的binding。
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in serverities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
- 完整樣例
- 編輯 emit_log_direct.py 程序桩皿,用來提交日志
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
message = ' '.join(sys.argv[1:]) or 'Hello World!'
# 獲取用戶終端輸入的severity
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 使用 direct exchange豌汇,名稱為"direct_logs"
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message,
)
print("[x] Sent '%s'" % message)
connection.close()
- 編輯 receive_logs_direct.py 程序,用來接受日志
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 聲明exchange泄隔,使用 direct type拒贱, 名稱為"direct_logs"
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取終端輸入的severity,可指定多個,之后使用for循環(huán)接收
severities = sys.argv[1:]
if not severities:
sys.stderr.write('Usage: %s [info] [warning] [error]\n' % __name__)
sys.exit(1)
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
# 循環(huán)接收用戶指定的severities
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
print('[*] Waiting for messgaes. To exit press CTRL+C')
channel.start_consuming()
- 執(zhí)行
# 提交日志
> python3 emit_log_direct.py test_info
> python3 emit_log_direct.py warning test_warning
# consumer1接收日志
> python3 receive_logs_direct.py info warning
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'test_info'
[x] Received b'warning test_warning'
# consumer2接收日志
> python3 receive_logs_direct.py warning
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'warning test_warning'
參考文檔: http://www.rabbitmq.com/tutorials/tutorial-four-python.html