Springboot與RabbitMQ的整合

首先需弄明白RabbitMq的 exchange璧眠、route、queue的關(guān)系
可參照:(http://blog.csdn.net/samxx8/article/details/47417133)

RabbitMQ 三種Exchange

  • Direct Exchange – 處理路由鍵读虏。需要將一個隊列綁定到交換機上责静,要求該消息與一個特定的路由鍵完全匹配。
  • Fanout Exchange – 不處理路由鍵盖桥。你只需要簡單的將隊列綁定到交換機上灾螃。一個發(fā)送到交換機的消息都會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上。
  • Topic Exchange – 將路由鍵和某模式進行匹配揩徊。此時隊列需要綁定要一個模式上腰鬼。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞塑荒。

以下6個實例展示大部分應(yīng)用場景:

*1.單生產(chǎn)者和單消費者

新建springboot maven工程 springboot-rabbitmq-OneToOne

pom.xml配置如下:

其中spring-boot-starter-amqp 是 rabbitmq 核心包熄赡。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dev</groupId>
    <artifactId>springboot-rabbitmq-OneToOne</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-rabbitmq-demo</name>
    <description>springboot-rabbitmq-demo</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath/> 
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml配置如下:
spring: 
  application:
    name: rabbit-service
  rabbitmq:
    host: xxx.xxx.xxx.xxx    #rabbitmq服務(wù)器地址
    port: xxxx               #rabbitmq服務(wù)器端口
    username: xxx
    password: xxx
主類
package com.rabbit;

import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbit.queue.QueueName;

@SpringBootApplication
public class SpringbootRabbitmqDemoApplication {
    
    @Bean
    public Queue helloQueue() {//劃重點:自動創(chuàng)建Queue
        return new Queue(QueueName.OneToOneQueue);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqDemoApplication.class, args);
    }
}
Queue名稱定義
package com.rabbit.queue;
public class QueueName {    
//此處定義必須是final不然會報錯
//The value for annotation attribute RabbitListener.queues must be a constant expression
public final static String OneToOneQueue="OneToOneQueue";
}
發(fā)送端
package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String sendMsg = "Sender: hello rabbitMQ ";
        System.out.println(sendMsg);
        //向指定Queue發(fā)送文本消息
        this.rabbitTemplate.convertAndSend(QueueName.OneToOneQueue, sendMsg);
    }

}
接收端
package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = {QueueName.OneToOneQueue}) //監(jiān)控指定的Queue
public class Receiver {

    @RabbitHandler
    public void process(String revmsg) {
        System.out.println("Receiver  : " + revmsg);
    }

}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender;

@RestController
public class RabbitTest {
    
    @Autowired
    private Sender sender;
    
    @GetMapping("/oneToOne")
    public void oneToOne() {
        sender.send();
    }
}

測試結(jié)果:

http://127.0.0.1:8080/oneToOne

Sender: hello rabbitMQ 
Receiver  : Sender: hello rabbitMQ 

至此,單生產(chǎn)者和單消費者整合結(jié)束袜炕。

*2.單生產(chǎn)者和多消費者

與單消費者類似本谜,其中接收端為多個消費者

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = {QueueName.OneToManyQueue})//監(jiān)控指定的Queue, 名稱修改,以區(qū)分一對一的Queue
public class Receiver1 {

    @RabbitHandler
    public void process(String revmsg) {
        System.out.println("Receiver1  : " + revmsg);
    }
}
package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = {QueueName.OneToManyQueue})//監(jiān)控指定的Queue, 名稱修改偎窘,以區(qū)分一對一的Queue
public class Receiver2 {

    @RabbitHandler
    public void process(String revmsg) {
        System.out.println("Receiver2  : " + revmsg);
    }
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.hello.Sender;

@RestController
public class RabbitTest {
    
    @Autowired
    private Sender sender;

    
    /**
     * 單生產(chǎn)者-多消費者
     */
    @GetMapping("/oneToMany")
    public void oneToMany() {
        for(int i=0;i<10;i++){
            sender.send("hellomsg:"+i);
        }
        
    }
}

測試結(jié)果:

http://127.0.0.1:8080/oneToMany
生產(chǎn)者發(fā)送的10條消息乌助,分別被兩個消費者接收了

Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 

至此溜在,單生產(chǎn)者和多消費者整合結(jié)束。

*3.多生產(chǎn)者和多消費者

跟以上類似他托,再加個生產(chǎn)者即可

package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg;
        System.out.println("Sender1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
    }

}
package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg;
        System.out.println("Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
    }

}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.hello.Sender1;
import com.rabbit.hello.Sender2;

@RestController
public class RabbitTest {
    
