@author Jacky wang
轉(zhuǎn)載請注明出處,http://www.reibang.com/p/4cfedabca746
一、 RabbitMQ的介紹
RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的)吗跋,RocketMQ等。
消息中間件的工作過程可以用生產(chǎn)者消費者模型來表示.即,生產(chǎn)者不斷的向消息隊列發(fā)送信息,而消費者從消息隊列中消費信息.具體過程如下:
從上圖可看出,對于消息隊列來說:生產(chǎn)者患亿,消息隊列步藕,消費者是最重要的三個概念挑格,生產(chǎn)者發(fā)消息到消息隊列中去,消費者監(jiān)聽指定的消息隊列,并且當(dāng)消息隊列收到消息之后咙冗,接收消息隊列傳來的消息,并且給予相應(yīng)的處理漂彤。消息隊列常用于分布式系統(tǒng)之間互相信息的傳遞雾消。
對于RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產(chǎn)者和消息隊列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機,而交換機則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對應(yīng)的消息隊列.那么RabitMQ的工作流程如下所示:
交換機Exchange:交換機的主要作用是接收相應(yīng)的消息并且綁定到指定的隊列。交換機有四種類型挫望,分別為Direct立润,topic,headers媳板,F(xiàn)anout:
Direct 是RabbitMQ默認(rèn)的交換機模式,也是最簡單的模式.即創(chuàng)建消息隊列的時候,指定一個BindingKey.當(dāng)發(fā)送者發(fā)送消息的時候,指定對應(yīng)的Key.當(dāng)Key和消息隊列的BindingKey一致的時候,消息將會被發(fā)送到該消息隊列中桑腮。
topic 轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊列和交換機的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發(fā)送到該消息隊列中。
headers 也是根據(jù)一個規(guī)則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規(guī)則,而發(fā)送消息的時候也會指定一組鍵值對規(guī)則,當(dāng)兩組鍵值對規(guī)則相匹配的時候,消息會被發(fā)送到匹配的消息隊列中蛉幸。
Fanout 是路由廣播的形式,將會把消息發(fā)給綁定它的全部隊列,即便設(shè)置了key,也會被忽略破讨。
二搁骑、SpringBoot整合RabbitMQ
RabbitMQ的安裝參考:01_RabbitMQ的安裝
Springboot集成RabbitMQ十分簡單!下面開始搭建環(huán)境乏冀。
下面例子中Direct,Topic,Fanout模式的pom和application.properties的配置都是一樣的。并且,都采用的交換機模式的demo妒茬。
2.1 Direct模式
包結(jié)構(gòu)如下圖:
2.1.1 pom.xml依賴與application.properites配置
pom.xml
:
提供springboot集成RabbitMQ的依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- 熱部署插件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
</configuration>
</plugin>
</plugins>
</build>
application.properties:
server.port=9090
spring.application.name=spirngboot-rabbitmq
spring.rabbitmq.host=192.168.1.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=jack
spring.rabbitmq.password=jack2017
spring.rabbitmq.virtual-host=/
2.1.2 RabbitMQ配置類及生產(chǎn)消費者配置
RabbitConfig配置類:
@SpringBootConfiguration
public class RabbitConfig {
public static final String ROUTING_KEY = "hello";
public static final String DIRECT_EXCHANGE = "directExchange";
/**
* DirectQueue名為abc的隊列
*/
@Bean
public Queue queueABC() {
return new Queue("abc");
}
/**
* DirectQueue名為xyz的隊列
*/
@Bean
public Queue queueXYZ() {
return new Queue("xyz");
}
@Bean
DirectExchange exchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 將abc消息隊列綁定到directExchange交換機上,bingding-key為hello
*
* @param helloQueue
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeABC(Queue queueABC, DirectExchange exchange) {
return BindingBuilder.bind(queueABC).to(exchange).with(ROUTING_KEY);
}
/**
* 將xyz消息隊列綁定到directExchange交換機上,bingding-key為hello
*
* @param queueXYZ
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeXYZ(Queue queueXYZ, DirectExchange exchange) {
return BindingBuilder.bind(queueXYZ).to(exchange).with(ROUTING_KEY);
}
}
DirectSender生產(chǎn)者:
@Component
public class DirectSender {
//rabbitTemplate直接注入使用
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.err.println("sender1 : " + sdf.format(new Date()));
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.ROUTING_KEY, sdf.format(new Date()));
}
public void send2() {
User user = new User("Jack", 24);
System.err.println("sender2 : " + new Gson().toJson(user));
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.ROUTING_KEY, new Gson().toJson(user));
}
}
DirectReceiver消費者:
@Component
public class DirectReceiver {
@RabbitListener(queues = { "abc" }) //監(jiān)聽abc隊列
public void processABC(String msg) {
System.err.println("Receiver Queue ABC : " + msg);
}
@RabbitListener(queues = {"xyz"}) //監(jiān)聽xyz隊列
public void processXYZ(String msg) {
System.err.println("Receiver Queue XYZ : " + msg);
}
}
2.1.3 Application入口類及Controller測試入口
Application項目啟動入口:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
DirectController測試入口:
@RestController
public class DirectController {
@Autowired
private DirectSender sender;
@RequestMapping("/send")
public void send() {
sender.send();
}
@RequestMapping("/send2")
public void send2() {
sender.send2();
}
}
2.1.4 測試結(jié)果+總結(jié)
關(guān)鍵在于綁定時綁定的交換機以及binding-key,生產(chǎn)者在發(fā)送消息時,指定了交換機和bingding-key,rabbitmq根據(jù)在RabbitMQ配置類綁定的Bean找到對應(yīng)的消息隊列Queue,從而將消息傳遞過去绿淋。
2.2 Topic模式
包結(jié)構(gòu)如下圖:
2.2.1 pom.xml依賴與application.properites配置
pom.xml依賴與application.properties的配置與Direct模式一致盾沫,略蕾哟。
2.2.2 RabbitMQ配置類及生產(chǎn)消費者配置
RabbitConfig配置類:
@SpringBootConfiguration
public class RabbitConfig {
// 隊列routing key
public static final String TOPIC_MESSAGE = "topic.message";
public static final String TOPIC_MESSAGES = "topic.messages";
// 交換機exchange
public static final String TOPIC_Exchange = "topic_exchange";
@Bean
public Queue queueMessage() {
return new Queue(TOPIC_MESSAGE);
}
@Bean
public Queue queueMessages() {
return new Queue(TOPIC_MESSAGES);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(TOPIC_Exchange);
}
/**
* 將隊列topic.message與exchange綁定逐哈,binding_key為topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 將隊列topic.messages與exchange綁定,binding_key為topic.#,模糊匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");// *表示一個詞,#表示零個或多個詞
}
}
TopicSender生產(chǎn)者:
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg1 = "I am topic.mesaage msg======";
System.err.println("sender1 : " + msg1);
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_Exchange, "topic.message", msg1);
}
public void send2() {
String msg2 = "I am topic.mesaages msg########";
System.err.println("sender2 : " + msg2);
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_Exchange, "topic.messages", msg2);
}
}
TopicReceiver消費者:
@Component
public class TopicReceiver {
@RabbitListener(queues = RabbitConfig.TOPIC_MESSAGE) // 監(jiān)聽器監(jiān)聽指定的queue
public void process1(String message) {
System.err.println("Receiver1 >>> " + message);
}
@RabbitListener(queues = RabbitConfig.TOPIC_MESSAGES) // 監(jiān)聽器監(jiān)聽指定的queue
public void process2(String messages) {
System.err.println("Receiver2 >>> " + messages);
}
}
2.2.3 Application入口類及Controller測試入口
Application項目啟動入口:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
TopicController測試入口:
@RestController
public class TopicController {
@Autowired
private TopicSender sender;
@RequestMapping("/send")
public void send() {
sender.send();
}
@RequestMapping("/send2")
public void send2() {
sender.send2();
}
}
2.2.4 測試結(jié)果+總結(jié)
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_Exchange, "topic.message", msg1);
方法的第一個參數(shù)是交換機名稱,第二個參數(shù)是發(fā)送的key,第三個參數(shù)是傳遞消息的內(nèi)容唯咬。
RabbitMQ將會根據(jù)第二個參數(shù)去尋找有沒有匹配此規(guī)則的隊列,如果有,則把消息給它,如果有不止一個,則把消息分發(fā)給匹配的隊列(每個隊列都有消息!)。
顯然在我們的測試中,參數(shù)2匹配了兩個隊列(topic.message和topic.#),因此消息將會被發(fā)放到這兩個隊列中,而監(jiān)聽這兩個隊列的監(jiān)聽器都將收到消息!
那么如果把參數(shù)2改為topic.messages呢?顯然只會匹配到一個隊列,那么process2方法對應(yīng)的監(jiān)聽器收到消息!
2.3 Fanout模式
包結(jié)構(gòu)與上述一致,如下圖:
2.3.1 pom.xml依賴與application.properites配置
pom.xml依賴與application.properties的配置與Direct模式一致厚柳,略。
2.3.2 RabbitMQ配置類及生產(chǎn)消費者配置
RabbitConfig配置類:
@SpringBootConfiguration
public class RabbitConfig {
// 隊列routing key
public static final String FANOUT_A = "fanout.a";
public static final String FANOUT_B = "fanout.b";
public static final String FANOUT_C = "fanout.c";
// 交換機exchange
public static final String FANOUT_Exchange = "fanout_Exchange";
@Bean
public Queue aMessage() {
return new Queue(FANOUT_A);
}
@Bean
public Queue bMessage() {
return new Queue(FANOUT_B);
}
@Bean
public Queue cMessage() {
return new Queue(FANOUT_C);
}
/**
* fanout路由交換器
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_Exchange);
}
/**
* 將隊列fanout.a與fanoutExchange綁定
* @param aMessage
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(aMessage).to(fanoutExchange);
}
/**
* 將隊列fanout.a與fanoutExchange綁定
* @param bMessage
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(bMessage).to(fanoutExchange);
}
/**
* 將隊列fanout.c與fanoutExchange綁定
* @param cMessage
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cMessage).to(fanoutExchange);
}
}
FanoutSender生產(chǎn)者:
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg1 = "I am fanout.mesaage msg======";
System.err.println("fanoutSender : " + msg1);
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_Exchange, "abcd.efg", msg1);
}
}
FanoutReceiver消費者:
@Component
public class FanoutReceiver {
@RabbitListener(queues = RabbitConfig.FANOUT_A) // 監(jiān)聽器監(jiān)聽指定的queue
public void process1(String message) {
System.err.println("FanoutReceiver1 >>> " + message);
}
@RabbitListener(queues = RabbitConfig.FANOUT_B) // 監(jiān)聽器監(jiān)聽指定的queue
public void process2(String messages) {
System.err.println("FanoutReceiver2 >>> " + messages);
}
@RabbitListener(queues = RabbitConfig.FANOUT_C) // 監(jiān)聽器監(jiān)聽指定的queue
public void process3(String messages) {
System.err.println("FanoutReceiver3 >>> " + messages);
}
}
2.3.3 Application入口類及Controller測試入口
Application項目啟動入口:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
FanoutController測試入口:
@RestController
public class FanoutController {
@Autowired
private FanoutSender sender;
@RequestMapping("/send")
public void send() {
sender.send();
}
}
2.3.4 測試結(jié)果+總結(jié)
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_Exchange, "abcd.efg", msg1);
由以上結(jié)果可知:就算fanoutSender發(fā)送消息的時候胳泉,指定了routing_key為"abcd.efg"钳吟,但是所有接收者都接受到了消息
2.4 消息發(fā)送之后的回調(diào)callback
包結(jié)構(gòu)如下圖:
2.4.1 pom.xml依賴與application.properites配置
pom.xml:
主要依賴是:spring-boot-starter-amqp,其他依賴都是為了方便測試
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<!-- 熱部署插件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
</configuration>
</plugin>
</plugins>
</build>
application.properties添加以下配置:
server.port=9090
spring.application.name=spirngboot-rabbitmq
spring.rabbitmq.host=192.168.1.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=jack
spring.rabbitmq.password=jack2017
spring.rabbitmq.virtual-host=/
#如果要進行消息回調(diào)壁酬,則這里必須要設(shè)置為true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
2.4.2 RabbitMQ配置類
RabbitConfig配置類,作用為指定隊列,交換器類型及綁定操作:
共聲明了2個隊列璃搜,分別是topic.a,topic.b唾糯,交換器類型為TopicExchange,并與topic.a吩愧,topic.b隊列分別綁定。
@SpringBootConfiguration
public class RabbitConfig {
// 隊列routing key
public static final String TOPIC_A = "topic.a";
public static final String TOPIC_B = "topic.b";
// 交換機exchange
public static final String TOPIC_Exchange = "topicExchange";
@Bean
public Queue queueA() {
return new Queue(TOPIC_A, true);// true表示持久化該隊列
}
@Bean
public Queue queueB() {
return new Queue(TOPIC_B, true);
}
/**
* Topic路由交換器
*/
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_Exchange);
}
/**
* 將隊列topic.a與topicExchange綁定
*/
@Bean
Binding bindingExchangeA(Queue queueA, TopicExchange topicExchange) {
return BindingBuilder.bind(queueA).to(topicExchange).with("topic.a");//bindingKey為topic.a
}
/**
* 將隊列topic.b與topicExchange綁定
*/
@Bean
Binding bindingExchangeB(Queue queueB, TopicExchange topicExchange) {
return BindingBuilder.bind(queueB).to(topicExchange).with("topic.#");
}
}
2.4.3 Sender消息生產(chǎn)者
Sender消息生產(chǎn)者:
@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// 標(biāo)注了@PostConstruct注釋的方法將在類實例化之后調(diào)用
// 標(biāo)注了@PreDestroy注釋的方法將在類銷毀之前調(diào)用
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.err.println("消息發(fā)送成功:" + correlationData);
} else {
System.err.println("消息發(fā)送失敗:" + cause);
}
System.out.println("-----------------------------------------------------------------");
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String rerringKey) {
System.err.println(message.getMessageProperties().getCorrelationIdString() + " 發(fā)送失敗");
}
// 發(fā)送消息
public void send(String msg) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.err.println("開始發(fā)送消息 : " + msg.toLowerCase() + ">>" + correlationId);
String response = rabbitTemplate.convertSendAndReceive(RabbitConfig.TOPIC_Exchange, "topic.a", msg, correlationId).toString();
System.err.println("結(jié)束發(fā)送消息 : " + msg.toLowerCase());
System.err.println("消費者響應(yīng) : " + response + " 消息處理完成");
}
}
要點:
1.注入RabbitTemplate
2.實現(xiàn)RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(后者非必須)。
ConfirmCallback接口用于實現(xiàn)消息發(fā)送到RabbitMQ交換器后接收ack回調(diào)腿堤。
ReturnCallback接口用于實現(xiàn)消息發(fā)送到RabbitMQ交換器,但無相應(yīng)隊列與交換器綁定時的回調(diào)。
3.實現(xiàn)消息發(fā)送方法樱衷。調(diào)用rabbitTemplate相應(yīng)的方法即可耍鬓。
2.4.4 Receiver消息生產(chǎn)者
application.properties增加以下配置:
spring.rabbitmq.listener.concurrency=2 //最小消息監(jiān)聽線程數(shù)
spring.rabbitmq.listener.max-concurrency=2 //最大消息監(jiān)聽線程數(shù)
由于定義了2個隊列,所以分別定義不同的監(jiān)聽器監(jiān)聽不同的隊列。
由于最小消息監(jiān)聽線程數(shù)和最大消息監(jiān)聽線程數(shù)都是2度苔,所以每個監(jiān)聽器各有2個線程實現(xiàn)監(jiān)聽功能甩骏。
Receiver消息生產(chǎn)者:
@Component
public class Receiver {
@RabbitListener(queues = RabbitConfig.TOPIC_A) // 監(jiān)聽器監(jiān)聽指定的queue
public String processMessage1(String msg) {
System.err.println(Thread.currentThread().getName() + " 接收到來自topic.a隊列的消息:" + msg);
return msg.toUpperCase();
}
@RabbitListener(queues = RabbitConfig.TOPIC_B) // 監(jiān)聽器監(jiān)聽指定的queue
public void process2(String msg) {
System.err.println(Thread.currentThread().getName() + " 接收到來自topic.b隊列的消息:" + msg);
}
}
要點:
1.監(jiān)聽器參數(shù)類型與消息實際類型匹配论熙。在生產(chǎn)者中發(fā)送的消息實際類型是String二蓝,所以這里監(jiān)聽器參數(shù)類型也是String。
2.如果監(jiān)聽器需要有響應(yīng)返回給生產(chǎn)者,直接在監(jiān)聽方法中return即可牡借。
2.4.5 Application入口類及測試入口
Application項目啟動入口:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
測試:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { Application.class })
public class SpringbootTest {
@Autowired
private Sender sender;
@Test
public void sendTest() throws Exception {
while (true) {
String msg = new Date().toString();
sender.send(msg);
Thread.sleep(1000);
}
}
}
2.4.6 測試結(jié)果+總結(jié)
到這里,總結(jié)就結(jié)束了,最后,再附兩張經(jīng)過以上測試后,RabbitMQ中的exchange與Queue的展示:
這是因為,項目已啟動,我們配置類中配置的就自動會添加到RabbitMQ中對應(yīng)的exchange或者Queue,并完成綁定。