RabbtiMQ系列-5.SpringBoot集成RabbitMQ

一.概述

本文介紹Springboot如何集成RabbitMQ
本文內(nèi)容較多辜王,開始為最簡單的自動確認類的消息發(fā)送和消費嫉称,后續(xù)有發(fā)送方確認和接收方確認機制示例
關于RabbitMQ安裝就不說了恳邀,網(wǎng)上有很多
關于RabbitMQ的一些基礎概念和知識請查看前幾篇文章

二.RabbitMQ管理后臺

安裝好RabbitMQ并啟用管理后臺寒匙,訪問localhost:15672凿傅,輸入默認的用戶密碼guest/guest得到如下界面


三.SpringBoot集成

生產(chǎn)者集成

1.創(chuàng)建項目

我們首先創(chuàng)建一個Springboot項目demo-rabbitmq-producer


項目的pom.xml文件中引入依賴:

<!--springboot整合rabbitMQ只需引入amqp起步依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>      

完整的pom.xml文件如下(注意demo-rabbitmq-common是上面項目截圖的自己定義的模塊項目萌京,里面只有一個User實體類泊交,為了后續(xù)測試發(fā)送實體類消息增加的):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tp</groupId>
    <artifactId>demo-rabbitmq-producer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>demo-rabbit-mq-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--springboot整合rabbitMQ只需引入amqp起步依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--demo-rabbitmq-common-->
        <dependency>
            <groupId>com.tp</groupId>
            <artifactId>demo-rabbitmq-common</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml文件中進行配置:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: TP-HOST #虛擬主機乳讥,可以不設置使用server默認host

注意:要保證guest用戶對虛擬主機有讀寫操作權(quán)限柱查,具體去RabbitMQ后臺管理系統(tǒng)配置

2.配置序列化策略

RabbitMQ序列化的選擇可以是jdk序列化,hessian云石,jackson唉工,protobuf等
而對于Java應用默認的序列化采用的是jdk序列化
SimpleMessageConverter對于要發(fā)送的消息體body為字節(jié)數(shù)組時,不進行處理汹忠。
消息本身假設是String淋硝,則將String轉(zhuǎn)成字節(jié)數(shù)組,假設是Java對象宽菜,則使用jdk序列化將消息轉(zhuǎn)成字節(jié)數(shù)組谣膳,轉(zhuǎn)出來的結(jié)果較大,含class類名铅乡、類對應方法等信息继谚,因此性能較差。
hessian阵幸、protobuf等都是基于壓縮反復字段的思想花履,降低數(shù)據(jù)傳輸量以提高性能。
jackson是以json表示來數(shù)據(jù)傳輸挚赊,性能優(yōu)于jdk序列化
所以使用RabbitMq作為中間件時诡壁,數(shù)據(jù)量比較大,此時就要考慮使用類似Jackson2JsonMessageConverter荠割、hessian等序列化形式欢峰,以此提高性能。

在生產(chǎn)者端增加RabbitMQConfig.java:

package com.tp.demo.rabbitmq.producer.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * FileName:   RabbitMQConfig
 * Author:     TP
 * Date:       12/13/20 4:31 PM
 * Description:開啟消息發(fā)送消息確認
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // 設置序列化策略
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
3.聲明交換機涨共、隊列信息

需要注意的是纽帖,雖然官方推薦在生產(chǎn)者和消費者都增加交換機、隊列和綁定關系聲明举反,但個人更推薦生產(chǎn)環(huán)境通過管理后臺創(chuàng)建交換機懊直、隊列和聲明綁定關系,一方面可以防止因為開發(fā)人員的代碼錯誤引發(fā)不必要的問題火鼻,另一方面也可以防止每次啟動創(chuàng)建上述這些東西
而如果我們無論在生產(chǎn)者端還是在消費者端進行聲明交換機室囊、隊列、將交換機和隊列進行綁定魁索,都會自動向MQ申請上述聲明融撞,MQ中如果有相同配置的聲明則自動返回成功,如果沒有則新建一個
本例為了方便粗蔚,暫時只在生產(chǎn)者端進行聲明尝偎,當?shù)谝淮伟l(fā)送消息的時候會去MQ服務器申請聲明信息
首先我們來一個直連交換機的例子:

