springBoot整合rabbitMq

pom文件引入依賴

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml文件寫入配置

rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: guest
  password: guest
  publisher-confirm-type: correlated #開啟交換機確認(rèn)功能
  publisher-returns: true #開啟消息回退功能
  listener:
    simple:
      acknowledge-mode: manual #開啟手動應(yīng)答機制萎津,默認(rèn)自動應(yīng)答
      prefetch: 1 # 消費者每次從隊列獲取的消息數(shù)量,此屬性不設(shè)置時為:輪詢分發(fā)抹镊,設(shè)置1時:公平分發(fā),
                  #設(shè)置為其他數(shù)字時表示在單個請求中處理的消息個數(shù)锉屈,他應(yīng)該大于等于事務(wù)數(shù)量(unack的最大數(shù)量),用以達到限流的目的
      concurrency: 1 #消費者最小數(shù)量
      max-concurrency: 10 #消費者最大數(shù)量
      retry:
        enabled: true #開啟消費者重試
        max-attempts: 3 #最大重試次數(shù)
        initial-interval: 3000 #重試間隔時間 單位毫秒

配置隊列垮耳,交換機颈渊,路由key等信息

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqCofig {

    public static final String QUEUE_SMS="queue_sms"; //短信信息隊列
    public static final String EXCHANGE_SMS="exchange_sms";//短信交換機
    public static final String ROUTING_SMS="routing_sms"; //短信路由ke

    //聲明隊列
    @Bean(QUEUE_SMS)
    public Queue QUEUE_SMS(){
        //創(chuàng)建隊列方法及其屬性值
        /*
        * 隊列參數(shù)
        * durable() 入?yún)㈥犃忻Q,durable终佛,默認(rèn)為true 開啟持久化
        * exclusive 是否獨享俊嗽,默認(rèn)false,不獨享,true 獨享 .exclusive()方法開啟铃彰,隊列獨占連接绍豁,隊列只允許在連接中訪問
        * autoDelete 是否自動刪除,默認(rèn)false 不自動刪除牙捉, true 自動刪除 .autoDelete()方法開啟
        * 其他參數(shù)妹田,例如隊列長度,ttl等 .withArguments()開啟鹃共,入?yún)?map
        * 其他參數(shù)例如ttl時長等鬼佣,也可通過內(nèi)置方法.ttl()來設(shè)置
        * */
        //方法一
        return QueueBuilder.durable(QUEUE_SMS).build();

        /*
        * 隊列參數(shù)
        * String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
        * 1.隊列名稱
        * 2.是否進行持久化,默認(rèn)true 開啟持久化霜浴,false 不開啟
        * 3.是否獨享晶衷,默認(rèn)false,不獨享, true 獨享阴孟,隊列獨占連接晌纫,隊列只允許在連接中訪問
        * 4.是否自動刪除,默認(rèn)false 不自動刪除永丝, true 自動刪除
        * 5.其他參數(shù)锹漱,例如隊列長度,ttl等
        arguments:隊列的其他屬性參數(shù)慕嚷,
         (1)x-message-ttl:消息的過期時間哥牍,單位:毫秒毕泌;
         (2)x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除嗅辣,單位:毫秒撼泛;
         (3)x-max-length:隊列最大長度,超過該最大值澡谭,則將從隊列頭部開始刪除消息愿题;
         (4)x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小蛙奖,超過該閾值則從隊列頭部開始刪除消息潘酗;
         (5)x-overflow:設(shè)置隊列溢出行為蚓挤。這決定了當(dāng)達到隊列的最大長度時消息會發(fā)生什么顶籽。有效值是drop-head、reject-publish或reject-publish-dlx图甜。仲裁隊列類型僅支持drop-head伯顶;
         (6)x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中骆膝;
         (7)x-dead-letter-routing-key:死信消息路由鍵祭衩,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設(shè)置阅签,則使用消息的原來的路由鍵值
         (8)x-single-active-consumer:表示隊列是否是單一活動消費者掐暮,true時,注冊的消費組內(nèi)只有一個消費者消費消息政钟,其他被忽略路克,false時消息循環(huán)分發(fā)給所有消費者(默認(rèn)false)
         (9)x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊列將不支持消息優(yōu)先級养交;
         (10)x-queue-mode(Lazy mode):將隊列設(shè)置為延遲模式精算,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置碎连,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息灰羽;
         (11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊列的主節(jié)點信息。
        * */
        //方法二
        //return new Queue(QUEUE_SMS);
    }

    //聲明交換機
    @Bean(EXCHANGE_SMS)
    public Exchange EXCHANGE_SMS(){
        //聲明交換機兩種方法
        /*
         * 交換機類型 fanout鱼辙、topic廉嚼、direct、headers
         * fanout 對應(yīng)的rabbit mq的工作模式是 Publish/Subscribe 發(fā)布訂閱
         * topic  對應(yīng) Topics 通配符
         * direct 對應(yīng) Routing 路由
         * headers 對應(yīng) Header轉(zhuǎn)發(fā)器
         * */
        //方法一
        return ExchangeBuilder.directExchange(EXCHANGE_SMS).build();

        //方法二
        //return new DirectExchange(EXCHANGE_SMS);
    }

    //綁定交換機與隊列
    @Bean
    public Binding queueBingExchangeSms(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_SMS) Exchange exchange){
        /*
        * 如果聲明交換機的時候沒有指定交換機類型則需添加.noargs()方法
        * 如果聲明交換機指定返回類型 如:public DirectExchange EXCHANGE_SMS(){}
        * */
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_SMS).noargs();
    }

}

