ThreadPoolExecutor類結(jié)構(gòu)
線程池執(zhí)行流程
- 線程池判斷核心線程是否都處于運行狀態(tài)候址,如果不是慢叨,就創(chuàng)建一個新線程來執(zhí)行任務(wù)。如果是丽惭,執(zhí)行2击奶。
- 判斷線程池中的工作隊列是否已經(jīng)滿,如果未滿责掏,那么直接添加到工作隊列柜砾,如果滿了就執(zhí)行3。
- 線程池判斷池中的線程是否處于工作狀態(tài)换衬。如果沒有就創(chuàng)建一個新工作線程執(zhí)行任務(wù)痰驱。如果已經(jīng)滿了,則執(zhí)行飽和策略的任務(wù)瞳浦。
ThreadPoolExecutor運行機制
- 工作線程數(shù) < 核心線程數(shù)
1.1 new Thread創(chuàng)建新線程作為當前工作線程担映,將當前工作線程Worker添加到workers集合
1.2 啟動當前線程(調(diào)用runWorker方法)
1.2.1 獲取當前線程,當前線程已執(zhí)行完畢并釋放叫潦,那么從阻塞隊列獲取任務(wù)(調(diào)用getTask)
1.2.1.1 如果等待隊列中存在了等待的任務(wù)蝇完,那么取出等待隊列中第一個任務(wù)返回。
1.2.1.2 如果超過核心線程數(shù)的線程,那么timed = true短蜕,同時隊列中已經(jīng)沒有任務(wù)了氢架,那么等待keepalivetime秒就釋放。
1.2.1.3 如果不超過(等于)核心線程數(shù)的線程朋魔,那么timed=false岖研,同時隊列中已經(jīng)沒有任務(wù)了,那么執(zhí)行take方法阻塞警检。
1.2.1.3.1 如果此時進來一個新任務(wù)缎玫,那么發(fā)現(xiàn)阻塞線程數(shù)(工作線程數(shù))不小于核心線程數(shù),那么直接執(zhí)行offer插入等待隊列解滓,等待隊列有值了赃磨,阻塞的線程們又開始搶任務(wù)干活了。
1.2.2 執(zhí)行我們自己的業(yè)務(wù)邏輯(調(diào)用run方法)
1.2.3 移除workers中的當前線程洼裤,回收線程(調(diào)用processWorkerExit方法)
1.3 移除workers中的當前線程邻辉,回收線程(調(diào)用addWorkerFailed方法)
- 工作線程數(shù) < 核心線程數(shù)
- 工作線程數(shù) > 核心線程數(shù)
將當前線程加入到等待隊列BlockQueue。
- 工作線程數(shù) > 核心線程數(shù)
- 等待隊列滿了
直接創(chuàng)建新線程作為工作線程執(zhí)行邏輯腮鞍。
- 等待隊列滿了
- 工作線程數(shù) > 最大線程數(shù)
拋出拒絕策略異常
- 工作線程數(shù) > 最大線程數(shù)
ThreadPoolExecutor源碼分析
關(guān)鍵屬性:
// 該變量ctl作為線程池中的重要屬性值骇,屬性一分為2;
// 高位(前三位)用來保存各個線程的狀態(tài),低位(后三位)保存有效線程的數(shù)量移国。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程數(shù)量占用的位數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大線程容量為2的29次冪減1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 接受新任務(wù)并處理排隊任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新任務(wù)吱瘩,而是處理隊列任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新任務(wù),不處理排隊任務(wù)和中斷進程中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
// 終止所有的任務(wù)迹缀,有效的線程數(shù)量設(shè)置為0使碾,TIDYING狀態(tài)的線程執(zhí)行回調(diào)方法terminated()
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法已經(jīng)執(zhí)行完畢
private static final int TERMINATED = 3 << COUNT_BITS;
// 持有工作線程隊列
private final BlockingQueue<Runnable> workQueue;
// 工作線程所在的集合上的鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 包含所有工作線程的集合,僅在獲取到鎖時才可以訪問
private final HashSet<Worker> workers = new HashSet<Worker>();
// 使用等待隊列
private final Condition termination = mainLock.newCondition();
// 最大池的大小
private int largestPoolSize;
// 完成任務(wù)時的計數(shù)值祝懂,只能在終止工作線程時更新票摇,在獲取到鎖時才可以訪問
private long completedTaskCount;
// 創(chuàng)建新線程的工廠
private volatile ThreadFactory threadFactory;
// 關(guān)閉執(zhí)行的線程池時調(diào)用的Handler
private volatile RejectedExecutionHandler handler;
// 等待工作的線程的超時時間,單位納秒
private volatile long keepAliveTime;
// 是否允許核心線程等待砚蓬,
// 默認false矢门,就是空閑也可以生存。
// true的話灰蛙,在keepAliveTime時間內(nèi)生存祟剔。
private volatile boolean allowCoreThreadTimeOut;
// 核心運行的線程數(shù)
private volatile int corePoolSize;
// 最大線程數(shù),超出這個數(shù)量將執(zhí)行丟棄策略
private volatile int maximumPoolSize;
// 默認拒絕執(zhí)行策略Handler
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
關(guān)鍵方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
說明:作用就是獲取線程池的狀態(tài)摩梧;過程:~CAPACITY :按位取反的值為11100000000000000000000000000000物延。c & ~CAPACITY : 執(zhí)行按位與操作,那么結(jié)果的低29位肯定為0障本。
private static int workerCountOf(int c) { return c & CAPACITY; }
說明:獲取線程池工作線程的數(shù)量教届;過程:CAPACITY : 00011111111111111111111111111111响鹃。&操作將參數(shù)的高3位設(shè)置0。
private static int ctlOf(int rs, int wc) { return rs | wc; }
說明:將runState和workerCount存到同一個int中案训。使用或運算买置,將兩個值合并。
// 使用cas方式將線程的數(shù)量加1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 使用cas方式將線程的數(shù)量減1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 減少ctl的計數(shù)值
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
詳解execute方法
- 說明(不關(guān)心線程池的狀態(tài)和能否添加到等待隊列):
① 如果當前工作線程數(shù)小于核心線程數(shù)强霎,委派給addWorker方法(將封裝線程的Worker添加到Worker集合中忿项,然后啟動線程)。
② 如果當前工作線程數(shù)大于核心線程數(shù)城舞,將工作線程command添加到等待隊列轩触。
addWorker方法,將封裝線程的Worker添加到Worker集合中家夺,然后啟動線程脱柱。
// 添加任務(wù)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 死循環(huán)遍歷
for (;;) {
// 獲取當前的線程池屬性值
int c = ctl.get();
// 獲取線程池的狀態(tài)
int rs = runStateOf(c);
// 如果線程池不可以接受任務(wù)
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&
firstTask == null && ! workQueue.isEmpty()))
return false;
// 死循環(huán)遍歷
for (;;) {
// 獲取工作線程的數(shù)量
int wc = workerCountOf(c);
// 如果工作線程的數(shù)量超出線程池最大線程容量
// 或者工作線程的數(shù)量超出核心線程池數(shù)量或者最大線程池的數(shù)量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 采用cas方法更新線程狀態(tài)值,如果成功拉馋,那么worker數(shù)量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新讀取線程池屬性值
c = ctl.get(); // Re-read ctl
// 如果線程池的狀態(tài)不等于之前的線程狀態(tài)榨为,重新執(zhí)行retry代碼塊
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 獲取線程池的鎖
final ReentrantLock mainLock = this.mainLock;
// 將firstTask封裝成worker
w = new Worker(firstTask);
// 獲取worker的工作線程
final Thread t = w.thread;
// 如果工作線程不為空
if (t != null) {
// 給線程池上鎖
mainLock.lock();
try {
// 獲取當前的線程池的屬性值
int c = ctl.get();
// 獲取當前線程池的狀態(tài)
int rs = runStateOf(c);
// 如果線程池的狀態(tài)可以接受任務(wù)正常工作
// 或者如果線程池不接受任務(wù)并且當前任務(wù)為空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判斷工作線程是否可以啟動
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將當前的工作線程添加到workers集合中
workers.add(w);
// 獲取Worker工作線程的數(shù)量
int s = workers.size();
// 如果Worker工作線程的數(shù)量超出最大池長度
if (s > largestPoolSize)
// 更新最大池長度
largestPoolSize = s;
// 設(shè)置工作線程添加成功
workerAdded = true;
}
} finally {
// 線程池釋放鎖
mainLock.unlock();
}
// 如果工作線程添加成功
if (workerAdded) {
// 啟動工作線程
t.start();
// 設(shè)置工作線程啟動成功
workerStarted = true;
}
}
} finally {
// 如果工作線程啟動失敗煌茴!
if (! workerStarted)
// 回滾Worker線程的創(chuàng)建
addWorkerFailed(w);
}
// 工作線程啟動成功標識
return workerStarted;
}
說明(代碼分為兩個大塊邏輯):
- 第一大塊随闺,兩個死循環(huán):可以發(fā)現(xiàn)代碼結(jié)構(gòu)是兩個死循環(huán),外循環(huán)的作用主要是檢查當前線程池的狀態(tài)蔓腐,如果線程池不可以接受任務(wù)矩乐,那么直接退出。內(nèi)循環(huán)的作用主要是檢查工作線程的數(shù)量回论。
① 如果線程池不可以接受任務(wù)散罕,那么直接退出。
② 如果工作線程的數(shù)量大于最大線程容量或者工作線程的數(shù)量大于核心線程池數(shù)量(最大線程池的數(shù)量)透葛,那么直接退出笨使。
③ 使用cas更新線程屬性值ctl(將ctl值加1),因為ctl的低位用來存儲工作線程的數(shù)量僚害,所以就是cas方式更新工作線程加1,跳出循環(huán)繁调。如果線程池的狀態(tài)和之前的不一致萨蚕,說明cas方式失敗了,那么重新執(zhí)行retry蹄胰。 - 第二大塊岳遥,將當前的工作線程Worker添加到集合,然后啟動線程裕寨。
addWorkerFailed方法:線程添加失敗策略浩蓉。
工作線程Worker類設(shè)計
前面源碼設(shè)計派继,主要還是使用當前工作線程Worker對象,設(shè)計在線程池中執(zhí)行的流程捻艳,然后前面addWorker方法里面驾窟,啟動線程,執(zhí)行Worker的run方法认轨,現(xiàn)在進入Worker類里面绅络,看看源碼怎么設(shè)計的?
Worker類:封裝了需要執(zhí)行的線程類
Worker實現(xiàn)了Runnable接口嘁字,封裝了線程的實現(xiàn)和執(zhí)行恩急。并且繼承了AQS隊列同步器。簡化了獲取鎖和釋放鎖的過程纪蜒,交給AQS去實現(xiàn)了衷恭。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
// 運行中的線程,由ThreadFactory工廠創(chuàng)建
final Thread thread;
// 第一個執(zhí)行的任務(wù)纯续,可能為null
Runnable firstTask;
// 線程計數(shù)器匾荆,記住完成的任務(wù)
volatile long completedTasks;
// 使用線程工廠ThreadFactory為第一個任務(wù)firstTask創(chuàng)建線程
Worker(Runnable firstTask) {
// 在執(zhí)行runWorker之前,禁止中斷操作
setState(-1); // inhibit interrupts until runWorker
// 初始化第一個執(zhí)行的任務(wù)
this.firstTask = firstTask;
// 獲取線程工廠杆烁,為當前對象創(chuàng)建一個線程來初始化
this.thread = getThreadFactory().newThread(this);
}
// 運行線程run方法委托給外部的runWorker來執(zhí)行
public void run() {
runWorker(this);
}
// 判斷當前線程是否獨占(使用state狀態(tài)值判斷牙丽,如果獲取鎖就加1,如果釋放鎖就減1)
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 當前線程嘗試獲取鎖
protected boolean tryAcquire(int unused) {
// 使用cas的方式來設(shè)置當前線程獲取鎖
if (compareAndSetState(0, 1)) {
// 設(shè)置當前線程為獨占線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 當前線程嘗試釋放鎖
protected boolean tryRelease(int unused) {
// 將獨占線程設(shè)置為null
setExclusiveOwnerThread(null);
// 使用cas方式更新當前同步狀態(tài)值為0
setState(0);
return true;
}
// 獨占式獲取鎖
public void lock() { acquire(1); }
// 嘗試以獨占式獲取鎖
public boolean tryLock() { return tryAcquire(1); }
// 獨占式釋放鎖
public void unlock() { release(1); }
// 判斷當前線程是否獨占著鎖
public boolean isLocked() { return isHeldExclusively(); }
// 線程啟動后中斷
void interruptIfStarted() {
Thread t;
// 標識線程中斷
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
說明:
- 里面大多實現(xiàn)的方法兔魂,獲取鎖烤芦,釋放鎖,判斷當前線程是否中斷都是AQS的cas操作同步狀態(tài)值state的方式析校。
AbstractQueuedSynchronizer同步隊列源碼 - Worker本身被設(shè)計為一個線程類构罗,需要初始化第一個執(zhí)行的線程以及線程Thread對象,線程執(zhí)行run方法的邏輯委派給外部的runWorker方法智玻。
runWorker方法遂唧,Worker的run線程執(zhí)行的方法;就是執(zhí)行線程任務(wù)吊奢。
getTask方法盖彭,從等待隊列獲取工作線程。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 設(shè)置核心線程超時或者當前工作線程數(shù)大于核心線程數(shù)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 超時keepAliveTime秒釋放當前工作線程
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 當前工作線程阻塞
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit方法页滚,線程池關(guān)閉召边,回收線程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 工作線程數(shù)量減1
decrementWorkerCount();
// 獲取重入鎖
final ReentrantLock mainLock = this.mainLock;
// 上鎖
mainLock.lock();
try {
// 完成任務(wù)的數(shù)量增加
completedTaskCount += w.completedTasks;
// 工作線程集合移除工作線程
workers.remove(w);
} finally {
// 釋放鎖
mainLock.unlock();
}
// 終止線程池
tryTerminate();
// cas方式獲取ctl值
int c = ctl.get();
// 如果當前線程池接受新任務(wù)且處理排隊任務(wù)
// 或者線程池雖然不接受新任務(wù)但是還處理排隊任務(wù)
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// min 線程池最小空閑數(shù)
// allowCoreThreadTimeOut允許核心線程在keepAliveTime時間等待之后是否允許生存
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0但是隊列不為空要保證有1個線程來執(zhí)行隊列中的任務(wù)
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果線程池中工作線程不為空,就直接返回了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 線程池將不處理隊列中的任務(wù)并且等待隊列不為空裹驰,直接返回false
addWorker(null, false);
}
}
shutdown方法隧熙,將線程池的狀態(tài)轉(zhuǎn)換為SHUTDOWN并且終止所有線程
// 將線程池的狀態(tài)轉(zhuǎn)換為SHUTDOWN并且終止所有線程
public void shutdown() {
// 獲取重入鎖
final ReentrantLock mainLock = this.mainLock;
// 上鎖
mainLock.lock();
try {
// 檢查線程池關(guān)閉的權(quán)限
checkShutdownAccess();
// 將當前線程池的狀態(tài)轉(zhuǎn)換到目標狀態(tài)targetState
advanceRunState(SHUTDOWN);
// 中斷等待隊列中的空閑線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 釋放鎖
mainLock.unlock();
}
// 將線程池的狀態(tài)轉(zhuǎn)換為終止狀態(tài)
tryTerminate();
}
// 將當前線程池的狀態(tài)轉(zhuǎn)換到目標狀態(tài)targetState
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
線程回收
問題:如果我們不設(shè)置線程池回收線程,那么線程池中的線程會怎么樣幻林。
public static void main(String[] args) {
ThreadPoolExecutor tp = new ThreadPoolExecutor(3,
5,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0 ; i < 5; i++) {
tp.execute(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
}
說明:線程池中的線程執(zhí)行完任務(wù)后贞盯,當從等待隊列中獲取任務(wù)時音念,因為任務(wù)已經(jīng)執(zhí)行完了,沒有任務(wù)了躏敢,阻塞的工作隊列已經(jīng)空了闷愤,那么線程只能處于阻塞狀態(tài)了(這段答案在上面getTask方法的源碼)。
allowCoreThreadTimeOut方法父丰,設(shè)置線程超時回收(包括核心線程和非核心線程)
// 設(shè)置線程超時回收(包括核心線程和非核心線程)
public void allowCoreThreadTimeOut(boolean value) {
// 如果keepAliveTime設(shè)置不大于0肝谭,那么報出非法參數(shù)異常
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
// value為true時向下執(zhí)行,allowCoreThreadTimeOut默認值為false
if (value != allowCoreThreadTimeOut) {
// allowCoreThreadTimeOut設(shè)置為true
allowCoreThreadTimeOut = value;
if (value)
// 中斷等待任務(wù)的線程
interruptIdleWorkers();
}
}
// 中斷等待任務(wù)的線程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// 中斷等待任務(wù)的線程
private void interruptIdleWorkers(boolean onlyOne) {
// 獲取重入鎖
final ReentrantLock mainLock = this.mainLock;
// 上鎖
mainLock.lock();
try {
// 遍歷線程集合
for (Worker w : workers) {
Thread t = w.thread;
// 允許嘗試獲取鎖且非中斷
if (!t.isInterrupted() && w.tryLock()) {
try {
// 將線程中斷
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 釋放鎖
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
}
說明:通過設(shè)置allowCoreThreadTimeOut為true蛾扇,線程池中的線程如果阻塞的時間超過了keepAliveTime攘烛,那么將被標記為中斷標志。
還可以通過前面提到的關(guān)閉線程池shutdown方法:檢查線程池關(guān)閉的權(quán)限镀首、將當前線程池狀態(tài)轉(zhuǎn)為為SHUTDOWN坟漱、中斷等待(阻塞)的線程。
源碼閱讀總結(jié)
1. 線程池采用ThreadFactory中默認的DefaultThreadFactory實現(xiàn)類來創(chuàng)建的更哄,使用工廠方法模式來設(shè)計的線程工廠創(chuàng)建線程芋齿。(而工廠方法模式對于我的理解:一個產(chǎn)物對應一個工廠)
2. 線程池的任務(wù)飽和策略?
AbortPolicy:直接拋出RejectedExecutionException異常成翩。
CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)觅捆。
DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù),并執(zhí)行當前任務(wù)麻敌。
DiscardPolicy:不處理栅炒,丟棄掉。
3. 線程池什么時候啟動术羔?
答案:在new ThreadPoolExecutor類時并沒有啟動線程池赢赊,只是設(shè)置了參數(shù),而線程池的啟動是在執(zhí)行execute方法(源碼中是addWorker方法)级历。
4.線程池執(zhí)行任務(wù)的邏輯(execute方法的執(zhí)行邏輯)释移?
4.1. 工作線程數(shù) < 核心線程數(shù)(調(diào)用addWorker方法)
4.1.1 new Thread創(chuàng)建新線程作為當前工作線程,將當前工作線程Worker添加到workers集合
4.1.2 啟動當前線程(調(diào)用runWorker方法)
4.1.2.1 獲取當前線程寥殖,當前線程已執(zhí)行完畢并釋放玩讳,那么從阻塞隊列獲取任務(wù)(調(diào)用getTask)
4.1.2.1.1 如果等待隊列中存在了等待的任務(wù),那么取出等待隊列中第一個任務(wù)返回扛禽。
4.1.2.1.2 如果超過核心線程數(shù)的線程锋边,那么timed = true,同時隊列中已經(jīng)沒有任務(wù)了编曼,那么等待keepalivetime秒就釋放。
4.1.2.1.3 如果不超過(等于)核心線程數(shù)的線程剩辟,那么timed=false掐场,同時隊列中已經(jīng)沒有任務(wù)了往扔,那么執(zhí)行take方法阻塞。
4.1.2.1.3.1 如果此時進來一個新任務(wù)熊户,那么發(fā)現(xiàn)阻塞線程數(shù)(工作線程數(shù))不小于核心線程數(shù)萍膛,那么直接執(zhí)行offer插入等待隊列,等待隊列有值了嚷堡,阻塞的線程們又開始搶任務(wù)干活了蝗罗。
4.1.2.2 執(zhí)行我們自己的業(yè)務(wù)邏輯(調(diào)用run方法)
4.1.2.3 移除workers中的當前線程,回收線程(調(diào)用processWorkerExit方法)
4.1.3 移除workers中的當前線程蝌戒,回收線程(調(diào)用addWorkerFailed方法)
4.2. 工作線程數(shù) > 核心線程數(shù)
4.2.1 將當前線程加入到等待隊列BlockQueue<Runnable>
4.3. 如果等待隊列滿了
4.3.1 直接創(chuàng)建新線程作為工作線程執(zhí)行邏輯串塑。
4.4. 如果當前工作線程數(shù) > 最大線程數(shù)
4.4.1 拋出拒絕策略異常
5. 線程釋放策略?
①第一種策略:使用allowCoreThreadTimeOut方法對核心線程和非核心線程進行釋放(調(diào)用poll方法北苟,超出了keepAliveTime桩匪,自動釋放)。
②第二種策略:使用shutdown方法關(guān)閉線程池來對核心線程和非核心線程進行釋放(主動為Workers集合中的線程設(shè)置中斷標識友鼻,自動釋放)傻昙。
6. 線程回收處理?
getTask中使用cas自旋彩扔,達到-1時妆档,退出方法,回收線程虫碉。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 工作線程的數(shù)量減1
decrementWorkerCount();
return null;
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}