1 ThreadPoolExecutor介紹
ThreadPoolExecutor是線程池是實(shí)現(xiàn)怒坯。包含了對(duì)線程生命周期的管理视译。ThreadPoolExecutor的核心參數(shù)包括:
//線程工廠酷含,設(shè)置線程名字,方便定位線程
private volatile ThreadFactory threadFactory;
//超過(guò)最大線程數(shù)量之后執(zhí)行的飽和策略
1.AbortPolicy:直接拋出異常。 2.CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)慌闭。
3.DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)丧失。
4.DiscardPolicy:不處理布讹,丟棄掉。
5.也可以根據(jù)自定義策略。如先持久化呼股,記錄日志或者數(shù)據(jù)庫(kù)吸奴,后續(xù)補(bǔ)償。
private volatile RejectedExecutionHandler handler;
//線程空閑時(shí)間逞度,超過(guò)這個(gè)時(shí)間,線程自動(dòng)退出(默認(rèn)針對(duì)超出corePoolSize的線程)
private volatile long keepAliveTime;
//是否回收核心線程
private volatile boolean allowCoreThreadTimeOut;
//核心線程數(shù)量:
//小于該數(shù)量馆匿,執(zhí)行任務(wù)時(shí)會(huì)創(chuàng)建新的線程
//默認(rèn)核心線程是不會(huì)回收的,當(dāng)然也可以設(shè)置成可回收
private volatile int corePoolSize;
//最大線程數(shù)量(包括線程隊(duì)列里面的線程數(shù)量),達(dá)到這個(gè)線程數(shù)的時(shí)候會(huì)執(zhí)行飽和策略
private volatile int maximumPoolSize;
//線程任務(wù)隊(duì)列
1.ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列呕臂,此隊(duì)列有序: FIFO(先進(jìn)先出)歧蒋。
2.LinkedBlockingQueue:基于鏈表的阻塞隊(duì)列吴叶,此隊(duì)列有序: 吞吐量通常要高于ArrayBlockingQueue
3.SynchronousQueue:不存儲(chǔ)元素的阻塞隊(duì)列敌呈。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用取出操作吭练,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue
4.PriorityBlockingQueue:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列分尸。
private final BlockingQueue<Runnable> workQueue;
//默認(rèn)飽和策略
private static final RejectedExecutionHandler defaultHandler =
new ThreadPoolExecutor.AbortPolicy();
注意:
threadFactory
一般自定義為線程池創(chuàng)建線程用尺上,通常會(huì)為線程創(chuàng)建一個(gè)易于理解的名稱卑吭。
allowCoreThreadTimeOut
默認(rèn)是false
马绝,當(dāng)為true
是可以回收空閑的核心線程數(shù)豆赏。
handler
通常可以自定義拒絕策略富稻,可以實(shí)現(xiàn)任務(wù)持久化河绽,便于后期補(bǔ)償。
2 ThreadPoolExecutor的狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000-1111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 111 - 0000000000000000000000000000(十進(jìn)制: -536, 870, 912)
private static final int RUNNING = -1 << COUNT_BITS;
// 000 - 0000000000000000000000000(十進(jìn)制: 0)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 - 00000000000000000000000000(十進(jìn)制: 536,870唉窃, 912)
private static final int STOP = 1 << COUNT_BITS;
// 010 - 00000000000000000000000000.(十進(jìn)制值: 1, 073, 741, 824)
private static final int TIDYING = 2 << COUNT_BITS;
// 011 - 000000000000000000000000000(十進(jìn)制值: 1, 610,612, 736)
private static final int TERMINATED = 3 << COUNT_BITS;
ThreadPoolExecutor
通過(guò)ctl
保存線程池的狀態(tài)和工作線程數(shù),ctl
是一個(gè)32位整數(shù)蔓涧,高3位表示線程池狀態(tài)鉴未,低29位表示工作線程數(shù)量(workCount)巍糯,所有工作線程的最大數(shù)量是2^29-1咱筛。
線程池的狀態(tài)數(shù)值大小一次是RUNNING
<SHUTDOWN
<STOP
<TIDYING
<TERMINATED
,并且只有RUNNING
狀態(tài)是負(fù)數(shù)盟猖,這樣設(shè)計(jì)的原因是方便狀態(tài)判斷你弦。
線程池狀態(tài)分為5種:RUNNING
、SHUTDOWN
磁浇、STOP
、TIDYING
后专、TERMINATED
- RUNNING:(運(yùn)行)接收新的任務(wù)并處理隊(duì)列里的任務(wù)
- SHUTDOWN:(關(guān)閉)不接收新的任務(wù)殷蛇,但是隊(duì)列中任務(wù)會(huì)執(zhí)行
- STOP:(停止)不接收新的任務(wù)露氮,不處理隊(duì)列中的任務(wù),并終止正在處理的任務(wù)
- TIDYING:(整理)所有任務(wù)都被終止的,同時(shí)工作線程數(shù)量為0
- TERMINATED:(已終止) terminated()方法執(zhí)行結(jié)束后會(huì)進(jìn)入這一狀態(tài)慧瘤,表示線程池已關(guān)閉桦沉。terminated()方法默認(rèn)是空方法,可以自定義實(shí)現(xiàn)淘菩。
下面是這五種狀態(tài)的轉(zhuǎn)換:
RUNNING -> SHUTDOWN:通過(guò)調(diào)用
shutdown()
方法實(shí)現(xiàn)殖告。(RUNNING or SHUTDOWN) -> STOP:通過(guò)調(diào)用
shutdownNow()
方法實(shí)現(xiàn)。SHUTDOWN -> TIDYING:task隊(duì)列和墓懂,worker集合為空焰宣。
STOP -> TIDYING:worker集合為空。
TIDYING -> TERMINATED:執(zhí)行完
terminated()
方法后捕仔。
注意:
ThreadPoolExecutor
只提供了shutdown()
和shutdownNow()
兩個(gè)方法轉(zhuǎn)換狀態(tài)匕积。其他狀態(tài)的轉(zhuǎn)變都是隱式裝換(tryTerminate()方法中完成)。但是可以通過(guò)如下方法查看線程池狀態(tài):
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
2.1 ThreadPoolExecutor獲取狀態(tài)和線程數(shù)
//~CAPACITY: 11100000000000000000000000000001
//return c & ~CAPACITY:獲取c高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//CAPACITY:00011111111111111111111111111111
//c & CAPACITY:返回的就是c的低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//SHUTDOWN是0榜跌,小于SHUTDOWN闪唆,也就是判斷是否是RUNNING
private static boolean isRunning(int c) { return c < SHUTDOWN;}
//通過(guò)狀態(tài)和線程數(shù)量組合ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2 線程池狀態(tài)判斷
// 判斷當(dāng)前線程池運(yùn)行狀態(tài)值是否小于給定值
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 判斷當(dāng)前線程池運(yùn)行狀態(tài)值是否大于等于給定值
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
3 ThreadPoolExecutor的運(yùn)行過(guò)程
3.1 execute方法
public void execute(Runnable command) {
if (command == null)
//不能為null
throw new NullPointerException();
int c = ctl.get();
//返回ctl的低29位,判斷是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//創(chuàng)建核心線程執(zhí)行任務(wù)
if (addWorker(command, true))
//成功就返回
return;
//失敗重新獲取ctl
c = ctl.get();
}
//走到這钓葫,說(shuō)明大于核心線程數(shù)悄蕾,則判斷是否是運(yùn)行狀態(tài),是就往隊(duì)列中加任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果不是RUNNING础浮,就移除當(dāng)前任務(wù)
if (! isRunning(recheck) && remove(command))
//失敗就執(zhí)行拒絕策略
reject(command);
//判斷線程總數(shù)量是否為0(有可能線程執(zhí)行任務(wù)報(bào)錯(cuò)了帆调,然后就被銷毀了)
else if (workerCountOf(recheck) == 0)
//創(chuàng)建非核心線程執(zhí)行任務(wù)(注意這里是null)
addWorker(null, false);
}
//走到這,說(shuō)明不是運(yùn)行狀態(tài)或者隊(duì)列滿了豆同,就創(chuàng)建非核心線程執(zhí)行任務(wù)
else if (!addWorker(command, false))
//失敗就執(zhí)行拒絕策略
reject(command);
}
上述代碼的執(zhí)行流程如下:
注意:
只有在線程池處于RUNNING
狀態(tài)時(shí)提交任務(wù)的任務(wù)才會(huì)被執(zhí)行番刊,否則執(zhí)行拒絕策略。
3.2 addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 運(yùn)行狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 分兩種情況考慮:
// 1影锈、如果線程池狀態(tài)>SHUTDOWN芹务,即(STOP,TIDYING,TERMINATED)中的一個(gè),直接返回false(因?yàn)檫@些條件下要求 清空隊(duì)列鸭廷,正在運(yùn)行的任務(wù)也要停止)
// 2枣抱、如果線程池狀態(tài)=SHUTDOWN & (firstTask!=null || 隊(duì)列為空)直接返回false,
// 換言之,在SHUTDOWN狀態(tài)下, 想要?jiǎng)?chuàng)建一個(gè)firstTask為空的新worker靴姿,需要確保隊(duì)列不為空(隊(duì)列為空就意味著這個(gè)新的worker暫時(shí)還沒有任務(wù)可以執(zhí)行沃但,所以也沒有創(chuàng)建worker的必要, 因?yàn)檫@個(gè)worker是用來(lái)消化隊(duì)列中的任務(wù))
// 大部分情況下,我們需要新創(chuàng)建的worker的firstTask都有初始化任務(wù), 在SHUTDOWN狀態(tài)下佛吓,就不允許在創(chuàng)建worker了宵晚,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 計(jì)算worker的數(shù)量
int wc = workerCountOf(c);
// 如果worker數(shù)量已經(jīng)超過(guò)指定大小,則不允許創(chuàng)建
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試cas累加worker的數(shù)量维雇,如果成功就跳出最外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 本次cas失敗淤刃,表明ctl已經(jīng)變了,檢查線程池狀態(tài)吱型,如果狀態(tài)變了就跳到最外層重新執(zhí)行一次
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// 如果到這里了說(shuō)明是worker的數(shù)量改變了導(dǎo)致的cas失敗逸贾,那就在內(nèi)層自旋操作 再來(lái)一次
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 狀態(tài)rs < SHUTDOWN表明是RUNNING狀態(tài)
// 如果線程池的狀態(tài)是SHUTDOWN,那么創(chuàng)建的worker是用來(lái)處理隊(duì)列中的任務(wù),因此需要滿足firstTask == null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程提前start則認(rèn)為是異常狀態(tài)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 就是記錄一下線程池在運(yùn)行中線程數(shù)量的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 這里啟動(dòng)worker運(yùn)行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果添加worker失敗了铝侵,這里需要處理一下
addWorkerFailed(w);
}
return workerStarted;
}
當(dāng)創(chuàng)建新worker時(shí)灼伤,firstTask為null的含義:
首先需要知道的是worker執(zhí)行的任務(wù)的來(lái)源,一是初始化時(shí)的firstTask 即當(dāng)前worker的第一個(gè)任務(wù), 二是從隊(duì)列里取
firstTask表示當(dāng)前worker初始化任務(wù)咪鲜,也就是第一個(gè)要執(zhí)行的任務(wù)狐赡;如果firstTask=null, 說(shuō)明此worker只從隊(duì)列中取任務(wù)來(lái)執(zhí)行
所以當(dāng)創(chuàng)建firstTask為null的worker時(shí),只有隊(duì)列不為空才有創(chuàng)建的必要疟丙,因?yàn)槟康氖侨ハ?duì)列中的任務(wù)
分三種情況來(lái)看創(chuàng)建worker:
正常情況下線程池的狀態(tài)是RUNNING颖侄,這個(gè)時(shí)候只需根據(jù)corePoolSize或者maximumPoolSize來(lái)判斷是否應(yīng)該創(chuàng)建新的woker
如果是STOP,TIDYING,TERMINATED狀態(tài),表明線程池處于清理資源享郊,關(guān)閉線程池(清空隊(duì)列览祖,終止正在運(yùn)行的任務(wù),清空worker)炊琉,這個(gè)時(shí)候不允許創(chuàng)建新的worker
SHUTDOWN狀態(tài)展蒂,此狀態(tài)比較特殊,因?yàn)樵诖藸顟B(tài)會(huì)繼續(xù)處理隊(duì)列中的任務(wù)苔咪,但是不允許往隊(duì)列中新增任務(wù)玄货,同時(shí)正常處理的任務(wù)也會(huì)繼續(xù)處理; 此狀態(tài)下悼泌,在firstTask為null并且隊(duì)列不為空的情況下可以創(chuàng)建新的worker來(lái)處理隊(duì)列中的任務(wù)松捉,其他情況是不允許的
3.3 addWorkerFailed方法
當(dāng)worker啟動(dòng)失敗時(shí)處理:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
// worker計(jì)數(shù)器減1
decrementWorkerCount();
// 嘗試終止線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
4 Worker線程
worker繼承AbstractQueuedSynchronizer,實(shí)現(xiàn)了非重入鎖
- state=0, 表示鎖未被持有
- state=1, 表示鎖被持有
- state=-1, 初始化的值馆里,防止worker線程在真正運(yùn)行task之前被中斷隘世,
Worker也是一個(gè)任務(wù),實(shí)現(xiàn)了Runnable接口鸠踪。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 每個(gè)worker中都會(huì)封裝一個(gè)真正用于處理任務(wù)的線程
final Thread thread;
// 這個(gè)worker初始化的時(shí)候分配的首個(gè)任務(wù)
Runnable firstTask;
// 記錄當(dāng)前worker處理完的任務(wù)數(shù)量
volatile long completedTasks;
// 構(gòu)造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用ThreadFactory創(chuàng)建線程
// 這里this是當(dāng)前實(shí)現(xiàn)了Runnable接口的Worker對(duì)象丙者,也就是說(shuō)當(dāng)我調(diào)用thread.start()方法時(shí),就會(huì)調(diào)用worker的入口方法run
this.thread = getThreadFactory().newThread(this);
}
// 基于AQS實(shí)現(xiàn)的非重入鎖
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 實(shí)際并未使用傳入的參數(shù)
// state=0表示可以獲取营密,將state設(shè)置為1表示獲取成功
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 實(shí)際并未使用傳入的參數(shù)
// 將state設(shè)置為0表示鎖釋放成功
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// worker的主要入口邏輯
public void run() {
runWorker(this);
}
}
4.1 runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 和setState(-1); 相對(duì)應(yīng)
// 這個(gè)時(shí)候state=0了械媒,也就可以被中斷了,(ThreadPoolExecutor.Worker#interruptIfStarted)
w.unlock(); // allow interrupts
// 是否有異常產(chǎn)生
boolean completedAbruptly = true;
try {
// task的來(lái)源要么是firstTask评汰,要么是隊(duì)列
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池是STOP及之后的狀態(tài)纷捞,需要確保它是中斷的
// 如果是STOP之前的狀態(tài),就要確保它不能被中斷(如果有的話就要清除中斷標(biāo)志被去,Thread.interrupted會(huì)清除中斷標(biāo)志)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 執(zhí)行成功的任務(wù)數(shù)量++
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 處理worker退出
// 一種是異常退出
// 一種是普通線程(非核心)主儡,空閑時(shí)間到了退出
processWorkerExit(w, completedAbruptly);
}
}
注意:
Worker有兩種方式結(jié)束線程執(zhí)行:
task.run()
執(zhí)行業(yè)務(wù)方法時(shí)拋出異常,或者響應(yīng)wt.interrupt()
中斷拋出異常惨缆。getTask()
方法返回null糜值,正常退出執(zhí)行丰捷。
線程退出時(shí)會(huì)執(zhí)行processWorkerExit()
方法,completedAbruptly
為true
表示異常退出寂汇,false
表示正常退出病往。
Worker線程通過(guò)加鎖改變state狀態(tài)來(lái)表示線程是否空閑。beforeExecute
和afterExecute
可以自定義實(shí)現(xiàn)擴(kuò)展骄瓣。
4.2 getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 線程池狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 兩種情況從隊(duì)列里面取不到task
// 1荣恐、線程池是STOP狀態(tài),這個(gè)狀態(tài)會(huì)清空隊(duì)列累贤,同時(shí)停止正在處理的任務(wù),自然少漆,如果是這個(gè)狀態(tài)臼膏,直接返回null,表明取不到task
// 2、線程池是SHUTDOWN狀態(tài)并且隊(duì)列為空示损,因?yàn)榇藸顟B(tài)下 隊(duì)列里是不會(huì)新增任何task渗磅,所以在隊(duì)列為空的情況下,自然也是取不到
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// worker數(shù)量減一
decrementWorkerCount();
return null;
}
// 計(jì)算worker數(shù)量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 通過(guò)最大線程數(shù)量或者獲取task超時(shí) 來(lái)決定是否要消減此worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 1检访、timed期限的始鱼,非核心線程如果空閑時(shí)間超過(guò)keepAliveTime,就會(huì)被清理掉
// 因此如果這里從阻塞隊(duì)列里在keepAliveTime時(shí)間內(nèi)都沒有取到task脆贵,說(shuō)明處理超時(shí)了
// 2医清、沒有timed限制的,take阻塞的取task
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
此方法有兩處返回null卖氨,用于退出線程会烙。
第一處:
當(dāng)線程池處于STOP及之后的狀態(tài)
線程池處于SHUTDOWN狀態(tài)并且任務(wù)隊(duì)列沒有任務(wù)。
第二處:
線程數(shù)超過(guò)最大線程數(shù)
線程獲取任務(wù)超時(shí)
注意:
此方法是用于清除空閑線程筒捺,通過(guò)超時(shí)獲取任務(wù)隊(duì)列來(lái)清除非核心線程柏腻,通過(guò)設(shè)置allowCoreThreadTimeOut
為true
也可以清除空閑的核心線程。
4.3 processWorkerExit方法
對(duì)worker執(zhí)行結(jié)束之后系吭,清理掉當(dāng)前worker之后考慮是否采取用新的worker來(lái)替換
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//異常結(jié)束workerCount減一五嫂,正常結(jié)束在getTask已經(jīng)
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 當(dāng)前worker完成的task數(shù)量累加
completedTaskCount += w.completedTasks;
// 從woker Set中移除當(dāng)前worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 每次worker結(jié)束后都要嘗試終止線程池,說(shuō)不定某個(gè)時(shí)刻worker都被清理了并且達(dá)到了線程池終止的條件
// 就可以從這里結(jié)束
tryTerminate();
int c = ctl.get();
// 如果線程池狀態(tài)小于STOP
if (runStateLessThan(c, STOP)) {
// 如果不是異常
// 如果用戶任務(wù)發(fā)生了異常肯尺,嘗試替換worker
// 或者核心線程數(shù)量小于corePoolSize沃缘,嘗試添加worker
// 或者隊(duì)列非空,但是已經(jīng)沒有worker了则吟,嘗試添加worker
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 這里新添加worker孩灯,沒有初始化的firstTask, 這里worker處理的任務(wù)來(lái)自于隊(duì)列
addWorker(null, false);
}
}
流程說(shuō)明:
從worker set中移除當(dāng)前worker
嘗試終止線程池
-
如果線程池狀態(tài)還未達(dá)到STOP,則可以根據(jù)以下情況添加新的worker
用戶task拋出了異常(也就是completedAbruptly=true)
運(yùn)行的線程數(shù)已經(jīng)小于核心線程數(shù)
運(yùn)行的線程數(shù)為0逾滥,但是隊(duì)列中的任務(wù)不為空
5 線程池關(guān)閉
5.1 shutDown方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果有security管理峰档,需要確保有權(quán)限去shutdown線程池
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷所有空閑線程
interruptIdleWorkers();
// 為ScheduledThreadPoolExecutor提供的鉤子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 空閑狀態(tài)下worker的鎖資源肯定是可以直接獲取到的败匹,因此根據(jù)此便可以判別線程是否空閑
if (!t.isInterrupted() && w.tryLock()) {
try {
// 打上中斷標(biāo)志
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
流程說(shuō)明:
- 先將線程池的狀態(tài)改為SHUTDOWN
- 嘗試中斷所有空閑線程(通過(guò)線程是否可加鎖判斷是否空閑)
- 嘗試終止線程池
5.2 shutDownNow方法
// 返回隊(duì)列中未執(zhí)行的task列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 將線池的狀態(tài)設(shè)置為STOP
advanceRunState(STOP);
// 中斷所有worker
interruptWorkers();
// 清空隊(duì)列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
// 雖然線程池狀態(tài)已經(jīng)改成了STOP狀態(tài),但還需要workers被清空之后才會(huì)真正變成TERMINATED狀態(tài)
// 所以這里不一定會(huì)成功, runWorker方法中處理worker退出時(shí)會(huì)觸發(fā)tryTerminate
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// Worker內(nèi)部方法
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
流程說(shuō)明:
先將線程池的狀態(tài)改為STOP
嘗試中斷所有線程
清空任務(wù)隊(duì)列
嘗試終止線程池
5.3 tryTerminate方法
嘗試關(guān)閉線程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果線程池是RUNNING狀態(tài)的
// 如果線程池是TIDYING讥巡、TERMINATED狀態(tài)的不管(基本已經(jīng)處于關(guān)閉狀態(tài)了)
// 如果線程池是SHUTDOWN狀態(tài)并且隊(duì)列中還有task(SHUTDOWN態(tài)下還是會(huì)處理現(xiàn)有的task)
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 到這里就說(shuō)明需要開始線程池終止操作
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 操作1:設(shè)置線程池狀態(tài)為TIDYING掀亩,worker數(shù)量為0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated鉤子方法
terminated();
} finally {
// 操作1設(shè)置后,又處理了terminated方法欢顷,那么就可以把線程池置為TERMINATED狀態(tài)了(線程池已完全關(guān)閉)
ctl.set(ctlOf(TERMINATED, 0));
// 通知在termination條件上等待的操作
// 比如awaitTermination方法的等待操作槽棍,這里喚醒后,可能會(huì)提前結(jié)束等到操作
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
當(dāng)線程池處于以下狀態(tài)不需要關(guān)閉:
如果線程池是RUNNING狀態(tài)的
如果線程池是SHUTDOWN狀態(tài)并且隊(duì)列中還有task(SHUTDOWN狀態(tài)下還是會(huì)處理現(xiàn)有的task)