消息中間件-RocketMQ

RocketMQ是一個(gè)由阿里巴巴開源的消息中間件鬓催, 2012年開源酝蜒,2017年成為apache頂級(jí)項(xiàng)目咙俩。
它的核心設(shè)計(jì)借鑒了Kafka弦悉,所以我們?cè)诹私釸ocketMQ的時(shí)候窒典,會(huì)發(fā)現(xiàn)很多和kafka相同的特性。同時(shí)RocketMQ在某些功能上和kafka又有較大的差異

  1. 支持集群模型稽莉、負(fù)載均衡瀑志、水平擴(kuò)展能力
  2. 億級(jí)別消息堆積能力
  3. 采用零拷貝的原理,順序?qū)懕P污秆,隨機(jī)讀
  4. 底層通信框架采用Netty NIO
  5. NameServer代替Zookeeper劈猪,實(shí)現(xiàn)服務(wù)尋址和服務(wù)協(xié)調(diào)
  6. 消息失敗重試機(jī)制、消息可查詢
  7. 強(qiáng)調(diào)集群無單點(diǎn)良拼,可擴(kuò)展战得,任意一點(diǎn)高可用,水平可擴(kuò)展
  8. 經(jīng)過多次雙十一的考驗(yàn)

RocketMQ的架構(gòu)

image.png

集群本身沒有什么特殊之處庸推,和kafka的整體架構(gòu)類似常侦,其中zookeeper替換成了NameServer。

在rocketmq的早版本(2.x)的時(shí)候贬媒,是沒有namesrv組件的聋亡,用的是zookeeper做分布式協(xié)調(diào)和服務(wù)發(fā)現(xiàn),但是后期阿里數(shù)據(jù)根據(jù)實(shí)際業(yè)務(wù)需求進(jìn)行改進(jìn)和優(yōu)化际乘,自組研發(fā)了輕量級(jí)的namesrv,用于注冊(cè)Client服務(wù)與Broker的請(qǐng)求路由工作坡倔,namesrv上不做任何消息的位置存儲(chǔ),頻繁操作zookeeper的位置存儲(chǔ)數(shù)據(jù)會(huì)影響整體集群性能

RocketMQ由四部分組成
1)Name Server 可集群部署,節(jié)點(diǎn)之間無任何信息同步致讥。提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由

2)Broker(消息中轉(zhuǎn)角色仅仆,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息) 部署相對(duì)復(fù)雜垢袱,Broker 分為Master 與Slave墓拜,一個(gè)Master 可以對(duì)應(yīng)多個(gè)Slave,但是一個(gè)Slave 只能對(duì)應(yīng)一個(gè)Master请契,Master 與Slave 的對(duì)應(yīng)關(guān)系通過指定相同的BrokerName咳榜,不同的BrokerId來定 義,BrokerId為0 表示Master爽锥,非0 表示Slave涌韩。Master 也可以部署多個(gè)。

3)Producer氯夷,生產(chǎn)者臣樱,擁有相同 Producer Group 的 Producer 組成一個(gè)集群, 與Name Server 集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長(zhǎng)連接腮考,定期從Name Server 取Topic 路由信息雇毫,并向提供Topic服務(wù)的Master 建立長(zhǎng)連接,且定時(shí)向Master 發(fā)送心跳踩蔚。Producer 完全無狀態(tài)棚放,可集群部署。

4)Consumer馅闽,消費(fèi)者飘蚯,接收消息進(jìn)行消費(fèi)的實(shí)例,擁有相同 Consumer Group 的 Consumer 組成一個(gè)集群福也,與Name Server 集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長(zhǎng)連接局骤,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master暴凑、Slave 建立長(zhǎng)連接庄涡,且定時(shí)向Master、Slave 發(fā)送心跳搬设。Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息撕捍,訂閱規(guī)則由Broker 配置決定拿穴。

要使用rocketmq,至少需要啟動(dòng)兩個(gè)進(jìn)程忧风,nameserver默色、broker,前者是各種topic注冊(cè)中心狮腿,后者是真正的broker腿宰。

RocketMQ消息支持的模式

NormalProducer(普通)

消息同步發(fā)送

消息發(fā)送出去后呕诉,producer會(huì)等到broker回應(yīng)后才能繼續(xù)發(fā)送下一個(gè)消息


image.png

消息異步發(fā)送

異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng)吃度,接著發(fā)送下個(gè)數(shù)據(jù)包的通訊方式甩挫。 MQ 的異步發(fā)送,需要用戶實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)椿每。消息發(fā)送方在發(fā)送了一條消息后伊者,不需要等待服務(wù)器響應(yīng)即可返回,進(jìn)行第二條消息發(fā)送间护。發(fā)送方通過回調(diào)接口接收服務(wù)器響應(yīng)亦渗,并對(duì)響應(yīng)結(jié)果進(jìn)行處理。


