DelayQueue
類實現(xiàn)BlockingQueue
接口咽袜。閱讀BlockingQueue
文本以獲取有關的更多信息蹦狂。
DelayQueue
內部阻止元素直到某個延遲到期,元素必須實現(xiàn)接口java.util.concurrent.Delayed
圈浇。以下是java.util.concurrent.Delayed
接口:
public interface Delayed extends Comparable<Delayed< {
public long getDelay(TimeUnit timeUnit);
}
getDelay()
方法返回的值應該是在釋放此元素之前剩余的延遲切省。如果返回0或負值,則延遲將被視為已過期蛔六,并且在DelayQueue
調用下一個take()
等操作時釋放荆永。
傳遞給getDelay()
方法的TimeUnit
實例是一個Enum
,它說明了延遲的時間單位国章。TimeUnit
枚舉有以下值:
DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS
Delayed
接口繼承了java.lang.Comparable
接口具钥,這意味著Delayed
對象可以被相互比較。這可能是在DelayQueue
內部用于排序隊列中的元素液兽,因此它們能夠按到期時間排序釋放骂删。
以下是使用DelayQueue
的示例:
public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue queue = new DelayQueue();
Delayed element1 = new DelayedElement();
queue.put(element1);
Delayed element2 = queue.take();
}
}
DelayedElement
是我創(chuàng)建的Delayed
接口的實現(xiàn)。它不是java.util.concurrent
包的一部分四啰。你必須創(chuàng)建自己的Delayed
接口實現(xiàn)才能使用DelayQueue
類桃漾。
源碼
從DelayQueue
類的泛型定義中可以看出,此類只能儲存繼承自Delayed
接口的元素拟逮,內部使用一個優(yōu)先級隊列對元素進行排序撬统。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 等待隊列的頭節(jié)點,可以視作一個緩存
// 當一個線程成為leader敦迄,它只會等待指定延遲的時間恋追,但
// 其他線程會一直等到。所以leader線程在獲取到元素后
// 一定要釋放其他線程罚屋,除非其他線程臨時成為leader
private Thread leader;
/**
* 當隊列頭部的一個新元素可獲得(即超時到期)或者一個新線程成為leader苦囱,喚醒此等待條件上的線程
*/
private final Condition available = lock.newCondition();
構造函數
只有兩個構造方法,一個是默認構造方法脾猛,一個是給定一個集合撕彤,并將其中元素增加到等待隊列中。
public DelayQueue() {}
/**
* Creates a {@code DelayQueue} initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
增加操作
public boolean add(E e) {
// 重用offer方法
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 將元素增加到優(yōu)先級隊列中
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public void put(E e) {
// 因為是無界隊列猛拴,所以插入不會被阻塞羹铅。超時方法同理
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
刪除操作
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
// 提取并刪除第一個元素,如果隊列為空返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取第一個元素
E first = q.peek();
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: q.poll();
} finally {
lock.unlock();
}
}
/**
* 提取并刪除隊列的第一個元素愉昆,如果隊列為空則等待
* 直到有可獲得的元素
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 如果隊列為空职员,阻塞
if (first == null)
available.await();
else {
// 獲取頭元素的等待延遲
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
// 如果已經有線程在等待獲取頭元素,那么阻塞自己
if (leader != null)
available.await();
// 否則跛溉,自己就是leader焊切,等待給定延遲
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果成功獲取到元素并且隊列不為空扮授,喚醒其他線程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 如果隊列為空,超時等待
if (first == null) {
if (nanos <= 0L)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
// 如果延遲還未到期专肪,而指定的超時已到期刹勃,那么返回null
if (nanos <= 0L)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
訪問操作
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 委托給優(yōu)先級隊列獲取
return q.peek();
} finally {
lock.unlock();
}
}
其他操作
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (E first;
n < maxElements
&& (first = q.peek()) != null
&& first.getDelay(NANOSECONDS) <= 0;) {
// 增加到集合中
c.add(first); // In this order, in case add() throws.
// 從隊列中刪除此元素
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
迭代器
迭代器使用數組保存隊列中的元素,當創(chuàng)建一個迭代器時嚎尤,使用toArray()
方法將當前隊列轉換為數組荔仁,所以此迭代器不一定會和內部的優(yōu)先級隊列保持一致。迭代器除了提供訪問操作外诺苹,只提供了一個刪除操作咕晋,這個刪除操作保證不會出現(xiàn)不一致的情況雹拄。
public Iterator<E> iterator() {
return new Itr(toArray());
}
/**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
return (E)array[lastRet = cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 獲取優(yōu)先級隊列的迭代器收奔,然后執(zhí)行刪除操作
for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
if (o == it.next()) {
it.remove();
break;
}
}
} finally {
lock.unlock();
}
}
示例:
import org.junit.Assert;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueTest {
private DelayQueue<DelayElement> queue;
private int count;
// 測試迭代器和內部組件的不一致性
@Test
public void test() {
List<DelayElement> list = new ArrayList<>();
for(int i = 1; i < 6; ++i) {
list.add(new DelayElement(i, TimeUnit.SECONDS));
}
queue = new DelayQueue<>(list);
Iterator<DelayElement> iterator = queue.iterator();
// 增加一個元素
queue.add(new DelayElement(6, TimeUnit.SECONDS));
iterator.forEachRemaining((e) -> ++count);
Assert.assertEquals(count, queue.size());
iterator.next();
iterator.remove();
System.out.println(queue.size());
}
// 測試reomove方法的一致性
@Test
public void testRemoveInItr() {
List<DelayElement> list = new ArrayList<>();
for(int i = 1; i < 6; ++i) {
list.add(new DelayElement(i, TimeUnit.SECONDS));
}
queue = new DelayQueue<>(list);
Iterator<DelayElement> iterator = queue.iterator();
// 增加一個元素
queue.add(new DelayElement(6, TimeUnit.SECONDS));
System.out.println(queue.size());
iterator.next();
iterator.remove();
System.out.println(queue.size());
}
private static class DelayElement implements Delayed {
private long deadline;
DelayElement(long delay) {
this.deadline = System.nanoTime() + delay;
}
DelayElement(long delay, TimeUnit unit) {
this.deadline = System.nanoTime() + unit.toNanos(delay);
}
DelayElement(Date date) {
this.deadline = TimeUnit.MILLISECONDS.toNanos(date.getTime());
}
@Override
public long getDelay(TimeUnit unit) {
return unit.toNanos(deadline - System.nanoTime());
}
@Override
public int compareTo(Delayed o) {
Objects.requireNonNull(o);
return (int) (deadline - o.getDelay(TimeUnit.NANOSECONDS));
}
}
}
輸出:
java.lang.AssertionError:
Expected :5
Actual :6
<Click to see difference>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at p6.DelayQueueTest.test(DelayQueueTest.java:28)
6
5
核心要點
- 使用此隊列時,元素必須要實現(xiàn)
Delayed
接口 - 當已經有一個線程等待獲取隊列頭元素時滓玖,其他也想要獲取元素的線程就會進行等待阻塞狀態(tài)
- 迭代器不和內部的優(yōu)先級隊列保持一致性
- 迭代器的
remove()
方法與內部的優(yōu)先級隊列保持一致性