RocketMQ—(總結(jié))一篇就搞懂RocketMQ

一、RocketMQ 是什么?

image.png
  • 是一個隊列模型的消息中間件炼七,具有高性能缆巧、高可靠、高實時豌拙、分布式特點陕悬;

  • Producer、Consumer按傅、隊列都可以分布式捉超;

  • Producer向一些隊列輪流發(fā)送消息,隊列集合稱為Topic唯绍,Consumer如果做廣播消費拼岳,則一個consumer實例消費這個Topic對應(yīng)的所有隊列,如果做集群消費况芒,則多個Consumer實例平均消費這個topic對應(yīng)的隊列集合惜纸;

  • 能夠保證嚴(yán)格的消息順序;

  • 提供豐富的消息拉取模式绝骚;

  • 高效的訂閱者水平擴(kuò)展能力耐版;

  • 實時的消息訂閱機(jī)制;

  • 億級消息堆積能力皮壁;

  • 較少的依賴椭更。

二、RocketMQ基本概念

2.1 消息模型(Message Model)

RocketMQ主要由Producer蛾魄、Broker虑瀑、Consumer三部分組成湿滓,其中Producer負(fù)責(zé)生產(chǎn)消息,Consumer負(fù)責(zé)消費消息舌狗,Broker負(fù)責(zé)存儲消息叽奥。Broker在實際部署過程中對應(yīng)一臺服務(wù)器,每個Broker可以存儲多個Topic的消息痛侍,每個Topic的消息也可以分片存儲于不同的Broker朝氓。MessageQueue用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個MessageQueue中主届。ConsumerGroup由多個Consumer實例構(gòu)成赵哲。

2.2 消息生產(chǎn)者(Producer)

負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息君丁。一個消息生產(chǎn)者會把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器枫夺。RocketMQ提供多種發(fā)送方式,同步發(fā)送绘闷、異步發(fā)送橡庞、順序發(fā)送、單向發(fā)送印蔗。同步和異步方式均需要Broker返回確認(rèn)信息扒最,單向發(fā)送不需要。

2.3 消息消費者(Consumer)

負(fù)責(zé)消費消息华嘹,一般是后臺系統(tǒng)負(fù)責(zé)異步消費吧趣。一個消息消費者會從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序耙厚。從用戶應(yīng)用的角度而言提供了兩種消費形式:拉取式消費再菊、推動式消費。

2.4 主題(Topic)

表示一類消息的集合颜曾,每個主題包含若干條消息,每條消息只能屬于一個主題秉剑,是RocketMQ進(jìn)行消息訂閱的基本單位泛豪。

2.5 代理服務(wù)器(Broker Server)

消息中轉(zhuǎn)角色,負(fù)責(zé)存儲消息侦鹏、轉(zhuǎn)發(fā)消息诡曙。代理服務(wù)器在RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準(zhǔn)備略水。代理服務(wù)器也存儲消息相關(guān)的元數(shù)據(jù)价卤,包括消費者組、消費進(jìn)度偏移和主題和隊列消息等渊涝。

2.6 名字服務(wù)(Name Server)

名稱服務(wù)充當(dāng)路由消息的提供者慎璧。生產(chǎn)者或消費者能夠通過名字服務(wù)查找各主題相應(yīng)的BrokerIP列表床嫌。多個Namesrv實例組成集群,但相互獨立胸私,沒有信息交換厌处。

2.7 拉取式消費(Pull Consumer)

Consumer消費的一種類型,應(yīng)用通常主動調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息岁疼、主動權(quán)由應(yīng)用控制阔涉。一旦獲取了批量消息,應(yīng)用就會啟動消費過程捷绒。

2.8 推動式消費(Push Consumer)

Consumer消費的一種類型瑰排,該模式下Broker收到數(shù)據(jù)后會主動推送給消費端,該消費模式一般實時性較高暖侨。

2.9 生產(chǎn)者組(Producer Group)

同一類Producer的集合椭住,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰它碎,則Broker服務(wù)器會聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實例以提交或回溯消費函荣。

2.10 消費者組(Consumer Group)

同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致扳肛。消費者組使得在消息消費方面傻挂,實現(xiàn)負(fù)載均衡和容錯的目標(biāo)變得非常容易。要注意的是挖息,消費者組的消費者實例必須訂閱完全相同的Topic金拒。RocketMQ支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

2.11 集群消費(Clustering)

集群消費模式下,相同Consumer Group的每個Consumer實例平均分?jǐn)傁ⅰ?/p>

2.12 廣播消費(Broadcasting)

廣播消費模式下套腹,相同Consumer Group的每個Consumer實例都接收全量的消息绪抛。

2.13 普通順序消息(Normal Ordered Message)

普通順序消費模式下,消費者通過同一個消息隊列(Topic分區(qū)电禀,稱作Message Queue)收到的消息是有順序的幢码,不同消息隊列收到的消息則可能是無順序的。

