Spring Cloud Stream 進(jìn)階配置——高可用(一)——失敗重試

前言

軟件的高可用一直是軟件建設(shè)的難點(diǎn)篓跛,接下來探討一下如何借助 Spring Cloud Stream 讓我們的 rabbitmq 變得更加高可用。

消息消費(fèi)失敗

消息的消費(fèi)举塔,說到底其實(shí)就是:根據(jù)接收到的消息(攜帶了某種信號(hào))執(zhí)行一系列業(yè)務(wù)邏輯央渣。而執(zhí)行過程中,由于種種異常情況芽丹,或多或少都會(huì)出現(xiàn)執(zhí)行失敗的情況,那么問題來了咕村,當(dāng)消息消費(fèi)失敗后,該怎么處理呢懈涛?

對(duì)于那種因?yàn)橥话l(fā)的異常情況導(dǎo)致消息消費(fèi)失敗的,可以簡單的分為:

  • 短暫性異常
  • 持久性異常

短暫性異常比如有:網(wǎng)絡(luò)抖動(dòng)導(dǎo)致遠(yuǎn)程調(diào)用失敗無法繼續(xù)執(zhí)行導(dǎo)致消費(fèi)失敗宇植,這種短暫性異常一般在短時(shí)間內(nèi)就能恢復(fù)正常埋心,所以如果能讓消費(fèi)失敗后的消息等待一小段時(shí)間后重新被投遞并消費(fèi),那豈不是能大大減少因?yàn)楫惓?dǎo)致消費(fèi)失敗的消息數(shù)量闲坎,因?yàn)楫惓茬斧;謴?fù)了,消息也就能正常消費(fèi)了项秉。

持久性異常比如有:某個(gè)服務(wù)因?yàn)橐粋€(gè)未在測(cè)試階段發(fā)現(xiàn)的bug導(dǎo)致整個(gè)遠(yuǎn)程服務(wù)不可用伙狐,遠(yuǎn)程服務(wù)不可用瞬欧,消息也就注定消費(fèi)失敗了,這種情況下艘虎,肯定沒辦法短時(shí)間內(nèi)就解決并重新部署服務(wù),因此属划,就算消息被重新投遞多少次候生,也不可能被正常消費(fèi),所以簡單的重復(fù)投遞消費(fèi)失敗的消息是無法讓消息被正常消費(fèi)的须蜗。這樣反而只會(huì)無謂的浪費(fèi)系統(tǒng)資源,說不定還會(huì)因此影響到其他服務(wù)明肮。

失敗重試

上面說到,失敗重試可以解決短暫性導(dǎo)致的消費(fèi)失敗的情況循未。那么秫舌,Spring Cloud Stream 支不支持呢?答案是肯定的羔味,而且還非常簡單钠右,只需加入幾個(gè)配置即可。

首先飒房,配置 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempts 是用來決定:消息最大可以被嘗試消費(fèi)的次數(shù)狠毯,包含第一次投遞。舉個(gè)例子嚼松,假設(shè)為默認(rèn)值 3,在第一次投遞后寝受,消費(fèi)失敗了罕偎,那么該消息還可以再被重復(fù)投遞2次。如果設(shè)為1颜及,也就代表不重試俏站。另外,該配置的值必須大于0乾翔,當(dāng)配置了 0 或 負(fù)數(shù)施戴,直接無法啟動(dòng)成功萌丈,并報(bào)如下錯(cuò)誤:

max attempts should be greater than zero

其次辆雾,既然有了失敗重試機(jī)制,那么肯定得有重試策略度迂,所以還需另外3個(gè)參數(shù)的配合惭墓,分別為(以下參數(shù)的前綴與maxAttempts 一樣,均為 spring.cloud.stream.bindings.<channelName>.consumer):

  • backOffInitialInterval: 消息消費(fèi)失敗后重試消費(fèi)消息的初始化間隔時(shí)間腊凶。默認(rèn)1s,即第一次重試消費(fèi)會(huì)在1s后進(jìn)行
  • backOffMultiplier: 相鄰兩次重試之間的間隔時(shí)間的倍數(shù)褐缠。默認(rèn)2风瘦,即第二次是第一次間隔時(shí)間的2倍,第三次是第二次的2倍
  • backOffMaxInterval: 下一次嘗試重試的最大時(shí)間間隔胡桨,默認(rèn)為10000ms瞬雹,即10s。

