RocketMQ 是一款開源的分布式消息系統(tǒng)驻债,基于高可用分布式集群技術(shù)乳规,提供低延時的、高可靠的消息發(fā)布與訂閱服務(wù)合呐。同時暮的,廣泛應(yīng)用于多個領(lǐng)域,包括異步通信解耦合砂、企業(yè)解決方案青扔、金融支付、電信翩伪、電子商務(wù)微猖、快遞物流、廣告營銷缘屹、社交凛剥、即時通信、移動應(yīng)用轻姿、手游犁珠、視頻、物聯(lián)網(wǎng)互亮、車聯(lián)網(wǎng)等犁享。
RocketMQ 前身叫做MetaQ,在MetaQ發(fā)布3.0版本的時候改名為 RocketMQ
RocketMQ本質(zhì)上的設(shè)計思路和Kafka類似豹休,但是和Kafka不同的是其使用Java進(jìn)行開發(fā)炊昆,由于在國內(nèi)的Java受眾群體遠(yuǎn)遠(yuǎn)多于Scala、Erlang威根,所以RocketMQ是很多以Java語言為主的公司的首選凤巨。同樣的RocketMQ和Kafka都是Apache基金會中的頂級項目,他們社區(qū)的活躍度都非常高洛搀,項目更新迭代也非掣易拢快。
RocketMQ是阿里review kafka的java版留美,如果消息性能要求高彰檬,用 RocketMQ 與 Kafka 可以更優(yōu)
消息隊列在實際應(yīng)用中常用的使用場景伸刃,包含應(yīng)用解耦、異步處理僧叉、流量削鋒奕枝、消息通訊、日志處理等瓶堕。
RocketMQ 官網(wǎng):http://rocketmq.apache.org
RocketMQ Github:https://github.com/apache/rocketmq
Kafka 官網(wǎng):http://kafka.apache.org
ActiveMQ 官網(wǎng):http://activemq.apache.org
RabbitMQ 官網(wǎng):https://www.rabbitmq.com
ZeroMQ 官網(wǎng):https://zeromq.org
MetaMQ RocketMQ的前世今生
某公司一直用的消息中間件是MetaMQ現(xiàn)在,網(wǎng)上相關(guān)的資料也不是很多症歇,今天去想淘寶為什會把MetaMQ給替換成了RocketMQ郎笆。就網(wǎng)上搜索了一下,這兩個居然是爺孫關(guān)系忘晤。
阿里巴巴消息中間件起源于2001年的五彩石項目宛蚓,Notify在這期間應(yīng)運(yùn)而生,用于交易核心消息的流轉(zhuǎn)设塔。
至2010年凄吏,B2B開始大規(guī)模使用ActiveMQ作為消息內(nèi)核,隨著阿里業(yè)務(wù)的快速發(fā)展闰蛔,急需一款支持順序消息痕钢,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生序六。
到2012年任连,MetaQ已經(jīng)發(fā)展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ例诀。
隨后随抠,將RocketMQ進(jìn)行了開源,阿里的消息中間件正式走入了公眾的視野繁涂。
到2015年拱她,RocketMQ已經(jīng)經(jīng)歷了多年雙十一的洗禮,在可用性扔罪、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)秉沼。與此同時,云計算大行其道步势,阿里消息中間件基于RocketMQ推出了Aliware MQ 1.0氧猬,開始為阿里云上成千上萬家企業(yè)提供消息服務(wù)。
到今年坏瘩,MetaQ在2016年雙十一承載了萬億級消息的流轉(zhuǎn)盅抚,跨越了一個新的里程碑,同時RocketMQ進(jìn)入Apache 孵化倔矾。
RocketMQ 產(chǎn)品發(fā)展歷史
大約經(jīng)歷了三個主要版本迭代:
1)Metaq 1.x(Metamorphosis)
由開源社區(qū)killme2008維護(hù)妄均,開源社區(qū)非持拢活躍 https://github.com/killme2008/Metamorphosis
2)Metaq 2.x
于2012年10月份上線,在淘寶內(nèi)部被廣泛使用丰包。
3)RocketMQ 3.x
基于公司內(nèi)部開源共建原則禁熏,?RocketMQ項目只維護(hù)核心功能,且去除了所有其他運(yùn)行時依賴邑彪,核心功能最簡化瞧毙。每個BU的個性化需求都在RocketMQ項目之上進(jìn)行深度定制。RocketMQ向其他BU提供的僅僅是Jar包寄症,例如要定制一個Broker宙彪,那么只需要依賴rocketmq-broker這個jar包即可,可通過API進(jìn)行交互有巧,如果定制client释漆,則依賴rocketmq-client這個jar包,對其提供的api進(jìn)行再封裝篮迎。
在RocketMQ項目基礎(chǔ)上衍生的項目如下
com.taobao.metaq v3.0 = RocketMQ + 淘寶個性化需求男图,為淘寶應(yīng)用提供消息服務(wù)。
com.alipay.zpullmsg v1.0 = RocketMQ + 支付寶個性化需求甜橱,為支付寶應(yīng)用提供消息服務(wù)逊笆。
com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B個性化需求,為B2B應(yīng)用提供消息服務(wù)渗鬼。
一览露、 MQ背景&選型
消息隊列作為分布式、高并發(fā)系統(tǒng)的核心組件之一譬胎,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)差牛,提升開發(fā)效率和系統(tǒng)穩(wěn)定性。
MQ 主要具有以下優(yōu)勢:
1)削峰填谷:主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失堰乔、系統(tǒng)奔潰等問題
2)系統(tǒng)解耦:解決不同重要程度偏化、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死
3)提升性能:當(dāng)存在一對多調(diào)用時,可以發(fā)一條消息給消息系統(tǒng)镐侯,讓消息系統(tǒng)通知相關(guān)系統(tǒng)
4)蓄流壓測:線上有些鏈路不好壓測侦讨,可以通過堆積一定量消息再放開來壓測
目前主流的MQ主要是RocketMQ、kafka苟翻、RabbitMQ等
RocketMQ相比于RabbitMQ韵卤、kafka具有主要優(yōu)勢特性有:
1)支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)
2)支持結(jié)合rocketmq的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù)崇猫,二方事務(wù)是前提)
3)支持18個級別的延遲消息(rabbitmq和kafka不支持)
4)支持指定次數(shù)和時間間隔的失敗消息重發(fā)(kafka不支持沈条,rabbitmq需要手動確認(rèn))
5)支持consumer端tag過濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)
6)支持重復(fù)消費(fèi)(rabbitmq不支持诅炉,kafka支持)
Rocketmq蜡歹、kafka屋厘、Rabbitmq的詳細(xì)對比,請參照下表格:
二月而、RocketMQ集群概述
1. RocketMQ集群部署結(jié)構(gòu)
1) Name Server
Name Server是一個幾乎無狀態(tài)節(jié)點(diǎn)汗洒,可集群部署,節(jié)點(diǎn)之間無任何信息同步父款。
2) Broker
Broker部署相對復(fù)雜溢谤,Broker分為Master與Slave,一個Master可以對應(yīng)多個Slave铛漓,但是一個Slave只能對應(yīng)一個Master溯香,Master與Slave的對應(yīng)關(guān)系通過指定相同的Broker Name,不同的Broker Id來定義浓恶,BrokerId為0表示Master,非0表示Slave结笨。Master也可以部署多個包晰。
每個Broker與Name Server集群中的所有節(jié)點(diǎn)建立長連接,定時(每隔30s)注冊Topic信息到所有Name Server炕吸。Name Server定時(每隔10s)掃描所有存活broker的連接伐憾,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接赫模。
3) Producer
Producer與Name Server集群中的其中一個節(jié)點(diǎn)(隨機(jī)選擇)建立長連接树肃,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master建立長連接瀑罗,且定時向Master發(fā)送心跳胸嘴。Producer完全無狀態(tài),可集群部署斩祭。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊列的最新情況劣像,這意味著如果Broker不可用,Producer最多30s能夠感知摧玫,在此期間內(nèi)發(fā)往Broker的所有消息都會失敗耳奕。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s中掃描所有存活的連接诬像,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù)屋群,則關(guān)閉與Producer的連接。
4) Consumer
Consumer與Name Server集群中的其中一個節(jié)點(diǎn)(隨機(jī)選擇)建立長連接坏挠,定期從Name Server取Topic路由信息芍躏,并向提供Topic服務(wù)的Master、Slave建立長連接癞揉,且定時向Master纸肉、Slave發(fā)送心跳溺欧。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息柏肪,訂閱規(guī)則由Broker配置決定姐刁。
Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味著Broker不可用時烦味,Consumer最多最需要30s才能感知聂使。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s掃描所有存活的連接谬俄,若某個連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù)柏靶,則關(guān)閉連接;并向該Consumer Group的所有Consumer發(fā)出通知溃论,Group內(nèi)的Consumer重新分配隊列屎蜓,然后繼續(xù)消費(fèi)。
當(dāng)Consumer得到master宕機(jī)通知后钥勋,轉(zhuǎn)向slave消費(fèi)炬转,slave不能保證master的消息100%都同步過來了,因此會有少量的消息丟失算灸。但是一旦master恢復(fù)扼劈,未同步過去的消息會被最終消費(fèi)掉。
消費(fèi)者隊列是消費(fèi)者連接之后(或者之前有連接過)才創(chuàng)建的菲驴。我們將原生的消費(fèi)者標(biāo)識由 {IP}@{消費(fèi)者group}擴(kuò)展為?{IP}@{消費(fèi)者group}{topic}{tag}荐吵,例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk。任何一個元素不同赊瞬,都認(rèn)為是不同的消費(fèi)端先煎,每個消費(fèi)端會擁有一份自己消費(fèi)對列(默認(rèn)是broker對列數(shù)量*broker數(shù)量)。新掛載的消費(fèi)者對列中擁有commitlog中的所有數(shù)據(jù)森逮。
如果有需要榨婆,可以查看Rocketmq更多源碼解析
三、 Rocketmq如何支持分布式事務(wù)消息
場景
A(存在DB操作)褒侧、B(存在DB操作)兩方需要保證分布式事務(wù)一致性良风,通過引入中間層MQ,
A和MQ保持事務(wù)一致性(異常情況下通過MQ反查A接口實現(xiàn)check)闷供,
B和MQ保證事務(wù)一致(通過重試)烟央,從而達(dá)到最終事務(wù)一致性。
原理:大事務(wù) = 小事務(wù) + 異步
1. MQ與DB一致性原理(兩方事務(wù))
流程圖
上圖是RocketMQ提供的保證MQ消息歪脏、DB事務(wù)一致性的方案疑俭。
MQ消息、DB操作一致性方案:
1)發(fā)送消息到MQ服務(wù)器婿失,此時消息狀態(tài)為SEND_OK钞艇。此消息為consumer不可見啄寡。
2)執(zhí)行DB操作;DB執(zhí)行成功Commit DB操作哩照,DB執(zhí)行失敗Rollback DB操作挺物。
3)如果DB執(zhí)行成功,回復(fù)MQ服務(wù)器飘弧,將狀態(tài)為COMMIT_MESSAGE识藤;如果DB執(zhí)行失敗,回復(fù)MQ服務(wù)器次伶,將狀態(tài)改為ROLLBACK_MESSAGE痴昧。注意此過程有可能失敗。
4)MQ內(nèi)部提供一個名為“事務(wù)狀態(tài)服務(wù)”的服務(wù)冠王,此服務(wù)會檢查事務(wù)消息的狀態(tài)赶撰,如果發(fā)現(xiàn)消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調(diào)業(yè)務(wù)系統(tǒng)柱彻,業(yè)務(wù)系統(tǒng)在checkLocalTransactionState方法中檢查DB事務(wù)狀態(tài)扣囊,如果成功,則回復(fù)COMMIT_MESSAGE绒疗,否則回復(fù)ROLLBACK_MESSAGE。
說明:
上面以DB為例骂澄,其實此處可以是任何業(yè)務(wù)或者數(shù)據(jù)源吓蘑。
以上SEND_OK、COMMIT_MESSAGE坟冲、ROLLBACK_MESSAGE?均是client jar提供的狀態(tài)磨镶,在MQ服務(wù)器內(nèi)部是一個數(shù)字。
TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調(diào)(上圖中灰色部分)健提。這種消息丟失只存在于斷網(wǎng)或者RocketMQ集群掛了的情況下琳猫。當(dāng)RocketMQ集群掛了,如果采用異步刷盤私痹,存在1s內(nèi)數(shù)據(jù)丟失風(fēng)險脐嫂,異步刷盤場景下保障事務(wù)沒有意義。所以如果要核心業(yè)務(wù)用RocketMQ解決分布式事務(wù)問題紊遵,建議選擇同步刷盤模式账千。
2. 多系統(tǒng)之間數(shù)據(jù)一致性(多方事務(wù))
當(dāng)需要保證多方(超過2方)的分布式一致性,上面的兩方事務(wù)一致性(通過RocketMQ的事務(wù)性消息解決)已經(jīng)無法支持暗膜。這個時候需要引入TCC模式思想(Try-Confirm-Cancel匀奏,不清楚的自行百度)。
以上圖交易系統(tǒng)為例:
1)交易系統(tǒng)創(chuàng)建訂單(往DB插入一條記錄)学搜,同時發(fā)送訂單創(chuàng)建消息娃善。通過RocketMQ事務(wù)性消息保證一致性
2)接著執(zhí)行完成訂單所需的同步核心RPC服務(wù)(非核心的系統(tǒng)通過監(jiān)聽MQ消息自行處理论衍,處理結(jié)果不會影響交易狀態(tài))。執(zhí)行成功更改訂單狀態(tài)聚磺,同時發(fā)送MQ消息坯台。
3)交易系統(tǒng)接受自己發(fā)送的訂單創(chuàng)建消息,通過定時調(diào)度系統(tǒng)創(chuàng)建延時回滾任務(wù)(或者使用RocketMQ的重試功能咧最,設(shè)置第二次發(fā)送時間為定時任務(wù)的延遲創(chuàng)建時間捂人。在非消息堵塞的情況下,消息第一次到達(dá)延遲為1ms左右矢沿,這時可能RPC還未執(zhí)行完滥搭,訂單狀態(tài)還未設(shè)置為完成,第二次消費(fèi)時間可以指定)捣鲸。延遲任務(wù)先通過查詢訂單狀態(tài)判斷訂單是否完成瑟匆,完成則不創(chuàng)建回滾任務(wù),否則創(chuàng)建栽惶。 PS:多個RPC可以創(chuàng)建一個回滾任務(wù)愁溜,通過一個消費(fèi)組接受一次消息就可以;也可以通過創(chuàng)建多個消費(fèi)組外厂,一個消息消費(fèi)多次冕象,每次消費(fèi)創(chuàng)建一個RPC的回滾任務(wù)。 回滾任務(wù)失敗汁蝶,通過MQ的重發(fā)來重試渐扮。
以上是交易系統(tǒng)和其他系統(tǒng)之間保持最終一致性的解決方案。
3. 案例分析
1) 單機(jī)環(huán)境下的事務(wù)示意圖
如下為A給B轉(zhuǎn)賬的例子掖棉。
1鎖定A的賬戶
2鎖定B的賬戶
3檢查A賬戶是否有1元
4A的賬戶扣減1元
5給B的賬戶加1元
6解鎖B的賬戶
7解鎖A的賬戶
步驟動作
以上過程在代碼層面墓律,甚至可以簡化到在一個事物中,執(zhí)行兩條sql語句幔亥。
2) 分布式環(huán)境下事務(wù)
和單機(jī)事務(wù)不同耻讽,A、B賬戶可能不在同一個DB中帕棉,此時無法像在單機(jī)情況下使用事務(wù)來實現(xiàn)针肥。
此時可以通過一下方式實現(xiàn),將轉(zhuǎn)賬操作分成兩個操作笤昨。
a) A賬戶
1鎖定A的賬戶
2檢查A賬戶是否有1元
3A的賬戶扣減1元
4解鎖A的賬戶
步驟動作
b) MQ消息
A賬戶數(shù)據(jù)發(fā)生變化時祖驱,發(fā)送MQ消息,MQ服務(wù)器將消息推送給轉(zhuǎn)賬系統(tǒng)瞒窒,轉(zhuǎn)賬系統(tǒng)來給B賬號加錢捺僻。
c) B賬戶
1鎖定B的賬戶
2給B的賬戶加1元
3解鎖B的賬戶
步驟動作
四、 順序消息
RocketMq有3中消息類型
1. 普通消費(fèi)
2. 順序消費(fèi)
3. 事務(wù)消費(fèi)
順序消費(fèi)場景:在網(wǎng)購的時候,我們需要下單匕坯,那么下單需要假如有三個順序:
第一束昵、創(chuàng)建訂單
第二:訂單付款
第三:訂單完成
也就是這三個環(huán)節(jié)要有順序,這個訂單才有意義葛峻,RocketMQ可以保證順序消費(fèi)抓半。
RocketMQ 實現(xiàn)順序消費(fèi)的原理:produce在發(fā)送消息的時候谎懦,把消息發(fā)到同一個隊列(queue)中,消費(fèi)者注冊消息監(jiān)聽器為MessageListenerOrderly,這樣就可以保證消費(fèi)端只有一個線程去消費(fèi)消息
注意:是把把消息發(fā)到同一個隊列(queue)苗膝,不是同一個topic弛矛,默認(rèn)情況下一個topic包括4個queue
1. 順序消息缺陷
發(fā)送順序消息劣坊,無法利用集群Fail Over特性拉一,消費(fèi)順序消息的并行度依賴于隊列數(shù)量隊列熱點(diǎn)問題,個別隊列由于哈希不均導(dǎo)致消息過多唧龄,消費(fèi)速度跟不上兼砖,產(chǎn)生消息堆積問題遇到消息失敗的消息,無法跳過既棺,當(dāng)前隊列消費(fèi)暫停讽挟。
2. 原理
produce在發(fā)送消息的時候,把消息發(fā)到同一個隊列(queue)中丸冕,消費(fèi)者注冊消息監(jiān)聽器為MessageListenerOrderly耽梅,這樣就可以保證消費(fèi)端只有一個線程去消費(fèi)消息。
注意:把消息發(fā)到同一個隊列(queue)胖烛,不是同一個topic褐墅,默認(rèn)情況下一個topic包括4個queue
3. 擴(kuò)展
可以通過實現(xiàn)發(fā)送消息的隊列選擇器方法,實現(xiàn)部分順序消息洪己。
舉例:比如一個數(shù)據(jù)庫通過MQ來同步,只需要保證每個表的數(shù)據(jù)是同步的就可以竟贯。解析binlog答捕,將表名作為隊列選擇器的參數(shù),這樣就可以保證每個表的數(shù)據(jù)到同一個對列里面屑那,從而保證表數(shù)據(jù)的順序消費(fèi)
五拱镐、 最佳實踐
1. Producer
1) Topic
一個應(yīng)用盡可能用一個Topic,消息子類型用tags來標(biāo)識持际,tags可以由應(yīng)用自由設(shè)置沃琅。
只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時蜘欲,才可以利用tags 在broker做消息過濾益眉。
2) key
每個消息在業(yè)務(wù)層面的唯一標(biāo)識碼,要設(shè)置到 keys 字段,方便將來定位消息丟失問題郭脂。服務(wù)器會為每個消息創(chuàng)建索引(哈希索引)年碘,應(yīng)用可以通過 topic,key來查詢這條消息內(nèi)容展鸡,以及消息被誰消費(fèi)屿衅。由于是哈希索引,請務(wù)必保證key 盡可能唯一莹弊,這樣可以避免潛在的哈希沖突涤久。
// 訂單Id
String orderId= "20034568923546";
message.setKeys(orderId);
3) 日志
消息發(fā)送成功或者失敗,要打印消息日志忍弛,務(wù)必要打印 send result 和key 字段响迂。
4) send
send消息方法,只要不拋異常剧罩,就代表發(fā)送成功栓拜。但是發(fā)送成功會有多個狀態(tài),在sendResult里定義惠昔。
SEND_OK:消息發(fā)送成功
FLUSH_DISK_TIMEOUT:消息發(fā)送成功幕与,但是服務(wù)器刷盤超時,消息已經(jīng)進(jìn)入服務(wù)器隊列镇防,只有此時服務(wù)器宕機(jī)啦鸣,消息才會丟失
FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功,但是服務(wù)器同步到Slave時超時来氧,消息已經(jīng)進(jìn)入服務(wù)器隊列诫给,只有此時服務(wù)器宕機(jī),消息才會丟失
SLAVE_NOT_AVAILABLE:消息發(fā)送成功啦扬,但是此時slave不可用中狂,消息已經(jīng)進(jìn)入服務(wù)器隊列,只有此時服務(wù)器宕機(jī)扑毡,消息才會丟失
2. Consumer
1) 冪等
RocketMQ使用的消息原語是At Least Once胃榕,所以consumer可能多次收到同一個消息,此時務(wù)必做好冪等瞄摊。
冪等(idempotent勋又、idempotence)是一個數(shù)學(xué)與計算機(jī)學(xué)概念,常見于抽象代數(shù)中换帜。
在編程中一個冪等操作的特點(diǎn)是:其任意多次執(zhí)行所產(chǎn)生的影響(結(jié)果)楔壤,均與一次執(zhí)行的影響相同。
冪等函數(shù)惯驼,或冪等方法蹲嚣,是指可以使用相同參數(shù)重復(fù)執(zhí)行递瑰,并能獲得相同結(jié)果的函數(shù)。這些函數(shù)不會影響系統(tǒng)狀態(tài)端铛,也不用擔(dān)心重復(fù)執(zhí)行會對系統(tǒng)造成改變泣矛。
例如,“setTrue()”函數(shù)就是一個冪等函數(shù)禾蚕,無論多次執(zhí)行您朽,其結(jié)果都是一樣的。更復(fù)雜的操作冪等保證换淆,是利用唯一交易號(流水號)實現(xiàn)哗总。
2) 日志
消費(fèi)時記錄日志,以便后續(xù)定位問題倍试。
3) 批量消費(fèi)
盡量使用批量方式消費(fèi)方式讯屈,可以很大程度上提高消費(fèi)吞吐量。
MQ 消息隊列的底層原理
1县习、生產(chǎn)者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes
(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
2涮母、消費(fèi)者
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3、RocketMQ架構(gòu)原理
對于RocketMQ先拋出幾個問題:
RocketMQ的topic和隊列是什么樣的躁愿,和Kafka的分區(qū)有什么不同叛本?
RocketMQ網(wǎng)絡(luò)模型是什么樣的,和Kafka對比如何彤钟?
RocketMQ消息存儲模型是什么樣的来候,如何保證高可靠的存儲,和Kafka對比如何逸雹?
3.1 RocketMQ架構(gòu)圖
對于RocketMQ的架構(gòu)圖营搅,在大體上來看和Kafka并沒有太多的差別,
但是在很多細(xì)節(jié)上是有很多差別的梆砸,接下來會一一進(jìn)行講述转质。
3.2 RocketMQ名詞解釋
在3.1的架構(gòu)中我們有?多個Producer,多個主Broker帖世,多個從Broker
每個Producer可以對應(yīng)多個Topic峭拘,每個Consumer也可以消費(fèi)多個Topic,多對多的關(guān)系
Broker信息會上報至NameServer狮暑,Consumer會從NameServer中拉取Broker和Topic的信息。
Producer:消息生產(chǎn)者辉饱,向Broker發(fā)送消息的客戶端
Consumer:消息消費(fèi)者搬男,從Broker讀取消息的客戶端
Broker:消息中間的處理節(jié)點(diǎn),這里和kafka不同彭沼,kafka的Broker沒有主從的概念缔逛,都可以寫入請求以及備份其他節(jié)點(diǎn)數(shù)據(jù),RocketMQ只有主Broker節(jié)點(diǎn)才能寫,一般也通過主節(jié)點(diǎn)讀褐奴,當(dāng)主節(jié)點(diǎn)有故障或者一些其他特殊情況才會使用從節(jié)點(diǎn)讀按脚,有點(diǎn)類似- 于mysql的主從架構(gòu)。
Topic:消息主題敦冬,一級消息類型辅搬,生產(chǎn)者向其發(fā)送消息, 消費(fèi)者讀取其消息。
Group:分為ProducerGroup脖旱,ConsumerGroup堪遂,代表某一類的生產(chǎn)者和消費(fèi)者,一般來說同一個服務(wù)可以作為Group萌庆,同一個Group一般來說發(fā)送和消費(fèi)的消息都是一樣的溶褪。
Tag:Kafka中沒有這個概念,Tag是屬于二級消息類型践险,一般來說業(yè)務(wù)有關(guān)聯(lián)的可以使用同一個Tag猿妈,比如訂單消息隊列,使用Topic_Order巍虫,Tag可以分為Tag_食品訂單,Tag_服裝訂單等等彭则。
Queue: 在kafka中叫Partition,每個Queue內(nèi)部是有序的垫言,在RocketMQ中分為讀和寫兩種隊列贰剥,一般來說讀寫隊列數(shù)量一致,如果不一致就會出現(xiàn)很多問題筷频。
NameServer:Kafka中使用的是ZooKeeper保存Broker的地址信息蚌成,以及Broker的Leader的選舉,在RocketMQ中并沒有采用選舉Broker的策略凛捏,所以采用了無狀態(tài)的NameServer來存儲担忧,由于NameServer是無狀態(tài)的,集群節(jié)點(diǎn)之間并不會通信坯癣,所以上傳數(shù)據(jù)的時候都需要向所有節(jié)點(diǎn)進(jìn)行發(fā)送瓶盛。
很多朋友都在問什么是無狀態(tài)呢?狀態(tài)的有無實際上就是數(shù)據(jù)是否會做存儲示罗,有狀態(tài)的話數(shù)據(jù)會被持久化惩猫,無狀態(tài)的服務(wù)可以理解就是一個內(nèi)存服務(wù),NameServer本身也是一個內(nèi)存服務(wù)蚜点,所有數(shù)據(jù)都存儲在內(nèi)存中轧房,重啟之后都會丟失。
3.3 Topic 和 Queue
在RocketMQ中的每一條消息绍绘,都有一個Topic奶镶,用來區(qū)分不同的消息迟赃。一個Topic主題一般會有多個消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個主題時厂镇,訂閱了這個主題的消費(fèi)者都可以接收到生產(chǎn)者寫入的新消息纤壁。
在Topic中有分為了多個Queue,這其實是我們發(fā)送/讀取消息通道的最小單位捺信,我們發(fā)送消息都需要指定某個寫入某個Queue酌媒,拉取消息的時候也需要指定拉取某個Queue,所以我們的順序消息可以基于我們的Queue維度保持隊列有序残黑,如果想做到全局有序那么需要將Queue大小設(shè)置為1馍佑,這樣所有的數(shù)據(jù)都會在Queue中有序。
在上圖中我們的Producer會通過一些策略進(jìn)行Queue的選擇:
1)非順序消息:非順序消息一般直接采用輪訓(xùn)發(fā)送的方式進(jìn)行發(fā)送梨水。
2)順序消息:根據(jù)某個Key拭荤,比如我們常見的訂單Id,用戶Id疫诽,進(jìn)行Hash舅世,將同一類數(shù)據(jù)放在同一個隊列中,保證我們的順序性奇徒。
我們同一組Consumer也會根據(jù)一些策略來選Queue雏亚,常見的比如平均分配或者一致性Hash分配。
要注意的是當(dāng)Consumer出現(xiàn)下線或者上線的時候摩钙,這里需要做重平衡罢低,也就是Rebalance,RocketMQ的重平衡機(jī)制如下:
1)定時拉取broker胖笛,topic的最新信息
2)每隔20s做重平衡
3)隨機(jī)選取當(dāng)前Topic的一個主Broker网持,這里要注意的是不是每次重平衡所有主Broker都會被選中,因為會存在一個Broker再多個Broker的情況
4)獲取當(dāng)前Broker长踊,當(dāng)前ConsumerGroup的所有機(jī)器ID
5)然后進(jìn)行策略分配
由于重平衡是定時做的功舀,所以這里有可能會出現(xiàn)某個Queue同時被兩個Consumer消費(fèi),所以會出現(xiàn)消息重復(fù)投遞身弊。
Kafka的重平衡機(jī)制和RocketMQ不同辟汰,Kafka的重平衡是通過Consumer和Coordinator聯(lián)系來完成的,當(dāng)Coordinator感知到消費(fèi)組的變化阱佛,會在心跳過程中發(fā)送重平衡的信號帖汞,然后由一個ConsumerLeader進(jìn)行重平衡選擇,然后再由Coordinator將結(jié)果通知給所有的消費(fèi)者凑术。
Queue 讀寫數(shù)量不一致
在RocketMQ中Queue被分為讀和寫兩種翩蘸,在最開始接觸RocketMQ的時候,一直以為讀寫隊列數(shù)量配置不一致不會出現(xiàn)什么問題的麦萤,比如當(dāng)消費(fèi)者機(jī)器很多的時候我們配置很多讀的隊列鹿鳖,但是實際過程中發(fā)現(xiàn)會出現(xiàn)消息無法消費(fèi)和根本沒有消息消費(fèi)的情況。
當(dāng)寫的隊列數(shù)量大于讀的隊列的數(shù)量壮莹,當(dāng)大于讀隊列這部分ID的寫隊列的數(shù)據(jù)會無法消費(fèi)翅帜,因為不會將其分配給消費(fèi)者。
當(dāng)讀的隊列數(shù)量大于寫的隊列數(shù)量命满,那么多的隊列數(shù)量就不會有消息被投遞進(jìn)來涝滴。
這個功能在RocketMQ在我看來明顯沒什么用,因為基本上都會設(shè)置為讀寫隊列大小一樣胶台,這個為啥不直接將其進(jìn)行統(tǒng)一歼疮,反而容易讓用戶配置不一樣出現(xiàn)錯誤。
這個問題在RocketMQ的Issue里也沒有收到好的答案诈唬。
3.4 消費(fèi)模型
一般來說消息隊列的消費(fèi)模型分為兩種:MQPullConsumer?和?MQPushConsumer韩脏,基于推送的消息(push)模型和基于拉取(poll)的消息模型。
1)基于推送模型的消息系統(tǒng)铸磅,由消息代理記錄消費(fèi)狀態(tài)赡矢。
消息代理將消息推送到消費(fèi)者后,標(biāo)記這條消息為已經(jīng)被消費(fèi)阅仔,但是這種方式無法很好地保證消費(fèi)的處理語義吹散。比如當(dāng)我們把已經(jīng)把消息發(fā)送給消費(fèi)者之后,由于消費(fèi)進(jìn)程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息八酒,如果我們在消費(fèi)代理將其標(biāo)記為已消費(fèi)空民,這個消息就永久丟失了。如果我們利用生產(chǎn)者收到消息后回復(fù)這種方法羞迷,消息代理需要記錄消費(fèi)狀態(tài)界轩,這種不可取。
用過RocketMQ的同學(xué)肯定不禁會想到闭树,在RocketMQ中不是提供了兩種消費(fèi)者嗎耸棒?
MQPullConsumer和MQPushConsumer,其中MQPushConsumer不就是我們的推模型嗎报辱?
其實這兩種模型都是客戶端主動去拉消息与殃,其中的實現(xiàn)區(qū)別如下:
1)MQPullConsumer
每次拉取消息需要傳入拉取消息的offset和每次拉取多少消息量,具體拉取哪里的消息碍现,拉取多少是由客戶端控制幅疼。
2)MQPushConsumer
同樣也是客戶端主動拉取消息,但是消息進(jìn)度是由服務(wù)端保存昼接,Consumer會定時上報自己消費(fèi)到哪里爽篷,所以Consumer下次消費(fèi)的時候是可以找到上次消費(fèi)的點(diǎn),一般來說使用PushConsumer我們不需要關(guān)心offset和拉取多少數(shù)據(jù)慢睡,直接使用即可逐工。
集群消費(fèi)和廣播消費(fèi)
消費(fèi)模式我們分為兩種铡溪,集群消費(fèi),廣播消費(fèi):
1)集群消費(fèi)
同一個GroupId都屬于一個集群泪喊,一般來說一條消息棕硫,只會被任意一個消費(fèi)者處理。
2)廣播消費(fèi)
廣播消費(fèi)的消息會被集群中所有消費(fèi)者進(jìn)行消息袒啼,但是要注意:因為廣播消費(fèi)的offset在服務(wù)端保存成本太高哈扮,所以客戶端每一次重啟都會從最新消息消費(fèi),而不是上次保存的offset蚓再。
3.5 網(wǎng)絡(luò)模型
在Kafka中使用的原生的socket實現(xiàn)網(wǎng)絡(luò)通信滑肉,而RocketMQ使用的是Netty網(wǎng)絡(luò)框架,現(xiàn)在越來越多的中間件都不會直接選擇原生的socket摘仅,而是使用的Netty框架靶庙,主要得益于下面幾個原因:
1)API使用簡單,不需要關(guān)心過多的網(wǎng)絡(luò)細(xì)節(jié)实檀,更專注于中間件邏輯惶洲。
2)性能高。
3)成熟穩(wěn)定膳犹,jdk nio的bug都被修復(fù)了恬吕。
選擇框架是一方面,而想要保證網(wǎng)絡(luò)通信的高效须床,網(wǎng)絡(luò)線程模型也是一方面铐料,我們常見的有
1+N(1個Acceptor線程,N個IO線程)
1+N+M(1個acceptor線程豺旬,N個IO線程钠惩,M個worker線程)等模型
RocketMQ使用的是?1+N1+N2+M 的模型,如下圖所示:
1個acceptor線程族阅,N1個IO線程篓跛,N2個線程用來做Shake-hand,SSL驗證坦刀,編解碼愧沟,M個線程用來做業(yè)務(wù)處理。這樣的好處將編解碼鲤遥,和SSL驗證等一些可能耗時的操作放在了一個單獨(dú)的線程池沐寺,不會占據(jù)我們業(yè)務(wù)線程和IO線程。
3.6 高可靠的分布式存儲模型
做為一個好的消息系統(tǒng)盖奈,高性能的存儲混坞,高可用都不可少。
3.6.1 高性能日志存儲
RocketMQ和Kafka的存儲核心設(shè)計有很大的不同,所以其在寫入性能方面也有很大的差別究孕,這是2016年阿里中間件團(tuán)隊對RocketMQ和Kafka不同Topic下做的性能測試:
產(chǎn)品Topic數(shù)量發(fā)送端并發(fā)數(shù)發(fā)送端RT(ms)發(fā)送端TPS消費(fèi)端TPS
RocketMQ6480089000086000
12880097800077000
256800107500075000
Kafka648005136000136000
1282562385008500
25625613322152352
從上可以看出:
Kafka在Topic數(shù)量由64增長到256時啥酱,吞吐量下降了98.37%
RocketMQ在Topic數(shù)量由64增長到256時,吞吐量只下降了16%
這是為什么呢厨诸?kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節(jié)點(diǎn)上懈涛。同時在kafka的機(jī)器上,每個Partition其實都會對應(yīng)一個日志目錄泳猬,在目錄下面會對應(yīng)多個日志分段。所以如果Topic很多的時候Kafka雖然寫文件是順序?qū)懹钪玻珜嶋H上文件過多得封,會造成磁盤IO競爭非常激烈。
那RocketMQ為什么在多Topic的情況下指郁,依然還能很好的保持較多的吞吐量呢忙上?
我們首先來看一下RocketMQ中比較關(guān)鍵的文件:
這里有四個目錄(這里的解釋就直接用RocketMQ官方的了):
commitLog:消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的闲坎。單個文件大小默認(rèn)1G 疫粥,文件名長度為20位,左邊補(bǔ)零腰懂,剩余為起始偏移量梗逮,比如00000000000000000000代表了第一個文件,起始偏移量為0绣溜,文件大小為1G=1073741824慷彤;當(dāng)?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824怖喻,起始偏移量為1073741824底哗,以此類推。消息主要是順序?qū)懭肴罩疚募校?dāng)文件滿了跋选,寫入下一個文件;
config:保存一些配置信息哗蜈,包括一些Group前标,Topic以及Consumer消費(fèi)offset等信息。
consumeQueue:消息消費(fèi)隊列恬叹,引入的目的主要是提高消息消費(fèi)的性能候生,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對主題進(jìn)行的绽昼,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的唯鸭。
Consumer即可根據(jù)ConsumeQueue來查找待消費(fèi)的消息。其中硅确,ConsumeQueue(邏輯消費(fèi)隊列)作為消費(fèi)消息的索引目溉,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset明肮,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件缭付,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)柿估,具體存儲路徑為:HOME \store\index\${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的陷猫,固定的單個IndexFile文件大小約為400M秫舌,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu)绣檬,故rocketmq的索引文件其底層實現(xiàn)為hash索引足陨。 我們發(fā)現(xiàn)我們的消息主體數(shù)據(jù)并沒有像Kafka一樣寫入多個文件,而是寫入一個文件,這樣我們的寫入IO競爭就非常小娇未,可以在很多Topic的時候依然保持很高的吞吐量墨缘。有同學(xué)說這里的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來創(chuàng)建文件零抬,那么文件數(shù)量依然很多镊讼,在這里ConsumeQueue的寫入的數(shù)據(jù)量很小,每條消息只有20個字節(jié)平夜,30W條數(shù)據(jù)也才6M左右蝶棋,所以其實對我們的影響相對Kafka的Topic之間影響是要小很多的。我們整個的邏輯可以如下:
Producer不斷的再往CommitLog添加新的消息忽妒,有一個定時任務(wù)ReputService會不斷的掃描新添加進(jìn)來的CommitLog嚼松,然后不斷的去構(gòu)建ConsumerQueue和Index。
注意:這里指的都是普通的硬盤锰扶,在SSD上面多個文件并發(fā)寫入和單個文件寫入影響不大献酗。
讀取消息
Kafka中每個Partition都會是一個單獨(dú)的文件,所以當(dāng)消費(fèi)某個消息的時候坷牛,會很好的出現(xiàn)順序讀罕偎,我們知道OS從物理磁盤上訪問讀取文件的同時,會順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取京闰,將數(shù)據(jù)放入PageCache颜及,所以Kafka的讀取消息性能比較好。
RocketMQ讀取流程如下:
先讀取ConsumerQueue中的offset對應(yīng)CommitLog物理的offset
根據(jù)offset讀取CommitLog
ConsumerQueue也是每個Queue一個單獨(dú)的文件蹂楣,并且其文件體積小俏站,所以很容易利用PageCache提高性能。而CommitLog痊土,由于同一個Queue的連續(xù)消息在CommitLog其實是不連續(xù)的肄扎,所以會造成隨機(jī)讀,RocketMQ對此做了幾個優(yōu)化:
Mmap映射讀取,Mmap的方式減少了傳統(tǒng)IO將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來回進(jìn)行拷貝的性能開銷
使用DeadLine調(diào)度算法+SSD存儲盤
由于Mmap映射受到內(nèi)存限制犯祠,當(dāng)不在Mmmap映射這部分?jǐn)?shù)據(jù)的時候(也就是消息堆積過多)旭等,默認(rèn)是內(nèi)存的40%,會將請求發(fā)送到SLAVE,減緩Master的壓力
3.6.2 可用性
3.6.2.1 集群模式
我們首先需要選擇一種集群模式衡载,來適應(yīng)我們可忍耐的可用程度搔耕,一般來說分為四種:
1)單Master
這種模式,可用性最低痰娱,但是成本也是最低弃榨,一旦宕機(jī),所有都不可用梨睁。這種一般只適用于本地測試惭墓。
2)單Master多SLAVE
這種模式,可用性一般而姐,如果主宕機(jī),那么所有寫入都不可用划咐,讀取依然可用拴念,如果master磁盤損壞,可以依賴slave的數(shù)據(jù)褐缠。
3)多Master
這種模式政鼠,可用性一般,如果出現(xiàn)部分master宕機(jī)队魏,那么這部分master上的消息都不可消費(fèi)公般,也不可寫數(shù)據(jù),如果一個Topic的隊列在多個Master上都有胡桨,那么可以保證沒有宕機(jī)的那部分可以正常消費(fèi)官帘,寫入。如果master的磁盤損壞會導(dǎo)致消息丟失昧谊。
4)多Master多Slave
這種模式刽虹,可用性最高,但是維護(hù)成本也最高呢诬,當(dāng)master宕機(jī)了之后涌哲,只會出現(xiàn)在這部分master上的隊列不可寫入,但是讀取依然是可以的尚镰,并且如果master磁盤損壞阀圾,可以依賴slave的數(shù)據(jù)。
一般來說投入生產(chǎn)環(huán)境的話都會選擇第四種狗唉,來保證最高的可用性初烘。
3.6.2.2 消息的可用性
當(dāng)我們選擇好了集群模式之后,那么我們需要關(guān)心的就是怎么去存儲和復(fù)制這個數(shù)據(jù),rocketMQ對消息的刷盤提供了同步和異步的策略來滿足我們的账月,當(dāng)我們選擇同步刷盤之后综膀,如果刷盤超時會給返回FLUSH_DISK_TIMEOUT,如果是異步刷盤不會返回刷盤相關(guān)信息局齿,選擇同步刷盤可以盡最大程度滿足我們的消息不會丟失剧劝。
除了存儲有選擇之后,我們的主從同步提供了同步和異步兩種模式來進(jìn)行復(fù)制抓歼,當(dāng)然選擇同步可以提升可用性讥此,但是消息的發(fā)送RT時間會下降10%左右。
3.6.3 Dleger
我們上面對于master-slave部署模式已經(jīng)做了很多分析谣妻,我們發(fā)現(xiàn)萄喳,當(dāng)master出現(xiàn)問題的時候,我們的寫入怎么都會不可用蹋半,除非恢復(fù)master他巨,或者手動將我們的slave切換成master,導(dǎo)致了我們的Slave在多數(shù)情況下只有讀取的作用减江。RocketMQ在最近的幾個版本中推出了Dleger-RocketMQ染突,使用Raft協(xié)議復(fù)制CommitLog,并且自動進(jìn)行選主辈灼,這樣master宕機(jī)的時候份企,寫入依然保持可用。
有關(guān)Dleger-RocketMQ的信息更多的可以查看這篇文章:Dledger-RocketMQ 基于Raft協(xié)議的commitlog存儲庫
3.7 定時/延時消息
定時消息和延時消息在實際業(yè)務(wù)場景中使用的比較多巡莹,比如下面的一些場景:
1)訂單超時未支付自動關(guān)閉司志,因為在很多場景中下單之后庫存就被鎖定了,這里需要將其進(jìn)行超時關(guān)閉降宅。
2)需要一些延時的操作骂远,比如一些兜底的邏輯,當(dāng)做完某個邏輯之后腰根,可以發(fā)送延時消息比如延時半個小時吧史,進(jìn)行兜底檢查補(bǔ)償。
3)在某個時間給用戶發(fā)送消息唠雕,同樣也可以使用延時消息贸营。
在開源版本的RocketMQ中延時消息并不支持任意時間的延時,需要設(shè)置幾個固定的延時等級岩睁,目前默認(rèn)設(shè)置為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h钞脂,從1s到2h分別對應(yīng)著等級1到18,而阿里云中的版本(要付錢)是可以支持40天內(nèi)的任何時刻(毫秒級別)捕儒。
我們先看下在RocketMQ中定時任務(wù)原理圖:
Step1:Producer在自己發(fā)送的消息上設(shè)置好需要延時的級別冰啃。
Step2: Broker發(fā)現(xiàn)此消息是延時消息邓夕,將Topic進(jìn)行替換成延時Topic,每個延時級別都會作為一個單獨(dú)的queue阎毅,將自己的Topic作為額外信息存儲焚刚。
Step3: 構(gòu)建ConsumerQueue
Step4: 定時任務(wù)定時掃描每個延時級別的ConsumerQueue。
Step5: 拿到ConsumerQueue中的CommitLog的Offset扇调,獲取消息矿咕,判斷是否已經(jīng)達(dá)到執(zhí)行時間
Step6: 如果達(dá)到,那么將消息的Topic恢復(fù)狼钮,進(jìn)行重新投遞碳柱。如果沒有達(dá)到則延遲沒有達(dá)到的這段時間執(zhí)行任務(wù)。
可以看見延時消息是利用新建單獨(dú)的Topic和Queue來實現(xiàn)的熬芜,如果我們要實現(xiàn)40天之內(nèi)的任意時間度莲镣,基于這種方案,那么需要402460601000個queue涎拉,這樣的成本是非常之高的瑞侮,那阿里云上面的支持任意時間是怎么實現(xiàn)的呢?這里猜測是持久化二級TimeWheel時間輪鼓拧,二級時間輪用于替代我們的ConsumeQueue半火,保存Commitlog-Offset,然后通過時間輪不斷的取出當(dāng)前已經(jīng)到了的時間毁枯,然后再次投遞消息。具體的實現(xiàn)邏輯需要后續(xù)會單獨(dú)寫一篇文章叮称。
3.8 事務(wù)消息
事務(wù)消息同樣的也是RocketMQ中的一大特色种玛,其可以幫助我們完成分布式事務(wù)的最終一致性
具體使用事務(wù)消息步驟如下:
Step1:調(diào)用sendMessageInTransaction發(fā)送事務(wù)消息
Step2: 如果發(fā)送成功,則執(zhí)行本地事務(wù)瓤檐。
Step3: 如果執(zhí)行本地事務(wù)成功則發(fā)送commit赂韵,如果失敗則發(fā)送rollback。
Step4: 如果其中某個階段比如commit發(fā)送失敗挠蛉,rocketMQ會進(jìn)行定時從Broker回查祭示,本地事務(wù)的狀態(tài)。
事務(wù)消息的使用整個流程相對之前幾種消息使用比較復(fù)雜袖牙,下面是事務(wù)消息實現(xiàn)的原理圖:
Step1: 發(fā)送事務(wù)消息只磷,這里也叫做halfMessage皱蹦,會將Topic替換為HalfMessage的Topic。
Step2: 發(fā)送commit或者rollback汇陆,如果是commit這里會查詢出之前的消息,然后將消息復(fù)原成原Topic带饱,并且發(fā)送一個OpMessage用于記錄當(dāng)前消息可以刪除毡代。如果是rollback這里會直接發(fā)送一個OpMessage刪除阅羹。
Step3: 在Broker有個處理事務(wù)消息的定時任務(wù),定時對比halfMessage和OpMessage教寂,如果有OpMessage且狀態(tài)為刪除捏鱼,那么該條消息必定commit或者rollback,所以就可以刪除這條消息酪耕。
Step4: 如果事務(wù)超時(默認(rèn)是6s)导梆,還沒有opMessage,那么很有可能commit信息丟了因妇,這里會去反查我們的Producer本地事務(wù)狀態(tài)问潭。
Step5: 根據(jù)查詢出來的信息做Step2。
我們發(fā)現(xiàn)RocketMQ實現(xiàn)事務(wù)消息也是通過修改原Topic信息婚被,和延遲消息一樣狡忙,然后模擬成消費(fèi)者進(jìn)行消費(fèi),做一些特殊的業(yè)務(wù)邏輯址芯。當(dāng)然我們還可以利用這種方式去做RocketMQ更多的擴(kuò)展灾茁。
4. 總結(jié)
這里讓我們在回到文章中提到的幾個問題:
RocketMQ的topic和隊列是什么樣的,和Kafka的分區(qū)有什么不同谷炸?
RocketMQ網(wǎng)絡(luò)模型是什么樣的北专,和Kafka對比如何?
RocketMQ消息存儲模型是什么樣的旬陡,如何保證高可靠的存儲拓颓,和Kafka對比如何?
希望對你有所幫助描孟!
如果看完的小伙伴有興趣了解更多的話驶睦,歡迎添加vx小助手:SOSOXWV??免費(fèi)領(lǐng)取資料哦!