在上一篇文章中碟嘴,我們學習了死信隊列的相關內(nèi)容,文章最后我們提到力九,超時消息結合死信隊列也可以實現(xiàn)一個延時隊列耍铜。大致的流程是這樣的,如果正常業(yè)務隊列中的消息設置了過期時間跌前,并在消息過期后棕兼,讓消息流入一個死信隊列,然后消費者監(jiān)聽這個死信隊列抵乓,實現(xiàn)消息的延時處理伴挚,這樣就可以實現(xiàn)一個簡單的延時隊列。
上邊描述的延時隊列灾炭,其實是存在一些問題的茎芋,應付簡單的場景還行,如果需要獲得更加完善的功能體驗蜈出,可以選擇使用 RabbitMQ 提供的延時消息插件田弥。下邊我們分別了解兩種實現(xiàn)方式。
一铡原、準備工作
創(chuàng)建 SpringBoot 項目偷厦,添加依賴以及連接配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.properties
配置 RabbitMQ 服務的相關連接信息:
server.port=8083
# rabbitmq 相關配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
二、延時隊列的簡單實現(xiàn)
首先創(chuàng)建過期消息的交換機燕刻、隊列只泼,以及死信交換機、隊列卵洗,并完成綁定配置请唱,讓過期消息可以流入死信隊列:
@Configuration
public class RabbitMQConfig {
// 創(chuàng)建過期消息的交換機
@Bean
DirectExchange ttlExchange() {
return new DirectExchange("ttl.exchange", true, false);
}
// 創(chuàng)建死信交換機
@Bean
DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange", true, false);
}
// 創(chuàng)建有過期時間的消息隊列,并配置死信隊列
@Bean
Queue ttlQueue() {
HashMap<String, Object> args = new HashMap<>();
// 設置隊列中消息的過期時間过蹂,單位毫秒
args.put("x-message-ttl", 10 * 60 * 1000);
// 設置死信交換機
args.put("x-dead-letter-exchange", "dead.letter.exchange");
// 設置死信交換機綁定隊列的routingKey
args.put("x-dead-letter-routing-key", "dead.letter");
return new Queue("ttl.queue", true, false, false, args);
}
// 創(chuàng)建死信隊列
@Bean
Queue deadLetterQueue() {
return new Queue("dead.letter.queue", true);
}
@Bean
Binding ttlBinding() {
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
}
}
核心的內(nèi)容就是上邊的配置類了十绑,接下來就是發(fā)送消息到業(yè)務消息隊列,并只給死信隊列指定消費者榴啸,這樣發(fā)送的消息在正常業(yè)務隊列過期后孽惰,最終會流入死信隊列,進而被消費掉鸥印。生產(chǎn)者和消費者的代碼很簡單:
@Service
public class DelayedMessageSendService {
Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
@Autowired
RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message);
logger.info("發(fā)送的業(yè)務消息:" + message);
}
}
@Service
public class DelayedMessageReceiveService {
Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
@RabbitListener(queues = "dead.letter.queue")
public void receive(String message) {
logger.info("收到的延時消息:" + message);
}
}
如果我們有其它不同時間的延時業(yè)務需求勋功,就需要在配置類添加更多的和ttl.queue
類似配置的過期消息隊列,如果新的延時業(yè)務需求太多库说,新消息隊列的數(shù)量將不可控狂鞋。
那如果不給隊列配置消息的過期時間,而是在發(fā)送消息時單獨給每條消息配置呢潜的?這樣想想好像解決上邊的問題骚揍,實現(xiàn)的代碼如下:
@Service
public class DelayedMessageSendService {
Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
@Autowired
RabbitTemplate rabbitTemplate;
public void send(String message, Integer delay) {
MessagePostProcessor processor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設置消息的過期時間,單位毫秒
message.getMessageProperties().setExpiration(String.valueOf(delay));
return message;
}
};
rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message, processor);
System.out.println("發(fā)送的業(yè)務消息:" + message);
}
}
消費者的代碼還是上邊的。
我們發(fā)送兩條消息測試一下:
delayedMessageSendService.send("hello world", 30000);
delayedMessageSendService.send("hello rabbitmq", 10000);
仔細觀察上邊的運行結果信不,按照預期hello rabbitmq
應該在10秒后先被消費嘲叔,然而由于我們先發(fā)送的hello world
消息設置的過期時間為30秒,導致hello rabbitmq
被阻塞抽活,直到30秒后陸續(xù)被消費掉硫戈。問題很明顯了,后發(fā)送消息的過期時間必須大于大于前邊已經(jīng)發(fā)送消息的過期時間下硕,這樣才能保證延時隊列正常工作丁逝,但實際使用中幾乎不能保證的。
可以看到梭姓,我們簡單實現(xiàn)的延時隊列雖然可用霜幼,但還是存在問題的。使用 RabbitMQ 延時消息插件誉尖,就不存在這些問題了罪既,可用性更高!
三释牺、RabbitMQ 延時消息插件
1萝衩、插件安裝
RabbitMQ 默認是沒有內(nèi)置延時消息插件的,需要我們單獨下載安裝没咙。下載地址入口:
https://www.rabbitmq.com/community-plugins.html
將下載好的插件放到 RabbitMQ 安裝目錄下的plugins
目錄,然后進入sbin
目錄千劈,我安裝的 Windows 版的 RabbitMQ祭刚,執(zhí)行rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
命令來安裝插件:
安裝好后,重啟服務就可以使用了墙牌。
如果使用原生 Java client 操作延時消息插件涡驮,可以參考:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
2、用法介紹
首先創(chuàng)建延時消息交換機以及隊列喜滨,創(chuàng)建交換機和之前有所不同捉捅,需要使用CustomExchange
類
@Configuration
public class DelayedRabbitMQConfig {
@Bean
CustomExchange delayedExchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
@Bean
Queue delayedQueue() {
return new Queue("delayed.queue", true);
}
@Bean
Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed").noargs();
}
}
然后創(chuàng)建消費者,監(jiān)聽delayed.queue
:
@Service
public class DelayedMessageReceiveService {
Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
@RabbitListener(queues = "delayed.queue")
public void receive2(String message) {
logger.info("收到的延時消息:" + message);
}
}
發(fā)送消息時指定消息要延時處理的時間:
@Service
public class DelayedMessageSendService {
Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
@Autowired
RabbitTemplate rabbitTemplate;
/**
* @param message
* @param delay 延時時間虽风,單位毫秒
*/
public void send2(String message, Integer delay) {
MessagePostProcessor processor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設置消息延時處理的時間
message.getMessageProperties().setDelay(delay);
return message;
}
};
rabbitTemplate.convertAndSend("delayed.exchange", "delayed", message, processor);
logger.info("發(fā)送的延時消息:" + message);
}
}
然后發(fā)送幾條消息棒口,測試下效果:
delayedMessageSendService.send2("hello world", 30000);
delayedMessageSendService.send2("hello rabbitmq", 10000);
delayedMessageSendService.send2("hello kitty", 20000);
最終的效果如下,符合我們的預期:
通過官方插件來實現(xiàn)延時消息隊列還是很簡單的辜膝。
使用延時隊列可以完成很多常見的需求无牵,比如預約商品開售前提醒用戶購買、下單后一定時間未支付則取消訂單厂抖、收到商品后一定時間還沒確認收貨則系統(tǒng)自動確認收貨等茎毁。
關于 RabbitMQ 延時隊列的內(nèi)容就介紹到這里了。
本文完忱辅!