controller發(fā)送消息

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RequestMapping("mq")
@RestController
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendSms/{message}")
    public void sendSms(@PathVariable String message) {
        //交換機,路由key,信息體
        rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS, message);
        System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條短信:"+message);
    }
}

監(jiān)聽消息

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

@Component
public class QueueConsumer {

    //個人整合代碼
    @RabbitListener(queues = RabbitMqCofig.QUEUE_SMS) //監(jiān)聽要收消息的隊列,參數(shù)隊列名稱
    public void receiveSms(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("當(dāng)前時間:"+new Date().toString()+",收到信息:"+msg);
    }
}

高級特性
死信隊列+TTL延遲消息
在config添加死信隊列,交換機,路由key,并生成綁定

public static final String QUEUE_DEAD="queue_dead"; //死信隊列
public static final String EXCHANGE_DEAD="exchange_dead"; //死信交換機
public static final String ROUTING_DEAD="routing_dead"; //死信路由key

@Bean(QUEUE_DEAD)
public Queue QUEUE_DEAD(){
    return QueueBuilder.durable(QUEUE_DEAD).build();
}

@Bean(EXCHANGE_DEAD)
public Exchange EXCHANGE_DEAD(){
    return ExchangeBuilder.directExchange(EXCHANGE_DEAD).build();
}

@Bean
public Binding queueBingExchangeDead(@Qualifier(QUEUE_DEAD) Queue queue,@Qualifier(EXCHANGE_DEAD) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_DEAD).noargs();
}

延遲消息可以有三種方式生成
注意:如不重新創(chuàng)建隊列,交換機測試需在重啟系統(tǒng)之前在客戶端頁面刪除交換機,隊列,不然啟動會報錯
取消短信監(jiān)聽方法,改為監(jiān)聽死信隊列,由于消息到期沒有被消費就會轉(zhuǎn)到死信隊列

1.設(shè)置隊列過期時間
此處把短信隊列改為以下代碼,發(fā)送消息代碼不變

@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
    /*
    //兩種設(shè)置方法,二選其一
    //key是默認(rèn)固定值固定值
    Map<String, Object> args = new HashMap<>();
    //聲明當(dāng)前隊列綁定的死信交換機
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
    //聲明當(dāng)前隊列的死信路由 key
    args.put("x-dead-letter-routing-key", "YD");
    //聲明隊列的 TTL
    args.put("x-message-ttl", 10000);
    return QueueBuilder.durable(QUEUE_SMS).withArguments(args).build();
    */
    return QueueBuilder.durable(QUEUE_SMS)
            .deadLetterExchange(EXCHANGE_DEAD) //設(shè)置死信交換機
            .deadLetterRoutingKey(ROUTING_DEAD) //設(shè)置死信路由key
            .ttl(10000) //設(shè)置過期時間,單位毫秒,這里設(shè)置10秒
            .build();
}

優(yōu)點:時間準(zhǔn)確倒戏,過期即立馬推送到死信隊列
缺點:時間固定怠噪,不夠靈活多變,如果同一個功能需要多個不同的時間杜跷,則需要創(chuàng)建多個隊列傍念,不友好矫夷,除非時間固定

