MQ(Message Queue)是一種跨進程的通信機制气笙,用于傳遞消息次企。是一種高效、可靠潜圃、安全缸棵、可擴展的分布式消息服務。通俗點說谭期,就是一個先進先出的數(shù)據(jù)結構堵第。MQ有三大主要作用分別為解耦、異步隧出、削峰/限流踏志。
消息隊列掃盲
消息隊列顧名思義就是存放消息的隊列。
所以問題并不是消息隊列是什么胀瞪,而是 消息隊列為什么會出現(xiàn)针余?消息隊列能用來干什么弄诲?用它來干這些事會帶來什么好處吼句?消息隊列會帶來副作用嗎捍靠?
消息隊列算是作為后端程序員的一個必備技能吧膊毁,因為分布式應用必定涉及到各個系統(tǒng)之間的通信問題断序,這個時候消息隊列也應運而生了饼酿“锘伲可以說分布式的產(chǎn)生是消息隊列的基礎铭乾,而分布式怕是一個很古老的概念了吧汛蝙,所以消息隊列也是一個很古老的中間件了烈涮。
消息隊列能用來干什么?
異步
你可能會反駁我患雇,應用之間的通信又不是只能由消息隊列解決跃脊,好好的通信為什么中間非要插一個消息隊列呢?我不能直接進行通信嗎苛吱?
很好酪术,你又提出了一個概念,同步通信翠储。就比如現(xiàn)在業(yè)界使用比較多的 Dubbo
就是一個適用于各個系統(tǒng)之間同步通信的 RPC
框架绘雁。
我來舉個??吧,比如我們有一個購票系統(tǒng)援所,需求是用戶在購買完之后能接收到購買完成的短信庐舟。
我們省略中間的網(wǎng)絡通信時間消耗,假如購票系統(tǒng)處理需要 150ms 住拭,短信系統(tǒng)處理需要 200ms 挪略,那么整個處理流程的時間消耗就是 150ms + 200ms = 350ms历帚。
當然,乍看沒什么問題杠娱⊥炖危可是仔細一想你就感覺有點問題,我用戶購票在購票系統(tǒng)的時候其實就已經(jīng)完成了購買摊求,而我現(xiàn)在通過同步調用非要讓整個請求拉長時間禽拔,而短息系統(tǒng)這玩意又不是很有必要,它僅僅是一個輔助功能增強用戶體驗感而已室叉。我現(xiàn)在整個調用流程就有點 頭重腳輕 的感覺了睹栖,購票是一個不太耗時的流程,而我現(xiàn)在因為同步調用茧痕,非要等待發(fā)送短信這個比較耗時的操作才返回結果野来。那我如果再加一個發(fā)送郵件呢?
這樣整個系統(tǒng)的調用鏈又變長了凿渊,整個時間就變成了550ms梁只。
當我們在學生時代需要在食堂排隊的時候,我們和食堂大媽就是一個同步的模型埃脏。
我們需要告訴食堂大媽:“姐姐搪锣,給我加個雞腿,再加個酸辣土豆絲彩掐,幫我澆點汁上去构舟,多打點飯哦??
然后大媽幫我們打飯配菜,我們看著大媽那顫抖的手和掉落的土豆絲不禁咽了咽口水堵幽。
最終我們從大媽手中接過飯菜然后去尋找座位了...
回想一下狗超,我們在給大媽發(fā)送需要的信息之后我們是 同步等待大媽給我配好飯菜 的,上面我們只是加了雞腿和土豆絲朴下,萬一我再加一個番茄牛腩努咐,韭菜雞蛋,這樣是不是大媽打飯配菜的流程就會變長殴胧,我們等待的時間也會相應的變長渗稍。
那后來,我們工作賺錢了有錢去飯店吃飯了团滥,我們告訴服務員來一碗牛肉面加個荷包蛋 (傳達一個消息) 竿屹,然后我們就可以在飯桌上安心的玩手機了 (干自己其他事情) ,等到我們的牛肉面上了我們就可以吃了灸姊。這其中我們也就傳達了一個消息拱燃,然后我們又轉過頭干其他事情了。這其中雖然做面的時間沒有變短力惯,但是我們只需要傳達一個消息就可以看其他事情了碗誉,這是一個 異步 的概念召嘶。
所以,為了解決這一個問題哮缺,聰明的程序員在中間也加了個類似于服務員的中間件——消息隊列苍蔬。這個時候我們就可以把模型給改造了。
這樣蝴蜓,我們在將消息存入消息隊列之后我們就可以直接返回了(我們告訴服務員我們要吃什么然后玩手機),所以整個耗時只是 150ms + 10ms = 160ms俺猿。
但是你需要注意的是茎匠,整個流程的時長是沒變的,就像你僅僅告訴服務員要吃什么是不會影響到做面的速度的押袍。
解耦
回到最初同步調用的過程诵冒,我們寫個偽代碼簡單概括一下。
那么第二步谊惭,我們又添加了一個發(fā)送郵件汽馋,我們就得重新去修改代碼,如果我們又加一個需求:用戶購買完還需要給他加積分圈盔,這個時候我們是不是又得改代碼豹芯?
如果你覺得還行,那么我這個時候不要發(fā)郵件這個服務了呢驱敲,我是不是又得改代碼铁蹈,又得重啟應用?
這樣改來改去是不是很麻煩众眨,那么 此時我們就用一個消息隊列在中間進行解耦 握牧。你需要注意的是,我們后面的發(fā)送短信娩梨、發(fā)送郵件沿腰、添加積分等一些操作都依賴于上面的 result
,這東西抽象出來就是購票的處理結果呀狈定,比如訂單號颂龙,用戶賬號等等,也就是說我們后面的一系列服務都是需要同樣的消息來進行處理掸冤。既然這樣厘托,我們是不是可以通過 “廣播消息” 來實現(xiàn)。
上面所講的“廣播”并不是真正的廣播稿湿,而是接下來的系統(tǒng)作為消費者去 訂閱 特定的主題铅匹。比如我們這里的主題就可以叫做 訂票
,我們購買系統(tǒng)作為一個生產(chǎn)者去生產(chǎn)這條消息放入消息隊列饺藤,然后消費者訂閱了這個主題包斑,會從消息隊列中拉取消息并消費流礁。就比如我們剛剛畫的那張圖,你會發(fā)現(xiàn)罗丰,在生產(chǎn)者這邊我們只需要關注 生產(chǎn)消息到指定主題中 神帅,而 消費者只需要關注從指定主題中拉取消息 就行了。
如果沒有消息隊列萌抵,每當一個新的業(yè)務接入找御,我們都要在主系統(tǒng)調用新接口、或者當我們取消某些業(yè)務绍填,我們也得在主系統(tǒng)刪除某些接口調用霎桅。有了消息隊列,我們只需要關心消息是否送達了隊列讨永,至于誰希望訂閱滔驶,接下來收到消息如何處理,是下游的事情卿闹,無疑極大地減少了開發(fā)和聯(lián)調的工作量揭糕。
削峰
我們再次回到一開始我們使用同步調用系統(tǒng)的情況,并且思考一下锻霎,如果此時有大量用戶請求購票整個系統(tǒng)會變成什么樣著角?
如果,此時有一萬的請求進入購票系統(tǒng)量窘,我們知道運行我們主業(yè)務的服務器配置一般會比較好雇寇,所以這里我們假設購票系統(tǒng)能承受這一萬的用戶請求,那么也就意味著我們同時也會出現(xiàn)一萬調用發(fā)短信服務的請求蚌铜。而對于短信系統(tǒng)來說并不是我們的主要業(yè)務锨侯,所以我們配備的硬件資源并不會太高,那么你覺得現(xiàn)在這個短信系統(tǒng)能承受這一萬的峰值么冬殃,且不說能不能承受囚痴,系統(tǒng)會不會 直接崩潰 了?
短信業(yè)務又不是我們的主業(yè)務审葬,我們能不能 折中處理 呢深滚?如果我們把購買完成的信息發(fā)送到消息隊列中,而短信系統(tǒng) 盡自己所能地去消息隊列中取消息和消費消息 涣觉,即使處理速度慢一點也無所謂痴荐,只要我們的系統(tǒng)沒有崩潰就行了。
留得江山在官册,還怕沒柴燒生兆?你敢說每次發(fā)送驗證碼的時候是一發(fā)你就收到了的么?
消息隊列能帶來什么好處膝宁?
其實上面我已經(jīng)說了鸦难。異步根吁、解耦、削峰合蔽。 哪怕你上面的都沒看懂也千萬要記住這六個字击敌,因為他不僅是消息隊列的精華,更是編程和架構的精華拴事。
消息隊列會帶來副作用嗎沃斤?
沒有哪一門技術是“銀彈”,消息隊列也有它的副作用刃宵。
比如轰枝,本來好好的兩個系統(tǒng)之間的調用,我中間加了個消息隊列组去,如果消息隊列掛了怎么辦呢?是不是 降低了系統(tǒng)的可用性 步淹?
那這樣是不是要保證HA(高可用)从隆?是不是要搞集群?那么我 整個系統(tǒng)的復雜度是不是上升了 缭裆?
拋開上面的問題不講键闺,萬一我發(fā)送方發(fā)送失敗了,然后執(zhí)行重試澈驼,這樣就可能產(chǎn)生重復的消息辛燥。
或者我消費端處理失敗了,請求重發(fā)缝其,這樣也會產(chǎn)生重復的消息挎塌。
對于一些微服務來說,消費重復消息會帶來更大的麻煩内边,比如增加積分榴都,這個時候我加了多次是不是對其他用戶不公平?
那么漠其,又 如何解決重復消費消息的問題 呢嘴高?
如果我們此時的消息需要保證嚴格的順序性怎么辦呢?比如生產(chǎn)者生產(chǎn)了一系列的有序消息(對一個id為1的記錄進行刪除增加修改)和屎,但是我們知道在發(fā)布訂閱模型中拴驮,對于主題是無順序的,那么這個時候就會導致對于消費者消費消息的時候沒有按照生產(chǎn)者的發(fā)送順序消費柴信,比如這個時候我們消費的順序為修改刪除增加套啤,如果該記錄涉及到金額的話是不是會出大事情?
那么颠印,又 如何解決消息的順序消費問題 呢纲岭?
就拿我們上面所講的分布式系統(tǒng)來說抹竹,用戶購票完成之后是不是需要增加賬戶積分?在同一個系統(tǒng)中我們一般會使用事務來進行解決止潮,如果用 Spring
的話我們在上面?zhèn)未a中加入 @Transactional
注解就好了窃判。但是在不同系統(tǒng)中如何保證事務呢?總不能這個系統(tǒng)我扣錢成功了你那積分系統(tǒng)積分沒加吧喇闸?或者說我這扣錢明明失敗了袄琳,你那積分系統(tǒng)給我加了積分。
那么燃乍,又如何 解決分布式事務問題 呢唆樊?
我們剛剛說了,消息隊列可以進行削峰操作刻蟹,那如果我的消費者如果消費很慢或者生產(chǎn)者生產(chǎn)消息很快逗旁,這樣是不是會將消息堆積在消息隊列中?
那么舆瘪,又如何 解決消息堆積的問題 呢片效?
可用性降低,復雜度上升英古,又帶來一系列的重復消費淀衣,順序消費,分布式事務召调,消息堆積的問題膨桥,這消息隊列還怎么用啊唠叛?
別急只嚣,辦法總是有的。
RocketMQ是什么艺沼?
哇介牙,你個混蛋!上面給我拋出那么多問題澳厢,你現(xiàn)在又講 RocketMQ
环础,還讓不讓人活了?剩拢!
別急別急线得,話說你現(xiàn)在清楚 MQ
的構造嗎,我還沒講呢徐伐,我們先搞明白 MQ
的內部構造贯钩,再來看看如何解決上面的一系列問題吧,不過你最好帶著問題去閱讀和了解喔。
RocketMQ
是一個 隊列模型 的消息中間件角雷,具有高性能祸穷、高可靠、高實時勺三、分布式 的特點雷滚。它是一個采用 Java
語言開發(fā)的分布式的消息系統(tǒng),由阿里巴巴團隊開發(fā)吗坚,在2016年底貢獻給 Apache
祈远,成為了 Apache
的一個頂級項目。 在阿里內部商源,RocketMQ
很好地服務了集團大大小小上千個應用车份,在每年的雙十一當天,更有不可思議的萬億級消息通過 RocketMQ
流轉牡彻。
廢話不多說扫沼,想要了解 RocketMQ
歷史的同學可以自己去搜尋資料。聽完上面的介紹庄吼,你只要知道 RocketMQ
很快充甚、很牛、而且經(jīng)歷過雙十一的實踐就行了霸褒!
隊列模型和主題模型
在談 RocketMQ
的技術架構之前,我們先來了解一下兩個名詞概念——隊列模型 和 主題模型 盈蛮。
首先我問一個問題废菱,消息隊列為什么要叫消息隊列?
你可能覺得很弱智抖誉,這玩意不就是存放消息的隊列嘛殊轴?不叫消息隊列叫什么?
的確袒炉,早期的消息中間件是通過 隊列 這一模型來實現(xiàn)的旁理,可能是歷史原因,我們都習慣把消息中間件成為消息隊列我磁。
但是孽文,如今例如 RocketMQ
、Kafka
這些優(yōu)秀的消息中間件不僅僅是通過一個 隊列 來實現(xiàn)消息存儲的夺艰。
隊列模型
就像我們理解隊列一樣芋哭,消息中間件的隊列模型就真的只是一個隊列。郁副。减牺。我畫一張圖給大家理解。
在一開始我跟你提到了一個 “廣播” 的概念,也就是說如果我們此時我們需要將一個消息發(fā)送給多個消費者(比如此時我需要將信息發(fā)送給短信系統(tǒng)和郵件系統(tǒng))拔疚,這個時候單個隊列即不能滿足需求了肥隆。
當然你可以讓 Producer
生產(chǎn)消息放入多個隊列中羊异,然后每個隊列去對應每一個消費者阀参。問題是可以解決,創(chuàng)建多個隊列并且復制多份消息是會很影響資源和性能的滤蝠。而且墩虹,這樣子就會導致生產(chǎn)者需要知道具體消費者個數(shù)然后去復制對應數(shù)量的消息隊列嘱巾,這就違背我們消息中間件的 解耦 這一原則。
主題模型
那么有沒有好的方法去解決這一個問題呢诫钓?有旬昭,那就是 主題模型 或者可以稱為 發(fā)布訂閱模型 。
感興趣的同學可以去了解一下設計模式里面的觀察者模式并且手動實現(xiàn)一下菌湃,我相信你會有所收獲的问拘。
在主題模型中,消息的生產(chǎn)者稱為 發(fā)布者(Publisher) 惧所,消息的消費者稱為 訂閱者(Subscriber) 骤坐,存放消息的容器稱為 主題(Topic) 。
其中下愈,發(fā)布者將消息發(fā)送到指定主題中纽绍,訂閱者需要 提前訂閱主題 才能接受特定主題的消息。
RocketMQ中的消息模型
RockerMQ
中的消息模型就是按照 主題模型 所實現(xiàn)的势似。你可能會好奇這個 主題 到底是怎么實現(xiàn)的呢拌夏?你上面也沒有講到呀!
其實對于主題模型的實現(xiàn)來說每個消息中間件的底層設計都是不一樣的履因,就比如 Kafka
中的 分區(qū) 障簿,RocketMQ
中的 隊列 ,RabbitMQ
中的 Exchange
栅迄。我們可以理解為 主題模型/發(fā)布訂閱模型 就是一個標準站故,那些中間件只不過照著這個標準去實現(xiàn)而已。
所以毅舆,RocketMQ
中的 主題模型 到底是如何實現(xiàn)的呢西篓?首先我畫一張圖,大家嘗試著去理解一下憋活。
我們可以看到在整個圖中有 Producer Group
污淋、Topic
、Consumer Group
三個角色余掖,我來分別介紹一下他們寸爆。
-
Producer Group
生產(chǎn)者組: 代表某一類的生產(chǎn)者礁鲁,比如我們有多個秒殺系統(tǒng)作為生產(chǎn)者,這多個合在一起就是一個Producer Group
生產(chǎn)者組赁豆,它們一般生產(chǎn)相同的消息仅醇。 -
Consumer Group
消費者組: 代表某一類的消費者,比如我們有多個短信系統(tǒng)作為消費者魔种,這多個合在一起就是一個Consumer Group
消費者組析二,它們一般消費相同的消息。 -
Topic
主題: 代表一類消息节预,比如訂單消息叶摄,物流消息等等。
你可以看到圖中生產(chǎn)者組中的生產(chǎn)者會向主題發(fā)送消息安拟,而 主題中存在多個隊列蛤吓,生產(chǎn)者每次生產(chǎn)消息之后是指定主題中的某個隊列發(fā)送消息的。
每個主題中都有多個隊列(這里還不涉及到 Broker
)糠赦,集群消費模式下会傲,一個消費者集群多臺機器共同消費一個 topic
的多個隊列拙泽,一個隊列只會被一個消費者消費顾瞻。如果某個消費者掛掉退渗,分組內其它消費者會接替掛掉的消費者繼續(xù)消費秒裕。就像上圖中 Consumer1
和 Consumer2
分別對應著兩個隊列,而 Consuer3
是沒有隊列對應的颖低,所以一般來講要控制 消費者組中的消費者個數(shù)和主題中隊列個數(shù)相同 。
當然也可以消費者個數(shù)小于隊列個數(shù)伴嗡,只不過不太建議。如下圖。
每個消費組在每個隊列上維護一個消費位置 自娩,為什么呢脐彩?
因為我們剛剛畫的僅僅是一個消費者組,我們知道在發(fā)布訂閱模式中一般會涉及到多個消費者組,而每個消費者組在每個隊列中的消費位置都是不同的。如果此時有多個消費者組绵载,那么消息被一個消費者組消費完之后是不會刪除的(因為其它消費者組也需要呀)购裙,它僅僅是為每個消費者組維護一個 消費位移(offset) 躯畴,每次消費者組消費完會返回一個成功的響應摹察,然后隊列再把維護的消費位移加一,這樣就不會出現(xiàn)剛剛消費過的消息再一次被消費了。
可能你還有一個問題,為什么一個主題中需要維護多個隊列 ?
答案是 提高并發(fā)能力 。的確予借,每個主題中只存在一個隊列也是可行的晦溪。你想一下,如果每個主題中只存在一個隊列牌借,這個隊列中也維護著每個消費者組的消費位置磷籍,這樣也可以做到 發(fā)布訂閱模式 。如下圖。
但是万俗,這樣我生產(chǎn)者是不是只能向一個隊列發(fā)送消息蓖墅?又因為需要維護消費位置所以一個隊列只能對應一個消費者組中的消費者,這樣是不是其他的 Consumer
就沒有用武之地了迂曲?從這兩個角度來講传黄,并發(fā)度一下子就小了很多杰扫。
所以總結來說,RocketMQ
通過使用在一個 Topic
中配置多個隊列并且每個隊列維護每個消費者組的消費位置 實現(xiàn)了 主題模式/發(fā)布訂閱模式 膘掰。
RocketMQ的架構圖
講完了消息模型章姓,我們理解起 RocketMQ
的技術架構起來就容易多了。
RocketMQ
技術架構中有四大角色 NameServer
识埋、Broker
蛹疯、Producer
歉铝、Consumer
。我來向大家分別解釋一下這四個角色是干啥的。
-
Broker
: 主要負責消息的存儲、投遞和查詢以及服務高可用保證。說白了就是消息隊列服務器嘛胎挎,生產(chǎn)者生產(chǎn)消息到Broker
筷转,消費者從Broker
拉取消息并消費左电。這里,我還得普及一下關于
Broker
童番、Topic
和 隊列的關系。上面我講解了Topic
和隊列的關系——一個Topic
中存在多個隊列娜亿,那么這個Topic
和隊列存放在哪呢督赤?一個
Topic
分布在多個Broker
上,一個Broker
可以配置多個Topic
助隧,它們是多對多的關系。如果某個
Topic
消息量很大滑沧,應該給它多配置幾個隊列(上文中提到了提高并發(fā)能力)并村,并且 盡量多分布在不同Broker
上,以減輕某個Broker
的壓力 滓技。Topic
消息量都比較均勻的情況下哩牍,如果某個broker
上的隊列越多,則該broker
壓力越大令漂。所以說我們需要配置多個Broker膝昆。
NameServer
: 不知道你們有沒有接觸過ZooKeeper
和Spring Cloud
中的Eureka
,它其實也是一個 注冊中心 叠必,主要提供兩個功能:Broker管理 和 路由信息管理 荚孵。說白了就是Broker
會將自己的信息注冊到NameServer
中,此時NameServer
就存放了很多Broker
的信息(Broker的路由表)纬朝,消費者和生產(chǎn)者就從NameServer
中獲取路由表然后照著路由表的信息和對應的Broker
進行通信(生產(chǎn)者和消費者定期會向NameServer
去查詢相關的Broker
的信息)收叶。Producer
: 消息發(fā)布的角色,支持分布式集群方式部署共苛。說白了就是生產(chǎn)者判没。Consumer
: 消息消費的角色,支持分布式集群方式部署隅茎。支持以push推哆致,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費患膛,它提供實時消息訂閱機制摊阀。說白了就是消費者。
聽完了上面的解釋你可能會覺得,這玩意好簡單胞此。不就是這樣的么臣咖?
嗯?你可能會發(fā)現(xiàn)一個問題漱牵,這老家伙 NameServer
干啥用的夺蛇,這不多余嗎?直接 Producer
酣胀、Consumer
和 Broker
直接進行生產(chǎn)消息刁赦,消費消息不就好了么?
但是闻镶,我們上文提到過 Broker
是需要保證高可用的甚脉,如果整個系統(tǒng)僅僅靠著一個 Broker
來維持的話,那么這個 Broker
的壓力會不會很大铆农?所以我們需要使用多個 Broker
來保證 負載均衡 牺氨。
如果說,我們的消費者和生產(chǎn)者直接和多個 Broker
相連墩剖,那么當 Broker
修改的時候必定會牽連著每個生產(chǎn)者和消費者猴凹,這樣就會產(chǎn)生耦合問題,而 NameServer
注冊中心就是用來解決這個問題的岭皂。
如果還不是很理解的話郊霎,可以去看我介紹
Spring Cloud
的那篇文章,其中介紹了Eureka
注冊中心爷绘。
當然歹篓,RocketMQ
中的技術架構肯定不止前面那么簡單,因為上面圖中的四個角色都是需要做集群的揉阎。我給出一張官網(wǎng)的架構圖庄撮,大家嘗試理解一下。
其實和我們最開始畫的那張乞丐版的架構圖也沒什么區(qū)別毙籽,主要是一些細節(jié)上的差別洞斯。聽我細細道來??。
第一坑赡、我們的 Broker
做了集群并且還進行了主從部署 烙如,由于消息分布在各個 Broker
上,一旦某個 Broker
宕機毅否,則該Broker
上的消息讀寫都會受到影響亚铁。所以 Rocketmq
提供了 master/slave
的結構,salve
定時從 master
同步數(shù)據(jù)(同步刷盤或者異步刷盤)螟加,如果 master
宕機徘溢,則 slave
提供消費服務吞琐,但是不能寫入消息 (后面我還會提到哦)。
第二然爆、為了保證 HA
站粟,我們的 NameServer
也做了集群部署,但是請注意它是 去中心化 的曾雕。也就意味著它沒有主節(jié)點奴烙,你可以很明顯地看出 NameServer
的所有節(jié)點是沒有進行 Info Replicate
的,在 RocketMQ
中是通過 單個Broker和所有NameServer保持長連接 剖张,并且在每隔30秒 Broker
會向所有 Nameserver
發(fā)送心跳切诀,心跳包含了自身的 Topic
配置信息,這個步驟就對應這上面的 Routing Info
搔弄。
第三幅虑、在生產(chǎn)者需要向 Broker
發(fā)送消息的時候,需要先從 NameServer
獲取關于 Broker
的路由信息肯污,然后通過 輪詢 的方法去向每個隊列中生產(chǎn)數(shù)據(jù)以達到 負載均衡 的效果翘单。
第四吨枉、消費者通過 NameServer
獲取所有 Broker
的路由信息后蹦渣,向 Broker
發(fā)送 Pull
請求來獲取消息數(shù)據(jù)。Consumer
可以以兩種模式啟動—— 廣播(Broadcast)和集群(Cluster)貌亭。廣播模式下柬唯,一條消息會發(fā)送給 同一個消費組中的所有消費者 ,集群模式下消息只會發(fā)送給一個消費者圃庭。
如何解決 順序消費锄奢、重復消費
其實,這些東西都是我在介紹消息隊列帶來的一些副作用的時候提到的剧腻,也就是說拘央,這些問題不僅僅掛鉤于 RocketMQ
,而是應該每個消息中間件都需要去解決的书在。
在上面我介紹 RocketMQ
的技術架構的時候我已經(jīng)向你展示了 它是如何保證高可用的 灰伟,這里不涉及運維方面的搭建,如果你感興趣可以自己去官網(wǎng)上照著例子搭建屬于你自己的 RocketMQ
集群儒旬。
其實
Kafka
的架構基本和RocketMQ
類似栏账,只是它注冊中心使用了Zookeeper
、它的 分區(qū) 就相當于RocketMQ
中的 隊列 栈源。還有一些小細節(jié)不同會在后面提到挡爵。
順序消費
在上面的技術架構介紹中,我們已經(jīng)知道了 RocketMQ
在主題上是無序的甚垦、它只有在隊列層面才是保證有序 的茶鹃。
這又扯到兩個概念——普通順序 和 嚴格順序 涣雕。
所謂普通順序是指 消費者通過 同一個消費隊列收到的消息是有順序的 ,不同消息隊列收到的消息則可能是無順序的前计。普通順序消息在 Broker
重啟情況下不會保證消息順序性 (短暫時間) 胞谭。
所謂嚴格順序是指 消費者收到的 所有消息 均是有順序的。嚴格順序消息 即使在異常情況下也會保證消息的順序性 男杈。
但是丈屹,嚴格順序看起來雖好,實現(xiàn)它可會付出巨大的代價伶棒。如果你使用嚴格順序模式旺垒,Broker
集群中只要有一臺機器不可用,則整個集群都不可用肤无。你還用啥先蒋?現(xiàn)在主要場景也就在 binlog
同步。
一般而言宛渐,我們的 MQ
都是能容忍短暫的亂序竞漾,所以推薦使用普通順序模式。
那么窥翩,我們現(xiàn)在使用了 普通順序模式 业岁,我們從上面學習知道了在 Producer
生產(chǎn)消息的時候會進行輪詢(取決你的負載均衡策略)來向同一主題的不同消息隊列發(fā)送消息。那么如果此時我有幾個消息分別是同一個訂單的創(chuàng)建寇蚊、支付笔时、發(fā)貨,在輪詢的策略下這 三個消息會被發(fā)送到不同隊列 仗岸,因為在不同的隊列此時就無法使用 RocketMQ
帶來的隊列有序特性來保證消息有序性了允耿。
那么,怎么解決呢扒怖?
其實很簡單较锡,我們需要處理的僅僅是將同一語義下的消息放入同一個隊列(比如這里是同一個訂單),那我們就可以使用 Hash取模法 來保證同一個訂單在同一個隊列中就行了盗痒。
重復消費
emmm蚂蕴,就兩個字—— 冪等 。在編程中一個冪等 操作的特點是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同积糯。比如說掂墓,這個時候我們有一個訂單的處理積分的系統(tǒng),每當來一個消息的時候它就負責為創(chuàng)建這個訂單的用戶的積分加上相應的數(shù)值看成【啵可是有一次,消息隊列發(fā)送給訂單系統(tǒng) FrancisQ 的訂單信息川慌,其要求是給 FrancisQ 的積分加上 500吃嘿。但是積分系統(tǒng)在收到 FrancisQ 的訂單信息處理完成之后返回給消息隊列處理成功的信息的時候出現(xiàn)了網(wǎng)絡波動(當然還有很多種情況祠乃,比如Broker意外重啟等等),這條回應沒有發(fā)送成功兑燥。
那么亮瓷,消息隊列沒收到積分系統(tǒng)的回應會不會嘗試重發(fā)這個消息?問題就來了降瞳,我再發(fā)這個消息嘱支,萬一它又給 FrancisQ 的賬戶加上 500 積分怎么辦呢?
所以我們需要給我們的消費者實現(xiàn) 冪等 挣饥,也就是對同一個消息的處理結果除师,執(zhí)行多少次都不變。
那么如何給業(yè)務實現(xiàn)冪等呢扔枫?這個還是需要結合具體的業(yè)務的汛聚。你可以使用 寫入 Redis
來保證,因為 Redis
的 key
和 value
就是天然支持冪等的短荐。當然還有使用 數(shù)據(jù)庫插入法 倚舀,基于數(shù)據(jù)庫的唯一鍵來保證重復數(shù)據(jù)不會被插入多條。
不過最主要的還是需要 根據(jù)特定場景使用特定的解決方案 忍宋,你要知道你的消息消費是否是完全不可重復消費還是可以忍受重復消費的痕貌,然后再選擇強校驗和弱校驗的方式。畢竟在 CS 領域還是很少有技術銀彈的說法讶踪。
而在整個互聯(lián)網(wǎng)領域芯侥,冪等不僅僅適用于消息隊列的重復消費問題泊交,這些實現(xiàn)冪等的方法乳讥,也同樣適用于,在其他場景中來解決重復請求或者重復調用的問題 廓俭。比如將HTTP服務設計成冪等的云石,解決前端或者APP重復提交表單數(shù)據(jù)的問題 ,也可以將一個微服務設計成冪等的研乒,解決 RPC
框架自動重試導致的 重復調用問題 汹忠。
分布式事務
如何解釋分布式事務呢?事務大家都知道吧雹熬?要么都執(zhí)行要么都不執(zhí)行 宽菜。在同一個系統(tǒng)中我們可以輕松地實現(xiàn)事務,但是在分布式架構中竿报,我們有很多服務是部署在不同系統(tǒng)之間的铅乡,而不同服務之間又需要進行調用。比如此時我下訂單然后增加積分烈菌,如果保證不了分布式事務的話阵幸,就會出現(xiàn)A系統(tǒng)下了訂單花履,但是B系統(tǒng)增加積分失敗或者A系統(tǒng)沒有下訂單,B系統(tǒng)卻增加了積分挚赊。前者對用戶不友好诡壁,后者對運營商不利滚停,這是我們都不愿意見到的的畴。
那么舆声,如何去解決這個問題呢碉纺?
如今比較常見的分布式事務實現(xiàn)有 2PC问词、TCC 和事務消息(half 半消息機制)熟史。每一種實現(xiàn)都有其特定的使用場景椒舵,但是也有各自的問題柜蜈,都不是完美的解決方案举反。
在 RocketMQ
中使用的是 事務消息加上事務反查機制 來解決分布式事務問題的懊直。我畫了張圖,大家可以對照著圖進行理解火鼻。
在第一步發(fā)送的 half 消息 室囊,它的意思是 在事務提交之前,對于消費者來說魁索,這個消息是不可見的 融撞。
那么,如何做到寫入消息但是對用戶不可見呢粗蔚?RocketMQ事務消息的做法是:如果消息是half消息尝偎,將備份原消息的主題與消息消費隊列,然后 改變主題 為RMQ_SYS_TRANS_HALF_TOPIC鹏控。由于消費組未訂閱該主題致扯,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務当辐,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費抖僵,根據(jù)生產(chǎn)者組獲取一個服務提供者發(fā)送回查事務狀態(tài)請求,根據(jù)事務狀態(tài)來決定是提交或回滾消息缘揪。
你可以試想一下耍群,如果沒有從第5步開始的 事務反查機制 ,如果出現(xiàn)網(wǎng)路波動第4步?jīng)]有發(fā)送成功找筝,這樣就會產(chǎn)生 MQ 不知道是不是需要給消費者消費的問題蹈垢,他就像一個無頭蒼蠅一樣。在 RocketMQ
中就是使用的上述的事務反查來解決的袖裕,而在 Kafka
中通常是直接拋出一個異常讓用戶來自行解決曹抬。
你還需要注意的是,在 MQ Server
指向系統(tǒng)B的操作已經(jīng)和系統(tǒng)A不相關了陆赋,也就是說在消息隊列中的分布式事務是——本地事務和存儲消息到消息隊列才是同一個事務沐祷。這樣也就產(chǎn)生了事務的最終一致性嚷闭,因為整個過程是異步的,每個系統(tǒng)只要保證它自己那一部分的事務就行了赖临。
消息堆積問題
在上面我們提到了消息隊列一個很重要的功能——削峰 胞锰。那么如果這個峰值太大了導致消息堆積在隊列中怎么辦呢?
其實這個問題可以將它廣義化兢榨,因為產(chǎn)生消息堆積的根源其實就只有兩個——生產(chǎn)者生產(chǎn)太快或者消費者消費太慢嗅榕。
我們可以從多個角度去思考解決這個問題,當流量到峰值的時候是因為生產(chǎn)者生產(chǎn)太快吵聪,我們可以使用一些 限流降級 的方法凌那,當然你也可以增加多個消費者實例去水平擴展增加消費能力來匹配生產(chǎn)的激增。如果消費者消費過慢的話吟逝,我們可以先檢查 是否是消費者出現(xiàn)了大量的消費錯誤 帽蝶,或者打印一下日志查看是否是哪一個線程卡死,出現(xiàn)了鎖資源不釋放等等的問題块攒。
當然励稳,最快速解決消息堆積問題的方法還是增加消費者實例,不過 同時你還需要增加每個主題的隊列數(shù)量 囱井。
別忘了在
RocketMQ
中驹尼,一個隊列只會被一個消費者消費 ,如果你僅僅是增加消費者實例就會出現(xiàn)我一開始給你畫架構圖的那種情況庞呕。
回溯消費
回溯消費是指 Consumer
已經(jīng)消費成功的消息新翎,由于業(yè)務上需求需要重新消費,在RocketMQ
中住练, Broker
在向Consumer
投遞成功消息后地啰,消息仍然需要保留 。并且重新消費一般是按照時間維度澎羞,例如由于 Consumer
系統(tǒng)故障髓绽,恢復后需要重新消費1小時前的數(shù)據(jù)敛苇,那么 Broker
要提供一種機制妆绞,可以按照時間維度來回退消費進度。RocketMQ
支持按照時間回溯消費枫攀,時間維度精確到毫秒括饶。
這是官方文檔的解釋,我直接照搬過來就當科普了来涨。
RocketMQ 的刷盤機制
上面我講了那么多的 RocketMQ
的架構和設計原理图焰,你有沒有好奇
在 Topic
中的 隊列是以什么樣的形式存在的?
隊列中的消息又是如何進行存儲持久化的呢蹦掐?
我在上文中提到的 同步刷盤 和 異步刷盤 又是什么呢技羔?它們會給持久化帶來什么樣的影響呢僵闯?
下面我將給你們一一解釋。
同步刷盤和異步刷盤
如上圖所示藤滥,在同步刷盤中需要等待一個刷盤成功的 ACK
鳖粟,同步刷盤對 MQ
消息可靠性來說是一種不錯的保障,但是 性能上會有較大影響 拙绊,一般地適用于金融等特定業(yè)務場景向图。
而異步刷盤往往是開啟一個線程去異步地執(zhí)行刷盤操作。消息刷盤采用后臺異步線程提交的方式進行标沪, 降低了讀寫延遲 榄攀,提高了 MQ
的性能和吞吐量,一般適用于如發(fā)驗證碼等對于消息保證要求不太高的業(yè)務場景金句。
一般地檩赢,異步刷盤只有在 Broker
意外宕機的時候會丟失部分數(shù)據(jù),你可以設置 Broker
的參數(shù) FlushDiskType
來調整你的刷盤策略(ASYNC_FLUSH 或者 SYNC_FLUSH)违寞。
同步復制和異步復制
上面的同步刷盤和異步刷盤是在單個結點層面的漠畜,而同步復制和異步復制主要是指的 Borker
主從模式下,主節(jié)點返回消息給客戶端的時候是否需要同步從節(jié)點坞靶。
- 同步復制: 也叫 “同步雙寫”憔狞,也就是說,只有消息同步雙寫到主從結點上時才返回寫入成功 彰阴。
- 異步復制: 消息寫入主節(jié)點之后就直接返回寫入成功 瘾敢。
然而,很多事情是沒有完美的方案的尿这,就比如我們進行消息寫入的節(jié)點越多就更能保證消息的可靠性簇抵,但是隨之的性能也會下降,所以需要程序員根據(jù)特定業(yè)務場景去選擇適應的主從復制方案射众。
那么碟摆,異步復制會不會也像異步刷盤那樣影響消息的可靠性呢?
答案是不會的叨橱,因為兩者就是不同的概念典蜕,對于消息可靠性是通過不同的刷盤策略保證的,而像異步同步復制策略僅僅是影響到了 可用性 罗洗。為什么呢愉舔?其主要原因是 RocketMQ
是不支持自動主從切換的,當主節(jié)點掛掉之后伙菜,生產(chǎn)者就不能再給這個主節(jié)點生產(chǎn)消息了轩缤。
比如這個時候采用異步復制的方式,在主節(jié)點還未發(fā)送完需要同步的消息的時候主節(jié)點掛掉了,這個時候從節(jié)點就少了一部分消息火的。但是此時生產(chǎn)者無法再給主節(jié)點生產(chǎn)消息了壶愤,消費者可以自動切換到從節(jié)點進行消費(僅僅是消費),所以在主節(jié)點掛掉的時間只會產(chǎn)生主從結點短暫的消息不一致的情況馏鹤,降低了可用性公你,而當主節(jié)點重啟之后,從節(jié)點那部分未來得及復制的消息還會繼續(xù)復制假瞬。
在單主從架構中陕靠,如果一個主節(jié)點掛掉了,那么也就意味著整個系統(tǒng)不能再生產(chǎn)了脱茉。那么這個可用性的問題能否解決呢剪芥?一個主從不行那就多個主從的唄,別忘了在我們最初的架構圖中琴许,每個 Topic
是分布在不同 Broker
中的税肪。
但是這種復制方式同樣也會帶來一個問題,那就是無法保證 嚴格順序 榜田。在上文中我們提到了如何保證的消息順序性是通過將一個語義的消息發(fā)送在同一個隊列中益兄,使用 Topic
下的隊列來保證順序性的。如果此時我們主節(jié)點A負責的是訂單A的一系列語義消息箭券,然后它掛了净捅,這樣其他節(jié)點是無法代替主節(jié)點A的,如果我們任意節(jié)點都可以存入任何消息辩块,那就沒有順序性可言了蛔六。
而在 RocketMQ
中采用了 Dledger
解決這個問題。他要求在寫入消息的時候废亭,要求至少消息復制到半數(shù)以上的節(jié)點之后国章,才給客?端返回寫?成功,并且它是?持通過選舉來動態(tài)切換主節(jié)點的豆村。這里我就不展開說明了液兽,讀者可以自己去了解。
也不是說
Dledger
是個完美的方案掌动,至少在Dledger
選舉過程中是無法提供服務的四啰,而且他必須要使用三個節(jié)點或以上,如果多數(shù)節(jié)點同時掛掉他也是無法保證可用性的坏匪,而且要求消息復制板書以上節(jié)點的效率和直接異步復制還是有一定的差距的拟逮。
存儲機制
還記得上面我們一開始的三個問題嗎?到這里第三個問題已經(jīng)解決了适滓。
但是,在 Topic
中的 隊列是以什么樣的形式存在的恋追?隊列中的消息又是如何進行存儲持久化的呢凭迹? 還未解決罚屋,其實這里涉及到了 RocketMQ
是如何設計它的存儲結構了。我首先想大家介紹 RocketMQ
消息存儲架構中的三大角色——CommitLog
嗅绸、ConsumeQueue
和 IndexFile
脾猛。
-
CommitLog
: 消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer
端寫入的消息主體內容,消息內容不是定長的鱼鸠。單個文件大小默認1G 猛拴,文件名長度為20位,左邊補零蚀狰,剩余為起始偏移量愉昆,比如00000000000000000000代表了第一個文件,起始偏移量為0麻蹋,文件大小為1G=1073741824跛溉;當?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824扮授,起始偏移量為1073741824芳室,以此類推。消息主要是順序寫入日志文件刹勃,當文件滿了堪侯,寫入下一個文件。 -
ConsumeQueue
: 消息消費隊列荔仁,引入的目的主要是提高消息消費的性能(我們再前面也講了)抖格,由于RocketMQ
是基于主題Topic
的訂閱模式,消息消費是針對主題進行的咕晋,如果要遍歷commitlog
文件中根據(jù)Topic
檢索消息是非常低效的雹拄。Consumer
即可根據(jù)ConsumeQueue
來查找待消費的消息。其中掌呜,ConsumeQueue
(邏輯消費隊列)作為消費消息的索引滓玖,保存了指定Topic
下的隊列消息在CommitLog
中的起始物理偏移量offset
,消息大小size
和消息Tag
的HashCode
值质蕉。consumequeue
文件可以看成是基于topic
的commitlog
索引文件势篡,故consumequeue
文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}模暗。同樣consumequeue
文件采取定長設計禁悠,每一個條目共20個字節(jié),分別為8字節(jié)的commitlog
物理偏移量兑宇、4字節(jié)的消息長度碍侦、8字節(jié)taghashcode
,單個文件由30W個條目組成,可以像數(shù)組一樣隨機訪問每一個條目瓷产,每個ConsumeQueue
文件大小約5.72M站玄; -
IndexFile
:IndexFile
(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。這里只做科普不做詳細介紹濒旦。
總結來說株旷,整個消息存儲的結構,最主要的就是 CommitLoq
和 ConsumeQueue
尔邓。而 ConsumeQueue
你可以大概理解為 Topic
中的隊列晾剖。
RocketMQ
采用的是 混合型的存儲結構 ,即為 Broker
單個實例下所有的隊列共用一個日志數(shù)據(jù)文件來存儲消息梯嗽。有意思的是在同樣高并發(fā)的 Kafka
中會為每個 Topic
分配一個存儲文件齿尽。這就有點類似于我們有一大堆書需要裝上書架,RockeMQ
是不分書的種類直接成批的塞上去的慷荔,而 Kafka
是將書本放入指定的分類區(qū)域的雕什。
而 RocketMQ
為什么要這么做呢?原因是 提高數(shù)據(jù)的寫入效率 显晶,不分 Topic
意味著我們有更大的幾率獲取 成批 的消息進行數(shù)據(jù)寫入贷岸,但也會帶來一個麻煩就是讀取消息的時候需要遍歷整個大文件,這是非常耗時的磷雇。
所以偿警,在 RocketMQ
中又使用了 ConsumeQueue
作為每個隊列的索引文件來 提升讀取消息的效率。我們可以直接根據(jù)隊列的消息序號唯笙,計算出索引的全局位置(索引序號*索引固定?度20)螟蒸,然后直接讀取這條索引,再根據(jù)索引中記錄的消息的全局位置崩掘,找到消息七嫌。
講到這里,你可能對 RockeMQ
的存儲架構還有些模糊苞慢,沒事诵原,我們結合著圖來理解一下。
emmm挽放,是不是有一點復雜绍赛,看英文圖片和英文文檔的時候就不要慫,硬著頭皮往下看就行辑畦。
如果上面沒看懂的讀者一定要認真看下面的流程分析吗蚌!
首先,在最上面的那一塊就是我剛剛講的你現(xiàn)在可以直接 把 ConsumerQueue
理解為 Queue
纯出。
在圖中最左邊說明了 紅色方塊 代表被寫入的消息蚯妇,虛線方塊代表等待被寫入的敷燎。左邊的生產(chǎn)者發(fā)送消息會指定 Topic
、QueueId
和具體消息內容侮措,而在 Broker
中管你是哪門子消息懈叹,他直接 **全部順序存儲到了 CommitLog **乖杠。而根據(jù)生產(chǎn)者指定的 Topic
和 QueueId
將這條消息本身在 CommitLog
的偏移(offset)分扎,消息本身大小,和tag的hash值存入對應的 ConsumeQueue
索引文件中胧洒。而在每個隊列中都保存了 ConsumeOffset
即每個消費者組的消費位置(我在架構那里提到了畏吓,忘了的同學可以回去看一下),而消費者拉取消息進行消費的時候只需要根據(jù) ConsumeOffset
獲取下一個未被消費的消息就行了卫漫。
上述就是我對于整個消息存儲架構的大概理解(這里不涉及到一些細節(jié)討論菲饼,比如稀疏索引等等問題),希望對你有幫助列赎。
為什么 CommitLog
文件要設計成固定大小的長度呢宏悦?提醒:內存映射機制。