上節(jié)介紹了將每個消息投遞給一個consumer邑遏,本節(jié)介紹Publish/Subscribe(推送梢灭、訂閱)取具,將一個消息投遞到多個consumer价认,實質是通過廣播形式將消息傳遞給所有consumer
RabbitMQ消息模塊的核心理念是producer不直接將消息交付到隊列,并且producer通常不知道將消息交付到哪個隊列中顾患,producer僅僅將消息發(fā)送到 exchange番捂。exchange其實就是一側接受producer交付的消息,另一側將消息推送到隊列中江解。exchange必須知道接收的消息內容设预,然后通過 exchange type 來定義是將消息推送到指定隊列、推送到多個隊列還是直接丟棄消息犁河。
1. Exchange
exchange 的有效類型有:direct鳖枕,topic,headers和fanout
這里主要關注"fanout"桨螺,創(chuàng)建并命名為"logs":
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout exchange宾符,從字面也是就能知道,它廣播所有的消息到它所知道的隊列灭翔。
可以通過 rabbitmqctl 命令列出當前的exchange:
rabbitmqctl list_exchanges
空字符("")代表默認exchange魏烫,消息將會交付到 routing_key 指定的隊列中,前提是此隊列必須存在肝箱。
2. Temporary queues
1.連接到Rabbit的一個臨時隊列(使用隨機名稱命名隊列)则奥,在聲明隊列時不需要指定
queue
參數:result = channel.queue_declare()
2.一旦consumer連接關閉,則刪除隊列狭园。設置
exclusive
標志:result = channel.queue_declare(exclusive=True)
3. Bindings
上面已經創(chuàng)建了"fanout exchange"和queue读处,現在需要告訴exchange將消息發(fā)送到queue。(關聯exchange和queue的動作叫: binding)
channel.queue_bind(exchange='logs', queue=result.method.queue) # result.method.queue 是一個隨機的隊列名唱矛,例如:amq.gen-JzTY20BRgKO-HjmUJj0wLg
列出"bindings":
rabbitmqctl list_bindings
構建一個日志系統(tǒng)罚舱,由兩個程序組成,第一個發(fā)送日志消息绎谦,第二個接受并且打印
- emit_log.py 提交日志
#!/usr/bin/env python3
# coding=utf-8
import pika
message = ' '.join(sys.argv[1:]) or 'Hello World!'
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 聲明"logs" exchange管闷,類型為fanout(廣播)
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs',
routing_key='', # fanout模式,需要提供此參數窃肠,但是忽略此值
body=message,
)
print("[x] Sent '%s'" % message)
connection.close()
- receive_logs.py 接受并打印日志
#!/usr/bin/env python3
# coding=utf-8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 聲明"logs" exchange包个,類型為fanout(廣播)
channel.exchange_declare(exchange="logs", exchange_type="fanout")
# 使用RabbitMQ生成隨機名的臨時隊列,設置exclusive標志冤留,當consumer斷開連接時碧囊,銷毀隊列
result = channel.queue_declare(exclusive=True)
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
# result.method.queue 隊列名稱
queue_name = result.method.queue
# 綁定exchange和queue
channel.queue_bind(exchange='logs', queue=queue_name)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
print('[*] Waiting for messgaes. To exit press CTRL+C')
channel.start_consuming()
參考文檔:http://www.rabbitmq.com/tutorials/tutorial-three-python.html