1. 概述
老話說的好:出錯(cuò)不怕劲阎,怕的是出了錯(cuò),卻不去改正鸠真。如果屢次出錯(cuò)悯仙,無法改對,就先記下了吠卷,然后找援軍解決锡垄。
言歸正傳,今天來聊一下 Stream 組件的 出錯(cuò)重試 和 死信隊(duì)列撤嫩。
RabbitMQ 鏡像模式集群的搭建偎捎,可參見我的另一篇文章《RabbitMQ 3.9.7 鏡像模式集群的搭建》(http://www.reibang.com/p/187325419e95)
在早期的 SpringCloud 版本中常使用 @Input、@Output序攘、@EnableBinding 和 @StreamListener 注解開發(fā)生產(chǎn)者與消費(fèi)者。
官方原文:Deprecated as of 3.1 in favor of functional programming model寻拂。
SpringCloud 2020.0.4 版本中程奠,已經(jīng)不推薦這么開發(fā)了,因此這里我們也使用新的寫法(函數(shù)式編程方式) 開發(fā)祭钉。
閑話不多說瞄沙,直接上代碼。
2. 消息出錯(cuò)重試
2.1 主要依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 健康檢查 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.2 消息實(shí)體類
@Setter
@Getter
public class MyMessage implements java.io.Serializable {
// 消息體
private String payload;
}
2.3 生產(chǎn)者
// 消息出錯(cuò)重試
@GetMapping("/retry")
public String sendRetryMessage(@RequestParam("body") String body) {
MyMessage myMessage = new MyMessage();
myMessage.setPayload(body);
// 生產(chǎn)消息
// 第一個(gè)參數(shù)是綁定名稱,格式為:自定義的綁定名稱-out-0距境,myRetry是自定義的綁定名稱申尼,out代表生產(chǎn)者,0是固定寫法
// 自定義的綁定名稱必須與消費(fèi)方法的方法名保持一致
// 第二個(gè)參數(shù)是發(fā)送的消息實(shí)體
streamBridge.send("myRetry-out-0", myMessage);
return "SUCCESS";
}
2.4 消費(fèi)者
// 消息出錯(cuò)重試
@Bean
public Consumer<MyMessage> myRetry() { // 方法名必須與生產(chǎn)消息時(shí)自定義的綁定名稱一致
return message -> {
log.info("接收消息:{}", message.getPayload());
throw new RuntimeException("消費(fèi)報(bào)錯(cuò)");
};
}
2.5 application.yml 配置
spring:
application:
name: my-stream-new
rabbitmq: # RabbitMQ 配置
addresses: 192.168.1.12:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 16000
cloud:
function:
# 定義消費(fèi)者垫桂,多個(gè)用分號(hào)分隔师幕,當(dāng)存在大于1個(gè)的消費(fèi)者時(shí),不定義不會(huì)生效
definition: myRetry
stream:
bindings:
# 消息出錯(cuò)重試
myRetry-in-0:
destination: my-retry-topic
# 配置重試次數(shù)(本機(jī)重試)
# 次數(shù)等于 1 诬滩,相當(dāng)于不重試
consumer:
max-attempts: 3
myRetry-out-0:
destination: my-retry-topic
2.6 驗(yàn)證消息出錯(cuò)重試
發(fā)送消息接口:
Get http://localhost:49000/stream/retry?body=出錯(cuò)重試消息
自動(dòng)生成的 Exchange
自動(dòng)生成的 Queue
消費(fèi)情況
3. 死信隊(duì)列
3.1 生產(chǎn)者
// 死信隊(duì)列
@GetMapping("/dlq")
public String sendDlqMessage(@RequestParam("body") String body) {
MyMessage myMessage = new MyMessage();
myMessage.setPayload(body);
// 生產(chǎn)消息
// 第一個(gè)參數(shù)是綁定名稱霹粥,格式為:自定義的綁定名稱-out-0,myDlq是自定義的綁定名稱疼鸟,out代表生產(chǎn)者后控,0是固定寫法
// 自定義的綁定名稱必須與消費(fèi)方法的方法名保持一致
// 第二個(gè)參數(shù)是發(fā)送的消息實(shí)體
streamBridge.send("myDlq-out-0", myMessage);
return "SUCCESS";
}
3.2 消費(fèi)者
// 死信隊(duì)列
@Bean
public Consumer<MyMessage> myDlq() { // 方法名必須與生產(chǎn)消息時(shí)自定義的綁定名稱一致
return message -> {
log.info("接收消息:{}", message.getPayload());
throw new RuntimeException("消費(fèi)報(bào)錯(cuò)");
};
}
3.3 application.yml 配置
spring:
application:
name: my-stream-new
rabbitmq: # RabbitMQ 配置
addresses: 192.168.1.12:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 16000
cloud:
function:
# 定義消費(fèi)者,多個(gè)用分號(hào)分隔空镜,當(dāng)存在大于1個(gè)的消費(fèi)者時(shí)浩淘,不定義不會(huì)生效
definition: myRetry;myDlq
stream:
bindings:
# 消息出錯(cuò)重試
myRetry-in-0:
destination: my-retry-topic
# 配置重試次數(shù)(本機(jī)重試)
# 次數(shù)等于 1 ,相當(dāng)于不重試
consumer:
max-attempts: 3
myRetry-out-0:
destination: my-retry-topic
# 死信隊(duì)列
myDlq-in-0:
destination: my-dlq-topic
group: dlq-group
# 配置重試次數(shù)(本機(jī)重試)
# 次數(shù)等于 1 吴攒,相當(dāng)于不重試
consumer:
max-attempts: 3
myDlq-out-0:
destination: my-dlq-topic
rabbit:
bindings:
# 死信隊(duì)列
myDlq-in-0:
consumer:
autoBindDlq: true # 自動(dòng)綁定死信隊(duì)列馋袜,會(huì)自動(dòng)創(chuàng)建一個(gè)默認(rèn)的死信隊(duì)列
myDlq-out-0:
producer:
autoBindDlq: true # 自動(dòng)綁定死信隊(duì)列,會(huì)自動(dòng)創(chuàng)建一個(gè)默認(rèn)的死信隊(duì)列
3.4 驗(yàn)證死信隊(duì)列
發(fā)送消息接口:
GET http://localhost:49000/stream/dlq?body=死信隊(duì)列
自動(dòng)生成的 Exchange
自動(dòng)生成的 Queue
消費(fèi)情況
死信隊(duì)列情況
4. 綜述
今天聊了一下 SpringCloud Stream 組件 消息出錯(cuò)重試 與 死信隊(duì)列 的實(shí)現(xiàn) 舶斧,希望可以對大家的工作有所幫助欣鳖。
歡迎幫忙點(diǎn)贊、評論茴厉、轉(zhuǎn)發(fā)泽台、加關(guān)注 :)
關(guān)注追風(fēng)人聊Java,每天更新Java干貨矾缓。