RocketMQ 設(shè)計原理與最佳實踐

RocketMQ 是一款開源的分布式消息系統(tǒng)驻债,基于高可用分布式集群技術(shù)乳规,提供低延時的、高可靠的消息發(fā)布與訂閱服務(wù)合呐。同時暮的,廣泛應(yīng)用于多個領(lǐng)域,包括異步通信解耦合砂、企業(yè)解決方案青扔、金融支付、電信翩伪、電子商務(wù)微猖、快遞物流、廣告營銷缘屹、社交凛剥、即時通信、移動應(yīng)用轻姿、手游犁珠、視頻、物聯(lián)網(wǎng)互亮、車聯(lián)網(wǎng)等犁享。

RocketMQ 前身叫做MetaQ,在MetaQ發(fā)布3.0版本的時候改名為 RocketMQ

RocketMQ本質(zhì)上的設(shè)計思路和Kafka類似豹休,但是和Kafka不同的是其使用Java進(jìn)行開發(fā)炊昆,由于在國內(nèi)的Java受眾群體遠(yuǎn)遠(yuǎn)多于Scala、Erlang威根,所以RocketMQ是很多以Java語言為主的公司的首選凤巨。同樣的RocketMQ和Kafka都是Apache基金會中的頂級項目,他們社區(qū)的活躍度都非常高洛搀,項目更新迭代也非掣易拢快。

RocketMQ是阿里review kafka的java版留美,如果消息性能要求高彰檬,用 RocketMQ 與 Kafka 可以更優(yōu)

消息隊列在實際應(yīng)用中常用的使用場景伸刃,包含應(yīng)用解耦、異步處理僧叉、流量削鋒奕枝、消息通訊、日志處理等瓶堕。

RocketMQ 官網(wǎng):http://rocketmq.apache.org

RocketMQ Github:https://github.com/apache/rocketmq

Kafka 官網(wǎng):http://kafka.apache.org

ActiveMQ 官網(wǎng):http://activemq.apache.org

RabbitMQ 官網(wǎng):https://www.rabbitmq.com

ZeroMQ 官網(wǎng):https://zeromq.org

MetaMQ RocketMQ的前世今生

某公司一直用的消息中間件是MetaMQ現(xiàn)在,網(wǎng)上相關(guān)的資料也不是很多症歇,今天去想淘寶為什會把MetaMQ給替換成了RocketMQ郎笆。就網(wǎng)上搜索了一下,這兩個居然是爺孫關(guān)系忘晤。

阿里巴巴消息中間件起源于2001年的五彩石項目宛蚓,Notify在這期間應(yīng)運(yùn)而生,用于交易核心消息的流轉(zhuǎn)设塔。

至2010年凄吏,B2B開始大規(guī)模使用ActiveMQ作為消息內(nèi)核,隨著阿里業(yè)務(wù)的快速發(fā)展闰蛔,急需一款支持順序消息痕钢,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生序六。

到2012年任连,MetaQ已經(jīng)發(fā)展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ例诀。

隨后随抠,將RocketMQ進(jìn)行了開源,阿里的消息中間件正式走入了公眾的視野繁涂。

到2015年拱她,RocketMQ已經(jīng)經(jīng)歷了多年雙十一的洗禮,在可用性扔罪、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)秉沼。與此同時,云計算大行其道步势,阿里消息中間件基于RocketMQ推出了Aliware MQ 1.0氧猬,開始為阿里云上成千上萬家企業(yè)提供消息服務(wù)。

到今年坏瘩,MetaQ在2016年雙十一承載了萬億級消息的流轉(zhuǎn)盅抚,跨越了一個新的里程碑,同時RocketMQ進(jìn)入Apache 孵化倔矾。

RocketMQ 產(chǎn)品發(fā)展歷史

大約經(jīng)歷了三個主要版本迭代:

1)Metaq 1.x(Metamorphosis)

由開源社區(qū)killme2008維護(hù)妄均,開源社區(qū)非持拢活躍 https://github.com/killme2008/Metamorphosis

2)Metaq 2.x

于2012年10月份上線,在淘寶內(nèi)部被廣泛使用丰包。

3)RocketMQ 3.x

基于公司內(nèi)部開源共建原則禁熏,?RocketMQ項目只維護(hù)核心功能,且去除了所有其他運(yùn)行時依賴邑彪,核心功能最簡化瞧毙。每個BU的個性化需求都在RocketMQ項目之上進(jìn)行深度定制。RocketMQ向其他BU提供的僅僅是Jar包寄症,例如要定制一個Broker宙彪,那么只需要依賴rocketmq-broker這個jar包即可,可通過API進(jìn)行交互有巧,如果定制client释漆,則依賴rocketmq-client這個jar包,對其提供的api進(jìn)行再封裝篮迎。

在RocketMQ項目基礎(chǔ)上衍生的項目如下

com.taobao.metaq v3.0 = RocketMQ + 淘寶個性化需求男图,為淘寶應(yīng)用提供消息服務(wù)。

com.alipay.zpullmsg v1.0 = RocketMQ + 支付寶個性化需求甜橱,為支付寶應(yīng)用提供消息服務(wù)逊笆。

com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B個性化需求,為B2B應(yīng)用提供消息服務(wù)渗鬼。

一览露、 MQ背景&選型

消息隊列作為分布式、高并發(fā)系統(tǒng)的核心組件之一譬胎,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)差牛,提升開發(fā)效率和系統(tǒng)穩(wěn)定性。

MQ 主要具有以下優(yōu)勢:

1)削峰填谷:主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失堰乔、系統(tǒng)奔潰等問題

2)系統(tǒng)解耦:解決不同重要程度偏化、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死

3)提升性能:當(dāng)存在一對多調(diào)用時,可以發(fā)一條消息給消息系統(tǒng)镐侯,讓消息系統(tǒng)通知相關(guān)系統(tǒng)

4)蓄流壓測:線上有些鏈路不好壓測侦讨,可以通過堆積一定量消息再放開來壓測

目前主流的MQ主要是RocketMQ、kafka苟翻、RabbitMQ等

RocketMQ相比于RabbitMQ韵卤、kafka具有主要優(yōu)勢特性有:

1)支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)

2)支持結(jié)合rocketmq的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù)崇猫,二方事務(wù)是前提)

3)支持18個級別的延遲消息(rabbitmq和kafka不支持)

4)支持指定次數(shù)和時間間隔的失敗消息重發(fā)(kafka不支持沈条,rabbitmq需要手動確認(rèn))

