原創(chuàng)文章&經(jīng)驗(yàn)總結(jié)&從校招到A廠一路陽光一路滄桑
詳情請戳www.codercc.com
1. ScheduledThreadPoolExecutor簡介
ScheduledThreadPoolExecutor可以用來在給定延時后執(zhí)行異步任務(wù)或者周期性執(zhí)行任務(wù),相對于任務(wù)調(diào)度的Timer來說,其功能更加強(qiáng)大色徘,Timer只能使用一個后臺線程執(zhí)行任務(wù),而ScheduledThreadPoolExecutor則可以通過構(gòu)造函數(shù)來指定后臺線程的個數(shù)项贺。ScheduledThreadPoolExecutor類的UML圖如下:
- 從UML圖可以看出,ScheduledThreadPoolExecutor繼承了
ThreadPoolExecutor
峭判,也就是說ScheduledThreadPoolExecutor擁有execute()和submit()提交異步任務(wù)的基礎(chǔ)功能开缎,關(guān)于ThreadPoolExecutor可以看這篇文章。但是林螃,ScheduledThreadPoolExecutor類實(shí)現(xiàn)了ScheduledExecutorService
奕删,該接口定義了ScheduledThreadPoolExecutor能夠延時執(zhí)行任務(wù)和周期執(zhí)行任務(wù)的功能; - ScheduledThreadPoolExecutor也兩個重要的內(nèi)部類:DelayedWorkQueue和ScheduledFutureTask疗认⊥瓴校可以看出DelayedWorkQueue實(shí)現(xiàn)了BlockingQueue接口,也就是一個阻塞隊列横漏,ScheduledFutureTask則是繼承了FutureTask類谨设,也表示該類用于返回異步任務(wù)的結(jié)果。這兩個關(guān)鍵類缎浇,下面會具體詳細(xì)來看扎拣。
1.1 構(gòu)造方法
ScheduledThreadPoolExecutor有如下幾個構(gòu)造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
};
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}素跺;
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}二蓝;
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
可以看出由于ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,它的構(gòu)造方法實(shí)際上是調(diào)用了ThreadPoolExecutor指厌,對ThreadPoolExecutor的介紹可以可以看這篇文章刊愚,理解ThreadPoolExecutor構(gòu)造方法的幾個參數(shù)的意義后,理解這就很容易了踩验∨阜蹋可以看出商玫,ScheduledThreadPoolExecutor的核心線程池的線程個數(shù)為指定的corePoolSize,當(dāng)核心線程池的線程個數(shù)達(dá)到corePoolSize后衙传,就會將任務(wù)提交給有界阻塞隊列DelayedWorkQueue决帖,對DelayedWorkQueue在下面進(jìn)行詳細(xì)介紹,線程池允許最大的線程個數(shù)為Integer.MAX_VALUE蓖捶,也就是說理論上這是一個大小無界的線程池。
1.2 特有方法
ScheduledThreadPoolExecutor實(shí)現(xiàn)了ScheduledExecutorService
接口扁远,該接口定義了可延時執(zhí)行異步任務(wù)和可周期執(zhí)行異步任務(wù)的特有功能俊鱼,相應(yīng)的方法分別為:
//達(dá)到給定的延時時間后,執(zhí)行任務(wù)畅买。這里傳入的是實(shí)現(xiàn)Runnable接口的任務(wù)并闲,
//因此通過ScheduledFuture.get()獲取結(jié)果為null
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//達(dá)到給定的延時時間后,執(zhí)行任務(wù)谷羞。這里傳入的是實(shí)現(xiàn)Callable接口的任務(wù)帝火,
//因此,返回的是任務(wù)的最終計算結(jié)果
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//是以上一個任務(wù)開始的時間計時湃缎,period時間過去后犀填,
//檢測上一個任務(wù)是否執(zhí)行完畢,如果上一個任務(wù)執(zhí)行完畢嗓违,
//則當(dāng)前任務(wù)立即執(zhí)行九巡,如果上一個任務(wù)沒有執(zhí)行完畢,則需要等上一個任務(wù)執(zhí)行完畢后立即執(zhí)行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//當(dāng)達(dá)到延時時間initialDelay后蹂季,任務(wù)開始執(zhí)行冕广。上一個任務(wù)執(zhí)行結(jié)束后到下一次
//任務(wù)執(zhí)行,中間延時時間間隔為delay偿洁。以這種方式撒汉,周期性執(zhí)行任務(wù)。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
2. 可周期性執(zhí)行的任務(wù)---ScheduledFutureTask
ScheduledThreadPoolExecutor最大的特色是能夠周期性執(zhí)行異步任務(wù)涕滋,當(dāng)調(diào)用schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法
時睬辐,實(shí)際上是將提交的任務(wù)轉(zhuǎn)換成的ScheduledFutureTask類,從源碼就可以看出何吝。以schedule方法為例:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
可以看出溉委,通過decorateTask
會將傳入的Runnable轉(zhuǎn)換成ScheduledFutureTask
類。線程池最大作用是將任務(wù)和線程進(jìn)行解耦爱榕,線程主要是任務(wù)的執(zhí)行者瓣喊,而任務(wù)也就是現(xiàn)在所說的ScheduledFutureTask。緊接著黔酥,會想到任何線程執(zhí)行任務(wù)藻三,總會調(diào)用run()
方法洪橘。為了保證ScheduledThreadPoolExecutor能夠延時執(zhí)行任務(wù)以及能夠周期性執(zhí)行任務(wù),ScheduledFutureTask重寫了run方法:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//如果不是周期性執(zhí)行任務(wù)棵帽,則直接調(diào)用run方法
ScheduledFutureTask.super.run();
//如果是周期性執(zhí)行任務(wù)的話熄求,需要重設(shè)下一次執(zhí)行任務(wù)的時間
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
從源碼可以很明顯的看出,在重寫的run方法中會先if (!periodic)
判斷當(dāng)前任務(wù)是否是周期性任務(wù)逗概,如果不是的話就直接調(diào)用run()方法
弟晚;否則的話執(zhí)行setNextRunTime()
方法重設(shè)下一次任務(wù)執(zhí)行的時間,并通過reExecutePeriodic(outerTask)
方法將下一次待執(zhí)行的任務(wù)放置到DelayedWorkQueue
中逾苫。
因此卿城,可以得出結(jié)論:ScheduledFutureTask
最主要的功能是根據(jù)當(dāng)前任務(wù)是否具有周期性,對異步任務(wù)進(jìn)行進(jìn)一步封裝铅搓。如果不是周期性任務(wù)(調(diào)用schedule方法)則直接通過run()
執(zhí)行瑟押,若是周期性任務(wù),則需要在每一次執(zhí)行完后星掰,重設(shè)下一次執(zhí)行的時間多望,然后將下一次任務(wù)繼續(xù)放入到阻塞隊列中。
3. DelayedWorkQueue
在ScheduledThreadPoolExecutor中還有另外的一個重要的類就是DelayedWorkQueue氢烘。為了實(shí)現(xiàn)其ScheduledThreadPoolExecutor能夠延時執(zhí)行異步任務(wù)以及能夠周期執(zhí)行任務(wù)怀偷,DelayedWorkQueue進(jìn)行相應(yīng)的封裝。DelayedWorkQueue是一個基于堆的數(shù)據(jù)結(jié)構(gòu)威始,類似于DelayQueue和PriorityQueue枢纠。在執(zhí)行定時任務(wù)的時候,每個任務(wù)的執(zhí)行時間都不同黎棠,所以DelayedWorkQueue的工作就是按照執(zhí)行時間的升序來排列晋渺,執(zhí)行時間距離當(dāng)前時間越近的任務(wù)在隊列的前面。
為什么要使用DelayedWorkQueue呢脓斩?
定時任務(wù)執(zhí)行時需要取出最近要執(zhí)行的任務(wù)木西,所以任務(wù)在隊列中每次出隊時一定要是當(dāng)前隊列中執(zhí)行時間最靠前的,所以自然要使用優(yōu)先級隊列随静。
DelayedWorkQueue是一個優(yōu)先級隊列八千,它可以保證每次出隊的任務(wù)都是當(dāng)前隊列中執(zhí)行時間最靠前的,由于它是基于堆結(jié)構(gòu)的隊列燎猛,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時的最壞時間復(fù)雜度是 O(logN)恋捆。
DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu)
//初始大小
private static final int INITIAL_CAPACITY = 16;
//DelayedWorkQueue是由一個大小為16的數(shù)組組成,數(shù)組元素為實(shí)現(xiàn)RunnableScheduleFuture接口的類
//實(shí)際上為ScheduledFutureTask
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
可以看出DelayedWorkQueue底層是采用數(shù)組構(gòu)成的重绷,關(guān)于DelayedWorkQueue可以看這篇博主的文章沸停,很詳細(xì)。
關(guān)于DelayedWorkQueue我們可以得出這樣的結(jié)論:DelayedWorkQueue是基于堆的數(shù)據(jù)結(jié)構(gòu)昭卓,按照時間順序?qū)⒚總€任務(wù)進(jìn)行排序愤钾,將待執(zhí)行時間越近的任務(wù)放在在隊列的隊頭位置瘟滨,以便于最先進(jìn)行執(zhí)行。
4.ScheduledThreadPoolExecutor執(zhí)行過程
現(xiàn)在我們對ScheduledThreadPoolExecutor的兩個內(nèi)部類ScheduledFutueTask和DelayedWorkQueue進(jìn)行了了解能颁,實(shí)際上這也是線程池工作流程中最重要的兩個關(guān)鍵因素:任務(wù)以及阻塞隊列≡尤常現(xiàn)在我們來看下ScheduledThreadPoolExecutor提交一個任務(wù)后,整體的執(zhí)行過程伙菊。以ScheduledThreadPoolExecutor的schedule方法為例败玉,具體源碼為:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//將提交的任務(wù)轉(zhuǎn)換成ScheduledFutureTask
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//延時執(zhí)行任務(wù)ScheduledFutureTask
delayedExecute(t);
return t;
}
方法很容易理解,為了滿足ScheduledThreadPoolExecutor能夠延時執(zhí)行任務(wù)和能周期執(zhí)行任務(wù)的特性占业,會先將實(shí)現(xiàn)Runnable接口的類轉(zhuǎn)換成ScheduledFutureTask绒怨。然后會調(diào)用delayedExecute
方法進(jìn)行執(zhí)行任務(wù),這個方法也是關(guān)鍵方法谦疾,來看下源碼:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//如果當(dāng)前線程池已經(jīng)關(guān)閉,則拒絕任務(wù)
reject(task);
else {
//將任務(wù)放入阻塞隊列中
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//保證至少有一個線程啟動犬金,即使corePoolSize=0
ensurePrestart();
}
}
delayedExecute
方法的主要邏輯請看注釋念恍,可以看出該方法的重要邏輯會是在ensurePrestart()
方法中,它的源碼為:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
可以看出該方法邏輯很簡單晚顷,關(guān)鍵在于它所調(diào)用的addWorker方法
峰伙,該方法主要功能:新建Worker類
,當(dāng)執(zhí)行任務(wù)時该默,就會調(diào)用被Worker所重寫的run方法
瞳氓,進(jìn)而會繼續(xù)執(zhí)行runWorker
方法。在runWorker
方法中會調(diào)用getTask
方法從阻塞隊列中不斷的去獲取任務(wù)進(jìn)行執(zhí)行栓袖,直到從阻塞隊列中獲取的任務(wù)為null的話匣摘,線程結(jié)束終止。addWorker方法是ThreadPoolExecutor類中的方法裹刮,對ThreadPoolExecutor的源碼分析可以看這篇文章音榜,很詳細(xì)。
5.總結(jié)
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor類捧弃,因此赠叼,整體上功能一致,線程池主要負(fù)責(zé)創(chuàng)建線程(Worker類)违霞,線程從阻塞隊列中不斷獲取新的異步任務(wù)嘴办,直到阻塞隊列中已經(jīng)沒有了異步任務(wù)為止。但是相較于ThreadPoolExecutor來說买鸽,ScheduledThreadPoolExecutor具有延時執(zhí)行任務(wù)和可周期性執(zhí)行任務(wù)的特性涧郊,ScheduledThreadPoolExecutor重新設(shè)計了任務(wù)類
ScheduleFutureTask
,ScheduleFutureTask重寫了run
方法使其具有可延時執(zhí)行和可周期性執(zhí)行任務(wù)的特性。另外癞谒,阻塞隊列DelayedWorkQueue
是可根據(jù)優(yōu)先級排序的隊列底燎,采用了堆的底層數(shù)據(jù)結(jié)構(gòu)刃榨,使得與當(dāng)前時間相比,待執(zhí)行時間越靠近的任務(wù)放置隊頭双仍,以便線程能夠獲取到任務(wù)進(jìn)行執(zhí)行枢希;-
線程池?zé)o論是ThreadPoolExecutor還是ScheduledThreadPoolExecutor,在設(shè)計時的三個關(guān)鍵要素是:任務(wù)朱沃,執(zhí)行者以及任務(wù)結(jié)果苞轿。它們的設(shè)計思想也是完全將這三個關(guān)鍵要素進(jìn)行了解耦。
執(zhí)行者
任務(wù)的執(zhí)行機(jī)制逗物,完全交由
Worker類
搬卒,也就是進(jìn)一步了封裝了Thread。向線程池提交任務(wù)翎卓,無論為ThreadPoolExecutor的execute方法和submit方法契邀,還是ScheduledThreadPoolExecutor的schedule方法,都是先將任務(wù)移入到阻塞隊列中失暴,然后通過addWork方法新建了Work類坯门,并通過runWorker方法啟動線程,并不斷的從阻塞對列中獲取異步任務(wù)執(zhí)行交給Worker執(zhí)行逗扒,直至阻塞隊列中無法取到任務(wù)為止古戴。任務(wù)
在ThreadPoolExecutor和ScheduledThreadPoolExecutor中任務(wù)是指實(shí)現(xiàn)了Runnable接口和Callable接口的實(shí)現(xiàn)類。ThreadPoolExecutor中會將任務(wù)轉(zhuǎn)換成
FutureTask
類矩肩,而在ScheduledThreadPoolExecutor中為了實(shí)現(xiàn)可延時執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)的特性现恼,任務(wù)會被轉(zhuǎn)換成ScheduledFutureTask
類,該類繼承了FutureTask黍檩,并重寫了run方法叉袍。任務(wù)結(jié)果
在ThreadPoolExecutor中提交任務(wù)后,獲取任務(wù)結(jié)果可以通過Future接口的類建炫,在ThreadPoolExecutor中實(shí)際上為FutureTask類畦韭,而在ScheduledThreadPoolExecutor中則是
ScheduledFutureTask
類