RocketMQ關(guān)鍵機制的設(shè)計原理

設(shè)計(design)


1 消息存儲

消息存儲是RocketMQ中最為復(fù)雜和最為重要的一部分韵丑,本節(jié)將分別從RocketMQ的消息存儲整體架構(gòu)、PageCache與Mmap內(nèi)存映射以及RocketMQ中兩種不同的刷盤方式三方面來分別展開敘述虚缎。

1.1 消息存儲整體架構(gòu)

消息存儲架構(gòu)圖中主要有下面三個跟消息存儲相關(guān)的文件構(gòu)成撵彻。

(1) CommitLog:消息主體以及元數(shù)據(jù)的存儲主體钓株,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的。單個文件大小默認1G 陌僵,文件名長度為20位轴合,左邊補零,剩余為起始偏移量碗短,比如00000000000000000000代表了第一個文件受葛,起始偏移量為0,文件大小為1G=1073741824偎谁;當?shù)谝粋€文件寫滿了总滩,第二個文件為00000000001073741824,起始偏移量為1073741824巡雨,以此類推闰渔。消息主要是順序?qū)懭肴罩疚募斘募M了铐望,寫入下一個文件冈涧;

(2) ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能正蛙,由于RocketMQ是基于主題topic的訂閱模式督弓,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的乒验。Consumer即可根據(jù)ConsumeQueue來查找待消費的消息咽筋。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引徊件,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset奸攻,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件虱痕,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)睹耐,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長設(shè)計部翘,每一個條目共20個字節(jié)硝训,分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長度新思、8字節(jié)tag hashcode窖梁,單個文件由30W個條目組成,可以像數(shù)組一樣隨機訪問每一個條目夹囚,每個ConsumeQueue文件大小約5.72M纵刘;

(3) IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。Index文件的存儲位置是:HOME \store\index{fileName}荸哟,文件名fileName是以創(chuàng)建時的時間戳命名的假哎,固定的單個IndexFile文件大小約為400M瞬捕,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu)舵抹,故rocketmq的索引文件其底層實現(xiàn)為hash索引肪虎。

在上面的RocketMQ的消息存儲整體架構(gòu)圖中可以看出,RocketMQ采用的是混合型的存儲結(jié)構(gòu)惧蛹,即為Broker單個實例下所有的隊列共用一個日志數(shù)據(jù)文件(即為CommitLog)來存儲扇救。RocketMQ的混合型存儲結(jié)構(gòu)(多個Topic的消息實體內(nèi)容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲結(jié)構(gòu),Producer發(fā)送消息至Broker端香嗓,然后Broker端使用同步或者異步的方式對消息刷盤持久化迅腔,保存至CommitLog中。只要消息被刷盤持久化至磁盤文件CommitLog中陶缺,那么Producer發(fā)送的消息就不會丟失。正因為如此洁灵,Consumer也就肯定有機會去消費這條消息饱岸。當無法拉取到消息后,可以等下一次消息拉取徽千,同時服務(wù)端也支持長輪詢模式苫费,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間双抽,只要這段時間內(nèi)有新消息到達百框,將直接返回給消費端。這里牍汹,RocketMQ的具體做法是铐维,使用Broker端的后臺服務(wù)線程—ReputMessageService不停地分發(fā)請求并異步構(gòu)建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù)。

1.2 頁緩存與內(nèi)存映射

頁緩存(PageCache)是OS對文件的緩存慎菲,用于加速對文件的讀寫嫁蛇。一般來說,程序?qū)ξ募M行順序讀寫的速度幾乎接近于內(nèi)存的讀寫速度露该,主要原因就是由于OS使用PageCache機制對讀寫訪問操作進行了性能優(yōu)化睬棚,將一部分的內(nèi)存用作PageCache。對于數(shù)據(jù)的寫入解幼,OS會先寫入至Cache內(nèi)抑党,隨后通過異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤至物理磁盤上。對于數(shù)據(jù)的讀取撵摆,如果一次讀取文件時出現(xiàn)未命中PageCache的情況底靠,OS從物理磁盤上訪問讀取文件的同時,會順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進行預(yù)讀取特铝。

