Springboot整合RabbitMQ

1叁丧、簡介

RabbitMQ 即一個(gè)消息隊(duì)列,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦冈止,同時(shí)也能起到消息緩沖狂票,消息分發(fā)的作用。

2熙暴、創(chuàng)建一個(gè)springboot的項(xiàng)目

3闺属、添加RabbitMQ依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4慌盯、在application.yml中配置RabbitMQ

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    virtual-host: /

5、創(chuàng)建一個(gè)rabbitMQ配置類(這個(gè)一定要看明白)

/**
 * 說明:〈該類初始化創(chuàng)建隊(duì)列掂器、轉(zhuǎn)發(fā)器亚皂,并把隊(duì)列綁定到轉(zhuǎn)發(fā)器〉
 */
@Configuration
public class ApplicationConfig {
    private static Logger log = LoggerFactory.getLogger(ApplicationConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;
    final static String queueName = "helloQuery";

    @Bean
    public Queue helloQueue() {
        return new Queue(queueName);
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    @Bean
    public Queue dirQueue() {
        return new Queue("direct");
    }
    
    //===============以下是驗(yàn)證topic Exchange的隊(duì)列========== 
    // Bean默認(rèn)的name是方法名
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //===============以上是驗(yàn)證topic Exchange的隊(duì)列===========
    
    
    //===============以下是驗(yàn)證Fanout Exchange的隊(duì)列==========
    @Bean(name="AMessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //===============以上是驗(yàn)證Fanout Exchange的隊(duì)列==========

    /**
     * exchange是交換機(jī)交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類型,分別為Direct,topic,headers,Fanout.
     *
     * Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.
     *
     * topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.
     *
     * headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對規(guī)則,當(dāng)兩組鍵值對規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中.
     *
     * Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略.
     */
    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }
    @Bean
    TopicExchange exchange() {
        // 參數(shù)1為交換機(jī)的名稱
        return new TopicExchange("exchange");
    }
    /**
     * //配置廣播路由器
     * @return FanoutExchange
     */
    @Bean
    FanoutExchange fanoutExchange() {
        // 參數(shù)1為交換機(jī)的名稱
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue, DirectExchange directExchange){
        return  BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
    }

    /**
     * 將隊(duì)列topic.message與exchange綁定,routing_key為topic.message,就是完全匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    // 如果參數(shù)名和上面用到方法名稱一樣国瓮,可以不用寫@Qualifier
    Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 將隊(duì)列topic.messages與exchange綁定灭必,routing_key為topic.#,模糊匹配
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

    /**
     * rabbitTemplate是thread safe的,主要是channel不能共用乃摹,但是在rabbitTemplate源碼里channel是threadlocal的禁漓,所以singleton沒問題。
     * 但是rabbitTemplate要設(shè)置回調(diào)類孵睬,如果是singleton播歼,回調(diào)類就只能有一個(gè),所以如果想要設(shè)置不同的回調(diào)類掰读,就要設(shè)置為prototype的scope秘狞。
     * @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //若使用confirm-callback或return-callback,必須要配置publisherConfirms或publisherReturns為true
        //每個(gè)rabbitTemplate只能有一個(gè)confirm-callback和return-callback蹈集,如果這里配置了烁试,那么寫生產(chǎn)者的時(shí)候不能再寫confirm-callback和return-callback
        //使用return-callback時(shí)必須設(shè)置mandatory為true,或者在配置中設(shè)置mandatory-expression的值為true
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //將對象序列化為json串
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        /**
         * 如果消息沒有到exchange,則confirm回調(diào),ack=false
         * 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
         * exchange到queue成功,則不回調(diào)return
         * exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    log.info("消息發(fā)送成功: correlationData:{}, ack{}, cause:{}", correlationData, ack, cause);
                }else{
                    log.error("消息發(fā)送失敗: correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("消息丟失: exchange:{}, route:{}, replyCode:{}, replyText:{}, message:{}", exchange, routingKey, replyCode, replyText, new String(message.getBody()));
            }
        });

        return rabbitTemplate;
    }
}

rabbitMQ配置類大約就這些內(nèi)容雾狈,里面我基本上都做了注釋廓潜。

下面我們就開始寫rabbitMQ的用法了

6、單生產(chǎn)者和單消費(fèi)者

6.1善榛、生產(chǎn)者

@Component 
public class HelloSender1 { 
    /**
     * AmqpTemplate可以說是RabbitTemplate父類,RabbitTemplate實(shí)現(xiàn)類RabbitOperations接口呻畸,RabbitOperations繼承了AmqpTemplate接口
     */ 

    @Autowired 
    private AmqpTemplate rabbitTemplate;

    @Autowired 
    private RabbitTemplate rabbitTemplate1; 

    /**
     * 用于單生產(chǎn)者-》單消費(fèi)者測試
     */
    public void send() {
        String sendMsg = "hello1 " + new Date();
        System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate1.convertAndSend("helloQueue", sendMsg);
    }
}

名為helloQueue的隊(duì)列在配置類創(chuàng)建好了移盆,項(xiàng)目啟動(dòng)的時(shí)候會(huì)自動(dòng)創(chuàng)建

6.2、消費(fèi)者

@Component
@RabbitListener(queues = "helloQueue") 
public class HelloReceiver1 {

    @RabbitHandler 
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }
}

@RabbitListener注解是監(jiān)聽隊(duì)列的伤为,當(dāng)隊(duì)列有消息的時(shí)候咒循,它會(huì)自動(dòng)獲取。

@RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候绞愚,就交給 @RabbitHandler 的方法處理叙甸,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型

注意

