源碼篇-ScheduledThreadPoolExecutor之DelayedWorkQueue

一、添加元素

public void put(Runnable e) {
    offer(e);
}

public boolean add(Runnable e) {
    return offer(e);
}

public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}
  • put方法和add方法都會(huì)調(diào)用offer方法午磁,put方法沒有返回值莺戒,add返回是否添加成功
  • 因?yàn)镈elayedWorkQueue可以擴(kuò)容,添加元素沒有阻塞爽蝴,所以帶時(shí)間的offer方法最終調(diào)用的還是不帶時(shí)間的offer方法
1.offer方法
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // 如果超過(guò)了數(shù)組的容量沐批,執(zhí)行擴(kuò)容50%
        if (i >= queue.length)
            grow();
        // 數(shù)組元素個(gè)數(shù)加1
        size = i + 1;
        // 如果是第一個(gè)元素,直接設(shè)置值
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        // 如果不是第一個(gè)元素蝎亚,需要向上比較并移動(dòng)
        } else {
            siftUp(i, e);
        }
        // 如果是第一個(gè)元素兴溜,說(shuō)明之前是空的糊啡,將leader置為空幽污,通知等待獲取隊(duì)列數(shù)據(jù)的線程
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}
  • 首先看是否需要擴(kuò)容伸辟,每次擴(kuò)容1.5倍
  • 由于queue是小堆數(shù)據(jù)結(jié)構(gòu),如果是第一個(gè)元素梅惯,直接添加到數(shù)組中宪拥;如果不是第一個(gè)元素,需要與父結(jié)點(diǎn)比較并移動(dòng)
  • 如果是第一個(gè)元素个唧,需通知從隊(duì)列獲取數(shù)據(jù)的線程
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // 獲取父結(jié)點(diǎn)索引位置
        int parent = (k - 1) >>> 1;
        // 獲取父結(jié)點(diǎn)元素
        RunnableScheduledFuture<?> e = queue[parent];
        // 如果當(dāng)前插入的元素延遲時(shí)間或者序列號(hào)大于父結(jié)點(diǎn)江解,直接退出while循環(huán)
        if (key.compareTo(e) >= 0)
            break;
        // 走到這,說(shuō)明需要將新元素上移徙歼,將父元素替換到新元素的位置
        queue[k] = e;
        setIndex(e, k);
        // 將k的值更改為父元素的值犁河,進(jìn)行下一次循環(huán)
        k = parent;
    }
    // 重新插入新的元素
    queue[k] = key;
    // 設(shè)置元素在數(shù)組的位置
    setIndex(key, k);
}
  • DelayedWorkQueue是小堆樹的數(shù)據(jù)結(jié)構(gòu)鳖枕,如果以0為下標(biāo)開始編號(hào),當(dāng)前結(jié)點(diǎn)的在數(shù)組中的位置為i桨螺,那么當(dāng)前結(jié)點(diǎn)的父結(jié)點(diǎn)在數(shù)組的(i-1)/2位置宾符,左結(jié)點(diǎn)是2i+1的位置,右結(jié)點(diǎn)是2i+2的位置灭翔;
  • key是類ScheduledFutureTask魏烫,重寫了compareTo方法,比較的是time肝箱,sequenceNumber哄褒,NANOSECONDS大小

二、獲取元素