image.png
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%s%n",sendResult);
    }
    @Override
    public void onException(Throwable throwable) {
        throwable.printStackTrace();
    }
});

OneWay

單向(Oneway)發(fā)送特點(diǎn)為發(fā)送方只負(fù)責(zé)發(fā)送消息汁尺,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā)法精,即只發(fā)送請(qǐng)求不等待應(yīng)答,效率最高痴突。


image.png
producer.sendOneway(msg);

OrderProducer(順序)

在上一章kafka的時(shí)候有說到搂蜓,消息可以通過自定義分區(qū)策略來失效消息的順序發(fā)送,實(shí)現(xiàn)原理就
是把同一類消息都發(fā)送到相同的分區(qū)上苞也。

在RocketMQ中洛勉,是基于多個(gè)Message Queue來實(shí)現(xiàn)類似于kafka的分區(qū)效果。如果一個(gè)Topic 要發(fā)送和接收的數(shù)據(jù)量非常大如迟, 需要能支持增加并行處理的機(jī)器來提高處理速度收毫,這時(shí)候一個(gè)Topic 可以根據(jù)需求設(shè)置一個(gè)或多個(gè)Message Queue。Topic 有了多個(gè)Message Queue 后殷勘,消息可以并行地向各個(gè)Message Queue 發(fā)送此再,消費(fèi)者也可以并行地從多個(gè)Message Queue 讀取消息并消費(fèi)。

RocketMQ消息發(fā)送及消費(fèi)的基本原理

image.png

這是一個(gè)比較宏觀的部署架構(gòu)圖玲销,rocketmq天然支持高可用输拇,它可以支持多主多從的部署架構(gòu),這也是和kafka最大的區(qū)別之一贤斜。
原因是RocketMQ中并沒有master選舉功能策吠,所以通過配置多個(gè)master節(jié)點(diǎn)來保證rocketMQ的高可用。和所有的集群角色定位一樣瘩绒,master節(jié)點(diǎn)負(fù)責(zé)接受事務(wù)請(qǐng)求猴抹、slave節(jié)點(diǎn)只負(fù)責(zé)接收讀請(qǐng)求,并且接收master同步過來的數(shù)據(jù)和slave保持一直锁荔。當(dāng)master掛了以后蟀给,如果當(dāng)前rocketmq是一主多從,就意味著無法接受發(fā)送端的消息,但是消費(fèi)者仍然能夠繼續(xù)消費(fèi)跋理。
所以配置多個(gè)主節(jié)點(diǎn)后择克,可以保證當(dāng)其中一個(gè)master節(jié)點(diǎn)掛了,另外一個(gè)master節(jié)點(diǎn)仍然能夠?qū)ν馓峁┫l(fā)送服務(wù)前普。
當(dāng)存在多個(gè)主節(jié)點(diǎn)時(shí)肚邢,一條消息只會(huì)發(fā)送到其中一個(gè)主節(jié)點(diǎn),rocketmq對(duì)于多個(gè)master節(jié)點(diǎn)的消息發(fā)送汁政,會(huì)做負(fù)載均衡道偷,使得消息可以平衡的發(fā)送到多個(gè)master節(jié)點(diǎn)上。
一個(gè)消費(fèi)者可以同時(shí)消費(fèi)多個(gè)master節(jié)點(diǎn)上的消息记劈,在這個(gè)架構(gòu)圖中勺鸦,兩個(gè)master節(jié)點(diǎn)恰好可以平均分發(fā)到兩個(gè)消費(fèi)者上,如果此時(shí)只有一個(gè)消費(fèi)者目木,那么這個(gè)消費(fèi)者會(huì)消費(fèi)兩個(gè)master節(jié)點(diǎn)的數(shù)據(jù)换途。由于每個(gè)master可以配置多個(gè)slave,所以如果其中一個(gè)master掛了刽射,消息仍然可以被消費(fèi)者從slave節(jié)點(diǎn)消費(fèi)到军拟。可以完美的實(shí)現(xiàn)rocketmq消息的高可用誓禁。

接下來懈息,站在topic的角度來看看消息是如何分發(fā)和處理的,假設(shè)有兩個(gè)master節(jié)點(diǎn)的集群摹恰,創(chuàng)建了一個(gè)TestTopic辫继,并且對(duì)這個(gè)topic創(chuàng)建了兩個(gè)隊(duì)列,也就是分區(qū)俗慈。
消費(fèi)者定義了兩個(gè)分組姑宽,分組的概念也是和kafka一樣,通過分組可以實(shí)現(xiàn)消息的廣播闺阱。


image.png

集群支持

