RabbitMQ知識梳理

一、整體架構(gòu)和核心概念

作用:異步、消峰晋修、解耦

二、使用springboot收發(fā)消息(Demo)

1. 引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

2. 配置rabbitmq

  • 創(chuàng)建虛擬主機(jī)(Virtual host):/wlhost
  • 創(chuàng)建隊(duì)列(queue):wl.queue
  • wl.queue綁定在/wlhost上(用可以控制此虛擬主機(jī)的用戶創(chuàng)建該隊(duì)列即可)
  rabbitmq:
    host: 192.168.0.100
    port: 5672
    username: admin
    password: 123456
    virtual-host: /wlhost

3. 發(fā)送消息代碼

@SpringBootTest
public class SpringBootAmqpTest {
    @Resource
    private AmqpTemplate amqpTemplate;

    @Test
    void testSendMsg() {
        amqpTemplate.convertAndSend("wl.queue", "hello amqp!");
    }
}

4. 接收消息代碼

@Slf4j
@Component
public class MqListener {

    @RabbitListener(queues = "wl.queue")
    public void receivedMsg(String msg) {
        log.info("收到了消息:{}", msg);
    }
}
控制臺打印

三烟阐、work模型(能者多勞)

1. 配置prefetch(處理完成才會接收消息,未處理完不會接收消息)

  rabbitmq:
    host: 192.168.0.100
    port: 5672
    username: admin
    password: 123456
    virtual-host: /wlhost
    listener:
      simple:
        prefetch: 1

2. 發(fā)送消息代碼

    @Test
    void testSendWorkMsg() {
        for (int i = 1; i <= 50; i++) {
            amqpTemplate.convertAndSend("work.queue", "hello worker, message_" + i);
        }
    }