2.14 嚴(yán)格順序消息(Strictly Ordered Message)

嚴(yán)格順序消息模式下尖飞,消費者收到的所有消息均是有順序的症副。

2.15 消息(Message)

消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費數(shù)據(jù)的最小單位政基,每條消息必須屬于一個主題贞铣。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識的Key沮明。系統(tǒng)提供了通過Message ID和Key查詢消息的功能辕坝。

2.16 標(biāo)簽(Tag)

為消息設(shè)置的標(biāo)志,用于同一主題下區(qū)分不同類型的消息荐健。來自同一業(yè)務(wù)單元的消息酱畅,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽琳袄。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)圣贸。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消費邏輯灾杰,實現(xiàn)更好的擴(kuò)展性渺贤。

三、RocketMQ特性(features)

3.1 訂閱與發(fā)布

消息的發(fā)布是指某個生產(chǎn)者向某個topic發(fā)送消息;消息的訂閱是指某個消費者關(guān)注了某個topic中帶有某些tag的消息甫恩,進(jìn)而從該topic消費數(shù)據(jù)杭抠。

3.2 消息順序

消息有序指的是一類消息消費時羊始,能按照發(fā)送的順序來消費谱俭。例如:一個訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建、訂單付款啄骇、訂單完成痴鳄。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的缸夹。RocketMQ可以嚴(yán)格的保證消息有序痪寻。順序消息分為全局順序消息與分區(qū)順序消息,全局順序是指某個Topic下的所有消息都要保證順序虽惭;部分順序消息只要保證每一組消息被順序消費即可橡类。

  • 全局順序

    對于指定的一個Topic,所有消息按照嚴(yán)格的先入先出(FIFO)的順序進(jìn)行發(fā)布和消費芽唇。適用場景:性能要求不高顾画,所有的消息嚴(yán)格按照FIFO 原則進(jìn)行消息發(fā)布和消費的場景;

  • 分區(qū)順序

    對于指定的一個Topic匆笤,所有消息根據(jù) sharding key 進(jìn)行區(qū)塊分區(qū)研侣。同一個分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費。Sharding key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段炮捧,和普通消息的Key是完全不同的概念庶诡。

    適用場景:性能要求高,以sharding key作為分區(qū)字段咆课,在同一個區(qū)塊中嚴(yán)格的按照FIFO原則進(jìn)行消息發(fā)布和消費的場景灌砖。

3.3 消息過濾

RocketMQ的消費者可以根據(jù)Tag進(jìn)行消息過濾,也支持自定義屬性過濾傀蚌。消息過濾目前是在Broker端實現(xiàn)的,優(yōu)點是減少了對于Consumer無用消息的網(wǎng)絡(luò)傳輸蘸吓,缺點是增加了Broker的負(fù)擔(dān)善炫、而且實現(xiàn)相對復(fù)雜。

3.4 消息可靠性

RocketMQ支持消息的高可靠库继,影響消息可靠性的幾種情況:

  1. Broker非正常關(guān)閉箩艺;

  2. Broker異常Crash窜醉;

  3. OS Crash;

  4. 機(jī)器掉電艺谆,但是能立即恢復(fù)供電情況榨惰;

  5. 機(jī)器無法開機(jī)(可能是cpu、主板静汤、內(nèi)存等關(guān)鍵設(shè)備損壞)琅催;

  6. 磁盤設(shè)備損壞;

1虫给、2藤抡、3、4 四種情況都屬于硬件資源可立即恢復(fù)情況抹估,RocketMQ在這四種情況下能保證消息不丟缠黍,或者丟失少量數(shù)據(jù)(依賴刷盤方式是同步還是異步)。5药蜻、6屬于單點故障瓷式,且無法恢復(fù),一旦發(fā)生语泽,在此單點上的消息全部丟失贸典。RocketMQ在這兩種情況下,通過異步復(fù)制湿弦,可保證99%的消息不丟瓤漏,但是仍然會有極少量的消息可能丟失。通過同步雙寫技術(shù)可以完全避免單點颊埃,同步雙寫勢必會影響性能蔬充,適合對消息可靠性要求極高的場合,例如與Money相關(guān)的應(yīng)用班利。注:RocketMQ從3.0版本開始支持同步雙寫饥漫。

3.5 至少一次

至少一次(At least Once)指每個消息必須投遞一次。Consumer先Pull消息到本地罗标,消費完成后庸队,才向服務(wù)器返回ack,如果沒有消費一定不會ack消息闯割,所以RocketMQ可以很好的支持此特性彻消。

3.6 回溯消費

回溯消費是指Consumer已經(jīng)消費成功的消息,由于業(yè)務(wù)上需求需要重新消費宙拉,要支持此功能宾尚,Broker在向Consumer投遞成功消息后,消息仍然需要保留。并且重新消費一般是按照時間維度煌贴,例如由于Consumer系統(tǒng)故障御板,恢復(fù)后需要重新消費1小時前的數(shù)據(jù),那么Broker要提供一種機(jī)制牛郑,可以按照時間維度來回退消費進(jìn)度怠肋。RocketMQ支持按照時間回溯消費,時間維度精確到毫秒淹朋。