package com.tp.demo.rabbitmq.producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * FileName:   DirectRabbitConfig
 * Author:     TP
 * Date:       12/13/20 12:01 PM
 * Description: 直連交換機Config
 */
@Configuration
public class DirectRabbitConfig {

    /**
     * 定義隊列,名字為testDirectQueue
     */
    @Bean
    public Queue testDirectQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在致扯,暫存隊列:當前連接有效
        // exclusive:默認也是false肤寝,只能被當前創(chuàng)建的連接使用,而且當連接關閉后隊列即被刪除抖僵。此參考優(yōu)先級高于durable
        // autoDelete:是否自動刪除鲤看,當沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除耍群。
        // return new Queue("testDirectQueue", true, true, false);

        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("testDirectQueue", true);
    }

    /**
     * 定義Direct類型交換機义桂,名字為testDirectExchange
     */
    @Bean
    DirectExchange testDirectExchange() {
        return new DirectExchange("testDirectExchange", true, false);
    }

    /**
     * 將隊列和交換機綁定,并設置路由鍵:testDirectRouting
     */
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("testDirectRouting");
    }
}
4.發(fā)送消息
package com.tp.demo.rabbitmq.producer.controller.simple;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * FileName:   SendDirectMessageController
 * Author:     TP
 * Date:       12/13/20 12:11 PM
 * Description:直連交換機消息發(fā)送Controller
 */
@RestController
public class SendDirectMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String message = "你好蹈垢,我是直連交換機過來的一條消息";
        //將消息攜帶綁定鍵值:testDirectRouting 發(fā)送到交換機testDirectExchange
        rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", message);
        return "ok";
    }
}

好了慷吊,發(fā)送方就是這么簡單,我們可以往MQ發(fā)送一條消息了

觀察管理后臺耘婚,發(fā)現(xiàn)隊列中已經(jīng)有消息了:

消費者集成

1.創(chuàng)建項目

新建一個Springboot項目:demo-rabbitmq-consumer

項目的pom.xml文件中引入依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.tp</groupId>
    <artifactId>demo-rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-rabbitmq-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--springboot整合rabbitMQ只需引入amqp起步依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--demo-rabbitmq-common-->
        <dependency>
            <groupId>com.tp</groupId>
            <artifactId>demo-rabbitmq-common</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml文件中做如下配置:

server:
  port: 7268
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: TP-HOST #虛擬主機罢浇,可以不設置使用server默認host

注意:生產(chǎn)者和消費者的虛擬主機要對應

2.配置序列化策略

因為生產(chǎn)者我們用了Jackson2JsonMessageConverter進行序列化,所以消費者端我們也使用Jackson2JsonMessageConverter進行反序列化

新建一個配置類RabbitMQConfig如下:

package com.tp.demo.rabbitmq.consumer.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * FileName:   RabbitMQConfig
 * Author:     TP
 * Date:       12/14/20 9:03 AM
 * Description:消息消費者配置
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 消息消費者配置JSON反序列化使用Jackson2JsonMessageConverter沐祷,與消息生產(chǎn)者保持一致
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
3.配置監(jiān)聽器

消費者端是通過監(jiān)聽器監(jiān)聽消息的嚷闭,我們配置一個監(jiān)聽器用于接收上面的隊列消息:

package com.tp.demo.rabbitmq.consumer.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * FileName:   DirectListener
 * Author:     TP
 * Date:       12/13/20 12:26 PM
 * Description:直連交換機消費者監(jiān)聽器
 * 直連交換機默認輪詢所有消費者
 * 如果我們定義了多個消費者監(jiān)聽了同一個隊列,會以輪詢的方式消費赖临,且不存在重復消費
 */
@Component
public class DirectListener {

    @RabbitListener(queues = "testDirectQueue")
    public void onMessage(String message) {
        System.out.println("DirectListener收到消息:" + message);
    }
}

