Publish\Subscribe(消息發(fā)布\訂閱)
廣播策略:每個人都能收到溉浙;或是過濾某些人可以接收
一個生產(chǎn)者蒋荚,對應對個消費者圆裕!
exchange type 過濾類型
fanout = 廣播
direct = 組播
topic = 規(guī)則播
之前的例子都基本都是1對1的消息發(fā)送和接收,即消息只能發(fā)送到指定的queue里赊时,但有些時候你想讓你的消息被所有的Queue收到行拢,類似廣播的效果,這時候就要用到exchange了竭缝。
exchange在定義的時候是有類型的沼瘫,以決定到底是哪些Queue符合條件,可以接收消息湿故。
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
headers: 通過headers 來決定把消息發(fā)給哪些queue
表達式符號說明:#代表一個或多個字符膜蛔,*代表任何字符
例:#.a會匹配a.a皂股,aa.a,aaa.a等
*.a會匹配a.a,b.a悍募,c.a等
注:使用RoutingKey為#战转,Exchange Type為topic的時候相當于使用fanout
廣播模式(一個生產(chǎn)者槐秧,多個消費者)
1忧设、路由指定為空!所有消息都發(fā)給exchange處理轉(zhuǎn)到隊列膀懈,轉(zhuǎn)到哪個隊列就需要exchange指定谨垃,所以在建立連接的時候要指定名字刘陶。
注意:exchange只負責轉(zhuǎn)發(fā)不負責存放消息!如果沒有隊列綁定消息就會扔掉疑苫!
2纷责、自動生成隊列名,然后使用完之后再刪掉
隊列參數(shù)exclusive=True唯一的挺勿,rabbit 隨機生成一個名字喂柒。
3、生產(chǎn)者和消費者端都要聲明隊列湃番,以排除生成者未啟動吭露,消費者獲取報錯的問題
4、生產(chǎn)者發(fā)送一條消息泥兰,說有的消費者都能接收到!高效膀捷,效率的完成發(fā)送削彬!
應用場景:新浪微博-訂閱模式,只有當前登錄的用戶才可以收到實時發(fā)送的消息
生產(chǎn)者代碼
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊列連接通道
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 聲明隊列 exchange名字和類型
message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 獲取外界輸入的信息壶笼,否則就是hello world
channel.basic_publish(exchange='logs', # 指定exchange的名字
routing_key='', # 注意覆劈,不需要指定隊列名沛励!
body=message) # 信息
print(" [x] Sent %r" % message)
connection.close()
消費者代碼
import pika
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊列連接通道
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_obj = channel.queue_declare('',exclusive=True)
queue_name = queue_obj.method.queue # 獲取隊列名
print('queue name', queue_name, queue_obj) # 打印會列名
channel.queue_bind(queue=queue_name, exchange='logs') # 綁定隊列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()
direct 組播模式:有選擇的接收消息(exchange type=direct)
1目派、有選擇的接收消息(exchange type=direct)址貌,RabbitMQ還支持根據(jù)關(guān)鍵字發(fā)送,相當于是添加了一個過濾地帶遍蟋!
即:隊列綁定關(guān)鍵字螟凭,發(fā)送者將數(shù)據(jù)根據(jù)關(guān)鍵字發(fā)送到消息exchange,exchange根據(jù) 關(guān)鍵字 判定應該將數(shù)據(jù)發(fā)送至指定隊列棒厘。
2下隧、發(fā)什么類型的,什么類型的接收,在接收端運行的時候加參數(shù)何乎,指定接收的類型。
3抢野、routing_key = 'xxx' 與廣播相比不再為空各墨,隊列由執(zhí)行時手動輸入獲取,然后路由指定發(fā)送到哪個隊列恃轩。
4扁瓢、按照類型:生產(chǎn)者發(fā)送指定類型的消息引几;消費者循環(huán)綁定隊列挽铁,如果不存在不接收
例子:就像廣播電臺,要想接收生產(chǎn)者發(fā)送的數(shù)據(jù)楣铁,必須是綁定且在線更扁!如果斷開一段時間再接收該電臺消息,之前的訊息就不會再收到溃列!
應用場景:日志分類處理邏輯 【注:可以同時存在多個消費者】
生產(chǎn)者代碼
# rabbitmq_send.py
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊列連接通道
channel.exchange_declare(exchange='direct_log', exchange_type='direct') # 聲明消息隊列及類型
log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' # 日志等級
message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 接收手動輸入的消息內(nèi)容
channel.basic_publish(exchange='direct_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
消費者代碼
# rabbitmq_receive.py
import pika, sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊列連接通道
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_obj = channel.queue_declare('', exclusive=True)
queue_name = queue_obj.method.queue
print('queue name', queue_name, queue_obj)
log_levels = sys.argv[1:] # 日志等級 info warning error danger
# 判斷存不存在听隐,不存在退出
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 循環(huán)綁定隊列
for level in log_levels:
channel.queue_bind(exchange='direct_log',
queue=queue_name,
routing_key=level) # 綁定隊列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()
需要用命令行傳遞參數(shù)來啟動消費者雅任,以便于接收規(guī)定的日志級別的隊列消息咨跌,如:
python rabbitmq_receive.py info
python rabbitmq_receive.py warning error
生產(chǎn)者也可以用命令行啟動以指定發(fā)送的級別锌半,如:
python rabbitmq_send.py info
python rabbitmq_send.py error
topic規(guī)則播
話題類型,可以根據(jù)正則進行更精確的匹配哭当,按照規(guī)則過濾。exchange_type = topic陋葡,僅改下類型即可彻采!
在topic類型下,可以讓隊列綁定幾個模糊的關(guān)鍵字岭粤,之后發(fā)送者將數(shù)據(jù)發(fā)送到exchange特笋,exchange將傳入”路由值“和 ”關(guān)鍵字“進行匹配,匹配成功虎囚,則將數(shù)據(jù)發(fā)送到指定隊列蔫磨。
# 表示可以匹配 0 個 或 多個 單詞
* 表示只能匹配 一個 單詞
接收所有運行的日志:
python receive_logs_topic.py "#"
接收"mysql"的所有日志:
python receive_logs_topic.py "mysql.*"
或者如果只想接收"critical"日志:
python receive_logs_topic.py "*.critical"
可以創(chuàng)建多個綁定:
python receive_logs_topic.py "mysql.*" "*.critical"
并發(fā)出帶有routing_key的日志類型"mysql.critical":
python emit_log_topic.py "mysql.critical" "A critical kernel error"
#測試執(zhí)行如下:
#客戶端一:
- python3 receive1.py *.django
#客戶端二:
- python3 receive1.py mysql.error
#客戶端三:
- python3 receive1.py mysql.*
#服務端:
- python3 receive1.py #匹配相應的客戶端
生產(chǎn)者代碼
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊列連接通道
channel.exchange_declare(exchange='topic_log',exchange_type='topic')
#log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'
message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"
channel.basic_publish(exchange='topic_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
消費者代碼
import pika,sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊列連接通道
#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_obj = channel.queue_declare('', exclusive=True)
queue_name = queue_obj.method.queue
log_levels = sys.argv[1:] # info warning errr
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for level in log_levels:
channel.queue_bind(exchange='topic_log',
queue=queue_name,
routing_key=level) #綁定隊列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback,queue=queue_name, auto_ack=True)
channel.start_consuming()