1. poll()
public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 獲取小堆樹的根結(jié)點(diǎn)
        RunnableScheduledFuture<?> first = queue[0];
        // 如果根結(jié)點(diǎn)為空或者時(shí)間還沒到煌张,則返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        // 返回根結(jié)點(diǎn)并重新平衡小堆樹
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 元素個(gè)數(shù)減1
    int s = --size;
    // 獲取最后一個(gè)元素
    RunnableScheduledFuture<?> x = queue[s];
    // 將最后一個(gè)元素置為空
    queue[s] = null;
    // 如果最后一個(gè)元素不是唯一的元素呐赡,那么放到根結(jié)點(diǎn),并再平衡小堆樹
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    // while循環(huán)骏融,k<half說(shuō)明還沒到葉子結(jié)點(diǎn)链嘀,所以一直要比較
    while (k < half) {
        // 左結(jié)點(diǎn)索引
        int child = (k << 1) + 1;
        // 獲取左結(jié)點(diǎn)元素
        RunnableScheduledFuture<?> c = queue[child];
        // 右結(jié)點(diǎn)索引
        int right = child + 1;
        // 因?yàn)榻Y(jié)點(diǎn)向下移動(dòng),所以要與子節(jié)點(diǎn)的較小值比較档玻,所以這里比較左結(jié)點(diǎn)元素與右結(jié)點(diǎn)元素大小
        // 如果右結(jié)點(diǎn)較小怀泊,將c變更為右結(jié)點(diǎn)
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // 如果當(dāng)前結(jié)點(diǎn)比子節(jié)點(diǎn)的較小值小,直接退出循環(huán)
        if (key.compareTo(c) <= 0)
            break;
        // 與較小的子結(jié)點(diǎn)調(diào)換位置误趴,再循環(huán)
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}
  • 如果隊(duì)列為空霹琼,或者時(shí)間還沒到,則返回空
  • 返回的元素是第一個(gè)元素冤留,將末尾元素放到一個(gè)位置碧囊,并向下再平衡
2. poll(long timeout, TimeUnit unit)
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // 隊(duì)列為空
            if (first == null) {
                // 阻塞時(shí)間小于等于0树灶,返回空
                if (nanos <= 0)
                    return null;
                // 有等待時(shí)間纤怒,那么就等待一段時(shí)間
                else
                    nanos = available.awaitNanos(nanos);
            // 隊(duì)列不為空
            } else {                
                long delay = first.getDelay(NANOSECONDS);
                // 時(shí)間到了,返回第一個(gè)元素
                if (delay <= 0)
                    return finishPoll(first);
                // 時(shí)間沒到天通,如果等待時(shí)間小于等于0泊窘,返回空
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 如果等待時(shí)間小于延遲時(shí)間
                // 或者
                // leader不為空,說(shuō)明有l(wèi)eader線程
                // 那么等待一段時(shí)間
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                // 等待時(shí)間大于延遲時(shí)間且leader為空
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 因?yàn)榈却龝r(shí)間大于延遲時(shí)間像寒,所以這里只能等待延遲時(shí)間
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        // leader線程結(jié)束
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果沒有l(wèi)eader線程且隊(duì)列不為空烘豹,喚醒獲取元素的線程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
  • 如果隊(duì)列為空,等待時(shí)間小于等于0诺祸,直接返回空携悯,如果有等待時(shí)間,那么就阻塞一段時(shí)間筷笨;
  • 如果隊(duì)列不為空
    • 任務(wù)時(shí)間到了憔鬼,返回隊(duì)列第一個(gè)位置的元素
    • 任務(wù)時(shí)間沒到龟劲,但是等待的時(shí)間小于等于0,那么返回空
    • 如果等待的時(shí)間小于任務(wù)執(zhí)行的時(shí)間點(diǎn)轴或,或者leader線程不為空昌跌,那么阻塞等待的時(shí)間
    • 如果等待時(shí)間大于任務(wù)執(zhí)行的時(shí)間點(diǎn)且leader線程為空,那么阻塞任務(wù)執(zhí)行的時(shí)間點(diǎn)
    • 最后如果leader線程不為空且隊(duì)列不為空照雁,那么通知獲取對(duì)列元素的線程
