pom文件引入依賴
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml文件寫入配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #開啟交換機確認(rèn)功能
publisher-returns: true #開啟消息回退功能
listener:
simple:
acknowledge-mode: manual #開啟手動應(yīng)答機制萎津,默認(rèn)自動應(yīng)答
prefetch: 1 # 消費者每次從隊列獲取的消息數(shù)量,此屬性不設(shè)置時為:輪詢分發(fā)抹镊,設(shè)置1時:公平分發(fā),
#設(shè)置為其他數(shù)字時表示在單個請求中處理的消息個數(shù)锉屈,他應(yīng)該大于等于事務(wù)數(shù)量(unack的最大數(shù)量),用以達到限流的目的
concurrency: 1 #消費者最小數(shù)量
max-concurrency: 10 #消費者最大數(shù)量
retry:
enabled: true #開啟消費者重試
max-attempts: 3 #最大重試次數(shù)
initial-interval: 3000 #重試間隔時間 單位毫秒
配置隊列垮耳,交換機颈渊,路由key等信息
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqCofig {
public static final String QUEUE_SMS="queue_sms"; //短信信息隊列
public static final String EXCHANGE_SMS="exchange_sms";//短信交換機
public static final String ROUTING_SMS="routing_sms"; //短信路由ke
//聲明隊列
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
//創(chuàng)建隊列方法及其屬性值
/*
* 隊列參數(shù)
* durable() 入?yún)㈥犃忻Q,durable终佛,默認(rèn)為true 開啟持久化
* exclusive 是否獨享俊嗽,默認(rèn)false,不獨享,true 獨享 .exclusive()方法開啟铃彰,隊列獨占連接绍豁,隊列只允許在連接中訪問
* autoDelete 是否自動刪除,默認(rèn)false 不自動刪除牙捉, true 自動刪除 .autoDelete()方法開啟
* 其他參數(shù)妹田,例如隊列長度,ttl等 .withArguments()開啟鹃共,入?yún)?map
* 其他參數(shù)例如ttl時長等鬼佣,也可通過內(nèi)置方法.ttl()來設(shè)置
* */
//方法一
return QueueBuilder.durable(QUEUE_SMS).build();
/*
* 隊列參數(shù)
* String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
* 1.隊列名稱
* 2.是否進行持久化,默認(rèn)true 開啟持久化霜浴,false 不開啟
* 3.是否獨享晶衷,默認(rèn)false,不獨享, true 獨享阴孟,隊列獨占連接晌纫,隊列只允許在連接中訪問
* 4.是否自動刪除,默認(rèn)false 不自動刪除永丝, true 自動刪除
* 5.其他參數(shù)锹漱,例如隊列長度,ttl等
arguments:隊列的其他屬性參數(shù)慕嚷,
(1)x-message-ttl:消息的過期時間哥牍,單位:毫秒毕泌;
(2)x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除嗅辣,單位:毫秒撼泛;
(3)x-max-length:隊列最大長度,超過該最大值澡谭,則將從隊列頭部開始刪除消息愿题;
(4)x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小蛙奖,超過該閾值則從隊列頭部開始刪除消息潘酗;
(5)x-overflow:設(shè)置隊列溢出行為蚓挤。這決定了當(dāng)達到隊列的最大長度時消息會發(fā)生什么顶籽。有效值是drop-head、reject-publish或reject-publish-dlx图甜。仲裁隊列類型僅支持drop-head伯顶;
(6)x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中骆膝;
(7)x-dead-letter-routing-key:死信消息路由鍵祭衩,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設(shè)置阅签,則使用消息的原來的路由鍵值
(8)x-single-active-consumer:表示隊列是否是單一活動消費者掐暮,true時,注冊的消費組內(nèi)只有一個消費者消費消息政钟,其他被忽略路克,false時消息循環(huán)分發(fā)給所有消費者(默認(rèn)false)
(9)x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊列將不支持消息優(yōu)先級养交;
(10)x-queue-mode(Lazy mode):將隊列設(shè)置為延遲模式精算,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置碎连,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息灰羽;
(11)x-queue-master-locator:在集群模式下設(shè)置鏡像隊列的主節(jié)點信息。
* */
//方法二
//return new Queue(QUEUE_SMS);
}
//聲明交換機
@Bean(EXCHANGE_SMS)
public Exchange EXCHANGE_SMS(){
//聲明交換機兩種方法
/*
* 交換機類型 fanout鱼辙、topic廉嚼、direct、headers
* fanout 對應(yīng)的rabbit mq的工作模式是 Publish/Subscribe 發(fā)布訂閱
* topic 對應(yīng) Topics 通配符
* direct 對應(yīng) Routing 路由
* headers 對應(yīng) Header轉(zhuǎn)發(fā)器
* */
//方法一
return ExchangeBuilder.directExchange(EXCHANGE_SMS).build();
//方法二
//return new DirectExchange(EXCHANGE_SMS);
}
//綁定交換機與隊列
@Bean
public Binding queueBingExchangeSms(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_SMS) Exchange exchange){
/*
* 如果聲明交換機的時候沒有指定交換機類型則需添加.noargs()方法
* 如果聲明交換機指定返回類型 如:public DirectExchange EXCHANGE_SMS(){}
* */
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_SMS).noargs();
}
}
controller發(fā)送消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RequestMapping("mq")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendSms/{message}")
public void sendSms(@PathVariable String message) {
//交換機,路由key,信息體
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS, message);
System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條短信:"+message);
}
}
監(jiān)聽消息
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Component
public class QueueConsumer {
//個人整合代碼
@RabbitListener(queues = RabbitMqCofig.QUEUE_SMS) //監(jiān)聽要收消息的隊列,參數(shù)隊列名稱
public void receiveSms(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("當(dāng)前時間:"+new Date().toString()+",收到信息:"+msg);
}
}
高級特性
死信隊列+TTL延遲消息
在config添加死信隊列,交換機,路由key,并生成綁定
public static final String QUEUE_DEAD="queue_dead"; //死信隊列
public static final String EXCHANGE_DEAD="exchange_dead"; //死信交換機
public static final String ROUTING_DEAD="routing_dead"; //死信路由key
@Bean(QUEUE_DEAD)
public Queue QUEUE_DEAD(){
return QueueBuilder.durable(QUEUE_DEAD).build();
}
@Bean(EXCHANGE_DEAD)
public Exchange EXCHANGE_DEAD(){
return ExchangeBuilder.directExchange(EXCHANGE_DEAD).build();
}
@Bean
public Binding queueBingExchangeDead(@Qualifier(QUEUE_DEAD) Queue queue,@Qualifier(EXCHANGE_DEAD) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_DEAD).noargs();
}
延遲消息可以有三種方式生成
注意:如不重新創(chuàng)建隊列,交換機測試需在重啟系統(tǒng)之前在客戶端頁面刪除交換機,隊列,不然啟動會報錯
取消短信監(jiān)聽方法,改為監(jiān)聽死信隊列,由于消息到期沒有被消費就會轉(zhuǎn)到死信隊列
1.設(shè)置隊列過期時間
此處把短信隊列改為以下代碼,發(fā)送消息代碼不變
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
/*
//兩種設(shè)置方法,二選其一
//key是默認(rèn)固定值固定值
Map<String, Object> args = new HashMap<>();
//聲明當(dāng)前隊列綁定的死信交換機
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//聲明當(dāng)前隊列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//聲明隊列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_SMS).withArguments(args).build();
*/
return QueueBuilder.durable(QUEUE_SMS)
.deadLetterExchange(EXCHANGE_DEAD) //設(shè)置死信交換機
.deadLetterRoutingKey(ROUTING_DEAD) //設(shè)置死信路由key
.ttl(10000) //設(shè)置過期時間,單位毫秒,這里設(shè)置10秒
.build();
}
優(yōu)點:時間準(zhǔn)確倒戏,過期即立馬推送到死信隊列
缺點:時間固定怠噪,不夠靈活多變,如果同一個功能需要多個不同的時間杜跷,則需要創(chuàng)建多個隊列傍念,不友好矫夷,除非時間固定
2.設(shè)置消息過期時間
config配置短信隊列取消時間設(shè)定,只保留死信隊列,路由信息
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
return QueueBuilder.durable(QUEUE_SMS)
.deadLetterExchange(EXCHANGE_DEAD) //設(shè)置死信交換機
.deadLetterRoutingKey(ROUTING_DEAD) //設(shè)置死信路由key
.build();
}
controller發(fā)送消息時設(shè)置消息過期時間
@GetMapping("sendSms/{message}/{ttlTime}")
public void sendSms(@PathVariable String message,@PathVariable String ttlTime) {
//交換機,路由key,信息體
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS,message,(Message msg) ->{
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條延遲短信:"+message);
}
優(yōu)點:時間靈活多變,可以隨意設(shè)置消息過期時間
缺點:如果使用在消息屬性上設(shè)置 TTL 的方式捂寿,消息可能并不會按時“死亡“口四,因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列秦陋,如果第一個消息的延時時長很長蔓彩,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行驳概。
即先發(fā)送一個三十秒過期的消息赤嚼,在發(fā)送一個五秒過期的消息,結(jié)果是同時收到兩條消息顺又,因為第一條過期時間沒到更卒,把第二條給堵塞住了。所以即便第二條過期時間很短稚照,也不會先發(fā)送蹂空。
3.安裝延遲插件
插件下載地址
https://www.rabbitmq.com/community-plugins.html
Linux安裝
解壓放置到 RabbitMQ 的插件目錄。
進入 RabbitMQ 的安裝目錄下的 plgins 目錄果录,執(zhí)行下面命令讓該插件生效上枕,然后重啟 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Windows安裝
把 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 文件拷貝到RabbitMQ安裝目 錄下的 plugins 目錄。
進入RabbitMQ安裝目錄下的 sbin目錄弱恒,在cmd窗口下執(zhí)行如下命令使插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
congfig配置新增自定交換機,隊列,路由key
取消死信隊列監(jiān)聽辨萍,改為監(jiān)聽自定義隊列
public static final String QUEUE_CUSTOM="queue_custom"; //自定義隊列
public static final String EXCHANGE_CUSTOM="exchange_custom"; //自定義交換機
public static final String ROUTING_CUSTOM="routing_custom"; //自定義路由key
@Bean(QUEUE_CUSTOM)
public Queue QUEUE_CUSTOM(){
return QueueBuilder.durable(QUEUE_CUSTOM).build();
}
@Bean(EXCHANGE_CUSTOM)
public Exchange EXCHANGE_CUSTOM(){
//由于ExchangeBuilder沒有封裝自定義交換機方法所以使用new方法
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); //交換機發(fā)送消息的工作模式 direct,topic返弹,fanout锈玉,headers
/*
* 參數(shù)明細(xì)
* String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
* 1.交換機名稱
* 2.交換機類型
* 3.是否持久化
* 4.是否自動刪除
* 5.其他參數(shù)
* */
return new CustomExchange(EXCHANGE_CUSTOM,"x-delayed-message",true,false,args);
}
@Bean
public Binding queueBingExchangeCustom(@Qualifier(QUEUE_CUSTOM) Queue queue,@Qualifier(EXCHANGE_CUSTOM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_CUSTOM).noargs();
}
發(fā)送方法
@GetMapping("sendSms/{message}/{ttlTime}")
public void sendSms(@PathVariable String message,@PathVariable Integer ttlTime) {
//交換機,路由key,信息體
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_CUSTOM, RabbitMqCofig.ROUTING_CUSTOM,message,(Message msg) ->{
//設(shè)置時長方法.setExpiration(ttlTime) 改為.setDelay(ttlTime)
msg.getMessageProperties().setDelay(ttlTime);
return msg;
});
System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條延遲短信:"+message);
}
監(jiān)聽方法
@RabbitListener(queues = RabbitMqCofig.QUEUE_CUSTOM) //監(jiān)聽要收消息的隊列,參數(shù)隊列名稱
public void receiveDead(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("當(dāng)前時間:"+new Date().toString()+",收到延遲信息:"+msg);
}
優(yōu)點:發(fā)送的延遲消息統(tǒng)一在交換機處理,等待ttl時間义起,不會有被上一條覆蓋住的問題拉背,可以自定義設(shè)置消息時間。
缺點:需要安裝額外擴展插件
發(fā)布確認(rèn)
開啟發(fā)布確認(rèn)功能默终,pom文件添加
spring.rabbitmq.publisher-confirm-type=correlated //交換機應(yīng)答
spring.rabbitmq.publisher-returns= true //隊列應(yīng)答
.NONE
禁用發(fā)布確認(rèn)模式去团,是默認(rèn)值
.CORRELATED
發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法
.SIMPLE
經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會觸發(fā)回調(diào)方法穷蛹,其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 節(jié)點返回發(fā)送結(jié)果土陪,根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie 方法如果返回 false 則會關(guān)閉 channel肴熏,則接下來無法發(fā)送消息到 broker
消息信息封裝類鬼雀,用于重發(fā)消息
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CorrelationData;
//消息信息封裝類,用于重發(fā)消息
@Data
public class GmallCorrelationData extends CorrelationData {
//消息內(nèi)容
private Object message;
//交換機
private String exchange;
//路由key
private String routingKey;
//重試次數(shù)
private int retryCount=0;
//是否延遲消息蛙吏,true 延遲 false 不延遲
private Boolean isDelay=false;
//延遲時長
private int delayTime;
}
config短信隊列
@Bean(QUEUE_SMS)
public Queue QUEUE_SMS(){
return QueueBuilder.durable(QUEUE_SMS).build();
}
創(chuàng)建交換機源哩,隊列回調(diào)方法類
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
//rabbitTemplate 注入之后就設(shè)置該值鞋吉,把本類注入到rabbitTemplate中,不然調(diào)用rabbitTemplate的這個方法沒有具體實現(xiàn)
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
/**
* true:
* 交換機無法將消息進行路由時励烦,會將該消息返回給生產(chǎn)者
* false:
* 如果發(fā)現(xiàn)消息無法進行路由谓着,則直接丟棄
*/
rabbitTemplate.setMandatory(true); //yml:publisher-returns: true配置true同等效果,二選一
//設(shè)置回退消息交給誰處理
rabbitTemplate.setReturnsCallback(this);
}
/*
* 參數(shù)列表 @NonNull CorrelationData correlationData, boolean ack, @Nullable String cause
* 1.回調(diào)消息id及其余信息
* 2.是否發(fā)送成功坛掠,交換機是否收到消息赊锚,true 成功 false 失敗
* 3.原因,b為true則s為空屉栓,b為false則s為失敗原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
//此處能否拿到CorrelationData中的ReturnedMessage消息信息舷蒲,跟交換機時候接收成功無關(guān),而是跟后續(xù)的隊列友多,路由有關(guān)
//如果綁定的路由不對或者隊列不存在牲平,此處可以得到消息其余信息,因為此處只是監(jiān)聽交換機是否收到消息
//路由或隊列正確的話域滥,消息會被發(fā)送出去所以ReturnedMessage為空纵柿,不對消息發(fā)送不出去,所以有值启绰,消息還停留在交換機中
//交換機不正確昂儒,此處也拿不到ReturnedMessage,因為消息根本沒發(fā)送進來
//總結(jié)酬土,交換機不正確ReturnedMessage空值,交換機正確格带,綁定的路由或隊列正確撤缴,空值,交換機正確綁定的路由或隊列不正確叽唱,有值
if (!b){
//可以在此處寫入對失敗消息的處理邏輯屈呕,重新發(fā)送,還是存入數(shù)據(jù)庫棺亭,等待后續(xù)發(fā)送
sendRetry((GmallCorrelationData) correlationData);
}
//如果發(fā)送的是延遲消息(測試使用的插件延遲消息)correlationData是null虎眨,b是true
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//此處監(jiān)聽消息是否被發(fā)送到隊列上等待消費,如果沒有通知生產(chǎn)者镶摘,進行下一步邏輯處理
String id = returnedMessage.getMessage().getMessageProperties().getHeader("spring_returned_message_correlation");
//此處取到id嗽桩,如果GmallCorrelationData以id為key存到redis中可以根據(jù)id獲取消息參數(shù),然后更改重試次數(shù)凄敢,從新放到redis中
//閑麻煩也可以在此處直接發(fā)送消息碌冶,不過有兩個問題
// 1。如果一直發(fā)送失敗涝缝,會進行一個死循環(huán)扑庞,一直發(fā)譬重,一直失敗
//2.重發(fā)之后恰巧在交換機處失敗,因為沒有GmallCorrelationData的消息信息所以無法進行重發(fā)罐氨,消息會丟失
//最好的方法還是發(fā)送消息的時候存進緩存臀规,此處根據(jù)id取值
//如果發(fā)送的是延遲消息(測試使用的插件延遲消息)此處也可以監(jiān)聽到,所以延遲消息要過濾掉不重新發(fā)送
/* GmallCorrelationData correlationData=new GmallCorrelationData();
if (correlationData.getIsDelay()){
return;
}*/
//測試了一下延遲消息如果路由不正確栅隐,好像在消息沒送達之前(ttl沒過期)監(jiān)聽到一次塔嬉,等ttl時間到期也監(jiān)聽不到了,直接丟失
}
//重新發(fā)送消息
public void sendRetry(GmallCorrelationData gmal){
//消息發(fā)送失敗從新發(fā)送三次约啊,(加原有的發(fā)送一共三次)或者一直從新發(fā)送
if (gmal.getRetryCount()<2) {
gmal.setRetryCount(gmal.getRetryCount()+1);
System.out.println("消息發(fā)送失敗邑遏,第"+gmal.getRetryCount()+"次發(fā)送");
rabbitTemplate.convertAndSend(gmal.getExchange(), gmal.getRoutingKey(), gmal.getMessage(), gmal);
}
}
}
消息發(fā)送方法
交換機寫錯confirm方法處理
路由寫錯returnedMessage處理
@GetMapping("sendSms/{message}")
public void sendSms(@PathVariable String message) {
//封裝消息參數(shù)用以confirm失敗的時候從發(fā)消息,延遲消息記得添加延遲參數(shù)
//根據(jù)消息id是取不到消息的參數(shù)的恰矩,所以可以把correlationData存儲到redis緩存中记盒,id作為key 在returnedMessage方法中根據(jù)id去redis中取值
//如果存放到redis中可以設(shè)置一個過期時間,一般五分鐘就夠了外傅,足夠消息重新發(fā)送了
GmallCorrelationData correlationData=new GmallCorrelationData();
correlationData.setId("1"); //實際開發(fā)可以uuid
correlationData.setExchange(RabbitMqCofig.EXCHANGE_SMS+"1");
correlationData.setRoutingKey(RabbitMqCofig.ROUTING_SMS);
correlationData.setMessage(message);
//交換機,路由key,信息體
rabbitTemplate.convertAndSend(RabbitMqCofig.EXCHANGE_SMS, RabbitMqCofig.ROUTING_SMS,message,correlationData);
System.out.println("當(dāng)前時間:"+new Date()+",發(fā)送一條短信短信1:"+message);
}
手動應(yīng)答
yml配置開啟acknowledge-mode
acknowledge-mode: manual #開啟手動應(yīng)答機制纪吮,默認(rèn)自動應(yīng)答
消費者代碼
@RabbitListener(queues = RabbitMqCofig.QUEUE_CUSTOM) //監(jiān)聽要收消息的隊列,參數(shù)隊列名稱
public void receiveDead(Message message, Channel channel) throws IOException {
try {
String msg = new String(message.getBody());
System.out.println("當(dāng)前時間:"+new Date().toString()+",收到信息:"+msg);
//手動應(yīng)答
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
//判斷是否已經(jīng)處理過一次消息
if (message.getMessageProperties().isRedelivered()){
System.out.println("消息已處理過一次,拒絕再次接收");
//拒收消息
//true 消息重新進入隊列萎胰,false不在重新進入隊列碾盟,如果配置了死信交換機,則進入死信
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
System.out.println("消息沒有處理過技竟,即將再次進入隊列");
//不確認(rèn)消息
//參數(shù)二 是否批量 true批量冰肴,false單個處理
//參數(shù)三 是否從新進入隊列,true重新進入隊列榔组,false不進入
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
備份交換機
當(dāng)正常交換機無法處理消息時熙尉,消息發(fā)送到備份交換機,并推送到隊列
eg:短信交換機接收到消息搓扯,無法發(fā)送到隊列(測試發(fā)送時路由key寫錯)检痰,消息轉(zhuǎn)發(fā)到備份交換機
config添加備份交換機,備份隊列锨推,報警隊列铅歼,路由key并綁定
public static final String QUEUE_BACKUP="queue_backup"; //備份隊列
public static final String QUEUE_WARNING="queue_warning"; //備份報警信息隊列
public static final String EXCHANGE_BACKUP="exchange_backup"; //備份交換機
public static final String ROUTING_BACKUP="routing_backup"; //備份路由key
@Bean(QUEUE_BACKUP)
public Queue QUEUE_BACKUP(){
return QueueBuilder.durable(QUEUE_BACKUP).build();
}
@Bean(QUEUE_WARNING)
public Queue QUEUE_WARNING(){
return QueueBuilder.durable(QUEUE_WARNING).build();
}
@Bean(EXCHANGE_BACKUP)
public Exchange EXCHANGE_BACKUP(){
return ExchangeBuilder.fanoutExchange(EXCHANGE_BACKUP).build();
}
@Bean
public Binding queueBingExchangeBack(@Qualifier(QUEUE_BACKUP) Queue queue,@Qualifier(EXCHANGE_BACKUP) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_BACKUP).noargs();
}
@Bean
public Binding queueWarBingExchangeBack(@Qualifier(QUEUE_WARNING) Queue queue,@Qualifier(EXCHANGE_BACKUP) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_BACKUP).noargs();
}
短信交換機
@Bean(EXCHANGE_SMS)
public Exchange EXCHANGE_SMS(){
//return ExchangeBuilder.directExchange(EXCHANGE_SMS).withArgument("alternate-exchange",EXCHANGE_BACKUP()).build();
return ExchangeBuilder.directExchange(EXCHANGE_SMS).alternate(EXCHANGE_BACKUP).build();
}
發(fā)送消息,監(jiān)聽消息代碼不變换可,監(jiān)聽隊列改下就行
mandatory (消息回退)參數(shù)與備份交換機可以一起使用的時候椎椰,如果兩者同時開啟,消息究竟何去何從沾鳄?誰優(yōu)先級高俭识,經(jīng)過上面結(jié)果顯示答案是備份交換機優(yōu)先級高。
冪等性洞渔,隊列優(yōu)先級套媚,集群缚态,鏡像隊列功能(用到的場景少,冪等性消息全局id+redis防止重復(fù)消費)略堤瘤,太多了玫芦,不想復(fù)制了
完結(jié)撒花。本辐。桥帆。。慎皱。老虫。