應(yīng)用場景
前兩天我們已經(jīng)介紹了兩種Spring Cloud Stream對(duì)消息失敗的處理策略:
- 自動(dòng)重試:對(duì)于一些因環(huán)境原因(如:網(wǎng)絡(luò)抖動(dòng)等不穩(wěn)定因素)引發(fā)的問題可以起到比較好的作用胸嘴,提高消息處理的成功率变汪。
- 自定義錯(cuò)誤處理邏輯:如果業(yè)務(wù)上,消息處理失敗之后有明確的降級(jí)邏輯可以彌補(bǔ)的产弹,可以采用這種方式,但是2.0.x版本有Bug,2.1.x版本修復(fù)。
那么如果代碼本身存在邏輯錯(cuò)誤斯撮,無論重試多少次都不可能成功,也沒有具體的降級(jí)業(yè)務(wù)邏輯悦即,之前在深入思考中討論過吮成,可以通過日志橱乱,或者降級(jí)邏輯記錄的方式把錯(cuò)誤消息保存下來辜梳,然后事后分析、修復(fù)Bug再重新處理泳叠。但是很顯然作瞄,這樣做非常原始,并且太過笨拙危纫,處理復(fù)雜度過高宗挥。所以,本文將介紹利用中間件特性來便捷地處理該問題的方案:使用RabbitMQ的DLQ隊(duì)列种蝶。
動(dòng)手試試
準(zhǔn)備一個(gè)會(huì)消費(fèi)失敗的例子契耿,可以直接沿用前文的工程。也可以新建一個(gè)螃征,然后創(chuàng)建如下代碼的邏輯:
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/**
* 消息生產(chǎn)接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/**
* 消息消費(fèi)邏輯
*/
@Slf4j
@Component
static class TestListener {
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
內(nèi)容很簡單搪桂,既包含了消息的生產(chǎn),也包含了消息消費(fèi)盯滚。消息消費(fèi)的時(shí)候主動(dòng)拋出了一個(gè)異常來模擬消息的消費(fèi)失敗踢械。
在啟動(dòng)應(yīng)用之前,還要記得配置一下輸入輸出通道對(duì)應(yīng)的物理目標(biāo)(exchange或topic名)魄藕、并設(shè)置一下分組内列,比如:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
這里加入了一個(gè)重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
,用來開啟DLQ(死信隊(duì)列)背率。完成了上面配置之后话瞧,啟動(dòng)應(yīng)用并訪問localhost:8080/sendMessage?message=hello
接口來發(fā)送一個(gè)消息到MQ中了嫩与,此時(shí)可以看到消費(fèi)失敗后拋出了異常,消息消費(fèi)失敗交排,記錄了日志蕴纳。此時(shí),可以查看RabbitMQ的控制臺(tái)如下:
其中个粱,test-topic.stream-exception-handler.dlq
隊(duì)列就是test-topic.stream-exception-handler
的dlq(死信)隊(duì)列古毛,當(dāng)test-topic.stream-exception-handler
隊(duì)列中的消息消費(fèi)時(shí)候之后,就會(huì)將這條消息原封不動(dòng)的轉(zhuǎn)存到dlq隊(duì)列中都许。這樣這些沒有得到妥善處理的消息就通過簡單的配置實(shí)現(xiàn)了存儲(chǔ)稻薇,之后,我們還可以通過簡單的操作對(duì)這些消息進(jìn)行重新消費(fèi)胶征。我們只需要在控制臺(tái)中點(diǎn)擊test-topic.stream-exception-handler.dlq
隊(duì)列的名字進(jìn)入到詳情頁面之后塞椎,使用Move messages
功能,直接將這些消息移動(dòng)回test-topic.stream-exception-handler
隊(duì)列睛低,這樣這些消息就能重新被消費(fèi)一次案狠。
如果Move messages功能中是如下內(nèi)容:
To move messages, the shovel plugin must be enabled, try:
$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
那是由于沒有安裝對(duì)應(yīng)的插件,只需要根據(jù)提示的命令安裝就能使用該命令了钱雷。
深入思考
先來總結(jié)一下在引入了RabbitMQ的DLQ之后骂铁,對(duì)于消息異常處理更為完整一些的基本思路:
- 瞬時(shí)的環(huán)境抖動(dòng)引起的異常,利用重試功能提高處理成功率
- 如果重試依然失敗的罩抗,日志報(bào)錯(cuò)拉庵,并進(jìn)入DLQ隊(duì)列
- 日志告警通知相關(guān)開發(fā)人員,分析問題原因
- 解決問題(修復(fù)程序Bug套蒂、擴(kuò)容等措施)之后钞支,DLQ隊(duì)列中的消息移回重新處理
在這樣的整體思路中,可能還涉及一些微調(diào)操刀,這里舉幾個(gè)常見例子烁挟,幫助讀者進(jìn)一步了解一些特殊的場景和配置使用!
場景一:有些消息在業(yè)務(wù)上存在時(shí)效性骨坑,進(jìn)入死信隊(duì)列之后撼嗓,過一段時(shí)間再處理已經(jīng)沒有意義,這個(gè)時(shí)候如何過濾這些消息呢卡啰?
只需要配置一個(gè)參數(shù)即可:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000
該參數(shù)可以控制DLQ隊(duì)列中消息的存活時(shí)間静稻,當(dāng)超過配置時(shí)間之后,該消息會(huì)自動(dòng)的從DLQ隊(duì)列中移除匈辱。
場景二:可能進(jìn)入DLQ隊(duì)列的消息存在各種不同的原因(不同異常造成的)振湾,此時(shí)如果在做補(bǔ)救措施的時(shí)候,還希望根據(jù)這些異常做不同的處理時(shí)候亡脸,我們?nèi)绾螀^(qū)分這些消息進(jìn)入DLQ的原因呢押搪?
再來看看這個(gè)參數(shù):
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true
該參數(shù)默認(rèn)是false树酪,如果設(shè)置了死信隊(duì)列的時(shí)候,會(huì)將消息原封不動(dòng)的發(fā)送到死信隊(duì)列(也就是上面例子中的實(shí)現(xiàn))大州,此時(shí)大家可以在RabbitMQ控制臺(tái)中通過Get message(s)
功能來看看隊(duì)列中的消息续语,應(yīng)該如下圖所示:
這是一條原始消息。
如果我們?cè)撆渲迷O(shè)置為true的時(shí)候厦画,那么該消息在進(jìn)入到死信隊(duì)列的時(shí)候疮茄,會(huì)在headers中加入錯(cuò)誤信息,如下圖所示:
這樣根暑,不論我們是通過移回原通道處理還是新增訂閱處理這些消息的時(shí)候就可以以此作為依據(jù)進(jìn)行分類型處理了力试。
關(guān)于RabbitMQ的binder中還有很多關(guān)于DLQ的配置,這里不一一介紹了排嫌,上面幾個(gè)是目前筆者使用過的幾個(gè)畸裳,其他一些暫時(shí)認(rèn)為采用默認(rèn)配置已經(jīng)夠用,除非還有其他定制要求淳地,或者是存量內(nèi)容怖糊,需要去適配才會(huì)去配置。讀者可以查看官方文檔了解更多詳情颇象!
代碼示例
本文示例讀者可以通過查看下面?zhèn)}庫的中的stream-exception-handler-3
項(xiàng)目:
如果您對(duì)這些感興趣伍伤,歡迎star、follow夯到、收藏嚷缭、轉(zhuǎn)發(fā)給予支持!
以下專題教程也許您會(huì)有興趣
本文首發(fā):http://blog.didispace.com/spring-cloud-starter-finchley-7-4/