(1)RabbitMQ的高可用性
RabbitMQ是比較有代表性的铲掐,因為是基于主從做高可用性的譬猫,我們就以他為例子講解第一種MQ的高可用性怎么實現(xiàn)。
rabbitmq有三種模式:單機模式羡疗,普通集群模式染服,鏡像集群模式
1)單機模式
就是demo級別的,一般就是你本地啟動了玩玩兒的叨恨,沒人生產用單機模式
2)普通集群模式
意思就是在多臺機器上啟動多個rabbitmq實例柳刮,每個機器啟動一個。但是你創(chuàng)建的queue痒钝,只會放在一個rabbtimq實例上秉颗,但是每個實例都同步queue的元數(shù)據(jù)。完了你消費的時候送矩,實際上如果連接到了另外一個實例蚕甥,那么那個實例會從queue所在實例上拉取數(shù)據(jù)過來。
這種方式確實很麻煩栋荸,也不怎么好菇怀,沒做到所謂的分布式,就是個普通集群蒸其。因為這導致你要么消費者每次隨機連接一個實例然后拉取數(shù)據(jù)敏释,要么固定連接那個queue所在實例消費數(shù)據(jù),前者有數(shù)據(jù)拉取的開銷摸袁,后者導致單實例性能瓶頸钥顽。
而且如果那個放queue的實例宕機了,會導致接下來其他實例就無法從那個實例拉取靠汁,如果你開啟了消息持久化蜂大,讓rabbitmq落地存儲消息的話,消息不一定會丟蝶怔,得等這個實例恢復了奶浦,然后才可以繼續(xù)從這個queue拉取數(shù)據(jù)。
所以這個事兒就比較尷尬了踢星,這就沒有什么所謂的高可用性可言了澳叉,這方案主要是提高吞吐量的,就是說讓集群中多個節(jié)點來服務某個queue的讀寫操作沐悦。
3)鏡像集群模式
這種模式成洗,才是所謂的rabbitmq的高可用模式,跟普通集群模式不一樣的是藏否,你創(chuàng)建的queue瓶殃,無論元數(shù)據(jù)還是queue里的消息都會存在于多個實例上,然后每次你寫消息到queue的時候副签,都會自動把消息到多個實例的queue里進行消息同步遥椿。
這樣的話基矮,好處在于,你任何一個機器宕機了冠场,沒事兒家浇,別的機器都可以用。壞處在于碴裙,第一蓝谨,這個性能開銷也太大了吧,消息同步所有機器青团,導致網(wǎng)絡帶寬壓力和消耗很重!第二咖楣,這么玩兒督笆,就沒有擴展性可言了,如果某個queue負載很重诱贿,你加機器娃肿,新增的機器也包含了這個queue的所有數(shù)據(jù),并沒有辦法線性擴展你的queue
那么怎么開啟這個鏡像集群模式呢珠十?我這里簡單說一下料扰,避免面試人家問你你不知道,其實很簡單rabbitmq有很好的管理控制臺焙蹭,就是在后臺新增一個策略晒杈,這個策略是鏡像集群模式的策略,指定的時候可以要求數(shù)據(jù)同步到所有節(jié)點的孔厉,也可以要求就同步到指定數(shù)量的節(jié)點拯钻,然后你再次創(chuàng)建queue的時候,應用這個策略撰豺,就會自動將數(shù)據(jù)同步到其他的節(jié)點上去了粪般。
(2)kafka的高可用性
kafka一個最基本的架構認識:多個broker組成,每個broker是一個節(jié)點污桦;你創(chuàng)建一個topic亩歹,這個topic可以劃分為多個partition,每個partition可以存在于不同的broker上凡橱,每個partition就放一部分數(shù)據(jù)小作。
這就是天然的分布式消息隊列,就是說一個topic的數(shù)據(jù)梭纹,是分散放在多個機器上的躲惰,每個機器就放一部分數(shù)據(jù)。
實際上rabbitmq之類的变抽,并不是分布式消息隊列础拨,他就是傳統(tǒng)的消息隊列氮块,只不過提供了一些集群、HA的機制而已诡宗,因為無論怎么玩兒滔蝉,rabbitmq一個queue的數(shù)據(jù)都是放在一個節(jié)點里的,鏡像集群下塔沃,也是每個節(jié)點都放這個queue的完整數(shù)據(jù)蝠引。
kafka 0.8以前,是沒有HA機制的蛀柴,就是任何一個broker宕機了螃概,那個broker上的partition就廢了,沒法寫也沒法讀鸽疾,沒有什么高可用性可言吊洼。
kafka 0.8以后,提供了HA機制制肮,就是replica副本機制冒窍。每個partition的數(shù)據(jù)都會同步到吉他機器上,形成自己的多個replica副本豺鼻。然后所有replica會選舉一個leader出來综液,那么生產和消費都跟這個leader打交道,然后其他replica就是follower儒飒。寫的時候谬莹,leader會負責把數(shù)據(jù)同步到所有follower上去,讀的時候就直接讀leader上數(shù)據(jù)即可桩了。只能讀寫leader届良?很簡單,要是你可以隨意讀寫每個follower圣猎,那么就要care數(shù)據(jù)一致性的問題士葫,系統(tǒng)復雜度太高,很容易出問題送悔。kafka會均勻的將一個partition的所有replica分布在不同的機器上慢显,這樣才可以提高容錯性。
這么搞欠啤,就有所謂的高可用性了荚藻,因為如果某個broker宕機了,沒事兒洁段,那個broker上面的partition在其他機器上都有副本的应狱,如果這上面有某個partition的leader,那么此時會重新選舉一個新的leader出來祠丝,大家繼續(xù)讀寫那個新的leader即可疾呻。這就有所謂的高可用性了除嘹。
寫數(shù)據(jù)的時候,生產者就寫leader岸蜗,然后leader將數(shù)據(jù)落地寫本地磁盤尉咕,接著其他follower自己主動從leader來pull數(shù)據(jù)。一旦所有follower同步好數(shù)據(jù)了璃岳,就會發(fā)送ack給leader年缎,leader收到所有follower的ack之后,就會返回寫成功的消息給生產者铃慷。(當然单芜,這只是其中一種模式,還可以適當調整這個行為)
消費的時候犁柜,只會從leader去讀缓溅,但是只有一個消息已經(jīng)被所有follower都同步成功返回ack的時候,這個消息才會被消費者讀到赁温。
怎么保證消息隊列消費的冪等性?
先大概說一說可能會有哪些重復消費的問題淤齐。
首先就是比如rabbitmq股囊、rocketmq、kafka更啄,都有可能會出現(xiàn)消費重復消費的問題稚疹,正常。因為這問題通常不是mq自己保證的祭务,是給你保證的内狗。然后我們挑一個kafka來舉個例子,說說怎么重復消費吧义锥。
kafka實際上有個offset的概念柳沙,就是每個消息寫進去,都有一個offset拌倍,代表他的序號赂鲤,然后consumer消費了數(shù)據(jù)之后,每隔一段時間柱恤,會把自己消費過的消息的offset提交一下数初,代表我已經(jīng)消費過了,下次我要是重啟啥的梗顺,你就讓我繼續(xù)從上次消費到的offset來繼續(xù)消費吧泡孩。
但是凡事總有意外,比如我們之前生產經(jīng)常遇到的寺谤,就是你有時候重啟系統(tǒng)仑鸥,看你怎么重啟了吮播,如果碰到點著急的,直接kill進程了锈候,再重啟薄料。這會導致consumer有些消息處理了,但是沒來得及提交offset泵琳,尷尬了摄职。重啟之后,少數(shù)消息會再次消費一次获列。
其實重復消費不可怕谷市,可怕的是你沒考慮到重復消費之后,怎么保證冪等性击孩。
給你舉個例子吧迫悠。假設你有個系統(tǒng),消費一條往數(shù)據(jù)庫里插入一條巩梢,要是你一個消息重復兩次创泄,你不就插入了兩條,這數(shù)據(jù)不就錯了括蝠?但是你要是消費到第二次的時候鞠抑,自己判斷一下已經(jīng)消費過了,直接扔了忌警,不就保留了一條數(shù)據(jù)搁拙?
一條數(shù)據(jù)重復出現(xiàn)兩次,數(shù)據(jù)庫里就只有一條數(shù)據(jù)法绵,這就保證了系統(tǒng)的冪等性
冪等性箕速,我通俗點說,就一個數(shù)據(jù)朋譬,或者一個請求盐茎,給你重復來多次,你得確保對應的數(shù)據(jù)是不會改變的徙赢,不能出錯庭呜。
其實還是得結合業(yè)務來思考,我這里給幾個思路:
(1)比如你拿個數(shù)據(jù)要寫庫犀忱,你先根據(jù)主鍵查一下募谎,如果這數(shù)據(jù)都有了,你就別插入了阴汇,update一下好吧
(2)比如你是寫redis数冬,那沒問題了,反正每次都是set,天然冪等性
(3)比如你不是上面兩個場景拐纱,那做的稍微復雜一點铜异,你需要讓生產者發(fā)送每條數(shù)據(jù)的時候,里面加一個全局唯一的id秸架,類似訂單id之類的東西揍庄,然后你這里消費到了之后,先根據(jù)這個id去比如redis里查一下东抹,之前消費過嗎蚂子?如果沒有消費過,你就處理缭黔,然后這個id寫redis食茎。如果消費過了,那你就別處理了馏谨,保證別重復處理相同的消息即可别渔。
還有比如基于數(shù)據(jù)庫的唯一鍵來保證重復數(shù)據(jù)不會重復插入多條,我們之前線上系統(tǒng)就有這個問題惧互,就是拿到數(shù)據(jù)的時候哎媚,每次重啟可能會有重復,因為kafka消費者還沒來得及提交offset喊儡,重復數(shù)據(jù)拿到了以后我們插入的時候拨与,因為有唯一鍵約束了,所以重復數(shù)據(jù)只會插入報錯管宵,不會導致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)
如何保證MQ的消費是冪等性的,需要結合具體的業(yè)務來看
數(shù)據(jù)丟失怎么辦(如何保證消息的可靠性傳輸)
(1)rabbitmq
1)生產者弄丟了數(shù)據(jù)
生產者將數(shù)據(jù)發(fā)送到rabbitmq的時候攀甚,可能數(shù)據(jù)就在半路給搞丟了箩朴,因為網(wǎng)絡啥的問題,都有可能秋度。
此時可以選擇用rabbitmq提供的事務功能炸庞,就是生產者發(fā)送數(shù)據(jù)之前開啟rabbitmq事務(channel.txSelect),然后發(fā)送消息荚斯,如果消息沒有成功被rabbitmq接收到埠居,那么生產者會收到異常報錯,此時就可以回滾事務(channel.txRollback)事期,然后重試發(fā)送消息滥壕;如果收到了消息,那么可以提交事務(channel.txCommit)兽泣。但是問題是绎橘,rabbitmq事務機制一搞,基本上吞吐量會下來唠倦,因為太耗性能称鳞。
所以一般來說涮较,如果你要確保說寫rabbitmq的消息別丟,可以開啟confirm模式冈止,在生產者那里設置開啟confirm模式之后狂票,你每次寫的消息都會分配一個唯一的id,然后如果寫入了rabbitmq中熙暴,rabbitmq會給你回傳一個ack消息闺属,告訴你說這個消息ok了。如果rabbitmq沒能處理這個消息怨咪,會回調你一個nack接口屋剑,告訴你這個消息接收失敗,你可以重試诗眨。而且你可以結合這個機制自己在內存里維護每個消息id的狀態(tài)唉匾,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發(fā)匠楚。
事務機制和cnofirm機制最大的不同在于巍膘,事務機制是同步的,你提交一個事務之后會阻塞在那兒芋簿,但是confirm機制是異步的峡懈,你發(fā)送個消息之后就可以發(fā)送下一個消息,然后那個消息rabbitmq接收了之后會異步回調你一個接口通知你這個消息接收到了与斤。
所以一般在生產者這塊避免數(shù)據(jù)丟失肪康,都是用confirm機制的。
2)rabbitmq弄丟了數(shù)據(jù)
就是rabbitmq自己弄丟了數(shù)據(jù)撩穿,這個你必須開啟rabbitmq的持久化磷支,就是消息寫入之后會持久化到磁盤,哪怕是rabbitmq自己掛了食寡,恢復之后會自動讀取之前存儲的數(shù)據(jù)雾狈,一般數(shù)據(jù)不會丟。除非極其罕見的是抵皱,rabbitmq還沒持久化善榛,自己就掛了,可能導致少量數(shù)據(jù)會丟失的呻畸,但是這個概率較小移盆。
設置持久化有兩個步驟,第一個是創(chuàng)建queue的時候將其設置為持久化的伤为,這樣就可以保證rabbitmq持久化queue的元數(shù)據(jù)味滞,但是不會持久化queue里的數(shù)據(jù);第二個是發(fā)送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化的剑鞍,此時rabbitmq就會將消息持久化到磁盤上去昨凡。必須要同時設置這兩個持久化才行,rabbitmq哪怕是掛了蚁署,再次重啟便脊,也會從磁盤上重啟恢復queue,恢復這個queue里的數(shù)據(jù)光戈。
而且持久化可以跟生產者那邊的confirm機制配合起來哪痰,只有消息被持久化到磁盤之后,才會通知生產者ack了久妆,所以哪怕是在持久化到磁盤之前晌杰,rabbitmq掛了,數(shù)據(jù)丟了筷弦,生產者收不到ack肋演,你也是可以自己重發(fā)的。
哪怕是你給rabbitmq開啟了持久化機制烂琴,也有一種可能爹殊,就是這個消息寫到了rabbitmq中,但是還沒來得及持久化到磁盤上奸绷,結果不巧梗夸,此時rabbitmq掛了,就會導致內存里的一點點數(shù)據(jù)會丟失号醉。
3)消費端弄丟了數(shù)據(jù)
rabbitmq如果丟失了數(shù)據(jù)反症,主要是因為你消費的時候,剛消費到畔派,還沒處理铅碍,結果進程掛了,比如重啟了父虑,那么就尷尬了该酗,rabbitmq認為你都消費了授药,這數(shù)據(jù)就丟了士嚎。
這個時候得用rabbitmq提供的ack機制,簡單來說悔叽,就是你關閉rabbitmq自動ack莱衩,可以通過一個api來調用就行,然后每次你自己代碼里確保處理完的時候娇澎,再程序里ack一把笨蚁。這樣的話,如果你還沒處理完,不就沒有ack括细?那rabbitmq就認為你還沒處理完伪很,這個時候rabbitmq會把這個消費分配給別的consumer去處理,消息是不會丟的奋单。
(2)kafka
1)消費端弄丟了數(shù)據(jù)
唯一可能導致消費者弄丟數(shù)據(jù)的情況能颁,就是說嵌巷,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經(jīng)消費好了這個消息血淌,其實你剛準備處理這個消息,你還沒處理靶庙,你自己就掛了石蔗,此時這條消息就丟咯。
這不是一樣么乏苦,大家都知道kafka會自動提交offset株扛,那么只要關閉自動提交offset,在處理完之后自己手動提交offset邑贴,就可以保證數(shù)據(jù)不會丟席里。但是此時確實還是會重復消費,比如你剛處理完拢驾,還沒提交offset奖磁,結果自己掛了,此時肯定會重復消費一次繁疤,自己保證冪等性就好了咖为。
生產環(huán)境碰到的一個問題,就是說我們的kafka消費者消費到了數(shù)據(jù)之后是寫到一個內存的queue里先緩沖一下稠腊,結果有的時候躁染,你剛把消息寫入內存queue,然后消費者會自動提交offset架忌。
然后此時我們重啟了系統(tǒng)吞彤,就會導致內存queue里還沒來得及處理的數(shù)據(jù)就丟失了
2)kafka弄丟了數(shù)據(jù)
這塊比較常見的一個場景,就是kafka某個broker宕機叹放,然后重新選舉partiton的leader時饰恕。大家想想,要是此時其他的follower剛好還有些數(shù)據(jù)沒有同步井仰,結果此時leader掛了埋嵌,然后選舉某個follower成leader之后,他不就少了一些數(shù)據(jù)俱恶?這就丟了一些數(shù)據(jù)啊雹嗦。
生產環(huán)境也遇到過范舀,我們也是,之前kafka的leader機器宕機了了罪,將follower切換為leader之后锭环,就會發(fā)現(xiàn)說這個數(shù)據(jù)就丟了
所以此時一般是要求起碼設置如下4個參數(shù):
給這個topic設置replication.factor參數(shù):這個值必須大于1,要求每個partition必須有至少2個副本
在kafka服務端設置min.insync.replicas參數(shù):這個值必須大于1泊藕,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯(lián)系田藐,沒掉隊,這樣才能確保leader掛了還有一個follower吧
在producer端設置acks=all:這個是要求每條數(shù)據(jù)吱七,必須是寫入所有replica之后汽久,才能認為是寫成功了
在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗踊餐,就無限重試景醇,卡在這里了
我們生產環(huán)境就是按照上述要求配置的,這樣配置之后吝岭,至少在kafka broker端就可以保證在leader所在broker發(fā)生故障三痰,進行l(wèi)eader切換時,數(shù)據(jù)不會丟失
3)生產者會不會弄丟數(shù)據(jù)
如果按照上述的思路設置了ack=all窜管,一定不會丟散劫,要求是,你的leader接收到消息幕帆,所有的follower都同步到了消息之后获搏,才認為本次寫成功了。如果沒滿足這個條件失乾,生產者會自動不斷的重試常熙,重試無限次。
數(shù)據(jù)的順序性
1)rabbitmq保證數(shù)據(jù)的順序性
如果存在多個消費者碱茁,那么就讓每個消費者對應一個queue裸卫,然后把要發(fā)送 的數(shù)據(jù)全都放到一個queue,這樣就能保證所有的數(shù)據(jù)只到達一個消費者從而保證每個數(shù)據(jù)到達數(shù)據(jù)庫都是順序的纽竣。
(1)rabbitmq:拆分多個queue墓贿,每個queue一個consumer,就是多一些queue而已蜓氨,確實是麻煩點聋袋;或者就一個queue但是對應一個consumer,然后這個consumer內部用內存隊列做排隊语盈,然后分發(fā)給底層不同的worker來處理
1)kafka保證數(shù)據(jù)的順序性
?kafka 寫入partion時指定一個key舱馅,列如訂單id缰泡,那么消費者從partion中取出數(shù)據(jù)的時候肯定是有序的刀荒,當開啟多個線程的時候可能導致數(shù)據(jù)不一致代嗤,這時候就需要內存隊列,將相同的hash過的數(shù)據(jù)放在一個內存隊列里缠借,這樣就能保證一條線程對應一個內存隊列的數(shù)據(jù)寫入數(shù)據(jù)庫的時候順序性的干毅,從而可以開啟多條線程對應多個內存隊列
(2)kafka:一個topic,一個partition泼返,一個consumer硝逢,內部單線程消費,寫N個內存queue绅喉,然后N個線程分別消費一個內存queue即可
MQ積壓幾百萬條數(shù)據(jù)怎么辦渠鸽?
這個是我們真實遇到過的一個場景,確實是線上故障了柴罐,這個時候要不然就是修復consumer的問題徽缚,讓他恢復消費速度,然后傻傻的等待幾個小時消費完畢革屠。這個肯定不能在面試的時候說吧凿试。
一個消費者一秒是1000條,一秒3個消費者是3000條似芝,一分鐘是18萬條那婉,1000多萬條
所以如果你積壓了幾百萬到上千萬的數(shù)據(jù),即使消費者恢復了党瓮,也需要大概1小時的時間才能恢復過來
一般這個時候详炬,只能操作臨時緊急擴容了,具體操作步驟和思路如下:
1)先修復consumer的問題寞奸,確保其恢復消費速度痕寓,然后將現(xiàn)有cnosumer都停掉
2)新建一個topic,partition是原來的10倍蝇闭,臨時建立好原先10倍或者20倍的queue數(shù)量
3)然后寫一個臨時的分發(fā)數(shù)據(jù)的consumer程序呻率,這個程序部署上去消費積壓的數(shù)據(jù),消費之后不做耗時的處理呻引,直接均勻輪詢寫入臨時建立好的10倍數(shù)量的queue
4)接著臨時征用10倍的機器來部署consumer礼仗,每一批consumer消費一個臨時queue的數(shù)據(jù)
5)這種做法相當于是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數(shù)據(jù)
6)等快速消費完積壓數(shù)據(jù)之后逻悠,得恢復原先部署架構元践,重新用原先的consumer機器來消費消息
(2)這里我們假設再來第二個坑
假設你用的是rabbitmq,rabbitmq是可以設置過期時間的童谒,就是TTL单旁,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個數(shù)據(jù)就沒了饥伊。那這就是第二個坑了象浑。這就不是說數(shù)據(jù)會大量積壓在mq里蔫饰,而是大量的數(shù)據(jù)會直接搞丟。
這個情況下愉豺,就不是說要增加consumer消費積壓的消息篓吁,因為實際上沒啥積壓,而是丟了大量的消息蚪拦。我們可以采取一個方案杖剪,就是批量重導,這個我們之前線上也有類似的場景干過驰贷。就是大量積壓的時候盛嘿,我們當時就直接丟棄數(shù)據(jù)了,然后等過了高峰期以后括袒,比如大家一起喝咖啡熬夜到晚上12點以后孩擂,用戶都睡覺了。
這個時候我們就開始寫程序箱熬,將丟失的那批數(shù)據(jù)类垦,寫個臨時程序,一點一點的查出來城须,然后重新灌入mq里面去蚤认,把白天丟的數(shù)據(jù)給他補回來。也只能是這樣了糕伐。
假設1萬個訂單積壓在mq里面砰琢,沒有處理,其中1000個訂單都丟了良瞧,你只能手動寫程序把那1000個訂單給查出來陪汽,手動發(fā)到mq里去再補一次
(3)然后我們再來假設第三個坑
如果走的方式是消息積壓在mq里,那么如果你很長時間都沒處理掉褥蚯,此時導致mq都快寫滿了挚冤,咋辦?這個還有別的辦法嗎赞庶?沒有训挡,誰讓你第一個方案執(zhí)行的太慢了,你臨時寫程序歧强,接入數(shù)據(jù)來消費澜薄,消費一個丟棄一個,都不要了摊册,快速消費掉所有的消息肤京。然后走第二個方案,到了晚上再補數(shù)據(jù)吧茅特。