橫貫八方揭秘RabbitMQ想鹰、RocketMQ紊婉、Kafka 的核心原理(建議收藏)

今天我們通過一篇文章來認識一下常見消息隊列RabbitMQ、RocketMQ辑舷、Kafka喻犁。

RabbitMQ

RabbitMQ各組件的功能

  • Broker :一個RabbitMQ實例就是一個Broker
  • Virtual Host :虛擬主機。相當于MySQL的DataBase,一個Broker上可以存在多個vhost肢础,vhost之間相互隔離还栓。每個vhost都擁有自己的隊列、交換機传轰、綁定和權(quán)限機制剩盒。vhost必須在連接時指定,默認的vhost是/路召。
  • Exchange :交換機勃刨,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列。
  • Queue :消息隊列股淡,用來保存消息直到發(fā)送給消費者身隐。它是消息的容器。一個消息可投入一個或多個隊列唯灵。
  • Banding :綁定關(guān)系贾铝,用于消息隊列和交換機之間的關(guān)聯(lián)。通過路由鍵(Routing Key)將交換機和消息隊列關(guān)聯(lián)起來埠帕。
  • Channel :管道垢揩,一條雙向數(shù)據(jù)流通道。不管是發(fā)布消息敛瓷、訂閱隊列還是接收消息叁巨,這些動作都是通過管道完成。因為對于操作系統(tǒng)來說呐籽,建立和銷毀TCP都是非常昂貴的開銷锋勺,所以引入了管道的概念,以復用一條TCP連接狡蝶。
  • Connection :生產(chǎn)者/消費者 與broker之間的TCP連接庶橱。
  • Publisher :消息的生產(chǎn)者。
  • Consumer :消息的消費者苏章。
  • Message :消息奏瞬,它是由消息頭和消息體組成。消息頭則包括Routing-Key丝格、Priority(優(yōu)先級)等。

RabbitMQ的多種交換機類型

Exchange 分發(fā)消息給 Queue 時预伺, Exchange 的類型對應(yīng)不同的分發(fā)策略订咸,有3種類型的 Exchange :Direct酬诀、Fanout瞒御、Topic

  • Direct:消息中的 Routing Key 如果和 Binding 中的 Routing Key 完全一致趾唱, Exchange 就會將消息分發(fā)到對應(yīng)的隊列中甜癞。
  • Fanout:每個發(fā)到 Fanout 類型交換機的消息都會分發(fā)到所有綁定的隊列上去悠咱。Fanout交換機沒有 Routing Key 析既。它在三種類型的交換機中轉(zhuǎn)發(fā)消息是最快的谆奥。
  • Topic:Topic交換機通過模式匹配分配消息,將 Routing Key 和某個模式進行匹配空骚。它只能識別兩個通配符:"#"和"*"。# 匹配0個或多個單詞逢渔, * 匹配1個單詞肃廓。

TTL

TTL(Time To Live):生存時間。RabbitMQ支持消息的過期時間盲赊,一共2種哀蘑。

  • 在消息發(fā)送時進行指定绘迁。通過配置消息體的 Properties 缀台,可以指定當前消息的過期時間膛腐。
  • 在創(chuàng)建Exchange時指定哲身。從進入消息隊列開始計算律罢,只要超過了隊列的超時時間配置误辑,那么消息會自動清除巾钉。

生產(chǎn)者的消息確認機制

Confirm機制:

  • 消息的確認砰苍,是指生產(chǎn)者投遞消息后赚导,如果Broker收到消息吼旧,則會給我們生產(chǎn)者一個應(yīng)答圈暗。
  • 生產(chǎn)者進行接受應(yīng)答员串,用來確認這條消息是否正常的發(fā)送到了Broker寸齐,這種方式也是消息的可靠性投遞的核心保障!

如何實現(xiàn)Confirm確認消息?

  1. 在channel上開啟確認模式:channel.confirmSelect()
  2. 在channel上開啟監(jiān)聽:addConfirmListener 挣惰,監(jiān)聽成功和失敗的處理結(jié)果憎茂,根據(jù)具體的結(jié)果對消息進行重新發(fā)送或記錄日志處理等后續(xù)操作竖幔。

Return消息機制:

Return Listener用于處理一些不可路由的消息拳氢。

我們的消息生產(chǎn)者馋评,通過指定一個Exchange和Routing留特,把消息送達到某一個隊列中去蜕青,然后我們的消費者監(jiān)聽隊列進行消息的消費處理操作右核。

但是在某些情況下,如果我們在發(fā)送消息的時候,當前的exchange不存在或者指定的路由key路由不到针炉,這個時候我們需要監(jiān)聽這種不可達消息篡帕,就需要使用到Returrn Listener镰烧。

基礎(chǔ)API中有個關(guān)鍵的配置項 Mandatory :如果為true怔鳖,監(jiān)聽器會收到路由不可達的消息结执,然后進行處理献幔。如果為false蜡感,broker端會自動刪除該消息犀斋。

同樣闪水,通過監(jiān)聽的方式球榆, chennel.addReturnListener(ReturnListener rl) 傳入已經(jīng)重寫過handleReturn方法的ReturnListener持钉。

消費端ACK與NACK

消費端進行消費的時候每强,如果由于業(yè)務(wù)異晨罩矗可以進行日志的記錄辨绊,然后進行補償宣鄙。但是對于服務(wù)器宕機等嚴重問題冻晤,我們需要手動ACK保障消費端消費成功鼻弧。

// deliveryTag:消息在mq中的唯一標識// multiple:是否批量(和qos設(shè)置類似的參數(shù))// requeue:是否需要重回隊列温数〕糯蹋或者丟棄或者重回隊首再次消費够傍。public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 

