Spring Cloud Stream消費(fèi)失敗后的處理策略(三):使用DLQ隊(duì)列(RabbitMQ)

應(yīng)用場景

前兩天我們已經(jīng)介紹了兩種Spring Cloud Stream對(duì)消息失敗的處理策略:

  • 自動(dòng)重試:對(duì)于一些因環(huán)境原因(如:網(wǎng)絡(luò)抖動(dòng)等不穩(wěn)定因素)引發(fā)的問題可以起到比較好的作用胸嘴,提高消息處理的成功率变汪。
  • 自定義錯(cuò)誤處理邏輯:如果業(yè)務(wù)上,消息處理失敗之后有明確的降級(jí)邏輯可以彌補(bǔ)的产弹,可以采用這種方式,但是2.0.x版本有Bug,2.1.x版本修復(fù)。

那么如果代碼本身存在邏輯錯(cuò)誤斯撮,無論重試多少次都不可能成功,也沒有具體的降級(jí)業(yè)務(wù)邏輯悦即,之前在深入思考中討論過吮成,可以通過日志橱乱,或者降級(jí)邏輯記錄的方式把錯(cuò)誤消息保存下來辜梳,然后事后分析、修復(fù)Bug再重新處理泳叠。但是很顯然作瞄,這樣做非常原始,并且太過笨拙危纫,處理復(fù)雜度過高宗挥。所以,本文將介紹利用中間件特性來便捷地處理該問題的方案:使用RabbitMQ的DLQ隊(duì)列种蝶。

動(dòng)手試試

準(zhǔn)備一個(gè)會(huì)消費(fèi)失敗的例子契耿,可以直接沿用前文的工程。也可以新建一個(gè)螃征,然后創(chuàng)建如下代碼的邏輯:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生產(chǎn)接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消費(fèi)邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

內(nèi)容很簡單搪桂,既包含了消息的生產(chǎn),也包含了消息消費(fèi)盯滚。消息消費(fèi)的時(shí)候主動(dòng)拋出了一個(gè)異常來模擬消息的消費(fèi)失敗踢械。

在啟動(dòng)應(yīng)用之前,還要記得配置一下輸入輸出通道對(duì)應(yīng)的物理目標(biāo)(exchange或topic名)魄藕、并設(shè)置一下分組内列,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

這里加入了一個(gè)重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true,用來開啟DLQ(死信隊(duì)列)背率。完成了上面配置之后话瞧,啟動(dòng)應(yīng)用并訪問localhost:8080/sendMessage?message=hello接口來發(fā)送一個(gè)消息到MQ中了嫩与,此時(shí)可以看到消費(fèi)失敗后拋出了異常,消息消費(fèi)失敗交排,記錄了日志蕴纳。此時(shí),可以查看RabbitMQ的控制臺(tái)如下:

pasted-129.png

其中个粱,test-topic.stream-exception-handler.dlq隊(duì)列就是test-topic.stream-exception-handler的dlq(死信)隊(duì)列古毛,當(dāng)test-topic.stream-exception-handler隊(duì)列中的消息消費(fèi)時(shí)候之后,就會(huì)將這條消息原封不動(dòng)的轉(zhuǎn)存到dlq隊(duì)列中都许。這樣這些沒有得到妥善處理的消息就通過簡單的配置實(shí)現(xiàn)了存儲(chǔ)稻薇,之后,我們還可以通過簡單的操作對(duì)這些消息進(jìn)行重新消費(fèi)胶征。我們只需要在控制臺(tái)中點(diǎn)擊test-topic.stream-exception-handler.dlq隊(duì)列的名字進(jìn)入到詳情頁面之后塞椎,使用Move messages功能,直接將這些消息移動(dòng)回test-topic.stream-exception-handler隊(duì)列睛低,這樣這些消息就能重新被消費(fèi)一次案狠。

pasted-130.png

如果Move messages功能中是如下內(nèi)容:

To move messages, the shovel plugin must be enabled, try:

$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

那是由于沒有安裝對(duì)應(yīng)的插件,只需要根據(jù)提示的命令安裝就能使用該命令了钱雷。

深入思考

先來總結(jié)一下在引入了RabbitMQ的DLQ之后骂铁,對(duì)于消息異常處理更為完整一些的基本思路:

  1. 瞬時(shí)的環(huán)境抖動(dòng)引起的異常,利用重試功能提高處理成功率
  2. 如果重試依然失敗的罩抗,日志報(bào)錯(cuò)拉庵,并進(jìn)入DLQ隊(duì)列
  3. 日志告警通知相關(guān)開發(fā)人員,分析問題原因
  4. 解決問題(修復(fù)程序Bug套蒂、擴(kuò)容等措施)之后钞支,DLQ隊(duì)列中的消息移回重新處理