3.7 事務(wù)消息

RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中笙各,要么同時成功,要么同時失敗瑞你。RocketMQ的事務(wù)消息提供類似X/Open XA的分布事務(wù)功能酪惭,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。

3.8 定時消息

定時消息(延遲隊列)是指消息發(fā)送到broker后者甲,不會立即被消費春感,等待特定時間投遞給真正的topic。broker有配置項messageDelayLevel虏缸,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”鲫懒,18個level」粽蓿可以配置自定義messageDelayLevel窥岩。注意,messageDelayLevel是broker的屬性宰缤,不屬于某個topic颂翼。發(fā)消息時,設(shè)置delayLevel等級即可:msg.setDelayLevel(level)慨灭。level有以下三種情況:

  • level == 0朦乏,消息為非延遲消息;

  • 1 <= level <= maxLevel氧骤,消息延遲特定時間呻疹,例如level == 1,延遲1s筹陵;

  • level > maxLevel刽锤,則leve l== maxLevel,例如level == 20朦佩,延遲2h并思。

定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue语稠,queueId = delayTimeLevel – 1纺荧,即一個queue只存相同延遲的消息,保證具有相同發(fā)送延遲的消息能夠順序消費。broker會調(diào)度地消費SCHEDULE_TOPIC_XXXX宙暇,將消息寫入真實的topic。需要注意的是议泵,定時消息會在第一次寫入和調(diào)度寫入真實topic時都會計數(shù)占贫,因此發(fā)送數(shù)量、tps都會變高先口。

3.9 消息重試

Consumer消費消息失敗后型奥,要提供一種重試機(jī)制,令消息再消費一次碉京。Consumer消費消息失敗通诚嵝冢可以認(rèn)為有以下幾種情況:

  • 由于消息本身的原因,例如反序列化失敗谐宙,消息數(shù)據(jù)本身無法處理(例如話費充值烫葬,當(dāng)前消息的手機(jī)號被注銷,無法充值)等凡蜻。這種錯誤通常需要跳過這條消息搭综,再消費其它消息,而這條失敗的消息即使立刻重試消費划栓,99%也不成功兑巾,所以最好提供一種定時重試機(jī)制,即過10秒后再重試忠荞。

  • 由于依賴的下游應(yīng)用服務(wù)不可用蒋歌,例如db連接不可用,外系統(tǒng)網(wǎng)絡(luò)不可達(dá)等委煤。遇到這種錯誤堂油,即使跳過當(dāng)前失敗的消息,消費其他消息同樣也會報錯素标。這種情況建議應(yīng)用sleep 30s称诗,再消費下一條消息,這樣可以減輕Broker重試消息的壓力头遭。

RocketMQ會為每個消費組都設(shè)置一個Topic名稱為“%RETRY%+consumerGroup”的重試隊列(這里需要注意的是寓免,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設(shè)置的)计维,用于暫時保存因為各種異常而導(dǎo)致Consumer端無法消費的消息袜香。考慮到異出昊蹋恢復(fù)起來需要一些時間蜈首,會為重試隊列設(shè)置多個重試級別,每個重試級別都有與之對應(yīng)的重新投遞延時,重試次數(shù)越多投遞延時就越大欢策。RocketMQ對于重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列中吆寨,后臺定時任務(wù)按照對應(yīng)的時間進(jìn)行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊列中。

3.10 消息重投

生產(chǎn)者在發(fā)送消息時踩寇,同步消息失敗會重投啄清,異步消息有重試,oneway沒有任何保證俺孙。消息重投保證消息盡可能發(fā)送成功辣卒、不丟失,但可能會造成消息重復(fù)睛榄,消息重復(fù)在RocketMQ中是無法避免的問題荣茫。消息重復(fù)在一般情況下不會發(fā)生,當(dāng)出現(xiàn)消息量大场靴、網(wǎng)絡(luò)抖動啡莉,消息重復(fù)就會是大概率事件。另外憎乙,生產(chǎn)者主動重發(fā)票罐、consumer負(fù)載變化也會導(dǎo)致重復(fù)消息。如下方法可以設(shè)置消息重試策略:

  • retryTimesWhenSendFailed:同步發(fā)送失敗重投次數(shù)泞边,默認(rèn)為2该押,因此生產(chǎn)者會最多嘗試發(fā)送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的broker阵谚,嘗試向其他broker發(fā)送蚕礼,最大程度保證消息不丟。超過重投次數(shù)梢什,拋出異常奠蹬,由客戶端保證消息不丟。當(dāng)出現(xiàn)RemotingException嗡午、MQClientException和部分MQBrokerException時會重投囤躁。

  • retryTimesWhenSendAsyncFailed:異步發(fā)送失敗重試次數(shù),異步重試不會選擇其他broker荔睹,僅在同一個broker上做重試狸演,不保證消息不丟。

  • retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時或slave不可用(返回狀態(tài)非SEND_OK)僻他,是否嘗試發(fā)送到其他broker宵距,默認(rèn)false。十分重要消息可以開啟吨拗。