如上代碼冕屯,消息在消費端重回隊列是為了對沒有成功處理消息安聘,把消息重新返回到Broker浴韭。一般來說,實際應(yīng)用中都會關(guān)閉重回隊列(避免進入死循環(huán))榴芳,也就是設(shè)置為false窟感。

死信隊列DLX

死信隊列(DLX Dead-Letter-Exchange):當消息在一個隊列中變成死信之后肌括,它會被重新推送到另一個隊列,這個隊列就是死信隊列紧索。

DLX也是一個正常的Exchange珠漂,和一般的Exchange沒有區(qū)別媳危,它能在任何的隊列上被指定,實際上就是設(shè)置某個隊列的屬性暮蹂。

當這個隊列中有死信時仰泻,RabbitMQ就會自動的將這個消息重新發(fā)布到設(shè)置的Exchange上去集侯,進而被路由到另一個隊列。

RocketMQ

阿里巴巴雙十一官方指定消息產(chǎn)品术健,支撐阿里巴巴集團所有的消息服務(wù)荞估,歷經(jīng)十余年高可用與高可靠的嚴苛考驗,是阿里巴巴交易鏈路的核心產(chǎn)品飞醉。

Rocket:火箭的意思缅帘。

RocketMQ的核心概念

他有以下核心概念:Broker 逗栽、 Topic 彼宠、 Tag 凭峡、 MessageQueue 、 NameServer 按价、 Group 楼镐、 Offset 、 Producer 以及 Consumer 秉宿。

下面來詳細介紹描睦。

  • Broker:消息中轉(zhuǎn)角色,負責存儲消息韵丑,轉(zhuǎn)發(fā)消息撵彻。
    Broker是具體提供業(yè)務(wù)的服務(wù)器轴合,單個Broker節(jié)點與所有的NameServer節(jié)點保持長連接及心跳值桩,并會定時將Topic信息注冊到NameServer,順帶一提底層的通信和連接都是基于Netty實現(xiàn)的。
    Broker負責消息存儲鸯隅,以Topic為緯度支持輕量級的隊列蝌以,單機可以支撐上萬隊列規(guī)模,支持消息推拉模型徊件。官網(wǎng)上有數(shù)據(jù)顯示:具有上億級消息堆積能力,同時可嚴格保證消息的有序性部翘。
  • Topic:主題新思!它是消息的第一級類型。
    比如一個電商系統(tǒng)可以分為:交易消息崔兴、物流消息等位谋,一條消息必須有一個 Topic 掏父。Topic與生產(chǎn)者和消費者的關(guān)系非常松散,一個 Topic 可以有0個陶缺、1個、多個生產(chǎn)者向其發(fā)送消息苫费,一個生產(chǎn)者也可以同時向不同的 Topic 發(fā)送消息百框。一個 Topic 也可以被 0個、1個方椎、多個消費者訂閱。
  • Tag:標簽闸拿!可以看作子主題新荤,它是消息的第二級類型篱瞎,用于為用戶提供額外的靈活性。
    使用標簽澄者,同一業(yè)務(wù)模塊不同目的的消息就可以用相同Topic而不同的Tag來標識。比如交易消息又可以分為:交易創(chuàng)建消息抱怔、交易完成消息等嘀倒,一條消息可以沒有Tag。標簽有助于保持您的代碼干凈和連貫局冰,并且還可以為RabbitMQ提供的查詢系統(tǒng)提供幫助测蘑。
  • MessageQueue:一個Topic下可以設(shè)置多個消息隊列,發(fā)送消息時執(zhí)行該消息的Topic康二,RocketMQ會輪詢該Topic下的所有隊列將消息發(fā)出去。消息的物理管理單位沫勿。一個Topic下可以有多個Queue挨约,Queue的引入使得消息的存儲可以分布式集群化,具有了水平擴展能力产雹。
  • NameServer:類似Kafka中的ZooKeeper诫惭,但NameServer集群之間是沒有通信的,相對ZK來說更加輕量蔓挖。
    它主要負責對于源數(shù)據(jù)的管理夕土,包括了對于Topic和路由信息的管理。每個Broker在啟動的時候會到NameServer注冊瘟判,Producer在發(fā)送消息前會根據(jù)Topic去NameServer獲取對應(yīng)Broker的路由信息怨绣,Consumer也會定時獲取 Topic 的路由信息。
  • Producer:生產(chǎn)者拷获,支持三種方式發(fā)送消息:同步篮撑、異步和單向
    單向發(fā)送 :消息發(fā)出去后匆瓜,可以繼續(xù)發(fā)送下一條消息或執(zhí)行業(yè)務(wù)代碼赢笨,不等待服務(wù)器回應(yīng)未蝌,且沒有回調(diào)函數(shù)
    異步發(fā)送 :消息發(fā)出去后质欲,可以繼續(xù)發(fā)送下一條消息或執(zhí)行業(yè)務(wù)代碼树埠,不等待服務(wù)器回應(yīng),有回調(diào)函數(shù)嘶伟。
    同步發(fā)送 :消息發(fā)出去后怎憋,等待服務(wù)器響應(yīng)成功或失敗,才能繼續(xù)后面的操作九昧。
  • Consumer:消費者绊袋,支持 PUSH 和 PULL 兩種消費模式,支持集群消費廣播消費 集群消費 :該模式下一個消費者集群共同消費一個主題的多個隊列铸鹰,一個隊列只會被一個消費者消費癌别,如果某個消費者掛掉,分組內(nèi)其它消費者會接替掛掉的消費者繼續(xù)消費蹋笼。
    廣播消費 :會發(fā)給消費者組中的每一個消費者進行消費展姐。相當于RabbitMQ的發(fā)布訂閱模式。
  • Group:分組剖毯,一個組可以訂閱多個Topic圾笨。
    分為ProducerGroup,ConsumerGroup逊谋,代表某一類的生產(chǎn)者和消費者擂达,一般來說同一個服務(wù)可以作為Group,同一個Group一般來說發(fā)送和消費的消息都是一樣的
  • Offset:在RocketMQ中胶滋,所有消息隊列都是持久化板鬓,長度無限的數(shù)據(jù)結(jié)構(gòu),所謂長度無限是指隊列中的每個存儲單元都是定長究恤,訪問其中的存儲單元使用Offset來訪問俭令,Offset為Java Long類型,64位部宿,理論上在 100年內(nèi)不會溢出唤蔗,所以認為是長度無限。也可以認為Message Queue是一個長度無限的數(shù)組窟赏,Offset就是下標妓柜。