RocketMQ天生對(duì)集群的支持非常友好
1)單Master
優(yōu)點(diǎn):除了配置簡(jiǎn)單沒什么優(yōu)點(diǎn)
缺點(diǎn):不可靠炮车,該機(jī)器重啟或宕機(jī),將導(dǎo)致整個(gè)服務(wù)不可用
2)多Master
優(yōu)點(diǎn):配置簡(jiǎn)單酣溃,性能最高
缺點(diǎn):可能會(huì)有少量消息丟失(配置相關(guān))瘦穆,單臺(tái)機(jī)器重啟或宕機(jī)期間,該機(jī)器下未被消費(fèi)的消息在機(jī)器恢復(fù)前不可訂閱赊豌,影響消息實(shí)時(shí)性
3)多Master多Slave难审,每個(gè)Master配一個(gè)Slave,有多對(duì)Master-Slave亿絮,集群采用異步復(fù)制方式,主備有短暫消息延遲,毫秒級(jí)
優(yōu)點(diǎn):性能同多Master幾乎一樣派昧,實(shí)時(shí)性高黔姜,主備間切換對(duì)應(yīng)用透明,不需人工干預(yù)
缺點(diǎn):Master宕機(jī)或磁盤損壞時(shí)會(huì)有少量消息丟失
4)多Master多Slave蒂萎,每個(gè)Master配一個(gè)Slave秆吵,有多對(duì)Master-Slave,集群采用同步雙寫方式五慈,主備都寫成功纳寂,向應(yīng)用返回成功
優(yōu)點(diǎn):服務(wù)可用性與數(shù)據(jù)可用性非常高
缺點(diǎn):性能比異步集群略低,當(dāng)前版本主宕備不能自動(dòng)切換為主

需要注意的是泻拦,在RocketMQ里面毙芜,1臺(tái)機(jī)器只能要么是Master,要么是Slave争拐。這個(gè)在初始的機(jī)器配置里面腋粥,就定死了。不會(huì)像kafka那樣存在master動(dòng)態(tài)選舉的功能架曹。其中Master的broker id = 0隘冲,Slave的broker id > 0。
有點(diǎn)類似于mysql的主從概念绑雄,master掛了以后展辞,slave仍然可以提供讀服務(wù),但是由于有多主的存
在万牺,當(dāng)一個(gè)master掛了以后罗珍,可以寫到其他的master上。

消息發(fā)送到topic多個(gè)MessageQueue

演示一下對(duì)topic創(chuàng)建多個(gè)messageQueue

  1. 創(chuàng)建一個(gè)隊(duì)列杏愤,設(shè)置2個(gè)寫隊(duì)列以及2個(gè)讀隊(duì)列靡砌,如果讀和寫隊(duì)列不一致,會(huì)存在消息無法消費(fèi)到的問題


    image.png
  2. 構(gòu)建生產(chǎn)者和消費(fèi)者:參考上面寫的生產(chǎn)者消費(fèi)者代碼
  3. 消費(fèi)者數(shù)量控制對(duì)于隊(duì)列的消費(fèi)情況
    a) 如果消費(fèi)隊(duì)列為2珊楼,啟動(dòng)一個(gè)消費(fèi)者通殃,那么這個(gè)消費(fèi)者會(huì)消費(fèi)者兩個(gè)隊(duì)列,
    b) 如果兩個(gè)消費(fèi)者消費(fèi)這個(gè)隊(duì)列厕宗,那么意味著消息會(huì)均衡分?jǐn)偟竭@兩個(gè)消費(fèi)者中
    c) 如果消費(fèi)者數(shù)大于readQueueNumbs画舌,那么會(huì)有一些消費(fèi)者消費(fèi)不到消息,浪費(fèi)資源

消息的順序消費(fèi)

首先已慢,需要保證順序的消息要發(fā)送到同一個(gè)messagequeue中曲聂;其次,一個(gè)messagequeue只能被一個(gè)消費(fèi)者消費(fèi)佑惠,這點(diǎn)是由消息隊(duì)列的分配機(jī)制來保證的朋腋;最后齐疙,一個(gè)消費(fèi)者內(nèi)部對(duì)一個(gè)mq的消費(fèi)要保證是有序的。
我們要做到生產(chǎn)者 - messagequeue - 消費(fèi)者之間是一對(duì)一對(duì)一的關(guān)系旭咽。

自定義消息發(fā)送規(guī)則

通過自定義發(fā)送策略來實(shí)現(xiàn)消息只發(fā)送到同一個(gè)隊(duì)列
因?yàn)橐粋€(gè)Topic 會(huì)有多個(gè)Message Queue 贞奋,如果使用Producer 的默認(rèn)配置,這個(gè)Producer 會(huì)輪流向各個(gè)Message Queue 發(fā)送消息穷绵。Consumer 在消費(fèi)消息的時(shí)候轿塔,會(huì)根據(jù)負(fù)載均衡策略,消費(fèi)被分配到的Message Queue
如果不經(jīng)過特定的設(shè)置仲墨,某條消息被發(fā)往哪個(gè)Message Queue 勾缭,被哪個(gè)Consumer 消費(fèi)是未知的
如果業(yè)務(wù)需要我們把消息發(fā)送到指定的Message Queue 里,比如把同一類型的消息都發(fā)往相同的
Message Queue目养。那是不是可以實(shí)現(xiàn)順序消息的功能呢俩由?