3.11 流量控制

生產(chǎn)者流控满哪,因為broker處理能力達(dá)到瓶頸婿斥;消費者流控,因為消費能力達(dá)到瓶頸哨鸭。

生產(chǎn)者流控:

  • commitLog文件被鎖時間超過osPageCacheBusyTimeOutMills時民宿,參數(shù)默認(rèn)為1000ms,返回流控像鸡。

  • 如果開啟:

    transientStorePoolEnable == true

    并且broker為異步刷盤的主機(jī)勘高,且transientStorePool中資源不足,拒絕當(dāng)前send請求坟桅,返回流控。

  • broker每隔10ms檢查send請求隊列頭部請求的等待時間蕊蝗,如果超過waitTimeMillsInSendQueue仅乓,默認(rèn)200ms,拒絕當(dāng)前send請求蓬戚,返回流控夸楣。

  • broker通過拒絕send 請求方式實現(xiàn)流量控制。

注意子漩,生產(chǎn)者流控豫喧,不會嘗試消息重投。

消費者流控:

  • 消費者本地緩存消息數(shù)超過pullThresholdForQueue時幢泼,默認(rèn)1000紧显。

  • 消費者本地緩存消息大小超過pullThresholdSizeForQueue時,默認(rèn)100MB缕棵。

  • 消費者本地緩存消息跨度超過consumeConcurrentlyMaxSpan時孵班,默認(rèn)2000。

  • 消費者流控的結(jié)果是降低拉取頻率招驴。

3.12 死信隊列

死信隊列用于處理無法被正常消費的消息篙程。當(dāng)一條消息初次消費失敗,消息隊列會自動進(jìn)行消息重試别厘;達(dá)到最大重試次數(shù)后虱饿,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息触趴,此時氮发,消息隊列 不會立刻將消息丟棄,而是將其發(fā)送到該消費者對應(yīng)的特殊隊列中雕蔽。RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message)折柠,將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。在RocketMQ中批狐,可以通過使用console控制臺對死信隊列中的消息進(jìn)行重發(fā)來使得消費者實例再次進(jìn)行消費扇售。

四前塔、RocketMQ 設(shè)計

消息存儲

image.png

消息存儲是RocketMQ中最為復(fù)雜和最為重要的一部分,接下來分別從RocketMQ的消息存儲整體架構(gòu)承冰、RocketMQ中兩種不同的刷盤方式來展開敘述华弓。

消息存儲整體架構(gòu)

消息存儲架構(gòu)圖中最重要的三個跟消息存儲相關(guān)的文件構(gòu)成

  • CommitLog

    消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的困乒。單個文件大小默認(rèn)1G, 文件名長度為20位寂屏,左邊補(bǔ)零,剩余為起始偏移量娜搂,比如00000000000000000000代表了第一個文件迁霎,起始偏移量為0,文件大小為1G=1073741824百宇;當(dāng)?shù)谝粋€文件寫滿了考廉,第二個文件為00000000001073741824,起始偏移量為1073741824携御,以此類推昌粤。消息主要是順序?qū)懭肴罩疚募?dāng)文件滿了啄刹,寫入下一個文件

  • ConsumeQueue

    消息消費隊列涮坐,引入的目的主要是提高消息消費的性能,由于RocketMQ是基于主題topic的訂閱模式誓军,消息消費是針對主題進(jìn)行的袱讹,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。Consumer即可根據(jù)ConsumeQueue來查找待消費的消息谭企。

    其中廓译,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset债查,消息大小size和消息Tag的HashCode值非区。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)盹廷,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}征绸。

    同樣consumequeue文件采取定長設(shè)計,每一個條目共20個字節(jié)俄占,分別為8字節(jié)的commitlog物理偏移量管怠、4字節(jié)的消息長度、8字節(jié)tag hashcode缸榄,單個文件由30W個條目組成渤弛,可以像數(shù)組一樣隨機(jī)訪問每一個條目视哑,每個ConsumeQueue文件大小約5.72M圃伶;

  • IndexFile

    IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法扣草。Index文件的存儲位置是:{fileName}间影,文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M晴氨,一個IndexFile可以保存 2000W個索引康嘉,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實現(xiàn)為hash索引籽前。

