SpringBoot RabbitMQ消息隊列的重試、超時吞瞪、延時馁启、死信隊列

今天介紹使用SpringBoot實現(xiàn)RabbitMQ消息隊列的高級用法。

  • MQ安裝
  • 自動創(chuàng)建
  • 消息重試
  • 消息超時
  • 死信隊列
  • 延時隊列

一芍秆、RabbitMQ的安裝

眾所周知惯疙,RabbitMQ的安裝相對復(fù)雜,需要先安裝Erlang妖啥,再按著對應(yīng)版本的RabbitMQ的服務(wù)端霉颠,最后為了方便管理還需要安裝rabbitmq_management管理端插件,偶爾還會出現(xiàn)一些安裝配置問題荆虱,故十分復(fù)雜蒿偎。
在開發(fā)測試環(huán)境下使用docker來安裝就方便多了,省去了環(huán)境和配置的麻煩怀读。

1. 拉取官方image

docker pull rabbitmq:management

2. 啟動RabbitMQ

docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

rabbitmq:management: image:tag
--name:指定容器名诉位;
-d:后臺運行容器;
-t:在新容器內(nèi)指定一個偽終端或終端菜枷;
-i:允許你對容器內(nèi)的標(biāo)準(zhǔn)輸入 (STDIN) 進(jìn)行交互苍糠;
-p:指定服務(wù)運行的端口(5672:應(yīng)用訪問端口;15672:控制臺Web端口號)啤誊;
-e:指定環(huán)境變量岳瞭;(RABBITMQ_DEFAULT_USER:默認(rèn)的用戶名拥娄;RABBITMQ_DEFAULT_PASS:默認(rèn)用戶名的密碼);

至此RabbitMQ就安裝啟動完成了瞳筏,可以通過http://localhost:15672 登陸管理后臺稚瘾,用戶名密碼就是上面配置的admin/admin

二、使用SpringBoot自動創(chuàng)建隊列

1. 引入amqp包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. MQ配置

bootstrap.yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        concurrency: 5
      direct:
        prefetch: 10

concurrency:每個listener在初始化的時候設(shè)置的并發(fā)消費者的個數(shù)
prefetch:每次從一次性從broker里面取的待消費的消息的個數(shù)

rabbitmq-spring.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--接收消息的隊列名-->
    <rabbit:queue name="login-user-logined" />
    <!--聲明exchange的名稱與類型-->
    <rabbit:topic-exchange name="login_barryhome_fun">
        <rabbit:bindings>
            <!--queue與exchange的綁定和匹配路由-->
            <rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

rabbit:topic-exchange:聲明為topic消息類型
pattern="login.user.logined":此處是一個表達(dá)式姚炕,可使用“*”表示一個詞摊欠,“#”表示一個或多個詞

3. 消息生產(chǎn)端

@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/send")
public LoginUser SendLoginSucceedMessage(){
    LoginUser loginUser = getLoginUser("succeed");
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
            MessageConstant.LOGIN_ROUTING_KEY, loginUser);
    return loginUser;
}

@NoArgsConstructor
@AllArgsConstructor
public class LoginUser implements Serializable {
    String userName;
    String realName;
    String userToken;
    Date loginTime;
    String status;
}

這里需要注意的是默認(rèn)情況下消息的轉(zhuǎn)換器為SimpleMessageConverter只能解析stringbyte,故傳遞的消息對象必須是可序列化的柱宦,實現(xiàn)Serializable接口

SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser

4. 消息消費端

@Component
public class ReceiverMessage {

    @RabbitListener(queues = "login-user-logined")
    public void receiveLoginMessage(LoginUser loginUser) {
        System.err.println(loginUser);
    }
}

@RabbitListener(queues = "login-user-logined"):用于監(jiān)聽名為login-user-logined 隊列中的消息

5. 自動創(chuàng)建Queue

@SpringBootApplication
@ImportResource(value = "classpath:rabbitmq-spring.xml")
public class MQApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQApplication.class, args);
    }
}

在沒有導(dǎo)入xml且MQ服務(wù)器上沒有列隊的情況下凄硼,會導(dǎo)致找不到相關(guān)queue的錯誤

channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/', class-id=50, method-id=10)

而導(dǎo)入之后將自動創(chuàng)建 exchangequeue

三、消息重試

默認(rèn)情況下如果有消息消費出錯后會一直重試捷沸,造成消息堵塞


如圖可觀察unackedtotal一直是1,但deliver/get飆升

消息堵塞之后也影響到后續(xù)消息的消費狐史,時間越長越來越多的消息將無法及時消費處理痒给。
如果是單條或極少量的消息有問題可通過多開節(jié)點concurrency將正常的消息消息掉,但如果較多則全部節(jié)點都將堵塞骏全。

如果想遇到消息消費報錯重試幾次就舍棄苍柏,從而不影響后續(xù)消息的消費,如何實現(xiàn)呢姜贡?

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        concurrency: 5
        prefetch: 10
        retry:
          enabled: true   # 允許消息消費失敗的重試
          max-attempts: 3   # 消息最多消費次數(shù)3次
          initial-interval: 2000    # 消息多次消費的間隔2秒

以上配置允許消息消費失敗后重試3次试吁,每次間隔2秒,如果還是失敗則直接舍棄掉本條消息楼咳。
重試可解決因非消息體本身處理問題產(chǎn)生的臨時性的故障熄捍,而將處理失敗的消息直接舍棄掉只是為其它消息正常處理的權(quán)益之計而以,將業(yè)務(wù)操作降到相對低的影響母怜。

四余耽、消息超時

