Spring Cloud Stream消費(fèi)失敗后的處理策略(二):自定義錯(cuò)誤處理邏輯

應(yīng)用場(chǎng)景

上一篇《Spring Cloud Stream消費(fèi)失敗后的處理策略(一):自動(dòng)重試》介紹了默認(rèn)就會(huì)生效的消息重試功能。對(duì)于一些因環(huán)境原因、網(wǎng)絡(luò)抖動(dòng)等不穩(wěn)定因素引發(fā)的問(wèn)題可以起到比較好的作用峭梳。但是對(duì)于諸如代碼本身存在的邏輯錯(cuò)誤等,無(wú)論重試多少次都不可能成功的問(wèn)題墅垮,是無(wú)法修復(fù)的积蔚。對(duì)于這樣的情況意鲸,前文中說(shuō)了可以利用日志記錄消息內(nèi)容,配合告警來(lái)做補(bǔ)救尽爆,但是很顯然怎顾,這樣做非常原始,并且太過(guò)笨拙漱贱,處理復(fù)雜度過(guò)高槐雾。所以,我們需要需求更好的辦法幅狮,本文將介紹針對(duì)該類問(wèn)題的一種處理方法:自定義錯(cuò)誤處理邏輯募强。

動(dòng)手試試

準(zhǔn)備一個(gè)會(huì)消費(fèi)失敗的例子,可以直接沿用前文的工程崇摄,也可以新建一個(gè)擎值,然后創(chuàng)建如下代碼的邏輯:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生產(chǎn)接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消費(fèi)邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

內(nèi)容很簡(jiǎn)單,既包含了消息的生產(chǎn)配猫,也包含了消息消費(fèi)幅恋。消息消費(fèi)的時(shí)候主動(dòng)拋出了一個(gè)異常來(lái)模擬消息的消費(fèi)失敗。

在啟動(dòng)應(yīng)用之前泵肄,還要記得配置一下輸入輸出通道對(duì)應(yīng)的物理目標(biāo)(exchange或topic名)、并設(shè)置一下分組淑翼,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后腐巢,啟動(dòng)應(yīng)用并訪問(wèn)localhost:8080/sendMessage?message=hello接口來(lái)發(fā)送一個(gè)消息到MQ中了,此時(shí)可以看到消費(fèi)失敗后拋出了異常玄括,跟上一篇文章的結(jié)果一樣冯丙,消息消費(fèi)失敗,記錄了日志,消息信息丟棄胃惜。

下面泞莉,針對(duì)消息消費(fèi)失敗,在TestListener中針對(duì)消息消費(fèi)邏輯創(chuàng)建一段錯(cuò)誤處理邏輯船殉,比如:

@Slf4j
@Component
static class TestListener {

    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received payload : " + payload);
        throw new RuntimeException("Message consumer failed!");
    }

    /**
     * 消息消費(fèi)失敗的降級(jí)處理邏輯
     *
     * @param message
     */
    @ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
    public void error(Message<?> message) {
        log.info("Message consumer failed, call fallback!");
    }

}

通過(guò)使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某個(gè)通道的錯(cuò)誤處理映射鲫趁。其中,inputChannel的配置中對(duì)應(yīng)關(guān)系如下:

  • test-topic:消息通道對(duì)應(yīng)的目標(biāo)(destination利虫,即:spring.cloud.stream.bindings.example-topic-input.destination的配置)
  • stream-exception-handler:消息通道對(duì)應(yīng)的消費(fèi)組(group挨厚,即:spring.cloud.stream.bindings.example-topic-input.group的配置)

再啟動(dòng)應(yīng)用并訪問(wèn)localhost:8080/sendMessage?message=hello接口來(lái)發(fā)送一個(gè)消息到MQ中,此時(shí)可以看到日志如下:

2018-12-11 12:00:35.500  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512  INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Received: hello,
2018-12-11 12:00:38.541  INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener  : Message consumer failed, call fallback!