  • 消息處理方法參數(shù)是由 MessageConverter 轉(zhuǎn)化位衩,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實(shí)例中去設(shè)置(默認(rèn) Spring 使用的實(shí)現(xiàn)是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 屬性表示消息 body 數(shù)據(jù)以什么數(shù)據(jù)格式存儲(chǔ)裆蒸,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應(yīng)類型接收消息 body 內(nèi)容糖驴,但若方法參數(shù)類型不正確會(huì)拋異常:

  • application/octet-stream:二進(jìn)制字節(jié)數(shù)組存儲(chǔ)僚祷,使用 byte[]

  • application/x-java-serialized-object:java 對象序列化格式存儲(chǔ)佛致,使用 Object、相應(yīng)類型(反序列化時(shí)類型應(yīng)該同包同名辙谜,否者會(huì)拋出找不到類異常)

  • text/plain:文本數(shù)據(jù)類型存儲(chǔ)俺榆,使用 String

  • application/json:JSON 格式,使用 Object装哆、相應(yīng)類型

6.3罐脊、controller

 /**
  * 最簡單的hello生產(chǎn)和消費(fèi)實(shí)現(xiàn)(單生產(chǎn)者和單消費(fèi)者)
  */ 
@RequestMapping("/hello") 
public void hello() { 
    helloSender1.send(); 
} 

6.4、結(jié)果

控制臺(tái)的結(jié)果:

Sender1 : hello1 Mon Feb 18 10:13:35 CST 2019

2019-02-18 10:13:35,831 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

Receiver1  : hello1 Mon Feb 18 10:13:35 CST 2019

7蜕琴、單生產(chǎn)者對多消費(fèi)者

7.1萍桌、生產(chǎn)者

/**
 * 用于單/多生產(chǎn)者-》多消費(fèi)者測試
 */
public void send(String msg) {
    String sendMsg = msg + new Date();
    System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

7.2、消費(fèi)者

消費(fèi)者1

@Component
@RabbitListener(queues = "helloQueue") 
public class HelloReceiver1 {

    @RabbitHandler 
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }
}

消費(fèi)者2

@Component
@RabbitListener(queues = "helloQueue") 
public class HelloReceiver2 {

    @RabbitHandler public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }
}

7.3奸绷、controller

/**
 * 單生產(chǎn)者-多消費(fèi)者
 */
@RequestMapping("/oneToMany")
public void oneToMany() {
    for(int i=0;i<10;i++){
        helloSender1.send("hellomsg:"+i);
    }
}

7.4梗夸、結(jié)果:

Sender1 : hellomsg:0Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:2Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:4Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:6Mon Feb 18 10:19:09 CST 2019

Sender1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019

Sender1 : hellomsg:8Mon Feb 18 10:19:10 CST 2019

Sender1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019

Receiver2  : hellomsg:0Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:2Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:4Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:1Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:6Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:3Mon Feb 18 10:19:09 CST 2019

Receiver2  : hellomsg:8Mon Feb 18 10:19:10 CST 2019

Receiver1  : hellomsg:5Mon Feb 18 10:19:09 CST 2019

Receiver1  : hellomsg:7Mon Feb 18 10:19:10 CST 2019

Receiver1  : hellomsg:9Mon Feb 18 10:19:10 CST 2019

2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,044 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

8、實(shí)體類的傳輸号醉,必須格式化

8.1反症、實(shí)體類

public class User implements Serializable { 