擴展

  • Spring對amqp的支持很靈活胞锰,在消費者端我們可以使用org.springframework.amqp.core.Message對象統(tǒng)一接收消息,也可以使用你喜歡的任意類型進行接收兢榨,但要保證發(fā)送時候和接收時候的對象類型要保持一致嗅榕。(例如StringMap吵聪、JavaBean都可以)
  • 注解@RabbitListener可以定義在方法上凌那,也可以定義在類上,用以聲明一個消息監(jiān)聽器吟逝。
    -- 如果定義在類上帽蝶,需要配合@RabbitHandler標注在對應的方法上,指明具體使用哪個方法做監(jiān)聽
    -- 如果定義在方法上块攒,則可以省略@RabbitHandler
  • 如果我們定義了多個相同配置的消息監(jiān)聽器励稳,消費者會輪詢消費,且不會重復消費

啟動消費者項目囱井,在控制臺會得到如下輸出:

Connected to the target VM, address: '127.0.0.1:52798', transport: 'socket'

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2020-12-14 15:29:50.092  INFO 98654 --- [           main] .t.d.r.c.DemoRabbitmqConsumerApplication : Starting DemoRabbitmqConsumerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 98654 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-consumer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
2020-12-14 15:29:50.094  INFO 98654 --- [           main] .t.d.r.c.DemoRabbitmqConsumerApplication : No active profile set, falling back to default profiles: default
2020-12-14 15:29:50.805  INFO 98654 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 7268 (http)
2020-12-14 15:29:50.812  INFO 98654 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2020-12-14 15:29:50.812  INFO 98654 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-14 15:29:50.861  INFO 98654 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-12-14 15:29:50.861  INFO 98654 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 735 ms
2020-12-14 15:29:51.184  INFO 98654 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-14 15:29:51.396  INFO 98654 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 7268 (http) with context path ''
2020-12-14 15:29:51.398  INFO 98654 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
2020-12-14 15:29:51.425  INFO 98654 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#37c36608:0/SimpleConnection@7e62cfa3 [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 52811]
DirectListener收到消息:你好驹尼,我是直連交換機過來的一條消息
2020-12-14 15:29:51.555  INFO 98654 --- [           main] .t.d.r.c.DemoRabbitmqConsumerApplication : Started DemoRabbitmqConsumerApplication in 1.777 seconds (JVM running for 2.34)

由此看到,消息被正確消費了庞呕,又由于我們采用的是RabbitMQ的默認消息確認機制:自動確認新翎,所以此條消息會被RabbitMQ從隊列中移除:

上述示例是直連交換機的演示,關于扇形交換機和主題交換機,這里分別給出消息生產(chǎn)者和消費者的代碼料祠,就不進行演示和后臺截圖了骆捧,相信大家自己將下面代碼copy到自己項目里澎羞,操作一下就能實現(xiàn)

生產(chǎn)者端扇形交換機聲明:

package com.tp.demo.rabbitmq.producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * FileName:   FanoutRabbitConfig
 * Author:     TP
 * Date:       12/13/20 3:58 PM
 * Description:
 */
@Configuration
public class FanoutRabbitConfig {

    /**
     * 創(chuàng)建三個隊列 :fanout.A   fanout.B  fanout.C
     * 將三個隊列都綁定在交換機fanoutExchange上
     * 因為是扇型交換機髓绽,路由鍵無需配置,配置也不起作用
     */
    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue queueC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

生產(chǎn)者端主題交換機聲明:

package com.tp.demo.rabbitmq.producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * FileName:   TopicRabbitConfig
 * Author:     TP
 * Date:       12/13/20 12:52 PM
 * Description:主題交換機Config
 */
@Configuration
public class TopicRabbitConfig {
    // 綁定鍵
    private final static String man = "topic.man";
    private final static String woman = "topic.woman";

    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    // 將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
    // 這樣只要是消息攜帶的路由鍵是topic.man,才會分發(fā)到該隊列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    }

