什么是消息隊列
消息隊列(Message Queue)是一種進程間通信或同一進程的不同線程間的通信方式肚邢。
什么時候需要消息隊列
異步處理:例如短信通知谬运、終端狀態(tài)推送离福、App推送溅呢、用戶注冊等
有些業(yè)務不想也不需要立即處理消息澡屡。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列咐旧,但并不立即處理它驶鹉。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們铣墨。數(shù)據(jù)同步:業(yè)務數(shù)據(jù)推送同步
重試補償:記賬失敗重試
系統(tǒng)解耦:通訊上下行室埋、終端異常監(jiān)控、分布式事件中心
降低工程間的強依賴程度伊约,針對異構系統(tǒng)進行適配姚淆。在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的屡律。通過消息系統(tǒng)在處理過程中間插入了一個隱含的肉盹、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實現(xiàn)這一接口疹尾,當應用發(fā)生變化時,可以獨立的擴展或修改兩邊的處理過程骤肛,只要確保它們遵守同樣的接口約束
流量消峰:秒殺場景下的下單處理
在訪問量劇增的情況下纳本,應用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量無法提取預知腋颠;如果以為了能處理這類瞬間峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費繁成。使用消息隊列能夠使關鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰淑玫。發(fā)布訂閱:HSF的服務狀態(tài)變化通知巾腕、分布式事件中心
數(shù)據(jù)流處理:日志服務、監(jiān)控上報
分布式系統(tǒng)產(chǎn)生的海量數(shù)據(jù)流絮蒿,如:業(yè)務日志尊搬、監(jiān)控數(shù)據(jù)、用戶行為等土涝,針對這些數(shù)據(jù)流進行實時或批量采集匯總佛寿,然后進行大數(shù)據(jù)分析是當前互聯(lián)網(wǎng)的必備技術,通過消息隊列完成此類數(shù)據(jù)收集是最好的選擇分布式事務
RPC調用
消息隊列核心概念
Broker(消息服務器)
Broker的概念來自與Apache ActiveMQ但壮,通俗的講就是MQ的服務器冀泻。
Producer(生產(chǎn)者)
業(yè)務的發(fā)起方常侣,負責生產(chǎn)消息傳輸給broker
Consumer(消費者)
業(yè)務的處理方,負責從broker獲取消息并進行業(yè)務邏輯處理
Topic(主題)
發(fā)布訂閱模式下的消息統(tǒng)一匯集地弹渔,不同生產(chǎn)者向topic發(fā)送消息胳施,由MQ服務器分發(fā)到不同的訂閱 者,實現(xiàn)消息的廣播
Queue(隊列)
PTP模式下肢专,特定生產(chǎn)者向特定queue發(fā)送消息舞肆,消費者訂閱特定的queue完成指定消息的接收。
本地隊列
本地隊列按照功能可劃分為初始化隊列鸟召,傳輸隊列胆绊,目標隊列和死信隊列。
初始化隊列:用作消息觸發(fā)功能欧募。
傳輸隊列:是暫存待傳的消息压状,條件許可的情況下,通過管道將消息傳送到其他的隊列管理器跟继。
目標隊列:是消息的目的地种冬,可以長期存放消息。
死信隊列:如果消息不能送達目標隊列舔糖,也不能再路由出去娱两,則被自動放入死信隊列保存。別名隊列&遠程隊列
是一個隊列定義金吗,用來指定遠端隊列管理器的隊列十兢。使用了遠程隊列,程序就不需要知道目標隊列的位置摇庙。模型隊列
模型隊列定義了一套本地隊列的屬性結合旱物,一旦打開模型隊列,隊列管理器會按照這些屬性動態(tài)地創(chuàng)建出一個本地隊列卫袒。
Message(消息體)
根據(jù)不同通信協(xié)議定義的固定格式進行編碼的數(shù)據(jù)包宵呛,來封裝業(yè)務數(shù)據(jù),實現(xiàn)消息的傳輸
消息模式
PTP點對點
點對點模型用于消息生產(chǎn)者和消息消費者之間點到點的通信夕凝。
點對點模式包含三個角色:
- 消息隊列(Queue)
- 發(fā)送者(Sender)
- 接收者(Receiver)
每個消息都被發(fā)送到一個特定的隊列宝穗,接收者從隊列中獲取消息。隊列保留著消息码秉,可以放在內(nèi)存 中也可以持久化逮矛,直到他們被消費或超時。
特點:
- 每個消息只有一個消費者(Consumer)(即一旦被消費泡徙,消息就不再在消息隊列中)
- 發(fā)送者和接收者之間在時間上沒有依賴性
- 接收者在成功接收消息之后需向隊列應答成功
- 利用FIFO先進先出的特性橱鹏,可以保證消息的順序性。
Pub/Sub發(fā)布訂閱
發(fā)布訂閱模型包含三個角色:
- 主題(Topic)
- 發(fā)布者(Publisher)
- 訂閱者(Subscriber)
多個發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個訂閱者莉兰。
特點:
- 每個消息可以有多個消費者:和點對點方式不同挑围,發(fā)布消息可以被所有訂閱者消費
- 發(fā)布者和訂閱者之間有時間上的依賴性。
- 針對某個主題(Topic)的訂閱者糖荒,它必須創(chuàng)建一個訂閱者之后杉辙,才能消費發(fā)布者的消息。
- 為了消費消息捶朵,訂閱者必須保持運行的狀態(tài)蜘矢。
常用協(xié)議
AMQP
AMQP即Advanced Message Queuing Protocol,是應用層協(xié)議的一個開放標準综看,為面向消息的中間件設計品腹。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在红碑,反之亦然舞吭。AMQP 的主要特征是面向消息、隊列析珊、路由(包括點對點和發(fā)布/訂閱)羡鸥、可靠性、安全忠寻。
優(yōu)點:可靠惧浴、通用
MQTT
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發(fā)的一個即時通訊協(xié)議奕剃,有可能成為物聯(lián)網(wǎng)的重要組成部分衷旅。該協(xié)議支持所有平臺,幾乎可以把所有聯(lián)網(wǎng)物品和外部連接起來纵朋,被用來當做傳感器和致動器(比如通過Twitter讓房屋聯(lián)網(wǎng))的通信協(xié)議芜茵。
優(yōu)點:格式簡潔、占用帶寬小倡蝙、移動端通信、PUSH绞佩、嵌入式系統(tǒng)
STOMP
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息協(xié)議寺鸥,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單文本協(xié)議品山。STOMP提供一個可互操作的連接格式胆建,允許客戶端與任意STOMP消息代理(Broker)進行交互。
優(yōu)點:命令模式(非topic\queue模式)
XMPP
XMPP(可擴展消息處理現(xiàn)場協(xié)議肘交,Extensible Messaging and Presence Protocol)是基于可擴展標記語言(XML)的協(xié)議笆载,多用于即時消息(IM)以及在線現(xiàn)場探測。適用于服務器之間的準即時操作。核心是基于XML流傳輸凉驻,這個協(xié)議可能最終允許因特網(wǎng)用戶向因特網(wǎng)上的其他任何人發(fā)送即時消息腻要,即使其操作系統(tǒng)和瀏覽器不同。
優(yōu)點:通用公開涝登、兼容性強雄家、可擴展、安全性高胀滚,但XML編碼格式占用帶寬大
常用MQ產(chǎn)品對比(RabbitMq趟济、Kafaka)
架構方面
- Kafaka是正常的mq架構,包括provider broker consumer咽笼。Kafaka(默認)沒有消息確認機制顷编。
- RabbitMq中的broker由exchange、binder queue三部分組成剑刑,其中exchange和binding組成了消息的路由鍵媳纬;客戶端Producer通過連接channel和server進行通信,Consumer從queue獲取消息進行消費叛甫,RabbitMq有消息確認機制层宫。
吞吐量方面
- Kafaka采用zero-copy方式,即數(shù)據(jù)存儲和獲取是本地磁盤順序批量操作其监,具有O(1)復雜度萌腿,數(shù)據(jù)處理效率很高;
- RabbitMq在吞吐量方面不如Kafaka抖苦,RabbitMq支持對消息可靠的傳遞毁菱,支持事務,不支持批量的操作锌历;
可用性方面
- Kafka的broker采用主備模式贮庞,所以可用性很高;
- RabbitMq支持miror queue究西,主queue失效窗慎,minor queue生效;
集群負載方面
- Kafaka使用zookeeper實現(xiàn)負載均衡卤材,zookeeper管理集群中的broker sonsumer遮斥,通過zookeeper的協(xié)調機制,producer會記錄topic對應的broker扇丛,對broker進行輪詢或者隨機訪問broker术吗,實現(xiàn)負載均衡;
- RabbitMq需要單獨自定義負載均衡帆精;
總結
MQ | 吞吐量 | 應用場景 | 特點 |
---|---|---|---|
RabbitMq | 3500-4000msg/s | 非海量高可靠場景较屿,大規(guī)模企業(yè)應用隧魄、ESB、復雜路由策略隘蝎、易購系統(tǒng)整合 | 協(xié)議豐富购啄、兼容性強、功能完善末贾、消息格式比較大闸溃、速度較慢、消息持久化對性能影響較大 |
ZeroMq | >800000msg/s | 高并發(fā)連接場景拱撵,如:在線游戲辉川。海量高實時性場景。如:股票行情 | 偏重于網(wǎng)絡開發(fā)拴测,開發(fā)成本高乓旗,高級功能需自行實現(xiàn),不建議做傳統(tǒng)MQ使用 |
ActiveMq | ~3600msg/s | 非海量高可靠場景集索、企業(yè)級應用屿愚、分布式事務(XA)、異構系統(tǒng)整合 | 相對RabbitMq較輕量級务荆、性能相近妆距、完整JMS支持、配置較復雜 |
Redis | ~15000msg/s | 高吞吐低延時函匕、大量小消息體(<10k)娱据、順序性或排序要求、異構系統(tǒng)整合 | 輕量級MQ的快速簡單實現(xiàn)盅惜、容災與負載等功能需自行實現(xiàn) |
Kafka | IN ~70000msg/s中剩,OUT >150000msg/s | 日志等海量數(shù)據(jù)流、DB數(shù)據(jù)同步抒寂、高堆積離線數(shù)據(jù)處理 | 非典型MQ更偏向于流式數(shù)據(jù)批處理 |
RabbitMQ
RabbitMQ 是實現(xiàn) AMQP(高級消息隊列協(xié)議)的消息中間件的一種结啼,最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲轉發(fā)消息屈芜,在易用性郊愧、擴展性、高可用性等方面表現(xiàn)不俗井佑。 RabbitMQ 主要是為了實現(xiàn)系統(tǒng)之間的雙向解耦而實現(xiàn)的糕珊。當生產(chǎn)者大量產(chǎn)生數(shù)據(jù)時,消費者無法快速消費毅糟,那么需要一個中間層。保存這個數(shù)據(jù)澜公。
RabbitMQ 是一個開源的 AMQP 實現(xiàn)姆另,服務器端用Erlang語言編寫喇肋,支持多種客戶端,如:Python迹辐、Ruby蝶防、.NET、Java明吩、JMS间学、C、PHP印荔、ActionScript低葫、XMPP、STOMP 等仍律,支持 AJAX嘿悬。用于在分布式系統(tǒng)中存儲轉發(fā)消息,在易用性水泉、擴展性善涨、高可用性等方面表現(xiàn)不俗。
關鍵字
Channel(通道)
道是兩個管理器之間的一種單向點對點的的通信連接草则,如果需要雙向交流钢拧,可以建立一對通道。
Exchange(消息交換機)
Exchange類似于數(shù)據(jù)通信網(wǎng)絡中的交換機炕横,提供消息路由策略源内。
RabbitMq中,producer不是通過信道直接將消息發(fā)送給queue看锉,而是先發(fā)送給Exchange姿锭。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候伯铣,會傳遞一個ROUTING_KEY呻此,Exchange會根據(jù)這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue腔寡。和Queue一樣焚鲜,Exchange也可設置為持久化,臨時或者自動刪除放前。
Exchange有4種類型:direct(默認)忿磅,fanout, topic凭语, 和headers葱她。
不同類型的Exchange轉發(fā)消息的策略有所區(qū)別:
- direct
直接交換器,工作方式類似于單播似扔,Exchange會將消息發(fā)送完全匹配ROUTING_KEY的Queue - fanout
廣播是式交換器吨些,不管消息的ROUTING_KEY設置為什么搓谆,Exchange都會將消息轉發(fā)給所有綁定的Queue。 - topic
主題交換器豪墅,工作方式類似于組播泉手,Exchange會將消息轉發(fā)和ROUTING_KEY匹配模式相同的所有隊列,比如偶器,ROUTING_KEY為user.stock的Message會轉發(fā)給綁定匹配模式為 * .stock斩萌,user.stock, * . * 和#.user.stock.#的隊列屏轰。( * 表是匹配一個任意詞組颊郎,#表示匹配0個或多個詞組) - headers
消息體的header匹配(ignore)
Binding(綁定)
所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系亭枷。
Routing Key(路由關鍵字)
exchange根據(jù)這個關鍵字進行消息投遞袭艺。
vhost(虛擬主機)
在RabbitMq server上可以創(chuàng)建多個虛擬的message broker,又叫做virtual hosts (vhosts)叨粘。每一個vhost本質上是一個mini-rabbitmq server猾编,分別管理各自的exchange,和bindings升敲。vhost相當于物理的server答倡,可以為不同app提供邊界隔離,使得應用安全的運行在不同的vhost實例上驴党,相互之間不會干擾瘪撇。producer和consumer連接rabbit server需要指定一個vhost。
消息發(fā)送及接受過程
假設P1和C1注冊了相同的Broker港庄,Exchange和Queue倔既。P1發(fā)送的消息最終會被C1消費。
基本的通信流程大概如下所示:
- P1生產(chǎn)消息鹏氧,發(fā)送給服務器端的Exchange
- Exchange收到消息渤涌,根據(jù)ROUTINKEY,將消息轉發(fā)給匹配的Queue1
- Queue1收到消息把还,將消息發(fā)送給訂閱者C1
- C1收到消息实蓬,發(fā)送ACK給隊列確認收到消息
- Queue1收到ACK,刪除隊列中緩存的此條消息
Consumer收到消息時需要顯式的向rabbit broker發(fā)送basic吊履。ack消息或者consumer訂閱消息時設置auto_ack參數(shù)為true安皱。
在通信過程中,隊列對ACK的處理有以下幾種情況:
- 如果consumer接收了消息艇炎,發(fā)送ack酌伊,rabbitmq會刪除隊列中這個消息,發(fā)送另一條消息給consumer缀踪。
- 如果cosumer接受了消息居砖, 但在發(fā)送ack之前斷開連接燕锥,rabbitmq會認為這條消息沒有被deliver,在consumer在次連接的時候悯蝉,這條消息會被redeliver。
- 如果consumer接受了消息托慨,但是程序中有bug鼻由,忘記了ack,rabbitmq不會重復發(fā)送消息厚棵。
- rabbitmq2蕉世。0。0和之后的版本支持consumer reject某條(類)消息婆硬,可以通過設置requeue參數(shù)中的reject為true達到目地狠轻,那么rabbitmq將會把消息發(fā)送給下一個注冊的consumer。
消息的ACK機制
即消息的Ackownledge確認機制彬犯,為了保證消息不丟失向楼,消息隊列提供了消息Acknowledge機制,即ACK機制谐区,當Consumer確認消息已經(jīng)被消費處理湖蜕,發(fā)送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了宋列。如果Consumer宕機/關閉昭抒,沒有發(fā)送ACK,消息隊列將認為這個消息沒有被處理炼杖,會將這個消息重新發(fā)送給其他的Consumer重新消費處理灭返。
消息的事務支持
消息的收發(fā)處理支持事務,例如:在任務中心場景中坤邪,一次處理可能涉及多個消息的接收熙含、處理,這應該處于同一個事務范圍內(nèi)罩扇,如果一個消息處理失敗婆芦,事務回滾,消息重新回到隊列中喂饥。
消息的持久化
消息的持久化消约,對于一些關鍵的核心業(yè)務來說是非常重要的,啟用消息持久化后员帮,消息隊列宕機重啟后或粮,消息可以從持久化存儲恢復,消息不丟失捞高,可以繼續(xù)消費處理氯材。
消息處理模式
fanout 模式
模式特點:
- 可以理解他是一個廣播模式
- 不需要routing key它的消息發(fā)送時通過Exchange binding進行路由的~~在這個模式下routing key失去作用
- 這種模式需要提前將Exchange與Queue進行綁定渣锦,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定
- 如果接收到消息的Exchange沒有與任何Queue綁定氢哮,則消息會被拋棄袋毙。
direct 模式
任何發(fā)送到Direct Exchange的消息都會被轉發(fā)到routing_key中指定的Queue。
- 一般情況可以使用rabbitMQ自帶的Exchange:”” (該Exchange的名字為空字符串)冗尤, 也可以自定義Exchange
- 這種模式下不需要將Exchange進行任何綁定(bind)操作听盖。當然也可以進行綁定×哑撸可以將不同的routing_key與不同的queue進行綁定皆看,不同的queue與不同exchange進行綁定
- 消息傳遞時需要一個“routing_key”
- 如果消息中不存在routing_key中綁定的隊列名,則該消息會被拋棄背零。
如果一個exchange 聲明為direct腰吟,并且bind中指定了routing_key,那么發(fā)送消息時需要同時指明該exchange和routing_key徙瓶。
簡而言之就是:生產(chǎn)者生成消息發(fā)送給Exchange毛雇, Exchange根據(jù)Exchange類型和basic_publish中的routing_key進行消息發(fā)送 消費者:訂閱Exchange并根據(jù)Exchange類型和binding key(bindings 中的routing key) ,如果生產(chǎn)者和訂閱者的routing_key相同倍啥,Exchange就會路由到那個隊列禾乘。
topic 模式
前面講到direct類型的Exchange路由規(guī)則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業(yè)務需求虽缕。
topic類型的Exchange在匹配規(guī)則上進行了擴展始藕,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中氮趋,但這里的匹配規(guī)則有些不同伍派。
它約定:
- routing key為一個句點號“. ”分隔的字符串(我們將被句點號“。 ”分隔開的每一段獨立的字符串稱為一個單詞)剩胁,如“stock.usd.nyse”诉植、“nyse.vmw”、“quick.orange.rabbit”
- binding key與routing key一樣也是句點號“. ”分隔的字符串
- binding key中可以存在兩種特殊字符“”與“#”昵观,用于做模糊匹配晾腔,其中“”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)
以上圖中的配置為例啊犬,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2灼擂,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2觉至,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次剔应,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”峻贮、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄席怪,因為它們沒有匹配任何bindingKey。
集群
RabbitMQ纤控,部署分三種模式:單機模式挂捻,普通集群模式,鏡像集群模式船万。
普通集群模式
多臺機器部署细层,每個機器放一個rabbitmq實例,但是創(chuàng)建的queue只會放在一個rabbitmq實例上唬涧,每個實例同步queue的元數(shù)據(jù)。
如果消費時連的是其他實例盛撑,那個實例會從queue所在實例拉取數(shù)據(jù)碎节。這就會導致拉取數(shù)據(jù)的開銷,如果那個放queue的實例宕機了抵卫,那么其他實例就無法從那個實例拉取狮荔,即便開啟了消息持久化,讓rabbitmq落地存儲消息的話介粘,消息不一定會丟殖氏,但得等這個實例恢復了,然后才可以繼續(xù)從這個queue拉取數(shù)據(jù)姻采,這就沒什么高可用可言雅采,主要是提供吞吐量,讓集群中多個節(jié)點來服務某個queue的讀寫操作慨亲。
鏡像集群模式
queue的元數(shù)據(jù)和消息都會存放在多個實例婚瓜,每次寫消息就自動同步到多個queue實例里。這樣任何一個機器宕機刑棵,其他機器都可以頂上,但是性能開銷太大,消息同步導致網(wǎng)絡帶寬壓力和消耗很重留瞳,另外摸柄,沒有擴展性可言,如果queue負載很重碍舍,加機器柠座,新增的機器也包含了這個queue的所有數(shù)據(jù),并沒有辦法線性擴展你的queue乒验。此時愚隧,需要開啟鏡像集群模式,在rabbitmq管理控制臺新增一個策略,將數(shù)據(jù)同步到指定數(shù)量的節(jié)點狂塘,然后你再次創(chuàng)建queue的時候录煤,應用這個策略,就會自動將數(shù)據(jù)同步到其他的節(jié)點上去了荞胡。
Kafka
介紹
Kafka 是 Apache 的子項目妈踊,是一個高性能跨語言的分布式發(fā)布/訂閱消息隊列系統(tǒng)(沒有嚴格實現(xiàn) JMS 規(guī)范的點對點模型,但可以實現(xiàn)其效果)泪漂,在企業(yè)開發(fā)中有廣泛的應用廊营。高性能是其最大優(yōu)勢,劣勢是消息的可靠性(丟失或重復)萝勤,這個劣勢是為了換取高性能露筒,開發(fā)者可以以稍降低性能,來換取消息的可靠性敌卓。
Topics/logs
一個Topic可以認為是一類消息慎式,每個topic將被分成多個partition(區(qū)),每個partition在存儲層面是append log文件趟径。任何發(fā)布到此partition的消息都會被直接追加到log文件的尾部瘪吏,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數(shù)字蜗巧,它是唯一標記一條消息掌眠。它唯一的標記一條消息。kafka并沒有提供其他額外的索引機制來存儲offset幕屹,因為在kafka中幾乎不允許對消息進行“隨機讀寫”蓝丙。
Kafka和JMS(Java Message Service)實現(xiàn)(activeMQ)不同的是:即使消息被消費,消息仍然不會被立即刪除望拖。日志文件將會根據(jù)broker中的配置要求迅腔,保留一定的時間之后刪除;比如log文件保留2天靠娱,那么兩天后沧烈,文件會被清除,無論其中的消息是否被消費像云。kafka通過這種簡單的手段锌雀,來釋放磁盤空間,以及減少消息消費之后對文件內(nèi)容改動的磁盤IO開支迅诬。
對于consumer而言腋逆,它需要保存消費消息的offset,對于offset的保存和使用侈贷,有consumer來控制惩歉;當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費撑蚌。事實上consumer可以使用任意順序消費消息上遥,它只需要將offset重置為任意值。(offset將會保存在zookeeper中争涌,參見下文)
kafka集群幾乎不需要維護任何consumer和producer狀態(tài)信息粉楚,這些信息有zookeeper保存;因此producer和consumer的客戶端實現(xiàn)非常輕量級亮垫,它們可以隨意離開模软,而不會對集群造成額外的影響。
partitions的設計目的有多個饮潦。最根本原因是kafka基于文件存儲燃异。通過分區(qū),可以將日志內(nèi)容分散到多個server上继蜡,來避免文件尺寸達到單機磁盤的上限特铝,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions壹瘟,來消息保存/消費的效率。此外越多的partitions意味著可以容納更多的consumer鳄逾,有效提升并發(fā)消費的能力稻轨。(具體原理參見下文)。
Distribution
一個Topic的多個partitions雕凹,被分布在kafka集群中的多個server上殴俱;每個server(kafka實例)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(shù)(replicas)枚抵,每個partition將會被備份到多臺機器上线欲,以提高可用性。
基于replicated方案汽摹,那么就意味著需要對多個備份進行調度李丰;每個partition都有一個server為"leader";leader負責所有的讀寫操作逼泣,如果leader失效趴泌,那么將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進拉庶,同步消息即可嗜憔。由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮氏仗,有多少個partitions就意味著有多少個"leader"吉捶,kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩(wěn)定。
Producers
Producer將消息發(fā)布到指定的Topic中呐舔,同時Producer也能決定將此消息歸屬于哪個partition币励;比如基于"round-robin"方式或者通過其他的一些算法等。
Consumers
本質上kafka只支持Topic滋早。每個consumer屬于一個consumer group榄审;反過來說,每個group中可以有多個consumer杆麸。發(fā)送到Topic的消息搁进,只會被訂閱此Topic的每個group中的一個consumer消費。
如果所有的consumer都具有相同的group昔头,這種情況和queue模式很像饼问;消息將會在consumers之間負載均衡。
如果所有的consumer都具有不同的group揭斧,那這就是"發(fā)布-訂閱"莱革;消息將會廣播給所有的消費者。
在kafka中讹开,一個partition中的消息只會被group中的一個consumer消費盅视;每個group中consumer消息消費互相獨立;我們可以認為一個group是一個"訂閱"者旦万,一個Topic中的每個partions闹击,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的消息成艘。kafka只能保證一個partition中的消息被某個consumer消費時赏半,消息是順序的。事實上淆两,從Topic角度來說断箫,消息仍不是有序的。
Kafka的設計原理決定秋冰,對于一個topic仲义,同一個group中不能有多于partitions個數(shù)的consumer同時消費,否則將意味著某些consumer將無法得到消息剑勾。
Guarantees
- 發(fā)送到partitions中的消息將會按照它接收的順序追加到日志中
- 對于消費者而言光坝,它們消費消息的順序和日志中消息順序一致。
- 如果Topic的"replicationfactor"為N甥材,那么允許N-1個kafka實例失效盯另。
使用場景
Messaging
對于一些常規(guī)的消息系統(tǒng),kafka是個不錯的選擇洲赵;partitons/replication和容錯鸳惯,可以使kafka具有良好的擴展性和性能優(yōu)勢商蕴。不過到目前為止,我們應該很清楚認識到芝发,kafka并沒有提供JMS中的“事務性”绪商、“消息傳輸擔保(消息確認機制)”、“消息分組”等企業(yè)級特性辅鲸;kafka只能使用作為“常規(guī)”的消息系統(tǒng)格郁,在一定程度上,尚未確保消息的發(fā)送與接收絕對可靠(比如:消息重發(fā)独悴,消息發(fā)送丟失等)例书;Websit activity tracking
kafka可以作為“網(wǎng)站活性跟蹤”的最佳工具;可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中刻炒。并實時監(jiān)控决采,或者離線統(tǒng)計分析等;Log Aggregation
kafka的特性決定它非常適合作為“日志收集中心”坟奥;application可以將操作日志“批量”树瞭、“異步”的發(fā)送到kafka集群中,而不是保存在本地或者DB中爱谁;kafka可以批量提交消息/壓縮消息等晒喷,這對producer端而言,幾乎感覺不到性能的開支访敌。此時consumer端可以使hadoop等其他系統(tǒng)化的存儲和分析系統(tǒng)凉敲;
消息可靠性保障
Kafka就比較適合高吞吐量并且允許少量數(shù)據(jù)丟失的場景,如果非要保證“消息可靠傳輸”捐顷,可以使用JMS。
Kafka Producer 消息發(fā)送有兩種方式(配置參數(shù) producer.type):
- producer.type=sync(默認值): 后臺線程中消息發(fā)送是同步方式雨效,對應的類為 kafka.producer.SyncProducer迅涮;
- producer.type=async: 后臺線程中消息發(fā)送是異步方式,對應的類為 kafka.producer.AyncProducer徽龟;優(yōu)點是可批量發(fā)送消息(消息個數(shù)達到 batch.num.messages=200 或時間達到時發(fā)送)叮姑、吞吐量佳,缺點是發(fā)送不及時可能導致丟失据悔;
對于同步方式(producer.type=sync)传透?Kafka Producer 消息發(fā)送有三種確認方式(配置參數(shù) acks):
- acks=0: producer 不等待 Leader 確認,只管發(fā)出即可极颓;最可能丟失消息朱盐,適用于高吞吐可丟失的業(yè)務;
- acks=1(默認值): producer 等待 Leader 寫入本地日志后就確認菠隆;之后 Leader 向 Followers 同步時兵琳,如果 Leader 宕機會導致消息沒同步而丟失狂秘,producer 卻依舊認為成功;
- acks=all/-1: producer 等待 Leader 寫入本地日志躯肌、而且 Leader 向 Followers 同步完成后才會確認者春,最可靠;
設計原理
kafka的設計初衷是希望作為一個統(tǒng)一的信息收集平臺清女,能夠實時的收集反饋信息钱烟,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯能力嫡丙。
持久性
kafka使用文件存儲消息拴袭,這就直接決定kafka在性能上嚴重依賴文件系統(tǒng)的本身特性。且無論任何OS下迄沫,對文件系統(tǒng)本身的優(yōu)化幾乎沒有可能稻扬。文件緩存/直接內(nèi)存映射等是常用的手段。因為kafka是對日志文件進行append操作羊瘩,因此磁盤檢索的開支是較小的泰佳;同時為了減少磁盤寫入的次數(shù),broker會將消息暫時buffer起來尘吗,當消息的個數(shù)(或尺寸)達到一定閥值時逝她,再flush到磁盤,這樣減少了磁盤IO調用的次數(shù)睬捶。
性能
需要考慮的影響性能點很多黔宛,除磁盤IO之外,我們還需要考慮網(wǎng)絡IO擒贸,這直接關系到kafka的吞吐量問題臀晃。kafka并沒有提供太多高超的技巧;對于producer端介劫,可以將消息buffer起來徽惋,當消息的條數(shù)達到一定閥值時,批量發(fā)送給broker座韵;對于consumer端也是一樣险绘,批量fetch多條消息。不過消息量的大小可以通過配置文件來指定誉碴。對于kafka broker端宦棺,似乎有個sendfile系統(tǒng)調用可以潛在的提升網(wǎng)絡IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應的內(nèi)存區(qū)域即可黔帕,而無需進程再次copy和交換代咸。 其實對于producer/consumer/broker三者而言,CPU的開支應該都不大成黄,因此啟用消息壓縮機制是一個良好的策略侣背;壓縮需要消耗少量的CPU資源白华,不過對于kafka而言,網(wǎng)絡IO更應該需要考慮贩耐』⌒龋可以將任何在網(wǎng)絡上傳輸?shù)南⒍冀?jīng)過壓縮。kafka支持gzip/snappy等多種壓縮方式潮太。
生產(chǎn)者
負載均衡: producer將會和Topic下所有partition leader保持socket連接管搪;消息由producer直接通過socket發(fā)送到broker,中間不會經(jīng)過任何“路由層“铡买。事實上更鲁,消息被路由到哪個partition上,有producer客戶端決定奇钞。比如可以采用“random““key-hash““輪詢“等澡为,如果一個topic中有多個partitions,那么在producer端實現(xiàn)“消息均衡分發(fā)“是必要的景埃。
其中partition leader的位置(host:port)注冊在zookeeper中媒至,producer作為zookeeper client,已經(jīng)注冊了watch用來監(jiān)聽partition leader的變更事件谷徙。
異步發(fā)送:將多條消息暫且在客戶端buffer起來拒啰,并將他們批量的發(fā)送到broker,小數(shù)據(jù)IO太多完慧,會拖慢整體的網(wǎng)絡延遲谋旦,批量延遲發(fā)送事實上提升了網(wǎng)絡效率。不過這也有一定的隱患屈尼,比如說當producer失效時册着,那些尚未發(fā)送的消息將會丟失。
消費者
consumer端向broker發(fā)送“fetch”請求脾歧,并告知其獲取消息的offset甲捏;此后consumer將會獲得一定條數(shù)的消息;consumer端也可以重置offset來重新消費消息涨椒。
在JMS實現(xiàn)中摊鸡,Topic模型基于push方式绽媒,即broker將消息推送給consumer端蚕冬。不過在kafka中,采用了pull方式是辕,即consumer在和broker建立連接之后囤热,主動去pull(或者說fetch)消息;這中模式有些優(yōu)點获三,首先consumer端可以根據(jù)自己的消費能力適時的去fetch消息并處理旁蔼,且可以控制消息消費的進度(offset)锨苏;此外,消費者可以良好的控制消息消費的數(shù)量棺聊,batch fetch伞租。
其他JMS實現(xiàn),消息消費的位置是有prodiver保留限佩,以便避免重復發(fā)送消息或者將沒有消費成功的消息重發(fā)等葵诈,同時還要控制消息的狀態(tài)。這就要求JMS broker需要太多額外的工作祟同。在kafka中作喘,partition中的消息只有一個consumer在消費,且不存在消息狀態(tài)的控制晕城,也沒有復雜的消息確認機制泞坦,可見kafka broker端是相當輕量級的。當消息被consumer接收之后砖顷,consumer可以在本地保存最后消息的offset贰锁,并間歇性的向zookeeper注冊offset。由此可見择吊,consumer客戶端也很輕量級李根。
消息傳送機制
對于JMS實現(xiàn),消息傳輸擔保非常直接:有且只有一次(exactly once)几睛。
在kafka中稍有不同:
- at most once: 最多一次房轿,這個和JMS中"非持久化"消息類似。發(fā)送一次所森,無論成敗囱持,將不會重發(fā)。
- at least once: 消息至少發(fā)送一次焕济,如果消息未能接受成功纷妆,可能會重發(fā),直到接收成功晴弃。
- exactly once: 消息只會發(fā)送一次掩幢。
at most once: 消費者fetch消息,然后保存offset上鞠,然后處理消息际邻;當client保存offset之后,但是在消息處理過程中出現(xiàn)了異常芍阎,導致部分消息未能繼續(xù)處理世曾。那么此后"未處理"的消息將不能被fetch到,這就是"at most once"谴咸。
at least once: 消費者fetch消息轮听,然后處理消息骗露,然后保存offset。如果消息處理成功之后血巍,但是在保存offset階段zookeeper異常導致保存操作未能執(zhí)行成功萧锉,這就導致接下來再次fetch時可能獲得上次已經(jīng)處理過的消息,這就是"at least once"述寡,原因offset沒有及時的提交給zookeeper驹暑,zookeeper恢復正常還是之前offset狀態(tài)。
exactly once: kafka中并沒有嚴格的去實現(xiàn)(基于2階段提交辨赐,事務)优俘,我們認為這種策略在kafka中是沒有必要的。
通常情況下“at-least-once”是我們首選掀序。(相比at most once而言帆焕,重復接收數(shù)據(jù)總比丟失數(shù)據(jù)要好)。
如何保證消息不被重復消費:使用冪等性消費者
集群
kafka高可用由多個broker組成不恭,每個broker是一個節(jié)點叶雹;
創(chuàng)建一個topic,這個topic會劃分為多個partition换吧,每個partition存在于不同的broker上折晦,每個partition就放一部分數(shù)據(jù)。
kafka是一個分布式消息隊列沾瓦,就是說一個topic的數(shù)據(jù)满着,是分散放在不同的機器上,每個機器就放一部分數(shù)據(jù)贯莺。
在0.8版本以前风喇,是沒有HA機制的,就是任何一個broker宕機了缕探,那個broker上的partition就廢了魂莫,沒法寫也沒法讀,沒有什么高可用性可言爹耗。
0.8版本以后耙考,才提供了HA機制,也就是就是replica副本機制潭兽。每個partition的數(shù)據(jù)都會同步到其他的機器上倦始,形成自己的多個replica副本。然后所有replica會選舉一個leader出來讼溺,那么生產(chǎn)和消費都跟這個leader打交道楣号,然后其他replica就是follower最易。
寫的時候怒坯,leader會負責把數(shù)據(jù)同步到所有follower上去炫狱,讀的時候就直接讀leader上數(shù)據(jù)即可。
kafka會均勻的將一個partition的所有replica分布在不同的機器上剔猿,從而提高容錯性视译。
如果某個broker宕機了也沒事,它上面的partition在其他機器上都有副本的归敬,如果這上面有某個partition的leader酷含,那么此時會重新選舉一個新的leader出來,大家繼續(xù)讀寫那個新的leader即可汪茧。這就有所謂的高可用性了椅亚。
寫數(shù)據(jù)的時候,生產(chǎn)者就寫leader舱污,然后leader將數(shù)據(jù)落地寫本地磁盤呀舔,接著其他follower自己主動從leader來pull數(shù)據(jù)。一旦所有follower同步好數(shù)據(jù)了扩灯,就會發(fā)送ack給leader媚赖,leader收到所有follower的ack之后,就會返回寫成功的消息給生產(chǎn)者珠插。
復制備份
- kafka將每個partition數(shù)據(jù)復制到多個server上惧磺,任何一個partition有一個leader和多個follower(可以沒有);
- 備份的個數(shù)可以通過broker配置文件來設定捻撑。leader處理所有的read-write請求磨隘,follower需要和leader保持同步。Follower和consumer一樣顾患,消費消息并保存在本地日志中琳拭;
- leader負責跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效描验,leader將會把它從replicas同步列表中刪除白嘁;
- 當所有的follower都將一條消息保存成功,此消息才被認為是"committed"膘流,那么此時consumer才能消費它絮缅;
- 即使只有一個replicas實例存活,仍然可以保證消息的正常發(fā)送和接收呼股,只要zookeeper集群存活即可(不同于其他分布式存儲耕魄,比如hbase需要"多數(shù)派"存活才行);
- 當leader失效時彭谁,需在followers中選取出新的leader吸奴,可能此時follower落后于leader,因此需要選擇一個"up-to-date"的follower;
- 選擇follower時需要兼顧一個問題则奥,就是新leaderserver上所已經(jīng)承載的partition leader的個數(shù)考润,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力读处。在選舉新leader糊治,需要考慮到"負載均衡";
如何處理消息丟失
消息丟失會出現(xiàn)在三個環(huán)節(jié)罚舱,分別是生產(chǎn)者井辜、mq中間件、消費者:
RabbitMQ
生產(chǎn)者端的控制
使用事務管闷,出錯后重試粥脚。
或者使用異步回調(confirm),由消費者回調生產(chǎn)者方法通知生產(chǎn)者消息是否正常處理包个。中間件的控制
持久化消息到磁盤(不能保證絕對不丟)阿逃;消費者端控制
關閉Auto ACK,手動ACK赃蛛。
Kafka
大體和RabbitMQ相同恃锉。
- 設置acks=all,此時kafka會確保消息同步到所有follower才會ACK呕臂。
當broker leader宕機了破托,并且消息沒有同步到follower,此時選舉出了新的leader并沒有同步到剛才的消息歧蒋,那么這條消息便丟了土砂,解決的方法是,谜洽。
如何保證消息的順序性
Rabbitmq
需要保證順序的消息投遞到同一個queue中萝映,這個queue只能有一個consumer,如果需要提升性能阐虚,可以用內(nèi)存隊列做排隊序臂,然后分發(fā)給底層不同的worker來處理。
Kafka
寫入一個partition中的數(shù)據(jù)一定是有序的实束。生產(chǎn)者在寫的時候 奥秆,可以指定一個key,比如指定訂單id作為key咸灿,這個訂單相關數(shù)據(jù)一定會被分發(fā)到一個partition中去构订。消費者從partition中取出數(shù)據(jù)的時候也一定是有序的,把每個數(shù)據(jù)放入對應的一個內(nèi)存隊列避矢,一個partition中有幾條相關數(shù)據(jù)就用幾個內(nèi)存隊列悼瘾,消費者開啟多個線程囊榜,每個線程處理一個內(nèi)存隊列。