消息丟失的場景
- 消息發(fā)送時消息丟失
- 路由消息時消息丟失
- 消息未持久化消息丟失
- 消費消息時消息丟失
消息發(fā)送可靠性
AMQP協(xié)議提供的一個事務(wù)機制
一般不使用恨搓,影響吞吐量
發(fā)送方確認機制(publisher confirm)
首先生產(chǎn)者通過調(diào)用channel.confirmSelect方法將信道設(shè)置為confirm模式堰燎,一旦信道進入confirm模式,所有在該信道上面發(fā)布的消息都會被指派一個唯一的ID(從1開始)身诺,一旦消息被投遞到所有匹配的隊列之后,RabbitMQ就會發(fā)送一個確認(Basic.Ack)給生產(chǎn)者(包含消息的唯一deliveryTag和multiple參數(shù))晦毙,這就使得生產(chǎn)者知曉消息已經(jīng)正確到達了目的地了暇赤。
Confirm模式有三種方式實現(xiàn)
- 串行confirm模式:producer每發(fā)送一條消息后,調(diào)用waitForConfirms()方法演顾,等待broker端confirm供搀,如果服務(wù)器端返回false或者在超時時間內(nèi)未返回,客戶端進行消息重傳钠至。
- 批量confirm模式:producer每發(fā)送一批消息后葛虐,調(diào)用waitForConfirms()方法,等待broker端confirm棉钧。
- 異步confirm模式:提供一個回調(diào)方法屿脐,broker confirm了一條或者多條消息后producer端會回調(diào)這個方法。 我們分別來看看這三種confirm模式
異步Confirm模式
package com.rabbitmq.client;
import java.io.IOException;
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
*/
public interface ConfirmListener {
void handleAck(long deliveryTag, boolean multiple) throws IOException;
void handleNack(long deliveryTag, boolean multiple) throws IOException;
}
這里就需要在發(fā)送消息之前將消息存儲起來,以便于在ConfirmListener中處理消息發(fā)送成功和失敗的情況的诵,可以存儲到數(shù)據(jù)庫或者Redis中万栅。
如果發(fā)送失敗則需要進行消息重發(fā),重試超過一定次數(shù)后仍然失敗則需要記錄日志西疤,告警烦粒,人工處理。
消息端如何保證消息可靠性
手動確認機制
消費者消費完畢后手動地向 Broker 發(fā)送確認通知代赁,Broker 收到確認通知后再從隊列中刪除對應(yīng)的消息扰她。
重試
在消息消費處理邏輯中加入重試機制,以處理一些被調(diào)用服務(wù)網(wǎng)絡(luò)抖動等情況導(dǎo)致的消息消費失敗的情況管跺。
如何重試超過一定次數(shù)后仍然失敗則將消息發(fā)送到死信隊列义黎。
死信隊列
1禾进、消息被否定確認使用 channel.basicNack
或 channel.basicReject
豁跑,并且此時requeue
屬性被設(shè)置為false
。
2泻云、消息在隊列中的時間超過了設(shè)置的TTL(time to live)時間艇拍。
3、消息數(shù)量超過了隊列的容量限制宠纯。
當一個隊列中的消息滿足上述三種情況任一個時卸夕,該消息就會從原隊列移至死信隊列,若改隊列沒有綁定死信隊列則消息被丟棄婆瓜。
死信隊列和普通的業(yè)務(wù)隊列沒有什么差別快集,只不過是業(yè)務(wù)上創(chuàng)建用來存儲處理失敗的消息的隊列。所以其工作方式也和業(yè)務(wù)隊列相同廉白,死信仍然需要交換機的轉(zhuǎn)發(fā)到達死信隊列个初。
根據(jù)實際的業(yè)務(wù)情況,我們可以創(chuàng)建專門的死信消費者對死信進行處理猴蹂,或者進行人工補償院溺。
如何保證消息100%被消費
舉個例子,用戶注冊贈送積分磅轻,這里贈送積分是通過消息隊列進行解耦珍逸。
解決方案一、消息落庫 + 定時任務(wù) + 冪等 + 重試 + 人工補償
用戶表 + 消息表聋溜,在同一個事務(wù)中存儲用戶注冊數(shù)據(jù)和贈送積分數(shù)據(jù)谆膳。
在事務(wù)之外執(zhí)行消息發(fā)送,通過發(fā)送端confirm機制保證消息發(fā)送成功撮躁。
消費端消費消息摹量,消費完成后進行手動ack, 這里也會出現(xiàn)ack時消息隊列server突然宕機的情況,這時就需要保證消費端消費消息需要實現(xiàn)冪等(因為消息會被重發(fā))缨称。消息消費成功后將消息表中的消息狀態(tài)設(shè)置為完成凝果。
定時任務(wù),定時掃描未處理的消息睦尽,進行消息重發(fā)器净,重發(fā)超過一定次數(shù)后標記為失敗,轉(zhuǎn)人工處理当凡。
解決方案二山害、延遲投遞 + 回調(diào)檢查
上游服務(wù)完成業(yè)務(wù)處理后,發(fā)送兩條消息沿量,一條給下游服務(wù)進行業(yè)務(wù)處理浪慌,如贈送積分業(yè)務(wù),另一條給callback服務(wù)朴则。
下游服務(wù)接收到業(yè)務(wù)消息并處理完成之后就直接發(fā)送一條消息給callback服務(wù)权纤,callback服務(wù)接收到消息后就知道剛才有一條消息被成功處理了,callback服務(wù)把這條消息持久到數(shù)據(jù)庫中乌妒,當上游服務(wù)之前發(fā)送的延遲消息到達callback服務(wù)時進行數(shù)據(jù)庫檢查汹想,如果存在則說明消息被成功消費了,如果不存在則通過PRC調(diào)用通知上游服務(wù)有消息沒有處理撤蚊,上游服務(wù)重新發(fā)送業(yè)務(wù)消息和延遲確定消息進行重試古掏。
Step 1: 上游服務(wù)業(yè)務(wù)處理
上游服務(wù) --- 【a.業(yè)務(wù)消息】 ----> 下游服務(wù)
上游服務(wù) --- 【b.延遲確認消息】 ----> callback服務(wù)
Step 2: 下游服務(wù)業(yè)務(wù)處理
下游服務(wù) --- 消費a消息 ---- 【c.消費確認消息】 ---> callback服務(wù)
Step 3: 消息處理情況持久化
callback服務(wù) --- 消費【c. 消費確認消息】 ---持久化到DB(a消息已被成功消費)
Step 4: 上游服務(wù)檢查消息處理情況
callback服務(wù) --- 消費【b.延遲確認消息】 --- 檢查DB
Step 5: 重試
callback服務(wù) --- 檢查DB 通過 --- 完成
callback服務(wù) --- 檢查DB 不通過 --- RPC通知上游服務(wù)
雖然這種方案也是無法做到 100% 的可靠傳遞,在特別極端的情況侦啸,還是需要定時任務(wù)和補償機制進行輔助槽唾。但是該方案的核心是減少數(shù)據(jù)庫操作,這個點很重要光涂,因為這是在高并發(fā)的場景下庞萍,主要考慮性能。當然我們還是要補償機制顶捷,即可以做到最終一致性挂绰。