消息隊列有什么優(yōu)點和缺點?
為什么使用消息隊列?假設(shè)你的業(yè)務(wù)場景遇到個技術(shù)挑戰(zhàn)镊讼,如果不用 MQ 可能會很麻煩,但是你用了 MQ 之后會帶給你很多好處祈餐。
消息隊列 MQ 的常見使用場景其實有很多唬格,但是比較核心的有如下三個:
- 解耦
- 異步
- 削峰
解耦:A 系統(tǒng)發(fā)送個數(shù)據(jù)到 BCD 三個系統(tǒng),接口調(diào)用發(fā)送韭寸,那如果 E 系統(tǒng)也要這個數(shù)據(jù)呢?那如果 C 系統(tǒng)現(xiàn)在不需要了呢?
現(xiàn)在 A 系統(tǒng)又要發(fā)送第二種數(shù)據(jù)了呢?而且 A 系統(tǒng)要時時刻刻考慮 BCDE 四個系統(tǒng)如果掛了咋辦?要不要重發(fā)?我要不要把消息存起來?
你需要去考慮一下你負(fù)責(zé)的系統(tǒng)中是否有類似的場景,就是一個系統(tǒng)或者一個模塊荆隘,調(diào)用了多個系統(tǒng)或者模塊恩伺,互相之間的調(diào)用很復(fù)雜,維護起來很麻煩椰拒。
但是晶渠,這個調(diào)用是不需要直接同步調(diào)用接口的,如果用 MQ 給他異步化解耦燃观,也是可以的褒脯。你就只需要去考慮在你的項目里,是不是可以運用這個 MQ 去進行系統(tǒng)的解耦缆毁。
異步:A 系統(tǒng)接收一個請求番川,需要在自己本地寫庫,還需要在 BCD 三個系統(tǒng)寫庫脊框,自己本地寫庫要 30ms颁督,BCD 三個系統(tǒng)分別寫庫要 300ms、450ms浇雹、200ms沉御。
最終請求總延時是 30 + 300 + 450 + 200 = 980ms,接近 1s昭灵,異步后嚷节,BCD 三個系統(tǒng)分別寫庫的時間,A 系統(tǒng)就不再考慮了虎锚。
削峰:每天 0 點到 16 點硫痰,A 系統(tǒng)風(fēng)平浪靜,每秒并發(fā)請求數(shù)量就 100 個窜护。結(jié)果每次一到 16 點~23 點效斑,每秒并發(fā)請求數(shù)量突然會暴增到 10000 條。
但是系統(tǒng)最大的處理能力就只能是每秒鐘處理 1000 個請求啊柱徙。怎么辦?需要我們進行流量的削峰缓屠,讓系統(tǒng)可以平緩的處理突增的請求。
優(yōu)點上面已經(jīng)說了护侮,就是在特殊場景下有其對應(yīng)的好處敌完,解耦、異步羊初、削峰滨溉,那么消息隊列有什么缺點?
系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多什湘,越容易掛掉,本來你就是 A 系統(tǒng)調(diào)用 BCD 三個系統(tǒng)的接口就好了晦攒。
ABCD 四個系統(tǒng)好好的闽撤,沒啥問題,你偏加個 MQ 進來脯颜,萬一 MQ 掛了怎么辦?MQ 掛了哟旗,整套系統(tǒng)崩潰了,業(yè)務(wù)也就停頓了栋操。
系統(tǒng)復(fù)雜性提高:硬生生加個 MQ 進來闸餐,怎么保證消息沒有重復(fù)消費?怎么處理消息丟失的情況?怎么保證消息傳遞的順序性?
一致性問題:A 系統(tǒng)處理完了直接返回成功了,大家都以為你這個請求就成功了矾芙。
但問題是绎巨,要是 BCD 三個系統(tǒng)那里,BD 兩個系統(tǒng)寫庫成功了蠕啄,結(jié)果 C 系統(tǒng)寫庫失敗了场勤,你這數(shù)據(jù)就不一致了。
所以消息隊列實際是一種非常復(fù)雜的架構(gòu)歼跟,你引入它有很多好處和媳,但是也得針對它帶來的壞處做各種額外的技術(shù)方案和架構(gòu)來規(guī)避掉。
常見消息隊列的比較如下圖:
如何解決重復(fù)消費?
消息重復(fù)的原因
消息發(fā)送端應(yīng)用的消息重復(fù)發(fā)送哈街,有以下幾種情況:
- 消息發(fā)送端發(fā)送消息給消息中間件留瞳,消息中間件收到消息并成功存儲,而這時消息中間件出現(xiàn)了問題骚秦,導(dǎo)致應(yīng)用端沒有收到消息發(fā)送成功的返回因而進行重試產(chǎn)生了重復(fù)她倘。
- 消息中間件因為負(fù)載高響應(yīng)變慢,成功把消息存儲到消息存儲中后作箍,返回“成功”這個結(jié)果時超時硬梁。
- 消息中間件將消息成功寫入消息存儲,在返回結(jié)果時網(wǎng)絡(luò)出現(xiàn)問題胞得,導(dǎo)致應(yīng)用發(fā)送端重試荧止,而重試時網(wǎng)絡(luò)恢復(fù),由此導(dǎo)致重復(fù)阶剑。
可以看到跃巡,通過消息發(fā)送端產(chǎn)生消息重復(fù)的主要原因是消息成功進入消息存儲后,因為各種原因使得消息發(fā)送端沒有收到“成功”的返回結(jié)果牧愁,并且又有重試機制素邪,因而導(dǎo)致重復(fù)。
消息到達了消息存儲猪半,由消息中間件進行向外的投遞時產(chǎn)生重復(fù)兔朦,有以下幾種情況:
- 消息被投遞到消息接收者應(yīng)用進行處理偷线,處理完畢后應(yīng)用出問題了,消息中間件不知道消息處理結(jié)果烘绽,會再次投遞。
- 消息被投遞到消息接收者應(yīng)用進行處理俐填,處理完畢后網(wǎng)絡(luò)出現(xiàn)問題了安接,消息中間件沒有收到消息處理結(jié)果,會再次投遞英融。
- 消息被投遞到消息接收者應(yīng)用進行處理盏檐,處理時間比較長,消息中間件因為消息超時會再次投遞驶悟。
- 消息被投遞到消息接收者應(yīng)用進行處理胡野,處理完畢后消息中間件出問題了,沒能收到消息結(jié)果并處理,會再次投遞痕鳍。
- 消息被投遞到消息接收者應(yīng)用進行處理硫豆,處理完畢后消息中間件收到結(jié)果但是遇到消息存儲故障,沒能更新投遞狀態(tài)笼呆,會再次投遞熊响。
可以看到,在投遞過程中產(chǎn)生的消息重復(fù)接收主要是因為消息接收者成功處理完消息后诗赌,消息中間件不能及時更新投遞狀態(tài)造成的汗茄。
如何解決重復(fù)消費
那么有什么辦法可以解決呢?主要是要求消息接收者來處理這種重復(fù)的情況,也就是要求消息接收者的消息處理是冪等操作铭若。
什么是冪等性?對于消息接收端的情況洪碳,冪等的含義是采用同樣的輸入多次調(diào)用處理函數(shù),得到同樣的結(jié)果叼屠。
例如瞳腌,一個 SQL 操作:
1. update stat_table set count= 10 where id =1
這個操作多次執(zhí)行,id 等于 1 的記錄中的 count 字段的值都為 10镜雨,這個操作就是冪等的纯趋,我們不用擔(dān)心這個操作被重復(fù)。
再來看另外一個 SQL 操作:
1. update stat_table set count= count +1 where id= 1;
這樣的 SQL 操作就不是冪等的冷离,一旦重復(fù)吵冒,結(jié)果就會產(chǎn)生變化。
因此應(yīng)對消息重復(fù)的辦法是使消息接收端的處理是一個冪等操作西剥。這樣的做法降低了消息中間件的整體復(fù)雜性痹栖,不過也給使用消息中間件的消息接收端應(yīng)用帶來了一定的限制和門檻。
①MVCC
多版本并發(fā)控制瞭空,樂觀鎖的一種實現(xiàn)揪阿,在生產(chǎn)者發(fā)送消息時進行數(shù)據(jù)更新時需要帶上數(shù)據(jù)的版本號疗我,消費者去更新時需要去比較持有數(shù)據(jù)的版本號,版本號不一致的操作無法成功南捂。
例如博客點贊次數(shù)自動 +1 的接口:
1. public boolean addCount(Long id, Long version);
2. update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一個 version 只有一次執(zhí)行成功的機會吴裤,一旦失敗了生產(chǎn)者必須重新獲取數(shù)據(jù)的最新版本號再次發(fā)起更新。
②去重表
利用數(shù)據(jù)庫表單的特性來實現(xiàn)冪等溺健,常用的一個思路是在表上構(gòu)建唯一性索引麦牺,保證某一類數(shù)據(jù)一旦執(zhí)行完畢,后續(xù)同樣的請求不再重復(fù)處理了(利用一張日志表來記錄已經(jīng)處理成功的消息的 id鞭缭,如果新到的消息 id 已經(jīng)在日志表中剖膳,那么就不再處理這條消息。)
以電商平臺為例子岭辣,電商平臺上的訂單 id 就是最適合的 token吱晒。當(dāng)用戶下單時,會經(jīng)歷多個環(huán)節(jié)沦童,比如生成訂單仑濒,減庫存,減優(yōu)惠券等等偷遗。
每一個環(huán)節(jié)執(zhí)行時都先檢測一下該訂單 id 是否已經(jīng)執(zhí)行過這一步驟躏精,對未執(zhí)行的請求,執(zhí)行操作并緩存結(jié)果鹦肿,而對已經(jīng)執(zhí)行過的 id矗烛,則直接返回之前的執(zhí)行結(jié)果,不做任何操作箩溃。
這樣可以在最大程度上避免操作的重復(fù)執(zhí)行問題瞭吃,緩存起來的執(zhí)行結(jié)果也能用于事務(wù)的控制等。
如何保證消息的可靠性傳輸?
ActiveMQ
要保證消息的可靠性涣旨,除了消息的持久化歪架,還包括兩個方面:
- 生產(chǎn)者發(fā)送的消息可以被 ActiveMQ 收到。
- 消費者收到了 ActiveMQ 發(fā)送的消息霹陡。
①生產(chǎn)者
非持久化又不在事務(wù)中的消息和蚪,可能會有消息的丟失。為保證消息可以被 ActiveMQ 收到烹棉,我們應(yīng)該采用事務(wù)消息或持久化消息攒霹。
②消費者
消費者對消息的確認(rèn)有四種機制:
- AUTO_ACKNOWLEDGE=1:自動確認(rèn)
- CLIENT_ACKNOWLEDGE=2:客戶端手動確認(rèn)
- DUPS_OK_ACKNOWLEDGE=3:自動批量確認(rèn)
- SESSION_TRANSACTED=0:事務(wù)提交并確認(rèn)
ACK_MODE 描述了 Consumer 與 Broker 確認(rèn)消息的方式(時機),比如當(dāng)消息被 Consumer 接收之后浆洗,Consumer 將在何時確認(rèn)消息催束。
所以 ack_mode 描述的不是 Producer 與 Broker 之間的關(guān)系,而是 Customer 與 Broker 之間的關(guān)系伏社。
對于 Broker 而言抠刺,只有接收到 ACK 指令塔淤,才會認(rèn)為消息被正確的接收或者處理成功了。通過 ACK速妖,可以在 Consumer 與 Broker 之間建立一種簡單的“擔(dān)备叻洌”機制。
AUTO_ACKNOWLEDGE:自動確認(rèn)罕容,“同步”(receive)方法返回 message 給消息時會立即確認(rèn)备恤。
在"異步"(messageListener)方式中,將會首先調(diào)用listener.onMessage(message)杀赢。
如果 onMessage 方法正常結(jié)束烘跺,消息將會正常確認(rèn);如果 onMessage 方法異常湘纵,將導(dǎo)致消費者要求 ActiveMQ 重發(fā)消息脂崔。
CLIENT_ACKNOWLEDGE:客戶端手動確認(rèn),這就意味著 AcitveMQ 將不會“自作主張”的為你 ACK 任何消息梧喷,開發(fā)者需要自己擇機確認(rèn)砌左。
我們可以在當(dāng)前消息處理成功之后,立即調(diào)用 message.acknowledge() 方法來"逐個"確認(rèn)消息铺敌,這樣可以盡可能的減少因網(wǎng)絡(luò)故障而導(dǎo)致消息重發(fā)的個數(shù)汇歹。
當(dāng)然也可以處理多條消息之后,間歇性的調(diào)用 ACKNOWLEDGE 方法來一次確認(rèn)多條消息偿凭,減少 ACK 的次數(shù)來提升 Consumer 的效率产弹,不過需要自行權(quán)衡。
DUPS_OK_ACKNOWLEDGE:類似于 AUTO_ACK 確認(rèn)機制弯囊,為自動批量確認(rèn)而生痰哨,而且具有“延遲”確認(rèn)的特點,ActiveMQ 會根據(jù)內(nèi)部算法匾嘱,在收到一定數(shù)量的消息自動進行確認(rèn)斤斧。
在此模式下,可能會出現(xiàn)重復(fù)消息霎烙,什么時候?當(dāng) Consumer 故障重啟后撬讽,那些尚未 ACK 的消息會重新發(fā)送過來。
SESSION_TRANSACTED:當(dāng) Session 使用事務(wù)時悬垃,就是使用此模式游昼。當(dāng)決定事務(wù)中的消息可以確認(rèn)時,必須調(diào)用 session.commit() 方法尝蠕,Commit 方法將會導(dǎo)致當(dāng)前 Session 的事務(wù)中所有消息立即被確認(rèn)酱床。
在事務(wù)開始之后的任何時機調(diào)用 rollback(),意味著當(dāng)前事務(wù)的結(jié)束趟佃,事務(wù)中所有的消息都將被重發(fā)扇谣。當(dāng)然在 Commit 之前拋出異常昧捷,也會導(dǎo)致事務(wù)的 rollback。
RabbitMQ
①生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時候罐寨,可能數(shù)據(jù)就在半路給搞丟了靡挥,因為網(wǎng)絡(luò)啥的問題,都有可能鸯绿。
此時可以選擇用 RabbitMQ 提供的事務(wù)功能跋破,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)(channel.txSelect),然后發(fā)送消息瓶蝴,如果消息沒有成功被 RabbitMQ 接收到毒返,那么生產(chǎn)者會收到異常報錯德绿。
此時就可以回滾事務(wù)(channel.txRollback)凡辱,然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)(channel.txCommit)燕少。
但是問題是男窟,RabbitMQ 事務(wù)機制一搞盆赤,基本上吞吐量會下來,因為太耗性能歉眷。
所以一般來說牺六,如果要確保 RabbitMQ 的消息別丟,可以開啟 Confirm 模式汗捡。
在生產(chǎn)者那里設(shè)置開啟 Confirm 模式之后淑际,你每次寫的消息都會分配一個唯一的 id,然后如果寫入了 RabbitMQ 中扇住,RabbitMQ 會給你回傳一個 ACK 消息春缕,告訴你說這個消息 OK 了。
如果 RabbitMQ 沒能處理這個消息台囱,會回調(diào)你一個 nack 接口淡溯,告訴你這個消息接收失敗,你可以重試簿训。
而且你可以結(jié)合這個機制咱娶,自己在內(nèi)存里維護每個消息 id 的狀態(tài),如果超過一定時間還沒接收到這個消息的回調(diào)强品,那么你可以重發(fā)膘侮。
事務(wù)機制和 Cnofirm 機制最大的不同在于:事務(wù)機制是同步的,你提交一個事務(wù)之后會阻塞在那兒的榛。
但是 Confirm 機制是異步的琼了,你發(fā)送個消息之后就可以發(fā)送下一個消息,然后那個消息 RabbitMQ 接收了之后會異步回調(diào)你一個接口通知你這個消息接收到了。
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失雕薪,都是用 Confirm 機制的昧诱。
②RabbitMQ 弄丟了數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù),這個你必須開啟 RabbitMQ 的持久化所袁,就是消息寫入之后會持久化到磁盤盏档,哪怕是 RabbitMQ 自己掛了,恢復(fù)之后會自動讀取之前存儲的數(shù)據(jù)燥爷,一般數(shù)據(jù)不會丟蜈亩。
除非極其罕見的是,RabbitMQ 還沒持久化前翎,自己就掛了稚配,可能導(dǎo)致少量數(shù)據(jù)會丟失的,但是這個概率較小港华。
設(shè)置持久化有兩個步驟:
- 創(chuàng)建 queue 和交換器的時候?qū)⑵湓O(shè)置為持久化的道川,這樣就可以保證 RabbitMQ 持久化相關(guān)的元數(shù)據(jù),但是不會持久化 queue 里的數(shù)據(jù)苹丸。
- 發(fā)送消息的時候?qū)⑾⒌?deliveryMode 設(shè)置為 2愤惰,就是將消息設(shè)置為持久化的苇经,此時 RabbitMQ 就會將消息持久化到磁盤上去赘理。
必須要同時設(shè)置這兩個持久化才行,RabbitMQ 哪怕是掛了扇单,再次重啟商模,也會從磁盤上重啟恢復(fù) queue,恢復(fù)這個 queue 里的數(shù)據(jù)蜘澜。
而且持久化可以跟生產(chǎn)者那邊的 Confirm 機制配合起來施流,只有消息被持久化到磁盤之后,才會通知生產(chǎn)者 ACK 了鄙信。
所以哪怕是在持久化到磁盤之前瞪醋,RabbitMQ 掛了,數(shù)據(jù)丟了装诡,生產(chǎn)者收不到 ACK银受,你也是可以自己重發(fā)的。
哪怕是你給 RabbitMQ 開啟了持久化機制鸦采,也有一種可能宾巍,就是這個消息寫到了 RabbitMQ 中,但是還沒來得及持久化到磁盤上渔伯,結(jié)果不巧顶霞,此時 RabbitMQ 掛了,就會導(dǎo)致內(nèi)存里的一點點數(shù)據(jù)會丟失锣吼。
③消費端弄丟了數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù)选浑,主要是因為你消費的時候蓝厌,剛消費到,還沒處理古徒,結(jié)果進程掛了褂始,比如重啟了,那么就尷尬了描函,RabbitMQ 認(rèn)為你都消費了崎苗,這數(shù)據(jù)就丟了。
這個時候得用 RabbitMQ 提供的 ACK 機制舀寓,簡單來說胆数,就是你關(guān)閉 RabbitMQ 自動 ACK,可以通過一個 API 來調(diào)用就行互墓,然后每次你自己代碼里確保處理完的時候必尼,再程序里 ACK 一把。
這樣的話篡撵,如果你還沒處理完判莉,不就沒有 ACK?那 RabbitMQ 就認(rèn)為你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 Consumer 去處理育谬,消息是不會丟的券盅。
Kafka
①消費端弄丟了數(shù)據(jù)
唯一可能導(dǎo)致消費者弄丟數(shù)據(jù)的情況,就是說膛檀,你那個消費到了這個消息锰镀,然后消費者那邊自動提交了 Offset,讓 Kafka 以為你已經(jīng)消費好了這個消息咖刃。
其實你剛準(zhǔn)備處理這個消息泳炉,你還沒處理,你自己就掛了嚎杨,此時這條消息就丟咯花鹅。
大家都知道 Kafka 會自動提交 Offset,那么只要關(guān)閉自動提交 Offset枫浙,在處理完之后自己手動提交 Offset刨肃,就可以保證數(shù)據(jù)不會丟。
但是此時確實還是會重復(fù)消費自脯,比如你剛處理完之景,還沒提交 Offset,結(jié)果自己掛了膏潮,此時肯定會重復(fù)消費一次锻狗,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個問題,就是說我們的 Kafka 消費者消費到了數(shù)據(jù)之后是寫到一個內(nèi)存的 queue 里先緩沖一下轻纪,結(jié)果有的時候油额,你剛把消息寫入內(nèi)存 queue,然后消費者會自動提交 Offset刻帚。
然后此時我們重啟了系統(tǒng)潦嘶,就會導(dǎo)致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)就丟失了。
②Kafka 弄丟了數(shù)據(jù)
這塊比較常見的一個場景崇众,就是 Kafka 某個 Broker 宕機掂僵,然后重新選舉 Partition 的 Leader 時。
大家想想顷歌,要是此時其他的 Follower 剛好還有些數(shù)據(jù)沒有同步锰蓬,結(jié)果此時 Leader 掛了,然后選舉某個 Follower 成 Leader 之后眯漩,他不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊芹扭。
所以此時一般是要求起碼設(shè)置如下四個參數(shù):
- 給這個 Topic 設(shè)置 replication.factor 參數(shù):這個值必須大于 1,要求每個 Partition 必須有至少 2 個副本赦抖。
- 在 Kafka 服務(wù)端設(shè)置 min.insync.replicas 參數(shù):這個值必須大于 1舱卡,這個是要求一個 Leader 至少感知到有至少一個 Follower 還跟自己保持聯(lián)系,沒掉隊队萤,這樣才能確保 Leader 掛了還有一個 Follower 吧轮锥。
- 在 Producer 端設(shè)置 acks=all:這個是要求每條數(shù)據(jù),必須是寫入所有 Replica 之后浮禾,才能認(rèn)為是寫成功了交胚。
- 在 Producer 端設(shè)置 retries=MAX(很大很大很大的一個值份汗,無限次重試的意思):這個是要求一旦寫入失敗盈电,就無限重試,卡在這里了杯活。
③生產(chǎn)者會不會弄丟數(shù)據(jù)
如果按照上述的思路設(shè)置了 ack=all匆帚,一定不會丟,要求是旁钧,你的 Leader 接收到消息吸重,所有的 Follower 都同步到了消息之后,才認(rèn)為本次寫成功了歪今。如果沒滿足這個條件嚎幸,生產(chǎn)者會自動不斷的重試,重試無限次寄猩。
消息的順序性
從根本上說嫉晶,異步消息是不應(yīng)該有順序依賴的,在 MQ 上估計是沒法解決。
要實現(xiàn)嚴(yán)格的順序消息替废,簡單且可行的辦法就是:保證生產(chǎn)者箍铭、MQServer、消費者是一對一對一的關(guān)系椎镣。
ActiveMQ
①通過高級特性 Consumer 獨有消費者(exclusive consumer)
1. queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
2. consumer = session.createConsumer(queue);
當(dāng)在接收信息的時候诈火,有多個獨占消費者的時候,只有一個獨占消費者可以接收到消息状答。
獨占消息就是在有多個消費者同時消費一個 queue 時冷守,可以保證只有一個消費者可以消費消息。
這樣雖然保證了消息的順序問題惊科,不過也帶來了一個問題教沾,就是這個 queue 的所有消息將只會在這一個主消費者上消費,其他消費者將閑置译断,達不到負(fù)載均衡分配授翻。
而實際業(yè)務(wù)我們可能更多的是這樣的場景,比如一個訂單會發(fā)出一組順序消息孙咪,我們只要求這一組消息是順序消費的堪唐,而訂單與訂單之間又是可以并行消費的,不需要順序翎蹈,因為順序也沒有任何意義淮菠。
有沒有辦法做到呢?可以利用 ActiveMQ 的另一個高級特性之 messageGroup。
②利用 ActiveMQ 的高級特性:Message Groups
Message Groups 特性是一種負(fù)載均衡的機制荤堪。在一個消息被分發(fā)到 Consumer 之前合陵,Broker 首先檢查消息 JMSXGroupID 屬性。
如果存在澄阳,那么 Broker 會檢查是否有某個 Consumer 擁有這個 Message Group拥知。
如果沒有,那么 Broker 會選擇一個 Consumer碎赢,并將它關(guān)聯(lián)到這個 Message Group低剔。
此后,這個 Consumer 會接收這個 Message Group 的所有消息肮塞,直到 Consumer 被關(guān)閉襟齿。
Message Group 被關(guān)閉,通過發(fā)送一個消息枕赵,并設(shè)置這個消息的 JMSXGroupSeq 為 -1猜欺。
1. bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
2. bytesMessage.setIntProperty("JMSXGroupSeq", -1);
如上圖所示,同一個 queue 中拷窜,擁有相同 JMSXGroupID 的消息將發(fā)往同一個消費者开皿,解決順序問題;不同分組的消息又能被其他消費者并行消費钓试,解決負(fù)載均衡的問題。
RabbitMQ
如果有順序依賴的消息副瀑,要保證消息有一個 hashKey弓熏,類似于數(shù)據(jù)庫表分區(qū)的的分區(qū) key 列。保證對同一個 key 的消息發(fā)送到相同的隊列糠睡。
A 用戶產(chǎn)生的消息(包括創(chuàng)建消息和刪除消息)都按 A 的 hashKey 分發(fā)到同一個隊列挽鞠。
只需要把強相關(guān)的兩條消息基于相同的路由就行了,也就是說經(jīng)過 m1 和 m2 的在路由表里的路由是一樣的狈孔,那自然 m1 會優(yōu)先于 m2 去投遞信认。而且一個 queue 只對應(yīng)一個 Consumer。
Kafka
一個 Topic均抽,一個 Partition嫁赏,一個 Consumer,內(nèi)部單線程消費油挥。
如何解決消息隊列的延時以及過期失效問題?RabbitMQ 是可以設(shè)置過期時間的潦蝇,就是 TTL。
如果消息在 queue 中積壓超過一定的時間深寥,而又沒有設(shè)置死信隊列機制攘乒,就會被 RabbitMQ 給清理掉,這個數(shù)據(jù)就沒了惋鹅。ActiveMQ 則通過更改配置则酝,支持消息的定時發(fā)送。
有幾百萬消息持續(xù)積壓幾小時怎么解決?
發(fā)生了線上故障闰集,幾千萬條數(shù)據(jù)在 MQ 里積壓很久沽讹。是修復(fù) Consumer 的問題,讓他恢復(fù)消費速度武鲁,然后等待幾個小時消費完畢?這是個解決方案爽雄,不過有時候我們還會進行臨時緊急擴容。
一個消費者一秒是 1000 條洞坑,3 個消費者一秒是 3000 條盲链,一分鐘是 18 萬條。
所以如果積壓了幾百萬到上千萬的數(shù)據(jù)迟杂,即使消費者恢復(fù)了,也需要大概一小時的時間才能恢復(fù)過來本慕。
一般這個時候排拷,只能操作臨時緊急擴容了,具體操作步驟和思路如下:
- 先修復(fù) Consumer 的問題锅尘,確保其恢復(fù)消費速度监氢,然后將現(xiàn)有 Consumer 都停掉布蔗。
- 新建一個 Topic,Partition 是原來的 10 倍浪腐,臨時建立好原先 10 倍或者 20 倍的 queue 數(shù)量纵揍。
然后寫一個臨時的分發(fā)數(shù)據(jù)的 Consumer 程序,這個程序部署上去消費積壓的數(shù)據(jù)议街,消費之后不做耗時的處理泽谨,直接均勻輪詢寫入臨時建立好的 10 倍數(shù)量的 queue。
- 接著臨時征用 10 倍的機器來部署 Consumer特漩,每一批 Consumer 消費一個臨時 queue 的數(shù)據(jù)吧雹。
- 這種做法相當(dāng)于是臨時將 queue 資源和 Consumer 資源擴大 10 倍,以正常的 10 倍速度來消費數(shù)據(jù)涂身。
- 等快速消費完積壓數(shù)據(jù)之后雄卷,再恢復(fù)原先部署架構(gòu),重新用原先的 Consumer 機器來消費消息蛤售。
Kafka是如何實現(xiàn)高性能的?
①宏觀架構(gòu)層面利用 Partition 實現(xiàn)并行處理
Kafka 中每個 Topic 都包含一個或多個 Partition丁鹉,不同 Partition 可位于不同節(jié)點。
同時 Partition 在物理上對應(yīng)一個本地文件夾悴能,每個 Partition 包含一個或多個 Segment鳄炉,每個 Segment 包含一個數(shù)據(jù)文件和一個與之對應(yīng)的索引文件。
在邏輯上搜骡,可以把一個 Partition 當(dāng)作一個非常長的數(shù)組拂盯,可通過這個“數(shù)組”的索引(Offset)去訪問其數(shù)據(jù)。
一方面记靡,由于不同 Partition 可位于不同機器谈竿,因此可以充分利用集群優(yōu)勢,實現(xiàn)機器間的并行處理摸吠。
另一方面空凸,由于 Partition 在物理上對應(yīng)一個文件夾,即使多個 Partition 位于同一個節(jié)點寸痢,也可通過配置讓同一節(jié)點上的不同 Partition 置于不同的 disk drive 上呀洲,從而實現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢啼止。
利用多磁盤的具體方法是道逗,將不同磁盤 mount 到不同目錄,然后在 server.properties 中献烦,將 log.dirs 設(shè)置為多目錄(用逗號分隔)滓窍。
Kafka 會自動將所有 Partition 盡可能均勻分配到不同目錄也即不同目錄(也即不同 disk)上。
Partition 是最小并發(fā)粒度巩那,Partition 個數(shù)決定了可能的最大并行度吏夯。
②ISR 實現(xiàn)可用性與數(shù)據(jù)一致性的動態(tài)平衡
常用數(shù)據(jù)復(fù)制及一致性方案有如下幾種:
Master-Slave:
- RDBMS 的讀寫分離即為典型的 Master-Slave 方案此蜈。
- 同步復(fù)制可保證強一致性但會影響可用性。
- 異步復(fù)制可提供高可用性但會降低一致性噪生。
WNR:
- 主要用于去中心化的分布式系統(tǒng)中裆赵。
- N 代表總副本數(shù),W 代表每次寫操作要保證的最少寫成功的副本數(shù)跺嗽,R 代表每次讀至少要讀取的副本數(shù)战授。
- 當(dāng) W+R>N 時,可保證每次讀取的數(shù)據(jù)至少有一個副本擁有最新的數(shù)據(jù)抛蚁。
- 多個寫操作的順序難以保證陈醒,可能導(dǎo)致多副本間的寫操作順序不一致。Dynamo 通過向量時鐘保證最終一致性瞧甩。
Paxos 及其變種:
- Google 的 Chubby钉跷,Zookeeper 的原子廣播協(xié)議(Zab),RAFT 等肚逸。
基于 ISR 的數(shù)據(jù)復(fù)制方案:Kafka 的數(shù)據(jù)復(fù)制是以 Partition 為單位的爷辙。而多個備份間的數(shù)據(jù)復(fù)制,通過 Follower 向 Leader 拉取數(shù)據(jù)完成朦促。
從這一點來講膝晾,Kafka 的數(shù)據(jù)復(fù)制方案接近于上文所講的 Master-Slave 方案。
不同的是务冕,Kafka 既不是完全的同步復(fù)制血当,也不是完全的異步復(fù)制,而是基于 ISR 的動態(tài)復(fù)制方案禀忆。
ISR臊旭,也即 In-Sync Replica。每個 Partition 的 Leader 都會維護這樣一個列表箩退,該列表中离熏,包含了所有與之同步的 Replica(包含 Leader 自己)。
每次數(shù)據(jù)寫入時戴涝,只有 ISR 中的所有 Replica 都復(fù)制完滋戳,Leader 才會將其置為 Commit,它才能被 Consumer 所消費啥刻。
這種方案奸鸯,與同步復(fù)制非常接近。但不同的是郑什,這個 ISR 是由 Leader 動態(tài)維護的府喳。
如果 Follower 不能緊“跟上”Leader,它將被 Leader 從 ISR 中移除蘑拯,待它又重新“跟上”Leader 后钝满,會被 Leader 再次加到 ISR 中。每次改變 ISR 后申窘,Leader 都會將最新的 ISR 持久化到 Zookeeper 中弯蚜。
由于 Leader 可移除不能及時與之同步的 Follower,故與同步復(fù)制相比可避免最慢的 Follower 拖慢整體速度剃法,也即 ISR 提高了系統(tǒng)可用性碎捺。
ISR 中的所有 Follower 都包含了所有 Commit 過的消息,而只有 Commit 過的消息才會被 Consumer 消費贷洲。
故從 Consumer 的角度而言收厨,ISR 中的所有 Replica 都始終處于同步狀態(tài),從而與異步復(fù)制方案相比提高了數(shù)據(jù)一致性优构。
ISR 可動態(tài)調(diào)整诵叁,極限情況下,可以只包含 Leader钦椭,極大提高了可容忍的宕機的 Follower 的數(shù)量拧额。
與 Majority Quorum 方案相比,容忍相同個數(shù)的節(jié)點失敗彪腔,所要求的總節(jié)點數(shù)少了近一半侥锦。
③具體實現(xiàn)層面高效使用磁盤特性和操作系統(tǒng)特性
將寫磁盤的過程變?yōu)轫樞驅(qū)?/strong>
Kafka 的整個設(shè)計中,Partition 相當(dāng)于一個非常長的數(shù)組德挣,而 Broker 接收到的所有消息順序?qū)懭脒@個大數(shù)組中恭垦。
同時 Consumer 通過 Offset 順序消費這些數(shù)據(jù),并且不刪除已經(jīng)消費的數(shù)據(jù)格嗅,從而避免了隨機寫磁盤的過程番挺。
由于磁盤有限,不可能保存所有數(shù)據(jù)吗浩,實際上作為消息系統(tǒng) Kafka 也沒必要保存所有數(shù)據(jù)建芙,需要刪除舊的數(shù)據(jù)。
而這個刪除過程懂扼,并非通過使用“讀-寫”模式去修改文件禁荸,而是將 Partition 分為多個 Segment,每個 Segment 對應(yīng)一個物理文件阀湿,通過刪除整個文件的方式去刪除 Partition 內(nèi)的數(shù)據(jù)赶熟。
這種方式清除舊數(shù)據(jù)的方式,也避免了對文件的隨機寫操作陷嘴。在存儲機制上映砖,使用了 Log Structured Merge Trees(LSM) 。
注:Log Structured Merge Trees(LSM)灾挨,谷歌 “BigTable” 的論文中提出邑退,LSM 是當(dāng)前被用在許多產(chǎn)品的文件結(jié)構(gòu)策略:HBase竹宋,Cassandra,LevelDB地技,SQLite蜈七,Kafka。
LSM 被設(shè)計來提供比傳統(tǒng)的 B+ 樹或者 ISAM 更好的寫操作吞吐量莫矗,通過消去隨機的本地更新操作來達到這個目標(biāo)飒硅。
這個問題的本質(zhì)還是磁盤隨機操作慢,順序讀寫快作谚。這兩種操作存在巨大的差距三娩,無論是磁盤還是 SSD,而且快至少三個數(shù)量級妹懒。
充分利用 Page Cache
使用 Page Cache 的好處如下:
- I/O Scheduler 會將連續(xù)的小塊寫組裝成大塊的物理寫從而提高性能雀监。
- I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間彬伦。
- 充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存)滔悉。如果使用應(yīng)用層 Cache(即 JVM 堆內(nèi)存),會增加 GC 負(fù)擔(dān)单绑。
- 讀操作可直接在 Page Cache 內(nèi)進行回官。如果消費和生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤(直接通過 Page Cache)交換數(shù)據(jù)搂橙。
- 如果進程重啟歉提,JVM 內(nèi)的 Cache 會失效,但 Page Cache 仍然可用区转。
Broker 收到數(shù)據(jù)后苔巨,寫磁盤時只是將數(shù)據(jù)寫入 Page Cache,并不保證數(shù)據(jù)一定完全寫入磁盤废离。
從這一點看侄泽,可能會造成機器宕機時,Page Cache 內(nèi)的數(shù)據(jù)未寫入磁盤從而造成數(shù)據(jù)丟失蜻韭。
但是這種丟失只發(fā)生在機器斷電等造成操作系統(tǒng)不工作的場景悼尾,而這種場景完全可以由 Kafka 層面的 Replication 機制去解決。
如果為了保證這種情況下數(shù)據(jù)不丟失而強制將 Page Cache 中的數(shù)據(jù) Flush 到磁盤肖方,反而會降低性能闺魏。
也正因如此,Kafka 雖然提供了 flush.messages 和 flush.ms 兩個參數(shù)將 Page Cache 中的數(shù)據(jù)強制 Flush 到磁盤俯画,但是 Kafka 并不建議使用析桥。
如果數(shù)據(jù)消費速度與生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤交換數(shù)據(jù),而是直接通過 Page Cache 交換數(shù)據(jù)泡仗。同時埋虹,F(xiàn)ollower 從 Leader Fetch 數(shù)據(jù)時,也可通過 Page Cache 完成沮焕。
注:Page Cache吨岭,又稱 pcache拉宗,其中文名稱為頁高速緩沖存儲器峦树,簡稱頁高緩。
Page Cache 的大小為一頁旦事,通常為 4K魁巩。在 Linux 讀寫文件時,它用于緩存文件的邏輯內(nèi)容姐浮,從而加快對磁盤上映像和數(shù)據(jù)的訪問谷遂。 這是 Linux 操作系統(tǒng)的一個特色。
支持多 Disk Drive
Broker 的 log.dirs 配置項卖鲤,允許配置多個文件夾肾扰。如果機器上有多個 Disk Drive,可將不同的 Disk 掛載到不同的目錄蛋逾,然后將這些目錄都配置到 log.dirs 里集晚。
Kafka 會盡可能將不同的 Partition 分配到不同的目錄,也即不同的 Disk 上区匣,從而充分利用了多 Disk 的優(yōu)勢偷拔。
零拷貝
Kafka 中存在大量的網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)的過程。這一過程的性能直接影響 Kafka 的整體吞吐量亏钩。
傳統(tǒng)模式下的四次拷貝與四次上下文切換莲绰,以將磁盤文件通過網(wǎng)絡(luò)發(fā)送為例。
傳統(tǒng)模式下姑丑,一般使用如下偽代碼所示的方法先將文件數(shù)據(jù)讀入內(nèi)存蛤签,然后通過 Socket 將內(nèi)存中的數(shù)據(jù)發(fā)送出去。
1. buffer = File.readSocket.send(buffer)
這一過程實際上發(fā)生了四次數(shù)據(jù)拷貝:
- 首先通過系統(tǒng)調(diào)用將文件數(shù)據(jù)讀入到內(nèi)核態(tài) Buffer(DMA 拷貝)栅哀。
- 然后應(yīng)用程序?qū)?nèi)存態(tài) Buffer 數(shù)據(jù)讀入到用戶態(tài) Buffer(CPU 拷貝)震肮。
- 接著用戶程序通過 Socket 發(fā)送數(shù)據(jù)時將用戶態(tài) Buffer 數(shù)據(jù)拷貝到內(nèi)核態(tài) Buffer(CPU 拷貝)。
- 最后通過 DMA 拷貝將數(shù)據(jù)拷貝到 NIC Buffer昌屉。同時钙蒙,還伴隨著四次上下文切換。
而 Linux 2.4+ 內(nèi)核通過 sendfile 系統(tǒng)調(diào)用间驮,提供了零拷貝躬厌。數(shù)據(jù)通過 DMA 拷貝到內(nèi)核態(tài) Buffer 后,直接通過 DMA 拷貝到 NIC Buffer,無需 CPU 拷貝扛施。這也是零拷貝這一說法的來源鸿捧。
除了減少數(shù)據(jù)拷貝外,因為整個讀文件-網(wǎng)絡(luò)發(fā)送由一個 sendfile 調(diào)用完成疙渣,整個過程只有兩次上下文切換匙奴,因此大大提高了性能。
從具體實現(xiàn)來看妄荔,Kafka 的數(shù)據(jù)傳輸通過 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法實現(xiàn)零拷貝泼菌。
注: transferTo 和 transferFrom 并不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統(tǒng)相關(guān)啦租,如果操作系統(tǒng)提供 sendfile 這樣的零拷貝系統(tǒng)調(diào)用哗伯,則這兩個方法會通過這樣的系統(tǒng)調(diào)用充分利用零拷貝的優(yōu)勢,否則并不能通過這兩個方法本身實現(xiàn)零拷貝篷角。
減少網(wǎng)絡(luò)開銷批處理
批處理是一種常用的用于提高 I/O 性能的方式焊刹。對 Kafka 而言,批處理既減少了網(wǎng)絡(luò)傳輸?shù)?Overhead恳蹲,又提高了寫磁盤的效率虐块。
Kafka 的 send 方法并非立即將消息發(fā)送出去,而是通過 batch.size 和 linger.ms 控制實際發(fā)送頻率嘉蕾,從而實現(xiàn)批量發(fā)送贺奠。
由于每次網(wǎng)絡(luò)傳輸,除了傳輸消息本身以外荆针,還要傳輸非常多的網(wǎng)絡(luò)協(xié)議本身的一些內(nèi)容(稱為 Overhead)敞嗡,所以將多條消息合并到一起傳輸,可有效減少網(wǎng)絡(luò)傳輸?shù)?Overhead航背,進而提高了傳輸效率喉悴。
數(shù)據(jù)壓縮降低網(wǎng)絡(luò)負(fù)載
Kafka 從 0.7 開始,即支持將數(shù)據(jù)壓縮后再傳輸給 Broker玖媚。除了可以將每條消息單獨壓縮然后傳輸外箕肃,Kafka 還支持在批量發(fā)送時,將整個 Batch 的消息一起壓縮后傳輸今魔。
數(shù)據(jù)壓縮的一個基本原理是勺像,重復(fù)數(shù)據(jù)越多壓縮效果越好。因此將整個 Batch 的數(shù)據(jù)一起壓縮能更大幅度減小數(shù)據(jù)量错森,從而更大程度提高網(wǎng)絡(luò)傳輸效率吟宦。
Broker 接收消息后,并不直接解壓縮涩维,而是直接將消息以壓縮后的形式持久化到磁盤殃姓。Consumer Fetch 到數(shù)據(jù)后再解壓縮。
因此 Kafka 的壓縮不僅減少了 Producer 到 Broker 的網(wǎng)絡(luò)傳輸負(fù)載,同時也降低了 Broker 磁盤操作的負(fù)載蜗侈,也降低了 Consumer 與 Broker 間的網(wǎng)絡(luò)傳輸量篷牌,從而極大得提高了傳輸效率,提高了吞吐量踏幻。
高效的序列化方式
Kafka 消息的 Key 和 Payload(或者說 Value)的類型可自定義枷颊,只需同時提供相應(yīng)的序列化器和反序列化器即可。
因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如 Avro该面,Protocal Buffer)來減少實際網(wǎng)絡(luò)傳輸和磁盤存儲的數(shù)據(jù)規(guī)模夭苗,從而提高吞吐率。
這里要注意吆倦,如果使用的序列化方法太慢听诸,即使壓縮比非常高,最終的效率也不一定高。
彩蛋
順便在此給大家推薦一個Java方面的交流學(xué)習(xí)群:4112676阳懂,里面會分享一些高級面試題锋爪,還有資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis督怜,Netty源碼分析,高并發(fā)、高性能荒吏、分布式、微服務(wù)架構(gòu)的原理渊鞋,JVM性能優(yōu)化這些成為架構(gòu)師必備的知識體系绰更,主要針對Java開發(fā)人員提升自己,突破瓶頸锡宋,相信你來學(xué)習(xí)儡湾,會有提升和收獲。在這個群里會有你需要的內(nèi)容 朋友們請抓緊時間加入進來吧