在RocketMQ中苛骨,ConsumeQueue邏輯消費隊列存儲的數(shù)據(jù)較少篱瞎,并且是順序讀取,在page cache機制的預(yù)讀取作用下痒芝,Consume Queue文件的讀性能幾乎接近讀內(nèi)存俐筋,即使在有消息堆積情況下也不會影響性能。而對于CommitLog消息存儲的日志數(shù)據(jù)文件來說严衬,讀取消息內(nèi)容時候會產(chǎn)生較多的隨機訪問讀取澄者,嚴重影響性能。如果選擇合適的系統(tǒng)IO調(diào)度算法请琳,比如設(shè)置調(diào)度算法為“Deadline”(此時塊存儲采用SSD的話)粱挡,隨機讀的性能也會有所提升。

另外俄精,RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作询筏。其中,利用了NIO中的FileChannel模型將磁盤上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種Mmap的方式減少了傳統(tǒng)IO將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來回進行拷貝的性能開銷)竖慧,將對文件的操作轉(zhuǎn)化為直接對內(nèi)存地址進行操作嫌套,從而極大地提高了文件的讀寫效率(正因為需要使用內(nèi)存映射機制,故RocketMQ的文件存儲都使用定長結(jié)構(gòu)來存儲圾旨,方便一次將整個文件映射至內(nèi)存)踱讨。

1.3 消息刷盤

(1) 同步刷盤:如上圖所示,只有在消息真正持久化至磁盤后RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應(yīng)砍的。同步刷盤對MQ消息可靠性來說是一種不錯的保障痹筛,但是性能上會有較大影響,一般適用于金融業(yè)務(wù)應(yīng)用該模式較多廓鞠。

(2) 異步刷盤:能夠充分利用OS的PageCache的優(yōu)勢帚稠,只要消息寫入PageCache即可將成功的ACK返回給Producer端。消息刷盤采用后臺異步線程提交的方式進行床佳,降低了讀寫延遲翁锡,提高了MQ的性能和吞吐量。

2 通信機制

RocketMQ消息隊列集群主要包括NameServer夕土、Broker(Master/Slave)馆衔、Producer、Consumer4個角色怨绣,基本通訊流程如下:

(1) Broker啟動后需要完成一次將自己注冊至NameServer的操作角溃;隨后每隔30s時間定時向NameServer上報Topic路由信息。

(2) 消息生產(chǎn)者Producer作為客戶端發(fā)送消息時候篮撑,需要根據(jù)消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息减细。如果沒有則更新路由信息會從NameServer上重新拉取,同時Producer會默認每隔30s向NameServer拉取一次路由信息赢笨。

(3) 消息生產(chǎn)者Producer根據(jù)2)中獲取的路由信息選擇一個隊列(MessageQueue)進行消息發(fā)送未蝌;Broker作為消息的接收者接收消息并落盤存儲驮吱。

(4) 消息消費者Consumer根據(jù)2)中獲取的路由信息,并再完成客戶端的負載均衡后萧吠,選擇其中的某一個或者某幾個消息隊列來拉取消息并進行消費左冬。

從上面1)~3)中可以看出在消息生產(chǎn)者, Broker和NameServer之間都會發(fā)生通信(這里只說了MQ的部分通信),因此如何設(shè)計一個良好的網(wǎng)絡(luò)通信模塊在MQ中至關(guān)重要纸型,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能拇砰。

rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網(wǎng)絡(luò)通信的模塊,它幾乎被其他所有需要網(wǎng)絡(luò)通信的模塊(諸如rocketmq-client狰腌、rocketmq-broker除破、rocketmq-namesrv)所依賴和引用。為了實現(xiàn)客戶端與服務(wù)器之間高效的數(shù)據(jù)請求與接收琼腔,RocketMQ消息隊列自定義了通信協(xié)議并在Netty的基礎(chǔ)之上擴展了通信模塊瑰枫。

