十四、SpringBoot整合RabbitMQ

1蔫浆、安裝

1.1殖属、Erlang:

Erlang下載地址,下載后安裝即可瓦盛。

1.2忱辅、RabbitMQ安裝

RabbitMQ下載地址,下載后安裝即可谭溉。

注意:Erlang的版本要與RabbitMQ版本需要匹配才行墙懂。

RabbitMQ Minimum required Erlang/OTP Maximum supported Erlang/OTP
3.7.x 19.3 20.3.x
3.6.15 19.3 20.3.x
3.6.14、 3.6.13 扮念、 3.6.12损搬、 3.6.11 R16B03 20.1.x
3.6.10 、 3.6.9柜与、 3.6.8巧勤、 3.6.7、 3.6.6弄匕、 3.6.5颅悉、 3.6.4 R16B03 19.3.x
3.6.3 、 3.6.2迁匠、 3.6.1剩瓶、 3.6.0 R16B03 18.3.x
3.5.x R14B04 17.5.x
3.4.x R13B03 16B03

2、可視化管理界面

  • Erlang和RabbitMQ安裝完成后城丧,通過命令行進(jìn)入到RabbitMQ的安裝目錄下的sbin目錄延曙,輸入以下命令,等待返回亡哄。

    • rabbitmq-plugins enable rabbitmq_management
  • 訪問http://ip:15672/, 使用guest/guest或者admin/admin登錄枝缔。

  • 如果使用的是Linux系統(tǒng),記得把防火墻的端口15672開放或者把防火墻關(guān)閉蚊惯。

  • 如果代碼中需要使用新用戶作為測試愿卸,需要在Admin標(biāo)簽頁中新建一個用戶,并同時設(shè)置密碼和virtual hosts截型。

RabbitMQ新增用戶.png

3趴荸、RabbitMQ術(shù)語

  • Server(Broker):接收客戶端連接,實(shí)現(xiàn)AMQP協(xié)議的消息隊列和路由功能的進(jìn)程菠劝;
  • Virtual Host:虛擬主機(jī)的概念赊舶,類似權(quán)限控制組睁搭,一個Virtual Host里可以有多個Exchange和Queue,權(quán)限控制的最小麗都是Virtual Host;
  • Exchange:交換機(jī)笼平,接收生產(chǎn)者發(fā)送的消息园骆,并根據(jù)Routing Key將消息路由到服務(wù)器中的隊列Queue。
  • ExchangeType:交換機(jī)類型決定了路由消息行為寓调,RabbitMQ中主要有三種類型Exchange锌唾,分別是fanout、direct夺英、topic晌涕;
  • Message Queue:消息隊列,用于存儲還未被消費(fèi)者消費(fèi)的消息;
  • Message:由Header和body組成,Header是由生產(chǎn)者添加的各種屬性的集合战得,包括Message是否被持久化懂傀、優(yōu)先級是多少萄喳、由哪個Message Queue接收等;body是真正需要發(fā)送的數(shù)據(jù)內(nèi)容;
  • BindingKey(RouteKey):綁定關(guān)鍵字,將一個特定的Exchange和一個特定的Queue綁定起來垮衷。

4、與Spring Boot的整合(簡單版HelloWorld)

4.1乖坠、引入RabbitMQ依賴

首先當(dāng)然是添加RabbitMQ的依賴?yán)膊笸唬瑥膍vn repository找到SpringBoot整合RabbitMQ的整合包。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot-amqp.version}</version>
</dependency>

來看看其依賴情況:

spring-boot-starter-amqp依賴情況.png

4.2熊泵、添加RabbitMQ配置信息

spring.rabbitmq.host=ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=用戶名
spring.rabbitmq.password=密碼
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true

這里配置的信息是最基礎(chǔ)的仰迁,復(fù)雜的配置可以自行百度嘗試。

4.3戈次、創(chuàng)建消息生產(chǎn)者

