前言
軟件的高可用一直是軟件建設(shè)的難點(diǎn)篓跛,接下來探討一下如何借助 Spring Cloud Stream
讓我們的 rabbitmq
變得更加高可用。
消息消費(fèi)失敗
消息的消費(fèi)举塔,說到底其實(shí)就是:根據(jù)接收到的消息(攜帶了某種信號(hào))執(zhí)行一系列業(yè)務(wù)邏輯央渣。而執(zhí)行過程中,由于種種異常情況芽丹,或多或少都會(huì)出現(xiàn)執(zhí)行失敗的情況,那么問題來了咕村,當(dāng)消息消費(fèi)失敗后,該怎么處理呢懈涛?
對(duì)于那種因?yàn)橥话l(fā)的異常情況導(dǎo)致消息消費(fèi)失敗的,可以簡單的分為:
- 短暫性異常
- 持久性異常
短暫性異常比如有:網(wǎng)絡(luò)抖動(dòng)導(dǎo)致遠(yuǎn)程調(diào)用失敗無法繼續(xù)執(zhí)行導(dǎo)致消費(fèi)失敗宇植,這種短暫性異常一般在短時(shí)間內(nèi)就能恢復(fù)正常埋心,所以如果能讓消費(fèi)失敗后的消息等待一小段時(shí)間后重新被投遞并消費(fèi),那豈不是能大大減少因?yàn)楫惓?dǎo)致消費(fèi)失敗的消息數(shù)量闲坎,因?yàn)楫惓茬斧;謴?fù)了,消息也就能正常消費(fèi)了项秉。
持久性異常比如有:某個(gè)服務(wù)因?yàn)橐粋€(gè)未在測(cè)試階段發(fā)現(xiàn)的bug導(dǎo)致整個(gè)遠(yuǎn)程服務(wù)不可用伙狐,遠(yuǎn)程服務(wù)不可用瞬欧,消息也就注定消費(fèi)失敗了,這種情況下艘虎,肯定沒辦法短時(shí)間內(nèi)就解決并重新部署服務(wù),因此属划,就算消息被重新投遞多少次候生,也不可能被正常消費(fèi),所以簡單的重復(fù)投遞消費(fèi)失敗的消息是無法讓消息被正常消費(fèi)的须蜗。這樣反而只會(huì)無謂的浪費(fèi)系統(tǒng)資源,說不定還會(huì)因此影響到其他服務(wù)明肮。
失敗重試
上面說到,失敗重試可以解決短暫性導(dǎo)致的消費(fèi)失敗的情況循未。那么秫舌,Spring Cloud Stream
支不支持呢?答案是肯定的羔味,而且還非常簡單钠右,只需加入幾個(gè)配置即可。
首先飒房,配置 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempts
是用來決定:消息最大可以被嘗試消費(fèi)的次數(shù)狠毯,包含第一次投遞。舉個(gè)例子嚼松,假設(shè)為默認(rèn)值 3,在第一次投遞后寝受,消費(fèi)失敗了罕偎,那么該消息還可以再被重復(fù)投遞2次。如果設(shè)為1颜及,也就代表不重試俏站。另外,該配置的值必須大于0乾翔,當(dāng)配置了 0 或 負(fù)數(shù)施戴,直接無法啟動(dòng)成功萌丈,并報(bào)如下錯(cuò)誤:
其次辆雾,既然有了失敗重試機(jī)制,那么肯定得有重試策略度迂,所以還需另外3個(gè)參數(shù)的配合惭墓,分別為(以下參數(shù)的前綴與maxAttempts
一樣,均為 spring.cloud.stream.bindings.<channelName>.consumer
):
- backOffInitialInterval: 消息消費(fèi)失敗后重試消費(fèi)消息的初始化間隔時(shí)間腊凶。默認(rèn)1s,即第一次重試消費(fèi)會(huì)在1s后進(jìn)行
- backOffMultiplier: 相鄰兩次重試之間的間隔時(shí)間的倍數(shù)褐缠。默認(rèn)2风瘦,即第二次是第一次間隔時(shí)間的2倍,第三次是第二次的2倍
- backOffMaxInterval: 下一次嘗試重試的最大時(shí)間間隔胡桨,默認(rèn)為10000ms瞬雹,即10s。
那么怎么結(jié)合起來理解呢?舉個(gè)例子:假設(shè)這幾個(gè)配置均使用默認(rèn)值状婶,重試第一次1s,第二次2秒草姻,因?yàn)槟J(rèn)最大重試次數(shù)為3稍刀,所以也就不會(huì)進(jìn)行第三次重試敞曹;而如果最大重試次數(shù)配置了大于3的值综膀,比如10,那么第三次4秒橄登,第四次為8秒讥此,而在第五次重試的時(shí)候,若沒有最大重試時(shí)間間隔的限制卒稳,重試時(shí)間為 2^4^ = 16
他巨,但是因?yàn)橛辛瞬怀^10秒的限制,第五次重試的時(shí)間間隔為10秒闻蛀,而不是剛剛算出的16秒;而接下來剩余的重試次數(shù)役衡,其重試時(shí)間間隔均為10秒薪棒。
示例
以下代碼可在 源碼 查看。
配置
spring:
application:
name: scas-data-collection
profiles:
active:
default
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
packetUplinkInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}
binder: rabbit
consumer:
maxAttempts: 3 # 當(dāng)消息消費(fèi)失敗時(shí)棵介,嘗試消費(fèi)該消息的最大次數(shù)(消息消費(fèi)失敗后吧史,發(fā)布者會(huì)重新投遞)贸营。默認(rèn)3
backOffInitialInterval: 1000 # 消息消費(fèi)失敗后重試消費(fèi)消息的初始化間隔時(shí)間。默認(rèn)1s钞脂,即第一次重試消費(fèi)會(huì)在1s后進(jìn)行
backOffMultiplier: 2 # 相鄰兩次重試之間的間隔時(shí)間的倍數(shù)冰啃。默認(rèn)2刘莹,即第二次是第一次間隔時(shí)間的2倍焚刚,第三次是第二次的2倍
backOffMaxInterval: 10000 # 下一次嘗試重試的最大時(shí)間間隔,默認(rèn)為10000ms汪榔,即10s痴腌。
上面的配置均使用默認(rèn)配置。
消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
/**
* 設(shè)備 eui
*/
private String devEui;
/**
* 數(shù)據(jù)
*/
private String data;
// 省略其他字段
}
測(cè)試用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
/**
*
*/
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 1; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) {
log.info("消費(fèi)上行數(shù)據(jù)包消息. model: [{}].", model);
throw new RuntimeException();
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
/**
*
*/
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 1; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) {
log.info("消費(fèi)上行數(shù)據(jù)包消息. model: [{}].", model);
throw new RuntimeException();
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}
運(yùn)行測(cè)試用例
使用默認(rèn)配置
運(yùn)行測(cè)試用例后剥悟,你會(huì)看到控制臺(tái)打印類似如下的日志:
可以看到,打印的日志與上文分析的一致略板,第一次消費(fèi)失敗后慈缔,會(huì)再重試2次,一共嘗試消費(fèi)3次藐鹤,最后一次也失敗后,直接拋出異常挠蛉,不再繼續(xù)重試肄满。
增加最大重試次數(shù)
配置 maxAttempts = 10
,再次啟動(dòng)測(cè)試用例讥电,日志打印如下:
可以看到轧抗,從第五次重試開始瞬测,剩下的重試次數(shù)纠炮,重試時(shí)間間隔均為10s灯蝴。
如何配置更合適
其實(shí) Spring Cloud Stream
的默認(rèn)配置基本就夠了,因?yàn)槿绻且驗(yàn)槎虝盒援惓?dǎo)致消息消費(fèi)失敗耕肩,重試2次基本就差不多了问潭,重試太多反而可能會(huì)導(dǎo)致出現(xiàn)其他問題。
但是考慮到有些短暫性異辰泼Γ可能無法在1灾茁、2秒內(nèi)恢復(fù)正常,那我們可以稍微增大配置 backOffInitialInterval
或 backOffMultiplier
的值北专,比如:backOffInitialInterval = 5000
,backOffMultiplier = 5
语婴,backOffMaxInterval =60000
录粱,這種配置可能就比較適合實(shí)時(shí)性不高的情況。
總之菜职,我們可以根據(jù)具體業(yè)務(wù)以及生產(chǎn)環(huán)境旗闽,調(diào)整這幾個(gè)配置的值。
重試次數(shù)用完后消息會(huì)去哪适室?
你可能會(huì)好奇,當(dāng)重試次數(shù)用完后蔬螟,消息會(huì)跑去哪呢汽畴?這時(shí)如果訪問 Rabbitmq可視化頁面耸序,你會(huì)看到:
可以看到坎怪,
Ready
Unacked
Total
均為0廓握,也就是說,消息被丟棄了隙券?
事實(shí)上,消息確實(shí)被丟棄了殉了,但是這樣不好吧拟枚,這樣會(huì)存在丟失部分消息的隱患,于是不得不引入另一個(gè)概念——死信隊(duì)列隔箍。死信隊(duì)列有什么用呢脚乡?死信隊(duì)列是用來接收因?yàn)榉N種原因?qū)е孪o法正常消費(fèi)后的消息,當(dāng)然這里的原因不止消息重試次數(shù)用完后的消息奶稠。
因?yàn)樗佬抨?duì)列超出本文的范疇,這里就不詳細(xì)說明竹握,會(huì)在以后的文章詳講辆飘。
持久性異常的消費(fèi)失敗
當(dāng)異常情況為持久性異常,在異常情況恢復(fù)正常之前芹关,那么無論重試多少次紧卒,消息都無法被正常消費(fèi),所以只能在重試次數(shù)用完之后,要么丟棄該消息或進(jìn)入死信隊(duì)列贬媒。所以重試次數(shù)不能設(shè)置過大肘习,避免浪費(fèi)系統(tǒng)資源坡倔。
推薦閱讀
Spring Cloud Stream 進(jìn)階配置——高吞吐量(一)——多消費(fèi)者
Spring Cloud Stream 進(jìn)階配置——高吞吐量(二)——彈性消費(fèi)者數(shù)量
Spring Cloud Stream 進(jìn)階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)