2.設(shè)置消息過期時間
config配置短信隊列取消時間設(shè)定,只保留死信隊列,路由信息

@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
    return QueueBuilder.durable(QUEUE_SMS)
            .deadLetterExchange(EXCHANGE_DEAD) //設(shè)置死信交換機
            .deadLetterRoutingKey(ROUTING_DEAD) //設(shè)置死信路由key
            .build();
}

controller發(fā)送消息時設(shè)置消息過期時間

@GetMapping("sendSms/{message}/{ttlTime}")
public void sendSms(@PathVariable String message,@PathVariable String ttlTime) {
    //交換機,路由key,信息體
    rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS,message,(Message msg) ->{
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    });
    System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條延遲短信:"+message);
}

優(yōu)點:時間靈活多變,可以隨意設(shè)置消息過期時間
缺點:如果使用在消息屬性上設(shè)置 TTL 的方式捂寿,消息可能并不會按時“死亡“口四,因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列秦陋,如果第一個消息的延時時長很長蔓彩,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行驳概。
即先發(fā)送一個三十秒過期的消息赤嚼,在發(fā)送一個五秒過期的消息,結(jié)果是同時收到兩條消息顺又,因為第一條過期時間沒到更卒,把第二條給堵塞住了。所以即便第二條過期時間很短稚照,也不會先發(fā)送蹂空。

3.安裝延遲插件
插件下載地址
https://www.rabbitmq.com/community-plugins.html
Linux安裝

image.png

解壓放置到 RabbitMQ 的插件目錄。
進入 RabbitMQ 的安裝目錄下的 plgins 目錄果录,執(zhí)行下面命令讓該插件生效上枕,然后重啟 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Windows安裝


image.png

把 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 文件拷貝到RabbitMQ安裝目 錄下的 plugins 目錄。
進入RabbitMQ安裝目錄下的 sbin目錄弱恒,在cmd窗口下執(zhí)行如下命令使插件生效

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
image.png

congfig配置新增自定交換機,隊列,路由key
取消死信隊列監(jiān)聽辨萍,改為監(jiān)聽自定義隊列

public static final String QUEUE_CUSTOM="queue_custom"; //自定義隊列
public static final String EXCHANGE_CUSTOM="exchange_custom"; //自定義交換機
public static final String ROUTING_CUSTOM="routing_custom"; //自定義路由key

@Bean(QUEUE_CUSTOM)
public Queue QUEUE_CUSTOM(){
    return QueueBuilder.durable(QUEUE_CUSTOM).build();
}

@Bean(EXCHANGE_CUSTOM)
public Exchange EXCHANGE_CUSTOM(){
    //由于ExchangeBuilder沒有封裝自定義交換機方法所以使用new方法
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct"); //交換機發(fā)送消息的工作模式 direct,topic返弹,fanout锈玉,headers
    /*
    * 參數(shù)明細(xì)
    * String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
    * 1.交換機名稱
    * 2.交換機類型
    * 3.是否持久化
    * 4.是否自動刪除
    * 5.其他參數(shù)
    * */
    return new CustomExchange(EXCHANGE_CUSTOM,"x-delayed-message",true,false,args);
}

@Bean
public Binding queueBingExchangeCustom(@Qualifier(QUEUE_CUSTOM) Queue queue,@Qualifier(EXCHANGE_CUSTOM) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_CUSTOM).noargs();
}

發(fā)送方法

@GetMapping("sendSms/{message}/{ttlTime}")
public void sendSms(@PathVariable String message,@PathVariable Integer ttlTime) {
    //交換機,路由key,信息體
    rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_CUSTOM, RabbitMqCofig.ROUTING_CUSTOM,message,(Message msg) ->{
        //設(shè)置時長方法.setExpiration(ttlTime) 改為.setDelay(ttlTime)
        msg.getMessageProperties().setDelay(ttlTime);
        return msg;
    });
    System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條延遲短信:"+message);
}

監(jiān)聽方法

@RabbitListener(queues = RabbitMqCofig.QUEUE_CUSTOM) //監(jiān)聽要收消息的隊列,參數(shù)隊列名稱
public void receiveDead(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    System.out.println("當(dāng)前時間:"+new Date().toString()+",收到延遲信息:"+msg);
}

