延時(shí)隊(duì)列我們可以簡(jiǎn)單粗暴的理解它為延時(shí)發(fā)送消息的隊(duì)列
那延時(shí)隊(duì)列的應(yīng)用場(chǎng)景有哪些呢飞几,比如訂單在一段時(shí)間內(nèi)未支付則取消訂單掏击,就是需要在某個(gè)事件發(fā)生之后或者之前的某個(gè)時(shí)間點(diǎn)完成另一事件。
這種場(chǎng)景講道理可以用定時(shí)器來完成泵琳,但是如果有些事件的時(shí)間點(diǎn)需要精確到秒真屯,我們就需要每秒輪詢一次,或者在數(shù)據(jù)量較大的時(shí)候倍试,定時(shí)器可能需要跑很久讯屈,給系統(tǒng)帶來很大壓力!(好了就這把县习,編不下去了涮母。。QAQ
那rabbitMQ的TTL只是消息在隊(duì)列中的存活時(shí)間准颓,并不是說消息在存活一段時(shí)間后才會(huì)發(fā)送給消費(fèi)者哈蝇,所以并不能只用TTL來實(shí)現(xiàn)延時(shí)隊(duì)列,還需要借助死信隊(duì)列攘已。
生產(chǎn)者發(fā)送一條延時(shí)消息到延時(shí)隊(duì)列炮赦,當(dāng)消息過期又會(huì)被路由到死信隊(duì)列,消費(fèi)者只要監(jiān)聽死信隊(duì)列即可样勃。
配置類
@Configuration
public class DelayMQConfig {
/**
* 延時(shí)隊(duì)列交換機(jī)
*/
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
/**
* 延時(shí)隊(duì)列
*/
public static final String DELAY_QUEUE_NAME = "delay.queue";
/**
* 死信交換機(jī)
*/
public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
/**
* 死信隊(duì)列
*/
public static final String DEAD_QUEUE_NAME = "dead.queue";
/**
* 聲明延時(shí)交換機(jī)
*/
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
/**
* 聲明延時(shí)隊(duì)列吠勘,并綁定到死信交換機(jī)上
*/
@Bean("delayQueue")
public Queue delayQueue(){
Map<String, Object> args = new HashMap<>(2);
//綁定死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//路由
args.put("x-dead-letter-routing-key", "dead.routing.key");
//延時(shí)10S
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
}
/**
* 延時(shí)隊(duì)列綁定到延時(shí)隊(duì)列交換機(jī)
*
**/
@Bean
public Binding delayBinding(@Qualifier("delayQueue")Queue queue,
@Qualifier("delayExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delay.routing.key");
}
/**
* 聲明死信交換機(jī)
*/
@Bean("deadExchange")
public DirectExchange deadExchange(){
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
@Bean("deadQueue")
public Queue deadQueue(){
return new Queue(DEAD_QUEUE_NAME);
}
@Bean
public Binding deadBinding(@Qualifier("deadQueue")Queue queue,
@Qualifier("deadExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dead.routing.key");
}
}
截圖看一下其中一個(gè)
消費(fèi)者
@Component
public class DelayListener {
private Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = DelayMQConfig.DEAD_QUEUE_NAME)
public void receive(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
logger.info("收到消息的時(shí)間:{},收到的消息內(nèi)容:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
生產(chǎn)者
@PostMapping("delayMsgSend")
public void delayMsgSend(@RequestParam("msg")String msg){
rabbitTemplate.convertSendAndReceive(DelayMQConfig.DELAY_EXCHANGE_NAME,"delay.routing.key",msg);
}
發(fā)送一個(gè)消息峡眶,日志如下
2020-10-13 10:47:05.465 INFO 44044 --- [nio-9000-exec-1] com.lyy.study.controller.MyController : 發(fā)送消息的時(shí)間:Tue Oct 13 10:47:05 CST 2020剧防,發(fā)送的消息內(nèi)容:delayMsgSend
2020-10-13 10:47:05.532 INFO 44044 --- [nio-9000-exec-1] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-10-13 10:47:05.545 INFO 44044 --- [nio-9000-exec-1] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2020-10-13 10:47:05.627 INFO 44044 --- [nio-9000-exec-1] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-KMeIYI6Pcun_gzAm5AYe3w identity=49703e41] started
2020-10-13 10:47:15.675 INFO 44044 --- [ntContainer#0-1] com.lyy.study.mq.listener.DelayListener : 收到消息的時(shí)間:Tue Oct 13 10:47:15 CST 2020,收到的消息內(nèi)容:delayMsgSend
到此一個(gè)簡(jiǎn)單的延時(shí)隊(duì)列就實(shí)現(xiàn)了辫樱。
不過更多的時(shí)候峭拘,我們其實(shí)需要的是給每一個(gè)消息添加自己的TTL,那么就會(huì)有一個(gè)問題拉,假如我們先發(fā)送了一個(gè)10S的延時(shí)消息鸡挠,又發(fā)送了一個(gè)5S的延時(shí)消息辉饱,預(yù)期的結(jié)果應(yīng)該是先消費(fèi)5S的那個(gè)消息,然而實(shí)際的結(jié)果是10S的消息先被消費(fèi)了拣展,才會(huì)消費(fèi)5S的那個(gè)消息(這個(gè)問題在TTL那篇文章有提過)彭沼,那怎么辦呢?
不要慌备埃,https://www.rabbitmq.com/community-plugins.html從官網(wǎng)上下載一個(gè)叫rabbitmq_delayed_message_exchange的插件就好了姓惑!
下載之后解壓放到rabbitMQ的插件目錄
然后進(jìn)入sbin目錄,執(zhí)行以下命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重啟就OK了按脚!
驗(yàn)證的部分我就不寫了于毙,懶惰.jpg