2.1 Remoting通信類結(jié)構(gòu)

2.2 協(xié)議設(shè)計與編解碼

在Client和Server之間完成一次消息發(fā)送時,需要對發(fā)送的消息進行一個協(xié)議約定丹莲,因此就有必要自定義RocketMQ的消息協(xié)議光坝。同時,為了高效地在網(wǎng)絡(luò)中傳輸消息和對收到的消息讀取圾笨,就需要對消息進行編解碼教馆。在RocketMQ中逊谋,RemotingCommand這個類在消息傳輸過程中對所有數(shù)據(jù)內(nèi)容的封裝擂达,不但包含了所有的數(shù)據(jù)結(jié)構(gòu),還包含了編碼解碼操作胶滋。

Header字段 類型 Request說明 Response說明
code int 請求操作碼板鬓,應(yīng)答方根據(jù)不同的請求碼進行不同的業(yè)務(wù)處理 應(yīng)答響應(yīng)碼。0表示成功究恤,非0則表示各種錯誤
language LanguageCode 請求方實現(xiàn)的語言 應(yīng)答方實現(xiàn)的語言
version int 請求方程序的版本 應(yīng)答方程序的版本
opaque int 相當于requestId俭令,在同一個連接上的不同請求標識碼,與響應(yīng)消息中的相對應(yīng) 應(yīng)答不做修改直接返回
flag int 區(qū)分是普通RPC還是onewayRPC得標志 區(qū)分是普通RPC還是onewayRPC得標志
remark String 傳輸自定義文本信息 傳輸自定義文本信息
extFields HashMap<String, String> 請求自定義擴展信息 響應(yīng)自定義擴展信息

可見傳輸內(nèi)容主要可以分為以下4部分:

(1) 消息長度:總長度部宿,四個字節(jié)存儲抄腔,占用一個int類型;

(2) 序列化類型&消息頭長度:同樣占用一個int類型理张,第一個字節(jié)表示序列化類型赫蛇,后面三個字節(jié)表示消息頭長度;

(3) 消息頭數(shù)據(jù):經(jīng)過序列化后的消息頭數(shù)據(jù)雾叭;

(4) 消息主體數(shù)據(jù):消息主體的二進制字節(jié)數(shù)據(jù)內(nèi)容悟耘;

2.3 消息的通信方式和流程

在RocketMQ消息隊列中支持通信的方式主要有同步(sync)、異步(async)织狐、單向(oneway) 三種暂幼。其中“單向”通信模式相對簡單筏勒,一般用在發(fā)送心跳包場景下,無需關(guān)注其Response旺嬉。這里管行,主要介紹RocketMQ的異步通信流程。

2.4 Reactor多線程設(shè)計

RocketMQ的RPC通信采用Netty組件作為底層通信庫鹰服,同樣也遵循了Reactor多線程模型病瞳,同時又在這之上做了一些擴展和優(yōu)化。

