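First, get clear on how RabbitMQ exchanges, routing keys, and queues relate to one another.
See: (http://blog.csdn.net/samxx8/article/details/47417133)
The three RabbitMQ exchange types
- Direct Exchange – routes by routing key. Bind a queue to the exchange with a binding key; a message is delivered to that queue only when its routing key matches the binding key exactly.
- Fanout Exchange – ignores the routing key. Simply bind queues to the exchange; every message sent to the exchange is forwarded to all queues bound to it.
- Topic Exchange – matches the routing key against a pattern. The queue is bound with a pattern in which "#" matches zero or more words and "*" matches exactly one word.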
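The wildcard rules for topic patterns are easy to get wrong, so here is a minimal plain-Java sketch of them. It is only an illustration of the matching rules described above, not RabbitMQ's actual implementation; the class name TopicMatchSketch and its methods are hypothetical.

```java
public class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero words (move past it) or one more word (stay on it)
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // words remain in the pattern but none in the key
        }
        // '*' accepts any single word; any other pattern word must be equal to the key word
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"topic.1", "topic.message", "topic.messages", "topic"};
        for (String key : keys) {
            System.out.println(key
                    + " | topic.message: " + matches("topic.message", key)
                    + " | topic.#: " + matches("topic.#", key)
                    + " | topic.*: " + matches("topic.*", key));
        }
    }
}
```

Note that "topic.#" accepts the bare key "topic" (zero extra words), while "topic.*" does not.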
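The following six examples cover most usage scenarios:
*1. Single producer, single consumer
Create a Spring Boot Maven project named springboot-rabbitmq-OneToOne.
pom.xml is configured as follows:
spring-boot-starter-amqp is the core RabbitMQ dependency.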
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dev</groupId>
<artifactId>springboot-rabbitmq-OneToOne</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-rabbitmq-demo</name>
<description>springboot-rabbitmq-demo</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
application.yml is configured as follows:
spring:
  application:
    name: rabbit-service
  rabbitmq:
    host: xxx.xxx.xxx.xxx # RabbitMQ server address
    port: xxxx # RabbitMQ server port
    username: xxx
    password: xxx
Main class
package com.rabbit;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.rabbit.queue.QueueName;
@SpringBootApplication
public class SpringbootRabbitmqDemoApplication {
@Bean
public Queue helloQueue() { // Key point: declaring this bean makes Spring create the queue automatically
return new Queue(QueueName.OneToOneQueue);
}
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDemoApplication.class, args);
}
}
Queue name definitions
package com.rabbit.queue;
public class QueueName {
// The constant must be declared final, otherwise compilation fails with:
//The value for annotation attribute RabbitListener.queues must be a constant expression
public final static String OneToOneQueue="OneToOneQueue";
}
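The later examples also reference QueueName.OneToManyQueue, QueueName.ManyToManyQueue, QueueName.message, and QueueName.messages, whose definitions are not shown in this article. A fuller version of the class might look like the sketch below. The values marked as assumed are illustrative guesses; only OneToOneQueue and the FOO_ constants are taken from the article. In the project the class would live in package com.rabbit.queue.

```java
// Sketch of a complete QueueName class (place it in package com.rabbit.queue).
public class QueueName {
    // From the article
    public static final String OneToOneQueue = "OneToOneQueue";
    // Assumed values: the article uses these constants but never shows them
    public static final String OneToManyQueue = "OneToManyQueue";
    public static final String ManyToManyQueue = "ManyToManyQueue";
    // Assumed from the queue names listed with the topic test results
    public static final String message = "topic.message";
    public static final String messages = "topic.messages";
    // From the callback example
    public static final String FOO_EXCHANGE = "callback.exchange.foo";
    public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
    public static final String FOO_QUEUE = "callback.queue.foo";

    private QueueName() {
        // constants holder, not meant to be instantiated
    }

    public static void main(String[] args) {
        System.out.println(OneToOneQueue + ", " + OneToManyQueue + ", " + ManyToManyQueue);
    }
}
```

Because every field is a static final String initialized with a literal, each one is a compile-time constant and can be used in @RabbitListener(queues = ...).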
Sender
package com.rabbit.hello;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String sendMsg = "Sender: hello rabbitMQ ";
System.out.println(sendMsg);
// Send a text message to the given queue
this.rabbitTemplate.convertAndSend(QueueName.OneToOneQueue, sendMsg);
}
}
Receiver
package com.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = {QueueName.OneToOneQueue}) // Listen on the given queue
public class Receiver {
@RabbitHandler
public void process(String revmsg) {
System.out.println("Receiver : " + revmsg);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender;
@RestController
public class RabbitTest {
@Autowired
private Sender sender;
@GetMapping("/oneToOne")
public void oneToOne() {
sender.send();
}
}
Test result:
http://127.0.0.1:8080/oneToOne
Sender: hello rabbitMQ
Receiver : Sender: hello rabbitMQ
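This completes the single-producer, single-consumer integration.
*2. Single producer, multiple consumers
Same as the single-consumer case, except that several consumers listen on the receiving side.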
package com.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = {QueueName.OneToManyQueue}) // Listen on the given queue; the name differs from the one-to-one queue to keep the examples apart
public class Receiver1 {
@RabbitHandler
public void process(String revmsg) {
System.out.println("Receiver1 : " + revmsg);
}
}
package com.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = {QueueName.OneToManyQueue}) // Listen on the given queue; the name differs from the one-to-one queue to keep the examples apart
public class Receiver2 {
@RabbitHandler
public void process(String revmsg) {
System.out.println("Receiver2 : " + revmsg);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender;
@RestController
public class RabbitTest {
@Autowired
private Sender sender;
/**
* Single producer, multiple consumers
*/
@GetMapping("/oneToMany")
public void oneToMany() {
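// Note: this call needs a Sender.send(String) overload that publishes to QueueName.OneToManyQueue;
// the Sender shown in example 1 only defines send() and targets QueueName.OneToOneQueue.
for(int i=0;i<10;i++){
sender.send("hellomsg:"+i);
}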
}
}
Test result:
http://127.0.0.1:8080/oneToMany
The 10 messages sent by the producer are split between the two consumers.
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
This completes the single-producer, multiple-consumer integration.
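The alternating Receiver1/Receiver2 output above is what round-robin dispatch among the consumers of one queue looks like. The following plain-Java sketch simulates that distribution without a broker. It assumes strict round-robin; with a real broker the exact interleaving also depends on prefetch settings and consumer speed. The class RoundRobinSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    /** Hands out messages to consumers in turn; returns consumer name -> messages it received. */
    public static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // message i goes to consumer (i mod number of consumers)
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hellomsg:" + i);
        }
        Map<String, List<String>> result =
                dispatch(messages, Arrays.asList("Receiver1", "Receiver2"));
        result.forEach((consumer, got) ->
                System.out.println(consumer + " received " + got.size() + " messages: " + got));
    }
}
```

Each message is delivered to exactly one consumer, which is the difference from the fanout example later on, where every bound queue gets its own copy.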
*3. Multiple producers, multiple consumers
Same as above, with one more producer added.
package com.rabbit.hello;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender1 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg;
System.out.println("Sender1 : " + sendMsg);
this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
}
}
package com.rabbit.hello;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg;
System.out.println("Sender2 : " + sendMsg);
this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender1;
import com.rabbit.hello.Sender2;
@RestController
public class RabbitTest {
@Autowired
private Sender1 sender1;
@Autowired
private Sender2 sender2;
/**
* Multiple producers, multiple consumers
*/
@GetMapping("/manyToMany")
public void manyToMany() {
for(int i=0;i<10;i++){
sender1.send("msg-1 :"+i);
sender2.send("msg-2 :"+i);
}
}
}
http://127.0.0.1:8080/manyToMany
The receivers still share the messages evenly.
Sender1 : msg-1 :0
Sender2 : msg-2 :0
Sender1 : msg-1 :1
Sender2 : msg-2 :1
Sender1 : msg-1 :2
Sender2 : msg-2 :2
Sender1 : msg-1 :3
Sender2 : msg-2 :3
Sender1 : msg-1 :4
Sender2 : msg-2 :4
Sender1 : msg-1 :5
Sender2 : msg-2 :5
Sender1 : msg-1 :6
Sender2 : msg-2 :6
Sender1 : msg-1 :7
Sender2 : msg-2 :7
Sender1 : msg-1 :8
Sender2 : msg-2 :8
Sender1 : msg-1 :9
Sender2 : msg-2 :9
Receiver1 : msg-1 :0
Receiver2 : msg-2 :0
Receiver1 : msg-1 :1
Receiver2 : msg-2 :1
Receiver1 : msg-1 :2
Receiver2 : msg-2 :2
Receiver1 : msg-1 :3
Receiver2 : msg-2 :3
Receiver1 : msg-1 :4
Receiver2 : msg-2 :4
Receiver1 : msg-1 :5
Receiver2 : msg-2 :5
Receiver2 : msg-2 :6
Receiver1 : msg-1 :6
Receiver2 : msg-1 :7
Receiver1 : msg-2 :7
Receiver2 : msg-1 :8
Receiver1 : msg-2 :8
Receiver1 : msg-2 :9
Receiver2 : msg-1 :9
*4. Topic exchange (a DirectExchange works similarly and is simpler, so no separate example is given here; see the RPC example.)
Changes to the startup class:
package com.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.rabbit.queue.QueueName;
@SpringBootApplication
public class SpringbootRabbitmqTopicApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqTopicApplication.class, args);
}
// Create a queue
@Bean
public Queue queueMessage() {
return new Queue(QueueName.message); // The queue name is your own constant defined in the QueueName class
}
// Create a queue
@Bean
public Queue queueMessages() {
return new Queue(QueueName.messages);
}
// Create the exchange
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean
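Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { // Key point: the parameter name matches the @Bean method name above, so that Queue bean is injected
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); // Queues on the same exchange are bound with different routing keys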
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); // "#" matches zero or more words, "*" matches exactly one word
}
}
Sender
// All three methods can be used for testing; each sends with a different routing key
package com.rabbit.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "i am testTopic";
System.out.println("Sender : " + context);
String routingKey="topic.1";
this.rabbitTemplate.convertAndSend("topicExchange", routingKey, context);
}
public void send1() {
String context = "i am testTopic 1";
System.out.println("Sender : " + context);
String routingKey="topic.message";
// Check whether routing key "topic.message" matches the binding keys of QueueName.message (topic.message) and QueueName.messages (topic.#)
this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
}
public void send2() {
String context = "i am testTopic 2";
System.out.println("Sender : " + context);
String routingKey="topic.messages";
this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
}
}
Two receivers
Each listens on a different queue, to test whether a message reaches only one of them or both.
package com.rabbit.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = QueueName.message)
public class TopicReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
package com.rabbit.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = QueueName.messages)
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.topic.TopicSender;
@RestController
public class RabbitTest {
@Autowired
private TopicSender topicSender;
/**
* RabbitMQ test for the topic exchange type
*/
@GetMapping("/topicTest")
public void topicTest() {
topicSender.send();
}
@GetMapping("/topicTest1")
public void topicTest1() {
topicSender.send1();
}
@GetMapping("/topicTest2")
public void topicTest2() {
topicSender.send2();
}
}
Test results (queue 1: topic.message, queue 2: topic.messages; their binding keys are topic.message and topic.# respectively)
http://localhost:8080/topicTest
Sender : i am testTopic
Topic Receiver2 : i am testTopic
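The sender's routing key is topic.1, which does not match the binding key of the first queue,
but the second queue is bound with the pattern topic.#, so the receiver on the second queue gets the message.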
--------------------------------------------
http://localhost:8080/topicTest1
Sender : i am testTopic 1
Topic Receiver1 : i am testTopic 1
Topic Receiver2 : i am testTopic 1
The sender's routing key is topic.message, which matches the binding keys of both queues, so both receivers get the message.
---------------------------------------------
http://localhost:8080/topicTest2
Sender : i am testTopic 2
Topic Receiver2 : i am testTopic 2
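The sender's routing key is topic.messages. It does not match the first queue's binding key but does match the second queue's pattern, so only the second receiver gets the message.
This completes the topic exchange example.
*5. Fanout exchange
Fanout is the familiar broadcast (publish/subscribe) mode: a message sent to a fanout exchange is delivered to every queue bound to that exchange.
Startup class (three queues bound to a FanoutExchange):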
package com.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class SpringbootRabbitmqFanoutApplication {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqFanoutApplication.class, args);
}
}
Sender:
package com.rabbit.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msgString="fanoutSender : I am fanoutSender";
System.out.println(msgString);
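// Any routing key works here because a fanout exchange ignores it. The argument cannot simply be dropped:
// the two-argument convertAndSend(routingKey, message) would treat "fanoutExchange" as the routing key.
this.rabbitTemplate.convertAndSend("fanoutExchange", "hehe", msgString);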
}
}
Receivers
package com.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA : " + msg);
}
}
package com.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB : " + msg);
}
}
package com.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC : " + msg);
}
}
Controller:
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.fanout.FanoutSender;
@RestController
public class RabbitTest {
@Autowired
private FanoutSender fanoutSender;
/**
* RabbitMQ test for the fanout exchange type
*/
@GetMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send();
}
}
Test result
http://localhost:8080/fanoutTest
fanoutSender : I am fanoutSender
FanoutReceiverB : fanoutSender : I am fanoutSender
FanoutReceiverC : fanoutSender : I am fanoutSender
FanoutReceiverA : fanoutSender : I am fanoutSender
This completes the fanout exchange example.
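To make the broadcast behaviour concrete, here is a small plain-Java simulation of a fanout exchange: every bound queue receives its own copy of each message, and the routing key plays no part. This is only a model of the behaviour shown in the test output above, not broker code; the class FanoutSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to this simulated exchange. */
    public void bind(String queueName) {
        boundQueues.put(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue; the routing key is accepted but never consulted. */
    public void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.bind("fanout.C");
        exchange.publish("hehe", "fanoutSender : I am fanoutSender");
        for (String queue : new String[] {"fanout.A", "fanout.B", "fanout.C"}) {
            System.out.println(queue + " -> " + exchange.messagesIn(queue));
        }
    }
}
```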
*6. RPC (callback)
Startup class
package com.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.rabbit.queue.QueueName;
@SpringBootApplication
public class SpringbootRabbitmqCallbackApplication {
/** Choose the exchange type */
@Bean
public DirectExchange defaultExchange() {
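/**
* DirectExchange: routes to the queue whose binding key equals the routing key
* TopicExchange: matches the routing key against multi-word patterns
* FanoutExchange: delivers to every bound queue; the routing key is ignored
* HeadersExchange: matches on key-value message header attributes
*/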
return new DirectExchange(QueueName.FOO_EXCHANGE);
}
@Bean
public Queue fooQueue() {
return new Queue(QueueName.FOO_QUEUE);
}
@Bean
public Binding binding() {
/** Bind the queue to the exchange */
return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(QueueName.FOO_ROUTINGKEY);
}
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqCallbackApplication.class, args);
}
}
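Add callback handling. Instead of relying on the default auto-configuration driven by application.properties, the program here reads the configuration values from the file explicitly. The properties spring.rabbitmq.addresses, spring.rabbitmq.virtual-host, and spring.rabbitmq.publisher-confirms referenced below must therefore be defined in the configuration file.
// Kept under com.rabbit so that component scanning from the startup class picks this configuration up
package com.rabbit.config;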
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class AmqpConfig {
@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** Must be set to true for confirm callbacks to be invoked */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
/** Each sender sets its own callback on the template, so the bean should be prototype-scoped; a singleton RabbitTemplate can hold only one ConfirmCallback */
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
Sender
package com.rabbit.callback;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender implements RabbitTemplate.ConfirmCallback { // Key point: this interface must be implemented
private RabbitTemplate rabbitTemplate;
@Autowired
public Sender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
public void send(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(QueueName.FOO_EXCHANGE, QueueName.FOO_ROUTINGKEY, msg, correlationData);
}
/** Callback method */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm: " + correlationData.getId());
}
}
Receiver
package com.rabbit.callback;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = QueueName.FOO_QUEUE)
public class Receiver {
@RabbitHandler
public void process(String foo) {
System.out.println("Receiver: " + foo);
}
}
Related name definitions
package com.rabbit.queue;
public class QueueName {
public static final String FOO_EXCHANGE = "callback.exchange.foo";
public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
public static final String FOO_QUEUE = "callback.queue.foo";
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.callback.Sender;
@RestController
public class RabbitTest {
@Autowired
private Sender sender;
@GetMapping("/callback")
public void send() {
sender.send("this is callback message");
}
}
Test result
http://localhost:8080/callback
send: dc9243c5-b524-4f8c-a0c5-248018ecf66b
confirm: dc9243c5-b524-4f8c-a0c5-248018ecf66b
Receiver: this is callback message
This completes the callback (RPC) example.
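The confirm callback above only prints the correlation id and ignores the ack flag and cause. One possible way to act on them is to remember each message by its correlation id when it is sent and look it up again when the confirm arrives. The sketch below shows that bookkeeping in plain Java; PendingConfirms is a hypothetical helper, not part of Spring AMQP, and it assumes onSend would be called from Sender.send and onConfirm from Sender.confirm.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingConfirms {
    // correlation id -> message body still waiting for a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Remember a message at send time. */
    public void onSend(String correlationId, String message) {
        pending.put(correlationId, message);
    }

    /**
     * Handle a confirm. Returns the message that should be retried when the
     * broker nacked it, or null when it was acked or the id is unknown.
     */
    public String onConfirm(String correlationId, boolean ack, String cause) {
        String message = pending.remove(correlationId);
        if (ack || message == null) {
            return null;
        }
        System.out.println("nack for " + correlationId + ", cause: " + cause);
        return message;
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.onSend("id-1", "first message");
        confirms.onSend("id-2", "second message");
        System.out.println("retry: " + confirms.onConfirm("id-1", true, null));
        System.out.println("retry: " + confirms.onConfirm("id-2", false, "simulated failure"));
        System.out.println("still pending: " + confirms.pendingCount());
    }
}
```

A ConcurrentHashMap is used because confirms typically arrive on a different thread from the one that sends.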
Note: RabbitMQ messages can also carry objects; the object only needs to implement the Serializable interface.
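As an illustration of why Serializable is enough, the sketch below round-trips an object through Java serialization, which is what happens to such a payload between sender and receiver, assuming the template's default message converter is in use. The User class and the helper methods are hypothetical and exist only for this example; no broker is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableRoundTrip {

    /** Hypothetical payload type; any class works as long as it implements Serializable. */
    public static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public final int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Turns the payload into the bytes that would form the message body. */
    public static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the object on the receiving side from the message body. */
    public static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(new User("alice", 30));
        User received = (User) fromBytes(body);
        System.out.println("Receiver: " + received.name + ", " + received.age);
    }
}
```

With the Spring setup from the examples, the sender would pass the object to convertAndSend and the @RabbitHandler method would declare a parameter of the same type; both sides need the class on their classpath.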
This concludes the Spring Boot and RabbitMQ integration. Thanks for reading.
Parts of this article draw on:
http://www.cnblogs.com/boshen-hzb/p/6841982.html
http://blog.csdn.net/zl18310999566/article/details/54341057
https://www.rabbitmq.com/getstarted.html