消息分發(fā)策略
當有多個消費者時酒朵,RabbitMQ將會輪流的將消息發(fā)送給消費者.這種分發(fā)消息的方式稱為'round-robin'
看例子
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body=sys.argv[1])
print 'send done'
connection.close()
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello')
def callback(ch, method, prop, body):
print body
channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
connection.close()
運行兩個消費者
shell1$ python receiver.py
shell2$ python receiver.py
發(fā)送消息
python sender.py msg1
python sender.py msg2
python sender.py msg3
python sender.py msg4
python sender.py msg5
python sender.py msg6
生產者收到的內容為
receiver1
shell1$ msg1
shell1$ msg3
shell1$ msg5
receiver2
shell1$ msg2
shell1$ msg4
shell1$ msg6
可以看到消息是以輪詢的方式發(fā)送給消費者的.
消息容錯機制
客戶端崩潰
引入ack確認機制.客戶端每成功消費一個消息狰闪,向服務器發(fā)送一個確認消息,通知服務器這條消息已經被成功消費.服務器收到消息后膨处,將這條消息從unacknowledged中移除,否則砂竖,服務器就會一直等待客戶端發(fā)來的ack消息.當客戶端出現(xiàn)異常時真椿,未消費的消息被重新放到消費隊列中.這樣避免了客戶端崩潰造成的數據丟失.
啟用ack機制的消費者代碼如下:
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello1')
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()
就是要保證no_ack為False,這也是no_ack的默認值.
這里一定要顯式的發(fā)送確認消息`ch.basic_ack(delivery_tag=method.delivery_tag)明確的告訴服務器消息被處理了.
查看隊列中的消息數量,可以使用
brianyang@brianyang-Latitude-E5440:/home/q/title/test$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello1 6 2
當消費者在消費時被強行結束時乎澄,消息并沒有丟失突硝,只要出現(xiàn)可用的消費者時,消息會被重新發(fā)送.
服務端崩潰
可以通過持久化(durable)來確保通道和消息都被保存到磁盤中進行持久化置济,但是由于從內存寫入磁盤也需要時間解恰,如果這段時間出現(xiàn)故障,則這些消息也是會丟失的.所以durable是一種弱的持久化.
持久化需要在queue和message中聲明.
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1', durable=True)
channel.basic_publish(exchange='',
routing_key='hello1',
body=sys.argv[1],
properties=pika.BasicProperties(
delivery_mode=2,
)
)
print 'send done'
connection.close()
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello1', durable=True)
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()
hello1被聲明為了持久化的通道舟肉,這里不能用hello命名修噪,因為之前已經存在了一個非持久化的通道hello,RabbitMQ不允許對一個已經聲明過的通道進行重定義.
生產者在發(fā)送消息時,將消息的類型定義為delivery_mode=2
,用來將消息持久化.
通過例子來看下效果,消息持久化前路媚,也就是沒有添加durable時黄琼,重啟server后消息會丟失.
添加持久化選項后,重啟server后消息沒有丟失.
RabbitMQ對新入列的消息進行分配,不會考慮消費者的狀態(tài)脏款,如果兩個消費者一個處理能力強围苫,一個處理能力弱,長時間下來就會造成一個消費者消息堆積撤师,另一個消費者相對很閑剂府,為了公平期間,可以設置每次每個消費者只分發(fā)N個任務剃盾,只有任務收到ack后腺占,才繼續(xù)分發(fā)任務.(當no_ack為True時,這個功能失效)
修改后的代碼receiver.py內容不變痒谴,sender.py修改為
sender.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('hello1', durable=True)
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()
prefetch_count=1
意味著每次只為消費者分配一條消息衰伯,消費者處理完成之后,才分配新的消息
exchange 發(fā)布/訂閱模型
exchange的作用好比:收發(fā)郵件時的郵件組.如果A,B在郵件組中,C不在郵件組中积蔚,那么當Z向郵件組發(fā)郵件時意鲸,只有A,B能收到,對于Z而言尽爆,并不關心郵件組里有誰怎顾,只負責向郵件組里發(fā)郵件,如果郵件組里沒人漱贱,那么郵件就會被丟棄槐雾,當Z發(fā)了100封郵件后,C加入了郵件組幅狮,那么C只能收到從第101封開始的郵件蚜退,之前的郵件是看不到的.在這里Z是發(fā)布者,A,B,C是訂閱著,郵件組是exchange.
其中P就是發(fā)布者(Z),C1,C2就是消費者(A,B,C),X就是exchange(郵件組),amq.gen-RQ6..和amq.gen_As8..就是exchange與消費者之間通信的通道(郵箱).
上代碼
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='exg', queue=queue_name)
def callback(ch, method, prop, body):
print body
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='exg', type='fanout')
channel.basic_publish(exchange='exg',
routing_key='',
body=sys.argv[1])
print 'send done'
connection.close()
下面是個例子,可以體會下與使用queue有什么不同.
直接使用channel時彪笼,消息是面對消費者的,每條消息都會等待消費者消費蚂且,而使用exchange時配猫,消息是面對exchange的,對于是否有消費者通過channel與exchange綁定是未知的杏死,exchange不會將消息保存起來等待消費者.
路由
使用type類型為fanout的exchange作為中轉時泵肄,所有的訂閱著都會收到相同的消息,假設我有兩個用戶淑翼,一個是vip,一個是普通用戶腐巢,有些消息我只想發(fā)給vip,不想讓普通用戶也收到玄括,這時候就要學習下路由功能.
首先看下代碼
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='exchange', type='direct')
channel.basic_publish(exchange='exchange',
routing_key=sys.argv[1],
body=sys.argv[2])
print 'send done'
connection.close()
receiver.py
# encoding:utf8
__author__ = 'brianyang'
import pika
import time
import sys
exg_types = sys.argv[1:]
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for exg_type in exg_types:
channel.queue_bind(exchange='exchange', queue=queue_name, routing_key=exg_type)
def callback(ch, method, prop, body):
print method.routing_key
print body
time.sleep(1)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()
在receiver.py中冯丙,綁定exchange與queue的操作中多了一個routing_key=exg_type
,這里routing_key就是一個路由標識,只有當sender使用basic_publish指定的routing_key等于這個routing_key時遭京,消息才會通過exchange發(fā)送到該queue中胃惜,同時要注意泞莉,聲明exchange的type也變?yōu)榱薲irect而不是之前的fanout.從字面上也很容易理解,之前的方式是廣播船殉,現(xiàn)在的方式是直連.
繼續(xù)盜圖
通過演示看下效果
更加復雜的路由
當exchange的type為direct時鲫趁,通過判斷綁定的routing_key與發(fā)送的routing_key是否相等來判斷應該將消息放入到哪個channel中.這是最簡單的一種匹配方式,設想有這樣一種場景利虫,公司給員工通過隊列發(fā)送消息挨厚,員工分為程序猿,前端喵和產品狗糠惫,同時員工又分為不同的級別:初級疫剃,中級和高級,員工又有不同的性別寞钥,公司對每種類別的員工的消息也不同慌申,例如對于初級女前端,公司的祝福語為:前端的萌妹子感謝你在公司1年的付出理郑,對于高級男程序猿蹄溉,公司的祝福語為:后端的屌絲男感謝你在公司3年的付出.
對于這種維度更加廣的路由,可以使用Topics. 使用Topics也很簡單您炉,首先將exchange的type變?yōu)閠opic.
topics支持通配符,如下:
-
*
(star) can substitute for exactly one word. 使用*表示一個單詞 -
#
(hash) can substitute for zero or more words. 使用#表示0個或多個單詞
routing_key的定義必須遵循 - 由一系列英文逗號分割的單詞組成
例如上面的高級男程序猿就可以定義為:high.man.monkey,初級女前端喵定義為:low.women.cat
如果一個channel的routing_key為 - high.# : 接收向所有的高級員工發(fā)送的信息
- high.*.monkey : 接收向所有的高級程序猿發(fā)送的信息
- ..dog : 向所有產品狗發(fā)送的信息
原諒我這么粗魯的稱呼柒爵,這只是俚語!
通過topic的方式,可以更加精準的控制路由
繼續(xù)盜圖:
代碼如下:
sender.py
# encoding:utf8
__author__ = 'brianyang'
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='exchange', type='topics')
channel.basic_publish(exchange='exchange',
routing_key=sys.argv[1],
body=sys.argv[2])
print 'send done'
connection.close()
receiver.py代碼不變赚爵,看下演示:
實現(xiàn)RPC
RabbitMQ將消息放在消息隊列中棉胀,可以方便的實現(xiàn)生產者-消費者模型.而RPC(遠程過程調用)是一種構建SOA非常關鍵的技術,即面向服務架構.服務可以分布在集群中,通過增減機器可以方便的擴展服務的處理能力.
RabbitMQ實現(xiàn)RPC的原理就是,請求服務的應用將請求參數放入到請求隊列中,同時傳遞一個回調隊列和唯一id,回調隊列用來存放服務方的計算結果,唯一id用來識別是客戶端的哪一次請求,需要保證唯一性.
服務端在請求隊列中獲取到消息后,進行計算,計算結束后將結果放入請求方給的回調隊列中,同時傳回唯一id.
盜圖
首先看下代碼
customer.py
# encoding:utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare('service_queue')
def Fib(n):
if n == 1 or n == 2:
return n
return Fib(n - 1) + Fib(n - 2)
def Cal(ch, method, props, body):
num = int(body)
print 'cal {}'.format(num)
resp = Fib(num)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(resp)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(Cal, queue='service_queue')
channel.start_consuming()
sender.py
# encoding:utf8
import pika
import uuid
import sys
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='service_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
num = int(sys.argv[1])
response = fibonacci_rpc.call(num)
print(" [x] Requesting fib({})".format(num))
print(" [.] Result is %r" % response)
根據官網的demo稍微修改
效果如下:
可以看到當客戶端請求計算一個比較大的數的Fib數列值的時候,客戶端和服務器都阻塞了,當另一個客戶端請求計算時,由于沒有消費者可以消費所以也阻塞了,這就好比在生產中單臺服務器提供服務遇到了瓶頸,SOA架構可以方便的擴容,在這里就是又啟了一個消費者,通過這種方法可以動態(tài)的改變集群的處理能力.
這些內容都在官方快速入門可以看到,我只是搬運工+漢化,加深印象,RabbitMQ tutorials
好吧,簡書竟然有張圖傳不上去,有興趣的來看看原文把