RabbitMQ淺讀

消息分發(fā)策略

當有多個消費者時酒朵,RabbitMQ將會輪流的將消息發(fā)送給消費者.這種分發(fā)消息的方式稱為'round-robin'
看例子
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=sys.argv[1])

print 'send done'
connection.close()

receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello')


def callback(ch, method, prop, body):
    print body


channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
connection.close()

運行兩個消費者

shell1$ python receiver.py
shell2$ python receiver.py

發(fā)送消息

python sender.py msg1
python sender.py msg2
python sender.py msg3
python sender.py msg4
python sender.py msg5
python sender.py msg6

生產者收到的內容為
receiver1

shell1$ msg1
shell1$ msg3
shell1$ msg5

receiver2

shell1$ msg2
shell1$ msg4
shell1$ msg6


可以看到消息是以輪詢的方式發(fā)送給消費者的.

消息容錯機制

客戶端崩潰

引入ack確認機制.客戶端每成功消費一個消息狰闪,向服務器發(fā)送一個確認消息,通知服務器這條消息已經被成功消費.服務器收到消息后膨处,將這條消息從unacknowledged中移除,否則砂竖,服務器就會一直等待客戶端發(fā)來的ack消息.當客戶端出現(xiàn)異常時真椿,未消費的消息被重新放到消費隊列中.這樣避免了客戶端崩潰造成的數據丟失.
啟用ack機制的消費者代碼如下:

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello1')


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()

