什么是線程池
所謂線程池舀透,就是將多個線程放在一個池子里面(所謂池化技術(shù)),然后需要線程的時候不是創(chuàng)建一個線程决左,而是從線程池里面獲取一個可用的線程愕够,然后執(zhí)行我們的任務(wù)。線程池的關(guān)鍵在于它為我們管理了多個線程佛猛,我們不需要關(guān)心如何創(chuàng)建線程惑芭,我們只需要關(guān)系我們的核心業(yè)務(wù),然后需要線程來執(zhí)行任務(wù)的時候從線程池中獲取線程继找。任務(wù)執(zhí)行完之后線程不會被銷毀遂跟,而是會被重新放到池子里面,等待機會去執(zhí)行任務(wù)婴渡。
我們?yōu)槭裁葱枰€程池呢幻锁?首先一點是線程池為我們提高了一種簡易的多線程編程方案,我們不需要投入太多的精力去管理多個線程边臼,線程池會自動幫我們管理好哄尔,它知道什么時候該做什么事情,我們只要在需要的時候去獲取就可以了柠并。其次岭接,我們使用線程池很大程度上歸咎于創(chuàng)建和銷毀線程的代價是非常昂貴的,甚至我們創(chuàng)建和銷毀線程的資源要比我們實際執(zhí)行的任務(wù)所花費的時間還要長臼予,這顯然是不科學(xué)也是不合理的鸣戴,而且如果沒有一個合理的管理者,可能會出現(xiàn)創(chuàng)建了過多的線程的情況粘拾,也就是在JVM中存活的線程過多窄锅,而存活著的線程也是需要銷毀資源的,另外一點缰雇,過多的線程可能會造成線程過度切換的尷尬境地酬滤。
如何使用線程池签餐。
定義2個線程
public class PlayThread implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++) {
System.out.println(Thread.currentThread().getName()+"在玩");
}
}
}
public class WorkThread implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + "在工作");
}
}
}
普通線程池
// 普通線程池
public static void fun1() {
// 1創(chuàng)建一個線程池 2個線程
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 2提交任務(wù)
WorkThread work1 = new WorkThread();
WorkThread work2 = new WorkThread();
PlayThread play = new PlayThread();
// execute()方法沒有返回值 submit()方法有返回值
executorService.execute(work1);
executorService.execute(work2);
executorService.execute(play);
// 初始2個。執(zhí)行3個線程 前面2個執(zhí)行完畢了盯串。第三個才會執(zhí)行
// 3停止線程池
// executorService.shutdownNow();
}
調(diào)度線程池
// 調(diào)度線程池
public static void fun2() {
// 1創(chuàng)建一個調(diào)度線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
// 2提交周期性任務(wù)
WorkThread work1 = new WorkThread();
// command:執(zhí)行線程
// initialDelay:初始化延時
// period:兩次開始執(zhí)行最小間隔時間
// unit:計時單位
scheduledExecutorService.scheduleAtFixedRate(work1, 10, 2, TimeUnit.SECONDS);
// 3停止線程池
// scheduledExecutorService.shutdownNow();
}
可以發(fā)現(xiàn)使用線程池非常簡單氯檐,只需要極少的代碼就可以創(chuàng)建出我們需要的線程池,然后將我們的任務(wù)提交到線程池中去体捏。我們只需要在結(jié)束之時記得關(guān)閉線程池就可以了冠摄。本文的重點并非在于如何使用線程池,而是試圖剖析線程池的實現(xiàn)几缭,比如一個調(diào)度線程池是怎么實現(xiàn)的河泳?是靠什么實現(xiàn)的?為什么能這樣實現(xiàn)等等問題年栓。
Java線程池實現(xiàn)架構(gòu)
Java中與線程池相關(guān)的類有下面一些:
Executor
ExecutorService
ScheduledExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executors
通過上面一節(jié)中的使用示例拆挥,可以發(fā)現(xiàn)Executors類是一個創(chuàng)建線程池的有用的類,事實上某抓,Executors類的角色也就是創(chuàng)建線程池纸兔,它是一個工廠類,可以產(chǎn)生不同類型的線程池否副,而Executor是線程池的鼻祖類汉矿,它有兩個子類是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor則是真正的線程池备禀,我們的任務(wù)將被這兩個類交由其所管理者的線程池運行洲拇,可以發(fā)現(xiàn),ScheduledThreadPoolExecutor是一個集大成者類曲尸,下面我們可以看看它的類關(guān)系圖:
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor赋续,ThreadPoolExecutor實現(xiàn)了一般的線程池,沒有調(diào)度功能另患,而ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor的實現(xiàn)蚕捉,然后增加了調(diào)度功能。
最為原始的Executor只有一個方法execute柴淘,它接受一個Runnable類型的參數(shù),意思是使用線程池來執(zhí)行這個Runnable秘通,可以發(fā)現(xiàn)Executor不提供有返回值的任務(wù)为严。ExecutorService繼承了Executor,并且極大的增強了Executor的功能肺稀,不僅支持有返回值的任務(wù)執(zhí)行第股,而且還有很多十分有用的方法來為你提供服務(wù),下面展示了ExecutorService提供的方法:
ScheduledExecutorService繼承了ExecutorService话原,并且增加了特有的調(diào)度(schedule)功能夕吻。關(guān)于Executor诲锹、ExecutorService和ScheduledExecutorService的關(guān)系,可以見下圖:
總結(jié)一下归园,經(jīng)過我們的調(diào)研,可以發(fā)現(xiàn)其實對于我們編寫多線程代碼來說稚矿,最為核心的是Executors類庸诱,根據(jù)我們是需要ExecutorService類型的線程池還是ScheduledExecutorService類型的線程池調(diào)用相應(yīng)的工廠方法就可以了,而ExecutorService的實現(xiàn)表現(xiàn)在ThreadPoolExecutor上晤揣,ScheduledExecutorService的實現(xiàn)則表現(xiàn)在ScheduledThreadPoolExecutor上桥爽,下文將分別剖析這兩者,嘗試弄清楚線程池的原理昧识。
ThreadPoolExecutor解析
上文中描述了Java中線程池相關(guān)的架構(gòu)钠四,了解了這些內(nèi)容其實我們就可以使用java的線程池為我們工作了,使用其提供的線程池我們可以很方便的寫出高質(zhì)量的多線程代碼跪楞,本節(jié)將分析ThreadPoolExecutor的實現(xiàn)缀去,來探索線程池的運行原理。下面的圖片展示了ThreadPoolExecutor的類圖:
下面是幾個比較關(guān)鍵的類成員:
private final BlockingQueue<Runnable> workQueue; // 任務(wù)隊列习霹,我們的任務(wù)會添加到該隊列里面朵耕,線程將從該隊列獲取任務(wù)來執(zhí)行
private final HashSet<Worker> workers = new HashSet<Worker>();//任務(wù)的執(zhí)行值集合,來消費workQueue里面的任務(wù)
private volatile ThreadFactory threadFactory;//線程工廠
private volatile RejectedExecutionHandler handler;//拒絕策略淋叶,默認(rèn)會拋出異異常阎曹,還要其他幾種拒絕策略如下:
1、CallerRunsPolicy:在調(diào)用者線程里面運行該任務(wù)
2煞檩、DiscardPolicy:丟棄任務(wù)
3处嫌、DiscardOldestPolicy:丟棄workQueue的頭部任務(wù)
private volatile int corePoolSize;//最低保活work數(shù)量
private volatile int maximumPoolSize;//work上限
我們嘗試執(zhí)行submit方法斟湃,下面是執(zhí)行的關(guān)鍵路徑熏迹,總結(jié)起來就是:如果Worker數(shù)量還沒達到上限則繼續(xù)創(chuàng)建,否則提交任務(wù)到workQueue凝赛,然后讓worker來調(diào)度運行任務(wù)注暗。
step 1: <ExecutorService>
Future<?> submit(Runnable task);
step 2:<AbstractExecutorService>
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
step 3:<Executor>
void execute(Runnable command);
step 4:<ThreadPoolExecutor>
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //提交我們的額任務(wù)到workQueue
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //使用maximumPoolSize作為邊界
reject(command); //還不行?拒絕提交的任務(wù)
}
step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core)
step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包裝任務(wù)
final Thread t = w.thread; //獲取線程(包含任務(wù))
workers.add(w); // 任務(wù)被放到works中
t.start(); //執(zhí)行任務(wù)
上面的流程是高度概括的墓猎,實際情況遠比這復(fù)雜得多捆昏,但是我們關(guān)心的是怎么打通整個流程,所以這樣分析問題是沒有太大的問題的毙沾。觀察上面的流程骗卜,我們發(fā)現(xiàn)其實關(guān)鍵的地方在于Worker,如果弄明白它是如何工作的,那么我們也就大概明白了線程池是怎么工作的了寇仓。下面分析一下Worker類举户。
上面的圖片展示了Worker的類關(guān)系圖,關(guān)鍵在于他實現(xiàn)了Runnable接口遍烦,所以問題的關(guān)鍵就在于run方法上俭嘁。在這之前,我們來看一下Worker類里面的關(guān)鍵成員:
final Thread thread;
Runnable firstTask; //我們提交的任務(wù)乳愉,可能被立刻執(zhí)行兄淫,也可能被放到隊列里面
thread是Worker的工作線程,上面的分析我們也發(fā)現(xiàn)了在addWorker中會獲取worker里面的thread然后start蔓姚,也就是這個線程的執(zhí)行捕虽,而Worker實現(xiàn)了Runnable接口,所以在構(gòu)造thread的時候Worker將自己傳遞給了構(gòu)造函數(shù)坡脐,thread.start執(zhí)行的其實就是Worker的run方法泄私。下面是run方法的內(nèi)容:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我們來分析一下runWorker這個方法,這就是整個線程池的核心备闲。首先獲取到了我們剛提交的任務(wù)firstTask晌端,然后會循環(huán)從workQueue里面獲取任務(wù)來執(zhí)行,獲取任務(wù)的方法如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
其實核心也就一句:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
我們再回頭看一下execute恬砂,其實我們上面只走了一條邏輯咧纠,在execute的時候,我們的worker的數(shù)量還沒有到達我們設(shè)定的corePoolSize的時候泻骤,會走上面我們分析的邏輯漆羔,而如果達到了我們設(shè)定的閾值之后,execute中會嘗試去提交任務(wù)狱掂,如果提交成功了就結(jié)束演痒,否則會拒絕任務(wù)的提交。我們上面還提到一個成員:maximumPoolSize趋惨,其實線程池的最大的Worker數(shù)量應(yīng)該是maximumPoolSize鸟顺,但是我們上面的分析是corePoolSize纠屋,這是因為我們的private boolean addWorker(Runnable firstTask, boolean core)的參數(shù)core的值來控制的理疙,core為true則使用corePoolSize來設(shè)定邊界蜓堕,否則使用maximumPoolSize來設(shè)定邊界组橄。直觀的解釋一下,當(dāng)線程池里面的Worker數(shù)量還沒有到corePoolSize轴踱,那么新添加的任務(wù)會伴隨著產(chǎn)生一個新的worker设褐,如果Worker的數(shù)量達到了corePoolSize讹俊,那么就將任務(wù)存放在阻塞隊列中等待Worker來獲取執(zhí)行挤悉,如果沒有辦法再向阻塞隊列放任務(wù)了,那么這個時候maximumPoolSize就變得有用了,新的任務(wù)將會伴隨著產(chǎn)生一個新的Worker装悲,如果線程池里面的Worker已經(jīng)達到了maximumPoolSize昏鹃,那么接下來提交的任務(wù)只能被拒絕策略拒絕了【髡铮可以參考下面的描述來理解:
* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}.
在此需要說明一點洞渤,有一個重要的成員:keepAliveTime,當(dāng)線程池里面的線程數(shù)量超過corePoolSize了属瓣,那么超出的線程將會在空閑keepAliveTime之后被terminated载迄。可以參考下面的文檔:
* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
ScheduledThreadPoolExecutor解析
ScheduledThreadPoolExecutor適用于延時執(zhí)行抡蛙,或者周期性執(zhí)行的任務(wù)調(diào)度护昧,ScheduledThreadPoolExecutor在實現(xiàn)上繼承了ThreadPoolExecutor,所以你依然可以將ScheduledThreadPoolExecutor當(dāng)成ThreadPoolExecutor來使用粗截,但是ScheduledThreadPoolExecutor的功能要強大得多惋耙,因為ScheduledThreadPoolExecutor可以根據(jù)設(shè)定的參數(shù)來周期性調(diào)度運行,下面的圖片展示了四個和周期性相關(guān)的方法:
- 如果你想延時一段時間之后運行一個Runnable熊昌,那么使用第一個方法
- 如果你想延時一段時間然后運行一個Callable绽榛,那么使用的第二個方法
- 如果你想要延時一段時間,然后根據(jù)設(shè)定的參數(shù)周期執(zhí)行Runnable婿屹,那么可以選擇第三個和第四個方法灭美,第三個方法和第四個方法的區(qū)別在于:第三個方法嚴(yán)格按照規(guī)劃的時間路徑來執(zhí)行,比如周期為2昂利,延時為0届腐,那么執(zhí)行的序列為0,2页眯,4梯捕,6,8....窝撵,而第四個方法將基于上次執(zhí)行時間來規(guī)劃下次的執(zhí)行傀顾,也就是在上次執(zhí)行完成之后再次執(zhí)行。比如上面的執(zhí)行序列0碌奉,2短曾,4,6赐劣,8...嫉拐,如果第2秒沒有被調(diào)度執(zhí)行,而在第三秒的時候才被調(diào)度魁兼,那么下次執(zhí)行的時間不是4婉徘,而是5,以此類推。
下面來看一下這四個方法的一些細節(jié):
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
通過上面的代碼我們可以發(fā)現(xiàn)盖呼,前兩個方法是類似的儒鹿,后兩個方法也是類似的。前兩個方法屬于一次性調(diào)度几晤,所以period都為0约炎,區(qū)別在于參數(shù)不同,一個是Runnable蟹瘾,而一個是Callable圾浅,可笑的是,最后都變?yōu)榱薈allable了憾朴,見下面的構(gòu)造函數(shù):
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
對于后兩個方法狸捕,區(qū)別僅僅在于period的,scheduleWithFixedDelay對參數(shù)進行了操作伊脓,將原來的時間變?yōu)樨?fù)數(shù)了府寒,而后面在計算下次被調(diào)度的時間的時候會根據(jù)這個參數(shù)的正負(fù)值來分別處理,正數(shù)代表scheduleAtFixedRate报腔,而負(fù)數(shù)代表了scheduleWithFixedDelay株搔。
一個需要被我們注意的細節(jié)是,以上四個方法最后都會調(diào)用一個方法: delayedExecute(t)纯蛾,下面看一下這個方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
大概的意思就是先判斷線程池是否被關(guān)閉了纤房,如果被關(guān)閉了,則拒絕任務(wù)的提交翻诉,否則將任務(wù)加入到任務(wù)隊列中去等待被調(diào)度執(zhí)行炮姨。最后的ensurePrestart的意思是需要確保線程池已經(jīng)被啟動起來了。下面是這個方法:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
主要是增加了一個沒有任務(wù)的worker碰煌,有什么用呢舒岸?我們還記得Worker的邏輯嗎?addWorker方法的執(zhí)行芦圾,會觸發(fā)Worker的run方法的執(zhí)行蛾派,然后runWorker方法就會被執(zhí)行,而runWorker方法是循環(huán)從workQueue中取任務(wù)執(zhí)行的个少,所以確保線程池被啟動起來是重要的洪乍,而只需要簡單的執(zhí)行addWorker便會觸發(fā)線程池的啟動流程。對于調(diào)度線程池來說夜焦,只要執(zhí)行了addWorker方法壳澳,那么線程池就會一直在后臺周期性的調(diào)度執(zhí)行任務(wù)。
到此茫经,似乎我們還是沒有鬧明白ScheduledThreadPoolExecutor是如何實現(xiàn)周期性的巷波,上面講到四個scheduled方法時萎津,我們沒有提一個重要的類:ScheduledFutureTask,對抹镊,所有神奇的事情將會發(fā)生在這個類中姜性,下面來分析一下這個類。
看上面的類圖髓考,貌似這個類非常復(fù)雜,還好弃酌,我們發(fā)現(xiàn)他實現(xiàn)了Runnable接口氨菇,那么必然會有一個run方法,而這個run方法必然是整個類的核心妓湘,下面來看一下這個run方法的內(nèi)容:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
首先查蓉,判斷是否是周期性的任務(wù),如果不是榜贴,則直接執(zhí)行(一次性)豌研,否則執(zhí)行,然后設(shè)置下次執(zhí)行的時間唬党,然后重新調(diào)度鹃共,等待下次執(zhí)行。這里有一個方法需要注意驶拱,也就是setNextRunTime霜浴,上面我們提到scheduleAtFixedRate和scheduleWithFixedDelay在傳遞參數(shù)時不一樣,后者將delay值變?yōu)榱素?fù)數(shù)蓝纲,所以下面的處理正好印證了前文所述阴孟。
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
下面來看一下reExecutePeriodic方法是如何做的,他的目標(biāo)是將任務(wù)再次被調(diào)度執(zhí)行税迷,下面的代碼展示了這個功能的實現(xiàn):
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
可以看到永丝,這個方法就是將我們的任務(wù)再次放到了workQueue里面,那這個參數(shù)是什么箭养?在上面的run方法中我們調(diào)用了reExecutePeriodic方法慕嚷,參數(shù)為outerTask,而這個變量是什么露懒?看下面的代碼:
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
這個變量指向了自己闯冷,而this的類型是什么?是ScheduledFutureTask懈词,也就是可以被調(diào)度的task蛇耀,這樣就實現(xiàn)了循環(huán)執(zhí)行任務(wù)了。
上面的分析已經(jīng)到了循環(huán)執(zhí)行坎弯,但是ScheduledThreadPoolExecutor的功能是周期性執(zhí)行纺涤,所以我們接著分析ScheduledThreadPoolExecutor是如何根據(jù)我們的參數(shù)走走停停的译暂。這個時候,是應(yīng)該看一下ScheduledThreadPoolExecutor的構(gòu)造函數(shù)了撩炊,我們來看一個最簡單的構(gòu)造函數(shù):
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
我們知道ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor外永,所以這里的super其實是ThreadPoolExecutor的構(gòu)造函數(shù),我們發(fā)現(xiàn)其中有一個參數(shù)DelayedWorkQueue拧咳,看名字貌似是一個延遲隊列的樣子伯顶,進一步跟蹤代碼,發(fā)現(xiàn)了下面的一行代碼(構(gòu)造函數(shù)中):
this.workQueue = workQueue;
所以在ScheduledThreadPoolExecutor中骆膝,workQueue是一個DelayedWorkQueue類型的隊列祭衩,我們暫且認(rèn)為DelayedWorkQueue是一種具備延遲功能的隊列吧,那么阅签,到此我們便可以想明白了掐暮,上面的分析我們明白了ScheduledThreadPoolExecutor是如何循環(huán)執(zhí)行任務(wù)的,而這里我們明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue來達到延遲的目標(biāo)政钟,所以組合起來路克,就可以實現(xiàn)ScheduledThreadPoolExecutor周期性執(zhí)行的目標(biāo)。下面我們來看一下DelayedWorkQueue是如何做到延遲的吧养交,上文中提到一個方法:getTask精算,這個方法的作用是從workQueue中取出任務(wù)來執(zhí)行,而在ScheduledThreadPoolExecutor里面碎连,getTask方法是從DelayedWorkQueue中取任務(wù)的殖妇,而取任務(wù)無非兩個方法:poll或者take,下面我們對DelayedWorkQueue的take方法來分析一下:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
在for循環(huán)里面破花,首先從queue中獲取第一個任務(wù)谦趣,然后從任務(wù)中取出延遲時間,而后使用available變量來實現(xiàn)延遲效果座每。這里面需要幾個點需要探索一下:
這個queue是什么東西前鹅?
延遲時間的來龍去脈?
available變量的來龍去脈峭梳?
對于第一個問題舰绘,看下面的代碼:
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
它是一個RunnableScheduledFuture類型的數(shù)組,下面是RunnableScheduledFuture類的類關(guān)系圖:
數(shù)組里面保存了我們的RunnableScheduledFuture葱椭,對queue的操作捂寿,主要來看一下增加元素和消費元素的操作。首先孵运,假設(shè)使用add方法來增加RunnableScheduledFuture到queue秦陋,調(diào)用的鏈路如下:
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
解釋一下,add方法直接轉(zhuǎn)到了offer方法治笨,該方法中驳概,首先判斷數(shù)組的容量是否足夠赤嚼,如果不夠則grow,增長的策略如下:
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增長50%顺又,入戲下去更卒。增長完成后,如果這是第一個元素稚照,則放在坐標(biāo)為0的位置蹂空,否則,使用siftUp操作果录,下面是該方法的內(nèi)容:
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
這個數(shù)組實現(xiàn)了堆這種數(shù)據(jù)結(jié)構(gòu)腌闯,使用對象比較將最需要被調(diào)度執(zhí)行的RunnableScheduledFuture放到數(shù)組的前面,而這得力于compareTo方法雕憔,下面是RunnableScheduledFuture類的compareTo方法的實現(xiàn),主要是通過延遲時間來做比較糖声。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
上面是生產(chǎn)元素斤彼,下面來看一下消費數(shù)據(jù)。在上面我們提到的take方法中蘸泻,使用了一個方法如下:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
這個方法中調(diào)用了一個方法siftDown琉苇,這個方法如下:
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
對其的解釋就是:
Replaces first element with last and sifts it down. Call only when holding lock.
總結(jié)一下,當(dāng)我們向queue插入任務(wù)的時候悦施,會發(fā)生siftUp方法的執(zhí)行并扇,這個時候會把任務(wù)盡量往根部移動,而當(dāng)我們完成任務(wù)調(diào)度之后抡诞,會發(fā)生siftDown方法的執(zhí)行穷蛹,與siftUp相反,siftDown方法會將任務(wù)盡量移動到queue的末尾昼汗‰妊總之,大概的意思就是queue通過compareTo實現(xiàn)了類似于優(yōu)先級隊列的功能顷窒。
下面我們來看一下第二個問題:延遲時間的來龍去脈蛙吏。在上面的take方法里面,首先獲取了delay鞋吉,然后再使用available來做延遲效果鸦做,那這個delay從哪里來的呢?通過上面的類圖RunnableScheduledFuture的類圖我們知道谓着,RunnableScheduledFuture類實現(xiàn)了Delayed接口泼诱,而Delayed接口里面的唯一方法是getDelay,我們到RunnableScheduledFuture里面看一下這個方法的具體實現(xiàn):
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
time是我們設(shè)定的下次執(zhí)行的時間赊锚,所以延遲就是(time - now())坷檩,沒毛踩匆簟!
第三個問題:available變量的來龍去脈矢炼,至于這個問題系瓢,我們看下面的代碼:
/**
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
private final Condition available = lock.newCondition();
這是一個條件變量,take方法里面使用這個變量來做延遲效果句灌。Condition可以在多個線程間做同步協(xié)調(diào)工作夷陋,更為具體細致的關(guān)于Condition的內(nèi)容,可以參考更多的資料來學(xué)習(xí)胰锌,本文對此知識點點到為止骗绕。
到此為止,我們梳理了ScheduledThreadPoolExecutor是如何實現(xiàn)周期性調(diào)度的资昧,首先分析了它的循環(huán)性酬土,然后分析了它的延遲效果,本文到此也就結(jié)束了格带,對于線程池的學(xué)習(xí)現(xiàn)在才剛剛起步撤缴,需要更多更專業(yè)的知識類幫我理解更為底層的內(nèi)容,當(dāng)然叽唱,為了更進一步理解線程池的實現(xiàn)細節(jié)屈呕,首先需要對線程間通信有足夠的把握,其次是要對各種數(shù)據(jù)結(jié)構(gòu)有清晰的認(rèn)識棺亭,比如隊列虎眨、優(yōu)先級隊列、堆等高級的數(shù)據(jù)結(jié)構(gòu)镶摘,以及java語言對于這些數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)嗽桩,更為重要的是要結(jié)合實際情況分析問題,在工作和平時的學(xué)習(xí)中不斷總結(jié)凄敢,不斷迭代對于線程涤躲、線程池的認(rèn)知。
作者:一字馬胡
鏈接:http://www.reibang.com/p/5d5198b434a2
來源:簡書
簡書著作權(quán)歸作者所有贡未,任何形式的轉(zhuǎn)載都請聯(lián)系作者獲得授權(quán)并注明出處种樱。