- 一叙淌、架構(gòu)圖對比
- 1.1 Kafka Architecture
- 1.2 RocketMQ Architecture
- 1.3 總結(jié)
- 二茂洒、高可用對比
- 2.1 Kafka
- 2.2 RocketMQ
- 2.3 總結(jié)
- 三智哀、文件存儲(chǔ)結(jié)構(gòu)對比
- 3.1 Kafka
- 3.2 RocketMQ
- 3.3 總結(jié)
- 四盒卸、發(fā)送數(shù)據(jù)可靠性對比
- 五虹蓄、功能豐富性對比
- 5.1 對比
- 5.2 關(guān)于有序消息
- 六、生產(chǎn)者分區(qū)分配策略對比
- 6.1 Kafka
- 6.2 RocketMQ
- 6.3 總結(jié)
- 七跃须、消費(fèi)者分區(qū)分配策略對比
- 7.1 Kafka
- 7.2 RocketMQ
- 7.3 總結(jié)
- 八第练、RocketMQ中的部分源碼分析
- 8.1 異步婴梧、同步竟坛、單向生產(chǎn)消息
- 8.1.1 同步消息發(fā)送流程
- 8.1.2 單向消息發(fā)送流程
- 8.1.3 異步消息發(fā)送流程
- 8.2 Tag消息過濾
- 8.3 順序消息的保證
- 8.1 異步婴梧、同步竟坛、單向生產(chǎn)消息
Kafka:Linkedin研發(fā)并捐贈(zèng)給Apache軟件基金會(huì)的開源分布式消息系統(tǒng)
RocketMQ:阿里研發(fā)并捐贈(zèng)給Apache軟件基金會(huì)的開源分布式消息系統(tǒng)
一、架構(gòu)圖對比
1.1 Kafka Architecture
- Zookeeper集群:存儲(chǔ)Kafka集群的元數(shù)據(jù)绒尊,Kafka早期版本Consumer Offset也被存儲(chǔ)在此
- Broker集群:對外提供消息服務(wù)谭羔、消息存儲(chǔ)的地方
- Producer Client:數(shù)據(jù)生產(chǎn)者
- Consumer Client:數(shù)據(jù)消費(fèi)者
1.2 RocketMQ Architecture
- NameServer集群:存儲(chǔ)RocketMQ集群的路由信息沙郭,譬如BrokerData、TopicData罢荡,角色可類比到Kafka中的Zookeeper集群络凿,但NameServer集群是無中心化集群,所有機(jī)器地位相等了赵,且不同之間NameServer無任何數(shù)據(jù)交互项玛,相比于ZK集群更加輕量固灵。
- Broker集群:對外提供服務(wù)诗力、消息存儲(chǔ)
- Producer Client:數(shù)據(jù)生產(chǎn)者
- Consume Client:數(shù)據(jù)消費(fèi)者
1.3 總結(jié)
大體看來递递,兩款MQ的組成部分相似践叠,有負(fù)責(zé)消息存儲(chǔ)與檢索的服務(wù)、有集群注冊中心分飞、還有業(yè)務(wù)側(cè)必須用的兩個(gè)Client挖垛。
這里大致說一下NameServer與Zookeeper的對比:
在分布式環(huán)境下多臺(tái)機(jī)器肯定需要互相協(xié)調(diào)以應(yīng)對各類復(fù)雜場景包括:對外提供服務(wù)菇怀、內(nèi)部容錯(cuò)闽铐、HA澳叉。Kafka與RocketMQ也是一樣,他們分別引入Zookeeper與NameServer作為自身集群的協(xié)調(diào)工作者,其實(shí)更像是一個(gè)元數(shù)據(jù)存儲(chǔ)地與注冊中心载慈,它們都存儲(chǔ)著集群下機(jī)器信息與Topic信息等,為多機(jī)之間的協(xié)調(diào)工作提供數(shù)據(jù)支持。
Zookeeper是一個(gè)比較重量級的中間件小作,在集群模式下,Zookeeper通過ZAB協(xié)議來保證Leader
與follower
之間的數(shù)據(jù)一致击儡,這里說一個(gè)疑惑:網(wǎng)上很多資料會(huì)說ZK是強(qiáng)一致的,其實(shí)不是,ZAB協(xié)議中在數(shù)據(jù)同步時(shí),采用的也是過半機(jī)制,也就是說只要半數(shù)follower
向Leader
進(jìn)行ACK后祠丝,Leader
會(huì)被認(rèn)為數(shù)據(jù)已經(jīng)同步成功疾呻,因此嚴(yán)格來說除嘹,ZK集群并不能保證數(shù)據(jù)強(qiáng)一致。
NameServer相比于ZK是一個(gè)非常輕量級的組件岸蜗,是無中心化的集群模式尉咕,集群下各節(jié)點(diǎn)是無差異的,節(jié)點(diǎn)與節(jié)點(diǎn)之間不會(huì)相互進(jìn)行信息通訊璃岳,因此NameServer可以看成是無狀態(tài)年缎,那么自然就不需要引入各類復(fù)雜的分布式協(xié)議以保證集群的數(shù)據(jù)一致性與容錯(cuò)性。
二铃慷、高可用對比
這里說的高可用主要針對Broker HA進(jìn)行說明单芜,注意:對于Rocket與Kafka來說,Broker的高可用有著較大的差異犁柜,個(gè)人覺得是兩者對比的重點(diǎn)洲鸠。
對于MQ產(chǎn)品而言保證消息的安全一定是重中之重的任務(wù),RocketMQ與Kafka都引入了數(shù)據(jù)冗余策略來保證HA
2.1 Kafka
上圖為執(zhí)行以下命令得到的一段幫助文檔:
cd ${KAFKA_HOME}/bin
sh kafka-topics.sh -help
Kafka在創(chuàng)建Topic時(shí)馋缅,可指定--replication-factor
參數(shù)扒腕,該參數(shù)表示為每個(gè)partition
創(chuàng)建多少個(gè)副本,而這些副本不能在同臺(tái)機(jī)器上萤悴,以保證HA袜匿,如下圖:
分區(qū)follower
會(huì)主動(dòng)向leader
進(jìn)行數(shù)據(jù)同步以維護(hù)自身消息,當(dāng)leader
所在機(jī)器發(fā)生不可用時(shí)稚疹,只要還有follower
可用則依然可以對外提供服務(wù)(這里針對主居灯、副本切換可能導(dǎo)致部署數(shù)據(jù)丟失情況不作討論)。
總體來看内狗,Topic下的partition
有幾個(gè)replication
具體是在創(chuàng)建該Topic時(shí)指定的怪嫌,是以Topic為維度設(shè)置的。
2.2 RocketMQ
RocketMQ中每個(gè)Topic MessageQueue(可以類比到Kafka中的Partition角色)
可以有幾個(gè)副本并不是在創(chuàng)建Topic時(shí)指定柳沙,而是由Broker集群創(chuàng)建之初就決定了岩灭,下圖列舉了1m-1s的集群結(jié)構(gòu)。
在${ROCKET_HOME}/conf
目錄下就內(nèi)置了四種Broker部署模式:
- 單機(jī):當(dāng)前Broker掉線則會(huì)導(dǎo)致整個(gè)RocketMQ不可用赂鲤,數(shù)據(jù)沒有任何冗余噪径,無法保證HA
- 2m-noslave:單臺(tái)機(jī)器宕機(jī)期間,這臺(tái)機(jī)器上未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可訂閱数初,消息實(shí)時(shí)性會(huì)受到影響
- 2m-2s-async:每個(gè)Master配置一個(gè)Slave找爱,有多對Master-Slave,HA采用異步復(fù)制方式泡孩,主備有短暫消息延遲(毫秒級)
- 2m-2s-sync:每個(gè)Master配置一個(gè)Slave车摄,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功吮播,才向應(yīng)用返回成功变屁,但是會(huì)導(dǎo)致單條消息的RT會(huì)略高
2.3 總結(jié)
因此Kafka的副本機(jī)制更加靈活,不會(huì)受限于Kafka Cluster的部署模式意狠,而是可以根據(jù)業(yè)務(wù)需要細(xì)粒度的控制自身Topic-partition的副本粟关,而RocketMQ則是受限于Cluster部署模式,其副本可以看成是Master-Broker的冗余环戈,無法單獨(dú)制定某個(gè)Topic下MessageQueue的副本闷板。
簡而言之,副本的維度不同谷市,Kafka是Partition級別蛔垢,RocketMQ是Broker級別。
三迫悠、文件存儲(chǔ)結(jié)構(gòu)對比
3.1 Kafka
sh kafka-topics.sh --create --topic multi-test --zookeeper zk:2181 --partitions 2 --replication-factor 1
創(chuàng)建完該Topic之后可以在${KAFKA_HOME}/data
下看到新建的兩個(gè)目錄:multi-test-0
鹏漆、multi-test-1
分別對應(yīng)著multi-test
下的兩個(gè)partition分區(qū),而每個(gè)文件夾下都有兩個(gè)非常重要的文件:
- index
index文件主要用于對log
文件的索引创泄,內(nèi)部每條記錄大小固定艺玲,這可以非常快速的根據(jù)Offset找到log
文件中對應(yīng)記錄鞠抑。
然后再根據(jù)記錄中消息在log
文件中的偏移量找出具體消息 - log
所有的消息都存在此文件中饭聚,根據(jù)配置定期壓縮或刪除
3.2 RocketMQ
RocketMQ中的文件較多,下面列舉最核心的三類文件:
- commitLog
消息存儲(chǔ)的地方搁拙,但是并不區(qū)分當(dāng)前broker的不同Topic秒梳,也就是說,不論當(dāng)前Broker承載了多少Topic箕速,所有的數(shù)據(jù)全部順序?qū)懭朐撐募A下的文件中酪碘。 - consumeQueue
是commitLog文件的索引文件,每條消息定長盐茎,方便根據(jù)偏移量快速計(jì)算查找兴垦。每條記錄包含三個(gè)字段:
1)消息在commitLog偏移量
2)消息長度
3)消息Tag HashCode:用于消息過濾 - config
記錄元信息,主要包含兩類:- Topic元數(shù)據(jù)
- 各個(gè)consume消費(fèi)進(jìn)度
當(dāng)然RocketMQ除了上述兩類文件字柠,還有abort探越、index
文件,再次不再贅述窑业。
3.3 總結(jié)
Kafka與RocketMQ的文件結(jié)構(gòu)大體相似钦幔,但因?yàn)镽ocketMQ所支持的消息類型更加豐富一些,因此文件也會(huì)稍稍多一些数冬。
Kafka的數(shù)據(jù)日志文件是按Topic-Partition為維度內(nèi)部再按日志大小進(jìn)行切分节槐,而RocketMQ日志文件是按Broker為維度內(nèi)部再按日志大小進(jìn)行切分搀庶,這是最大的不同點(diǎn)拐纱。
四铜异、發(fā)送數(shù)據(jù)可靠性對比
Kafka與RocketMQ都支持?jǐn)?shù)據(jù)發(fā)送失敗重試機(jī)制,其中RocketMQ支持的重試種類更加豐富:
- retryTimesWhenSendFailed
同步發(fā)送失敗重試次數(shù) - retryTimesWhenSendAsyncFailed
異步發(fā)送失敗重試次數(shù) - retryAnotherBrokerWhenNotStoreOK
消息刷盤失敗或Slave不可用
但需要注意的是:RocketMQ Broadcasting Message不支持重試
五秸架、功能豐富性對比
5.1 對比
RocketMQ較Kafka而言揍庄,功能更加豐富,RocketMQ主要的消息功能有:
- 有序消息
- 廣播消息
- 延遲消息
- 消息過濾
- 事務(wù)消息
- 消費(fèi)者支持PULL/PUSH模型
而Kafka只支持簡單消息的流轉(zhuǎn)东抹,并不支持上述功能蚂子。
5.2 關(guān)于有序消息
這里需要先明確一個(gè)前提,即RocketMQ Consumer使用的是PUSH模式缭黔,因?yàn)镻OLL模式下是否存在消費(fèi)者消費(fèi)亂序完全取決于自己程序的編寫食茎,因此PULL模式不作考慮。
關(guān)于有序消息馏谨,RocketMQ的官網(wǎng)中單獨(dú)把它當(dāng)作一個(gè)功能别渔,也是網(wǎng)上其他帖子提到RocketMQ就一定會(huì)提到的功能。
有序消息需要保證三個(gè)地方的有序:
- Producer數(shù)據(jù)發(fā)送有序
順序在前的消息一定要先發(fā)網(wǎng)Broker惧互,若失敗則需要不斷進(jìn)行重試哎媚,才能發(fā)下一條消息 - Broker中有序
Broker中隊(duì)列天然符合FIFO
策略,因此這一環(huán)可以暫時(shí)不用考慮 - Consumer消費(fèi)有序
Consume Pull下來的消息喊儡,處理要保證有序
通過上述三個(gè)環(huán)節(jié)的分析可以得出:
需要保證有序的消息需要被發(fā)送到Broker中同一個(gè)partition
或者messageQueue
中拨与,然后利用隊(duì)列本身的FIFO
特性保證前兩步消息有序,剩下的只要保證消費(fèi)者端能夠有序的進(jìn)行消息消費(fèi)即可艾猜。
而在第三步中买喧,兩個(gè)MQ具有不同的處理方式。Kafka Consumer中單個(gè)分區(qū)是單線程處理匆赃,而Rocket Consumer是線程池方式處理淤毛。
因此對于Kafka而言,Producer只要保證相關(guān)聯(lián)消息進(jìn)入同一個(gè)Partition
即可保證消息順序消費(fèi)炸庞。
對于Rocket而言钱床,由于拉取到的一批數(shù)據(jù)是并行處理的,加上CPU的調(diào)度不確定性埠居,因此這一環(huán)節(jié)如果不加手段處理查牌,則無法保證有序性。
在RocketMQ是通過對當(dāng)前處理隊(duì)列加鎖的方式保證了消費(fèi)有序滥壕,即:同一個(gè)隊(duì)列只有一個(gè)線程進(jìn)行處理纸颜,具體的代碼分析請移步本文8.3順序消息的保證
小節(jié),某種意義上來說绎橘,相當(dāng)于把消息處理線程池設(shè)置為單線程模式胁孙。
六唠倦、生產(chǎn)者分區(qū)分配策略對比
6.1 Kafka
- Round-robin:消息以輪訓(xùn)方式發(fā)送到不同
Partition
- 指定Partition:Kafka Producer API中支持消息指定某
Partition
進(jìn)行發(fā)送 - 指定Key:發(fā)送消息時(shí)指定Key,Kafka Producer內(nèi)部會(huì)將Key進(jìn)行Hash并對
Partition Num
進(jìn)行取模操作以選出具體的Partition
6.2 RocketMQ
- Round-robin:消息以輪訓(xùn)方式發(fā)送到不同
MessageQueue
- MessageSelector:消息發(fā)送時(shí)涮较,自定義MessageSelector函數(shù)
6.3 總結(jié)
Kafka與RocketMQ的多Partition/MessageQueue
機(jī)制本身都是為了提升生產(chǎn)效率稠鼻。
生產(chǎn)者的分區(qū)分配是在Producer Client中完成的,而不是消息發(fā)送到Broker之后再去做負(fù)載狂票,這是為了減少Broker的壓力候齿。
七、消費(fèi)者分區(qū)分配策略對比
7.1 Kafka
Round-robin
以輪詢方式將Partition
分發(fā)給同一個(gè)消費(fèi)者組下的消費(fèi)者-
Range
將Partition
按照分區(qū)號進(jìn)行排序闺属,再將ConsumerGruop
下的Consume
進(jìn)行排序慌盯,然后以近乎平均分的方式將Partition
分配出去若某個(gè)
ConsumerGroup
同時(shí)以Range
模式訂閱多個(gè)Topic
時(shí),可能會(huì)造成消費(fèi)者的浪費(fèi)掂器,這一點(diǎn)SpringDataKafka
官網(wǎng)有描述亚皂,如下圖:
7.2 RocketMQ
- 平均分配策略
將MessageQueue
按ID排序,然后按照AVG
算法進(jìn)行平均分配給Consumer
- 環(huán)形平均策略
其效果等同于Kafka
輪詢方式分配 - 一致性Hash策略
該算法會(huì)將Consumer
的Hash值作為Node
節(jié)點(diǎn)存放到Hash環(huán)上国瓮,然后將MessageQueue
也按照Hash值放入該環(huán)中灭必,通過順時(shí)針方向,距離MessageQueue
最近的Consumer
就是該MessageQueue
要分配的Consumer
- 同機(jī)房策略
該算法會(huì)根據(jù)MessageQueue
的部署機(jī)房位置與Consumer
的位置巍膘,過濾出當(dāng)前Consumer
相同機(jī)房的MessageQueue
厂财,然后再按照平均分配 或 環(huán)形策略對同機(jī)房的MessageQueue
二次分配
7.3 總結(jié)
消費(fèi)者分區(qū)分配同生產(chǎn)者分區(qū)分配一致,依然是在Consumer Client
中通過對應(yīng)API指定峡懈,其中RocketMQ的分區(qū)策略更加豐富一些璃饱。
八、RocketMQ中的部分源碼分析
8.1 異步肪康、同步荚恶、單向生產(chǎn)消息
8.1.1 同步消息發(fā)送流程
源碼筆記如下:
由于RocketMQ底層使用Netty
完成網(wǎng)絡(luò)通訊,而上述方法的入?yún)?code>Channel是Netty
核心組件之一磷支,首先Producer Client API
會(huì)先在自身完成MessageQueue
的負(fù)載谒撼,在選出MessageQueue
之后,便根據(jù)MessageQueue
找出Producer Client
端維護(hù)的通向各個(gè)MessageQueue
的長鏈接雾狈,并獲取對應(yīng)的Channel
對象廓潜。
第412行代碼首先會(huì)為本次請求構(gòu)造出一個(gè)全局ID,然后開始構(gòu)造ResponseFuture
對象善榛,該對象內(nèi)部有一個(gè)核心的CountDownLatch
辩蛋,以完成Future
特性,然后會(huì)將opaque
與ResponseFuture
的關(guān)聯(lián)關(guān)系保存至一個(gè)全局ConcurrentHashMap
中移盆;
將封裝好的Request
寫入到Channel
中發(fā)送出去悼院,在其會(huì)調(diào)函數(shù)中,對發(fā)送失敗的數(shù)據(jù)進(jìn)行去除一些狀態(tài)咒循,這里不做過多的描述据途。
在第436行绞愚,調(diào)用了ResponseFuture
的waitResponse
方法,該方法源碼如下:
可以看到就是調(diào)用了內(nèi)部CountDownLatch
讓當(dāng)前線程進(jìn)行阻塞等待狀態(tài)颖医,當(dāng)該線程阻塞完畢之后位衩,接著往下就是對外返回Response
相應(yīng)了,因此就能達(dá)到Sync
發(fā)送的效果便脊。
那么接下來只要思考一個(gè)問題即可:CountDownLatch
是在何處觸發(fā)了countDown
蚂四?看ResponseFuture
源碼中另一個(gè)方法:
那么該方法是在何處被調(diào)用呢光戈?
其實(shí)就在Broker
的響應(yīng)發(fā)送到Producer Client
時(shí)調(diào)用的哪痰,還記得一開始的opaque
變量嗎?當(dāng)Producer Client
發(fā)送消息時(shí)久妆,也會(huì)將該參數(shù)一并發(fā)送出去晌杰,當(dāng)Broker
端接受并處理完畢后,依然會(huì)將opaque
變量放入Response
一起發(fā)回到Producer Client
筷弦,此時(shí)Producer Client
只要在接受到響應(yīng)的地方解析出opaque
并在opaque
與ResponseFuture
的關(guān)聯(lián)關(guān)系ConcurrentHashMap
中找出ResponseFuture
并調(diào)用其putResponse
方法即可肋演。
上述過程可能有些復(fù)雜,需要童鞋們先對Netty
編程烂琴、網(wǎng)絡(luò)協(xié)議
有一個(gè)大致了解爹殊。畢竟直接基于TCP
協(xié)議層開發(fā)難度肯定要大于直接使用市面上已有的應(yīng)用層協(xié)議
。
8.1.2 單向消息發(fā)送流程
單向消息其實(shí)核心處理邏輯與8.1.1同步消息發(fā)送流程
一樣奸绷,唯一不同的是:單向消息不會(huì)把ResponseCommand
對象返回給業(yè)務(wù)方
8.1.3 異步消息發(fā)送流程
源碼大致與同步發(fā)送類似梗夸,最大的不同就是方法如參會(huì)有一個(gè)CallBack
函數(shù),該函數(shù)就是業(yè)務(wù)編寫的回調(diào)函數(shù)号醉,該函數(shù)會(huì)被納入到RespinseFuture
對象中反症,然后數(shù)據(jù)發(fā)送之后也沒有調(diào)用waitResponse
對線程進(jìn)行阻塞。
上圖為ResponseFuture
對象中執(zhí)行callback
函數(shù)的源碼畔派,這樣就可以理清楚Async
方式的實(shí)現(xiàn)邏輯了铅碍。
上述源碼為Producer Client
接受到Broker
的響應(yīng)之后對數(shù)據(jù)進(jìn)行處理的方法,同樣根據(jù)消息中的opaque
變量在opaque
與RespinseFuture
關(guān)系表中找到對應(yīng)的ResponseFuture
對象线椰,然后執(zhí)行executrInvokeCallBack
方法胞谈,該方法源碼如下:
這樣就完成了Async
邏輯。
8.2 Tag消息過濾
首先要明確一個(gè)前提:
在3.2 Rocket文件結(jié)構(gòu)
中描述了consumeQueue
中每條記錄都包含Tag
的HashCode
憨愉,并且Consumer
在獲取消息時(shí)烦绳,會(huì)經(jīng)過一次Tag HashCode
比較,然后才會(huì)從commitLog
中取出具體數(shù)據(jù)那是不是意味著消息過濾任務(wù)已經(jīng)完成了莱衩?其實(shí)不是爵嗅,考慮如下情況:某些情況下不同的Tag
其HashCode
可能相同,至于Broker
端為什么不再consumeQueue
中直接存儲(chǔ)Tag
字符串笨蚁,然后使用equals
方式對比呢睹晒?主要是因?yàn)?code>HashCode為整型數(shù)據(jù)對比效率高趟庄。
在有了上述背景之后,也就了解了消息過濾不僅要在Broker
側(cè)過濾也要在Consumer Client
側(cè)對Tag
字符串進(jìn)行過濾伪很,源碼如下:
在Consumer
獲取到Message
之后(不論是PULL模型還是PUSH模型)戚啥,都會(huì)遍歷消息將Tag
相符的消息拿出。
8.3 順序消息的保證
對于有序與無序方式消費(fèi)锉试,這里討論的前提都是基于PUSH
模型下:
對于有序消息猫十,先會(huì)根據(jù)MessageQueue
對象獲取到其對應(yīng)的鎖,源碼如下:
在獲取鎖完畢后先進(jìn)行上鎖再進(jìn)行具體的消費(fèi)業(yè)務(wù)邏輯處理呆盖。