RabbitMQ消息訂閱發(fā)布

Publish\Subscribe(消息發(fā)布\訂閱)

廣播策略:每個人都能收到溉浙;或是過濾某些人可以接收
一個生產(chǎn)者蒋荚,對應對個消費者圆裕!

exchange type 過濾類型
fanout = 廣播
direct = 組播
topic = 規(guī)則播

之前的例子都基本都是1對1的消息發(fā)送和接收,即消息只能發(fā)送到指定的queue里赊时,但有些時候你想讓你的消息被所有的Queue收到行拢,類似廣播的效果,這時候就要用到exchange了竭缝。
exchange在定義的時候是有類型的沼瘫,以決定到底是哪些Queue符合條件,可以接收消息湿故。

fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
headers: 通過headers 來決定把消息發(fā)給哪些queue
表達式符號說明:#代表一個或多個字符膜蛔,*代表任何字符

例:#.a會匹配a.a皂股,aa.a,aaa.a等
    *.a會匹配a.a,b.a悍募,c.a等

注:使用RoutingKey為#战转,Exchange Type為topic的時候相當于使用fanout

廣播模式(一個生產(chǎn)者槐秧,多個消費者)

1忧设、路由指定為空!所有消息都發(fā)給exchange處理轉(zhuǎn)到隊列膀懈,轉(zhuǎn)到哪個隊列就需要exchange指定谨垃,所以在建立連接的時候要指定名字刘陶。
注意:exchange只負責轉(zhuǎn)發(fā)不負責存放消息!如果沒有隊列綁定消息就會扔掉疑苫!
2纷责、自動生成隊列名,然后使用完之后再刪掉
隊列參數(shù)exclusive=True唯一的挺勿,rabbit 隨機生成一個名字喂柒。
3、生產(chǎn)者和消費者端都要聲明隊列湃番,以排除生成者未啟動吭露,消費者獲取報錯的問題
4、生產(chǎn)者發(fā)送一條消息泥兰,說有的消費者都能接收到!高效膀捷,效率的完成發(fā)送削彬!

應用場景:新浪微博-訂閱模式,只有當前登錄的用戶才可以收到實時發(fā)送的消息

生產(chǎn)者代碼

import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊列連接通道

channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 聲明隊列 exchange名字和類型

message = ' '.join(sys.argv[1:]) or "info: Hello World!"  # 獲取外界輸入的信息壶笼,否則就是hello world

channel.basic_publish(exchange='logs',  # 指定exchange的名字
                      routing_key='',  # 注意覆劈,不需要指定隊列名沛励!
                      body=message)  # 信息
print(" [x] Sent %r" % message)
connection.close()

消費者代碼

import pika

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊列連接通道
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_obj = channel.queue_declare('',exclusive=True)

queue_name = queue_obj.method.queue  # 獲取隊列名
print('queue name', queue_name, queue_obj)  # 打印會列名

channel.queue_bind(queue=queue_name, exchange='logs')  # 綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)

channel.start_consuming()

direct 組播模式:有選擇的接收消息(exchange type=direct)

1目派、有選擇的接收消息(exchange type=direct)址貌,RabbitMQ還支持根據(jù)關(guān)鍵字發(fā)送,相當于是添加了一個過濾地帶遍蟋!
即:隊列綁定關(guān)鍵字螟凭,發(fā)送者將數(shù)據(jù)根據(jù)關(guān)鍵字發(fā)送到消息exchange,exchange根據(jù) 關(guān)鍵字 判定應該將數(shù)據(jù)發(fā)送至指定隊列棒厘。
2下隧、發(fā)什么類型的,什么類型的接收,在接收端運行的時候加參數(shù)何乎,指定接收的類型。
3抢野、routing_key = 'xxx' 與廣播相比不再為空各墨,隊列由執(zhí)行時手動輸入獲取,然后路由指定發(fā)送到哪個隊列恃轩。
4扁瓢、按照類型:生產(chǎn)者發(fā)送指定類型的消息引几;消費者循環(huán)綁定隊列挽铁,如果不存在不接收
例子:就像廣播電臺,要想接收生產(chǎn)者發(fā)送的數(shù)據(jù)楣铁,必須是綁定且在線更扁!如果斷開一段時間再接收該電臺消息,之前的訊息就不會再收到溃列!

應用場景:日志分類處理邏輯 【注:可以同時存在多個消費者】

生產(chǎn)者代碼

# rabbitmq_send.py
import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊列連接通道

channel.exchange_declare(exchange='direct_log', exchange_type='direct')  # 聲明消息隊列及類型

log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'  # 日志等級