    @Autowired
    private Sender1 sender1;
    @Autowired
    private Sender2 sender2;
    
    /**
     * 多生產(chǎn)者-多消費者
     */
    @GetMapping("/manyToMany")
    public void manyToMany() {
        for(int i=0;i<10;i++){
            sender1.send("msg-1 :"+i);
            sender2.send("msg-2 :"+i);
        }
        
    }
}
http://127.0.0.1:8080/manyToMany
接收端仍然會均勻接收到消息

Sender1 : msg-1 :0
Sender2 : msg-2 :0
Sender1 : msg-1 :1
Sender2 : msg-2 :1
Sender1 : msg-1 :2
Sender2 : msg-2 :2
Sender1 : msg-1 :3
Sender2 : msg-2 :3
Sender1 : msg-1 :4
Sender2 : msg-2 :4
Sender1 : msg-1 :5
Sender2 : msg-2 :5
Sender1 : msg-1 :6
Sender2 : msg-2 :6
Sender1 : msg-1 :7
Sender2 : msg-2 :7
Sender1 : msg-1 :8
Sender2 : msg-2 :8
Sender1 : msg-1 :9
Sender2 : msg-2 :9
Receiver1  : msg-1 :0
Receiver2  : msg-2 :0
Receiver1  : msg-1 :1
Receiver2  : msg-2 :1
Receiver1  : msg-1 :2
Receiver2  : msg-2 :2
Receiver1  : msg-1 :3
Receiver2  : msg-2 :3
Receiver1  : msg-1 :4
Receiver2  : msg-2 :4
Receiver1  : msg-1 :5
Receiver2  : msg-2 :5
Receiver2  : msg-2 :6
Receiver1  : msg-1 :6
Receiver2  : msg-1 :7
Receiver1  : msg-2 :7
Receiver2  : msg-1 :8
Receiver1  : msg-2 :8
Receiver1  : msg-2 :9
Receiver2  : msg-1 :9

*4. topic ExChange(DirectExchange 與之類似較簡單掖肋,此處不作實例演示,可參照RPC消息實例赏参。)

啟動類修改:

package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbit.queue.QueueName;


@SpringBootApplication
public class SpringbootRabbitmqTopicApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqTopicApplication.class, args);
    }

    // 創(chuàng)建隊列
    @Bean
    public Queue queueMessage() {
        return new Queue(QueueName.message);//隊列名稱自己定義在QueueName類里
    }

    // 創(chuàng)建隊列
    @Bean
    public Queue queueMessages() {
        return new Queue(QueueName.messages);
    }

    // 創(chuàng)建交換器
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {//劃重點:隊列名稱與上述方法名一致
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");//同一exchange下的隊列綁定不同的routingkey
    }
    
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//#代表一個或多個匹配志笼,*代表一個字符匹配
    }

}
發(fā)送方

//3個不同方法均可用于測試,綁定3個不同的routingkey

package com.rabbit.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "i am testTopic";
        System.out.println("Sender : " + context);
        String routingKey="topic.1";
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey, context);
    }

    public void send1() {
        String context = "i am testTopic 1";
        System.out.println("Sender : " + context);
        String routingKey="topic.message";
        //檢查routingKey "topic.message" 是否匹配QueueName.message(topic.message)把篓,QueueName.messages(topic.#)中的routingKey
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
    }
    
    public void send2() {
        String context = "i am testTopic 2";
        System.out.println("Sender : " + context);
        String routingKey="topic.messages";
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
    }
}
兩個接收方

指定不同的隊列纫溃,測試是單獨收到還是同時收到消息

package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = QueueName.message)
public class TopicReceiver1 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1 : " + message);
    }
}
package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = QueueName.messages)
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2 : " + message);
    }
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.topic.TopicSender;

@RestController
public class RabbitTest {
    
    @Autowired
    private TopicSender topicSender;
    
    /**
     * topic exchange類型rabbitmq測試
     */
    @GetMapping("/topicTest")
    public void topicTest() {
           topicSender.send();
    }
    @GetMapping("/topicTest1")
    public void topicTest1() {
           topicSender.send1();
    }
    @GetMapping("/topicTest2")
    public void topicTest2() {
           topicSender.send2();
    }
}
測試結(jié)果(routingKey-隊列1:topic.message 隊列2:topic.messages)
http://localhost:8080/topicTest
Sender : i am testTopic
Topic Receiver2 : i am testTopic
發(fā)送方的routingkey 是topic.1不匹配第一個隊列綁定的routingkey
而第二個隊列routingkey 是topic.# 模糊匹配,所以綁定第二個隊列的接收端能收到消息
--------------------------------------------
http://localhost:8080/topicTest1
Sender : i am testTopic 1
Topic Receiver1 : i am testTopic 1
Topic Receiver2 : i am testTopic 1
發(fā)送方的routingkey是topic.message韧掩,綁定的兩個隊列的routingKey均匹配紊浩,則都能收到
---------------------------------------------
http://localhost:8080/topicTest2
Sender : i am testTopic 2
Topic Receiver2 : i am testTopic 2
發(fā)送方的routingkey是topic.messages ,不匹配第一個隊列routingKey疗锐,而第二個模糊匹配坊谁,則能收到

