延遲隊列

一阿宅、場景介紹

1候衍、下單成功,30分鐘未支付洒放。支付超時蛉鹿,自動取消訂單

2、訂單簽收往湿,簽收后7天未進(jìn)行評價妖异。訂單超時未評價,系統(tǒng)默認(rèn)好評

3领追、下單成功他膳,商家5分鐘未接單,訂單取消

4绒窑、配送超時棕孙,推送短信提醒

5、三天會員試用期些膨,三天到期后準(zhǔn)時準(zhǔn)點通知用戶散罕,試用產(chǎn)品到期了

基于以上場景這個時候我們可能最先想到的應(yīng)該是采用定時任務(wù)去進(jìn)行輪訓(xùn)判斷,但是呢傀蓉,這個時間怎么確定才好呢欧漱,5分鐘。葬燎。1分鐘误甚。缚甩。執(zhí)行一次嗎。這樣就會非常影響性能窑邦。且時間誤差很大擅威。基于以上業(yè)務(wù)需要我們想到了有以下解決方案冈钦。

1郊丛、JDK延遲隊列,但是數(shù)據(jù)都在內(nèi)存中,重啟后什么都沒了瞧筛。

2厉熟、MQ中的延遲隊列,比如RocketMQ,但是社區(qū)版不能自定義较幌。

3揍瑟、elasticjob一次性動態(tài)任務(wù)

4、基于Redis實現(xiàn)

二乍炉、基于redis實現(xiàn)

redis也可以用來實現(xiàn)延時消息的功能绢片。理論上也有兩種方式

1、訂閱 key 過期事件(pub/sub)

2岛琼、使用 sorted-set 存儲消息底循,score為消息的過期時間

然而實際上訂閱過期事件存在諸多問題,所以并不合適:

1槐瑞、過期事件的不準(zhǔn)確此叠,過期時間只在key被刪除時才觸發(fā),并不是在key過期后就馬上刪除的

2随珠、pub/sub 不支持持久化,服務(wù)器宕機(jī)期間的事件會丟失

3猬错、pub/sub 存在丟失的可能窗看,線上使用的redis pub/sub 有丟失過消息(非過期時間)

4、所有的key過期都會發(fā)送過期事件倦炒,對redis性能有一定影響显沈。(除非單獨使用一個redis作為隊列服務(wù))

基于以上問題引出今天的主角redisson 的?RDelayedQueue ,下面就梳理下redis延時隊列的使用和原理逢唤。

redisson實際上是使用了 兩個隊列 + 一個 sorted-set + pub/sub 來實現(xiàn)延時隊列拉讯,而不是單一的sort-set,各自功能如下鳖藕。

1魔慷、sorted-set:存放未到期的消息&到期時間,提供消息延時排序功能

2著恩、兩個隊列:中轉(zhuǎn)隊列院尔;目標(biāo)隊列

a蜻展、目標(biāo)隊列

//目標(biāo)隊列-阻塞隊列-消費隊列,存放到期后的消息邀摆,提供消費

RBlockingQueue blockingQueue =redissonClient.getBlockingQueue(queueName());

b纵顾、中轉(zhuǎn)隊列

//中轉(zhuǎn)隊列-存放未到期消息,作為消息的原始順序視圖栋盹,提供如查詢施逾、刪除指定第幾條消息的功能

RDelayedQueue delayedQueue =redissonClient.getDelayedQueue(blockingQueue);

3、pub/sub:創(chuàng)建延時隊列的時候會創(chuàng)建一個QueueTransferTask,在里面會訂閱一個topic

在網(wǎng)上扒了一張延遲消息從發(fā)送到執(zhí)行的流程圖如下例获,

1汉额、首先創(chuàng)建延時隊列的時候,會創(chuàng)建一個QueueTransferTask, 在里面會訂閱一個topic躏敢,訂閱成功后闷愤,執(zhí)行其pushTask方法,里面會查詢sorted-set中100個已到期的消息件余,將其push到RBlockingQueue中讥脐,并從sorted-set和RDelayedQueue?中移除。(這里是為了投遞歷史未處理的消息)