通過注入AmqpTemplate接口的實(shí)例來實(shí)現(xiàn)消息的發(fā)送轩勘,AmqpTemplate接口定義了一套針對AMQP協(xié)議的基礎(chǔ)操作筒扒。在Spring Boot中會根據(jù)配置來注入其具體實(shí)現(xiàn)怯邪。

@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){
        String content = "hello : " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        amqpTemplate.convertAndSend("hello", content);
    }
}

此時的AmqpTemplate對象其實(shí)是RabbitTemplate的實(shí)例,因為RabbitTemplateAmqpTemplate的子類:

AmqpTemplate子類.png

至于為啥能自動注入這個Bean花墩,后面會講解到悬秉,先不急。

4.4冰蘑、創(chuàng)建消息消費(fèi)者

通過@RabbitListener注解定義該類對hello隊列的監(jiān)聽和泌,并用@RabbitHandler注解來指定對消息的處理方法。所以祠肥,該消費(fèi)者實(shí)現(xiàn)了對hello隊列的消費(fèi)武氓,消費(fèi)操作為輸出消息的字符串內(nèi)容。

@RabbitListener(queues = {"hello"})
@Component
public class Receiver {

    @RabbitHandler
    public void handler(String content){
        System.out.println("接收到:" + content);
    }
}

4.5、創(chuàng)建RabbitMQ配置類

用來配置隊列县恕、交換器东羹、路由等高級信息。

@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue(){
        return new Queue("hello");
    }
}

4.6忠烛、編寫測試類

注入消息生產(chǎn)者用于向隊列中發(fā)送消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class TestRabbitmq {

    @Autowired
    private Sender sender;

    @Test
    public void test(){
        sender.send();
    }
}

4.7属提、測試

  • 先啟動主程序類,用于監(jiān)聽隊列美尸;
  • 然后運(yùn)行測試類冤议,使用生產(chǎn)者向隊列中發(fā)送消息。

4.8 师坎、疑問

學(xué)過RabbitMQ基礎(chǔ)的童靴肯定知道恕酸,要通過RabbitMQ發(fā)送消息的話,需要創(chuàng)建通道胯陋,交換機(jī)尸疆,隊列,并將通道與交換機(jī)惶岭、交換機(jī)與隊列綁定起來寿弱,而上述的簡單例子中,為什么沒看到通道按灶、交換機(jī)的創(chuàng)建症革,也沒看到綁定的操作呢?其實(shí)在RabbitMQ中鸯旁,在不創(chuàng)建交換機(jī)的情況下噪矛,RabbitMQ會創(chuàng)建一個默認(rèn)的交換機(jī),通過RabbitMQ可視化管理界面可以看到:


RabbitMQ默認(rèn)交換機(jī).png

而且創(chuàng)建的隊列铺罢,默認(rèn)也就綁定到該交換機(jī)上艇挨,見下圖:


RabbitMQ默認(rèn)交換機(jī)與隊列的綁定.png

再仔細(xì)看這個默認(rèn)交換機(jī),能看到這個交換機(jī)(Exchange)類型是Direct模式的韭赘,至于什么是Direct模式缩滨,后面會講。

5泉瞻、Spring Boot自動配置RabbitMQ

同之前文章一樣脉漏,SpringBoot整合RabbitMQ同樣有個自動配置類,只不過RabbitMQ的自動配置類是由SpringBoot官方自行提供袖牙,而不像Mybatis是由Mybatis方提供的侧巨。這個自動配置類在spring-boot-autoconfigure-xxx.jar包中:


SpringBoot整合RabbitMQ的自動配置類.png

這里說個題外話,是關(guān)于自定義Starter的小知識點(diǎn):

  • 啟動器(Starter)只用來做依賴導(dǎo)入鞭达;
  • 需要專門寫一個自動配置模塊司忱;
  • 啟動器依賴自動配置模塊皇忿,使用的時候只需要引入啟動器(Starter);

