SpringBoot整合RabbitMQ——RabbitMQ進(jìn)階

rabbitMQ如何保證如果消息發(fā)送失敗,保證其消息不丟失狐胎、怎么設(shè)置消息過(guò)期時(shí)間以及死信隊(duì)列是如何在消息消費(fèi)失敗時(shí)保證消息不丟失的掸犬、如何使用過(guò)期時(shí)間來(lái)實(shí)現(xiàn)延遲隊(duì)列以及rabbitMQ的持久化跟匆、消息確認(rèn)的機(jī)制是怎樣的?本博文將具體介紹上述內(nèi)容

本博文中的代碼實(shí)現(xiàn)實(shí)在SpringBoot整合RabbitMQ——消息的發(fā)送和接收的基礎(chǔ)上實(shí)現(xiàn)了咱枉,完整的代碼可以查看Gitee上的項(xiàng)目rabbitmq

rabbitMQ如何保證消息的不丟失

消息的丟失有以下四種情況:

  1. 消息發(fā)送到RabbitMQ服務(wù)器,交換機(jī)根據(jù)自身的類(lèi)型和路由鍵無(wú)法匹配到隊(duì)列徒恋,導(dǎo)致消息丟失
  2. 消息設(shè)置了過(guò)期時(shí)間蚕断,消息過(guò)期了導(dǎo)致消息丟失
  3. 消息不能被正確的消費(fèi),導(dǎo)致消息的丟失
  4. 因?yàn)榉?wù)器的崩潰導(dǎo)致消息的丟失

針對(duì)以上的情況入挣,rabbitMQ提供了不同的解決方案

消息設(shè)置mandatory參數(shù)和使用備份交換機(jī)

mandatory參數(shù)

在上篇博文中我們其實(shí)已經(jīng)在配置文件中配置了mandatory的參數(shù)亿乳,并且在connectionFactory中設(shè)置了其中的參數(shù)

    spring:
      rabbitmq:
        template:
          mandatory: true
        publisher-confirms: true
        publisher-returns: true