    // 將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#
    // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
}

生產(chǎn)者端發(fā)送消息

package com.tp.demo.rabbitmq.producer.controller.simple;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * FileName:   SendFanoutMessageController
 * Author:     TP
 * Date:       12/13/20 4:02 PM
 * Description:扇形交換機消息發(fā)送Controller
 */
@RestController
public class SendFanoutMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() throws JsonProcessingException {
        String message = "message: testFanoutMessage...";
        for (int i = 0; i < 200; i++) {
            rabbitTemplate.convertAndSend("fanoutExchange", null, message);
        }
        return "ok";
    }
}
package com.tp.demo.rabbitmq.producer.controller.simple;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * FileName:   SendTopicMessageController
 * Author:     TP
 * Date:       12/13/20 12:56 PM
 * Description:主題交換機消息發(fā)送Controller
 */
@RestController
public class SendTopicMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        String message = "message: MAN ";
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", message);
        return "ok";
    }

    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        String message = "message: woman will all in";
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman", message);
        return "ok";
    }
}

消費者端消息監(jiān)聽器:

package com.tp.demo.rabbitmq.consumer.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * FileName:   FanoutListenerA
 * Author:     TP
 * Date:       12/13/20 4:07 PM
 * Description:
 */
@Component
public class FanoutListenerA {

    @RabbitListener(queues = "fanout.A")
    public void onMessage(String message) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("FanoutListenerA收到消息:" + message);
    }
}
package com.tp.demo.rabbitmq.consumer.listener.simple;


import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * FileName:   FanoutListenerC
 * Author:     TP
 * Date:       12/13/20 4:08 PM
 * Description:
 */
@Component
public class FanoutListenerB {

    @RabbitListener(queues = "fanout.B")
    public void onMessage(String message) {
        System.out.println("FanoutListenerB收到消息:" + message);
    }
}
package com.tp.demo.rabbitmq.consumer.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * FileName:   FanoutListenerC
 * Author:     TP
 * Date:       12/13/20 4:08 PM
 * Description:
 */
@Component
public class FanoutListenerC {

    @RabbitListener(queues = "fanout.C")
    public void onMessage(String message) {
        System.out.println("FanoutListenerC收到消息:" + message);
    }
}
package com.tp.demo.rabbitmq.consumer.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * FileName:   TopicListener1
 * Author:     TP
 * Date:       12/13/20 2:32 PM
 * Description:
 */
@Component
public class TopicListener1 {

    @RabbitListener(queues = "topic.man")
    public void onMessage(String message) {
        System.out.println("TopicListener1收到消息:" + message);
    }
}
package com.tp.demo.rabbitmq.consumer.listener.simple;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * FileName:   TopicListener
 * Author:     TP
 * Date:       12/13/20 2:32 PM
 * Description:
 */
@Component
public class TopicListener2 {

    @RabbitListener(queues = "topic.woman")
    public void process(String message) {
        System.out.println("TopicListener2收到消息:" + message);
    }
}

三.消息可靠投遞:消息確認機制

RabbitMQ中生產(chǎn)者端和消費者端都有消息確認機制妆绞,已盡量避免消息的丟失

消息生產(chǎn)者投遞確認

====================================================
生產(chǎn)者發(fā)送確認機制共有2種回調(diào):ConfirmCallback顺呕、ReturnCallback
兩種回調(diào)函數(shù)都是在什么情況會觸發(fā)呢?
總體來說括饶,推送消息存在以下四種情況:
①消息推送到server株茶,但是在server里找不到交換機
②消息推送到server,找到交換機了图焰,但是沒找到隊列
③消息推送到sever启盛,交換機和隊列啥都沒找到
④消息推送成功
====================================================
①這種情況觸發(fā)的是 ConfirmCallback回調(diào)函數(shù)
②這種情況觸發(fā)的是 ConfirmCallback和ReturnCallback兩個回調(diào)函數(shù)
③這種情況觸發(fā)的是 ConfirmCallback回調(diào)函數(shù)
④這種情況觸發(fā)的是 ConfirmCallback回調(diào)函數(shù)
====================================================
我們可以在回調(diào)函數(shù)根據(jù)需求做對應的擴展或者業(yè)務數(shù)據(jù)處理

