在 Spring Cloud Stream 中椭盏,提供了通用的消費(fèi)異常處理機(jī)制组砚,可以攔截到消費(fèi)者消費(fèi)消息時(shí)發(fā)生的異常,進(jìn)行自定義的處理邏輯掏颊。
4.1 復(fù)制項(xiàng)目
將消費(fèi)重試小節(jié)的 [sca-stream-rocketmq-consumer-retry
](糟红,復(fù)制出 sca-stream-rocketmq-consumer-error-handler
。
注意修改:<artifactId>sc-stream-rocketmq-consumer-error-handler</artifactId>
.
4.2 Demo01Consumer
修改 [Demo01Consumer] 類乌叶,增加消費(fèi)異常處理方法盆偿。完整代碼如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;
import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@StreamListener(MySink.ERBADAGANG_INPUT)// 對(duì)應(yīng) ERBADAGANG-TOPIC-01.erbadagang-consumer-group-ERBADAGANG-TOPIC-01
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][線程編號(hào):{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
// <1> 注意,此處拋出一個(gè) RuntimeException 異常准浴,模擬消費(fèi)失敗
throw new RuntimeException("我就是故意拋出一個(gè)異常");
}
@ServiceActivator(inputChannel = "ERBADAGANG-TOPIC-01.erbadagang-consumer-group-ERBADAGANG-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
}
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}
}
① 在 Spring Integration 的設(shè)定中事扭,若 #onMessage(@Payload Demo01Message message)
方法消費(fèi)消息發(fā)生異常時(shí),會(huì)發(fā)送錯(cuò)誤消息(ErrorMessage)到對(duì)應(yīng)的錯(cuò)誤 Channel(<destination>.<group>.errors
)中乐横。同時(shí)求橄,所有錯(cuò)誤 Channel 都橋接到了 Spring Integration 定義的全局錯(cuò)誤 Channel(errorChannel
)。
因此晰奖,我們有兩種方式來(lái)實(shí)現(xiàn)異常處理:
- 局部的異常處理:通過(guò)訂閱指定錯(cuò)誤 Channel
- 全局的異常處理:通過(guò)訂閱全局錯(cuò)誤 Channel
② 在 #handleError(ErrorMessage errorMessage)
方法上谈撒,我們聲明了 @ServiceActivator
注解腥泥,訂閱指定錯(cuò)誤 Channel的錯(cuò)誤消息匾南,實(shí)現(xiàn) #onMessage(@Payload Demo01Message message)
方法的局部異常處理。如下圖所示:
③ 在 #globalHandleError(ErrorMessage errorMessage) 方法上蛔外,我們聲明了 @StreamListener 注解蛆楞,訂閱全局錯(cuò)誤 Channel的錯(cuò)誤消息,實(shí)現(xiàn)全局異常處理夹厌。
④ 在全局和局部異常處理都定義的情況下豹爹,錯(cuò)誤消息僅會(huì)被符合條件的局部錯(cuò)誤異常處理。如果沒(méi)有符合條件的矛纹,錯(cuò)誤消息才會(huì)被全局異常處理臂聋。
4.3 簡(jiǎn)單測(cè)試
① 執(zhí)行 ConsumerApplication,啟動(dòng)消費(fèi)者的實(shí)例。
② 執(zhí)行 ProducerApplication孩等,啟動(dòng)生產(chǎn)者的實(shí)例艾君。
之后,請(qǐng)求 http://127.0.0.1:18080/demo01/send 接口肄方,發(fā)送一條消息冰垄。IDEA 控制臺(tái)輸出日志如下:
// onMessage 方法
2020-08-06 16:47:41.195 INFO 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號(hào):71 消息內(nèi)容:Demo01Message{id=742970346}]
// handleError 方法
2020-08-06 16:47:41.195 ERROR 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [handleError][payload:RuntimeException: 我就是故意拋出一個(gè)異常]
2020-08-06 16:47:41.195 ERROR 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[16], headers={rocketmq_QUEUE_ID=1, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=C0A82B7C418018B4AAC21D5A5C1D0000, rocketmq_SYS_FLAG=0, id=3c82e982-5fac-f08f-ffe7-e3956d5c6cb5, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, rocketmq_BORN_TIMESTAMP=1596703661085, timestamp=1596703661195}]]
2020-08-06 16:47:41.195 ERROR 6692 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [handleError][headers:{id=9b060a74-d81c-eb9f-c761-2cec21958df8, timestamp=1596703661195}]
不過(guò)要注意,如果異常處理方法成功权她,沒(méi)有重新拋出異常虹茶,會(huì)認(rèn)定為該消息被消費(fèi)成功,所以就不會(huì)進(jìn)行消費(fèi)重試隅要。
底線
本文源代碼使用 Apache License 2.0開(kāi)源許可協(xié)議蝴罪,這里是本文源碼Gitee地址,可通過(guò)命令git clone+地址
下載代碼到本地步清,也可直接點(diǎn)擊鏈接通過(guò)瀏覽器方式查看源代碼洲炊。