首先放上ThreadPoolExecutor的繼承實(shí)現(xiàn)圖:
代碼如下:
public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
我們先debug代碼來走一遍流程:
- ThreadPoolTest:
請注意我使用的是
ExecutorService service = Executors.newFixedThreadPool(1);
來生成線程格嘁,使用別的線程池流程跟我的不一樣。但是最后進(jìn)入的方法還會是execute().
-
AbstractExecutorService對ExecutorService一些方法做了默認(rèn)的實(shí)現(xiàn)稿湿,主要是submit和invoke方法符欠。
AbstractExecutorService#submit -
最后進(jìn)入的實(shí)現(xiàn)方法是ThreadPoolExecutor中的execute().這個方法會在后面有更為詳細(xì)的解答瓶您。
ThreadPoolExecutor#execute
變量
鏡頭給到變量和一些狀態(tài)量舟陆,這些很好理解坠非。
//int類型數(shù)字敏沉,高三位表示線程池狀態(tài),后29位表示worker數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池允許的最大線程數(shù)炎码,1左移29位盟迟,也就是2的29次方-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//線程的狀態(tài)
// << COUNT_BITS代表左移 COUNT_BITS位,也就是 111,000,001,010,011后面跟29個0
private static final int RUNNING = -1 << COUNT_BITS;//可以接受新線程潦闲,可以繼續(xù)執(zhí)行隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;//不再接受新任務(wù)攒菠,但可以繼續(xù)執(zhí)行隊(duì)列中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;//新任務(wù)和隊(duì)列中的任務(wù)都拒絕執(zhí)行和接受,并中斷正在執(zhí)行的任務(wù)
private static final int TIDYING = 2 << COUNT_BITS;//所有任務(wù)已被中斷
private static final int TERMINATED = 3 << COUNT_BITS;//已清理完現(xiàn)場
核心變量
//池中所保存的線程數(shù),不包括空閑線程,線程池里一直不會被銷毀的線程數(shù)量
//活動線程小于corePoolSize則直接創(chuàng)建歉闰,大于等于則先加到workQueue中辖众,隊(duì)列滿了才創(chuàng)建新的線程。
private volatile int corePoolSize;
//池中允許的最大線程數(shù)
private volatile int maximumPoolSize;
//線程數(shù)大于核心數(shù)時和敬,此為多余的空閑線程在終止前等待新任務(wù)的最大等待時間
private volatile long keepAliveTime;
//執(zhí)行程序創(chuàng)建新線程使用的工廠
private volatile ThreadFactory threadFactory;
//執(zhí)行前保存任務(wù)的隊(duì)列凹炸,此隊(duì)列僅保持由execute方法提交的Runnable任務(wù)
private final BlockingQueue<Runnable> workQueue;
//由于超出線程范圍和隊(duì)列容量而使執(zhí)行的拒絕策略
private volatile RejectedExecutionHandler handler;
關(guān)于創(chuàng)建線程時根據(jù)核心線程數(shù)還有最大線程數(shù)的這些判斷來生成線程會在后面方法介紹中更詳細(xì)說明,在這里需要多說一點(diǎn)關(guān)于拒絕策略昼弟。也就是
private volatile RejectedExecutionHandler handler;
線程池是有容量大小的还惠,肯定沒有辦法無限制增長,這個變量便是針對線程無法提交到線程池時產(chǎn)生的。
內(nèi)部類
ThreadPoolExecutor是有5個內(nèi)部類的蚕键,代碼和圖如下:
可以分為兩類救欧,一類是Worker,一類是執(zhí)行的拒絕策略。Worker會在后面說明锣光。執(zhí)行策略如下:
-
CallerRunsPolicy
CallerRunsPolicy -
AbortPolicy:
AbortPolicy -
DiscardPolicy:
DiscardPolicy -
DiscardOldestPolicy:
DiscardOldestPolicy
AbortPolicy策略是默認(rèn)拒絕策略笆怠。
方法
看源碼的方法我一般跳過getXXX潘明,setXXX方法盖袭,構(gòu)造方法帶著看。
public void execute(Runnable command)
我覺得這是這個類最重要的方法笨忌,所以放在第一個講频丘,來牽扯出其它的方法办成。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//活動線程數(shù)量比核心線程數(shù)小,直接創(chuàng)建worker執(zhí)行任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//添加失敗搂漠,獲取標(biāo)記
c = ctl.get();
}
//worker數(shù)量超過核心線程數(shù)迂卢,任務(wù)直接進(jìn)入隊(duì)列
//線程是運(yùn)行狀態(tài)并且扔到隊(duì)列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//線程池不是running狀態(tài),說明執(zhí)行過shutdown命名桐汤,需要對新加入的任務(wù)執(zhí)行reject命令
//這里需要recheck原因在于任務(wù)加入隊(duì)列而克,線程池狀態(tài)可能發(fā)生變化
if (! isRunning(recheck) && remove(command))
reject(command);
//核心線程數(shù)允許為0
//發(fā)現(xiàn)線程池?cái)?shù)量是0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//線程池不是運(yùn)行狀態(tài),或者任務(wù)進(jìn)入隊(duì)列失敗怔毛,則嘗試創(chuàng)建worker執(zhí)行任務(wù)
//1:線程池不是運(yùn)行狀態(tài)员萍,addworker內(nèi)部會判斷線程池狀態(tài)
//2:addworker第二個參數(shù)表示是否創(chuàng)建核心線程
//3:addWorker返回false,表示創(chuàng)建失敗拣度,需要執(zhí)行reject操作
else if (!addWorker(command, false))
reject(command);
}
我們可以看到核心代碼是3個if-elseif代碼塊碎绎,但其實(shí)這里執(zhí)行了4個邏輯:
1:活動線程小于corePoolSize的時候創(chuàng)建新的線程,不排隊(duì)
2:活動線程大于corePoolSize并且入隊(duì)成功(也就是隊(duì)列沒滿)加入到任務(wù)隊(duì)列當(dāng)中
3:執(zhí)行addWorker(command, false),也就是表示活動線程大于 corePoolSize 且小于 maximumPoolSize ,且等待隊(duì)列已滿,(addWorker核心邏輯就是判斷時候小于maximumPoolSize )
4::倘若addWorker(command, false)返回false抗果,也就是表示活動線程大于 corePoolSize 且大于 maximumPoolSize 筋帖,且等待隊(duì)列已滿,則調(diào)用拒絕策略來處理該任務(wù)
看完這個方法窖张,我想你對addWorker這個方法肯定想一探究竟:
private boolean addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//外層自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// (rs>=SHUTDOWN) || (rs == SHUTDOWN && firsttask != null) || (rs==SHUTDOWN && workerQueue.isEmpty())
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//內(nèi)部自旋
for (;;) {
int wc = workerCountOf(c);
//worker數(shù)量超過容量幕随,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas方式 增加worker數(shù)量+1
//若增加成功,跳出外循環(huán)進(jìn)入第三部分
if (compareAndIncrementWorkerCount(c))
break retry;
//增加失敗宿接,再次讀取
c = ctl.get(); // Re-read ctl
//線程池狀態(tài)發(fā)生變化赘淮,對外層循環(huán)自旋
if (runStateOf(c) != rs)
continue retry;
//其它情況,內(nèi)循環(huán)自旋
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
//這個thread看worker可以看出來
//這個thread就是我們要啟動的thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//worker添加是串行的睦霎,需要加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//檢查線程池狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//worker已經(jīng)調(diào)用過start梢卸,則不再創(chuàng)建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//因?yàn)閣orkers不是線程安全的,所以前面需要上鎖
workers.add(w);
int s = workers.size();
//更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//啟動
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//worker啟動失敗,說明線程池狀態(tài)發(fā)生變化(關(guān)閉操作被執(zhí)行)副女,需要進(jìn)行shutdown操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這個方法分兩部分蛤高,一部分是雙層自旋,一部分是下面的真正執(zhí)行線程的代碼,你需要明白下面這些地方:
- 雙層自旋來不停的以保證在線程狀態(tài)沒有變化的情況下以cas方式將ctl變量+1(compareAndIncrementWorkerCount(c)),然后跳出雙層循環(huán)執(zhí)行下面的啟動線程步驟戴陡,否則返回false表示失敗塞绿。
- 加入HashSet<Worker> workers隊(duì)列,并執(zhí)行線程
Worker#runWorker
執(zhí)行線程就是Worker類中的run()方法,這個方法調(diào)用了runWorker();
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
//拿到當(dāng)前線程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//允許外部能夠中斷
w.unlock(); // allow interrupts
//線程執(zhí)行異常判斷標(biāo)志
boolean completedAbruptly = true;
try {
//自旋
//拿到task
while (task != null || (task = getTask()) != null) {
//加鎖
//1.降低鎖范圍恤批,提升性能
//2.保證每個worker是串行的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//線程池正在停止异吻,則對當(dāng)前線程進(jìn)行中斷操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
//beforeExecute()和afterExecute()拓展功能,在這路是空實(shí)現(xiàn)
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//幫助gc
task = null;
//完成任務(wù)數(shù)+1喜庞,volatile類型
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//自旋操作被退出诀浪,說明線程池正在結(jié)束
processWorkerExit(w, completedAbruptly);
}
}
getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//必要情況下需要檢查workQueue是否為空
// Check if queue empty only if necessary.
//SHUTDOWN狀態(tài)下還是可以執(zhí)行隊(duì)列中的線程,所以判斷是否是更高狀態(tài)STOP延都,同時隊(duì)列為空也不會再執(zhí)行了雷猪。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//wc > corePoolSize成立代表當(dāng)前線程數(shù)大于核心線程數(shù),那么超過corePoolSize的的線程必定有超時
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根據(jù)超時與否拿到執(zhí)行線程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//是否異常退出晰房,如果不是在runWorker的getTask方法workerCount已經(jīng)被減一了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//在前面runWorker() finally代碼塊中+1
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試停止線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 線程不是異常結(jié)束
if (!completedAbruptly) {
// 線程池最小空閑數(shù)求摇,允許core thread超時就是0,否則就是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0但是隊(duì)列不為空要保證有1個線程來執(zhí)行隊(duì)列中的任務(wù)
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 線程池還不為空那就不用擔(dān)心了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.線程異常退出
// 2.線程池為空嫉你,但是隊(duì)列中還有任務(wù)沒執(zhí)行月帝,看addWoker方法對這種情況的處理
addWorker(null, false);
}
}
tryTerminate()
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下狀態(tài)直接返回:
// 1.線程池還處于RUNNING狀態(tài)
// 2.SHUTDOWN狀態(tài)但是任務(wù)隊(duì)列非空
// 3.runState >= TIDYING 線程池已經(jīng)停止了或在停止了
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 只能是以下情形會繼續(xù)下面的邏輯:結(jié)束線程池躏惋。
// 1.SHUTDOWN狀態(tài)幽污,這時不再接受新任務(wù)而且任務(wù)隊(duì)列也空了
// 2.STOP狀態(tài),當(dāng)調(diào)用了shutdownNow方法
// workerCount不為0則還不能停止線程池,而且這時線程都處于空閑等待的狀態(tài)
// 需要中斷讓線程“醒”過來簿姨,醒過來的線程才能繼續(xù)處理shutdown的信號距误。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。
// ONLY_ONE:這里只需要中斷1個線程去處理shutdown信號就可以了
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 進(jìn)入TIDYING狀態(tài)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子類重載:一些資源清理工作扁位,可以自定義實(shí)現(xiàn)功能
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdown():
將runState置為SHUTDOWN准潭,會終止所有空閑的線程。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 線程池狀態(tài)設(shè)為SHUTDOWN域仇,如果已經(jīng)至少是這個狀態(tài)那么則直接返回
advanceRunState(SHUTDOWN);
// 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// tryTerminate方法中會保證隊(duì)列中剩余的任務(wù)得到執(zhí)行刑然。
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//w.tryLock()能拿到鎖就表示該線程空閑
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow():
將runState置為STOP。和shutdown方法的區(qū)別暇务,這個方法會終止所有的線程泼掠。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP狀態(tài):不再接受新任務(wù)且不再執(zhí)行隊(duì)列中的任務(wù)
advanceRunState(STOP);
// 中斷所有線程
interruptWorkers();
// 返回隊(duì)列中還沒有被執(zhí)行的任務(wù)。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
以上垦细。