為了支持生產(chǎn)者消息投遞確認,我們需要作如下內(nèi)容:

  • application.yml中增加如下配置:
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: TP-HOST #虛擬主機技羔,可以不設置使用server默認host
    publisher-confirm-type: correlated #確認消息已發(fā)送到交換機(Exchange)
    publisher-returns: true #確認消息已發(fā)送到隊列(Queue)
  • 對之前生產(chǎn)者端的RabbitMQConfig.java進行改造如下:
package com.tp.demo.rabbitmq.producer.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * FileName:   RabbitMQConfig
 * Author:     TP
 * Date:       12/13/20 4:31 PM
 * Description:開啟消息發(fā)送消息確認
 */
@Configuration
public class RabbitMQConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // 設置開啟Mandatory僵闯,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強制調(diào)用回調(diào)函數(shù)
        rabbitTemplate.setMandatory(true);

        // 設置發(fā)送方確認
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("ConfirmCallback >>> " + "相關數(shù)據(jù):" + correlationData);
            System.out.println("ConfirmCallback >>> " + "確認情況:" + ack);
            System.out.println("ConfirmCallback >>> " + "原因:" + cause);
        });

        rabbitTemplate.setReturnsCallback(e -> {
            System.out.println("ReturnCallback >>> " + "消息:" + e.getMessage());
            System.out.println("ReturnCallback >>> " + "回應碼:" + e.getReplyCode());
            System.out.println("ReturnCallback >>> " + "回應信息:" + e.getReplyText());
            System.out.println("ReturnCallback >>> " + "交換機:" + e.getExchange());
            System.out.println("ReturnCallback >>> " + "路由鍵:" + e.getRoutingKey());
        });

        // 設置序列化策略
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

為了方便藤滥,我們封裝了一個消息發(fā)送工具:

package com.tp.demo.rabbitmq.producer.sender;

import com.tp.demo.rabbitmq.producer.utils.RandomUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * FileName:   RabbitSender
 * Author:     TP
 * Date:       12/14/20 9:16 AM
 * Description:封裝一個RabbitMQ發(fā)送消息對象鳖粟,方便使用
 * 當然,你也可以直接使用RabbitTemplate
 */
@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //發(fā)送消息方法調(diào)用: 構(gòu)建Message消息
    public void convertAndSend(String exchange, String routingKey, Object message) {
        // 時間戳+6位隨機字符保證全局唯一
        // 用于ack保證唯一一條消息(在做補償策略的時候拙绊,必須保證這是全局唯一的消息)
        // 在消費方可以通過message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")獲取到該CorrelationData
        CorrelationData correlationData = new CorrelationData(RandomUtils.UUID());
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

這個方法中向图,每次發(fā)送消息的時候會生成一個全局唯一標識放入CorrelationDataCorrelationData我們也可以封裝業(yè)務ID信息(把send方法增加個參數(shù)标沪,在發(fā)送消息的時候指定)榄攀,這樣我們就可以在消息發(fā)送失敗的時候,根據(jù)業(yè)務ID進行自己的補償機制

我們發(fā)送一個不存在的交換機進行測試:

package com.tp.demo.rabbitmq.producer.controller.simple;

import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * FileName:   TestSendFailCallbackController
 * Author:     TP
 * Date:       12/14/20 10:22 AM
 * Description:
 */
@RestController
public class TestSendFailCallbackController {

    @Autowired
    RabbitSender rabbitSender;

    @GetMapping("/sendMessageFail")
    public String sendDirectMessage() {
        String message = "Hello金句,This will fail...";
        //將消息攜帶綁定鍵值:testDirectRouting 發(fā)送到交換機testDirectExchange
        rabbitSender.convertAndSend("none_exchange", "testDirectRouting", message);
        return "ok";
    }
}

查看控制臺輸出:

Connected to the target VM, address: '127.0.0.1:53731', transport: 'socket'

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)

