一、 MQ背景&選型
消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一茬祷,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性。主要具有以下優(yōu)勢:
- 削峰填谷(主要解決瞬時(shí)寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失慎恒、系統(tǒng)奔潰等問題)
- 系統(tǒng)解耦(解決不同重要程度南窗、不同能力級(jí)別系統(tǒng)之間依賴導(dǎo)致一死全死)
- 提升性能(當(dāng)存在一對(duì)多調(diào)用時(shí)赞哗,可以發(fā)一條消息給消息系統(tǒng)雷则,讓消息系統(tǒng)通知相關(guān)系統(tǒng))
- 蓄流壓測(線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測)
目前主流的MQ主要是Rocketmq肪笋、kafka月劈、Rabbitmq,Rocketmq相比于Rabbitmq藤乙、kafka具有主要優(yōu)勢特性有:
? 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性猜揪,rabbitmq和kafka不支持)
? 支持結(jié)合rocketmq的多個(gè)系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提)
? 支持18個(gè)級(jí)別的延遲消息(rabbitmq和kafka不支持)
? 支持指定次數(shù)和時(shí)間間隔的失敗消息重發(fā)(kafka不支持坛梁,rabbitmq需要手動(dòng)確認(rèn))
? 支持consumer端tag過濾而姐,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)
? 支持重復(fù)消費(fèi)(rabbitmq不支持,kafka支持)
Rocketmq划咐、kafka拴念、Rabbitmq的詳細(xì)對(duì)比钧萍,請(qǐng)參照下表格:
二、RocketMQ集群概述
1. RocketMQ集群部署結(jié)構(gòu)
1) Name Server
Name Server是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn)政鼠,可集群部署风瘦,節(jié)點(diǎn)之間無任何信息同步。
2) Broker
Broker部署相對(duì)復(fù)雜公般,Broker分為Master與Slave万搔,一個(gè)Master可以對(duì)應(yīng)多個(gè)Slave,但是一個(gè)Slave只能對(duì)應(yīng)一個(gè)Master官帘,Master與Slave的對(duì)應(yīng)關(guān)系通過指定相同的Broker Name瞬雹,不同的Broker Id來定義,BrokerId為0表示Master刽虹,非0表示Slave挖炬。Master也可以部署多個(gè)。
每個(gè)Broker與Name Server集群中的所有節(jié)點(diǎn)建立長連接状婶,定時(shí)(每隔30s)注冊(cè)Topic信息到所有Name Server意敛。Name Server定時(shí)(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鐘沒有收到心跳膛虫,則Name Server斷開與Broker的連接草姻。
3) Producer
Producer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從Name Server取Topic路由信息稍刀,并向提供Topic服務(wù)的Master建立長連接撩独,且定時(shí)向Master發(fā)送心跳。Producer完全無狀態(tài)账月,可集群部署综膀。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊(duì)列的最新情況,這意味著如果Broker不可用局齿,Producer最多30s能夠感知剧劝,在此期間內(nèi)發(fā)往Broker的所有消息都會(huì)失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳抓歼,Broker每隔10s中掃描所有存活的連接讥此,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù),則關(guān)閉與Producer的連接谣妻。
4) Consumer
Consumer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接萄喳,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master蹋半、Slave建立長連接他巨,且定時(shí)向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息染突,也可以從Slave訂閱消息匪傍,訂閱規(guī)則由Broker配置決定。
Consumer每隔30s從Name server獲取topic的最新隊(duì)列情況觉痛,這意味著Broker不可用時(shí)役衡,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳薪棒,Broker每隔10s掃描所有存活的連接手蝎,若某個(gè)連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù),則關(guān)閉連接俐芯;并向該Consumer Group的所有Consumer發(fā)出通知棵介,Group內(nèi)的Consumer重新分配隊(duì)列,然后繼續(xù)消費(fèi)吧史。
當(dāng)Consumer得到master宕機(jī)通知后邮辽,轉(zhuǎn)向slave消費(fèi),slave不能保證master的消息100%都同步過來了贸营,因此會(huì)有少量的消息丟失吨述。但是一旦master恢復(fù),未同步過去的消息會(huì)被最終消費(fèi)掉钞脂。
消費(fèi)者對(duì)列是消費(fèi)者連接之后(或者之前有連接過)才創(chuàng)建的揣云。我們將原生的消費(fèi)者標(biāo)識(shí)由 {IP}@{消費(fèi)者group}擴(kuò)展為 {IP}@{消費(fèi)者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)冰啃。任何一個(gè)元素不同邓夕,都認(rèn)為是不同的消費(fèi)端,每個(gè)消費(fèi)端會(huì)擁有一份自己消費(fèi)對(duì)列(默認(rèn)是broker對(duì)列數(shù)量*broker數(shù)量)阎毅。新掛載的消費(fèi)者對(duì)列中擁有commitlog中的所有數(shù)據(jù)焚刚。
三扇调、 Rocketmq如何支持分布式事務(wù)消息
場景
A(存在DB操作)矿咕、B(存在DB操作)兩方需要保證分布式事務(wù)一致性,通過引入中間層MQ肃拜,A和MQ保持事務(wù)一致性(異常情況下通過MQ反查A接口實(shí)現(xiàn)check)痴腌,B和MQ保證事務(wù)一致(通過重試)雌团,從而達(dá)到最終事務(wù)一致性燃领。
原理:大事務(wù) = 小事務(wù) + 異步
1. MQ與DB一致性原理(兩方事務(wù))
流程圖
上圖是RocketMQ提供的保證MQ消息、DB事務(wù)一致性的方案锦援。
MQ消息猛蔽、DB操作一致性方案:
1)發(fā)送消息到MQ服務(wù)器,此時(shí)消息狀態(tài)為SEND_OK。此消息為consumer不可見曼库。
2)執(zhí)行DB操作区岗;DB執(zhí)行成功Commit DB操作,DB執(zhí)行失敗Rollback DB操作毁枯。
3)如果DB執(zhí)行成功慈缔,回復(fù)MQ服務(wù)器,將狀態(tài)為COMMIT_MESSAGE种玛;如果DB執(zhí)行失敗藐鹤,回復(fù)MQ服務(wù)器,將狀態(tài)改為ROLLBACK_MESSAGE赂韵。注意此過程有可能失敗娱节。
4)MQ內(nèi)部提供一個(gè)名為“事務(wù)狀態(tài)服務(wù)”的服務(wù),此服務(wù)會(huì)檢查事務(wù)消息的狀態(tài)祭示,如果發(fā)現(xiàn)消息未COMMIT肄满,則通過Producer啟動(dòng)時(shí)注冊(cè)的TransactionCheckListener來回調(diào)業(yè)務(wù)系統(tǒng),業(yè)務(wù)系統(tǒng)在checkLocalTransactionState方法中檢查DB事務(wù)狀態(tài)质涛,如果成功稠歉,則回復(fù)COMMIT_MESSAGE,否則回復(fù)ROLLBACK_MESSAGE汇陆。
說明:
上面以DB為例轧抗,其實(shí)此處可以是任何業(yè)務(wù)或者數(shù)據(jù)源。
以上SEND_OK瞬测、COMMIT_MESSAGE横媚、ROLLBACK_MESSAGE均是client jar提供的狀態(tài),在MQ服務(wù)器內(nèi)部是一個(gè)數(shù)字月趟。
TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會(huì)回調(diào)(上圖中灰色部分)灯蝴。這種消息丟失只存在于斷網(wǎng)或者rocketmq集群掛了的情況下。當(dāng)rocketmq集群掛了孝宗,如果采用異步刷盤穷躁,存在1s內(nèi)數(shù)據(jù)丟失風(fēng)險(xiǎn),異步刷盤場景下保障事務(wù)沒有意義因妇。所以如果要核心業(yè)務(wù)用Rocketmq解決分布式事務(wù)問題问潭,建議選擇同步刷盤模式。
2. 多系統(tǒng)之間數(shù)據(jù)一致性(多方事務(wù))
當(dāng)需要保證多方(超過2方)的分布式一致性婚被,上面的兩方事務(wù)一致性(通過Rocketmq的事務(wù)性消息解決)已經(jīng)無法支持狡忙。這個(gè)時(shí)候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)址芯。
以上圖交易系統(tǒng)為例:
1)交易系統(tǒng)創(chuàng)建訂單(往DB插入一條記錄)灾茁,同時(shí)發(fā)送訂單創(chuàng)建消息窜觉。通過RocketMq事務(wù)性消息保證一致性
2)接著執(zhí)行完成訂單所需的同步核心RPC服務(wù)(非核心的系統(tǒng)通過監(jiān)聽MQ消息自行處理,處理結(jié)果不會(huì)影響交易狀態(tài))北专。執(zhí)行成功更改訂單狀態(tài)禀挫,同時(shí)發(fā)送MQ消息。
3)交易系統(tǒng)接受自己發(fā)送的訂單創(chuàng)建消息拓颓,通過定時(shí)調(diào)度系統(tǒng)創(chuàng)建延時(shí)回滾任務(wù)(或者使用RocketMq的重試功能语婴,設(shè)置第二次發(fā)送時(shí)間為定時(shí)任務(wù)的延遲創(chuàng)建時(shí)間。在非消息堵塞的情況下驶睦,消息第一次到達(dá)延遲為1ms左右腻格,這時(shí)可能RPC還未執(zhí)行完,訂單狀態(tài)還未設(shè)置為完成啥繁,第二次消費(fèi)時(shí)間可以指定)菜职。延遲任務(wù)先通過查詢訂單狀態(tài)判斷訂單是否完成,完成則不創(chuàng)建回滾任務(wù)旗闽,否則創(chuàng)建酬核。 PS:多個(gè)RPC可以創(chuàng)建一個(gè)回滾任務(wù),通過一個(gè)消費(fèi)組接受一次消息就可以适室;也可以通過創(chuàng)建多個(gè)消費(fèi)組嫡意,一個(gè)消息消費(fèi)多次,每次消費(fèi)創(chuàng)建一個(gè)RPC的回滾任務(wù)捣辆。 回滾任務(wù)失敗蔬螟,通過MQ的重發(fā)來重試。
以上是交易系統(tǒng)和其他系統(tǒng)之間保持最終一致性的解決方案汽畴。
3.案例分析
1) 單機(jī)環(huán)境下的事務(wù)示意圖
如下為A給B轉(zhuǎn)賬的例子旧巾。
步驟 | 動(dòng)作 |
---|---|
1 | 鎖定A的賬戶 |
2 | 鎖定B的賬戶 |
3 | 檢查A賬戶是否有1元 |
4 | A的賬戶扣減1元 |
5 | 給B的賬戶加1元 |
6 | 解鎖B的賬戶 |
7 | 解鎖A的賬戶 |
以上過程在代碼層面甚至可以簡化到在一個(gè)事物中執(zhí)行兩條sql語句。
2) 分布式環(huán)境下事務(wù)
和單機(jī)事務(wù)不同忍些,A鲁猩、B賬戶可能不在同一個(gè)DB中,此時(shí)無法像在單機(jī)情況下使用事物來實(shí)現(xiàn)罢坝。此時(shí)可以通過一下方式實(shí)現(xiàn)廓握,將轉(zhuǎn)賬操作分成兩個(gè)操作。
a) A賬戶
步驟 | 動(dòng)作 |
---|---|
1 | 鎖定A的賬戶 |
2 | 檢查A賬戶是否有1元 |
3 | A的賬戶扣減1元 |
4 | 解鎖A的賬戶 |
b) MQ消息
A賬戶數(shù)據(jù)發(fā)生變化時(shí)嘁酿,發(fā)送MQ消息隙券,MQ服務(wù)器將消息推送給轉(zhuǎn)賬系統(tǒng),轉(zhuǎn)賬系統(tǒng)來給B賬號(hào)加錢闹司。
c) B賬戶
步驟 | 動(dòng)作 |
---|---|
1 | 鎖定B的賬戶 |
2 | 給B的賬戶加1元 |
3 | 解鎖B的賬戶 |
四娱仔、 順序消息
1. 順序消息缺陷
發(fā)送順序消息無法利用集群Fail Over特性消費(fèi)順序消息的并行度依賴于隊(duì)列數(shù)量隊(duì)列熱點(diǎn)問題,個(gè)別隊(duì)列由于哈希不均導(dǎo)致消息過多开仰,消費(fèi)速度跟不上拟枚,產(chǎn)生消息堆積問題遇到消息失敗的消息薪铜,無法跳過众弓,當(dāng)前隊(duì)列消費(fèi)暫停恩溅。
2. 原理
produce在發(fā)送消息的時(shí)候,把消息發(fā)到同一個(gè)隊(duì)列(queue)中,消費(fèi)者注冊(cè)消息監(jiān)聽器為MessageListenerOrderly谓娃,這樣就可以保證消費(fèi)端只有一個(gè)線程去消費(fèi)消息脚乡。
注意:把消息發(fā)到同一個(gè)隊(duì)列(queue),不是同一個(gè)topic滨达,默認(rèn)情況下一個(gè)topic包括4個(gè)queue
3. 擴(kuò)展
可以通過實(shí)現(xiàn)發(fā)送消息的對(duì)列選擇器方法奶稠,實(shí)現(xiàn)部分順序消息。
舉例:比如一個(gè)數(shù)據(jù)庫通過MQ來同步捡遍,只需要保證每個(gè)表的數(shù)據(jù)是同步的就可以锌订。解析binlog,將表名作為對(duì)列選擇器的參數(shù)画株,這樣就可以保證每個(gè)表的數(shù)據(jù)到同一個(gè)對(duì)列里面辆飘,從而保證表數(shù)據(jù)的順序消費(fèi)
五、 最佳實(shí)踐
1. Producer
1) Topic
一個(gè)應(yīng)用盡可能用一個(gè)Topic谓传,消息子類型用tags來標(biāo)識(shí)蜈项,tags可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags续挟,消費(fèi)方在訂閱消息時(shí)紧卒,才可以利用tags 在broker做消息過濾。
2) key
每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)碼诗祸,要設(shè)置到 keys 字段跑芳,方便將來定位消息丟失問題。服務(wù)器會(huì)為每個(gè)消息創(chuàng)建索引(哈希索引)直颅,應(yīng)用可以通過 topic聋亡,key來查詢這條消息內(nèi)容,以及消息被誰消費(fèi)际乘。由于是哈希索引坡倔,請(qǐng)務(wù)必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突脖含。
//訂單Id
String orderId= "20034568923546";
message.setKeys(orderId);
3) 日志
消息發(fā)送成功或者失敗罪塔,要打印消息日志,務(wù)必要打印 send result 和key 字段养葵。
4) send
send消息方法征堪,只要不拋異常,就代表發(fā)送成功关拒。但是發(fā)送成功會(huì)有多個(gè)狀態(tài)佃蚜,在sendResult里定義庸娱。
SEND_OK:消息發(fā)送成功
FLUSH_DISK_TIMEOUT:消息發(fā)送成功,但是服務(wù)器刷盤超時(shí)谐算,消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列熟尉,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失
FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功洲脂,但是服務(wù)器同步到Slave時(shí)超時(shí)斤儿,消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī)恐锦,消息才會(huì)丟失
SLAVE_NOT_AVAILABLE:消息發(fā)送成功往果,但是此時(shí)slave不可用,消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列一铅,只有此時(shí)服務(wù)器宕機(jī)陕贮,消息才會(huì)丟失
2. Consumer
1) 冪等
RocketMQ使用的消息原語是At Least Once,所以consumer可能多次收到同一個(gè)消息潘飘,此時(shí)務(wù)必做好冪等肮之。
2) 日志
消費(fèi)時(shí)記錄日志,以便后續(xù)定位問題福也。
3) 批量消費(fèi)
盡量使用批量方式消費(fèi)方式局骤,可以很大程度上提高消費(fèi)吞吐量。
六暴凑、 參考資料
1. 文檔
RocketMQ_design.pdf
RocketMQ_experience.pdf
2. 博客
分布式開放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐
http://www.reibang.com/p/453c6e7ff81c
RocketMQ事務(wù)消費(fèi)和順序消費(fèi)詳解
http://www.cnblogs.com/520playboy/p/6750023.html
ZeroCopy
http://www.linuxjournal.com/article/6345
IO方式的性能數(shù)據(jù)
http://stblog.baidu-tech.com/?p=851
作者:彥幀
鏈接:http://www.reibang.com/p/2838890f3284
來源:簡書
著作權(quán)歸作者所有峦甩。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處现喳。