1.線程池概覽
線程池主要用于線程資源的管理,防止頻繁的創(chuàng)建以及銷毀線程你踩,提升資源的利用率凰荚。JDK中的線程池實現(xiàn)昌渤,本質(zhì)上來說就是基于生產(chǎn)者-消費者模型來實現(xiàn)的,如圖所示:
向線程池中提交待執(zhí)行任務(wù),首先進(jìn)入阻塞隊列中排隊等待,然后統(tǒng)一由消費者worker執(zhí)行(這里的說法不是太嚴(yán)謹(jǐn),如果worker沒有超過核心線程數(shù)的話贼穆,會被直接創(chuàng)建出來的worker執(zhí)行,具體后面會有分析)兰粉,本文基于JDK8講解ThreadPoolExecutor的原理故痊。
2.一些基本變量
- workerCount:indicating the effective number of threads(線程池中的線程數(shù)量)
- runState:indicating whether running, shutting down etc(線程池狀態(tài))
- ctl: the combination of workerCount and runState(In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable.)(該變量為int類型,前3位保存線程池狀態(tài)玖姑,剩下29位保存線程數(shù)量愕秫,線程池中對該變量的更新都是使用CAS操作來保證線程安全)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//計算ctl值
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池一共有五種狀態(tài)
- RUNNING: Accept new tasks and process queued tasks(接收新的任務(wù)并且處理阻塞隊列中的任務(wù))
- SHUTDOWN: Don't accept new tasks, but process queued tasks(不再接收新的任務(wù),但是處理阻塞隊列中的任務(wù))
- STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks(新的任務(wù)以及阻塞隊列中的任務(wù)都不再處理焰络,中斷正在被處理的任務(wù))
- TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING,will run the terminated() hook method(所有的任務(wù)都被終止戴甩,線程數(shù)量為0,線程池狀態(tài)改變闪彼,執(zhí)行terminated()鉤子函數(shù)甜孤,terminated函數(shù)是線程提供出去的擴(kuò)展點,使用者可以重寫該函數(shù)畏腕,在該階段執(zhí)行自己的邏輯)
- TERMINATED: terminated() has completed(terminated()執(zhí)行完畢)
整個狀態(tài)的流轉(zhuǎn)注釋里面也寫得很清楚
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
流程圖如下:
shutdown
方法執(zhí)行時比較優(yōu)雅缴川,會允許線程池把阻塞隊列中的任務(wù)執(zhí)行完成再進(jìn)行下一步,shutdownNow
方法就比較暴力描馅,直接會丟棄掉阻塞隊列中未執(zhí)行的任務(wù)
3.創(chuàng)建線程池任務(wù)流程
整體流程如下:
線程池處理任務(wù)的主要流程分為4步驟把夸,如上圖中step所示,主要邏輯為
- 提交任務(wù)铭污,判斷線程數(shù)決定是否創(chuàng)建worker對象
- 創(chuàng)建worker對象(worker對象內(nèi)部保存線程池中的線程)
- 啟動worker對象中的線程執(zhí)行
- 直接執(zhí)行提交任務(wù)或者從阻塞隊列中獲取待執(zhí)行的任務(wù)并執(zhí)行
上述步驟中每次在創(chuàng)建線程對象時恋日,都會判斷當(dāng)前線程池中的線程數(shù)是否符合設(shè)置并做相應(yīng)的調(diào)整,具體細(xì)節(jié)會在下文討論嘹狞;針對于獲取任務(wù)的途徑岂膳,依據(jù)于當(dāng)前線程數(shù)量與核心線程數(shù)量的值來決定是直接讓創(chuàng)建出的worker對象來執(zhí)行或是先放入阻塞隊列中,讓運行中的worker從隊列中獲取任務(wù)來執(zhí)行磅网,也會在后文中詳細(xì)討論
3.1 Step1 提交任務(wù)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取表示線程數(shù)量以及線程池狀態(tài)的變量
int c = ctl.get();
//線程數(shù)量小于核心線程數(shù)谈截,直接將提交的任務(wù)傳入addWorker,
//創(chuàng)建出worker對象后直接執(zhí)行該任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//線程數(shù)量大于核心線程數(shù),所有提交任務(wù)直接放入到阻塞隊列中傻盟,
//由運行中的worker從隊首獲取任務(wù)來執(zhí)行
if (isRunning(c) && workQueue.offer(command)) {
//double check,再次獲取線程池狀態(tài)以及線程數(shù)嫂丙,
//以防中途線程池狀態(tài)改變或者線程數(shù)量減少為0
int recheck = ctl.get();
//線程池非running狀態(tài)娘赴,則將該任務(wù)從阻塞隊列中移除并且調(diào)用tryTerminate()方法
if (! isRunning(recheck) && remove(command))
reject(command);
//線程數(shù)為0時,創(chuàng)建新的worker對象從阻塞隊列中獲取任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//1.阻塞隊列已滿跟啤,丟棄任務(wù)诽表,調(diào)用reject()方法
//2.線程池為非running狀態(tài),丟棄任務(wù)隅肥,調(diào)用reject()方法
else if (!addWorker(command, false))
reject(command);
}
3.2 Step2 創(chuàng)建worker
對象
addWorker
主要負(fù)責(zé)創(chuàng)建worker對象竿奏,每一個worker對象中有擁有一個線程,創(chuàng)建worker的本質(zhì)就是創(chuàng)建線程池中的執(zhí)行任務(wù)的線程腥放,同時worker繼承AbstractQueuedSynchronizer泛啸,自身攜帶鎖的機(jī)制,可以控制并發(fā)秃症,比如在后面提到的runWorker調(diào)用中候址,執(zhí)行任務(wù)前都會先獲取自身的鎖,在調(diào)用shutdown方法后种柑,中斷線程的調(diào)用中岗仑,也會先獲取鎖。addWorker
整體流程如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取當(dāng)前線程池狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
//SHUTDOWN狀態(tài)時聚请,允許線程池把阻塞隊列中的任務(wù)執(zhí)行完荠雕,
//如果workQueue還有任務(wù),是允許增加線程數(shù)量的驶赏,
//firstTask == null代表不是外部新提交的任務(wù)
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程數(shù)量
int wc = workerCountOf(c);
//是否大于能存儲線程數(shù)量的最大值
if (wc >= CAPACITY ||
//根據(jù)core的值選擇是對核心線程數(shù)還是最大線程數(shù)比較
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通過CAS操作增加線程池中記錄的線程數(shù)量
if (compareAndIncrementWorkerCount(c))
//更新線程池線程數(shù)成功炸卑,繼續(xù)向下執(zhí)行
break retry;
//更新失敗說明線程池數(shù)量發(fā)生改變,重新獲取線程池線程數(shù)量以及狀態(tài)
c = ctl.get(); // Re-read ctl
//如果狀態(tài)發(fā)生改變母市,則回到retry位置矾兜,重新執(zhí)行,判斷狀態(tài)等操作
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//(狀態(tài)沒有發(fā)生改變患久,繼續(xù)重新計算線程數(shù)椅寺,通過CAS操作更新線程數(shù))
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**創(chuàng)建worker對象,worker對象在創(chuàng)建的時候會調(diào)用threadFactory創(chuàng)建一個新的線程蒋失,
新的線程會把worker對象傳入構(gòu)造函數(shù)
小于核心線程數(shù)的時候返帕,firstTask不為null,提交的任務(wù)會被該線程直接執(zhí)行
**/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
//線程池為shutdown狀態(tài)并且沒有向線程池中提交任務(wù)的情況
(rs == SHUTDOWN && firstTask == null)) {
//檢查線程是否被提前啟動
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//把worker添加到workers集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動該worker中的線程
t.start();
workerStarted = true;
}
}
} finally {
//如果線程沒有正常啟動
if (! workerStarted)
//1.如果workers集合中存在worker篙挽,則移除 2.減少線程數(shù) 3.調(diào)用tryTerminate()方法
addWorkerFailed(w);
}
return workerStarted;
}
3.3 Step3 調(diào)用runWorker
方法
runWorker
方法是由每個worker
對象中的線程來執(zhí)行荆萤,具體執(zhí)行的任務(wù)放在worker
對象中的firstTask中,如果firstTask為null,則會調(diào)用getTask
方法從阻塞隊列中獲取任務(wù)链韭,如下圖流程所示:
簡單來說偏竟,
runWorker
就是線程執(zhí)行任務(wù)的地方,但是實際runWorker
的設(shè)計是有許多地方的考量敞峭,包括:
- 線程的回收:線程池狀態(tài)或者參數(shù)的變化時踊谋,已經(jīng)啟動的線程如何被回收,針對于線程池狀態(tài)的改變旋讹,該方法中會獲取一次線程池的狀態(tài)殖蚕,如果是STOP,TIDYING沉迹,TERMINATED狀態(tài)睦疫,就會調(diào)用
interrupt
方法釋放中斷信號;線程池參數(shù)的變化是依賴getTask
來實現(xiàn)的鞭呕,只要getTask
返回為null
蛤育,線程就會跳出while循環(huán),執(zhí)行結(jié)束葫松。例如:默認(rèn)allowCoreThreadTimeOut
為false缨伊,阻塞隊列為空,線程數(shù)量大于設(shè)置的核心線程數(shù)之后进宝,在特定時間內(nèi)被回收的邏輯刻坊,或者線程池在運行中,動態(tài)修改核心線程數(shù)之后党晋,線程的是否會回收的邏輯均在getTask
方法中谭胚,具體會在后文詳細(xì)分析getTask
方法。 - 擴(kuò)展性:線程執(zhí)行任務(wù)的上下文中穿插hook函數(shù)用以實現(xiàn)擴(kuò)展邏輯未玻,包括
beforeExecute
方法用以實現(xiàn)執(zhí)行任務(wù)前的邏輯以及afterExecute
用以實現(xiàn)執(zhí)行任務(wù)后的邏輯灾而,使用者可以針對這兩個函數(shù)自行擴(kuò)展 - 異常情況的處理:異常發(fā)生的地方主要有,
beforeExecute
方法產(chǎn)生的異常扳剿,線程執(zhí)行任務(wù)中產(chǎn)生的異常(最常見)旁趟,afterExecute
方法產(chǎn)生的異常,當(dāng)產(chǎn)生異常的時庇绽,completedAbruptly
會一直是true
锡搜,同時異常會被拋出到開發(fā)者的代碼層面,在后續(xù)執(zhí)行processWorkerExit
的方法中調(diào)用addWorker
方法瞧掺,產(chǎn)生新的線程來彌補(bǔ)因為異常終止的線程耕餐。
runWorker
詳細(xì)流程如下:
源碼參照:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//第一次被創(chuàng)建時如果自帶任務(wù)則執(zhí)行該任務(wù),除此之外則從阻塞隊列中獲取
while (task != null || (task = getTask()) != null) {
//獲取自身的鎖辟狈,線程池調(diào)用shutdown方法肠缔,終止線程時也會搶奪該鎖
w.lock();
//再次判斷線程狀態(tài)夏跷,如果大于STOP狀態(tài),則釋放中斷信號
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行提交任務(wù)前的hook函數(shù)明未,留給用戶自己實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正的執(zhí)行提交給線程池的任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執(zhí)行提交任務(wù)后的hook函數(shù)槽华,留給用戶自己實現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//發(fā)生異常以及獲取task=null時的兜底方法
processWorkerExit(w, completedAbruptly);
}
}
processWorkerExit
作為runWorker
方法中的重要一環(huán),主要承擔(dān)(cleanup)清理和(bookkeeping)簿記dying worker
的作用趟妥,dying worker
其實就是指因為task==null或者執(zhí)行過程中因為異常走到finally代碼塊中的線程硼莽。執(zhí)行processWorkerExit
時,線程池的參數(shù)設(shè)定以及狀態(tài)相對于runWorker
方法中的代碼塊煮纵,也有可能是已經(jīng)發(fā)生變化了的,processWorkerExit
也會有相應(yīng)的判斷偏螺,線程池的設(shè)計目的本來就是動態(tài)維護(hù)一定的線程數(shù)量行疏,因此但凡涉及到添加線程的操作,都會進(jìn)行狀態(tài)以及參數(shù)的判斷套像。
processWorkerExit
參照的源碼如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是發(fā)生異常時進(jìn)入到該方法中酿联,先將線程計數(shù)減1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//從workers集合中移除該worker
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
//再次對線程池狀態(tài)進(jìn)行判斷
if (runStateLessThan(c, STOP)) {
//task==null時進(jìn)入該方法,如果設(shè)置核心線程始終存活夺巩,當(dāng)線程池數(shù)大于設(shè)置的核心
//線程數(shù)時贞让,則不需要補(bǔ)充線程;如果設(shè)置核心線程有存活時間并且當(dāng)前阻塞隊列中有
//任務(wù)柳譬,當(dāng)線程數(shù)大于1時則不需要補(bǔ)充線程喳张,同樣當(dāng)阻塞隊列中沒有任務(wù)時,
//也不需要補(bǔ)充
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//異常退出時美澳,直接調(diào)用addWorker向線程池補(bǔ)充新的線程销部,無論此時線程池配置
//是否發(fā)生變化,都會在addWorker校驗
addWorker(null, false);
}
}
3.4 Step4 調(diào)用getTask
方法
getTask
方法主要用于根據(jù)當(dāng)前線程池配置(線程池中的配置是能在啟動后動態(tài)設(shè)置的)獲取任務(wù)制跟,該過程主要包括獲取任務(wù)的過程中被阻塞以及在特定時間內(nèi)獲取任務(wù)舅桩,如果無法及時獲取到任務(wù)則會直接返回null,getTask返回null后雨膨,在上層的runWorker中就會跳出while
循環(huán)擂涛,繼續(xù)往下執(zhí)行直到被回收。當(dāng)無法獲取任務(wù)即return null主要包括以下情況:
- 最大線程數(shù)減少聊记,線程在運行過程中調(diào)用
setMaximumPoolSize
可以重新設(shè)置核心線程數(shù) - 線程池處于STOP狀態(tài)
- 線程池處于SHUTDOWN并且阻塞隊列中任務(wù)為0
- 從阻塞隊列中獲取任務(wù)超時并且超時的worker需要被終止(當(dāng)設(shè)置allowCoreThreadTimeOut為true允許核心線程數(shù)擁有存活時間限制或者當(dāng)前線程池中的線程數(shù)大于核心線程數(shù))
主要流程圖如下:
參照源碼:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//檢查線程池狀態(tài)撒妈,線程池處于STOP狀態(tài)或者線程池處于SHUTDOWN并且阻塞隊列中
//任務(wù)為0時減少線程數(shù)量并return null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//查看是否設(shè)置了核心線程數(shù)超時或者當(dāng)前線程線程數(shù)大于核心線程數(shù)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1.如果在上一次循環(huán)中獲取任務(wù)超時,只要當(dāng)前線程池中線程數(shù)量大于1排监,
//就會回收worker并且return nll(該邏輯就是當(dāng)線程池中線程數(shù)量大于核心線程數(shù)小于最
//大線程數(shù)時踩身,空閑狀態(tài)下的線程會被回收的邏輯,同理當(dāng)設(shè)置核心線程數(shù)有存活時間
//時社露,核心線程也擁有了被回收掉的邏輯)
//2.如果當(dāng)前線程數(shù)大于最大線程數(shù)挟阻,也會被回收掉
if ((wc > maximumPoolSize || (timed && timedOut))
//兜底策略,不論線程池配置如何設(shè)置,只要阻塞隊列中有任務(wù)附鸽,線程池中至少都要保
//留一個線程
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//設(shè)定獲取任務(wù)的時間脱拼,超時則在下一次循環(huán)中return null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//無timed邏輯則在此處直接阻塞,直到能獲取到任務(wù)
workQueue.take();
//成功獲取任務(wù)則直接返回
if (r != null)
return r;
//獲取任務(wù)超時后設(shè)置超時標(biāo)志
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3.5 拒絕策略
當(dāng)線程池中的阻塞隊列任務(wù)已滿坷备,繼續(xù)提交任務(wù)便會調(diào)用reject
方法熄浓,觸發(fā)拒絕策略,四種RejectedExecutionHandler
接口的實現(xiàn)如下:
-
AbortPolicy
:直接拋出異常
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
-
CallerRunsPolicy
:當(dāng)線程池處于運行狀態(tài)中時省撑,直接在調(diào)用方線程中執(zhí)行提交給線程池的任務(wù)
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
-
DiscardOldestPolicy
:當(dāng)線程池處于運行狀態(tài)中時赌蔑,丟棄阻塞隊列中隊首的任務(wù),然后再嘗試向線程池中提交任務(wù)
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
-
DiscardPolicy
:針對丟棄的任務(wù)不做任何的處理
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
4.線程池的關(guān)閉
處于運行中狀態(tài)的線程池調(diào)用shutdown
或許shutdownNow
方法就能終止線程竟秫,這兩者方法的主要區(qū)別在于shutdown
會將線程池的狀態(tài)置為SHUTDOWN娃惯,該方法會允許阻塞隊列中的
任務(wù)被執(zhí)行完后才會流轉(zhuǎn)到TIDYING狀態(tài),而shutdownNow
方法會將線程池的狀態(tài)置為STOP肥败,該方法會直接丟棄阻塞隊列中任務(wù)趾浅。
shutdown
方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查調(diào)用方是否有權(quán)限操作
checkShutdownAccess();
//更新線程池狀態(tài)
advanceRunState(SHUTDOWN);
//中斷空閑狀態(tài)的線程,調(diào)用worker的tryLock方法馒稍,如果能獲取到說明該線程處于
//空閑狀態(tài)皿哨,沒有在執(zhí)行任務(wù)
interruptIdleWorkers();
//為ScheduledThreadPoolExecutor提供的鉤子函數(shù)
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//調(diào)用最后的terminated方法,線程池狀態(tài)變?yōu)門ERMINATED
tryTerminate();
}
shutdownNow
方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//設(shè)置線程池狀態(tài)為STOP
advanceRunState(STOP);
//遍歷workers集合纽谒,中斷所有線程
interruptWorkers();
//返回阻塞隊列中沒有執(zhí)行完的任務(wù)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//調(diào)用最后的terminated方法证膨,線程池狀態(tài)變?yōu)門ERMINATED
tryTerminate();
return tasks;
}
5.結(jié)尾
線程池的本質(zhì)就是動態(tài)維護(hù)一定數(shù)量的線程來執(zhí)行任務(wù),采用生產(chǎn)者-消費者模型鼓黔。但是整體分析下來發(fā)現(xiàn)實現(xiàn)其實并不簡單椎例,主要原因有:
- 線程池狀態(tài)是變化的
- 線程數(shù)量是變化的
- 線程池的配置參數(shù)也可能在運行中被外部調(diào)用方法改變,比如最大線程數(shù)请祖,核心線程數(shù)等等
難點在于變化订歪,因此不論是增加線程或者是中斷線程設(shè)置是獲取任務(wù),都離不開狀態(tài)與線程數(shù)的判斷肆捕。