第9章 Java并發(fā)包中ScheduledThreadPoolExecutor原理探究

目錄

類圖結(jié)構(gòu)

ScheduledThreadPoolExecutor時一個可以在指定一定延遲時間后或者定時進(jìn)行任務(wù)調(diào)度執(zhí)行的線程池硫豆。

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實(shí)現(xiàn)了ScheduledExecutorService接口呀打。

線程池隊列是DelayedWorkQueue伟叛,與DelayedQueue一樣屬于延遲隊列寂汇。

ScheduledFuturetask是具有返回值的任務(wù)瘪撇,繼承自FutureTask。FutureTask內(nèi)部用一個變量state來表示任務(wù)的狀態(tài)干茉,一開始為NEW论颅。

各狀態(tài)意義如下:

private static final int NEW          = 0; // 初始狀態(tài)
private static final int COMPLETING   = 1; // 執(zhí)行中
private static final int NORMAL       = 2; // 正常運(yùn)行結(jié)束
private static final int EXCEPTIONAL  = 3; // 運(yùn)行中異常
private static final int CANCELLED    = 4; // 任務(wù)被取消
private static final int INTERRUPTING = 5; // 任務(wù)正在被中斷
private static final int INTERRUPTED  = 6; // 任務(wù)已經(jīng)被中斷

ScheduledFutureTask內(nèi)部用一個變量period來表示任務(wù)的類型:

  • period=0,說明當(dāng)前任務(wù)是一次性的镜雨,執(zhí)行完畢后就推出了嫂侍。
  • period為負(fù)數(shù),說明當(dāng)前任務(wù)為固定延遲的定時可重復(fù)執(zhí)行任務(wù)(執(zhí)行完一次后會停止指定時間后再次運(yùn)行荚坞,若每次執(zhí)行任務(wù)耗時不同挑宠,則顯然相鄰兩次任務(wù)執(zhí)行間隔不同)。
  • period為正數(shù)颓影,說明當(dāng)前任務(wù)為固定頻率的定尺可重復(fù)執(zhí)行任務(wù)(也即固定周期)各淀。

以下為ScheduledThreadPoolExecutor的構(gòu)造函數(shù):

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
}

// 指定了線程工廠
public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), threadFactory);
}

// 指定了拒絕策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                    RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), handler);
}

// 指定了線程工廠和拒絕策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), threadFactory, handler);
}        

從上面的代碼中可以看到,ScheduledThreadPoolExecutor的線程池隊列為DelayedWorkQueue诡挂。

源碼分析

schedule(Runnable command, long delay, TimeUnit unit)

提交一個延遲執(zhí)行的任務(wù)碎浇,任務(wù)從提交時間算起延遲單位為unit的delay后開始執(zhí)行。提交的任務(wù)不是周期性任務(wù)璃俗,任務(wù)只會執(zhí)行一次奴璃。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 參數(shù)校驗(yàn)
    if (command == null || unit == null)
        throw new NullPointerException();
    // 將任務(wù)包裝成ScheduledFutureTask
    // triggerTime方法用來計算觸發(fā)時間(即任務(wù)開始執(zhí)行的絕對時間)
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    // 添加任務(wù)到延遲隊列
    delayedExecute(t);
    return t;
}

以下是ScheduledFutureTask的相關(guān)代碼:

ScheduledFutureTask(Runnable r, V result, long ns) {
    // 調(diào)用父類構(gòu)造函數(shù)
    super(r, result);
    this.time = ns; // 等待ns納秒后開始執(zhí)行
    this.period = 0; // period=0說明為一次性任務(wù)
    // 記錄任務(wù)編號
    this.sequenceNumber = sequencer.getAndIncrement(); 
}

public FutureTask(Runnable runnable, V result) {
    // 將Runnable任務(wù)轉(zhuǎn)化成Callable任務(wù)
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

delayedExecute的代碼如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //  線程池關(guān)閉則執(zhí)行拒絕策略
    if (isShutdown())
        reject(task);
    else {
        // 將任務(wù)添加到任務(wù)隊列中
        // 任務(wù)隊列為DelayedWorkQueue
        // 所加的task實(shí)現(xiàn)了comparable接口
        // 添加到任務(wù)隊列中能保證隊首元素為最早需要執(zhí)行的
        super.getQueue().add(task);
        // 再次檢查線程池是否關(guān)閉
        // 因?yàn)閳?zhí)行上面add代碼過程中線程池完全有可能被關(guān)閉
        // 如果線程池被關(guān)閉則判斷當(dāng)前任務(wù)是否可以在當(dāng)前狀態(tài)下繼續(xù)執(zhí)行
        // 不能繼續(xù)執(zhí)行則移除當(dāng)前任務(wù)
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 保證至少有一個線程存活可以從任務(wù)隊列中獲取任務(wù)處理任務(wù)
            ensurePrestart();
    }
}

// 判斷任務(wù)是否是周期執(zhí)行的
public boolean isPeriodic() {
    return period != 0;
}

// 根據(jù)periodic來決定isRunningOrShutdown的參數(shù)
// continueExistingPeriodicTasksAfterShutdown和
// executeExistingDelayedTasksAfterShutdown的值可通過相應(yīng)的setter方法來設(shè)置
// 為true表示線程池關(guān)閉后當(dāng)前任務(wù)會繼續(xù)執(zhí)行完畢
// 為false則取消當(dāng)前任務(wù)
boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                                continueExistingPeriodicTasksAfterShutdown :
                                executeExistingDelayedTasksAfterShutdown);
}

