三缤至、ThreadPoolExecutor解析
上文中描述了Java中線程池相關(guān)的架構(gòu)牡辽,了解了這些內(nèi)容其實(shí)我們就可以使用java的線程池為我們工作了,使用其提供的線程池我們可以很方便的寫出高質(zhì)量的多線程代碼街夭,本節(jié)將分析ThreadPoolExecutor的實(shí)現(xiàn)祸轮,來探索線程池的運(yùn)行原理。下面的圖片展示了ThreadPoolExecutor的類圖:
ThreadPoolExecutor的類圖
下面是幾個(gè)比較關(guān)鍵的類成員:
private final BlockingQueue<Runnable> workQueue; // 任務(wù)隊(duì)列哀峻,我們的任務(wù)會(huì)添加到該隊(duì)列里面涡相,線程將從該隊(duì)列獲取任務(wù)來執(zhí)行
private final HashSet<Worker> workers = new HashSet<Worker>();//任務(wù)的執(zhí)行值集合,來消費(fèi)workQueue里面的任務(wù)
private volatile ThreadFactory threadFactory;//線程工廠
private volatile RejectedExecutionHandler handler;//拒絕策略剩蟀,默認(rèn)會(huì)拋出異異常催蝗,還要其他幾種拒絕策略如下:
1、CallerRunsPolicy:在調(diào)用者線程里面運(yùn)行該任務(wù)
2育特、DiscardPolicy:丟棄任務(wù)
3丙号、DiscardOldestPolicy:丟棄workQueue的頭部任務(wù)
private volatile int corePoolSize;//最下保活work數(shù)量
private volatile int maximumPoolSize;//work上限
我們嘗試執(zhí)行submit方法,下面是執(zhí)行的關(guān)鍵路徑犬缨,總結(jié)起來就是:如果Worker數(shù)量還沒達(dá)到上限則繼續(xù)創(chuàng)建喳魏,否則提交任務(wù)到workQueue,然后讓worker來調(diào)度運(yùn)行任務(wù)遍尺。
step 1: <ExecutorService>
Future<?> submit(Runnable task);
step 2:<AbstractExecutorService>
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
step 3:<Executor>
void execute(Runnable command);
step 4:<ThreadPoolExecutor>
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //提交我們的額任務(wù)到workQueue
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //使用maximumPoolSize作為邊界
reject(command); //還不行截酷?拒絕提交的任務(wù)
}
step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core)
step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包裝任務(wù)
final Thread t = w.thread; //獲取線程(包含任務(wù))
workers.add(w); // 任務(wù)被放到works中
t.start(); //執(zhí)行任務(wù)
上面的流程是高度概括的涮拗,實(shí)際情況遠(yuǎn)比這復(fù)雜得多乾戏,但是我們關(guān)心的是怎么打通整個(gè)流程,所以這樣分析問題是沒有太大的問題的三热。觀察上面的流程鼓择,我們發(fā)現(xiàn)其實(shí)關(guān)鍵的地方在于Worker,如果弄明白它是如何工作的就漾,那么我們也就大概明白了線程池是怎么工作的了呐能。下面分析一下Worker類。
worker類圖
上面的圖片展示了Worker的類關(guān)系圖抑堡,關(guān)鍵在于他實(shí)現(xiàn)了Runnable接口摆出,所以問題的關(guān)鍵就在于run方法上。在這之前首妖,我們來看一下Worker類里面的關(guān)鍵成員:
final Thread thread;
Runnable firstTask; //我們提交的任務(wù)偎漫,可能被立刻執(zhí)行,也可能被放到隊(duì)列里面
thread是Worker的工作線程有缆,上面的分析我們也發(fā)現(xiàn)了在addWorker中會(huì)獲取worker里面的thread然后start象踊,也就是這個(gè)線程的執(zhí)行,而Worker實(shí)現(xiàn)了Runnable接口棚壁,所以在構(gòu)造thread的時(shí)候Worker將自己傳遞給了構(gòu)造函數(shù)杯矩,thread.start執(zhí)行的其實(shí)就是Worker的run方法。下面是run方法的內(nèi)容:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我們來分析一下runWorker這個(gè)方法袖外,這就是整個(gè)線程池的核心史隆。首先獲取到了我們剛提交的任務(wù)firstTask,然后會(huì)循環(huán)從workQueue里面獲取任務(wù)來執(zhí)行曼验,獲取任務(wù)的方法如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
其實(shí)核心也就一句:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
我們再回頭看一下execute逆害,其實(shí)我們上面只走了一條邏輯,在execute的時(shí)候蚣驼,我們的worker的數(shù)量還沒有到達(dá)我們設(shè)定的corePoolSize的時(shí)候魄幕,會(huì)走上面我們分析的邏輯,而如果達(dá)到了我們設(shè)定的閾值之后颖杏,execute中會(huì)嘗試去提交任務(wù)纯陨,如果提交成功了就結(jié)束,否則會(huì)拒絕任務(wù)的提交。我們上面還提到一個(gè)成員:maximumPoolSize翼抠,其實(shí)線程池的最大的Worker數(shù)量應(yīng)該是maximumPoolSize咙轩,但是我們上面的分析是corePoolSize,這是因?yàn)槲覀兊膒rivate boolean addWorker(Runnable firstTask, boolean core)的參數(shù)core的值來控制的阴颖,core為true則使用corePoolSize來設(shè)定邊界活喊,否則使用maximumPoolSize來設(shè)定邊界盗尸。直觀的解釋一下霎肯,當(dāng)線程池里面的Worker數(shù)量還沒有到corePoolSize,那么新添加的任務(wù)會(huì)伴隨著產(chǎn)生一個(gè)新的worker匣砖,如果Worker的數(shù)量達(dá)到了corePoolSize偎肃,那么就將任務(wù)存放在阻塞隊(duì)列中等待Worker來獲取執(zhí)行煞烫,如果沒有辦法再向阻塞隊(duì)列放任務(wù)了,那么這個(gè)時(shí)候maximumPoolSize就變得有用了累颂,新的任務(wù)將會(huì)伴隨著產(chǎn)生一個(gè)新的Worker滞详,如果線程池里面的Worker已經(jīng)達(dá)到了maximumPoolSize,那么接下來提交的任務(wù)只能被拒絕策略拒絕了紊馏×霞ⅲ可以參考下面的描述來理解:
* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}.
在此需要說明一點(diǎn),有一個(gè)重要的成員:keepAliveTime朱监,當(dāng)線程池里面的線程數(shù)量超過corePoolSize了岸啡,那么超出的線程將會(huì)在空閑keepAliveTime之后被terminated《呐螅可以參考下面的文檔:
* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).