3. 接收消息代碼

    @RabbitListener(queues = "work.queue")
    public void worker1ReceivedMsg(String msg) throws InterruptedException {
        log.info("收到了消息:{}", msg);
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void worker2ReceivedMsg(String msg) throws InterruptedException {
        log.warn("收到了消息:{}", msg);
        Thread.sleep(200);
    }
控制臺打印

四紊扬、四種交換機(jī)

4.1 fanout交換機(jī)

fanout exchange會將收到的消息廣播到每一個(gè)跟其綁定的queue蜒茄,因此也叫廣播模式
所有的微服務(wù)以及對應(yīng)的集群節(jié)點(diǎn)都會收到fanout exchange交換機(jī)跟其綁定的queue的消息

1. 配置fanout exchange

添加wl.fanout交換機(jī),并為其添加兩個(gè)隊(duì)列fanout.queue1餐屎,fanout.queue2


2. 發(fā)送消息代碼

    @Test
    void fanoutSendMsg() {
        String exchangeName = "wl.fanout";
        amqpTemplate.convertAndSend(exchangeName, "", "hello everyone!");
    }

3. 接收消息代碼

    @RabbitListener(queues = "fanout.queue1")
    public void fanout1ReceivedMsg(String msg) {
        log.info("消費(fèi)者1收到了消息:{}", msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void fanout2ReceivedMsg(String msg) {
        log.info("消費(fèi)者2收到了消息:{}", msg);
    }
控制臺打印

4.2 direct交換機(jī)

direct exchange會將收到的消息根據(jù)路由規(guī)則到指定queue檀葛,因此也叫定向路由

  • 每一個(gè)queue都與Exchange設(shè)置一個(gè)BindingKey
  • 發(fā)布者發(fā)送消息時(shí),指定消息的BindingKey
  • Exchange將消息路由到BindingKey與消息BindingKey一致的隊(duì)列

若所有的BindingKey均一致腹缩,direct exchange也能實(shí)現(xiàn)fanout exchange的廣播效果

1. 配置direct exchange

添加wl.direct交換機(jī)屿聋,并為其添加兩個(gè)隊(duì)列direct.queue1direct.queue2


2. 發(fā)送消息代碼

    @Test
    void directSendMsg() {
        String exchangeName = "wl.direct";
        amqpTemplate.convertAndSend(exchangeName, "red", "hello red!");
    }

3. 接收消息代碼

    @RabbitListener(queues = "direct.queue1")
    public void direct1ReceivedMsg(String msg) {
        log.info("消費(fèi)者1可以收到routingKey為blue和red的消息:{}", msg);
    }

    @RabbitListener(queues = "direct.queue2")
    public void direct2ReceivedMsg(String msg) {
        log.info("消費(fèi)者2可以收到routingKey為yellow和red的消息:{}", msg);
    }

控制臺打印

4. 將發(fā)送消息routingKey改為yellow

    @Test
    void directSendMsg() {
        String exchangeName = "wl.direct";
        amqpTemplate.convertAndSend(exchangeName, "yellow", "hello red!");
    }
控制臺打印

4.3 topic交換機(jī)(推薦使用)

topic exchangedirect exchange類似藏鹊,區(qū)別咋子與routingKey可以使多個(gè)單詞的列表润讥,并以.分割
queueexchange指定routingKey時(shí)可以使用通配符

  • “#”:代指0個(gè)或多個(gè)單詞
  • “*”:代指1個(gè)單詞

1. 配置topic exchange

添加wl.topic交換機(jī),并為其添加兩個(gè)隊(duì)列topic.queue1盘寡,topic.queue2


2. 發(fā)送消息代碼

    @Test
    void topicSendMsg() {
        String exchangeName = "wl.topic";
        amqpTemplate.convertAndSend(exchangeName, "china.weather", "這是中國天氣消息!");
    }

3. 接收消息代碼

    @RabbitListener(queues = "topic.queue1")
    public void topic1ReceivedMsg(String msg) {
        log.info("消費(fèi)者1可以收到routingKey含weather的消息:{}", msg);
    }

    @RabbitListener(queues = "topic.queue2")
    public void topic2ReceivedMsg(String msg) {
        log.info("消費(fèi)者2可以收到routingKey含china的消息:{}", msg);
    }
控制臺打印

4.4 headers交換機(jī)

不依賴路由鍵匹配規(guī)則路由消息楚殿。是根據(jù)發(fā)送消息內(nèi)容中的headers屬性進(jìn)行匹配。性能差竿痰,基本用不到

五脆粥、創(chuàng)建(聲明)隊(duì)列和交換機(jī)

1. 使用注解創(chuàng)建(推薦使用)

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void direct1ReceivedMsg(String msg) {
        log.info("消費(fèi)者1可以收到routingKey為blue和red的消息:{}", msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void direct2ReceivedMsg(String msg) {
        log.info("消費(fèi)者2可以收到routingKey為yellow和red的消息:{}", msg);
    }

2. 使用Bean注入創(chuàng)建

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
//        ExchangeBuilder.fanoutExchange("wl.fanout").build();
        return new FanoutExchange("wl.fanout");
    }

    @Bean
    public Queue queue() {
//        return QueueBuilder.durable("fanout.queue1").build();
        return new Queue("fanout.queue1");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(fanoutExchange());
    }
}

六、消息轉(zhuǎn)換器

發(fā)送消息和接收消息方都需要引入依賴影涉,注入消息轉(zhuǎn)換器
否則會使用jdk自帶的序列化方式ObjectOutputStream

1. 引入依賴

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

2. 注入消息轉(zhuǎn)換器

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringbootConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootConsumerApplication.class, args);
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

3. 發(fā)送消息代碼

    @Test
    void testSendObjectMsg() {
        String exchangeName = "wl.object.exchange";
        Student student = Student.builder()
                .studentId(111)
                .studentName("張三")
                .gender("男")
                .age(24)
                .tel("13265659898")
                .clazzId(4)
                .build();
        amqpTemplate.convertAndSend(exchangeName, "", student);
    }

4. 接收消息代碼

    @RabbitListener(queues = "wl.object.queue")
    public void objectReceivedMsg(Map<String, String> map) {
        log.warn("收到了消息:{}", map);
    }
控制臺打印

七冠绢、消息可靠性問題

7.1 生產(chǎn)者重連(rabbitmq服務(wù)器宕機(jī),重連)

解決消息發(fā)送給服務(wù)器是丟失問題

當(dāng)網(wǎng)絡(luò)不穩(wěn)定時(shí)常潮,利用重試機(jī)制可以提高消息發(fā)送的成功率,不過SpirngAMQP的重試機(jī)制是阻塞式的楷力,也就是多次重試等待的過程中喊式,當(dāng)前線程是被阻塞的孵户,會影響業(yè)務(wù)性能

如果對于 業(yè)務(wù)性能有要求,建議禁用重試機(jī)制岔留。如一定要用夏哭,請?jiān)O(shè)置合理的等待時(shí)長重試次數(shù),當(dāng)然也可以考慮用異步線程來執(zhí)行發(fā)送消息的代碼献联。

  rabbitmq:
    connection-timeout: 1s #設(shè)置mq的連接超時(shí)時(shí)間
    template:
      retry:
        enabled: true #開啟超時(shí)重試機(jī)制
        initial-interval: 1000ms #失敗后的初始等待時(shí)間
        multiplier: 1 #失敗后下次的等待時(shí)長倍數(shù)竖配,下次等待時(shí)長 = initial-interval * multiplier

7.2 生產(chǎn)者確認(rèn)機(jī)制

rabbitMQ有publisher returnpublisher confirm兩種確認(rèn)機(jī)制。開啟這兩種確認(rèn)及之后里逆,MQ成功收到消息后會返回確認(rèn)消息給生產(chǎn)者进胯。返回的結(jié)果有以下幾種情況:

  • exchange存在,routingKey不存在原押,MQ返回ack胁镐,但會通過publisher return返回路由異常,告知投遞成功
  • exchange诸衔、routingKey均存在盯漂,臨時(shí)消息入隊(duì)成功,返回ack笨农,告知投遞成功
  • exchange就缆、routingKey均存在,持久消息入隊(duì)成功且完成持久化谒亦,返回ack竭宰,告知投遞成功
  • 其他情況都會返回nack,告知投遞失敗

publisher-returns:默認(rèn)為false诊霹,不開啟publisher return機(jī)制
publisher-confirm-type:默認(rèn)為none羞延,publisher confirm機(jī)制

  • none:關(guān)閉confirm
  • simple:同步阻塞等待mq的回執(zhí)消息
  • correlated:異步回調(diào)返回mq的回執(zhí)消息

1. 配置生產(chǎn)者確認(rèn)機(jī)制

  rabbitmq:
    host: 192.168.0.100
    port: 5672
    username: admin
    password: 123456
    virtual-host: /wlhost
    connection-timeout: 1s #設(shè)置mq的連接超時(shí)時(shí)間
    publisher-returns: true #開啟publisher return機(jī)制
    #none:關(guān)閉confirm,simple:同步阻塞等待mq的回執(zhí)消息脾还,correlated:異步回調(diào)返回mq的回執(zhí)消息
    publisher-confirm-type: correlated #publisher confirm機(jī)制伴箩;

2. 設(shè)置publisher return

@Configuration
@Slf4j
public class MqConfig {

    @Resource
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public RabbitTemplate rabbitTemplate() {
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.info("收到消息的return callback,exchange:{}鄙漏,routingKey:{}嗤谚,msg:{},replyCode:{}怔蚌,replyText:{}",
                    returnedMessage.getMessage(),
                    returnedMessage.getRoutingKey(),
                    returnedMessage.getMessage(),
                    returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText()
            );
        });
        return rabbitTemplate;
    }
}

3. 發(fā)送消息代碼巩步,設(shè)置publisher confirm

    @Test
    void publisherConfirmSendMsg() {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("消息回調(diào)失敗", throwable);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm confirm) {
                log.info("收到消息回執(zhí)");
                if (confirm.isAck()) {
                    log.info("消息發(fā)送成功,收到ack");
                } else {
                    log.error("消息發(fā)送失敗桦踊,收到nack椅野,原因:{}", confirm.getReason());
                }
            }
        });
        String exchangeName = "wl.topic";
//        rabbitTemplate.convertAndSend(exchangeName, "aaa.bbb", "這是中國天氣消息!", correlationData);
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", "這是中國天氣消息!", correlationData);
    }
控制臺打印

若將exchangeName修改為不存在的名稱wls.topic

控制臺打印

若將routingKey修改為不存在的名稱aaa.bbb

控制臺打印

7.3 消費(fèi)者確認(rèn)機(jī)制

當(dāng)開啟消費(fèi)者確認(rèn)機(jī)制Consumer Acknowledgement后,消費(fèi)者處理消息結(jié)束后,會向MQ發(fā)送一個(gè)回執(zhí)竟闪,告知rabbitmq消息處理的狀態(tài)离福。回執(zhí)有三種可選值:

  • ack:成功處理消息炼蛤,rabbitmqqueue中刪除該消息
  • nack:處理消息失敗妖爷,rabbitmq需要再次投遞消息
  • reject:處理消息失敗并拒絕該消息,rabbitmqqueue中刪除該消息

