概述
ScheduledThreadPoolExecutor是執(zhí)行定時(shí)調(diào)度的線程池寸癌,它是ThreadPoolExecutor的子類。今天我們就一起看看ScheduledThreadPoolExecutor的源碼钮糖,測(cè)試?yán)舆€是在github中怠苔。
構(gòu)造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
可以看到ScheduledThreadPoolExecutor默認(rèn)最大線程數(shù)使用的無(wú)限線程垃杖,其實(shí)這個(gè)值沒有效果酬屉,我們通過ThreadPoolExecutor源碼知道,只有隊(duì)列滿了才會(huì)蜈缤,創(chuàng)建核心線程數(shù)以外的線程處理任務(wù)拾氓。而DelayedWorkQueue又是無(wú)界的,DelayedWorkQueue我們先知道它是無(wú)界的有序的隊(duì)列就好底哥,稍后再詳細(xì)分析咙鞍,先一起看ScheduledThreadPoolExecutor常用的scheduleWithFixedDelay方法。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//通過提交的Runnable對(duì)象趾徽,構(gòu)造一個(gè)ScheduledFutureTask對(duì)象
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
//t其實(shí)就是ScheduledFutureTask對(duì)象sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//延遲執(zhí)行t
delayedExecute(t);
return t;
}
可以看到就是將提交的Runnable對(duì)象封裝成ScheduledFutureTask對(duì)象執(zhí)行任務(wù)续滋,一起看看ScheduledFutureTask類吧。
ScheduledFutureTask類結(jié)構(gòu)
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//加入線程池的任務(wù)序號(hào)孵奶,用于任務(wù)的比較
private final long sequenceNumber;
//任務(wù)執(zhí)行的時(shí)間(納秒數(shù))
private long time;
//重復(fù)任務(wù)的周期疲酌,以納秒為單位(值為0時(shí)表示只執(zhí)行一次)
private final long period;
//用于重復(fù)執(zhí)行時(shí),重新進(jìn)入隊(duì)列排隊(duì)
RunnableScheduledFuture<V> outerTask = this;
//對(duì)象在隊(duì)列中的下標(biāo)值了袁,方便取消任務(wù)時(shí)朗恳,快速移除節(jié)點(diǎn)
int heapIndex;
//構(gòu)造方法
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
。载绿。粥诫。
}
該類有兩個(gè)重要的方法: compareTo用于排序確定在隊(duì)列中的位置,run方法是任務(wù)執(zhí)行時(shí)調(diào)用的方法崭庸。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//比較任務(wù)執(zhí)行的時(shí)間怀浆,時(shí)間相同時(shí),根據(jù)sequenceNumber確定大小
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public void run() {
//根據(jù)periodic是否為0判斷是否是周期性任務(wù)
boolean periodic = isPeriodic();
//判斷為非可執(zhí)行任務(wù)狀態(tài)時(shí)怕享,取消任務(wù)
//(注意進(jìn)方法內(nèi)部可以看到执赡,判斷中多一個(gè)關(guān)閉線程池后是否繼續(xù)執(zhí)行后續(xù)延遲任務(wù)或周期性任務(wù),就是說ScheduledThreadPoolExecutor提供了一個(gè)關(guān)閉線程池函筋,仍然執(zhí)行定時(shí)任務(wù)的方法)
if (!canRunInCurrentRunState(periodic))
cancel(false);
//不是周期性任務(wù)沙合,直接執(zhí)行
else if (!periodic)
ScheduledFutureTask.super.run();
//是周期性任務(wù)runAndReset方法會(huì)執(zhí)行在執(zhí)行結(jié)束時(shí)將任務(wù)的狀態(tài)重置為NEW,便于下次再次執(zhí)行
else if (ScheduledFutureTask.super.runAndReset()) {
//設(shè)置下周執(zhí)行的時(shí)間
setNextRunTime();
//再次執(zhí)行周期行任務(wù)
reExecutePeriodic(outerTask);
}
}
reExecutePeriodic方法 再次執(zhí)行周期性任務(wù)
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
//將任務(wù)重新入隊(duì)
super.getQueue().add(task);
//如果線程池已關(guān)閉驻呐,將任務(wù)取消
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();//再次執(zhí)行任務(wù)
}
}
ensurePrestart 方法
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//如果線程池小于核心線程數(shù)灌诅,新建線程執(zhí)行,否則在隊(duì)列中等待線程池中的線程調(diào)用getTask方法
if (wc < corePoolSize)
addWorker(null, true); //這里加了一個(gè)空任務(wù)含末,說明讓從延遲隊(duì)列中獲取任務(wù)執(zhí)行,達(dá)到延遲執(zhí)行的目的
else if (wc == 0)
addWorker(null, false);
}
任務(wù)的運(yùn)行周期性執(zhí)行邏輯我們已經(jīng)看了即舌,我們?cè)俅位氐絪cheduleAtFixedRate方法中佣盒,看看任務(wù)的第一次執(zhí)行,可以看到方法中有個(gè)delayedExecute方法顽聂。
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池是關(guān)閉狀態(tài)肥惭,則拒絕任務(wù)
if (isShutdown())
reject(task);
else {
//將任務(wù)入隊(duì)
super.getQueue().add(task);
//入隊(duì)后有個(gè)反悔期盯仪,如果線程池已關(guān)閉,移除任務(wù)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart(); //這里也是調(diào)用ensurePrestart蜜葱,如果線程數(shù)量小于核心線程數(shù)全景,則新建線程執(zhí)行任務(wù)
}
}
至此,任務(wù)的執(zhí)行邏輯已經(jīng)清楚了牵囤,scheduleWithFixedDelay每次執(zhí)行完才會(huì)放一個(gè)延遲任務(wù)爸黄,而scheduleAtFixedRate是執(zhí)行前,先放一個(gè)延遲任務(wù)入隊(duì)揭鳞,所以scheduleWithFixedDelay是每次執(zhí)行完成后的指定時(shí)間后炕贵,再執(zhí)行下一任務(wù),scheduleAtFixedRate是固定時(shí)間后執(zhí)行下一任務(wù)野崇,不管前面的任務(wù)是否完成称开。
下面我們看看ScheduledThreadPoolExecutor中的阻塞隊(duì)列和普通的隊(duì)列有啥區(qū)別。
DelayedWorkQueue 有序乓梨、無(wú)界鳖轰,其實(shí)就是一個(gè)專門接受RunnableScheduledFuture對(duì)象的DelayQueue隊(duì)列。
在上面的分析中扶镀,可以看到任務(wù)執(zhí)行前都是先入隊(duì)蕴侣,DelayedWorkQueue的add方法
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//隊(duì)列滿時(shí),將數(shù)組增加百分之五十
if (i >= queue.length)
grow();
size = i + 1;
//第一次入隊(duì)時(shí)狈惫,放入隊(duì)列的第一個(gè)位置
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//將節(jié)點(diǎn)入隊(duì)
siftUp(i, e);
}
//如果新入隊(duì)的節(jié)點(diǎn)成為了頭節(jié)點(diǎn)睛蛛,leader線程置為空,進(jìn)行一次喚醒等待線程
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
siftUp方法
private void siftUp(int k, RunnableScheduledFuture<?> key) {
//將等待隊(duì)列數(shù)組構(gòu)建成一個(gè)小頂堆
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}