本文將會對DelayQueue做一個簡單的介紹碳竟,并提供部分源碼的分析寺擂。
DelayQueue的特性基本上由BlockingQueue浇坐、PriorityQueue和Delayed的特性來決定的诅诱。
簡而言之壹置,DelayQueue是通過Delayed,使得不同元素之間能按照剩余的延遲時間進(jìn)行排序踩娘,然后通過PriorityQueue刮刑,使得超時的元素能最先被處理,然后利用BlockingQueue,將元素處理的操作阻塞住雷绢。
基本定義如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
ReentrantLock lock = new ReentrantLock();
ReentrantLock是一個可重入的互斥鎖泛烙,將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有翘紊,當(dāng)鎖被其他線程獲得時蔽氨,調(diào)用lock的線程將無法獲得鎖。
在DelayQueue中帆疟,只有一個互斥鎖lock鹉究。
PriorityQueue<E> q = new PriorityQueue<E>();
PriorityQueue是一個優(yōu)先級隊列,每次從隊列中取出的是具有最高優(yōu)先權(quán)的元素鸯匹。
在DelayQueue中,因為E繼承于Delayed泄伪,所以q表示一個按照delayTime排序的優(yōu)先級隊列殴蓬,用于存放需要延遲執(zhí)行的元素。
Thread leader = null;
這里的leader設(shè)計出來是為了minimize unnecessary timed waiting(減少不必要的等待時間)蟋滴,如何實現(xiàn)的方案會在詳細(xì)解讀中解釋染厅。
在DelayQueue中l(wèi)eader表示一個等待從隊列中獲取消息的線程。
Condition available = lock.newCondition();
Condition是lock對象的條件變量津函,只能和鎖lock配合使用肖粮,用于控制并發(fā)程序訪問競爭資源的安全。
一個鎖lock可以有多個條件變量condition尔苦,每個條件上可以有多個線程等待涩馆,通過調(diào)用await()方法,可以讓線程在該條件下等待允坚。當(dāng)調(diào)用signalAll()方法魂那,又可以喚醒該條件下的等待的線程。
在DelayQueue中l(wèi)ock對象只有一個條件變量available稠项。
以下是DelayQueue的主要方法:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
1涯雅、執(zhí)行l(wèi)ock.lock(),獲取鎖展运。
2活逆、把元素e添加到優(yōu)先隊列q(下稱隊列q)中。
3拗胜、判斷隊列q的隊首元素是否為e蔗候。
4、如果e是隊首元素的話埂软,即元素e是最近可被執(zhí)行的元素琴庵,意味著延遲隊列的執(zhí)行順序?qū)⒈蛔兏?br>
執(zhí)行l(wèi)eader = null,否則在執(zhí)行take時,所有線程就會在if(leader!=null)的判斷下進(jìn)入等待迷殿。
執(zhí)行available.signal()儿礼,喚醒其他等待中的線程,重新去循環(huán)執(zhí)行take中的操作1-8庆寺。
如果不執(zhí)行signal蚊夫,那么在take方法中,只有執(zhí)行awaitNanos(delay)的線程在等待delay指定的時間后自動喚醒懦尝,其他執(zhí)行await的線程將一直被掛起知纷。
如果沒有新的線程去執(zhí)行take方法,那么等待執(zhí)行awaitNanos(delay)的線程自動喚醒時陵霉,此時等待時間將超過元素e的delayTime,這不符合預(yù)期琅轧。
即便有新的線程去執(zhí)行take方法,那之前掛起的線程也將一直在等待踊挠,效率很低乍桂。
5、在finally塊中執(zhí)行l(wèi)ock.unlock()效床。
需要注意的是吏夯,鎖必須在 finally 塊中釋放姚炕。否則,如果代碼拋出異常,那么鎖就有可能永遠(yuǎn)得不到釋放植兰。如果沒有釋放鎖裸弦,那么就會產(chǎn)生死鎖的問題麸俘。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
1澈魄、執(zhí)行l(wèi)ock.lockInterruptibly(),獲取鎖运嗜。
lockInterruptibly和lock的區(qū)別在于
lock 在鎖被其他線程占有则披,當(dāng)前線程等待鎖期間(下稱等待鎖期間),只考慮獲取鎖洗出。只有在獲取鎖成功后士复,才會去響應(yīng)中斷。
而lockInterruptibly 在等待鎖期間翩活,會優(yōu)先考慮響應(yīng)中斷阱洪,而不是響應(yīng)鎖的獲取。如果當(dāng)前線程被打斷(interrupt)則該方法拋出InterruptedException菠镇。該方法提供了一種解除死鎖的途徑冗荸。
2、E first = q.peek()利耍,獲取隊列q的隊首元素first(下稱first)蚌本。
3盔粹、如果first為空,則執(zhí)行avaliable.await()讓線程進(jìn)入等待程癌。實際上就是釋放鎖舷嗡,然后掛起線程,等待被喚醒嵌莉,此時其他線程可以獲得鎖了进萄。
await()和awaitNanos(nanosTimeout)區(qū)別在于
執(zhí)行awaitNanos(nanosTimeout)的線程比執(zhí)行await()的線程多一個喚醒條件,超過等待nanosTimeout指定的時間锐峭,線程將自動喚醒中鼠。線程喚醒時,保證該線程是持有鎖的沿癞。
4援雇、如果first不為空,則執(zhí)行first.getDelay(NANOSECONDS)獲取first的剩余延遲時間delayTime(下稱delayTime)
5椎扬、如果first的delayTime<=0惫搏,表明該元素已經(jīng)達(dá)到之前設(shè)定的延遲時間了,則調(diào)用return q.poll()盗舰,將first從隊列q中的移除并且返回該元素first.
6晶府、如果first的delayTime>0桂躏,則將first指向null,釋放first的引用,避免內(nèi)存泄露.
7钻趋、如果線程leader(下稱leader)不為空的話,則執(zhí)行avaliable.await()讓線程進(jìn)入等待剂习。leader不為空的話蛮位,表明已經(jīng)有其他線程在獲取優(yōu)先隊列q的隊首元素了(下稱獲取隊首元素),此時只需要執(zhí)行avaliable.await()讓當(dāng)前線程進(jìn)入等待即可鳞绕。
8失仁、如果leader為空,則執(zhí)行Thread thisThread = Thread.currentThread();leader = thisThread;將leader指向當(dāng)前線程们何,然后執(zhí)行available.awaitNanos(delay);讓線程最長等待delayTime的時間萄焦。最后在finally塊中,如果leader依然指向前文獲取的當(dāng)前線程thisThread冤竹,那么將leader指向null,釋放leader引用拂封。
這里leader為空,表明尚未有其他線程在獲取隊首元素鹦蠕,此時設(shè)置leader對象冒签,指向當(dāng)前線程(下稱currentThread)。因為currentThread執(zhí)行了available.awaitNanos(delay)釋放了鎖钟病,所以其他線程(下稱otherThread)在調(diào)用take方法時能獲取鎖萧恕,但是因為leader非空刚梭,所以otherThread都會進(jìn)入7的那步,直接進(jìn)入等待票唆,而不需要像currentThread那樣執(zhí)行8的一系列操作朴读,達(dá)到設(shè)計leader線程的初衷。
9惰说、循環(huán)執(zhí)行以上1-8步磨德,直到first非空且first的delayTime<=0,跳出循環(huán)吆视。
10典挑、跳出循環(huán)后,進(jìn)入finally塊啦吧。
11您觉、如果leader為空且隊列q的隊首元素非null(q隊列中移除了上文的first元素后還有其他元素),此時執(zhí)行available.signal()授滓,調(diào)用signal喚醒其他等待中的線程琳水。
12、執(zhí)行l(wèi)ock.unlock()般堆,執(zhí)行解鎖操作在孝。
ok,源碼分析就先講到這里了淮摔,下一期我準(zhǔn)備講一下如何將DelayQueue封裝成可用的組件私沮,讓使用者調(diào)用起來更加方便。