而在命名規(guī)范中約定如下:

  • 官方命名空間:
    • 前綴:spring-boot-starter-xxx
    • 模式:spring-boot-starter-模塊名
    • 舉例:spring-boot-starter-web、spring-boot-starter-jdbc坦仍、...
  • 自定義命名空間:
    • 前綴:xxx-spring-boot-starter
    • 模式:模塊名-spring-boot-starter
    • 舉例:mybatis-spring-boot-starter禁添、pagehelper-spring-boot-starter、...

進(jìn)入正題之前桨踪,咱們先來看看Spring與RabbitMQ的整合時的配置信息:

<!-- RabbitMQ start -->
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
                           password="${mq.password}" port="${mq.port}"  />

<rabbit:admin connection-factory="connectionFactory"/>

<!-- 消息隊列客戶端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

<!-- queue 隊列聲明 -->
<!-- 
        durable 是否持久化 
        exclusive 僅創(chuàng)建者可以使用的私有隊列老翘,斷開后自動刪除 
        auto-delete 當(dāng)所有消費(fèi)端連接斷開后,是否自動刪除隊列 -->
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />

<!-- Topic交換機(jī)定義锻离,其他類型交換機(jī)類似 -->
<!-- 
        交換機(jī):一個交換機(jī)可以綁定多個隊列铺峭,一個隊列也可以綁定到多個交換機(jī)上。
        如果沒有隊列綁定到交換機(jī)上汽纠,則發(fā)送到該交換機(jī)上的信息則會丟失卫键。
     -->
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <!-- 設(shè)置消息Queue匹配的pattern (direct模式為key) -->
        <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<bean name="rabbitmqService" class="xxx.yyy.zzz"></bean>

<!-- 配置監(jiān)聽 消費(fèi)者 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <!-- 
            queues 監(jiān)聽隊列,多個用逗號分隔 
            ref 監(jiān)聽器 -->
    <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>

配合上面的xml配置文件來看看SpringBoot中RabbitMQ的自動配置類RabbitAutoConfiguration

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    
    @Configuration
    @ConditionalOnMissingBean(ConnectionFactory.class)
    protected static class RabbitConnectionFactoryCreator {
    
        @Bean
        public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
            throws Exception {
            RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
            //...
            return connectionFactory;
        }
    }
    
    @Configuration
    @Import(RabbitConnectionFactoryCreator.class)
    protected static class RabbitTemplateConfiguration {
        
        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean(RabbitTemplate.class)
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            //...
            return rabbitTemplate;
        }
        
        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
        @ConditionalOnMissingBean(AmqpAdmin.class)
        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    }
}

該自動配置類中自動注冊了三個重要的Bean虱朵,分別是rabbitConnectionFactory莉炉、rabbitTemplateamqpAdmin碴犬,剛好與xml配置文件中的前三個Bean一一對應(yīng)絮宁。當(dāng)然RabbitMQ的配置信息由RabbitProperties類進(jìn)行導(dǎo)入:

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    private String host = "localhost";
    private int port = 5672;
    private String username;
    private String password;
    private final Ssl ssl = new Ssl();
    private String virtualHost;
    
    private String addresses;
    private Integer requestedHeartbeat;
    private boolean publisherConfirms;
    private boolean publisherReturns;
    private Integer connectionTimeout;
    private final Cache cache = new Cache();
    private final Listener listener = new Listener();
    private final Template template = new Template();
    private List<Address> parsedAddresses;
    
    public static class Ssl {
        private boolean enabled;
        private String keyStore;
        private String keyStorePassword;
        private String trustStore;
        private String trustStorePassword;
        private String algorithm;
    }
    public static class Cache {
        private final Channel channel = new Channel();
        private final Connection connection = new Connection();
        
        public static class Channel {
            private Integer size;
            private Long checkoutTimeout;
        }

        public static class Connection {
            private CacheMode mode = CacheMode.CHANNEL;
            private Integer size;
        }
    }

    public static class Listener {
        @NestedConfigurationProperty
        private final AmqpContainer simple = new AmqpContainer();
    }

    public static class AmqpContainer {
        private boolean autoStartup = true;
        private AcknowledgeMode acknowledgeMode;
        private Integer concurrency;
        private Integer maxConcurrency;
        private Integer prefetch;
        private Integer transactionSize;
        private Boolean defaultRequeueRejected;
        private Long idleEventInterval;
        @NestedConfigurationProperty
        private final ListenerRetry retry = new ListenerRetry();
    }

    public static class Template {
        @NestedConfigurationProperty
        private final Retry retry = new Retry();
        private Boolean mandatory;
        private Long receiveTimeout;
        private Long replyTimeout;
    }

    public static class Retry {
        private boolean enabled;
        private int maxAttempts = 3;
        private long initialInterval = 1000L;
        private double multiplier = 1.0;
        private long maxInterval = 10000L;
    }

    public static class ListenerRetry extends Retry {
        private boolean stateless = true;
    }
    
    private static final class Address {
        private static final String PREFIX_AMQP = "amqp://";
        private static final int DEFAULT_PORT = 5672;
        private String host;
        private int port;
        private String username;
        private String password;
        private String virtualHost;
    }
}

