ThreadPoolExecutor
線(xiàn)程池的實(shí)現(xiàn)類(lèi)。
構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize——最大核心線(xiàn)程數(shù)。核心線(xiàn)程即使空閑也不會(huì)被銷(xiāo)毀寓搬,除非調(diào)用allowCoreThreadTimeOut(true)兄春。
- maximumPoolSize——線(xiàn)程池最多可運(yùn)行的線(xiàn)程數(shù)甜癞。該值不小于corePoolSize。
- keepAliveTime——非核心線(xiàn)程在空閑狀態(tài)的存活時(shí)間宰译。
- unit——keepAliveTime的時(shí)間單位。
- workQueue——存放等待執(zhí)行的任務(wù)的隊(duì)列敷扫,只有通過(guò)execute(Runnable)提交的任務(wù)才可能進(jìn)入該隊(duì)列。
- threadFactory——線(xiàn)程池創(chuàng)建通過(guò)該工廠(chǎng)創(chuàng)建線(xiàn)程哲身。
- handler——在線(xiàn)程池滿(mǎn)載的情況下脯丝,提交的任務(wù)交由handler處理堤器。
以上各參數(shù)均有響應(yīng)的setter方法裕膀。
ThreadPoolExecutor提供了四種handler:
- CallerRunsPolicy——在線(xiàn)程池未關(guān)閉的情況毅厚,直接在調(diào)用execute(Runnable)方法的線(xiàn)程執(zhí)行任務(wù)妆棒。
- AbortPolicy——拋出一個(gè)RejectedExecutionException。
- DiscardPolicy——丟棄
- DiscardOldestPolicy——丟棄等待隊(duì)列中最早提交的那個(gè)任務(wù),然后重新提交新的任務(wù)菱鸥。
ThreadPoolExecutor默認(rèn)使用AbortPolicy躯概。
線(xiàn)程池狀態(tài)和線(xiàn)程數(shù)量的指示字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是一個(gè)32位原子整型献幔,高3位表示線(xiàn)程狀態(tài)(runState),剩余位代碼線(xiàn)程數(shù)量(workerCount)。所以線(xiàn)程池可容納的最大線(xiàn)程數(shù)是(2^29)-1术裸。
wokerCount未必和存活的線(xiàn)程數(shù)一致冻晤,比如使用ThreadFactory創(chuàng)建線(xiàn)程失敗時(shí)歼捏,或者線(xiàn)程終結(jié)前仍然進(jìn)行著某些工作時(shí)。
線(xiàn)程池狀態(tài)有以下幾種:
- RUNNING(-1)——可接收新的任務(wù),可執(zhí)行隊(duì)列中的任務(wù)歉井。
- SHUTDOWN(0)——不再接收新的任務(wù)仇参,仍可執(zhí)行隊(duì)列中的任務(wù)。
- STOP(1)——不再接收新的任務(wù),不再執(zhí)行隊(duì)列中的任務(wù)贱除,中斷正在執(zhí)行的任務(wù)难衰。
- TIDYING(2)——過(guò)渡狀態(tài)。所有任務(wù)都已終結(jié)凭峡,workerCount等于0,terminated()方法執(zhí)行之前的狀態(tài)≡侣澹可以覆寫(xiě)terminated()做一些清理工作钓株。
- TERMINATED(3)——terminated()執(zhí)行之后的狀態(tài)闰渔。
狀態(tài)的更改是遞增的席函,可能的狀態(tài)改變?nèi)缦拢?/p>
- RUNNING -> SHUTDOWN——調(diào)用了shutdown()方法,可能是通過(guò)finalize()隱式調(diào)用的冈涧。
- (RUNNING or SHUTDOWN) -> STOP——調(diào)用了shutdownNow()茂附。
- SHUTDOWN -> TIDYING——線(xiàn)程池和隊(duì)列都為空。
- STOP -> TIDYING——線(xiàn)程池為空督弓。
- TIDYING -> TERMINATED——terminated()方法執(zhí)行完畢何之。
awaitTermination()方法在狀態(tài)為T(mén)ERMINATED時(shí)返回。
public void execute(Runnable command)
提交任務(wù)有幾種情況:
- 工作線(xiàn)程數(shù) < 核心線(xiàn)程數(shù)——?jiǎng)?chuàng)建新的核心線(xiàn)程咽筋,并處理該任務(wù)溶推。
- 工作線(xiàn)程數(shù) >= 核心線(xiàn)程數(shù),隊(duì)列未滿(mǎn)——添加到隊(duì)列奸攻。
- 隊(duì)列滿(mǎn)蒜危,工作線(xiàn)程數(shù) < 最大線(xiàn)程數(shù)——?jiǎng)?chuàng)建非核心線(xiàn)程處理任務(wù)。
- 交由handler處理睹耐。
public boolean prestartCoreThread()
手動(dòng)啟動(dòng)一個(gè)核心線(xiàn)程辐赞,這樣新來(lái)的任務(wù)就可以直接運(yùn)行,從而減少線(xiàn)程啟動(dòng)的時(shí)間硝训。如果所有核心線(xiàn)程都已啟動(dòng)响委,返回false新思。
public int prestartAllCoreThreads()
手動(dòng)啟動(dòng)所有核心線(xiàn)程。返回啟動(dòng)的核心線(xiàn)程數(shù)赘风。
public void allowCoreThreadTimeOut(boolean value)
設(shè)置空閑時(shí)夹囚,核心線(xiàn)程是否允許超時(shí)關(guān)閉。存活時(shí)間同非核心線(xiàn)程邀窃。
public boolean allowsCoreThreadTimeOut()
查詢(xún)核心線(xiàn)程在空閑時(shí)荸哟,是否允許超時(shí)關(guān)閉。
public BlockingQueue<Runnable> getQueue()
獲取等待隊(duì)列瞬捕。
public boolean remove(Runnable task)
從等待隊(duì)列中刪除task鞍历。
public void purge()
將等待隊(duì)列中所有已經(jīng)取消的Future任務(wù)立即移除。
public int getActiveCount()
返回正在執(zhí)行任務(wù)的線(xiàn)程數(shù)肪虎。
public int getLargestPoolSize()
返回線(xiàn)程池中出現(xiàn)過(guò)的最大的線(xiàn)程數(shù)量劣砍,不大于最大線(xiàn)程數(shù)。
public long getTaskCount()
返回線(xiàn)程池總共執(zhí)行過(guò)的以及正在執(zhí)行的任務(wù)數(shù)扇救,該值是個(gè)大概值刑枝。
public long getCompletedTaskCount()
返回線(xiàn)程池總共執(zhí)行過(guò)的任務(wù)數(shù),該值是個(gè)大概值爵政。
以上是線(xiàn)程池的基本理解和使用,不想深究的話(huà)陶缺,到這里也就可以了钾挟。
下面是需要注意的點(diǎn)。
關(guān)于新建線(xiàn)程
線(xiàn)程池使用ThreadFactory來(lái)創(chuàng)建線(xiàn)程饱岸,如果沒(méi)有顯示提供ThreadFactory掺出,則使用默認(rèn)的Executors#DefaultThreadFactory來(lái)創(chuàng)建線(xiàn)程:
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
從以上代碼可以得到如下信息:
- 線(xiàn)程名稱(chēng)均是“pool-XX-thread-XX”形式。
- 均是非daemon線(xiàn)程苫费。
- 優(yōu)先級(jí)均是
Thread.NORM_PRIORITY
汤锨。
關(guān)于隊(duì)列選擇
常用的隊(duì)列選擇策略有以下幾種:
- 直接傳遞——使用SynchronousQueue同步隊(duì)列實(shí)現(xiàn)。該隊(duì)列不保存任務(wù)百框,可以理解為一個(gè)單純的管道闲礼。其“放入”和“取出”都是阻塞的,每個(gè)“放入”操作都要等待一個(gè)“取出”操作铐维,反之亦然柬泽。直接傳遞,通常要求最大線(xiàn)程數(shù)量不受限制嫁蛇,以避免新提交的任務(wù)交由handler處理锨并;但這就會(huì)導(dǎo)致線(xiàn)程數(shù)量不可控。該策略在任務(wù)間有內(nèi)部依賴(lài)時(shí)睬棚,可避免鎖住第煮。
- 無(wú)限隊(duì)列——使用一個(gè)無(wú)容量限制的隊(duì)列解幼,比如LinkedBlockingQueue(FIFO隊(duì)列)或者PriorityBlockingQueue(可自定義Comparator)。這樣在核心線(xiàn)程都在工作時(shí)包警,新的任務(wù)會(huì)被添加到隊(duì)列中撵摆,最大線(xiàn)程數(shù)不會(huì)超過(guò)核心線(xiàn)程數(shù),也就是說(shuō)maximumPoolSize這個(gè)參數(shù)將不起作用揽趾。該策略適合任務(wù)間完全獨(dú)立台汇,相互不影響的情況。所謂無(wú)限篱瞎,并非是真的無(wú)限苟呐,只是容量非常大而已,可能是Integer.MAX_VALUE俐筋,也可能是其他值牵素。
- 有限隊(duì)列——比如ArrayBlockingQueue〕握撸可以避免資源的過(guò)度消耗笆呆,但控制起來(lái)比較復(fù)雜,需要權(quán)衡隊(duì)列容量和最大線(xiàn)程數(shù)的關(guān)系:使用大隊(duì)列和小線(xiàn)程數(shù)量粱挡,會(huì)減少CPU的使用赠幕,以及操作系統(tǒng)資源的占用,但會(huì)降低吞吐率询筏;使用小隊(duì)列和大線(xiàn)程數(shù)榕堰,可以充分利用CPU資源,但可能會(huì)加大調(diào)度開(kāi)支嫌套,同樣降低吞吐率逆屡。
關(guān)于RejectedExecutionHandler
上面介紹了四種默認(rèn)的RejectedExecutionHandler,同樣也可以自己定義踱讨。需要注意的是魏蔗,RejectedExecutionHandler的選擇需要參照線(xiàn)程數(shù)量和隊(duì)列選擇策略。比如無(wú)限隊(duì)列的情況痹筛,可以隨意設(shè)置莺治。
關(guān)于覆寫(xiě)ThreadPoolExecutor
ThreadPoolExecutor提供了3個(gè)protected的hook方法。beforeExecute(Thread, Runnable)
和afterExecute(Runnable, Throwable)
分別在每個(gè)任務(wù)的執(zhí)行前/后調(diào)用帚稠,terminated()
方法在線(xiàn)程池終結(jié)時(shí)調(diào)用产雹。
以下是一個(gè)覆寫(xiě)的例子,添加了pause|resume方法:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) {
super(...);
}
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
下面學(xué)習(xí)線(xiàn)程池的實(shí)現(xiàn)原理翁锡。
線(xiàn)程在哪里
線(xiàn)程池的線(xiàn)程由內(nèi)部類(lèi)Worker持有蔓挖。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker類(lèi)繼承自AbstractQueuedSynchronizer,該父類(lèi)這里不深究馆衔,只需要知道它是一個(gè)鎖的實(shí)現(xiàn)即可瘟判。Worker類(lèi)還實(shí)現(xiàn)了Runnable接口怨绣。
由構(gòu)造方法可知,Worker在創(chuàng)建時(shí)會(huì)通過(guò)ThreadFactory.newThread
方法創(chuàng)建一個(gè)線(xiàn)程拷获,并將自身作為Runnable對(duì)象傳遞給該線(xiàn)程篮撑。
protected修飾的方法是覆寫(xiě)的父類(lèi)方法,暫且不用管匆瓜。lock赢笨、tryLock、unlock驮吱、isLocked是自身鎖的調(diào)用方法茧妒。
Worker的run方法中調(diào)用了runWorker方法。如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//獲取Worker類(lèi)的thread
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//getTask為阻塞方法
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||//當(dāng)前線(xiàn)程池狀態(tài)至少為STOP左冬,不考慮offset桐筏,其值為1。則滿(mǎn)足的狀態(tài)為STOP拇砰、TIDYING梅忌、TERMINATED。
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//hook方法除破,可重寫(xiě)
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//hook方法牧氮,可重寫(xiě)
}
} finally {
task = null;
w.completedTasks++;//已完成的任務(wù)數(shù)加1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//Worker退出時(shí)的清理操作
}
}
由源碼可知,該方法不斷從隊(duì)列中取出任務(wù)瑰枫,交由該Worker所在的線(xiàn)程執(zhí)行踱葛,是執(zhí)行任務(wù)的最終場(chǎng)所。
順騰摸瓜躁垛,這里涉及到getTask
和processWorkerExit
兩個(gè)方法剖毯。
先看processWorkerExit:
/**
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();//由異常引起的圾笨,需要ctl變量中存儲(chǔ)的工作線(xiàn)程數(shù)量減1教馆。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);//workers是一個(gè)Set,存儲(chǔ)了所有活動(dòng)的Worker擂达。
} finally {
mainLock.unlock();
}
tryTerminate();//該方法留意一下土铺,下面會(huì)介紹
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//線(xiàn)程池的狀態(tài)低于STOP,包括RUNNING板鬓、SHUTDOWN
if (!completedAbruptly) {//非異常引起的死亡悲敷,比如空閑狀態(tài)超過(guò)了存活時(shí)間。
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);//重新添加一個(gè)Worker俭令。
}
}
該方法為死掉的Worker做一些清理工作后德。首先將死亡的Worker已經(jīng)完成的任務(wù)數(shù)添加到線(xiàn)程池已完成的任務(wù)數(shù)中,緊接著從線(xiàn)程池的Worker集合中刪除該Worker抄腔,然后調(diào)用tryTerminate瓢湃,最后根據(jù)線(xiàn)程池的狀態(tài)等理张,判斷是否需要重新添加Worker到Worker集合中。
getTask方法從隊(duì)列中取出一個(gè)任務(wù)并返回绵患。在Worker由于一些原因退出時(shí)雾叭,返回null。這些原因包括:
- 線(xiàn)程數(shù)量超過(guò)maximumPoolSize(比如通過(guò)setMaximumPoolSize修改了最大線(xiàn)程數(shù)量)落蝙。
- 線(xiàn)程池被stop织狐。
- 線(xiàn)程池被關(guān)閉,并且隊(duì)列為空筏勒。
- 等待取出任務(wù)超時(shí)移迫,而超時(shí)的Worker需要終結(jié)。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//線(xiàn)程池狀態(tài)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//對(duì)應(yīng)情況2奏寨、3
//該條件等價(jià)于if (rs >= STOP || (rs >= SHUTDOWN && workQueue.isEmpty()))
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);//當(dāng)前線(xiàn)程數(shù)量
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//當(dāng)前Worker是否允許超時(shí)關(guān)閉
//對(duì)應(yīng)情況1起意、4
if ((wc > maximumPoolSize || (timed && timedOut))//線(xiàn)程數(shù)量大于maximumPoolSize或者達(dá)到超時(shí)條件
&& (wc > 1 || workQueue.isEmpty())) {//并且此時(shí)隊(duì)列為空,或者線(xiàn)程數(shù)大于1(在隊(duì)列非空時(shí)病瞳,至少需要保留一個(gè)Worker來(lái)執(zhí)行任務(wù))
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://允許超時(shí)關(guān)閉則調(diào)用poll超時(shí)阻塞方法
workQueue.take();//否則調(diào)用take揽咕,一直阻塞下去,直到新任務(wù)到來(lái)
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在processWorkerExit方法中套菜,我們接觸到了addWorker方法亲善。
該方法根據(jù)線(xiàn)程池的狀態(tài)的最大線(xiàn)程數(shù)量,決定是否向線(xiàn)程池添加新的Worker逗柴。遇到以下條件之一時(shí)蛹头,添加失敗,返回false:
- 線(xiàn)程池已經(jīng)stop或者達(dá)到shutdown的條件戏溺。
- 當(dāng)前線(xiàn)程數(shù)達(dá)到上限渣蜗。
- ThreadFactory創(chuàng)建線(xiàn)程失敗。
Worker創(chuàng)建失敗時(shí)旷祸,會(huì)回滾一些數(shù)據(jù)耕拷。
/**
* @param firstTask 新Worker需要執(zhí)行的第一個(gè)任務(wù),可為null
* @param core 是否是核心線(xiàn)程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry://添加一個(gè)標(biāo)簽
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))//對(duì)應(yīng)情況1
//等價(jià)于if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
//之所以有firstTask != null是因?yàn)镾HUTDOWN后不再接收新任務(wù)托享。
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||//CAPACITY為線(xiàn)程池所能容納的最大線(xiàn)程數(shù)量2^29-1
wc >= (core ? corePoolSize : maximumPoolSize))//對(duì)應(yīng)情況2
return false;
if (compareAndIncrementWorkerCount(c))//ctl字段中的Worker數(shù)量加1骚烧,此時(shí)Worker還沒(méi)有被實(shí)際創(chuàng)建,
//也證實(shí)了講ctl字段時(shí)提到的“wokerCount未必和存活的線(xiàn)程數(shù)一致”
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//線(xiàn)程池狀態(tài)發(fā)生變化時(shí)闰围,重新執(zhí)行前面的邏輯
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {//線(xiàn)程創(chuàng)建成功
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {//SHUTDOWN后不再接收新任務(wù)
if (t.isAlive()) // precheck that t is startable
//t.isAlive()返回true赃绊,則表明t.start已被調(diào)用過(guò),而正常來(lái)說(shuō)羡榴,此時(shí)還未調(diào)用t.start
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)//largestPoolSize是線(xiàn)程池中出現(xiàn)過(guò)的最大線(xiàn)程的數(shù)量
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//執(zhí)行數(shù)據(jù)回滾
}
return workerStarted;
}
接下來(lái)看addWorkerFailed方法碧查。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();//講ctl字段的workerCount減1
tryTerminate();//查看是否需要終結(jié)線(xiàn)程池
} finally {
mainLock.unlock();
}
}
processWorkerExit和addWorkerFailed都涉及到了tryTerminate方法。
僅tryTerminate方法會(huì)將線(xiàn)程池狀態(tài)置為T(mén)IDYING和TERMINATED校仑,且需要滿(mǎn)足以下條件之一:
- 當(dāng)前狀態(tài)為SHUTDOWN忠售,并且線(xiàn)程池和隊(duì)列都為空者冤。
- 當(dāng)前狀態(tài)為STOP,并且隊(duì)列為空档痪。
在執(zhí)行了可能導(dǎo)致線(xiàn)程池TERMINATED的操作后涉枫,必須調(diào)用該方法,比如減少了worker的數(shù)量腐螟,或者shutdown之后從隊(duì)列中移除了任務(wù)愿汰。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||//線(xiàn)程池正在運(yùn)行中
runStateAtLeast(c, TIDYING) ||//線(xiàn)程池正在terminate
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//或者不滿(mǎn)足條件1
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);//workerCount不為0時(shí),中斷一個(gè)空閑的worker乐纸,將中斷信號(hào)傳遞下去(如何傳遞請(qǐng)往下看)
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();//hook方法衬廷,可重寫(xiě)
} finally {
ctl.set(ctlOf(TERMINATED, 0));//此處可以看出TIDYING只是一個(gè)過(guò)渡態(tài)。
termination.signalAll();//用于喚醒a(bǔ)waitTermination阻塞方法
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
至此我們就見(jiàn)到了可重寫(xiě)的3個(gè)hook方法:beforeExecute
汽绢、afterExecute
吗跋、terminate
。
由termination.signalAll()
這一語(yǔ)句宁昭,簡(jiǎn)單看一下awaitTermination:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
while (!runStateAtLeast(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);//狀態(tài)不是TERMINATED跌宛,則阻塞下去,等待termination.signalAll()喚醒
}
return true;
} finally {
mainLock.unlock();
}
}
tryTerminate方法中調(diào)用了interruptIdleWorkers积仗,僅此處調(diào)用傳遞的onlyOne參數(shù)為true:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
前面說(shuō)了疆拘,調(diào)用interruptIdleWorkers方法是為了將終結(jié)信號(hào)傳遞下去,那究竟是如何傳遞的呢寂曹?
- interruptIdleWorkers會(huì)中斷一個(gè)Worker哎迄。
- Worker中斷,則runWorker方法就會(huì)調(diào)用finally塊中的processWorkerExit方法隆圆,參數(shù)completedAbruptly為true漱挚。
- processWorkerExit方法中會(huì)再次調(diào)用tryTerminate方法,從而完成終結(jié)信號(hào)的傳遞渺氧。
至此我們就學(xué)習(xí)了線(xiàn)程池的實(shí)現(xiàn)原理旨涝。