RabbitMQ筆記六:Spring AMQP簡(jiǎn)介與quick start

Spring AMQP

先對(duì)本片博客進(jìn)行總結(jié)敷钾,可直接跳過總結(jié)看下面的博客正文。

總結(jié):
spring-amqp二個(gè)核心類RabbitAdmin和RabbitTemplate類
1.RabbitAdmin類完成對(duì)Exchange晚树,Queue,Binging的操作,在容器中管理了RabbitAdmin類的時(shí)候茂洒,可以對(duì)Exchange搜变,Queue采缚,Binging進(jìn)行自動(dòng)聲明。
2.RabbitTemplate類是發(fā)送和接收消息的工具類挠他。(下一篇博客具體講解)

簡(jiǎn)介

Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開發(fā)扳抽。 它提供了一個(gè)“模板”(template)作為發(fā)送和接收消息的高級(jí)抽象。 它還通過“偵聽器容器(listener container)”為消息驅(qū)動(dòng)的POJO提供支持。 這些庫(kù)促進(jìn)AMQP資源的管理贸呢,同時(shí)促進(jìn)使用依賴注入和聲明式配置镰烧。 在所有這些情況下,您將看到與Spring框架中的JMS支持的相似之處楞陷。

Spring AMQP包括兩個(gè)部分怔鳖;spring-amqp是對(duì)amqp的一些概念的一些抽象。spring-rabbit是對(duì)AMQP的實(shí)現(xiàn)RabbitMQ的實(shí)現(xiàn)猜谚。

特征

  1. 異步處理消費(fèi)消息的一個(gè)監(jiān)聽容器(Listener container
  2. 使用RabbitTemplate類的實(shí)例來發(fā)送和接收消息败砂。
  3. 使用RabbitAdmin去自動(dòng)聲明隊(duì)列(queues),交換機(jī)(exchanges)魏铅,綁定(bindings

spring-amqp模塊是對(duì)AMQP協(xié)議的一個(gè)抽象和封裝昌犹。所以說對(duì)所有的AMQP的實(shí)現(xiàn)都進(jìn)行的抽象和封裝,比如
org.springframework.amqp.core.Binding:綁定的封裝览芳,類型有QUEUEEXCHANGE斜姥。
org.springframework.amqp.core.Exchange:其有基本的四種實(shí)現(xiàn)

Exchange的實(shí)現(xiàn)

org.springframework.amqp.core.Message:消息是由屬性和body構(gòu)成,將屬性也封裝成一個(gè)對(duì)象MessageProperties沧竟。
org.springframework.amqp.core.MessageProperties:對(duì)消息屬性進(jìn)行了抽象铸敏。
org.springframework.amqp.core.Queue:隊(duì)列的封裝。

還有對(duì)消息的轉(zhuǎn)換進(jìn)行了封裝悟泵,相關(guān)的類在org.springframework.amqp.support.converter包下面杈笔。(下面的博客會(huì)專門講解消息轉(zhuǎn)換converter的一些實(shí)現(xiàn))。

spring-rabbit模塊是建立在spring糕非,spring-amqp蒙具,amqp-client(rabbitmq java client)之上的,是具體操作RabbitMQ的朽肥,底層對(duì)Rabbitmq的操作是使用amqp-client的禁筏。

二個(gè)核心類,一個(gè)是org.springframework.amqp.rabbit.core.RabbitAdminorg.springframework.amqp.rabbit.core.RabbitTemplate

spring-rabbit對(duì)日志進(jìn)行了擴(kuò)展衡招,可以將日志發(fā)送到mq中篱昔。

Demo

加入spring-amqp依賴:

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.3.RELEASE</version>
        </dependency>
    </dependencies>

RabbitmqAdmin使用

容器中納入ConnectionFactory和RabbitAdmin管理

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }
}

應(yīng)用類,使用RabbitAdmin進(jìn)行Exchange始腾,Queue州刽,Binding操作

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.HashMap;
import java.util.Map;

@ComponentScan
public class Application {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        System.out.println(rabbitAdmin);

        //創(chuàng)建四種類型的Exchange,可重復(fù)執(zhí)行
        rabbitAdmin.declareExchange(new DirectExchange("zhihao.direct.exchange",true,false));
        rabbitAdmin.declareExchange(new TopicExchange("zhihao.topic.exchange",true,false));
        rabbitAdmin.declareExchange(new FanoutExchange("zhihao.fanout.exchange",true,false));
        rabbitAdmin.declareExchange(new HeadersExchange("zhihao.header.exchange",true,false));

