1. 死信隊列之延遲隊列
死信隊列:用來保存處理失敗或者過期的消息忿危,確保消息不被丟失以便排查問題姻乓!
延遲隊列:顧名思義就是消息在隊列中存在一定時間后再被消費。比如下單后半小時沒有支付的訂單自動取消跷叉,比如預(yù)約某項功能時提前15分鐘提醒嫌吠,比如希望某一個功能在多長時間后執(zhí)行等都可以使用延遲隊列。
- RabbitMQ本身是沒有延遲隊列功能的搁拙,但是可以通過死信隊列的TTL和DLX模擬延遲隊列功能秒梳。
- Time To Live:可以在發(fā)送消息時設(shè)置過期時間,也可以設(shè)置整個隊列的過期時間箕速,如果兩個同時設(shè)置已最早過期時間為準(zhǔn)酪碘。
- Dead Letter Exchanges:可以通過綁定隊列的死信交換器來實現(xiàn)死信隊列。
x-dead-letter-exchange:綁定死信交換器(其實也是普通交換器盐茎,與類型無關(guān))
x-dead-letter-routing-key:綁定死信隊列的路由鍵(可選)
x-message-ttl:綁定隊列消息的過期時間(可選)
- 死信隊列設(shè)計思路
生產(chǎn)者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者
進(jìn)入消息隊列:
1. 消息被拒絕兴垦,并且requeue= false
2. 消息ttl過期
3. 隊列達(dá)到最大的長度
做延遲隊列需要創(chuàng)建一個沒有消費者的隊列,用了存儲消息字柠。然后創(chuàng)建一個真正的消費隊列探越,用來做具體的業(yè)務(wù)邏輯。當(dāng)帶有TTL的消息到達(dá)綁定死信交換器的隊列窑业,因為沒有消費者所以會一直等到消息過期钦幔,然后消息被投遞到死信隊列也就是真正的消費隊列。
新建配置類MQDelayConfig.java常柄,創(chuàng)建支付交換器鲤氢、支付隊列綁定死信隊列搀擂、他們的綁定關(guān)系。無消費者卷玉,暫時不知道怎么用注解創(chuàng)建哨颂。
設(shè)置x-dead-letter-exchange、x-dead-letter-routing-key揍庄、x-message-ttl咆蒿。
package com.fzb.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @Description 利用死信隊列和過期時間模擬延遲隊列,沒有消費者蚂子,所以不能用注解形式
* Time To Live(TTL)
* 1. 可以在發(fā)送消息時設(shè)置過期時間(message.getMessageProperties().setExpiration("5000");)
* 2. 也可以設(shè)置整個隊列的過期時間(args.put("x-message-ttl",10000);)
* 3. 如果兩個同時設(shè)置已最早過期時間為準(zhǔn)
* Dead Letter Exchanges(DLX)
* @Author jxb
* @Date 2019-03-10 10:25:30
*/
@Component
public class MQDelayConfig {
/**
* @Description 定義支付交換器
* @Author jxb
* @Date 2019-04-02 14:39:31
*/
@Bean
private DirectExchange directPayExchange() {
return new DirectExchange("direct.pay.exchange");
}
/**
* @Description 定義支付隊列 綁定死信隊列(其實是綁定的交換器沃测,然后通過交換器路由鍵綁定隊列) 設(shè)置過期時間
* @Author jxb
* @Date 2019-04-02 14:40:24
*/
@Bean
private Queue directPayQueue() {
Map<String, Object> args = new HashMap<>(3);
//聲明死信交換器
args.put("x-dead-letter-exchange", "direct.delay.exchange");
//聲明死信路由鍵
args.put("x-dead-letter-routing-key", "DelayKey");
//聲明隊列消息過期時間
args.put("x-message-ttl", 10000);
return new Queue("direct.pay.queue", true, false, false, args);
}
/**
* @Description 定義支付綁定
* @Author jxb
* @Date 2019-04-02 14:46:10
*/
@Bean
private Binding bindingOrderDirect() {
return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay");
}
}
- 帶有過期時間且綁定死信交換器的隊列
- 生產(chǎn)者,為消息設(shè)置過期時間setExpiration("15000");
/**
* @Description 支付隊列食茎、綁定死信隊列蒂破,測試消息延遲功能
* @Author jxb
* @Date 2019-04-02 14:07:25
*/
@RequestMapping(value = "/directDelayMQ", method = {RequestMethod.GET})
public List<User> directDelayMQ() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
List<User> users = userService.getUserList(null);
for (User user : users) {
CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,
message -> {
// 設(shè)置5秒過期
message.getMessageProperties().setExpiration("15000");
return message;
},
correlationData);
System.out.println(user.getName() + ":" + sdf.format(new Date()));
}
return users;
}
- 消費者,聲明真正消費的隊列别渔、交換器附迷、綁定
/**
* @Description 延遲隊列
* @Author jxb
* @Date 2019-04-04 16:34:28
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.delay.queue"), exchange = @Exchange(value = "direct.delay.exchange"), key = {"DelayKey"})})
public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 模擬執(zhí)行任務(wù)
System.out.println("這是延遲隊列消費:" + user.getName() + ":" + sdf.format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
測試結(jié)果,因為消息配置的是15秒后到期哎媚,而隊列配置了10秒到期喇伯,所以最終按照時間短的計算。
思考: 如果先放入一條A消息過期時間是10秒拨与,再放入一個b消息過期時間是5秒稻据,那延遲隊列是否可以先消費b消息?
答案是否定的买喧,因為隊列就會遵循先進(jìn)先出的規(guī)則捻悯,b消息會等a消息過期后,一起消費淤毛,這就是所謂的隊列阻塞今缚。由這個問題我們引出插件形式來實現(xiàn)延遲隊列
2. 用rabbitmq-delayed-message-exchange插件實現(xiàn)延遲隊列
強烈建議安裝erlang20+版本和RabbitMQ3.7+版本,另插件版本要和RabbitMQ版本一致低淡。
解壓成.ez的文件姓言,上傳到RabbitMQ安裝目錄的plugins文件夾下,停止服務(wù)器蔗蹋,開啟插件事期,啟動服務(wù)器。
1. 查看yum 安裝的軟件路徑
查找安裝包:rpm -qa|grep rabbitmq
查找位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
卸載yum安裝:yum remove rabbitmq-server-3.6.15-1.el6.noarch
2. 上傳到plugins文件夾
3. 停止服務(wù)器
service rabbitmq-server stop
4. 開啟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
(關(guān)閉插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5. 啟動服務(wù)器
service rabbitmq-server start
6. 查看插件
rabbitmq-plugins list
- 生產(chǎn)者纸颜,設(shè)置Header屬性x-delay過期時間
/**
* @Description 插件延遲隊列功能
* @Author jxb
* @Date 2019-04-02 14:07:25
*/
@RequestMapping(value = "/directPluginDelayMQ", method = {RequestMethod.GET})
public List<User> directPluginDelayMQ() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
List<User> users = userService.getUserList(null);
for (User user : users) {
CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
rabbitTemplate.convertAndSend("direct.plugin.delay.exchange", "PluginDelayKey", user,
message -> {
// 設(shè)置5秒過期
message.getMessageProperties().setHeader("x-delay",5000);
return message;
},
correlationData);
System.out.println(user.getName() + ":" + sdf.format(new Date()));
}
return users;
}
- 消費者兽泣,設(shè)置x-delayed-message類型的交換器,增加參數(shù)x-delayed-type為direct
/**
* @Description 插件延遲隊列
* @Author jxb
* @Date 2019-04-04 16:34:28
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.plugin.delay.queue"), exchange = @Exchange(value = "direct.plugin.delay.exchange",type = "x-delayed-message",arguments = {@Argument(name="x-delayed-type",value = "direct")}), key = {"PluginDelayKey"})})
public void getPDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 模擬執(zhí)行任務(wù)
System.out.println("這是插件延遲隊列消費:" + user.getName() + ":" + sdf.format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
- 插件形式交換器
注:用代碼是創(chuàng)建一個:CustomExchange自定義交換器胁孙,類型一定要設(shè)置成:x-delayed-message
注:如果配置了發(fā)送回調(diào)ReturnCallback唠倦,插件延遲隊列則會回調(diào)該方法称鳞,因為發(fā)送方確實沒有投遞到隊列上,只是在交換器上暫存稠鼻,等過期時間到了 才會發(fā)往隊列冈止。
- SpringBoot集成RabbitMQ常用配置(非本系列用)
#rabbitmq
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=fzb_host
#消費者數(shù)量
spring.rabbitmq.listener.simple.concurrency=10
#最大消費者數(shù)量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費者每次從隊列獲取的消息數(shù)量。寫多了候齿,如果長時間得不到消費熙暴,數(shù)據(jù)就一直得不到處理
spring.rabbitmq.listener.simple.prefetch=1
#消費者自動啟動
spring.rabbitmq.listener.simple.auto-startup=true
#消費者消費失敗,自動重新入隊
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#啟用發(fā)送重試 隊列滿了發(fā)不進(jìn)去時啟動重試
spring.rabbitmq.template.retry.enabled=true
#1秒鐘后重試一次
spring.rabbitmq.template.retry.initial-interval=1000
#最大重試次數(shù) 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大間隔 10秒鐘
spring.rabbitmq.template.retry.max-interval=10000
#等待間隔 的倍數(shù)慌盯。如果為2 第一次 乘以2 等1秒周霉, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0
做一個有趣的人亚皂,讓生活更好玩一些