ScheduledThreadPoolExecutor 源碼分析

概述

ScheduledThreadPoolExecutor是執(zhí)行定時(shí)調(diào)度的線程池寸癌,它是ThreadPoolExecutor的子類。今天我們就一起看看ScheduledThreadPoolExecutor的源碼钮糖,測(cè)試?yán)舆€是在github中怠苔。

構(gòu)造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

可以看到ScheduledThreadPoolExecutor默認(rèn)最大線程數(shù)使用的無(wú)限線程垃杖,其實(shí)這個(gè)值沒有效果酬屉,我們通過ThreadPoolExecutor源碼知道,只有隊(duì)列滿了才會(huì)蜈缤,創(chuàng)建核心線程數(shù)以外的線程處理任務(wù)拾氓。而DelayedWorkQueue又是無(wú)界的,DelayedWorkQueue我們先知道它是無(wú)界的有序的隊(duì)列就好底哥,稍后再詳細(xì)分析咙鞍,先一起看ScheduledThreadPoolExecutor常用的scheduleWithFixedDelay方法。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    //通過提交的Runnable對(duì)象趾徽,構(gòu)造一個(gè)ScheduledFutureTask對(duì)象
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    //t其實(shí)就是ScheduledFutureTask對(duì)象sft                                  
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    //延遲執(zhí)行t
    delayedExecute(t);
    return t;
}

可以看到就是將提交的Runnable對(duì)象封裝成ScheduledFutureTask對(duì)象執(zhí)行任務(wù)续滋,一起看看ScheduledFutureTask類吧。
ScheduledFutureTask類結(jié)構(gòu)

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    //加入線程池的任務(wù)序號(hào)孵奶,用于任務(wù)的比較
    private final long sequenceNumber;

    //任務(wù)執(zhí)行的時(shí)間(納秒數(shù))
    private long time;

    //重復(fù)任務(wù)的周期疲酌,以納秒為單位(值為0時(shí)表示只執(zhí)行一次)
    private final long period;

    //用于重復(fù)執(zhí)行時(shí),重新進(jìn)入隊(duì)列排隊(duì)
    RunnableScheduledFuture<V> outerTask = this;

    //對(duì)象在隊(duì)列中的下標(biāo)值了袁,方便取消任務(wù)時(shí)朗恳,快速移除節(jié)點(diǎn)
    int heapIndex;
    
    //構(gòu)造方法
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    。载绿。粥诫。
}

該類有兩個(gè)重要的方法: compareTo用于排序確定在隊(duì)列中的位置,run方法是任務(wù)執(zhí)行時(shí)調(diào)用的方法崭庸。

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        //比較任務(wù)執(zhí)行的時(shí)間怀浆,時(shí)間相同時(shí),根據(jù)sequenceNumber確定大小
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public void run() {
    //根據(jù)periodic是否為0判斷是否是周期性任務(wù)
    boolean periodic = isPeriodic();
    //判斷為非可執(zhí)行任務(wù)狀態(tài)時(shí)怕享,取消任務(wù)
    //(注意進(jìn)方法內(nèi)部可以看到执赡,判斷中多一個(gè)關(guān)閉線程池后是否繼續(xù)執(zhí)行后續(xù)延遲任務(wù)或周期性任務(wù),就是說ScheduledThreadPoolExecutor提供了一個(gè)關(guān)閉線程池函筋,仍然執(zhí)行定時(shí)任務(wù)的方法)
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //不是周期性任務(wù)沙合,直接執(zhí)行    
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //是周期性任務(wù)runAndReset方法會(huì)執(zhí)行在執(zhí)行結(jié)束時(shí)將任務(wù)的狀態(tài)重置為NEW,便于下次再次執(zhí)行    
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設(shè)置下周執(zhí)行的時(shí)間
        setNextRunTime();
        //再次執(zhí)行周期行任務(wù)
        reExecutePeriodic(outerTask);
    }
}

reExecutePeriodic方法 再次執(zhí)行周期性任務(wù)

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        //將任務(wù)重新入隊(duì)
        super.getQueue().add(task);
        //如果線程池已關(guān)閉驻呐,將任務(wù)取消
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();//再次執(zhí)行任務(wù)
    }
}

ensurePrestart 方法

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    //如果線程池小于核心線程數(shù)灌诅,新建線程執(zhí)行,否則在隊(duì)列中等待線程池中的線程調(diào)用getTask方法
    if (wc < corePoolSize)
        addWorker(null, true); //這里加了一個(gè)空任務(wù)含末,說明讓從延遲隊(duì)列中獲取任務(wù)執(zhí)行,達(dá)到延遲執(zhí)行的目的
    else if (wc == 0)
        addWorker(null, false);
}