延時消息

開源版的RocketMQ不支持任意時間精度,僅支持特定的level涯穷,例如定時5s棍掐,10s,1min等拷况。其中作煌,level=0級表示不延時掘殴,level=1表示1級延時,level=2表示2級延時粟誓,以此類推奏寨。

延時等級如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

順序消息

消息有序指的是可以按照消息的發(fā)送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序鹰服,可以分為 分區(qū)有序 或者 全局有序 病瞳。

事務(wù)消息

消息隊列MQ提供類似X/Open XA的分布式事務(wù)功能,通過消息隊列MQ事務(wù)消息能達到分布式事務(wù)的最終一致悲酷。上圖說明了事務(wù)消息的大致流程:正常事務(wù)消息的發(fā)送和提交套菜、事務(wù)消息的補償流程。

事務(wù)消息發(fā)送及提交:

  1. 發(fā)送half消息
  2. 服務(wù)端響應(yīng)消息寫入結(jié)果
  3. 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗设易,此時half消息對業(yè)務(wù)不可見逗柴,本地邏輯不執(zhí)行);
  4. 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或Rollback(Commit操作生成消息索引顿肺,消息對消費者可見)戏溺。

事務(wù)消息的補償流程:

  1. 對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”屠尊;
  2. Producer收到回查消息于购,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)。
  3. 根據(jù)本地事務(wù)狀態(tài)知染,重新Commit或RollBack

其中,補償階段用于解決消息Commit或Rollback發(fā)生超時或者失敗的情況斑胜。

事務(wù)消息狀態(tài):

事務(wù)消息共有三種狀態(tài):提交狀態(tài)控淡、回滾狀態(tài)、中間狀態(tài):

  1. TransactionStatus.CommitTransaction:提交事務(wù)止潘,它允許消費者消費此消息掺炭。
  2. TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除凭戴,不允許被消費涧狮。
  3. TransactionStatus.Unkonwn:中間狀態(tài),它代表需要檢查消息隊列來確定消息狀態(tài)么夫。

RocketMQ的高可用機制

RocketMQ是天生支持分布式的者冤,可以配置主從以及水平擴展。

Master角色的Broker支持讀和寫档痪,Slave角色的Broker僅支持讀涉枫,也就是 Producer只能和Master角色的Broker連接寫入消息;Consumer可以連接 Master角色的Broker腐螟,也可以連接Slave角色的Broker來讀取消息愿汰。

消息消費的高可用(主從):

在Consumer的配置文件中困后,并不需要設(shè)置是從Master讀還是從Slave讀,當Master不可用或者繁忙的時候衬廷,Consumer會被自動切換到從Slave讀摇予。有了自動切換Consumer這種機制,當一個Master角色的機器出現(xiàn)故障后吗跋,Consumer仍然可以從Slave讀取消息侧戴,不影響Consumer程序。

在4.5版本之前如果Master節(jié)點掛了小腊,Slave節(jié)點是不能自動切換成master節(jié)點的這個時候需要手動停止Slave角色的Broker救鲤,更改配置文件,用新的配置文件啟動Broker秩冈。但是在4.5之后本缠,RocketMQ引入了Dledger同步機制,這個時候如果Master節(jié)點掛了入问,Dledger會通過Raft協(xié)議選舉出新的master節(jié)點丹锹,不需要手動修改配置侧漓。

消息發(fā)送高可用(配置多個主節(jié)點):

在創(chuàng)建Topic的時候精钮,把Topic的多個Message Queue創(chuàng)建在多個Broker組上(相同Broker名稱,不同 brokerId的機器組成一個Broker組)惧互,這樣當一個Broker組的Master不可用后棱烂,其他組的Master仍然可用租漂,Producer仍然可以發(fā)送消息。

主從復制:

如果一個Broker組有Master和Slave颊糜,消息需要從Master復制到Slave 上哩治,有同步和異步兩種復制方式。

  • 同步復制:同步復制方式是等Master和Slave均寫成功后才反饋給客戶端寫成功狀態(tài)衬鱼。如果Master出故障业筏, Slave上有全部的備份數(shù)據(jù),容易恢復同步復制會增大數(shù)據(jù)寫入延遲鸟赫,降低系統(tǒng)吞吐量蒜胖。
  • 異步復制:異步復制方式是只要Master寫成功 即可反饋給客戶端寫成功狀態(tài)。在異步復制方式下抛蚤,系統(tǒng)擁有較低的延遲和較高的吞吐量台谢,但是如果Master出了故障,有些數(shù)據(jù)因為沒有被寫 入Slave岁经,有可能會丟失

通常情況下对碌,應(yīng)該把Master和Slave配置成同步刷盤方式,主從之間配置成異步的復制方式蒿偎,這樣即使有一臺機器出故障朽们,仍然能保證數(shù)據(jù)不丟怀读,是個不錯的選擇。

負載均衡

Producer負載均衡:

Producer端骑脱,每個實例在發(fā)消息的時候菜枷,默認會輪詢所有的Message Queue發(fā)送,以達到讓消息平均落在不同的Queue上叁丧。而由于Queue可以散落在不同的Broker啤誊,所以消息就發(fā)送到不同的Broker下,如下圖:

Consumer負載均衡:

如果Consumer實例的數(shù)量比Message Queue的總數(shù)量還多的話拥娄,多出來的Consumer實例將無法分到Queue蚊锹,也就無法消費到消息,也就無法起到分攤負載的作用了稚瘾。所以需要控制讓Queue的總數(shù)量大于等于Consumer的數(shù)量牡昆。

  • 消費者的集群模式:啟動多個消費者就可以保證消費者的負載均衡(均攤隊列)
  • 默認使用的是均攤隊列:會按照Queue的數(shù)量和實例的數(shù)量平均分配Queue給每個實例,這樣每個消費者可以均攤消費的隊列摊欠,如下圖所示6個隊列和三個生產(chǎn)者丢烘。
  • 另外一種平均的算法環(huán)狀輪流分Queue的形式,每個消費者些椒,均攤不同主節(jié)點的一個消息隊列播瞳,如下圖所示:

對于廣播模式并不是負載均衡的,要求一條消息需要投遞到一個消費組下面所有的消費者實例免糕,所以也就沒有消息被分攤消費的說法赢乓。

死信隊列

當一條消息消費失敗,RocketMQ就會自動進行消息重試石窑。而如果消息超過最大重試次數(shù)牌芋,RocketMQ就會認為這個消息有問題。但是此時尼斧,RocketMQ不會立刻將這個有問題的消息丟棄,而會將其發(fā)送到這個消費者組對應(yīng)的一種特殊隊列:死信隊列试吁。死信隊列的名稱是 %DLQ%+ConsumGroup 棺棵。

死信隊列具有以下特性:

  1. 一個死信隊列對應(yīng)一個Group ID, 而不是對應(yīng)單個消費者實例熄捍。
  2. 如果一個Group ID未產(chǎn)生死信消息烛恤,消息隊列RocketMQ不會為其創(chuàng)建相應(yīng)的死信隊列。
  3. 一個死信隊列包含了對應(yīng)Group ID產(chǎn)生的所有死信消息余耽,不論該消息屬于哪個Topic缚柏。

Kafka

Kafka是一個分布式、支持分區(qū)的碟贾、多副本的币喧,基于ZooKeeper協(xié)調(diào)的分布式消息系統(tǒng)轨域。

新版Kafka已經(jīng)不再需要ZooKeeper。

它最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景:比如基于Hadoop的批處理系統(tǒng)杀餐、低延遲的實時系統(tǒng)干发、Storm/Spark流式處理引擎,Web/Nginx日志史翘、訪問日志枉长,消息服務(wù)等等,用Scala語言編寫琼讽。屬于Apache基金會的頂級開源項目必峰。

先看一下Kafka的架構(gòu)圖 :

Kafka的核心概念

在Kafka中有幾個核心概念:

  • Broker:消息中間件處理節(jié)點,一個Kafka節(jié)點就是一個Broker钻蹬,一個或者多個Broker可以組成一個Kafka集群
  • Topic:Kafka根據(jù)topic對消息進行歸類吼蚁,發(fā)布到Kafka集群的每條消息都需要指定一個topic
  • Producer:消息生產(chǎn)者,向Broker發(fā)送消息的客戶端
  • Consumer:消息消費者脉让,從Broker讀取消息的客戶端
  • ConsumerGroup:每個Consumer屬于一個特定的ConsumerGroup桂敛,一條消息可以被多個不同的ConsumerGroup消費,但是一個ConsumerGroup中只能有一個Consumer能夠消費該消息
  • Partition:物理上的概念溅潜,一個topic可以分為多個partition术唬,每個partition內(nèi)部消息是有序的
  • Leader:每個Partition有多個副本,其中有且僅有一個作為Leader滚澜,Leader是負責數(shù)據(jù)讀寫的Partition粗仓。
  • Follower:Follower跟隨Leader,所有寫請求都通過Leader路由设捐,數(shù)據(jù)變更會廣播給所有Follower借浊,F(xiàn)ollower與Leader保持數(shù)據(jù)同步。如果Leader失效萝招,則從Follower中選舉出一個新的Leader蚂斤。當Follower與Leader掛掉、卡住或者同步太慢槐沼,Leader會把這個Follower從 ISR列表 中刪除曙蒸,重新創(chuàng)建一個Follower。
  • Offset:偏移量岗钩。Kafka的存儲文件都是按照offset.kafka來命名纽窟,用Offset做名字的好處是方便查找。例如你想找位于2049的位置兼吓,只要找到2048.kafka的文件即可臂港。

可以這么來理解Topic,Partition和Broker:

一個Topic,代表邏輯上的一個業(yè)務(wù)數(shù)據(jù)集审孽,比如訂單相關(guān)操作消息放入訂單Topic县袱,用戶相關(guān)操作消息放入用戶Topic,對于大型網(wǎng)站來說瓷胧,后端數(shù)據(jù)都是海量的显拳,訂單消息很可能是非常巨量的,比如有幾百個G甚至達到TB級別搓萧,如果把這么多數(shù)據(jù)都放在一臺機器上可定會有容量限制問題杂数,那么就可以在Topic內(nèi)部劃分多個Partition來分片存儲數(shù)據(jù),不同的Partition可以位于不同的機器上瘸洛,相當于分布式存儲揍移。每臺機器上都運行一個Kafka的進程Broker。

Kafka核心總控制器Controller

在Kafka集群中會有一個或者多個Broker反肋,其中有一個Broker會被選舉為控制器(Kafka Controller)那伐,可以理解為 Broker-Leader ,它負責管理整個 集群中所有分區(qū)和副本的狀態(tài)石蔗。

Partition-Leader

Controller選舉機制

