為什么使用消息隊列
解耦
看這么個場景。A 系統(tǒng)發(fā)送數(shù)據(jù)到 BCD 三個系統(tǒng)布蔗,通過接口調(diào)用發(fā)送管闷。如果 E 系統(tǒng)也要這個數(shù)據(jù)呢?那如果 C 系統(tǒng)現(xiàn)在不需要了呢谍咆?A 系統(tǒng)負責人幾乎崩潰......
在這個場景中禾锤,A 系統(tǒng)跟其它各種亂七八糟的系統(tǒng)嚴重耦合,A 系統(tǒng)產(chǎn)生一條比較關(guān)鍵的數(shù)據(jù)摹察,很多系統(tǒng)都需要 A 系統(tǒng)將這個數(shù)據(jù)發(fā)送過來恩掷。A 系統(tǒng)要時時刻刻考慮 BCDE 四個系統(tǒng)如果掛了該咋辦?要不要重發(fā)供嚎,要不要把消息存起來黄娘?頭發(fā)都白了扒妥础!
如果使用 MQ逼争,A 系統(tǒng)產(chǎn)生一條數(shù)據(jù)优床,發(fā)送到 MQ 里面去,哪個系統(tǒng)需要數(shù)據(jù)自己去 MQ 里面消費誓焦。如果新系統(tǒng)需要數(shù)據(jù)胆敞,直接從 MQ 里消費即可;如果某個系統(tǒng)不需要這條數(shù)據(jù)了杂伟,就取消對 MQ 消息的消費即可移层。這樣下來,A 系統(tǒng)壓根兒不需要去考慮要給誰發(fā)送數(shù)據(jù)赫粥,不需要維護這個代碼观话,也不需要考慮人家是否調(diào)用成功、失敗超時等情況越平。
總結(jié):通過一個 MQ匪燕,Pub/Sub 發(fā)布訂閱消息這么一個模型,A 系統(tǒng)就跟其它系統(tǒng)徹底解耦了喧笔。你需要去考慮一下你負責的系統(tǒng)中是否有類似的場景帽驯,就是一個系統(tǒng)或者一個模塊,調(diào)用了多個系統(tǒng)或者模塊书闸,互相之間的調(diào)用很復雜尼变,維護起來很麻煩。但是其實這個調(diào)用是不需要直接同步調(diào)用接口的浆劲,如果用 MQ 給它異步化解耦嫌术,也是可以的,你就需要去考慮在你的項目里牌借,是不是可以運用這個 MQ 去進行系統(tǒng)的解耦婶肩。在簡歷中體現(xiàn)出來這塊東西葛峻,用 MQ 作解耦玄糟。
異步
再來看一個場景堰氓,A 系統(tǒng)接收一個請求,需要在自己本地寫庫现柠,還需要在 BCD 三個系統(tǒng)寫庫院领,自己本地寫庫要 3ms,BCD 三個系統(tǒng)分別寫庫要 300ms够吩、450ms比然、200ms。最終請求總延時是 3 + 300 + 450 + 200 = 953ms周循,接近 1s强法,用戶感覺搞個什么東西万俗,慢死了慢死了。用戶通過瀏覽器發(fā)起請求饮怯,等待個 1s闰歪,這幾乎是不可接受的。
一般互聯(lián)網(wǎng)類的企業(yè)硕淑,對于用戶直接的操作课竣,一般要求是每個請求都必須在 200 ms 以內(nèi)完成嘉赎,對用戶幾乎是無感知的置媳。
如果使用 MQ,那么 A 系統(tǒng)連續(xù)發(fā)送 3 條消息到 MQ 隊列中公条,假如耗時 5ms拇囊,A 系統(tǒng)從接受一個請求到返回響應(yīng)給用戶,總時長是 3 + 5 = 8ms靶橱,對于用戶而言寥袭,其實感覺上就是點個按鈕,8ms 以后就直接返回了关霸,爽传黄!網(wǎng)站做得真好,真快队寇!
削峰
每天 0:00 到 12:00膘掰,A 系統(tǒng)風平浪靜,每秒并發(fā)請求數(shù)量就 50 個佳遣。結(jié)果每次一到 12:00 ~ 13:00 识埋,每秒并發(fā)請求數(shù)量突然會暴增到 5k+ 條。但是系統(tǒng)是直接基于 MySQL 的零渐,大量的請求涌入 MySQL窒舟,每秒鐘對 MySQL 執(zhí)行約 5k 條 SQL。
一般的 MySQL诵盼,扛到每秒 2k 個請求就差不多了惠豺,如果每秒請求到 5k 的話,可能就直接把 MySQL 給打死了风宁,導致系統(tǒng)崩潰耕腾,用戶也就沒法再使用系統(tǒng)了。
但是高峰期一過杀糯,到了下午的時候扫俺,就成了低峰期,可能也就 1w 的用戶同時在網(wǎng)站上操作固翰,每秒中的請求數(shù)量可能也就 50 個請求狼纬,對整個系統(tǒng)幾乎沒有任何的壓力羹呵。
如果使用 MQ,每秒 5k 個請求寫入 MQ疗琉,A 系統(tǒng)每秒鐘最多處理 2k 個請求冈欢,因為 MySQL 每秒鐘最多處理 2k 個。A 系統(tǒng)從 MQ 中慢慢拉取請求盈简,每秒鐘就拉取 2k 個請求凑耻,不要超過自己每秒能處理的最大請求數(shù)量就 ok,這樣下來柠贤,哪怕是高峰期的時候香浩,A 系統(tǒng)也絕對不會掛掉。而 MQ 每秒鐘 5k 個請求進來臼勉,就 2k 個請求出去邻吭,結(jié)果就導致在中午高峰期(1 個小時),可能有幾十萬甚至幾百萬的請求積壓在 MQ 中宴霸。
這個短暫的高峰期積壓是 ok 的囱晴,因為高峰期過了之后,每秒鐘就 50 個請求進 MQ瓢谢,但是 A 系統(tǒng)依然會按照每秒 2k 個請求的速度在處理畸写。所以說,只要高峰期一過氓扛,A 系統(tǒng)就會快速將積壓的消息給解決掉枯芬。
消息隊列的缺點
系統(tǒng)可用性降低
系統(tǒng)引入的外部依賴越多,越容易掛掉幢尚。本來你就是 A 系統(tǒng)調(diào)用 BCD 三個系統(tǒng)的接口就好了破停,ABCD 四個系統(tǒng)還好好的,沒啥問題尉剩,你偏加個 MQ 進來真慢,萬一 MQ 掛了咋整?MQ 一掛理茎,整套系統(tǒng)崩潰黑界,你不就完了?
系統(tǒng)復雜度提高
硬生生加個 MQ 進來皂林,你怎么保證消息沒有重復消費朗鸠?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性础倍?頭大頭大烛占,問題一大堆,痛苦不已。
一致性問題
A 系統(tǒng)處理完了直接返回成功了忆家,人都以為你這個請求就成功了犹菇;但是問題是,要是 BCD 三個系統(tǒng)那里芽卿,BD 兩個系統(tǒng)寫庫成功了揭芍,結(jié)果 C 系統(tǒng)寫庫失敗了,咋整卸例?你這數(shù)據(jù)就不一致了称杨。
ActiveMQ,RabbitMQ筷转,RocketMQ姑原,Kafka有什么異同
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
開發(fā)語言 | java | erlang | java | scala |
單機吞吐量 | 萬級,比 RocketMQ旦装、Kafka 低一個數(shù)量級 | 同 ActiveMQ | 10 萬級页衙,支撐高吞吐 | 10 萬級摊滔,高吞吐阴绢,一般配合大數(shù)據(jù)類的系統(tǒng)來進行實時數(shù)據(jù)計算、日志采集等場景 |
topic 數(shù)量對吞吐量的影響 | topic 可以達到幾百/幾千的級別艰躺,吞吐量會有較小幅度的下降呻袭,這是 RocketMQ 的一大優(yōu)勢,在同等機器下腺兴,可以支撐大量的 topic | topic 從幾十到幾百個時候左电,吞吐量會大幅度下降,在同等機器下页响,Kafka 盡量保證 topic 數(shù)量不要過多篓足,如果要支撐大規(guī)模的 topic,需要增加更多的機器資源 | ||
時效性 | ms 級 | 微秒級闰蚕,這是 RabbitMQ 的一大特點栈拖,延遲最低 | ms 級 | 延遲在 ms 級以內(nèi) |
可用性 | 高,基于主從架構(gòu)實現(xiàn)高可用 | 同 ActiveMQ | 非常高没陡,分布式架構(gòu) | 非常高涩哟,分布式,一個數(shù)據(jù)多個副本盼玄,少數(shù)機器宕機贴彼,不會丟失數(shù)據(jù),不會導致不可用 |
消息可靠性 | 有較低的概率丟失數(shù)據(jù) | 基本不丟 | 經(jīng)過參數(shù)優(yōu)化配置埃儿,可以做到 0 丟失 | 同 RocketMQ |
功能支持 | MQ 領(lǐng)域的功能極其完備 | 基于 erlang 開發(fā)器仗,并發(fā)能力很強,性能極好童番,延時很低 | MQ 功能較為完善精钮,還是分布式的暴心,擴展性好 | 功能較為簡單,主要支持簡單的 MQ 功能杂拨,在大數(shù)據(jù)領(lǐng)域的實時計算以及日志采集被大規(guī)模使用 |
社區(qū)活躍度 | 低 | 很高 | 一般 | 很高 |
- 中小型公司专普,技術(shù)實力較為一般,技術(shù)挑戰(zhàn)不是特別高弹沽,用 RabbitMQ 是不錯的選擇檀夹;
- 大型公司,基礎(chǔ)架構(gòu)研發(fā)實力較強策橘,用 RocketMQ 是很好的選擇炸渡。
- 大數(shù)據(jù)領(lǐng)域的實時計算、日志采集等場景丽已,用 Kafka 是業(yè)內(nèi)標準的蚌堵,幾乎是全世界這個領(lǐng)域的事實性規(guī)范。
如何保證消息隊列的高可用沛婴?
RabbitMQ 的高可用性
RabbitMQ 是比較有代表性的吼畏,因為是基于主從(非分布式)做高可用性的,我們就以 RabbitMQ 為例子講解第一種 MQ 的高可用性怎么實現(xiàn)嘁灯。
RabbitMQ 有三種模式:單機模式泻蚊、普通集群模式、鏡像集群模式丑婿。
單機模式
單機模式性雄,就是 Demo 級別的,一般就是你本地啟動了玩玩兒的??羹奉,沒人生產(chǎn)用單機模式秒旋。
普通集群模式(無高可用性)
普通集群模式,意思就是在多臺機器上啟動多個 RabbitMQ 實例诀拭,每個機器啟動一個迁筛。你創(chuàng)建的 queue,只會放在一個 RabbitMQ 實例上炫加,但是每個實例都同步 queue 的元數(shù)據(jù)(元數(shù)據(jù)可以認為是 queue 的一些配置信息瑰煎,通過元數(shù)據(jù),可以找到 queue 所在實例)俗孝。你消費的時候酒甸,實際上如果連接到了另外一個實例,那么那個實例會從 queue 所在實例上拉取數(shù)據(jù)過來赋铝。
這種方式確實很麻煩插勤,也不怎么好,沒做到所謂的分布式,就是個普通集群农尖。因為這導致你要么消費者每次隨機連接一個實例然后拉取數(shù)據(jù)析恋,要么固定連接那個 queue 所在實例消費數(shù)據(jù),前者有數(shù)據(jù)拉取的開銷盛卡,后者導致單實例性能瓶頸助隧。
而且如果那個放 queue 的實例宕機了,會導致接下來其他實例就無法從那個實例拉取滑沧,如果你開啟了消息持久化并村,讓 RabbitMQ 落地存儲消息的話,消息不一定會丟滓技,得等這個實例恢復了哩牍,然后才可以繼續(xù)從這個 queue 拉取數(shù)據(jù)。
所以這個事兒就比較尷尬了令漂,這就沒有什么所謂的高可用性膝昆,這方案主要是提高吞吐量的,就是說讓集群中多個節(jié)點來服務(wù)某個 queue 的讀寫操作叠必。
鏡像集群模式(高可用性)
這種模式荚孵,才是所謂的 RabbitMQ 的高可用模式。跟普通集群模式不一樣的是挠唆,在鏡像集群模式下处窥,你創(chuàng)建的 queue嘱吗,無論元數(shù)據(jù)還是 queue 里的消息都會存在于多個實例上玄组,就是說,每個 RabbitMQ 節(jié)點都有這個 queue 的一個完整鏡像谒麦,包含 queue 的全部數(shù)據(jù)的意思俄讹。然后每次你寫消息到 queue 的時候,都會自動把消息同步到多個實例的 queue 上绕德。
那么如何開啟這個鏡像集群模式呢患膛?其實很簡單,RabbitMQ 有很好的管理控制臺耻蛇,就是在后臺新增一個策略踪蹬,這個策略是鏡像集群模式的策略,指定的時候是可以要求數(shù)據(jù)同步到所有節(jié)點的臣咖,也可以要求同步到指定數(shù)量的節(jié)點跃捣,再次創(chuàng)建 queue 的時候,應(yīng)用這個策略夺蛇,就會自動將數(shù)據(jù)同步到其他的節(jié)點上去了疚漆。
這樣的話,好處在于,你任何一個機器宕機了娶聘,沒事兒闻镶,其它機器(節(jié)點)還包含了這個 queue 的完整數(shù)據(jù),別的 consumer 都可以到其它節(jié)點上去消費數(shù)據(jù)丸升。壞處在于铆农,第一,這個性能開銷也太大了吧狡耻,消息需要同步到所有機器上顿涣,導致網(wǎng)絡(luò)帶寬壓力和消耗很重!第二酝豪,這么玩兒涛碑,不是分布式的,就沒有擴展性可言了孵淘,如果某個 queue 負載很重蒲障,你加機器,新增的機器也包含了這個 queue 的所有數(shù)據(jù)瘫证,并沒有辦法線性擴展你的 queue揉阎。你想,如果這個 queue 的數(shù)據(jù)量很大背捌,大到這個機器上的容量無法容納了毙籽,此時該怎么辦呢?
RocketMQ 的高可用性
RcoketMQ的集群有:多master 模式毡庆、多master多slave異步復制模式坑赡、多 master多slave同步雙寫模式。
多master多slave模式部署架構(gòu)圖:
通信過程如下:Producer 與 NameServer集群中的其中一個節(jié)點(隨機選擇)建立長連接么抗,定期從 NameServer 獲取 Topic 路由信息毅否,并向提供 Topic 服務(wù)的 Broker Master 建立長連接,且定時向 Broker 發(fā)送心跳蝇刀。Producer 只能將消息發(fā)送到 Broker master螟加,但是 Consumer 則不一樣,它同時和提供 Topic 服務(wù)的 Master 和 Slave建立長連接吞琐,既可以從 Broker Master 訂閱消息捆探,也可以從 Broker Slave 訂閱消息。
Kafka 的高可用性
Kafka 一個最基本的架構(gòu)認識:由多個 broker 組成站粟,每個 broker 是一個節(jié)點黍图;你創(chuàng)建一個 topic,這個 topic 可以劃分為多個 partition卒蘸,每個 partition 可以存在于不同的 broker 上雌隅,每個 partition 就放一部分數(shù)據(jù)翻默。
這就是天然的分布式消息隊列,就是說一個 topic 的數(shù)據(jù)恰起,是分散放在多個機器上的修械,每個機器就放一部分數(shù)據(jù)。
實際上 RabbitMQ 之類的检盼,并不是分布式消息隊列肯污,它就是傳統(tǒng)的消息隊列,只不過提供了一些集群吨枉、HA(High Availability, 高可用性) 的機制而已蹦渣,因為無論怎么玩兒,RabbitMQ 一個 queue 的數(shù)據(jù)都是放在一個節(jié)點里的貌亭,鏡像集群下柬唯,也是每個節(jié)點都放這個 queue 的完整數(shù)據(jù)。
Kafka 0.8 以前圃庭,是沒有 HA 機制的锄奢,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了剧腻,沒法寫也沒法讀拘央,沒有什么高可用性可言。
比如說书在,我們假設(shè)創(chuàng)建了一個 topic灰伟,指定其 partition 數(shù)量是 3 個,分別在三臺機器上儒旬。但是栏账,如果第二臺機器宕機了,會導致這個 topic 的 1/3 的數(shù)據(jù)就丟了义矛,因此這個是做不到高可用的发笔。
Kafka 0.8 以后,提供了 HA 機制凉翻,就是 replica(復制品) 副本機制。每個 partition 的數(shù)據(jù)都會同步到其它機器上捻激,形成自己的多個 replica 副本制轰。所有 replica 會選舉一個 leader 出來,那么生產(chǎn)和消費都跟這個 leader 打交道胞谭,然后其他 replica 就是 follower垃杖。寫的時候,leader 會負責把數(shù)據(jù)同步到所有 follower 上去丈屹,讀的時候就直接讀 leader 上的數(shù)據(jù)即可调俘。只能讀寫 leader伶棒?很簡單,要是你可以隨意讀寫每個 follower彩库,那么就要 care 數(shù)據(jù)一致性的問題肤无,系統(tǒng)復雜度太高,很容易出問題骇钦。Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上宛渐,這樣才可以提高容錯性。
這么搞眯搭,就有所謂的高可用性了窥翩,因為如果某個 broker 宕機了,沒事兒鳞仙,那個 broker上面的 partition 在其他機器上都有副本的寇蚊。如果這個宕機的 broker 上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來棍好,大家繼續(xù)讀寫那個新的 leader 即可幔荒。這就有所謂的高可用性了。
寫數(shù)據(jù)的時候梳玫,生產(chǎn)者就寫 leader爹梁,然后 leader 將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動從 leader 來 pull 數(shù)據(jù)提澎。一旦所有 follower 同步好數(shù)據(jù)了姚垃,就會發(fā)送 ack 給 leader,leader 收到所有 follower 的 ack 之后盼忌,就會返回寫成功的消息給生產(chǎn)者积糯。(當然,這只是其中一種模式谦纱,還可以適當調(diào)整這個行為)
消費的時候看成,只會從 leader 去讀,但是只有當一個消息已經(jīng)被所有 follower 都同步成功返回 ack 的時候跨嘉,這個消息才會被消費者讀到川慌。
如何保證消息不被重復消費?
回答這個問題祠乃,首先你別聽到重復消息這個事兒梦重,就一無所知吧,你先大概說一說可能會有哪些重復消費的問題亮瓷。
首先琴拧,比如 RabbitMQ、RocketMQ嘱支、Kafka蚓胸,都有可能會出現(xiàn)消息重復消費的問題挣饥,正常。因為這問題通常不是 MQ 自己保證的沛膳,是由我們開發(fā)來保證的扔枫。挑一個 Kafka 來舉個例子,說說怎么重復消費吧于置。
Kafka 實際上有個 offset 的概念茧吊,就是每個消息寫進去,都有一個 offset八毯,代表消息的序號搓侄,然后 consumer 消費了數(shù)據(jù)之后,每隔一段時間(定時定期)话速,會把自己消費過的消息的 offset 提交一下讶踪,表示“我已經(jīng)消費過了,下次我要是重啟啥的泊交,你就讓我繼續(xù)從上次消費到的 offset 來繼續(xù)消費吧”乳讥。
但是凡事總有意外,比如我們之前生產(chǎn)經(jīng)常遇到的廓俭,就是你有時候重啟系統(tǒng)云石,看你怎么重啟了,如果碰到點著急的研乒,直接 kill 進程了汹忠,再重啟。這會導致 consumer 有些消息處理了雹熬,但是沒來得及提交 offset宽菜,尷尬了。重啟之后竿报,少數(shù)消息會再次消費一次铅乡。
舉個栗子。
有這么個場景烈菌。數(shù)據(jù) 1/2/3 依次進入 kafka阵幸,kafka 會給這三條數(shù)據(jù)每條分配一個 offset,代表這條數(shù)據(jù)的序號僧界,我們就假設(shè)分配的 offset 依次是 152/153/154侨嘀。消費者從 kafka 去消費的時候,也是按照這個順序去消費捂襟。假如當消費者消費了 offset=153
的這條數(shù)據(jù),剛準備去提交 offset 到 zookeeper欢峰,此時消費者進程被重啟了葬荷。那么此時消費過的數(shù)據(jù) 1/2 的 offset 并沒有提交涨共,kafka 也就不知道你已經(jīng)消費了 offset=153
這條數(shù)據(jù)。那么重啟之后宠漩,消費者會找 kafka 說举反,嘿海雪,哥兒們风瘦,你給我接著把上次我消費到的那個地方后面的數(shù)據(jù)繼續(xù)給我傳遞過來。由于之前的 offset 沒有提交成功藻雪,那么數(shù)據(jù) 1/2 會再次傳過來雕崩,如果此時消費者沒有去重的話魁索,那么就會導致重復消費。
如果消費者干的事兒是拿一條數(shù)據(jù)就往數(shù)據(jù)庫里寫一條盼铁,會導致說粗蔚,你可能就把數(shù)據(jù) 1/2 在數(shù)據(jù)庫里插入了 2 次,那么數(shù)據(jù)就錯啦饶火。
其實重復消費不可怕鹏控,可怕的是你沒考慮到重復消費之后,怎么保證冪等性肤寝。
舉個例子吧当辐。假設(shè)你有個系統(tǒng),消費一條消息就往數(shù)據(jù)庫里插入一條數(shù)據(jù)鲤看,要是你一個消息重復兩次缘揪,你不就插入了兩條,這數(shù)據(jù)不就錯了刨摩?但是你要是消費到第二次的時候寺晌,自己判斷一下是否已經(jīng)消費過了,若是就直接扔了澡刹,這樣不就保留了一條數(shù)據(jù)呻征,從而保證了數(shù)據(jù)的正確性。
一條數(shù)據(jù)重復出現(xiàn)兩次罢浇,數(shù)據(jù)庫里就只有一條數(shù)據(jù)陆赋,這就保證了系統(tǒng)的冪等性。
冪等性嚷闭,通俗點說攒岛,就一個數(shù)據(jù),或者一個請求胞锰,給你重復來多次灾锯,你得確保對應(yīng)的數(shù)據(jù)是不會改變的,不能出錯嗅榕。
所以第二個問題來了顺饮,怎么保證消息隊列消費的冪等性吵聪?
其實還是得結(jié)合業(yè)務(wù)來思考,我這里給幾個思路:
- 比如你拿個數(shù)據(jù)要寫庫兼雄,你先根據(jù)主鍵查一下吟逝,如果這數(shù)據(jù)都有了,你就別插入了赦肋,update 一下好吧块攒。
- 比如你是寫 Redis,那沒問題了佃乘,反正每次都是 set囱井,天然冪等性。
- 比如你不是上面兩個場景恕稠,那做的稍微復雜一點琅绅,你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時候,里面加一個全局唯一的 id鹅巍,類似訂單 id 之類的東西千扶,然后你這里消費到了之后,先根據(jù)這個 id 去比如 Redis 里查一下骆捧,之前消費過嗎澎羞?如果沒有消費過,你就處理敛苇,然后這個 id 寫 Redis妆绞。如果消費過了,那你就別處理了枫攀,保證別重復處理相同的消息即可括饶。
- 比如基于數(shù)據(jù)庫的唯一鍵來保證重復數(shù)據(jù)不會重復插入多條。因為有唯一鍵約束了来涨,重復數(shù)據(jù)插入只會報錯图焰,不會導致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。
當然蹦掐,如何保證 MQ 的消費是冪等性的技羔,需要結(jié)合具體的業(yè)務(wù)來看。
如何保證消息的可靠傳輸(處理消息丟失)卧抗?
RabbitMQ
生產(chǎn)者弄丟數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候藤滥,可能數(shù)據(jù)就在半路給搞丟了,因為網(wǎng)絡(luò)問題啥的社裆,都有可能拙绊。
此時可以選擇用 RabbitMQ 提供的事務(wù)功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)channel.txSelect
,然后發(fā)送消息时呀,如果消息沒有成功被 RabbitMQ 接收到张漂,那么生產(chǎn)者會收到異常報錯晶默,此時就可以回滾事務(wù)channel.txRollback
谨娜,然后重試發(fā)送消息;如果收到了消息磺陡,那么可以提交事務(wù)channel.txCommit
趴梢。
// 開啟事務(wù)
channel.txSelect
try {
// 這里發(fā)送消息
} catch (Exception e) {
channel.txRollback
// 這里再次重發(fā)這條消息
} // 提交事務(wù)
channel.txCommit
但是問題是,RabbitMQ 事務(wù)機制(同步)一搞币他,基本上吞吐量會下來坞靶,因為太耗性能。
所以一般來說蝴悉,如果你要確保說寫 RabbitMQ 的消息別丟彰阴,可以開啟 confirm
模式,在生產(chǎn)者那里設(shè)置開啟 confirm
模式之后拍冠,你每次寫的消息都會分配一個唯一的 id尿这,然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack
消息庆杜,告訴你說這個消息 ok 了射众。如果 RabbitMQ 沒能處理這個消息,會回調(diào)你的一個 nack
接口晃财,告訴你這個消息接收失敗叨橱,你可以重試。而且你可以結(jié)合這個機制自己在內(nèi)存里維護每個消息 id 的狀態(tài)断盛,如果超過一定時間還沒接收到這個消息的回調(diào)罗洗,那么你可以重發(fā)。
事務(wù)機制和 confirm
機制最大的不同在于钢猛,事務(wù)機制是同步的伙菜,你提交一個事務(wù)之后會阻塞在那兒,但是 confirm
機制是異步的厢洞,你發(fā)送個消息之后就可以發(fā)送下一個消息仇让,然后那個消息 RabbitMQ 接收了之后會異步回調(diào)你的一個接口通知你這個消息接收到了。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失躺翻,都是用 confirm
機制的丧叽。
RabbitMQ弄丟數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù),這個你必須開啟 RabbitMQ 的持久化公你,就是消息寫入之后會持久化到磁盤踊淳,哪怕是 RabbitMQ 自己掛了,恢復之后會自動讀取之前存儲的數(shù)據(jù),一般數(shù)據(jù)不會丟迂尝。除非極其罕見的是脱茉,RabbitMQ 還沒持久化,自己就掛了垄开,可能導致少量數(shù)據(jù)丟失琴许,但是這個概率較小。
設(shè)置持久化有兩個步驟:
- 創(chuàng)建 queue 的時候?qū)⑵湓O(shè)置為持久化
這樣就可以保證 RabbitMQ 持久化 queue 的元數(shù)據(jù)溉躲,但是它是不會持久化 queue 里的數(shù)據(jù)的榜田。 - 第二個是發(fā)送消息的時候?qū)⑾⒌?
deliveryMode
設(shè)置為 2
就是將消息設(shè)置為持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去锻梳。
必須要同時設(shè)置這兩個持久化才行箭券,RabbitMQ 哪怕是掛了,再次重啟疑枯,也會從磁盤上重啟恢復 queue辩块,恢復這個 queue 里的數(shù)據(jù)。
注意荆永,哪怕是你給 RabbitMQ 開啟了持久化機制废亭,也有一種可能,就是這個消息寫到了 RabbitMQ 中屁魏,但是還沒來得及持久化到磁盤上滔以,結(jié)果不巧,此時 RabbitMQ 掛了氓拼,就會導致內(nèi)存里的一點點數(shù)據(jù)丟失你画。
所以,持久化可以跟生產(chǎn)者那邊的 confirm
機制配合起來桃漾,只有消息被持久化到磁盤之后坏匪,才會通知生產(chǎn)者 ack
了,所以哪怕是在持久化到磁盤之前撬统,RabbitMQ 掛了适滓,數(shù)據(jù)丟了,生產(chǎn)者收不到 ack
恋追,你也是可以自己重發(fā)的凭迹。
消費端弄丟數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù),主要是因為你消費的時候苦囱,剛消費到嗅绸,還沒處理,結(jié)果進程掛了撕彤,比如重啟了鱼鸠,那么就尷尬了,RabbitMQ 認為你都消費了,這數(shù)據(jù)就丟了蚀狰。
這個時候得用 RabbitMQ 提供的 ack
機制愉昆,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動 ack
麻蹋,可以通過一個 api 來調(diào)用就行跛溉,然后每次你自己代碼里確保處理完的時候,再在程序里 ack
一把哥蔚。這樣的話倒谷,如果你還沒處理完,不就沒有 ack
了糙箍?那 RabbitMQ 就認為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理牵祟,消息是不會丟的深夯。
Kafka
先引一張Kafka Replication數(shù)據(jù)流向圖
消費端弄丟數(shù)據(jù)
唯一可能導致消費者弄丟數(shù)據(jù)的情況,就是說诺苹,你消費到了這個消息咕晋,然后消費者那邊自動提交了 offset,讓 Kafka 以為你已經(jīng)消費好了這個消息收奔,但其實你才剛準備處理這個消息掌呜,你還沒處理,你自己就掛了坪哄,此時這條消息就丟咯质蕉。
這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset翩肌,那么只要關(guān)閉自動提交 offset模暗,在處理完之后自己手動提交 offset,就可以保證數(shù)據(jù)不會丟念祭。但是此時確實還是可能會有重復消費兑宇,比如你剛處理完,還沒提交 offset粱坤,結(jié)果自己掛了隶糕,此時肯定會重復消費一次,自己保證冪等性就好了站玄。
生產(chǎn)環(huán)境碰到的一個問題枚驻,就是說我們的 Kafka 消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的 queue 里先緩沖一下,結(jié)果有的時候蜒什,你剛把消息寫入內(nèi)存 queue测秸,然后消費者會自動提交 offset。然后此時我們重啟了系統(tǒng),就會導致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)就丟失了霎冯。
Kafka弄丟數(shù)據(jù)
這塊比較常見的一個場景铃拇,就是 Kafka 某個 broker 宕機,然后重新選舉 partition 的 leader沈撞。大家想想慷荔,要是此時其他的 follower 剛好還有些數(shù)據(jù)沒有同步,結(jié)果此時 leader 掛了缠俺,然后選舉某個 follower 成 leader 之后显晶,不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊壹士。
生產(chǎn)環(huán)境也遇到過磷雇,我們也是,之前 Kafka 的 leader 機器宕機了躏救,將 follower 切換為 leader 之后唯笙,就會發(fā)現(xiàn)說這個數(shù)據(jù)就丟了。
所以此時一般是要求起碼設(shè)置如下 4 個參數(shù):
- 給 topic 設(shè)置
replication.factor
參數(shù):這個值必須大于 1盒使,要求每個 partition 必須有至少 2 個副本崩掘。 - 在 Kafka 服務(wù)端設(shè)置
min.insync.replicas
參數(shù):這個值必須大于 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯(lián)系少办,沒掉隊苞慢,這樣才能確保 leader 掛了還有一個 follower 吧。 - 在 producer 端設(shè)置
acks=all
:這個是要求每條數(shù)據(jù)英妓,必須是寫入所有 replica 之后挽放,才能認為是寫成功了。 - 在 producer 端設(shè)置
retries=MAX
(很大很大很大的一個值鞋拟,無限次重試的意思):這個是要求一旦寫入失敗骂维,就無限重試,卡在這里了贺纲。
我們生產(chǎn)環(huán)境就是按照上述要求配置的航闺,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發(fā)生故障猴誊,進行 leader 切換時潦刃,數(shù)據(jù)不會丟失。
生產(chǎn)者丟數(shù)據(jù)
如果按照上述的思路設(shè)置了 acks=all
懈叹,一定不會丟乖杠,要求是,你的 leader 接收到消息澄成,所有的 follower 都同步到了消息之后胧洒,才認為本次寫成功了畏吓。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試卫漫,重試無限次菲饼。
如何保證消息的順序?
我舉個例子列赎,我們以前做過一個 mysql binlog
同步的系統(tǒng)宏悦,壓力還是非常大的,日同步數(shù)據(jù)要達到上億包吝,就是說數(shù)據(jù)從一個 mysql 庫原封不動地同步到另一個 mysql 庫里面去(mysql -> mysql)饼煞。常見的一點在于說比如大數(shù)據(jù) team,就需要同步一個 mysql 庫過來诗越,對公司的業(yè)務(wù)系統(tǒng)的數(shù)據(jù)做各種復雜的操作砖瞧。
你在 mysql 里增刪改一條數(shù)據(jù),對應(yīng)出來了增刪改 3 條 binlog
日志掺喻,接著這三條 binlog
發(fā)送到 MQ 里面芭届,再消費出來依次執(zhí)行,起碼得保證人家是按照順序來的吧感耙?不然本來是:增加、修改持隧、刪除即硼;你愣是換了順序給執(zhí)行成刪除、修改屡拨、增加只酥,不全錯了么。
本來這個數(shù)據(jù)同步過來呀狼,應(yīng)該最后這個數(shù)據(jù)被刪除了裂允;結(jié)果你搞錯了這個順序,最后這個數(shù)據(jù)保留下來了哥艇,數(shù)據(jù)同步就出錯了绝编。
先看看順序會錯亂的倆場景:
RabbitMQ:一個 queue,多個 consumer貌踏。比如十饥,生產(chǎn)者向 RabbitMQ 里發(fā)送了三條數(shù)據(jù),順序依次是 data1/data2/data3祖乳,壓入的是 RabbitMQ 的一個內(nèi)存隊列逗堵。有三個消費者分別從 MQ 中消費這三條數(shù)據(jù)中的一條,結(jié)果消費者2先執(zhí)行完操作眷昆,把 data2 存入數(shù)據(jù)庫蜒秤,然后是 data1/data3汁咏。這不明顯亂了。
Kafka:比如說我們建了一個 topic作媚,有三個 partition攘滩。生產(chǎn)者在寫的時候,其實可以指定一個 key掂骏,比如說我們指定了某個訂單 id 作為 key轰驳,那么這個訂單相關(guān)的數(shù)據(jù),一定會被分發(fā)到同一個 partition 中去弟灼,而且這個 partition 中的數(shù)據(jù)一定是有順序的级解。
消費者從 partition 中取出來數(shù)據(jù)的時候,也一定是有順序的田绑。到這里勤哗,順序還是 ok 的,沒有錯亂掩驱。接著芒划,我們在消費者里可能會搞多個線程來并發(fā)處理消息。因為如果消費者是單線程消費處理欧穴,而處理比較耗時的話民逼,比如處理一條消息耗時幾十 ms,那么 1 秒鐘只能處理幾十條消息涮帘,這吞吐量太低了拼苍。而多個線程并發(fā)跑的話,順序可能就亂掉了调缨。
解決方案
RabbitMQ
拆分多個 queue疮鲫,每個 queue 一個 consumer,就是多一些 queue 而已弦叶,確實是麻煩點俊犯;或者就一個 queue 但是對應(yīng)一個 consumer,然后這個 consumer 內(nèi)部用內(nèi)存隊列做排隊伤哺,然后分發(fā)給底層不同的 worker 來處理燕侠。
Kafka
- 一個 topic,一個 partition默责,一個 consumer贬循,內(nèi)部單線程消費,單線程吞吐量太低桃序,一般不會用這個杖虾。
- 寫 N 個內(nèi)存 queue,具有相同 key 的數(shù)據(jù)都到同一個內(nèi)存 queue媒熊;然后對于 N 個線程奇适,每個線程分別消費一個內(nèi)存 queue 即可坟比,這樣就能保證順序性。
如何解決消息隊列的延時以及過期失效問題嚷往?消息隊列滿了以后該怎么處理葛账?有幾百萬消息持續(xù)積壓幾小時,說說怎么解決皮仁?
你看這問法籍琳,其實本質(zhì)針對的場景,都是說贷祈,可能你的消費端出了問題趋急,不消費了;或者消費的速度極其慢势誊。接著就坑爹了呜达,可能你的消息隊列集群的磁盤都快寫滿了,都沒人消費粟耻,這個時候怎么辦查近?或者是這整個就積壓了幾個小時,你這個時候怎么辦挤忙?或者是你積壓的時間太長了霜威,導致比如 RabbitMQ 設(shè)置了消息過期時間后就沒了怎么辦?
所以就這事兒册烈,其實線上挺常見的侥祭,一般不出,一出就是大 case茄厘。一般常見于,舉個例子谈宛,消費端每次消費之后要寫 mysql次哈,結(jié)果 mysql 掛了,消費端 hang 那兒了吆录,不動了窑滞;或者是消費端出了個什么岔子,導致消費速度極其慢恢筝。
大量消息在 mq 里積壓了幾個小時了還沒解決
幾千萬條數(shù)據(jù)在 MQ 里積壓了七八個小時哀卫,從下午 4 點多,積壓到了晚上 11 點多撬槽。這個是我們真實遇到過的一個場景此改,確實是線上故障了,這個時候要不然就是修復 consumer 的問題侄柔,讓它恢復消費速度共啃,然后傻傻的等待幾個小時消費完畢占调。這個肯定不能在面試的時候說吧。
一個消費者一秒是 1000 條移剪,一秒 3 個消費者是 3000 條究珊,一分鐘就是 18 萬條。所以如果你積壓了幾百萬到上千萬的數(shù)據(jù)纵苛,即使消費者恢復了剿涮,也需要大概 1 小時的時間才能恢復過來。
一般這個時候攻人,只能臨時緊急擴容了取试,具體操作步驟和思路如下:
- 先修復 consumer 的問題,確保其恢復消費速度贝椿,然后將現(xiàn)有 consumer 都停掉想括。
- 新建一個 topic,partition 是原來的 10 倍烙博,臨時建立好原先 10 倍的 queue 數(shù)量瑟蜈。
- 然后寫一個臨時的分發(fā)數(shù)據(jù)的 consumer 程序,這個程序部署上去消費積壓的數(shù)據(jù)渣窜,消費之后不做耗時的處理铺根,直接均勻輪詢寫入臨時建立好的 10 倍數(shù)量的 queue。
- 接著臨時征用 10 倍的機器來部署 consumer乔宿,每一批 consumer 消費一個臨時 queue 的數(shù)據(jù)位迂。這種做法相當于是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數(shù)據(jù)详瑞。
- 等快速消費完積壓數(shù)據(jù)之后掂林,得恢復原先部署的架構(gòu),重新用原先的 consumer 機器來消費消息坝橡。
mq 中的消息過期失效了
假設(shè)你用的是 RabbitMQ泻帮,RabbtiMQ 是可以設(shè)置過期時間的,也就是 TTL计寇。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉锣杂,這個數(shù)據(jù)就沒了。那這就是第二個坑了番宁。這就不是說數(shù)據(jù)會大量積壓在 mq 里元莫,而是大量的數(shù)據(jù)會直接搞丟。
這個情況下蝶押,就不是說要增加 consumer 消費積壓的消息踱蠢,因為實際上沒啥積壓,而是丟了大量的消息播聪。我們可以采取一個方案朽基,就是批量重導布隔,這個我們之前線上也有類似的場景干過。就是大量積壓的時候稼虎,我們當時就直接丟棄數(shù)據(jù)了衅檀,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后霎俩,用戶都睡覺了哀军。這個時候我們就開始寫程序,將丟失的那批數(shù)據(jù)打却,寫個臨時程序杉适,一點一點的查出來,然后重新灌入 mq 里面去柳击,把白天丟的數(shù)據(jù)給他補回來猿推。也只能是這樣了。
假設(shè) 1 萬個訂單積壓在 mq 里面捌肴,沒有處理蹬叭,其中 1000 個訂單都丟了,你只能手動寫程序把那 1000 個訂單給查出來状知,手動發(fā)到 mq 里去再補一次秽五。
mq 都快寫滿了
如果消息積壓在 mq 里,你很長時間都沒有處理掉饥悴,此時導致 mq 都快寫滿了坦喘,咋辦?這個還有別的辦法嗎西设?沒有瓣铣,誰讓你第一個方案執(zhí)行的太慢了,你臨時寫程序贷揽,接入數(shù)據(jù)來消費坯沪,消費一個丟棄一個,都不要了擒滑,快速消費掉所有的消息。然后走第二個方案叉弦,到了晚上再補數(shù)據(jù)吧丐一。
如果讓你寫一個消息隊列,該如何進行架構(gòu)設(shè)計淹冰?說一下你的思路库车。
其實聊到這個問題,一般面試官要考察兩塊:
- 你有沒有對某一個消息隊列做過較為深入的原理的了解樱拴,或者從整體了解把握住一個消息隊列的架構(gòu)原理柠衍。
- 看看你的設(shè)計能力洋满,給你一個常見的系統(tǒng),就是消息隊列系統(tǒng)珍坊,看看你能不能從全局把握一下整體架構(gòu)設(shè)計牺勾,給出一些關(guān)鍵點出來畅哑。
說實話寝衫,問類似問題的時候炫乓,大部分人基本都會蒙挣菲,因為平時從來沒有思考過類似的問題渗钉,大多數(shù)人就是平時埋頭用瞒瘸,從來不去思考背后的一些東西双戳。類似的問題戏蔑,比如叹洲,如果讓你來設(shè)計一個 Spring 框架你會怎么做柠硕?如果讓你來設(shè)計一個 Dubbo 框架你會怎么做?如果讓你來設(shè)計一個 MyBatis 框架你會怎么做运提?
其實回答這類問題蝗柔,說白了,不求你看過那技術(shù)的源碼糙捺,起碼你要大概知道那個技術(shù)的基本原理诫咱、核心組成部分、基本架構(gòu)構(gòu)成洪灯,然后參照一些開源的技術(shù)把一個系統(tǒng)設(shè)計出來的思路說一下就好坎缭。
比如說這個消息隊列系統(tǒng),我們從以下幾個角度來考慮一下:
首先這個 mq 得支持可伸縮性吧签钩,就是需要的時候快速擴容掏呼,就可以增加吞吐量和容量,那怎么搞铅檩?設(shè)計個分布式的系統(tǒng)唄憎夷,參照一下 kafka 的設(shè)計理念,broker -> topic -> partition昧旨,每個 partition 放一個機器拾给,就存一部分數(shù)據(jù)。如果現(xiàn)在資源不夠了兔沃,簡單啊蒋得,給 topic 增加 partition,然后做數(shù)據(jù)遷移乒疏,增加機器额衙,不就可以存放更多數(shù)據(jù),提供更高的吞吐量了?
其次你得考慮一下這個 mq 的數(shù)據(jù)要不要落地磁盤吧窍侧?那肯定要了县踢,落磁盤才能保證別進程掛了數(shù)據(jù)就丟了。那落磁盤的時候怎么落拔凹硼啤?順序?qū)懀@樣就沒有磁盤隨機讀寫的尋址開銷锋爪,磁盤順序讀寫的性能是很高的丙曙,這就是 kafka 的思路。
其次你考慮一下你的 mq 的可用性捌浣尽亏镰?這個事兒,具體參考之前可用性那個環(huán)節(jié)講解的 kafka 的高可用保障機制拯爽。多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務(wù)索抓。
能不能支持數(shù)據(jù) 0 丟失啊毯炮?可以的逼肯,參考我們之前說的那個 kafka 數(shù)據(jù)零丟失方案。
mq 肯定是很復雜的桃煎,面試官問你這個問題篮幢,其實是個開放題,他就是看看你有沒有從架構(gòu)角度整體構(gòu)思和設(shè)計的思維以及能力为迈。確實這個問題可以刷掉一大批人三椿,因為大部分人平時不思考這些東西。
Redis做消息隊列與其他消息隊列相比有什么不同
Redis作為消息隊列:
- 如果你的需求是快產(chǎn)快消的即時消費場景葫辐,并且生產(chǎn)的消息立即被消費者消費掉搜锰。
- 如果速度是你十分看重的,比如慢了一秒好幾千萬這種耿战。
- 如果允許出現(xiàn)消息丟失的場景蛋叼。
- 如果你不需要系統(tǒng)保存你發(fā)送過的消息。
- 如果需要處理的數(shù)據(jù)量并不是那么巨大剂陡。
其他消息隊列:
- 如果你想要穩(wěn)定的消息隊列狈涮。
- 如果你想要你發(fā)送過的消息可以保留一定的時間,并不是無跡可尋的時候鸭栖。
- 如果你無法忍受數(shù)據(jù)的丟失薯嗤。
- 如果速度不需要那么的快。
- 如果需要處理數(shù)據(jù)量巨大的時候纤泵。
應(yīng)用場景分析
Redis:輕量級,高并發(fā),延遲敏感
即時數(shù)據(jù)分析捏题、秒殺計數(shù)器玻褪、緩存等。
其他MQ:重量級公荧,高并發(fā)带射,異步
批量數(shù)據(jù)異步處理、并行任務(wù)串行化循狰,高負載任務(wù)的負載均衡等窟社。