// 確保至少有一個線程存活來執(zhí)行任務(wù)
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

線程池中具體執(zhí)行任務(wù)的是Worker,Worker通過調(diào)用run方法來執(zhí)行城豁。這里的任務(wù)是ScheduledFutureTask溺健,下面來看其run方法。

// ScheduledFutureTask.run
public void run() {
    boolean periodic = isPeriodic();
    // 判斷是否需要取消任務(wù)
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 一次性任務(wù)
    else if (!periodic)
        // 調(diào)用FutureTask的run方法
        ScheduledFutureTask.super.run();
    // 周期性任務(wù)
    // runAndReset為FutureTask中的方法
    // 用于執(zhí)行當(dāng)前任務(wù)但不改變future的狀態(tài)
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 設(shè)置下次執(zhí)行的時間
        setNextRunTime();
        // 默認(rèn)情況下钮蛛,outerTask = this就是當(dāng)前對象
        reExecutePeriodic(outerTask);
    }
}

// 設(shè)置周期性任務(wù)下次執(zhí)行時間
private void setNextRunTime() {
    long p = period;
    // p > 0表示任務(wù)執(zhí)行頻率一定
    if (p > 0)
        // time為此次任務(wù)(已執(zhí)行完畢)剛開始執(zhí)行時的時間
        time += p;
    // p < 0表示任務(wù)固定延遲時間
    // 即此次任務(wù)完成后會等待-p時間再執(zhí)行下次任務(wù)  
    else
        // 獲取-p時間后的絕對時間
        time = triggerTime(-p);
}

// 周期性執(zhí)行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        // 再次將task添加至任務(wù)隊列中等待執(zhí)行
        // 當(dāng)輪到task執(zhí)行時鞭缭,又會在run中調(diào)用此方法
        // 再次將自身添加到任務(wù)隊列中,從而達(dá)到周期性執(zhí)行效果
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

當(dāng)任務(wù)執(zhí)行完畢后魏颓,讓其延遲固定時間后再次運(yùn)行岭辣。

// command:所要執(zhí)行的任務(wù)
// initialDelay: 提交任務(wù)后等待多少時間再開始執(zhí)行任務(wù)
// delay: 一次任務(wù)執(zhí)行完后等待多少時間才執(zhí)行下次任務(wù)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                    long initialDelay,
                                                    long delay,
                                                    TimeUnit unit) {
    // 參數(shù)校驗(yàn)                                                      
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 將Runnable任務(wù)轉(zhuǎn)換成ScheduledFutureTask
    // 第四個參數(shù)period為-delay < 0表示當(dāng)前任務(wù)是固定延遲的
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(-delay));
    // decorateTask默認(rèn)直接返回第二個參數(shù),即sft                                     
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    // 默認(rèn)情況下t = sft
    // outerTask即為sft自身甸饱,用于實(shí)現(xiàn)周期性執(zhí)行
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

// 可被子類重寫拓展
// 默認(rèn)實(shí)現(xiàn)只是簡單的返回task
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

主要不同在于設(shè)置了period=-delay沦童,其他代碼與schedule相同,相應(yīng)的代碼會判斷period的取值從而決定程序不同的行為叹话。

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

按固定頻率周期性地執(zhí)行任務(wù)偷遗。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                long initialDelay,
                                                long period,
                                                TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 關(guān)鍵在于此處第四個參數(shù)period > 0   
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

除了設(shè)置period的值大于0外,總體與scheduleWithFixedDelay類似驼壶,不再贅述氏豌。

更多

相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市热凹,隨后出現(xiàn)的幾起案子泵喘,更是在濱河造成了極大的恐慌泪电,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纪铺,死亡現(xiàn)場離奇詭異相速,居然都是意外死亡聚蝶,警方通過查閱死者的電腦和手機(jī)敷扫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來睦袖,“玉大人芜繁,你說我怎么就攤上這事旺隙。” “怎么了浆洗?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵催束,是天一觀的道長集峦。 經(jīng)常有香客問我伏社,道長,這世上最難降的妖魔是什么塔淤? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任摘昌,我火速辦了婚禮,結(jié)果婚禮上高蜂,老公的妹妹穿的比我還像新娘聪黎。我一直安慰自己,他們只是感情好备恤,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布稿饰。 她就那樣靜靜地躺著,像睡著了一般露泊。 火紅的嫁衣襯著肌膚如雪喉镰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天惭笑,我揣著相機(jī)與錄音侣姆,去河邊找鬼。 笑死沉噩,一個胖子當(dāng)著我的面吹牛捺宗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播川蒙,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼蚜厉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了畜眨?” 一聲冷哼從身側(cè)響起弯囊,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤痰哨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后匾嘱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斤斧,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年霎烙,在試婚紗的時候發(fā)現(xiàn)自己被綠了撬讽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡悬垃,死狀恐怖游昼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情尝蠕,我是刑警寧澤烘豌,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站看彼,受9級特大地震影響廊佩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜靖榕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一标锄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧茁计,春花似錦料皇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至娜膘,卻和暖如春逊脯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背劲绪。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工男窟, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人贾富。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓歉眷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親颤枪。 傳聞我的和親對象是個殘疾皇子汗捡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容