就是要保證no_ack為False,這也是no_ack的默認值.
這里一定要顯式的發(fā)送確認消息`ch.basic_ack(delivery_tag=method.delivery_tag)明確的告訴服務器消息被處理了.
查看隊列中的消息數量,可以使用

brianyang@brianyang-Latitude-E5440:/home/q/title/test$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello1  6   2

當消費者在消費時被強行結束時乎澄,消息并沒有丟失突硝,只要出現(xiàn)可用的消費者時,消息會被重新發(fā)送.

服務端崩潰

可以通過持久化(durable)來確保通道和消息都被保存到磁盤中進行持久化置济,但是由于從內存寫入磁盤也需要時間解恰,如果這段時間出現(xiàn)故障,則這些消息也是會丟失的.所以durable是一種弱的持久化.
持久化需要在queue和message中聲明.
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello1', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello1',
                      body=sys.argv[1],
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                      )
)

print 'send done'
connection.close()

receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello1', durable=True)


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()

hello1被聲明為了持久化的通道舟肉,這里不能用hello命名修噪,因為之前已經存在了一個非持久化的通道hello,RabbitMQ不允許對一個已經聲明過的通道進行重定義.
生產者在發(fā)送消息時,將消息的類型定義為delivery_mode=2,用來將消息持久化.
通過例子來看下效果,消息持久化前路媚,也就是沒有添加durable時黄琼,重啟server后消息會丟失.

非持久化

添加持久化選項后,重啟server后消息沒有丟失.
持久化

RabbitMQ對新入列的消息進行分配,不會考慮消費者的狀態(tài)脏款,如果兩個消費者一個處理能力強围苫,一個處理能力弱,長時間下來就會造成一個消費者消息堆積撤师,另一個消費者相對很閑剂府,為了公平期間,可以設置每次每個消費者只分發(fā)N個任務剃盾,只有任務收到ack后腺占,才繼續(xù)分發(fā)任務.(當no_ack為True時,這個功能失效)
修改后的代碼receiver.py內容不變痒谴,sender.py修改為
sender.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('hello1', durable=True)


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello1', no_ack=False)
channel.start_consuming()
connection.close()

prefetch_count=1意味著每次只為消費者分配一條消息衰伯,消費者處理完成之后,才分配新的消息

exchange 發(fā)布/訂閱模型

exchange的作用好比:收發(fā)郵件時的郵件組.如果A,B在郵件組中,C不在郵件組中积蔚,那么當Z向郵件組發(fā)郵件時意鲸,只有A,B能收到,對于Z而言尽爆,并不關心郵件組里有誰怎顾,只負責向郵件組里發(fā)郵件,如果郵件組里沒人漱贱,那么郵件就會被丟棄槐雾,當Z發(fā)了100封郵件后,C加入了郵件組幅狮,那么C只能收到從第101封開始的郵件蚜退,之前的郵件是看不到的.在這里Z是發(fā)布者,A,B,C是訂閱著,郵件組是exchange.


盜圖一張

其中P就是發(fā)布者(Z),C1,C2就是消費者(A,B,C),X就是exchange(郵件組),amq.gen-RQ6..和amq.gen_As8..就是exchange與消費者之間通信的通道(郵箱).
上代碼
receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='exg', queue=queue_name)


def callback(ch, method, prop, body):
    print body
    time.sleep(3)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()

sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='exg', type='fanout')

channel.basic_publish(exchange='exg',
                      routing_key='',
                      body=sys.argv[1])

print 'send done'
connection.close()

下面是個例子,可以體會下與使用queue有什么不同.


exchange

直接使用channel時彪笼,消息是面對消費者的,每條消息都會等待消費者消費蚂且,而使用exchange時配猫,消息是面對exchange的,對于是否有消費者通過channel與exchange綁定是未知的杏死,exchange不會將消息保存起來等待消費者.

路由

使用type類型為fanout的exchange作為中轉時泵肄,所有的訂閱著都會收到相同的消息,假設我有兩個用戶淑翼,一個是vip,一個是普通用戶腐巢,有些消息我只想發(fā)給vip,不想讓普通用戶也收到玄括,這時候就要學習下路由功能.

首先看下代碼
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='exchange', type='direct')

channel.basic_publish(exchange='exchange',
                      routing_key=sys.argv[1],
                      body=sys.argv[2])

print 'send done'
connection.close()

receiver.py

# encoding:utf8
__author__ = 'brianyang'

import pika
import time
import sys

exg_types = sys.argv[1:]

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for exg_type in exg_types:
    channel.queue_bind(exchange='exchange', queue=queue_name, routing_key=exg_type)


def callback(ch, method, prop, body):
    print method.routing_key
    print body
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
connection.close()

在receiver.py中冯丙,綁定exchange與queue的操作中多了一個routing_key=exg_type,這里routing_key就是一個路由標識,只有當sender使用basic_publish指定的routing_key等于這個routing_key時遭京,消息才會通過exchange發(fā)送到該queue中胃惜,同時要注意泞莉,聲明exchange的type也變?yōu)榱薲irect而不是之前的fanout.從字面上也很容易理解,之前的方式是廣播船殉,現(xiàn)在的方式是直連.
繼續(xù)盜圖

routing_pic

通過演示看下效果
routing

更加復雜的路由

當exchange的type為direct時鲫趁,通過判斷綁定的routing_key與發(fā)送的routing_key是否相等來判斷應該將消息放入到哪個channel中.這是最簡單的一種匹配方式,設想有這樣一種場景利虫,公司給員工通過隊列發(fā)送消息挨厚,員工分為程序猿,前端喵和產品狗糠惫,同時員工又分為不同的級別:初級疫剃,中級和高級,員工又有不同的性別寞钥,公司對每種類別的員工的消息也不同慌申,例如對于初級女前端,公司的祝福語為:前端的萌妹子感謝你在公司1年的付出理郑,對于高級男程序猿蹄溉,公司的祝福語為:后端的屌絲男感謝你在公司3年的付出.
對于這種維度更加廣的路由,可以使用Topics. 使用Topics也很簡單您炉,首先將exchange的type變?yōu)閠opic.
topics支持通配符,如下:

  • * (star) can substitute for exactly one word. 使用*表示一個單詞
  • # (hash) can substitute for zero or more words. 使用#表示0個或多個單詞
    routing_key的定義必須遵循 - 由一系列英文逗號分割的單詞組成
    例如上面的高級男程序猿就可以定義為:high.man.monkey,初級女前端喵定義為:low.women.cat
    如果一個channel的routing_key為
  • high.# : 接收向所有的高級員工發(fā)送的信息
  • high.*.monkey : 接收向所有的高級程序猿發(fā)送的信息
  • ..dog : 向所有產品狗發(fā)送的信息

原諒我這么粗魯的稱呼柒爵,這只是俚語!

通過topic的方式,可以更加精準的控制路由
繼續(xù)盜圖:


topic

代碼如下:
sender.py

# encoding:utf8
__author__ = 'brianyang'

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='exchange', type='topics')

channel.basic_publish(exchange='exchange',
                      routing_key=sys.argv[1],
                      body=sys.argv[2])

print 'send done'
connection.close()

receiver.py代碼不變赚爵,看下演示:


topic

實現(xiàn)RPC

RabbitMQ將消息放在消息隊列中棉胀,可以方便的實現(xiàn)生產者-消費者模型.而RPC(遠程過程調用)是一種構建SOA非常關鍵的技術,即面向服務架構.服務可以分布在集群中,通過增減機器可以方便的擴展服務的處理能力.
RabbitMQ實現(xiàn)RPC的原理就是,請求服務的應用將請求參數放入到請求隊列中,同時傳遞一個回調隊列和唯一id,回調隊列用來存放服務方的計算結果,唯一id用來識別是客戶端的哪一次請求,需要保證唯一性.
服務端在請求隊列中獲取到消息后,進行計算,計算結束后將結果放入請求方給的回調隊列中,同時傳回唯一id.
盜圖


RPC_pic

首先看下代碼
customer.py

# encoding:utf8

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare('service_queue')


def Fib(n):
    if n == 1 or n == 2:
        return n
    return Fib(n - 1) + Fib(n - 2)


def Cal(ch, method, props, body):
    num = int(body)
    print 'cal {}'.format(num)
    resp = Fib(num)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(resp)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(Cal, queue='service_queue')
channel.start_consuming()

sender.py

# encoding:utf8

import pika
import uuid
import sys

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='service_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

num = int(sys.argv[1])
response = fibonacci_rpc.call(num)
print(" [x] Requesting fib({})".format(num))
print(" [.] Result is %r" % response)

根據官網的demo稍微修改
效果如下:

RPC

可以看到當客戶端請求計算一個比較大的數的Fib數列值的時候,客戶端和服務器都阻塞了,當另一個客戶端請求計算時,由于沒有消費者可以消費所以也阻塞了,這就好比在生產中單臺服務器提供服務遇到了瓶頸,SOA架構可以方便的擴容,在這里就是又啟了一個消費者,通過這種方法可以動態(tài)的改變集群的處理能力.
這些內容都在官方快速入門可以看到,我只是搬運工+漢化,加深印象,RabbitMQ tutorials

好吧,簡書竟然有張圖傳不上去,有興趣的來看看原文

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市冀膝,隨后出現(xiàn)的幾起案子唁奢,更是在濱河造成了極大的恐慌,老刑警劉巖窝剖,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件麻掸,死亡現(xiàn)場離奇詭異,居然都是意外死亡赐纱,警方通過查閱死者的電腦和手機脊奋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來疙描,“玉大人诚隙,你說我怎么就攤上這事∑鹨龋” “怎么了久又?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經常有香客問我籽孙,道長烈评,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任犯建,我火速辦了婚禮讲冠,結果婚禮上,老公的妹妹穿的比我還像新娘适瓦。我一直安慰自己竿开,他們只是感情好,可當我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布玻熙。 她就那樣靜靜地躺著否彩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嗦随。 梳的紋絲不亂的頭發(fā)上列荔,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天,我揣著相機與錄音枚尼,去河邊找鬼贴浙。 笑死,一個胖子當著我的面吹牛署恍,可吹牛的內容都是我干的崎溃。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼盯质,長吁一口氣:“原來是場噩夢啊……” “哼袁串!你這毒婦竟也來了?” 一聲冷哼從身側響起呼巷,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤囱修,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后王悍,有當地人在樹林里發(fā)現(xiàn)了一具尸體蔚袍,經...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年配名,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晋辆。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡渠脉,死狀恐怖,靈堂內的尸體忽然破棺而出瓶佳,到底是詐尸還是另有隱情芋膘,我是刑警寧澤,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站为朋,受9級特大地震影響臂拓,放射性物質發(fā)生泄漏。R本人自食惡果不足惜习寸,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一胶惰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧霞溪,春花似錦孵滞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至殴蓬,卻和暖如春匿级,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背染厅。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工痘绎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人糟秘。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓简逮,卻偏偏與公主長得像,于是被迫代替她去往敵國和親尿赚。 傳聞我的和親對象是個殘疾皇子散庶,可洞房花燭夜當晚...
    茶點故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器。支持消息的持久化凌净、事務悲龟、擁塞控...
    jiangmo閱讀 10,357評論 2 34
  • RabbitMQ 即一個消息隊列,主要是用來實現(xiàn)應用程序的異步和解耦冰寻,同時也能起到消息緩沖须教,消息分發(fā)的作用。 消息...
    彩虹之夢閱讀 1,086評論 2 1
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理斩芭,服務發(fā)現(xiàn)轻腺,斷路器,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評論 3 51
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件划乖,它能夠在應用之間提供可靠的消息傳輸贬养。在易用性,擴展性琴庵,高可用性...
    點融黑幫閱讀 2,997評論 3 41