上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型悲酷。一個 Reactor 主線程(eventLoopGroupBoss套菜,即為上面的1)負責監(jiān)聽 TCP網(wǎng)絡(luò)連接請求,建立好連接设易,創(chuàng)建SocketChannel逗柴,并注冊到selector上。RocketMQ的源碼中會自動根據(jù)OS的類型選擇NIO和Epoll顿肺,也可以通過參數(shù)配置),然后監(jiān)聽真正的網(wǎng)絡(luò)數(shù)據(jù)戏溺。拿到網(wǎng)絡(luò)數(shù)據(jù)后,再丟給Worker線程池(eventLoopGroupSelector屠尊,即為上面的“N”旷祸,源碼中默認設(shè)置為3),在真正執(zhí)行業(yè)務(wù)邏輯之前需要進行SSL驗證讼昆、編解碼托享、空閑檢查、網(wǎng)絡(luò)連接管理浸赫,這些工作交給defaultEventExecutorGroup(即為上面的“M1”闰围,源碼中默認設(shè)置為8)去做。而處理業(yè)務(wù)操作放在業(yè)務(wù)線程池中執(zhí)行既峡,根據(jù) RomotingCommand 的業(yè)務(wù)請求碼code去processorTable這個本地緩存變量中找到對應(yīng)的 processor羡榴,然后封裝成task任務(wù)后,提交給對應(yīng)的業(yè)務(wù)processor處理線程池來執(zhí)行(sendMessageExecutor运敢,以發(fā)送消息為例校仑,即為上面的 “M2”)。從入口到業(yè)務(wù)邏輯的幾個步驟中線程池一直再增加传惠,這跟每一步邏輯復(fù)雜性相關(guān)迄沫,越復(fù)雜吁讨,需要的并發(fā)通道越寬滔悉。

線程數(shù) 線程名 線程具體說明
1 NettyBoss_%d Reactor 主線程
N NettyServerEPOLLSelector_%d_%d Reactor 線程池
M1 NettyServerCodecThread_%d Worker線程池
M2 RemotingExecutorThread_%d 業(yè)務(wù)processor處理線程池

3 消息過濾

RocketMQ分布式消息隊列的消息過濾方式有別于其它MQ中間件亏吝,是在Consumer端訂閱消息時再做消息過濾的道伟。RocketMQ這么做是在于其Producer端寫入消息和Consumer端訂閱消息采用分離存儲的機制來實現(xiàn)的睛挚,Consumer端訂閱消息是需要通過ConsumeQueue這個消息消費的邏輯隊列拿到一個索引裁眯,然后再從CommitLog里面讀取真正的消息實體內(nèi)容豆胸,所以說到底也是還繞不開其存儲結(jié)構(gòu)常柄。其ConsumeQueue的存儲結(jié)構(gòu)如下,可以看到其中有8個字節(jié)存儲的Message Tag的哈希值摇予,基于Tag的消息過濾正式基于這個字段值的汽绢。

主要支持如下2種的過濾方式 (1) Tag過濾方式:Consumer端在訂閱消息時除了指定Topic還可以指定TAG,如果一個消息有多個TAG侧戴,可以用||分隔宁昭。其中,Consumer端會將這個訂閱請求構(gòu)建成一個 SubscriptionData酗宋,發(fā)送一個Pull消息的請求給Broker端积仗。Broker端從RocketMQ的文件存儲層—Store讀取數(shù)據(jù)之前,會用這些數(shù)據(jù)先構(gòu)建一個MessageFilter蜕猫,然后傳給Store寂曹。Store從 ConsumeQueue讀取到一條記錄后,會用它記錄的消息tag hash值去做過濾回右,由于在服務(wù)端只是根據(jù)hashcode進行判斷隆圆,無法精確對tag原始字符串進行過濾,故在消息消費端拉取到消息后翔烁,還需要對消息的原始tag字符串進行比對渺氧,如果不同,則丟棄該消息蹬屹,不進行消息消費侣背。

(2) SQL92的過濾方式:這種方式的大致做法和上面的Tag過濾方式一樣,只是在Store層的具體過濾過程不太一樣哩治,真正的 SQL expression 的構(gòu)建和執(zhí)行由rocketmq-filter模塊負責的秃踩。每次過濾都去執(zhí)行SQL表達式會影響效率衬鱼,所以RocketMQ使用了BloomFilter避免了每次都去執(zhí)行业筏。SQL92的表達式上下文為消息的屬性。

4 負載均衡

RocketMQ中的負載均衡都在Client端完成鸟赫,具體來說的話蒜胖,主要可以分為Producer端發(fā)送消息時候的負載均衡和Consumer端訂閱消息的負載均衡。