message = ' '.join(sys.argv[1:]) or "info: Hello World!"  # 接收手動輸入的消息內(nèi)容

channel.basic_publish(exchange='direct_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消費者代碼

# rabbitmq_receive.py
import pika, sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊列連接通道

# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_obj = channel.queue_declare('', exclusive=True)

queue_name = queue_obj.method.queue
print('queue name', queue_name, queue_obj)

log_levels = sys.argv[1:]  # 日志等級 info warning error danger

# 判斷存不存在听隐,不存在退出
if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# 循環(huán)綁定隊列
for level in log_levels:
    channel.queue_bind(exchange='direct_log',
                       queue=queue_name,
                       routing_key=level)  # 綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()

需要用命令行傳遞參數(shù)來啟動消費者雅任,以便于接收規(guī)定的日志級別的隊列消息咨跌,如:

python rabbitmq_receive.py info
python rabbitmq_receive.py warning error

生產(chǎn)者也可以用命令行啟動以指定發(fā)送的級別锌半,如:

python rabbitmq_send.py info
python rabbitmq_send.py error

topic規(guī)則播

話題類型,可以根據(jù)正則進行更精確的匹配哭当,按照規(guī)則過濾。exchange_type = topic陋葡,僅改下類型即可彻采!
在topic類型下,可以讓隊列綁定幾個模糊的關(guān)鍵字岭粤,之后發(fā)送者將數(shù)據(jù)發(fā)送到exchange特笋,exchange將傳入”路由值“和 ”關(guān)鍵字“進行匹配,匹配成功虎囚,則將數(shù)據(jù)發(fā)送到指定隊列蔫磨。

# 表示可以匹配 0 個 或 多個 單詞
* 表示只能匹配 一個 單詞

接收所有運行的日志:
python receive_logs_topic.py "#"
接收"mysql"的所有日志:
python receive_logs_topic.py "mysql.*"
或者如果只想接收"critical"日志:
python receive_logs_topic.py "*.critical"
可以創(chuàng)建多個綁定:
python receive_logs_topic.py "mysql.*" "*.critical"
并發(fā)出帶有routing_key的日志類型"mysql.critical":
python emit_log_topic.py "mysql.critical" "A critical kernel error"

#測試執(zhí)行如下:
#客戶端一:
    - python3 receive1.py *.django
#客戶端二:
    - python3 receive1.py mysql.error
#客戶端三:
    - python3 receive1.py mysql.*
     
#服務端:
    - python3 receive1.py  #匹配相應的客戶端

生產(chǎn)者代碼

import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列連接通道

channel.exchange_declare(exchange='topic_log',exchange_type='topic')

#log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'

message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"

channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消費者代碼

import pika,sys
credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊列連接通道

#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_obj = channel.queue_declare('', exclusive=True)
queue_name = queue_obj.method.queue

log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) #綁定隊列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(on_message_callback=callback,queue=queue_name, auto_ack=True)
channel.start_consuming()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蝗岖,隨后出現(xiàn)的幾起案子魄揉,更是在濱河造成了極大的恐慌,老刑警劉巖瓣俯,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兵怯,死亡現(xiàn)場離奇詭異媒区,居然都是意外死亡掸犬,警方通過查閱死者的電腦和手機绪爸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門奠货,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人递惋,你說我怎么就攤上這事萍虽。” “怎么了超全?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵邓馒,是天一觀的道長。 經(jīng)常有香客問我,道長挂疆,這世上最難降的妖魔是什么下翎? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任视事,我火速辦了婚禮,結(jié)果婚禮上跌穗,老公的妹妹穿的比我還像新娘虏辫。我一直安慰自己,他們只是感情好砌庄,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缝彬,像睡著了一般哺眯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上壳贪,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天违施,我揣著相機與錄音瑟幕,去河邊找鬼。 笑死辣往,一個胖子當著我的面吹牛殖卑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播孵稽,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼菩鲜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了猛频?” 一聲冷哼從身側(cè)響起蛛勉,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤董习,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后赎败,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狱从,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡灿巧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年锅知,在試婚紗的時候發(fā)現(xiàn)自己被綠了幻馁。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片越锈。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡仗嗦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出甘凭,到底是詐尸還是另有隱情稀拐,我是刑警寧澤,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布丹弱,位于F島的核電站德撬,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏躲胳。R本人自食惡果不足惜蜓洪,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坯苹。 院中可真熱鬧隆檀,春花似錦、人聲如沸粹湃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至坚冀,卻和暖如春济赎,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背记某。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工司训, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人液南。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓壳猜,卻偏偏與公主長得像,于是被迫代替她去往敵國和親滑凉。 傳聞我的和親對象是個殘疾皇子统扳,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354