        //刪除Exchange
        //rabbitAdmin.deleteExchange("zhihao.header.exchange");

        //定義隊(duì)列
        rabbitAdmin.declareQueue(new Queue("zhihao.debug",true));
        rabbitAdmin.declareQueue(new Queue("zhihao.info",true));
        rabbitAdmin.declareQueue(new Queue("zhihao.error",true));

        //刪除隊(duì)列
        //rabbitAdmin.deleteQueue("zhihao.debug");

        //將隊(duì)列中的消息全消費(fèi)掉
        rabbitAdmin.purgeQueue("zhihao.info",false);

        //綁定,指定要綁定的Exchange和Route key
        rabbitAdmin.declareBinding(new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
                "zhihao.direct.exchange","zhihao.hehe",new HashMap()));

        rabbitAdmin.declareBinding(new Binding("zhihao.info",Binding.DestinationType.QUEUE,
                "zhihao.direct.exchange","zhihao.haha",new HashMap()));

        rabbitAdmin.declareBinding(new Binding("zhihao.error",Binding.DestinationType.QUEUE,
                "zhihao.direct.exchange","zhihao.welcome",new HashMap()));


        //綁定header exchange
        Map<String,Object> headerValues = new HashMap<>();
        headerValues.put("type",1);
        headerValues.put("size",10);

        //whereAll指定了x-match:   all參數(shù)
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
                to(new HeadersExchange("zhihao.header.exchange")).whereAll(headerValues).match());

        //whereAll指定了x-match:   any參數(shù)
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.info")).
                to(new HeadersExchange("zhihao.header.exchange")).whereAny(headerValues).match());


        //進(jìn)行解綁
        rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("zhihao.info")).
              to(new TopicExchange("zhihao.direct.exchange")).with("zhihao.info"));

        //聲明topic類型的exchange
        rabbitAdmin.declareExchange(new TopicExchange("zhihao.hehe.exchange",true,false));
        rabbitAdmin.declareExchange(new TopicExchange("zhihao.miao.exchange",true,false));

        //exchange與exchange綁定
        rabbitAdmin.declareBinding(new Binding("zhihao.hehe.exchange",Binding.DestinationType.EXCHANGE,
                "zhihao.miao.exchange","zhihao",new HashMap()));

        //使用BindingBuilder進(jìn)行綁定
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("zhihao.debug")).
                to(new TopicExchange("zhihao.topic.exchange")).with("zhihao.miao"));

        //rabbitAdmin.declareBinding(new Binding("amq.rabbitmq.trace",Binding.DestinationType.EXCHANGE,
                //"amq.rabbitmq.log","zhihao",new HashMap()));

        context.close();

    }

}

Exchange ,Queue,Binding的自動(dòng)聲明

  1. 直接把要自動(dòng)聲明的組件Bean納入到spring容器中管理即可浪箭。
    自動(dòng)聲明發(fā)生的rabbitmq第一次連接創(chuàng)建的時(shí)候穗椅。如果系統(tǒng)從啟動(dòng)到停止沒有創(chuàng)建任何連接,則不會(huì)自動(dòng)創(chuàng)建山林。

  2. 自定聲明支持單個(gè)和多個(gè)。

自動(dòng)聲明Exchange

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeclareConfig {

    //聲明direct類型的Exchange
    @Bean
    public Exchange directExchange(){
        return new DirectExchange("zhihao.direct.exchange",true,false);
    }

    //聲明topic類型的Exchange
    @Bean
    public Exchange topicExchange(){
        return new TopicExchange("zhihao.topic.exchange",true,false);
    }

    //聲明fanout類型的Exchange
    @Bean
    public Exchange fanoutExchange(){
        return new FanoutExchange("zhihao.fanout.exchange",true,false);
    }

    //聲明headers類型的Exchange
    @Bean
    public Exchange headersExchange(){
        return new HeadersExchange("zhihao.header.exchange",true,false);
    }
}

配置類,在spring容器中納入ConnectionFactory實(shí)例和RabbitAdmin實(shí)例

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }
}

啟動(dòng)應(yīng)用類驼抹,自動(dòng)聲明發(fā)生的rabbitmq第一次連接創(chuàng)建的時(shí)候桑孩。如果系統(tǒng)從啟動(dòng)到停止沒有創(chuàng)建任何連接,則不會(huì)自動(dòng)創(chuàng)建框冀。

import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
public class Application {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        //使得客戶端第一次連接rabbitmq
        context.getBean(RabbitAdmin.class).getQueueProperties("**");
        context.close();
    }
}
控制臺(tái)顯示已經(jīng)聲明創(chuàng)建