4.1 Producer的負載均衡

Producer端在發(fā)送消息的時候抛蚤,會先根據(jù)Topic找到指定的TopicPublishInfo台谢,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發(fā)送消息岁经。具體的容錯策略均在MQFaultStrategy這個類中定義朋沮。這里有一個sendLatencyFaultEnable開關(guān)變量,如果開啟缀壤,在隨機遞增取模的基礎(chǔ)上樊拓,再過濾掉not available的Broker代理纠亚。所謂的"latencyFaultTolerance",是指對之前失敗的筋夏,按一定的時間做退避蒂胞。例如,如果上次請求的latency超過550Lms条篷,就退避3000Lms骗随;超過1000L,就退避60000L赴叹;如果關(guān)閉鸿染,采用隨機遞增取模的方式選擇一個隊列(MessageQueue)來發(fā)送消息,latencyFaultTolerance機制是實現(xiàn)消息發(fā)送高可用的核心關(guān)鍵所在乞巧。

4.2 Consumer的負載均衡

在RocketMQ中牡昆,Consumer端的兩種消費模式(Push/Pull)都是基于拉模式來獲取消息的,而在Push模式只是對pull模式的一種封裝摊欠,其本質(zhì)實現(xiàn)為消息拉取線程在從服務(wù)器拉取到一批消息后丢烘,然后提交到消息消費線程池后,又“馬不停蹄”的繼續(xù)向服務(wù)器再次嘗試拉取消息些椒。如果未拉取到消息播瞳,則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費方式(Push/Pull)中免糕,均需要Consumer端在知道從Broker端的哪一個消息隊列—隊列中去獲取消息赢乓。因此,有必要在Consumer端來做負載均衡石窑,即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費牌芋。

1、Consumer端的心跳包發(fā)送

在Consumer啟動后松逊,它就會通過定時任務(wù)不斷地向RocketMQ集群中的所有Broker實例發(fā)送心跳包(其中包含了躺屁,消息消費分組名稱、訂閱關(guān)系集合经宏、消息通信模式和客戶端id的值等信息)犀暑。Broker端在收到Consumer的心跳消息后,會將它維護在ConsumerManager的本地緩存變量—consumerTable烁兰,同時并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量—channelInfoTable中耐亏,為之后做Consumer端的負載均衡提供可以依據(jù)的元數(shù)據(jù)信息。

2沪斟、Consumer端實現(xiàn)負載均衡的核心類—RebalanceImpl

在Consumer實例的啟動流程中的啟動MQClientInstance實例部分广辰,會完成負載均衡服務(wù)線程—RebalanceService的啟動(每隔20s執(zhí)行一次)。通過查看源碼可以發(fā)現(xiàn),RebalanceService線程的run()方法最終調(diào)用的是RebalanceImpl類的rebalanceByTopic()方法择吊,該方法是實現(xiàn)Consumer端負載均衡的核心袱耽。這里,rebalanceByTopic()方法會根據(jù)消費者通信類型為“廣播模式”還是“集群模式”做不同的邏輯處理干发。這里主要來看下集群模式下的主要處理流程:

(1) 從rebalanceImpl實例的本地緩存變量—topicSubscribeInfoTable中朱巨,獲取該Topic主題下的消息消費隊列集合(mqSet);

(2) 根據(jù)topic和consumerGroup為參數(shù)調(diào)用mQClientFactory.findConsumerIdList()方法向Broker端發(fā)送獲取該消費組下消費者Id列表的RPC通信請求(Broker端基于前面Consumer端上報的心跳包數(shù)據(jù)而構(gòu)建的consumerTable做出響應(yīng)返回枉长,業(yè)務(wù)請求碼:GET_CONSUMER_LIST_BY_GROUP)冀续;

