ScheduledFutureTask
run
public void run() {
// 首先判斷是否是周期性任務(wù)
boolean periodic = isPeriodic();
// 如果該任務(wù)不能再當(dāng)前的線程池狀態(tài)下運(yùn)行与境,那么取消任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任務(wù)验夯,那么該任務(wù)就是個(gè)普通的FutureTask,直接調(diào)用父類的run
else if (!periodic)
ScheduledFutureTask.super.run();
// 到這里摔刁,說(shuō)明是周期性任務(wù)挥转,那么執(zhí)行且重置任務(wù)
else if (ScheduledFutureTask.super.runAndReset()) {
// 計(jì)算下個(gè)周期
setNextRunTime();
//
reExecutePeriodic(outerTask);
}
}
reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 如果當(dāng)前線程池狀態(tài)允許執(zhí)行周期性任務(wù)
if (canRunInCurrentRunState(true)) {
// 將該任務(wù)重新再假如到任務(wù)列表中
super.getQueue().add(task);
// 如果現(xiàn)在線程池狀態(tài)又不允許了,那么從任務(wù)列表中移除該任務(wù)共屈,且取消執(zhí)行
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 否則新增工作線程绑谣,worker會(huì)自己去拿任務(wù)
ensurePrestart();
}
}
delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果線程池已經(jīng)SHUTDOWN,那么執(zhí)行拒絕策略
if (isShutdown())
reject(task);
else {
// 將任務(wù)加到任務(wù)列表中
super.getQueue().add(task);
// 如果線程池是SHUTDOWN拗引,且任務(wù)不能再SHUTDOWN之后繼續(xù)執(zhí)行
// 那么這個(gè)任務(wù)沒(méi)有存在的意義借宵,從隊(duì)列中刪除該任務(wù),并嘗試取消任務(wù)執(zhí)行
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 添加工作線程矾削,worker會(huì)自己去拿任務(wù)
ensurePrestart();
}
}
ensurePrestart
void ensurePrestart() {
// 拿到工作線程數(shù)
int wc = workerCountOf(ctl.get());
// 如果小于核心線程數(shù)壤玫,那么增加工作線程,worker會(huì)自己去拿任務(wù)
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
DelayedWorkQueue
offer
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果超過(guò)queue的長(zhǎng)度哼凯,那么擴(kuò)容
if (i >= queue.length)
grow();
// 否則長(zhǎng)度加一
size = i + 1;
// 如果當(dāng)前隊(duì)列為空垦细,那么該任務(wù)作為首節(jié)點(diǎn)
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
// 否則做堆上浮的操作,將該任務(wù)放在二叉堆中合適的位置
} else {
siftUp(i, e);
}
// 經(jīng)過(guò)上面的操作挡逼,如果當(dāng)前隊(duì)列的首節(jié)點(diǎn)是當(dāng)前任務(wù)括改,那么喚醒一個(gè)等待線程開始處理
// 如果首節(jié)點(diǎn),不是當(dāng)前任務(wù)的話,那么這里只是插入嘱能,不做喚醒
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
take
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 先鎖隊(duì)列
lock.lockInterruptibly();
try {
for (;;) {
// 拿到首節(jié)點(diǎn)
RunnableScheduledFuture<?> first = queue[0];
// 如果首節(jié)點(diǎn)為空吝梅,那么等待,直到有任務(wù)為止
// Leader-Follower pattern
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果首節(jié)點(diǎn)已經(jīng)超時(shí)惹骂,說(shuō)明到了該任務(wù)的執(zhí)行時(shí)間了苏携,那么返回該任務(wù)
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 如果leader不為空,說(shuō)明有其他線程在執(zhí)行任務(wù)对粪,那么這里無(wú)限等待
if (leader != null)
available.await();
else {
// 否則右冻,說(shuō)明leader現(xiàn)在是空擋
Thread thisThread = Thread.currentThread();
// 將當(dāng)前線程作為新的leader
leader = thisThread;
try {
// 等待任務(wù)
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
finishPoll
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
// 將最后節(jié)點(diǎn)放到堆頂,做堆下沉著拭,重新整理二叉堆
siftDown(0, x);
setIndex(f, -1);
return f;
}