DelayQueue源碼

因為一篇微信上關(guān)于時間輪的文章引起了我的興趣伤极,在網(wǎng)上看了很多關(guān)于時間輪的文章方灾,看到了Kafka中的時間輪定時器是基于DelayQueue實現(xiàn)的,于是先來把DelayQueue相關(guān)的知識梳理清楚

概述

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現(xiàn)。隊列中的元素必須實現(xiàn) Delayed 接口筋岛,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素晒哄。我們可以將 DelayQueue 運用在以下應(yīng)用場景:

  1. 緩存系統(tǒng)的設(shè)計:可以用 DelayQueue 保存緩存元素的有效期睁宰,使用一個線程循環(huán)查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時揩晴,表示緩存有效期到了勋陪。

  2. 定時任務(wù)調(diào)度。使用 DelayQueue 保存當天將會執(zhí)行的任務(wù)和執(zhí)行時間硫兰,一旦從 DelayQueue 中獲取到任務(wù)就開始執(zhí)行,從比如 TimerQueue 就是使用 DelayQueue 實現(xiàn)的

繼承關(guān)系:


public class DelayQueue<E extends Delayed> extends AbstractQueue<E>

    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();

    private final PriorityQueue<E> q = new PriorityQueue<E>();

        ...

    }

DelayQueue實際是由優(yōu)先隊列(PriorityQueue)實現(xiàn)的BlockingQueue寒锚,優(yōu)先隊列比較的基準是時間

相關(guān)知識

BlockingQueue

BlockingQueue是一個阻塞隊列劫映,支持兩個附加操作:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强丈睬啊.旉犃袧M時泳赋,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景喇喉,生產(chǎn)者是往隊列里添加元素的線程祖今,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器拣技,而消費者也只從容器里拿元素千诬。

PriorityQueue

優(yōu)先級隊列,根據(jù)比較器來比較集合中元素的權(quán)重膏斤,每次出隊都彈出優(yōu)先級最小或最大的元素

java中的優(yōu)先級隊列是由一個可擴容的數(shù)組(堆)實現(xiàn)的徐绑,初始化時需要傳入一個實現(xiàn)了comparator接口的比較器,如果未傳入莫辨,則使用元素自身的comparable接口進行元素比較傲茄,最后得出一個根據(jù)比較權(quán)重排序的隊列

Delayed

Delayed接口繼承了Comparable接口,自身需要有一個getDelay的實現(xiàn)沮榜,比較的基準是延時的時間值


public interface Delayed extends Comparable<Delayed> {

    //該元素距離失效還剩余的時間盘榨,當<=0時元素就失效了

    long getDelay(TimeUnit unit);

DelayedQueue中的元素必須實現(xiàn)Delayed接口

DelayedQueue源碼

DelayedQueue中的重要屬性


private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();



DelayedQueue中的add()及put()方法都調(diào)用了offer(),因為使用了條件隊列作為實現(xiàn)蟆融,條件隊列是會擴容且無界的草巡,所以對于生產(chǎn)者來說,可以無限向隊列中生產(chǎn)任務(wù)從而不需要阻塞


public boolean offer(E e) {

        final ReentrantLock lock = this.lock;

        //獲取鎖

        lock.lock();

        try {

            //加入條件隊列q

            q.offer(e);

            //如果剛剛放入的元素是隊列的第一個元素振愿,喚醒線程捷犹,并更換leader

            if (q.peek() == e) {

                leader = null;

                available.signal();

            }

            return true;

        } finally {

            lock.unlock();

        }

    }

DelayedQueue中poll()和take()方法的實現(xiàn)不同弛饭,poll方法并不會阻塞并等待隊列中有元素才返回


public E poll() {

        final ReentrantLock lock = this.lock;

        //獲取鎖

        lock.lock();

        try {

            //獲取條件隊列中首個元素,未刪除

            E first = q.peek();

            //如果首個元素為空或者時延還未達到萍歉,則說明隊列里沒有任務(wù)需要執(zhí)行侣颂,返回空

            if (first == null || first.getDelay(NANOSECONDS) > 0)

                return null;

            //如果都不為空且時延已到,則刪除條件隊列中首位并返回

            else

                return q.poll();

        } finally {

            lock.unlock();

        }

    }

take方法會阻塞到直到隊列中有元素才會被喚醒


public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        //獲取的是可中斷的鎖枪孩,如果已經(jīng)被中斷了就會拋出InterruptedException異常憔晒,而不需要等到調(diào)用await方法才拋異常了

        lock.lockInterruptibly();