然后再發(fā)送消息的時(shí)候設(shè)置回調(diào)函數(shù)

    /**
     * 確認(rèn)后回調(diào)方法
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public final void confirm(CorrelationData correlationData, boolean ack, String cause) {
        this.logger.info("confirm-----correlationData:" + correlationData.toString() + "---ack:" + ack + "----cause:" + cause);
        // TODO 記錄日志(數(shù)據(jù)庫(kù)或者es)
        this.handleConfirmCallback(correlationData.getId(), ack, cause);
    }

    /**
     * 失敗后回調(diào)方法
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public final void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        this.logger.info("return-----message:" + message.toString() + "---replyCode:" + replyCode + "----replyText:" + replyText + "----exchange:" + exchange + "----routingKey:" + routingKey);
        // TODO 記錄日志(數(shù)據(jù)庫(kù)或者es)
        this.handleReturnCallback(message, replyCode, replyText, routingKey);
    }

用戶(hù)可以在這里對(duì)消息進(jìn)行本地持久化,其實(shí)上面的也叫消息確認(rèn)模式径筏,發(fā)送端將消息發(fā)送給RabbitMQ葛假,rabbitMQ會(huì)異步回調(diào)Confirm方法,告訴發(fā)送方滋恬,RabbitMQ服務(wù)端有沒(méi)有收到消息聊训,如果沒(méi)有收到消息的話,原因是什么夷恍。同時(shí)會(huì)異步回調(diào)設(shè)置的returnedMessage將發(fā)送的消息返回

備份交換機(jī)

除了上面的消息確認(rèn)模式魔眨,還有一種備份交換機(jī)的方案也是可以解決消息的丟失問(wèn)題,具體的邏輯如下:

  1. 聲明一個(gè)交換機(jī)A酿雪,其類(lèi)型為fanout類(lèi)型
  2. 聲明一個(gè)交換機(jī)B遏暴,設(shè)置其alternate-exchange屬性為交換機(jī)A
  3. 聲明一個(gè)隊(duì)列a,并且與交換機(jī)A綁定
  4. 聲明一個(gè)隊(duì)列b指黎,并且與交換機(jī)B綁定朋凉,路由鍵為rb

這樣我們就實(shí)現(xiàn)了備份交換機(jī)功能,其業(yè)務(wù)實(shí)現(xiàn)邏輯如下:
我們發(fā)送一個(gè)消息到交換機(jī)B上醋安,當(dāng)路由鍵等于rb時(shí)杂彭,消息會(huì)正確發(fā)送到隊(duì)列b上墓毒,當(dāng)路由鍵不等于rb時(shí),即消息不能正確的發(fā)送到隊(duì)列b上亲怠,此時(shí)就會(huì)發(fā)送給交換機(jī)A所计,由于交換機(jī)A是fanout類(lèi)型的,所以消息會(huì)被進(jìn)一步發(fā)送到隊(duì)列a上团秽。這樣我們就實(shí)現(xiàn)了發(fā)送方消息的不丟失主胧。

[圖片上傳失敗...(image-5a7baf-1562419602501)]

代碼實(shí)現(xiàn)這里給出alternaate-exchange的實(shí)現(xiàn)

    Map<String,Object> map = new HashMap<String,Object>();
    map.put("alternate-exchange","A");
    MqExchange exchange = new MqExchange().arguments(map).name("B").type(ExchangeTypeEnum.DIRECT.getCode());
    amExchangeDeclare.declareExchange(exchange);

過(guò)期時(shí)間和死信隊(duì)列

過(guò)期時(shí)間

我們常見(jiàn)的購(gòu)物車(chē)訂單,一般有這樣的需求习勤,在規(guī)定的時(shí)間內(nèi)沒(méi)有付款的話踪栋,該訂單就會(huì)失效,這里常見(jiàn)就是使用消息的過(guò)期時(shí)間來(lái)控制的图毕,在rabbitMQ中實(shí)現(xiàn)過(guò)期時(shí)間有兩種方式

  • 設(shè)置隊(duì)列的過(guò)期時(shí)間夷都,則該隊(duì)列中所有的消息的過(guò)期時(shí)間都是一樣的
    // 設(shè)置隊(duì)列的過(guò)期時(shí)間
    Map<String,Object> map = new HashMap<String,Object>();
    map.put("x-message-ttl",6000);
    MqQueue queue = new MqQueue().name(queueName).arguments(map);
    amQueueDeclare.declareQueue(queue);

如果不設(shè)置ttl(Time To Live),則這個(gè)消息不會(huì)過(guò)期,如果將TTL設(shè)置為0予颤,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者囤官,否則消息會(huì)立即丟棄

  • 設(shè)置消息的過(guò)期時(shí)間,只有這個(gè)消息存在過(guò)期時(shí)間荣瑟,設(shè)置消息的過(guò)期時(shí)間如下:
    // 設(shè)置消息的過(guò)期時(shí)間
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration(6000);
            return message;
        }
    };
    
    sendService.send(exchangeName,routingKey,data, messagePostProcessor, messageId);

注意:使用第一種方式來(lái)設(shè)置過(guò)期時(shí)間治拿,一旦消息過(guò)期,就會(huì)從隊(duì)列中消除笆焰,而采用第二種方式劫谅,即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中消除嚷掠,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之期間進(jìn)行判斷的

隊(duì)列也是有過(guò)期時(shí)間的捏检,通過(guò)x-expires屬性來(lái)設(shè)置的

    Map<String,Object> map = new HashMap<String,Object>();
    map.put("x-expires",6000);
    MqQueue queue = new MqQueue().name(queueName).arguments(map);
    amQueueDeclare.declareQueue(queue);

死信隊(duì)列

DLX:Dead-Letter-Exchange:當(dāng)一個(gè)消息在一個(gè)隊(duì)列中變成死信之后,他能被重新發(fā)送到另一個(gè)交換機(jī)中不皆,這個(gè)交換機(jī)就是DLX贯城。

那么消息滿(mǎn)足什么條件就會(huì)成為死信呢?

  • 消息被拒絕霹娄,并且設(shè)置requeue參數(shù)為false
  • 消息過(guò)期
  • 隊(duì)列達(dá)到最大長(zhǎng)度

那么如何使用死信隊(duì)列能犯?
死信隊(duì)列一般都是作為其他隊(duì)列的一個(gè)屬性來(lái)用的,當(dāng)這個(gè)隊(duì)列中存在死信時(shí)犬耻,RabbitMQ就會(huì)自動(dòng)將這個(gè)消息重新發(fā)布到設(shè)置的DLX上踩晶,進(jìn)而被路由到另一個(gè)隊(duì)列中

    // 定義一個(gè)交換機(jī)
    MqExchange dlxExchange = new MqExchange().name("B").type(ExchangeTypeEnum.DIRECT.getCode());
    amExchangeDeclare.declareExchange(exchange);
    
    // 聲明一個(gè)隊(duì)列時(shí),設(shè)置他的屬性 x-dead-letter-exchange 為上面定義的交換機(jī)
     Map<String,Object> map = new HashMap<String,Object>();
    map.put("x-dead-letter-exchange ","dlxExchange");
    // 也可以為DLX指定路由鍵 這個(gè)不是必須的枕磁,如果沒(méi)有設(shè)置路由鍵渡蜻,則使用原隊(duì)列的路由鍵
    map.put("x-dead-letter-routing-key","dlx-routing-key");
    MqQueue queue = new MqQueue().name(queueName).arguments(map);
    amQueueDeclare.declareQueue(queue);

這樣就初步完成了死信隊(duì)列的聲明。其業(yè)務(wù)流程圖如下:

image

延時(shí)隊(duì)列(定時(shí)隊(duì)列)

上述的購(gòu)物車(chē)訂單的示例,其實(shí)最優(yōu)的方案設(shè)計(jì)是使用TTL+DLX來(lái)實(shí)現(xiàn)茸苇,如果用戶(hù)沒(méi)有在規(guī)定的時(shí)間來(lái)支付排苍,則這個(gè)訂單就進(jìn)行一場(chǎng)處理。

延時(shí)隊(duì)列的具體使用方法如下:

方案一:

  1. 聲明一個(gè)設(shè)置死信隊(duì)列的隊(duì)列学密,該隊(duì)列沒(méi)有消費(fèi)者
  2. 給每一個(gè)發(fā)送該隊(duì)列的消息設(shè)置過(guò)期時(shí)間
  3. 消息一旦過(guò)期就會(huì)被死信隊(duì)列消費(fèi)淘衙,這樣就能實(shí)現(xiàn)延時(shí)隊(duì)列的效果

方案二:

  1. 聲明多個(gè)設(shè)置死信隊(duì)列、不同過(guò)期時(shí)間的隊(duì)列腻暮,該隊(duì)列沒(méi)有消費(fèi)者
  2. 然后通過(guò)不同的的routingKey來(lái)將消息發(fā)送到不同的隊(duì)列上
  3. 等隊(duì)列過(guò)期時(shí)間一到幔翰,消息就會(huì)匹配到死信隊(duì)列上,這樣也能實(shí)現(xiàn)延時(shí)隊(duì)列的效果

方案一和方案二的實(shí)現(xiàn)原理基本相同西壮,不同的是一個(gè)是消息的過(guò)期時(shí)間,一個(gè)是隊(duì)列的過(guò)期時(shí)間叫惊,方案二聲明多個(gè)不同過(guò)期時(shí)間的隊(duì)列款青,而方案一只聲明一個(gè)隊(duì)列,這樣的話有優(yōu)點(diǎn)也有缺點(diǎn)霍狰,優(yōu)點(diǎn)是減少了RabbitMQ的隊(duì)列數(shù)量抡草,缺點(diǎn)是降低了RabbitMQ隊(duì)列消息消費(fèi)的速度,而使用哪種方案可以根據(jù)業(yè)務(wù)和流量來(lái)衡量使用

具體的業(yè)務(wù)流程圖如下:

[圖片上傳失敗...(image-1f73c4-1562419602501)]

消息持久化

RabbitMQ中交換機(jī)蔗坯、隊(duì)列和消息都可以持久化康震,其中交換機(jī)和隊(duì)列的持久化只需要在聲明時(shí),其屬性durable為true即可宾濒,而消息的持久化是建立在隊(duì)列持久化的基礎(chǔ)上腿短,因?yàn)樵赗abbitMQ中,消息時(shí)存儲(chǔ)在隊(duì)列上的绘梦,隊(duì)列都沒(méi)有了橘忱,消息肯定也是存儲(chǔ)不了的。

消息的持久化在上面的內(nèi)容已經(jīng)介紹過(guò)了卸奉,不同版本的SpringBoot的RabbitMQ集成钝诚,實(shí)現(xiàn)的方式可能 不一樣,但是本質(zhì)都是一樣的榄棵,最終操作的都是RabbitMQ服務(wù)器

消息和隊(duì)列的持久化都是比較簡(jiǎn)單的凝颇,但是我們這里要清楚的知道,如果我們將所有的消息都設(shè)置了持久化了疹鳄,會(huì)嚴(yán)重影響RabbitMQ的性能拧略,畢竟消息寫(xiě)入到磁盤(pán)的速度比寫(xiě)入內(nèi)存的數(shù)據(jù)慢的不是一點(diǎn)點(diǎn)的。

這個(gè)就要求我們?cè)谠O(shè)計(jì)時(shí)需要注意尚辑,對(duì)于可靠性不是那么高的消息辑鲤,可以不采用持久化處理來(lái)提高吞吐量。在選擇是否將消息持久化時(shí)杠茬,需要在可靠性和吞吐量之間做一個(gè)權(quán)衡

消息消費(fèi)處理

參數(shù)isAck

這里需要注意一點(diǎn)月褥,在我們上一篇博文SpringBoot整合RabbitMQ——消息的發(fā)送和接收中弛随,我們有介紹在為隊(duì)列設(shè)置監(jiān)聽(tīng)時(shí),有個(gè)參數(shù)isAck宁赤,這里如果設(shè)置成true舀透,則隊(duì)列在接收到消息后,不管業(yè)務(wù)方有沒(méi)有完全消費(fèi)消息决左,都會(huì)給RabbitMQ返回個(gè)消息已經(jīng)消費(fèi)成功的結(jié)果愕够,RabbitMQ在判斷消息已經(jīng)成功消費(fèi)了則會(huì)刪除隊(duì)列中的消息,但是業(yè)務(wù)方其實(shí)沒(méi)有真正完成消息的消費(fèi)佛猛,這樣就會(huì)導(dǎo)致數(shù)據(jù)的丟失惑芭。

那我們?cè)趺磥?lái)處理這個(gè)問(wèn)題呢?

其實(shí)很簡(jiǎn)單继找,我們只需要將isAck字段的參數(shù)設(shè)置成false即可遂跟,這樣的話就需要業(yè)務(wù)方手動(dòng)去操作該消息有沒(méi)有被成功消費(fèi)。如果沒(méi)有消費(fèi)的話就拒絕這個(gè)消息婴渡,是的拒絕這個(gè)消息幻锁,還記得我們之前介紹過(guò)的死信隊(duì)列,隊(duì)列設(shè)置了死信隊(duì)列边臼,消息一旦被拒絕的話哄尔,消息就會(huì)進(jìn)入死信交換機(jī),進(jìn)而匹配到響應(yīng)的隊(duì)列中柠并,可以在隊(duì)列中將消息持久化到本地岭接,這是一種解決方法。

具體的流程圖可以參考死信隊(duì)列的流程圖:


image

消息分發(fā)

在實(shí)際的生產(chǎn)過(guò)程中堂鲤,可能一個(gè)隊(duì)列存在多個(gè)消費(fèi)者亿傅,那么隊(duì)列此時(shí)收到的消息就會(huì)以輪詢(xún)的方式發(fā)送給消費(fèi)者。

一般情況下瘟栖,rabbitMQ會(huì)將第m條消息發(fā)送給第m%n個(gè)消費(fèi)者葵擎。這里其實(shí)有個(gè)隱患,在消費(fèi)者任務(wù)非常繁重的情況下半哟,來(lái)不及消費(fèi)那么多的消息酬滤,而其他的消費(fèi)者,由于某些原因寓涨,很快的處理完消息盯串,這種情況就很容易出現(xiàn)某個(gè)消費(fèi)者承受的壓力就比較大,造成整體應(yīng)用的吞吐量下降戒良。

為了解決這個(gè)問(wèn)題体捏, 我們其實(shí)可以設(shè)置Channel信道上的最大處理消息的個(gè)數(shù),代碼設(shè)置如下:

    // 在系統(tǒng)初始化啟動(dòng)時(shí),加載Connection几缭,創(chuàng)建Channel河泳,然后設(shè)置BasicQos的個(gè)數(shù)
    Channel channel = cachingConnectionFactory.createConnection().createChannel();
    channel.basicQos(1000);

basicQos具體的作用是設(shè)置允許限制Channel上所有消費(fèi)者所能保持的最大未確認(rèn)的消息的數(shù)量

注意如果這里設(shè)置了最大的未確認(rèn)的消息的數(shù)量,那么所有的消費(fèi)者都會(huì)生效

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末年栓,一起剝皮案震驚了整個(gè)濱河市拆挥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌某抓,老刑警劉巖纸兔,帶你破解...
    沈念sama閱讀 211,948評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異否副,居然都是意外死亡汉矿,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)备禀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)负甸,“玉大人,你說(shuō)我怎么就攤上這事痹届。” “怎么了打月?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,490評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵队腐,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我奏篙,道長(zhǎng)柴淘,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,521評(píng)論 1 284
  • 正文 為了忘掉前任秘通,我火速辦了婚禮为严,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘肺稀。我一直安慰自己第股,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,627評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布话原。 她就那樣靜靜地躺著夕吻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪繁仁。 梳的紋絲不亂的頭發(fā)上涉馅,一...
    開(kāi)封第一講書(shū)人閱讀 49,842評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音黄虱,去河邊找鬼稚矿。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晤揣。 我是一名探鬼主播桥爽,決...
    沈念sama閱讀 38,997評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼碉渡!你這毒婦竟也來(lái)了聚谁?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,741評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤滞诺,失蹤者是張志新(化名)和其女友劉穎形导,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體习霹,經(jīng)...
    沈念sama閱讀 44,203評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡朵耕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,534評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了淋叶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阎曹。...
    茶點(diǎn)故事閱讀 38,673評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖煞檩,靈堂內(nèi)的尸體忽然破棺而出处嫌,到底是詐尸還是另有隱情,我是刑警寧澤斟湃,帶...
    沈念sama閱讀 34,339評(píng)論 4 330
  • 正文 年R本政府宣布熏迹,位于F島的核電站,受9級(jí)特大地震影響凝赛,放射性物質(zhì)發(fā)生泄漏注暗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,955評(píng)論 3 313
  • 文/蒙蒙 一墓猎、第九天 我趴在偏房一處隱蔽的房頂上張望捆昏。 院中可真熱鬧,春花似錦毙沾、人聲如沸骗卜。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,770評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)膨俐。三九已至,卻和暖如春罩句,著一層夾襖步出監(jiān)牢的瞬間焚刺,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,000評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工门烂, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留乳愉,地道東北人兄淫。 一個(gè)月前我還...
    沈念sama閱讀 46,394評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蔓姚,于是被迫代替她去往敵國(guó)和親捕虽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,562評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容