    private String name; 

    private String pass; 

    public String getName() { 
        return name;
    } 
   
    public void setName(String name) { 
        this.name = name;
    } 
    public String getPass() { 
        return pass;
    } 
    public void setPass(String pass) { 
        this.pass = pass;
    }

    @Override 
    public String toString() { 
        return "User{" +

                "name='" + name + '\'' +

                ", pass='" + pass + '\'' +

                '}';
    }
}

8.2、生產(chǎn)者

/**
 * 實(shí)體類的傳輸(springboot完美的支持對象的發(fā)送和接收畔派,不需要格外的配置铅碍。實(shí)體類必須序列化)
 * @param user
 */
public void send(User user) {
    System.out.println("user send : " + user.getName()+"/"+user.getPass());     
    this.rabbitTemplate.convertAndSend("userQueue", user);
}

8.3、消費(fèi)者

@Component
@RabbitListener(queues = "userQueue") 
public class HelloReceiver3 {

    @RabbitHandler 
    public void process(User user){
        System.out.println("user receive  : " + user.getName()+"/"+user.getPass());
    }
}

8.4线椰、controller

/**
 * 實(shí)體列的傳輸
 */ 
@RequestMapping("/userTest") 
public void userTest(){
    User user=new User();
    user.setName("黃義波");
    user.setPass("123456");
    userSender.send(user);
}

8.5胞谈、結(jié)果

user send : 黃義波/123456
2019-02-18 10:24:24,251 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
user receive  : 黃義波/123456

9、directExchange

Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.

9.1憨愉、在rabbitMQ配置類中添加內(nèi)容

@Bean 
public Queue dirQueue() { 
    return new Queue("direct");
}

@Bean
DirectExchange directExchange(){ 
    return new DirectExchange("directExchange");
} 

/**
 * 將隊(duì)列dirQueue與directExchange交換機(jī)綁定烦绳,routing_key為direct
 * @param dirQueue
 * @param directExchange
 * @return
 */ 
@Bean
Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){ 
    return  BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
}

9.2、生產(chǎn)者

@Component 
public class DirectSender {

    @Autowired 
    private AmqpTemplate rabbitTemplate; 

    public void send() {
        String msgString="directSender :hello i am hzb";
        System.out.println(msgString);   
        this.rabbitTemplate.convertAndSend("direct", msgString);
    }
}

9.3配紫、消費(fèi)者

@Component
@RabbitListener(queues = "direct") 
public class DirectReceiver {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("directReceiver  : " + msg);
    }
}

9.4径密、controller

@RequestMapping("/directTest") 
public void directTest() { 
    directSender.send(); 
}

9.5、結(jié)果

directSender :hello i am hyb
directReceiver  : directSender :hello i am hyb
2019-02-18 10:33:25,974 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

10躺孝、topicExchange

topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.

10.1享扔、在rabbitMQ配置類中添加內(nèi)容

// Bean默認(rèn)的name是方法名 
@Bean(name="message") 
public Queue queueMessage() { 
    return new Queue("topic.message");
}