2020-12-14 16:01:15.954  INFO 5063 --- [           main] .t.d.r.p.DemoRabbitMqProducerApplication : Starting DemoRabbitMqProducerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 5063 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-producer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
2020-12-14 16:01:15.957  INFO 5063 --- [           main] .t.d.r.p.DemoRabbitMqProducerApplication : No active profile set, falling back to default profiles: default
2020-12-14 16:01:16.791  INFO 5063 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 7269 (http)
2020-12-14 16:01:16.800  INFO 5063 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2020-12-14 16:01:16.800  INFO 5063 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-14 16:01:16.854  INFO 5063 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-12-14 16:01:16.855  INFO 5063 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 862 ms
2020-12-14 16:01:17.204  INFO 5063 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-14 16:01:17.362  INFO 5063 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 7269 (http) with context path ''
2020-12-14 16:01:17.369  INFO 5063 --- [           main] .t.d.r.p.DemoRabbitMqProducerApplication : Started DemoRabbitMqProducerApplication in 1.72 seconds (JVM running for 2.142)
2020-12-14 16:02:26.993  INFO 5063 --- [nio-7269-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-12-14 16:02:26.993  INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-12-14 16:02:26.994  INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2020-12-14 16:02:27.074  INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
2020-12-14 16:02:27.100  INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#7bab5898:0/SimpleConnection@1759112d [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 53771]
2020-12-14 16:02:27.139 ERROR 5063 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
ConfirmCallback >>> 相關數(shù)據(jù):CorrelationData [id=20201214160227064961897]
ConfirmCallback >>> 確認情況:false
ConfirmCallback >>> 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)

可以看到CorrelationData中取到了我們生成的UUID檩赢,我們可以根據(jù)這個UUID做自己的要業(yè)務補償

順便貼一下自定義的RandomUtils:

package com.tp.demo.rabbitmq.producer.utils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

/**
 * FileName:   RandomUtils
 * Author:     TP
 * Date:       12/14/20 9:36 AM
 * Description:
 */
public class RandomUtils {

    public synchronized static String UUID() {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
        String current = LocalDateTime.now().format(dtf);
        return current + getRandomString(6);
    }

    public synchronized static String UUID(String prefix) {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
        String current = LocalDateTime.now().format(dtf);
        return prefix + current + getRandomString(6);
    }

    private static String getRandomString(int length) {
        StringBuffer sb = new StringBuffer();
        if (length > 0) {
            for (int i = 0; i < length; i++) {
                sb.append(new Random().nextInt(10));
            }
            return sb.toString();
        }
        return null;
    }
}

到此,消息生產(chǎn)者確認機制就算完成了

消息消費者投遞確認

重頭戲來了趴梢,消息消費者端RabbitMQ默認是自動確認的漠畜,只要消息發(fā)送到了消費者,則認為消息已被消費坞靶,消息在RabbitMQ服務器會被移除憔狞,這顯然在生產(chǎn)環(huán)境上是極度危險的,所以我們都會設置消息消費者端的消費確認為手動彰阴,具體步驟如下:

  • 改造消息消費者端的RabbitMQConfig.java:
package com.tp.demo.rabbitmq.consumer.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * FileName:   RabbitMQConfig
 * Author:     TP
 * Date:       12/14/20 9:03 AM
 * Description:消息消費者配置
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 消息消費者配置JSON反序列化使用Jackson2JsonMessageConverter瘾敢,與消息生產(chǎn)者保持一致
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

}

注意:對于沒有自己聲明上述SimpleRabbitListenerContainerFactory的同學,可以在yml中直接配置:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: TP-HOST #虛擬主機,可以不設置使用server默認host
    listener:
      simple:
        acknowledge-mode: manual

而我們自己聲明了SimpleRabbitListenerContainerFactory簇抵,這時如果在yml中增加上述配置是無效的

  • 監(jiān)聽器內(nèi)進行手動消息確認
package com.tp.demo.rabbitmq.consumer.listener.ack;

import com.rabbitmq.client.Channel;
import com.tp.demo.rabbitmq.common.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * FileName:   DirectAckListener
 * Author:     TP
 * Date:       12/13/20 7:02 PM
 * Description:
 * 注解@RabbitListener可以定義在方法上也可以定義在類上
 * -- 如果定義在類上庆杜,需要配合@RabbitHandler標注在方法上,指明具體使用哪個方法做監(jiān)聽
 * -- 如果定義在方法上碟摆,則可以省略@RabbitHandler
 */
@Component
public class DirectAckListener {