在Kafka集群啟動的時候罕邀,選舉的過程是集群中每個Broker都會嘗試在ZooKeeper上創(chuàng)建一個 /controller臨時節(jié)點,ZooKeeper會保證有且僅有一個Broker能創(chuàng)建成功养距,這個Broker就會成為集群的總控器Controller诉探。

當這個Controller角色的Broker宕機了,此時ZooKeeper臨時節(jié)點會消失棍厌,集群里其他Broker會一直監(jiān)聽這個臨時節(jié) 點肾胯,發(fā)現(xiàn)臨時節(jié)點消失了,就競爭再次創(chuàng)建臨時節(jié)點耘纱,就是我們上面說的選舉機制敬肚,ZooKeeper又會保證有一個Broker成為新的Controller。具備控制器身份的Broker需要比其他普通的Broker多一份職責束析,具體細節(jié)如下:

  1. 監(jiān)聽Broker相關(guān)的變化艳馒。為ZooKeeper中的/brokers/ids/節(jié)點添加BrokerChangeListener,用來處理Broker增減的變化员寇。
  2. 監(jiān)聽Topic相關(guān)的變化弄慰。為ZooKeeper中的/brokers/topics節(jié)點添加TopicChangeListener,用來處理Topic增減的變化丁恭;為ZooKeeper中的/admin/delete_topics節(jié)點添加TopicDeletionListener曹动,用來處理刪除Topic的動作斋日。
  3. 從ZooKeeper中讀取獲取當前所有與Topic牲览、Partition以及Broker有關(guān)的信息并進行相應(yīng)的管理 。對于所有Topic所對應(yīng)的ZooKeeper中的/brokers/topics/節(jié)點添加PartitionModificationsListener,用來監(jiān)聽Topic中的分區(qū)分配變化第献。
  4. 更新集群的元數(shù)據(jù)信息贡必,同步到其他普通的Broker節(jié)點中

Partition副本選舉Leader機制

Controller感知到分區(qū)Leader所在的Broker掛了,Controller會從ISR列表(參數(shù) unclean.leader.election.enable=false的前提下)里挑第一個Broker作為Leader(第一個Broker最先放進ISR列表庸毫,可能是同步數(shù)據(jù)最多的副本)仔拟,如果參數(shù)unclean.leader.election.enable為true,代表在ISR列表里所有副本都掛了的時候可以在ISR列表以外的副本中選Leader飒赃,這種設(shè)置利花,可以提高可用性,但是選出的新Leader有可能數(shù)據(jù)少很多载佳。副本進入ISR列表有兩個條件:

  1. 副本節(jié)點不能產(chǎn)生分區(qū)炒事,必須能與ZooKeeper保持會話以及跟Leader副本網(wǎng)絡(luò)連通
  2. 副本能復制Leader上的所有寫操作,并且不能落后太多蔫慧。(與Leader副本同步滯后的副本挠乳,是由replica.lag.time.max.ms配置決定的,超過這個時間都沒有跟Leader同步過的一次的副本會被移出ISR列表)

消費者消費消息的Offset記錄機制

每個Consumer會定期將自己消費分區(qū)的Offset提交給Kafka內(nèi)部Topic:consumer_offsets姑躲,提交過去的時候睡扬,key是consumerGroupId+topic+分區(qū)號,value就是當前Offset的值黍析,Kafka會定期清理Topic里的消息卖怜,最后就保留最新的那條數(shù)據(jù)。

因為__consumer_offsets可能會接收高并發(fā)的請求橄仍,Kafka默認給其分配50個分區(qū)(可以通過 offsets.topic.num.partitions設(shè)置)韧涨,這樣可以通過加機器的方式抗大并發(fā)。

消費者Rebalance機制

Rebalance就是說 如果消費組里的消費者數(shù)量有變化或消費的分區(qū)數(shù)有變化侮繁,Kafka會重新分配消費者與消費分區(qū)的關(guān)系 虑粥。比如consumer group中某個消費者掛了,此時會自動把分配給他的分區(qū)交給其他的消費者宪哩,如果他又重啟了娩贷,那么又會把一些分區(qū)重新交還給他。

注意:Rebalance只針對subscribe這種不指定分區(qū)消費的情況锁孟,如果通過assign這種消費方式指定了分區(qū)彬祖,Kafka不會進行Rebalance。

如下情況可能會觸發(fā)消費者Rebalance:

  1. 消費組里的Consumer增加或減少了
  2. 動態(tài)給Topic增加了分區(qū)
  3. 消費組訂閱了更多的Topic

Rebalance過程中品抽,消費者無法從Kafka消費消息储笑,這對Kafka的TPS會有影響,如果Kafka集群內(nèi)節(jié)點較多圆恤,比如數(shù)百 個突倍,那重平衡可能會耗時極多,所以應(yīng)盡量避免在系統(tǒng)高峰期的重平衡發(fā)生。

Rebalance過程如下

當有消費者加入消費組時羽历,消費者焊虏、消費組及組協(xié)調(diào)器之間會經(jīng)歷以下幾個階段:

第一階段:選擇組協(xié)調(diào)器

組協(xié)調(diào)器GroupCoordinator:每個consumer group都會選擇一個Broker作為自己的組協(xié)調(diào)器coordinator,負責監(jiān)控這個消費組里的所有消費者的心跳秕磷,以及判斷是否宕機诵闭,然后開啟消費者Rebalance。consumer group中的每個consumer啟動時會向Kafka集群中的某個節(jié)點發(fā)送FindCoordinatorRequest請求來查找對應(yīng)的組協(xié)調(diào)器GroupCoordinator澎嚣,并跟其建立網(wǎng)絡(luò)連接疏尿。組協(xié)調(diào)器選擇方式:通過如下公式可以選出consumer消費的Offset要提交到__consumer_offsets的哪個分區(qū),這個分區(qū)Leader對應(yīng)的Broker就是這個consumer group的coordinator公式:

