消息存儲(chǔ)
分布式隊(duì)列因?yàn)橛懈呖煽啃缘囊笕叮詳?shù)據(jù)要進(jìn)行持久化存儲(chǔ)泳秀。
- 消息生成者發(fā)送消息
- MQ收到消息收擦,將消息進(jìn)行持久化溪食,在存儲(chǔ)中新增一條記錄
- 返回ACK給生產(chǎn)者
- MQ push 消息給對(duì)應(yīng)的消費(fèi)者鼓拧,然后等待消費(fèi)者返回ACK
- 如果消息消費(fèi)者在指定時(shí)間內(nèi)成功返回ack半火,那么MQ認(rèn)為消息消費(fèi)成功,在存儲(chǔ)中刪除消息季俩,即執(zhí)行第6步钮糖;如果MQ在指定時(shí)間內(nèi)沒有收到ACK,則認(rèn)為消息消費(fèi)失敗酌住,會(huì)嘗試重新push消息,重復(fù)執(zhí)行4店归、5、6步驟
- MQ刪除消息
1.1.1 存儲(chǔ)介質(zhì)
關(guān)系型數(shù)據(jù)庫(kù)DB
Apache下開源的另外一款MQ—ActiveMQ(默認(rèn)采用的KahaDB做消息存儲(chǔ))可選用JDBC的方式來(lái)做消息持久化酪我,通過(guò)簡(jiǎn)單的xml配置信息即可實(shí)現(xiàn)JDBC消息存儲(chǔ)消痛。由于,普通關(guān)系型數(shù)據(jù)庫(kù)(如Mysql)在單表數(shù)據(jù)量達(dá)到千萬(wàn)級(jí)別的情況下都哭,其IO讀寫性能往往會(huì)出現(xiàn)瓶頸秩伞。在可靠性方面,該種方案非常依賴DB欺矫,如果一旦DB出現(xiàn)故障纱新,則MQ的消息就無(wú)法落盤存儲(chǔ)會(huì)導(dǎo)致線上故障
文件系統(tǒng)
目前業(yè)界較為常用的幾款產(chǎn)品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤至所部署虛擬機(jī)/物理機(jī)的文件系統(tǒng)來(lái)做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。消息刷盤為消息存儲(chǔ)提供了一種高效率穆趴、高可靠性和高性能的數(shù)據(jù)持久化方式脸爱。除非部署MQ機(jī)器本身或是本地磁盤掛了,否則一般是不會(huì)出現(xiàn)無(wú)法持久化的故障問(wèn)題未妹。
1.1.2 性能對(duì)比
文件系統(tǒng)>關(guān)系型數(shù)據(jù)庫(kù)DB
1.1.3 消息的存儲(chǔ)和發(fā)送
消息存儲(chǔ)
磁盤如果使用得當(dāng)鳍烁,磁盤的速度完全可以匹配上網(wǎng)絡(luò) 的數(shù)據(jù)傳輸速度讯赏。目前的高性能磁盤过蹂,順序?qū)懰俣瓤梢赃_(dá)到600MB/s憔足, 超過(guò)了一般網(wǎng)卡的傳輸速度。但是磁盤隨機(jī)寫的速度只有大概100KB/s酪耕,和順序?qū)懙男阅芟嗖?000倍导梆!因?yàn)橛腥绱司薮蟮乃俣炔顒e,好的消息隊(duì)列系統(tǒng)會(huì)比普通的消息隊(duì)列系統(tǒng)速度快多個(gè)數(shù)量級(jí)。RocketMQ的消息用順序?qū)?/strong>,保證了消息存儲(chǔ)的速度看尼。
消息發(fā)送
Linux操作系統(tǒng)分為【用戶態(tài)】和【內(nèi)核態(tài)】递鹉,文件操作、網(wǎng)絡(luò)操作需要涉及這兩種形態(tài)的切換藏斩,免不了進(jìn)行數(shù)據(jù)復(fù)制躏结。
一臺(tái)服務(wù)器 把本機(jī)磁盤文件的內(nèi)容發(fā)送到客戶端,一般分為兩個(gè)步驟:
1)read狰域;讀取本地文件內(nèi)容媳拴;
2)write;將讀取的內(nèi)容通過(guò)網(wǎng)絡(luò)發(fā)送出去兆览。
這兩個(gè)看似簡(jiǎn)單的操作屈溉,實(shí)際進(jìn)行了4 次數(shù)據(jù)復(fù)制,分別是:
- 從磁盤復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存抬探;
- 從內(nèi)核態(tài)內(nèi)存復(fù) 制到用戶態(tài)內(nèi)存子巾;
- 然后從用戶態(tài) 內(nèi)存復(fù)制到網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存;
- 最后是從網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存復(fù) 制到網(wǎng)卡中進(jìn)行傳輸小压。
通過(guò)使用mmap的方式线梗,可以省去向用戶態(tài)的內(nèi)存復(fù)制,提高速度怠益。這種機(jī)制在Java中是通過(guò)MappedByteBuffer
實(shí)現(xiàn)的
RocketMQ充分利用了上述特性仪搔,也就是所謂的“零拷貝”技術(shù),提高消息存盤和網(wǎng)絡(luò)發(fā)送的速度蜻牢。
這里需要注意的是烤咧,采用MappedByteBuffer
這種內(nèi)存映射的方式有幾個(gè)限制,其中之一是一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存孩饼,這也是為何RocketMQ默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了
順序?qū)懰柘鳎憧截?保證性能
1.1.4 消息存儲(chǔ)結(jié)構(gòu)
RocketMQ消息的存儲(chǔ)是由ConsumeQueue
和CommitLog
配合完成 的竹挡,消息真正的物理存儲(chǔ)文件是CommitLog
镀娶,ConsumeQueue
是消息的邏輯隊(duì)列,類似數(shù)據(jù)庫(kù)的索引文件揪罕,存儲(chǔ)的是指向物理存儲(chǔ)的地址梯码。每 個(gè)Topic下的每個(gè)Message Queue
都有一個(gè)對(duì)應(yīng)的ConsumeQueue
文件。
- CommitLog:存儲(chǔ)消息的元數(shù)據(jù)
- ConsumerQueue:存儲(chǔ)消息在CommitLog的索引(按照消息偏移量查詢)
- IndexFile:為了消息查詢提供了一種通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法好啰,這種通過(guò)IndexFile來(lái)查找消息的方法不影響發(fā)送與消費(fèi)消息的主流程(按照其他條件查詢)
1.1.5 刷盤機(jī)制
RocketMQ的消息是存儲(chǔ)到磁盤上的轩娶,這樣既能保證斷電后恢復(fù), 又可以讓存儲(chǔ)的消息量超出內(nèi)存的限制框往。RocketMQ為了提高性能鳄抒,會(huì)盡可能地保證磁盤的順序?qū)憽O⒃谕ㄟ^(guò)Producer寫入RocketMQ的時(shí) 候,有兩種寫磁盤方式许溅,分布式同步刷盤和異步刷盤瓤鼻。
1)同步刷盤
在返回寫成功狀態(tài)時(shí),消息已經(jīng)被寫入磁盤贤重。具體流程是茬祷,消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤并蝗, 然后等待刷盤完成祭犯,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫 成功的狀態(tài)滚停。
2)異步刷盤
在返回寫成功狀態(tài)時(shí)沃粗,消息可能只是被寫入了內(nèi)存的PAGECACHE,寫操作的返回快键畴,吞吐量大陪每;當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤動(dòng)作镰吵,快速寫入檩禾。
3)配置
同步刷盤還是異步刷盤,都是通過(guò)Broker配置文件里的flushDiskType 參數(shù)設(shè)置的疤祭,這個(gè)參數(shù)被配置成SYNC_FLUSH盼产、ASYNC_FLUSH中的 一個(gè)。
默認(rèn)異步刷盤
1.2 高可用性機(jī)制
RocketMQ分布式集群是通過(guò)Master和Slave的配合達(dá)到高可用性的勺馆。
Master和Slave的區(qū)別:在Broker的配置文件中戏售,參數(shù) brokerId的值為0表明這個(gè)Broker是Master,大于0表明這個(gè)Broker是 Slave草穆,同時(shí)brokerRole參數(shù)也會(huì)說(shuō)明這個(gè)Broker是Master還是Slave灌灾。
Master角色的Broker支持讀和寫,Slave角色的Broker僅支持讀悲柱,也就是 Producer只能和Master角色的Broker連接寫入消息锋喜;Consumer可以連接 Master角色的Broker,也可以連接Slave角色的Broker來(lái)讀取消息豌鸡。
1.2.1 消息消費(fèi)高可用
在Consumer的配置文件中嘿般,并不需要設(shè)置是從Master讀還是從Slave 讀,當(dāng)Master不可用或者繁忙的時(shí)候涯冠,Consumer會(huì)被自動(dòng)切換到從Slave 讀炉奴。有了自動(dòng)切換Consumer這種機(jī)制,當(dāng)一個(gè)Master角色的機(jī)器出現(xiàn)故障后蛇更,Consumer仍然可以從Slave讀取消息瞻赶,不影響Consumer程序赛糟。這就達(dá)到了消費(fèi)端的高可用性。
1.2.2 消息發(fā)送高可用
在創(chuàng)建Topic的時(shí)候砸逊,把Topic的多個(gè)Message Queue創(chuàng)建在多個(gè)Broker組上(相同Broker名稱虑灰,不同 brokerId的機(jī)器組成一個(gè)Broker組),這樣當(dāng)一個(gè)Broker組的Master不可 用后痹兜,其他組的Master仍然可用穆咐,Producer仍然可以發(fā)送消息。 RocketMQ目前還不支持把Slave自動(dòng)轉(zhuǎn)成Master字旭,如果機(jī)器資源不足对湃, 需要把Slave轉(zhuǎn)成Master,則要手動(dòng)停止Slave角色的Broker遗淳,更改配置文 件拍柒,用新的配置文件啟動(dòng)Broker。
1.2.3 消息主從復(fù)制
如果一個(gè)Broker組有Master和Slave屈暗,消息需要從Master復(fù)制到Slave 上拆讯,有同步和異步兩種復(fù)制方式。
1)同步復(fù)制
同步復(fù)制方式是等Master和Slave均寫 成功后才反饋給客戶端寫成功狀態(tài)养叛;
在同步復(fù)制方式下种呐,如果Master出故障, Slave上有全部的備份數(shù)據(jù)弃甥,容易恢復(fù)爽室,但是同步復(fù)制會(huì)增大數(shù)據(jù)寫入 延遲,降低系統(tǒng)吞吐量淆攻。
2)異步復(fù)制
異步復(fù)制方式是只要Master寫成功 即可反饋給客戶端寫成功狀態(tài)阔墩。
在異步復(fù)制方式下,系統(tǒng)擁有較低的延遲和較高的吞吐量瓶珊,但是如果Master出了故障啸箫,有些數(shù)據(jù)因?yàn)闆]有被寫 入Slave,有可能會(huì)丟失伞芹;
3)配置
同步復(fù)制和異步復(fù)制是通過(guò)Broker配置文件里的brokerRole參數(shù)進(jìn)行設(shè)置的忘苛,這個(gè)參數(shù)可以被設(shè)置成ASYNC_MASTER、 SYNC_MASTER丑瞧、SLAVE三個(gè)值中的一個(gè)柑土。
4)總結(jié)
實(shí)際應(yīng)用中要結(jié)合業(yè)務(wù)場(chǎng)景蜀肘,合理設(shè)置刷盤方式和主從復(fù)制方式绊汹, 尤其是SYNC_FLUSH方式,由于頻繁地觸發(fā)磁盤寫動(dòng)作扮宠,會(huì)明顯降低 性能西乖。通常情況下狐榔,應(yīng)該把Master和Save配置成ASYNC_FLUSH的刷盤 方式,主從之間配置成SYNC_MASTER的復(fù)制方式获雕,這樣即使有一臺(tái) 機(jī)器出故障薄腻,仍然能保證數(shù)據(jù)不丟,是個(gè)不錯(cuò)的選擇届案。
1.3 負(fù)載均衡
1.3.1 Producer負(fù)載均衡
Producer端庵楷,每個(gè)實(shí)例在發(fā)消息的時(shí)候,默認(rèn)會(huì)輪詢所有的message queue發(fā)送楣颠,以達(dá)到讓消息平均落在不同的queue上尽纽。而由于queue可以散落在不同的broker,所以消息就發(fā)送到不同的broker下童漩,如下圖:
圖中箭頭線條上的標(biāo)號(hào)代表順序弄贿,發(fā)布方會(huì)把第一條消息發(fā)送至 Queue 0,然后第二條消息發(fā)送至 Queue 1矫膨,以此類推差凹。
1.3.2 Consumer負(fù)載均衡
1)集群模式
在集群消費(fèi)模式下,每條消息只需要投遞到訂閱這個(gè)topic的Consumer Group下的一個(gè)實(shí)例即可侧馅。RocketMQ采用主動(dòng)拉取的方式拉取并消費(fèi)消息危尿,在拉取的時(shí)候需要明確指定拉取哪一條message queue。
而每當(dāng)實(shí)例的數(shù)量有變更馁痴,都會(huì)觸發(fā)一次所有實(shí)例的負(fù)載均衡脚线,這時(shí)候會(huì)按照queue的數(shù)量和實(shí)例的數(shù)量平均分配queue給每個(gè)實(shí)例。
默認(rèn)的分配算法是AllocateMessageQueueAveragely弥搞,如下圖:
還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle邮绿,也是平均分?jǐn)偯恳粭lqueue,只是以環(huán)狀輪流分queue的形式攀例,如下圖:
需要注意的是船逮,集群模式下,queue都是只允許分配只一個(gè)實(shí)例粤铭,這是由于如果多個(gè)實(shí)例同時(shí)消費(fèi)一個(gè)queue的消息挖胃,由于拉取哪些消息是consumer主動(dòng)控制的,那樣會(huì)導(dǎo)致同一個(gè)消息在不同的實(shí)例下被消費(fèi)多次梆惯,所以算法上都是一個(gè)queue只分給一個(gè)consumer實(shí)例酱鸭,一個(gè)consumer實(shí)例可以允許同時(shí)分到不同的queue。
通過(guò)增加consumer實(shí)例去分?jǐn)俼ueue的消費(fèi)垛吗,可以起到水平擴(kuò)展的消費(fèi)能力的作用凹髓。而有實(shí)例下線的時(shí)候,會(huì)重新觸發(fā)負(fù)載均衡怯屉,這時(shí)候原來(lái)分配到的queue將分配到其他實(shí)例上繼續(xù)消費(fèi)蔚舀。
但是如果consumer實(shí)例的數(shù)量比message queue的總數(shù)量還多的話饵沧,多出來(lái)的consumer實(shí)例將無(wú)法分到queue,也就無(wú)法消費(fèi)到消息赌躺,也就無(wú)法起到分?jǐn)傌?fù)載的作用了狼牺。所以需要控制讓queue的總數(shù)量大于等于consumer的數(shù)量。
2)廣播模式
由于廣播模式下要求一條消息需要投遞到一個(gè)消費(fèi)組下面所有的消費(fèi)者實(shí)例礼患,所以也就沒有消息被分?jǐn)傁M(fèi)的說(shuō)法是钥。
在實(shí)現(xiàn)上,其中一個(gè)不同就是在consumer分配queue的時(shí)候缅叠,所有consumer都分到所有的queue咏瑟。
1.4 消息重試
1.4.1 順序消息的重試
對(duì)于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后痪署,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)不斷進(jìn)行消息重試(每次間隔時(shí)間為 1 秒)码泞,這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況狼犯。因此余寥,在使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況悯森,避免阻塞現(xiàn)象的發(fā)生宋舷。
1.4.2 無(wú)序消息的重試
對(duì)于無(wú)序消息(普通、定時(shí)瓢姻、延時(shí)祝蝠、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí)幻碱,您可以通過(guò)設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果绎狭。
無(wú)序消息的重試只針對(duì)集群消費(fèi)方式生效;廣播方式不提供失敗重試特性褥傍,即消費(fèi)失敗后儡嘶,失敗消息不再重試,繼續(xù)消費(fèi)新的消息恍风。
1)重試次數(shù)
消息隊(duì)列 RocketMQ 默認(rèn)允許每條消息最多重試 16 次蹦狂,每次重試的間隔時(shí)間如下:
第幾次重試 | 與上次重試的間隔時(shí)間 | 第幾次重試 | 與上次重試的間隔時(shí)間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時(shí) |
8 | 6 分鐘 | 16 | 2 小時(shí) |
如果消息重試 16 次后仍然失敗,消息將不再投遞朋贬。如果嚴(yán)格按照上述重試時(shí)間間隔計(jì)算凯楔,某條消息在一直消費(fèi)失敗的前提下,將會(huì)在接下來(lái)的 4 小時(shí) 46 分鐘之內(nèi)進(jìn)行 16 次重試锦募,超過(guò)這個(gè)時(shí)間范圍消息將不再重試投遞摆屯。
注意: 一條消息無(wú)論重試多少次,這些重試消息的 Message ID 不會(huì)改變御滩。
2)配置方式
消費(fèi)失敗后鸥拧,重試配置方式
集群消費(fèi)方式下党远,消息消費(fèi)失敗后期望消息重試削解,需要在消息監(jiān)聽器接口的實(shí)現(xiàn)中明確進(jìn)行配置(三種方式任選一種):
- 返回 Action.ReconsumeLater (推薦)
- 返回 Null
- 拋出異常
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//處理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater富弦,消息將重試
return Action.ReconsumeLater;
//方式2:返回 null,消息將重試
return null;
//方式3:直接拋出異常氛驮, 消息將重試
throw new RuntimeException("Consumer Message exceotion");
}
}
消費(fèi)失敗后腕柜,不重試配置方式
集群消費(fèi)方式下,消息失敗后期望消息不重試矫废,需要捕獲消費(fèi)邏輯中可能拋出的異常盏缤,最終返回 Action.CommitMessage,此后這條消息將不會(huì)再重試蓖扑。
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕獲消費(fèi)邏輯中的所有異常唉铜,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息處理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
自定義消息最大重試次數(shù)
消息隊(duì)列 RocketMQ 允許 Consumer 啟動(dòng)的時(shí)候設(shè)置最大重試次數(shù)律杠,重試時(shí)間間隔將按照如下策略:
- 最大重試次數(shù)小于等于 16 次潭流,則重試時(shí)間間隔同上表描述。
- 最大重試次數(shù)大于 16 次柜去,超過(guò) 16 次的重試時(shí)間間隔均為每次 2 小時(shí)灰嫉。
Properties properties = new Properties();
//配置對(duì)應(yīng) Group ID 的最大消息重試次數(shù)為 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
注意:
- 消息最大重試次數(shù)的設(shè)置對(duì)相同 Group ID 下的所有 Consumer 實(shí)例有效。
- 如果只對(duì)相同 Group ID 下兩個(gè) Consumer 實(shí)例中的其中一個(gè)設(shè)置了 MaxReconsumeTimes嗓奢,那么該配置對(duì)兩個(gè) Consumer 實(shí)例均生效讼撒。
- 配置采用覆蓋的方式生效,即最后啟動(dòng)的 Consumer 實(shí)例會(huì)覆蓋之前的啟動(dòng)實(shí)例的配置
獲取消息重試次數(shù)
消費(fèi)者收到消息后股耽,可按照如下方式獲取消息的重試次數(shù):
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//獲取消息的重試次數(shù)
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}
1.5 死信隊(duì)列
當(dāng)一條消息初次消費(fèi)失敗根盒,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)進(jìn)行消息重試;達(dá)到最大重試次數(shù)后物蝙,若消費(fèi)依然失敗郑象,則表明消費(fèi)者在正常情況下無(wú)法正確地消費(fèi)該消息,此時(shí)茬末,消息隊(duì)列 RocketMQ 不會(huì)立刻將消息丟棄厂榛,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中。
在消息隊(duì)列 RocketMQ 中丽惭,這種正常情況下無(wú)法被消費(fèi)的消息稱為死信消息(Dead-Letter Message)击奶,存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue)。
1.5.1 死信特性
死信消息具有以下特性
- 不會(huì)再被消費(fèi)者正常消費(fèi)责掏。
- 有效期與正常消息相同柜砾,均為 3 天,3 天后會(huì)被自動(dòng)刪除换衬。因此痰驱,請(qǐng)?jiān)谒佬畔a(chǎn)生后的 3 天內(nèi)及時(shí)處理证芭。
死信隊(duì)列具有以下特性:
- 一個(gè)死信隊(duì)列對(duì)應(yīng)一個(gè) Group ID, 而不是對(duì)應(yīng)單個(gè)消費(fèi)者實(shí)例担映。
- 如果一個(gè) Group ID 未產(chǎn)生死信消息废士,消息隊(duì)列 RocketMQ 不會(huì)為其創(chuàng)建相應(yīng)的死信隊(duì)列。
- 一個(gè)死信隊(duì)列包含了對(duì)應(yīng) Group ID 產(chǎn)生的所有死信消息蝇完,不論該消息屬于哪個(gè) Topic官硝。
1.5.2 查看死信信息
- 在控制臺(tái)查詢出現(xiàn)死信隊(duì)列的主題信息
- 在消息界面根據(jù)主題查詢死信消息
- 選擇重新發(fā)送消息
一條消息進(jìn)入死信隊(duì)列,意味著某些因素導(dǎo)致消費(fèi)者無(wú)法正常消費(fèi)該消息短蜕,因此氢架,通常需要您對(duì)其進(jìn)行特殊處理。排查可疑因素并解決問(wèn)題后朋魔,可以在消息隊(duì)列 RocketMQ 控制臺(tái)重新發(fā)送該消息岖研,讓消費(fèi)者重新消費(fèi)一次。
1.6 消費(fèi)冪等
消息隊(duì)列 RocketMQ 消費(fèi)者在接收到消息以后警检,有必要根據(jù)業(yè)務(wù)上的唯一 Key 對(duì)消息做冪等處理的必要性孙援。
1.6.1 消費(fèi)冪等的必要性
在互聯(lián)網(wǎng)應(yīng)用中,尤其在網(wǎng)絡(luò)不穩(wěn)定的情況下解滓,消息隊(duì)列 RocketMQ 的消息有可能會(huì)出現(xiàn)重復(fù)赃磨,這個(gè)重復(fù)簡(jiǎn)單可以概括為以下情況:
-
發(fā)送時(shí)消息重復(fù)
當(dāng)一條消息已被成功發(fā)送到服務(wù)端并完成持久化,此時(shí)出現(xiàn)了網(wǎng)絡(luò)閃斷或者客戶端宕機(jī)洼裤,導(dǎo)致服務(wù)端對(duì)客戶端應(yīng)答失敗邻辉。 如果此時(shí)生產(chǎn)者意識(shí)到消息發(fā)送失敗并嘗試再次發(fā)送消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息腮鞍。
-
投遞時(shí)消息重復(fù)
消息消費(fèi)的場(chǎng)景下值骇,消息已投遞到消費(fèi)者并完成業(yè)務(wù)處理,當(dāng)客戶端給服務(wù)端反饋應(yīng)答的時(shí)候網(wǎng)絡(luò)閃斷移国。 為了保證消息至少被消費(fèi)一次吱瘩,消息隊(duì)列 RocketMQ 的服務(wù)端將在網(wǎng)絡(luò)恢復(fù)后再次嘗試投遞之前已被處理過(guò)的消息,消費(fèi)者后續(xù)會(huì)收到兩條內(nèi)容相同并且 Message ID 也相同的消息迹缀。
-
負(fù)載均衡時(shí)消息重復(fù)(包括但不限于網(wǎng)絡(luò)抖動(dòng)使碾、Broker 重啟以及訂閱方應(yīng)用重啟)
當(dāng)消息隊(duì)列 RocketMQ 的 Broker 或客戶端重啟、擴(kuò)容或縮容時(shí)祝懂,會(huì)觸發(fā) Rebalance票摇,此時(shí)消費(fèi)者可能會(huì)收到重復(fù)消息。
1.6.2 處理方式
因?yàn)?Message ID 有可能出現(xiàn)沖突(重復(fù))的情況砚蓬,所以真正安全的冪等處理矢门,不建議以 Message ID 作為處理依據(jù)。 最好的方式是以業(yè)務(wù)唯一標(biāo)識(shí)作為冪等處理的關(guān)鍵依據(jù),而業(yè)務(wù)的唯一標(biāo)識(shí)可以通過(guò)消息 Key 進(jìn)行設(shè)置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
訂閱方收到消息時(shí)可以根據(jù)消息的 Key 進(jìn)行冪等處理:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)的 key 做冪等處理
}
});