消息重試可解決因消息處理報錯引起的問題。如果是消息處理過慢導(dǎo)致錯過時效苹熏,除了可在處理邏輯中進(jìn)行處理外碟贾,也可以通過消息的超時機制來處理,設(shè)定超時時間后將消息直接舍棄轨域。

修改rabbitmq-spring.xml

<rabbit:queue name="login-user-logined">
    <rabbit:queue-arguments>
    <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
    </rabbit:queue-arguments>
</rabbit:queue>

x-message-ttl:在消息服務(wù)器停留的時間(ms)


如果配置前已存在queue將不能被修改袱耽,需要刪除原有queue后自動創(chuàng)建
創(chuàng)建成功后會在Features中有TTL標(biāo)識

五、死信隊列

死信隊列就是當(dāng)業(yè)務(wù)隊列處理失敗后干发,將消息根據(jù)routingKey轉(zhuǎn)投到另一隊列朱巨,這樣的情況有:

  • 消息被拒絕 (basic.reject or basic.nack) 且?guī)?requeue=false不重新入隊參數(shù)或達(dá)到的retry重新入隊的上限次數(shù)
  • 消息的TTL(Time To Live)-存活時間已經(jīng)過期
  • 隊列長度限制被超越(隊列滿,queue的"x-max-length"參數(shù))

1. 修改rabbitmq-spring.xml

<!--接收消息的隊列名-->
<rabbit:queue name="login-user-logined">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        <!--死信的交換機-->
        <entry key="x-dead-letter-exchange" value="login_barryhome_fun"/>
        <!--死信發(fā)送的路由-->
        <entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="login-user-logined-dlq"/>

<!--申明exchange的名稱與類型-->
<rabbit:topic-exchange name="login_barryhome_fun">
    <rabbit:bindings>
        <!--queue與exchange的綁定和匹配路由-->
        <rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
        <rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

通過對死信發(fā)送的交換機和路由的的設(shè)置铐然,可將消息轉(zhuǎn)向具體的queue中蔬崩。這里交換機可以和原業(yè)務(wù)隊列不是一個恶座。
當(dāng)login-user-logined中的消息處理失敗后將直接轉(zhuǎn)投向login-user-logined-dlq隊列中。
當(dāng)程序邏輯修復(fù)后可再將消息再移回業(yè)務(wù)隊列中move messages

2. 安裝插件


如圖提示需要先安裝插件

3. 移動消息


安裝成功后就可以輸入業(yè)務(wù)隊列名再轉(zhuǎn)投

六沥阳、延時隊列

延時隊列除了可以做一般的延時處理外跨琳,還可以當(dāng)作單個job的定時任務(wù)處理,比起一般通過定時器去輪詢的方式更優(yōu)雅桐罕。

1. 修改rabbitmq-spring.xml

<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">

初次配置時脉让,如果報以下錯誤,則是服務(wù)器不支持此命令功炮,需要安裝插件

Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)

2. 安裝插件

  1. 下載插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

  2. 上傳插件到docker容器中/plugins
    docker ps 查詢rabbitmq的 CONTAINER ID

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 2c248563a2b0:/plugins
  1. 進(jìn)入docker容器內(nèi)部
docker exec -it 2c248563a2b0 /bin/bash
  1. 安裝插件
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

具體安裝教程可參考:https://blog.csdn.net/magic_1024/article/details/103840681

安裝成功后重啟程序溅潜,觀察mq管理端的exchange可發(fā)現(xiàn)

3. 發(fā)送延時消息

@GetMapping("/sendDelay")
public LoginUser SendDelayLoginSucceedMessage() {
    LoginUser loginUser = getLoginUser("succeed");

    MessagePostProcessor messagePostProcessor = message -> {
        // 延時10s
        message.getMessageProperties().setHeader("x-delay", 10000);
        return message;
    };

    // 發(fā)送消息
    rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
            MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor);
    return loginUser;
}

需要注意的是消息的發(fā)送是實時的,消息服務(wù)器接收到消息待延時時間后再投到對應(yīng)的queue中

七薪伏、完整代碼

https://gitee.com/hypier/barry-cloud/tree/master/cloud-mq

八滚澜、請關(guān)注我的公眾號

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市嫁怀,隨后出現(xiàn)的幾起案子设捐,更是在濱河造成了極大的恐慌,老刑警劉巖塘淑,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件萝招,死亡現(xiàn)場離奇詭異,居然都是意外死亡存捺,警方通過查閱死者的電腦和手機槐沼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來捌治,“玉大人岗钩,你說我怎么就攤上這事【叩危” “怎么了凹嘲?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長构韵。 經(jīng)常有香客問我周蹭,道長,這世上最難降的妖魔是什么疲恢? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任凶朗,我火速辦了婚禮,結(jié)果婚禮上显拳,老公的妹妹穿的比我還像新娘棚愤。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布宛畦。 她就那樣靜靜地躺著瘸洛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪次和。 梳的紋絲不亂的頭發(fā)上反肋,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音踏施,去河邊找鬼石蔗。 笑死,一個胖子當(dāng)著我的面吹牛畅形,可吹牛的內(nèi)容都是我干的养距。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼日熬,長吁一口氣:“原來是場噩夢啊……” “哼棍厌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起竖席,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤定铜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后怕敬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡帘皿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年东跪,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鹰溜。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡虽填,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出曹动,到底是詐尸還是另有隱情斋日,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布墓陈,位于F島的核電站恶守,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏贡必。R本人自食惡果不足惜兔港,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望仔拟。 院中可真熱鬧衫樊,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至臀栈,卻和暖如春蔫慧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背挂脑。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工藕漱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人崭闲。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓肋联,卻偏偏與公主長得像,于是被迫代替她去往敵國和親刁俭。 傳聞我的和親對象是個殘疾皇子橄仍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351