ScheduledThreadPoolExecutor解析
我們知道Timer與TimerTask雖然可以實現(xiàn)線程的周期和延遲調(diào)度热押,但是Timer與TimerTask存在一些缺陷斤贰,所以對于這種定期枯芬、周期執(zhí)行任務(wù)的調(diào)度策略,我們一般都是推薦ScheduledThreadPoolExecutor來實現(xiàn)。下面就深入分析ScheduledThreadPoolExecutor是如何來實現(xiàn)線程的周期、延遲調(diào)度的。
ScheduledThreadPoolExecutor旭寿,繼承ThreadPoolExecutor且實現(xiàn)了ScheduledExecutorService接口,它就相當(dāng)于提供了“延遲”和“周期執(zhí)行”功能的ThreadPoolExecutor崇败。在JDK API中是這樣定義它的:ThreadPoolExecutor盅称,它可另行安排在給定的延遲后運行命令肩祥,或者定期執(zhí)行命令。需要多個輔助線程時缩膝,或者要求 ThreadPoolExecutor 具有額外的靈活性或功能時混狠,此類要優(yōu)于 Timer。 一旦啟用已延遲的任務(wù)就執(zhí)行它疾层,但是有關(guān)何時啟用将饺,啟用后何時執(zhí)行則沒有任何實時保證。按照提交的先進(jìn)先出 (FIFO) 順序來啟用那些被安排在同一執(zhí)行時間的任務(wù)痛黎。 它提供了四個構(gòu)造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}
當(dāng)然我們一般都不會直接通過其構(gòu)造函數(shù)來生成一個ScheduledThreadPoolExecutor對象(例如new ScheduledThreadPoolExecutor(10)之類的)予弧,而是通過Executors類(例如Executors.newScheduledThreadPool(int);)
在ScheduledThreadPoolExecutor的構(gòu)造函數(shù)中,我們發(fā)現(xiàn)它都是利用ThreadLocalExecutor來構(gòu)造的湖饱,唯一變動的地方就在于它所使用的阻塞隊列變成了DelayedWorkQueue掖蛤,而不是ThreadLocalhExecutor的LinkedBlockingQueue(通過Executors產(chǎn)生ThreadLocalhExecutor對象)。DelayedWorkQueue為ScheduledThreadPoolExecutor中的內(nèi)部類井厌,它其實和阻塞隊列DelayQueue有點兒類似蚓庭。DelayQueue是可以提供延遲的阻塞隊列,它只有在延遲期滿時才能從中提取元素仅仆,其列頭是延遲期滿后保存時間最長的Delayed元素器赞。如果延遲都還沒有期滿,則隊列沒有頭部墓拜,并且 poll 將返回 null港柜。有關(guān)于DelayQueue的更多介紹請參考這篇博客【死磕Java并發(fā)】-----J.U.C之阻塞隊列:DelayQueue。所以DelayedWorkQueue中的任務(wù)必然是按照延遲時間從短到長來進(jìn)行排序的撮弧。下面我們再深入分析DelayedWorkQueue潘懊,這里留一個引子。 ScheduledThreadPoolExecutor提供了如下四個方法贿衍,也就是四個調(diào)度器:
- schedule(Callable callable, long delay, TimeUnit unit) :創(chuàng)建并執(zhí)行在給定延遲后啟用的 ScheduledFuture。
- schedule(Runnable command, long delay, TimeUnit unit) :創(chuàng)建并執(zhí)行在給定延遲后啟用的一次性操作救恨。
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作贸辈,后續(xù)操作具有給定的周期;也就是將在 initialDelay 后開始執(zhí)行肠槽,然后在 initialDelay+period 后執(zhí)行擎淤,接著在 initialDelay + 2 * period 后執(zhí)行,依此類推秸仙。
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作嘴拢,隨后,在每一次執(zhí)行終止和下一次執(zhí)行開始之間都存在給定的延遲寂纪。
第一席吴、二個方法差不多赌结,都是一次性操作,只不過參數(shù)一個是Callable孝冒,一個是Runnable柬姚。稍微分析下第三(scheduleAtFixedRate)、四個(scheduleWithFixedDelay)方法庄涡,加入initialDelay = 5量承,period/delay = 3,unit為秒穴店。如果每個線程都是都運行非常良好不存在延遲的問題撕捍,那么這兩個方法線程運行周期是5、8泣洞、11卦洽、14、17.......斜棚,但是如果存在延遲呢阀蒂?比如第三個線程用了5秒鐘,那么這兩個方法的處理策略是怎樣的弟蚀?第三個方法(scheduleAtFixedRate)是周期固定蚤霞,也就說它是不會受到這個延遲的影響的,每個線程的調(diào)度周期在初始化時就已經(jīng)絕對了义钉,是什么時候調(diào)度就是什么時候調(diào)度昧绣,它不會因為上一個線程的調(diào)度失效延遲而受到影響。但是第四個方法(scheduleWithFixedDelay)捶闸,則不一樣夜畴,它是每個線程的調(diào)度間隔固定,也就是說第一個線程與第二線程之間間隔delay删壮,第二個與第三個間隔delay贪绘,以此類推。如果第二線程推遲了那么后面所有的線程調(diào)度都會推遲央碟,例如税灌,上面第二線程推遲了2秒,那么第三個就不再是11秒執(zhí)行了亿虽,而是13秒執(zhí)行菱涤。 查看著四個方法的源碼,會發(fā)現(xiàn)其實他們的處理邏輯都差不多洛勉,所以我們就挑scheduleWithFixedDelay方法來分析粘秆,如下:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (delay <= 0) throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleWithFixedDelay方法處理的邏輯如下:
- 校驗,如果參數(shù)不合法則拋出異常
- 構(gòu)造一個task收毫,該task為ScheduledFutureTask
- 調(diào)用delayedExecute()方法做后續(xù)相關(guān)處理
這段代碼涉及兩個類ScheduledFutureTask和RunnableScheduledFuture攻走,其中RunnableScheduledFuture不用多說殷勘,他繼承RunnableFuture和ScheduledFuture兩個接口,除了具備RunnableFuture和ScheduledFuture兩類特性外陋气,它還定義了一個方法isPeriodic() 劳吠,該方法用于判斷執(zhí)行的任務(wù)是否為定期任務(wù),如果是則返回true巩趁。而ScheduledFutureTask作為ScheduledThreadPoolExecutor的內(nèi)部類痒玩,它扮演著極其重要的作用,因為它的作用則是負(fù)責(zé)ScheduledThreadPoolExecutor中任務(wù)的調(diào)度议慰。 ScheduledFutureTask內(nèi)部繼承FutureTask蠢古,實現(xiàn)RunnableScheduledFuture接口,它內(nèi)部定義了三個比較重要的變量
/** 任務(wù)被添加到ScheduledThreadPoolExecutor中的序號 */
private final long sequenceNumber;
/** 任務(wù)要執(zhí)行的具體時間 */
private long time;
/** 任務(wù)的間隔周期 */
private final long period;
這三個變量與任務(wù)的執(zhí)行有著非常密切的關(guān)系别凹,什么關(guān)系草讶?先看ScheduledFutureTask的幾個構(gòu)造函數(shù)和核心方法:
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask 提供了四個構(gòu)造方法,這些構(gòu)造方法與上面三個參數(shù)是不是一一對應(yīng)了炉菲?這些參數(shù)有什么用堕战,如何用,則要看ScheduledFutureTask在那些方法使用了該方法拍霜,在ScheduledFutureTask中有一個compareTo()方法:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0) return -1;
else if (diff > 0) return 1;
else if (sequenceNumber < x.sequenceNumber) return -1;
else return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
相信各位都知道該方法是干嘛用的嘱丢,提供一個排序算法,該算法規(guī)則是:首先按照time排序祠饺,time小的排在前面越驻,大的排在后面,如果time相同道偷,則使用sequenceNumber排序缀旁,小的排在前面,大的排在后面勺鸦。那么為什么在這個類里面提供compareTo()方法呢并巍?在前面就介紹過ScheduledThreadPoolExecutor在構(gòu)造方法中提供的是DelayedWorkQueue()隊列中,也就是說ScheduledThreadPoolExecutor是把任務(wù)添加到DelayedWorkQueue中的祝旷,而DelayedWorkQueue則是類似于DelayQueue履澳,內(nèi)部維護(hù)著一個以時間為先后順序的隊列,所以compareTo()方法使用與DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進(jìn)行排序的算法怀跛。 排序已經(jīng)解決了,那么ScheduledThreadPoolExecutor 是如何對task任務(wù)進(jìn)行調(diào)度和延遲的呢柄冲?任何線程的執(zhí)行吻谋,都是通過run()方法執(zhí)行,ScheduledThreadPoolExecutor 的run()方法如下:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
- 調(diào)用isPeriodic()獲取該線程是否為周期性任務(wù)標(biāo)志现横,然后調(diào)用canRunInCurrentRunState()方法判斷該線程是否可以執(zhí)行漓拾,如果不可以執(zhí)行則調(diào)用cancel()取消任務(wù)阁最。
- 如果當(dāng)線程已經(jīng)到達(dá)了執(zhí)行點,則調(diào)用run()方法執(zhí)行task骇两,該run()方法是在FutureTask中定義的速种。
- 否則調(diào)用runAndReset()方法運行并充值,調(diào)用setNextRunTime()方法計算任務(wù)下次的執(zhí)行時間低千,重新把任務(wù)添加到隊列中配阵,讓該任務(wù)可以重復(fù)執(zhí)行。
isPeriodic() 該方法用于判斷指定的任務(wù)是否為定期任務(wù)示血。
public boolean isPeriodic() {
return period != 0;
}
canRunInCurrentRunState()判斷任務(wù)是否可以取消棋傍,cancel()取消任務(wù),這兩個方法比較簡單难审,而run()執(zhí)行任務(wù)瘫拣,runAndReset()運行并重置狀態(tài),牽涉比較廣告喊,我們放在FutureTask后面介紹麸拄。所以重點介紹setNextRunTime()和reExecutePeriodic()這兩個涉及到延遲的方法。 setNextRunTime() setNextRunTime()方法用于重新計算任務(wù)的下次執(zhí)行時間黔姜。如下:
private void setNextRunTime() {
long p = period;
if (p > 0) time += p;
else time = triggerTime(-p);
}
該方法定義很簡單拢切,p > 0 ,time += p ,否則調(diào)用triggerTime()方法重新計算time:
long triggerTime(long delay) {
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false); else ensurePrestart();
}
}
reExecutePeriodic重要的是調(diào)用super.getQueue().add(task);將任務(wù)task加入的隊列DelayedWorkQueue中地淀。ensurePrestart()在【死磕Java并發(fā)】-----J.U.C之線程池:ThreadPoolExecutor已經(jīng)做了詳細(xì)介紹失球。 到這里ScheduledFutureTask已經(jīng)介紹完了,ScheduledFutureTask在ScheduledThreadPoolExecutor扮演作用的重要性不言而喻帮毁。其實ScheduledThreadPoolExecutor的實現(xiàn)不是很復(fù)雜实苞,因為有FutureTask和ThreadPoolExecutor的支撐,其實現(xiàn)就顯得不是那么難了烈疚。