Python 消息中間件RabbitMQ使用

介紹

rabbitmq是基于Erlang語言編寫的一種消息隊(duì)列中間件麦撵,具體的內(nèi)容網(wǎng)上有很多這里就不贅述了涣达,本文主要介紹一下在python當(dāng)中基于第三方庫pika對(duì)rabbitmq的簡單使用

安裝

服務(wù)端
ubuntu安裝參考

https://www.cnblogs.com/vipstone/p/9184314.html

centos參考

https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

客戶端
pip install pika

場(chǎng)景

  • 任務(wù)隊(duì)列
  • 發(fā)布訂閱內(nèi)容
  • ...

生產(chǎn)者-消費(fèi)者模式

在消息隊(duì)列當(dāng)中馍资,最簡單的就是生產(chǎn)者消費(fèi)者模式霸琴,即生產(chǎn)者發(fā)布一條消息旬薯,一個(gè)或者多個(gè)消費(fèi)者在監(jiān)聽等待般又,最終發(fā)布的消息被其中一個(gè)消費(fèi)者給取走執(zhí)行的模式慕蔚,下面是簡單的示例:

生產(chǎn)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq隊(duì)列賬號(hào)密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 連接配置:主要包括host丐黄、port(默認(rèn)15672)、credentials(登錄用戶名密碼)
channel = connection.channel()
# 創(chuàng)建一個(gè)連接通道
channel.queue_declare(queue='aaa')
# 聲明一個(gè)名為aaa的隊(duì)列
channel.basic_publish(exchange='',
                      routing_key='aaa',
                      body='this is a msg!')
# 往aaa隊(duì)列發(fā)送一條消息
connection.close()
#關(guān)閉連接
消費(fèi)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq隊(duì)列賬號(hào)密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 連接配置:主要包括host孔飒、port(默認(rèn)5672)灌闺、credentials(登錄用戶名密碼)
channel = connection.channel()
# 創(chuàng)建一個(gè)連接通道
channel.queue_declare(queue='aaa')
# 聲明一個(gè)名為aaa的隊(duì)列
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
 
channel.basic_consume('aaa', callback, auto_ack=True)
# 從aaa隊(duì)列中取出一條消息,并執(zhí)行對(duì)應(yīng)回調(diào)坏瞄,第三個(gè)參數(shù)代表取到消息后自動(dòng)回復(fù)執(zhí)行完成桂对,生產(chǎn)者會(huì)將該任務(wù)消息刪除
# 在舊版里傳參順序和參數(shù)名可能有所不同(舊版里是:callback, queue='aaa', no_ack=True)
print('start...')
channel.start_consuming()
# 開啟循環(huán)監(jiān)聽消息隊(duì)列

此時(shí)如果打開多個(gè)消費(fèi)者,那么可以發(fā)現(xiàn)生產(chǎn)者的發(fā)送的消息隊(duì)列將會(huì)被消費(fèi)者按順序取走

生產(chǎn)者消費(fèi)者模式-常用配置

任務(wù)完成回復(fù)

auto_ack參數(shù)可以配置任務(wù)是否需要回復(fù)鸠匀,默認(rèn)是False蕉斜,即任務(wù)被取走之后,只有消費(fèi)者在回調(diào)當(dāng)中執(zhí)行了ch.basic_ack(delivery_tag=method.delivery_tag)方法以后,代表任務(wù)執(zhí)行完成宅此,此時(shí)生產(chǎn)者才會(huì)把任務(wù)從消息隊(duì)列當(dāng)中刪除机错,若消費(fèi)者沒能在關(guān)閉前執(zhí)行上面那句方法,那么別的消費(fèi)者將會(huì)在之后取走該任務(wù)去執(zhí)行父腕,直到生產(chǎn)者接收到執(zhí)行完成的指令為止毡熏。如果auto_ack值為True,那么當(dāng)任務(wù)被取走之后侣诵,生產(chǎn)者將直接把隊(duì)列中的任務(wù)刪除痢法。舉例:

# 消費(fèi)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # 回復(fù)任務(wù)完成
 
channel.basic_consume('aaa', callback, auto_ack=False)
# 設(shè)置auto_ack=False,此時(shí)消費(fèi)者必須提供任務(wù)完成的回復(fù)
print('start...')
channel.start_consuming()
消息隊(duì)列持久化

在隊(duì)列聲明時(shí)杜顺,可以通過參數(shù)durable配置是否需要持久化财搁,默認(rèn)為False,即不需要持久化躬络,此時(shí)如果服務(wù)端掛了尖奔,那么消息隊(duì)列的內(nèi)容將會(huì)丟失。如果配置持久化穷当,那么首先需要在聲明當(dāng)中設(shè)置durable=True提茁,然后在發(fā)布時(shí)也配置分發(fā)模式為持久化分發(fā),舉例:

