今天介紹使用SpringBoot實現(xiàn)RabbitMQ消息隊列的高級用法。
- MQ安裝
- 自動創(chuàng)建
- 消息重試
- 消息超時
- 死信隊列
- 延時隊列
一芍秆、RabbitMQ的安裝
眾所周知惯疙,RabbitMQ
的安裝相對復(fù)雜,需要先安裝Erlang妖啥,再按著對應(yīng)版本的RabbitMQ的服務(wù)端霉颠,最后為了方便管理還需要安裝rabbitmq_management管理端插件,偶爾還會出現(xiàn)一些安裝配置問題荆虱,故十分復(fù)雜蒿偎。
在開發(fā)測試環(huán)境下使用docker
來安裝就方便多了,省去了環(huán)境和配置的麻煩怀读。
1. 拉取官方image
docker pull rabbitmq:management
2. 啟動RabbitMQ
docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
rabbitmq:management: image:tag
--name:指定容器名诉位;
-d:后臺運行容器;
-t:在新容器內(nèi)指定一個偽終端或終端菜枷;
-i:允許你對容器內(nèi)的標(biāo)準(zhǔn)輸入 (STDIN) 進(jìn)行交互苍糠;
-p:指定服務(wù)運行的端口(5672:應(yīng)用訪問端口;15672:控制臺Web端口號)啤誊;
-e:指定環(huán)境變量岳瞭;(RABBITMQ_DEFAULT_USER:默認(rèn)的用戶名拥娄;RABBITMQ_DEFAULT_PASS:默認(rèn)用戶名的密碼);
至此RabbitMQ就安裝啟動完成了瞳筏,可以通過http://localhost:15672 登陸管理后臺稚瘾,用戶名密碼就是上面配置的admin/admin
二、使用SpringBoot自動創(chuàng)建隊列
1. 引入amqp包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. MQ配置
bootstrap.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
direct:
prefetch: 10
concurrency
:每個listener在初始化的時候設(shè)置的并發(fā)消費者的個數(shù)
prefetch
:每次從一次性從broker里面取的待消費的消息的個數(shù)
rabbitmq-spring.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--接收消息的隊列名-->
<rabbit:queue name="login-user-logined" />
<!--聲明exchange的名稱與類型-->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<!--queue與exchange的綁定和匹配路由-->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
rabbit:topic-exchange
:聲明為topic消息類型
pattern="login.user.logined"
:此處是一個表達(dá)式姚炕,可使用“*”表示一個詞摊欠,“#”表示一個或多個詞
3. 消息生產(chǎn)端
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public LoginUser SendLoginSucceedMessage(){
LoginUser loginUser = getLoginUser("succeed");
// 發(fā)送消息
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser);
return loginUser;
}
@NoArgsConstructor
@AllArgsConstructor
public class LoginUser implements Serializable {
String userName;
String realName;
String userToken;
Date loginTime;
String status;
}
這里需要注意的是默認(rèn)情況下消息的轉(zhuǎn)換器為SimpleMessageConverter
只能解析string和byte,故傳遞的消息對象必須是可序列化的柱宦,實現(xiàn)Serializable
接口
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser
4. 消息消費端
@Component
public class ReceiverMessage {
@RabbitListener(queues = "login-user-logined")
public void receiveLoginMessage(LoginUser loginUser) {
System.err.println(loginUser);
}
}
@RabbitListener(queues = "login-user-logined")
:用于監(jiān)聽名為login-user-logined 隊列中的消息
5. 自動創(chuàng)建Queue
@SpringBootApplication
@ImportResource(value = "classpath:rabbitmq-spring.xml")
public class MQApplication {
public static void main(String[] args) {
SpringApplication.run(MQApplication.class, args);
}
}
在沒有導(dǎo)入xml且MQ服務(wù)器上沒有列隊的情況下凄硼,會導(dǎo)致找不到相關(guān)queue的錯誤
channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/', class-id=50, method-id=10)
而導(dǎo)入之后將自動創(chuàng)建
exchange和queue
三、消息重試
默認(rèn)情況下如果有消息消費出錯后會一直重試捷沸,造成消息堵塞
如圖可觀察unacked和total一直是1,但deliver/get飆升
消息堵塞之后也影響到后續(xù)消息的消費狐史,時間越長越來越多的消息將無法及時消費處理痒给。
如果是單條或極少量的消息有問題可通過多開節(jié)點concurrency
將正常的消息消息掉,但如果較多則全部節(jié)點都將堵塞骏全。
如果想遇到消息消費報錯重試幾次就舍棄苍柏,從而不影響后續(xù)消息的消費,如何實現(xiàn)呢姜贡?
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
prefetch: 10
retry:
enabled: true # 允許消息消費失敗的重試
max-attempts: 3 # 消息最多消費次數(shù)3次
initial-interval: 2000 # 消息多次消費的間隔2秒
以上配置允許消息消費失敗后重試3次试吁,每次間隔2秒,如果還是失敗則直接舍棄掉本條消息楼咳。
重試可解決因非消息體本身處理問題產(chǎn)生的臨時性的故障熄捍,而將處理失敗的消息直接舍棄掉只是為其它消息正常處理的權(quán)益之計而以,將業(yè)務(wù)操作降到相對低的影響母怜。
四余耽、消息超時
消息重試
可解決因消息處理報錯引起的問題。如果是消息處理過慢導(dǎo)致錯過時效苹熏,除了可在處理邏輯中進(jìn)行處理外碟贾,也可以通過消息的超時機制來處理,設(shè)定超時時間后將消息直接舍棄轨域。
修改rabbitmq-spring.xml
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
</rabbit:queue-arguments>
</rabbit:queue>
x-message-ttl
:在消息服務(wù)器停留的時間(ms)
如果配置前已存在queue將不能被修改袱耽,需要刪除原有queue后自動創(chuàng)建
創(chuàng)建成功后會在Features中有TTL標(biāo)識
五、死信隊列
死信隊列就是當(dāng)業(yè)務(wù)隊列處理失敗后干发,將消息根據(jù)routingKey轉(zhuǎn)投到另一隊列朱巨,這樣的情況有:
- 消息被拒絕 (basic.reject or basic.nack) 且?guī)?requeue=false不重新入隊參數(shù)或達(dá)到的retry重新入隊的上限次數(shù)
- 消息的TTL(Time To Live)-存活時間已經(jīng)過期
- 隊列長度限制被超越(隊列滿,queue的"x-max-length"參數(shù))
1. 修改rabbitmq-spring.xml
<!--接收消息的隊列名-->
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
<!--死信的交換機-->
<entry key="x-dead-letter-exchange" value="login_barryhome_fun"/>
<!--死信發(fā)送的路由-->
<entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="login-user-logined-dlq"/>
<!--申明exchange的名稱與類型-->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<!--queue與exchange的綁定和匹配路由-->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
<rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/>
</rabbit:bindings>
</rabbit:topic-exchange>
通過對死信發(fā)送的交換機和路由的的設(shè)置铐然,可將消息轉(zhuǎn)向具體的queue中蔬崩。這里交換機可以和原業(yè)務(wù)隊列不是一個恶座。
當(dāng)login-user-logined
中的消息處理失敗后將直接轉(zhuǎn)投向login-user-logined-dlq
隊列中。
當(dāng)程序邏輯修復(fù)后可再將消息再移回業(yè)務(wù)隊列中move messages
2. 安裝插件
如圖提示需要先安裝插件
3. 移動消息
安裝成功后就可以輸入業(yè)務(wù)隊列名再轉(zhuǎn)投
六沥阳、延時隊列
延時隊列除了可以做一般的延時處理外跨琳,還可以當(dāng)作單個job的定時任務(wù)處理,比起一般通過定時器去輪詢的方式更優(yōu)雅桐罕。
1. 修改rabbitmq-spring.xml
<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">
初次配置時脉让,如果報以下錯誤,則是服務(wù)器不支持此命令功炮,需要安裝插件
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2. 安裝插件
下載插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
-
上傳插件到docker容器中/plugins
docker ps
查詢rabbitmq的 CONTAINER ID
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 2c248563a2b0:/plugins
- 進(jìn)入docker容器內(nèi)部
docker exec -it 2c248563a2b0 /bin/bash
- 安裝插件
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
具體安裝教程可參考:https://blog.csdn.net/magic_1024/article/details/103840681
安裝成功后重啟程序溅潜,觀察mq管理端的exchange可發(fā)現(xiàn)
3. 發(fā)送延時消息
@GetMapping("/sendDelay")
public LoginUser SendDelayLoginSucceedMessage() {
LoginUser loginUser = getLoginUser("succeed");
MessagePostProcessor messagePostProcessor = message -> {
// 延時10s
message.getMessageProperties().setHeader("x-delay", 10000);
return message;
};
// 發(fā)送消息
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor);
return loginUser;
}
需要注意的是消息的發(fā)送是
實時
的,消息服務(wù)器接收到消息待延時時間后再投到對應(yīng)的queue中
七薪伏、完整代碼
https://gitee.com/hypier/barry-cloud/tree/master/cloud-mq