hash(consumer group id) % 對應(yīng)主題的分區(qū)數(shù)

第二階段:加入消費組JOIN GROUP

在成功找到消費組所對應(yīng)的GroupCoordinator之后就進入加入消費組的階段易桃,在此階段的消費者會向GroupCoordinator發(fā)送JoinGroupRequest請求润歉,并處理響應(yīng)。然后GroupCoordinator從一個consumer group中選擇第一個加入group的consumer作為Leader(消費組協(xié)調(diào)器)颈抚,把consumer group情況發(fā)送給這個Leader踩衩,接著這個Leader會負責制定分區(qū)方案。

第三階段(SYNC GROUP)

consumer leader通過給GroupCoordinator發(fā)送SyncGroupRequest贩汉,接著GroupCoordinator就把分區(qū)方案下發(fā)給各個consumer驱富,他們會根據(jù)指定分區(qū)的Leader Broker進行網(wǎng)絡(luò)連接以及消息消費。

消費者Rebalance分區(qū)分配策略

主要有三種Rebalance的策略:range 匹舞、 round-robin 褐鸥、 sticky 。默認情況為range分配策略赐稽。

假設(shè)一個主題有10個分區(qū)(0-9)叫榕,現(xiàn)在有三個consumer消費:

range策略:按照分區(qū)序號排序分配 ,假設(shè)n=分區(qū)數(shù)/消費者數(shù)量 = 3姊舵, m=分區(qū)數(shù)%消費者數(shù)量 = 1晰绎,那么前 m 個消 費者每個分配 n+1 個分區(qū),后面的(消費者數(shù)量-m )個消費者每個分配 n 個分區(qū)括丁。比如分區(qū)0~ 3給一個consumer荞下,分區(qū)4~ 6給一個consumer,分區(qū)7~9給一個consumer史飞。

round-robin策略:輪詢分配 尖昏,比如分區(qū)0、3构资、6抽诉、9給一個consumer,分區(qū)1吐绵、4迹淌、7給一個consumer塞帐,分區(qū)2、5巍沙、 8給一個consumer

sticky策略:初始時分配策略與round-robin類似,但是在rebalance的時候荷鼠,需要保證如下兩個原則:

  1. 分區(qū)的分配要盡可能均勻 句携。
  2. 分區(qū)的分配盡可能與上次分配的保持相同。

當兩者發(fā)生沖突時允乐,第一個目標優(yōu)先于第二個目標 矮嫉。這樣可以最大程度維持原來的分區(qū)分配的策略。比如對于第一種range情況的分配牍疏,如果第三個consumer掛了蠢笋,那么重新用sticky策略分配的結(jié)果如下:consumer1除了原有的0~ 3,會再分配一個7 consumer2除了原有的4~ 6鳞陨,會再分配8和9昨寞。

Producer發(fā)布消息機制剖析

1、寫入方式

producer采用push模式將消息發(fā)布到broker厦滤,每條消息都被append到patition中援岩,屬于順序?qū)懘疟P(順序?qū)懘疟P 比 隨機寫 效率要高,保障 kafka 吞吐率)掏导。

2享怀、消息路由

producer發(fā)送消息到broker時,會根據(jù)分區(qū)算法選擇將其存儲到哪一個partition趟咆。其路由機制為:

hash(key)%分區(qū)數(shù)

3添瓷、寫入流程

  1. producer先從ZooKeeper的 "/brokers/…/state" 節(jié)點找到該partition的leader
  2. producer將消息發(fā)送給該leader
  3. leader將消息寫入本地log
  4. followers從leader pull消息,寫入本地log后向leader發(fā)送ACK
  5. leader收到所有ISR中的replica的ACK后值纱,增加HW(high watermark鳞贷,最后commit的offset)并向producer發(fā)送ACK

HW與LEO

HW俗稱高水位 ,HighWatermark的縮寫虐唠,取一個partition對應(yīng)的ISR中最小的LEO(log-end-offset)作為HW悄晃, consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀 態(tài)凿滤。對于leader新寫入的消息妈橄,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW翁脆, 此時消息才能被consumer消費眷蚓。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取反番。對于來自內(nèi)部broker的讀取請求沙热,沒有HW的限制叉钥。

日志分段存儲

Kafka一個分區(qū)的消息數(shù)據(jù)對應(yīng)存儲在一個文件夾下,以topic名稱+分區(qū)號命名篙贸,消息在分區(qū)內(nèi)是分段存儲的投队, 每個段的消息都存儲在不一樣的log文件里,Kafka規(guī)定了一個段位的log文件最大為1G爵川,做這個限制目的是為了方便把log文件加載到內(nèi)存去操作:

1 ### 部分消息的offset索引文件敷鸦,kafka每次往分區(qū)發(fā)4K(可配置)消息就會記錄一條當前消息的offset到index文件, 2 ### 如果要定位消息的offset會先在這個文件里快速定位寝贡,再去log文件里找具體消息 3 00000000000000000000.index 4 ### 消息存儲文件扒披,主要存offset和消息體 5 00000000000000000000.log 6 ### 消息的發(fā)送時間索引文件,kafka每次往分區(qū)發(fā)4K(可配置)消息就會記錄一條當前消息的發(fā)送時間戳與對應(yīng)的offset到timeindex文件圃泡, 7 ### 如果需要按照時間來定位消息的offset碟案,會先在這個文件里查找 8 00000000000000000000.timeindex 9 10 00000000000005367851.index 11 00000000000005367851.log 12 00000000000005367851.timeindex 13 14 00000000000009936472.index 15 00000000000009936472.log 16 00000000000009936472.timeindex