在RocketMQ的消息存儲整體架構(gòu)圖中可以看出亭珍,RocketMQ采用的是混合型的存儲結(jié)構(gòu),即為Broker單個實例下所有的隊列共用一個日志數(shù)據(jù)文件(即為CommitLog)來存儲枝哄。RocketMQ的混合型存儲結(jié)構(gòu)(多個Topic的消息實體內(nèi)容都存儲于一個CommitLog中)針對Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲結(jié)構(gòu)肄梨,Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化挠锥,保存至CommitLog中峭范。只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發(fā)送的消息就不會丟失瘪贱。正因為如此,Consumer也就肯定有機(jī)會去消費這條消息辆毡。當(dāng)無法拉取到消息后菜秦,可以等下一次消息拉取,同時服務(wù)端也支持長輪詢模式舶掖,如果一個消息拉取請求未拉取到消息球昨,Broker允許等待30s的時間,只要這段時間內(nèi)有新消息到達(dá)眨攘,將直接返回給消費端主慰。這里,RocketMQ的具體做法是鲫售,使用Broker端的后臺服務(wù)線程—ReputMessageService不停地分發(fā)請求并異步構(gòu)建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數(shù)據(jù)共螺。

消息刷盤

image.png
  • 同步刷盤

    只有在消息真正持久化至磁盤后RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應(yīng)。同步刷盤對MQ消息可靠性來說是一種不錯的保障情竹,但是性能上會有較大影響藐不,一般適用于金融業(yè)務(wù)應(yīng)用該模式較多。

  • 異步刷盤

    能夠充分利用OS的PageCache的優(yōu)勢秦效,只要消息寫入PageCache即可將成功的ACK返回給Producer端雏蛮。消息刷盤采用后臺異步線程提交的方式進(jìn)行,降低了讀寫延遲阱州,提高了MQ的性能和吞吐量挑秉。

五、RocketMQ 技術(shù)架構(gòu)

image.png

RocketMQ架構(gòu)上主要分為四部分苔货,如上圖所示:

  • Producer:

    消息發(fā)布的角色犀概,支持分布式集群方式部署立哑。Producer通過MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊列進(jìn)行消息投遞,投遞的過程支持快速失敗并且低延遲阱冶。

  • NameServer:

    NameServer是一個非常簡單的Topic路由注冊中心刁憋,其角色類似Dubbo中的zookeeper,支持Broker的動態(tài)注冊與發(fā)現(xiàn)木蹬。

    主要包括兩個功能:Broker管理至耻,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測機(jī)制镊叁,檢查Broker是否還存活尘颓;路由信息管理,每個NameServer將保存關(guān)于Broker集群的整個路由信息和用于客戶端查詢的隊列信息晦譬。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息疤苹,從而進(jìn)行消息的投遞和消費。NameServer通常也是集群的方式部署敛腌,各實例間相互不進(jìn)行信息通訊卧土。

    Broker是向每一臺NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息像樊。當(dāng)某個NameServer因某種原因下線了尤莺,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態(tài)感知Broker的路由的信息生棍。

  • BrokerServer:

    Broker主要負(fù)責(zé)消息的存儲颤霎、投遞和查詢以及服務(wù)高可用保證,為了實現(xiàn)這些功能涂滴,Broker包含了以下幾個重要子模塊友酱。

  • Remoting Module:

    整個Broker的實體,負(fù)責(zé)處理來自clients端的請求柔纵。

  • Client Manager:

    負(fù)責(zé)管理客戶端(Producer/Consumer)和維護(hù)Consumer的Topic訂閱信息缔杉。

  • Store Service:

    提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。

  • HA Service:

    高可用服務(wù)搁料,提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能壮吩。

  • Index Service:

    根據(jù)特定的Message key對投遞到Broker的消息進(jìn)行索引服務(wù),以提供消息的快速查詢加缘。

image.png

六鸭叙、RocketMQ 部署架構(gòu)

image.png