和kafka一樣,rocketMQ也提供了消息路由的功能混稽,我們可以自定義消息分發(fā)策略采驻,可以實(shí)現(xiàn)
MessageQueueSelector,來實(shí)現(xiàn)自己的消息分發(fā)策略

SendResult sendResult=producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
        int key=o.hashCode();
        int size = list.size();
        int index = key%size;
        return list.get(index);// list.get(0);
    }
},"key_"+i);

如何保證消息消費(fèi)順序呢匈勋?

通過分區(qū)規(guī)則可以實(shí)現(xiàn)同類消息在rocketmq上的順序存儲(chǔ)礼旅。但是對(duì)于消費(fèi)端來說,如何保證消費(fèi)的順序洽洁?
RocketMQ中提供了MessageListenerOrderly 一個(gè)類來實(shí)現(xiàn)順序消費(fèi)

consumer.subscribe("store_topic_test","*");
consumer.registerMessageListener((MessageListenerOrderly) (list,consumeOrderlyContext) -> {
    list.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
    return ConsumeOrderlyStatus.SUCCESS;
});

順序消費(fèi)會(huì)帶來一些問題痘系,

  1. 遇到消息失敗的消息,無法跳過饿自,當(dāng)前隊(duì)列消費(fèi)暫停
  2. 降低了消息處理的性能

消費(fèi)端的負(fù)載均衡

和kafka一樣汰翠,消費(fèi)端也會(huì)針對(duì)Message Queue做負(fù)載均衡,使得每個(gè)消費(fèi)者能夠合理的消費(fèi)多個(gè)分區(qū)的消息昭雌。
消費(fèi)端會(huì)通過RebalanceService線程复唤,10秒鐘做一次基于topic下的所有隊(duì)列負(fù)載

  • 消費(fèi)端遍歷自己的所有topic,依次調(diào)rebalanceByTopic
  • 根據(jù)topic獲取此topic下的所有queue
  • 選擇一臺(tái)broker獲取基于group的所有消費(fèi)端(有心跳向所有broker注冊(cè)客戶端信息)
  • 選擇隊(duì)列分配策略實(shí)例AllocateMessageQueueStrategy執(zhí)行分配算法

什么時(shí)候觸發(fā)負(fù)載均衡

  • 消費(fèi)者啟動(dòng)之后
  • 消費(fèi)者數(shù)量發(fā)生變更
  • 每10秒會(huì)觸發(fā)檢查一次rebalance

分配算法
RocketMQ提供了6中分區(qū)的分配算法:

  • (AllocateMessageQueueAveragely)平均分配算法(默認(rèn))
  • (AllocateMessageQueueAveragelyByCircle)環(huán)狀分配消息隊(duì)列
  • (AllocateMessageQueueByConfig)按照配置來分配隊(duì)列: 根據(jù)用戶指定的配置來進(jìn)行負(fù)載
  • (AllocateMessageQueueByMachineRoom)按照指定機(jī)房來配置隊(duì)列
  • (AllocateMachineRoomNearby)按照就近機(jī)房來配置隊(duì)列:
  • (AllocateMessageQueueConsistentHash)一致性hash烛卧,根據(jù)消費(fèi)者的cid進(jìn)行

消息的的可靠性原則

在實(shí)際使用RocketMQ的時(shí)候我們并不能保證每次發(fā)送的消息都剛好能被消費(fèi)者一次性正常消費(fèi)成功佛纫,可能會(huì)存在需要多次消費(fèi)才能成功或者一直消費(fèi)失敗的情況,那作為發(fā)送者該做如何處理呢总放?

消息消費(fèi)端的確認(rèn)機(jī)制

RocketMQ提供了ack機(jī)制呈宇,以保證消息能夠被正常消費(fèi)。發(fā)送者為了保證消息肯定消費(fèi)成功局雄,只有使用方明確表示消費(fèi)成功甥啄,RocketMQ才會(huì)認(rèn)為消息消費(fèi)成功。中途斷電炬搭,拋出異常等都不會(huì)認(rèn)為成功

consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeOrderlyContext) -> {
    list.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

所有消費(fèi)者在設(shè)置監(jiān)聽的時(shí)候會(huì)提供一個(gè)回調(diào),業(yè)務(wù)實(shí)現(xiàn)消費(fèi)回調(diào)的時(shí)候,當(dāng)回調(diào)方法中返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS熔脂,RocketMQ才會(huì)認(rèn)為這批消息(默認(rèn)是1條)是消費(fèi)完成的。如果這時(shí)候消息消費(fèi)失敗充尉,例如數(shù)據(jù)庫(kù)異常,余額不足扣款失敗等一切業(yè)務(wù)認(rèn)為消息需要重試的場(chǎng)景衣形,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會(huì)認(rèn)為這批消息消費(fèi)失敗了

消息的衰減重試

為了保證消息肯定至少被消費(fèi)一次姿鸿,RocketMQ會(huì)把這批消息重新發(fā)回到broker谆吴,在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后苛预,再次投遞到這個(gè)ConsumerGroup句狼。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默認(rèn)16次),就會(huì)投遞到DLQ死信隊(duì)列热某。應(yīng)用可以監(jiān)控死信隊(duì)列來做人工干預(yù)可以修改broker-a.conf文件
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

重試消息的處理機(jī)制

一般情況下我們?cè)趯?shí)際生產(chǎn)中是不需要重試16次腻菇,這樣既浪費(fèi)時(shí)間又浪費(fèi)性能,理論上當(dāng)嘗試重復(fù)次數(shù)達(dá)到我們想要的結(jié)果時(shí)如果還是消費(fèi)失敗昔馋,那么我們需要將對(duì)應(yīng)的消息進(jìn)行記錄筹吐,并且結(jié)束重復(fù)嘗試

