目錄
類圖結(jié)構(gòu)
ScheduledThreadPoolExecutor時一個可以在指定一定延遲時間后或者定時進(jìn)行任務(wù)調(diào)度執(zhí)行的線程池硫豆。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實(shí)現(xiàn)了ScheduledExecutorService接口呀打。
線程池隊列是DelayedWorkQueue伟叛,與DelayedQueue一樣屬于延遲隊列寂汇。
ScheduledFuturetask是具有返回值的任務(wù)瘪撇,繼承自FutureTask。FutureTask內(nèi)部用一個變量state來表示任務(wù)的狀態(tài)干茉,一開始為NEW论颅。
各狀態(tài)意義如下:
private static final int NEW = 0; // 初始狀態(tài)
private static final int COMPLETING = 1; // 執(zhí)行中
private static final int NORMAL = 2; // 正常運(yùn)行結(jié)束
private static final int EXCEPTIONAL = 3; // 運(yùn)行中異常
private static final int CANCELLED = 4; // 任務(wù)被取消
private static final int INTERRUPTING = 5; // 任務(wù)正在被中斷
private static final int INTERRUPTED = 6; // 任務(wù)已經(jīng)被中斷
ScheduledFutureTask內(nèi)部用一個變量period來表示任務(wù)的類型:
- period=0,說明當(dāng)前任務(wù)是一次性的镜雨,執(zhí)行完畢后就推出了嫂侍。
- period為負(fù)數(shù),說明當(dāng)前任務(wù)為固定延遲的定時可重復(fù)執(zhí)行任務(wù)(執(zhí)行完一次后會停止指定時間后再次運(yùn)行荚坞,若每次執(zhí)行任務(wù)耗時不同挑宠,則顯然相鄰兩次任務(wù)執(zhí)行間隔不同)。
- period為正數(shù)颓影,說明當(dāng)前任務(wù)為固定頻率的定尺可重復(fù)執(zhí)行任務(wù)(也即固定周期)各淀。
以下為ScheduledThreadPoolExecutor的構(gòu)造函數(shù):
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
// 指定了線程工廠
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
// 指定了拒絕策略
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
// 指定了線程工廠和拒絕策略
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
從上面的代碼中可以看到,ScheduledThreadPoolExecutor的線程池隊列為DelayedWorkQueue诡挂。
源碼分析
schedule(Runnable command, long delay, TimeUnit unit)
提交一個延遲執(zhí)行的任務(wù)碎浇,任務(wù)從提交時間算起延遲單位為unit的delay后開始執(zhí)行。提交的任務(wù)不是周期性任務(wù)璃俗,任務(wù)只會執(zhí)行一次奴璃。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 參數(shù)校驗(yàn)
if (command == null || unit == null)
throw new NullPointerException();
// 將任務(wù)包裝成ScheduledFutureTask
// triggerTime方法用來計算觸發(fā)時間(即任務(wù)開始執(zhí)行的絕對時間)
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 添加任務(wù)到延遲隊列
delayedExecute(t);
return t;
}
以下是ScheduledFutureTask的相關(guān)代碼:
ScheduledFutureTask(Runnable r, V result, long ns) {
// 調(diào)用父類構(gòu)造函數(shù)
super(r, result);
this.time = ns; // 等待ns納秒后開始執(zhí)行
this.period = 0; // period=0說明為一次性任務(wù)
// 記錄任務(wù)編號
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
// 將Runnable任務(wù)轉(zhuǎn)化成Callable任務(wù)
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
delayedExecute的代碼如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 線程池關(guān)閉則執(zhí)行拒絕策略
if (isShutdown())
reject(task);
else {
// 將任務(wù)添加到任務(wù)隊列中
// 任務(wù)隊列為DelayedWorkQueue
// 所加的task實(shí)現(xiàn)了comparable接口
// 添加到任務(wù)隊列中能保證隊首元素為最早需要執(zhí)行的
super.getQueue().add(task);
// 再次檢查線程池是否關(guān)閉
// 因?yàn)閳?zhí)行上面add代碼過程中線程池完全有可能被關(guān)閉
// 如果線程池被關(guān)閉則判斷當(dāng)前任務(wù)是否可以在當(dāng)前狀態(tài)下繼續(xù)執(zhí)行
// 不能繼續(xù)執(zhí)行則移除當(dāng)前任務(wù)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保證至少有一個線程存活可以從任務(wù)隊列中獲取任務(wù)處理任務(wù)
ensurePrestart();
}
}
// 判斷任務(wù)是否是周期執(zhí)行的
public boolean isPeriodic() {
return period != 0;
}
// 根據(jù)periodic來決定isRunningOrShutdown的參數(shù)
// continueExistingPeriodicTasksAfterShutdown和
// executeExistingDelayedTasksAfterShutdown的值可通過相應(yīng)的setter方法來設(shè)置
// 為true表示線程池關(guān)閉后當(dāng)前任務(wù)會繼續(xù)執(zhí)行完畢
// 為false則取消當(dāng)前任務(wù)
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// 確保至少有一個線程存活來執(zhí)行任務(wù)
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
線程池中具體執(zhí)行任務(wù)的是Worker,Worker通過調(diào)用run方法來執(zhí)行城豁。這里的任務(wù)是ScheduledFutureTask溺健,下面來看其run方法。
// ScheduledFutureTask.run
public void run() {
boolean periodic = isPeriodic();
// 判斷是否需要取消任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 一次性任務(wù)
else if (!periodic)
// 調(diào)用FutureTask的run方法
ScheduledFutureTask.super.run();
// 周期性任務(wù)
// runAndReset為FutureTask中的方法
// 用于執(zhí)行當(dāng)前任務(wù)但不改變future的狀態(tài)
else if (ScheduledFutureTask.super.runAndReset()) {
// 設(shè)置下次執(zhí)行的時間
setNextRunTime();
// 默認(rèn)情況下钮蛛,outerTask = this就是當(dāng)前對象
reExecutePeriodic(outerTask);
}
}
// 設(shè)置周期性任務(wù)下次執(zhí)行時間
private void setNextRunTime() {
long p = period;
// p > 0表示任務(wù)執(zhí)行頻率一定
if (p > 0)
// time為此次任務(wù)(已執(zhí)行完畢)剛開始執(zhí)行時的時間
time += p;
// p < 0表示任務(wù)固定延遲時間
// 即此次任務(wù)完成后會等待-p時間再執(zhí)行下次任務(wù)
else
// 獲取-p時間后的絕對時間
time = triggerTime(-p);
}
// 周期性執(zhí)行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 再次將task添加至任務(wù)隊列中等待執(zhí)行
// 當(dāng)輪到task執(zhí)行時鞭缭,又會在run中調(diào)用此方法
// 再次將自身添加到任務(wù)隊列中,從而達(dá)到周期性執(zhí)行效果
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
當(dāng)任務(wù)執(zhí)行完畢后魏颓,讓其延遲固定時間后再次運(yùn)行岭辣。
// command:所要執(zhí)行的任務(wù)
// initialDelay: 提交任務(wù)后等待多少時間再開始執(zhí)行任務(wù)
// delay: 一次任務(wù)執(zhí)行完后等待多少時間才執(zhí)行下次任務(wù)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
// 參數(shù)校驗(yàn)
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 將Runnable任務(wù)轉(zhuǎn)換成ScheduledFutureTask
// 第四個參數(shù)period為-delay < 0表示當(dāng)前任務(wù)是固定延遲的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// decorateTask默認(rèn)直接返回第二個參數(shù),即sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 默認(rèn)情況下t = sft
// outerTask即為sft自身甸饱,用于實(shí)現(xiàn)周期性執(zhí)行
sft.outerTask = t;
delayedExecute(t);
return t;
}
// 可被子類重寫拓展
// 默認(rèn)實(shí)現(xiàn)只是簡單的返回task
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
主要不同在于設(shè)置了period=-delay沦童,其他代碼與schedule相同,相應(yīng)的代碼會判斷period的取值從而決定程序不同的行為叹话。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
按固定頻率周期性地執(zhí)行任務(wù)偷遗。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 關(guān)鍵在于此處第四個參數(shù)period > 0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
除了設(shè)置period的值大于0外,總體與scheduleWithFixedDelay類似驼壶,不再贅述氏豌。
更多
相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記