# 生產(chǎn)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='bbb', durable=True)
# 聲明一個(gè)消息持久化的隊(duì)列馁菜,因?yàn)橹耙呀?jīng)創(chuàng)建了aaa隊(duì)列茴扁,并且不是持久化的隊(duì)列,所以這里新建一個(gè)不存在的隊(duì)列
channel.basic_publish(exchange='',
                      routing_key='bbb',
                      body="this is a durable msg!", 
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                          # 配置消息持久化
                      ))
connection.close()
閑置消費(fèi)

默認(rèn)是按照客戶端的順序一個(gè)個(gè)循環(huán)派發(fā)任務(wù)的汪疮,但要是第一個(gè)客戶端沒執(zhí)行完峭火,而下一個(gè)客戶端已經(jīng)執(zhí)行完了,此時(shí)如果還把任務(wù)派發(fā)給第一個(gè)就有些不好了智嚷,所以需要將任務(wù)派發(fā)給閑置的客戶端卖丸,類似于Nginx的負(fù)載均衡,實(shí)現(xiàn)只需要在消費(fèi)者當(dāng)中加入一句代碼:channel.basic_qos(prefetch_count=1)盏道,舉例:

# 消費(fèi)者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
# 設(shè)置閑置消費(fèi)
channel.basic_consume('aaa', callback, auto_ack=False)
print('start...')
channel.start_consuming()

發(fā)布訂閱模式-fanout

前面介紹的都是生產(chǎn)者發(fā)布一條任務(wù)消息稍浆,然后一個(gè)或者多個(gè)消費(fèi)者中的其中一個(gè)取走這個(gè)任務(wù)去執(zhí)行的情況。而有一種場(chǎng)景猜嘱,如廣播衅枫、微信公眾號(hào)的消息推送這種,往往需要將一條消息發(fā)布給所有的消費(fèi)者執(zhí)行泉坐,而在rabbitmq當(dāng)中就可以通過創(chuàng)建一個(gè)exchange交換器來創(chuàng)建和管理多個(gè)隊(duì)列为鳄,即一個(gè)exchange下有多個(gè)隊(duì)列裳仆,并且每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者腕让,當(dāng)有消息的時(shí)候,exchange會(huì)將消息發(fā)送給自己所管理的所有隊(duì)列,此時(shí)需要設(shè)置類型為fanout纯丸,舉例:

發(fā)布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 聲明一個(gè)名為ex1的交換器偏形,發(fā)布類型為集體分發(fā)
channel.basic_publish(exchange='ex1',
                      routing_key='',
                      body="everyone on ex1 will get this msg!"
                      )
# 往ex1中所有隊(duì)列分發(fā)消息
connection.close()
訂閱者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 聲明一個(gè)名為ex1的交換器,發(fā)布類型為集體分發(fā)
result = channel.queue_declare(queue='', exclusive=True)
# 這里不定義名字觉鼻,通過exclusive=True生成一個(gè)名字不重復(fù)的隊(duì)列
queue_name = result.method.queue
# 獲取隊(duì)列名
channel.queue_bind(queue_name, exchange='ex1')
# 綁定隊(duì)列名和ex1交換器
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
# 使用隨機(jī)生成的隊(duì)列名在ex1交換器下接收消息
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

注:
發(fā)布訂閱模式和生產(chǎn)消費(fèi)者模式還有一點(diǎn)不同就是:生產(chǎn)消費(fèi)者模式當(dāng)中俊扭,生產(chǎn)者產(chǎn)生的消費(fèi)只要沒被取走,那么消息就會(huì)一直留著等待被消費(fèi)者取走坠陈;而在發(fā)布訂閱模式當(dāng)中萨惑,發(fā)布者只會(huì)發(fā)布一次,發(fā)布完該消息就會(huì)被刪除仇矾,因此如果訂閱者沒有在發(fā)布者發(fā)布時(shí)接收到消息庸蔼,將永遠(yuǎn)錯(cuò)過接收的機(jī)會(huì)(就像關(guān)注公眾號(hào)以后,公眾號(hào)并不會(huì)把以前的所有歷史推送信息也給你重新再推送一遍一樣)

指定發(fā)布訂閱-direct

對(duì)于發(fā)布時(shí)贮匕,可以不給所有隊(duì)列發(fā)送消息姐仅,而是指定給哪些隊(duì)列發(fā)送,舉例:

發(fā)布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 聲明一個(gè)名為ex2的交換器刻盐,發(fā)布類型為指定路由分發(fā)
channel.basic_publish(exchange='ex2',
                      routing_key='test1',
                      body="only test1 will get this msg!"
                      )