大家自行嘗試,這里就不多描述了服协。

RabbitAutoConfiguration配置類上有個簽名:

@Import(RabbitAnnotationDrivenConfiguration.class)

來看看RabbitAnnotationDrivenConfiguration類:

public class SimpleRabbitListenerContainerFactory
        extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
        implements ApplicationContextAware, ApplicationEventPublisherAware {

    private Executor taskExecutor;

    private PlatformTransactionManager transactionManager;

    private Integer txSize;

    private Integer concurrentConsumers;

    private Integer maxConcurrentConsumers;

    private Long startConsumerMinInterval;

    private Long stopConsumerMinInterval;

    private Integer consecutiveActiveTrigger;

    private Integer consecutiveIdleTrigger;

    private Integer prefetchCount;

    private Long receiveTimeout;

    private Boolean defaultRequeueRejected;

    private Advice[] adviceChain;

    private BackOff recoveryBackOff;

    private Boolean missingQueuesFatal;

    private Boolean mismatchedQueuesFatal;

    private ConsumerTagStrategy consumerTagStrategy;

    private Long idleEventInterval;

    private ApplicationEventPublisher applicationEventPublisher;

    private ApplicationContext applicationContext;
    
    @Override
    protected SimpleMessageListenerContainer createContainerInstance() {
        return new SimpleMessageListenerContainer();
    }

    @Override
    protected void initializeContainer(SimpleMessageListenerContainer instance) {
        //other code...
    }
}

它其實(shí)是SimpleMessageListenerContainer的工廠類绍昂,而SimpleMessageListenerContainer又是<rabbit:listener-container />標(biāo)簽的具體實(shí)現(xiàn)類,剛好又同xml配置文件的消費(fèi)監(jiān)聽容器對應(yīng)偿荷。

至于其他的配置信息窘游,如隊列和交換機(jī)的創(chuàng)建,以及隊列與交換機(jī)的綁定就由配置類自行定義了跳纳。請往下接著看...

6忍饰、RabbitMQ交換機(jī)及工作模式

RabbitMQ的交換機(jī)Exchange有如下幾種類型:

  • Fanout

  • Direct

  • Topic

  • **Header **

其中header類型的Exchange由于用的相對較少,所以本章主要講述其他三種類型的Exchange寺庄。

RabbitMQ的工作模式:

  • 發(fā)布/訂閱模式:對應(yīng)Fanout類型的交換機(jī)艾蓝。
  • 路由模式:對應(yīng)Direct類型的交換機(jī)。
  • 通配符模式:對應(yīng)Topic類型的交換機(jī)铣揉。

6.1饶深、發(fā)布/訂閱模式(Fanout)

