Java并發(fā)編程源碼分析系列:
- 分析Java線程池的創(chuàng)建
- 分析Java線程池執(zhí)行原理
- 分析Java線程池Callable任務(wù)執(zhí)行原理
- 分析ReentrantLock的實(shí)現(xiàn)原理
- 分析CountDownLatch的實(shí)現(xiàn)原理
- 分析同步工具Semaphore和CyclicBarrier的實(shí)現(xiàn)原理
延遲或周期執(zhí)行任務(wù)可以使用Timer或者ScheduledThreadPoolExecutor,前者可以拋棄,后者是今天的主角娶桦。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,對(duì)應(yīng)執(zhí)行任務(wù)變成ScheduledFutureTask袖肥。本文會(huì)在前三篇分析線程池原理的基礎(chǔ)上,分析ScheduledThreadPoolExecutor的實(shí)現(xiàn)原理振劳,最后介紹下為什么不用Timer了椎组。
ScheduledThreadPoolExecutor的創(chuàng)建
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
ScheduledExecutorService singleScheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor的創(chuàng)建可以使用Executors,也可以自己傳參構(gòu)建历恐。上面的構(gòu)造函數(shù)是參數(shù)最全的版本寸癌,可以設(shè)置線程目標(biāo)數(shù)量、線程工廠和飽和策略弱贼。至于等待隊(duì)列蒸苇,使用內(nèi)部類DelayedWorkQueue,看后文分析吮旅。
ScheduledFutureTask
ScheduledFutureTask的構(gòu)造函數(shù)沒(méi)什么特別溪烤,保存了三個(gè)參數(shù)。
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
- time:任務(wù)執(zhí)行時(shí)間;
- period:任務(wù)周期執(zhí)行間隔氛什;
- sequenceNumber:自增的任務(wù)序號(hào)莺葫。
Callable默認(rèn)period=0,表示任務(wù)不是周期執(zhí)行枪眉,因?yàn)橹挥蠷unnable可以周期執(zhí)行。想想也是再层,Callable目的是獲得執(zhí)行結(jié)果贸铜,沒(méi)有必要重復(fù)調(diào)用。
ScheduledFutureTask繼承了我們熟悉的FutureTask聂受,這個(gè)不用多說(shuō)蒿秦。圖1是它實(shí)現(xiàn)的接口,比較陌生的是Delayed蛋济,而Delayed又繼承了Comparable棍鳖。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
這兩個(gè)接口的存在很容易理解,ScheduledFutureTask在等待隊(duì)列里調(diào)度不再按照FIFO碗旅,而是按照?qǐng)?zhí)行時(shí)間渡处,誰(shuí)即將執(zhí)行,誰(shuí)就排在前面祟辟。在這里也可以看到sequenceNumber的作用医瘫,當(dāng)執(zhí)行時(shí)間相同時(shí),按照序號(hào)排序旧困。
添加延遲任務(wù)
對(duì)ScheduledThreadPoolExecutor使用通用的execute或者submit提交任務(wù)醇份,最終調(diào)用schedule方法,默認(rèn)馬上執(zhí)行吼具。如果需要延遲執(zhí)行僚纷,需要直接使用schedule,傳遞時(shí)間參數(shù)拗盒。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
Runnable和Callable包裝成ScheduledFutureTask實(shí)例怖竭,保存了延遲信息,然后執(zhí)行delayedExecute锣咒。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
如果線程池已經(jīng)關(guān)閉侵状,直接調(diào)用飽和策略,否則將任務(wù)加入等待隊(duì)列毅整。加入之后趣兄,需要再判斷線程池的狀態(tài),和當(dāng)前任務(wù)是否能運(yùn)行悼嫉。如果不能繼續(xù)執(zhí)行艇潭,將任務(wù)移出隊(duì)列并取消任務(wù)。
canRunInCurrentRunState處理任務(wù)加入等待隊(duì)列后,又未執(zhí)行就發(fā)生線程池關(guān)閉的情況蹋凝,它通過(guò)預(yù)設(shè)的兩個(gè)變量判斷任務(wù)到底能不能執(zhí)行鲁纠。
- 延遲任務(wù)用executeExistingDelayedTasksAfterShutdown
- 周期任務(wù)用continueExistingPeriodicTasksAfterShutdown
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
最后調(diào)用到ensurePrestart,使用addWorkder增加工作線程鳍寂,這在ThreadPoolExecutor解釋過(guò)了
添加周期任務(wù)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,long period,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
執(zhí)行周期任務(wù)有上面兩個(gè)方法改含,具體作用方法名寫(xiě)得很清楚:
- scheduleAtFixedRate:按固定的頻率執(zhí)行,不受執(zhí)行時(shí)長(zhǎng)影響迄汛,到點(diǎn)就執(zhí)行捍壤;
- scheduleWithFixedDelay:任務(wù)執(zhí)行完后,按固定的延后時(shí)間再執(zhí)行鞍爱。
兩個(gè)方法幾乎一樣鹃觉,不同的是構(gòu)建ScheduledFutureTask時(shí),period一個(gè)傳正數(shù)睹逃,另一個(gè)傳負(fù)數(shù)盗扇。不用懷疑,區(qū)分兩種情況就是用正負(fù)沉填。
等待隊(duì)列
線程池的等待隊(duì)列使用了內(nèi)部類DelayedWorkQueue疗隶,和普通線程池等待隊(duì)列最大的不同是它的任務(wù)是按照目標(biāo)執(zhí)行時(shí)間進(jìn)行排序。
入隊(duì)的offer被重寫(xiě)了拜轨,add和put方法也是調(diào)用offer抽减,具體BlockingQueue的實(shí)現(xiàn)邏輯不在這里討論,重點(diǎn)是看offer里的siftUp方法橄碾。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp根據(jù)任務(wù)的compareTo卵沉,將任務(wù)移動(dòng)到隊(duì)列中指定的位置,就是這樣法牲。
對(duì)應(yīng)地史汗,出隊(duì)take方法,根據(jù)任務(wù)的delay時(shí)間拒垃,小于等于0時(shí)將任務(wù)出隊(duì)停撞,否則等待。
任務(wù)執(zhí)行
當(dāng)線程池從等待隊(duì)列取出一個(gè)任務(wù)時(shí)悼瓮,會(huì)執(zhí)行它的run方法戈毒。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
方法有三個(gè)分支,第一個(gè)if判斷任務(wù)在當(dāng)前線程池狀態(tài)下是否能執(zhí)行横堡,canRunInCurrentRunState已經(jīng)講解過(guò)埋市。第二個(gè)if是判斷是否周期任務(wù),不是的話直接執(zhí)行命贴,不需要多余的操作道宅。重點(diǎn)來(lái)看第三個(gè)if食听,也就是周期執(zhí)行任務(wù)。
- runAndReset:任務(wù)執(zhí)行完重置為初始狀態(tài)污茵,等待下一次執(zhí)行樱报;
- setNextRunTime:計(jì)算下次執(zhí)行時(shí)間;
- reExecutePeriodic:再調(diào)度任務(wù)泞当。
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
計(jì)算下次執(zhí)行時(shí)間迹蛤,period根據(jù)正負(fù)有不同的計(jì)算邏輯,負(fù)的時(shí)間也會(huì)先改正襟士,很明顯對(duì)應(yīng)上文的scheduleAtFixedRate和scheduleWithFixedDelay兩個(gè)方法笤受。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
將任務(wù)重新加入等待隊(duì)列,中間幾個(gè)方法都解釋過(guò)了敌蜂。
Timer的缺陷
自從知道ScheduledThreadPoolExecutor,再?zèng)]有使用Timer津肛,因?yàn)樗袔讉€(gè)缺陷:
- 多任務(wù)在單線程里執(zhí)行章喉,一個(gè)任務(wù)結(jié)束,另一個(gè)任務(wù)才能開(kāi)始身坐,時(shí)間間隔不準(zhǔn)秸脱;
- 出現(xiàn)異常會(huì)導(dǎo)致全部任務(wù)停止;
- 絕對(duì)時(shí)間部蛇,受系統(tǒng)時(shí)間影響摊唇。
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);
Timer的代碼很簡(jiǎn)單,主要數(shù)據(jù)結(jié)構(gòu)是一個(gè)任務(wù)隊(duì)列和一個(gè)執(zhí)行線程涯鲁。新增的任務(wù)會(huì)加入任務(wù)隊(duì)列巷查,到達(dá)時(shí)間后,由執(zhí)行線程執(zhí)行抹腿。只有一個(gè)線程岛请,很容易理解上面講的缺陷。
ScheduledThreadPoolExecutor每個(gè)任務(wù)都有對(duì)應(yīng)的執(zhí)行線程警绩,時(shí)間使用相對(duì)時(shí)間計(jì)算崇败,也就沒(méi)有上面的缺陷,所以沒(méi)有理由使用Timer了肩祥。