在日常工作中我們常常需要使用到分布式延時(shí)隊(duì)列灸叼,本文淺談目前筆者使用過(guò)的幾種實(shí)現(xiàn)方式遵馆。
一蜡塌、rabbitMQ(TTL+死信隊(duì)列)
利用rabbitMQ的高級(jí)特性——TTL(Time To Live),對(duì)延時(shí)隊(duì)列設(shè)置TTL适室,當(dāng)消息過(guò)期后自動(dòng)路由到對(duì)應(yīng)的死信隊(duì)列嫡意,消費(fèi)者監(jiān)控死信隊(duì)列即可,如圖捣辆。
@Configuration
public class RabbitMQConfig {
/**
* pay延時(shí)exchange
*/
public static final String PAY_DELAY_EXCHANGE_NAME = "delay.queue.pay.business.exchange";
/**
* pay死信Exchange
*/
private static final String PAY_DEAD_LETTER_EXCHANGE = "delay.queue.pay.deadletter.exchange";
/**
* pay 延時(shí)隊(duì)列 1 2 3
*/
private static final String PAY_DELAY_QUEUE_ONE_NAME = "delay.queue.pay.business.queueOne";
private static final String PAY_DELAY_QUEUE_TWO_NAME = "delay.queue.pay.business.queueTwo";
private static final String PAY_DELAY_QUEUE_THREE_NAME = "delay.queue.pay.business.queueThree";
private static final String PAY_DELAY_QUEUE_FOUR_NAME = "delay.queue.pay.business.queueFour";
private static final String PAY_DELAY_QUEUE_FIVE_NAME = "delay.queue.pay.business.queueFive";
/**
* pay 死信隊(duì)列
*/
public static final String PAY_DEAD_LETTER_QUEUE_NAME = "delay.queue.pay.deadletter.queue";
/**
* pay 死信隊(duì)列 路由鍵
*/
private static final String PAY_DEAD_LETTER_QUEUE_ROUTING_KEY = "delay.queue.pay.deadletter.delay.routingkey";
/**
* pay延時(shí)隊(duì)列 路由鍵
*/
public static final String PAY_DELAY_QUEUE_ONE_ROUTING_KEY = "delay.queue.pay.business.queueone.routingkey";
public static final String PAY_DELAY_QUEUE_TWO_ROUTING_KEY = "delay.queue.pay.business.queuetwo.routingkey";
public static final String PAY_DELAY_QUEUE_THREE_ROUTING_KEY = "delay.queue.pay.business.queuethree.routingkey";
public static final String PAY_DELAY_QUEUE_FOUR_ROUTING_KEY = "delay.queue.pay.business.queuefour.routingkey";
public static final String PAY_DELAY_QUEUE_FIVE_ROUTING_KEY = "delay.queue.pay.business.queuefive.routingkey";
/**
* pay 延時(shí)exchange到死信隊(duì)列 路由鍵
*/
public static final String PAY_DELAY_QUEUE_DEAD_ROUTING_KEY = "delay.queue.pay.business.queuedead.routingkey";
/**
* 付款 聲明延時(shí)Exchange
*
* @return
*/
@Bean("delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(PAY_DELAY_EXCHANGE_NAME);
}
/**
* 付款 聲明死信Exchange
*
* @return
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(PAY_DEAD_LETTER_EXCHANGE);
}
/**
* 聲明延時(shí)隊(duì)列A 延時(shí)1m
* 并綁定到對(duì)應(yīng)的死信交換機(jī)
*
* @return
*/
@Bean("delayQueueOne")
public Queue delayQueueOne() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", PAY_DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", PAY_DEAD_LETTER_QUEUE_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 60000);
return QueueBuilder.durable(PAY_DELAY_QUEUE_ONE_NAME).withArguments(args).build();
}
/**
* 聲明延時(shí)隊(duì)列B 延時(shí) 2m
* 并綁定到對(duì)應(yīng)的死信交換機(jī)
*
* @return
*/
@Bean("delayQueueTwo")
public Queue delayQueueTwo() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", PAY_DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", PAY_DEAD_LETTER_QUEUE_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 120000);
return QueueBuilder.durable(PAY_DELAY_QUEUE_TWO_NAME).withArguments(args).build();
}
/**
* 聲明延時(shí)隊(duì)列C 延時(shí) 3m
* 并綁定到對(duì)應(yīng)的死信交換機(jī)
*
* @return
*/
@Bean("delayQueueThree")
public Queue delayQueueThree() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", PAY_DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", PAY_DEAD_LETTER_QUEUE_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 180000);
return QueueBuilder.durable(PAY_DELAY_QUEUE_THREE_NAME).withArguments(args).build();
}
/**
* 聲明延時(shí)隊(duì)列D 延時(shí) 4m
* 并綁定到對(duì)應(yīng)的死信交換機(jī)
*
* @return
*/
@Bean("delayQueueFour")
public Queue delayQueueFour() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", PAY_DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", PAY_DEAD_LETTER_QUEUE_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 240000);
return QueueBuilder.durable(PAY_DELAY_QUEUE_FOUR_NAME).withArguments(args).build();
}
/**
* 聲明延時(shí)隊(duì)列F 延時(shí) 5m
* 并綁定到對(duì)應(yīng)的死信交換機(jī)
*
* @return
*/
@Bean("delayQueueFive")
public Queue delayQueueFive() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange", PAY_DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key
args.put("x-dead-letter-routing-key", PAY_DEAD_LETTER_QUEUE_ROUTING_KEY);
// x-message-ttl 聲明隊(duì)列的TTL
args.put("x-message-ttl", 300000);
return QueueBuilder.durable(PAY_DELAY_QUEUE_FIVE_NAME).withArguments(args).build();
}
/**
* 聲明死信隊(duì)列 用于接收延時(shí)處理的消息
*
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
return new Queue(PAY_DEAD_LETTER_QUEUE_NAME);
}
/**
* 聲明延時(shí)隊(duì)列one綁定關(guān)系
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingOne(@Qualifier("delayQueueOne") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DELAY_QUEUE_ONE_ROUTING_KEY);
}
/**
* 聲明業(yè)務(wù)隊(duì)列two綁定關(guān)系
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingTwo(@Qualifier("delayQueueTwo") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DELAY_QUEUE_TWO_ROUTING_KEY);
}
/**
* 聲明業(yè)務(wù)隊(duì)列three綁定關(guān)系
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingThree(@Qualifier("delayQueueThree") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DELAY_QUEUE_THREE_ROUTING_KEY);
}
/**
* 聲明業(yè)務(wù)隊(duì)列four綁定關(guān)系
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingFour(@Qualifier("delayQueueFour") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DELAY_QUEUE_FOUR_ROUTING_KEY);
}
/**
* 聲明業(yè)務(wù)隊(duì)列five綁定關(guān)系
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingFive(@Qualifier("delayQueueFive") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DELAY_QUEUE_FIVE_ROUTING_KEY);
}
/**
* 延時(shí)exchange到死信隊(duì)列
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingDead(@Qualifier("deadLetterQueue") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DELAY_QUEUE_DEAD_ROUTING_KEY);
}
/**
* 聲明死信隊(duì)列綁定關(guān)系
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingQueue(@Qualifier("deadLetterQueue") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PAY_DEAD_LETTER_QUEUE_ROUTING_KEY);
}
}