一推姻、簡介
RocketMQ是一款阿里巴巴開源的消息中間件,在2017年9月份成為Apache的頂級項目宛官,是國內(nèi)首個互聯(lián)網(wǎng)中間件在?Apache?上的頂級項目迹缀。
RocketMQ的起源受到另一款消息中間件Kafka的啟發(fā)。最初颅围,淘寶內(nèi)部的交易系統(tǒng)使用了淘寶自主研發(fā)的Notify消息中間件伟葫,使用Mysql作為消息存儲媒介,可完全水平擴(kuò)容院促。
為了進(jìn)一步降低成本和提升寫入性能筏养,需要在存儲部分可以進(jìn)一步優(yōu)化,2011年初常拓,Linkin開源了Kafka這個優(yōu)秀的消息中間件渐溶,淘寶中間件團(tuán)隊在對Kafka做過充分Review之后,被Kafka無限消息堆積弄抬,高效的持久化速度所吸引茎辐。
不過當(dāng)時Kafka主要定位于日志傳輸,對于使用在淘寶交易掂恕、訂單拖陆、充值等場景下還有諸多特性不滿足,例如:延遲消息懊亡,消費重試依啰,事務(wù)消息,消息過濾等店枣,這些都是一些企業(yè)級消息中間件需要具備的功能速警。
為此叹誉,淘寶中間件團(tuán)隊重新用Java語言編寫了RocketMQ,定位于非日志的可靠消息傳輸闷旧。不過隨著RocketMQ的演進(jìn)长豁,現(xiàn)在也支持了日志流式處理。
二忙灼、主要模塊
Namesrv:?存儲當(dāng)前集群所有Brokers信息匠襟、Topic跟Broker的對應(yīng)關(guān)系。
Broker:?集群最核心模塊缀棍,主要負(fù)責(zé)Topic消息存儲宅此、消費者的消費位點管理(消費進(jìn)度)。
Producer:?消息生產(chǎn)者爬范。
Consumer:?消息消費者。
三弱匪、集群部署以及主要工作流程
集群部署架構(gòu)圖:
結(jié)合部署結(jié)構(gòu)圖青瀑,描述集群工作流程:
1,啟動Namesrv萧诫,Namesrv起來后監(jiān)聽端口斥难,等待Broker、Produer帘饶、Consumer連上來哑诊,相當(dāng)于一個路由控制中心。
2及刻,Broker啟動镀裤,跟所有的Namesrv保持長連接,定時發(fā)送心跳包缴饭。心跳包中包含當(dāng)前Broker信息(IP+端口等)以及存儲所有topic信息暑劝。注冊成功后,namesrv集群中就有Topic跟Broker的映射關(guān)系颗搂。
3担猛,收發(fā)消息前,先創(chuàng)建topic丢氢,創(chuàng)建topic時需要指定該topic要存儲在哪些Broker上傅联。也可以在發(fā)送消息時自動創(chuàng)建Topic。
4疚察,Producer發(fā)送消息蒸走,啟動時先跟Namesrv集群中的其中一臺建立長連接,并從Namesrv中獲取當(dāng)前發(fā)送的Topic存在哪些Broker上稍浆,然后跟對應(yīng)的Broker建立長連接载碌,直接向Broker發(fā)消息猜嘱。
5,Consumer跟Producer類似嫁艇。跟其中一臺Namesrv建立長連接朗伶,獲取當(dāng)前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道步咪,開始消費消息论皆。
四、相關(guān)模塊功能
1猾漫、Namesrv点晴,Namesrv用于存儲Topic、Broker關(guān)系信息悯周,功能簡單粒督,穩(wěn)定性高。多個Namesrv之間相互沒有通信禽翼,單臺Namesrv宕機(jī)不影響其他Namesrv與集群屠橄;
即使整個Namesrv集群宕機(jī),已經(jīng)正常工作的Producer闰挡,Consumer锐墙,Broker仍然能正常工作,但新起的Producer, Consumer长酗,Broker就無法工作溪北。
?2、Broker(client的發(fā)送夺脾、讀取消息等各種核心功能)
? ? ?一之拨,高并發(fā)讀寫服務(wù)
? ? ? ? ?Broker的高并發(fā)讀寫主要是依靠以下兩點:
? ? ? ??消息順序?qū)懀蠺opic數(shù)據(jù)同時只會寫一個文件(commitlog)劳翰,一個文件滿1G敦锌,再寫新文件,真正的順序?qū)懕P佳簸,使得發(fā)消息TPS大幅提高乙墙。
消息隨機(jī)讀,RocketMQ盡可能讓讀命中系統(tǒng)pagecache生均,因為操作系統(tǒng)訪問pagecache時听想,即使只訪問1K的消息,系統(tǒng)也會提前預(yù)讀出更多的數(shù)據(jù)马胧,在下次讀時就可能命中pagecache汉买,減少IO操作。
? ? ?二佩脊,負(fù)載均衡與動態(tài)伸縮
? ? ? ??負(fù)載均衡:Broker上存Topic信息蛙粘,Topic由多個隊列組成垫卤,隊列會平均分散在多個Broker上,而Producer的發(fā)送機(jī)制保證消息盡量平均分布到所有隊列中出牧,最終效果就是所有消息都平均落在每個Broker上穴肘。
? ? ? ??動態(tài)伸縮能力(非順序消息):Broker的伸縮性體現(xiàn)在兩個維度:Topic, Broker。
? ? ? ??Topic維度:假如一個Topic的消息量特別大舔痕,但集群水位壓力還是很低评抚,就可以擴(kuò)大該Topic的隊列數(shù),Topic的隊列數(shù)跟發(fā)送伯复、消費速度成正比慨代。
? ? ? ? Broker維度:如果集群水位很高了,需要擴(kuò)容啸如,直接加機(jī)器部署B(yǎng)roker就可以侍匙。Broker起來后向Namesrv注冊,Producer叮雳、Consumer通過Namesrv發(fā)現(xiàn)新Broker丈积,立即跟該Broker直連,收發(fā)消息债鸡。
? ? 三,高可用&高可靠
? ? ? ? ?高可用:集群部署時一般都為主備铛纬,備機(jī)實時從主機(jī)同步消息厌均,如果其中一個主機(jī)宕機(jī),備機(jī)提供消費服務(wù)告唆,但不提供寫服務(wù)棺弊。
? ? ? ? ?高可靠:所有發(fā)往broker的消息,有同步刷盤和異步刷盤機(jī)制擒悬;同步刷盤時模她,消息寫入物理文件才會返回成功,異步刷盤時懂牧,只有機(jī)器宕機(jī)侈净,才會產(chǎn)生消息丟失,broker掛掉可能會發(fā)生僧凤,
? ? ? ? ?但是機(jī)器宕機(jī)崩潰是很少發(fā)生的畜侦,除非突然斷電
? ? 四,Broker與Namesrv的心跳機(jī)制
單個Broker跟所有Namesrv保持心跳請求躯保,心跳間隔為30秒旋膳,心跳請求中包括當(dāng)前Broker所有的Topic信息。Namesrv會反查Broer的心跳信息途事,如果某個Broker在2分鐘之內(nèi)都沒有心跳验懊,
則認(rèn)為該Broker下線擅羞,調(diào)整Topic跟Broker的對應(yīng)關(guān)系。但此時Namesrv不會主動通知Producer义图、Consumer有Broker宕機(jī)减俏。
3、消費者(consumer)
消費者啟動時需要指定Namesrv地址歌溉,與其中一個Namesrv建立長連接垄懂。消費者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機(jī)痛垛,
客戶端最多要30秒才能感知草慧。連接建立后,從namesrv中獲取當(dāng)前消費Topic所涉及的Broker匙头,直連Broker漫谷。
? ??Consumer跟Broker是長連接,會每隔30秒發(fā)心跳信息到Broker蹂析。Broker端每10秒檢查一次當(dāng)前存活的Consumer舔示,若發(fā)現(xiàn)某個Consumer 2分鐘內(nèi)沒有心跳,
就斷開與該Consumer的連接电抚,并且向該消費組的其他實例發(fā)送通知惕稻,觸發(fā)該消費者集群的負(fù)載均衡。
? ?消費者端的負(fù)載均衡
? ? ? 先討論消費者的消費模式蝙叛,消費者有兩種模式消費:集群消費俺祠,廣播消費。
? ? ??廣播消費:每個消費者消費Topic下的所有隊列借帘。
集群消費:一個topic可以由同一個ID下所有消費者分擔(dān)消費蜘渣。具體例子:假如TopicA有6個隊列,某個消費者ID起了2個消費者實例肺然,那么每個消費者負(fù)責(zé)消費3個隊列蔫缸。如果再增加一個消費者ID相同消費者實例,
即當(dāng)前共有3個消費者同時消費6個隊列际起,那每個消費者負(fù)責(zé)2個隊列的消費拾碌。
消費者端的負(fù)載均衡,就是集群消費模式下加叁,同一個ID的所有消費者實例平均消費該Topic的所有隊列倦沧。
?4、生產(chǎn)者(Producer)
? ? ?Producer啟動時它匕,也需要指定Namesrv的地址展融,從Namesrv集群中選一臺建立長連接。如果該Namesrv宕機(jī)豫柬,會自動連其他Namesrv告希。直到有可用的Namesrv為止扑浸。
? ? ?生產(chǎn)者每30秒從Namesrv獲取Topic跟Broker的映射關(guān)系,更新到本地內(nèi)存中燕偶。再跟Topic涉及的所有Broker建立長連接喝噪,每隔30秒發(fā)一次心跳。在Broker端也會每10秒掃描一次當(dāng)前注冊的Producer指么,
如果發(fā)現(xiàn)某個Producer超過2分鐘都沒有發(fā)心跳酝惧,則斷開連接。
? ??生產(chǎn)者端的負(fù)載均衡
? ??生產(chǎn)者發(fā)送時伯诬,會自動輪詢當(dāng)前所有可發(fā)送的broker晚唇,一條消息發(fā)送成功,下次換另外一個broker發(fā)送盗似,以達(dá)到消息平均落到所有的broker上哩陕。
五、服務(wù)端主要邏輯概念
? ? ? 服務(wù)端邏輯存儲圖:
1赫舒、Consumerqueue:producer悍及、consumer與broker上某個topic打交道時都要通過cq,可以是認(rèn)為是實際物理消息的一個索引接癌,記錄則物理消息在commitlog上的offset心赶,同時維護(hù)則consumer的消費進(jìn)度offset;
? 2缺猛、IndexFile:MessageStore中存儲的消息除了通過ConsumeQueue提供給consumer消費之外园担,還支持通過MessageID或者M(jìn)essageKey來查詢消息;使用ID查詢時枯夜,因為ID就是用broker+offset生成的(這里msgId指的是服務(wù)端的),
所以很容易就找到對應(yīng)的commitLog文件來讀取消息艰山。對于用MessageKey來查詢消息湖雹,MessageStore通過構(gòu)建一個index來提高讀取速度
整個slotTable+indexLinkedList可以理解成java的HashMap。每當(dāng)放一個新的消息的index進(jìn)來曙搬,首先取MessageKey的hashCode摔吏,然后用hashCode對slot總數(shù)取模,得到應(yīng)該放到哪個slot中纵装,slot總數(shù)系統(tǒng)默認(rèn)500W個征讲。
只要是取hash就必然面臨hash沖突的問題,跟HashMap一樣橡娄,IndexFile也是使用一個鏈表結(jié)構(gòu)來解決hash沖突诗箍。只是這里跟HashMap稍微有點區(qū)別的地方是,slot中放的是最新index的指針挽唉。這個是因為一般查詢的時候肯定是優(yōu)先查最近的消息滤祖。
每個slot中放的指針值是索引在indexFile中的偏移量筷狼,如上圖,每個索引大小是20字節(jié)匠童,所以根據(jù)當(dāng)前索引是這個文件中的第幾個(偏移量)埂材,就很容易定位到索引的位置。然后每個索引都保存了跟它同一個slot的前一個索引的位置汤求。
?3耀鸦、Commitlog:消息的物理存儲文件仍劈,默認(rèn)一個1G大小,超過1G重新創(chuàng)建文件;
?4参咙、Topic:消息的邏輯上的一個分組的概念,相同類型的消息投遞到一個topic上甩十,producer和consumer與broker打交道都是通過topic維度來進(jìn)行濒募;
?5、Tag:為了業(yè)務(wù)細(xì)分在topic基礎(chǔ)上對消息打的一個標(biāo)記赊颠,同一個消息可以被打一個或者多個tag格二,消費者消費消息時可以根據(jù)一個或者多個tag進(jìn)行過濾;
?六竣蹦、Consumer拉取消息兩種模式:
? ? ? pull模式:主動拉取的模式顶猜,需要設(shè)置topic和每次拉取的條數(shù),具體需要拉取多少消息需要在代碼里寫痘括;
? ? ? push模式:主動推送模式长窄,表象是服務(wù)端有消息了主動推動給消費者,實際實現(xiàn)上是通過后臺啟動單獨拉取消息線程不斷的拉取服務(wù)端消息纲菌;
?七挠日、順序消息:
某些業(yè)務(wù)需要實現(xiàn)消息的順序性來保持業(yè)務(wù)的正常進(jìn)行,rocketmq不嚴(yán)格支持順序消息翰舌,可以通過topic下定義一個queue或者將幾條順序的消息發(fā)送到一個queue上實現(xiàn)(可以通過發(fā)送消息時producer的對象選擇器實現(xiàn))嚣潜;
?八、事務(wù)消息:
有些業(yè)務(wù)需要實現(xiàn)事務(wù)消息椅贱,在業(yè)務(wù)被提交時消息才能被消費懂算,當(dāng)業(yè)務(wù)被取消是則不消費當(dāng)前消息,rocketmq實現(xiàn)事務(wù)消息主要是通過consumerqueue來實現(xiàn)事務(wù)消息庇麦,由于消費者消費消息都是先通過consumerqueue來拿到消息的物理位置计技,
只有普通消息或者被提交的事務(wù)消息才被放到consumerqueue里才能被 消費;
?九山橄、服務(wù)端保存消息機(jī)制:
? ? ? ?服務(wù)端支持同步消息或者異步消息垮媒,同步消息每條消息都會被實時刷盤,異步消息是先被寫到PAGECACHE,消息累積到一定量才被寫到磁盤涣澡;
? ? ? ?消息支持物理存儲贱呐,但是磁盤大小是有限的,默認(rèn)存儲48小時刪除或者存儲占用磁盤空間70%時刪除老文件入桂;
?十奄薇、延時消息
? ? ??RocketMQ 開源版本延遲消息臨時存儲在一個內(nèi)部主題中,不支持任意時間精度抗愁,支持特定的 level馁蒂,例如定時 5s,10s蜘腌,1m 等
? ? ? Broker端內(nèi)置延遲消息處理能力沫屡,核心實現(xiàn)思路都是一樣:將延遲消息通過一個臨時存儲進(jìn)行暫存,到期后才投遞到目標(biāo)Topic中撮珠。如下圖所示
步驟說明如下:
producer要將一個延遲消息發(fā)送到某個Topic中
Broker判斷這是一個延遲消息后沮脖,將其通過臨時存儲進(jìn)行暫存。
Broker內(nèi)部通過一個延遲服務(wù)(delay service)檢查消息是否到期芯急,將到期的消息投遞到目標(biāo)Topic中勺届。這個的延遲服務(wù)名字為delay service,不同消息中間件的延遲服務(wù)模塊名稱可能不同娶耍。
消費者消費目標(biāo)topic中的延遲投遞的消息
? ?十一免姿、消費進(jìn)度重置
? ? ? ? 如果想對已經(jīng)消費過的消息進(jìn)行重新消費可以對消費進(jìn)度進(jìn)行重置重新消費;