RocketMQ 與 Spring Cloud Stream整合(四腾誉、消費(fèi)異常處理機(jī)制)

在 Spring Cloud Stream 中椭盏,提供了通用的消費(fèi)異常處理機(jī)制组砚,可以攔截到消費(fèi)者消費(fèi)消息時(shí)發(fā)生的異常,進(jìn)行自定義的處理邏輯掏颊。

4.1 復(fù)制項(xiàng)目

消費(fèi)重試小節(jié)的 [sca-stream-rocketmq-consumer-retry](糟红,復(fù)制出 sca-stream-rocketmq-consumer-error-handler
注意修改:<artifactId>sc-stream-rocketmq-consumer-error-handler</artifactId>.

4.2 Demo01Consumer

修改 [Demo01Consumer] 類乌叶,增加消費(fèi)異常處理方法盆偿。完整代碼如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @StreamListener(MySink.ERBADAGANG_INPUT)// 對(duì)應(yīng) ERBADAGANG-TOPIC-01.erbadagang-consumer-group-ERBADAGANG-TOPIC-01
    public void onMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][線程編號(hào):{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
        // <1> 注意,此處拋出一個(gè) RuntimeException 異常准浴,模擬消費(fèi)失敗
        throw new RuntimeException("我就是故意拋出一個(gè)異常");
    }

    @ServiceActivator(inputChannel = "ERBADAGANG-TOPIC-01.erbadagang-consumer-group-ERBADAGANG-TOPIC-01.errors")
    public void handleError(ErrorMessage errorMessage) {
        logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
        logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
        logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
    }

    @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
    public void globalHandleError(ErrorMessage errorMessage) {
        logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
        logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
        logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
    }

}

① 在 Spring Integration 的設(shè)定中事扭,若 #onMessage(@Payload Demo01Message message) 方法消費(fèi)消息發(fā)生異常時(shí),會(huì)發(fā)送錯(cuò)誤消息(ErrorMessage)到對(duì)應(yīng)的錯(cuò)誤 Channel(<destination>.<group>.errors中乐横。同時(shí)求橄,所有錯(cuò)誤 Channel 都橋接到了 Spring Integration 定義的全局錯(cuò)誤 Channel(errorChannel)

因此晰奖,我們有兩種方式來(lái)實(shí)現(xiàn)異常處理:

  • 局部的異常處理:通過(guò)訂閱指定錯(cuò)誤 Channel
  • 全局的異常處理:通過(guò)訂閱全局錯(cuò)誤 Channel

② 在 #handleError(ErrorMessage errorMessage) 方法上谈撒,我們聲明了 @ServiceActivator 注解腥泥,訂閱指定錯(cuò)誤 Channel的錯(cuò)誤消息匾南,實(shí)現(xiàn) #onMessage(@Payload Demo01Message message) 方法的局部異常處理。如下圖所示:

配置的對(duì)應(yīng)關(guān)系

③ 在 #globalHandleError(ErrorMessage errorMessage) 方法上蛔外,我們聲明了 @StreamListener 注解蛆楞,訂閱全局錯(cuò)誤 Channel的錯(cuò)誤消息,實(shí)現(xiàn)全局異常處理夹厌。

④ 在全局和局部異常處理都定義的情況下豹爹,錯(cuò)誤消息僅會(huì)被符合條件的局部錯(cuò)誤異常處理。如果沒(méi)有符合條件的矛纹,錯(cuò)誤消息才會(huì)被全局異常處理臂聋。

4.3 簡(jiǎn)單測(cè)試

① 執(zhí)行 ConsumerApplication,啟動(dòng)消費(fèi)者的實(shí)例。

② 執(zhí)行 ProducerApplication孩等,啟動(dòng)生產(chǎn)者的實(shí)例艾君。

之后,請(qǐng)求 http://127.0.0.1:18080/demo01/send 接口肄方,發(fā)送一條消息冰垄。IDEA 控制臺(tái)輸出日志如下:

// onMessage 方法
2020-08-06 16:47:41.195  INFO 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號(hào):71 消息內(nèi)容:Demo01Message{id=742970346}]

// handleError 方法
2020-08-06 16:47:41.195 ERROR 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [handleError][payload:RuntimeException: 我就是故意拋出一個(gè)異常]
2020-08-06 16:47:41.195 ERROR 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [handleError][originalMessage:GenericMessage [payload=byte[16], headers={rocketmq_QUEUE_ID=1, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=C0A82B7C418018B4AAC21D5A5C1D0000, rocketmq_SYS_FLAG=0, id=3c82e982-5fac-f08f-ffe7-e3956d5c6cb5, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, rocketmq_BORN_TIMESTAMP=1596703661085, timestamp=1596703661195}]]
2020-08-06 16:47:41.195 ERROR 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [handleError][headers:{id=9b060a74-d81c-eb9f-c761-2cec21958df8, timestamp=1596703661195}]

不過(guò)要注意,如果異常處理方法成功权她,沒(méi)有重新拋出異常虹茶,會(huì)認(rèn)定為該消息被消費(fèi)成功,所以就不會(huì)進(jìn)行消費(fèi)重試隅要。

底線


本文源代碼使用 Apache License 2.0開(kāi)源許可協(xié)議蝴罪,這里是本文源碼Gitee地址,可通過(guò)命令git clone+地址下載代碼到本地步清,也可直接點(diǎn)擊鏈接通過(guò)瀏覽器方式查看源代碼洲炊。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市尼啡,隨后出現(xiàn)的幾起案子暂衡,更是在濱河造成了極大的恐慌,老刑警劉巖崖瞭,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件狂巢,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡书聚,警方通過(guò)查閱死者的電腦和手機(jī)唧领,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)雌续,“玉大人斩个,你說(shuō)我怎么就攤上這事⊙倍牛” “怎么了受啥?”我有些...
    開(kāi)封第一講書人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)鸽心。 經(jīng)常有香客問(wèn)我滚局,道長(zhǎng),這世上最難降的妖魔是什么顽频? 我笑而不...
    開(kāi)封第一講書人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任藤肢,我火速辦了婚禮,結(jié)果婚禮上糯景,老公的妹妹穿的比我還像新娘嘁圈。我一直安慰自己省骂,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布最住。 她就那樣靜靜地躺著冀宴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪温学。 梳的紋絲不亂的頭發(fā)上略贮,一...
    開(kāi)封第一講書人閱讀 49,829評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音仗岖,去河邊找鬼逃延。 笑死,一個(gè)胖子當(dāng)著我的面吹牛轧拄,可吹牛的內(nèi)容都是我干的线欲。 我是一名探鬼主播抡秆,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了阻逮?” 一聲冷哼從身側(cè)響起捅暴,我...
    開(kāi)封第一講書人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤芥永,失蹤者是張志新(化名)和其女友劉穎庄蹋,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體卓箫,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡载矿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了烹卒。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闷盔。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖旅急,靈堂內(nèi)的尸體忽然破棺而出逢勾,到底是詐尸還是另有隱情,我是刑警寧澤藐吮,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布溺拱,位于F島的核電站,受9級(jí)特大地震影響炎码,放射性物質(zhì)發(fā)生泄漏盟迟。R本人自食惡果不足惜秋泳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一潦闲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧迫皱,春花似錦歉闰、人聲如沸辖众。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)凹炸。三九已至,卻和暖如春昼弟,著一層夾襖步出監(jiān)牢的瞬間啤它,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工舱痘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留变骡,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓芭逝,卻偏偏與公主長(zhǎng)得像塌碌,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子旬盯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349