5)支持consumer端tag過濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)

6)支持重復(fù)消費(fèi)(rabbitmq不支持诅炉,kafka支持)

Rocketmq蜡歹、kafka屋厘、Rabbitmq的詳細(xì)對比,請參照下表格:

二月而、RocketMQ集群概述

1. RocketMQ集群部署結(jié)構(gòu)

1) Name Server

Name Server是一個幾乎無狀態(tài)節(jié)點(diǎn)汗洒,可集群部署,節(jié)點(diǎn)之間無任何信息同步父款。

2) Broker

Broker部署相對復(fù)雜溢谤,Broker分為Master與Slave,一個Master可以對應(yīng)多個Slave铛漓,但是一個Slave只能對應(yīng)一個Master溯香,Master與Slave的對應(yīng)關(guān)系通過指定相同的Broker Name,不同的Broker Id來定義浓恶,BrokerId為0表示Master,非0表示Slave结笨。Master也可以部署多個包晰。

每個Broker與Name Server集群中的所有節(jié)點(diǎn)建立長連接,定時(每隔30s)注冊Topic信息到所有Name Server炕吸。Name Server定時(每隔10s)掃描所有存活broker的連接伐憾,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接赫模。

3) Producer

Producer與Name Server集群中的其中一個節(jié)點(diǎn)(隨機(jī)選擇)建立長連接树肃,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master建立長連接瀑罗,且定時向Master發(fā)送心跳胸嘴。Producer完全無狀態(tài),可集群部署斩祭。

Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊列的最新情況劣像,這意味著如果Broker不可用,Producer最多30s能夠感知摧玫,在此期間內(nèi)發(fā)往Broker的所有消息都會失敗耳奕。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s中掃描所有存活的連接诬像,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù)屋群,則關(guān)閉與Producer的連接。

4) Consumer

Consumer與Name Server集群中的其中一個節(jié)點(diǎn)(隨機(jī)選擇)建立長連接坏挠,定期從Name Server取Topic路由信息芍躏,并向提供Topic服務(wù)的Master、Slave建立長連接癞揉,且定時向Master纸肉、Slave發(fā)送心跳溺欧。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息柏肪,訂閱規(guī)則由Broker配置決定姐刁。

Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味著Broker不可用時烦味,Consumer最多最需要30s才能感知聂使。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s掃描所有存活的連接谬俄,若某個連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù)柏靶,則關(guān)閉連接;并向該Consumer Group的所有Consumer發(fā)出通知溃论,Group內(nèi)的Consumer重新分配隊列屎蜓,然后繼續(xù)消費(fèi)。

當(dāng)Consumer得到master宕機(jī)通知后钥勋,轉(zhuǎn)向slave消費(fèi)炬转,slave不能保證master的消息100%都同步過來了,因此會有少量的消息丟失算灸。但是一旦master恢復(fù)扼劈,未同步過去的消息會被最終消費(fèi)掉。

消費(fèi)者隊列是消費(fèi)者連接之后(或者之前有連接過)才創(chuàng)建的菲驴。我們將原生的消費(fèi)者標(biāo)識由 {IP}@{消費(fèi)者group}擴(kuò)展為?{IP}@{消費(fèi)者group}{topic}{tag}荐吵,例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk。任何一個元素不同赊瞬,都認(rèn)為是不同的消費(fèi)端先煎,每個消費(fèi)端會擁有一份自己消費(fèi)對列(默認(rèn)是broker對列數(shù)量*broker數(shù)量)。新掛載的消費(fèi)者對列中擁有commitlog中的所有數(shù)據(jù)森逮。

如果有需要榨婆,可以查看Rocketmq更多源碼解析

三、 Rocketmq如何支持分布式事務(wù)消息

場景

A(存在DB操作)褒侧、B(存在DB操作)兩方需要保證分布式事務(wù)一致性良风,通過引入中間層MQ,

A和MQ保持事務(wù)一致性(異常情況下通過MQ反查A接口實現(xiàn)check)闷供,

B和MQ保證事務(wù)一致(通過重試)烟央,從而達(dá)到最終事務(wù)一致性。

原理:大事務(wù) = 小事務(wù) + 異步

1. MQ與DB一致性原理(兩方事務(wù))

流程圖

上圖是RocketMQ提供的保證MQ消息歪脏、DB事務(wù)一致性的方案疑俭。

MQ消息、DB操作一致性方案:

1)發(fā)送消息到MQ服務(wù)器婿失,此時消息狀態(tài)為SEND_OK钞艇。此消息為consumer不可見啄寡。

2)執(zhí)行DB操作;DB執(zhí)行成功Commit DB操作哩照,DB執(zhí)行失敗Rollback DB操作挺物。

3)如果DB執(zhí)行成功,回復(fù)MQ服務(wù)器飘弧,將狀態(tài)為COMMIT_MESSAGE识藤;如果DB執(zhí)行失敗,回復(fù)MQ服務(wù)器次伶,將狀態(tài)改為ROLLBACK_MESSAGE痴昧。注意此過程有可能失敗。

4)MQ內(nèi)部提供一個名為“事務(wù)狀態(tài)服務(wù)”的服務(wù)冠王,此服務(wù)會檢查事務(wù)消息的狀態(tài)赶撰,如果發(fā)現(xiàn)消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調(diào)業(yè)務(wù)系統(tǒng)柱彻,業(yè)務(wù)系統(tǒng)在checkLocalTransactionState方法中檢查DB事務(wù)狀態(tài)扣囊,如果成功,則回復(fù)COMMIT_MESSAGE绒疗,否則回復(fù)ROLLBACK_MESSAGE。

說明:

上面以DB為例骂澄,其實此處可以是任何業(yè)務(wù)或者數(shù)據(jù)源吓蘑。

以上SEND_OK、COMMIT_MESSAGE坟冲、ROLLBACK_MESSAGE?均是client jar提供的狀態(tài)磨镶,在MQ服務(wù)器內(nèi)部是一個數(shù)字。

TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調(diào)(上圖中灰色部分)健提。這種消息丟失只存在于斷網(wǎng)或者RocketMQ集群掛了的情況下琳猫。當(dāng)RocketMQ集群掛了,如果采用異步刷盤私痹,存在1s內(nèi)數(shù)據(jù)丟失風(fēng)險脐嫂,異步刷盤場景下保障事務(wù)沒有意義。所以如果要核心業(yè)務(wù)用RocketMQ解決分布式事務(wù)問題紊遵,建議選擇同步刷盤模式账千。

