Spring AMQP
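This post starts with a summary; you can skip it and go straight to the main text below.
Summary:
spring-amqp has two core classes, RabbitAdmin and RabbitTemplate.
1. RabbitAdmin performs operations on Exchange, Queue and Binding. When a RabbitAdmin is managed by the container, Exchange, Queue and Binding beans can be declared automatically.
2. RabbitTemplate is the utility class for sending and receiving messages. (It is covered in detail in the next post.)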
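Introduction
The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also supports message-driven POJOs through a "listener container". These libraries make AMQP resources easier to manage while promoting dependency injection and declarative configuration. In all of these cases you will see similarities to the JMS support in the Spring Framework.
Spring AMQP consists of two parts: spring-amqp is an abstraction of the AMQP concepts, and spring-rabbit is the implementation for RabbitMQ, which is itself an implementation of AMQP.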
Features
- A listener container for asynchronous processing of inbound messages
- RabbitTemplate for sending and receiving messages
- RabbitAdmin for automatically declaring queues, exchanges and bindings
spring-amqp
This module is an abstraction and encapsulation of the AMQP protocol, so it applies to every AMQP implementation. For example:
- org.springframework.amqp.core.Binding: encapsulates a binding; the destination type is either QUEUE or EXCHANGE.
- org.springframework.amqp.core.Exchange: has four basic implementations (direct, topic, fanout and headers).
- org.springframework.amqp.core.Message: a message consists of properties and a body; the properties are themselves encapsulated in a MessageProperties object.
- org.springframework.amqp.core.MessageProperties: an abstraction of the message properties.
- org.springframework.amqp.core.Queue: encapsulates a queue.
Message conversion is encapsulated as well; the relevant classes live in the org.springframework.amqp.support.converter package. (A later post covers the converter implementations.)
spring-rabbit
This module is built on top of spring, spring-amqp and amqp-client (the RabbitMQ Java client). It is the part that actually talks to RabbitMQ; the underlying operations go through amqp-client.
Its two core classes are org.springframework.amqp.rabbit.core.RabbitAdmin and org.springframework.amqp.rabbit.core.RabbitTemplate.
spring-rabbit also extends logging so that log output can be sent to the message queue.
Demo
Add the spring-amqp dependency:
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.3.RELEASE</version>
</dependency>
</dependencies>
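Using RabbitAdmin
Register a ConnectionFactory and a RabbitAdmin in the container:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration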
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
}
Application class, using RabbitAdmin to operate on Exchange, Queue and Binding:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.HashMap;
import java.util.Map;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
System.out.println(rabbitAdmin);
// Create the four exchange types; safe to run repeatedly
rabbitAdmin.declareExchange(new DirectExchange("zhihao.direct.exchange",true,false));
rabbitAdmin.declareExchange(new TopicExchange("zhihao.topic.exchange",true,false));
rabbitAdmin.declareExchange(new FanoutExchange("zhihao.fanout.exchange",true,false));
rabbitAdmin.declareExchange(new HeadersExchange("zhihao.header.exchange",true,false));
// Delete an exchange
//rabbitAdmin.deleteExchange("zhihao.header.exchange");
// Declare queues
rabbitAdmin.declareQueue(new Queue("zhihao.debug",true));
rabbitAdmin.declareQueue(new Queue("zhihao.info",true));
rabbitAdmin.declareQueue(new Queue("zhihao.error",true));
// Delete a queue
//rabbitAdmin.deleteQueue("zhihao.debug");
// Purge all messages from the queue (noWait = false, so the call waits for the broker)
rabbitAdmin.purgeQueue("zhihao.info",false);
// Bind each queue to an exchange with a routing key
rabbitAdmin.declareBinding(new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.hehe",new HashMap()));
rabbitAdmin.declareBinding(new Binding("zhihao.info",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.haha",new HashMap()));
rabbitAdmin.declareBinding(new Binding("zhihao.error",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.welcome",new HashMap()));
// Bind to the headers exchange
Map<String,Object> headerValues = new HashMap<>();
headerValues.put("type",1);
headerValues.put("size",10);
// whereAll sets the x-match: all argument
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
to(new HeadersExchange("zhihao.header.exchange")).whereAll(headerValues).match());
// whereAny sets the x-match: any argument
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.info")).
to(new HeadersExchange("zhihao.header.exchange")).whereAny(headerValues).match());
// Remove the zhihao.info binding declared above; exchange name and routing key must match that binding
rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("zhihao.info")).
to(new DirectExchange("zhihao.direct.exchange")).with("zhihao.haha"));
// Declare topic exchanges
rabbitAdmin.declareExchange(new TopicExchange("zhihao.hehe.exchange",true,false));
rabbitAdmin.declareExchange(new TopicExchange("zhihao.miao.exchange",true,false));
// Bind an exchange to another exchange
rabbitAdmin.declareBinding(new Binding("zhihao.hehe.exchange",Binding.DestinationType.EXCHANGE,
"zhihao.miao.exchange","zhihao",new HashMap()));
// Bind using BindingBuilder
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
to(new TopicExchange("zhihao.topic.exchange")).with("zhihao.miao"));
//rabbitAdmin.declareBinding(new Binding("amq.rabbitmq.trace",Binding.DestinationType.EXCHANGE,
//"amq.rabbitmq.log","zhihao",new HashMap()));
context.close();
}
}
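Automatic declaration of Exchange, Queue and Binding
Simply register the components you want declared as beans in the Spring container.
Automatic declaration happens when the first connection to RabbitMQ is created. If the application never creates a connection between startup and shutdown, nothing is declared. Automatic declaration supports both single beans and collections of beans.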
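To make the timing concrete, here is a minimal plain-Java sketch of the idea, with no Spring involved. The names `LazyDeclarationSketch`, `addListener` and `createConnection` are hypothetical and exist only for this illustration; the real mechanism is the connection listener that RabbitAdmin registers, shown in the source analysis later in this post.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: declaration callbacks run when the first connection is created. */
final class LazyDeclarationSketch {
    private final List<Runnable> listeners = new ArrayList<>();
    private boolean connected = false;

    /** Registering a listener does not open a connection. */
    void addListener(Runnable onCreate) {
        listeners.add(onCreate);
    }

    /** Opens the (pretend) connection once and notifies the listeners at that moment. */
    void createConnection() {
        if (connected) {
            return; // the connection is reused, listeners are not notified again
        }
        connected = true;
        for (Runnable listener : listeners) {
            listener.run();
        }
    }

    public static void main(String[] args) {
        LazyDeclarationSketch factory = new LazyDeclarationSketch();
        List<String> declared = new ArrayList<>();
        factory.addListener(() -> declared.add("zhihao.direct.exchange"));

        System.out.println("before first connection: " + declared);
        factory.createConnection();
        System.out.println("after first connection: " + declared);
    }
}
```

Nothing is "declared" until `createConnection()` is called, which is why the startup class in this post forces a first connection.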
Auto-declaring exchanges:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeclareConfig {
// Declare a direct exchange
@Bean
public Exchange directExchange(){
return new DirectExchange("zhihao.direct.exchange",true,false);
}
// Declare a topic exchange
@Bean
public Exchange topicExchange(){
return new TopicExchange("zhihao.topic.exchange",true,false);
}
// Declare a fanout exchange
@Bean
public Exchange fanoutExchange(){
return new FanoutExchange("zhihao.fanout.exchange",true,false);
}
// Declare a headers exchange
@Bean
public Exchange headersExchange(){
return new HeadersExchange("zhihao.header.exchange",true,false);
}
}
Configuration class, which registers the ConnectionFactory and RabbitAdmin instances in the Spring container:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
}
Startup class. Automatic declaration happens when the first connection to RabbitMQ is created; if no connection is created between startup and shutdown, nothing is declared.
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
// Force the client to open its first connection to RabbitMQ
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
Auto-declaring queues
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeclareConfig {
@Bean
public Queue debugQueue(){
return new Queue("zhihao.debug",true);
}
@Bean
public Queue infoQueue(){
return new Queue("zhihao.info",true);
}
@Bean
public Queue errorQueue(){
return new Queue("zhihao.error",true);
}
}
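The Application and MQConfig classes are the same as above and are not repeated here. Run the Application startup class and check the web management console to see the queues that were created.
Auto-declaring bindings
In the DeclareConfig class: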
import org.springframework.amqp.core.Binding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class DeclareConfig {
@Bean
public Binding binding(){
return new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.debug",new HashMap());
}
@Bean
public Binding binding2(){
return new Binding("zhihao.info",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.info",new HashMap());
}
@Bean
public Binding binding3(){
return new Binding("zhihao.error",Binding.DestinationType.QUEUE,
"zhihao.direct.exchange","zhihao.error",new HashMap());
}
}
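The Application and MQConfig classes are the same as above and are not repeated here. Run the Application startup class and check the web management console to see the bindings that were created.
Declaring multiple queues, exchanges and bindings at once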
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Configuration
public class DeclareConfig {
@Bean
public List<Queue> queues(){
List<Queue> queueList = new ArrayList<>();
queueList.add(new Queue("chao.wang.debug",true));
queueList.add(new Queue("chao.wang.info",true));
queueList.add(new Queue("chao.wang.error",true));
return queueList;
}
@Bean
public List<Exchange> exchanges(){
List<Exchange> exchangeList = new ArrayList<>();
exchangeList.add(new TopicExchange("chao.wang.debug.topic.exchange",true,false));
exchangeList.add(new TopicExchange("chao.wang.info.topic.exchange",true,false));
exchangeList.add(new TopicExchange("chao.wang.error.topic.exchange",true,false));
return exchangeList;
}
@Bean
public List<Binding> bindings(){
List<Binding> bindingList = new ArrayList<>();
bindingList.add(BindingBuilder.bind(new Queue("chao.wang.debug")).
to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.#"));
bindingList.add(BindingBuilder.bind(new Queue("chao.wang.info")).
to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.*"));
bindingList.add(BindingBuilder.bind(new Queue("chao.wang.error")).
to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.error.*"));
return bindingList;
}
}
The Application and MQConfig classes are the same as above and are not repeated here. Run the Application startup class and check the web management console: the exchanges, queues and bindings have all been created.
Note
A queue whose name starts with amq. cannot be declared.
@Bean
public Queue amqQueue(){
return new Queue("amq.log",true);
}
Summary
Conditions for automatic declaration
- There must be a connection (to RabbitMQ).
- The container must contain an instance of org.springframework.amqp.rabbit.core.RabbitAdmin.
- The autoStartup property of RabbitAdmin must be true.
- If the ConnectionFactory is a CachingConnectionFactory, its cacheMode must be CachingConnectionFactory.CacheMode.CHANNEL (the default).
- The shouldDeclare property of each component to be declared (Queue, Exchange and Binding) must be true (the default).
- The name of a Queue must not start with the amq. prefix.
Note: Queue, Exchange and Binding all implement Declarable, directly or indirectly, and Declarable defines the shouldDeclare method.
Source analysis of automatic declaration
org.springframework.amqp.rabbit.core.RabbitAdmin implements the InitializingBean interface, so the BeanFactory runs its initialization hook (the afterPropertiesSet method) after all properties have been set.
The afterPropertiesSet method of RabbitAdmin:
@Override
public void afterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
// Return immediately if already running or if autoStartup is false
if (this.running || !this.autoStartup) {
return;
}
// If the connectionFactory is a CachingConnectionFactory with CacheMode.CONNECTION, log a warning and return; the rest is skipped
if (this.connectionFactory instanceof CachingConnectionFactory &&
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
return;
}
// Register a connection listener
this.connectionFactory.addConnectionListener(new ConnectionListener() {
// Prevent stack overflow...
private final AtomicBoolean initializing = new AtomicBoolean(false);
@Override
public void onCreate(Connection connection) {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...
return;
}
try {
// Run the declarations
initialize();
}
finally {
initializing.compareAndSet(true, false);
}
}
@Override
public void onClose(Connection connection) {
}
});
this.running = true;
}
}
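The AtomicBoolean in the listener above is a re-entrancy guard: declaring resources may itself create a connection, which would fire onCreate again. The following plain-Java sketch isolates that pattern. `ReentrancyGuardSketch` and its counter are hypothetical and only model the control flow; the nested call inside `initialize()` is an assumption standing in for "declaring opens a connection".

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the listener registered in afterPropertiesSet(). */
final class ReentrancyGuardSketch {
    private final AtomicBoolean initializing = new AtomicBoolean(false);
    private int initializeCalls = 0;

    /** Mirrors onCreate(): run initialize() unless a run is already in progress. */
    void onCreate() {
        if (!initializing.compareAndSet(false, true)) {
            return; // already initializing, so the nested call is skipped
        }
        try {
            initialize();
        } finally {
            initializing.set(false);
        }
    }

    /** Assumption for the sketch: declaring resources triggers onCreate() again. */
    private void initialize() {
        initializeCalls++;
        onCreate(); // nested callback; the guard turns it into a no-op
    }

    int initializeCalls() {
        return initializeCalls;
    }

    public static void main(String[] args) {
        ReentrancyGuardSketch sketch = new ReentrancyGuardSketch();
        sketch.onCreate();
        System.out.println("initialize() ran " + sketch.initializeCalls() + " time(s)");
    }
}
```

Without the guard, the nested call would recurse until the stack overflows, which is what the "Prevent stack overflow" comment in the original source refers to.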
The initialize method of RabbitAdmin declares all exchanges, queues and bindings:
/**
* Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
* (but unnecessary) to call this method more than once.
*/
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
// Collect all Exchange beans in the container
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
// Collect all Queue beans in the container
Collection<Queue> contextQueues = new LinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
// Collect all Binding beans in the container
Collection<Binding> contextBindings = new LinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
// Collect all Collection beans; if a collection holds Declarables, add each Exchange, Queue or Binding to the matching list above
@SuppressWarnings("rawtypes")
Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false)
.values();
for (Collection<?> collection : collections) {
if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
for (Object declarable : collection) {
if (declarable instanceof Exchange) {
contextExchanges.add((Exchange) declarable);
}
else if (declarable instanceof Queue) {
contextQueues.add((Queue) declarable);
}
else if (declarable instanceof Binding) {
contextBindings.add((Binding) declarable);
}
}
}
}
// Filter out the declarables that this admin should not declare
final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
final Collection<Queue> queues = filterDeclarables(contextQueues);
final Collection<Binding> bindings = filterDeclarables(contextBindings);
for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
+ "reopening the connection.");
}
}
for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
+ queue.getName()
+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
+ queue.isExclusive() + ". "
+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
+ "alive, but all messages will be lost.");
}
}
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
// Declare the exchanges; the default exchange is skipped
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
// Declare the queues; names starting with amq. are skipped
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
}
});
this.logger.debug("Declarations finished");
}
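The Collection handling in initialize() is what makes the List<Queue>, List<Exchange> and List<Binding> beans from the earlier example work. Below is a simplified plain-Java model of that loop. The nested `Declarable`, `Queue` and `Exchange` types are hypothetical stand-ins, not the Spring AMQP classes, and only queues are collected to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Simplified model of how initialize() picks declarables out of Collection beans. */
final class CollectSketch {
    /** Hypothetical stand-ins for Spring AMQP's Declarable, Queue and Exchange. */
    interface Declarable { }

    static final class Queue implements Declarable {
        final String name;
        Queue(String name) { this.name = name; }
    }

    static final class Exchange implements Declarable {
        final String name;
        Exchange(String name) { this.name = name; }
    }

    /** Returns the names of all queues found in collections whose first element is a Declarable. */
    static List<String> collectQueueNames(Collection<? extends Collection<?>> collectionBeans) {
        List<String> names = new ArrayList<>();
        for (Collection<?> collection : collectionBeans) {
            // Same test as in initialize(): only the first element decides whether the collection is scanned
            if (!collection.isEmpty() && collection.iterator().next() instanceof Declarable) {
                for (Object element : collection) {
                    if (element instanceof Queue) {
                        names.add(((Queue) element).name);
                    }
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<Collection<?>> beans = new ArrayList<>();
        beans.add(Arrays.<Object>asList(new Queue("chao.wang.debug"), new Exchange("x"), new Queue("chao.wang.info")));
        beans.add(Arrays.<Object>asList("not a declarable", new Queue("skipped")));
        System.out.println(collectQueueNames(beans));
    }
}
```

One consequence visible in the sketch: a collection whose first element is not a Declarable is ignored entirely, even if later elements are.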
The filterDeclarables method filters the Exchange, Queue and Binding instances. It can treat them uniformly because all three implement the Declarable interface:
private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> declarables) {
Collection<T> filtered = new ArrayList<T>();
for (T declarable : declarables) {
Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins();
// shouldDeclare must be true, otherwise the declarable is filtered out
if (declarable.shouldDeclare() &&
(adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
filtered.add(declarable);
}
}
return filtered;
}
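The filter rule can be tried out in isolation with the plain-Java sketch below. `FilterSketch.Item` is a hypothetical, simplified stand-in for Declarable, and admins are identified by name rather than by instance, which is an assumption made only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of RabbitAdmin.filterDeclarables(). */
final class FilterSketch {
    /** Hypothetical stand-in for a Declarable. */
    static final class Item {
        final String name;
        final boolean shouldDeclare;
        final List<String> declaringAdmins; // admin names instead of admin instances

        Item(String name, boolean shouldDeclare, String... declaringAdmins) {
            this.name = name;
            this.shouldDeclare = shouldDeclare;
            this.declaringAdmins = Arrays.asList(declaringAdmins);
        }
    }

    /** Keeps items that want to be declared and that either name no admin or name this one. */
    static List<String> filter(String adminName, List<Item> items) {
        List<String> kept = new ArrayList<>();
        for (Item item : items) {
            boolean adminMatches = item.declaringAdmins.isEmpty()
                    || item.declaringAdmins.contains(adminName);
            if (item.shouldDeclare && adminMatches) {
                kept.add(item.name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("zhihao.debug", true),                 // no admin restriction: kept
                new Item("zhihao.info", false),                 // shouldDeclare is false: dropped
                new Item("zhihao.error", true, "otherAdmin"));  // restricted to another admin: dropped
        System.out.println(filter("thisAdmin", items));
    }
}
```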
Declaring exchanges
private void declareExchanges(final Channel channel, final Exchange... exchanges) throws IOException {
for (final Exchange exchange : exchanges) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Exchange '" + exchange.getName() + "'");
}
// Skip the default exchange
if (!isDeclaringDefaultExchange(exchange)) {
try {
// Is this a delayed exchange?
if (exchange.isDelayed()) {
Map<String, Object> arguments = exchange.getArguments();
if (arguments == null) {
arguments = new HashMap<String, Object>();
}
else {
arguments = new HashMap<String, Object>(arguments);
}
arguments.put("x-delayed-type", exchange.getType());
// Declare it through exchangeDeclare, using the delayed-message exchange type
channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),
exchange.isAutoDelete(), exchange.isInternal(), arguments);
}
else {
// Declare it through exchangeDeclare with its own type
channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(),
exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
}
}
catch (IOException e) {
logOrRethrowDeclarationException(exchange, "exchange", e);
}
}
}
}
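The delayed-exchange branch above copies the argument map before adding x-delayed-type, so the Exchange object's own arguments are never modified. A minimal plain-Java sketch of just that step follows; `DelayedArgsSketch` and `delayedArguments` are hypothetical names, and the alternate-exchange entry is only sample data.

```java
import java.util.HashMap;
import java.util.Map;

/** Models the argument handling for delayed exchanges in declareExchanges(). */
final class DelayedArgsSketch {
    /** Returns a copy of the arguments with x-delayed-type set to the exchange's real type. */
    static Map<String, Object> delayedArguments(Map<String, Object> original, String exchangeType) {
        Map<String, Object> copy = (original == null)
                ? new HashMap<String, Object>()
                : new HashMap<String, Object>(original);
        copy.put("x-delayed-type", exchangeType);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();
        original.put("alternate-exchange", "fallback"); // sample argument, not from the post

        Map<String, Object> forBroker = delayedArguments(original, "direct");
        System.out.println("sent to broker: " + forBroker.keySet());
        System.out.println("original untouched: " + !original.containsKey("x-delayed-type"));
    }
}
```

The real routing type travels in the x-delayed-type argument because the exchange itself is declared with the delayed-message type.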
Declaring queues
private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
for (int i = 0; i < queues.length; i++) {
Queue queue = queues[i];
// Only queues whose names do not start with amq. are declared
if (!queue.getName().startsWith("amq.")) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("declaring Queue '" + queue.getName() + "'");
}
try {
try {
// Declare the queue
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
declareOks.add(declareOk);
}
catch (IllegalArgumentException e) {
if (this.logger.isDebugEnabled()) {
this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
}
try {
if (channel instanceof ChannelProxy) {
((ChannelProxy) channel).getTargetChannel().close();
}
}
catch (TimeoutException e1) {
}
throw new IOException(e);
}
}
catch (IOException e) {
logOrRethrowDeclarationException(queue, "queue", e);
}
}
this.logger.debug("Queue with name that starts with 'amq.' cannot be declared.");
}
return declareOks.toArray(new DeclareOk[declareOks.size()]);
}
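The name check in declareQueues is a plain prefix test. The sketch below reproduces only that test in plain Java; `ReservedNameSketch` and its methods are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Models the queue-name check used before a queue is declared. */
final class ReservedNameSketch {
    /** Same condition as in declareQueues(): names starting with "amq." are skipped. */
    static boolean isDeclarable(String queueName) {
        return !queueName.startsWith("amq.");
    }

    /** Returns only the names that would be passed on to queueDeclare. */
    static List<String> declarable(List<String> queueNames) {
        List<String> result = new ArrayList<>();
        for (String name : queueNames) {
            if (isDeclarable(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(declarable(Arrays.asList("zhihao.debug", "amq.log", "amp.log")));
    }
}
```

Only the exact amq. prefix is skipped; a near miss such as amp.log passes the check and would be declared normally.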
Declaring bindings:
private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
for (Binding binding : bindings) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType()
+ ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey()
+ "]");
}
try {
// The destination is a queue
if (binding.isDestinationQueue()) {
// Skip the implicit binding to the default exchange
if (!isDeclaringImplicitQueueBinding(binding)) {
// Bind the queue
channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
}
else {
// The destination is an exchange
channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
}
catch (IOException e) {
logOrRethrowDeclarationException(binding, "binding", e);
}
}
}