consumer.registerMessageListener((MessageListenerConcurrently) (list,
consumeOrderlyContext) -> {
    for (MessageExt messageExt : list) {
        if(messageExt.getReconsumeTimes()==3) {
            //可以將對(duì)應(yīng)的數(shù)據(jù)保存到數(shù)據(jù)庫(kù),以便人工干預(yù)
            System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

TransactionProducer(事務(wù)消息)

RocketMQ消息的事務(wù)架構(gòu)設(shè)計(jì)

  1. 生產(chǎn)者執(zhí)行本地事務(wù)秘遏,修改訂單支付狀態(tài)丘薛,并且提交事務(wù)
  2. 生產(chǎn)者發(fā)送事務(wù)消息到broker上,消息發(fā)送到broker上在沒有確認(rèn)之前邦危,消息對(duì)于consumer是不可見狀態(tài)
  3. 生產(chǎn)者確認(rèn)事務(wù)消息洋侨,使得發(fā)送到broker上的事務(wù)消息對(duì)于消費(fèi)者可見
  4. 消費(fèi)者獲取到消息進(jìn)行消費(fèi),消費(fèi)完之后執(zhí)行ack進(jìn)行確認(rèn)
  5. 這里可能會(huì)存在一個(gè)問題倦蚪,生產(chǎn)者本地事務(wù)成功后希坚,發(fā)送事務(wù)確認(rèn)消息到broker上失敗了怎么
    辦?這個(gè)時(shí)候意味著消費(fèi)者無法正常消費(fèi)到這個(gè)消息陵且。所以RocketMQ提供了消息回查機(jī)制裁僧,如果
    事務(wù)消息一直處于中間狀態(tài),broker會(huì)發(fā)起重試去查詢broker上這個(gè)事務(wù)的處理狀態(tài)滩报。一旦發(fā)現(xiàn)
    事務(wù)處理成功锅知,則把當(dāng)前這條消息設(shè)置為可見


    image.png

事務(wù)消息的實(shí)踐

通過一個(gè)下單以后扣減庫(kù)存的數(shù)據(jù)一致性場(chǎng)景來演示RocketMQ的分布式事務(wù)特性

TransactionProducer

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
        TransactionMQProducer transactionProducer = new
                TransactionMQProducer("tx_producer_group");
        transactionProducer.setNamesrvAddr("192.168.13.102:9876");
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //自定義線程池,用于異步執(zhí)行事務(wù)操作
        transactionProducer.setExecutorService(executorService);
        transactionProducer.setTransactionListener(new TransactionListenerLocal());
        transactionProducer.start();
        for (int i = 0; i < 20; i++) {
            String orderId = UUID.randomUUID().toString();
            String body = "{'operation':'doOrder','orderId':'" + orderId + "'}";
            Message message = new Message("pay_tx_topic", "TagA", orderId,
                    body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            transactionProducer.sendMessageInTransaction(message,
                    orderId + "&" + i);
            Thread.sleep(1000);
        }
    }
}

TransactionListenerLocal

public class TransactionListenerLocal implements TransactionListener {
    private static final Map<String, Boolean> results = new ConcurrentHashMap<>();

    //執(zhí)行本地事務(wù)
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(":執(zhí)行本地事務(wù):" + arg.toString());
        String orderId = arg.toString();
        boolean rs = saveOrder(orderId);//模擬數(shù)據(jù)入庫(kù)操作
        return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
        // 這個(gè)返回狀態(tài)表示告訴broker這個(gè)事務(wù)消息是否被確認(rèn)脓钾,允許給到consumer進(jìn)行消費(fèi)
        // LocalTransactionState.ROLLBACK_MESSAGE 回滾
        // LocalTransactionState.UNKNOW 未知
    }

    //提供事務(wù)執(zhí)行狀態(tài)的回查方法售睹,提供給broker回調(diào)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = msg.getKeys();
        System.out.println("執(zhí)行事務(wù)執(zhí)行狀態(tài)的回查,orderId:" + orderId);
        boolean rs = Boolean.TRUE.equals(results.get(orderId));
        System.out.println("回調(diào):" + rs);
        return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    private boolean saveOrder(String orderId) {
        //如果訂單取模等于0可训,表示成功,否則表示失敗
        boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
        results.put(orderId, success);
        return success;
    }
}

TransactionConsumer

public class TransactionConsumer {

    public static void main(String[] args) throws MQClientException, IOException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tx_consumer_group");
        defaultMQPushConsumer.setNamesrvAddr("192.168.11.162:9876");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.subscribe("pay_tx_topic", "*");
        defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            msgs.stream().forEach(messageExt -> {
                try {
                    String orderId = messageExt.getKeys();
                    String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println("收到消息:" + body + "昌妹,開始扣減庫(kù)存:" + orderId);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        defaultMQPushConsumer.start();
        System.in.read();
    }
}

RocketMQ事務(wù)消息的三種狀態(tài)

  1. ROLLBACK_MESSAGE:回滾事務(wù)
  2. COMMIT_MESSAGE: 提交事務(wù)
  3. UNKNOW: broker會(huì)定時(shí)的回查Producer消息狀態(tài)捶枢,直到徹底成功或失敗。

當(dāng)executeLocalTransaction方法返回ROLLBACK_MESSAGE時(shí)飞崖,表示直接回滾事務(wù)烂叔,當(dāng)返回
COMMIT_MESSAGE提交事務(wù)
當(dāng)返回UNKNOW時(shí),Broker會(huì)在一段時(shí)間之后回查checkLocalTransaction固歪,根據(jù)checkLocalTransaction返回狀態(tài)執(zhí)行事務(wù)的操作(回滾或提交)蒜鸡,
如示例中,當(dāng)返回ROLLBACK_MESSAGE時(shí)消費(fèi)者不會(huì)收到消息牢裳,且不會(huì)調(diào)用回查函數(shù)逢防,當(dāng)返回
COMMIT_MESSAGE時(shí)事務(wù)提交,消費(fèi)者收到消息蒲讯,當(dāng)返回UNKNOW時(shí)忘朝,在一段時(shí)間之后調(diào)用回查函數(shù),并根據(jù)status判斷返回提交或回滾狀態(tài)判帮,返回提交狀態(tài)的消息將會(huì)被消費(fèi)者消費(fèi)局嘁,所以此時(shí)消費(fèi)者可以消費(fèi)部分消息

消息的存儲(chǔ)和發(fā)送

由于分布式消息隊(duì)列對(duì)于可靠性的要求比較高,所以需要保證生產(chǎn)者將消息發(fā)送到broker之后晦墙,保證消息是不出現(xiàn)丟失的悦昵,因此消息隊(duì)列就少不了對(duì)于可靠性存儲(chǔ)的要求

MQ消息存儲(chǔ)選擇

從主流的幾種MQ消息隊(duì)列采用的存儲(chǔ)方式來看,主要會(huì)有三種

  1. 分布式KV存儲(chǔ)偎痛,比如ActiveMQ中采用的levelDB旱捧、Redis, 這種存儲(chǔ)方式對(duì)于消息讀寫能力要求不高的情況下可以使用
  2. 文件系統(tǒng)存儲(chǔ)踩麦,常見的比如kafka枚赡、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機(jī)器上的文件系統(tǒng)來做持久化谓谦,這種方案適合對(duì)于有高吞吐量要求的消息中間件贫橙,因?yàn)橄⑺⒈P是一種高效率,高可靠反粥、高性能的持久化方式卢肃,除非磁盤出現(xiàn)故障,否則一般是不會(huì)出現(xiàn)無法持久化的問題
  3. 關(guān)系型數(shù)據(jù)庫(kù)才顿,比如ActiveMQ可以采用mysql作為消息存儲(chǔ)莫湘,關(guān)系型數(shù)據(jù)庫(kù)在單表數(shù)據(jù)量達(dá)到千
    萬級(jí)的情況下IO性能會(huì)出現(xiàn)瓶頸,所以ActiveMQ并不適合于高吞吐量的消息隊(duì)列場(chǎng)景郑气。

總的來說幅垮,對(duì)于存儲(chǔ)效率,文件系統(tǒng)要優(yōu)于分布式KV存儲(chǔ)尾组,分布式KV存儲(chǔ)要優(yōu)于關(guān)系型數(shù)據(jù)庫(kù)

消息的存儲(chǔ)結(jié)構(gòu)

RocketMQ就是采用文件系統(tǒng)的方式來存儲(chǔ)消息忙芒,消息的存儲(chǔ)是由ConsumeQueue和CommitLog配合完成的示弓。CommitLog是消息真正的物理存儲(chǔ)文件。ConsumeQueue是消息的邏輯隊(duì)列呵萨,有點(diǎn)類似于數(shù)據(jù)庫(kù)的索引文件奏属,里面存儲(chǔ)的是指向CommitLog文件中消息存儲(chǔ)的地址。

每個(gè)Topic下的每個(gè)Message Queue都會(huì)對(duì)應(yīng)一個(gè)ConsumeQueue文件潮峦,文件的地址是:
{store_home}/consumequeue/{topicNmae}/{queueId}/{filename}, 默認(rèn)路徑: /root/store
在rocketMQ的文件存儲(chǔ)目錄下囱皿,可以看到這樣一個(gè)結(jié)構(gòu)的的而文件。

image.png

我們只需要關(guān)心Commitlog忱嘹、Consumequeue铆帽、Index

CommitLog

CommitLog是用來存放消息的物理文件,每個(gè)broker上的commitLog本當(dāng)前機(jī)器上的所有consumerQueue共享德谅,不做任何的區(qū)分。
CommitLog中的文件默認(rèn)大小為1G萨螺,可以動(dòng)態(tài)配置窄做; 當(dāng)一個(gè)文件寫滿以后,會(huì)生成一個(gè)新的
commitlog文件慰技。所有的Topic數(shù)據(jù)是順序?qū)懭朐贑ommitLog文件中的椭盏。
文件名的長(zhǎng)度為20位,左邊補(bǔ)0吻商,剩余未起始偏移量掏颊,比如
00000000000000000000 表示第一個(gè)文件, 文件大小為102410241024艾帐,當(dāng)?shù)谝粋€(gè)文件寫滿之后乌叶,生成第二個(gè)文件
000000000001073741824 表示第二個(gè)文件,起始偏移量為1073741824

ConsumeQueue

consumeQueue表示消息消費(fèi)的邏輯隊(duì)列柒爸,這里面包含MessageQueue在commitlog中的其實(shí)物理位置偏移量offset准浴,消息實(shí)體內(nèi)容的大小和Message Tag的hash值。對(duì)于實(shí)際物理存儲(chǔ)來說捎稚,
consumeQueue對(duì)應(yīng)每個(gè)topic和queueid下的文件乐横,每個(gè)consumeQueue類型的文件也是有大小,每個(gè)文件默認(rèn)大小約為600W個(gè)字節(jié)今野,如果文件滿了后會(huì)也會(huì)生成一個(gè)新的文件

IndexFile

索引文件葡公,如果一個(gè)消息包含Key值的話,會(huì)使用IndexFile存儲(chǔ)消息索引条霜。Index索引文件提供了對(duì)CommitLog進(jìn)行數(shù)據(jù)檢索催什,提供了一種通過key或者時(shí)間區(qū)間來查找CommitLog中的消息的方法。在物理存儲(chǔ)中蛔外,文件名是以創(chuàng)建的時(shí)間戳明明蛆楞,固定的單個(gè)IndexFile大小大概為400M溯乒,一個(gè)IndexFile可以保存2000W個(gè)索引

abort

broker在啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)空的名為abort的文件,并在shutdown時(shí)將其刪除豹爹,用于標(biāo)識(shí)進(jìn)程是否正常退出裆悄,如果不正常退出,會(huì)在啟動(dòng)時(shí)做故障恢復(fù)

