一盔憨、為什么使用 MQ?
1.1 解耦
1.1.1 解耦1
例如電商系統(tǒng)核心是交易服務洗搂,交易服務要調用另外三個服務热康,訂單服務沛申、庫存服務、倉儲服務姐军。
這三個服務如果有一個服務不可用污它,交易服務就無法正常運行,所以交易服務是強耦合另外三個服務庶弃。
引入MQ之后衫贬,交易服務只跟MQ交互,把消息發(fā)到MQ里面就行了歇攻,無需關心另外三個服務是否可用固惯。這時候交易服務跟另外三個服務就是弱耦合的關系,耦合性被降低了缴守。
哪怕是另外三個服務暫時不可用葬毫,也不影響交易服務的運行,只要其他服務運行起來后屡穗,把MQ里面的消息消費了就行贴捡。(降級接口)
1.1.2 解耦2
假設 A 系統(tǒng)在用戶發(fā)生某個操作的時候,需要把用戶提交的數(shù)據(jù)同時推送到 B村砂、C 兩個系統(tǒng)的時候烂斋。這個時候負責 A 系統(tǒng)的哥們想:沒事啊,B础废、C 兩個系統(tǒng)給我提供一個 HTTP 接口或者 RPC 接口汛骂,我把數(shù)據(jù)推送過去不就完事了嘛,負責 A 系統(tǒng)的哥們美滋滋评腺。
一切看起來很美好帘瞭,但是隨著業(yè)務快速迭代,這個時候系統(tǒng) D 也想要這個數(shù)據(jù)蒿讥。那既然這樣蝶念,A 系統(tǒng)的開發(fā)同學就改咯,在發(fā)送數(shù)據(jù)給 B芋绸、C 的同時加上一個 D媒殉。但是,越到后面越發(fā)現(xiàn)侥钳,麻煩來了适袜。整個系統(tǒng)好像不止這個數(shù)據(jù)要發(fā)送給 B、C舷夺、D苦酱、還有第二售貌、第三個數(shù)據(jù)要發(fā)送給 B、C疫萤、D颂跨。甚至有時候又加入了 E、F 等系統(tǒng)扯饶,他們也要這個數(shù)據(jù)恒削。并且有時候可能 B 系統(tǒng)突然又不要這個數(shù)據(jù)了,A 系統(tǒng)改來改去尾序,A 系統(tǒng)的開發(fā)哥們頭皮發(fā)麻钓丰。更復雜的場景是,數(shù)據(jù)通過接口傳給其他系統(tǒng)有時候還要考慮重試每币、超時等一些異常情況携丁。
這個時候,就該我們的 MQ 粉墨登場了兰怠,這種情況下使用 MQ 來解耦是再合適不過了梦鉴,因為負責 A 系統(tǒng)的哥們只需要把消息扔到 MQ 就行了,其他系統(tǒng)按需來訂閱消息就好了揭保。就算某個系統(tǒng)不需要這個數(shù)據(jù)了肥橙,也不會需要 A 系統(tǒng)改動代碼。
1.2 異步
沒有引入MQ的時候秸侣,交易服務需要同步調用三個服務存筏,如果調用一個服務需要耗時1秒,那么同步調用三個服務需要耗時3秒塔次。在引入MQ之后方篮,全都改成了異步調用,整個耗時不到1秒励负,大大提高了接口的性能。
1.3 削峰
如果一秒內同時來了5000筆交易匕得,而訂單服務每秒只能處理100筆交易继榆,那么后面的4900筆交易失敗。在引入MQ之后汁掠,交易服務可以把交易數(shù)據(jù)先發(fā)送到MQ中略吨,而訂單服務再慢慢從MQ拉取交易信息處理。從而避免突發(fā)流量壓垮服務器考阱。
1.3.1 削峰填谷
舉個例子翠忠,比如我們的訂單系統(tǒng),在下單的時候就會往數(shù)據(jù)庫寫數(shù)據(jù)乞榨。但是數(shù)據(jù)庫只能支撐每秒 1000 左右的并發(fā)寫入秽之,并發(fā)量再高就容易宕機当娱。低峰期的時候并發(fā)也就 100 多個,但是在高峰期時候考榨,并發(fā)量會突然激增到 5000 以上跨细,這個時候數(shù)據(jù)庫肯定死了。
但是使用了 MQ 之后河质,情況就變了冀惭,消息被 MQ 保存起來了,然后系統(tǒng)就可以按照自己的消費能力來消費掀鹅,比如每秒 1000 個數(shù)據(jù)散休,這樣慢慢寫入數(shù)據(jù)庫,這樣就不會打死數(shù)據(jù)庫了乐尊。
至于為什么叫做削峰填谷呢溃槐?如果沒有用 MQ 的情況下,并發(fā)量高峰期的時候是有一個“頂峰”的科吭,然后高峰期過后又是一個低并發(fā)的“谷”昏滴。但是使用了 MQ 之后,限制消費消息的速度為 1000QPS对人,但是這樣一來谣殊,高峰期產生的數(shù)據(jù)勢必會被積壓在 MQ 中,高峰就被“削”掉了牺弄。但是因為消息積壓姻几,在高峰期過后的一段時間內,消費消息的速度還是會維持在 1000QPS势告,直到消費完積壓的消息顷链,這就叫做“填谷”。
二拄衰、引入MQ之后的問題
2.1 系統(tǒng)可用性降低
本來整個系統(tǒng)有四個服務奋隶,我們只需要保證這四個服務可用就行了。現(xiàn)在又多引入了一個MQ回溺,我們還要保證MQ的可用春贸,所以整個系統(tǒng)的可用性降低。
2.2 系統(tǒng)復雜性提高
本來交易服務是同步調用另外三個服務遗遵,如果另外三個服務不可用萍恕,交易服務能立即感知到。引入MQ之后车要,整個系統(tǒng)的穩(wěn)定性就要靠MQ保證了允粤。
這時候,我們就要考慮到發(fā)到MQ里面的消息怎么避免丟失的問題?順序性消費的問題类垫,就是同一筆交易的下單消息應該比撤單消息先處理司光。重復性消費的問題,就是同一筆下單交易的消息可能被多次處理阔挠。
當然飘庄,每種問題都有具體的解決方案,避免消息丟失可以使用MQ集群购撼,順序性消費可以把消息發(fā)到同一個分區(qū)跪削,重復性消費可以在消費端做冪等性處理。
2.3 重復消費問題
2.3.1 問題場景
重復消費問題可以說是 MQ 中普遍存在的問題迂求, 不管你用哪種 MQ 都無法避免碾盐。有哪些場景會出現(xiàn)重復的消息呢?
- 消息生產者產生了重復的消息揩局;
- Kafka 和 RocketMQ 的 offset 被回調了毫玖;
- 消息消費者確認失敗凌盯;
- 消息消費者確認時超時付枫;
- 業(yè)務系統(tǒng)主動發(fā)起重試。
如果重復消息不做正確的處理驰怎,會對業(yè)務造成很大的影響阐滩,產生重復數(shù)據(jù)或者導致數(shù)據(jù)異常,比如會員系統(tǒng)多開通了一個月的會員等县忌。
2.3.2 解決方案
不管是由于生產者產生的重復消息掂榔,還是由于消費者導致的重復消息,我們都可以在消費者中解決這個問題症杏。
這就要求消費者在做業(yè)務處理時装获,要做冪等設計。在這里我推薦增加一張消費消息表厉颤,來解決 MQ的這類問題穴豫。
消費消息表中,使用 messageId 做唯一索引走芋。在處理業(yè)務邏輯之前绩郎,先根據(jù) messageId 查詢一下該消息有沒有處理過。如果已經處理過了則直接返回成功翁逞,如果沒有處理過,則繼續(xù)做業(yè)務處理溉仑。
補充:RocketMQ消費過程冪等
以下內容來自RocketMQ官網:
RocketMQ無法避免消息重復(Exactly-Once)挖函,所以如果業(yè)務對消費重復非常敏感,務必要在業(yè)務層面進行去重處理≡勾可以借助關系數(shù)據(jù)庫進行去重津畸。首先需要確定消息的唯一鍵,可以是msgId必怜,也可以是消息內容中的唯一標識字段肉拓,例如訂單Id等。在消費之前判斷唯一鍵是否在關系數(shù)據(jù)庫中存在梳庆。如果不存在則插入暖途,并消費,否則跳過膏执。(實際過程要考慮原子性問題驻售,判斷是否存在可以嘗試插入,如果報主鍵沖突更米,則插入失敗欺栗,直接跳過)
msgId一定是全局唯一標識符,但是實際使用中征峦,可能會存在相同的消息有兩個不同msgId的情況(消費者主動重發(fā)迟几、因客戶端重投機制導致的重復等),這種情況就需要使業(yè)務字段進行重復消費栏笆。
2.4 數(shù)據(jù)一致性問題(異步分布式事務問題)
2.4.1 問題場景
當服務間是同步調用的時候类腮,我們還可以使用本地事務來控制數(shù)據(jù)的一致性。但是引入MQ之后竖伯,服務間的調用都是異步了存哲,就沒辦法使用本地事務,也就無法做到數(shù)據(jù)的強一致性了七婴。
例如祟偷,調用訂單服務下單成功了,但是調用庫存服務扣減庫存失敗打厘,就會導致超賣修肠,是嚴重的線上事故。
這時候怎么辦户盯?
方案一:需要事務強一致的嵌施,不用消息異步,如下單莽鸭、減庫存要放在一個事務里控制吗伤,加積分這種非核心的業(yè)務才用消息異步處理。
方案二:可以使用MQ事務消息(只有RocketMQ才有事務消息功能硫眨,RocketMQ收發(fā)事務消息)足淆。
事務狀態(tài)有以下三種:
- TransactionStatus.CommitTransaction:提交事務,允許訂閱方消費該消息。
- TransactionStatus.RollbackTransaction:回滾事務巧号,消息將被丟棄不允許消費族奢。
- TransactionStatus.Unknow:無法判斷狀態(tài),期待消息隊列RocketMQ版的Broker向發(fā)送方再次詢問該消息對應的本地事務的狀態(tài)丹鸿。
步驟一: A 服務向消息中間件發(fā)布消息
在服務A處理任務A前越走,首先向消息中間件發(fā)送一條半信息。
消息中間件收到后將該消息持久化靠欢,但不進行投遞廊敌。持久化成功后,向A服務返回確認應答掺涛。
服務A收到確認應答后庭敦,便可以開始處理任務A。
任務A處理完成后薪缆,服務A便會向消息中間件發(fā)送Commit 或者 Rollback 請求秧廉,該請求發(fā)送完成后,服務A的工作任務就結束了拣帽,該事務的處理過程也就結束了疼电。
在消息中間件收到 Commit 后,便會向 B 服務投遞消息减拭,如果收到 Rollback 便會直接丟棄消息蔽豺。
如果消息中間件在最后的過程中,長時間沒有收到服務A 發(fā)送的 Commit 或 Rollback 指令拧粪,這個時候就需要依靠 超時詢問機制修陡。
步驟二: 消息中間件向B服務投遞消息
消息中間件收到A服務的提交 Commit指令后便會將該消息投遞給B服務,然后將自己的狀態(tài)置為阻塞等待狀態(tài)可霎。B服務收到消息中間件發(fā)送的消息后便開始處理任務B魄鸦,處理完成后便會向消息中間件發(fā)出回應。但是在消息中間件阻塞等待的時候同樣會出現(xiàn)問題癣朗。
- 正常情況:消息中間件投遞完消息后拾因,進入阻塞等待狀態(tài),在收到確認應答后便認為事務處理完成旷余,該流程結束绢记。
- 等待超時情況:在等待確認應答超時之后就會重新進行投遞,直到B服務器返回消費成功響應為止正卧。而消息重試的次數(shù)和時間間隔都可以設置蠢熄,如果最終還是不能成功進行投遞,則需要人工干預炉旷。
2.4.2 解決方案
我們都知道數(shù)據(jù)一致性分為:強一致性护赊、弱一致性惠遏、最終一致性砾跃。
而 MQ 為了性能考慮使用的是最終一致性骏啰,那么必定會出現(xiàn)數(shù)據(jù)不一致的問題。這類問題大概率是因為消費者讀取消息后抽高,業(yè)務邏輯處理失敗導致的判耕。這時候可以增加重試機制。重試分為同步重試和異步重試翘骂。
有些消息量比較小的業(yè)務場景壁熄,可以采用同步重試。在消費消息時如果處理失敗碳竟,立刻重試 3-5 次草丧,如果還是失敗則寫入到記錄表中。但如果消息量比較大莹桅,則不建議使用這種方式昌执。因為如果出現(xiàn)網絡異常,可能會導致大量的消息不斷重試诈泼,影響消息讀取速度造成消息堆積懂拾。
消息量比較大的業(yè)務場景,建議采用異步重試铐达。在消費者處理失敗之后岖赋,立刻寫入重試表,有個 job(如采用xxljob) 專門定時重試瓮孙。
還有一種做法:如果消費失敗唐断,自己給同一個 topic 發(fā)一條消息。在后面的某個時間點杭抠,自己又會消費到那條消息脸甘,起到了重試的效果。如果對消息順序要求不高的場景祈争,可以使用這種方式斤程。
2.5 消息丟失問題
2.5.1 問題場景
同樣消息丟失問題,也是 MQ 中普遍存在的問題菩混,不管你用哪種 MQ 都 無法避免忿墅。有哪些 場景會出現(xiàn)消息丟失問題呢?
- 生產者產生消息時沮峡,由于網絡原因發(fā)送到 MQ 失敗了疚脐;
- MQ 服務器持久化,存儲磁盤時出現(xiàn)異常邢疙;
- Kafka和RocketMQ 的 offset 被回調時棍弄,略過了很多消息望薄;
- 消費者剛讀取消息,已經 ACK 確認呼畸,但業(yè)務還沒處理完痕支,服務就被重啟了。
導致消息丟失問題的原因挺多的蛮原, 生產者卧须、 MQ 服務器、 消費者都有可能產生問題儒陨。我在這里就不一一列舉了花嘶。最終的結果會導致消費者無法正確的處理消息,而導致數(shù)據(jù)不一致的情況蹦漠。
2.5.2 解決方案
不管你是否承認椭员,有時候消息真的會丟。即使這種概率非常小笛园,也會對業(yè)務有影響隘击。生產者、MQ 服務器喘沿、消費者都有可能會導致消息丟失的問題闸度。為了解決這個問題,我們可以增加一張消息發(fā)送表蚜印。
當生產者發(fā)完消息之后莺禁,會往該表中寫入一條數(shù)據(jù),狀態(tài) status 標記為待確認窄赋;
如果消費者讀取消息之后哟冬,調用生產者的 API 更新該消息的status為已確認;
有個job(xxljob) 每隔一段時間檢查一次消息發(fā)送表忆绰,如果5分鐘(這個時間可以根據(jù)實際情況來定)后還有狀態(tài)是待確認的消息浩峡,則認為該消息已經丟失了,重新發(fā)條消息错敢。
這樣不管是由于生產者翰灾、 MQ服務器、還是消費者導致的消息丟失問題稚茅,job 都會重新發(fā)消息纸淮。
2.6 消息順序問題
2.6.1 問題場景
有些業(yè)務數(shù)據(jù)是有狀態(tài)的,比如訂單有下單亚享、支付咽块、完成、退貨等狀態(tài)欺税。 如果訂單數(shù)據(jù)作為消息體侈沪,就會涉及順序問題了揭璃。
例如消費者收到同一個訂單的兩條消息。第一條消息的狀態(tài)是下單亭罪,第二條消息的狀態(tài)是支付瘦馍,這是沒問題的。但如果第一條消息的狀態(tài)是支付皆撩,第二條消息的狀態(tài)是下單就會有問題了扣墩。沒有下單就先支付了?
消息順序問題是一個非常棘手的問題扛吞,比如:
Kafka 同一個 partition 中能保證順序,但是不同的 partition 無法保證順序荆责;
RabbitMQ的同一個queue能夠保證順序滥比,但是如果多個消費者同一個queue 也會有順序問題。
如果消費者使用多線程消費消息做院,也無法保證順序盲泛。
如果消費消息時同一個訂單的多條消息中,中間的一條消息出現(xiàn)異常情況键耕,順序將會被打亂寺滚。
還有如果生產者發(fā)送到 MQ中的路由規(guī)則,跟消費者不一樣屈雄,也無法保證順序村视。
2.6.2 解決方案
消息順序問題是一種常見問題。我們以 Kafka 消費訂單消息為例酒奶,訂單有下單蚁孔、 支付、 完成惋嚎、 退貨等狀態(tài)杠氢。這些狀態(tài)是有先后順序的,如果順序錯了會導致業(yè)務異常另伍。
解決這類問題之前鼻百,我們需要先確認:消費者是否真的需要知道中間狀態(tài),只知道最終狀態(tài)行不行摆尝?
其實很多時候温艇,我真的需要知道的是最終狀態(tài)。這時可以把流程優(yōu)化一下:
這種方式可以解決大部分的消息順序問題结榄。
但如果真的有需要保證消息順序的需求中贝,那么可以將訂單號路由到不同的 partition。同一個訂單號的消息臼朗,每次到發(fā)到同一個partition邻寿。
2.7 消息堆積
2.7.1 問題場景
如果消息消費者讀取消息的速度蝎土,能夠跟上消息生產者的節(jié)奏,那么整套 MQ 機制就能發(fā)揮最大作用绣否。
但是很多時候誊涯,由于某些批處理或者其他原因,導致消費速度小于生產速度蒜撮。這樣會直接導致消息堆積問題暴构,從而影響業(yè)務功能。
這里以下單 開通會員為例段磨,如果消息出現(xiàn)堆積會導致用戶下單之后取逾,很久之后才能變成會員。這種情況肯定會引起大量用戶投訴苹支。
2.7.2 解決方案
那么消息堆積問題該如何解決呢砾隅?這個要看消息是否需要保證順序。如果不需要保證順序债蜜,可以讀取消息之后用多線程處理業(yè)務邏輯晴埂。
這樣就能增加業(yè)務邏輯處理速度,解決消息堆積問題寻定。但是線程池的核心線程數(shù)和最大線程數(shù)需要合理配置儒洛,不然可能會浪費系統(tǒng)資源。
如果需要保證順序狼速,可以讀取消息之后將消息按照一定的規(guī)則分發(fā)到多個隊列中琅锻,然后在隊列中用單線程處理。