RocketMQ 網(wǎng)絡(luò)部署特點

  • NameServer是一個幾乎無狀態(tài)節(jié)點,可集群部署拣宏,節(jié)點之間無任何信息同步沈贝。

  • Broker部署相對復(fù)雜,Broker分為Master與Slave勋乾,一個Master可以對應(yīng)多個Slave宋下,但是一個Slave只能對應(yīng)一個Master嗡善,Master與Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId 來定義学歧,BrokerId為0表示Master罩引,非0表示Slave。

    Master也可以部署多個枝笨。每個Broker與NameServer集群中的所有節(jié)點建立長連接袁铐,定時注冊Topic信息到所有NameServer。注意:當(dāng)前RocketMQ版本在部署架構(gòu)上支持一Master多Slave横浑,但只有BrokerId=1的從服務(wù)器才會參與消息的讀負(fù)載剔桨。

  • Producer與NameServer集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息徙融,并向提供Topic 服務(wù)的Master建立長連接洒缀,且定時向Master發(fā)送心跳。Producer完全無狀態(tài)欺冀,可集群部署树绩。

  • Consumer與NameServer集群中的其中一個節(jié)點(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息隐轩,并向提供Topic服務(wù)的Master葱峡、Slave建立長連接,且定時向Master龙助、Slave發(fā)送心跳。

    Consumer既可以從Master訂閱消息蛛芥,也可以從Slave訂閱消息提鸟,消費者在向Master拉取消息時,Master服務(wù)器會根據(jù)拉取偏移量與最大偏移量的距離(判斷是否讀老消息仅淑,產(chǎn)生讀I/O)称勋,以及從服務(wù)器是否可讀等因素建議下一次是從Master還是Slave拉取。

結(jié)合部署架構(gòu)圖涯竟,描述集群工作流程:

  • 啟動NameServer赡鲜,NameServer起來后監(jiān)聽端口,等待Broker庐船、Producer银酬、Consumer連上來,相當(dāng)于一個路由控制中心筐钟。

  • Broker啟動揩瞪,跟所有的NameServer保持長連接,定時發(fā)送心跳包篓冲。心跳包中包含當(dāng)前Broker信息(IP+端口等)以及存儲所有Topic信息李破。注冊成功后宠哄,NameServer集群中就有Topic跟Broker的映射關(guān)系。

  • 收發(fā)消息前嗤攻,先創(chuàng)建Topic毛嫉,創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發(fā)送消息時自動創(chuàng)建Topic妇菱。

  • Producer發(fā)送消息承粤,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當(dāng)前發(fā)送的Topic存在哪些Broker上恶耽,輪詢從隊列列表中選擇一個隊列密任,然后與隊列所在的Broker建立長連接從而向Broker發(fā)消息。

  • Consumer跟Producer類似偷俭,跟其中一臺NameServer建立長連接浪讳,獲取當(dāng)前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道涌萤,開始消費消息淹遵。

七、RocketMQ 集群部署

7.1 集群模式

單Master模式

這種方式風(fēng)險較大负溪,一旦Broker重啟或者宕機(jī)時透揣,會導(dǎo)致整個服務(wù)不可用。不建議線上環(huán)境使用,可以用于本地測試川抡。

  • 優(yōu)點:本地開發(fā)測試辐真,配置簡單,同步刷盤消息不會丟失崖堤。

  • 缺點:不可靠侍咱,如果宕機(jī)會導(dǎo)致服務(wù)不可用。

多Master模式

一個集群無Slave密幔,全是Master楔脯,例如2個Master或者3個Master,這種模式的優(yōu)缺點如下:

  • 優(yōu)點:配置簡單胯甩,單個Master宕機(jī)或重啟維護(hù)對應(yīng)用無影響昧廷,在磁盤配置為RAID10時,即使機(jī)器宕機(jī)不可恢復(fù)情況下偎箫,由于RAID10磁盤非衬炯恚可靠,消息也不會丟(異步刷盤丟失少量消息淹办,同步刷盤一條不丟)弄诲,性能最高;

  • 缺點:單臺機(jī)器宕機(jī)期間,這臺機(jī)器上未被消費的消息在機(jī)器恢復(fù)之前不可訂閱齐遵,消息實時性會受到影響寂玲。

多Master多Slave模式(異步)

每個Master配置一個Slave,有多對Master-Slave梗摇,HA采用異步復(fù)制方式拓哟,主備有短暫消息延遲(毫秒級),這種模式的優(yōu)缺點如下:

  • 優(yōu)點:即使磁盤損壞伶授,消息丟失的非常少断序,且消息實時性不會受影響,同時Master宕機(jī)后糜烹,消費者仍然可以從Slave消費违诗,而且此過程對應(yīng)用透明,不需要人工干預(yù)疮蹦,性能同多Master模式幾乎一樣诸迟;

  • 缺點:Master宕機(jī),磁盤損壞情況下會丟失少量消息愕乎。

多Master多Slave模式(同步)

每個Master配置一個Slave阵苇,有多對Master-Slave,HA采用同步雙寫方式感论,即只有主備都寫成功绅项,才向應(yīng)用返回成功,這種模式的優(yōu)缺點如下:

  • 優(yōu)點:數(shù)據(jù)與服務(wù)都無單點故障比肄,Master宕機(jī)情況下快耿,消息無延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高芳绩;

  • 缺點:性能比異步復(fù)制模式略低(大約低10%左右)掀亥,發(fā)送單個消息的RT會略高,且目前版本在主節(jié)點宕機(jī)后示括,備機(jī)不能自動切換為主機(jī)。

DLedger 集群模式

RocketMQ-on-DLedger Group 是指一組相同名稱的 Broker痢畜,至少需要 3 個節(jié)點垛膝,通過 Raft 自動選舉出一個 Leader,其余節(jié)點 作為 Follower丁稀,并在 Leader 和 Follower 之間復(fù)制數(shù)據(jù)以保證高可用吼拥。

RocketMQ-on-DLedger Group 能自動容災(zāi)切換,并保證數(shù)據(jù)一致线衫。RocketMQ-on-DLedger Group 是可以水平擴(kuò)展的凿可,也即可以部署任意多個 RocketMQ-on-DLedger Group 同時對外提供服務(wù)。

  • 優(yōu)點:多節(jié)點(至少三個)組成集群,其中一個為 Leader 節(jié)點枯跑,其余為 Follower 節(jié)點組成高可用惨驶,能夠自動容災(zāi)切換。

  • 缺點:集群成本增加(同一個group最少新增一臺機(jī)器)敛助、RocketMQ4.5及以后版本才支持粗卜。

7.2 本地測試環(huán)境快速搭建(單Master模式)

環(huán)境準(zhǔn)備

  1. jdk1.8+

  2. rocketmq-all-4.5.0-bin-release

安裝步驟

1、解壓 rocketmq-all-4.5.1-bin-release.zip 到指定目錄,如下:

image.png

benchmark:基礎(chǔ)測試腳本目錄 / lib:運行依賴包

bin:命令運維腳本目錄 / conf:配置目錄

2纳击、進(jìn)入bin 目錄下編輯 runserver.sh 和 runborker.sh 兩個文件续扔,調(diào)整一下namesrv和broker的啟動的jvm內(nèi)存參數(shù)。具體參數(shù)大小根據(jù)系統(tǒng)環(huán)境配置情況而定:

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

vim runroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"  
ps:broker端本地環(huán)境不建議開啟堆外內(nèi)存

3焕数、根據(jù)情況配置broker的一些參數(shù)

brokerClusterName = DefaultCluster
brokerName = broker-a
# 0表示master >0表示slave
brokerId = 0
# 幾點開始刪除文件
deleteWhen = 04
# 文件保留時間纱昧,默認(rèn)48小時
fileReservedTime = 48
# Broker 的角色
#- ASYNC_MASTER 異步復(fù)制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole = ASYNC_MASTER
#刷盤方式
# ASYNC_FLUSH 異步刷盤
# SYNC_FLUSH 同步刷盤
flushDiskType = ASYNC_FLUSH

備注:
# 存儲路徑
storePathRootDir=xxx/store
# commitLog存儲路徑
storePathCommitLog=xxx/store/commitlog
# 消費隊列存儲路徑
storePathConsumeQueue=xx/store/pathconsumequeue
# 消息索引存儲路徑
storePathIndex=xxx/store/pathindex
# checkpoint 文件存儲
storeCheckpoint=xxx/store/checkpoint
# abort 文件存儲
abortFile=xxx/store/abort

備注:

  • xxx 默認(rèn)等于System.getProperty("user.home") +File.separator+"store"

  • 更多其他配置:

    MessageStoreConfig、BrokerConfig源碼類

  • 根據(jù)實際情況配置存儲路徑堡赔,storePathCommitLog占用空間相對最大

4识脆、啟動rocketmq, 先啟動 namesrv 再啟動 broker

--后臺啟動namesrv  
nohup sh mqnamesrv &  
--后臺啟動broker  
export NAMESRV_ADDR=localhost:9876  
--如果你在brocker.conf文件中配置了namesrvAddr = localhost:9876就直接用下面的命令  
nohup sh mqbroker -c /xxx/rocketmq-all-4.5.1-bin-release/conf/broker.conf &  
--否則用這個命令  
nohup sh mqbroker -n localhost:9876 -c /xxx/rocketmq-all-4.5.1-bin-release/conf/

5、查看進(jìn)程和日志看是否啟動成功

jps -l  or 查看 namesrv.log 和 broker.log 日志

八加匈、本地源碼快速搭建調(diào)試

源碼獲取

https://github.com/apache/rocketmq/releases

該網(wǎng)址可下載source包存璃,也可以fork倉庫。

啟動 RocketMQ Namesrv

參考NameServerInstanceTest的startup方法雕拼,編寫main方法纵东,demo如下:


public class NameServerInstanceTest {
    .....
      public static void main(String[] args) throws Exception {
        final NamesrvConfig namesrvConfig = new NamesrvConfig(); 
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876); //設(shè)置端口
        // 創(chuàng)建 NamesrvController,啟動
        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
        namesrvController.initialize();
        namesrvController.start();
        Thread.sleep(Long.MAX_VALUE); //掛起
    }
}