(3) 先對Topic下的消息消費隊列、消費者Id排序必峰,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法)洪唐,計算出待拉取的消息隊列。這里的平均分配算法吼蚁,類似于分頁的算法凭需,將所有MessageQueue排好序類似于記錄,將所有消費端Consumer排好序類似頁數(shù)肝匆,并求出每一頁需要包含的平均size和每個頁面記錄的范圍range粒蜈,最后遍歷整個range而計算出當前Consumer端應(yīng)該分配到的記錄(這里即為:MessageQueue)。

(4) 然后旗国,調(diào)用updateProcessQueueTableInRebalance()方法枯怖,具體的做法是,先將分配到的消息隊列集合(mqSet)與processQueueTable做一個過濾比對能曾。

  • 上圖中processQueueTable標注的紅色部分度硝,表示與分配到的消息隊列集合mqSet互不包含。將這些隊列設(shè)置Dropped屬性為true寿冕,然后查看這些隊列是否可以移除出processQueueTable緩存變量蕊程,這里具體執(zhí)行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以獲取當前消費處理隊列的鎖驼唱,拿到的話返回true藻茂。如果等待1s后,仍然拿不到當前消費處理隊列的鎖則返回false曙蒸。如果返回true捌治,則從processQueueTable緩存變量中移除對應(yīng)的Entry岗钩;

  • 上圖中processQueueTable的綠色部分纽窟,表示與分配到的消息隊列集合mqSet的交集。判斷該ProcessQueue是否已經(jīng)過期了兼吓,在Pull模式的不用管臂港,如果是Push模式的,設(shè)置Dropped屬性為true,并且調(diào)用removeUnnecessaryMessageQueue()方法审孽,像上面一樣嘗試移除Entry县袱;

最后,為過濾后的消息隊列集合(mqSet)中的每個MessageQueue創(chuàng)建一個ProcessQueue對象并存入RebalanceImpl的processQueueTable隊列中(其中調(diào)用RebalanceImpl實例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對象的下一個進度消費值offset佑力,隨后填充至接下來要創(chuàng)建的pullRequest對象屬性中)式散,并創(chuàng)建拉取請求對象—pullRequest添加到拉取列表—pullRequestList中,最后執(zhí)行dispatchPullRequest()方法打颤,將Pull消息的請求對象PullRequest依次放入PullMessageService服務(wù)線程的阻塞隊列pullRequestQueue中暴拄,待該服務(wù)線程取出后向Broker端發(fā)起Pull消息的請求。其中编饺,可以重點對比下乖篷,RebalancePushImpl和RebalancePullImpl兩個實現(xiàn)類的dispatchPullRequest()方法不同,RebalancePullImpl類里面的該方法為空透且,這樣子也就回答了上一篇中最后的那道思考題了撕蔼。

消息消費隊列在同一消費組不同消費者之間的負載均衡,其核心設(shè)計理念是在一個消息消費隊列在同一時間只允許被同一消費組內(nèi)的一個消費者消費秽誊,一個消息消費者能同時消費多個消息隊列鲸沮。

5 事務(wù)消息

Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來實現(xiàn)了提交事務(wù)消息锅论,同時增加一個補償邏輯來處理二階段超時或者失敗的消息诉探,如下圖所示。

5.1 RocketMQ事務(wù)消息流程概要

上圖說明了事務(wù)消息的大致方案棍厌,其中分為兩個流程:正常事務(wù)消息的發(fā)送及提交肾胯、事務(wù)消息的補償流程。

1.事務(wù)消息發(fā)送及提交:

(1) 發(fā)送消息(half消息)耘纱。

(2) 服務(wù)端響應(yīng)消息寫入結(jié)果敬肚。

(3) 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗,此時half消息對業(yè)務(wù)不可見束析,本地邏輯不執(zhí)行)艳馒。

(4) 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

2.補償流程:

(1) 對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)员寇,從服務(wù)端發(fā)起一次“回查”

(2) Producer收到回查消息弄慰,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)

(3) 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback

其中蝶锋,補償階段用于解決消息Commit或者Rollback發(fā)生超時或者失敗的情況陆爽。