那么怎么結(jié)合起來理解呢?舉個(gè)例子:假設(shè)這幾個(gè)配置均使用默認(rèn)值状婶,重試第一次1s,第二次2秒草姻,因?yàn)槟J(rèn)最大重試次數(shù)為3稍刀,所以也就不會(huì)進(jìn)行第三次重試敞曹;而如果最大重試次數(shù)配置了大于3的值综膀,比如10,那么第三次4秒橄登,第四次為8秒讥此,而在第五次重試的時(shí)候,若沒有最大重試時(shí)間間隔的限制卒稳,重試時(shí)間為 2^4^ = 16他巨,但是因?yàn)橛辛瞬怀^10秒的限制,第五次重試的時(shí)間間隔為10秒闻蛀,而不是剛剛算出的16秒;而接下來剩余的重試次數(shù)役衡,其重試時(shí)間間隔均為10秒薪棒。

示例

以下代碼可在 源碼 查看。

配置

spring:
  application:
    name: scas-data-collection
  profiles:
    active:
      default

  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit
          consumer:
            maxAttempts: 3 # 當(dāng)消息消費(fèi)失敗時(shí)棵介,嘗試消費(fèi)該消息的最大次數(shù)(消息消費(fèi)失敗后吧史,發(fā)布者會(huì)重新投遞)贸营。默認(rèn)3
            backOffInitialInterval: 1000 # 消息消費(fèi)失敗后重試消費(fèi)消息的初始化間隔時(shí)間。默認(rèn)1s钞脂,即第一次重試消費(fèi)會(huì)在1s后進(jìn)行
            backOffMultiplier: 2 # 相鄰兩次重試之間的間隔時(shí)間的倍數(shù)冰啃。默認(rèn)2刘莹,即第二次是第一次間隔時(shí)間的2倍焚刚,第三次是第二次的2倍
            backOffMaxInterval: 10000 # 下一次嘗試重試的最大時(shí)間間隔,默認(rèn)為10000ms汪榔,即10s痴腌。

上面的配置均使用默認(rèn)配置。

消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 設(shè)備 eui
     */
    private String devEui;

    /**
     * 數(shù)據(jù)
     */
    private String data;

    // 省略其他字段
}

測(cè)試用例


@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     *
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
        log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) {
            log.info("消費(fèi)上行數(shù)據(jù)包消息. model: [{}].", model);
            throw new RuntimeException();
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxAttempt")
@EnableBinding({ScasMaxAttemptTest.MessageSink.class, ScasMaxAttemptTest.MessageSource.class})
public class ScasMaxAttemptTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     *
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 1; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
        log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) {
            log.info("消費(fèi)上行數(shù)據(jù)包消息. model: [{}].", model);
            throw new RuntimeException();
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

運(yùn)行測(cè)試用例

使用默認(rèn)配置

運(yùn)行測(cè)試用例后剥悟,你會(huì)看到控制臺(tái)打印類似如下的日志:


使用默認(rèn)配置

可以看到,打印的日志與上文分析的一致略板,第一次消費(fèi)失敗后慈缔,會(huì)再重試2次,一共嘗試消費(fèi)3次藐鹤,最后一次也失敗后,直接拋出異常挠蛉,不再繼續(xù)重試肄满。

增加最大重試次數(shù)

配置 maxAttempts = 10,再次啟動(dòng)測(cè)試用例讥电,日志打印如下:

最大嘗試重試次數(shù)為10

可以看到轧抗,從第五次重試開始瞬测,剩下的重試次數(shù)纠炮,重試時(shí)間間隔均為10s灯蝴。

如何配置更合適

其實(shí) Spring Cloud Stream 的默認(rèn)配置基本就夠了,因?yàn)槿绻且驗(yàn)槎虝盒援惓?dǎo)致消息消費(fèi)失敗耕肩,重試2次基本就差不多了问潭,重試太多反而可能會(huì)導(dǎo)致出現(xiàn)其他問題。