至此topic相關(guān)結(jié)束。

*5. fanout ExChange

Fanout 就是我們熟悉的廣播模式或者訂閱模式滑臊,給Fanout轉(zhuǎn)發(fā)器發(fā)送消息口芍,綁定這個轉(zhuǎn)發(fā)器的所有隊列都收到這個消息。

啟動類(3個隊列綁定FanoutExchange ):
package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringbootRabbitmqFanoutApplication {
    
      @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }

        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }

        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }

        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        @Bean
        Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }

        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }

        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }


    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqFanoutApplication.class, args);
    }
}
發(fā)送方:
package com.rabbit.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msgString="fanoutSender : I am fanoutSender";
        System.out.println(msgString);
        this.rabbitTemplate.convertAndSend("fanoutExchange","hehe" ,msgString);//routingkey任意雇卷,去除routingKey則不生效
    }

}
接收方
package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }

} 
package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }

} 
package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }

} 
Controller:
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.fanout.FanoutSender;

@RestController
public class RabbitTest {
    
    @Autowired
    private FanoutSender fanoutSender;
    @Autowired
    
    /**
     * fanout exchange類型rabbitmq測試
     */
    @GetMapping("/fanoutTest")
    public void fanoutTest() {
           fanoutSender.send();
    }
}
測試結(jié)果
http://localhost:8080/fanoutTest

fanoutSender : I am fanoutSender
FanoutReceiverB  : fanoutSender : I am fanoutSender
FanoutReceiverC  : fanoutSender : I am fanoutSender
FanoutReceiverA  : fanoutSender : I am fanoutSender

至此fanout exchange結(jié)束

*6. RPC(callback)

啟動類
package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbit.queue.QueueName;

@SpringBootApplication
public class SpringbootRabbitmqCallbackApplication {
    
    /** 設(shè)置交換機類型 */
    @Bean
    public DirectExchange defaultExchange() {
        /**
         * DirectExchange:按照routingkey分發(fā)到指定隊列 
         * TopicExchange:多關(guān)鍵字匹配
         * FanoutExchange: 將消息分發(fā)到所有的綁定隊列鬓椭,無routingkey的概念 
         * HeadersExchange:通過添加屬性key-value匹配
         */
        return new DirectExchange(QueueName.FOO_EXCHANGE);
    }

    @Bean
    public Queue fooQueue() {
        return new Queue(QueueName.FOO_QUEUE);
    }

    @Bean
    public Binding binding() {
        /** 將隊列綁定到交換機 */
        return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(QueueName.FOO_ROUTINGKEY);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqCallbackApplication.class, args);
    }
}

增加回調(diào)處理,這里不再使用application.properties默認配置的方式聋庵,會在程序中顯示的使用文件中的配置信息膘融。

package com.rabbitmq.config;  
  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.context.annotation.Scope;  
  
@Configuration  
public class AmqpConfig {  
  
    @Value("${spring.rabbitmq.addresses}")  
    private String addresses;  
    @Value("${spring.rabbitmq.username}")  
    private String username;  
    @Value("${spring.rabbitmq.password}")  
    private String password;  
    @Value("${spring.rabbitmq.virtual-host}")  
    private String virtualHost;  
    @Value("${spring.rabbitmq.publisher-confirms}")  
    private boolean publisherConfirms;  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses(addresses);  
        connectionFactory.setUsername(username);  
        connectionFactory.setPassword(password);  
        connectionFactory.setVirtualHost(virtualHost);  
        /** 如果要進行消息回調(diào),則這里必須要設(shè)置為true */  
        connectionFactory.setPublisherConfirms(publisherConfirms);  
        return connectionFactory;  
    }  
  
    @Bean  
    /** 因為要設(shè)置回調(diào)類祭玉,所以應(yīng)是prototype類型氧映,如果是singleton類型,則回調(diào)類為最后一次設(shè)置 */  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    public RabbitTemplate rabbitTemplate() {  
        RabbitTemplate template = new RabbitTemplate(connectionFactory());  
        return template;  
    }  
  
}  
發(fā)送方
package com.rabbit.callback;  
  