5.2 RocketMQ事務(wù)消息設(shè)計

1.事務(wù)消息在一階段對用戶不可見

在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見扳缕。其中慌闭,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的别威。那么,如何做到寫入消息但是對用戶不可見呢驴剔?RocketMQ事務(wù)消息的做法是:如果消息是half消息省古,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC丧失。由于消費組未訂閱該主題豺妓,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務(wù)布讹,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費科侈,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息炒事。

在RocketMQ中臀栈,消息在服務(wù)端的存儲結(jié)構(gòu)如下,每條消息都會有對應(yīng)的索引信息挠乳,Consumer通過ConsumeQueue這個二級索引來讀取消息實體內(nèi)容权薯,其流程如下:

RocketMQ的具體實現(xiàn)策略是:寫入的如果事務(wù)消息,對消息的Topic和Queue等屬性進行替換睡扬,同時將原來的Topic和Queue信息存儲到消息的屬性中盟蚣,正因為消息主題被替換,故消息并不會轉(zhuǎn)發(fā)到該原主題的消息消費隊列卖怜,消費者無法感知消息的存在屎开,不會消費。其實改變消息主題是RocketMQ的常用“套路”马靠,回想一下延時消息的實現(xiàn)機制奄抽。

2.Commit和Rollback操作以及Op消息的引入

在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作甩鳄,則需要讓消息對用戶可見逞度;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況妙啃。對于Rollback档泽,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息揖赴,因為是順序?qū)懳募模┕菽洹5菂^(qū)別于這條消息沒有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決)燥滑,需要一個操作來標識這條消息的最終狀態(tài)渐北。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標識事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)突倍。如果一條事務(wù)消息沒有對應(yīng)的Op消息腔稀,說明這個事務(wù)的狀態(tài)還無法確定(可能是二階段失敗了)盆昙。引入Op消息后羽历,事務(wù)消息無論是Commit或者Rollback都會記錄一個Op操作焊虏。Commit相對于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引。

3.Op消息的存儲和對應(yīng)關(guān)系

RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—TransactionalMessageUtil.buildOpTopic()秕磷;這個Topic是一個內(nèi)部的Topic(像Half消息的Topic一樣)诵闭,不會被用戶消費。Op消息的內(nèi)容為對應(yīng)的Half消息的存儲的Offset澎嚣,這樣通過Op消息能索引到Half消息進行后續(xù)的回查操作疏尿。

4.Half消息的索引構(gòu)建

在執(zhí)行二階段Commit操作時,需要構(gòu)建出Half消息的索引易桃。一階段的Half消息由于是寫到一個特殊的Topic褥琐,所以二階段構(gòu)建索引時需要讀取出Half消息,并將Topic和Queue替換成真正的目標的Topic和Queue晤郑,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息敌呈。所以RocketMQ事務(wù)消息二階段其實是利用了一階段存儲的消息的內(nèi)容,在二階段時恢復(fù)出一條完整的普通消息造寝,然后走一遍消息寫入流程磕洪。

5.如何處理二階段失敗的消息?

如果在RocketMQ事務(wù)消息的二階段過程中失敗了诫龙,例如在做Commit操作時析显,出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit签赃。RocketMQ采用了一種補償機制谷异,稱為“回查”。Broker端對未確定狀態(tài)的消息發(fā)起回查锦聊,將消息發(fā)送到對應(yīng)的Producer端(同一個Group的Producer)晰绎,由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài),進而執(zhí)行Commit或者Rollback括丁。Broker端通過對比Half消息和Op消息進行事務(wù)消息的回查并且推進CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)荞下。

值得注意的是,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查史飞,默認回查15次尖昏,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認回滾該消息构资。

6 消息查詢

RocketMQ支持按照下面兩種維度(“按照Message Id查詢消息”抽诉、“按照Message Key查詢消息”)進行消息查詢。

