DelayQueue 基本原理

阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時屡萤,獲取元素的線程會等待隊列變?yōu)榉强照浣!.旉犃袧M時,存儲元素的線程會等待隊列可用死陆。阻塞隊列常用于生產(chǎn)者和消費者的場景招拙,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程措译。阻塞隊列就是生產(chǎn)者存放元素的容器别凤,而消費者也只從容器里拿元素。下面是 java 常見的阻塞隊列领虹。

ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列规哪。
LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。
PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列塌衰。
DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列诉稍。
SynchronousQueue:一個不存儲元素的阻塞隊列。
LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列最疆。
LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列杯巨。

基本簡介

DelayQueue是一個無界阻塞隊列然爆,只有在延遲期滿時才能從中提取元素座菠。該隊列的頭部是延遲期滿后保存時間最長的Delayed 元素像屋。
DelayQueue是一個用來延時處理的隊列佑惠,所謂延時處理就是說可以為隊列中元素設(shè)定一個過期時間,相關(guān)的操作受到這個設(shè)定時間的控制劳澄。

使用場景

a) 關(guān)閉空閑連接童本。服務器中摄职,有很多客戶端的連接烙荷,空閑一段時間之后需要關(guān)閉之镜会。
b) 緩存。緩存中的對象终抽,超過了空閑時間,需要從緩存中移出。
c) 任務超時處理昼伴。在網(wǎng)絡(luò)協(xié)議滑動窗口請求應答式交互時匾旭,處理超時未響應的請求。

如果不使用DelayQueue圃郊,那么常規(guī)的解決辦法就是:使用一個后臺線程价涝,遍歷所有對象,挨個檢查持舆。這種笨笨的辦法簡單好用色瘩,但是對象數(shù)量過多時,可能存在性能問題逸寓,檢查間隔時間不好設(shè)置居兆,間隔時間過大,影響精確度竹伸,過小則存在效率問題泥栖。而且做不到按超時的時間順序處理。

基本原理

  • 首先勋篓,這種隊列中只能存放實現(xiàn)Delayed接口的對象吧享,而此接口有兩個需要實現(xiàn)的方法。最重要的就是getDelay譬嚣,這個方法需要返回對象過期前的時間钢颂。簡單說,隊列在某些方法處理前拜银,會調(diào)用此方法來判斷對象有沒有超時殊鞭。
  • 其次,DelayQueue是一個BlockingQueue盐股,其特化的參數(shù)是Delayed钱豁。(不了解BlockingQueue的同學,先去了解BlockingQueue再看本文)
  • Delayed擴展了Comparable接口疯汁,比較的基準為延時的時間值牲尺,Delayed接口的實現(xiàn)類getDelay的返回值應為固定值(final)。DelayQueue內(nèi)部是使用PriorityQueue實現(xiàn)的幌蚊。

總結(jié)谤碳,DelayQueue的關(guān)鍵元素BlockingQueue、PriorityQueue溢豆、Delayed蜒简。可以這么說漩仙,DelayQueue是一個使用優(yōu)先隊列(PriorityQueue)實現(xiàn)的BlockingQueue搓茬,優(yōu)先隊列的比較基準值是時間犹赖。本質(zhì)上即:
DelayQueue = BlockingQueue +PriorityQueue + Delayed

他們的基本定義如下

public interface Comparable<T> {
    public int compareTo(T o);
} 
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
} 
public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { 
    private final PriorityQueue<E> q = new PriorityQueue<E>();
} 

基本用法

/**
 * 延遲隊列示例
 */