        try {

            for (;;) {

                E first = q.peek();

                //如果當前隊列為空,則進入available的條件等待隊列

                if (first == null)

                    available.await();

                else {

                    long delay = first.getDelay(NANOSECONDS);

                    if (delay <= 0)

                        return q.poll();

                    first = null; // don't retain ref while waiting

                    //如果leader線程不為空蔑舞,說明當前線程為等待線程拒担,加入availabe中等待

                    if (leader != null)

                        available.await();

                    else {

                        //如果leader線程為空,將當前線程設(shè)為leader攻询,調(diào)用await方法休息delay時延从撼,等待隊首元素過期,然后更換leader

                        Thread thisThread = Thread.currentThread();

                        leader = thisThread;

                        try {

                            available.awaitNanos(delay);

                        } finally {

                            if (leader == thisThread)

                                leader = null;

                        }

                    }

                }

            }

        } finally {

            //如果當前沒有l(wèi)eader線程在工作且條件隊列不為空钧栖,喚醒available中線程

            if (leader == null && q.peek() != null)

                available.signal();

            lock.unlock();

        }

    }

步驟:

(1)獲取鎖低零;

(2)若隊列為空,則阻塞等待拯杠;

(3)否則掏婶,獲取隊首元素delay時間;

(4)若delay時間已過期潭陪,則釋放鎖雄妥,將隊首元素出隊(注意:take返回前,如果leader為null且隊列不為空依溯,則發(fā)送available信號)老厌;

(5)若delay時間未到期且已設(shè)置leader,則阻塞等待誓沸;

(6)若delay時間未到期且未設(shè)置leader梅桩,則設(shè)置當前線程為leader,等待隊首元素過期拜隧;

(7)循環(huán)(2)——(6)宿百,返回前,釋放鎖洪添。

leader-follower模式

在DelayQueue中使用到了leader-follower模式

在多線程環(huán)境下垦页,我們理所當然會考慮使用線程池,而任何池的使用干奢,都會帶來一個管理和切換的問題痊焊。

這個模式的作用其實是解決線程頻繁切換帶來的開銷的,大家都知道,線程切換是有代價的薄啥,雖然進程內(nèi)數(shù)據(jù)共享辕羽,

但不斷的將cpu的寄存器,二級緩存的東西進進出出和指令來回折騰垄惧,現(xiàn)場復(fù)原刁愿,這些代價都很大,

所以這也就是到逊,線程不是越多就性能越高铣口,當在一定的機器的環(huán)境下,線程到了一定程度性能就上不去觉壶,反而下降脑题,其罪魁禍首就是頻繁的線程切換

leader-follower一定程度上能解決一些線程切換帶來的問題,這種模式的基本思想是所有線程會有三種身份中的一種:leader和follower铜靶,以及一個干活中的狀態(tài):proccesser叔遂。它的基本原則就是,永遠最多只有一個leader争剿。而所有follower都在等待成為leader掏熬。線程池啟動時會自動產(chǎn)生一個Leader負責(zé)等待事件,當有一個事件產(chǎn)生時秒梅,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了舌胶,去處理這個事件捆蜀,處理完畢后加入Follower線程等待隊列,等待下次成為Leader幔嫂。這種方法可以增強CPU高速緩存相似性辆它,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。

參考文獻:

  1. Leader/Follower多線程網(wǎng)絡(luò)模型介紹

  2. DelayQueue源碼分析

  3. 死磕 java集合之PriorityQueue源碼分析

  4. 聊聊并發(fā)(七)——Java 中的阻塞隊列

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末履恩,一起剝皮案震驚了整個濱河市锰茉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌切心,老刑警劉巖飒筑,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異绽昏,居然都是意外死亡协屡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門全谤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肤晓,“玉大人,你說我怎么就攤上這事〔购叮” “怎么了漫萄?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長盈匾。 經(jīng)常有香客問我腾务,道長,這世上最難降的妖魔是什么威酒? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任窑睁,我火速辦了婚禮,結(jié)果婚禮上葵孤,老公的妹妹穿的比我還像新娘担钮。我一直安慰自己,他們只是感情好尤仍,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布箫津。 她就那樣靜靜地躺著,像睡著了一般宰啦。 火紅的嫁衣襯著肌膚如雪苏遥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天赡模,我揣著相機與錄音田炭,去河邊找鬼。 笑死漓柑,一個胖子當著我的面吹牛教硫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播辆布,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼瞬矩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了锋玲?” 一聲冷哼從身側(cè)響起景用,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎惭蹂,沒想到半個月后伞插,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡剿干,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年蜂怎,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片置尔。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡杠步,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情幽歼,我是刑警寧澤朵锣,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站甸私,受9級特大地震影響诚些,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜皇型,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一诬烹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧弃鸦,春花似錦绞吁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至购岗,卻和暖如春汰聋,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背喊积。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工烹困, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人乾吻。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓韭邓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親溶弟。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容