消息存儲(chǔ)的整體結(jié)構(gòu)

image.png

RocketMQ的消息存儲(chǔ)采用的是混合型的存儲(chǔ)結(jié)構(gòu),也就是Broker單個(gè)實(shí)例下的所有隊(duì)列公用一個(gè)日志數(shù)據(jù)文件CommitLog臂聋。這個(gè)是和Kafka又一個(gè)不同之處光稼。
為什么不采用kafka的設(shè)計(jì),針對(duì)不同的partition存儲(chǔ)一個(gè)獨(dú)立的物理文件呢孩等?這是因?yàn)樵趉afka的設(shè)計(jì)中艾君,一旦kafka中Topic的Partition數(shù)量過多,隊(duì)列文件會(huì)過多肄方,那么會(huì)給磁盤的IO讀寫造成比較大的壓力冰垄,也就造成了性能瓶頸。所以RocketMQ進(jìn)行了優(yōu)化权她,消息主題統(tǒng)一存儲(chǔ)在CommitLog中虹茶。
當(dāng)然,這種設(shè)計(jì)并不是銀彈隅要,它也有它的優(yōu)缺點(diǎn)

優(yōu)點(diǎn)在于:由于消息主題都是通過CommitLog來進(jìn)行讀寫蝴罪,ConsumerQueue中只存儲(chǔ)很少的數(shù)據(jù),所以隊(duì)列更加輕量化步清。對(duì)于磁盤的訪問是串行化從而避免了磁盤的競(jìng)爭(zhēng)
缺點(diǎn)在于:消息寫入磁盤雖然是基于順序?qū)懸牛亲x的過程確是隨機(jī)的。讀取一條消息會(huì)先讀取
ConsumeQueue廓啊,再讀CommitLog欢搜,會(huì)降低消息讀的效率。