2. 多系統(tǒng)之間數(shù)據(jù)一致性(多方事務(wù))

當(dāng)需要保證多方(超過2方)的分布式一致性,上面的兩方事務(wù)一致性(通過RocketMQ的事務(wù)性消息解決)已經(jīng)無法支持暗膜。這個時候需要引入TCC模式思想(Try-Confirm-Cancel匀奏,不清楚的自行百度)。

以上圖交易系統(tǒng)為例:

1)交易系統(tǒng)創(chuàng)建訂單(往DB插入一條記錄)学搜,同時發(fā)送訂單創(chuàng)建消息娃善。通過RocketMQ事務(wù)性消息保證一致性

2)接著執(zhí)行完成訂單所需的同步核心RPC服務(wù)(非核心的系統(tǒng)通過監(jiān)聽MQ消息自行處理论衍,處理結(jié)果不會影響交易狀態(tài))。執(zhí)行成功更改訂單狀態(tài)聚磺,同時發(fā)送MQ消息坯台。

3)交易系統(tǒng)接受自己發(fā)送的訂單創(chuàng)建消息,通過定時調(diào)度系統(tǒng)創(chuàng)建延時回滾任務(wù)(或者使用RocketMQ的重試功能咧最,設(shè)置第二次發(fā)送時間為定時任務(wù)的延遲創(chuàng)建時間捂人。在非消息堵塞的情況下,消息第一次到達(dá)延遲為1ms左右矢沿,這時可能RPC還未執(zhí)行完滥搭,訂單狀態(tài)還未設(shè)置為完成,第二次消費(fèi)時間可以指定)捣鲸。延遲任務(wù)先通過查詢訂單狀態(tài)判斷訂單是否完成瑟匆,完成則不創(chuàng)建回滾任務(wù),否則創(chuàng)建栽惶。 PS:多個RPC可以創(chuàng)建一個回滾任務(wù)愁溜,通過一個消費(fèi)組接受一次消息就可以;也可以通過創(chuàng)建多個消費(fèi)組外厂,一個消息消費(fèi)多次冕象,每次消費(fèi)創(chuàng)建一個RPC的回滾任務(wù)。 回滾任務(wù)失敗汁蝶,通過MQ的重發(fā)來重試渐扮。

以上是交易系統(tǒng)和其他系統(tǒng)之間保持最終一致性的解決方案。

3. 案例分析

1) 單機(jī)環(huán)境下的事務(wù)示意圖

如下為A給B轉(zhuǎn)賬的例子掖棉。

1鎖定A的賬戶

2鎖定B的賬戶

3檢查A賬戶是否有1元

4A的賬戶扣減1元

5給B的賬戶加1元

6解鎖B的賬戶

7解鎖A的賬戶

步驟動作

以上過程在代碼層面墓律,甚至可以簡化到在一個事物中,執(zhí)行兩條sql語句幔亥。

2) 分布式環(huán)境下事務(wù)

和單機(jī)事務(wù)不同耻讽,A、B賬戶可能不在同一個DB中帕棉,此時無法像在單機(jī)情況下使用事務(wù)來實現(xiàn)针肥。

此時可以通過一下方式實現(xiàn),將轉(zhuǎn)賬操作分成兩個操作笤昨。

a) A賬戶

1鎖定A的賬戶

2檢查A賬戶是否有1元

3A的賬戶扣減1元

4解鎖A的賬戶

步驟動作

b) MQ消息

A賬戶數(shù)據(jù)發(fā)生變化時祖驱,發(fā)送MQ消息,MQ服務(wù)器將消息推送給轉(zhuǎn)賬系統(tǒng)瞒窒,轉(zhuǎn)賬系統(tǒng)來給B賬號加錢捺僻。

c) B賬戶

1鎖定B的賬戶

2給B的賬戶加1元

3解鎖B的賬戶

步驟動作

四、 順序消息

RocketMq有3中消息類型

1. 普通消費(fèi)

2. 順序消費(fèi)

3. 事務(wù)消費(fèi)

順序消費(fèi)場景:在網(wǎng)購的時候,我們需要下單匕坯,那么下單需要假如有三個順序:

第一束昵、創(chuàng)建訂單

第二:訂單付款

第三:訂單完成

也就是這三個環(huán)節(jié)要有順序,這個訂單才有意義葛峻,RocketMQ可以保證順序消費(fèi)抓半。

RocketMQ 實現(xiàn)順序消費(fèi)的原理:produce在發(fā)送消息的時候谎懦,把消息發(fā)到同一個隊列(queue)中,消費(fèi)者注冊消息監(jiān)聽器為MessageListenerOrderly,這樣就可以保證消費(fèi)端只有一個線程去消費(fèi)消息

注意:是把把消息發(fā)到同一個隊列(queue)苗膝,不是同一個topic弛矛,默認(rèn)情況下一個topic包括4個queue

1. 順序消息缺陷

發(fā)送順序消息劣坊,無法利用集群Fail Over特性拉一,消費(fèi)順序消息的并行度依賴于隊列數(shù)量隊列熱點(diǎn)問題,個別隊列由于哈希不均導(dǎo)致消息過多唧龄,消費(fèi)速度跟不上兼砖,產(chǎn)生消息堆積問題遇到消息失敗的消息,無法跳過既棺,當(dāng)前隊列消費(fèi)暫停讽挟。

2. 原理

produce在發(fā)送消息的時候,把消息發(fā)到同一個隊列(queue)中丸冕,消費(fèi)者注冊消息監(jiān)聽器為MessageListenerOrderly耽梅,這樣就可以保證消費(fèi)端只有一個線程去消費(fèi)消息。

注意:把消息發(fā)到同一個隊列(queue)胖烛,不是同一個topic褐墅,默認(rèn)情況下一個topic包括4個queue

3. 擴(kuò)展

可以通過實現(xiàn)發(fā)送消息的隊列選擇器方法,實現(xiàn)部分順序消息洪己。

舉例:比如一個數(shù)據(jù)庫通過MQ來同步,只需要保證每個表的數(shù)據(jù)是同步的就可以竟贯。解析binlog答捕,將表名作為隊列選擇器的參數(shù),這樣就可以保證每個表的數(shù)據(jù)到同一個對列里面屑那,從而保證表數(shù)據(jù)的順序消費(fèi)