3. take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // 如果隊(duì)列為空蚕愤,一直阻塞
            if (first == null)
                available.await();
            // 隊(duì)列不為空
            else {
                // 獲取任務(wù)執(zhí)行的延遲時(shí)間
                long delay = first.getDelay(NANOSECONDS);
                // 沒有延遲時(shí)間,返回隊(duì)列的元素
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                // 如果leader線程不為空饺蚊,阻塞當(dāng)前線程
                if (leader != null)
                    available.await();
                // 如果leader線程為空萍诱,那么阻塞延遲的時(shí)間
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader線程為空且隊(duì)列不為空,通知等待獲取隊(duì)列元素的線程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
  • 如果隊(duì)列為空污呼,一直阻塞砂沛,直至被喚醒
  • 如果隊(duì)列不為空,到了任務(wù)的延遲時(shí)間曙求,那么從隊(duì)列里彈出任務(wù)碍庵;如果沒到任務(wù)的延遲時(shí)間,那么阻塞任務(wù)悟狱,阻塞時(shí)間為任務(wù)的延遲時(shí)間静浴,但是如果leader線程不為空,那么會(huì)一直阻塞挤渐,直至被喚醒
4. peek()
public RunnableScheduledFuture<?> peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return queue[0];
    } finally {
        lock.unlock();
    }
}
  • 直接返回第一個(gè)元素苹享,不是彈出,不判斷是否到了延遲時(shí)間
5. peekExpired()
private RunnableScheduledFuture<?> peekExpired() {
    // assert lock.isHeldByCurrentThread();
    RunnableScheduledFuture<?> first = queue[0];
    return (first == null || first.getDelay(NANOSECONDS) > 0) ?
        null : first;
}
  • 只返回到期的數(shù)據(jù)

三浴麻、刪除元素

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 獲取刪除元素的索引
        int i = indexOf(x);
        if (i < 0)
            return false;

        // 將刪除的元素索引設(shè)為-1
        setIndex(queue[i], -1);
        // 元素個(gè)數(shù)減1
        int s = --size;
        // 用數(shù)組尾部元素替換刪除的結(jié)點(diǎn)
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        // 如果刪除不是尾部元素
        if (s != i) {
            // 先向下平衡
            siftDown(i, replacement);
            // 如果向下平衡失敗得问,也就是下面的都是大于當(dāng)前結(jié)點(diǎn),那么就想上平衡
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末软免,一起剝皮案震驚了整個(gè)濱河市宫纬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌膏萧,老刑警劉巖漓骚,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異榛泛,居然都是意外死亡蝌蹂,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門曹锨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)孤个,“玉大人,你說(shuō)我怎么就攤上這事沛简∑肜穑” “怎么了硅急?”我有些...
    開封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)佳遂。 經(jīng)常有香客問(wèn)我营袜,道長(zhǎng),這世上最難降的妖魔是什么丑罪? 我笑而不...
    開封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任荚板,我火速辦了婚禮,結(jié)果婚禮上吩屹,老公的妹妹穿的比我還像新娘跪另。我一直安慰自己,他們只是感情好煤搜,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開白布免绿。 她就那樣靜靜地躺著,像睡著了一般擦盾。 火紅的嫁衣襯著肌膚如雪嘲驾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天迹卢,我揣著相機(jī)與錄音辽故,去河邊找鬼。 笑死腐碱,一個(gè)胖子當(dāng)著我的面吹牛誊垢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播症见,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼喂走,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了谋作?” 一聲冷哼從身側(cè)響起芋肠,我...
    開封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎瓷们,沒想到半個(gè)月后业栅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秒咐,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谬晕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了携取。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片攒钳。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖雷滋,靈堂內(nèi)的尸體忽然破棺而出不撑,到底是詐尸還是另有隱情文兢,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布焕檬,位于F島的核電站姆坚,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏实愚。R本人自食惡果不足惜兼呵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望腊敲。 院中可真熱鬧击喂,春花似錦、人聲如沸碰辅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)没宾。三九已至凌彬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間循衰,已是汗流浹背饿序。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留羹蚣,地道東北人原探。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像顽素,于是被迫代替她去往敵國(guó)和親咽弦。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容