消息發(fā)送到消息接收的整體流程

  1. Producer將消息發(fā)送到Broker后崖瞭,Broker會(huì)采用同步或者異步的方式把消息寫入到CommitLog狂巢。
    RocketMQ所有的消息都會(huì)存放在CommitLog中,為了保證消息存儲(chǔ)不發(fā)生混亂书聚,對(duì)CommitLog
    寫之前會(huì)加鎖唧领,同時(shí)也可以使得消息能夠被順序?qū)懭氲紺ommitLog,只要消息被持久化到磁盤文件CommitLog雌续,那么就可以保證Producer發(fā)送的消息不會(huì)丟失斩个。


    image.png
  2. commitLog持久化后,會(huì)把里面的消息Dispatch到對(duì)應(yīng)的Consume Queue上驯杜,Consume Queue
    相當(dāng)于kafka中的partition受啥,是一個(gè)邏輯隊(duì)列,存儲(chǔ)了這個(gè)Queue在CommiLog中的起始o(jì)ffset,
    log大小和MessageTag的hashCode滚局。


    image.png
  3. 當(dāng)消費(fèi)者進(jìn)行消息消費(fèi)時(shí)居暖,會(huì)先讀取consumerQueue , 邏輯消費(fèi)隊(duì)列ConsumeQueue保存了指
    定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量Offset,消息大小藤肢、和消息Tag的
    HashCode值


    image.png
  4. 直接從consumequeue中讀取消息是沒有數(shù)據(jù)的太闺,真正的消息主體在commitlog中,所以還需要
    從commitlog中讀取消息


    image.png

什么時(shí)候清理物理消息文件嘁圈?

消息存儲(chǔ)在CommitLog之后省骂,的確是會(huì)被清理的,但是這個(gè)清理只會(huì)在以下任一條件成立才會(huì)批量刪
除消息文件(CommitLog):

  1. 消息文件過期(默認(rèn)72小時(shí))最住,且到達(dá)清理時(shí)點(diǎn)(默認(rèn)是凌晨4點(diǎn))钞澳,刪除過期文件。
  2. 消息文件過期(默認(rèn)72小時(shí))涨缚,且磁盤空間達(dá)到了水位線(默認(rèn)75%)轧粟,刪除過期文件。
  3. 磁盤已經(jīng)達(dá)到必須釋放的上限(85%水位線)的時(shí)候脓魏,則開始批量清理文件(無論是否過期)逃延,直到空間充足。

注:若磁盤空間達(dá)到危險(xiǎn)水位線(默認(rèn)90%)轧拄,出于保護(hù)自身的目的,broker會(huì)拒絕寫入服務(wù)讽膏。

——學(xué)自咕泡學(xué)院

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末檩电,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子府树,更是在濱河造成了極大的恐慌俐末,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奄侠,死亡現(xiàn)場(chǎng)離奇詭異卓箫,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)垄潮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門烹卒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人弯洗,你說我怎么就攤上這事旅急。” “怎么了牡整?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵藐吮,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我,道長(zhǎng)谣辞,這世上最難降的妖魔是什么迫摔? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮泥从,結(jié)果婚禮上句占,老公的妹妹穿的比我還像新娘。我一直安慰自己歉闰,他們只是感情好辖众,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著和敬,像睡著了一般凹炸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上昼弟,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天啤它,我揣著相機(jī)與錄音,去河邊找鬼舱痘。 笑死变骡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的芭逝。 我是一名探鬼主播塌碌,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼旬盯!你這毒婦竟也來了台妆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤胖翰,失蹤者是張志新(化名)和其女友劉穎接剩,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體萨咳,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡懊缺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了培他。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鹃两。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖舀凛,靈堂內(nèi)的尸體忽然破棺而出怔毛,到底是詐尸還是另有隱情,我是刑警寧澤腾降,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布拣度,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏抗果。R本人自食惡果不足惜筋帖,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冤馏。 院中可真熱鬧日麸,春花似錦、人聲如沸逮光。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涕刚。三九已至嗡综,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間杜漠,已是汗流浹背极景。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留驾茴,地道東北人盼樟。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像锈至,于是被迫代替她去往敵國(guó)和親晨缴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容