RocketMQ是一個(gè)由阿里巴巴開源的消息中間件鬓催, 2012年開源酝蜒,2017年成為apache頂級(jí)項(xiàng)目咙俩。
它的核心設(shè)計(jì)借鑒了Kafka弦悉,所以我們?cè)诹私釸ocketMQ的時(shí)候窒典,會(huì)發(fā)現(xiàn)很多和kafka相同的特性。同時(shí)RocketMQ在某些功能上和kafka又有較大的差異
- 支持集群模型稽莉、負(fù)載均衡瀑志、水平擴(kuò)展能力
- 億級(jí)別消息堆積能力
- 采用零拷貝的原理,順序?qū)懕P污秆,隨機(jī)讀
- 底層通信框架采用Netty NIO
- NameServer代替Zookeeper劈猪,實(shí)現(xiàn)服務(wù)尋址和服務(wù)協(xié)調(diào)
- 消息失敗重試機(jī)制、消息可查詢
- 強(qiáng)調(diào)集群無單點(diǎn)良拼,可擴(kuò)展战得,任意一點(diǎn)高可用,水平可擴(kuò)展
- 經(jīng)過多次雙十一的考驗(yàn)
RocketMQ的架構(gòu)
集群本身沒有什么特殊之處庸推,和kafka的整體架構(gòu)類似常侦,其中zookeeper替換成了NameServer。
在rocketmq的早版本(2.x)的時(shí)候贬媒,是沒有namesrv組件的聋亡,用的是zookeeper做分布式協(xié)調(diào)和服務(wù)發(fā)現(xiàn),但是后期阿里數(shù)據(jù)根據(jù)實(shí)際業(yè)務(wù)需求進(jìn)行改進(jìn)和優(yōu)化际乘,自組研發(fā)了輕量級(jí)的namesrv,用于注冊(cè)Client服務(wù)與Broker的請(qǐng)求路由工作坡倔,namesrv上不做任何消息的位置存儲(chǔ),頻繁操作zookeeper的位置存儲(chǔ)數(shù)據(jù)會(huì)影響整體集群性能
RocketMQ由四部分組成
1)Name Server 可集群部署,節(jié)點(diǎn)之間無任何信息同步致讥。提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由
2)Broker(消息中轉(zhuǎn)角色仅仆,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息) 部署相對(duì)復(fù)雜垢袱,Broker 分為Master 與Slave墓拜,一個(gè)Master 可以對(duì)應(yīng)多個(gè)Slave,但是一個(gè)Slave 只能對(duì)應(yīng)一個(gè)Master请契,Master 與Slave 的對(duì)應(yīng)關(guān)系通過指定相同的BrokerName咳榜,不同的BrokerId來定 義,BrokerId為0 表示Master爽锥,非0 表示Slave涌韩。Master 也可以部署多個(gè)。
3)Producer氯夷,生產(chǎn)者臣樱,擁有相同 Producer Group 的 Producer 組成一個(gè)集群, 與Name Server 集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長(zhǎng)連接腮考,定期從Name Server 取Topic 路由信息雇毫,并向提供Topic服務(wù)的Master 建立長(zhǎng)連接,且定時(shí)向Master 發(fā)送心跳踩蔚。Producer 完全無狀態(tài)棚放,可集群部署。
4)Consumer馅闽,消費(fèi)者飘蚯,接收消息進(jìn)行消費(fèi)的實(shí)例,擁有相同 Consumer Group 的 Consumer 組成一個(gè)集群福也,與Name Server 集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長(zhǎng)連接局骤,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master暴凑、Slave 建立長(zhǎng)連接庄涡,且定時(shí)向Master、Slave 發(fā)送心跳搬设。Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息撕捍,訂閱規(guī)則由Broker 配置決定拿穴。
要使用rocketmq,至少需要啟動(dòng)兩個(gè)進(jìn)程忧风,nameserver默色、broker,前者是各種topic注冊(cè)中心狮腿,后者是真正的broker腿宰。
RocketMQ消息支持的模式
NormalProducer(普通)
消息同步發(fā)送
消息發(fā)送出去后呕诉,producer會(huì)等到broker回應(yīng)后才能繼續(xù)發(fā)送下一個(gè)消息
消息異步發(fā)送
異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng)吃度,接著發(fā)送下個(gè)數(shù)據(jù)包的通訊方式甩挫。 MQ 的異步發(fā)送,需要用戶實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)椿每。消息發(fā)送方在發(fā)送了一條消息后伊者,不需要等待服務(wù)器響應(yīng)即可返回,進(jìn)行第二條消息發(fā)送间护。發(fā)送方通過回調(diào)接口接收服務(wù)器響應(yīng)亦渗,并對(duì)響應(yīng)結(jié)果進(jìn)行處理。
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n",sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
OneWay
單向(Oneway)發(fā)送特點(diǎn)為發(fā)送方只負(fù)責(zé)發(fā)送消息汁尺,不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā)法精,即只發(fā)送請(qǐng)求不等待應(yīng)答,效率最高痴突。
producer.sendOneway(msg);
OrderProducer(順序)
在上一章kafka的時(shí)候有說到搂蜓,消息可以通過自定義分區(qū)策略來失效消息的順序發(fā)送,實(shí)現(xiàn)原理就
是把同一類消息都發(fā)送到相同的分區(qū)上苞也。
在RocketMQ中洛勉,是基于多個(gè)Message Queue來實(shí)現(xiàn)類似于kafka的分區(qū)效果。如果一個(gè)Topic 要發(fā)送和接收的數(shù)據(jù)量非常大如迟, 需要能支持增加并行處理的機(jī)器來提高處理速度收毫,這時(shí)候一個(gè)Topic 可以根據(jù)需求設(shè)置一個(gè)或多個(gè)Message Queue。Topic 有了多個(gè)Message Queue 后殷勘,消息可以并行地向各個(gè)Message Queue 發(fā)送此再,消費(fèi)者也可以并行地從多個(gè)Message Queue 讀取消息并消費(fèi)。
RocketMQ消息發(fā)送及消費(fèi)的基本原理
這是一個(gè)比較宏觀的部署架構(gòu)圖玲销,rocketmq天然支持高可用输拇,它可以支持多主多從的部署架構(gòu),這也是和kafka最大的區(qū)別之一贤斜。
原因是RocketMQ中并沒有master選舉功能策吠,所以通過配置多個(gè)master節(jié)點(diǎn)來保證rocketMQ的高可用。和所有的集群角色定位一樣瘩绒,master節(jié)點(diǎn)負(fù)責(zé)接受事務(wù)請(qǐng)求猴抹、slave節(jié)點(diǎn)只負(fù)責(zé)接收讀請(qǐng)求,并且接收master同步過來的數(shù)據(jù)和slave保持一直锁荔。當(dāng)master掛了以后蟀给,如果當(dāng)前rocketmq是一主多從,就意味著無法接受發(fā)送端的消息,但是消費(fèi)者仍然能夠繼續(xù)消費(fèi)跋理。
所以配置多個(gè)主節(jié)點(diǎn)后择克,可以保證當(dāng)其中一個(gè)master節(jié)點(diǎn)掛了,另外一個(gè)master節(jié)點(diǎn)仍然能夠?qū)ν馓峁┫l(fā)送服務(wù)前普。
當(dāng)存在多個(gè)主節(jié)點(diǎn)時(shí)肚邢,一條消息只會(huì)發(fā)送到其中一個(gè)主節(jié)點(diǎn),rocketmq對(duì)于多個(gè)master節(jié)點(diǎn)的消息發(fā)送汁政,會(huì)做負(fù)載均衡道偷,使得消息可以平衡的發(fā)送到多個(gè)master節(jié)點(diǎn)上。
一個(gè)消費(fèi)者可以同時(shí)消費(fèi)多個(gè)master節(jié)點(diǎn)上的消息记劈,在這個(gè)架構(gòu)圖中勺鸦,兩個(gè)master節(jié)點(diǎn)恰好可以平均分發(fā)到兩個(gè)消費(fèi)者上,如果此時(shí)只有一個(gè)消費(fèi)者目木,那么這個(gè)消費(fèi)者會(huì)消費(fèi)兩個(gè)master節(jié)點(diǎn)的數(shù)據(jù)换途。由于每個(gè)master可以配置多個(gè)slave,所以如果其中一個(gè)master掛了刽射,消息仍然可以被消費(fèi)者從slave節(jié)點(diǎn)消費(fèi)到军拟。可以完美的實(shí)現(xiàn)rocketmq消息的高可用誓禁。
接下來懈息,站在topic的角度來看看消息是如何分發(fā)和處理的,假設(shè)有兩個(gè)master節(jié)點(diǎn)的集群摹恰,創(chuàng)建了一個(gè)TestTopic辫继,并且對(duì)這個(gè)topic創(chuàng)建了兩個(gè)隊(duì)列,也就是分區(qū)俗慈。
消費(fèi)者定義了兩個(gè)分組姑宽,分組的概念也是和kafka一樣,通過分組可以實(shí)現(xiàn)消息的廣播闺阱。
集群支持
RocketMQ天生對(duì)集群的支持非常友好
1)單Master
優(yōu)點(diǎn):除了配置簡(jiǎn)單沒什么優(yōu)點(diǎn)
缺點(diǎn):不可靠炮车,該機(jī)器重啟或宕機(jī),將導(dǎo)致整個(gè)服務(wù)不可用
2)多Master
優(yōu)點(diǎn):配置簡(jiǎn)單酣溃,性能最高
缺點(diǎn):可能會(huì)有少量消息丟失(配置相關(guān))瘦穆,單臺(tái)機(jī)器重啟或宕機(jī)期間,該機(jī)器下未被消費(fèi)的消息在機(jī)器恢復(fù)前不可訂閱赊豌,影響消息實(shí)時(shí)性
3)多Master多Slave难审,每個(gè)Master配一個(gè)Slave,有多對(duì)Master-Slave亿絮,集群采用異步復(fù)制方式,主備有短暫消息延遲,毫秒級(jí)
優(yōu)點(diǎn):性能同多Master幾乎一樣派昧,實(shí)時(shí)性高黔姜,主備間切換對(duì)應(yīng)用透明,不需人工干預(yù)
缺點(diǎn):Master宕機(jī)或磁盤損壞時(shí)會(huì)有少量消息丟失
4)多Master多Slave蒂萎,每個(gè)Master配一個(gè)Slave秆吵,有多對(duì)Master-Slave,集群采用同步雙寫方式五慈,主備都寫成功纳寂,向應(yīng)用返回成功
優(yōu)點(diǎn):服務(wù)可用性與數(shù)據(jù)可用性非常高
缺點(diǎn):性能比異步集群略低,當(dāng)前版本主宕備不能自動(dòng)切換為主
需要注意的是泻拦,在RocketMQ里面毙芜,1臺(tái)機(jī)器只能要么是Master,要么是Slave争拐。這個(gè)在初始的機(jī)器配置里面腋粥,就定死了。不會(huì)像kafka那樣存在master動(dòng)態(tài)選舉的功能架曹。其中Master的broker id = 0隘冲,Slave的broker id > 0。
有點(diǎn)類似于mysql的主從概念绑雄,master掛了以后展辞,slave仍然可以提供讀服務(wù),但是由于有多主的存
在万牺,當(dāng)一個(gè)master掛了以后罗珍,可以寫到其他的master上。
消息發(fā)送到topic多個(gè)MessageQueue
演示一下對(duì)topic創(chuàng)建多個(gè)messageQueue
-
創(chuàng)建一個(gè)隊(duì)列杏愤,設(shè)置2個(gè)寫隊(duì)列以及2個(gè)讀隊(duì)列靡砌,如果讀和寫隊(duì)列不一致,會(huì)存在消息無法消費(fèi)到的問題
- 構(gòu)建生產(chǎn)者和消費(fèi)者:參考上面寫的生產(chǎn)者消費(fèi)者代碼
- 消費(fèi)者數(shù)量控制對(duì)于隊(duì)列的消費(fèi)情況
a) 如果消費(fèi)隊(duì)列為2珊楼,啟動(dòng)一個(gè)消費(fèi)者通殃,那么這個(gè)消費(fèi)者會(huì)消費(fèi)者兩個(gè)隊(duì)列,
b) 如果兩個(gè)消費(fèi)者消費(fèi)這個(gè)隊(duì)列厕宗,那么意味著消息會(huì)均衡分?jǐn)偟竭@兩個(gè)消費(fèi)者中
c) 如果消費(fèi)者數(shù)大于readQueueNumbs画舌,那么會(huì)有一些消費(fèi)者消費(fèi)不到消息,浪費(fèi)資源
消息的順序消費(fèi)
首先已慢,需要保證順序的消息要發(fā)送到同一個(gè)messagequeue中曲聂;其次,一個(gè)messagequeue只能被一個(gè)消費(fèi)者消費(fèi)佑惠,這點(diǎn)是由消息隊(duì)列的分配機(jī)制來保證的朋腋;最后齐疙,一個(gè)消費(fèi)者內(nèi)部對(duì)一個(gè)mq的消費(fèi)要保證是有序的。
我們要做到生產(chǎn)者 - messagequeue - 消費(fèi)者之間是一對(duì)一對(duì)一的關(guān)系旭咽。
自定義消息發(fā)送規(guī)則
通過自定義發(fā)送策略來實(shí)現(xiàn)消息只發(fā)送到同一個(gè)隊(duì)列
因?yàn)橐粋€(gè)Topic 會(huì)有多個(gè)Message Queue 贞奋,如果使用Producer 的默認(rèn)配置,這個(gè)Producer 會(huì)輪流向各個(gè)Message Queue 發(fā)送消息穷绵。Consumer 在消費(fèi)消息的時(shí)候轿塔,會(huì)根據(jù)負(fù)載均衡策略,消費(fèi)被分配到的Message Queue
如果不經(jīng)過特定的設(shè)置仲墨,某條消息被發(fā)往哪個(gè)Message Queue 勾缭,被哪個(gè)Consumer 消費(fèi)是未知的
如果業(yè)務(wù)需要我們把消息發(fā)送到指定的Message Queue 里,比如把同一類型的消息都發(fā)往相同的
Message Queue目养。那是不是可以實(shí)現(xiàn)順序消息的功能呢俩由?
和kafka一樣,rocketMQ也提供了消息路由的功能混稽,我們可以自定義消息分發(fā)策略采驻,可以實(shí)現(xiàn)
MessageQueueSelector,來實(shí)現(xiàn)自己的消息分發(fā)策略
SendResult sendResult=producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
int key=o.hashCode();
int size = list.size();
int index = key%size;
return list.get(index);// list.get(0);
}
},"key_"+i);
如何保證消息消費(fèi)順序呢匈勋?
通過分區(qū)規(guī)則可以實(shí)現(xiàn)同類消息在rocketmq上的順序存儲(chǔ)礼旅。但是對(duì)于消費(fèi)端來說,如何保證消費(fèi)的順序洽洁?
RocketMQ中提供了MessageListenerOrderly 一個(gè)類來實(shí)現(xiàn)順序消費(fèi)
consumer.subscribe("store_topic_test","*");
consumer.registerMessageListener((MessageListenerOrderly) (list,consumeOrderlyContext) -> {
list.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeOrderlyStatus.SUCCESS;
});
順序消費(fèi)會(huì)帶來一些問題痘系,
- 遇到消息失敗的消息,無法跳過饿自,當(dāng)前隊(duì)列消費(fèi)暫停
- 降低了消息處理的性能
消費(fèi)端的負(fù)載均衡
和kafka一樣汰翠,消費(fèi)端也會(huì)針對(duì)Message Queue做負(fù)載均衡,使得每個(gè)消費(fèi)者能夠合理的消費(fèi)多個(gè)分區(qū)的消息昭雌。
消費(fèi)端會(huì)通過RebalanceService線程复唤,10秒鐘做一次基于topic下的所有隊(duì)列負(fù)載
- 消費(fèi)端遍歷自己的所有topic,依次調(diào)rebalanceByTopic
- 根據(jù)topic獲取此topic下的所有queue
- 選擇一臺(tái)broker獲取基于group的所有消費(fèi)端(有心跳向所有broker注冊(cè)客戶端信息)
- 選擇隊(duì)列分配策略實(shí)例AllocateMessageQueueStrategy執(zhí)行分配算法
什么時(shí)候觸發(fā)負(fù)載均衡
- 消費(fèi)者啟動(dòng)之后
- 消費(fèi)者數(shù)量發(fā)生變更
- 每10秒會(huì)觸發(fā)檢查一次rebalance
分配算法
RocketMQ提供了6中分區(qū)的分配算法:
- (AllocateMessageQueueAveragely)平均分配算法(默認(rèn))
- (AllocateMessageQueueAveragelyByCircle)環(huán)狀分配消息隊(duì)列
- (AllocateMessageQueueByConfig)按照配置來分配隊(duì)列: 根據(jù)用戶指定的配置來進(jìn)行負(fù)載
- (AllocateMessageQueueByMachineRoom)按照指定機(jī)房來配置隊(duì)列
- (AllocateMachineRoomNearby)按照就近機(jī)房來配置隊(duì)列:
- (AllocateMessageQueueConsistentHash)一致性hash烛卧,根據(jù)消費(fèi)者的cid進(jìn)行
消息的的可靠性原則
在實(shí)際使用RocketMQ的時(shí)候我們并不能保證每次發(fā)送的消息都剛好能被消費(fèi)者一次性正常消費(fèi)成功佛纫,可能會(huì)存在需要多次消費(fèi)才能成功或者一直消費(fèi)失敗的情況,那作為發(fā)送者該做如何處理呢总放?
消息消費(fèi)端的確認(rèn)機(jī)制
RocketMQ提供了ack機(jī)制呈宇,以保證消息能夠被正常消費(fèi)。發(fā)送者為了保證消息肯定消費(fèi)成功局雄,只有使用方明確表示消費(fèi)成功甥啄,RocketMQ才會(huì)認(rèn)為消息消費(fèi)成功。中途斷電炬搭,拋出異常等都不會(huì)認(rèn)為成功
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeOrderlyContext) -> {
list.stream().forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
所有消費(fèi)者在設(shè)置監(jiān)聽的時(shí)候會(huì)提供一個(gè)回調(diào),業(yè)務(wù)實(shí)現(xiàn)消費(fèi)回調(diào)的時(shí)候,當(dāng)回調(diào)方法中返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS熔脂,RocketMQ才會(huì)認(rèn)為這批消息(默認(rèn)是1條)是消費(fèi)完成的。如果這時(shí)候消息消費(fèi)失敗充尉,例如數(shù)據(jù)庫(kù)異常,余額不足扣款失敗等一切業(yè)務(wù)認(rèn)為消息需要重試的場(chǎng)景衣形,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會(huì)認(rèn)為這批消息消費(fèi)失敗了
消息的衰減重試
為了保證消息肯定至少被消費(fèi)一次姿鸿,RocketMQ會(huì)把這批消息重新發(fā)回到broker谆吴,在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后苛预,再次投遞到這個(gè)ConsumerGroup句狼。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默認(rèn)16次),就會(huì)投遞到DLQ死信隊(duì)列热某。應(yīng)用可以監(jiān)控死信隊(duì)列來做人工干預(yù)可以修改broker-a.conf文件
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重試消息的處理機(jī)制
一般情況下我們?cè)趯?shí)際生產(chǎn)中是不需要重試16次腻菇,這樣既浪費(fèi)時(shí)間又浪費(fèi)性能,理論上當(dāng)嘗試重復(fù)次數(shù)達(dá)到我們想要的結(jié)果時(shí)如果還是消費(fèi)失敗昔馋,那么我們需要將對(duì)應(yīng)的消息進(jìn)行記錄筹吐,并且結(jié)束重復(fù)嘗試
consumer.registerMessageListener((MessageListenerConcurrently) (list,
consumeOrderlyContext) -> {
for (MessageExt messageExt : list) {
if(messageExt.getReconsumeTimes()==3) {
//可以將對(duì)應(yīng)的數(shù)據(jù)保存到數(shù)據(jù)庫(kù),以便人工干預(yù)
System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
TransactionProducer(事務(wù)消息)
RocketMQ消息的事務(wù)架構(gòu)設(shè)計(jì)
- 生產(chǎn)者執(zhí)行本地事務(wù)秘遏,修改訂單支付狀態(tài)丘薛,并且提交事務(wù)
- 生產(chǎn)者發(fā)送事務(wù)消息到broker上,消息發(fā)送到broker上在沒有確認(rèn)之前邦危,消息對(duì)于consumer是不可見狀態(tài)
- 生產(chǎn)者確認(rèn)事務(wù)消息洋侨,使得發(fā)送到broker上的事務(wù)消息對(duì)于消費(fèi)者可見
- 消費(fèi)者獲取到消息進(jìn)行消費(fèi),消費(fèi)完之后執(zhí)行ack進(jìn)行確認(rèn)
-
這里可能會(huì)存在一個(gè)問題倦蚪,生產(chǎn)者本地事務(wù)成功后希坚,發(fā)送事務(wù)確認(rèn)消息到broker上失敗了怎么
辦?這個(gè)時(shí)候意味著消費(fèi)者無法正常消費(fèi)到這個(gè)消息陵且。所以RocketMQ提供了消息回查機(jī)制裁僧,如果
事務(wù)消息一直處于中間狀態(tài),broker會(huì)發(fā)起重試去查詢broker上這個(gè)事務(wù)的處理狀態(tài)滩报。一旦發(fā)現(xiàn)
事務(wù)處理成功锅知,則把當(dāng)前這條消息設(shè)置為可見
事務(wù)消息的實(shí)踐
通過一個(gè)下單以后扣減庫(kù)存的數(shù)據(jù)一致性場(chǎng)景來演示RocketMQ的分布式事務(wù)特性
TransactionProducer
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
TransactionMQProducer transactionProducer = new
TransactionMQProducer("tx_producer_group");
transactionProducer.setNamesrvAddr("192.168.13.102:9876");
ExecutorService executorService = Executors.newFixedThreadPool(10);
//自定義線程池,用于異步執(zhí)行事務(wù)操作
transactionProducer.setExecutorService(executorService);
transactionProducer.setTransactionListener(new TransactionListenerLocal());
transactionProducer.start();
for (int i = 0; i < 20; i++) {
String orderId = UUID.randomUUID().toString();
String body = "{'operation':'doOrder','orderId':'" + orderId + "'}";
Message message = new Message("pay_tx_topic", "TagA", orderId,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
transactionProducer.sendMessageInTransaction(message,
orderId + "&" + i);
Thread.sleep(1000);
}
}
}
TransactionListenerLocal
public class TransactionListenerLocal implements TransactionListener {
private static final Map<String, Boolean> results = new ConcurrentHashMap<>();
//執(zhí)行本地事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println(":執(zhí)行本地事務(wù):" + arg.toString());
String orderId = arg.toString();
boolean rs = saveOrder(orderId);//模擬數(shù)據(jù)入庫(kù)操作
return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
// 這個(gè)返回狀態(tài)表示告訴broker這個(gè)事務(wù)消息是否被確認(rèn)脓钾,允許給到consumer進(jìn)行消費(fèi)
// LocalTransactionState.ROLLBACK_MESSAGE 回滾
// LocalTransactionState.UNKNOW 未知
}
//提供事務(wù)執(zhí)行狀態(tài)的回查方法售睹,提供給broker回調(diào)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
System.out.println("執(zhí)行事務(wù)執(zhí)行狀態(tài)的回查,orderId:" + orderId);
boolean rs = Boolean.TRUE.equals(results.get(orderId));
System.out.println("回調(diào):" + rs);
return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
private boolean saveOrder(String orderId) {
//如果訂單取模等于0可训,表示成功,否則表示失敗
boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
results.put(orderId, success);
return success;
}
}
TransactionConsumer
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException, IOException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tx_consumer_group");
defaultMQPushConsumer.setNamesrvAddr("192.168.11.162:9876");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.subscribe("pay_tx_topic", "*");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
msgs.stream().forEach(messageExt -> {
try {
String orderId = messageExt.getKeys();
String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("收到消息:" + body + "昌妹,開始扣減庫(kù)存:" + orderId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
System.in.read();
}
}
RocketMQ事務(wù)消息的三種狀態(tài)
- ROLLBACK_MESSAGE:回滾事務(wù)
- COMMIT_MESSAGE: 提交事務(wù)
- UNKNOW: broker會(huì)定時(shí)的回查Producer消息狀態(tài)捶枢,直到徹底成功或失敗。
當(dāng)executeLocalTransaction方法返回ROLLBACK_MESSAGE時(shí)飞崖,表示直接回滾事務(wù)烂叔,當(dāng)返回
COMMIT_MESSAGE提交事務(wù)
當(dāng)返回UNKNOW時(shí),Broker會(huì)在一段時(shí)間之后回查checkLocalTransaction固歪,根據(jù)checkLocalTransaction返回狀態(tài)執(zhí)行事務(wù)的操作(回滾或提交)蒜鸡,
如示例中,當(dāng)返回ROLLBACK_MESSAGE時(shí)消費(fèi)者不會(huì)收到消息牢裳,且不會(huì)調(diào)用回查函數(shù)逢防,當(dāng)返回
COMMIT_MESSAGE時(shí)事務(wù)提交,消費(fèi)者收到消息蒲讯,當(dāng)返回UNKNOW時(shí)忘朝,在一段時(shí)間之后調(diào)用回查函數(shù),并根據(jù)status判斷返回提交或回滾狀態(tài)判帮,返回提交狀態(tài)的消息將會(huì)被消費(fèi)者消費(fèi)局嘁,所以此時(shí)消費(fèi)者可以消費(fèi)部分消息
消息的存儲(chǔ)和發(fā)送
由于分布式消息隊(duì)列對(duì)于可靠性的要求比較高,所以需要保證生產(chǎn)者將消息發(fā)送到broker之后晦墙,保證消息是不出現(xiàn)丟失的悦昵,因此消息隊(duì)列就少不了對(duì)于可靠性存儲(chǔ)的要求
MQ消息存儲(chǔ)選擇
從主流的幾種MQ消息隊(duì)列采用的存儲(chǔ)方式來看,主要會(huì)有三種
- 分布式KV存儲(chǔ)偎痛,比如ActiveMQ中采用的levelDB旱捧、Redis, 這種存儲(chǔ)方式對(duì)于消息讀寫能力要求不高的情況下可以使用
- 文件系統(tǒng)存儲(chǔ)踩麦,常見的比如kafka枚赡、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機(jī)器上的文件系統(tǒng)來做持久化谓谦,這種方案適合對(duì)于有高吞吐量要求的消息中間件贫橙,因?yàn)橄⑺⒈P是一種高效率,高可靠反粥、高性能的持久化方式卢肃,除非磁盤出現(xiàn)故障,否則一般是不會(huì)出現(xiàn)無法持久化的問題
- 關(guān)系型數(shù)據(jù)庫(kù)才顿,比如ActiveMQ可以采用mysql作為消息存儲(chǔ)莫湘,關(guān)系型數(shù)據(jù)庫(kù)在單表數(shù)據(jù)量達(dá)到千
萬級(jí)的情況下IO性能會(huì)出現(xiàn)瓶頸,所以ActiveMQ并不適合于高吞吐量的消息隊(duì)列場(chǎng)景郑气。
總的來說幅垮,對(duì)于存儲(chǔ)效率,文件系統(tǒng)要優(yōu)于分布式KV存儲(chǔ)尾组,分布式KV存儲(chǔ)要優(yōu)于關(guān)系型數(shù)據(jù)庫(kù)
消息的存儲(chǔ)結(jié)構(gòu)
RocketMQ就是采用文件系統(tǒng)的方式來存儲(chǔ)消息忙芒,消息的存儲(chǔ)是由ConsumeQueue和CommitLog配合完成的示弓。CommitLog是消息真正的物理存儲(chǔ)文件。ConsumeQueue是消息的邏輯隊(duì)列呵萨,有點(diǎn)類似于數(shù)據(jù)庫(kù)的索引文件奏属,里面存儲(chǔ)的是指向CommitLog文件中消息存儲(chǔ)的地址。
每個(gè)Topic下的每個(gè)Message Queue都會(huì)對(duì)應(yīng)一個(gè)ConsumeQueue文件潮峦,文件的地址是:
{topicNmae}/{filename}, 默認(rèn)路徑: /root/store
在rocketMQ的文件存儲(chǔ)目錄下囱皿,可以看到這樣一個(gè)結(jié)構(gòu)的的而文件。
我們只需要關(guān)心Commitlog忱嘹、Consumequeue铆帽、Index
CommitLog
CommitLog是用來存放消息的物理文件,每個(gè)broker上的commitLog本當(dāng)前機(jī)器上的所有consumerQueue共享德谅,不做任何的區(qū)分。
CommitLog中的文件默認(rèn)大小為1G萨螺,可以動(dòng)態(tài)配置窄做; 當(dāng)一個(gè)文件寫滿以后,會(huì)生成一個(gè)新的
commitlog文件慰技。所有的Topic數(shù)據(jù)是順序?qū)懭朐贑ommitLog文件中的椭盏。
文件名的長(zhǎng)度為20位,左邊補(bǔ)0吻商,剩余未起始偏移量掏颊,比如
00000000000000000000 表示第一個(gè)文件, 文件大小為102410241024艾帐,當(dāng)?shù)谝粋€(gè)文件寫滿之后乌叶,生成第二個(gè)文件
000000000001073741824 表示第二個(gè)文件,起始偏移量為1073741824
ConsumeQueue
consumeQueue表示消息消費(fèi)的邏輯隊(duì)列柒爸,這里面包含MessageQueue在commitlog中的其實(shí)物理位置偏移量offset准浴,消息實(shí)體內(nèi)容的大小和Message Tag的hash值。對(duì)于實(shí)際物理存儲(chǔ)來說捎稚,
consumeQueue對(duì)應(yīng)每個(gè)topic和queueid下的文件乐横,每個(gè)consumeQueue類型的文件也是有大小,每個(gè)文件默認(rèn)大小約為600W個(gè)字節(jié)今野,如果文件滿了后會(huì)也會(huì)生成一個(gè)新的文件
IndexFile
索引文件葡公,如果一個(gè)消息包含Key值的話,會(huì)使用IndexFile存儲(chǔ)消息索引条霜。Index索引文件提供了對(duì)CommitLog進(jìn)行數(shù)據(jù)檢索催什,提供了一種通過key或者時(shí)間區(qū)間來查找CommitLog中的消息的方法。在物理存儲(chǔ)中蛔外,文件名是以創(chuàng)建的時(shí)間戳明明蛆楞,固定的單個(gè)IndexFile大小大概為400M溯乒,一個(gè)IndexFile可以保存2000W個(gè)索引
abort
broker在啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)空的名為abort的文件,并在shutdown時(shí)將其刪除豹爹,用于標(biāo)識(shí)進(jìn)程是否正常退出裆悄,如果不正常退出,會(huì)在啟動(dòng)時(shí)做故障恢復(fù)
消息存儲(chǔ)的整體結(jié)構(gòu)
RocketMQ的消息存儲(chǔ)采用的是混合型的存儲(chǔ)結(jié)構(gòu),也就是Broker單個(gè)實(shí)例下的所有隊(duì)列公用一個(gè)日志數(shù)據(jù)文件CommitLog臂聋。這個(gè)是和Kafka又一個(gè)不同之處光稼。
為什么不采用kafka的設(shè)計(jì),針對(duì)不同的partition存儲(chǔ)一個(gè)獨(dú)立的物理文件呢孩等?這是因?yàn)樵趉afka的設(shè)計(jì)中艾君,一旦kafka中Topic的Partition數(shù)量過多,隊(duì)列文件會(huì)過多肄方,那么會(huì)給磁盤的IO讀寫造成比較大的壓力冰垄,也就造成了性能瓶頸。所以RocketMQ進(jìn)行了優(yōu)化权她,消息主題統(tǒng)一存儲(chǔ)在CommitLog中虹茶。
當(dāng)然,這種設(shè)計(jì)并不是銀彈隅要,它也有它的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)在于:由于消息主題都是通過CommitLog來進(jìn)行讀寫蝴罪,ConsumerQueue中只存儲(chǔ)很少的數(shù)據(jù),所以隊(duì)列更加輕量化步清。對(duì)于磁盤的訪問是串行化從而避免了磁盤的競(jìng)爭(zhēng)
缺點(diǎn)在于:消息寫入磁盤雖然是基于順序?qū)懸牛亲x的過程確是隨機(jī)的。讀取一條消息會(huì)先讀取
ConsumeQueue廓啊,再讀CommitLog欢搜,會(huì)降低消息讀的效率。
消息發(fā)送到消息接收的整體流程
-
Producer將消息發(fā)送到Broker后崖瞭,Broker會(huì)采用同步或者異步的方式把消息寫入到CommitLog狂巢。
RocketMQ所有的消息都會(huì)存放在CommitLog中,為了保證消息存儲(chǔ)不發(fā)生混亂书聚,對(duì)CommitLog
寫之前會(huì)加鎖唧领,同時(shí)也可以使得消息能夠被順序?qū)懭氲紺ommitLog,只要消息被持久化到磁盤文件CommitLog雌续,那么就可以保證Producer發(fā)送的消息不會(huì)丟失斩个。
-
commitLog持久化后,會(huì)把里面的消息Dispatch到對(duì)應(yīng)的Consume Queue上驯杜,Consume Queue
相當(dāng)于kafka中的partition受啥,是一個(gè)邏輯隊(duì)列,存儲(chǔ)了這個(gè)Queue在CommiLog中的起始o(jì)ffset,
log大小和MessageTag的hashCode滚局。
-
當(dāng)消費(fèi)者進(jìn)行消息消費(fèi)時(shí)居暖,會(huì)先讀取consumerQueue , 邏輯消費(fèi)隊(duì)列ConsumeQueue保存了指
定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量Offset,消息大小藤肢、和消息Tag的
HashCode值
-
直接從consumequeue中讀取消息是沒有數(shù)據(jù)的太闺,真正的消息主體在commitlog中,所以還需要
從commitlog中讀取消息
什么時(shí)候清理物理消息文件嘁圈?
消息存儲(chǔ)在CommitLog之后省骂,的確是會(huì)被清理的,但是這個(gè)清理只會(huì)在以下任一條件成立才會(huì)批量刪
除消息文件(CommitLog):
- 消息文件過期(默認(rèn)72小時(shí))最住,且到達(dá)清理時(shí)點(diǎn)(默認(rèn)是凌晨4點(diǎn))钞澳,刪除過期文件。
- 消息文件過期(默認(rèn)72小時(shí))涨缚,且磁盤空間達(dá)到了水位線(默認(rèn)75%)轧粟,刪除過期文件。
- 磁盤已經(jīng)達(dá)到必須釋放的上限(85%水位線)的時(shí)候脓魏,則開始批量清理文件(無論是否過期)逃延,直到空間充足。
注:若磁盤空間達(dá)到危險(xiǎn)水位線(默認(rèn)90%)轧拄,出于保護(hù)自身的目的,broker會(huì)拒絕寫入服務(wù)讽膏。
——學(xué)自咕泡學(xué)院