一锐锣、 MQ背景&選型
消息隊列作為高并發(fā)系統(tǒng)的核心組件之一玖姑,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性斧拍。主要具有以下優(yōu)勢:
- 削峰填谷(主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失井濒、系統(tǒng)奔潰等問題)
- 系統(tǒng)解耦(解決不同重要程度婉弹、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死)
- 提升性能(當(dāng)存在一對多調(diào)用時片挂,可以發(fā)一條消息給消息系統(tǒng)幻林,讓消息系統(tǒng)通知相關(guān)系統(tǒng))
- 蓄流壓測(線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測)
目前主流的MQ主要是Rocketmq音念、kafka沪饺、Rabbitmq,Rocketmq相比于Rabbitmq闷愤、kafka具有主要優(yōu)勢特性有:
? 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性整葡,rabbitmq和kafka不支持)
? 支持結(jié)合rocketmq的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提)
? 支持18個級別的延遲消息(rabbitmq和kafka不支持)
? 支持指定次數(shù)和時間間隔的失敗消息重發(fā)(kafka不支持讥脐,rabbitmq需要手動確認)
? 支持consumer端tag過濾遭居,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)
? 支持重復(fù)消費(rabbitmq不支持,kafka支持)
Rocketmq旬渠、kafka俱萍、Rabbitmq的詳細對比,請參照下表格:
二告丢、RocketMQ集群概述
1. RocketMQ集群部署結(jié)構(gòu)
1) Name Server
Name Server是一個幾乎無狀態(tài)節(jié)點枪蘑,可集群部署,節(jié)點之間無任何信息同步岖免。
2) Broker
Broker部署相對復(fù)雜岳颇,Broker分為Master與Slave,一個Master可以對應(yīng)多個Slave觅捆,但是一個Slave只能對應(yīng)一個Master赦役,Master與Slave的對應(yīng)關(guān)系通過指定相同的Broker Name,不同的Broker Id來定義栅炒,BrokerId為0表示Master掂摔,非0表示Slave术羔。Master也可以部署多個。
每個Broker與Name Server集群中的所有節(jié)點建立長連接乙漓,定時(每隔30s)注冊Topic信息到所有Name Server级历。Name Server定時(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鐘沒有收到心跳叭披,則Name Server斷開與Broker的連接寥殖。
3) Producer
Producer與Name Server集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息涩蜘,并向提供Topic服務(wù)的Master建立長連接嚼贡,且定時向Master發(fā)送心跳。Producer完全無狀態(tài)同诫,可集群部署粤策。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊列的最新情況,這意味著如果Broker不可用误窖,Producer最多30s能夠感知叮盘,在此期間內(nèi)發(fā)往Broker的所有消息都會失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳霹俺,Broker每隔10s中掃描所有存活的連接柔吼,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù),則關(guān)閉與Producer的連接丙唧。
4) Consumer
Consumer與Name Server集群中的其中一個節(jié)點(隨機選擇)建立長連接愈魏,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master想际、Slave建立長連接蝌戒,且定時向Master、Slave發(fā)送心跳沼琉。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息桩匪,訂閱規(guī)則由Broker配置決定打瘪。
Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味著Broker不可用時傻昙,Consumer最多最需要30s才能感知闺骚。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳,Broker每隔10s掃描所有存活的連接妆档,若某個連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù)僻爽,則關(guān)閉連接;并向該Consumer Group的所有Consumer發(fā)出通知贾惦,Group內(nèi)的Consumer重新分配隊列胸梆,然后繼續(xù)消費敦捧。
當(dāng)Consumer得到master宕機通知后,轉(zhuǎn)向slave消費碰镜,slave不能保證master的消息100%都同步過來了兢卵,因此會有少量的消息丟失。但是一旦master恢復(fù)绪颖,未同步過去的消息會被最終消費掉秽荤。
消費者對列是消費者連接之后(或者之前有連接過)才創(chuàng)建的。我們將原生的消費者標(biāo)識由 {IP}@{消費者group}擴展為 {IP}@{消費者group}{topic}{tag}柠横,(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)窃款。任何一個元素不同,都認為是不同的消費端牍氛,每個消費端會擁有一份自己消費對列(默認是broker對列數(shù)量*broker數(shù)量)晨继。新掛載的消費者對列中擁有commitlog中的所有數(shù)據(jù)。
三踱稍、 Rocketmq如何支持分布式事務(wù)消息
場景
A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務(wù)一致性悠抹,通過引入中間層MQ珠月,A和MQ保持事務(wù)一致性(異常情況下通過MQ反查A接口實現(xiàn)check),B和MQ保證事務(wù)一致(通過重試)楔敌,從而達到最終事務(wù)一致性啤挎。
原理:大事務(wù) = 小事務(wù) + 異步
1. MQ與DB一致性原理(兩方事務(wù))
流程圖
上圖是RocketMQ提供的保證MQ消息、DB事務(wù)一致性的方案卵凑。
MQ消息庆聘、DB操作一致性方案:
1)發(fā)送消息到MQ服務(wù)器,此時消息狀態(tài)為SEND_OK勺卢。此消息為consumer不可見伙判。
2)執(zhí)行DB操作;DB執(zhí)行成功Commit DB操作黑忱,DB執(zhí)行失敗Rollback DB操作宴抚。
3)如果DB執(zhí)行成功,回復(fù)MQ服務(wù)器甫煞,將狀態(tài)為COMMIT_MESSAGE菇曲;如果DB執(zhí)行失敗,回復(fù)MQ服務(wù)器抚吠,將狀態(tài)改為ROLLBACK_MESSAGE常潮。注意此過程有可能失敗。
4)MQ內(nèi)部提供一個名為“事務(wù)狀態(tài)服務(wù)”的服務(wù)楷力,此服務(wù)會檢查事務(wù)消息的狀態(tài)喊式,如果發(fā)現(xiàn)消息未COMMIT孵户,則通過Producer啟動時注冊的TransactionCheckListener來回調(diào)業(yè)務(wù)系統(tǒng),業(yè)務(wù)系統(tǒng)在checkLocalTransactionState方法中檢查DB事務(wù)狀態(tài)垃帅,如果成功延届,則回復(fù)COMMIT_MESSAGE,否則回復(fù)ROLLBACK_MESSAGE贸诚。
說明:
上面以DB為例方庭,其實此處可以是任何業(yè)務(wù)或者數(shù)據(jù)源。
以上SEND_OK酱固、COMMIT_MESSAGE械念、ROLLBACK_MESSAGE均是client jar提供的狀態(tài),在MQ服務(wù)器內(nèi)部是一個數(shù)字运悲。
TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調(diào)(上圖中灰色部分)龄减。這種消息丟失只存在于斷網(wǎng)或者rocketmq集群掛了的情況下。當(dāng)rocketmq集群掛了班眯,如果采用異步刷盤希停,存在1s內(nèi)數(shù)據(jù)丟失風(fēng)險,異步刷盤場景下保障事務(wù)沒有意義署隘。所以如果要核心業(yè)務(wù)用Rocketmq解決分布式事務(wù)問題宠能,建議選擇同步刷盤模式。
2. 多系統(tǒng)之間數(shù)據(jù)一致性(多方事務(wù))
當(dāng)需要保證多方(超過2方)的分布式一致性磁餐,上面的兩方事務(wù)一致性(通過Rocketmq的事務(wù)性消息解決)已經(jīng)無法支持违崇。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)诊霹。
以上圖交易系統(tǒng)為例:
1)交易系統(tǒng)創(chuàng)建訂單(往DB插入一條記錄)羞延,同時發(fā)送訂單創(chuàng)建消息。通過RocketMq事務(wù)性消息保證一致性
2)接著執(zhí)行完成訂單所需的同步核心RPC服務(wù)(非核心的系統(tǒng)通過監(jiān)聽MQ消息自行處理脾还,處理結(jié)果不會影響交易狀態(tài))伴箩。執(zhí)行成功更改訂單狀態(tài),同時發(fā)送MQ消息鄙漏。
3)交易系統(tǒng)接受自己發(fā)送的訂單創(chuàng)建消息赛蔫,通過定時調(diào)度系統(tǒng)創(chuàng)建延時回滾任務(wù)(或者使用RocketMq的重試功能,設(shè)置第二次發(fā)送時間為定時任務(wù)的延遲創(chuàng)建時間泥张。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右鞠值,這時可能RPC還未執(zhí)行完媚创,訂單狀態(tài)還未設(shè)置為完成,第二次消費時間可以指定)彤恶。延遲任務(wù)先通過查詢訂單狀態(tài)判斷訂單是否完成钞钙,完成則不創(chuàng)建回滾任務(wù)鳄橘,否則創(chuàng)建。 PS:多個RPC可以創(chuàng)建一個回滾任務(wù)芒炼,通過一個消費組接受一次消息就可以瘫怜;也可以通過創(chuàng)建多個消費組,一個消息消費多次本刽,每次消費創(chuàng)建一個RPC的回滾任務(wù)鲸湃。 回滾任務(wù)失敗,通過MQ的重發(fā)來重試子寓。
以上是交易系統(tǒng)和其他系統(tǒng)之間保持最終一致性的解決方案暗挑。
3.案例分析
1) 單機環(huán)境下的事務(wù)示意圖
如下為A給B轉(zhuǎn)賬的例子坎缭。
步驟 | 動作 |
---|---|
1 | 鎖定A的賬戶 |
2 | 鎖定B的賬戶 |
3 | 檢查A賬戶是否有1元 |
4 | A的賬戶扣減1元 |
5 | 給B的賬戶加1元 |
6 | 解鎖B的賬戶 |
7 | 解鎖A的賬戶 |
以上過程在代碼層面甚至可以簡化到在一個事物中執(zhí)行兩條sql語句轻掩。
2) 分布式環(huán)境下事務(wù)
和單機事務(wù)不同,A碱妆、B賬戶可能不在同一個DB中鲜屏,此時無法像在單機情況下使用事物來實現(xiàn)烹看。此時可以通過一下方式實現(xiàn),將轉(zhuǎn)賬操作分成兩個操作洛史。
a) A賬戶
步驟 | 動作 |
---|---|
1 | 鎖定A的賬戶 |
2 | 檢查A賬戶是否有1元 |
3 | A的賬戶扣減1元 |
4 | 解鎖A的賬戶 |
b) MQ消息
A賬戶數(shù)據(jù)發(fā)生變化時惯殊,發(fā)送MQ消息,MQ服務(wù)器將消息推送給轉(zhuǎn)賬系統(tǒng)虹菲,轉(zhuǎn)賬系統(tǒng)來給B賬號加錢靠胜。
c) B賬戶
步驟 | 動作 |
---|---|
1 | 鎖定B的賬戶 |
2 | 給B的賬戶加1元 |
3 | 解鎖B的賬戶 |
四、 順序消息
1. 順序消息缺陷
發(fā)送順序消息無法利用集群Fail Over特性消費順序消息的并行度依賴于隊列數(shù)量隊列熱點問題毕源,個別隊列由于哈希不均導(dǎo)致消息過多浪漠,消費速度跟不上,產(chǎn)生消息堆積問題遇到消息失敗的消息霎褐,無法跳過址愿,當(dāng)前隊列消費暫停。
2. 原理
produce在發(fā)送消息的時候冻璃,把消息發(fā)到同一個隊列(queue)中,消費者注冊消息監(jiān)聽器為MessageListenerOrderly响谓,這樣就可以保證消費端只有一個線程去消費消息。
注意:把消息發(fā)到同一個隊列(queue)省艳,不是同一個topic娘纷,默認情況下一個topic包括4個queue
3. 擴展
可以通過實現(xiàn)發(fā)送消息的對列選擇器方法,實現(xiàn)部分順序消息跋炕。
舉例:比如一個數(shù)據(jù)庫通過MQ來同步赖晶,只需要保證每個表的數(shù)據(jù)是同步的就可以。解析binlog,將表名作為對列選擇器的參數(shù)遏插,這樣就可以保證每個表的數(shù)據(jù)到同一個對列里面捂贿,從而保證表數(shù)據(jù)的順序消費
五、 最佳實踐
1. Producer
1) Topic
一個應(yīng)用盡可能用一個Topic胳嘲,消息子類型用tags來標(biāo)識厂僧,tags可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags了牛,消費方在訂閱消息時颜屠,才可以利用tags 在broker做消息過濾。
2) key
每個消息在業(yè)務(wù)層面的唯一標(biāo)識碼白魂,要設(shè)置到 keys 字段汽纤,方便將來定位消息丟失問題。服務(wù)器會為每個消息創(chuàng)建索引(哈希索引)福荸,應(yīng)用可以通過 topic蕴坪,key來查詢這條消息內(nèi)容,以及消息被誰消費敬锐。由于是哈希索引背传,請務(wù)必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突台夺。
//訂單Id
String orderId= "20034568923546";
message.setKeys(orderId);
3) 日志
消息發(fā)送成功或者失敗径玖,要打印消息日志,務(wù)必要打印 send result 和key 字段颤介。
4) send
send消息方法梳星,只要不拋異常,就代表發(fā)送成功滚朵。但是發(fā)送成功會有多個狀態(tài)冤灾,在sendResult里定義。
SEND_OK:消息發(fā)送成功
FLUSH_DISK_TIMEOUT:消息發(fā)送成功辕近,但是服務(wù)器刷盤超時韵吨,消息已經(jīng)進入服務(wù)器隊列,只有此時服務(wù)器宕機移宅,消息才會丟失
FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功归粉,但是服務(wù)器同步到Slave時超時,消息已經(jīng)進入服務(wù)器隊列漏峰,只有此時服務(wù)器宕機糠悼,消息才會丟失
SLAVE_NOT_AVAILABLE:消息發(fā)送成功,但是此時slave不可用浅乔,消息已經(jīng)進入服務(wù)器隊列绢掰,只有此時服務(wù)器宕機,消息才會丟失
2. Consumer
1) 冪等
RocketMQ使用的消息原語是At Least Once,所以consumer可能多次收到同一個消息滴劲,此時務(wù)必做好冪等。
2) 日志
消費時記錄日志顾复,以便后續(xù)定位問題班挖。
3) 批量消費
盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量芯砸。
六萧芙、 參考資料
1. 文檔
RocketMQ_design.pdf
RocketMQ_experience.pdf
2. 博客
分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐
http://www.reibang.com/p/453c6e7ff81c
RocketMQ事務(wù)消費和順序消費詳解
http://www.cnblogs.com/520playboy/p/6750023.html
ZeroCopy
http://www.linuxjournal.com/article/6345
IO方式的性能數(shù)據(jù)