針對rabbitmq的消費(fèi)者確認(rèn)機(jī)制理朋,SpringAMQP已經(jīng)實(shí)現(xiàn)了消息確認(rèn)功能絮识,并允許我們通過配置文件的方式選擇ack處理方式:

  • none:不處理。消息投遞給消費(fèi)者后立即返回ack嗽上,消息從mq刪除次舌。不建議使用
  • manual:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api炸裆,發(fā)送ackreject垃它,存在業(yè)務(wù)入侵,但更靈活
  • auto:自動(dòng)模式烹看。利用AOP對接收消息的處理邏輯做環(huán)繞增強(qiáng),業(yè)務(wù)正常執(zhí)行自動(dòng)返回ack国拇,推薦使用

當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:

  • 業(yè)務(wù)異常惯殊,例如RuntimeException酱吝,自動(dòng)返回nack
  • 消息處理或校驗(yàn)異常,例如MessageCoversionException土思,自動(dòng)返回reject

1. 配置消費(fèi)者確認(rèn)機(jī)制

  rabbitmq:
    host: 192.168.0.100
    port: 5672
    username: admin
    password: 123456
    virtual-host: /wlhost
    listener:
      simple:
        acknowledge-mode: auto #none:不處理务热;manual:手動(dòng)模式;auto:自動(dòng)模式

2. 配置消費(fèi)者重試機(jī)制

此時(shí)己儒,若拋出RuntimeException崎岂,mq的消息沒有出隊(duì),會無限重新投遞消息闪湾,給mq服務(wù)器帶來不必要的壓力冲甘,影響mq的性能,利用Springretry機(jī)制途样,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試江醇。配置如下:

  rabbitmq:
    host: 192.168.0.100
    port: 5672
    username: admin
    password: 123456
    virtual-host: /wlhost
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true
          max-attempts: 3 #最大重試次數(shù)
          stateless: true #true無狀態(tài),false有狀態(tài)何暇。如果業(yè)務(wù)中包含事務(wù)陶夜,這里改為false

注:重試次數(shù)耗盡之后,不管消息是否消費(fèi)成功裆站,消息都會出隊(duì)条辟,會導(dǎo)致消息丟失

3. 配置失敗消息處理策略

在開啟重試模式后黔夭,重試次數(shù)耗盡,如果消息依然失敗捂贿,則需要有MessageRecoverer接口來處理纠修,三種實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject厂僧,丟棄消息。默認(rèn)方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后了牛,返回nack颜屠,消息重新入隊(duì)
  • RepublishMessageRecoverer:重試耗盡后,將失敗的消息投遞到指定的交換機(jī)

配置失敗消息處理策略信息

@Configuration
// 只有spring.rabbitmq.listener.simple.retry.enabled = true時(shí)鹰祸,此配置才會生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorMqConfig {

    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.exchange");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(DirectExchange errorExchange, Queue errorQueue) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");  // error是routingKey
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }
}