2啼器、發(fā)送延時消息時旬渠,會將消息寫入RDelayedQueue? 和 sorted-set 中,msg會添加一個randomId端壳,支持發(fā)送相同的消息告丢。并且判斷sorted-set首調(diào)消息如果是剛插入的,則publish timeout(到期時間) 到 topic

3损谦、訂閱到topic消息后岖免,會先判斷其是否臨期(delay<10ms),如果是則調(diào)用pushTask方法(1中有說明)照捡,不是則啟動一個定時任務(wù)(使用的netty時間輪)颅湘,延時delay后執(zhí)行pushTask方法。

再結(jié)合源碼簡單描述一下:

1栗精、首選初始化兩個隊列,通過redissonClient的get方法創(chuàng)建的

getDelayedQueue方法里面實例化RedissonDelayedQueue對象闯参,我們看看RedissonDelayedQueue的代碼

org.redisson.RedissonDelayedQueue#RedissonDelayedQueue

會創(chuàng)建一個QueueTransferTask, 在里面會訂閱一個topic,訂閱成功后悲立,執(zhí)行其pushTask方法

看下pushTaskAsync方法鹿寨,通過lua腳本實現(xiàn),分析lua腳本里面會查詢sorted-set中100個已到期的消息薪夕,將其push到RBlockingQueue中脚草,并從sorted-set和RDelayedQueue?中移除。
@Override

? ? ? ? protected RFuture<Long> pushTaskAsync() {

? ? ? ? ? ? return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,

? ? ? ? ? ? ? ? ? ? "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "

? ? ? ? ? ? ? ? ? + "if #expiredValues > 0 then "

? ? ? ? ? ? ? ? ? ? ? + "for i, v in ipairs(expiredValues) do "

? ? ? ? ? ? ? ? ? ? ? ? ? + "local randomId, value = struct.unpack('dLc0', v);"

? ? ? ? ? ? ? ? ? ? ? ? ? + "redis.call('rpush', KEYS[1], value);"

? ? ? ? ? ? ? ? ? ? ? ? ? + "redis.call('lrem', KEYS[3], 1, v);"

? ? ? ? ? ? ? ? ? ? ? + "end; "

? ? ? ? ? ? ? ? ? ? ? + "redis.call('zrem', KEYS[2], unpack(expiredValues));"

? ? ? ? ? ? ? ? ? + "end; "

? ? ? ? ? ? ? ? ? ? // get startTime from scheduler queue head task

? ? ? ? ? ? ? ? ? + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "

? ? ? ? ? ? ? ? ? + "if v[1] ~= nil then "

? ? ? ? ? ? ? ? ? ? + "return v[2]; "

? ? ? ? ? ? ? ? ? + "end "

? ? ? ? ? ? ? ? ? + "return nil;",

? ? ? ? ? ? ? ? ? Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),

? ? ? ? ? ? ? ? ? System.currentTimeMillis(), 100);

? ? ? ? }

2原献、發(fā)送延時消息時玩讳,會將消息寫入 RDelayedQueue和 sorted-set 中涩蜘,msg會添加一個randomId,支持發(fā)送相同的消息熏纯。并且判斷sorted-set首調(diào)消息如果是剛插入的同诫,則publish timeout(到期時間) 到 topic,發(fā)送消息調(diào)用如下方法樟澜,我們來看看

org.redisson.RedissonDelayedQueue#offerAsync(V, long, java.util.concurrent.TimeUnit)

public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {

? ? if (delay < 0) {

? ? ? ? throw new IllegalArgumentException("Delay can't be negative");

? ? }

? ? long delayInMs = timeUnit.toMillis(delay);

? ? long timeout = System.currentTimeMillis() + delayInMs;

? ? long randomId = ThreadLocalRandom.current().nextLong();

? ? return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,

? ? ? ? ? ? "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"

? ? ? ? ? + "redis.call('zadd', KEYS[2], ARGV[1], value);"

? ? ? ? ? + "redis.call('rpush', KEYS[3], value);"

? ? ? ? ? // if new object added to queue head when publish its startTime

? ? ? ? ? // to all scheduler workers

? ? ? ? ? + "local v = redis.call('zrange', KEYS[2], 0, 0); "

? ? ? ? ? + "if v[1] == value then "

? ? ? ? ? ? + "redis.call('publish', KEYS[4], ARGV[1]); "

? ? ? ? ? + "end;",

? ? ? ? ? Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),