@Bean(name="messages") 
public Queue queueMessages() { 
    return new Queue("topic.messages");
}

@Bean
TopicExchange exchange() { 
    // 參數(shù)1為交換機(jī)的名稱
    return new TopicExchange("exchange");
} 

/**
 * 將隊(duì)列topic.message與exchange綁定,routing_key為topic.message,就是完全匹配
 * @param queueMessage
 * @param exchange
 * @return
 */ 
@Bean 
// 如果參數(shù)名和上面用到方法名稱一樣植袍,可以不用寫@Qualifier 
Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) { 
    return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");

}

/**
 * 將隊(duì)列topic.messages與exchange綁定惧眠,routing_key為topic.#,模糊匹配
 * @param queueMessages
 * @param exchange
 * @return
 */ 
@Bean
Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) { 
    return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");

}

10.2、生產(chǎn)者

@Component 
public class TopicSender {

    @Autowired 
    private AmqpTemplate rabbitTemplate; 

    public void send() {

        String msg1 = "I am topic.mesaage msg======";

        System.out.println("sender1 : " + msg1); 
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg########";

        System.out.println("sender2 : " + msg2); 
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }
}

10.3于个、消費(fèi)者

消費(fèi)者1

@Component
@RabbitListener(queues = "topic.message") 
public class TopicMessageReceiver {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("topicMessageReceiver  : " +msg);
    }
}

消費(fèi)者2

@Component
@RabbitListener(queues = "topic.messages") 
public class TopicMessagesReceiver {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("topicMessagesReceiver  : " +msg);
    }
}

10.4氛魁、controller

/**
 * topic exchange類型rabbitmq測試
 */ 
@RequestMapping("/topicTest") 
public void topicTest() { 
    topicSender.send(); 
} 

10.5、結(jié)果

sender1 : I am topic.mesaage msg======

sender2 : I am topic.mesaages msg########

topicMessageReceiver  : I am topic.mesaage msg======

topicMessagesReceiver  : I am topic.mesaage msg======

topicMessagesReceiver  : I am topic.mesaages msg########

2019-02-18 10:39:46,150 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

2019-02-18 10:39:46,206 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

11、fanoutExchange

Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略.

11.1呆盖、在rabbitMQ配置類中添加內(nèi)容

//===============以下是驗(yàn)證Fanout Exchange的隊(duì)列==========
@Bean(name="AMessage") 
public Queue AMessage() { 
    return new Queue("fanout.A");
}

@Bean 
public Queue BMessage() { 
    return new Queue("fanout.B");
}

@Bean 
public Queue CMessage() { 
    return new Queue("fanout.C");
}

@Bean
FanoutExchange fanoutExchange() { 
    // 參數(shù)1為交換機(jī)的名稱
    return new FanoutExchange("fanoutExchange");
}

@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) { 
    return BindingBuilder.bind(AMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { 
    return BindingBuilder.bind(BMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { 
    return BindingBuilder.bind(CMessage).to(fanoutExchange);
}

11.2拖云、生產(chǎn)者

@Component 
public class FanoutSender {

    @Autowired 
    private AmqpTemplate rabbitTemplate; 

    public void send() {
        String msgString="fanoutSender :hello i am hzb";
        System.out.println(msgString); // 參數(shù)2被忽略
        this.rabbitTemplate.convertAndSend("fanoutExchange","", msgString);
    }
}

11.3、消費(fèi)者

消費(fèi)者A

@Component
@RabbitListener(queues = "fanout.A") 
public class FanoutReceiverA {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }
}

消費(fèi)者B

@Component
@RabbitListener(queues = "fanout.B") 
public class FanoutReceiverB {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }
}

消費(fèi)者C

@Component
@RabbitListener(queues = "fanout.C") 
public class FanoutReceiverC {

    @RabbitHandler 
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }
}

11.4应又、controller

/**
 * fanout exchange類型rabbitmq測試
 */ 

@RequestMapping("/fanoutTest") 
public void fanoutTest() { 
    fanoutSender.send(); 
} 

11.5宙项、結(jié)果

fanoutSender :hello i am hzb

FanoutReceiverA  : fanoutSender :hello i am hyb

FanoutReceiverC  : fanoutSender :hello i am hyb

FanoutReceiverB  : fanoutSender :hello i am hyb

2019-02-18 10:45:38,760 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)