    @RabbitListener(queues = "testDirectAckQueue")
    public void process(Message message, Channel channel) throws IOException {
        System.out.println("接收到消息總體內(nèi)容:" + message);
        System.out.println("實際消息內(nèi)容:" + new String(message.getBody()));

        // TODO 業(yè)務邏輯
        // 1.獲取message中的body晃财,解析消息內(nèi)容
        // 2.其他業(yè)務邏輯......

        // 回執(zhí)情形1:消費成功
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        // 回執(zhí)情形2:消費成功消費處理失敗,重新放入隊列(一定要慎用典蜕,防止造成無限返回隊列->消費者->返回隊列.....造成消息積壓)
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

        // 回執(zhí)情形3:消費處理失敗断盛,拒絕接收(可以指定是否重新放入隊列,如果消息不重新放入隊列愉舔,RabbitMQ服務端會將消息移除)
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * 如果你很懶钢猛,不想從message中獲取body再自己反序列化為想要的實體bean怎么辦?
     * Spring對rabbitMQ的集成允許我們直接使用bean接收轩缤,如下:直接可以用形參封裝
     * 擴展:我們可以在生產(chǎn)者端發(fā)送任意類型的消息命迈,并且在消費者端直接用形參封裝,但你必須保證用的是同一種數(shù)據(jù)類型
     * 注意:如果想測試這種快捷方式火的,請將注解注釋放開壶愤,并將上面的process全部注釋掉
     */
    // @RabbitListener(queues = "testDirectAckQueue")
    public void process(User user, Message message, Channel channel) throws IOException {
        System.out.println(user);

        // TODO 業(yè)務邏輯
        // ......

        // 回執(zhí)情形1:進行消息回執(zhí)
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

        // 回執(zhí)情形2:消費成功消費處理失敗,重新放入隊列(一定要慎用卫玖,防止造成無限返回隊列->消費者->返回隊列.....造成消息積壓)
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

        // 回執(zhí)情形3:消費處理失敗公你,拒絕接收(可以指定是否重新放入隊列,如果消息不重新放入隊列假瞬,RabbitMQ服務端會將消息移除)
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

    }
}

當我們設置了消息手動確認陕靠,如果消息到了消費者而消費者一直不確認,在RabbitMQ中這條消息將會一直處于unacked待確認狀態(tài)脱茉,直到消費者與RabbitMQ斷開連接剪芥,這條消息又會重新變成ready狀態(tài),消費者重啟后會重新消費消息琴许,對于消費者手動確認税肪,其回執(zhí)方式有3種,詳見上述代碼的注釋榜田,這里就不再說明了

測試:
發(fā)送一條需要手動確認的消息如下:

package com.tp.demo.rabbitmq.producer.controller.ack;

import com.tp.demo.rabbitmq.common.entity.User;
import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
 * FileName:   SendDirectMessageController
 * Author:     TP
 * Date:       12/13/20 12:11 PM
 * Description:直連交換機消息發(fā)送Controller
 */
@RestController
public class SendDirectAckMessageController {

    @Autowired
    RabbitSender rabbitSender;

    @GetMapping("/sendDirectMessageAck")
    public String sendDirectMessage() {
        User user = new User();
        user.setId(1);
        user.setUserName("TP");
        user.setPassWord("pwd123456");
        user.setAge(18);
        user.setCreateTime(LocalDateTime.now());
        rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", user);
        return "ok";
    }

