應用場景
- 延遲發(fā)送短信
- 用戶下單叶堆,30分鐘超時未支付,取消訂單
- 預約工作會議斥杜,20分鐘后自動通知所有參會人員
解決方案
- 方案一:通過
死信隊列
和設置TTL
超時時間虱颗,故意讓消息超時未消費(相當于延時再處理),讓消息投遞到死信隊列蔗喂,然后處理死信消息時忘渔,進行延時后的處理 - 方案二:使用RabbitMQ官方提供的
DelayExchange
插件
安裝RabbitMQ
- 我使用的RabbitMQ,是使用Docker進行安裝的缰儿,如果你也想使用Docker安裝畦粮,可以參考一下
- Centos7 + Docker
下載RabbitMQ鏡像
docker pull rabbitmq:3.8-management
安裝RabbitMQ
- 這里的命令,映射了RabbitMQ的plugin插件目錄為
mq-plugins
乖阵,待會要使用這個名稱去查詢真實目錄地址
docker run \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
下載插件
- 插件社區(qū)地址:https://www.rabbitmq.com/community-plugins.html
- 使用方式:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq/
注意:插件的版本要和安裝的RabbitMQ的版本相搭配宣赔,否則可能會有意外的問題產生!
- 例如我使用的RabbitMQ版本是
3.8.5
瞪浸,而DelayExchange
插件則需要使用3.8.9
儒将,這個版本的插件適用于RabbitMQ3.8.5
及其以上版本。其他版本可去Github
中下載 - 插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
安裝插件
上傳插件
- 查看剛才映射的
mq-plugins
目錄的真實目錄路徑
docker volume inspect mq-plugins
查看輸出的信息中的
Mountpoint
字段对蒲,我的是/var/lib/docker/volumes/mq-plugins/_data
使用
FinalShell
或其他shell工具钩蚊,將下載好的DelayExchange
插件壓縮包,拖拽上傳到/var/lib/docker/volumes/mq-plugins/_data
目錄去(注:不要解壓)
安裝插件
- 進入RabbitMQ容器內部齐蔽,
-it
后面的mq
為剛才使用Docker安裝RabbitMQ時,起的名稱床估,請改成你定義的名稱
docker exec -it mq bash
- 進入容器內部后含滴,使用命令安裝插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 等待命令執(zhí)行,輸出
The following plugins have been enabled: rabbitmq_delayed_message_exchange
丐巫,即為成功安裝插件
DelayExchange插件的原理
DelayExchange
需要將一個交換機聲明為delayed
類型。當我們發(fā)送消息到delayExchange時陷虎,流程如下:
- 接收消息
- 判斷消息是否具備
x-delay
屬性 - 如果有
x-delay
屬性谣拣,說明是延遲消息
,持久化到硬盤赡茸,讀取x-delay
值,作為延遲時間 - 返回
routing not found
結果給消息發(fā)送者 -
x-delay
時間到期后祝闻,重新投遞消息到指定隊列
使用插件
聲明一個交換機占卧,交換機的類型可以是任意類型,只需要設定delayed
屬性為true
联喘,然后聲明隊列與其綁定即可
聲明DelayExchange交換機
- 方式一:基于注解方式(推薦)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delayed.queue", durable = "true"),
exchange = @Exchange(name = "delayed.direct",delayed = "true"),
key = "delayed"
))
public void listenDelayedQueue(String msg){
log.info("接收到 delayed.queue的延遲消息:{}", msg);
}
- 方式二:基于@Bean的方式
// 交換機和隊列的配置
@Configuration
public class DelayedConfig {
// 創(chuàng)建延遲交換機
@Bean
public DirectExchange delayedExchange() {
return ExchangeBuilder.directExchange("delayed.direct")
// 聲明延遲屬性
.delayed()
.build();
}
// 創(chuàng)建延遲隊列
@Bean
public Queue delayedQueue() {
return new Queue("delayed.queue");
}
// 綁定交換機和隊列
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed");
}
}
// 監(jiān)聽器
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "delayed.queue")
public void listenSimpleQueue(Message msg) throws Exception {
log.info("接收到 delayed.queue的延遲消息:{}", msg);
}
}
發(fā)送延遲消息
- 發(fā)送消息時华蜒,一定要攜帶
x-delay
屬性,指定延遲的時間 - 下面使用一個Controller的接口來測試豁遭,傳入秒值叭喜,發(fā)送一個延遲消息到交換機中
- 例如:
http://localhost:8001/msg/sendDelayed/3
,消息將會延遲3秒后發(fā)送給消費者的監(jiān)聽器
@GetMapping("/sendDelayed/{time}")
public ResponseEntity sendDelayed(@PathVariable("time") Integer time) {
String exchange = "delayed.direct";
Message message = MessageBuilder.withBody("delayed message".getBytes())
// 設置延時時間蓖谢,時間單位為毫秒值
.setHeader("x-delay", time * 1000)
.build();
rabbitTemplate.send(exchange, "delayed", message);
return ResponseEntity.ok("success" + new Date());
}
總結
- 要使用
DelayExchange
插件的步驟- 聲明一個交換機捂蕴,添加
delayed
屬性為true
,表明是一個延遲消息交換機 - 發(fā)送消息時闪幽,添加
x-delay
頭啥辨,值為超時時間,單位為毫秒
- 聲明一個交換機捂蕴,添加