消息的發(fā)送和消費并不是百分百成功的拜银,在出現(xiàn)消息推送失敗時柿扣,RocketMQ有何補償方式來進(jìn)行消息重試呢咽白?這是我們今天要一起學(xué)習(xí)的點。
1. 順序消息的重試
對于順序消息篓足,當(dāng)消費者消費消息失敗后段誊,消息隊列 RocketMQ 會自動不斷進(jìn)行消息重試(每次間隔時間為 1 秒),這時栈拖,應(yīng)用會出現(xiàn)消息消費被阻塞的情況枕扫。因此,在使用順序消息時辱魁,務(wù)必保證應(yīng)用能夠及時監(jiān)控并處理消費失敗的情況,避免阻塞現(xiàn)象的發(fā)生诗鸭。
2. 無序消息的重試
對于無序消息(普通染簇、定時、延時强岸、事務(wù)消息)锻弓,當(dāng)消費者消費消息失敗時,您可以通過設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果蝌箍。
注意:無序消息的重試只針對集群消費方式生效青灼;廣播方式不提供失敗重試特性,即消費失敗后妓盲,失敗消息不再重試杂拨,繼續(xù)消費新的消息。
2.1 重試次數(shù)
消息隊列 RocketMQ 默認(rèn)允許每條消息最多重試 16 次悯衬,每次重試的間隔時間如下:
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時 |
8 | 6 分鐘 | 16 | 2 小時 |
如果消息重試 16 次后仍然失敗弹沽,消息將不再投遞。如果嚴(yán)格按照上述重試時間間隔計算筋粗,某條消息在一直消費失敗的前提下策橘,將會在接下來的 4 小時 46 分鐘之內(nèi)進(jìn)行 16 次重試,超過這個時間范圍消息將不再重試投遞娜亿。
注意: 一條消息無論重試多少次丽已,這些重試消息的 Message ID 不會改變。
2.2 配置方式
2.2.1 消費失敗后买决,重試配置方式
集群消費方式下沛婴,消息消費失敗后期望消息重試吼畏,需要在消息監(jiān)聽器接口的實現(xiàn)中明確進(jìn)行配置(三種方式任選一種):
- 返回 Action.ReconsumeLater (推薦)
- 返回 Null
- 拋出異常
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//處理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息將重試
return Action.ReconsumeLater;
//方式2:返回 null瘸味,消息將重試
return null;
//方式3:直接拋出異常宫仗, 消息將重試
throw new RuntimeException("Consumer Message exceotion");
}
}
2.2.2 消費失敗后,不重試配置方式
集群消費方式下旁仿,消息失敗后期望消息不重試藕夫,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage枯冈,此后這條消息將不會再重試毅贮。
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕獲消費邏輯中的所有異常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息處理正常尘奏,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
2.2.3 自定義消息最大重試次數(shù)
消息隊列 RocketMQ 允許 Consumer 啟動的時候設(shè)置最大重試次數(shù)滩褥,重試時間間隔將按照如下策略:
- 最大重試次數(shù)小于等于 16 次,則重試時間間隔同上表描述炫加。
- 最大重試次數(shù)大于 16 次瑰煎,超過 16 次的重試時間間隔均為每次 2 小時。
Properties properties = new Properties();
//配置對應(yīng) Group ID 的最大消息重試次數(shù)為 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
Consumer consumer =ONSFactory.createConsumer(properties);
需要注意的是:
- 消息最大重試次數(shù)的設(shè)置對相同 Group ID 下的所有 Consumer 實例有效俗孝。
- 如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設(shè)置了 MaxReconsumeTimes酒甸,那么該配置對兩個 Consumer 實例均生效。
- 配置采用覆蓋的方式生效赋铝,即最后啟動的 Consumer 實例會覆蓋之前的啟動實例的配置插勤。
2.2.4 獲取消息重試次數(shù)
消費者收到消息后,可按照如下方式獲取消息的重試次數(shù):
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//獲取消息的重試次數(shù)
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}