延遲隊(duì)列
延遲隊(duì)列,也就是一定時(shí)間之后將消息體放入隊(duì)列匣距,然后消費(fèi)者才能正常消費(fèi)面哥。比如1分鐘之后發(fā)送短信,發(fā)送郵件毅待,檢測(cè)數(shù)據(jù)狀態(tài)等尚卫。
Redisson Delayed Queue
如果你項(xiàng)目中使用了redisson,那么恭喜你尸红,使用延遲隊(duì)列將非常的簡(jiǎn)單吱涉。
基于Redis的Redisson分布式延遲隊(duì)列(Delayed Queue)結(jié)構(gòu)的RDelayedQueue
Java對(duì)象在實(shí)現(xiàn)了RQueue
接口的基礎(chǔ)上提供了向隊(duì)列按要求延遲添加項(xiàng)目的功能。該功能可以用來實(shí)現(xiàn)消息傳送延遲按幾何增長(zhǎng)或幾何衰減的發(fā)送策略外里。
RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue); // 10秒鐘以后將消息發(fā)送到指定隊(duì)列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 一分鐘以后將消息發(fā)送到指定隊(duì)列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在該對(duì)象不再需要的情況下怎爵,應(yīng)該主動(dòng)銷毀。僅在相關(guān)的Redisson對(duì)象也需要關(guān)閉的時(shí)候可以不用主動(dòng)銷毀盅蝗。
Java DelayQueue
DelayQueue它本質(zhì)上是一個(gè)隊(duì)列鳖链,而這個(gè)隊(duì)列里也只有存放Delayed的子類才有意義。
延遲隊(duì)列demo
public class DelayTask implements Delayed { private long startDate; public DelayTask(Long delayMillions) { this.startDate = System.currentTimeMillis() + delayMillions;
}
@Override public int compareTo(Delayed o) {
Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
}
@Override public long getDelay(TimeUnit unit) { return this.startDate - System.currentTimeMillis();
} public static void main(String[] args) throws Exception {
BlockingQueue<DelayTask> queue = new DelayQueue<>();
DelayTask delayTask = new DelayTask(1000 * 5L);
queue.put(delayTask); while (queue.size()>0){
queue.take();
}
}
}
延遲隊(duì)列消費(fèi)原理
源碼中出現(xiàn)了三次await字眼:
第一次是當(dāng)隊(duì)列為空時(shí)风科,等待撒轮;
第二次等待是因?yàn)槠虻l(fā)現(xiàn)有任務(wù),沒有到執(zhí)行時(shí)間题山,并且有準(zhǔn)備執(zhí)行的線程(leader)兰粉,那不好意思,還得接續(xù)等待直到下一個(gè)可執(zhí)行的任務(wù)顶瞳。
第三次是真正延時(shí)的地方了玖姑,available.awaitNanos(delay),此時(shí)也沒有別的線程要執(zhí)行,也就是我將要執(zhí)行慨菱,等待剩下的延遲時(shí)間即可焰络。
延遲隊(duì)列生產(chǎn)原理
為保證消費(fèi)者正常消費(fèi),如果優(yōu)先隊(duì)列頭元素和當(dāng)前放入元素相等符喝,則說明當(dāng)前元素消費(fèi)的優(yōu)先級(jí)高闪彼,重置準(zhǔn)備消費(fèi)的線程(leader)為null,喚醒消費(fèi)者線程重新執(zhí)行take方法邏輯协饲。
手寫一個(gè)Redis延遲隊(duì)列
Redis延遲隊(duì)列設(shè)計(jì)
延遲消息體設(shè)計(jì)
延遲消息體Message實(shí)現(xiàn)了Delayed接口畏腕,這樣Java DelayQueue就知道什么時(shí)候取出消息體。
Redis延遲隊(duì)列實(shí)現(xiàn)
RedisDelayQueue構(gòu)造函數(shù)依賴redis操作緩存服務(wù)對(duì)象和目標(biāo)隊(duì)列名稱(redis key)茉稠。
offer方法傳入member(具體消息)描馅,delay(延遲時(shí)間),timeUnit(時(shí)間單位)而线,然后封裝成延遲消息體Message對(duì)象铭污,放入Java DelayQueue中。
run方法是一個(gè)循環(huán)體膀篮,不斷的從Java DelayQueue對(duì)象中獲取消息體嘹狞,然后放入redis對(duì)應(yīng)的目標(biāo)隊(duì)列里。
延遲隊(duì)列測(cè)試demo
控制臺(tái)打印效果
思考
這種方案實(shí)現(xiàn)比較簡(jiǎn)單各拷,使用的時(shí)候一定要謹(jǐn)慎刁绒,應(yīng)用于延遲小,消息量不大的場(chǎng)景是沒問題的烤黍,畢竟Java DelayQueue是占用內(nèi)存的。另外也可以考慮利用Redis的sorted set 結(jié)構(gòu)實(shí)現(xiàn)延遲隊(duì)列傻盟,使用TimeStamp作為score蝙昙,比如你的任務(wù)是要延遲5分鐘律适,那么就在當(dāng)前時(shí)間上加5分鐘作為 score ,輪詢?nèi)蝿?wù)每秒只輪詢 score 小于等于 當(dāng)前時(shí)間的 key即可,如果任務(wù)支持有誤差播赁,那么當(dāng)沒有掃描到有效數(shù)據(jù)的時(shí)候可以休眠對(duì)應(yīng)時(shí)間再繼續(xù)輪詢。