在這樣的整體思路中,可能還涉及一些微調(diào)操刀,這里舉幾個(gè)常見例子烁挟,幫助讀者進(jìn)一步了解一些特殊的場景和配置使用!

場景一:有些消息在業(yè)務(wù)上存在時(shí)效性骨坑,進(jìn)入死信隊(duì)列之后撼嗓,過一段時(shí)間再處理已經(jīng)沒有意義,這個(gè)時(shí)候如何過濾這些消息呢卡啰?

只需要配置一個(gè)參數(shù)即可:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000

該參數(shù)可以控制DLQ隊(duì)列中消息的存活時(shí)間静稻,當(dāng)超過配置時(shí)間之后,該消息會(huì)自動(dòng)的從DLQ隊(duì)列中移除匈辱。

場景二:可能進(jìn)入DLQ隊(duì)列的消息存在各種不同的原因(不同異常造成的)振湾,此時(shí)如果在做補(bǔ)救措施的時(shí)候,還希望根據(jù)這些異常做不同的處理時(shí)候亡脸,我們?nèi)绾螀^(qū)分這些消息進(jìn)入DLQ的原因呢押搪?

再來看看這個(gè)參數(shù):

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true

該參數(shù)默認(rèn)是false树酪,如果設(shè)置了死信隊(duì)列的時(shí)候,會(huì)將消息原封不動(dòng)的發(fā)送到死信隊(duì)列(也就是上面例子中的實(shí)現(xiàn))大州,此時(shí)大家可以在RabbitMQ控制臺(tái)中通過Get message(s)功能來看看隊(duì)列中的消息续语,應(yīng)該如下圖所示:

pasted-131.png

這是一條原始消息。

如果我們?cè)撆渲迷O(shè)置為true的時(shí)候厦画,那么該消息在進(jìn)入到死信隊(duì)列的時(shí)候疮茄,會(huì)在headers中加入錯(cuò)誤信息,如下圖所示:


pasted-132.png

這樣根暑,不論我們是通過移回原通道處理還是新增訂閱處理這些消息的時(shí)候就可以以此作為依據(jù)進(jìn)行分類型處理了力试。

關(guān)于RabbitMQ的binder中還有很多關(guān)于DLQ的配置,這里不一一介紹了排嫌,上面幾個(gè)是目前筆者使用過的幾個(gè)畸裳,其他一些暫時(shí)認(rèn)為采用默認(rèn)配置已經(jīng)夠用,除非還有其他定制要求淳地,或者是存量內(nèi)容怖糊,需要去適配才會(huì)去配置。讀者可以查看官方文檔了解更多詳情颇象!

代碼示例

本文示例讀者可以通過查看下面?zhèn)}庫的中的stream-exception-handler-3項(xiàng)目:

如果您對(duì)這些感興趣伍伤,歡迎star、follow夯到、收藏嚷缭、轉(zhuǎn)發(fā)給予支持!

以下專題教程也許您會(huì)有興趣

本文首發(fā):http://blog.didispace.com/spring-cloud-starter-finchley-7-4/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末耍贾,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子路幸,更是在濱河造成了極大的恐慌荐开,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件简肴,死亡現(xiàn)場離奇詭異晃听,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)砰识,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門能扒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人辫狼,你說我怎么就攤上這事初斑。” “怎么了膨处?”我有些...
    開封第一講書人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵见秤,是天一觀的道長砂竖。 經(jīng)常有香客問我,道長鹃答,這世上最難降的妖魔是什么乎澄? 我笑而不...
    開封第一講書人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮测摔,結(jié)果婚禮上置济,老公的妹妹穿的比我還像新娘。我一直安慰自己锋八,他們只是感情好舟肉,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著查库,像睡著了一般路媚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上樊销,一...
    開封第一講書人閱讀 49,764評(píng)論 1 290
  • 那天整慎,我揣著相機(jī)與錄音,去河邊找鬼围苫。 笑死裤园,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的剂府。 我是一名探鬼主播拧揽,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼腺占!你這毒婦竟也來了淤袜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤衰伯,失蹤者是張志新(化名)和其女友劉穎铡羡,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體意鲸,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡烦周,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了怎顾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片读慎。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖槐雾,靈堂內(nèi)的尸體忽然破棺而出夭委,到底是詐尸還是另有隱情,我是刑警寧澤蚜退,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布闰靴,位于F島的核電站彪笼,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蚂且。R本人自食惡果不足惜配猫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望杏死。 院中可真熱鬧泵肄,春花似錦、人聲如沸淑翼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽玄括。三九已至冯丙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間遭京,已是汗流浹背胃惜。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留哪雕,地道東北人船殉。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像斯嚎,于是被迫代替她去往敵國和親利虫。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容