備注:

運行main方法, 顯示以下關(guān)鍵字說明啟動成功

  • NettyEventExecutor service started

  • FileWatchService service started

啟動 RocketMQ Broker

參考BrokerControllerTest的testBrokerRestart方法啥寇,編寫main方法偎球,demo如下:

public class BrokerControllerTest {
    .....
 public static void main(String[] args) throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(10911);
        BrokerConfig brokerConfig = new BrokerConfig();  // BrokerConfig 配置
        brokerConfig.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        // 創(chuàng)建 BrokerController 對象,并啟動
        BrokerController brokerController = new BrokerController(//
                brokerConfig,
                nettyServerConfig, 
                new NettyClientConfig(),
                messageStoreConfig);
        brokerController.initialize();
        brokerController.start();
        Thread.sleep(Long.MAX_VALUE); //掛起
    }
}

備注:

運行main方法,不報錯即可辑甜,但是在:NameServerInstanceTest(nameServer)中衰絮,發(fā)現(xiàn)以下關(guān)鍵字日志,表明broker注冊nameServer成功:new topic registered...等

啟動 RocketMQ Producer

參考:org.apache.rocketmq.example.quickstart.Producer

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException, IOException {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        /*
         * Launch the instance.
         */
        producer.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
        producer.start();

        for (int i = 0; i < 1; i++) { // i 可以隨便配置
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("abc" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
//                msg.setDelayTimeLevel(10);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

備注:

運行main方法, 發(fā)現(xiàn)日志中包含:

SendResult [sendStatus=SEND_OK...即發(fā)送成功

啟動 RocketMQ Consumer磷醋,參考:

org.apache.rocketmq.example.quickstart.Consumer

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setNamesrvAddr("127.0.0.1:9876"); // 配置nameServer地址
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("abc", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

備注:

運行main方法, 發(fā)現(xiàn)日志中包含:Receive New Messages...即消費成功

九猫牡、總結(jié)

rocketMq作為低延遲、高并發(fā)邓线、高可用淌友、高可靠的分布式消息中間件,其詳細(xì)知識點非常多骇陈,如需深入震庭,建議源碼入坑。


程序員的核心競爭力其實還是技術(shù)你雌,因此對技術(shù)還是要不斷的學(xué)習(xí)器联,關(guān)注 “IT巔峰技術(shù)” 公眾號 ,該公眾號內(nèi)容定位:中高級開發(fā)、架構(gòu)師拨拓、中層管理人員等中高端崗位服務(wù)的肴颊,除了技術(shù)交流外還有很多架構(gòu)思想和實戰(zhàn)案例。

作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書作者千元,同時也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人苫昌,曾就職于拼多多、德邦等公司幸海,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人祟身,主要負(fù)責(zé)開發(fā)框架的搭建、中間件相關(guān)技術(shù)的二次開發(fā)和運維管理物独、混合云及基礎(chǔ)服務(wù)平臺的建設(shè)袜硫。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市挡篓,隨后出現(xiàn)的幾起案子婉陷,更是在濱河造成了極大的恐慌,老刑警劉巖官研,帶你破解...
    沈念sama閱讀 223,207評論 6 521
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秽澳,死亡現(xiàn)場離奇詭異,居然都是意外死亡戏羽,警方通過查閱死者的電腦和手機(jī)担神,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,455評論 3 400
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來始花,“玉大人妄讯,你說我怎么就攤上這事】嵯” “怎么了亥贸?”我有些...
    開封第一講書人閱讀 170,031評論 0 366
  • 文/不壞的土叔 我叫張陵,是天一觀的道長浇垦。 經(jīng)常有香客問我炕置,道長,這世上最難降的妖魔是什么男韧? 我笑而不...
    開封第一講書人閱讀 60,334評論 1 300
  • 正文 為了忘掉前任朴摊,我火速辦了婚禮,結(jié)果婚禮上煌抒,老公的妹妹穿的比我還像新娘仍劈。我一直安慰自己厕倍,他們只是感情好寡壮,可當(dāng)我...
    茶點故事閱讀 69,322評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般况既。 火紅的嫁衣襯著肌膚如雪这溅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,895評論 1 314
  • 那天棒仍,我揣著相機(jī)與錄音悲靴,去河邊找鬼。 笑死莫其,一個胖子當(dāng)著我的面吹牛癞尚,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播乱陡,決...
    沈念sama閱讀 41,300評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼浇揩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了憨颠?” 一聲冷哼從身側(cè)響起胳徽,我...
    開封第一講書人閱讀 40,264評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎爽彤,沒想到半個月后养盗,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,784評論 1 321
  • 正文 獨居荒郊野嶺守林人離奇死亡适篙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,870評論 3 343
  • 正文 我和宋清朗相戀三年往核,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片匙瘪。...
    茶點故事閱讀 40,989評論 1 354
  • 序言:一個原本活蹦亂跳的男人離奇死亡铆铆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出丹喻,到底是詐尸還是另有隱情薄货,我是刑警寧澤,帶...
    沈念sama閱讀 36,649評論 5 351
  • 正文 年R本政府宣布碍论,位于F島的核電站谅猾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏鳍悠。R本人自食惡果不足惜税娜,卻給世界環(huán)境...
    茶點故事閱讀 42,331評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望藏研。 院中可真熱鬧敬矩,春花似錦、人聲如沸蠢挡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,814評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至禽炬,卻和暖如春涧卵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背腹尖。 一陣腳步聲響...
    開封第一講書人閱讀 33,940評論 1 275
  • 我被黑心中介騙來泰國打工柳恐, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人热幔。 一個月前我還...
    沈念sama閱讀 49,452評論 3 379
  • 正文 我出身青樓乐设,卻偏偏與公主長得像,于是被迫代替她去往敵國和親绎巨。 傳聞我的和親對象是個殘疾皇子伤提,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,995評論 2 361

推薦閱讀更多精彩內(nèi)容