任務(wù)的運(yùn)行周期性執(zhí)行邏輯我們已經(jīng)看了即舌,我們?cè)俅位氐絪cheduleAtFixedRate方法中佣盒,看看任務(wù)的第一次執(zhí)行,可以看到方法中有個(gè)delayedExecute方法顽聂。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果線程池是關(guān)閉狀態(tài)肥惭,則拒絕任務(wù)
    if (isShutdown())
        reject(task);
    else {
        //將任務(wù)入隊(duì)
        super.getQueue().add(task);
        //入隊(duì)后有個(gè)反悔期盯仪,如果線程池已關(guān)閉,移除任務(wù)
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart(); //這里也是調(diào)用ensurePrestart蜜葱,如果線程數(shù)量小于核心線程數(shù)全景,則新建線程執(zhí)行任務(wù)
    }
}

至此,任務(wù)的執(zhí)行邏輯已經(jīng)清楚了牵囤,scheduleWithFixedDelay每次執(zhí)行完才會(huì)放一個(gè)延遲任務(wù)爸黄,而scheduleAtFixedRate是執(zhí)行前,先放一個(gè)延遲任務(wù)入隊(duì)揭鳞,所以scheduleWithFixedDelay是每次執(zhí)行完成后的指定時(shí)間后炕贵,再執(zhí)行下一任務(wù),scheduleAtFixedRate是固定時(shí)間后執(zhí)行下一任務(wù)野崇,不管前面的任務(wù)是否完成称开。
下面我們看看ScheduledThreadPoolExecutor中的阻塞隊(duì)列和普通的隊(duì)列有啥區(qū)別。
DelayedWorkQueue 有序乓梨、無(wú)界鳖轰,其實(shí)就是一個(gè)專門接受RunnableScheduledFuture對(duì)象的DelayQueue隊(duì)列。
在上面的分析中扶镀,可以看到任務(wù)執(zhí)行前都是先入隊(duì)蕴侣,DelayedWorkQueue的add方法

public boolean add(Runnable e) {
    return offer(e);
}

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        //隊(duì)列滿時(shí),將數(shù)組增加百分之五十
        if (i >= queue.length)
            grow();
        size = i + 1;
        //第一次入隊(duì)時(shí)狈惫,放入隊(duì)列的第一個(gè)位置
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            //將節(jié)點(diǎn)入隊(duì)
            siftUp(i, e);
        }
        //如果新入隊(duì)的節(jié)點(diǎn)成為了頭節(jié)點(diǎn)睛蛛,leader線程置為空,進(jìn)行一次喚醒等待線程
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

siftUp方法

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    //將等待隊(duì)列數(shù)組構(gòu)建成一個(gè)小頂堆
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末胧谈,一起剝皮案震驚了整個(gè)濱河市忆肾,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌菱肖,老刑警劉巖客冈,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異稳强,居然都是意外死亡场仲,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門退疫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來渠缕,“玉大人,你說我怎么就攤上這事褒繁∫嗔郏” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)燕差。 經(jīng)常有香客問我遭笋,道長(zhǎng),這世上最難降的妖魔是什么徒探? 我笑而不...
    開封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任瓦呼,我火速辦了婚禮,結(jié)果婚禮上测暗,老公的妹妹穿的比我還像新娘央串。我一直安慰自己,他們只是感情好偷溺,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開白布蹋辅。 她就那樣靜靜地躺著,像睡著了一般挫掏。 火紅的嫁衣襯著肌膚如雪侦另。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天尉共,我揣著相機(jī)與錄音褒傅,去河邊找鬼。 笑死袄友,一個(gè)胖子當(dāng)著我的面吹牛殿托,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播剧蚣,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼支竹,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了鸠按?” 一聲冷哼從身側(cè)響起礼搁,我...
    開封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎目尖,沒想到半個(gè)月后馒吴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡瑟曲,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年饮戳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洞拨。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扯罐,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出烦衣,到底是詐尸還是另有隱情篮赢,我是刑警寧澤齿椅,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布琉挖,位于F島的核電站启泣,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏示辈。R本人自食惡果不足惜寥茫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望矾麻。 院中可真熱鬧纱耻,春花似錦、人聲如沸险耀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)甩牺。三九已至蘑志,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間贬派,已是汗流浹背急但。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留搞乏,地道東北人波桩。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像请敦,于是被迫代替她去往敵國(guó)和親镐躲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354