    @GetMapping("/sendDirectMessageAck2")
    public String sendDirectMessage2() {
        for (int i = 20; i <= 30; i++) {
            rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", "我是一條需要確認的消息");
        }
        return "ok";
    }
}

我們在消費者端加上debug益兄,讓消息先不走回執(zhí),觀察效果:

放開debu后:
控制臺輸出:

接收到消息總體內(nèi)容:(Body:'{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}' MessageProperties [headers={spring_listener_return_correlation=97fb83de-fd43-4529-9d24-abef559a9fcb, spring_returned_message_correlation=20201214163516705225241, __TypeId__=com.tp.demo.rabbitmq.common.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=testDirectAckExchange, receivedRoutingKey=testDirectAckRouting, deliveryTag=2, consumerTag=amq.ctag-F5bUnBjCf-umNuRMKLLUAg, consumerQueue=testDirectAckQueue])
實際消息內(nèi)容:{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}

其他2種回執(zhí)箭券,請自行測試

本示例發(fā)送實體類消息用的User類如下:

package com.tp.demo.rabbitmq.common.entity;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * FileName:   User
 * Author:     TP
 * Date:       12/14/20 11:33 AM
 * Description:
 */
@Data
public class User {

    private Integer id;

    private String userName;

    private String passWord;

    private Integer age;

    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime createTime;
}

至此净捅,Springboot整合RabbitMQ完畢

問題

鄙人在測試中遇到如下問題:消費者端已經(jīng)開啟了消費手動確認,如果我們發(fā)送一條消息辩块,消息內(nèi)容為一個JavaBean蛔六,如果在消費者端監(jiān)聽器進行反序列化消息內(nèi)容到Message參數(shù)時失敗拋出異常了荆永,則RabbitMQ會直接將消息移除,而不會將這條消息標記為unacked国章,這會導致消息丟失
為什么會這樣我也不知道具钥,如果有高人看到此篇文章并對這種情形有理解,請留言液兽,不勝感激骂删!

當然了,我們可以轉(zhuǎn)JSON發(fā)送String消息抵碟,然后自己接收后再解析桃漾,也可以生產(chǎn)者和消費者引用maven私服內(nèi)的同一個jar包坏匪,同一個實體類不會出現(xiàn)反序列化失敗的問題拟逮,不理解的是如果Message就是封裝失敗了,為什么會將這條消息移除呢适滓,而不是標記為未確認呢敦迄??凭迹?

后續(xù)

針對上面的問題罚屋,通過艱難的Spring-amqp源碼debug,定位了問題所在:
消費端監(jiān)聽器如果封裝參數(shù)失敗會拋出:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException嗅绸,這個異常被認為為fatal異常脾猛,也就是致命異常

圖中可以看出,這種情形下鱼鸠,Spring-amqp在進行nack的時候猛拴,是否requeue最終為false,所以不會重新放入隊列中

那么針對這種情形蚀狰,我們怎么解決呢愉昆?
我們可以使用死信隊列,這種情況消息會被認為是死信并發(fā)送到死信隊列里(如果已經(jīng)配置)

并且強烈推薦生產(chǎn)環(huán)境為自己的所有業(yè)務隊列配置上死信隊列麻蹋,能保證消息的可靠性跛溉,通過死信隊列進行業(yè)務補償。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末扮授,一起剝皮案震驚了整個濱河市芳室,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌刹勃,老刑警劉巖堪侯,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異深夯,居然都是意外死亡抖格,警方通過查閱死者的電腦和手機诺苹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雹拄,“玉大人收奔,你說我怎么就攤上這事∽揖粒” “怎么了坪哄?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長势篡。 經(jīng)常有香客問我翩肌,道長,這世上最難降的妖魔是什么禁悠? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任念祭,我火速辦了婚禮,結(jié)果婚禮上碍侦,老公的妹妹穿的比我還像新娘粱坤。我一直安慰自己,他們只是感情好瓷产,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布站玄。 她就那樣靜靜地躺著,像睡著了一般濒旦。 火紅的嫁衣襯著肌膚如雪株旷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天尔邓,我揣著相機與錄音晾剖,去河邊找鬼。 笑死铃拇,一個胖子當著我的面吹牛钞瀑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播慷荔,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼雕什,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了显晶?” 一聲冷哼從身側(cè)響起贷岸,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎磷雇,沒想到半個月后偿警,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡唯笙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年螟蒸,在試婚紗的時候發(fā)現(xiàn)自己被綠了盒使。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡七嫌,死狀恐怖少办,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情诵原,我是刑警寧澤英妓,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站绍赛,受9級特大地震影響蔓纠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜吗蚌,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一腿倚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧褪测,春花似錦猴誊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽乖杠。三九已至分扎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胧洒,已是汗流浹背畏吓。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留卫漫,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像达布,于是被迫代替她去往敵國和親骇窍。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容