9. DelayQueue

DelayQueue類實現(xiàn)BlockingQueue接口咽袜。閱讀BlockingQueue文本以獲取有關的更多信息蹦狂。

DelayQueue內部阻止元素直到某個延遲到期,元素必須實現(xiàn)接口java.util.concurrent.Delayed圈浇。以下是java.util.concurrent.Delayed接口:

public interface Delayed extends Comparable<Delayed< {

    public long getDelay(TimeUnit timeUnit);

}

getDelay()方法返回的值應該是在釋放此元素之前剩余的延遲切省。如果返回0或負值,則延遲將被視為已過期蛔六,并且在DelayQueue調用下一個take()等操作時釋放荆永。

傳遞給getDelay()方法的TimeUnit實例是一個Enum,它說明了延遲的時間單位国章。TimeUnit枚舉有以下值:

DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS

Delayed接口繼承了java.lang.Comparable接口具钥,這意味著Delayed對象可以被相互比較。這可能是在DelayQueue內部用于排序隊列中的元素液兽,因此它們能夠按到期時間排序釋放骂删。

以下是使用DelayQueue的示例:

public class DelayQueueExample {

    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();
        Delayed element1 = new DelayedElement();
        queue.put(element1);
        Delayed element2 = queue.take();
    }
}

DelayedElement是我創(chuàng)建的Delayed接口的實現(xiàn)。它不是java.util.concurrent包的一部分四啰。你必須創(chuàng)建自己的Delayed接口實現(xiàn)才能使用DelayQueue類桃漾。

源碼

DelayQueue類的泛型定義中可以看出,此類只能儲存繼承自Delayed接口的元素拟逮,內部使用一個優(yōu)先級隊列對元素進行排序撬统。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 等待隊列的頭節(jié)點,可以視作一個緩存
    // 當一個線程成為leader敦迄,它只會等待指定延遲的時間恋追,但
    // 其他線程會一直等到。所以leader線程在獲取到元素后
    // 一定要釋放其他線程罚屋,除非其他線程臨時成為leader
    private Thread leader;

    /**
     * 當隊列頭部的一個新元素可獲得(即超時到期)或者一個新線程成為leader苦囱,喚醒此等待條件上的線程
     */
    private final Condition available = lock.newCondition();

構造函數

只有兩個構造方法,一個是默認構造方法脾猛,一個是給定一個集合撕彤,并將其中元素增加到等待隊列中。

public DelayQueue() {}

/**
 * Creates a {@code DelayQueue} initially containing the elements of the
 * given collection of {@link Delayed} instances.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

增加操作

public boolean add(E e) {
    // 重用offer方法
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 將元素增加到優(yōu)先級隊列中
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

public void put(E e) {
    // 因為是無界隊列猛拴,所以插入不會被阻塞羹铅。超時方法同理
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

刪除操作

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

// 提取并刪除第一個元素,如果隊列為空返回null
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 獲取第一個元素
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0)
            ? null
            : q.poll();
    } finally {
        lock.unlock();
    }
}

/**
 * 提取并刪除隊列的第一個元素愉昆,如果隊列為空則等待 
 * 直到有可獲得的元素
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果隊列為空职员,阻塞
            if (first == null)
                available.await();
            else {
                // 獲取頭元素的等待延遲
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 如果已經有線程在等待獲取頭元素,那么阻塞自己
                if (leader != null)
                    available.await();
                // 否則跛溉,自己就是leader焊切,等待給定延遲
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果成功獲取到元素并且隊列不為空扮授,喚醒其他線程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue,
 * or the specified wait time expires.
 *
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element with
 *         an expired delay becomes available
 * @throws InterruptedException {@inheritDoc}
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 如果隊列為空,超時等待
            if (first == null) {
                if (nanos <= 0L)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                // 如果延遲還未到期专肪,而指定的超時已到期刹勃,那么返回null
                if (nanos <= 0L)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

訪問操作

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 委托給優(yōu)先級隊列獲取
        return q.peek();
    } finally {
        lock.unlock();
    }
}

其他操作

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (E first;
             n < maxElements
                 && (first = q.peek()) != null
                 && first.getDelay(NANOSECONDS) <= 0;) {
            // 增加到集合中
            c.add(first);   // In this order, in case add() throws.
            // 從隊列中刪除此元素
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

迭代器

迭代器使用數組保存隊列中的元素,當創(chuàng)建一個迭代器時嚎尤,使用toArray()方法將當前隊列轉換為數組荔仁,所以此迭代器不一定會和內部的優(yōu)先級隊列保持一致。迭代器除了提供訪問操作外诺苹,只提供了一個刪除操作咕晋,這個刪除操作保證不會出現(xiàn)不一致的情況雹拄。

public Iterator<E> iterator() {
    return new Itr(toArray());
}

/**
 * Snapshot iterator that works off copy of underlying q array.
 */
private class Itr implements Iterator<E> {
    final Object[] array; // Array of all elements
    int cursor;           // index of next element to return
    int lastRet;          // index of last element, or -1 if no such

    Itr(Object[] array) {
        lastRet = -1;
        this.array = array;
    }

