應(yīng)用場(chǎng)景
上一篇《Spring Cloud Stream消費(fèi)失敗后的處理策略(一):自動(dòng)重試》介紹了默認(rèn)就會(huì)生效的消息重試功能。對(duì)于一些因環(huán)境原因、網(wǎng)絡(luò)抖動(dòng)等不穩(wěn)定因素引發(fā)的問(wèn)題可以起到比較好的作用峭梳。但是對(duì)于諸如代碼本身存在的邏輯錯(cuò)誤等,無(wú)論重試多少次都不可能成功的問(wèn)題墅垮,是無(wú)法修復(fù)的积蔚。對(duì)于這樣的情況意鲸,前文中說(shuō)了可以利用日志記錄消息內(nèi)容,配合告警來(lái)做補(bǔ)救尽爆,但是很顯然怎顾,這樣做非常原始,并且太過(guò)笨拙漱贱,處理復(fù)雜度過(guò)高槐雾。所以,我們需要需求更好的辦法幅狮,本文將介紹針對(duì)該類問(wèn)題的一種處理方法:自定義錯(cuò)誤處理邏輯募强。
動(dòng)手試試
準(zhǔn)備一個(gè)會(huì)消費(fèi)失敗的例子,可以直接沿用前文的工程崇摄,也可以新建一個(gè)擎值,然后創(chuàng)建如下代碼的邏輯:
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/**
* 消息生產(chǎn)接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/**
* 消息消費(fèi)邏輯
*/
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
內(nèi)容很簡(jiǎn)單,既包含了消息的生產(chǎn)配猫,也包含了消息消費(fèi)幅恋。消息消費(fèi)的時(shí)候主動(dòng)拋出了一個(gè)異常來(lái)模擬消息的消費(fèi)失敗。
在啟動(dòng)應(yīng)用之前泵肄,還要記得配置一下輸入輸出通道對(duì)應(yīng)的物理目標(biāo)(exchange或topic名)、并設(shè)置一下分組淑翼,比如:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
完成了上面配置之后腐巢,啟動(dòng)應(yīng)用并訪問(wèn)localhost:8080/sendMessage?message=hello
接口來(lái)發(fā)送一個(gè)消息到MQ中了,此時(shí)可以看到消費(fèi)失敗后拋出了異常玄括,跟上一篇文章的結(jié)果一樣冯丙,消息消費(fèi)失敗,記錄了日志,消息信息丟棄胃惜。
下面泞莉,針對(duì)消息消費(fèi)失敗,在TestListener
中針對(duì)消息消費(fèi)邏輯創(chuàng)建一段錯(cuò)誤處理邏輯船殉,比如:
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
/**
* 消息消費(fèi)失敗的降級(jí)處理邏輯
*
* @param message
*/
@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
public void error(Message<?> message) {
log.info("Message consumer failed, call fallback!");
}
}
通過(guò)使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
指定了某個(gè)通道的錯(cuò)誤處理映射鲫趁。其中,inputChannel的配置中對(duì)應(yīng)關(guān)系如下:
-
test-topic
:消息通道對(duì)應(yīng)的目標(biāo)(destination利虫,即:spring.cloud.stream.bindings.example-topic-input.destination
的配置) -
stream-exception-handler
:消息通道對(duì)應(yīng)的消費(fèi)組(group挨厚,即:spring.cloud.stream.bindings.example-topic-input.group
的配置)
再啟動(dòng)應(yīng)用并訪問(wèn)localhost:8080/sendMessage?message=hello
接口來(lái)發(fā)送一個(gè)消息到MQ中,此時(shí)可以看到日志如下:
2018-12-11 12:00:35.500 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2018-12-11 12:00:35.512 INFO 75269 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#311db1cb:0/SimpleConnection@40370d8c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54391]
2018-12-11 12:00:35.527 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Received: hello,
2018-12-11 12:00:38.541 INFO 75269 --- [ption-handler-1] c.d.stream.TestApplication$TestListener : Message consumer failed, call fallback!
雖然消費(fèi)邏輯中輸出了消息內(nèi)容之后拋出了異常糠惫,但是會(huì)進(jìn)入到error函數(shù)中疫剃,執(zhí)行錯(cuò)誤處理邏輯(這里只是答應(yīng)了一句話),用戶可以根據(jù)需要讀取消息內(nèi)容以及異常詳情做更進(jìn)一步的細(xì)化處理硼讽。
深入思考
由于error邏輯是通過(guò)編碼方式來(lái)實(shí)現(xiàn)的巢价,所以這段邏輯相對(duì)來(lái)說(shuō)比較死。通常固阁,只有業(yè)務(wù)上有明確的錯(cuò)誤處理邏輯的時(shí)候蹄溉,這種方法才可以比較好的被應(yīng)用到。不然能做的可能也只是將消息記錄下來(lái)您炉,然后具體的分析原因后再去做補(bǔ)救措施柒爵。所以這種方法也不是萬(wàn)能的,主要適用于有明確錯(cuò)誤處理方案的方式來(lái)使用(這種場(chǎng)景并不多)赚爵,另外棉胀。。冀膝。
注意:有坑唁奢! 這個(gè)方案在目前版本(2.0.x)其實(shí)還有一個(gè)坑,這種方式并不能很好的處理異常消息窝剖,會(huì)有部分消息得不到正確的處理麻掸,由于應(yīng)用場(chǎng)景也不多,所以目前不推薦使用這種方法來(lái)做(完全可以用原始的異常捕獲機(jī)制來(lái)處理赐纱,只是沒有這種方式那么優(yōu)雅)脊奋。目前看官方issue是在Spring Cloud Stream的2.1.0版本中會(huì)修復(fù),后續(xù)發(fā)布之后可以使用該功能疙描,具體點(diǎn)擊查看:Issue #1357诚隙。
而對(duì)于沒有特定的錯(cuò)誤處理方案的,也只能通過(guò)記錄和后續(xù)處理來(lái)解決起胰,可能這樣的方式也只是比從日志中抓去簡(jiǎn)單那么一些久又,并沒有得到很大的提升。但是,不要緊地消,因?yàn)橄乱黄覀儗⒗^續(xù)介紹其他更好的處理方案炉峰。
代碼示例
本文示例讀者可以通過(guò)查看下面?zhèn)}庫(kù)的中的stream-exception-handler-2
項(xiàng)目:
如果您對(duì)這些感興趣,歡迎star脉执、follow疼阔、收藏、轉(zhuǎn)發(fā)給予支持适瓦!
以下專題教程也許您會(huì)有興趣
本文首發(fā):http://blog.didispace.com/spring-cloud-starter-finchley-7-3/