隊(duì)列的自動(dòng)聲明

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeclareConfig {

    @Bean
    public Queue debugQueue(){
        return new Queue("zhihao.debug",true);
    }

    @Bean
    public Queue infoQueue(){
        return new Queue("zhihao.info",true);
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("zhihao.error",true);
    }
}

上面的Application和DeclareConfig不列舉出來了流椒,執(zhí)行Application應(yīng)用啟動(dòng)類,查看web管控臺(tái)的隊(duì)列生成明也。

Queue自動(dòng)創(chuàng)建聲明

綁定的自動(dòng)生成
DeclareConfig類中宣虾,

import org.springframework.amqp.core.Binding;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class DeclareConfig {

    @Bean
    public Binding binding(){
        return new Binding("zhihao.debug",Binding.DestinationType.QUEUE,
                "zhihao.direct.exchange","zhihao.debug",new HashMap());
    }

    @Bean
    public Binding binding2(){
        return new Binding("zhihao.info",Binding.DestinationType.QUEUE,
                "zhihao.direct.exchange","zhihao.info",new HashMap());
    }

    @Bean
    public Binding binding3(){
        return new Binding("zhihao.error",Binding.DestinationType.QUEUE,
                "zhihao.direct.exchange","zhihao.error",new HashMap());
    }
}

上面的Application和DeclareConfig不列舉出來了,執(zhí)行Application應(yīng)用啟動(dòng)類温数,查看web管控臺(tái)的Binding生成绣硝。

Binding自動(dòng)創(chuàng)建聲明

一次性生成多個(gè)queue,exchange,binding

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

@Configuration
public class DeclareConfig {

    @Bean
    public List<Queue> queues(){
        List<Queue> queueList = new ArrayList<>();
        queueList.add(new Queue("chao.wang.debug",true));
        queueList.add(new Queue("chao.wang.info",true));
        queueList.add(new Queue("chao.wang.error",true));
        return queueList;
    }

    @Bean
    public List<Exchange> exchanges(){
        List<Exchange> exchangeList = new ArrayList<>();
        exchangeList.add(new TopicExchange("chao.wang.debug.topic.exchange",true,false));
        exchangeList.add(new TopicExchange("chao.wang.info.topic.exchange",true,false));
        exchangeList.add(new TopicExchange("chao.wang.error.topic.exchange",true,false));
        return exchangeList;
    }

