因為一篇微信上關(guān)于時間輪的文章引起了我的興趣伤极,在網(wǎng)上看了很多關(guān)于時間輪的文章方灾,看到了Kafka中的時間輪定時器是基于DelayQueue實現(xiàn)的,于是先來把DelayQueue相關(guān)的知識梳理清楚
概述
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現(xiàn)。隊列中的元素必須實現(xiàn) Delayed 接口筋岛,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素晒哄。我們可以將 DelayQueue 運用在以下應(yīng)用場景:
緩存系統(tǒng)的設(shè)計:可以用 DelayQueue 保存緩存元素的有效期睁宰,使用一個線程循環(huán)查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時揩晴,表示緩存有效期到了勋陪。
定時任務(wù)調(diào)度。使用 DelayQueue 保存當天將會執(zhí)行的任務(wù)和執(zhí)行時間硫兰,一旦從 DelayQueue 中獲取到任務(wù)就開始執(zhí)行,從比如 TimerQueue 就是使用 DelayQueue 實現(xiàn)的
繼承關(guān)系:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
...
}
DelayQueue實際是由優(yōu)先隊列(PriorityQueue)實現(xiàn)的BlockingQueue寒锚,優(yōu)先隊列比較的基準是時間
相關(guān)知識
BlockingQueue
BlockingQueue是一個阻塞隊列劫映,支持兩個附加操作:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强丈睬啊.旉犃袧M時泳赋,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景喇喉,生產(chǎn)者是往隊列里添加元素的線程祖今,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器拣技,而消費者也只從容器里拿元素千诬。
PriorityQueue
優(yōu)先級隊列,根據(jù)比較器來比較集合中元素的權(quán)重膏斤,每次出隊都彈出優(yōu)先級最小或最大的元素
java中的優(yōu)先級隊列是由一個可擴容的數(shù)組(堆)實現(xiàn)的徐绑,初始化時需要傳入一個實現(xiàn)了comparator接口的比較器,如果未傳入莫辨,則使用元素自身的comparable接口進行元素比較傲茄,最后得出一個根據(jù)比較權(quán)重排序的隊列
Delayed
Delayed接口繼承了Comparable接口,自身需要有一個getDelay的實現(xiàn)沮榜,比較的基準是延時的時間值
public interface Delayed extends Comparable<Delayed> {
//該元素距離失效還剩余的時間盘榨,當<=0時元素就失效了
long getDelay(TimeUnit unit);
DelayedQueue中的元素必須實現(xiàn)Delayed接口
DelayedQueue源碼
DelayedQueue中的重要屬性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
DelayedQueue中的add()及put()方法都調(diào)用了offer(),因為使用了條件隊列作為實現(xiàn)蟆融,條件隊列是會擴容且無界的草巡,所以對于生產(chǎn)者來說,可以無限向隊列中生產(chǎn)任務(wù)從而不需要阻塞
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
//獲取鎖
lock.lock();
try {
//加入條件隊列q
q.offer(e);
//如果剛剛放入的元素是隊列的第一個元素振愿,喚醒線程捷犹,并更換leader
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
DelayedQueue中poll()和take()方法的實現(xiàn)不同弛饭,poll方法并不會阻塞并等待隊列中有元素才返回
public E poll() {
final ReentrantLock lock = this.lock;
//獲取鎖
lock.lock();
try {
//獲取條件隊列中首個元素,未刪除
E first = q.peek();
//如果首個元素為空或者時延還未達到萍歉,則說明隊列里沒有任務(wù)需要執(zhí)行侣颂,返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
//如果都不為空且時延已到,則刪除條件隊列中首位并返回
else
return q.poll();
} finally {
lock.unlock();
}
}
take方法會阻塞到直到隊列中有元素才會被喚醒
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//獲取的是可中斷的鎖枪孩,如果已經(jīng)被中斷了就會拋出InterruptedException異常憔晒,而不需要等到調(diào)用await方法才拋異常了
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
//如果當前隊列為空,則進入available的條件等待隊列
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
//如果leader線程不為空蔑舞,說明當前線程為等待線程拒担,加入availabe中等待
if (leader != null)
available.await();
else {
//如果leader線程為空,將當前線程設(shè)為leader攻询,調(diào)用await方法休息delay時延从撼,等待隊首元素過期,然后更換leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果當前沒有l(wèi)eader線程在工作且條件隊列不為空钧栖,喚醒available中線程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
步驟:
(1)獲取鎖低零;
(2)若隊列為空,則阻塞等待拯杠;
(3)否則掏婶,獲取隊首元素delay時間;
(4)若delay時間已過期潭陪,則釋放鎖雄妥,將隊首元素出隊(注意:take返回前,如果leader為null且隊列不為空依溯,則發(fā)送available信號)老厌;
(5)若delay時間未到期且已設(shè)置leader,則阻塞等待誓沸;
(6)若delay時間未到期且未設(shè)置leader梅桩,則設(shè)置當前線程為leader,等待隊首元素過期拜隧;
(7)循環(huán)(2)——(6)宿百,返回前,釋放鎖洪添。
leader-follower模式
在DelayQueue中使用到了leader-follower模式
在多線程環(huán)境下垦页,我們理所當然會考慮使用線程池,而任何池的使用干奢,都會帶來一個管理和切換的問題痊焊。
這個模式的作用其實是解決線程頻繁切換帶來的開銷的,大家都知道,線程切換是有代價的薄啥,雖然進程內(nèi)數(shù)據(jù)共享辕羽,
但不斷的將cpu的寄存器,二級緩存的東西進進出出和指令來回折騰垄惧,現(xiàn)場復(fù)原刁愿,這些代價都很大,
所以這也就是到逊,線程不是越多就性能越高铣口,當在一定的機器的環(huán)境下,線程到了一定程度性能就上不去觉壶,反而下降脑题,其罪魁禍首就是頻繁的線程切換
leader-follower一定程度上能解決一些線程切換帶來的問題,這種模式的基本思想是所有線程會有三種身份中的一種:leader和follower铜靶,以及一個干活中的狀態(tài):proccesser叔遂。它的基本原則就是,永遠最多只有一個leader争剿。而所有follower都在等待成為leader掏熬。線程池啟動時會自動產(chǎn)生一個Leader負責(zé)等待事件,當有一個事件產(chǎn)生時秒梅,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了舌胶,去處理這個事件捆蜀,處理完畢后加入Follower線程等待隊列,等待下次成為Leader幔嫂。這種方法可以增強CPU高速緩存相似性辆它,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。
參考文獻: