前言
在上一篇線程池的文章《Java線程池原理分析ThreadPoolExecutor篇》中從ThreadPoolExecutor源碼分析了其運(yùn)行機(jī)制。限于篇幅第岖,留下了ScheduledThreadPoolExecutor未做分析键袱,因此本文繼續(xù)從源代碼出發(fā)分析ScheduledThreadPoolExecutor的內(nèi)部原理。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor同ThreadPoolExecutor一樣也可以從 Executors線程池工廠創(chuàng)建比藻,所不同的是它具有定時(shí)執(zhí)行,以周期或間隔循環(huán)執(zhí)行任務(wù)等功能。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
}
public interface ScheduledExecutorService extends ExecutorService {
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor烛缔,因此它具有ThreadPoolExecutor的所有能力。
通過(guò)super方法的參數(shù)可知拿愧,核心線程的數(shù)量即傳入的參數(shù)杠河,而線程池的線程數(shù)為Integer.MAX_VALUE,幾乎為無(wú)上限柳洋。
這里采用了DelayedWorkQueue任務(wù)隊(duì)列陪白,也是定時(shí)任務(wù)的核心,留在后面分析膳灶。
ScheduledThreadPoolExecutor實(shí)現(xiàn)了ScheduledExecutorService 中的接口:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
延時(shí)執(zhí)行Callable任務(wù)的功能咱士。
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
延時(shí)執(zhí)行Runnable任務(wù)的功能。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
可以延時(shí)循環(huán)執(zhí)行周期性任務(wù)轧钓。
假設(shè)任務(wù)執(zhí)行時(shí)間固定2s,period為1s序厉,因?yàn)槿蝿?wù)的執(zhí)行時(shí)間大于規(guī)定的period,所以任務(wù)會(huì)每隔2s(任務(wù)執(zhí)行時(shí)間)開(kāi)始執(zhí)行一次毕箍。如果任務(wù)執(zhí)行時(shí)間固定為0.5s,period為1s弛房,因?yàn)槿蝿?wù)執(zhí)行時(shí)間小于period,所以任務(wù)會(huì)每隔1s(period)開(kāi)始執(zhí)行一次而柑。實(shí)際任務(wù)的執(zhí)行時(shí)間即可能是大于period的文捶,也可能小于period,scheduleAtFixedRate的好處就是每次任務(wù)的開(kāi)始時(shí)間間隔必然大于等于period媒咳。
假設(shè)一項(xiàng)業(yè)務(wù)需求每天凌晨3點(diǎn)將數(shù)據(jù)庫(kù)備份粹排,然而數(shù)據(jù)庫(kù)備份的時(shí)間小于24H,最適合用scheduleAtFixedRate方法實(shí)現(xiàn)涩澡。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
可以延時(shí)以相同間隔時(shí)間循環(huán)執(zhí)行任務(wù)顽耳。
假設(shè)任務(wù)執(zhí)行的時(shí)間固定為2s,delay為1s妙同,那么任務(wù)會(huì)每隔3s(任務(wù)時(shí)間+delay)開(kāi)始執(zhí)行一次射富。
如果業(yè)務(wù)需求本次任務(wù)的結(jié)束時(shí)間與下一個(gè)任務(wù)的開(kāi)始時(shí)間固定,使用scheduleWithFixedDelay可以方便地實(shí)現(xiàn)業(yè)務(wù)粥帚。
ScheduledFuture
四個(gè)執(zhí)行任務(wù)的方法都返回了ScheduledFuture對(duì)象胰耗,它與Future有什么區(qū)別?
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public interface Comparable<T> {
public int compareTo(T o);
}
可以看到ScheduledFuture也繼承了Future芒涡,并且繼承了Delayed柴灯,增加了getDelay方法卖漫,而Delayed繼承自Comparable,所以具有compareTo方法。
四種執(zhí)行定時(shí)任務(wù)的方法
schedule(Runnable command,long delay, TimeUnit unit)
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
這個(gè)方法中出現(xiàn)了幾個(gè)陌生的類弛槐,首先是ScheduledFutureTask:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
boolean isPeriodic();
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
這個(gè)類是ScheduledThreadPoolExecutor的內(nèi)部類懊亡,繼承自FutureTask實(shí)現(xiàn)了RunnableScheduledFuture接口。RunnableScheduledFuture有些復(fù)雜乎串,繼承自RunnableFuture和ScheduledFuture接口店枣。可見(jiàn)ScheduledThreadPoolExecutor身兼多職叹誉。這個(gè)類既可以作為Runnable被線程執(zhí)行鸯两,又可以作為FutureTask用于獲取Callable任務(wù)call方法返回的結(jié)果。
在FutureTask的構(gòu)造方法中傳入Runnable對(duì)象會(huì)將其轉(zhuǎn)換為返回值為null的Callable對(duì)象长豁。
/**
* Modifies or replaces the task used to execute a runnable.
* This method can be used to override the concrete
* class used for managing internal tasks.
* The default implementation simply returns the given task.
*
* @param runnable the submitted Runnable
* @param task the task created to execute the runnable
* @param <V> the type of the task's result
* @return a task that can execute the runnable
* @since 1.6
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
從decorateTask的字面意義判斷它將具體的RunnableScheduledFuture實(shí)現(xiàn)類向上轉(zhuǎn)型為RunnableScheduledFuture接口钧唐。從它的方法描述和實(shí)現(xiàn)看出它只是簡(jiǎn)單的將ScheduledFutureTask向上轉(zhuǎn)型為RunnableScheduledFuture接口,由protected 修飾符可知設(shè)計(jì)者希望子類擴(kuò)展這個(gè)方法的實(shí)現(xiàn)匠襟。
之所以向上轉(zhuǎn)型為RunnableScheduledFuture接口钝侠,設(shè)計(jì)者也是希望將具體與接口分離。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//情況一
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//情況二
task.cancel(false);
else
//情況三
ensurePrestart();
}
}
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
delayedExecute方法負(fù)責(zé)執(zhí)行延時(shí)任務(wù)酸舍。
情況一 : 先判斷線程池是否關(guān)閉帅韧,若關(guān)閉則拒絕任務(wù)。
情況二:線程池未關(guān)閉啃勉,將任務(wù)添加到父類的任務(wù)隊(duì)列忽舟,即DelayedWorkQueue中。下面再次判斷線程池是否關(guān)閉淮阐,并且判斷canRunInCurrentRunState方法的返回值是否為false叮阅。因?yàn)閭魅隦unnable參數(shù),task.isPeriodic()為false泣特,所以isRunningOrShutdown返回true浩姥。所以這里不會(huì)執(zhí)行到。
情況三:任務(wù)成功添加到任務(wù)隊(duì)列群扶,執(zhí)行ensurePrestart方法及刻。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
addWorker已經(jīng)在ThreadPoolExecutor篇分析過(guò),該方法負(fù)責(zé)同步將線程池?cái)?shù)量+1竞阐,并且創(chuàng)建Worker對(duì)象添加到HashSet中,最后開(kāi)啟Worker對(duì)象中的線程暑劝。因?yàn)镽unnableScheduledFuture對(duì)象已經(jīng)被添加到任務(wù)隊(duì)列骆莹,Worker中的線程通過(guò)getTask方法自然會(huì)取到DelayedWorkQueue中的RunnableScheduledFuture任務(wù)并執(zhí)行它的run方法。
這里需要注意的是addWorker方法只在核心線程數(shù)未達(dá)上限或者沒(méi)有線程的情況下執(zhí)行,并不像ThreadPoolExecutor那樣可以同時(shí)存在多個(gè)非核心線程担猛,ScheduledThreadPoolExecutor最多只支持一個(gè)非核心線程幕垦,除非它終止了不會(huì)再創(chuàng)建新的非核心線程丢氢。
schedule(Callable<V> callable, long delay, TimeUnit unit)
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
與schedule(Runnable command,long delay,TimeUnit unit)相比除了可以通過(guò)ScheduledFutureTask的get方法得到返回值外沒(méi)有區(qū)別。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
與上述兩個(gè)方法的區(qū)別在于ScheduledFutureTask的構(gòu)造函數(shù)多了參數(shù)period,即任務(wù)執(zhí)行的最小周期:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
與scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)的區(qū)別是參數(shù)delay傳入到ScheduledFutureTask的構(gòu)造方法中是以負(fù)數(shù)的形式先改。
小結(jié)
四種延時(shí)啟動(dòng)任務(wù)的方法除了構(gòu)造ScheduledFutureTask的參數(shù)不同外疚察,運(yùn)行機(jī)制是相同的。先將任務(wù)添加到DelayedWorkQueue 中仇奶,然后創(chuàng)建Worker對(duì)象貌嫡,啟動(dòng)內(nèi)部線程輪詢DelayedWorkQueue 中的任務(wù)。
那么DelayedWorkQueue的add方法是如何實(shí)現(xiàn)的该溯,線程輪詢DelayedWorkQueue 調(diào)用的poll和take方法又如何實(shí)現(xiàn)岛抄?
回顧getTask方法獲取任務(wù)時(shí)的代碼片段:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
如果我們?cè)O(shè)置ScheduledThreadPoolExecutor的核心線程數(shù)量為0,則執(zhí)行poll方法狈茉。而對(duì)于核心線程則執(zhí)行take方法夫椭。
下面分析DelayedWorkQueue 的具體實(shí)現(xiàn)。
DelayedWorkQueue
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
}
首先DelayedWorkQueue 是ScheduledThreadPoolExecutor的靜態(tài)內(nèi)部類氯庆。它的內(nèi)部有一個(gè)RunnableScheduledFuture數(shù)組蹭秋,且初始容量為16.這里提前說(shuō)明下,queue 數(shù)組儲(chǔ)存的其實(shí)是二叉樹(shù)結(jié)構(gòu)的索引堤撵,這個(gè)二叉樹(shù)其實(shí)就是最小堆仁讨。
add方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
在執(zhí)行add方法時(shí)內(nèi)部執(zhí)行的是offer方法,添加RunnableScheduledFuture任務(wù)到隊(duì)列時(shí)先通過(guò)內(nèi)部的ReentrantLock加鎖粒督,因此在多線程調(diào)用schedule(Runnable command,long delay, TimeUnit unit)添加任務(wù)時(shí)也能保證同步陪竿。
接下來(lái)先判斷隊(duì)列是否已滿,若已滿就先通過(guò)grow方法擴(kuò)容屠橄。擴(kuò)容算法是將現(xiàn)有容量*1.5族跛,然后將舊的數(shù)組復(fù)制到新的數(shù)組。(左移一位等于除以2)锐墙。
然后判斷插入的是否為第一個(gè)任務(wù)礁哄,如果是就將RunnableScheduledFuture向下轉(zhuǎn)型為ScheduledFutureTask,并將其heapIndex 屬性設(shè)置為0.
如果不是第一個(gè)任務(wù)溪北,則執(zhí)行siftUp方法桐绒。該方法先找到父親RunnableScheduledFuture對(duì)象節(jié)點(diǎn),將要插入的RunnableScheduledFuture節(jié)點(diǎn)與之compareTo比較之拨,若父親RunnableScheduledFuture對(duì)象的啟動(dòng)時(shí)間小于當(dāng)前要插入的節(jié)點(diǎn)的啟動(dòng)時(shí)間茉继,則將節(jié)點(diǎn)插入到末尾。反之會(huì)對(duì)二叉樹(shù)以啟動(dòng)時(shí)間升序重新排序RunnableScheduledFuture接口的實(shí)現(xiàn)其實(shí)是ScheduledFutureTask類:
new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit);
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
final long now() {
return System.nanoTime();
}
第三個(gè)參數(shù)triggerTime方法返回的就是任務(wù)延時(shí)的時(shí)間加上當(dāng)前時(shí)間蚀乔。
在ScheduledFutureTask內(nèi)部實(shí)現(xiàn)了compareTo方法:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
比較的兩個(gè)任務(wù)的啟動(dòng)時(shí)間烁竭。所以DelayedWorkQueue內(nèi)部的二叉樹(shù)是以啟動(dòng)時(shí)間早晚排序的。
poll方法
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
//情況一 空隊(duì)列
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//情況二 已到啟動(dòng)時(shí)間
return finishPoll(first);
if (nanos <= 0)
//情況三 未到啟動(dòng)時(shí)間吉挣,但是線程等待超時(shí)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
非核心線程會(huì)通過(guò)poll方法同步獲取任務(wù)隊(duì)列中的RunnableScheduledFuture派撕,如果隊(duì)列為空或者在timeout內(nèi)還等不到任務(wù)的啟動(dòng)時(shí)間婉弹,都將返回null。如果任務(wù)隊(duì)列不為空终吼,并且首個(gè)任務(wù)已到啟動(dòng)時(shí)間線程就能夠獲取RunnableScheduledFuture任務(wù)并執(zhí)行run方法镀赌。
take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
與非核心線程執(zhí)行的poll方法相比,核心線程執(zhí)行的take方法并不會(huì)超時(shí)际跪,在獲取到首個(gè)將要啟動(dòng)的任務(wù)前商佛,核心線程會(huì)一直阻塞。
finishPoll方法
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
在成功獲取任務(wù)后垫卤,DelayedWorkQueue的finishPoll方法會(huì)將任務(wù)移除隊(duì)列威彰,并以啟動(dòng)時(shí)間升序重排二叉樹(shù)。
小結(jié)
DelayedWorkQueue內(nèi)部維持了一個(gè)以任務(wù)啟動(dòng)時(shí)間升序排序的二叉樹(shù)數(shù)組穴肘,啟動(dòng)時(shí)間最靠前的任務(wù)即數(shù)組的首個(gè)位置上的任務(wù)歇盼。核心線程通過(guò)take方法一直阻塞直到獲取首個(gè)要啟動(dòng)的任務(wù)。非核心線程通過(guò)poll方法會(huì)在timeout時(shí)間內(nèi)阻塞嘗試獲取首個(gè)要啟動(dòng)的任務(wù)评抚,如果超過(guò)timeout未得到任務(wù)不會(huì)繼續(xù)阻塞豹缀。
這里要特別說(shuō)明要啟動(dòng)的任務(wù)指的是RunnableScheduledFuture內(nèi)部的time減去當(dāng)前時(shí)間小于等于0,未滿足條件的任務(wù)不會(huì)被take或poll方法返回慨代,這也就保證了未到指定時(shí)間任務(wù)不會(huì)執(zhí)行邢笙。
執(zhí)行ScheduledFutureTask
前面已經(jīng)分析了schedule方法如何將RunnableScheduledFuture插入到DelayedWorkQueue,Worker內(nèi)的線程如何獲取定時(shí)任務(wù)。下面分析任務(wù)的執(zhí)行過(guò)程侍匙,即ScheduledFutureTask的run方法:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
如果執(zhí)行的是非周期型任務(wù)氮惯,調(diào)用ScheduledFutureTask.super.run()方法,即ScheduledFutureTask的父類FutureTask的run方法想暗。FutureTask的run方法已經(jīng)在ThreadPoolExecutor篇分析過(guò)妇汗,這里不再多說(shuō)。
如果執(zhí)行的是周期型任務(wù)说莫,則執(zhí)行ScheduledFutureTask.super.runAndReset():
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
這個(gè)方法同run方法比較的區(qū)別是call方法執(zhí)行后不設(shè)置結(jié)果杨箭,因?yàn)橹芷谛腿蝿?wù)會(huì)多次執(zhí)行,所以為了讓FutureTask支持這個(gè)特性除了發(fā)生異常不設(shè)置結(jié)果储狭。
執(zhí)行完任務(wù)后通過(guò)setNextRunTime方法計(jì)算下一次啟動(dòng)時(shí)間:
private void setNextRunTime() {
long p = period;
if (p > 0)
//情況一
time += p;
else
//情況二
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
還記得ScheduledThreadPoolExecutor執(zhí)行定時(shí)任務(wù)的后兩種scheduleAtFixedRate和scheduleWithFixedDelay互婿。
scheduleAtFixedRate會(huì)執(zhí)行到情況一,下一次任務(wù)的啟動(dòng)時(shí)間最早為上一次任務(wù)的啟動(dòng)時(shí)間加period辽狈。
scheduleWithFixedDelay會(huì)執(zhí)行到情況二慈参,這里很巧妙的將period參數(shù)設(shè)置為負(fù)數(shù)到達(dá)這段代碼塊,在此又將負(fù)的period轉(zhuǎn)為正數(shù)刮萌。情況二將下一次任務(wù)的啟動(dòng)時(shí)間設(shè)置為當(dāng)前時(shí)間加period懂牧。
然后將任務(wù)再次添加到任務(wù)隊(duì)列:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
ScheduledFuture的get方法
既然ScheduledFuture的實(shí)現(xiàn)是ScheduledFutureTask,而ScheduledFutureTask繼承自FutureTask尊勿,所以ScheduledFuture的get方法的實(shí)現(xiàn)就是FutureTask的get方法的實(shí)現(xiàn)僧凤,F(xiàn)utureTask的get方法的實(shí)現(xiàn)分析在ThreadPoolExecutor篇已經(jīng)寫(xiě)過(guò),這里不再敘述元扔。要注意的是ScheduledFuture的get方法對(duì)于非周期任務(wù)才是有效的躯保。
ScheduledThreadPoolExecutor總結(jié)
- ScheduledThreadPoolExecutor是實(shí)現(xiàn)自ThreadPoolExecutor的線程池,構(gòu)造方法中傳入?yún)?shù)n澎语,則最多會(huì)有n個(gè)核心線程工作途事,空閑的核心線程不會(huì)被自動(dòng)終止,而是一直阻塞在DelayedWorkQueue的take方法嘗試獲取任務(wù)。構(gòu)造方法傳入的參數(shù)為0擅羞,ScheduledThreadPoolExecutor將以非核心線程工作尸变,并且最多只會(huì)創(chuàng)建一個(gè)非核心線程,參考上文中ensurePrestart方法的執(zhí)行過(guò)程减俏。而這個(gè)非核心線程以poll方法獲取定時(shí)任務(wù)之所以不會(huì)因?yàn)槌瑫r(shí)就被回收召烂,是因?yàn)槿蝿?wù)隊(duì)列并不為空,只有在任務(wù)隊(duì)列為空時(shí)才會(huì)將空閑線程回收娃承,詳見(jiàn)ThreadPoolExecutor篇的runWorker方法,之前我以為空閑的非核心線程超時(shí)就會(huì)被回收是不正確的,還要具備任務(wù)隊(duì)列為空這個(gè)條件奏夫。
- ScheduledThreadPoolExecutor的定時(shí)執(zhí)行任務(wù)依賴于DelayedWorkQueue,其內(nèi)部用可擴(kuò)容的數(shù)組實(shí)現(xiàn)以啟動(dòng)時(shí)間升序的二叉樹(shù)历筝。
- 工作線程嘗試獲取DelayedWorkQueue的任務(wù)只有在任務(wù)到達(dá)指定時(shí)間才會(huì)成功酗昼,否則非核心線程會(huì)超時(shí)返回null,核心線程一直阻塞梳猪。
- 對(duì)于非周期型任務(wù)只會(huì)執(zhí)行一次并且可以通過(guò)ScheduledFuture的get方法阻塞得到結(jié)果麻削,其內(nèi)部實(shí)現(xiàn)依賴于FutureTask的get方法。
- 周期型任務(wù)通過(guò)get方法無(wú)法獲取有效結(jié)果春弥,因?yàn)镕utureTask對(duì)于周期型任務(wù)執(zhí)行的是runAndReset方法呛哟,并不會(huì)設(shè)置結(jié)果。周期型任務(wù)執(zhí)行完畢后會(huì)重新計(jì)算下一次啟動(dòng)時(shí)間并且再次添加到DelayedWorkQueue中惕稻。
在源代碼的分析過(guò)程中發(fā)現(xiàn)分析DelayedWorkQueue還需要有二叉樹(shù)的升序插入算法的知識(shí)竖共,一開(kāi)始也沒(méi)有認(rèn)出來(lái)這種數(shù)據(jù)結(jié)構(gòu),后來(lái)又看了別人的文章才了解俺祠。這里比較難理解公给,有興趣的同學(xué)可以參考《深度解析Java8 – ScheduledThreadPoolExecutor源碼解析》。