這個9936472之類的數(shù)字,就是代表了這個日志段文件里包含的起始 Offset颇蜡,也就說明這個分區(qū)里至少都寫入了接近1000萬條數(shù)據(jù)了价说。Kafka Broker有一個參數(shù),log.segment.bytes风秤,限定了每個日志段文件的大小熔任,最大就是1GB。一個日志段文件滿了唁情,就自動開一個新的日志段文件來寫入疑苔,避免單個文件過大,影響文件的讀寫性能甸鸟,這個過程叫做log rolling惦费,正在被寫入的那個日志段文件,叫做active log segment抢韭。

最后附一張ZooKeeper節(jié)點數(shù)據(jù)圖

MQ帶來的一些問題薪贫、及解決方案

如何保證順序消費?

  • RabbitMQ:一個Queue對應(yīng)一個Consumer即可解決刻恭。
  • RocketMQ:hash(key)%隊列數(shù)
  • Kafkahash(key)%分區(qū)數(shù)

如何實現(xiàn)延遲消費瞧省?

  • RabbitMQ:兩種方案 死信隊列 + TTL引入RabbitMQ的延遲插件
  • RocketMQ:天生支持延時消息。
  • Kafka:步驟如下 專門為要延遲的消息創(chuàng)建一個Topic新建一個消費者去消費這個Topic消息持久化再開一個線程定時去拉取持久化的消息鳍贾,放入實際要消費的Topic實際消費的消費者從實際要消費的Topic拉取消息鞍匾。

如何保證消息的可靠性投遞

RabbitMQ:

  • Broker-->消費者:手動ACK
  • 生產(chǎn)者-->Broker:兩種方案

數(shù)據(jù)庫持久化:

1.將業(yè)務(wù)訂單數(shù)據(jù)和生成的Message進行持久化操作(一般情況下插入數(shù)據(jù)庫,這里如果分庫的話可能涉及到分布式事務(wù))2.將Message發(fā)送到Broker服務(wù)器中3.通過RabbitMQ的Confirm機制骑科,在producer端橡淑,監(jiān)聽服務(wù)器是否ACK。4.如果ACK了咆爽,就將Message這條數(shù)據(jù)狀態(tài)更新為已發(fā)送梁棠。如果失敗置森,修改為失敗狀態(tài)。5.分布式定時任務(wù)查詢數(shù)據(jù)庫3分鐘(這個具體時間應(yīng)該根據(jù)的時效性來定)之前的發(fā)送失敗的消息6.重新發(fā)送消息符糊,記錄發(fā)送次數(shù)7.如果發(fā)送次數(shù)過多仍然失敗凫海,那么就需要人工排查之類的操作。

優(yōu)點:能夠保證消息百分百不丟失男娄。

缺點:第一步會涉及到分布式事務(wù)問題行贪。

消息的延遲投遞:

流程圖中,顏色不同的代表不同的message1.將業(yè)務(wù)訂單持久化2.發(fā)送一條Message到broker(稱之為主Message)沪伙,再發(fā)送相同的一條到不同的隊列或者交換機(這條稱為確認Message)中。3.主Message由實際業(yè)務(wù)處理端消費后县好,生成一條響應(yīng)Message围橡。之前的確認Message由Message Service應(yīng)用處理入庫。4~6.實際業(yè)務(wù)處理端發(fā)送的確認Message由Message Service接收后缕贡,將原Message狀態(tài)修改翁授。7.如果該條Message沒有被確認,則通過rpc調(diào)用重新由producer進行全過程晾咪。

優(yōu)點:相對于持久化方案來說響應(yīng)速度有所提升

缺點:系統(tǒng)復雜性有點高收擦,萬一兩條消息都失敗了,消息存在丟失情況谍倦,仍需Confirm機制做補償塞赂。

RocketMQ

生產(chǎn)者弄丟數(shù)據(jù):

Producer在把Message發(fā)送Broker的過程中,因為網(wǎng)絡(luò)問題等發(fā)生丟失昼蛀,或者Message到了Broker宴猾,但是出了問題,沒有保存下來叼旋。針對這個問題仇哆,RocketMQ對Producer發(fā)送消息設(shè)置了3種方式:

同步發(fā)送異步發(fā)送單向發(fā)送

Broker弄丟數(shù)據(jù):

Broker接收到Message暫存到內(nèi)存,Consumer還沒來得及消費夫植,Broker掛掉了讹剔。

可以通過 持久化 設(shè)置去解決:

  1. 創(chuàng)建Queue的時候設(shè)置持久化,保證Broker持久化Queue的元數(shù)據(jù)详民,但是不會持久化Queue里面的消息
  2. 將Message的deliveryMode設(shè)置為2延欠,可以將消息持久化到磁盤,這樣只有Message支持化到磁盤之后才會發(fā)送通知Producer ack

這兩步過后沈跨,即使Broker掛了衫冻,Producer肯定收不到ack的,就可以進行重發(fā)谒出。

消費者弄丟數(shù)據(jù):

Consumer有消費到Message隅俘,但是內(nèi)部出現(xiàn)問題邻奠,Message還沒處理,Broker以為Consumer處理完了为居,只會把后續(xù)的消息發(fā)送碌宴。這時候,就要 關(guān)閉autoack蒙畴,消息處理過后贰镣,進行手動ack , 多次消費失敗的消息,會進入 死信隊列 膳凝,這時候需要人工干預碑隆。

Kafka

生產(chǎn)者弄丟數(shù)據(jù)

設(shè)置了 acks=all ,一定不會丟蹬音,要求是上煤,你的 leader 接收到消息,所有的 follower 都同步到了消息之后著淆,才認為本次寫成功了劫狠。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試永部,重試無限次独泞。

Broker弄丟數(shù)據(jù)

Kafka 某個 broker 宕機,然后重新選舉 partition 的 leader苔埋。大家想想懦砂,要是此時其他的 follower 剛好還有些數(shù)據(jù)沒有同步,結(jié)果此時 leader 掛了组橄,然后選舉某個 follower 成 leader 之后孕惜,不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊晨炕。

