一阿宅、場景介紹
1候衍、下單成功,30分鐘未支付洒放。支付超時蛉鹿,自動取消訂單
2、訂單簽收往湿,簽收后7天未進(jìn)行評價妖异。訂單超時未評價,系統(tǒng)默認(rèn)好評
3领追、下單成功他膳,商家5分鐘未接單,訂單取消
4绒窑、配送超時棕孙,推送短信提醒
5、三天會員試用期些膨,三天到期后準(zhǔn)時準(zhǔn)點通知用戶散罕,試用產(chǎn)品到期了
基于以上場景這個時候我們可能最先想到的應(yīng)該是采用定時任務(wù)去進(jìn)行輪訓(xùn)判斷,但是呢傀蓉,這個時間怎么確定才好呢欧漱,5分鐘。葬燎。1分鐘误甚。缚甩。執(zhí)行一次嗎。這樣就會非常影響性能窑邦。且時間誤差很大擅威。基于以上業(yè)務(wù)需要我們想到了有以下解決方案冈钦。
1郊丛、JDK延遲隊列,但是數(shù)據(jù)都在內(nèi)存中,重啟后什么都沒了瞧筛。
2厉熟、MQ中的延遲隊列,比如RocketMQ,但是社區(qū)版不能自定義较幌。
3揍瑟、elasticjob一次性動態(tài)任務(wù)
4、基于Redis實現(xiàn)
二乍炉、基于redis實現(xiàn)
redis也可以用來實現(xiàn)延時消息的功能绢片。理論上也有兩種方式
1、訂閱 key 過期事件(pub/sub)
2岛琼、使用 sorted-set 存儲消息底循,score為消息的過期時間
然而實際上訂閱過期事件存在諸多問題,所以并不合適:
1槐瑞、過期事件的不準(zhǔn)確此叠,過期時間只在key被刪除時才觸發(fā),并不是在key過期后就馬上刪除的
2随珠、pub/sub 不支持持久化,服務(wù)器宕機(jī)期間的事件會丟失
3猬错、pub/sub 存在丟失的可能窗看,線上使用的redis pub/sub 有丟失過消息(非過期時間)
4、所有的key過期都會發(fā)送過期事件倦炒,對redis性能有一定影響显沈。(除非單獨使用一個redis作為隊列服務(wù))
基于以上問題引出今天的主角redisson 的?RDelayedQueue ,下面就梳理下redis延時隊列的使用和原理逢唤。
redisson實際上是使用了 兩個隊列 + 一個 sorted-set + pub/sub 來實現(xiàn)延時隊列拉讯,而不是單一的sort-set,各自功能如下鳖藕。
1魔慷、sorted-set:存放未到期的消息&到期時間,提供消息延時排序功能
2著恩、兩個隊列:中轉(zhuǎn)隊列院尔;目標(biāo)隊列
a蜻展、目標(biāo)隊列
//目標(biāo)隊列-阻塞隊列-消費隊列,存放到期后的消息邀摆,提供消費
RBlockingQueue blockingQueue =redissonClient.getBlockingQueue(queueName());
b纵顾、中轉(zhuǎn)隊列
//中轉(zhuǎn)隊列-存放未到期消息,作為消息的原始順序視圖栋盹,提供如查詢施逾、刪除指定第幾條消息的功能
RDelayedQueue delayedQueue =redissonClient.getDelayedQueue(blockingQueue);
3、pub/sub:創(chuàng)建延時隊列的時候會創(chuàng)建一個QueueTransferTask,在里面會訂閱一個topic
在網(wǎng)上扒了一張延遲消息從發(fā)送到執(zhí)行的流程圖如下例获,
1汉额、首先創(chuàng)建延時隊列的時候,會創(chuàng)建一個QueueTransferTask, 在里面會訂閱一個topic躏敢,訂閱成功后闷愤,執(zhí)行其pushTask方法,里面會查詢sorted-set中100個已到期的消息件余,將其push到RBlockingQueue中讥脐,并從sorted-set和RDelayedQueue?中移除。(這里是為了投遞歷史未處理的消息)
2啼器、發(fā)送延時消息時旬渠,會將消息寫入RDelayedQueue? 和 sorted-set 中,msg會添加一個randomId端壳,支持發(fā)送相同的消息告丢。并且判斷sorted-set首調(diào)消息如果是剛插入的,則publish timeout(到期時間) 到 topic
3损谦、訂閱到topic消息后岖免,會先判斷其是否臨期(delay<10ms),如果是則調(diào)用pushTask方法(1中有說明)照捡,不是則啟動一個定時任務(wù)(使用的netty時間輪)颅湘,延時delay后執(zhí)行pushTask方法。
再結(jié)合源碼簡單描述一下:
1栗精、首選初始化兩個隊列,通過redissonClient的get方法創(chuàng)建的
getDelayedQueue方法里面實例化RedissonDelayedQueue對象闯参,我們看看RedissonDelayedQueue的代碼
org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
會創(chuàng)建一個QueueTransferTask, 在里面會訂閱一個topic,訂閱成功后悲立,執(zhí)行其pushTask方法
看下pushTaskAsync方法鹿寨,通過lua腳本實現(xiàn),分析lua腳本里面會查詢sorted-set中100個已到期的消息薪夕,將其push到RBlockingQueue中脚草,并從sorted-set和RDelayedQueue?中移除。
@Override
? ? ? ? protected RFuture<Long> pushTaskAsync() {
? ? ? ? ? ? return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
? ? ? ? ? ? ? ? ? ? "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
? ? ? ? ? ? ? ? ? + "if #expiredValues > 0 then "
? ? ? ? ? ? ? ? ? ? ? + "for i, v in ipairs(expiredValues) do "
? ? ? ? ? ? ? ? ? ? ? ? ? + "local randomId, value = struct.unpack('dLc0', v);"
? ? ? ? ? ? ? ? ? ? ? ? ? + "redis.call('rpush', KEYS[1], value);"
? ? ? ? ? ? ? ? ? ? ? ? ? + "redis.call('lrem', KEYS[3], 1, v);"
? ? ? ? ? ? ? ? ? ? ? + "end; "
? ? ? ? ? ? ? ? ? ? ? + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
? ? ? ? ? ? ? ? ? + "end; "
? ? ? ? ? ? ? ? ? ? // get startTime from scheduler queue head task
? ? ? ? ? ? ? ? ? + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
? ? ? ? ? ? ? ? ? + "if v[1] ~= nil then "
? ? ? ? ? ? ? ? ? ? + "return v[2]; "
? ? ? ? ? ? ? ? ? + "end "
? ? ? ? ? ? ? ? ? + "return nil;",
? ? ? ? ? ? ? ? ? Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
? ? ? ? ? ? ? ? ? System.currentTimeMillis(), 100);
? ? ? ? }
2原献、發(fā)送延時消息時玩讳,會將消息寫入 RDelayedQueue和 sorted-set 中涩蜘,msg會添加一個randomId,支持發(fā)送相同的消息熏纯。并且判斷sorted-set首調(diào)消息如果是剛插入的同诫,則publish timeout(到期時間) 到 topic,發(fā)送消息調(diào)用如下方法樟澜,我們來看看
org.redisson.RedissonDelayedQueue#offerAsync(V, long, java.util.concurrent.TimeUnit)
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
? ? if (delay < 0) {
? ? ? ? throw new IllegalArgumentException("Delay can't be negative");
? ? }
? ? long delayInMs = timeUnit.toMillis(delay);
? ? long timeout = System.currentTimeMillis() + delayInMs;
? ? long randomId = ThreadLocalRandom.current().nextLong();
? ? return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
? ? ? ? ? ? "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
? ? ? ? ? + "redis.call('zadd', KEYS[2], ARGV[1], value);"
? ? ? ? ? + "redis.call('rpush', KEYS[3], value);"
? ? ? ? ? // if new object added to queue head when publish its startTime
? ? ? ? ? // to all scheduler workers
? ? ? ? ? + "local v = redis.call('zrange', KEYS[2], 0, 0); "
? ? ? ? ? + "if v[1] == value then "
? ? ? ? ? ? + "redis.call('publish', KEYS[4], ARGV[1]); "
? ? ? ? ? + "end;",
? ? ? ? ? Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
? ? ? ? ? timeout, randomId, encode(e));
}
3误窖、訂閱到topic消息后,會先判斷其是否臨期(delay<10ms)秩贰,如果是則調(diào)用pushTask方法(1中有說明)霹俺,不是則啟動一個定時任務(wù)(使用的netty時間輪),延時delay后執(zhí)行pushTask方法毒费。
org.redisson.QueueTransferTask#scheduleTask
// 訂閱topic onMessage 時調(diào)用
private void scheduleTask(final Long startTime) {
? ? TimeoutTask oldTimeout = lastTimeout.get();
? ? if (startTime == null) {
? ? ? ? return;
? ? }
? ? if (oldTimeout != null) {
? ? ? ? oldTimeout.getTask().cancel();
? ? }
? ? long delay = startTime - System.currentTimeMillis();
? ? if (delay > 10) {
? ? // 使用 netty 時間輪 啟動一個定時任務(wù)
? ? ? ? Timeout timeout = connectionManager.newTimeout(new TimerTask() {? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? @Override
? ? ? ? ? ? public void run(Timeout timeout) throws Exception {
? ? ? ? ? ? ? ? pushTask();
? ? ? ? ? ? ? ? TimeoutTask currentTimeout = lastTimeout.get();
? ? ? ? ? ? ? ? if (currentTimeout.getTask() == timeout) {
? ? ? ? ? ? ? ? ? ? lastTimeout.compareAndSet(currentTimeout, null);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }, delay, TimeUnit.MILLISECONDS);
? ? ? ? if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
? ? ? ? ? ? timeout.cancel();
? ? ? ? }
? ? } else {
? ? ? ? pushTask();
? ? }
}? ?