優(yōu)點:發(fā)送的延遲消息統(tǒng)一在交換機處理,等待ttl時間义起,不會有被上一條覆蓋住的問題拉背,可以自定義設(shè)置消息時間。

缺點:需要安裝額外擴展插件

發(fā)布確認(rèn)
開啟發(fā)布確認(rèn)功能默终,pom文件添加

spring.rabbitmq.publisher-confirm-type=correlated //交換機應(yīng)答
spring.rabbitmq.publisher-returns= true //隊列應(yīng)答

.NONE
禁用發(fā)布確認(rèn)模式去团,是默認(rèn)值
.CORRELATED
發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法

.SIMPLE
經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會觸發(fā)回調(diào)方法穷蛹,其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 節(jié)點返回發(fā)送結(jié)果土陪,根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie 方法如果返回 false 則會關(guān)閉 channel肴熏,則接下來無法發(fā)送消息到 broker

消息信息封裝類鬼雀,用于重發(fā)消息

import lombok.Data;
import org.springframework.amqp.rabbit.connection.CorrelationData;

//消息信息封裝類,用于重發(fā)消息
@Data
public class GmallCorrelationData extends CorrelationData {

    //消息內(nèi)容
    private Object message;
    //交換機
    private String exchange;
    //路由key
    private String routingKey;
    //重試次數(shù)
    private int retryCount=0;
    //是否延遲消息蛙吏,true 延遲 false 不延遲
    private Boolean isDelay=false;
    //延遲時長
    private int delayTime;

}

config短信隊列

@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
    return QueueBuilder.durable(QUEUE_SMS).build();
}

創(chuàng)建交換機源哩,隊列回調(diào)方法類

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //rabbitTemplate 注入之后就設(shè)置該值鞋吉,把本類注入到rabbitTemplate中,不然調(diào)用rabbitTemplate的這個方法沒有具體實現(xiàn)
    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
        /**
         * true:
         * 交換機無法將消息進行路由時励烦,會將該消息返回給生產(chǎn)者
         * false:
         * 如果發(fā)現(xiàn)消息無法進行路由谓着,則直接丟棄
         */
        rabbitTemplate.setMandatory(true); //yml:publisher-returns: true配置true同等效果,二選一
        //設(shè)置回退消息交給誰處理
        rabbitTemplate.setReturnsCallback(this);
    }


    /*
    * 參數(shù)列表 @NonNull CorrelationData correlationData, boolean ack, @Nullable String cause
    * 1.回調(diào)消息id及其余信息
    * 2.是否發(fā)送成功坛掠,交換機是否收到消息赊锚,true 成功 false 失敗
    * 3.原因,b為true則s為空屉栓,b為false則s為失敗原因
    * */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        //此處能否拿到CorrelationData中的ReturnedMessage消息信息舷蒲,跟交換機時候接收成功無關(guān),而是跟后續(xù)的隊列友多,路由有關(guān)
        //如果綁定的路由不對或者隊列不存在牲平,此處可以得到消息其余信息,因為此處只是監(jiān)聽交換機是否收到消息
        //路由或隊列正確的話域滥,消息會被發(fā)送出去所以ReturnedMessage為空纵柿,不對消息發(fā)送不出去,所以有值启绰,消息還停留在交換機中
        //交換機不正確昂儒,此處也拿不到ReturnedMessage,因為消息根本沒發(fā)送進來
        //總結(jié)酬土,交換機不正確ReturnedMessage空值,交換機正確格带,綁定的路由或隊列正確撤缴,空值,交換機正確綁定的路由或隊列不正確叽唱,有值
        if (!b){
            //可以在此處寫入對失敗消息的處理邏輯屈呕,重新發(fā)送,還是存入數(shù)據(jù)庫棺亭,等待后續(xù)發(fā)送
            sendRetry((GmallCorrelationData) correlationData);
        }

        //如果發(fā)送的是延遲消息(測試使用的插件延遲消息)correlationData是null虎眨,b是true
    }


    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        //此處監(jiān)聽消息是否被發(fā)送到隊列上等待消費,如果沒有通知生產(chǎn)者镶摘,進行下一步邏輯處理
        String id = returnedMessage.getMessage().getMessageProperties().getHeader("spring_returned_message_correlation");
        //此處取到id嗽桩,如果GmallCorrelationData以id為key存到redis中可以根據(jù)id獲取消息參數(shù),然后更改重試次數(shù)凄敢,從新放到redis中

        //閑麻煩也可以在此處直接發(fā)送消息碌冶,不過有兩個問題
        // 1。如果一直發(fā)送失敗涝缝,會進行一個死循環(huán)扑庞,一直發(fā)譬重,一直失敗
        //2.重發(fā)之后恰巧在交換機處失敗,因為沒有GmallCorrelationData的消息信息所以無法進行重發(fā)罐氨,消息會丟失
        //最好的方法還是發(fā)送消息的時候存進緩存臀规,此處根據(jù)id取值


        //如果發(fā)送的是延遲消息(測試使用的插件延遲消息)此處也可以監(jiān)聽到,所以延遲消息要過濾掉不重新發(fā)送
       /* GmallCorrelationData correlationData=new GmallCorrelationData();
        if (correlationData.getIsDelay()){
            return;
        }*/
        //測試了一下延遲消息如果路由不正確栅隐,好像在消息沒送達之前(ttl沒過期)監(jiān)聽到一次塔嬉,等ttl時間到期也監(jiān)聽不到了,直接丟失



    }

    //重新發(fā)送消息
    public void sendRetry(GmallCorrelationData gmal){
        //消息發(fā)送失敗從新發(fā)送三次约啊,(加原有的發(fā)送一共三次)或者一直從新發(fā)送
        if (gmal.getRetryCount()<2) {
            gmal.setRetryCount(gmal.getRetryCount()+1);
            System.out.println("消息發(fā)送失敗邑遏,第"+gmal.getRetryCount()+"次發(fā)送");
            rabbitTemplate.convertAndSend(gmal.getExchange(), gmal.getRoutingKey(), gmal.getMessage(), gmal);
        }
    }

}