五拱镐、 最佳實踐

1. Producer

1) Topic

一個應(yīng)用盡可能用一個Topic,消息子類型用tags來標(biāo)識持际,tags可以由應(yīng)用自由設(shè)置沃琅。

只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時蜘欲,才可以利用tags 在broker做消息過濾益眉。

2) key

每個消息在業(yè)務(wù)層面的唯一標(biāo)識碼,要設(shè)置到 keys 字段,方便將來定位消息丟失問題郭脂。服務(wù)器會為每個消息創(chuàng)建索引(哈希索引)年碘,應(yīng)用可以通過 topic,key來查詢這條消息內(nèi)容展鸡,以及消息被誰消費(fèi)屿衅。由于是哈希索引,請務(wù)必保證key 盡可能唯一莹弊,這樣可以避免潛在的哈希沖突涤久。

// 訂單Id

String orderId= "20034568923546";

message.setKeys(orderId);

3) 日志

消息發(fā)送成功或者失敗,要打印消息日志忍弛,務(wù)必要打印 send result 和key 字段响迂。

4) send

send消息方法,只要不拋異常剧罩,就代表發(fā)送成功栓拜。但是發(fā)送成功會有多個狀態(tài),在sendResult里定義惠昔。

SEND_OK:消息發(fā)送成功

FLUSH_DISK_TIMEOUT:消息發(fā)送成功幕与,但是服務(wù)器刷盤超時,消息已經(jīng)進(jìn)入服務(wù)器隊列镇防,只有此時服務(wù)器宕機(jī)啦鸣,消息才會丟失

FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功,但是服務(wù)器同步到Slave時超時来氧,消息已經(jīng)進(jìn)入服務(wù)器隊列诫给,只有此時服務(wù)器宕機(jī),消息才會丟失

SLAVE_NOT_AVAILABLE:消息發(fā)送成功啦扬,但是此時slave不可用中狂,消息已經(jīng)進(jìn)入服務(wù)器隊列,只有此時服務(wù)器宕機(jī)扑毡,消息才會丟失

2. Consumer

1) 冪等

RocketMQ使用的消息原語是At Least Once胃榕,所以consumer可能多次收到同一個消息,此時務(wù)必做好冪等瞄摊。

冪等(idempotent勋又、idempotence)是一個數(shù)學(xué)與計算機(jī)學(xué)概念,常見于抽象代數(shù)中换帜。

在編程中一個冪等操作的特點(diǎn)是:其任意多次執(zhí)行所產(chǎn)生的影響(結(jié)果)楔壤,均與一次執(zhí)行的影響相同。

冪等函數(shù)惯驼,或冪等方法蹲嚣,是指可以使用相同參數(shù)重復(fù)執(zhí)行递瑰,并能獲得相同結(jié)果的函數(shù)。這些函數(shù)不會影響系統(tǒng)狀態(tài)端铛,也不用擔(dān)心重復(fù)執(zhí)行會對系統(tǒng)造成改變泣矛。

例如,“setTrue()”函數(shù)就是一個冪等函數(shù)禾蚕,無論多次執(zhí)行您朽,其結(jié)果都是一樣的。更復(fù)雜的操作冪等保證换淆,是利用唯一交易號(流水號)實現(xiàn)哗总。

2) 日志

消費(fèi)時記錄日志,以便后續(xù)定位問題倍试。

3) 批量消費(fèi)

盡量使用批量方式消費(fèi)方式讯屈,可以很大程度上提高消費(fèi)吞吐量。

MQ 消息隊列的底層原理

1县习、生產(chǎn)者

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

producer.start();

for (int i = 0; i < 128; i++)

try {

Message msg = new Message("TopicTest",

"TagA",

"OrderID188",

"Hello world".getBytes

(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);

} catch (Exception e) {

e.printStackTrace();

}

producer.shutdown();

}

}

2涮母、消費(fèi)者

public class PushConsumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// wrong time format 2017_0422_221800

consumer.setConsumeTimestamp("20181109221800");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyContext context) {

System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.printf("Consumer Started.%n");

}

}

3、RocketMQ架構(gòu)原理

對于RocketMQ先拋出幾個問題:

RocketMQ的topic和隊列是什么樣的躁愿,和Kafka的分區(qū)有什么不同叛本?

RocketMQ網(wǎng)絡(luò)模型是什么樣的,和Kafka對比如何彤钟?

RocketMQ消息存儲模型是什么樣的来候,如何保證高可靠的存儲,和Kafka對比如何逸雹?

3.1 RocketMQ架構(gòu)圖

對于RocketMQ的架構(gòu)圖营搅,在大體上來看和Kafka并沒有太多的差別,

但是在很多細(xì)節(jié)上是有很多差別的梆砸,接下來會一一進(jìn)行講述转质。

3.2 RocketMQ名詞解釋

在3.1的架構(gòu)中我們有?多個Producer,多個主Broker帖世,多個從Broker

每個Producer可以對應(yīng)多個Topic峭拘,每個Consumer也可以消費(fèi)多個Topic,多對多的關(guān)系

Broker信息會上報至NameServer狮暑,Consumer會從NameServer中拉取Broker和Topic的信息。

Producer:消息生產(chǎn)者辉饱,向Broker發(fā)送消息的客戶端

Consumer:消息消費(fèi)者搬男,從Broker讀取消息的客戶端

Broker:消息中間的處理節(jié)點(diǎn),這里和kafka不同彭沼,kafka的Broker沒有主從的概念缔逛,都可以寫入請求以及備份其他節(jié)點(diǎn)數(shù)據(jù),RocketMQ只有主Broker節(jié)點(diǎn)才能寫,一般也通過主節(jié)點(diǎn)讀褐奴,當(dāng)主節(jié)點(diǎn)有故障或者一些其他特殊情況才會使用從節(jié)點(diǎn)讀按脚,有點(diǎn)類似- 于mysql的主從架構(gòu)。

Topic:消息主題敦冬,一級消息類型辅搬,生產(chǎn)者向其發(fā)送消息, 消費(fèi)者讀取其消息。

Group:分為ProducerGroup脖旱,ConsumerGroup堪遂,代表某一類的生產(chǎn)者和消費(fèi)者,一般來說同一個服務(wù)可以作為Group萌庆,同一個Group一般來說發(fā)送和消費(fèi)的消息都是一樣的溶褪。