生產(chǎn)者代碼

    @Test
    void testSendObjectMsg() {
        String exchangeName = "wl.object.exchange";
        Student student = Student.builder()
                .studentId(111)
                .studentName("張三")
                .gender("男")
                .age(24)
                .tel("13265659898")
                .clazzId(4)
                .build();
        rabbitTemplate.convertAndSend(exchangeName, "", student);
    }

消費(fèi)者代碼

    @RabbitListener(queues = "wl.object.queue")
    public void objectReceivedMsg(Map<String, String> map) {
        log.warn("收到了消息:{}", map);
//        throw new RuntimeException("拋出業(yè)務(wù)異常");
        throw new MessageConversionException("拋出業(yè)務(wù)異常");
    }
控制臺打印
rabbitmq面板e(cuò)rrorQueue信息

4. 業(yè)務(wù)冪等性

方案一:唯一消息id

  • 為每一條消息生成一個(gè)唯一的id甫窟,與消息一起發(fā)送給消費(fèi)者
  • 消費(fèi)者收到消息處理完業(yè)務(wù)后,將消息id保存到數(shù)據(jù)庫
  • 如果下次收到相同的消息蛙婴,去數(shù)據(jù)庫查詢判斷是否存在粗井,存在則不處理本次消息

開啟默認(rèn)的messageId,converter.setCreateMessageIds(true);

@SpringBootApplication
public class SpringbootConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootConsumerApplication.class, args);
    }

    @Bean
    public MessageConverter messageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setCreateMessageIds(true);
        return converter;
    }
}

此id為UUID.randomUUID().toString()街图,可根據(jù)自己的需求更改成其他id浇衬,如分布式id、雪花算法id等

if (this.createMessageIds && messageProperties.getMessageId() == null) {
   messageProperties.setMessageId(UUID.randomUUID().toString());
}

方案二:結(jié)合業(yè)務(wù)邏輯

  • 比如更新訂單狀態(tài)(未支付=>已支付)餐济,可以先查詢當(dāng)前訂單是否為已支付耘擂,若為已支付,不處理本次消息

八絮姆、數(shù)據(jù)持久化

消息有兩種類型醉冤,臨時(shí)消息non-persistent和持久化消息persistent,通過springAMQP發(fā)送的消息默認(rèn)都是持久化的篙悯。 發(fā)送消息時(shí)可以指定消息類型蚁阳,deliver mode = 1是臨時(shí)消息,deliver mode = 2是持久化消息
臨時(shí)消息存在內(nèi)存中鸽照,持久化消息存在磁盤上螺捐,當(dāng)內(nèi)存消息滿時(shí),會觸發(fā)pageout移宅,此時(shí)mq是阻塞的归粉,持久化消息不會觸發(fā)pageout

mq的消息存在內(nèi)存中,易丟失漏峰,需要采用數(shù)據(jù)持久化和改變隊(duì)列模式(lazy queue)來解決消息丟失和阻塞問題
lazy queue優(yōu)化了消息寫入磁盤的效率糠悼。保證隊(duì)列和消息都持久化,再使用Lazy Queue隊(duì)列模式保證消息的可靠性浅乔,但內(nèi)存只會保存最近的消息

聲明Lazy Queue的兩種方式代碼

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("fanout.queue1").lazy().build();
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"},
            arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void direct1ReceivedMsg(String msg) {
        log.info("消費(fèi)者1可以收到routingKey為blue和red的消息:{}", msg);
    }

消息持久化代碼

    @Test
    void persistentSendMsg() {
        Message message = MessageBuilder
                .withBody("hello world".getBytes())
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        String exchangeName = "wl.topic";
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
    }

九倔喂、死信交換機(jī)

當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí)铝条,就會成為死信dead letter

  • 消費(fèi)者使用basic.rejectbasic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
  • 消息是一個(gè)過期消息(達(dá)到了隊(duì)列或消息本身設(shè)置的過期時(shí)間)席噩,超時(shí)無人消費(fèi)
  • 要投遞的隊(duì)列消息堆積滿了班缰,最早的消息可能成為死信