12、配置類中的rabbitTemplate

@Bean
public RabbitTemplate rabbitTemplate(){
    //若使用confirm-callback或return-callback株扛,必須要配置publisherConfirms或publisherReturns為true
    //每個(gè)rabbitTemplate只能有一個(gè)confirm-callback和return-callback尤筐,如果這里配置了,那么寫生產(chǎn)者的時(shí)候不能再寫confirm-callback和return-callback
    //使用return-callback時(shí)必須設(shè)置mandatory為true洞就,或者在配置中設(shè)置mandatory-expression的值為true
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);

    /**
     * 如果消息沒有到exchange,則confirm回調(diào),ack=false
     * 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
     * exchange到queue成功,則不回調(diào)return
     * exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
     */
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }else{
                log.info("消息發(fā)送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        }
    });

    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
        }
    });

    return rabbitTemplate;
}

好好看看注釋

13盆繁、不在配置類中配置callback

方法一:

13.1、配置一個(gè)接口

/**
 * 說明:〈定義一個(gè)名為SendMessageService 的接口旬蟋,這個(gè)接口繼承了RabbitTemplate.ConfirmCallback油昂,

 * ConfirmCallback接口是用來回調(diào)消息發(fā)送成功后的方法,當(dāng)一個(gè)消息被成功寫入到RabbitMQ服務(wù)端時(shí)倾贰,

 * 會(huì)自動(dòng)的回調(diào)RabbitTemplate.ConfirmCallback接口內(nèi)的confirm方法完成通知
 */
public interface SendMessageService extends RabbitTemplate.ConfirmCallback{ 
    void sendMessage(String exchange,String routekey,Object message);

}

13.2冕碟、實(shí)現(xiàn)這個(gè)接口

/**
 * 說明:〈該類注入了RabbitTemplate,RabbitTemplate封裝了發(fā)送消息的方法匆浙,我們直接使用即可安寺。

 * 可以看到我們構(gòu)建了一個(gè)回調(diào)返回的數(shù)據(jù),并使用convertAndSend方法發(fā)送了消息首尼。同時(shí)實(shí)現(xiàn)了confirm回調(diào)方法味赃,

 * 通過判斷isSendSuccess可以知道消息是否發(fā)送成功脐帝,這樣我們就可以進(jìn)行進(jìn)一步處理厅瞎。
 */ 
@Service 
public class SendMessageServiceImpl implements SendMessageService{  
    
    private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class);

    @Autowired private RabbitTemplate rabbitTemplate;

    @Override 
    public void sendMessage(String exchange,String routekey,Object message) { 
        //設(shè)置回調(diào)對象 
        //rabbitTemplate.setConfirmCallback(this);           
        //rabbitTemplate.setMandatory(true); 
        //構(gòu)建回調(diào)返回的數(shù)據(jù)
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());  
        
        //rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
        rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
        logger.info("SendMessageServiceImpl() >>> 發(fā)送消息到RabbitMQ, 消息內(nèi)容: " + message);

    }

    /**
     * 消息回調(diào)確認(rèn)方法
     * @param correlationData 回調(diào)數(shù)據(jù)
     * @param isSendSuccess   是否發(fā)送成功
     * @param
     */ 
    @Override 
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
        logger.info("confirm回調(diào)方法>>>>>>>>>>>>>回調(diào)消息ID為: " + correlationData.getId()); 
        if (isSendSuccess) {
            logger.info("confirm回調(diào)方法>>>>>>>>>>>>>消息發(fā)送成功");
        } else {
            logger.info("confirm回調(diào)方法>>>>>>>>>>>>>消息發(fā)送失敗" + s);
        }
    }
}

