rabbitmq消息可靠性之消息回調(diào)機制

rabbitmq消息可靠性之消息回調(diào)機制
rabbitmq消息回調(diào)機制圖解.png

rabbitmq在消息的發(fā)送與接收中草雕,會經(jīng)過上面的流程,這些流程中每一步都有可能導致消息丟失固以,或者消費失敗甚至直接是服務器宕機等墩虹,這是我們服務接受不了的,為了保證消息的可靠性憨琳,rabbitmq提供了以下幾種機制

  • 生產(chǎn)者確認機制

  • 消息持久化存儲

  • 消費者確認機制

  • 失敗重試機制

本文主要講解生產(chǎn)者確認機制诫钓,也是rabbitmq提供的消息回調(diào)機制,這個機制可以解決生產(chǎn)者發(fā)送消息到交換機和交換機路由到隊列過程中的消息丟失問題

這種機制必須給每個消息指定一個唯一ID篙螟,消息發(fā)送到rabbitmq之后會返回結(jié)果給生產(chǎn)者菌湃,表示消息是否發(fā)送成功,返回結(jié)果有以下兩種

  • publisher-confirm:發(fā)送者確認:消息成功投遞到交換機遍略,返回 ack惧所;消息未投遞到交換機,返回 nack

  • publisher-return:發(fā)送者回執(zhí):消息成功投遞到交換機绪杏,但是沒有路由到隊列下愈。返回 ack,及路由失敗原因

spring:
  rabbitmq:
    # rabbitMQ的ip地址
    host: 127.0.0.1
    # 端口
    port: 5672
    # 集群模式配置
    # addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
    username: admin
    password: 123456
    virtual-host: /
    # 消費者確認機制相關(guān)配置 
    # 開啟publisher-confirm蕾久,
    # 這里支持兩種類型:simple:同步等待confirm結(jié)果势似,直到超時;# correlated:異步回調(diào)僧著,定義ConfirmCallback履因,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:開啟publish-return功能,同樣是基于callback機制盹愚,不過是定義ReturnCallback
    publisher-returns: true
    # 定義消息路由失敗時的策略栅迄。true,則調(diào)用ReturnCallback杯拐;false:則直接丟棄消息
    template:
      mandatory: true

然后定義 ReturnCallback 回調(diào)霞篡,每個RabbitTemplate只能配置一個ReturnCallback世蔗,因此需要在項目加載時配置

package com.gitee.small.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {

    //綁定鍵
    public final static String DOG = "topic.dog";
    public final static String CAT = "topic.cat";


    /**
     * Queue構(gòu)造函數(shù)參數(shù)說明
     * new Queue(SMS_QUEUE, true);
     * 1. 隊列名
     * 2. 是否持久化 true:持久化 false:不持久化
     */


    @Bean
    public Queue firstQueue() {
        return new Queue(DOG);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(CAT);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    /**
     * 將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.dog
     * 這樣只要是消息攜帶的路由鍵是topic.dog,才會分發(fā)到該隊列
     */
    @Bean(name = "binding.dog")
    public Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG);
    }

    /**
     * 將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#
     * 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊列
     */
    @Bean(name = "binding.cat")
    public Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊列失敗朗兵,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話污淋,重發(fā)消息
        });
    }
}

接著定義 ConfirmCallback,ConfirmCallback 可以在發(fā)送消息時指定余掖,因為每個業(yè)務處理 confirm 成功或失敗的邏輯不一定相同寸爆,上面已經(jīng)定義好exchange 和 queue,新建RabbitMqTest測試類

package smallJ;

import com.gitee.small.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class RabbitMqTest {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws InterruptedException {
        // 1.準備CorrelationData
        // 1.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 1.2.準備ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判斷結(jié)果
            if (result.isAck()) {
                // ACK
                log.info("消息成功投遞到交換機盐欺!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投遞到交換機失斄薅埂!消息ID:{}冗美,原因:{}", correlationData.getId(), result.getReason());
                // 重發(fā)消息
            }
        }, ex -> {
            // 記錄日志
            log.error("消息發(fā)送異常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
            // 可以重發(fā)消息
        });
        rabbitTemplate.convertAndSend("topicExchange", "topic.dog", "路由模式測試-dog", correlationData);
        // 程序休眠兩秒等待回調(diào)
        Thread.sleep(2000);
    }
}

加兩個監(jiān)聽器進行測試

package com.gitee.small.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TopicRabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.dog"),
            exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC)
    ))
    public void process(String msg) {
        log.info("dog-收到消息:{}", msg);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.cat"),
            exchange = @Exchange(value = "bindingExchangeMessage2", type = ExchangeTypes.TOPIC)
    ))
    public void  process2(String msg){
        log.info("cat-收到消息:{}", msg);
    }
}

測試結(jié)果如下

smallJ.RabbitMqTest   : 消息成功投遞到交換機魔种!消息ID: 83f057fa-042d-4f56-872d-9d31a0444b82
c.g.small.rabbitmq.TopicRabbitReceiver   : dog-收到消息:路由模式測試-dog
c.g.small.rabbitmq.TopicRabbitReceiver   : cat-收到消息:路由模式測試-dog
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者粉洼。
  • 序言:七十年代末节预,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子属韧,更是在濱河造成了極大的恐慌安拟,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宵喂,死亡現(xiàn)場離奇詭異糠赦,居然都是意外死亡,警方通過查閱死者的電腦和手機锅棕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門拙泽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人哲戚,你說我怎么就攤上這事奔滑。” “怎么了顺少?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長王浴。 經(jīng)常有香客問我脆炎,道長,這世上最難降的妖魔是什么氓辣? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任秒裕,我火速辦了婚禮,結(jié)果婚禮上钞啸,老公的妹妹穿的比我還像新娘几蜻。我一直安慰自己喇潘,他們只是感情好,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布梭稚。 她就那樣靜靜地躺著颖低,像睡著了一般。 火紅的嫁衣襯著肌膚如雪弧烤。 梳的紋絲不亂的頭發(fā)上忱屑,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音暇昂,去河邊找鬼莺戒。 笑死,一個胖子當著我的面吹牛急波,可吹牛的內(nèi)容都是我干的从铲。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼澄暮,長吁一口氣:“原來是場噩夢啊……” “哼名段!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起赏寇,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤吉嫩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后嗅定,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體自娩,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年渠退,在試婚紗的時候發(fā)現(xiàn)自己被綠了忙迁。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡碎乃,死狀恐怖姊扔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情梅誓,我是刑警寧澤恰梢,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站梗掰,受9級特大地震影響嵌言,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜及穗,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一摧茴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧埂陆,春花似錦苛白、人聲如沸娃豹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽懂版。三九已至,卻和暖如春缓窜,著一層夾襖步出監(jiān)牢的瞬間定续,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工禾锤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留私股,地道東北人。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓恩掷,卻偏偏與公主長得像倡鲸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子黄娘,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容