ThreadPoolExecutor如何實(shí)現(xiàn)任務(wù)的提交和執(zhí)行的呢?
首先,看一下ThreadPoolExecutor的Worker內(nèi)部類窖壕。
Worker
ThreadPoolExecutor定義了內(nèi)部類Worker來表征線程池中的工作線程:
// 繼承了AQS宠纯,并實(shí)現(xiàn)了Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 工作線程
final Thread thread;
// 待執(zhí)行的任務(wù)
Runnable firstTask;
// 當(dāng)前線程已執(zhí)行的任務(wù)數(shù)
volatile long completedTasks;
// 構(gòu)造函數(shù)
Worker(Runnable firstTask) {
// 調(diào)用AQS的setState方法將鎖狀態(tài)設(shè)置為-1
setState(-1);
this.firstTask = firstTask;
// 通過線程工廠創(chuàng)建線程
// 注意: 創(chuàng)建線程時(shí)會將當(dāng)前worker傳入,worker本身也是一個(gè)runnable
this.thread = getThreadFactory().newThread(this);
}
// 定義啟動函數(shù)
// addWorker()-->t.start()-->t.run()-->worker.run()
public void run() {
runWorker(this);
}
// 0代表無鎖狀態(tài)
// 1代表有鎖狀態(tài)
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 實(shí)現(xiàn)AQS的tryAcquire方法
protected boolean tryAcquire(int unused) {
// CAS將狀態(tài)值由0更新為1
if (compareAndSetState(0, 1)) {
// 若成功,則將當(dāng)前線程設(shè)置為鎖獨(dú)占線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 實(shí)現(xiàn)AQS的tryRelease方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
// 將狀態(tài)值為0
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中斷已啟動線程
void interruptIfStarted() {
Thread t;
// getState() >= 0 代表線程處于非Running狀態(tài)
// (t = thread) != null 代表工作線程不為null
// !t.isInterrupted() 代表當(dāng)前線程未被中斷過
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
為何將線程包裝成worker呢磅轻?其實(shí)主要為了實(shí)現(xiàn)工作線程和空閑線程的識別。
- 正在執(zhí)行任務(wù)的線程為工作線程;
- 未執(zhí)行任務(wù)的線程為空閑線程买雾。
Worker繼承了AQS睦尽,并定義了tryAcquire和tryRelease方法当凡。線程需要獲取鎖才可以執(zhí)行任務(wù)冤荆,任務(wù)執(zhí)行完畢后釋放鎖乌妒。
當(dāng)檢測到線程有鎖時(shí),則說明該線程為工作線程;反之光涂,當(dāng)檢測到線程無鎖時(shí)恋博,則說明該線程為空閑線程重虑。
下面從線程執(zhí)行方法開始跟一下源碼:
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// worker數(shù)目小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 將command作為firstTask創(chuàng)建1個(gè)核心worker
if (addWorker(command, true))
return;
c = ctl.get();
}
// 此時(shí)核心線程數(shù)已滿,嘗試創(chuàng)建非核心線程處理command任務(wù)
// 如果線程池狀態(tài)為running且將當(dāng)前任務(wù)添加到阻塞隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池狀態(tài)不為running且當(dāng)前任務(wù)已成功移除出阻塞隊(duì)列,則執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池工作線程數(shù)目為0,則添加1個(gè)非核心工作線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果線程池狀態(tài)不為running
// 或者線程池狀態(tài)為running且當(dāng)前任務(wù)添加到阻塞隊(duì)列失敗(阻塞隊(duì)列已滿),則嘗試添加非核心工作線程并處理當(dāng)前任務(wù)
// 若失敗家淤,則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
歸納一下任務(wù)提交流程:
- 若當(dāng)前工作線程數(shù)目小于corePoolSize歹苦,則創(chuàng)建新的核心線程狠角,并將command任務(wù)提交給該新建的核心線程執(zhí)行;
- 若當(dāng)前工作線程數(shù)目已等于corePoolSize狐赡,則將command任務(wù)添加到阻塞隊(duì)列宵蕉;
- 若command任務(wù)未添加到阻塞隊(duì)列(阻塞隊(duì)列已滿),則創(chuàng)建新的非核心線程薄榛,并將command任務(wù)提交給該新建的非核心線程執(zhí)行谋右。
可以看到啸蜜,execute方法主要落腳在addWorker方法上呢岗。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
// 外層循環(huán)挫酿,負(fù)責(zé)判斷線程池狀態(tài)
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 第一種情況: rs >= SHUTDOWN,即線程池的狀態(tài)為SHUTDOWN、STOP、TIDYING藏杖、TERMINATED,此時(shí)沒必要添加工作線程
// 第二種情況: 下列3種情況只要滿足1種,則沒必要要添加工作線程
// (1) rs != SHUTDOWN(隱含rs >= SHUTDOWN),即線程池狀態(tài)為STOP历葛、TIDYING咒程、TERMINATED
// (2) firstTask != null(隱含rs == SHUTDOWN)奶段,當(dāng)線程池狀態(tài)為SHUTDOWN時(shí),如果firstTask != null,此時(shí)添加任務(wù)會被拒絕
// (3) workQueue.isEmpty()(隱含rs == SHUTDOWN && firstTask == null)娜谊,如果此時(shí)任務(wù)隊(duì)列為空抹剩,則沒必要添加工作線程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 內(nèi)層循環(huán)钳踊,將Worker數(shù)目+1
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS將worker數(shù)目+1,成功則跳出retry循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS將worker數(shù)目+1失敗,再次讀取ctl
c = ctl.get(); // Re-read ctl
// 如果線程池狀態(tài)發(fā)生改變,則跳出內(nèi)層循環(huán),繼續(xù)外層循環(huán)
if (runStateOf(c) != rs)
continue retry;
}
}
// 1. 將線程添加到Workers Set集合
// 2. 啟動線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN,表明線程池狀態(tài)為RUNNING
// rs == SHUTDOWN && firstTask == null,當(dāng)線程池狀態(tài)為SHUTDOWN耕突,且Worker的初始任務(wù)為null纵诞,但workQueue中可能有未執(zhí)行完的任務(wù),此時(shí)仍需添加worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 若線程添加成功,則啟動該線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
概括一下上述流程:
- 判斷線程池當(dāng)前是否為可以添加worker線程的狀態(tài)窒悔,可以則繼續(xù)進(jìn)行下一步北救,不可以則return false;
- 線程池狀態(tài)>shutdown,可能為stop、tinying、terminated,不能添加worker線程;
- 線程池狀態(tài)為shutdown,且firstTask不為空,不能添加worker線程格粪,因?yàn)閟hutdown狀態(tài)的線程池不接收新任務(wù);
- 線程池狀態(tài)為shutdown吓肋,firstTask為空均蜜,且workQueue也為空,不能添加worker線程,因?yàn)閒irstTask為空是為了添加一個(gè)沒有任務(wù)的線程再從workQueue獲取Task,而workQueue為空,說明添加無任務(wù)線程已經(jīng)沒有意義。
- 線程池當(dāng)前線程數(shù)量是否超過上限(corePoolSize或maximumPoolSize),超過了return false,沒超過則對workerCount+1封锉,繼續(xù)下一步;
- 在線程池ReentrantLock保證下,向Workers Set中添加新創(chuàng)建的worker實(shí)例,添加完成后解鎖;
- 當(dāng)worker添加成功后,則啟動該線程。
t.start()方法很有意思,因?yàn)閠為worker持有的線程,t初始化時(shí)傳入的runnable又為worker本身。
t.start()本質(zhì)上會調(diào)用到Thread的run方法:
@Override
public void run() {
if (target != null) {
target.run();
}
}
Thread的run()方法又會調(diào)用到runnable的run()方法缕坎,worker繼承了Runnable接口荷腊,并覆寫了run方法:
public void run() {
runWorker(this);
}
本質(zhì)上調(diào)用的是ThreadPoolExecutor的runWorker方法:
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 因?yàn)槌跏蓟蟮膚orker的state值為-1,需要通過unlock()方法將state值置為0,保證worker可執(zhí)行l(wèi)ock()操作
w.unlock();
boolean completedAbruptly = true;
try {
// 阻塞執(zhí)行(直到task為空才退出)
while (task != null || (task = getTask()) != null) {
// 需要獲取worker獨(dú)占鎖四瘫,且不重入
// 執(zhí)行任務(wù)前锹杈,獲取worker鎖咬清,任務(wù)執(zhí)行完畢后,才釋放worker鎖
// 只要檢測到worker為已獲取鎖狀態(tài),則證明該worker為active狀態(tài)
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 處理worker退出
processWorkerExit(w, completedAbruptly);
}
}
概括一下上述流程:
- 首先通過unlock()方法將state值置為0(初始化后的worker的state值為-1笆环,無法成功執(zhí)行l(wèi)ock()操作)账忘,保證worker后續(xù)可以獲取鎖以便執(zhí)行任務(wù)蒋荚;
- 阻塞獲取worker自身持有的task及阻塞隊(duì)列中的task,然后執(zhí)行寺酪;
- 當(dāng)獲取不到task時(shí)耿戚,釋放掉worker鎖變?yōu)榭臻e線程皂股;
- 最后執(zhí)行processWorkerExit方法處理空閑線程洋机。
接著看一下getTask和processWorkerExit方法。
getTask
private Runnable getTask() {
// poll獲取任務(wù)是否超時(shí)
boolean timedOut = false;
for (;;) { ①
int c = ctl.get();
int rs = runStateOf(c);
// 第一種情況: rs的狀態(tài)為stop心剥、tinying疑苫、terminated
// 第二種情況: rs的狀態(tài)為shutdown,且workQueue.isEmpty
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { ②
// 循環(huán)CAS減少worker數(shù)量蚊丐,直到成功
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否有超時(shí)機(jī)制timed
// 1. allowCoreThreadTimeOut允許核心線程空閑超時(shí)后回收
// 2. wc > corePoolSize代表非核心線程空閑均會超時(shí)回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ③
// 1. wc > maximumPoolSize且wc > 1
// 2. (timed && timedOut)且wc > 1
// 3. wc > maximumPoolSize且workQueue.isEmpty()
// 4. (timed && timedOut)且workQueue.isEmpty()
if ((wc > maximumPoolSize || (timed && timedOut)) ④
&& (wc > 1 || workQueue.isEmpty())) { ⑤
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? ⑥
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
timed標(biāo)識當(dāng)前線程是否具有超時(shí)機(jī)制:
- wc > corePoolSize時(shí),代表當(dāng)前worker線程為非核心線程,timed恒等于true,說明非核心線程均具有超時(shí)機(jī)制炮障;
- wc <= corePoolSize時(shí),代表當(dāng)前worker線程為核心線程由蘑,核心線程默認(rèn)不具有超時(shí)機(jī)制(allowCoreThreadTimeOut默認(rèn)為false)淆院,僅有allowCoreThreadTimeOut配置為true時(shí),核心線程才具有線程機(jī)制结洼。
換句話說。只要timed為true莫换,當(dāng)前worker線程必然具有超時(shí)機(jī)制。
(1)核心線程
假設(shè)某核心線程已將task執(zhí)行完沪么,且workQueue也為空,線程在runWorker()方法里繼續(xù)阻塞執(zhí)行g(shù)etTask()方法碗硬,因?yàn)閍llowCoreThreadTimeOut默認(rèn)為false恩尾,且wc <= corePoolSize猎物,故timed為false蔫磨。
此時(shí)搀罢,第一個(gè)判斷:(wc > maximumPoolSize || (timed && timedOut)為false唧取,直接執(zhí)行⑥處代碼款违,等待keepAliveTime后,因?yàn)閣orkQueue為空形真,所以超時(shí)之后返回null杉编,并將timeOut設(shè)置為true,接著繼續(xù)執(zhí)行①處循環(huán)咆霜。
由于timed一直等于false邓馒,所以該空閑的核心線程會一直阻塞在①處。
(2)非核心線程
假設(shè)某非核心線程已將task執(zhí)行完蛾坯,且workQueue也為空光酣,線程在runWorker()方法里繼續(xù)阻塞執(zhí)行g(shù)etTask()方法,因?yàn)閍llowCoreThreadTimeOut默認(rèn)為false脉课,但此時(shí)wc > corePoolSize救军,故timed為true。
此時(shí)倘零,第一個(gè)判斷:(wc > maximumPoolSize || (timed && timedOut)為false唱遭,直接執(zhí)行⑥處代碼,等待keepAliveTime后呈驶,因?yàn)閣orkQueue為空拷泽,所以超時(shí)之后返回null,并將timeOut設(shè)置為true俐东,接著繼續(xù)執(zhí)行①處循環(huán)跌穗。
繼續(xù)執(zhí)行到第一個(gè)判斷,此時(shí)(wc > maximumPoolSize || (timed && timedOut)為true虏辫,繼續(xù)執(zhí)行第二個(gè)判斷: (wc > 1 || workQueue.isEmpty())蚌吸,此時(shí)第二個(gè)判斷為true,嘗試將工作線程數(shù)減1砌庄,若成功羹唠,則直接返回null奕枢,若失敗,則繼續(xù)執(zhí)行①處循環(huán)佩微,直到工作線程數(shù)減1操作成功缝彬。
(3)wc > maximumPoolSize
正常情況下,wc不會大于maximumPoolSize哺眯,因?yàn)樘砑觲orker時(shí)谷浅,會先判斷線程數(shù)是否超過maximumPoolSize,若超過則不執(zhí)行addWorker操作奶卓。之所以出現(xiàn)wc > maximumPoolSize一疯,可能是某線程執(zhí)行了setMaximumPoolSize操作,新設(shè)置的maximumPoolSize低于現(xiàn)有worker數(shù)夺姑。
此時(shí)當(dāng)前worker執(zhí)行g(shù)etTask操作時(shí)墩邀,由于wc > maximumPoolSize,循環(huán)執(zhí)行compareAndDecrementWorkerCount操作盏浙,直到成功返回null眉睹。接著跳出addWorker的while循環(huán),繼續(xù)執(zhí)行finally代碼塊的processWorkerExit操作废膘。
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
// 從Workers Set中移除worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 任何對線程池有負(fù)效益的操作時(shí)竹海,都需要嘗試終止線程池
tryTerminate();
int c = ctl.get();
// 線程池狀態(tài)為running或shutdown時(shí),如果不是突然終止的丐黄,但當(dāng)前線程數(shù)量少于需要維護(hù)的線程數(shù)量站削,則addWorker()
// 如果corePoolSize為0且workQueue不為空,則創(chuàng)建1個(gè)線程逐漸消耗掉workQueue中的任務(wù)
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// allowCoreThreadTimeOut默認(rèn)為false孵稽,min為核心線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min為0,即不需要維持核心線程數(shù)
// 但workQueue不為空十偶,至少保持一個(gè)線程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果線程數(shù)量大于最小數(shù)量菩鲜,直接返回
// 否則下面至少要addWorker一個(gè)
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
如果線程是突然終止的,說明是task執(zhí)行時(shí)出現(xiàn)異常導(dǎo)致的惦积,即run()方法執(zhí)行時(shí)發(fā)生異常接校,那正在工作的線程數(shù)量需要減1。
如果不是突然終止的狮崩,說明是worker線程沒有task可執(zhí)行蛛勉,不用減1,因?yàn)間etTask()方法中已經(jīng)減1了睦柴。