# 往ex2中test1的路由分發(fā)消息
channel.basic_publish(exchange='ex2',
                      routing_key='test2',
                      body="only test2 will get this msg!"
                      )
# 往ex2中test2的路由分發(fā)消息
connection.close()
訂閱者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 聲明一個(gè)名為ex2的交換器掏膏,接收類型為指定路由
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex2', routing_key='test1')
# 綁定ex2與隊(duì)列,以及綁定對(duì)應(yīng)的路由
channel.queue_bind(queue_name, exchange='ex2', routing_key='test2')
channel.queue_bind(queue_name, exchange='ex2', routing_key='test3')
# 可以一個(gè)隊(duì)列綁定多個(gè)路由
def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

模糊匹配發(fā)布訂閱-topic

在發(fā)布訂閱時(shí)也可以通過模糊匹配路由敦锌,當(dāng)符合匹配規(guī)則的路由將會(huì)接收到消息馒疹,其中常用的通配符有*#*后面能夠匹配一個(gè)單詞乙墙,#后面能夠匹配一個(gè)或多個(gè)單詞(例如有a.#a.*的匹配規(guī)則行冰,那么a.x能被兩個(gè)規(guī)則都匹配到,但a.x.y只能被a.#匹配到)伶丐,舉例:

發(fā)布者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 聲明一個(gè)名為ex3的交換器悼做,發(fā)布類型為指定模糊匹配路由分發(fā)
channel.basic_publish(exchange='ex3',
                      routing_key='test.a',
                      body="test.# or test.* will get this msg!"
                      )
channel.basic_publish(exchange='ex3',
                      routing_key='test.b.c',
                      body="only test.# will get this msg!"
                      )
connection.close()
訂閱者
import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 聲明一個(gè)名為ex3的交換器,發(fā)布類型為模糊匹配路由接收
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex3', routing_key='test.#')
# 這里因?yàn)閞outing_key='test.#'哗魂,所以test.a和test.b.c的消息都能接收到

def callback(ch, method, properties, body):
    print("Received msg: {}".format(body))

channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()

RPC

遠(yuǎn)程過程調(diào)用肛走,一個(gè)簡單的理解就是假如本地要調(diào)用一個(gè)函數(shù),而服務(wù)端已經(jīng)實(shí)現(xiàn)了該函數(shù)录别,那么可以向服務(wù)端請(qǐng)求調(diào)用該函數(shù)朽色,并把返回值返回給本地,具體參考:https://www.cnblogs.com/goldsunshine/p/8665456.html

其他操作

pika模塊當(dāng)中也提供了如刪除/解綁隊(duì)列组题、刪除/解綁交換器等操作葫男,舉例:

import pika
 
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_delete("ex1")
# 刪除交換器
channel.queue_delete("aaa")
# 刪除隊(duì)列
connection.close()

更多關(guān)于pika操作參考:https://www.cnblogs.com/cwp-bg/p/8426188.html

更多參考

RabbitMQ的六種工作模式
Python RabbitMQ原理和使用場(chǎng)景以及模式

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市崔列,隨后出現(xiàn)的幾起案子梢褐,更是在濱河造成了極大的恐慌旺遮,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盈咳,死亡現(xiàn)場(chǎng)離奇詭異耿眉,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)鱼响,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門鸣剪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人丈积,你說我怎么就攤上這事筐骇。” “怎么了江滨?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵拥褂,是天一觀的道長。 經(jīng)常有香客問我牙寞,道長饺鹃,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任间雀,我火速辦了婚禮悔详,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘惹挟。我一直安慰自己茄螃,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布连锯。 她就那樣靜靜地躺著归苍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪运怖。 梳的紋絲不亂的頭發(fā)上拼弃,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音摇展,去河邊找鬼吻氧。 笑死,一個(gè)胖子當(dāng)著我的面吹牛咏连,可吹牛的內(nèi)容都是我干的盯孙。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼祟滴,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼振惰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起垄懂,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤骑晶,失蹤者是張志新(化名)和其女友劉穎痛垛,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體透罢,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡榜晦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年冠蒋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了羽圃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡抖剿,死狀恐怖朽寞,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情斩郎,我是刑警寧澤脑融,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站缩宜,受9級(jí)特大地震影響肘迎,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜锻煌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一妓布、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧宋梧,春花似錦匣沼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至倦沧,卻和暖如春唇撬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背展融。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來泰國打工局荚, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人愈污。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓耀态,卻偏偏與公主長得像,于是被迫代替她去往敵國和親暂雹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子首装,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355