方法二:

直接在生產(chǎn)者發(fā)送信息的時(shí)候修改rabbitTemplate

@Service
public class SendMessage1 {
    
    private static Logger log = LoggerFactory.getLogger(SendMessage1.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routekey, Object message) {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                } else {
                    log.info("消息發(fā)送失敗:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
    }
}

13慌闭、有時(shí)候消費(fèi)者出現(xiàn)錯(cuò)誤,需要人工處理

//構(gòu)建回調(diào)返回的數(shù)據(jù)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData); 

//rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData); 

// 將 CorrelationData的id 與 Message的correlationId綁定查排,然后關(guān)系保存起來,例如放到緩存中,然后人工處理 

// 當(dāng)confirm或return回調(diào)時(shí),根據(jù)ack類別等,分別處理. 例如return或者ack=false則說明有問題,報(bào)警, 如果ack=true則刪除關(guān)系 

// (因?yàn)閞eturn在confirm前,所以一條消息在return后又ack=true的情況也是按return處理)
Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(correlationData.getId()).build();

rabbitTemplate.send(exchange, routekey, message1, correlationData);

將 CorrelationData的id 與 Message的correlationId綁定破加,然后關(guān)系保存起來,例如放到緩存中,然后人工處理

image

我們可以看到,這兩條消息關(guān)聯(lián)起來了雹嗦。

14、事務(wù)消息

消息確認(rèn)機(jī)制

RabbitMQ提供了transaction合是、confirm兩種消息確認(rèn)機(jī)制了罪。transaction即事務(wù)機(jī)制,手動(dòng)提交和回滾聪全;confirm機(jī)制提供了Confirmlistener和waitForConfirms兩種方式泊藕。confirm機(jī)制效率明顯會(huì)高于transaction機(jī)制,但后者的優(yōu)勢在于強(qiáng)一致性难礼。如果沒有特別的要求娃圆,建議使用confrim機(jī)制玫锋。

  • 1、從實(shí)驗(yàn)來看讼呢,消息的確認(rèn)機(jī)制只是確認(rèn)publisher發(fā)送消息到broker撩鹿,由broker進(jìn)行應(yīng)答,不能確認(rèn)消息是否有效消費(fèi)悦屏。
  • 2节沦、而為了確認(rèn)消息是否被發(fā)送給queue,應(yīng)該在發(fā)送消息中啟用參數(shù)mandatory=true础爬,使用ReturnListener接收未被發(fā)送成功的消息甫贯。
  • 3、接下來就需要確認(rèn)消息是否被有效消費(fèi)看蚜。publisher端目前并沒有提供監(jiān)聽事件叫搁,但提供了應(yīng)答機(jī)制來保證消息被成功消費(fèi),應(yīng)答方式:
    • basicAck:成功消費(fèi)供炎,消息從隊(duì)列中刪除
    • basicNack:requeue=true渴逻,消息重新進(jìn)入隊(duì)列,false被刪除
    • basicReject:等同于basicNack
    • basicRecover:消息重入隊(duì)列碱茁,requeue=true裸卫,發(fā)送給新的consumer,false發(fā)送給相同的consumer

RabbitMQ是基于AMQP協(xié)議實(shí)現(xiàn)的纽竣,該協(xié)議實(shí)現(xiàn)了事務(wù)機(jī)制墓贿,因此RabbitMQ也支持事務(wù)機(jī)制。RabbitMQ中蜓氨,與事務(wù)機(jī)制有關(guān)的方法有三個(gè):

  • txSelect():將當(dāng)前channel設(shè)置成transaction模式聋袋。
  • txCommit():提交事務(wù)。
  • txRollback():回滾事務(wù)穴吹。
  • channel.basicPublish:發(fā)送消息幽勒,可以是多條,可以是消費(fèi)消息提交ack港令。
  • autoAck=false啥容,手動(dòng)提交ack,以事務(wù)提交或回滾為準(zhǔn)顷霹。
  • autoAck=true咪惠,不支持事務(wù)的,也就是說你即使在收到消息之后在回滾事務(wù)也是于事無補(bǔ)的淋淀,隊(duì)列已經(jīng)把消息移除了遥昧。

RabbitConfig

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Data
public class RabbitConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

}