import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;
  
@Component  
public class Sender implements RabbitTemplate.ConfirmCallback {  //劃重點 必須實現(xiàn)該接口
  
    private RabbitTemplate rabbitTemplate;  
  
    @Autowired  
    public Sender(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        this.rabbitTemplate.setConfirmCallback(this);  
    }  
  
    public void send(String msg) {  
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());  
        System.out.println("send: " + correlationData.getId());  
        this.rabbitTemplate.convertAndSend(QueueName.FOO_EXCHANGE, QueueName.FOO_ROUTINGKEY, msg, correlationData);  
    }  
  
    /** 回調(diào)方法 */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        System.out.println("confirm: " + correlationData.getId());  
    }  
}  
接收方
package com.rabbit.callback;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

import com.rabbit.queue.QueueName;

@Configuration
@RabbitListener(queues = QueueName.FOO_QUEUE)
public class Receiver {

    @RabbitHandler
    public void process(String foo) {
        System.out.println("Receiver: " + foo);
    }
}
相關(guān)名稱定義
package com.rabbit.queue;

public class QueueName {
    
    public static final String FOO_EXCHANGE   = "callback.exchange.foo";  
    public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";  
    public static final String FOO_QUEUE      = "callback.queue.foo";  

}

Controller
package com.rabbit.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.callback.Sender;
  
@RestController  
public class RabbitTest {  
  
    @Autowired  
    private Sender sender;  
  
    @GetMapping("/callback")  
    public void send() {  
        sender.send("this is callback message");  
    }  
}  
測試結(jié)果
http://localhost:8080/callback

send: dc9243c5-b524-4f8c-a0c5-248018ecf66b
confirm: dc9243c5-b524-4f8c-a0c5-248018ecf66b
Receiver: this is callback message

至此Callback(RPC)結(jié)束

說明:rabbitmq中消息的發(fā)送也支持對象的傳遞脱货,只需對象實現(xiàn)Serializable接口即可岛都。

Springboot與RabbitMQ的整合部分結(jié)束,感謝振峻。

部分內(nèi)容參照:
http://www.cnblogs.com/boshen-hzb/p/6841982.html
http://blog.csdn.net/zl18310999566/article/details/54341057
https://www.rabbitmq.com/getstarted.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末臼疫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子扣孟,更是在濱河造成了極大的恐慌烫堤,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異鸽斟,居然都是意外死亡拔创,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門富蓄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來剩燥,“玉大人,你說我怎么就攤上這事立倍∶鸷欤” “怎么了?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵口注,是天一觀的道長变擒。 經(jīng)常有香客問我,道長寝志,這世上最難降的妖魔是什么赁项? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮澈段,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘舰攒。我一直安慰自己败富,他們只是感情好,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布摩窃。 她就那樣靜靜地躺著兽叮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪猾愿。 梳的紋絲不亂的頭發(fā)上鹦聪,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天,我揣著相機與錄音蒂秘,去河邊找鬼泽本。 笑死,一個胖子當著我的面吹牛姻僧,可吹牛的內(nèi)容都是我干的规丽。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼撇贺,長吁一口氣:“原來是場噩夢啊……” “哼赌莺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起松嘶,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤艘狭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巢音,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡遵倦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了港谊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片骇吭。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖歧寺,靈堂內(nèi)的尸體忽然破棺而出燥狰,到底是詐尸還是另有隱情,我是刑警寧澤斜筐,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布龙致,位于F島的核電站,受9級特大地震影響顷链,放射性物質(zhì)發(fā)生泄漏目代。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一嗤练、第九天 我趴在偏房一處隱蔽的房頂上張望榛了。 院中可真熱鬧,春花似錦煞抬、人聲如沸霜大。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽战坤。三九已至,卻和暖如春残拐,著一層夾襖步出監(jiān)牢的瞬間途茫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工溪食, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留囊卜,地道東北人。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓错沃,卻偏偏與公主長得像边败,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子捎废,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理笑窜,服務(wù)發(fā)現(xiàn),斷路器登疗,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器排截。支持消息的持久化嫌蚤、事務(wù)、擁塞控...
    jiangmo閱讀 10,350評論 2 34
  • RabbitMQ 即一個消息隊列断傲,主要是用來實現(xiàn)應(yīng)用程序的異步和解耦脱吱,同時也能起到消息緩沖,消息分發(fā)的作用认罩。 消息...
    彩虹之夢閱讀 1,082評論 2 1
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)箱蝠。消息可以非常簡單,比如只包含文本字符串垦垂,也可以更復(fù)雜...
    lijun_m閱讀 1,340評論 0 1