如果隊(duì)列通過dead-letter-exchange屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會投遞到這個(gè)交換機(jī)中悼枢,這個(gè)交換機(jī)被稱為死信交換機(jī)(dead letter exchange埠忘,簡稱DLX)

死信交換機(jī)可以實(shí)現(xiàn)延遲消息的效果,但官網(wǎng)提供了延遲消息的插件馒索,更方便實(shí)用

工作原理

十莹妒、延遲消息

1. 安裝插件 rabbitmq_delayed_message_exchange,安裝對應(yīng)mq的版本即可绰上,將其放到plugins目錄下執(zhí)行以下命令旨怠,并重啟mq服務(wù)。官網(wǎng)地址:https://www.rabbitmq.com/community-plugins

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2. 生產(chǎn)者代碼蜈块,設(shè)置messagePropertiesdelay即可鉴腻,單位ms

    @Test
    void testSendDelayMsg() {
        String exchangeName = "wl.delay.exchange";
        Student student = Student.builder()
                .studentId(111)
                .studentName("張三")
                .gender("男")
                .age(24)
                .tel("13265659898")
                .clazzId(4)
                .build();
//        rabbitTemplate.convertAndSend(exchangeName, "delayMsg", student, message -> {
//            message.getMessageProperties().setDelay(10000);
//            return message;
//        });
        rabbitTemplate.convertAndSend(exchangeName, "delayMsg", student, getMessagePostProcessor(10000));
        log.info("發(fā)送延遲消息:{}", student);
    }

    public MessagePostProcessor getMessagePostProcessor(Integer delayTime) {
        return message -> {
            message.getMessageProperties().setDelay(delayTime);
            return message;
        };
    }
生產(chǎn)者控制臺

3. 消費(fèi)者代碼,將交換機(jī)的delayed屬性置true即可

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue"),
            exchange = @Exchange(name = "wl.delay.direct", type = ExchangeTypes.DIRECT, delayed = "true"),
            key = {"delayMsg"}
    ))
    public void delayReceivedMsg(Map<String, String> map) {
        log.info("收到延遲消息:{}", map);
    }
消費(fèi)者控制臺

參考自:https://www.bilibili.com/video/BV1mN4y1Z7t9

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末百揭,一起剝皮案震驚了整個(gè)濱河市爽哎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌信峻,老刑警劉巖倦青,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異盹舞,居然都是意外死亡产镐,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門踢步,熙熙樓的掌柜王于貴愁眉苦臉地迎上來癣亚,“玉大人,你說我怎么就攤上這事获印∈鑫恚” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵兼丰,是天一觀的道長玻孟。 經(jīng)常有香客問我,道長鳍征,這世上最難降的妖魔是什么黍翎? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮艳丛,結(jié)果婚禮上匣掸,老公的妹妹穿的比我還像新娘趟紊。我一直安慰自己,他們只是感情好碰酝,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布霎匈。 她就那樣靜靜地躺著,像睡著了一般送爸。 火紅的嫁衣襯著肌膚如雪铛嘱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天碱璃,我揣著相機(jī)與錄音弄痹,去河邊找鬼。 笑死嵌器,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的谐丢。 我是一名探鬼主播爽航,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼乾忱!你這毒婦竟也來了讥珍?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤窄瘟,失蹤者是張志新(化名)和其女友劉穎衷佃,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蹄葱,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡氏义,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了图云。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惯悠。...
    茶點(diǎn)故事閱讀 40,561評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖竣况,靈堂內(nèi)的尸體忽然破棺而出克婶,到底是詐尸還是另有隱情,我是刑警寧澤丹泉,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布情萤,位于F島的核電站,受9級特大地震影響摹恨,放射性物質(zhì)發(fā)生泄漏筋岛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一睬塌、第九天 我趴在偏房一處隱蔽的房頂上張望泉蝌。 院中可真熱鬧歇万,春花似錦、人聲如沸勋陪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诅愚。三九已至寒锚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間违孝,已是汗流浹背刹前。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留雌桑,地道東北人喇喉。 一個(gè)月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像校坑,于是被迫代替她去往敵國和親拣技。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容