Tag:Kafka中沒有這個概念,Tag是屬于二級消息類型践险,一般來說業(yè)務(wù)有關(guān)聯(lián)的可以使用同一個Tag猿妈,比如訂單消息隊列,使用Topic_Order巍虫,Tag可以分為Tag_食品訂單,Tag_服裝訂單等等彭则。

Queue: 在kafka中叫Partition,每個Queue內(nèi)部是有序的垫言,在RocketMQ中分為讀和寫兩種隊列贰剥,一般來說讀寫隊列數(shù)量一致,如果不一致就會出現(xiàn)很多問題筷频。

NameServer:Kafka中使用的是ZooKeeper保存Broker的地址信息蚌成,以及Broker的Leader的選舉,在RocketMQ中并沒有采用選舉Broker的策略凛捏,所以采用了無狀態(tài)的NameServer來存儲担忧,由于NameServer是無狀態(tài)的,集群節(jié)點(diǎn)之間并不會通信坯癣,所以上傳數(shù)據(jù)的時候都需要向所有節(jié)點(diǎn)進(jìn)行發(fā)送瓶盛。

很多朋友都在問什么是無狀態(tài)呢?狀態(tài)的有無實際上就是數(shù)據(jù)是否會做存儲示罗,有狀態(tài)的話數(shù)據(jù)會被持久化惩猫,無狀態(tài)的服務(wù)可以理解就是一個內(nèi)存服務(wù),NameServer本身也是一個內(nèi)存服務(wù)蚜点,所有數(shù)據(jù)都存儲在內(nèi)存中轧房,重啟之后都會丟失。

3.3 Topic 和 Queue

在RocketMQ中的每一條消息绍绘,都有一個Topic奶镶,用來區(qū)分不同的消息迟赃。一個Topic主題一般會有多個消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個主題時厂镇,訂閱了這個主題的消費(fèi)者都可以接收到生產(chǎn)者寫入的新消息纤壁。

在Topic中有分為了多個Queue,這其實是我們發(fā)送/讀取消息通道的最小單位捺信,我們發(fā)送消息都需要指定某個寫入某個Queue酌媒,拉取消息的時候也需要指定拉取某個Queue,所以我們的順序消息可以基于我們的Queue維度保持隊列有序残黑,如果想做到全局有序那么需要將Queue大小設(shè)置為1馍佑,這樣所有的數(shù)據(jù)都會在Queue中有序。

在上圖中我們的Producer會通過一些策略進(jìn)行Queue的選擇:

1)非順序消息:非順序消息一般直接采用輪訓(xùn)發(fā)送的方式進(jìn)行發(fā)送梨水。

2)順序消息:根據(jù)某個Key拭荤,比如我們常見的訂單Id,用戶Id疫诽,進(jìn)行Hash舅世,將同一類數(shù)據(jù)放在同一個隊列中,保證我們的順序性奇徒。

我們同一組Consumer也會根據(jù)一些策略來選Queue雏亚,常見的比如平均分配或者一致性Hash分配。

要注意的是當(dāng)Consumer出現(xiàn)下線或者上線的時候摩钙,這里需要做重平衡罢低,也就是Rebalance,RocketMQ的重平衡機(jī)制如下:

1)定時拉取broker胖笛,topic的最新信息

2)每隔20s做重平衡

3)隨機(jī)選取當(dāng)前Topic的一個主Broker网持,這里要注意的是不是每次重平衡所有主Broker都會被選中,因為會存在一個Broker再多個Broker的情況

4)獲取當(dāng)前Broker长踊,當(dāng)前ConsumerGroup的所有機(jī)器ID

5)然后進(jìn)行策略分配

由于重平衡是定時做的功舀,所以這里有可能會出現(xiàn)某個Queue同時被兩個Consumer消費(fèi),所以會出現(xiàn)消息重復(fù)投遞身弊。

Kafka的重平衡機(jī)制和RocketMQ不同辟汰,Kafka的重平衡是通過Consumer和Coordinator聯(lián)系來完成的,當(dāng)Coordinator感知到消費(fèi)組的變化阱佛,會在心跳過程中發(fā)送重平衡的信號帖汞,然后由一個ConsumerLeader進(jìn)行重平衡選擇,然后再由Coordinator將結(jié)果通知給所有的消費(fèi)者凑术。

Queue 讀寫數(shù)量不一致

在RocketMQ中Queue被分為讀和寫兩種翩蘸,在最開始接觸RocketMQ的時候,一直以為讀寫隊列數(shù)量配置不一致不會出現(xiàn)什么問題的麦萤,比如當(dāng)消費(fèi)者機(jī)器很多的時候我們配置很多讀的隊列鹿鳖,但是實際過程中發(fā)現(xiàn)會出現(xiàn)消息無法消費(fèi)和根本沒有消息消費(fèi)的情況。

當(dāng)寫的隊列數(shù)量大于讀的隊列的數(shù)量壮莹,當(dāng)大于讀隊列這部分ID的寫隊列的數(shù)據(jù)會無法消費(fèi)翅帜,因為不會將其分配給消費(fèi)者。

當(dāng)讀的隊列數(shù)量大于寫的隊列數(shù)量命满,那么多的隊列數(shù)量就不會有消息被投遞進(jìn)來涝滴。

這個功能在RocketMQ在我看來明顯沒什么用,因為基本上都會設(shè)置為讀寫隊列大小一樣胶台,這個為啥不直接將其進(jìn)行統(tǒng)一歼疮,反而容易讓用戶配置不一樣出現(xiàn)錯誤。

這個問題在RocketMQ的Issue里也沒有收到好的答案诈唬。

3.4 消費(fèi)模型

一般來說消息隊列的消費(fèi)模型分為兩種:MQPullConsumer?和?MQPushConsumer韩脏,基于推送的消息(push)模型和基于拉取(poll)的消息模型。

1)基于推送模型的消息系統(tǒng)铸磅,由消息代理記錄消費(fèi)狀態(tài)赡矢。

消息代理將消息推送到消費(fèi)者后,標(biāo)記這條消息為已經(jīng)被消費(fèi)阅仔,但是這種方式無法很好地保證消費(fèi)的處理語義吹散。比如當(dāng)我們把已經(jīng)把消息發(fā)送給消費(fèi)者之后,由于消費(fèi)進(jìn)程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息八酒,如果我們在消費(fèi)代理將其標(biāo)記為已消費(fèi)空民,這個消息就永久丟失了。如果我們利用生產(chǎn)者收到消息后回復(fù)這種方法羞迷,消息代理需要記錄消費(fèi)狀態(tài)界轩,這種不可取。