任何發(fā)送到Fanout Exchange的消息都會被轉(zhuǎn)發(fā)到與該Exchange綁定(Binding)的所有Queue上。

  • 可以理解為路由表的模式逛拱;
  • 這種模式不需要RouteKey;
  • 這種模式需要提前將Exchange與Queue進(jìn)行綁定台猴,一個Exchange可以綁定多個Queue朽合,一個Queue可以同多個Exchange進(jìn)行綁定俱两;
  • 如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄曹步。

代碼示例:

FanoutConfig配置類代碼宪彩,配置了兩個隊列和一個交換機(jī),并綁定:

@Configuration
public class FanoutConfig {

    public static final String FANOUT_QUEUE_NAME_1 = "fanout-queue-1";
    public static final String FANOUT_QUEUE_NAME_2 = "fanout-queue-2";
    public static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";

    @Bean
    public Queue fanoutQueue1() {
//        return new Queue(FANOUT_QUEUE_NAME_1);//默認(rèn)情況讲婚,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(FANOUT_QUEUE_NAME_1).build();
    }

    @Bean
    public Queue fanoutQueue2() {
//        return new Queue(FANOUT_QUEUE_NAME_1);//默認(rèn)情況尿孔,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(FANOUT_QUEUE_NAME_2).build();
    }

    @Bean
    public FanoutExchange fanoutExchange() {
//        return new FanoutExchange(FANOUT_EXCHANGE_NAME);//默認(rèn)情況下,durable為true,auto-delete為false
        return (FanoutExchange) ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息生產(chǎn)者:

@Component(value = "fanout-sender")
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String name){
        String content = "hello : " + name + "筹麸,當(dāng)前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        amqpTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, "", content);
    }
}

消費(fèi)者1號:

@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_1})
@Component("fanout-receiver1")
public class Receiver1 {

    @RabbitHandler
    public void handler(String content){
        System.out.println("Fanout.Receiver1接收到:" + content);
    }
}

消費(fèi)者2號:

@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_2})
@Component("fanout-receiver2")
public class Receiver2 {

    @RabbitHandler
    public void handler(String content){
        System.out.println("Fanout.Receiver2接收到:" + content);
    }
}

Fanout控制器:

@RestController
public class FanoutController {

    @Autowired
    @Qualifier("fanout-sender")
    private Sender sender;

    @RequestMapping("/fanout")
    public String hello(String name){
        sender.send(name);
        return "success";
    }
}

輸入url:http://localhost:8081/fanout?name=Cay 觀看控制臺輸出:

fanout模式輸出結(jié)果.png

6.2活合、路由模式(Direct)

  • RabbitMQ默認(rèn)自帶Exchange,該Exchange的名字為空字符串物赶,當(dāng)前也可以自己指定名字白指;

  • 在默認(rèn)的Exchange下,不需要將Exchange與Queue綁定酵紫, RabbitMQ會自動綁定告嘲;而如果使用自定義的Exchange,則需要在將Exchange綁定到Queue的時候需要指定一個RouteKey奖地;

  • 在消息傳遞時需要一個RouteKey橄唬;

  • 所有發(fā)送到Direct Exchange的消息會被轉(zhuǎn)發(fā)到RouteKey中指定的Queue。

  • 如果vhost中不存在RouteKey中指定的隊列名参歹,則該消息會被拋棄轧坎。

代碼示例:

DirectConfig配置類代碼,配置兩個隊列泽示,通過兩個不同的routeKey綁定到同一個Exchange上:

@Configuration
public class DirectConfig {

    public static final String DIRECT_QUEUE_NAME_1 = "direct-queue-1";
    public static final String DIRECT_QUEUE_NAME_2 = "direct-queue-2";
    public static final String DIRECT_EXCHANGE_NAME = "direct-exchange";

    public static final String ROUTE_KEY_1 = "direct.route.key.1";
    public static final String ROUTE_KEY_2 = "direct.route.key.2";