但是考慮到有些短暫性異辰泼Γ可能無法在1灾茁、2秒內(nèi)恢復(fù)正常,那我們可以稍微增大配置 backOffInitialIntervalbackOffMultiplier 的值北专,比如:backOffInitialInterval = 5000backOffMultiplier = 5语婴,backOffMaxInterval =60000录粱,這種配置可能就比較適合實(shí)時(shí)性不高的情況。

總之菜职,我們可以根據(jù)具體業(yè)務(wù)以及生產(chǎn)環(huán)境旗闽,調(diào)整這幾個(gè)配置的值。

重試次數(shù)用完后消息會(huì)去哪适室?

你可能會(huì)好奇,當(dāng)重試次數(shù)用完后蔬螟,消息會(huì)跑去哪呢汽畴?這時(shí)如果訪問 Rabbitmq可視化頁面耸序,你會(huì)看到:

消息被丟棄了

可以看到坎怪,Ready Unacked Total 均為0廓握,也就是說,消息被丟棄了隙券?

事實(shí)上,消息確實(shí)被丟棄了殉了,但是這樣不好吧拟枚,這樣會(huì)存在丟失部分消息的隱患,于是不得不引入另一個(gè)概念——死信隊(duì)列隔箍。死信隊(duì)列有什么用呢脚乡?死信隊(duì)列是用來接收因?yàn)榉N種原因?qū)е孪o法正常消費(fèi)后的消息,當(dāng)然這里的原因不止消息重試次數(shù)用完后的消息奶稠。

因?yàn)樗佬抨?duì)列超出本文的范疇,這里就不詳細(xì)說明竹握,會(huì)在以后的文章詳講辆飘。

持久性異常的消費(fèi)失敗

當(dāng)異常情況為持久性異常,在異常情況恢復(fù)正常之前芹关,那么無論重試多少次紧卒,消息都無法被正常消費(fèi),所以只能在重試次數(shù)用完之后,要么丟棄該消息或進(jìn)入死信隊(duì)列贬媒。所以重試次數(shù)不能設(shè)置過大肘习,避免浪費(fèi)系統(tǒng)資源坡倔。

推薦閱讀

Spring Cloud Stream 進(jìn)階配置——高吞吐量(一)——多消費(fèi)者
Spring Cloud Stream 進(jìn)階配置——高吞吐量(二)——彈性消費(fèi)者數(shù)量
Spring Cloud Stream 進(jìn)階配置——高吞吐量(三)——批量預(yù)取消息(prefetch)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末罪塔,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子瘩缆,更是在濱河造成了極大的恐慌佃蚜,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件熟尉,死亡現(xiàn)場離奇詭異洲脂,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)恐锦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門一铅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人飘蚯,你說我怎么就攤上這事福也。” “怎么了暴凑?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長凯傲。 經(jīng)常有香客問我,道長幌缝,這世上最難降的妖魔是什么诫欠? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮轿偎,結(jié)果婚禮上被廓,老公的妹妹穿的比我還像新娘。我一直安慰自己昆婿,他們只是感情好亦渗,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著多律,像睡著了一般搂蜓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上帮碰,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天殉挽,我揣著相機(jī)與錄音,去河邊找鬼斯碌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛投慈,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播加袋,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼倚搬!你這毒婦竟也來了阳堕?” 一聲冷哼從身側(cè)響起择克,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤肚邢,失蹤者是張志新(化名)和其女友劉穎拭卿,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體响蕴,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡惠桃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年辜王,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呐馆。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡汹来,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出收班,到底是詐尸還是另有隱情,我是刑警寧澤炮车,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站纪隙,受9級(jí)特大地震影響扛或,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜熙兔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一住涉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧舆声,春花似錦、人聲如沸碱屁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柿赊。三九已至幻枉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間展辞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國打工洽腺, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留覆旱,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓藕坯,卻偏偏與公主長得像,于是被迫代替她去往敵國和親炼彪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容