    @Bean
    public List<Binding> bindings(){
        List<Binding> bindingList = new ArrayList<>();
        bindingList.add(BindingBuilder.bind(new Queue("chao.wang.debug")).
                to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.#"));
        bindingList.add(BindingBuilder.bind(new Queue("chao.wang.info")).
                to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.*"));
        bindingList.add(BindingBuilder.bind(new Queue("chao.wang.error")).
                to(new TopicExchange("chao.wang.debug.topic.exchange")).with("chao.wang.error.*"));
        return bindingList;
    }
}

上面的Application和DeclareConfig不列舉出來了撑刺,執(zhí)行Application應(yīng)用啟動(dòng)類鹉胖,查看web管控臺(tái)Exchange,Queue,Binding都已經(jīng)生成。

注意
當(dāng)聲明隊(duì)列是以amp開頭的時(shí)候够傍,隊(duì)列是不能創(chuàng)建聲明的甫菠。

@Bean
public Queue amqQueue(){
   return new Queue("amp.log",true);
}

總結(jié)

自動(dòng)聲明的一些條件

  • 要有連接(對(duì)rabbitmq的連接)
  • 容器中要有org.springframework.amqp.rabbit.core.RabbitAdmin的實(shí)例
  • RabbitAdminautoStartup屬性必須為true。
  • 如果ConnectionFactory使用的是CachingConnectionFactory冕屯,則cacheMode必須是CachingConnectionFactory.CacheMode.CHANNEL(默認(rèn))寂诱。
  • 所要聲明的組件(QueueExchangeBinding)的shouldDeclare必須是true(默認(rèn)就是true
  • Queue隊(duì)列的名字不能以amq.開頭安聘。

注意:Queue痰洒,ExchangeBinding都直接或者間接的繼承Declarable,而Declarable中定義了shouldDeclare的方法搞挣。

自動(dòng)聲明源碼分析

org.springframework.amqp.rabbit.core.RabbitAdmin實(shí)現(xiàn)InitializingBean接口带迟,在BeanFactory設(shè)置完所有屬性之后執(zhí)行特定初始化(afterPropertiesSet方法)

RabbitAdminafterPropertiesSet方法,

@Override
    public void afterPropertiesSet() {

        synchronized (this.lifecycleMonitor) {

          //autoStartup屬性的值為false的時(shí)候囱桨,直接return
            if (this.running || !this.autoStartup) {
                return;
            }

          //connectionFactory實(shí)例如果是CachingConnectionFactory仓犬,并且CacheMode是CacheMode.CONNECTION也會(huì)return下面不執(zhí)行了。
            if (this.connectionFactory instanceof CachingConnectionFactory &&
                    ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
                this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
                return;
            }

          //連接的監(jiān)聽器
            this.connectionFactory.addConnectionListener(new ConnectionListener() {

                // Prevent stack overflow...
                private final AtomicBoolean initializing = new AtomicBoolean(false);

                @Override
                public void onCreate(Connection connection) {
                    if (!initializing.compareAndSet(false, true)) {
                        // If we are already initializing, we don't need to do it again...
                        return;
                    }
                    try {
                        //執(zhí)行這個(gè)方法
                        initialize();
                    }
                    finally {
                        initializing.compareAndSet(true, false);
                    }
                }

                @Override
                public void onClose(Connection connection) {
                }

            });

            this.running = true;

        }
    }

RabbitAdmininitialize方法舍肠,聲明所有exchanges, queuesbindings

/**
     * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
     * (but unnecessary) to call this method more than once.
     */
    public void initialize() {

        if (this.applicationContext == null) {
            this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
            return;
        }

        this.logger.debug("Initializing declarations");
        //得到容器中所有的Exchange
        Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                this.applicationContext.getBeansOfType(Exchange.class).values());
        //得到容器中所有的Queue
        Collection<Queue> contextQueues = new LinkedList<Queue>(
                this.applicationContext.getBeansOfType(Queue.class).values());
       //得到容器中所有的Binding
        Collection<Binding> contextBindings = new LinkedList<Binding>(
                this.applicationContext.getBeansOfType(Binding.class).values());

       //獲取容器中所有的Collection搀继,如果容器中所有元素是Exchange,Queue或者Binding的時(shí)候?qū)⑦@些實(shí)例也加入到spring容器中翠语。
        @SuppressWarnings("rawtypes")
        Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false)
                .values();
        for (Collection<?> collection : collections) {
            if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
                for (Object declarable : collection) {
                    if (declarable instanceof Exchange) {
                        contextExchanges.add((Exchange) declarable);
                    }
                    else if (declarable instanceof Queue) {
                        contextQueues.add((Queue) declarable);
                    }
                    else if (declarable instanceof Binding) {
                        contextBindings.add((Binding) declarable);
                    }
                }
            }
        }

       //進(jìn)行了filter過濾叽躯,
        final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
        final Collection<Queue> queues = filterDeclarables(contextQueues);
        final Collection<Binding> bindings = filterDeclarables(contextBindings);

        for (Exchange exchange : exchanges) {
            if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                        + exchange.getName()
                        + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                        + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                        + "reopening the connection.");
            }
        }

        for (Queue queue : queues) {
            if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                        + queue.getName()
                        + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                        + queue.isExclusive() + ". "
                        + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                        + "alive, but all messages will be lost.");
            }
        }

        this.rabbitTemplate.execute(new ChannelCallback<Object>() {
            @Override
            public Object doInRabbit(Channel channel) throws Exception {
               //聲明exchange,如果exchange是默認(rèn)的exchange那么也不會(huì)聲明肌括。
                declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
                //聲明隊(duì)列点骑,如果隊(duì)列名以amq.開頭的也不會(huì)進(jìn)行聲明
                declareQueues(channel, queues.toArray(new Queue[queues.size()]));
                declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
                return null;
            }
        });
        this.logger.debug("Declarations finished");

    }

filterDeclarables方法過濾一些ExchangeQueueBinding黑滴,因?yàn)檫@三個(gè)類都是繼承Declarable這個(gè)類憨募,


    private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> declarables) {
        Collection<T> filtered = new ArrayList<T>();
        for (T declarable : declarables) {
            Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins();
            //shouldDeclare屬性必須是true,否則就會(huì)被過濾掉了
            if (declarable.shouldDeclare() &&
                (adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
                filtered.add(declarable);
            }
        }
        return filtered;
    }

聲明Exchanges

    private void declareExchanges(final Channel channel, final Exchange... exchanges) throws IOException {
        for (final Exchange exchange : exchanges) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("declaring Exchange '" + exchange.getName() + "'");
            }

            //不是默認(rèn)的Exchange
            if (!isDeclaringDefaultExchange(exchange)) {
                try {
                   //是否是delayed類型的Exchange
                    if (exchange.isDelayed()) {
                        Map<String, Object> arguments = exchange.getArguments();
                        if (arguments == null) {
                            arguments = new HashMap<String, Object>();
                        }
                        else {
                            arguments = new HashMap<String, Object>(arguments);
                        }
                        arguments.put("x-delayed-type", exchange.getType());
                       //調(diào)用exchangeDeclare進(jìn)行聲明
                        channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),
                                exchange.isAutoDelete(), exchange.isInternal(), arguments);
                    }
                    else {
                       //調(diào)用exchangeDeclare進(jìn)行聲明
                      channel.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(),
                                exchange.isAutoDelete(), exchange.isInternal(), exchange.getArguments());
                    }
                }
                catch (IOException e) {
                    logOrRethrowDeclarationException(exchange, "exchange", e);
                }
            }
        }
    }

聲明Queue隊(duì)列

private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
        List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
        for (int i = 0; i < queues.length; i++) {
            Queue queue = queues[i];
            //隊(duì)列不以amq.開頭的隊(duì)列才能進(jìn)行聲明
            if (!queue.getName().startsWith("amq.")) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("declaring Queue '" + queue.getName() + "'");
                }
                try {
                    try {
                       //進(jìn)行隊(duì)列聲明
                        DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
                                queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
                        declareOks.add(declareOk);
                    }
                    catch (IllegalArgumentException e) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
                        }
                        try {
                            if (channel instanceof ChannelProxy) {
                                ((ChannelProxy) channel).getTargetChannel().close();
                            }
                        }
                        catch (TimeoutException e1) {
                        }
                        throw new IOException(e);
                    }
                }
                catch (IOException e) {
                    logOrRethrowDeclarationException(queue, "queue", e);
                }
            }
            this.logger.debug("Queue with name that starts with 'amq.' cannot be declared.");
        }
        return declareOks.toArray(new DeclareOk[declareOks.size()]);
}

binding聲明:

    private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
        for (Binding binding : bindings) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType()
                        + ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey()
                        + "]");
            }

            try {
               //QUEUE類型的綁定
                if (binding.isDestinationQueue()) {
                   //并且不是綁定到默認(rèn)的Default Exchange
                    if (!isDeclaringImplicitQueueBinding(binding)) {
                         //綁定隊(duì)列
                        channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
                                binding.getArguments());
                    }
                }
                else {
                    //Exchange類型的綁定
                    channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
                            binding.getArguments());
                }
            }
            catch (IOException e) {
                logOrRethrowDeclarationException(binding, "binding", e);
            }
        }
    }

參考資料
官網(wǎng)
github地址
官網(wǎng)文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末袁辈,一起剝皮案震驚了整個(gè)濱河市菜谣,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌晚缩,老刑警劉巖尾膊,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異荞彼,居然都是意外死亡冈敛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門卿泽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來莺债,“玉大人,你說我怎么就攤上這事签夭∑氚睿” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵第租,是天一觀的道長(zhǎng)措拇。 經(jīng)常有香客問我,道長(zhǎng)慎宾,這世上最難降的妖魔是什么丐吓? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮趟据,結(jié)果婚禮上券犁,老公的妹妹穿的比我還像新娘。我一直安慰自己汹碱,他們只是感情好粘衬,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著咳促,像睡著了一般稚新。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上跪腹,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天褂删,我揣著相機(jī)與錄音,去河邊找鬼冲茸。 笑死屯阀,一個(gè)胖子當(dāng)著我的面吹牛缅帘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播难衰,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼股毫,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了召衔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤祭陷,失蹤者是張志新(化名)和其女友劉穎苍凛,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體兵志,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡醇蝴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了想罕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片悠栓。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖按价,靈堂內(nèi)的尸體忽然破棺而出惭适,到底是詐尸還是另有隱情,我是刑警寧澤楼镐,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布癞志,位于F島的核電站,受9級(jí)特大地震影響框产,放射性物質(zhì)發(fā)生泄漏凄杯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一秉宿、第九天 我趴在偏房一處隱蔽的房頂上張望戒突。 院中可真熱鬧,春花似錦描睦、人聲如沸膊存。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)膝舅。三九已至,卻和暖如春窑多,著一層夾襖步出監(jiān)牢的瞬間仍稀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工埂息, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留技潘,地道東北人遥巴。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像享幽,于是被迫代替她去往敵國(guó)和親铲掐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容