    @Bean
    public Queue directQueue1() {
//        return new Queue(DIRECT_QUEUE_NAME_1);//默認(rèn)情況缸血,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(DIRECT_QUEUE_NAME_1).build();
    }

    @Bean
    public Queue directQueue2() {
//        return new Queue(DIRECT_QUEUE_NAME_2);//默認(rèn)情況,durable為true,exclusive為false,auto-delete為false
        return QueueBuilder.durable(DIRECT_QUEUE_NAME_2).build();
    }

    @Bean
    public DirectExchange directExchange() {
//        return new DirectExchange(DIRECT_EXCHANGE_NAME_1);//默認(rèn)情況下械筛,durable為true,auto-delete為false
        return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTE_KEY_1);
    }

    @Bean
    public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTE_KEY_2);
    }
}

消息生產(chǎn)者:

@Component("direct-sender")
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(Integer selector) {
        String content = "hello捎泻,我是%d號,當(dāng)前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String routeKey = "";
        if (selector.intValue() == 1) {
            content = String.format(content, 1);
            routeKey = DirectConfig.ROUTE_KEY_1;
        } else if (selector.intValue() == 2) {
            content = String.format(content, 2);
            routeKey = DirectConfig.ROUTE_KEY_2;
        }
        amqpTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE_NAME, routeKey, content);
    }
}

消費(fèi)者1號:

@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_1})
@Component("direct-receiver1")
public class Receiver1 {

    @RabbitHandler
    public void handler(String content){
        System.out.println("Direct.Receiver1接收到:" + content);
    }
}

消費(fèi)者2號:

@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_2})
@Component("direct-receiver2")
public class Receiver2 {

    @RabbitHandler
    public void handler(String content){
        System.out.println("Direct.Receiver2接收到:" + content);
    }
}

Direct控制器:

@RestController
public class DirectController {
    private static final Logger logger = LoggerFactory.getLogger(DirectController.class);

    @Autowired
    @Qualifier("direct-sender")
    private Sender sender;

    @RequestMapping("/direct")
    public String hello(@RequestParam(defaultValue = "1") int selector){
        logger.info("參數(shù)selector:" + selector);
        sender.send(selector);
        return "success";
    }
}

輸入兩次不同的參數(shù)selector:

direct模式輸出結(jié)果.png

6.3埋哟、通配符模式(Topic)

任何發(fā)送到Topic Exchange的消息都會被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定的Queue上笆豁。

  • 這種模式較為復(fù)雜,簡單來說赤赊,就是每個隊列都有其關(guān)心的主題闯狱,所有的消息都帶有一個“標(biāo)題”(RouteKey),Exchange會將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與RouteKey模糊匹配的隊列抛计。

  • 這種模式需要RouteKey哄孤,也需要提前綁定Exchange與Queue。

  • 在進(jìn)行綁定時吹截,要提供一個該隊列關(guān)心的主題瘦陈,如“#.log.#”表示該隊列關(guān)心所有涉及l(fā)og的消息(一個RouteKey為”MQ.log.error”的消息會被轉(zhuǎn)發(fā)到該隊列)凝危。

  • “#”表示0個或若干個關(guān)鍵字,“*”表示一個關(guān)鍵字晨逝。如“l(fā)og.*”能與“l(fā)og.warn”匹配蛾默,但是無法與“l(fā)og.warn.timeout”匹配;但是“l(fā)og.#”能與上述兩者匹配捉貌。

  • 同樣支鸡,如果Exchange沒有發(fā)現(xiàn)能夠與RouteKey匹配的Queue,則會拋棄此消息趁窃。

代碼示例:

TopicConfig配置類牧挣,聲明了兩個隊列,分別對應(yīng)兩個routeKey:topic.#和topic.*

@Configuration
public class TopicConfig {

    public static final String TOPIC_QUEUE_NAME_1 = "topic-queue-1";
    public static final String TOPIC_QUEUE_NAME_2 = "topic-queue-2";
    public static final String TOPIC_EXCHANGE_NAME = "topic-exchange";