6.1 按照MessageId查詢消息

RocketMQ中的MessageId的長度總共有16字節(jié)吐绵,其中包含了消息存儲主機地址(IP地址和端口)迹淌,消息Commit Log offset河绽。“按照MessageId查詢消息”在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發(fā)送(業(yè)務(wù)請求碼:VIEW_MESSAGE_BY_ID)唉窃。Broker端走的是QueryMessageProcessor耙饰,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息返回。

6.2 按照Message Key查詢消息

“按照Message Key查詢消息”纹份,主要是基于RocketMQ的IndexFile索引文件來實現(xiàn)的苟跪。RocketMQ的索引文件邏輯結(jié)構(gòu),類似JDK中HashMap的實現(xiàn)蔓涧。索引文件的具體結(jié)構(gòu)如下:

IndexFile索引文件為用戶提供通過“按照Message Key查詢消息”的消息索引查詢服務(wù)件已,IndexFile文件的存儲位置是:HOME\store\index{fileName},文件名fileName是以創(chuàng)建時的時間戳命名的元暴,文件大小是固定的篷扩,等于40+500W4+2000W20= 420000040個字節(jié)大小。如果消息的properties中設(shè)置了UNIQ_KEY這個屬性茉盏,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作鉴未。如果消息設(shè)置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引援岩。

其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段歼狼,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue享怀,如果有 hash沖突羽峰,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差添瓷,并不是一個絕對的時間梅屉。整個Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計信息鳞贷,4500W的 Slot Table并不保存真正的索引數(shù)據(jù)坯汤,而是保存每個槽位對應(yīng)的單向鏈表的頭。202000W 是真正的索引數(shù)據(jù)搀愧,即一個 Index File 可以保存 2000W個索引惰聂。

“按照Message Key查詢消息”的方式,RocketMQ的具體做法是咱筛,主要通過Broker端的QueryMessageProcessor業(yè)務(wù)處理器來查詢搓幌,讀取消息的過程就是用topic和key找到IndexFile索引文件中的一條記錄,根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實體內(nèi)容迅箩。

原文可看https://github.com/apache/rocketmq/blob/master/docs/cn/design.md

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溉愁,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子饲趋,更是在濱河造成了極大的恐慌拐揭,老刑警劉巖撤蟆,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異堂污,居然都是意外死亡家肯,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門敷鸦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來息楔,“玉大人寝贡,你說我怎么就攤上這事扒披。” “怎么了圃泡?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵碟案,是天一觀的道長。 經(jīng)常有香客問我颇蜡,道長价说,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任风秤,我火速辦了婚禮鳖目,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘缤弦。我一直安慰自己领迈,他們只是感情好,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布碍沐。 她就那樣靜靜地躺著狸捅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪累提。 梳的紋絲不亂的頭發(fā)上尘喝,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機與錄音斋陪,去河邊找鬼朽褪。 笑死,一個胖子當著我的面吹牛无虚,可吹牛的內(nèi)容都是我干的缔赠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼骑科,長吁一口氣:“原來是場噩夢啊……” “哼橡淑!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起咆爽,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤梁棠,失蹤者是張志新(化名)和其女友劉穎置森,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體符糊,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡凫海,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了男娄。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片行贪。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖模闲,靈堂內(nèi)的尸體忽然破棺而出建瘫,到底是詐尸還是另有隱情,我是刑警寧澤尸折,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布啰脚,位于F島的核電站,受9級特大地震影響实夹,放射性物質(zhì)發(fā)生泄漏橄浓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一亮航、第九天 我趴在偏房一處隱蔽的房頂上張望荸实。 院中可真熱鬧,春花似錦缴淋、人聲如沸准给。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽圆存。三九已至,卻和暖如春仇哆,著一層夾襖步出監(jiān)牢的瞬間沦辙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工讹剔, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留油讯,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓延欠,卻偏偏與公主長得像陌兑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子由捎,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355