此時一般是要求起碼設(shè)置如下 4 個參數(shù):

replication.factormin.insync.replicasacks=allretries=MAX

我們生產(chǎn)環(huán)境就是按照上述要求配置的衫画,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發(fā)生故障瓮栗,進行 leader 切換時削罩,數(shù)據(jù)不會丟失。

消費者弄丟數(shù)據(jù)

你消費到了這個消息费奸,然后消費者那邊自動提交了 offset弥激,讓 Kafka 以為你已經(jīng)消費好了這個消息,但其實你才剛準備處理這個消息愿阐,你還沒處理微服,你自己就掛了,此時這條消息就丟咯缨历。

這不是跟 RabbitMQ 差不多嗎以蕴,大家都知道 Kafka 會自動提交 offset糙麦,那么只要 關(guān)閉自動提交 offset,在處理完之后自己手動提交 offset丛肮,就可以保證數(shù)據(jù)不會丟赡磅。但是此時確實還是可能會有重復消費,比如你剛處理完宝与,還沒提交 offset焚廊,結(jié)果自己掛了,此時肯定會重復消費一次习劫,自己保證冪等性就好了咆瘟。

如何保證消息的冪等?

以 RocketMQ 為例诽里,下面列出了消息重復的場景:

發(fā)送時消息重復

當一條消息已被成功發(fā)送到服務(wù)端并完成持久化袒餐,此時出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機,導致服務(wù)端對客戶端應(yīng)答失敗须肆。如果此時生產(chǎn)者意識到消息發(fā)送失敗并嘗試再次發(fā)送消息匿乃,消費者后續(xù)會收到兩條內(nèi)容相同并且Message ID也相同的消息桩皿。

投遞時消息重復

消息消費的場景下豌汇,消息已投遞到消費者并完成業(yè)務(wù)處理,當客戶端給服務(wù)端反饋應(yīng)答的時候網(wǎng)絡(luò)閃斷泄隔。為了保證消息至少被消費一次拒贱,消息隊列RocketMQ版的服務(wù)端將在網(wǎng)絡(luò)恢復后再次嘗試投遞之前已被處理過的消息,消費者后續(xù)會收到兩條內(nèi)容相同并且Message ID也相同的消息佛嬉。

負載均衡時消息重復(包括但不限于網(wǎng)絡(luò)抖動逻澳、Broker重啟以及消費者應(yīng)用重啟)

當消息隊列RocketMQ版的Broker或客戶端重啟、擴容或縮容時暖呕,會觸發(fā)Rebalance斜做,此時消費者可能會收到重復消息。

那么湾揽,有什么解決方案呢瓤逼?直接上圖。

如何解決消息積壓的問題库物?

關(guān)于這個問題霸旗,有幾個點需要考慮:

如何快速讓積壓的消息被消費掉?

臨時寫一個消息分發(fā)的消費者戚揭,把積壓隊列里的消息均勻分發(fā)到N個隊列中诱告,同時一個隊列對應(yīng)一個消費者,相當于消費速度提高了N倍民晒。

修改前:

修改后:

積壓時間太久精居,導致部分消息過期锄禽,怎么處理唉堪?

批量重導误窖。在業(yè)務(wù)不繁忙的時候伴郁,比如凌晨茵乱,提前準備好程序料仗,把丟失的那批消息查出來则北,重新導入到MQ中筑舅。

消息大量積壓媒吗,MQ磁盤被寫滿了辈毯,導致新消息進不來了坝疼,丟掉了大量消息,怎么處理谆沃?

這個沒辦法钝凶。誰讓【消息分發(fā)的消費者】寫的太慢了,你臨時寫程序唁影,接入數(shù)據(jù)來消費耕陷,消費一個丟棄一個,都不要了据沈,快速消費掉所有的消息哟沫。然后走第二個方案,到了晚上再補數(shù)據(jù)吧锌介。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末嗜诀,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子孔祸,更是在濱河造成了極大的恐慌隆敢,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件崔慧,死亡現(xiàn)場離奇詭異拂蝎,居然都是意外死亡,警方通過查閱死者的電腦和手機惶室,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進店門温自,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拇涤,你說我怎么就攤上這事捣作。” “怎么了鹅士?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵券躁,是天一觀的道長。 經(jīng)常有香客問我,道長也拜,這世上最難降的妖魔是什么以舒? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮慢哈,結(jié)果婚禮上蔓钟,老公的妹妹穿的比我還像新娘。我一直安慰自己卵贱,他們只是感情好滥沫,可當我...
    茶點故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著键俱,像睡著了一般兰绣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上编振,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天缀辩,我揣著相機與錄音,去河邊找鬼踪央。 笑死臀玄,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的畅蹂。 我是一名探鬼主播健无,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼魁莉!你這毒婦竟也來了睬涧?” 一聲冷哼從身側(cè)響起募胃,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤旗唁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后痹束,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體检疫,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年祷嘶,在試婚紗的時候發(fā)現(xiàn)自己被綠了屎媳。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,731評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡论巍,死狀恐怖烛谊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情嘉汰,我是刑警寧澤丹禀,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響双泪,放射性物質(zhì)發(fā)生泄漏持搜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一焙矛、第九天 我趴在偏房一處隱蔽的房頂上張望葫盼。 院中可真熱鬧,春花似錦村斟、人聲如沸贫导。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽脱盲。三九已至,卻和暖如春日缨,著一層夾襖步出監(jiān)牢的瞬間钱反,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工匣距, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留面哥,地道東北人。 一個月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓毅待,卻偏偏與公主長得像尚卫,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子尸红,可洞房花燭夜當晚...
    茶點故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內(nèi)容