    public static final String ROUTE_KEY_1 = "topic.#";
    public static final String ROUTE_KEY_2 = "topic.*";

    @Bean
    public TopicExchange topicExchange() {
//        return new TopicExchange(TOPIC_EXCHANGE_NAME);//默認(rèn)情況下棚菊,durable為true,auto-delete為false
        return (TopicExchange) ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
    }
    
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE_NAME_1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE_NAME_2);
    }

    @Bean
    public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTE_KEY_1);
    }

    @Bean
    public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTE_KEY_2);
    }
}

消息生產(chǎn)者:

@Component("topic-sender")
public class Sender {

    private static final String TOPIC_PREFIX = "topic.";

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String selector){
        String content = "hello浸踩,當(dāng)前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        amqpTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, TOPIC_PREFIX + selector, content);
    }
}

消費(fèi)者1號:

@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_1})
@Component("topic-receiver1")
public class Receiver1 {

    @RabbitHandler
    public void handler(String content){
        System.out.println("Topic.Receiver1接收到:" + content);
    }
}

消費(fèi)者2號:

@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_2})
@Component("topic-receiver2")
public class Receiver2 {

    @RabbitHandler
    public void handler(String content){
        System.out.println("Topic.Receiver2接收到:" + content);
    }
}

Topic控制器:

@RestController
@RequestMapping("/topic")
public class TopicController {

    @Autowired
    @Qualifier("topic-sender")
    private Sender sender;

    @RequestMapping("/send")
    public String send(String routeKey){
        sender.send(routeKey);
        return "success";
    }
}

通過url訪問不同的routeKey來查看結(jié)果:

topic模式輸出結(jié)果1.png
topic模式輸出結(jié)果2.png

可以看到,如果參數(shù)為message的時候即routeKey為topic.message统求,兩個隊列都能接收到消息检碗,而如果參數(shù)為message.a的時候即routeKey為topic.message.a,只有隊列1能接收到消息码邻,而隊列2不能接收到消息折剃。

7、消息確認(rèn)與回調(diào)

? 默認(rèn)情況下像屋,RabbitMQ發(fā)送消息以及接收消息是自動確認(rèn)的怕犁,意思也就是說,消息發(fā)送方發(fā)送消息的時候己莺,認(rèn)為消息已經(jīng)成功發(fā)送到了RabbitMQ服務(wù)器奏甫,而當(dāng)消息發(fā)送給消費(fèi)者后,RabbitMQ服務(wù)器就立即自動確認(rèn)凌受,然后將消息從隊列中刪除了阵子。而這樣的自動機(jī)制會造成消息的丟失,我們常常聽到“丟消息”的字眼胜蛉。

? 為了解決消息的丟失挠进,RabbitMQ便產(chǎn)生了手動確認(rèn)的機(jī)制:

  • 發(fā)送者:
    • 當(dāng)消息不能路由到任何隊列時,會進(jìn)行確認(rèn)失敗操作誊册,如果發(fā)送方設(shè)置了mandatory=true模式领突,則先會調(diào)用basic.return方法,然后調(diào)用basic.ack方法案怯;
    • 當(dāng)消息可以路由時君旦,消息被發(fā)送到所有綁定的隊列時,進(jìn)行消息的確認(rèn)basic.ack
  • 接收者:
    • 當(dāng)消息成功被消費(fèi)時于宙,可以進(jìn)行消息的確認(rèn)basic.ack浮驳;
    • 當(dāng)消息不能正常被消息時悍汛,可以進(jìn)行消息的反確認(rèn)basic.nack 或者拒絕basic.reject捞魁。

當(dāng)設(shè)置mandatory=true時,其中basic.ackbasic.nack會調(diào)用自定義的RabbitTemplate.ConfirmCallback接口的confirm方法离咐。