    public boolean hasNext() {
        return cursor < array.length;
    }

    @SuppressWarnings("unchecked")
    public E next() {
        if (cursor >= array.length)
            throw new NoSuchElementException();
        return (E)array[lastRet = cursor++];
    }

    public void remove() {
        if (lastRet < 0)
            throw new IllegalStateException();
        removeEQ(array[lastRet]);
        lastRet = -1;
    }
}

void removeEQ(Object o) {
    final ReentrantLock lock = this.lock;
    // 加鎖
    lock.lock();
    try {
        // 獲取優(yōu)先級隊列的迭代器收奔,然后執(zhí)行刪除操作
        for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
            if (o == it.next()) {
                it.remove();
                break;
            }
        }
    } finally {
        lock.unlock();
    }
}

示例:

import org.junit.Assert;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueTest {
    private DelayQueue<DelayElement> queue;
    private int count;

    // 測試迭代器和內部組件的不一致性

    @Test
    public void test() {
        List<DelayElement> list = new ArrayList<>();
        for(int i = 1; i < 6; ++i) {
            list.add(new DelayElement(i, TimeUnit.SECONDS));
        }
        queue = new DelayQueue<>(list);

        Iterator<DelayElement> iterator = queue.iterator();
        // 增加一個元素
        queue.add(new DelayElement(6, TimeUnit.SECONDS));

        iterator.forEachRemaining((e) -> ++count);
        Assert.assertEquals(count, queue.size());

        iterator.next();
        iterator.remove();
        System.out.println(queue.size());
    }

    // 測試reomove方法的一致性
    @Test
    public void testRemoveInItr() {
        List<DelayElement> list = new ArrayList<>();
        for(int i = 1; i < 6; ++i) {
            list.add(new DelayElement(i, TimeUnit.SECONDS));
        }
        queue = new DelayQueue<>(list);

        Iterator<DelayElement> iterator = queue.iterator();
        // 增加一個元素
        queue.add(new DelayElement(6, TimeUnit.SECONDS));

        System.out.println(queue.size());
        iterator.next();
        iterator.remove();
        System.out.println(queue.size());
    }


    private static class DelayElement implements Delayed {
        private long deadline;

        DelayElement(long delay) {
            this.deadline = System.nanoTime() + delay;
        }

        DelayElement(long delay, TimeUnit unit) {
            this.deadline = System.nanoTime() + unit.toNanos(delay);
        }

        DelayElement(Date date) {
            this.deadline = TimeUnit.MILLISECONDS.toNanos(date.getTime());
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.toNanos(deadline - System.nanoTime());
        }

        @Override
        public int compareTo(Delayed o) {
            Objects.requireNonNull(o);
            return (int) (deadline - o.getDelay(TimeUnit.NANOSECONDS));
        }
    }
}

輸出:

java.lang.AssertionError:
Expected :5
Actual :6
<Click to see difference>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at p6.DelayQueueTest.test(DelayQueueTest.java:28)

6
5

核心要點

  1. 使用此隊列時,元素必須要實現(xiàn)Delayed接口
  2. 當已經有一個線程等待獲取隊列頭元素時滓玖,其他也想要獲取元素的線程就會進行等待阻塞狀態(tài)
  3. 迭代器不和內部的優(yōu)先級隊列保持一致性
  4. 迭代器的remove()方法與內部的優(yōu)先級隊列保持一致性
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末坪哄,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子势篡,更是在濱河造成了極大的恐慌翩肌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件禁悠,死亡現(xiàn)場離奇詭異念祭,居然都是意外死亡,警方通過查閱死者的電腦和手機碍侦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門粱坤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瓷产,你說我怎么就攤上這事站玄。” “怎么了濒旦?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵株旷,是天一觀的道長。 經常有香客問我尔邓,道長晾剖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任梯嗽,我火速辦了婚禮钞瀑,結果婚禮上,老公的妹妹穿的比我還像新娘慷荔。我一直安慰自己雕什,他們只是感情好缠俺,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著贷岸,像睡著了一般壹士。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上偿警,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天躏救,我揣著相機與錄音,去河邊找鬼螟蒸。 笑死盒使,一個胖子當著我的面吹牛,可吹牛的內容都是我干的七嫌。 我是一名探鬼主播少办,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诵原!你這毒婦竟也來了英妓?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤绍赛,失蹤者是張志新(化名)和其女友劉穎蔓纠,沒想到半個月后,有當地人在樹林里發(fā)現(xiàn)了一具尸體吗蚌,經...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡腿倚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蚯妇。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片敷燎。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖侮措,靈堂內的尸體忽然破棺而出懈叹,到底是詐尸還是另有隱情,我是刑警寧澤分扎,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布澄成,位于F島的核電站,受9級特大地震影響畏吓,放射性物質發(fā)生泄漏墨状。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一菲饼、第九天 我趴在偏房一處隱蔽的房頂上張望肾砂。 院中可真熱鬧,春花似錦宏悦、人聲如沸镐确。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽源葫。三九已至诗越,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間息堂,已是汗流浹背嚷狞。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留荣堰,地道東北人床未。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像振坚,于是被迫代替她去往敵國和親薇搁。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內容