介紹
rabbitmq是基于Erlang語言編寫的一種消息隊(duì)列中間件麦撵,具體的內(nèi)容網(wǎng)上有很多這里就不贅述了涣达,本文主要介紹一下在python當(dāng)中基于第三方庫pika
對(duì)rabbitmq的簡單使用
安裝
服務(wù)端
ubuntu安裝參考
https://www.cnblogs.com/vipstone/p/9184314.html
centos參考
https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291
客戶端
pip install pika
場(chǎng)景
- 任務(wù)隊(duì)列
- 發(fā)布訂閱內(nèi)容
- ...
生產(chǎn)者-消費(fèi)者模式
在消息隊(duì)列當(dāng)中馍资,最簡單的就是生產(chǎn)者消費(fèi)者模式霸琴,即生產(chǎn)者發(fā)布一條消息旬薯,一個(gè)或者多個(gè)消費(fèi)者在監(jiān)聽等待般又,最終發(fā)布的消息被其中一個(gè)消費(fèi)者給取走執(zhí)行的模式慕蔚,下面是簡單的示例:
生產(chǎn)者
import pika
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq隊(duì)列賬號(hào)密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 連接配置:主要包括host丐黄、port(默認(rèn)15672)、credentials(登錄用戶名密碼)
channel = connection.channel()
# 創(chuàng)建一個(gè)連接通道
channel.queue_declare(queue='aaa')
# 聲明一個(gè)名為aaa的隊(duì)列
channel.basic_publish(exchange='',
routing_key='aaa',
body='this is a msg!')
# 往aaa隊(duì)列發(fā)送一條消息
connection.close()
#關(guān)閉連接
消費(fèi)者
import pika
credentials = pika.PlainCredentials('test', 'test')
# rabbitmq隊(duì)列賬號(hào)密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
# 連接配置:主要包括host孔飒、port(默認(rèn)5672)灌闺、credentials(登錄用戶名密碼)
channel = connection.channel()
# 創(chuàng)建一個(gè)連接通道
channel.queue_declare(queue='aaa')
# 聲明一個(gè)名為aaa的隊(duì)列
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume('aaa', callback, auto_ack=True)
# 從aaa隊(duì)列中取出一條消息,并執(zhí)行對(duì)應(yīng)回調(diào)坏瞄,第三個(gè)參數(shù)代表取到消息后自動(dòng)回復(fù)執(zhí)行完成桂对,生產(chǎn)者會(huì)將該任務(wù)消息刪除
# 在舊版里傳參順序和參數(shù)名可能有所不同(舊版里是:callback, queue='aaa', no_ack=True)
print('start...')
channel.start_consuming()
# 開啟循環(huán)監(jiān)聽消息隊(duì)列
此時(shí)如果打開多個(gè)消費(fèi)者,那么可以發(fā)現(xiàn)生產(chǎn)者的發(fā)送的消息隊(duì)列將會(huì)被消費(fèi)者按順序取走
生產(chǎn)者消費(fèi)者模式-常用配置
任務(wù)完成回復(fù)
auto_ack
參數(shù)可以配置任務(wù)是否需要回復(fù)鸠匀,默認(rèn)是False
蕉斜,即任務(wù)被取走之后,只有消費(fèi)者在回調(diào)當(dāng)中執(zhí)行了ch.basic_ack(delivery_tag=method.delivery_tag)
方法以后,代表任務(wù)執(zhí)行完成宅此,此時(shí)生產(chǎn)者才會(huì)把任務(wù)從消息隊(duì)列當(dāng)中刪除机错,若消費(fèi)者沒能在關(guān)閉前執(zhí)行上面那句方法,那么別的消費(fèi)者將會(huì)在之后取走該任務(wù)去執(zhí)行父腕,直到生產(chǎn)者接收到執(zhí)行完成的指令為止毡熏。如果auto_ack
值為True
,那么當(dāng)任務(wù)被取走之后侣诵,生產(chǎn)者將直接把隊(duì)列中的任務(wù)刪除痢法。舉例:
# 消費(fèi)者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
# 回復(fù)任務(wù)完成
channel.basic_consume('aaa', callback, auto_ack=False)
# 設(shè)置auto_ack=False,此時(shí)消費(fèi)者必須提供任務(wù)完成的回復(fù)
print('start...')
channel.start_consuming()
消息隊(duì)列持久化
在隊(duì)列聲明時(shí)杜顺,可以通過參數(shù)durable
配置是否需要持久化财搁,默認(rèn)為False
,即不需要持久化躬络,此時(shí)如果服務(wù)端掛了尖奔,那么消息隊(duì)列的內(nèi)容將會(huì)丟失。如果配置持久化穷当,那么首先需要在聲明當(dāng)中設(shè)置durable=True
提茁,然后在發(fā)布時(shí)也配置分發(fā)模式為持久化分發(fā),舉例:
# 生產(chǎn)者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='bbb', durable=True)
# 聲明一個(gè)消息持久化的隊(duì)列馁菜,因?yàn)橹耙呀?jīng)創(chuàng)建了aaa隊(duì)列茴扁,并且不是持久化的隊(duì)列,所以這里新建一個(gè)不存在的隊(duì)列
channel.basic_publish(exchange='',
routing_key='bbb',
body="this is a durable msg!",
properties=pika.BasicProperties(
delivery_mode=2,
# 配置消息持久化
))
connection.close()
閑置消費(fèi)
默認(rèn)是按照客戶端的順序一個(gè)個(gè)循環(huán)派發(fā)任務(wù)的汪疮,但要是第一個(gè)客戶端沒執(zhí)行完峭火,而下一個(gè)客戶端已經(jīng)執(zhí)行完了,此時(shí)如果還把任務(wù)派發(fā)給第一個(gè)就有些不好了智嚷,所以需要將任務(wù)派發(fā)給閑置的客戶端卖丸,類似于Nginx的負(fù)載均衡,實(shí)現(xiàn)只需要在消費(fèi)者當(dāng)中加入一句代碼:channel.basic_qos(prefetch_count=1)
盏道,舉例:
# 消費(fèi)者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
# 設(shè)置閑置消費(fèi)
channel.basic_consume('aaa', callback, auto_ack=False)
print('start...')
channel.start_consuming()
發(fā)布訂閱模式-fanout
前面介紹的都是生產(chǎn)者發(fā)布一條任務(wù)消息稍浆,然后一個(gè)或者多個(gè)消費(fèi)者中的其中一個(gè)取走這個(gè)任務(wù)去執(zhí)行的情況。而有一種場(chǎng)景猜嘱,如廣播衅枫、微信公眾號(hào)的消息推送這種,往往需要將一條消息發(fā)布給所有的消費(fèi)者執(zhí)行泉坐,而在rabbitmq當(dāng)中就可以通過創(chuàng)建一個(gè)exchange
交換器來創(chuàng)建和管理多個(gè)隊(duì)列为鳄,即一個(gè)exchange
下有多個(gè)隊(duì)列裳仆,并且每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者腕让,當(dāng)有消息的時(shí)候,exchange
會(huì)將消息發(fā)送給自己所管理的所有隊(duì)列,此時(shí)需要設(shè)置類型為fanout
纯丸,舉例:
發(fā)布者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 聲明一個(gè)名為ex1的交換器偏形,發(fā)布類型為集體分發(fā)
channel.basic_publish(exchange='ex1',
routing_key='',
body="everyone on ex1 will get this msg!"
)
# 往ex1中所有隊(duì)列分發(fā)消息
connection.close()
訂閱者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex1', exchange_type='fanout')
# 聲明一個(gè)名為ex1的交換器,發(fā)布類型為集體分發(fā)
result = channel.queue_declare(queue='', exclusive=True)
# 這里不定義名字觉鼻,通過exclusive=True生成一個(gè)名字不重復(fù)的隊(duì)列
queue_name = result.method.queue
# 獲取隊(duì)列名
channel.queue_bind(queue_name, exchange='ex1')
# 綁定隊(duì)列名和ex1交換器
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume(queue_name, callback, True)
# 使用隨機(jī)生成的隊(duì)列名在ex1交換器下接收消息
print('queue:{} start...'.format(queue_name))
channel.start_consuming()
注:
發(fā)布訂閱模式和生產(chǎn)消費(fèi)者模式還有一點(diǎn)不同就是:生產(chǎn)消費(fèi)者模式當(dāng)中俊扭,生產(chǎn)者產(chǎn)生的消費(fèi)只要沒被取走,那么消息就會(huì)一直留著等待被消費(fèi)者取走坠陈;而在發(fā)布訂閱模式當(dāng)中萨惑,發(fā)布者只會(huì)發(fā)布一次,發(fā)布完該消息就會(huì)被刪除仇矾,因此如果訂閱者沒有在發(fā)布者發(fā)布時(shí)接收到消息庸蔼,將永遠(yuǎn)錯(cuò)過接收的機(jī)會(huì)(就像關(guān)注公眾號(hào)以后,公眾號(hào)并不會(huì)把以前的所有歷史推送信息也給你重新再推送一遍一樣)
指定發(fā)布訂閱-direct
對(duì)于發(fā)布時(shí)贮匕,可以不給所有隊(duì)列發(fā)送消息姐仅,而是指定給哪些隊(duì)列發(fā)送,舉例:
發(fā)布者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 聲明一個(gè)名為ex2的交換器刻盐,發(fā)布類型為指定路由分發(fā)
channel.basic_publish(exchange='ex2',
routing_key='test1',
body="only test1 will get this msg!"
)
# 往ex2中test1的路由分發(fā)消息
channel.basic_publish(exchange='ex2',
routing_key='test2',
body="only test2 will get this msg!"
)
# 往ex2中test2的路由分發(fā)消息
connection.close()
訂閱者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex2', exchange_type='direct')
# 聲明一個(gè)名為ex2的交換器掏膏,接收類型為指定路由
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex2', routing_key='test1')
# 綁定ex2與隊(duì)列,以及綁定對(duì)應(yīng)的路由
channel.queue_bind(queue_name, exchange='ex2', routing_key='test2')
channel.queue_bind(queue_name, exchange='ex2', routing_key='test3')
# 可以一個(gè)隊(duì)列綁定多個(gè)路由
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()
模糊匹配發(fā)布訂閱-topic
在發(fā)布訂閱時(shí)也可以通過模糊匹配路由敦锌,當(dāng)符合匹配規(guī)則的路由將會(huì)接收到消息馒疹,其中常用的通配符有*
和#
,*
后面能夠匹配一個(gè)單詞乙墙,#
后面能夠匹配一個(gè)或多個(gè)單詞(例如有a.#
和a.*
的匹配規(guī)則行冰,那么a.x
能被兩個(gè)規(guī)則都匹配到,但a.x.y
只能被a.#
匹配到)伶丐,舉例:
發(fā)布者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 聲明一個(gè)名為ex3的交換器悼做,發(fā)布類型為指定模糊匹配路由分發(fā)
channel.basic_publish(exchange='ex3',
routing_key='test.a',
body="test.# or test.* will get this msg!"
)
channel.basic_publish(exchange='ex3',
routing_key='test.b.c',
body="only test.# will get this msg!"
)
connection.close()
訂閱者
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='ex3', exchange_type='topic')
# 聲明一個(gè)名為ex3的交換器,發(fā)布類型為模糊匹配路由接收
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(queue_name, exchange='ex3', routing_key='test.#')
# 這里因?yàn)閞outing_key='test.#'哗魂,所以test.a和test.b.c的消息都能接收到
def callback(ch, method, properties, body):
print("Received msg: {}".format(body))
channel.basic_consume(queue_name, callback, True)
print('queue:{} start...'.format(queue_name))
channel.start_consuming()
RPC
遠(yuǎn)程過程調(diào)用肛走,一個(gè)簡單的理解就是假如本地要調(diào)用一個(gè)函數(shù),而服務(wù)端已經(jīng)實(shí)現(xiàn)了該函數(shù)录别,那么可以向服務(wù)端請(qǐng)求調(diào)用該函數(shù)朽色,并把返回值返回給本地,具體參考:https://www.cnblogs.com/goldsunshine/p/8665456.html
其他操作
在pika
模塊當(dāng)中也提供了如刪除/解綁隊(duì)列组题、刪除/解綁交換器等操作葫男,舉例:
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()
channel.exchange_delete("ex1")
# 刪除交換器
channel.queue_delete("aaa")
# 刪除隊(duì)列
connection.close()
更多關(guān)于pika操作參考:https://www.cnblogs.com/cwp-bg/p/8426188.html