用過RocketMQ的同學(xué)肯定不禁會想到闭树,在RocketMQ中不是提供了兩種消費(fèi)者嗎耸棒?

MQPullConsumer和MQPushConsumer,其中MQPushConsumer不就是我們的推模型嗎报辱?

其實這兩種模型都是客戶端主動去拉消息与殃,其中的實現(xiàn)區(qū)別如下:

1)MQPullConsumer

每次拉取消息需要傳入拉取消息的offset和每次拉取多少消息量,具體拉取哪里的消息碍现,拉取多少是由客戶端控制幅疼。

2)MQPushConsumer

同樣也是客戶端主動拉取消息,但是消息進(jìn)度是由服務(wù)端保存昼接,Consumer會定時上報自己消費(fèi)到哪里爽篷,所以Consumer下次消費(fèi)的時候是可以找到上次消費(fèi)的點(diǎn),一般來說使用PushConsumer我們不需要關(guān)心offset和拉取多少數(shù)據(jù)慢睡,直接使用即可逐工。

集群消費(fèi)和廣播消費(fèi)

消費(fèi)模式我們分為兩種铡溪,集群消費(fèi),廣播消費(fèi):

1)集群消費(fèi)

同一個GroupId都屬于一個集群泪喊,一般來說一條消息棕硫,只會被任意一個消費(fèi)者處理。

2)廣播消費(fèi)

廣播消費(fèi)的消息會被集群中所有消費(fèi)者進(jìn)行消息袒啼,但是要注意:因為廣播消費(fèi)的offset在服務(wù)端保存成本太高哈扮,所以客戶端每一次重啟都會從最新消息消費(fèi),而不是上次保存的offset蚓再。

3.5 網(wǎng)絡(luò)模型

在Kafka中使用的原生的socket實現(xiàn)網(wǎng)絡(luò)通信滑肉,而RocketMQ使用的是Netty網(wǎng)絡(luò)框架,現(xiàn)在越來越多的中間件都不會直接選擇原生的socket摘仅,而是使用的Netty框架靶庙,主要得益于下面幾個原因:

1)API使用簡單,不需要關(guān)心過多的網(wǎng)絡(luò)細(xì)節(jié)实檀,更專注于中間件邏輯惶洲。

2)性能高。

3)成熟穩(wěn)定膳犹,jdk nio的bug都被修復(fù)了恬吕。

選擇框架是一方面,而想要保證網(wǎng)絡(luò)通信的高效须床,網(wǎng)絡(luò)線程模型也是一方面铐料,我們常見的有

1+N(1個Acceptor線程,N個IO線程)

1+N+M(1個acceptor線程豺旬,N個IO線程钠惩,M個worker線程)等模型

RocketMQ使用的是?1+N1+N2+M 的模型,如下圖所示:

1個acceptor線程族阅,N1個IO線程篓跛,N2個線程用來做Shake-hand,SSL驗證坦刀,編解碼愧沟,M個線程用來做業(yè)務(wù)處理。這樣的好處將編解碼鲤遥,和SSL驗證等一些可能耗時的操作放在了一個單獨(dú)的線程池沐寺,不會占據(jù)我們業(yè)務(wù)線程和IO線程。

3.6 高可靠的分布式存儲模型

做為一個好的消息系統(tǒng)盖奈,高性能的存儲混坞,高可用都不可少。

3.6.1 高性能日志存儲

RocketMQ和Kafka的存儲核心設(shè)計有很大的不同,所以其在寫入性能方面也有很大的差別究孕,這是2016年阿里中間件團(tuán)隊對RocketMQ和Kafka不同Topic下做的性能測試:

產(chǎn)品Topic數(shù)量發(fā)送端并發(fā)數(shù)發(fā)送端RT(ms)發(fā)送端TPS消費(fèi)端TPS

RocketMQ6480089000086000

12880097800077000

256800107500075000

Kafka648005136000136000

1282562385008500

25625613322152352

從上可以看出:

Kafka在Topic數(shù)量由64增長到256時啥酱,吞吐量下降了98.37%

RocketMQ在Topic數(shù)量由64增長到256時,吞吐量只下降了16%

這是為什么呢厨诸?kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節(jié)點(diǎn)上懈涛。同時在kafka的機(jī)器上,每個Partition其實都會對應(yīng)一個日志目錄泳猬,在目錄下面會對應(yīng)多個日志分段。所以如果Topic很多的時候Kafka雖然寫文件是順序?qū)懹钪玻珜嶋H上文件過多得封,會造成磁盤IO競爭非常激烈。

那RocketMQ為什么在多Topic的情況下指郁,依然還能很好的保持較多的吞吐量呢忙上?

我們首先來看一下RocketMQ中比較關(guān)鍵的文件:

這里有四個目錄(這里的解釋就直接用RocketMQ官方的了):

commitLog:消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的闲坎。單個文件大小默認(rèn)1G 疫粥,文件名長度為20位,左邊補(bǔ)零腰懂,剩余為起始偏移量梗逮,比如00000000000000000000代表了第一個文件,起始偏移量為0绣溜,文件大小為1G=1073741824慷彤;當(dāng)?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824怖喻,起始偏移量為1073741824底哗,以此類推。消息主要是順序?qū)懭肴罩疚募校?dāng)文件滿了跋选,寫入下一個文件;

config:保存一些配置信息哗蜈,包括一些Group前标,Topic以及Consumer消費(fèi)offset等信息。

consumeQueue:消息消費(fèi)隊列恬叹,引入的目的主要是提高消息消費(fèi)的性能候生,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對主題進(jìn)行的绽昼,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的唯鸭。

Consumer即可根據(jù)ConsumeQueue來查找待消費(fèi)的消息。其中硅确,ConsumeQueue(邏輯消費(fèi)隊列)作為消費(fèi)消息的索引目溉,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset明肮,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件缭付,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)柿估,具體存儲路徑為:HOME \store\index\${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的陷猫,固定的單個IndexFile文件大小約為400M秫舌,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu)绣檬,故rocketmq的索引文件其底層實現(xiàn)為hash索引足陨。 我們發(fā)現(xiàn)我們的消息主體數(shù)據(jù)并沒有像Kafka一樣寫入多個文件,而是寫入一個文件,這樣我們的寫入IO競爭就非常小娇未,可以在很多Topic的時候依然保持很高的吞吐量墨缘。有同學(xué)說這里的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來創(chuàng)建文件零抬,那么文件數(shù)量依然很多镊讼,在這里ConsumeQueue的寫入的數(shù)據(jù)量很小,每條消息只有20個字節(jié)平夜,30W條數(shù)據(jù)也才6M左右蝶棋,所以其實對我們的影響相對Kafka的Topic之間影響是要小很多的。我們整個的邏輯可以如下:

Producer不斷的再往CommitLog添加新的消息忽妒,有一個定時任務(wù)ReputService會不斷的掃描新添加進(jìn)來的CommitLog嚼松,然后不斷的去構(gòu)建ConsumerQueue和Index。

注意:這里指的都是普通的硬盤锰扶,在SSD上面多個文件并發(fā)寫入和單個文件寫入影響不大献酗。

讀取消息

Kafka中每個Partition都會是一個單獨(dú)的文件,所以當(dāng)消費(fèi)某個消息的時候坷牛,會很好的出現(xiàn)順序讀罕偎,我們知道OS從物理磁盤上訪問讀取文件的同時,會順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取京闰,將數(shù)據(jù)放入PageCache颜及,所以Kafka的讀取消息性能比較好。

RocketMQ讀取流程如下:

先讀取ConsumerQueue中的offset對應(yīng)CommitLog物理的offset

根據(jù)offset讀取CommitLog

ConsumerQueue也是每個Queue一個單獨(dú)的文件蹂楣,并且其文件體積小俏站,所以很容易利用PageCache提高性能。而CommitLog痊土,由于同一個Queue的連續(xù)消息在CommitLog其實是不連續(xù)的肄扎,所以會造成隨機(jī)讀,RocketMQ對此做了幾個優(yōu)化:

Mmap映射讀取,Mmap的方式減少了傳統(tǒng)IO將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來回進(jìn)行拷貝的性能開銷

使用DeadLine調(diào)度算法+SSD存儲盤

由于Mmap映射受到內(nèi)存限制犯祠,當(dāng)不在Mmmap映射這部分?jǐn)?shù)據(jù)的時候(也就是消息堆積過多)旭等,默認(rèn)是內(nèi)存的40%,會將請求發(fā)送到SLAVE,減緩Master的壓力

3.6.2 可用性

3.6.2.1 集群模式

我們首先需要選擇一種集群模式衡载,來適應(yīng)我們可忍耐的可用程度搔耕,一般來說分為四種:

1)單Master

這種模式,可用性最低痰娱,但是成本也是最低弃榨,一旦宕機(jī),所有都不可用梨睁。這種一般只適用于本地測試惭墓。

2)單Master多SLAVE

這種模式,可用性一般而姐,如果主宕機(jī),那么所有寫入都不可用划咐,讀取依然可用拴念,如果master磁盤損壞,可以依賴slave的數(shù)據(jù)褐缠。

3)多Master

這種模式政鼠,可用性一般,如果出現(xiàn)部分master宕機(jī)队魏,那么這部分master上的消息都不可消費(fèi)公般,也不可寫數(shù)據(jù),如果一個Topic的隊列在多個Master上都有胡桨,那么可以保證沒有宕機(jī)的那部分可以正常消費(fèi)官帘,寫入。如果master的磁盤損壞會導(dǎo)致消息丟失昧谊。

4)多Master多Slave

這種模式刽虹,可用性最高,但是維護(hù)成本也最高呢诬,當(dāng)master宕機(jī)了之后涌哲,只會出現(xiàn)在這部分master上的隊列不可寫入,但是讀取依然是可以的尚镰,并且如果master磁盤損壞阀圾,可以依賴slave的數(shù)據(jù)。

一般來說投入生產(chǎn)環(huán)境的話都會選擇第四種狗唉,來保證最高的可用性初烘。

3.6.2.2 消息的可用性

當(dāng)我們選擇好了集群模式之后,那么我們需要關(guān)心的就是怎么去存儲和復(fù)制這個數(shù)據(jù),rocketMQ對消息的刷盤提供了同步和異步的策略來滿足我們的账月,當(dāng)我們選擇同步刷盤之后综膀,如果刷盤超時會給返回FLUSH_DISK_TIMEOUT,如果是異步刷盤不會返回刷盤相關(guān)信息局齿,選擇同步刷盤可以盡最大程度滿足我們的消息不會丟失剧劝。

除了存儲有選擇之后,我們的主從同步提供了同步和異步兩種模式來進(jìn)行復(fù)制抓歼,當(dāng)然選擇同步可以提升可用性讥此,但是消息的發(fā)送RT時間會下降10%左右。

3.6.3 Dleger

我們上面對于master-slave部署模式已經(jīng)做了很多分析谣妻,我們發(fā)現(xiàn)萄喳,當(dāng)master出現(xiàn)問題的時候,我們的寫入怎么都會不可用蹋半,除非恢復(fù)master他巨,或者手動將我們的slave切換成master,導(dǎo)致了我們的Slave在多數(shù)情況下只有讀取的作用减江。RocketMQ在最近的幾個版本中推出了Dleger-RocketMQ染突,使用Raft協(xié)議復(fù)制CommitLog,并且自動進(jìn)行選主辈灼,這樣master宕機(jī)的時候份企,寫入依然保持可用。

有關(guān)Dleger-RocketMQ的信息更多的可以查看這篇文章:Dledger-RocketMQ 基于Raft協(xié)議的commitlog存儲庫

3.7 定時/延時消息

定時消息和延時消息在實際業(yè)務(wù)場景中使用的比較多巡莹,比如下面的一些場景:

1)訂單超時未支付自動關(guān)閉司志,因為在很多場景中下單之后庫存就被鎖定了,這里需要將其進(jìn)行超時關(guān)閉降宅。

2)需要一些延時的操作骂远,比如一些兜底的邏輯,當(dāng)做完某個邏輯之后腰根,可以發(fā)送延時消息比如延時半個小時吧史,進(jìn)行兜底檢查補(bǔ)償。

3)在某個時間給用戶發(fā)送消息唠雕,同樣也可以使用延時消息贸营。

在開源版本的RocketMQ中延時消息并不支持任意時間的延時,需要設(shè)置幾個固定的延時等級岩睁,目前默認(rèn)設(shè)置為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h钞脂,從1s到2h分別對應(yīng)著等級1到18,而阿里云中的版本(要付錢)是可以支持40天內(nèi)的任何時刻(毫秒級別)捕儒。

