DelayQueue類的主要作用:是一個無界的BlockingQueue纤泵,用于放置實現(xiàn)了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走烤蜕。這種隊列是有序的眨攘,即隊頭對象的延遲到期時間最長。注意:不能將null元素放置到這種隊列中因谎。
主要屬性
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 持有內(nèi)部重入鎖基括。
private final transient ReentrantLock lock = new ReentrantLock();
// 優(yōu)先級隊列,存放工作任務财岔。
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
// 依賴于重入鎖的 condition(出隊列的線程使用)
private final Condition available = lock.newCondition();
}
1.Delayed接口
DelayQueue隊列與其它的隊列最大的不同就是這個隊列里的元素必須實現(xiàn)Delayed接口才能入隊风皿,我們來看一下這個接口:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
該接口繼承自Comparable,也就意味著實現(xiàn)了Delayed接口的類必須有兩個方法getDelay和compareTo匠璧,示例類:
static class Task implements Delayed{
long time = System.currentTimeMillis();
public Task(long time) {
this.time = time;
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "" + time;
}
}
2.內(nèi)部隊列PriorityQueue
DelayQueue內(nèi)部使用優(yōu)先級隊列PriorityQueue來存放元素桐款,PriorityQueue隊列里的元素會根據(jù)某些屬性排列先后的順序,這里正好可以利用Delayed接口里的getDelay的返回值來進行排序夷恍,delayQueue其實就是在每次往優(yōu)先級隊列中添加元素,然后以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列里取出來都是最先要過期的元素魔眨。
3.offer()方法
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 獲取鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 判斷是否添加成功
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
1.獲取鎖來執(zhí)行后續(xù)操作
2.元素添加到優(yōu)先級隊列中
3.查看元素是否為隊首,如果是隊首的話酿雪,設置leader為空遏暴,喚醒一個消費線程。
這里有一個leader元素它的作用我們后面說指黎,先看一下取元素過程
4.take()方法
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獲取可中斷鎖朋凉。
lock.lockInterruptibly();
try {
for (;;) {
// 從優(yōu)先級隊列中獲取隊列頭元素
E first = q.peek();
if (first == null)
// 無元素,當前線程加入等待隊列醋安,并阻塞
available.await();
else {
// 通過getDelay 方法獲取延遲時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延遲時間到期杂彭,獲取并刪除頭部元素。
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 線程節(jié)點進入等待隊列 x 納秒吓揪。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader == null且還存在元素的話亲怠,喚醒一個消費線程。
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
1.獲取鎖
2.取出優(yōu)先級隊列q的首元素
3.如果元素q的隊首/隊列為空,阻塞
3.如果元素q的隊首(first)不為空,獲得這個元素的delay時間值柠辞,如果first的延遲delay時間值為0的話,說明該元素已經(jīng)到了可以使用的時間,調(diào)用poll方法彈出該元素,跳出方法
4.如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免內(nèi)存泄露
5.循環(huán)以上操作团秽,直到return
5.leader元素的使用
leader是一個Thread元素,它在offer和take中都有使用叭首,它代表當前獲取到鎖的消費者線程习勤,
我們從take里的邏輯片段來分析
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
如果leader不為null,說明已經(jīng)有消費者線程拿到鎖,直接阻塞當前線程放棒,如果leader為null姻报,把當前線程賦值給leader,并等待剩余的到期時間间螟,最后釋放leader吴旋,這里我們想象著我們有個多個消費者線程用take方法去取,如果沒有l(wèi)eader!=null的判斷厢破,這些線程都會無限循環(huán)荣瑟,直到返回第一個元素,很顯然很浪費資源摩泪。所以leader的作用是設置一個標記疆柔,來避免消費者的無腦競爭。
first = null; // don't retain ref while waiting
這里是釋放first眼坏,是因為first是隊列第一個元素的引用,同時可以有很多線程執(zhí)行捏检,意味著有很多線程持有第一個元素的引用,很有可能導致內(nèi)存溢出,所以手動釋放不皆。