public class DelayQueueTester {
    private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
    static class DelayTask implements Delayed {
        // 延遲時間
        private final long delay;
        // 到期時間
        private final long expire;
        // 數(shù)據(jù)
        private final String msg;
        // 創(chuàng)建時間
        private final long now;
        /**
         * 初始化 DelayTask 對象
         *
         * @param delay 延遲時間 單位:微妙
         * @param msg   業(yè)務信息
         */
        DelayTask(long delay, String msg) {
            this.delay = delay; // 延遲時間
            this.msg = msg; // 業(yè)務信息
            this.now = Instant.now().toEpochMilli();
            this.expire = now + delay; // 到期時間 = 當前時間+延遲時間
        }
        /**
         * 獲取延遲時間
         *
         * @param unit 單位對象
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);
        }
        /**
         * 比較器
         * 比較規(guī)則:延遲時間越長的對象越靠后
         *
         * @param o
         * @return
         */
        @Override
        public int compareTo(Delayed o) {
            if (o == this) // compare zero ONLY if same object
                return 0;
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
        @Override
        public String toString() {
            return "DelayTask{" +
                    "delay=" + delay +
                    ", expire=" + expire +
                    ", msg='" + msg + '\'' +
                    ", now=" + now +
                    '}';
        }
    }
    /**
     * 生產(chǎn)者線程
     *
     * @param args
     */
    public static void main(String[] args) {
        initConsumer();
        try {
            // 等待消費者初始化完畢
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        delayQueue.add(new DelayTask(1000, "Task1"));
        delayQueue.add(new DelayTask(2000, "Task2"));
        delayQueue.add(new DelayTask(3000, "Task3"));
        delayQueue.add(new DelayTask(4000, "Task4"));
        delayQueue.add(new DelayTask(5000, "Task5"));
    }
    /**
     * 初始化消費者線程
     */
    private static void initConsumer() {
        Runnable task = () -> {
            while (true) {
                try {
                    System.out.println("嘗試獲取延遲隊列中的任務。" + LocalDateTime.now());
                    System.out.println(delayQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread consumer = new Thread(task);
        consumer.start();
    }
}
---
嘗試獲取延遲隊列中的任務卷仑。2017-04-05T18:28:03.282
DelayTask{delay=1000, expire=1491388087234, msg='Task1', now=1491388086234}
嘗試獲取延遲隊列中的任務峻村。2017-04-05T18:28:07.235
DelayTask{delay=2000, expire=1491388088235, msg='Task2', now=1491388086235}
嘗試獲取延遲隊列中的任務。2017-04-05T18:28:08.237
DelayTask{delay=3000, expire=1491388089235, msg='Task3', now=1491388086235}
嘗試獲取延遲隊列中的任務锡凝。2017-04-05T18:28:09.237
DelayTask{delay=4000, expire=1491388090235, msg='Task4', now=1491388086235}
嘗試獲取延遲隊列中的任務粘昨。2017-04-05T18:28:10.240
DelayTask{delay=5000, expire=1491388091235, msg='Task5', now=1491388086235}
嘗試獲取延遲隊列中的任務。2017-04-05T18:28:11.240

DelayQueue 實現(xiàn)原理

主要屬性

// 可以看看AbstractQueue 窜锯,實現(xiàn)了阻塞Queue接口
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 阻塞等待使用了可重入鎖张肾,只有一把
    private final transient ReentrantLock lock = new ReentrantLock();
    // 優(yōu)先隊列,用來對不同延遲任務的排序
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */

    // 這個Leader 有意思锚扎,解決了隊列頭的數(shù)據(jù)和線程的關(guān)聯(lián)
    // 同時解決了其他線程由誰喚醒
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    // 與Leader Thread配合 喚醒等待的Leader或者新Leader替換
    private final Condition available = lock.newCondition();

DelayQueue的take方法吞瞪,把優(yōu)先隊列q的first拿出來(peek),如果沒有達到延時閥值工秩,則進行await處理尸饺。
如下:

    public E take() throws InterruptedException {
        // 獲取鎖。每個延遲隊列內(nèi)聚了一個重入鎖助币。
        final ReentrantLock lock = this.lock;
        // 獲取可中斷的鎖浪听。
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 嘗試從優(yōu)先級隊列中獲取隊列頭部元素
                E first = q.peek();
                if (first == null)
                    // 無元素,當前線程節(jié)點加入等待隊列眉菱,并阻塞當前線程
                    available.await();
                else {
                    // 通過延遲任務的 getDelay 方法獲取延遲時間
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // 延遲時間到期迹栓,獲取并刪除頭部元素。
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // 存在leader線程俭缓,則其他的線程進入時克伊,直接進入等待 
                    if (leader != null)
                        available.await();
                    else {
                        // 獲取當前線程 說明線程變了
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 線程節(jié)點進入等待隊列 x 納秒。
                            available.awaitNanos(delay);
                        } finally {
                            // 等待完了华坦,該線程則設(shè)置為null
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 若還存在元素的話愿吹,則將等待隊列頭節(jié)點中的線程節(jié)點移動到同步隊列中。
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

Add

    public boolean add(E e) {
        return offer(e);
    }
    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        // 獲取到重入鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                //  剛添加的元素成為頭節(jié)點
                // 那之前的頭結(jié)點就直接廢掉
                leader = null;
                // 喚醒take等待的線程惜姐,重新走查一遍
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

Ref:
http://blog.csdn.net/kobejayandy/article/details/46833623

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末犁跪,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子歹袁,更是在濱河造成了極大的恐慌坷衍,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件条舔,死亡現(xiàn)場離奇詭異枫耳,居然都是意外死亡,警方通過查閱死者的電腦和手機孟抗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門迁杨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來钻心,“玉大人,你說我怎么就攤上這事仑最∪右郏” “怎么了帆喇?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵警医,是天一觀的道長。 經(jīng)常有香客問我坯钦,道長预皇,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任婉刀,我火速辦了婚禮吟温,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘突颊。我一直安慰自己鲁豪,他們只是感情好,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布律秃。 她就那樣靜靜地躺著爬橡,像睡著了一般。 火紅的嫁衣襯著肌膚如雪棒动。 梳的紋絲不亂的頭發(fā)上糙申,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天,我揣著相機與錄音船惨,去河邊找鬼柜裸。 笑死,一個胖子當著我的面吹牛粱锐,可吹牛的內(nèi)容都是我干的疙挺。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼怜浅,長吁一口氣:“原來是場噩夢啊……” “哼铐然!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起海雪,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤锦爵,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后奥裸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體险掀,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年湾宙,在試婚紗的時候發(fā)現(xiàn)自己被綠了樟氢。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冈绊。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖埠啃,靈堂內(nèi)的尸體忽然破棺而出死宣,到底是詐尸還是另有隱情,我是刑警寧澤碴开,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布毅该,位于F島的核電站,受9級特大地震影響潦牛,放射性物質(zhì)發(fā)生泄漏眶掌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一巴碗、第九天 我趴在偏房一處隱蔽的房頂上張望朴爬。 院中可真熱鬧,春花似錦橡淆、人聲如沸召噩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽具滴。三九已至,卻和暖如春痊银,著一層夾襖步出監(jiān)牢的瞬間抵蚊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工溯革, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留贞绳,地道東北人。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓致稀,卻偏偏與公主長得像冈闭,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子抖单,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容