消息發(fā)送方法
交換機寫錯confirm方法處理
路由寫錯returnedMessage處理

@GetMapping("sendSms/{message}")
public void sendSms(@PathVariable String message) {

    //封裝消息參數(shù)用以confirm失敗的時候從發(fā)消息,延遲消息記得添加延遲參數(shù)
    //根據(jù)消息id是取不到消息的參數(shù)的恰矩,所以可以把correlationData存儲到redis緩存中记盒,id作為key 在returnedMessage方法中根據(jù)id去redis中取值
    //如果存放到redis中可以設(shè)置一個過期時間,一般五分鐘就夠了外傅,足夠消息重新發(fā)送了
    GmallCorrelationData correlationData=new GmallCorrelationData();
    correlationData.setId("1"); //實際開發(fā)可以uuid
    correlationData.setExchange(RabbitMqCofig.EXCHANGE_SMS+"1");
    correlationData.setRoutingKey(RabbitMqCofig.ROUTING_SMS);
    correlationData.setMessage(message);

    //交換機,路由key,信息體
    rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS,message,correlationData);
    System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條短信短信1:"+message);

}

手動應(yīng)答
yml配置開啟acknowledge-mode

acknowledge-mode: manual #開啟手動應(yīng)答機制纪吮,默認(rèn)自動應(yīng)答

消費者代碼

@RabbitListener(queues = RabbitMqCofig.QUEUE_CUSTOM) //監(jiān)聽要收消息的隊列,參數(shù)隊列名稱
public void receiveDead(Message message, Channel channel) throws IOException {
    try {

        String msg = new String(message.getBody());
        System.out.println("當(dāng)前時間:"+new Date().toString()+",收到信息:"+msg);
        //手動應(yīng)答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }catch (Exception e){

         //判斷是否已經(jīng)處理過一次消息
         if (message.getMessageProperties().isRedelivered()){
             System.out.println("消息已處理過一次,拒絕再次接收");
             //拒收消息
             //true 消息重新進入隊列萎胰,false不在重新進入隊列碾盟,如果配置了死信交換機,則進入死信
             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
         }else {
             System.out.println("消息沒有處理過技竟,即將再次進入隊列");
             //不確認(rèn)消息
             //參數(shù)二 是否批量 true批量冰肴,false單個處理
             //參數(shù)三 是否從新進入隊列,true重新進入隊列榔组,false不進入
             channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
         }
    }

}

