什么是延時隊列
延遲隊列首先它是一個隊列牺蹄,作為隊列它的第一個特征是有序的,而之所以它被稱為延時隊列它還有一個更重要的特性就是延時。對于普通隊列而言鞭达,如果有消費者訂閱隊列消費,則消費者可以立刻從隊列中獲取到元素皇忿。而作為延時隊列畴蹭,消費者訂閱了該隊列也無法從該隊列中獲取元素,必須延時結(jié)束才能從隊列中取出元素鳍烁。
延時隊列的使用場景
延時隊列的應(yīng)用場景很多叨襟,舉一個我們最常見的例子。例如在一個商場應(yīng)用中幔荒,用戶向服務(wù)器提交了訂單未支付糊闽,對于還未超時的訂單我們需要提醒用戶訂單還未支付梳玫,對于超時的訂單我們需要將他們關(guān)閉。
上面的業(yè)務(wù)場景非常常見右犹,我們可以通過定時掃描訂單表找到需要提醒的訂單和未支付的訂單提澎,然后對它們進行處理。但是這種方式有兩個缺點念链,第一我們需要掃描整個訂單庫效率上不高盼忌,同時數(shù)據(jù)量上去之后對數(shù)據(jù)庫的壓力較大。第二它的時效性不強掂墓,因為我們是通過定時任務(wù)去發(fā)現(xiàn)需要處理的訂單谦纱,如果定時任務(wù)間隔太久就會導(dǎo)致不能及時處理訂單。如果定時任務(wù)太頻繁君编,對服務(wù)器的性能壓力太大跨嘉。總結(jié)來說通過定時任務(wù)掃表的方式優(yōu)點就是足夠簡單啦粹,當(dāng)是它的致命確定就是效率太低偿荷。
Java中提供了DelayedQueue,它可以實現(xiàn)延時隊列的效果唠椭,它解決了上面效率低和延遲的缺點跳纳。但是它也不是完美無缺的,使用Java中提供的延遲隊列我們需要考慮隊列元素數(shù)據(jù)的持久化贪嫂。另一個點在于它的數(shù)據(jù)存在JVM的內(nèi)存中寺庄,如果數(shù)據(jù)量太大它會導(dǎo)致應(yīng)用占用的內(nèi)存過大。
而我們可以通過RabbitMQ來實現(xiàn)延遲隊列力崇,它不僅解決了定時掃表方案的效率低和延遲大的缺點斗塘,同時還解決了DelayedQueue需要處理數(shù)據(jù)持久化等繁瑣的問題。
RabbitMQ中如何配置延時隊列
RabbitMQ中可以使用兩種方式來實現(xiàn)延遲隊列亮靴,其一是通過在Queue上設(shè)置TTL來實現(xiàn)馍盟,另外一種則是通過在消息中設(shè)置expiration來實現(xiàn)。不過僅依靠這兩種方式還不能實現(xiàn)延時隊列茧吊,還需要配合死信隊列才行贞岭。關(guān)于什么是死信隊列可以參考《RabbitMQ中的死信隊列》該文中的介紹。
設(shè)置隊列的TTL
在定義隊列時搓侄,我們可以在隊列的定義中加入x-message-ttl參數(shù)瞄桨,當(dāng)消息被推送到該隊列,如果該隊列上沒有消費者消費讶踪,到了x-message-ttl設(shè)定的超時時間芯侥,消息會從隊列中移除。如果隊列上有消費者消費乳讥,它并不會等到x-message-ttl設(shè)定的時間超時才從隊列中移除柱查,而是立馬被訂閱的消費者消費廓俭。
所以我在前面說過,我們需要定義一個死信隊列物赶,即在隊列中增加x-dead-letter-exchange和x-dead-letter-routing-key參數(shù)白指,而我們的消費者不訂閱該隊列,而是訂閱死信隊列酵紫。這樣當(dāng)消息到了TTL設(shè)定的時間時告嘲,它會被推送到死信隊列,而我們的消費者訂閱了死信隊列奖地。正是通過這種方式橄唬,我們實現(xiàn)了延時隊列的效果。
/**
* 正常的業(yè)務(wù)Exchange和Queue
*/
channel.exchangeDeclare(Config.ORDER_EXCHANGE, BuiltinExchangeType.DIRECT, true);
Map<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange", Config.DELAYED_EXCHANGE);
arg.put("x-dead-letter-routing-key", Config.DELAYED_ROUTING_KEY);
arg.put("x-message-ttl",10000);
channel.queueDeclare(Config.ORDER_QUEUE, true, false, false, arg);
channel.queueBind(Config.ORDER_QUEUE,Config.ORDER_EXCHANGE,Config.ORDER_ROUTING_KEY);
/**
* 死信Exchange和Queue
*/
channel.exchangeDeclare(Config.DELAYED_EXCHANGE, BuiltinExchangeType.DIRECT,true);
channel.queueDeclare(Config.DELAYED_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.DELAYED_QUEUE,Config.DELAYED_EXCHANGE,Config.DELAYED_ROUTING_KEY);
上面的代碼用圖表示如下:
客戶端將消息推送到A部分的交換機(order.exchange)参歹,然后通過路由鍵到隊列(order.queue)仰楚。隊列中的消息到了超時時間,則從A中的隊列出隊犬庇,進入死信隊列僧界。最后我們的消費者訂閱B部分的隊列(delayed.queue),從B隊列部分獲取消息臭挽。這就是整個消息延時隊列的基本原理捂襟。
向A中的交換機發(fā)送消息
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
log.info("生產(chǎn)者發(fā)送消息:{}", content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, new AMQP.BasicProperties(), msg);
消費者訂閱B部分的隊列
channel.basicConsume(Config.DELAYED_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
String msg = new String(body, StandardCharsets.UTF_8);
log.info("消費者收到消息:{},當(dāng)前時間:{}", msg, content);
}
});
測試生產(chǎn)者打印日志如下:
生產(chǎn)者發(fā)送消息:2021-05-12 10:01:31
消費者打印日志如下:
消費者收到消息:2021-05-12 10:01:31,當(dāng)前時間:2021-05-12 10:01:41
通過在隊列中設(shè)置x-message-ttl配合死信隊列可以實現(xiàn)延時隊列的功能,但是它存在一個缺陷欢峰。我們的超時時間是設(shè)置在隊列上的葬荷,如果我現(xiàn)在有多個不同時長的延時的需求,使用這種方式實現(xiàn)起來就比較麻煩纽帖。對于不同時長的延時宠漩,我需要設(shè)置對應(yīng)的隊列,如果每個消息都有自己的延時時間懊直,總不能每個消息給他創(chuàng)建一個隊列吧扒吁。
設(shè)置消息的expiration
上面我們說了在隊列中設(shè)置TTL的缺陷,而RabbitMQ對此作了進一步的優(yōu)化室囊。我們不僅可以將超時時間設(shè)置在隊列中瘦陈,還可以設(shè)置在消息上,只需要在消息的properties中添加expiration即可波俄。如果即在隊列中設(shè)置了x-message-ttl,又在消息上設(shè)置了expiration蛾默,應(yīng)用時都是以最小值為準(zhǔn)的懦铺。
還是之前的代碼,修改生產(chǎn)者相關(guān)代碼支鸡,在消息的properties中添加expiration為5000冬念,代碼如下:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
log.info("生產(chǎn)者發(fā)送消息:{}", content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("5000").build();
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, properties, msg);
運行程序趁窃,生產(chǎn)者打印日志如下:
生產(chǎn)者發(fā)送消息:2021-05-12 10:30:40
消費者打印日志如下:
消費者收到消息:2021-05-12 10:30:40,當(dāng)前時間:2021-05-12 10:30:45
但是這種方式也存在缺陷,如果我向隊列中依次推入兩條消息急前,第一條消息的expiration為5000醒陆,第二條消息的expiration為3000,代碼如下:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Integer delayedTime : delayedTimes) {
String content = String.format("消息時間:[%s],延時[%d]s", sdf.format(new Date()), delayedTime);
log.info(content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayedTime * 1000)).build();
channel.basicPublish(Config.ORDER_EXCHANGE, Config.ORDER_ROUTING_KEY, properties, msg);
}
運行程序裆针,生產(chǎn)者打印日志內(nèi)容如下:
消息時間:[2021-05-12 10:46:17],延時[5]s
消息時間:[2021-05-12 10:46:17],延時[3]s
消費者打印日志如下:
消費者收到消息:消息時間:[2021-05-12 10:46:17],延時[5]s,當(dāng)前時間:2021-05-12 10:46:22
消費者收到消息:消息時間:[2021-05-12 10:46:17],延時[3]s,當(dāng)前時間:2021-05-12 10:46:22
從打印的日志可以看出刨摩,第二條消息不是延時3秒后才會消費,而是延時了5秒世吨。這是因為RabbitMQ只會檢查第一個消息是否過期澡刹,如果過期了就會將數(shù)據(jù)丟到死信隊列,而第二個消息即使已經(jīng)過期了耘婚,RabbitMQ也不會將它丟到死信隊列罢浇。這就導(dǎo)致了第二個消息我們設(shè)置的3秒最后變成了5秒。
rabbitmq-delayed-message-exchange插件
通過上面我們了解了可以通過設(shè)置隊列或者消息的超時時間來實現(xiàn)延時隊列的功能沐祷,但是他們或多或少都有部分缺陷嚷闭。對于在隊列上設(shè)置超時時間來說,如果延時的時間不固定處理起來比較麻煩赖临。對于在消息中設(shè)置超時時間的方式來說胞锰,它會受到隊列中第一個消息的影響,導(dǎo)致消息已經(jīng)超時但是還在隊列中無法出隊思杯。
還好在RabbitMQ中提供了一個rabbitmq-delayed-message-exchange插件胜蛉,通過這個插件就能解決上面的問題。
插件安裝
該插件不是RabbitMQ本身就有的插件色乾,我們需要先去下載該插件誊册,然后將其放入到RabbitMQ安裝目錄下的plugins目錄中,然后執(zhí)行下面命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如果一切正常的話會顯示安裝成功暖璧,然后重啟RabbitMQ服務(wù)即可案怯。
如何使用
如何使用也比較簡單,首先我們定義一個Exchange澎办,這個Exchange的類型為x-delayed-message嘲碱,這個類型就是插件提供的一個類型。這個類型不是RabbitMQ默認(rèn)的那幾種類型之一局蚀,而原先的Exchange類型變成在arguments中設(shè)置x-delayed-type麦锯。相關(guān)代碼如下所示:
Map<String,Object> argMap = new HashMap<>();
argMap.put("x-delayed-type","direct");
channel.exchangeDeclare(Config.PLUGINS_EXCHANGE,"x-delayed-message",true,false,argMap);
channel.queueDeclare(Config.PLUGINS_QUEUE,true,false,false,new HashMap<>());
channel.queueBind(Config.PLUGINS_QUEUE,Config.PLUGINS_EXCHANGE,Config.PLUGINS_ROUTING_KEY);
這里面定義隊列和綁定與正常使用沒有差別。想要實現(xiàn)延時效果琅绅,我們只需要在發(fā)送消息時在headers中添加x-delay即可扶欣,如果不需要實現(xiàn)延時效果不設(shè)置即可。
List<Integer> delayedTimes = Arrays.asList(5, 2, 3, 4, 1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Integer delayedTime : delayedTimes) {
String content = String.format("消息時間:[%s],延時[%d]s", sdf.format(new Date()), delayedTime);
log.info(content);
byte[] msg = content.getBytes(StandardCharsets.UTF_8);
Map<String,Object> headers = new HashMap<>();
headers.put("x-delay",delayedTime * 1000);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish(Config.PLUGINS_EXCHANGE, Config.PLUGINS_ROUTING_KEY, properties, msg);
}
上面示例代碼中,生產(chǎn)者向Exchange推送5條消息料祠,且每條消息的延時時間是不同的骆捧,最后我們添加消費者代碼如下:
channel.basicConsume(Config.PLUGINS_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String content = sdf.format(new Date());
String msg = new String(body, StandardCharsets.UTF_8);
log.info("消費者收到消息:{},當(dāng)前時間:{}", msg, content);
}
});
對于消費者代碼來說,與正常的方式?jīng)]有區(qū)別髓绽。最后生產(chǎn)者和消費者打印的日志如下:
消息時間:[2021-05-12 13:43:56],延時[5]s
消息時間:[2021-05-12 13:43:56],延時[2]s
消息時間:[2021-05-12 13:43:56],延時[3]s
消息時間:[2021-05-12 13:43:56],延時[4]s
消息時間:[2021-05-12 13:43:56],延時[1]s
消費者打印的日志如下:
消費者收到消息:消息時間:[2021-05-12 13:43:56],延時[1]s,當(dāng)前時間:2021-05-12 13:43:58
消費者收到消息:消息時間:[2021-05-12 13:43:56],延時[2]s,當(dāng)前時間:2021-05-12 13:43:58
消費者收到消息:消息時間:[2021-05-12 13:43:56],延時[3]s,當(dāng)前時間:2021-05-12 13:44:00
消費者收到消息:消息時間:[2021-05-12 13:43:56],延時[4]s,當(dāng)前時間:2021-05-12 13:44:01
消費者收到消息:消息時間:[2021-05-12 13:43:56],延時[5]s,當(dāng)前時間:2021-05-12 13:44:01
從打印結(jié)果可以看出敛苇,生產(chǎn)者生產(chǎn)的第一條消息需要延時5秒再從隊列中出隊,但是它并沒有影響后續(xù)時間短的出隊顺呕,實現(xiàn)了我們想要的效果枫攀。
機制
其實該插件的機制比較好理解,也不需要借助死信隊列來完成塘匣,相對于設(shè)置隊列超時和消息超時的方式來說更加簡單脓豪。當(dāng)Exchange接收到消息后并未立即將消息投遞至目標(biāo)隊列中,而是存儲在mnesia(一個分布式數(shù)據(jù)系統(tǒng))表中忌卤,檢測消息延遲時間扫夜。如達到可投遞時間時并將其通過x-delayed-type類型標(biāo)記的交換機類型投遞至目標(biāo)隊列。
總結(jié)
簡單來說使用RabbitMQ來實現(xiàn)延時隊列我們可以在不使用插件的情況下通過TTL+死信隊列來實現(xiàn)驰徊,不過這種實現(xiàn)有部分局限性笤闯。如果有必要,我們可以通過安裝插件的機制來解決這部分局限性棍厂。在實際應(yīng)用中颗味,我們可以根據(jù)自己的需求來決定使用哪種方案。
相關(guān)資源
文中示例代碼:https://gitee.com/zengchao_workspace/rabbit-mq-demo
官網(wǎng)關(guān)于TTL的介紹:Time-To-Live and Expiration
rabbitmq-delayed-message-exchange插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange