寫在前面
又到了年底跳槽高峰季柳沙,很多小伙伴出去面試時跃赚,不少面試官都會問到消息隊列的問題囱晴,不少小伙伴回答的不是很完美澳泵,有些小伙伴是心里知道答案实愚,嘴上卻沒有很好的表達(dá)出來,究其根本原因兔辅,還是對相關(guān)的知識點理解的不夠透徹腊敲。今天,我們就一起來探討下這個話題维苔。注:文章有點長碰辅,你說你能一鼓作氣看完,我有點不信=槭薄没宾!
什么是消息隊列?
消息隊列(Message Queue)是在消息的傳輸過程中保存消息的容器沸柔,是應(yīng)用間的通信方式循衰。消息發(fā)送后可以立即返回,由消息系統(tǒng)保證消息的可靠傳輸褐澎,消息發(fā)布者只管把消息寫到隊列里面而不用考慮誰需要消息羹蚣,而消息的使用者也不需要知道誰發(fā)布的消息,只管到消息隊列里面取乱凿,這樣生產(chǎn)和消費便可以做到分離。
為什么要使用消息隊列咽弦?
優(yōu)點:
- 異步處理:例如短信通知徒蟆、終端狀態(tài)推送、App推送型型、用戶注冊等
- 數(shù)據(jù)同步:業(yè)務(wù)數(shù)據(jù)推送同步
- 重試補償:記賬失敗重試
- 系統(tǒng)解耦:通訊上下行段审、終端異常監(jiān)控、分布式事件中心
- 流量消峰:秒殺場景下的下單處理
- 發(fā)布訂閱:HSF的服務(wù)狀態(tài)變化通知闹蒜、分布式事件中心
- 高并發(fā)緩沖:日志服務(wù)寺枉、監(jiān)控上報
使用消息隊列比較核心的作用就是:解耦、異步绷落、削峰姥闪。
缺點:
- 系統(tǒng)可用性降低 系統(tǒng)引入的外部依賴越多,越容易掛掉砌烁?如何保證消息隊列的高可用筐喳?
- 系統(tǒng)復(fù)雜度提高 怎么保證消息沒有重復(fù)消費催式?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性避归?
- 一致性問題 A 系統(tǒng)處理完了直接返回成功了荣月,人都以為你這個請求就成功了;但是問題是梳毙,要是 BCD 三個系統(tǒng)那里哺窄,BD 兩個系統(tǒng)寫庫成功了,結(jié)果 C 系統(tǒng)寫庫失敗了账锹,咋整萌业?你這數(shù)據(jù)就不一致了。
以下主要討論的RabbitMQ和Kafka兩種消息隊列牌废。
如何保證消息隊列的高可用咽白?
RabbitMQ的高可用
RabbitMQ的高可用是基于主從(非分布式)做高可用性。RabbitMQ 有三種模式:單機模式(Demo級別)鸟缕、普通集群模式(無高可用性)晶框、鏡像集群模式(高可用性)。
-
普通集群模式
普通集群模式懂从,意思就是在多臺機器上啟動多個 RabbitMQ 實例授段,每個機器啟動一個。你創(chuàng)建的 queue番甩,只會放在一個 RabbitMQ 實例上侵贵,但是每個實例都同步 queue 的元數(shù)據(jù)(元數(shù)據(jù)可以認(rèn)為是 queue 的一些配置信息,通過元數(shù)據(jù)缘薛,可以找到 queue 所在實例)窍育。你消費的時候,實際上如果連接到了另外一個實例宴胧,那么那個實例會從 queue 所在實例上拉取數(shù)據(jù)過來漱抓。
這種方式確實很麻煩,也不怎么好恕齐,沒做到所謂的分布式乞娄,就是個普通集群。因為這導(dǎo)致你要么消費者每次隨機連接一個實例然后拉取數(shù)據(jù)显歧,要么固定連接那個 queue 所在實例消費數(shù)據(jù)仪或,前者有數(shù)據(jù)拉取的開銷,后者導(dǎo)致單實例性能瓶頸范删。
而且如果那個放 queue 的實例宕機了,會導(dǎo)致接下來其他實例就無法從那個實例拉取拷肌,如果你開啟了消息持久化,讓 RabbitMQ 落地存儲消息的話契沫,消息不一定會丟靶病,得等這個實例恢復(fù)了涕侈,然后才可以繼續(xù)從這個 queue 拉取數(shù)據(jù)。
所以這個事兒就比較尷尬了郊闯,這就沒有什么所謂的高可用性谨履,這方案主要是提高吞吐量的矗钟,就是說讓集群中多個節(jié)點來服務(wù)某個 queue 的讀寫操作吨艇。
- 鏡像集群模式
這種模式失尖,才是所謂的 RabbitMQ 的高可用模式琼富。跟普通集群模式不一樣的是人断,在鏡像集群模式下暇仲,你創(chuàng)建的 queue,無論元數(shù)據(jù)還是 queue 里的消息都會存在于多個實例上挑胸,就是說簿透,每個 RabbitMQ 節(jié)點都有這個 queue 的一個完整鏡像蚂维,包含 queue 的全部數(shù)據(jù)的意思。然后每次你寫消息到 queue 的時候蜕煌,都會自動把消息同步到多個實例的 queue 上靠柑。
那么如何開啟這個鏡像集群模式呢?其實很簡單释液,RabbitMQ 有很好的管理控制臺形帮,就是在后臺新增一個策略完残,這個策略是鏡像集群模式的策略亡笑,指定的時候是可以要求數(shù)據(jù)同步到所有節(jié)點的萌京,也可以要求同步到指定數(shù)量的節(jié)點宏浩,再次創(chuàng)建 queue 的時候,應(yīng)用這個策略靠瞎,就會自動將數(shù)據(jù)同步到其他的節(jié)點上去了比庄。
這樣的話,好處在于乏盐,你任何一個機器宕機了佳窑,沒事兒,其它機器(節(jié)點)還包含了這個 queue 的完整數(shù)據(jù)父能,別的 consumer 都可以到其它節(jié)點上去消費數(shù)據(jù)神凑。壞處在于,第一何吝,這個性能開銷也太大了吧溉委,消息需要同步到所有機器上,導(dǎo)致網(wǎng)絡(luò)帶寬壓力和消耗很重爱榕!第二瓣喊,這么玩兒,不是分布式的黔酥,就沒有擴展性可言了藻三,如果某個 queue 負(fù)載很重,你加機器跪者,新增的機器也包含了這個 queue 的所有數(shù)據(jù)棵帽,并沒有辦法線性擴展你的 queue。你想渣玲,如果這個 queue 的數(shù)據(jù)量很大逗概,大到這個機器上的容量無法容納了,此時該怎么辦呢忘衍?
Kafka的高可用
Kafka 一個最基本的架構(gòu)認(rèn)識:由多個 broker 組成仗谆,每個 broker 是一個節(jié)點;你創(chuàng)建一個 topic淑履,這個 topic 可以劃分為多個 partition隶垮,每個 partition 可以存在于不同的 broker 上,每個 partition 就放一部分?jǐn)?shù)據(jù)秘噪。
這就是天然的分布式消息隊列狸吞,就是說一個 topic 的數(shù)據(jù),是分散放在多個機器上的,每個機器就放一部分?jǐn)?shù)據(jù)蹋偏。
實際上 RabbmitMQ 之類的便斥,并不是分布式消息隊列,它就是傳統(tǒng)的消息隊列威始,只不過提供了一些集群枢纠、HA(High Availability, 高可用性) 的機制而已,因為無論怎么玩兒黎棠,RabbitMQ 一個 queue 的數(shù)據(jù)都是放在一個節(jié)點里的晋渺,鏡像集群下,也是每個節(jié)點都放這個 queue 的完整數(shù)據(jù)脓斩。
Kafka 0.8 以前木西,是沒有 HA 機制的,就是任何一個 broker 宕機了随静,那個 broker 上的 partition 就廢了八千,沒法寫也沒法讀,沒有什么高可用性可言燎猛。
比如說恋捆,我們假設(shè)創(chuàng)建了一個 topic,指定其 partition 數(shù)量是 3 個重绷,分別在三臺機器上鸠信。但是,如果第二臺機器宕機了论寨,會導(dǎo)致這個 topic 的 1/3 的數(shù)據(jù)就丟了星立,因此這個是做不到高可用的。
Kafka 0.8 以后葬凳,提供了 HA 機制绰垂,就是 replica(復(fù)制品) 副本機制。每個 partition 的數(shù)據(jù)都會同步到其它機器上火焰,形成自己的多個 replica 副本劲装。所有 replica 會選舉一個 leader 出來,那么生產(chǎn)和消費都跟這個 leader 打交道昌简,然后其他 replica 就是 follower占业。寫的時候,leader 會負(fù)責(zé)把數(shù)據(jù)同步到所有 follower 上去纯赎,讀的時候就直接讀 leader 上的數(shù)據(jù)即可谦疾。只能讀寫 leader?很簡單犬金,要是你可以隨意讀寫每個 follower念恍,那么就要 care 數(shù)據(jù)一致性的問題六剥,系統(tǒng)復(fù)雜度太高,很容易出問題峰伙。Kafka 會均勻地將一個 partition 的所有 replica 分布在不同的機器上疗疟,這樣才可以提高容錯性。
這么搞瞳氓,就有所謂的高可用性了策彤,因為如果某個 broker 宕機了,沒事兒匣摘,那個 broker上面的 partition 在其他機器上都有副本的店诗。如果這個宕機的 broker 上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來恋沃,大家繼續(xù)讀寫那個新的 leader 即可。這就有所謂的高可用性了必指。
寫數(shù)據(jù)的時候囊咏,生產(chǎn)者就寫 leader,然后 leader 將數(shù)據(jù)落地寫本地磁盤塔橡,接著其他 follower 自己主動從 leader 來 pull 數(shù)據(jù)梅割。一旦所有 follower 同步好數(shù)據(jù)了,就會發(fā)送 ack 給 leader葛家,leader 收到所有 follower 的 ack 之后户辞,就會返回寫成功的消息給生產(chǎn)者。(當(dāng)然癞谒,這只是其中一種模式底燎,還可以適當(dāng)調(diào)整這個行為)
消費的時候,只會從 leader 去讀弹砚,但是只有當(dāng)一個消息已經(jīng)被所有 follower 都同步成功返回 ack 的時候双仍,這個消息才會被消費者讀到。
如何保證消息不重復(fù)消費(冪等性)桌吃?
首先朱沃,所有的消息隊列都會有這樣重復(fù)消費的問題,因為這是不MQ來保證茅诱,而是我們自己開發(fā)保證的逗物,我們使用Kakfa來討論是如何實現(xiàn)的。
Kakfa有個offset的概念瑟俭,就是每個消息寫進(jìn)去都會有一個offset值翎卓,代表消費的序號,然后consumer消費了數(shù)據(jù)之后摆寄,默認(rèn)每隔一段時間會把自己消費過的消息的offset值提交莲祸,表示我已經(jīng)消費過了蹂安,下次要是我重啟啥的,就讓我從當(dāng)前提交的offset處來繼續(xù)消費锐帜。
但是凡事總有意外田盈,比如我們之前生產(chǎn)經(jīng)常遇到的,就是你有時候重啟系統(tǒng)缴阎,看你怎么重啟了允瞧,如果碰到點著急的,直接 kill 進(jìn)程了蛮拔,再重啟述暂。這會導(dǎo)致 consumer 有些消息處理了,但是沒來得及提交 offset建炫,尷尬了畦韭。重啟之后,少數(shù)消息會再次消費一次肛跌。
其實重復(fù)消費不可怕艺配,可怕的是你沒考慮到重復(fù)消費之后,怎么保證冪等性衍慎。
舉個例子吧转唉。假設(shè)你有個系統(tǒng),消費一條消息就往數(shù)據(jù)庫里插入一條數(shù)據(jù)稳捆,要是你一個消息重復(fù)兩次赠法,你不就插入了兩條,這數(shù)據(jù)不就錯了乔夯?但是你要是消費到第二次的時候砖织,自己判斷一下是否已經(jīng)消費過了,若是就直接扔了末荐,這樣不就保留了一條數(shù)據(jù)镶苞,從而保證了數(shù)據(jù)的正確性。一條數(shù)據(jù)重復(fù)出現(xiàn)兩次鞠评,數(shù)據(jù)庫里就只有一條數(shù)據(jù)茂蚓,這就保證了系統(tǒng)的冪等性。冪等性剃幌,通俗點說聋涨,就一個數(shù)據(jù),或者一個請求负乡,給你重復(fù)來多次牍白,你得確保對應(yīng)的數(shù)據(jù)是不會改變的,不能出錯抖棘。
所以第二個問題來了茂腥,怎么保證消息隊列消費的冪等性狸涌?
其實還是得結(jié)合業(yè)務(wù)來思考,我這里給幾個思路:
- 比如你拿個數(shù)據(jù)要寫庫最岗,你先根據(jù)主鍵查一下帕胆,如果這數(shù)據(jù)都有了,你就別插入了般渡,update 一下好吧懒豹。
- 比如你是寫 Redis,那沒問題了驯用,反正每次都是 set脸秽,天然冪等性。
- 比如你不是上面兩個場景蝴乔,那做的稍微復(fù)雜一點记餐,你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時候,里面加一個全局唯一的 id薇正,類似訂單 id 之類的東西片酝,然后你這里消費到了之后,先根據(jù)這個 id 去比如 Redis 里查一下铝穷,之前消費過嗎钠怯?如果沒有消費過佳魔,你就處理曙聂,然后這個 id 寫 Redis。如果消費過了鞠鲜,那你就別處理了宁脊,保證別重復(fù)處理相同的消息即可。
- 比如基于數(shù)據(jù)庫的唯一鍵來保證重復(fù)數(shù)據(jù)不會重復(fù)插入多條贤姆。因為有唯一鍵約束了榆苞,重復(fù)數(shù)據(jù)插入只會報錯,不會導(dǎo)致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)霞捡。
當(dāng)然坐漏,如何保證 MQ 的消費是冪等性的,需要結(jié)合具體的業(yè)務(wù)來看碧信。
如何保證消息的可靠傳輸(不丟失)赊琳?
這個是肯定的,MQ的基本原則就是數(shù)據(jù)不能多一條砰碴,也不能少一條躏筏,不能多其實就是我們前面重復(fù)消費的問題。不能少呈枉,就是數(shù)據(jù)不能丟趁尼,像計費埃碱,扣費的一些信息,是肯定不能丟失的酥泞。
數(shù)據(jù)的丟失問題砚殿,可能出現(xiàn)在生產(chǎn)者、MQ婶博、消費者中瓮具,咱們從 RabbitMQ 和 Kafka 分別來分析一下吧。
RabbitMQ如何保證消息的可靠
生產(chǎn)者丟數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候凡人,可能數(shù)據(jù)就在半路給搞丟了名党,因為網(wǎng)絡(luò)問題啥的,都有可能挠轴。
此時可以選擇用 RabbitMQ 提供的事務(wù)功能传睹,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)channel.txSelect
,然后發(fā)送消息岸晦,如果消息沒有成功被 RabbitMQ 接收到欧啤,那么生產(chǎn)者會收到異常報錯,此時就可以回滾事務(wù)channel.txRollback
启上,然后重試發(fā)送消息邢隧;如果收到了消息,那么可以提交事務(wù)channel.txCommit
冈在。
// 開啟事務(wù)
channel.txSelect
try {
// 這里發(fā)送消息
} catch (Exception e) {
channel.txRollback
// 這里再次重發(fā)這條消息
}
// 提交事務(wù)
channel.txCommit
但是問題是倒慧,RabbitMQ 事務(wù)機制(同步)一搞,基本上吞吐量會下來包券,因為太耗性能纫谅。
所以一般來說,如果你要確保說寫 RabbitMQ 的消息別丟溅固,可以開啟 confirm
模式付秕,在生產(chǎn)者那里設(shè)置開啟 confirm
模式之后,你每次寫的消息都會分配一個唯一的 id侍郭,然后如果寫入了 RabbitMQ 中询吴,RabbitMQ 會給你回傳一個 ack
消息,告訴你說這個消息 ok 了亮元。如果 RabbitMQ 沒能處理這個消息猛计,會回調(diào)你的一個 nack
接口,告訴你這個消息接收失敗苹粟,你可以重試有滑。而且你可以結(jié)合這個機制自己在內(nèi)存里維護(hù)每個消息 id 的狀態(tài),如果超過一定時間還沒接收到這個消息的回調(diào)嵌削,那么你可以重發(fā)毛好。
事務(wù)機制和 confirm
機制最大的不同在于望艺,事務(wù)機制是同步的,你提交一個事務(wù)之后會阻塞在那兒肌访,但是 confirm
機制是異步的找默,你發(fā)送個消息之后就可以發(fā)送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調(diào)你的一個接口通知你這個消息接收到了吼驶。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失惩激,都是用 confirm
機制的。
RabbitMQ丟數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù)蟹演,這個你必須開啟 RabbitMQ 的持久化风钻,就是消息寫入之后會持久化到磁盤,哪怕是 RabbitMQ 自己掛了酒请,恢復(fù)之后會自動讀取之前存儲的數(shù)據(jù)骡技,一般數(shù)據(jù)不會丟。除非極其罕見的是羞反,RabbitMQ 還沒持久化布朦,自己就掛了,可能導(dǎo)致少量數(shù)據(jù)丟失昼窗,但是這個概率較小是趴。
設(shè)置持久化有兩個步驟:
- 創(chuàng)建 queue 的時候?qū)⑵湓O(shè)置為持久化 這樣就可以保證 RabbitMQ 持久化 queue 的元數(shù)據(jù),但是它是不會持久化 queue 里的數(shù)據(jù)的澄惊。
- 第二個是發(fā)送消息的時候?qū)⑾⒌?
deliveryMode
設(shè)置為 2 就是將消息設(shè)置為持久化的唆途,此時 RabbitMQ 就會將消息持久化到磁盤上去。
必須要同時設(shè)置這兩個持久化才行缤削,RabbitMQ 哪怕是掛了窘哈,再次重啟吹榴,也會從磁盤上重啟恢復(fù) queue亭敢,恢復(fù)這個 queue 里的數(shù)據(jù)。
注意图筹,哪怕是你給 RabbitMQ 開啟了持久化機制帅刀,也有一種可能,就是這個消息寫到了 RabbitMQ 中远剩,但是還沒來得及持久化到磁盤上扣溺,結(jié)果不巧,此時 RabbitMQ 掛了瓜晤,就會導(dǎo)致內(nèi)存里的一點點數(shù)據(jù)丟失锥余。
所以,持久化可以跟生產(chǎn)者那邊的 confirm
機制配合起來痢掠,只有消息被持久化到磁盤之后驱犹,才會通知生產(chǎn)者 ack
了嘲恍,所以哪怕是在持久化到磁盤之前,RabbitMQ 掛了雄驹,數(shù)據(jù)丟了佃牛,生產(chǎn)者收不到 ack
,你也是可以自己重發(fā)的医舆。
消費者丟數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù)俘侠,主要是因為你消費的時候,剛消費到蔬将,還沒處理爷速,結(jié)果進(jìn)程掛了,比如重啟了霞怀,那么就尷尬了遍希,RabbitMQ 認(rèn)為你都消費了,這數(shù)據(jù)就丟了里烦。
這個時候得用 RabbitMQ 提供的 ack
機制凿蒜,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動 ack
胁黑,可以通過一個 api 來調(diào)用就行废封,然后每次你自己代碼里確保處理完的時候,再在程序里 ack
一把丧蘸。這樣的話漂洋,如果你還沒處理完,不就沒有 ack
了力喷?那 RabbitMQ 就認(rèn)為你還沒處理完刽漂,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的弟孟。
Kakfa如何保證消息的可靠
-
消費者丟數(shù)據(jù)
唯一可能導(dǎo)致消費者弄丟數(shù)據(jù)的情況贝咙,就是說,你消費到了這個消息拂募,然后消費者那邊自動提交了 offset庭猩,讓 Kafka 以為你已經(jīng)消費好了這個消息,但其實你才剛準(zhǔn)備處理這個消息陈症,你還沒處理蔼水,你自己就掛了,此時這條消息就丟咯录肯。
這不是跟 RabbitMQ 差不多嗎趴腋,大家都知道 Kafka 會自動提交 offset,那么只要關(guān)閉自動提交 offset,在處理完之后自己手動提交 offset优炬,就可以保證數(shù)據(jù)不會丟疏叨。但是此時確實還是可能會有重復(fù)消費,比如你剛處理完穿剖,還沒提交 offset蚤蔓,結(jié)果自己掛了,此時肯定會重復(fù)消費一次糊余,自己保證冪等性就好了秀又。
生產(chǎn)環(huán)境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的 queue 里先緩沖一下贬芥,結(jié)果有的時候吐辙,你剛把消息寫入內(nèi)存 queue,然后消費者會自動提交 offset蘸劈。然后此時我們重啟了系統(tǒng)昏苏,就會導(dǎo)致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)就丟失了。
-
Kafka丟數(shù)據(jù)
這塊比較常見的一個場景威沫,就是 Kafka 某個 broker 宕機贤惯,然后重新選舉 partition 的 leader。大家想想棒掠,要是此時其他的 follower 剛好還有些數(shù)據(jù)沒有同步孵构,結(jié)果此時 leader 掛了,然后選舉某個 follower 成 leader 之后烟很,不就少了一些數(shù)據(jù)颈墅?這就丟了一些數(shù)據(jù)啊。
生產(chǎn)環(huán)境也遇到過雾袱,我們也是恤筛,之前 Kafka 的 leader 機器宕機了,將 follower 切換為 leader 之后芹橡,就會發(fā)現(xiàn)說這個數(shù)據(jù)就丟了毒坛。
所以此時一般是要求起碼設(shè)置如下 4 個參數(shù):
- 給 topic 設(shè)置
replication.factor
參數(shù):這個值必須大于 1,要求每個 partition 必須有至少 2 個副本僻族。 - 在 Kafka 服務(wù)端設(shè)置
min.insync.replicas
參數(shù):這個值必須大于 1粘驰,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯(lián)系屡谐,沒掉隊述么,這樣才能確保 leader 掛了還有一個 follower 吧。 - 在 producer 端設(shè)置
acks=all
:這個是要求每條數(shù)據(jù)愕掏,必須是寫入所有 replica 之后度秘,才能認(rèn)為是寫成功了。 - 在 producer 端設(shè)置
retries=MAX
(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗剑梳,就無限重試唆貌,卡在這里了。
我們生產(chǎn)環(huán)境就是按照上述要求配置的垢乙,這樣配置之后锨咙,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發(fā)生故障,進(jìn)行 leader 切換時寻定,數(shù)據(jù)不會丟失比藻。
- 給 topic 設(shè)置
-
生產(chǎn)者丟數(shù)據(jù)
如果按照上述的思路設(shè)置了
acks=all
卜高,一定不會丟,要求是骂倘,你的 leader 接收到消息,所有的 follower 都同步到了消息之后巴席,才認(rèn)為本次寫成功了历涝。如果沒滿足這個條件,生產(chǎn)者會自動不斷的重試漾唉,重試無限次荧库。
如何保證消息的順序性?
我舉個例子赵刑,我們以前做過一個 mysql binlog
同步的系統(tǒng)电爹,壓力還是非常大的,日同步數(shù)據(jù)要達(dá)到上億料睛,就是說數(shù)據(jù)從一個 mysql 庫原封不動地同步到另一個 mysql 庫里面去(mysql -> mysql)丐箩。常見的一點在于說比如大數(shù)據(jù) team,就需要同步一個 mysql 庫過來恤煞,對公司的業(yè)務(wù)系統(tǒng)的數(shù)據(jù)做各種復(fù)雜的操作屎勘。
你在 mysql 里增刪改一條數(shù)據(jù),對應(yīng)出來了增刪改 3 條 binlog
日志居扒,接著這三條 binlog
發(fā)送到 MQ 里面概漱,再消費出來依次執(zhí)行,起碼得保證人家是按照順序來的吧喜喂?不然本來是:增加瓤摧、修改、刪除玉吁;你楞是換了順序給執(zhí)行成刪除照弥、修改、增加进副,不全錯了么这揣。
本來這個數(shù)據(jù)同步過來,應(yīng)該最后這個數(shù)據(jù)被刪除了;結(jié)果你搞錯了這個順序给赞,最后這個數(shù)據(jù)保留下來了机打,數(shù)據(jù)同步就出錯了。
先看看順序會錯亂的倆場景:
- RabbitMQ:一個 queue片迅,多個 consumer残邀。比如,生產(chǎn)者向 RabbitMQ 里發(fā)送了三條數(shù)據(jù)柑蛇,順序依次是 data1/data2/data3罐旗,壓入的是 RabbitMQ 的一個內(nèi)存隊列。有三個消費者分別從 MQ 中消費這三條數(shù)據(jù)中的一條唯蝶,結(jié)果消費者2先執(zhí)行完操作九秀,把 data2 存入數(shù)據(jù)庫,然后是 data1/data3粘我。這不明顯亂了鼓蜒。
- Kafka:比如說我們建了一個 topic,有三個 partition征字。生產(chǎn)者在寫的時候都弹,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key匙姜,那么這個訂單相關(guān)的數(shù)據(jù)畅厢,一定會被分發(fā)到同一個 partition 中去,而且這個 partition 中的數(shù)據(jù)一定是有順序的氮昧。 消費者從 partition 中取出來數(shù)據(jù)的時候框杜,也一定是有順序的。到這里袖肥,順序還是 ok 的咪辱,沒有錯亂。接著椎组,我們在消費者里可能會搞多個線程來并發(fā)處理消息油狂。因為如果消費者是單線程消費處理,而處理比較耗時的話寸癌,比如處理一條消息耗時幾十 ms专筷,那么 1 秒鐘只能處理幾十條消息,這吞吐量太低了蒸苇。而多個線程并發(fā)跑的話磷蛹,順序可能就亂掉了。
RabbitMQ解決方案
拆分多個 queue填渠,每個 queue 一個 consumer弦聂,就是多一些 queue 而已鸟辅,確實是麻煩點氛什;或者就一個 queue 但是對應(yīng)一個 consumer莺葫,然后這個 consumer 內(nèi)部用內(nèi)存隊列做排隊,然后分發(fā)給底層不同的 worker 來處理枪眉。
Kafka解決方案
- 一個 topic捺檬,一個 partition,一個 consumer贸铜,內(nèi)部單線程消費堡纬,單線程吞吐量太低,一般不會用這個蒿秦。
- 寫 N 個內(nèi)存 queue烤镐,具有相同 key 的數(shù)據(jù)都到同一個內(nèi)存 queue;然后對于 N 個線程棍鳖,每個線程分別消費一個內(nèi)存 queue 即可炮叶,這樣就能保證順序性。
如何處理消息推積渡处?
大量消息在 mq 里積壓了幾個小時了還沒解決
一個消費者一秒是 1000 條镜悉,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條医瘫。所以如果你積壓了幾百萬到上千萬的數(shù)據(jù)侣肄,即使消費者恢復(fù)了,也需要大概 1 小時的時間才能恢復(fù)過來醇份。
一般這個時候稼锅,只能臨時緊急擴容了,具體操作步驟和思路如下:
- 先修復(fù) consumer 的問題僚纷,確保其恢復(fù)消費速度缰贝,然后將現(xiàn)有 consumer 都停掉。
- 新建一個 topic畔濒,partition 是原來的 10 倍剩晴,臨時建立好原先 10 倍的 queue 數(shù)量。
- 然后寫一個臨時的分發(fā)數(shù)據(jù)的 consumer 程序侵状,這個程序部署上去消費積壓的數(shù)據(jù)赞弥,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數(shù)量的 queue趣兄。
- 接著臨時征用 10 倍的機器來部署 consumer绽左,每一批 consumer 消費一個臨時 queue 的數(shù)據(jù)。這種做法相當(dāng)于是臨時將 queue 資源和 consumer 資源擴大 10 倍艇潭,以正常的 10 倍速度來消費數(shù)據(jù)拼窥。
- 等快速消費完積壓數(shù)據(jù)之后戏蔑,得恢復(fù)原先部署的架構(gòu),重新用原先的 consumer 機器來消費消息鲁纠。
mq 中的消息過期失效了
假設(shè)你用的是 RabbitMQ总棵,RabbtiMQ 是可以設(shè)置過期時間的,也就是 TTL改含。如果消息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉情龄,這個數(shù)據(jù)就沒了。那這就是第二個坑了捍壤。這就不是說數(shù)據(jù)會大量積壓在 mq 里骤视,而是大量的數(shù)據(jù)會直接搞丟。
這個情況下鹃觉,就不是說要增加 consumer 消費積壓的消息专酗,因為實際上沒啥積壓,而是丟了大量的消息盗扇。我們可以采取一個方案祷肯,就是批量重導(dǎo),這個我們之前線上也有類似的場景干過粱玲。就是大量積壓的時候躬柬,我們當(dāng)時就直接丟棄數(shù)據(jù)了,然后等過了高峰期以后抽减,比如大家一起喝咖啡熬夜到晚上12點以后允青,用戶都睡覺了。這個時候我們就開始寫程序卵沉,將丟失的那批數(shù)據(jù)颠锉,寫個臨時程序,一點一點的查出來史汗,然后重新灌入 mq 里面去琼掠,把白天丟的數(shù)據(jù)給他補回來。也只能是這樣了停撞。
假設(shè) 1 萬個訂單積壓在 mq 里面瓷蛙,沒有處理,其中 1000 個訂單都丟了戈毒,你只能手動寫程序把那 1000 個訂單給查出來艰猬,手動發(fā)到 mq 里去再補一次。
mq 都快寫滿了
如果消息積壓在 mq 里埋市,你很長時間都沒有處理掉冠桃,此時導(dǎo)致 mq 都快寫滿了,咋辦道宅?這個還有別的辦法嗎食听?沒有胸蛛,誰讓你第一個方案執(zhí)行的太慢了,你臨時寫程序樱报,接入數(shù)據(jù)來消費葬项,消費一個丟棄一個,都不要了肃弟,快速消費掉所有的消息玷室。然后走第二個方案零蓉,到了晚上再補數(shù)據(jù)吧笤受。
參考資料:
- Kafa深度解析
- RabbitMQ源碼解析
好了,今天就到這兒吧敌蜂,我是冰河箩兽,大家有啥問題可以在下方留言,一起交流技術(shù)章喉,一起進(jìn)階汗贫,一起牛逼~~