什么叫消息隊列
????????消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)铃岔。消息可以非常簡單啸澡,比如只包含文本字符串,也可以更復(fù)雜慰毅,可能包含嵌入對象隘截。
????????消息隊列(Message Queue)是一種應(yīng)用間的通信方式,消息發(fā)送后可以立即返回汹胃,由消息系統(tǒng)來確保消息的可靠傳遞婶芭。消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的着饥。這樣發(fā)布者和使用者都不用知道對方的存在犀农。
為何用消息隊列
????????從上面的描述中可以看出消息隊列是一種應(yīng)用間的異步協(xié)作機制,那什么時候需要使用 MQ 呢宰掉?
????????以常見的訂單系統(tǒng)為例呵哨,用戶點擊【下單】按鈕之后的業(yè)務(wù)邏輯可能包括:扣減庫存、生成相應(yīng)單據(jù)轨奄、發(fā)紅包孟害、發(fā)短信通知。在業(yè)務(wù)發(fā)展初期這些邏輯可能放在一起同步執(zhí)行挪拟,隨著業(yè)務(wù)的發(fā)展訂單量增長挨务,需要提升系統(tǒng)服務(wù)的性能,這時可以將一些不需要立即生效的操作拆分出來異步執(zhí)行玉组,比如發(fā)放紅包谎柄、發(fā)短信通知等。這種場景下就可以用 MQ 惯雳,在下單的主流程(比如扣減庫存朝巫、生成相應(yīng)單據(jù))完成之后發(fā)送一條消息到 MQ 讓主流程快速完結(jié),而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息)石景,當(dāng)發(fā)現(xiàn) MQ 中有發(fā)紅包或發(fā)短信之類的消息時劈猿,執(zhí)行相應(yīng)的業(yè)務(wù)邏輯拙吉。
????????以上是用于業(yè)務(wù)解耦的情況,其它常見場景包括最終一致性糙臼、廣播庐镐、錯峰流控等等恩商。
RabbitMQ 特點
(1)可靠性(Reliability):RabbitMQ 使用一些機制來保證可靠性变逃,如持久化、傳輸確認怠堪、發(fā)布確認揽乱。
(2)靈活的路由(Flexible Routing):在消息進入隊列之前,通過 Exchange 來路由消息的粟矿。對于典型的路由功能凰棉,RabbitMQ 已經(jīng)提供了一些內(nèi)置的 Exchange 來實現(xiàn)。針對更復(fù)雜的路由功能陌粹,可以將多個 Exchange 綁定在一起撒犀,也通過插件機制實現(xiàn)自己的 Exchange 。
(3)消息集群(Clustering):多個 RabbitMQ 服務(wù)器可以組成一個集群掏秩,形成一個邏輯 Broker 或舞。
(4)高可用(Highly Available Queues):隊列可以在集群中的機器上進行鏡像,使得在部分節(jié)點出問題的情況下隊列仍然可用蒙幻。
(5)多種協(xié)議(Multi-protocol):RabbitMQ 支持多種消息隊列協(xié)議映凳,比如 STOMP、MQTT 等等邮破。
(6)多語言客戶端(Many Clients):RabbitMQ 幾乎支持所有常用語言诈豌,比如 Java、.NET抒和、Ruby 等等矫渔。
(7)管理界面(Management UI):RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker 的許多方面摧莽。
(8)跟蹤機制(Tracing):如果消息異常庙洼,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發(fā)生了什么范嘱。
(9)插件機制(Plugin System):RabbitMQ 提供了許多插件送膳,來從多方面進行擴展,也可以編寫自己的插件丑蛤。
RabbitMQ 中的概念模型
消息模型
????????所有 MQ 產(chǎn)品從模型抽象上來說都是一樣的過程:消費者(consumer)訂閱某個隊列叠聋。生產(chǎn)者(producer)創(chuàng)建消息,然后發(fā)布到隊列(queue)中受裹,最后將消息發(fā)送到監(jiān)聽的消費者碌补。
RabbitMQ 基本概念
????????上面只是最簡單抽象的描述虏束,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協(xié)議的一個開源實現(xiàn)厦章,所以其內(nèi)部實際上也是 AMQP 中的基本概念:
(1)Message:消息镇匀,消息是不具名的,它由消息頭和消息體組成袜啃。消息體是不透明的汗侵,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)群发、priority(相對于其他消息的優(yōu)先權(quán))晰韵、delivery-mode(指出該消息可能需要持久性存儲)等。
(2)Publisher:消息的生產(chǎn)者熟妓,也是一個向交換器發(fā)布消息的客戶端應(yīng)用程序
(3)Exchange:交換器雪猪,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列
(4)Binding:綁定,用于消息隊列和交換器之間的關(guān)聯(lián)起愈。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則只恨,所以可以將交換器理解成一個由綁定構(gòu)成的路由表
(5)Queue:消息隊列,用來保存消息直到發(fā)送給消費者抬虽。它是消息的容器官觅,也是消息的終點。一個消息可投入一個或多個隊列斥赋。消息一直在隊列里面缰猴,等待消費者連接到這個隊列將其取走
(6)Connection:網(wǎng)絡(luò)連接,比如一個TCP連接
(7)Channel:信道疤剑,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道滑绒。信道是建立在真實的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的隘膘,不管是發(fā)布消息疑故、訂閱隊列還是接收消息,這些動作都是通過信道完成弯菊。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷纵势,所以引入了信道的概念,以復(fù)用一條 TCP 連接管钳。
(8)Consumer:消息的消費者钦铁,表示一個從消息隊列中取得消息的客戶端應(yīng)用程序。
(9)Virtual Host:虛擬主機才漆,表示一批交換器牛曹、消息隊列和相關(guān)對象。虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務(wù)器域醇滥。每個 vhost 本質(zhì)上就是一個 mini 版的 RabbitMQ 服務(wù)器黎比,擁有自己的隊列超营、交換器、綁定和權(quán)限機制阅虫。vhost 是 AMQP 概念的基礎(chǔ)演闭,必須在連接時指定,RabbitMQ 默認的 vhost 是/ (每個virtual host本質(zhì)上都是一個RabbitMQ Server颓帝,擁有它自己的queue米碰,exchagne,和bings rule等等躲履。這保證了你可以在多個不同的application中使用RabbitMQ见间。)
(10)Broker:表示消息隊列服務(wù)器實體聊闯。
AMQP 中的消息路由
????????AMQP 中消息的路由過程和 Java 開發(fā)者熟悉的 JMS 存在一些差別工猜,AMQP 中增加了 Exchange 和 Binding 的角色。生產(chǎn)者把消息發(fā)布到 Exchange 上菱蔬,消息最終到達隊列并被消費者接收篷帅,而 Binding 決定交換器的消息應(yīng)該發(fā)送到那個隊列。
Exchange 類型
????????Exchange分發(fā)消息時根據(jù)類型的不同分發(fā)策略有區(qū)別拴泌,目前共四種類型:direct魏身、fanout、topic蚪腐、headers 箭昵。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致回季,但性能差很多家制,目前幾乎用不到了,所以直接看另外三種類型:
(1)direct
????????消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致泡一, 交換器就將消息發(fā)到對應(yīng)的隊列中颤殴。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”鼻忠,則只轉(zhuǎn)發(fā) routing key 標(biāo)記為“dog”的消息涵但,不會轉(zhuǎn)發(fā)“dog.puppy”,也不會轉(zhuǎn)發(fā)“dog.guard”等等帖蔓。它是完全匹配矮瘟、單播的模式。
????????如果 routing key 匹配, 那么Message就會被傳遞到相應(yīng)的queue中塑娇。其實在queue創(chuàng)建時澈侠,它會自動的以queue的名字作為routing key來綁定那個exchange
????????Direct exchange的路由算法非常簡單:通過binding key的完全匹配,可以通過下圖來說明
????????exchange X和兩個queue綁定在一起钝吮。Q1的binding key是orange埋涧。Q2的binding key是black和green板辽。當(dāng)P publish key是orange時,exchange會把它放到Q1棘催。如果是black或者green那么就會到Q2劲弦。其余的Message都會被丟棄。
(2)fanout
????????每個發(fā)到 fanout 類型交換器的消息都會分到所有綁定的隊列上去醇坝。fanout 交換器不處理路由鍵邑跪,只是簡單的將隊列綁定到交換器上,每個發(fā)送到交換器的消息都會被轉(zhuǎn)發(fā)到與該交換器綁定的所有隊列上呼猪。很像子網(wǎng)廣播画畅,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息。fanout 類型轉(zhuǎn)發(fā)消息是最快的宋距。
(3)topic
????????topic 交換器通過模式匹配分配消息的路由鍵屬性轴踱,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上谚赎。它將路由鍵和綁定鍵的字符串切分成單詞淫僻,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”壶唤。#匹配0個或多個單詞雳灵,匹配不多不少一個單詞。
基本示例
發(fā)送端 producer
import ?pika
# 建立一個實例
connection = pika.BlockingConnection(
????????pika.ConnectionParameters('localhost',5672)# 默認端口5672闸盔,可不寫)
?# 聲明一個管道悯辙,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='hello’)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
????????????????????????????????????????????routing_key='hello',# queue名字
????????????????????????????????????????????body='Hello World!')# 消息內(nèi)容
print(" [x] Sent 'Hello World!'")
connection.close(). # 隊列關(guān)閉
接收端 consumer
import pika
import time
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost’))
# 聲明管道
channel = connection.channel()
# 為什么又聲明了一個‘hello’隊列?# 如果確定已經(jīng)聲明了迎吵,可以不聲明躲撰。但是你不知道那個機器先運行,所以要聲明兩次钓觉。
channel.queue_declare(queue='hello’)
def ?callback(ch, method, properties, body):
# 四個參數(shù)為標(biāo)準(zhǔn)格式
? ??????print(ch, method, properties)# 打印看一下是什么# 管道內(nèi)存對象 內(nèi)容相關(guān)信息 后面講
? ??????print(" [x] Received %r"% body)
????????time.sleep(15)
????????ch.basic_ack(delivery_tag = method.delivery_tag)# 告訴生成者茴肥,消息處理完成
channel.basic_consume(# 消費消息
????????????callback,# 如果收到消息,就調(diào)用callback函數(shù)來處理消息
????????????queue='hello',# 你要從那個隊列里收消息#?
? ? ? ? ? ? #no_ack=True? # 寫的話荡灾,如果接收消息瓤狐,機器宕機消息就丟了# 一般不寫。宕機則生產(chǎn)者檢測到發(fā)給其他消費者)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()# 開始消費消息
RabbitMQ 消息分發(fā)輪詢
(1)上面的只是一個生產(chǎn)者批幌、一個消費者础锐,能不能一個生產(chǎn)者多個消費者呢?可以上面的例子荧缘,多啟動幾個消費者consumer皆警,看一下消息的接收情況。采用輪詢機制截粗;把消息依次分發(fā)
(2)假如消費者處理消息需要15秒信姓,如果當(dāng)機了鸵隧,那這個消息處理明顯還沒處理完,怎么處理意推?(可以模擬消費端斷了豆瘫,分別注釋和不注釋 no_ack=True 看一下)你沒給我回復(fù)確認,就代表消息沒處理完菊值。
(3)上面的效果消費端斷了就轉(zhuǎn)到另外一個消費端去了外驱,但是生產(chǎn)者怎么知道消費端斷了呢?因為生產(chǎn)者和消費者是通過socket連接的腻窒,socket斷了昵宇,就說明消費端斷開了。
(4)上面的模式只是依次分發(fā)儿子,實際情況是機器配置不一樣瓦哎。怎么設(shè)置類似權(quán)重的操作?RabbitMQ怎么辦呢典徊,RabbitMQ做了簡單的處理就能實現(xiàn)公平的分發(fā)杭煎。就是RabbitMQ給消費者發(fā)消息的時候檢測下消費者里的消息數(shù)量,如果超過指定值(比如1條)卒落,就不給你發(fā)了。
只需要在消費者端蜂桶,channel.basic_consume前加上就可以了儡毕。
????????channel.basic_qos(prefetch_count=1)# 類似權(quán)重,按能力分發(fā)扑媚,如果有一個消息腰湾,就不在給你發(fā) ? ? ? ? ? ? ? channel.basic_consume(# 消費消息
RabbitMQ 消息持久化(durable、properties)
rabbitmqctl list_queues# 查看當(dāng)前queue數(shù)量及queue里消息數(shù)量
消息持久化
如果隊列里還有消息疆股,RabbitMQ 服務(wù)端宕機了呢费坊?消息還在不在?把RabbitMQ服務(wù)重啟旬痹,看一下消息在不在附井。上面的情況下,宕機了两残,消息就久了永毅,下面看看如何把消息持久化。
每次聲明隊列的時候人弓,都加上durable沼死,注意每個隊列都得寫,客戶端崔赌、服務(wù)端聲明的時候都得寫意蛀。
# 在管道里聲明
queuechannel.queue_declare(queue='hello2', durable=True)
測試結(jié)果發(fā)現(xiàn)耸别,只是把隊列持久化了,但是隊列里的消息沒了县钥。durable的作用只是把隊列持久化太雨。離消息持久話還差一步:發(fā)送端發(fā)送消息時,加上properties
properties=pika.BasicProperties(
delivery_mode=2,# 消息持久化)
發(fā)送端 producer
importpika
connection = pika.BlockingConnection(pika.ConnectionParameters(
????????????????????????????'localhost',5672))# 默認端口5672魁蒜,可不寫
channel = connection.channel()
#聲明queue
channel.queue_declare(queue='hello2', durable=True)# 若聲明過囊扳,則換一個名字
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
????????????routing_key='hello2',
????????????body='Hello World!',
????????????properties=pika.BasicProperties(
????????????????delivery_mode=2,# make message persistent)
)
print(" [x] Sent 'Hello World!'")
connection.close()
接收端 consumer
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True)
def ?callback(ch, method, properties, body):
????????print(" [x] Received %r"% body)
????????time.sleep(10)
????????ch.basic_ack(delivery_tag = method.delivery_tag)# 告訴生產(chǎn)者,消息處理完成
channel.basic_qos(prefetch_count=1)# 類似權(quán)重兜看,按能力分發(fā)锥咸,如果有一個消息,就不在給你發(fā)channel.basic_consume(# 消費消息
????????????????????????callback,# 如果收到消息细移,就調(diào)用callback
????????????????????????queue='hello2’,
????????????????????????# no_ack=True? # 一般不寫搏予,處理完接收處理結(jié)果。宕機則發(fā)給其他消費者)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
RabbitMQ消息隊列(三):任務(wù)分發(fā)機制
????????當(dāng)有Consumer需要大量的運算時弧轧,RabbitMQ Server需要一定的分發(fā)機制來balance每個Consumer的load雪侥。試想一下,對于web application來說精绎,在一個很多的HTTP request里是沒有時間來處理復(fù)雜的運算的速缨,只能通過后臺的一些工作線程來完成。接下來我們分布講解代乃。
????????默認情況下旬牲,RabbitMQ 會順序的分發(fā)每個Message。當(dāng)每個收到ack后搁吓,會將該Message刪除原茅,然后將下一個Message分發(fā)到下一個Consumer。這種分發(fā)方式叫做round-robin堕仔。這種分發(fā)還有問題擂橘,接著向下讀吧。
????????每個Consumer可能需要一段時間才能處理完收到的數(shù)據(jù)摩骨。如果在這個過程中通贞,Consumer出錯了,異常退出了仿吞,而數(shù)據(jù)還沒有處理完成滑频,那么非常不幸,這段數(shù)據(jù)就丟失了唤冈。因為我們采用no-ack的方式進行確認峡迷,也就是說,每次Consumer接到數(shù)據(jù)后,而不管是否處理完成绘搞,RabbitMQ Server會立即把這個Message標(biāo)記為完成彤避,然后從queue中刪除了。
????????如果一個Consumer異常退出了夯辖,它處理的數(shù)據(jù)能夠被另外的Consumer處理琉预,這樣數(shù)據(jù)在這種情況下就不會丟失了。為了保證數(shù)據(jù)不被丟失蒿褂,RabbitMQ支持消息確認機制圆米,即acknowledgments。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到啄栓,那么我們不能采用no-ack娄帖。而應(yīng)該是在處理完數(shù)據(jù)后發(fā)送ack。(在處理數(shù)據(jù)后發(fā)送的ack昙楚,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收近速,處理完成,RabbitMQ可以去安全的刪除它了堪旧,如果Consumer退出了但是沒有發(fā)送ack削葱,那么RabbitMQ就會把這個Message發(fā)送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會丟失)
Message durability消息持久化
????????為了保證在RabbitMQ退出或者crash了數(shù)據(jù)仍沒有丟失淳梦,需要將queue和Message都要持久化析砸。queue的持久化需要在聲明時指定durable=True:
????????????channel.queue_declare(queue='hello',?durable=True)
需要持久化Message,即在Publish的時候指定一個properties:
????????????channel.basic_publish(exchange='',
????????????????????????????????????????????????????routing_key="task_queue",
????????????????????????????????????????????????????body=message,
????????????????????????????????????????????????????properties=pika.BasicProperties(
????????????????????????????????????????????????????????????delivery_mode?=2,#?make?message?persistent
????????????????????????????????))
(RabbitMQ需要時間去把這些信息存到磁盤上谭跨,這個time window雖然短干厚,但是它的確還是有。在這個時間窗口內(nèi)如果數(shù)據(jù)沒有保存螃宙,數(shù)據(jù)還會丟失。還有另一個原因就是RabbitMQ并不是為每個Message都做fsync:它可能僅僅是把它保存到Cache里所坯,還沒來得及保存到物理磁盤上谆扎。
因此這個持久化還是有問題。但是對于大多數(shù)應(yīng)用來說芹助,這已經(jīng)足夠了堂湖。當(dāng)然為了保持一致性,你可以把每次的publish放到一個transaction中状土。這個transaction的實現(xiàn)需要user defined codes)
Fair dispatch 公平分發(fā)
????????默認狀態(tài)下无蜂,RabbitMQ將第n個Message分發(fā)給第n個Consumer。當(dāng)然n是取余后的蒙谓。它不管Consumer是否還有unacked Message斥季,只是按照這個默認機制進行分發(fā)。那么如果有個Consumer工作比較重,那么就會導(dǎo)致有的Consumer基本沒事可做酣倾,有的Consumer卻是毫無休息的機會舵揭。那么,RabbitMQ是如何處理這種問題呢躁锡?
????????通過basic.qos方法設(shè)置prefetch_count=1午绳。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說映之,在接收到該Consumer的ack前拦焚,他它不會將新的Message分發(fā)給它。 設(shè)置方法如下:
????????????channel.basic_qos(prefetch_count=1)
一般情況下杠输,我們只在消息分發(fā)的時候會去聲明channel.exchange_declare络凿。作為好的習(xí)慣,在producer和consumer中分別聲明一次以保證所要使用的exchange存在
channel.exchange_declare(exchange='logs',
type='fanout')