備份交換機
當(dāng)正常交換機無法處理消息時熙尉,消息發(fā)送到備份交換機,并推送到隊列
eg:短信交換機接收到消息搓扯,無法發(fā)送到隊列(測試發(fā)送時路由key寫錯)检痰,消息轉(zhuǎn)發(fā)到備份交換機

config添加備份交換機,備份隊列锨推,報警隊列铅歼,路由key并綁定

public static final String QUEUE_BACKUP="queue_backup"; //備份隊列
public static final String QUEUE_WARNING="queue_warning"; //備份報警信息隊列
public static final String EXCHANGE_BACKUP="exchange_backup"; //備份交換機
public static final String ROUTING_BACKUP="routing_backup"; //備份路由key

@Bean(QUEUE_BACKUP)
public Queue QUEUE_BACKUP(){
    return QueueBuilder.durable(QUEUE_BACKUP).build();
}

@Bean(QUEUE_WARNING)
public Queue QUEUE_WARNING(){
    return QueueBuilder.durable(QUEUE_WARNING).build();
}

@Bean(EXCHANGE_BACKUP)
public Exchange EXCHANGE_BACKUP(){
    return ExchangeBuilder.fanoutExchange(EXCHANGE_BACKUP).build();
}

@Bean
public Binding queueBingExchangeBack(@Qualifier(QUEUE_BACKUP) Queue queue,@Qualifier(EXCHANGE_BACKUP) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_BACKUP).noargs();
}

@Bean
public Binding queueWarBingExchangeBack(@Qualifier(QUEUE_WARNING) Queue queue,@Qualifier(EXCHANGE_BACKUP) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_BACKUP).noargs();
}

短信交換機

@Bean(EXCHANGE_SMS)
public Exchange EXCHANGE_SMS(){
    //return ExchangeBuilder.directExchange(EXCHANGE_SMS).withArgument("alternate-exchange",EXCHANGE_BACKUP()).build();
    return ExchangeBuilder.directExchange(EXCHANGE_SMS).alternate(EXCHANGE_BACKUP).build();
}

發(fā)送消息,監(jiān)聽消息代碼不變换可,監(jiān)聽隊列改下就行

mandatory (消息回退)參數(shù)與備份交換機可以一起使用的時候椎椰,如果兩者同時開啟,消息究竟何去何從沾鳄?誰優(yōu)先級高俭识,經(jīng)過上面結(jié)果顯示答案是備份交換機優(yōu)先級高。

冪等性洞渔,隊列優(yōu)先級套媚,集群缚态,鏡像隊列功能(用到的場景少,冪等性消息全局id+redis防止重復(fù)消費)略堤瘤,太多了玫芦,不想復(fù)制了
完結(jié)撒花。本辐。桥帆。。慎皱。老虫。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市茫多,隨后出現(xiàn)的幾起案子祈匙,更是在濱河造成了極大的恐慌,老刑警劉巖天揖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件夺欲,死亡現(xiàn)場離奇詭異,居然都是意外死亡今膊,警方通過查閱死者的電腦和手機些阅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來斑唬,“玉大人市埋,你說我怎么就攤上這事∷×酰” “怎么了缤谎?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長雪营。 經(jīng)常有香客問我弓千,道長衡便,這世上最難降的妖魔是什么献起? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮镣陕,結(jié)果婚禮上谴餐,老公的妹妹穿的比我還像新娘。我一直安慰自己呆抑,他們只是感情好岂嗓,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鹊碍,像睡著了一般厌殉。 火紅的嫁衣襯著肌膚如雪食绿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天公罕,我揣著相機與錄音器紧,去河邊找鬼。 笑死楼眷,一個胖子當(dāng)著我的面吹牛铲汪,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播罐柳,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼掌腰,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了张吉?” 一聲冷哼從身側(cè)響起齿梁,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎芦拿,沒想到半個月后士飒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡蔗崎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年酵幕,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片缓苛。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡芳撒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出未桥,到底是詐尸還是另有隱情笔刹,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布冬耿,位于F島的核電站舌菜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏亦镶。R本人自食惡果不足惜日月,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望缤骨。 院中可真熱鬧爱咬,春花似錦、人聲如沸绊起。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蜂绎,卻和暖如春栅表,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背师枣。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工谨读, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人坛吁。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓劳殖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親拨脉。 傳聞我的和親對象是個殘疾皇子哆姻,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容