我們先看下在RocketMQ中定時任務(wù)原理圖:

Step1:Producer在自己發(fā)送的消息上設(shè)置好需要延時的級別冰啃。

Step2: Broker發(fā)現(xiàn)此消息是延時消息邓夕,將Topic進(jìn)行替換成延時Topic,每個延時級別都會作為一個單獨(dú)的queue阎毅,將自己的Topic作為額外信息存儲焚刚。

Step3: 構(gòu)建ConsumerQueue

Step4: 定時任務(wù)定時掃描每個延時級別的ConsumerQueue。

Step5: 拿到ConsumerQueue中的CommitLog的Offset扇调,獲取消息矿咕,判斷是否已經(jīng)達(dá)到執(zhí)行時間

Step6: 如果達(dá)到,那么將消息的Topic恢復(fù)狼钮,進(jìn)行重新投遞碳柱。如果沒有達(dá)到則延遲沒有達(dá)到的這段時間執(zhí)行任務(wù)。

可以看見延時消息是利用新建單獨(dú)的Topic和Queue來實現(xiàn)的熬芜,如果我們要實現(xiàn)40天之內(nèi)的任意時間度莲镣,基于這種方案,那么需要402460601000個queue涎拉,這樣的成本是非常之高的瑞侮,那阿里云上面的支持任意時間是怎么實現(xiàn)的呢?這里猜測是持久化二級TimeWheel時間輪鼓拧,二級時間輪用于替代我們的ConsumeQueue半火,保存Commitlog-Offset,然后通過時間輪不斷的取出當(dāng)前已經(jīng)到了的時間毁枯,然后再次投遞消息。具體的實現(xiàn)邏輯需要后續(xù)會單獨(dú)寫一篇文章叮称。

3.8 事務(wù)消息

事務(wù)消息同樣的也是RocketMQ中的一大特色种玛,其可以幫助我們完成分布式事務(wù)的最終一致性

具體使用事務(wù)消息步驟如下:

Step1:調(diào)用sendMessageInTransaction發(fā)送事務(wù)消息

Step2: 如果發(fā)送成功,則執(zhí)行本地事務(wù)瓤檐。

Step3: 如果執(zhí)行本地事務(wù)成功則發(fā)送commit赂韵,如果失敗則發(fā)送rollback。

Step4: 如果其中某個階段比如commit發(fā)送失敗挠蛉,rocketMQ會進(jìn)行定時從Broker回查祭示,本地事務(wù)的狀態(tài)。

事務(wù)消息的使用整個流程相對之前幾種消息使用比較復(fù)雜袖牙,下面是事務(wù)消息實現(xiàn)的原理圖:

Step1: 發(fā)送事務(wù)消息只磷,這里也叫做halfMessage皱蹦,會將Topic替換為HalfMessage的Topic。

Step2: 發(fā)送commit或者rollback汇陆,如果是commit這里會查詢出之前的消息,然后將消息復(fù)原成原Topic带饱,并且發(fā)送一個OpMessage用于記錄當(dāng)前消息可以刪除毡代。如果是rollback這里會直接發(fā)送一個OpMessage刪除阅羹。

Step3: 在Broker有個處理事務(wù)消息的定時任務(wù),定時對比halfMessage和OpMessage教寂,如果有OpMessage且狀態(tài)為刪除捏鱼,那么該條消息必定commit或者rollback,所以就可以刪除這條消息酪耕。

Step4: 如果事務(wù)超時(默認(rèn)是6s)导梆,還沒有opMessage,那么很有可能commit信息丟了因妇,這里會去反查我們的Producer本地事務(wù)狀態(tài)问潭。

Step5: 根據(jù)查詢出來的信息做Step2。

我們發(fā)現(xiàn)RocketMQ實現(xiàn)事務(wù)消息也是通過修改原Topic信息婚被,和延遲消息一樣狡忙,然后模擬成消費(fèi)者進(jìn)行消費(fèi),做一些特殊的業(yè)務(wù)邏輯址芯。當(dāng)然我們還可以利用這種方式去做RocketMQ更多的擴(kuò)展灾茁。

4. 總結(jié)

這里讓我們在回到文章中提到的幾個問題:

RocketMQ的topic和隊列是什么樣的,和Kafka的分區(qū)有什么不同谷炸?

RocketMQ網(wǎng)絡(luò)模型是什么樣的北专,和Kafka對比如何?

RocketMQ消息存儲模型是什么樣的旬陡,如何保證高可靠的存儲拓颓,和Kafka對比如何?

希望對你有所幫助描孟!

如果看完的小伙伴有興趣了解更多的話驶睦,歡迎添加vx小助手:SOSOXWV??免費(fèi)領(lǐng)取資料哦!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末匿醒,一起剝皮案震驚了整個濱河市场航,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌廉羔,老刑警劉巖溉痢,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異憋他,居然都是意外死亡孩饼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門竹挡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來捣辆,“玉大人,你說我怎么就攤上這事此迅∑耄” “怎么了旧巾?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長忍些。 經(jīng)常有香客問我鲁猩,道長,這世上最難降的妖魔是什么罢坝? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任廓握,我火速辦了婚禮,結(jié)果婚禮上嘁酿,老公的妹妹穿的比我還像新娘隙券。我一直安慰自己,他們只是感情好闹司,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布娱仔。 她就那樣靜靜地躺著,像睡著了一般游桩。 火紅的嫁衣襯著肌膚如雪牲迫。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天借卧,我揣著相機(jī)與錄音盹憎,去河邊找鬼。 笑死铐刘,一個胖子當(dāng)著我的面吹牛陪每,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播镰吵,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼檩禾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了捡遍?” 一聲冷哼從身側(cè)響起锌订,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤竹握,失蹤者是張志新(化名)和其女友劉穎画株,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體啦辐,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谓传,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了芹关。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片续挟。...
    茶點(diǎn)故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖侥衬,靈堂內(nèi)的尸體忽然破棺而出诗祸,到底是詐尸還是另有隱情跑芳,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布直颅,位于F島的核電站博个,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏功偿。R本人自食惡果不足惜盆佣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望械荷。 院中可真熱鬧共耍,春花似錦、人聲如沸吨瞎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽关拒。三九已至佃蚜,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間着绊,已是汗流浹背谐算。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留归露,地道東北人洲脂。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像剧包,于是被迫代替她去往敵國和親恐锦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容