public interface ConfirmCallback {
    /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
    void confirm(CorrelationData correlationData, boolean ack, String cause);

}

而發(fā)送者發(fā)送消息時無法路由后谱俭,會調(diào)用baisc.return方法,其確認(rèn)機(jī)制由RabbitTemplate.ReturnCallback接口的returnedMessage方法實(shí)現(xiàn)宵蛀。

public interface ReturnCallback {
    /**
         * Returned message callback.
         * @param message the returned message.
         * @param replyCode the reply code.
         * @param replyText the reply text.
         * @param exchange the exchange.
         * @param routingKey the routing key.
         */
    void returnedMessage(Message message, int replyCode, String replyText,
                         String exchange, String routingKey);
}

7.1昆著、使用Spring配置RabbitMQ的確認(rèn)機(jī)制

  • 修改publisher-confirms為true
<!--連接工廠 -->
 <rabbit:connection-factory id="connectionFactory" host="{ip地址}" port="{端口}" username="{用戶名}" password="{密碼}" publisher-confirms="true"/>
  • 修改消息回調(diào)方法confirm-callback和return-callback為bean的id
<!-- mandatory必須設(shè)置true,return callback才生效 -->
<rabbit:template id="amqpTemplate"  connection-factory="connectionFactory" 
                 confirm-callback="confirmCallBackListener"
                 return-callback="returnCallBackListener" 
                 mandatory="true" 
/>
  • 消息回調(diào)類
@Service
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //other code...
    }
}

@Service
class returnCallBackListener implements RabbitTemplate.ReturnCallback{
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        //other code...
    }
}
  • 修改消息確認(rèn)機(jī)制改成手動確認(rèn)manual:
<rabbit:listener-container
        connection-factory="connectionFactory" acknowledge="manual" >
        <!-- other xml  -->
</rabbit:listener-container>

7.2、使用SpringBoot配置RabbitMQ的確認(rèn)機(jī)制

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    //other code...
    
    //發(fā)送者是否確認(rèn)
    cachingConnectionFactory.setPublisherConfirms(true);
    return cachingConnectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();

    //other code...

    //修改成手動確認(rèn)
    rabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    return rabbitListenerContainerFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());

    //重點(diǎn)
    rabbitTemplate.setMandatory(true);
    //消息回調(diào)
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

    });

    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

    });
    return rabbitTemplate;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末术陶,一起剝皮案震驚了整個濱河市凑懂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌梧宫,老刑警劉巖接谨,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異塘匣,居然都是意外死亡脓豪,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進(jìn)店門忌卤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扫夜,“玉大人,你說我怎么就攤上這事驰徊◇源常” “怎么了?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵棍厂,是天一觀的道長颗味。 經(jīng)常有香客問我,道長勋桶,這世上最難降的妖魔是什么脱衙? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮例驹,結(jié)果婚禮上捐韩,老公的妹妹穿的比我還像新娘。我一直安慰自己鹃锈,他們只是感情好荤胁,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著屎债,像睡著了一般仅政。 火紅的嫁衣襯著肌膚如雪垢油。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天圆丹,我揣著相機(jī)與錄音滩愁,去河邊找鬼。 笑死辫封,一個胖子當(dāng)著我的面吹牛硝枉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播倦微,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼妻味,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了欣福?” 一聲冷哼從身側(cè)響起责球,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拓劝,沒想到半個月后雏逾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡凿将,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年校套,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片牧抵。...
    茶點(diǎn)故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡笛匙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出犀变,到底是詐尸還是另有隱情妹孙,我是刑警寧澤,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布获枝,位于F島的核電站蠢正,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏省店。R本人自食惡果不足惜嚣崭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望懦傍。 院中可真熱鬧雹舀,春花似錦、人聲如沸粗俱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至签财,卻和暖如春串慰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背唱蒸。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工邦鲫, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人油宜。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓掂碱,卻偏偏與公主長得像怜姿,于是被迫代替她去往敵國和親慎冤。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容