? ? ? ? ? timeout, randomId, encode(e));

}

3误窖、訂閱到topic消息后,會先判斷其是否臨期(delay<10ms)秩贰,如果是則調(diào)用pushTask方法(1中有說明)霹俺,不是則啟動一個定時任務(wù)(使用的netty時間輪),延時delay后執(zhí)行pushTask方法毒费。

org.redisson.QueueTransferTask#scheduleTask

// 訂閱topic onMessage 時調(diào)用

private void scheduleTask(final Long startTime) {

? ? TimeoutTask oldTimeout = lastTimeout.get();

? ? if (startTime == null) {

? ? ? ? return;

? ? }


? ? if (oldTimeout != null) {

? ? ? ? oldTimeout.getTask().cancel();

? ? }

? ? long delay = startTime - System.currentTimeMillis();

? ? if (delay > 10) {

? ? // 使用 netty 時間輪 啟動一個定時任務(wù)

? ? ? ? Timeout timeout = connectionManager.newTimeout(new TimerTask() {? ? ? ? ? ? ? ? ? ?

? ? ? ? ? ? @Override

? ? ? ? ? ? public void run(Timeout timeout) throws Exception {

? ? ? ? ? ? ? ? pushTask();


? ? ? ? ? ? ? ? TimeoutTask currentTimeout = lastTimeout.get();

? ? ? ? ? ? ? ? if (currentTimeout.getTask() == timeout) {

? ? ? ? ? ? ? ? ? ? lastTimeout.compareAndSet(currentTimeout, null);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }, delay, TimeUnit.MILLISECONDS);

? ? ? ? if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {

? ? ? ? ? ? timeout.cancel();

? ? ? ? }

? ? } else {

? ? ? ? pushTask();

? ? }

}? ?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末丙唧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子觅玻,更是在濱河造成了極大的恐慌想际,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件溪厘,死亡現(xiàn)場離奇詭異胡本,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)畸悬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進(jìn)店門侧甫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蹋宦,你說我怎么就攤上這事披粟。” “怎么了冷冗?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵守屉,是天一觀的道長。 經(jīng)常有香客問我贾惦,道長,這世上最難降的妖魔是什么敦捧? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任须板,我火速辦了婚禮,結(jié)果婚禮上兢卵,老公的妹妹穿的比我還像新娘习瑰。我一直安慰自己,他們只是感情好秽荤,可當(dāng)我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布甜奄。 她就那樣靜靜地躺著柠横,像睡著了一般。 火紅的嫁衣襯著肌膚如雪课兄。 梳的紋絲不亂的頭發(fā)上牍氛,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機(jī)與錄音烟阐,去河邊找鬼搬俊。 笑死,一個胖子當(dāng)著我的面吹牛蜒茄,可吹牛的內(nèi)容都是我干的唉擂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼檀葛,長吁一口氣:“原來是場噩夢啊……” “哼玩祟!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起屿聋,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤空扎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后胜臊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體勺卢,經(jīng)...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年象对,在試婚紗的時候發(fā)現(xiàn)自己被綠了黑忱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡勒魔,死狀恐怖甫煞,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情冠绢,我是刑警寧澤抚吠,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站弟胀,受9級特大地震影響楷力,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜孵户,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一萧朝、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧夏哭,春花似錦检柬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽里逆。三九已至,卻和暖如春用爪,著一層夾襖步出監(jiān)牢的瞬間原押,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工项钮, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留班眯,地道東北人。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓烁巫,卻偏偏與公主長得像署隘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子亚隙,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內(nèi)容