RabbitUtil

  • 通過@PostConstruct注解,在依賴注入之后,初始化ConnectionFactory炭臭。
  • 提供創(chuàng)建ConnectionFactory的方法和創(chuàng)建Connection的方法永脓。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Component
public final class RabbitUtil {

    @Resource
    private RabbitConfig rabbitConfig;

    private ConnectionFactory factory;

    @PostConstruct
    public void init() {
        factory = new ConnectionFactory();
        factory.setHost(rabbitConfig.getHost());
        factory.setPort(rabbitConfig.getPort());
        factory.setUsername(rabbitConfig.getUsername());
        factory.setPassword(rabbitConfig.getPassword());
        factory.setVirtualHost(rabbitConfig.getVirtualHost());
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(60000L);
    }

    public ConnectionFactory newConnectionFactory() {
        return factory;
    }

    public Connection newConnection() throws IOException, TimeoutException {
        return factory.newConnection();
    }
}

TransactionProducer

import com.panda.rabbitmq.config.RabbitUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

@Component
public class TransactionProducer {

    private final static String EXCHANGE_NAME = "publisherconfirm-exchange";

    @Resource
    private RabbitUtil rabbitUtil;


    public void send(String routingKey, String message) throws IOException, TimeoutException {
        Channel channel = null;
        try {
            Connection connection = rabbitUtil.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 聲明一個(gè)direct交換機(jī)
            // 開啟事務(wù)
            channel.txSelect();
            channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

            //int i = 1 / 0;
            
            // 提交事務(wù)
            channel.txCommit();
        } catch (Exception e) {
            // 回滾事務(wù)
            if (channel != null) {
                channel.txRollback();
            }
        } finally {
            if (channel != null) {
                channel.close();
            }
        }
    }
}

參考:
Springboot整合RabbitMQ

https://zhuanlan.zhihu.com/p/582787505

http://www.uml.org.cn/zjjs/202009082.asp

https://codeleading.com/article/2222792253/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市鞋仍,隨后出現(xiàn)的幾起案子常摧,更是在濱河造成了極大的恐慌,老刑警劉巖凿试,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件排宰,死亡現(xiàn)場離奇詭異,居然都是意外死亡那婉,警方通過查閱死者的電腦和手機(jī)板甘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來详炬,“玉大人盐类,你說我怎么就攤上這事∏好眨” “怎么了在跳?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長隐岛。 經(jīng)常有香客問我猫妙,道長,這世上最難降的妖魔是什么聚凹? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任割坠,我火速辦了婚禮,結(jié)果婚禮上妒牙,老公的妹妹穿的比我還像新娘彼哼。我一直安慰自己,他們只是感情好湘今,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布敢朱。 她就那樣靜靜地躺著,像睡著了一般摩瞎。 火紅的嫁衣襯著肌膚如雪拴签。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天旗们,我揣著相機(jī)與錄音篓吁,去河邊找鬼。 笑死蚪拦,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播驰贷,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼盛嘿,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了括袒?” 一聲冷哼從身側(cè)響起次兆,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎锹锰,沒想到半個(gè)月后芥炭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡恃慧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年园蝠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片痢士。...
    茶點(diǎn)故事閱讀 38,664評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡彪薛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出怠蹂,到底是詐尸還是另有隱情善延,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布城侧,位于F島的核電站易遣,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏嫌佑。R本人自食惡果不足惜豆茫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望歧强。 院中可真熱鬧澜薄,春花似錦、人聲如沸摊册。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽茅特。三九已至忘分,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間白修,已是汗流浹背妒峦。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留兵睛,地道東北人肯骇。 一個(gè)月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓窥浪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親笛丙。 傳聞我的和親對象是個(gè)殘疾皇子漾脂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容