雖然消費(fèi)邏輯中輸出了消息內(nèi)容之后拋出了異常糠惫,但是會(huì)進(jìn)入到error函數(shù)中疫剃,執(zhí)行錯(cuò)誤處理邏輯(這里只是答應(yīng)了一句話),用戶可以根據(jù)需要讀取消息內(nèi)容以及異常詳情做更進(jìn)一步的細(xì)化處理硼讽。

深入思考

由于error邏輯是通過(guò)編碼方式來(lái)實(shí)現(xiàn)的巢价,所以這段邏輯相對(duì)來(lái)說(shuō)比較死。通常固阁,只有業(yè)務(wù)上有明確的錯(cuò)誤處理邏輯的時(shí)候蹄溉,這種方法才可以比較好的被應(yīng)用到。不然能做的可能也只是將消息記錄下來(lái)您炉,然后具體的分析原因后再去做補(bǔ)救措施柒爵。所以這種方法也不是萬(wàn)能的,主要適用于有明確錯(cuò)誤處理方案的方式來(lái)使用(這種場(chǎng)景并不多)赚爵,另外棉胀。。冀膝。

注意:有坑唁奢! 這個(gè)方案在目前版本(2.0.x)其實(shí)還有一個(gè)坑,這種方式并不能很好的處理異常消息窝剖,會(huì)有部分消息得不到正確的處理麻掸,由于應(yīng)用場(chǎng)景也不多,所以目前不推薦使用這種方法來(lái)做(完全可以用原始的異常捕獲機(jī)制來(lái)處理赐纱,只是沒有這種方式那么優(yōu)雅)脊奋。目前看官方issue是在Spring Cloud Stream的2.1.0版本中會(huì)修復(fù),后續(xù)發(fā)布之后可以使用該功能疙描,具體點(diǎn)擊查看:Issue #1357诚隙。

而對(duì)于沒有特定的錯(cuò)誤處理方案的,也只能通過(guò)記錄和后續(xù)處理來(lái)解決起胰,可能這樣的方式也只是比從日志中抓去簡(jiǎn)單那么一些久又,并沒有得到很大的提升。但是,不要緊地消,因?yàn)橄乱黄覀儗⒗^續(xù)介紹其他更好的處理方案炉峰。

代碼示例

本文示例讀者可以通過(guò)查看下面?zhèn)}庫(kù)的中的stream-exception-handler-2項(xiàng)目:

如果您對(duì)這些感興趣,歡迎star脉执、follow疼阔、收藏、轉(zhuǎn)發(fā)給予支持适瓦!

以下專題教程也許您會(huì)有興趣

本文首發(fā):http://blog.didispace.com/spring-cloud-starter-finchley-7-3/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末竿开,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子玻熙,更是在濱河造成了極大的恐慌否彩,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嗦随,死亡現(xiàn)場(chǎng)離奇詭異列荔,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)枚尼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門贴浙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人署恍,你說(shuō)我怎么就攤上這事崎溃。” “怎么了盯质?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵袁串,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我呼巷,道長(zhǎng)囱修,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任王悍,我火速辦了婚禮破镰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘压储。我一直安慰自己鲜漩,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布渠脉。 她就那樣靜靜地躺著宇整,像睡著了一般。 火紅的嫁衣襯著肌膚如雪芋膘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音为朋,去河邊找鬼臂拓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛习寸,可吹牛的內(nèi)容都是我干的胶惰。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼霞溪,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼孵滞!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起鸯匹,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤坊饶,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后殴蓬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體匿级,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年染厅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了痘绎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肖粮,死狀恐怖孤页,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涩馆,我是刑警寧澤行施,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站凌净,受9級(jí)特大地震影響悲龟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜冰寻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一须教、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧斩芭,春花似錦轻腺、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至琴庵,卻和暖如春误算,著一層夾襖步出監(jiān)牢的瞬間仰美,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工儿礼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留咖杂,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓蚊夫,卻偏偏與公主長(zhǎng)得像诉字,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子知纷,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容