上一小節(jié)主要講解了ThreadPoolExecutor的父類及其接口的源碼分析。本節(jié)將開始ThreadPoolExecutor的源碼分析饥瓷,我們將從源碼的角度分析線程池的實現(xiàn)原理谒出。首先講解和狀態(tài)相關(guān)的成員變量和函數(shù)叨橱,接著講解工作線程類Worker的實現(xiàn),然后從構(gòu)造函數(shù)稚伍、提交方法、線程池的兩種關(guān)閉操作入手分析線程池的工作原理戚宦,最后講解線程池的拒絕策略个曙,以及使用線程池時應(yīng)該注意的事項。本文內(nèi)容較多受楼,如果真的想掌握該內(nèi)容垦搬,請耐心閱讀。
線程池狀態(tài)
- 狀態(tài)相關(guān)成員變量
ctl:該變量是最重要的一個變量艳汽,貫穿于整個線程池的實現(xiàn)邏輯猴贰。類型為AtomicInteger類型,是一個32位原子int型數(shù)字河狐,在實現(xiàn)中米绕,其高三位用來表示線程池的狀態(tài)瑟捣,低29位用來表示線程池中線程個數(shù),默認值為ctlOf(RUNNING, 0)栅干,也就是狀態(tài)為running迈套,線程池個數(shù)為0
count_bits : 值為29, 即低count_bits位表示的數(shù)值就是線程池中線程的個數(shù)
capacity:2^29-1 即29位能夠表示的最大的數(shù)值碱鳞,表示線程池中能包含線程個數(shù)的最大值
線程池的狀態(tài)使用ctl的高三位表示:
狀態(tài) | 高三位值 | 含義 |
---|---|---|
running | 100 | 運行狀態(tài)桑李,數(shù)值為負數(shù) |
shutdown | 000 | 關(guān)閉狀態(tài),調(diào)用shutdown()方法 |
stop | 001 | 關(guān)閉狀態(tài)劫笙,調(diào)用shutdownNow()方法芙扎,與shutdown狀態(tài)稍有不同 |
tidying | 010 | 線程池關(guān)閉的后處理狀態(tài) |
terminated | 011 | 終止狀態(tài),這才是最終關(guān)閉 |
從上圖可以看出填大,線程池處于運行狀態(tài)時戒洼,其ctl值為負數(shù),并且狀態(tài)從上往下允华,數(shù)值是依次變大的圈浇,可以比較數(shù)值的大小來判斷線程的狀態(tài)
- 狀態(tài)轉(zhuǎn)化圖:
- 與狀態(tài)相關(guān)的方法
此類方法,基本都是根據(jù)ctl與狀態(tài)值比較大小而設(shè)定的方法靴寂,其中包括
函數(shù) | 含義 | 實現(xiàn) |
---|---|---|
runStateOf(int c) | 根據(jù)ctl獲取當前狀態(tài)磷蜀,即只保留高三位的值,后29位全為0 | c & ~capacity |
wokerCountOf(int c) | 根據(jù)ctl獲取當前線程數(shù)百炬,即低29位的值 | c & capacity |
ctlOf(int rs, int wc) | 根據(jù)當前狀態(tài)和線程數(shù)褐隆,反推出ctl的值,高三位與低29位拼接 | rs 或操作 wc |
runStateLessThan(int c, int s) | 當前狀態(tài)是否小于s狀態(tài) | c < s |
runStateAtLeast(int c, int s) | 當前狀態(tài)是否大于等于s狀態(tài) | c >= s |
isRunning(int c) | 是否為運行狀態(tài) | c < shutdown |
compareAndIncrementWorkerCount(int expect) | CAS操作+1剖踊,操作不一定成功 | ctl.compareAndSet(expect, expect + 1) |
compareAndDecrementWorkerCount(int expect) | CAS操作-1庶弃,操作不一定成功 | ctl.compareAndSet(expect, expect - 1) |
decrementWorkerCount() | ctl的值-1,操作一定成功 | 在死循環(huán)中處理德澈,直到-1操作成功 |
advanceRunState(int targetState) | 更新線程池狀態(tài)歇攻,必須保證更新成功 | 在死循環(huán)中處理 |
上述的方法在線程池的實現(xiàn)中會經(jīng)常的使用到,如果在看源碼的過程中碰到了梆造,可以到該表格進行查看
線程池的成員變量
類型 | 變量名 | 作用 |
---|---|---|
BlockingQueue<Runnable> | workQueue | 阻塞隊列缴守,當線程數(shù)達到規(guī)定線程數(shù)的最大值且沒有空閑線程執(zhí)行任務(wù)時,任務(wù)會被添加該隊列中 |
ReentrantLock | mainLock | 可重入鎖镇辉,控制并發(fā)操作 |
HashSet<Worker> | workers | 存放線程對象屡穗,實現(xiàn)為HashSet,不是線程安全的忽肛,操作時加鎖 |
Condition | termination | mainLock上的等待對象鸡捐,用于阻塞和喚醒調(diào)用awaitTermination()方法的線程 |
int | largestPoolSize | 記錄線程池已分配的最大線程數(shù) |
long | completedTaskCount | 記錄線程池完成的任務(wù)個數(shù) |
volatile ThreadFactory | threadFactory | 用于創(chuàng)建線程,可以設(shè)置線程名麻裁、線程組箍镜、優(yōu)先級等屬性 |
volatile RejectedExecutionHandler | handler | 拒絕策略 |
volatile long | keepAliveTime | 允許最大空閑時間 |
volatile boolean | allowCoreThreadTimeOut | 是否允許核心線程超時 |
volatile int | corePoolSize | 核心線程數(shù) |
volatile int | maximumPoolSize | 最大線程數(shù) |
RejectedExecutionHandler | defaultHandler | 默認拒絕策略 |
注意volatile類型的變量源祈,作用是為了保證各個線程間的可見性。對于沒有volatile修飾的變量色迂,在多線程環(huán)境下操作香缺,必須加鎖操作,進而保證線程間的可見性歇僧。
線程類-Worker
Woker類具有兩重身份
(1) Runnable對象
Worker內(nèi)部封裝了線程對象thread和任務(wù)firstTask图张,而自己本身又是線程對象thread的執(zhí)行對象。thread每處理完一個任務(wù)之后诈悍,便會去阻塞隊列中獲取任務(wù)祸轮,如果沒有任務(wù)就會等待,如果獲取到任務(wù)就繼續(xù)執(zhí)行侥钳。
(2)鎖對象-非可重入互斥鎖
Worker類繼承了AbstractQueuedSynchronizer類适袜,說明Worker也具有鎖的特性。同一個時刻舷夺,只能由一個線程操作苦酱。從其實現(xiàn)可以看出,該鎖不支持重入给猾。
- Worker類的成員變量
類型 | 變量名 | 作用 |
---|---|---|
Thread | thread | 由threadFactory創(chuàng)建 |
Runnable | firstTask | 由線程池使用者提交(submit()或者execute())任務(wù) |
volatile long | completedTasks | 該thread完成的任務(wù)數(shù)疫萤,用于統(tǒng)計 |
- 構(gòu)造函數(shù)
在Worker對象構(gòu)造完成之后,調(diào)用run()方法(此處的run指的是Worker的run(),該方法由線程池在添加Worker時啟動線程對象thread時主動調(diào)用)之前敢伸,其Worker對象處于不可被獲取狀態(tài)(即鎖的state位-1扯饶, 0表示可以被獲取, 1表示鎖被占用)池颈,同時也禁止中斷(加鎖的地方禁止響應(yīng)中斷尾序,只有在state為0時才會響應(yīng)中斷),其構(gòu)造方法如下:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//this作為Runnable對象參數(shù)傳給線程類饶辙,那么線程類執(zhí)行時蹲诀,執(zhí)行的就是Worker的run()方法
this.thread = getThreadFactory().newThread(this);
}
設(shè)置鎖的狀態(tài)為-1斑粱,并對任務(wù)和線程賦值弃揽。
- run()方法
該方法只是簡單調(diào)用了runWorker(this);而runWorker()方法將是本節(jié)的討論的重點方法。請耐心繼續(xù)往下看则北。
- 與鎖相關(guān)的方法
方法 | 作用 |
---|---|
isHeldExclusively | 鎖是否已被占有 |
tryAcquire | 在調(diào)用lock()方法的時候會間接調(diào)用該方法矿微,如果狀態(tài)為0,則可以獲取尚揣,否則阻塞等待 |
tryRelease | unlock()時間接調(diào)用該方法涌矢,由于是互斥鎖,只會有一個線程調(diào)用快骗,修改狀態(tài)信息無需使用CAS娜庇,不會產(chǎn)生競爭 |
上述三個方法是實現(xiàn)AbstractQueuedSynchronizer重寫的方法塔次,不提供給外界直接調(diào)用,外界應(yīng)該調(diào)用如下方法:
方法 | 作用 |
---|---|
lock | 獲取鎖名秀,阻塞方法励负,當獲取鎖時才會返回。調(diào)用acquire()方法匕得,此處其參數(shù)1沒有用继榆。間接調(diào)用tryAcquire方法 |
tryLock | 獲取鎖,非阻塞方法汁掠,不管是否獲取鎖略吨,都直接返回 |
unLock | 釋放鎖,調(diào)用release()方法考阱,間接調(diào)用tryRelease()方法翠忠,因為只會有一個線程獲取鎖,所以釋放時不存在競爭關(guān)系 |
isLocked | 判斷鎖是否被占用羔砾,調(diào)用isHeldExclusively()方法 |
上述方法類外部可以直接調(diào)用负间。
- interruptIfStarted()方法
該方法在調(diào)用shutdownNow()方法時被調(diào)用,用于終止線程姜凄。其源碼如下:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
當線程在執(zhí)行任務(wù)時政溃,且線程不為null,線程沒有被中斷态秧,則中斷線程董虱,但是該中斷不會立即響應(yīng),因為work定義的鎖為不響應(yīng)中斷的鎖申鱼,當任務(wù)執(zhí)行完成釋放鎖之后才會響應(yīng)該中斷愤诱。
總結(jié):Worker在構(gòu)造函數(shù)中,通過getThreadFactory().newThread(this)創(chuàng)建線程;this就是Worker對象捐友,一個Runnable對象淫半。換句話說,執(zhí)行該線程的start()方法匣砖,執(zhí)行的就是Worker類的run()方法科吭。其次,Worker作為鎖的角色有兩個作用:一是在執(zhí)行任務(wù)時猴鲫,通過加鎖lock()來屏蔽到中斷操作对人,只有在unlock()后,也就是獲取任務(wù)getTask()方法里,才會響應(yīng)中斷拂共。第二個作用是牺弄,在線程池調(diào)用tryLock()試圖中斷線程時,只要有任務(wù)執(zhí)行宜狐,tryLock()就會返回false势告,通過tryLock()操作可以判斷線程是否空閑蛇捌。
線程池構(gòu)造方法
線程池的構(gòu)造都間接調(diào)用該構(gòu)造方法,需要合理的設(shè)置參數(shù)咱台,這個內(nèi)容在之前的文章中已經(jīng)講過了豁陆,此處不再講解。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
execute(Runnable command)方法
- 處理邏輯
- 如果當前線程個數(shù) < corePoolSize, 則新增一個線程(addWorker(command, true))執(zhí)行該任務(wù)吵护,成功則返回
- 如果添加失敗盒音,并且為運行狀態(tài),說明線程個數(shù)已經(jīng)>=corePoolSize馅而,則執(zhí)行第4步
- 如果添加失敗祥诽,并且為非運行狀態(tài),則執(zhí)行拒絕策略
- 如果是運行狀態(tài)瓮恭,且當前線程數(shù)>=corePoolSize雄坪,則將任務(wù)添加到阻塞隊列
- 如果第4步添加阻塞隊列失敗,說明阻塞隊列滿屯蹦,并且當前線程數(shù)<maximumPoolSize,則新增一個線程執(zhí)行該任務(wù)
- 如果第5步中新增線程失斘(這里包括2,3兩種情況(2的話就是>=maximumPoolSize)),則執(zhí)行拒絕策略
更加詳細的實現(xiàn)登澜,請參照源碼注釋理解阔挠,源碼如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果當前線程數(shù)<核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//代碼執(zhí)行到這,存在兩種情況
//1. 當前線程數(shù)>=核心線程數(shù)
//2. 當前線程數(shù)<核心線程數(shù)脑蠕,但是添加Worker失敗购撼,失敗又有如下情況
//2.1 線程池的狀態(tài)為stop,tidying谴仙,terminated
//2.2 shutdown command不為null
//2.3 shutdown command為null迂求, 阻塞隊列也為空(這種情況不會出現(xiàn)在這里)
//如果是運行狀態(tài),說明屬于1這種情況晃跺,那么將任務(wù)添加到阻塞隊列揩局,此處使用offer()非阻塞方法
if (isRunning(c) && workQueue.offer(command)) {
//即便添加成功了,重新驗證一下狀態(tài)掀虎,如果狀態(tài)為非運行狀態(tài)凌盯,則一處該任務(wù),執(zhí)行決絕策略
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//如果是運行狀態(tài)涩盾,保證工作線程存在
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//代碼運行到這里十气,存在兩種情況
//1. 不是運行狀態(tài) 也就是上面的2的情況(添加worker失敗的)励背,再次調(diào)用仍然會失敗
//2. 添加到阻塞隊列失敗的春霍,說明阻塞隊列滿
else if (!addWorker(command, false))
//1這種情況,肯定會執(zhí)行拒絕策略
//2這種情況叶眉,如果已達到線程數(shù)上限址儒,同樣會執(zhí)行拒絕策略
reject(command);
}
總結(jié):當線程狀態(tài)不為運行狀態(tài)時(不管是shutdown還是stop):都會執(zhí)行拒絕策略(這不是絕對的芹枷,shutdown狀態(tài)下,還是可以添加工作線程的(條件為shutdown狀態(tài)下 command==null 且 阻塞隊列非空莲趣,在該方法中不會出現(xiàn)該條件鸳慈,因為command做了非空判斷,這里只討論分支邏輯喧伞,不要考慮addWorker(null, false));這條語句)走芋,但是stop狀態(tài)卻不可以)。換句話說:執(zhí)行shutdown()或者shutdownNow()后潘鲫,不能夠再提交新任務(wù)
addWorker(Runnable firstTask, boolean core)方法
在execute()方法中翁逞,多次調(diào)用了addWorker()方法,addWorker()方法決定了添加一個工作線程的條件溉仑,如果符合添加一個線程的條件挖函,那么該方法會先保證將ctl的值加1,然后保證添加一個工作線程浊竟,并啟動該線程怨喘。
源碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
//在死循環(huán)中不斷判斷條件是否滿足添加一個工作線程
//如果不滿足條件啊,直接返回false
//如果滿足條件振定,則ctl值+1
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 不滿足條件的三種情況
// 1. 運行狀態(tài)為 stop, tidying, terminated
// 2. 運行狀態(tài)為 shutdown 且 任務(wù)不為空
// 3. 運行狀態(tài)為 shutdown 任務(wù)為空 且 任務(wù)線程為空(沒事可做必怜,為什么要增加線程)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//代碼運行到這,存在兩種情況
// 1. 運行狀態(tài)為running
// 2. 運行狀態(tài)為shutdown后频, 任務(wù)為null棚赔,阻塞隊列不為空
for (;;) {
int wc = workerCountOf(c);
//如果線程數(shù)已經(jīng)達到上限,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//線程數(shù)ctl值+1徘郭, 設(shè)置失敗說明線程池環(huán)境發(fā)生了變化靠益,需要重試
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//代碼執(zhí)行到這里,說明線程數(shù)已經(jīng)成功+1残揉, 現(xiàn)在要添加一個worker胧后,并執(zhí)行
//標記worker是否已經(jīng)啟動
boolean workerStarted = false;
//標記worker是否已經(jīng)添加
boolean workerAdded = false;
Worker w = null;
try {
//新建worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//這里需要獲取全局鎖,因為存放worker的對象workers的實現(xiàn)是HashSet類型的抱环,線程不安全壳快,使用時需要加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//再次判斷線程池裝填
//1. 運行時狀
//2. shutdown 且 任務(wù)為空,才能執(zhí)行添加操作
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果線程已經(jīng)執(zhí)行了镇草,那么拋出異常眶痰,因為我們還沒有執(zhí)行start()
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//所有的事情都為了這一句,添加工作線程
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功則啟動該線程梯啤,線程啟動后會執(zhí)行runWorker()方法
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果沒有啟動成功竖伯,則應(yīng)該移除該worker
if (! workerStarted)
//該方法中存在ctl值-1的操作,保證計數(shù)和線程數(shù)一致
addWorkerFailed(w);
}
return workerStarted;
}
addWorkerFailed(Worker w)方法
當添加工作線程失敗時(并不是addWorker()方法返回false,而是workerStarted這個boolean值為false)七婴,需要刪除該worker祟偷,并且使ctl的值-1
private void addWorkerFailed(Worker w) {
//需要操作workers,類型為線程不安全的HashSet打厘,所以需要加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
//執(zhí)行-1操作修肠,保證-1成功
decrementWorkerCount();
//該方法的作用就是終止掉一個線程,讓線程能夠跳出runWorker()方法的循環(huán)户盯,而正常執(zhí)行完
//shudown(),shutdownNow()方法都會調(diào)用該方法嵌施,很重要
tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker(Worker w)方法
當一個線程啟動之后,會執(zhí)行該方法莽鸭。該方法將任務(wù)的處理邏輯加鎖艰管,保證不被中斷。通過從阻塞隊列獲取任務(wù)不斷執(zhí)行任務(wù)蒋川,當獲取的任務(wù)為null時牲芋,該線程退出,需要執(zhí)行線程退出后的后處理邏輯捺球。
final void runWorker(Worker w) {
//當前線程即為worker中包含的線程對象
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//將worker的鎖狀態(tài)置為0缸浦,這樣鎖可以被獲取
w.unlock();
//用來表示線程是否為正常退出,true:非正常退出氮兵,false:正常退出
boolean completedAbruptly = true;
try {
//不斷的從阻塞隊列中獲取任務(wù)(getTask())裂逐, 如果阻塞隊里為空,getTask()會阻塞
//getTask方法內(nèi)有超時判斷泣栈,如果工作線程多了卜高,同樣會通過超時返回null,讓該線程執(zhí)行完畢并退出
//遇到中斷南片,getTask()會返回null
//getTask()返回值為null掺涛,會正常停止該工作線程
while (task != null || (task = getTask()) != null) {
//加鎖,其他線程不能獲取該鎖(主要是調(diào)用shutdown()方法時會執(zhí)行tryLock()方法)
//加鎖的作用疼进,還可以屏蔽中斷薪缆,在加鎖和解鎖之間是不響應(yīng)中斷的
w.lock();
//如果狀態(tài)為stop, tidying伞广, terminated拣帽,則中斷
//如果為running,shutdown狀態(tài)嚼锄,且存在中斷的話减拭,清空中斷標記
//Thread.interrupted()方法會清除中斷標記
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//該中斷會在getTask()里體現(xiàn)
wt.interrupt();
try {
//任務(wù)執(zhí)行前的執(zhí)行操作,空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任務(wù)執(zhí)行完成后執(zhí)行操作区丑,空方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//代碼走到這代表是根據(jù)getTask()返回值為null退出的循環(huán)
completedAbruptly = false;
} finally {
//線程退出拧粪,執(zhí)行后處理操作修陡,從workers中刪除worker,ctl值-1
processWorkerExit(w, completedAbruptly);
}
}
總結(jié):從上述源碼中可以看出既们,shutdown狀態(tài)不會中斷線程,stop狀態(tài)會中斷線程正什,說明:shutdown狀態(tài)時啥纸,還會繼續(xù)執(zhí)行阻塞隊列中的任務(wù),stop狀態(tài)時婴氮,不會執(zhí)行阻塞隊列中的任務(wù)斯棒,這是兩個狀態(tài)的最主要的區(qū)別.
processWorkerExit(Worker w, boolean completedAbruptly)方法
該方法主要處理已存在的線程終止的后處理操作,分為正常中斷和非正常中斷兩種主经。對于非正常中斷的線程荣暮,需要通過新增工作線程的方法將線程補回來,對于正常中斷的線程罩驻,需要保證在阻塞隊列非空的情況有工作線程存在穗酥。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是非正常啊中斷,那么先將ctl值-1
if (completedAbruptly)
decrementWorkerCount();
//獲取全局鎖惠遏,從workers中刪除worker砾跃,刪除前將該worker的執(zhí)行完成的任務(wù)數(shù)匯總到completedTaskCount
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//只要減少worker的地方都會執(zhí)行該方法,主要是傳播停止信號
tryTerminate();
int c = ctl.get();
//如果狀態(tài)為running节吮, shutdown
if (runStateLessThan(c, STOP)) {
//正常終止抽高,那么判斷線程池中是否還有工作線程,如果沒有工作線程了透绩,那么新增工作線程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//異常終止導(dǎo)致少了一個線程翘骂,應(yīng)該將該線程補回來
addWorker(null, false);
}
}
總結(jié):該方法只在線程意外終止時,ctl值-1帚豪,那么線程正常終止就不-1了嗎碳竟?這個問題找了好久,其實在runWorker()方法中的getTask()方法狸臣,當該方法返回null時瞭亮,代表著該線程正常終止,其中附加了ctl-1的操作固棚,所以正常終止的線程的ctl-1的操作是在getTask()方法中執(zhí)行的
getTask()方法
該方法的作用就是從阻塞隊列中獲取任務(wù)统翩,如果獲取不到任務(wù)就會阻塞,keepAliveTime的作用就在該方法中體現(xiàn)此洲。阻塞又分為兩種 有限阻塞poll()和完全阻塞take()厂汗,但是這兩個方法都會響應(yīng)中斷,(中斷的作用也是從這里體現(xiàn)出來的呜师,只有當線程池的狀態(tài)為stop娶桦, tidying,terminated,shutdown并且阻塞隊列為空時衷畦,該中斷才有效)栗涂,響應(yīng)中斷后,會重新執(zhí)行g(shù)etTask()方法的邏輯祈争。并不是響應(yīng)中斷就會返回null斤程,有可能線程池狀態(tài)為running,執(zhí)行runWorker()方法時菩混,在執(zhí)行g(shù)etTask()方法時就來了中斷(該中斷信號并沒有被runWork()方法的邏輯清除掉)忿墅,那么盡管getTask()處理邏輯重試,還是不會返回null沮峡,做了雙重保證疚脐。
private Runnable getTask() {
//標記是否超時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//返回空的條件,存在兩種
//1. 狀態(tài)為stop邢疙, tidying棍弄, terminated
//2. 狀態(tài)為shutdown 且 隊列為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//ctl的值-1,這個操作一定要重視疟游,經(jīng)常找不到他
decrementWorkerCount();
return null;
}
//代碼執(zhí)行到這照卦,存在兩種情況
//1. running
//2. shutdown 阻塞隊列非空
int wc = workerCountOf(c);
//判斷是否處理超時
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果需要減少工作線程(線程數(shù)>maxinumPoolSize,或者處理超時并超時兩種情況)
//并且存在可以減少的線程數(shù),那么就減少線程數(shù)乡摹,返回null役耕,結(jié)束一個線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果設(shè)置超時,那么就使用非阻塞的方法poll()聪廉,否則使用阻塞方法take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果超時了瞬痘,為什么不能直接返回null,因為要確保wc > 1 || workQueue.isEmpty()
//所以又循環(huán)了一次
timedOut = true;
} catch (InterruptedException retry) {
//響應(yīng)中斷板熊,重試
timedOut = false;
}
}
}
tryTerminate()方法
該方法是關(guān)閉線程池的關(guān)鍵框全,涉及到了三種狀態(tài)的改變
具體細節(jié)請查看源碼注釋,源碼如下:
final void tryTerminate() {
//這是一個死循環(huán)干签,看到這種情況津辩,就需要注意返回值
for (;;) {
int c = ctl.get();
//直接返回的情況,有3種情況
// 1. 如果是運行狀態(tài)容劳,直接返回
// 2. 如果運行狀態(tài)為tidying喘沿, terminated(這兩種狀態(tài)時,說明已經(jīng)沒有工作線程了)
// 3. 如果是shutdown并且阻塞隊列非空竭贩,直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//代碼執(zhí)行到這里時蚜印,只有兩種情況
// 1. stop狀態(tài)
// 2. shutdown狀態(tài),且阻塞隊列為空
// 如果工作線程不為0留量,則中斷一個線程
if (workerCountOf(c) != 0) {
//中斷一個空閑線程
interruptIdleWorkers(ONLY_ONE);
return;
}
//代碼執(zhí)行到這窄赋,說明工作線程數(shù)為0,此處可能存在兩種狀態(tài)變化
//1. shutdown-->tidying
//2. stop--->tidying
//3. tidying-->terminated 執(zhí)行線程池關(guān)閉的后處理操作哟冬,terminated()空方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
//喚醒等待線程池關(guān)閉的線程(調(diào)用awaitTermination()方法的線程)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
總結(jié):這里必須要明白為什么是中斷1個空閑線程。這是因為中斷1個空閑線程忆绰,就會執(zhí)行processWorkerExit(),而該方法內(nèi)部又調(diào)用了tryTerminate()方法浩峡,該方法在滿足關(guān)閉線程池的條件時會進而中斷一個空閑線程,調(diào)用processWorkerExit()方法错敢。因此只要條件滿足翰灾,所有的線程都會被中斷。因此該方法具有擴散中斷線程的作用伐债。
interruptIdleWorkers(boolean onlyOne)方法
如果參數(shù)為true预侯,則只中斷一個空閑線程致开,tryTerminate()方法中只傳入true
如果參數(shù)為false峰锁,則中斷所有的空閑線程
private void interruptIdleWorkers(boolean onlyOne) {
//獲取全局鎖,因為需要便利HashSet双戳,保證不能讓其他的線程修改HashSet的值
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//tryLock()方法判斷該鎖有誤被占用虹蒋,占用說明在執(zhí)行任務(wù),返回false
//未占用飒货,表示空閑魄衅,返回true
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//如果參數(shù)為true,則只中斷一個
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdown()方法
該方法用于關(guān)閉線程池塘辅,將狀態(tài)修改為shutdown晃虫,存在以下幾點性質(zhì):
- 不能提交新任務(wù), 從excute()方法中可以看出
- 可以繼續(xù)執(zhí)行完阻塞隊列中的任務(wù)扣墩,從runWoker()方法中可以看出
- 終止所有的空閑線程
- 返回值為void
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//更新線程池狀態(tài)
advanceRunState(SHUTDOWN);
//中斷所有空閑線程哲银,調(diào)用interruptIdleWorkers(false)
interruptIdleWorkers();
//空方法,ScheduledThreadPoolExecutor使用
onShutdown();
} finally {
mainLock.unlock();
}
//又關(guān)閉線程的地方就有該方法
tryTerminate();
}
shutdownNow()方法
該方法用于關(guān)閉線程池呻惕,將狀態(tài)修改為stop狀態(tài)荆责,存在以下幾點性質(zhì):
- 不能提交新任務(wù),從 execute()方法可以看出
- 線程直接中斷亚脆,不在執(zhí)行任務(wù)做院,阻塞隊列的任務(wù)不在執(zhí)行,從runWorker()和interruptWorkers()方法可以看出
- 終止所有的線程 不管線程是否工作濒持,都終止键耕,任務(wù)的執(zhí)行是原子性的,不會出現(xiàn)執(zhí)行一半任務(wù)的情況
- 返回值為阻塞隊列中的任務(wù)列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改為stop狀態(tài)
advanceRunState(STOP);
//終止所有線程柑营,調(diào)用Worker的interruptIfStarted()方法
interruptWorkers();
//獲取阻塞隊列中的任務(wù)列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//該方法必須調(diào)用郁竟,上面雖然是終止所有,但是并不定全部都能中斷掉由境,因為中斷是有條件的
tryTerminate();
//返回阻塞隊列中的任務(wù)列表
return tasks;
}
drainQueue()方法
使用阻塞隊列的drainTo()方法棚亩,獲取所有的任務(wù)蓖议,將任務(wù)添加到list中,并返回
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
//將任務(wù)添加到list中讥蟆,采用該方式效率高
q.drainTo(taskList);
//如果隊列中還存在元素勒虾,則一個個添加到list中
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
awaitTermination(long timeout, TimeUnit unit)方法
最長等待timeout時間,等待線程池完全關(guān)閉瘸彤,該方法沒有新的東西修然,就是判斷線程池的狀態(tài)是否為terminated狀態(tài),如果是則返回质况,如果不是愕宋,則等待timeout時間并添加到全局鎖的Conditon等待隊列中,如果超時返回false结榄。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
線程池的拒絕策略
從上面的源碼講解中中贝,已經(jīng)知道了線程池在什么情況下會調(diào)用拒絕策略,合理的設(shè)置拒絕策略在項目的應(yīng)用中是十分重要的臼朗。線程池提供了四種拒絕策略邻寿,我們也可以根據(jù)實際的業(yè)務(wù)需求定義自己的拒絕策略,只需實現(xiàn)RejectedExecutionHandler接口视哑,在定義線程池時绣否,傳入我們自己定義的拒絕策略對象即可。
- CallerRunsPolicy
從名字就可以看出挡毅,該策略的意思就是調(diào)用者運行該任務(wù)蒜撮。那么調(diào)用者是誰?那就是提交任務(wù)的線程。
好處:就是線程池負載過重(或者是自己設(shè)置的參數(shù)有問題),調(diào)用者運行任務(wù)可以延緩提交任務(wù)的速度逻炊,任務(wù)不會丟失平绩。
壞處:加大主線程的處理時間
源碼如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- AbortPolicy
該策略是線程池默認的拒絕策略,直接拋出異常,該異常時運行時異常,如果是線上服務(wù),一直拋異常怕是不好沐序。
其源碼如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- DiscardPolicy
該策略就是什么都不做,假裝什么事情都沒有發(fā)生過堕绩,缺點也很明顯策幼,會丟任務(wù),對于不能丟任務(wù)的場景是不能用的奴紧,方法實現(xiàn)是空實現(xiàn)特姐,這里不貼代碼了。 - DiscardOldestPolicy
該策略會將最老的任務(wù)移除(阻塞隊列的對頭元素)黍氮,重新提交任務(wù)唐含。缺點就是任務(wù)還是會丟失浅浮,而且任務(wù)不一定能加入到阻塞隊列(如果此時,其他線程也提交任務(wù))
源碼實現(xiàn)如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
總結(jié)
使用線程池時捷枯,一般我們不會放任線程池中的線程和阻塞隊列占用的資源無限制的使用滚秩,因此一般會設(shè)定好線程數(shù),設(shè)置固定大小的阻塞隊列淮捆,由于阻塞隊列和線程的資源都分配了固定的郁油,就必須考慮任務(wù)的拒絕策略,至于使用什么拒絕策略攀痊,根據(jù)自己的業(yè)務(wù)需求選擇或者自定義拒絕策略桐腌。對于線程數(shù)量的把握,一定要經(jīng)過壓測之后確定苟径,具體業(yè)務(wù)情景具體分析案站。
此外,還要做好線程池的隔離涩笤,不要讓一個線程池做所有的事情嚼吞,避免一個功能模塊出現(xiàn)問題影響到其他的功能盒件。還應(yīng)該做好降級應(yīng)對線程池崩潰的情況蹬碧,如果線程池崩潰,使用其他的處理邏輯取代炒刁,保證線上服務(wù)穩(wěn)定可用恩沽。