RabbitMQ -- part4 [Routing]

上節(jié)介紹了通過廣播簡單的將日志推送到所有的consumer。本節(jié)介紹通過定義規(guī)則眶熬,讓consumer有選擇的接受日志兆览。例如:只接收error級別的日志,不接受info級別的日志懦砂。
  • Bindings

binding是關聯(lián) exchangequeue 的,也可以認為是此 queue 對此 exchange 的消息感興趣。

Bindings有另外一個參數(shù): routing_key孕惜,此參數(shù)依賴于 exchange type愧薛,如果是 fanout,則忽略此參數(shù)衫画。

channel.queue_bind(exchange=exchange_name
                    queue=queue_name,
                    routing_key='black')
  • Direct exchange

之前的日志系統(tǒng)毫炉,是將消息全部廣播到所有的consumer,現(xiàn)在需要對于日志的嚴重性級別來做規(guī)則削罩。例如:需要將 critical error 級別的消息寫入磁盤瞄勾,而不寫入 warning 和 info 級別的日志消息。

fanout exchange弥激,僅僅是在廣播消息进陡,缺乏靈活性。使用 direct exchange代替微服,routing算法很簡單 -- 一個隊列中的消息準確匹配到 routing_key 的值趾疚。示例圖:

direct exchange

如圖,direct exchange "X" 和兩個隊列綁定以蕴。第一個隊列綁定orange糙麦,第二個隊列綁定black和green。if消息的 routing_key 為orage丛肮,則exchange會將消息路由到隊列Q1赡磅,elif消息的 routing_key 為black或green,則exchange會將消息路由到隊列Q2宝与,else消息將會被丟棄焚廊。

  • Multiple bindings
multiple bindings

綁定多個隊列,類似 fanout exchange习劫,將消息推送到所有匹配的隊列咆瘟。routing_key為black的消息將同時交付到Q1和Q2隊列。

  • Emitting logs
  1. 創(chuàng)建exchange

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

  1. 發(fā)送消息
channel.basic_publish(exchange='direct_logs',
                    routing_key=severity,   # severity可以是info榜聂、warning搞疗、error
                    body=message)
  • Subscribing

與之前不同的是嗓蘑,為接受的日志的每個級別须肆,創(chuàng)建一個新的binding。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in serverities:
    channel.queue_bind(exchange='direct_logs',
                        queue=queue_name,
                        routing_key=severity)
  • 完整樣例
demo
  1. 編輯 emit_log_direct.py 程序桩皿,用來提交日志
#!/usr/bin/env python3
# coding=utf-8

import pika
import sys

message = ' '.join(sys.argv[1:]) or 'Hello World!'

# 獲取用戶終端輸入的severity
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# 使用 direct exchange豌汇,名稱為"direct_logs"
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message,
)

print("[x] Sent '%s'" % message)

connection.close()
  1. 編輯 receive_logs_direct.py 程序,用來接受日志
#!/usr/bin/env python3
# coding=utf-8

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# 聲明exchange泄隔,使用 direct type拒贱, 名稱為"direct_logs"
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

# 獲取終端輸入的severity,可指定多個,之后使用for循環(huán)接收
severities = sys.argv[1:]

if not severities:
    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % __name__)
    sys.exit(1)

def callback(ch, method, properties, body):
    print('[x] Received %r' % body)

# 循環(huán)接收用戶指定的severities
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                      queue=queue_name,
                      routing_key=severity)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

print('[*] Waiting for messgaes. To exit press CTRL+C')

channel.start_consuming()
  1. 執(zhí)行
# 提交日志
> python3 emit_log_direct.py test_info
> python3 emit_log_direct.py warning test_warning

# consumer1接收日志
> python3 receive_logs_direct.py info warning
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'test_info'
[x] Received b'warning test_warning'

# consumer2接收日志
> python3 receive_logs_direct.py warning
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'warning test_warning'

參考文檔: http://www.rabbitmq.com/tutorials/tutorial-four-python.html


最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末逻澳,一起剝皮案震驚了整個濱河市闸天,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌斜做,老刑警劉巖苞氮,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異瓤逼,居然都是意外死亡笼吟,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門霸旗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贷帮,“玉大人,你說我怎么就攤上這事诱告∧焓啵” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵精居,是天一觀的道長诲侮。 經(jīng)常有香客問我,道長箱蟆,這世上最難降的妖魔是什么沟绪? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮空猜,結果婚禮上绽慈,老公的妹妹穿的比我還像新娘。我一直安慰自己辈毯,他們只是感情好坝疼,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谆沃,像睡著了一般钝凶。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上唁影,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天耕陷,我揣著相機與錄音,去河邊找鬼据沈。 笑死哟沫,一個胖子當著我的面吹牛,可吹牛的內容都是我干的锌介。 我是一名探鬼主播嗜诀,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼猾警,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了隆敢?” 一聲冷哼從身側響起发皿,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拂蝎,沒想到半個月后雳窟,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡匣屡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年封救,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片捣作。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡誉结,死狀恐怖,靈堂內的尸體忽然破棺而出券躁,到底是詐尸還是另有隱情惩坑,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布也拜,位于F島的核電站以舒,受9級特大地震影響,放射性物質發(fā)生泄漏慢哈。R本人自食惡果不足惜蔓钟,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望卵贱。 院中可真熱鬧滥沫,春花似錦、人聲如沸键俱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽编振。三九已至缀辩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間踪央,已是汗流浹背臀玄。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留杯瞻,地道東北人镐牺。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像魁莉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器旗唁。支持消息的持久化畦浓、事務、擁塞控...
    jiangmo閱讀 10,361評論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理检疫,服務發(fā)現(xiàn)讶请,斷路器,智...
    卡卡羅2017閱讀 134,659評論 18 139
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件屎媳,它能夠在應用之間提供可靠的消息傳輸夺溢。在易用性,擴展性烛谊,高可用性...
    點融黑幫閱讀 3,003評論 3 41
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,925評論 2 11
  • rabbitmq RabbitMQ官方入門教程 本文算是實現(xiàn)對入門教程的 java版本翻譯吧风响。本文中演示代碼地址 ...
    我不是李小龍閱讀 559評論 0 1