分布式開放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐
來源:http://www.reibang.com/p/453c6e7ff81c
備注:
如果您此前未接觸過RocketMQ蜒秤,請先閱讀附錄部分,以便了解RocketMQ的整體架構(gòu)和相關(guān)術(shù)語
文中的MQServer與Broker表示同一概念
分布式消息系統(tǒng)作為實(shí)現(xiàn)分布式系統(tǒng)可擴(kuò)展域帐、可伸縮性的關(guān)鍵組件,需要具有高吞吐量、高可用等特點(diǎn)碳胳。而談到消息系統(tǒng)的設(shè)計(jì),就回避不了兩個(gè)問題:
消息的順序問題
消息的重復(fù)問題
RocketMQ作為阿里開源的一款高性能沫勿、高吞吐量的消息中間件,它是怎樣來解決這兩個(gè)問題的味混?RocketMQ 有哪些關(guān)鍵特性产雹?其實(shí)現(xiàn)原理是怎樣的?
關(guān)鍵特性以及其實(shí)現(xiàn)原理
一翁锡、順序消息
消息有序指的是一類消息消費(fèi)時(shí)蔓挖,能按照發(fā)送的順序來消費(fèi)。例如:一個(gè)訂單產(chǎn)生了 3 條消息馆衔,分別是訂單創(chuàng)建瘟判、訂單付款、訂單完成角溃。消費(fèi)時(shí)拷获,要按照這個(gè)順序消費(fèi)才有意義。但同時(shí)訂單之間又是可以并行消費(fèi)的减细。
假如生產(chǎn)者產(chǎn)生了2條消息:M1匆瓜、M2,要保證這兩條消息的順序,應(yīng)該怎樣做驮吱?你腦中想到的可能是這樣:
你可能會采用這種方式保證消息順序
M1發(fā)送到S1后茧妒,M2發(fā)送到S2,如果要保證M1先于M2被消費(fèi)左冬,那么需要M1到達(dá)消費(fèi)端后桐筏,通知S2,然后S2再將M2發(fā)送到消費(fèi)端拇砰。
這個(gè)模型存在的問題是梅忌,如果M1和M2分別發(fā)送到兩臺Server上,就不能保證M1先達(dá)到毕匀,也就不能保證M1被先消費(fèi)铸鹰,那么就需要在MQ Server集群維護(hù)消息的順序。那么如何解決皂岔?一種簡單的方式就是將M1蹋笼、M2發(fā)送到同一個(gè)Server上:
保證消息順序,你改進(jìn)后的方法
這樣可以保證M1先于M2到達(dá)MQServer(客戶端等待M1成功后再發(fā)送M2)躁垛,根據(jù)先達(dá)到先被消費(fèi)的原則剖毯,M1會先于M2被消費(fèi),這樣就保證了消息的順序教馆。
這個(gè)模型逊谋,理論上可以保證消息的順序,但在實(shí)際運(yùn)用中你應(yīng)該會遇到下面的問題:
網(wǎng)絡(luò)延遲問題
只要將消息從一臺服務(wù)器發(fā)往另一臺服務(wù)器土铺,就會存在網(wǎng)絡(luò)延遲問題胶滋。如上圖所示,如果發(fā)送M1耗時(shí)大于發(fā)送M2的耗時(shí)悲敷,那么M2就先被消費(fèi)究恤,仍然不能保證消息的順序。即使M1和M2同時(shí)到達(dá)消費(fèi)端后德,由于不清楚消費(fèi)端1和消費(fèi)端2的負(fù)載情況部宿,仍然有可能出現(xiàn)M2先于M1被消費(fèi)。如何解決這個(gè)問題瓢湃?將M1和M2發(fā)往同一個(gè)消費(fèi)者即可理张,且發(fā)送M1后,需要消費(fèi)端響應(yīng)成功后才能發(fā)送M2绵患。
但又會引入另外一個(gè)問題雾叭,如果發(fā)送M1后,消費(fèi)端1沒有響應(yīng)落蝙,那是繼續(xù)發(fā)送M2呢拷况,還是重新發(fā)送M1作煌?一般為了保證消息一定被消費(fèi),肯定會選擇重發(fā)M1到另外一個(gè)消費(fèi)端2赚瘦,就如下圖所示粟誓。
保證消息順序的正確姿勢
這樣的模型就嚴(yán)格保證消息的順序,細(xì)心的你仍然會發(fā)現(xiàn)問題起意,消費(fèi)端1沒有響應(yīng)Server時(shí)有兩種情況鹰服,一種是M1確實(shí)沒有到達(dá),另外一種情況是消費(fèi)端1已經(jīng)響應(yīng)揽咕,但是Server端沒有收到悲酷。如果是第二種情況,重發(fā)M1亲善,就會造成M1被重復(fù)消費(fèi)设易。也就是我們后面要說的第二個(gè)問題,消息重復(fù)問題蛹头。
回過頭來看消息順序問題顿肺,嚴(yán)格的順序消息非常容易理解,而且處理問題也比較容易渣蜗,要實(shí)現(xiàn)嚴(yán)格的順序消息屠尊,簡單且可行的辦法就是:
保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對一對一的關(guān)系
但是這樣設(shè)計(jì),并行度就成為了消息系統(tǒng)的瓶頸(吞吐量不夠)耕拷,也會導(dǎo)致更多的異常處理讼昆,比如:只要消費(fèi)端出現(xiàn)問題,就會導(dǎo)致整個(gè)處理流程阻塞骚烧,我們不得不花費(fèi)更多的精力來解決阻塞的問題浸赫。
但我們的最終目標(biāo)是要集群的高容錯(cuò)性和高吞吐量。這似乎是一對不可調(diào)和的矛盾赃绊,那么阿里是如何解決的既峡?
世界上解決一個(gè)計(jì)算機(jī)問題最簡單的方法:“恰好”不需要解決它!—— 沈詢
有些問題凭戴,看起來很重要,但實(shí)際上我們可以通過合理的設(shè)計(jì)或者將問題分解來規(guī)避炕矮。如果硬要把時(shí)間花在解決它們身上么夫,實(shí)際上是浪費(fèi)的,效率低下的肤视。從這個(gè)角度來看消息的順序問題档痪,我們可以得出兩個(gè)結(jié)論:
1、不關(guān)注亂序的應(yīng)用實(shí)際大量存在
2邢滑、隊(duì)列無序并不意味著消息無序
最后我們從源碼角度分析RocketMQ怎么實(shí)現(xiàn)發(fā)送順序消息腐螟。
一般消息是通過輪詢所有隊(duì)列來發(fā)送的(負(fù)載均衡策略),順序消息可以根據(jù)業(yè)務(wù),比如說訂單號相同的消息發(fā)送到同一個(gè)隊(duì)列乐纸。下面的示例中衬廷,OrderId相同的消息,會發(fā)送到同一個(gè)隊(duì)列:
// RocketMQ默認(rèn)提供了兩種MessageQueueSelector實(shí)現(xiàn):隨機(jī)/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在獲取到路由信息以后汽绢,會根據(jù)MessageQueueSelector實(shí)現(xiàn)的算法來選擇一個(gè)隊(duì)列吗跋,同一個(gè)OrderId獲取到的隊(duì)列是同一個(gè)隊(duì)列。
private SendResult send() {
// 獲取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根據(jù)我們的算法宁昭,選擇一個(gè)發(fā)送隊(duì)列
// 這里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
二跌宛、消息重復(fù)
上面在解決消息順序問題時(shí),引入了一個(gè)新的問題积仗,就是消息重復(fù)疆拘。那么RocketMQ是怎樣解決消息重復(fù)的問題呢?還是“恰好”不解決寂曹。
造成消息的重復(fù)的根本原因是:網(wǎng)絡(luò)不可達(dá)哎迄。只要通過網(wǎng)絡(luò)交換數(shù)據(jù),就無法避免這個(gè)問題稀颁。所以解決這個(gè)問題的辦法就是不解決芬失,轉(zhuǎn)而繞過這個(gè)問題。那么問題就變成了:如果消費(fèi)端收到兩條一樣的消息匾灶,應(yīng)該怎樣處理棱烂?
1、消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性
2阶女、保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時(shí)出現(xiàn)
第1條很好理解颊糜,只要保持冪等性,不管來多少條重復(fù)消息秃踩,最后處理的結(jié)果都一樣衬鱼。第2條原理就是利用一張日志表來記錄已經(jīng)處理成功的消息的ID,如果新到的消息ID已經(jīng)在日志表中憔杨,那么就不再處理這條消息鸟赫。
我們可以看到第1條的解決方式,很明顯應(yīng)該在消費(fèi)端實(shí)現(xiàn)消别,不屬于消息系統(tǒng)要實(shí)現(xiàn)的功能抛蚤。第2條可以消息系統(tǒng)實(shí)現(xiàn),也可以業(yè)務(wù)端實(shí)現(xiàn)寻狂。正常情況下出現(xiàn)重復(fù)消息的概率不一定大岁经,且由消息系統(tǒng)實(shí)現(xiàn)的話,肯定會對消息系統(tǒng)的吞吐量和高可用有影響蛇券,所以最好還是由業(yè)務(wù)端自己處理消息重復(fù)的問題缀壤,這也是RocketMQ不解決消息重復(fù)的問題的原因樊拓。
RocketMQ不保證消息不重復(fù),如果你的業(yè)務(wù)需要保證嚴(yán)格的不重復(fù)消息塘慕,需要你自己在業(yè)務(wù)端去重筋夏。
三、事務(wù)消息
RocketMQ除了支持普通消息苍糠,順序消息叁丧,另外還支持事務(wù)消息。首先討論一下什么是事務(wù)消息以及支持事務(wù)消息的必要性岳瞭。我們以一個(gè)轉(zhuǎn)帳的場景為例來說明這個(gè)問題:Bob向Smith轉(zhuǎn)賬100塊拥娄。
在單機(jī)環(huán)境下,執(zhí)行事務(wù)的情況瞳筏,大概是下面這個(gè)樣子:
單機(jī)環(huán)境下轉(zhuǎn)賬事務(wù)示意圖
當(dāng)用戶增長到一定程度稚瘾,Bob和Smith的賬戶及余額信息已經(jīng)不在同一臺服務(wù)器上了,那么上面的流程就變成了這樣:
集群環(huán)境下轉(zhuǎn)賬事務(wù)示意圖
這時(shí)候你會發(fā)現(xiàn)姚炕,同樣是一個(gè)轉(zhuǎn)賬的業(yè)務(wù)摊欠,在集群環(huán)境下,耗時(shí)居然成倍的增長柱宦,這顯然是不能夠接受的些椒。那我們?nèi)绾蝸硪?guī)避這個(gè)問題?
大事務(wù) = 小事務(wù) + 異步
將大事務(wù)拆分成多個(gè)小事務(wù)異步執(zhí)行掸刊。這樣基本上能夠?qū)⒖鐧C(jī)事務(wù)的執(zhí)行效率優(yōu)化到與單機(jī)一致免糕。轉(zhuǎn)賬的事務(wù)就可以分解成如下兩個(gè)小事務(wù):
小事務(wù)+異步消息
圖中執(zhí)行本地事務(wù)(Bob賬戶扣款)和發(fā)送異步消息應(yīng)該保持同時(shí)成功或者失敗中,也就是扣款成功了忧侧,發(fā)送消息一定要成功石窑,如果扣款失敗了,就不能再發(fā)送消息蚓炬。那問題是:我們是先扣款還是先發(fā)送消息呢松逊?
首先我們看下,先發(fā)送消息肯夏,大致的示意圖如下:
事務(wù)消息:先發(fā)送消息
存在的問題是:如果消息發(fā)送成功经宏,但是扣款失敗,消費(fèi)端就會消費(fèi)此消息驯击,進(jìn)而向Smith賬戶加錢烁兰。
先發(fā)消息不行,那我們就先扣款唄余耽,大致的示意圖如下:
事務(wù)消息-先扣款
存在的問題跟上面類似:如果扣款成功缚柏,發(fā)送消息失敗苹熏,就會出現(xiàn)Bob扣錢了碟贾,但是Smith賬戶未加錢币喧。
可能大家會有很多的方法來解決這個(gè)問題,比如:直接將發(fā)消息放到Bob扣款的事務(wù)中去袱耽,如果發(fā)送失敗杀餐,拋出異常,事務(wù)回滾朱巨。這樣的處理方式也符合“恰好”不需要解決的原則史翘。RocketMQ支持事務(wù)消息,下面我們來看看RocketMQ是怎樣來實(shí)現(xiàn)的冀续。
RocketMQ實(shí)現(xiàn)發(fā)送事務(wù)消息
RocketMQ第一階段發(fā)送Prepared消息時(shí)琼讽,會拿到消息的地址,第二階段執(zhí)行本地事物洪唐,第三階段通過第一階段拿到的地址去訪問消息钻蹬,并修改狀態(tài)。細(xì)心的你可能又發(fā)現(xiàn)問題了凭需,如果確認(rèn)消息發(fā)送失敗了怎么辦问欠?RocketMQ會定期掃描消息集群中的事物消息,這時(shí)候發(fā)現(xiàn)了Prepared消息粒蜈,它會向消息發(fā)送者確認(rèn)顺献,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續(xù)發(fā)送確認(rèn)消息呢枯怖?RocketMQ會根據(jù)發(fā)送端設(shè)置的策略來決定是回滾還是繼續(xù)發(fā)送確認(rèn)消息注整。這樣就保證了消息發(fā)送與本地事務(wù)同時(shí)成功或同時(shí)失敗。
那我們來看下RocketMQ源碼嫁怀,是不是這樣來處理事務(wù)消息的设捐。客戶端發(fā)送事務(wù)消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):
// 未決事務(wù)塘淑,MQ服務(wù)器回查客戶端
// 也就是上文所說的萝招,當(dāng)RocketMQ發(fā)現(xiàn)Prepared消息
時(shí),會根據(jù)這個(gè)Listener實(shí)現(xiàn)的策略來決斷事務(wù)
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構(gòu)造事務(wù)消息的生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設(shè)置事務(wù)決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務(wù)的處理邏輯存捺,相當(dāng)于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構(gòu)造MSG槐沼,省略構(gòu)造參數(shù)
Message msg = new Message(......);
// 發(fā)送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
接著查看sendMessageInTransaction方法的源碼,總共分為3個(gè)階段:發(fā)送Prepared消息捌治、執(zhí)行本地事務(wù)岗钩、發(fā)送確認(rèn)消息。
public TransactionSendResult sendMessageInTransaction(.....) {
// 邏輯代碼肖油,非實(shí)際代碼
// 1.發(fā)送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息發(fā)送成功兼吓,處理與消息關(guān)聯(lián)的本地事務(wù)單元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.結(jié)束事務(wù)
this.endTransaction(sendResult, localTransactionState, localException);
}
endTransaction方法會將請求發(fā)往broker(mq server)去更新事物消息的最終狀態(tài):
根據(jù)sendResult找到Prepared消息
根據(jù)localTransaction更新消息的最終狀態(tài)
如果endTransaction方法執(zhí)行失敗,導(dǎo)致數(shù)據(jù)沒有發(fā)送到broker森枪,broker會有回查線程定時(shí)(默認(rèn)1分鐘)掃描每個(gè)存儲事務(wù)狀態(tài)的表格文件视搏,如果是已經(jīng)提交或者回滾的消息直接跳過审孽,如果是prepared狀態(tài)則會向Producer發(fā)起CheckTransaction請求,Producer會調(diào)用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時(shí)回調(diào)請求浑娜,而checkTransactionState會調(diào)用我們的事務(wù)設(shè)置的決斷方法佑力,最后調(diào)用endTransactionOneway讓broker來更新消息的最終狀態(tài)。
再回到轉(zhuǎn)賬的例子筋遭,如果Bob的賬戶的余額已經(jīng)減少打颤,且消息已經(jīng)發(fā)送成功,Smith端開始消費(fèi)這條消息漓滔,這個(gè)時(shí)候就會出現(xiàn)消費(fèi)失敗和消費(fèi)超時(shí)兩個(gè)問題哼绑?解決超時(shí)問題的思路就是一直重試奠滑,直到消費(fèi)端消費(fèi)消息成功,整個(gè)過程中有可能會出現(xiàn)消息重復(fù)的問題,按照前面的思路解決即可伤塌。
消費(fèi)事務(wù)消息
這樣基本上可以解決超時(shí)問題见芹,但是如果消費(fèi)失敗怎么辦叮叹?阿里提供給我們的解決方法是:人工解決宙项。大家可以考慮一下,按照事務(wù)的流程畅形,因?yàn)槟撤N原因Smith加款失敗养距,需要回滾整個(gè)流程。如果消息系統(tǒng)要實(shí)現(xiàn)這個(gè)回滾流程的話日熬,系統(tǒng)復(fù)雜度將大大提升棍厌,且很容易出現(xiàn)Bug,估計(jì)出現(xiàn)Bug的概率會比消費(fèi)失敗的概率大很多竖席。我們需要衡量是否值得花這么大的代價(jià)來解決這樣一個(gè)出現(xiàn)概率非常小的問題耘纱,這也是大家在解決疑難問題時(shí)需要多多思考的地方。
20160321補(bǔ)充:在3.2.6版本中移除了事務(wù)消息的實(shí)現(xiàn)毕荐,所以此版本不支持事務(wù)消息束析,具體情況請參考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
四、Producer如何發(fā)送消息
Producer輪詢某topic下的所有隊(duì)列的方式來實(shí)現(xiàn)發(fā)送方的負(fù)載均衡憎亚,如下圖所示:
producer發(fā)送消息負(fù)載均衡
首先分析一下RocketMQ的客戶端發(fā)送消息的源碼:
// 構(gòu)造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer员寇,整個(gè)應(yīng)用生命周期內(nèi),只需要初始化1次
producer.start();
// 構(gòu)造Message
Message msg = new Message("TopicTest1",// topic
"TagA",// tag:給消息打標(biāo)簽,用于區(qū)分一類消息第美,可為null
"OrderID188",// key:自定義Key蝶锋,可以用于去重,可為null
("Hello MetaQ").getBytes());// body:消息內(nèi)容
// 發(fā)送消息并返回結(jié)果
SendResult sendResult = producer.send(msg);
// 清理資源什往,關(guān)閉網(wǎng)絡(luò)連接扳缕,注銷自己
producer.shutdown();
在整個(gè)應(yīng)用生命周期內(nèi),生產(chǎn)者需要調(diào)用一次start方法來初始化,初始化主要完成的任務(wù)有:
如果沒有指定namesrv地址躯舔,將會自動(dòng)尋址
啟動(dòng)定時(shí)任務(wù):更新namesrv地址贡必、從namsrv更新topic路由信息、清理已經(jīng)掛掉的broker庸毫、向所有broker發(fā)送心跳...
啟動(dòng)負(fù)載均衡的服務(wù)
初始化完成后,開始發(fā)送消息衫樊,發(fā)送消息的主要代碼如下:
private SendResult sendDefaultImpl(Message msg,......) {
// 檢查Producer的狀態(tài)是否是RUNNING
this.makeSureStateOK();
// 檢查msg是否合法:是否為null飒赃、topic,body是否為空、body是否超長
Validators.checkMessage(msg, this.defaultMQProducer);
// 獲取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 從路由信息中選擇一個(gè)消息隊(duì)列
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
// 將消息發(fā)送到該隊(duì)列上去
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
代碼中需要關(guān)注的兩個(gè)方法tryToFindTopicPublishInfo和selectOneMessageQueue科侈。前面說過在producer初始化時(shí)载佳,會啟動(dòng)定時(shí)任務(wù)獲取路由信息并更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息臀栈,如果沒有獲取到蔫慧,則會自己去namesrv獲取路由信息。selectOneMessageQueue方法通過輪詢的方式权薯,返回一個(gè)隊(duì)列姑躲,以達(dá)到負(fù)載均衡的目的。
如果Producer發(fā)送消息失敗盟蚣,會自動(dòng)重試黍析,重試的策略:
重試次數(shù) < retryTimesWhenSendFailed(可配置)
總的耗時(shí)(包含重試n次的耗時(shí)) < sendMsgTimeout(發(fā)送消息時(shí)傳入的參數(shù))
同時(shí)滿足上面兩個(gè)條件后,Producer會選擇另外一個(gè)隊(duì)列發(fā)送消息
五屎开、消息存儲
RocketMQ的消息存儲是由consume queue和commit log配合完成的阐枣。
1、Consume Queue
consume queue是消息的邏輯隊(duì)列奄抽,相當(dāng)于字典的目錄蔼两,用來指定消息在物理文件commit log上的位置。
我們可以在配置中指定consumequeue與commitlog存儲的目錄
每個(gè)topic下的每個(gè)queue都有一個(gè)對應(yīng)的consumequeue文件逞度,比如:
{topicName}/
{fileName}
Consume Queue文件組織额划,如圖所示:
Consume Queue文件組織示意圖
根據(jù)topic和queueId來組織文件,圖中TopicA有兩個(gè)隊(duì)列0,1档泽,那么TopicA和QueueId=0組成一個(gè)ConsumeQueue锁孟,TopicA和QueueId=1組成另一個(gè)ConsumeQueue。
按照消費(fèi)端的GroupName來分組重試隊(duì)列茁瘦,如果消費(fèi)端消費(fèi)失敗品抽,消息將被發(fā)往重試隊(duì)列中,比如圖中的%RETRY%ConsumerGroupA甜熔。
按照消費(fèi)端的GroupName來分組死信隊(duì)列圆恤,如果消費(fèi)端消費(fèi)失敗,并重試指定次數(shù)后,仍然失敗盆昙,則發(fā)往死信隊(duì)列羽历,比如圖中的%DLQ%ConsumerGroupA。
死信隊(duì)列(Dead Letter Queue)一般用于存放由于某種原因無法傳遞的消息淡喜,比如處理失敗或者已經(jīng)過期的消息秕磷。
Consume Queue中存儲單元是一個(gè)20字節(jié)定長的二進(jìn)制數(shù)據(jù),順序?qū)戫樞蜃x炼团,如下圖所示:
consumequeue文件存儲單元格式
CommitLog Offset是指這條消息在Commit Log文件中的實(shí)際偏移量
Size存儲中消息的大小
Message Tag HashCode存儲消息的Tag的哈希值:主要用于訂閱時(shí)消息過濾(訂閱時(shí)如果指定了Tag澎嚣,會根據(jù)HashCode來快速查找到訂閱的消息)
2、Commit Log
CommitLog:消息存放的物理文件瘟芝,每臺broker上的commitlog被本機(jī)所有的queue共享易桃,不做任何區(qū)分。
文件的默認(rèn)位置如下锌俱,仍然可通過配置文件修改:
{commitlog}${fileName}
CommitLog的消息存儲單元長度不固定晤郑,文件順序?qū)懀S機(jī)讀贸宏。消息的存儲結(jié)構(gòu)如下表所示造寝,按照編號順序以及編號對應(yīng)的內(nèi)容依次存儲。
Commit Log存儲單元結(jié)構(gòu)圖
3吭练、消息存儲實(shí)現(xiàn)
消息存儲實(shí)現(xiàn)匹舞,比較復(fù)雜,也值得大家深入了解线脚,后面會單獨(dú)成文來分析赐稽,這小節(jié)只以代碼說明一下具體的流程。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Here settings are stored timestamp, in order to ensure an orderly global
msg.setStoreTimestamp(beginLockTimestamp);
// MapedFile:操作物理文件在內(nèi)存中的映射以及將內(nèi)存數(shù)據(jù)持久化到物理文件中
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
// 將Message追加到文件commitlog
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
DispatchRequest dispatchRequest = new DispatchRequest(
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
- Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10
// 1.分發(fā)消息位置到ConsumeQueue
// 2.分發(fā)到IndexService建立索引
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
4浑侥、消息的索引文件
如果一個(gè)消息包含key值的話姊舵,會使用IndexFile存儲消息索引,文件的內(nèi)容結(jié)構(gòu)如圖:
消息索引
索引文件主要用于根據(jù)key來查詢消息的寓落,流程主要是:
根據(jù)查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個(gè)索引文件里面包含的最大槽的數(shù)目括丁,例如圖中所示 slotNum=5000000)
根據(jù) slotValue(slot 位置對應(yīng)的值)查找到索引項(xiàng)列表的最后一項(xiàng)(倒序排列,slotValue 總是指向最新的一個(gè)索引項(xiàng))
遍歷索引項(xiàng)列表返回查詢時(shí)間范圍內(nèi)的結(jié)果集(默認(rèn)一次最大返回的 32 條記錄)
六、消息訂閱
RocketMQ消息訂閱有兩種模式伶选,一種是Push模式史飞,即MQServer主動(dòng)向消費(fèi)端推送;另外一種是Pull模式仰税,即消費(fèi)端在需要時(shí)构资,主動(dòng)到MQServer拉取。但在具體實(shí)現(xiàn)時(shí)陨簇,Push和Pull模式都是采用消費(fèi)端主動(dòng)拉取的方式吐绵。
首先看下消費(fèi)端的負(fù)載均衡:
消費(fèi)端負(fù)載均衡
消費(fèi)端會通過RebalanceService線程,10秒鐘做一次基于topic下的所有隊(duì)列負(fù)載:
遍歷Consumer下的所有topic,然后根據(jù)topic訂閱所有的消息
獲取同一topic和Consumer Group下的所有Consumer
然后根據(jù)具體的分配策略來分配消費(fèi)隊(duì)列己单,分配的策略包含:平均分配唉窃、消費(fèi)端配置等
如同上圖所示:如果有 5 個(gè)隊(duì)列,2 個(gè) consumer纹笼,那么第一個(gè) Consumer 消費(fèi) 3 個(gè)隊(duì)列纹份,第二 consumer 消費(fèi) 2 個(gè)隊(duì)列。這里采用的就是平均分配策略廷痘,它類似于我們的分頁蔓涧,TOPIC下面的所有queue就是記錄,Consumer的個(gè)數(shù)就相當(dāng)于總的頁數(shù)牍疏,那么每頁有多少條記錄,就類似于某個(gè)Consumer會消費(fèi)哪些隊(duì)列拨齐。
通過這樣的策略來達(dá)到大體上的平均消費(fèi)鳞陨,這樣的設(shè)計(jì)也可以很方面的水平擴(kuò)展Consumer來提高消費(fèi)能力。
消費(fèi)端的Push模式是通過長輪詢的模式來實(shí)現(xiàn)的瞻惋,就如同下圖:
Push模式示意圖
Consumer端每隔一段時(shí)間主動(dòng)向broker發(fā)送拉消息請求厦滤,broker在收到Pull請求后,如果有消息就立即返回?cái)?shù)據(jù)歼狼,Consumer端收到返回的消息后掏导,再回調(diào)消費(fèi)者設(shè)置的Listener方法。如果broker在收到Pull請求時(shí)羽峰,消息隊(duì)列里沒有數(shù)據(jù)趟咆,broker端會阻塞請求直到有數(shù)據(jù)傳遞或超時(shí)才返回。
當(dāng)然梅屉,Consumer端是通過一個(gè)線程將阻塞隊(duì)列LinkedBlockingQueue中的PullRequest發(fā)送到broker拉取消息值纱,以防止Consumer一致被阻塞。而Broker端坯汤,在接收到Consumer的PullRequest時(shí)虐唠,如果發(fā)現(xiàn)沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來惰聂。broker在啟動(dòng)時(shí)疆偿,會啟動(dòng)一個(gè)線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數(shù)據(jù)返回搓幌。
七杆故、RocketMQ的其他特性
前面的6個(gè)特性都是基本上都是點(diǎn)到為止,想要深入了解溉愁,還需要大家多多查看源碼反番,多多在實(shí)際中運(yùn)用。當(dāng)然除了已經(jīng)提到的特性外,RocketMQ還支持:
定時(shí)消息
消息的刷盤策略
主動(dòng)同步策略:同步雙寫罢缸、異步復(fù)制
海量消息堆積能力
高效通信
.......
其中涉及到的很多設(shè)計(jì)思路和解決方法都值得我們深入研究:
消息的存儲設(shè)計(jì):既要滿足海量消息的堆積能力篙贸,又要滿足極快的查詢效率,還要保證寫入的效率枫疆。
高效的通信組件設(shè)計(jì):高吞吐量爵川,毫秒級的消息投遞能力都離不開高效的通信。
.......
RocketMQ最佳實(shí)踐
一息楔、Producer最佳實(shí)踐
1寝贡、一個(gè)應(yīng)用盡可能用一個(gè) Topic,消息子類型用 tags 來標(biāo)識值依,tags 可以由應(yīng)用自由設(shè)置圃泡。只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時(shí)愿险,才可以利用 tags 在 broker 做消息過濾颇蜡。
2、每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識碼辆亏,要設(shè)置到 keys 字段风秤,方便將來定位消息丟失問題。由于是哈希索引扮叨,請務(wù)必保證 key 盡可能唯一缤弦,這樣可以避免潛在的哈希沖突。
3彻磁、消息發(fā)送成功或者失敗碍沐,要打印消息日志,務(wù)必要打印 sendresult 和 key 字段衷蜓。
4抢韭、對于消息不可丟失應(yīng)用,務(wù)必要有消息重發(fā)機(jī)制恍箭。例如:消息發(fā)送失敗刻恭,存儲到數(shù)據(jù)庫,能有定時(shí)程序嘗試重發(fā)或者人工觸發(fā)重發(fā)扯夭。
5鳍贾、某些應(yīng)用如果不關(guān)注消息是否發(fā)送成功,請直接使用sendOneWay方法發(fā)送消息交洗。
二骑科、Consumer最佳實(shí)踐
1、消費(fèi)過程要做到冪等(即消費(fèi)端去重)
2构拳、盡量使用批量方式消費(fèi)方式咆爽,可以很大程度上提高消費(fèi)吞吐量梁棠。
3、優(yōu)化每條消息消費(fèi)過程
三斗埂、其他配置
線上應(yīng)該關(guān)閉autoCreateTopicEnable符糊,即在配置文件中將其設(shè)置為false。
RocketMQ在發(fā)送消息時(shí)呛凶,會首先獲取路由信息男娄。如果是新的消息,由于MQServer上面還沒有創(chuàng)建對應(yīng)的Topic漾稀,這個(gè)時(shí)候模闲,如果上面的配置打開的話,會返回默認(rèn)TOPIC的(RocketMQ會在每臺broker上面創(chuàng)建名為TBW102的TOPIC)路由信息崭捍,然后Producer會選擇一臺Broker發(fā)送消息尸折,選中的broker在存儲消息時(shí),發(fā)現(xiàn)消息的topic還沒有創(chuàng)建殷蛇,就會自動(dòng)創(chuàng)建topic实夹。后果就是:以后所有該TOPIC的消息,都將發(fā)送到這臺broker上晾咪,達(dá)不到負(fù)載均衡的目的收擦。
所以基于目前RocketMQ的設(shè)計(jì)贮配,建議關(guān)閉自動(dòng)創(chuàng)建TOPIC的功能谍倦,然后根據(jù)消息量的大小,手動(dòng)創(chuàng)建TOPIC泪勒。
RocketMQ設(shè)計(jì)相關(guān)
RocketMQ的設(shè)計(jì)假定:
每臺PC機(jī)器都可能宕機(jī)不可服務(wù)
任意集群都有可能處理能力不足
最壞的情況一定會發(fā)生
內(nèi)網(wǎng)環(huán)境需要低延遲來提供最佳用戶體驗(yàn)
RocketMQ的關(guān)鍵設(shè)計(jì):
分布式集群化
強(qiáng)數(shù)據(jù)安全
海量數(shù)據(jù)堆積
毫秒級投遞延遲(推拉模式)
這是RocketMQ在設(shè)計(jì)時(shí)的假定前提以及需要到達(dá)的效果昼蛀。我想這些假定適用于所有的系統(tǒng)設(shè)計(jì)。隨著我們系統(tǒng)的服務(wù)的增多圆存,每位開發(fā)者都要注意自己的程序是否存在單點(diǎn)故障叼旋,如果掛了應(yīng)該怎么恢復(fù)、能不能很好的水平擴(kuò)展沦辙、對外的接口是否足夠高效夫植、自己管理的數(shù)據(jù)是否足夠安全...... 多多規(guī)范自己的設(shè)計(jì),才能開發(fā)出高效健壯的程序油讯。
附錄:RocketMQ涉及到的幾個(gè)專業(yè)術(shù)語和整體架構(gòu)介紹
一详民、RocketMQ中的專業(yè)術(shù)語
Topic
topic表示消息的第一級類型,比如一個(gè)電商系統(tǒng)的消息可以分為:交易消息陌兑、物流消息...... 一條消息必須有一個(gè)Topic沈跨。
Tag
Tag表示消息的第二級類型,比如交易消息又可以分為:交易創(chuàng)建消息兔综,交易完成消息..... 一條消息可以沒有Tag饿凛。RocketMQ提供2級消息分類狞玛,方便大家靈活控制。
Queue
一個(gè)topic下涧窒,我們可以設(shè)置多個(gè)queue(消息隊(duì)列)心肪。當(dāng)我們發(fā)送消息時(shí),需要要指定該消息的topic杀狡。RocketMQ會輪詢該topic下的所有隊(duì)列蒙畴,將消息發(fā)送出去。
Producer 與 Producer Group
Producer表示消息隊(duì)列的生產(chǎn)者呜象。消息隊(duì)列的本質(zhì)就是實(shí)現(xiàn)了publish-subscribe模式膳凝,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息恭陡。所以這里的Producer就是用來生產(chǎn)和發(fā)送消息的蹬音,一般指業(yè)務(wù)系統(tǒng)。
Producer Group是一類Producer的集合名稱休玩,這類Producer通常發(fā)送一類消息著淆,且發(fā)送邏輯一致。
Consumer 與 Consumer Group
消息消費(fèi)者拴疤,一般由后臺系統(tǒng)異步消費(fèi)消息永部。
Push Consumer
Consumer 的一種,應(yīng)用通常向 Consumer 對象注冊一個(gè) Listener 接口呐矾,一旦收到消息苔埋,Consumer 對象立刻回調(diào) Listener 接口方法。
Pull Consumer
Consumer 的一種蜒犯,應(yīng)用通常主動(dòng)調(diào)用 Consumer 的拉消息方法從 Broker 拉消息组橄,主動(dòng)權(quán)由應(yīng)用控制。
Consumer Group是一類Consumer的集合名稱罚随,這類Consumer通常消費(fèi)一類消息玉工,且消費(fèi)邏輯一致。
Broker
消息的中轉(zhuǎn)者淘菩,負(fù)責(zé)存儲和轉(zhuǎn)發(fā)消息遵班。可以理解為消息隊(duì)列服務(wù)器潮改,提供了消息的接收狭郑、存儲、拉取和轉(zhuǎn)發(fā)服務(wù)进陡。broker是RocketMQ的核心愿阐,它不不能掛的,所以需要保證broker的高可用趾疚。
廣播消費(fèi)
一條消息被多個(gè)Consumer消費(fèi)缨历,即使這些Consumer屬于同一個(gè)Consumer Group以蕴,消息也會被Consumer Group中的每個(gè)Consumer都消費(fèi)一次。在廣播消費(fèi)中的Consumer Group概念可以認(rèn)為在消息劃分方面無意義辛孵。
集群消費(fèi)
一個(gè)Consumer Group中的Consumer實(shí)例平均分?jǐn)傁M(fèi)消息丛肮。例如某個(gè)Topic有 9 條消息,其中一個(gè)Consumer Group有 3 個(gè)實(shí)例(可能是 3 個(gè)進(jìn)程,或者 3 臺機(jī)器)魄缚,那么每個(gè)實(shí)例只消費(fèi)其中的 3 條消息宝与。
NameServer
NameServer即名稱服務(wù),兩個(gè)功能:
接收broker的請求冶匹,注冊broker的路由信息
接口client的請求习劫,根據(jù)某個(gè)topic獲取其到broker的路由信息
NameServer沒有狀態(tài),可以橫向擴(kuò)展嚼隘。每個(gè)broker在啟動(dòng)的時(shí)候會到NameServer注冊诽里;Producer在發(fā)送消息前會根據(jù)topic到NameServer獲取路由(到broker)信息;Consumer也會定時(shí)獲取topic路由信息飞蛹。
二谤狡、RocketMQ Overview
rocketmq overview
Producer向一些隊(duì)列輪流發(fā)送消息,隊(duì)列集合稱為Topic卧檐,Consumer如果做廣播消費(fèi)墓懂,則一個(gè)consumer實(shí)例消費(fèi)這個(gè)Topic對應(yīng)的所有隊(duì)列;如果做集群消費(fèi)霉囚,則多個(gè)Consumer實(shí)例平均消費(fèi)這個(gè)Topic對應(yīng)的隊(duì)列集合捕仔。
再看下RocketMQ物理部署結(jié)構(gòu)圖:
RocketMQ網(wǎng)絡(luò)部署圖
RocketMQ網(wǎng)絡(luò)部署特點(diǎn):
Name Server 是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),可集群部署佛嬉,節(jié)點(diǎn)之間無任何信息同步逻澳。
Broker部署相對復(fù)雜闸天,Broker分為Master與Slave暖呕,一個(gè)Master可以對應(yīng)多個(gè)Slave,但是一個(gè)Slave只能對應(yīng)一個(gè)Master苞氮,Master與Slave的對應(yīng)關(guān)系通過指定相同的BrokerName湾揽,不同的BrokerId來定義,BrokerId=0表示Master笼吟,非0表示Slave库物。Master也可以部署多個(gè)。每個(gè)Broker與Name Server集群中的所有節(jié)點(diǎn)建立長連接贷帮,定時(shí)注冊Topic信息到所有Name Server戚揭。
Producer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從Name Server取Topic路由信息撵枢,并向提供Topic 服務(wù)的Master建立長連接民晒,且定時(shí)向Master發(fā)送心跳精居。Producer 完全無狀態(tài),可集群部署潜必。
Consumer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接靴姿,定期從Name Server取Topic 路由信息,并向提供Topic服務(wù)的Master磁滚、Slave建立長連接佛吓,且定時(shí)向Master、Slave發(fā)送心跳垂攘。Consumer既可以從Master訂閱消息维雇,也可以從Slave訂閱消息,訂閱規(guī)則由Broker配置決定晒他。
三谆沃、其他參考資料
(如需查看請向本號發(fā)送“RocketMQ”)
RocketMQ用戶指南
RocketMQ原理簡介
RocketMQ最佳實(shí)踐
阿里分布式開放消息服務(wù)(ONS)原理與實(shí)踐2
阿里分布式開放消息服務(wù)(ONS)原理與實(shí)踐3
RocketMQ原理解析
推薦關(guān)鍵詞「分布式系統(tǒng)」閱讀:
騰訊文學(xué)內(nèi)容中心分布式文件系統(tǒng)的設(shè)計(jì)和實(shí)現(xiàn)
大型SOA架構(gòu)體系里的數(shù)據(jù)一致性問題
騰訊計(jì)費(fèi)平臺部:分布式MySQL數(shù)據(jù)庫TDSQL架構(gòu)分析
可擴(kuò)展Web架構(gòu)與分布式系統(tǒng)
注:直接點(diǎn)擊上述文章的標(biāo)題,即可查看相關(guān)文章仪芒。
版權(quán)申明:內(nèi)容來源網(wǎng)絡(luò)唁影,版權(quán)歸原創(chuàng)者所有。除非無法確認(rèn)掂名,我們都會標(biāo)明作者及出處据沈,如有侵權(quán)煩請告知,我們會立即刪除并表示歉意饺蔑。謝謝锌介。
-END-
作者:meng_philip123
鏈接:http://www.reibang.com/p/468176c6bc1b