???????ThreadPoolExecutor顧名思義江耀,是一個(gè)線程池管理工具類萨咳,該類主要提供了任務(wù)管理截粗,線程的調(diào)度和相關(guān)的hook方法來控制線程池的狀態(tài)垮刹。
1.方法說明
任務(wù)管理主要方法如下:
public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public <T> Future<T> submit(Runnable task, T result);
public Future<?> submit(Runnable task);
public void shutdown();
public List<Runnable> shutdownNow();
???????上述方法中达吞,execute()和submit()方法在有空閑線程存在的情況下會(huì)立即調(diào)用該線程執(zhí)行任務(wù),區(qū)別在于execute()方法是忽略任務(wù)執(zhí)行結(jié)果的荒典,而submit()方法則可以獲取結(jié)果酪劫。除此之外,ThreadPoolExecutor還提供了shutdown()和shutdownNow()方法用于關(guān)閉線程池寺董,區(qū)別在于shutdown()方法在調(diào)用之后會(huì)將任務(wù)隊(duì)列中的任務(wù)都執(zhí)行完畢之后再關(guān)閉線程池覆糟,而shutdownNow()方法則會(huì)直接關(guān)閉線程池,并且將任務(wù)隊(duì)列中的任務(wù)導(dǎo)出到一個(gè)列表中返回遮咖。
???????除上述用于執(zhí)行任務(wù)的方法外滩字,ThreadPoolExecutor還提供了如下幾個(gè)hook(鉤子)方法:
protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();
???????在ThreadPoolExecutor中這幾個(gè)方法默認(rèn)都是空方法,beforeExecute()會(huì)在每次任務(wù)執(zhí)行之前調(diào)用,afterExecute()會(huì)在每次任務(wù)結(jié)束之后調(diào)用麦箍,terminated()方法則會(huì)在線程池被終止時(shí)調(diào)用酗电。使用這幾個(gè)方法的方式就是聲明一個(gè)子類繼承ThreadPoolExecutor,并且在子類中重寫需要定制的鉤子方法内列,最后在創(chuàng)建線程池時(shí)使用該子類實(shí)例即可。
2.任務(wù)調(diào)度
a.相關(guān)參數(shù)
???????對(duì)于ThreadPoolExecutor的實(shí)例化背率,其主要有如下幾個(gè)重要的參數(shù):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler);
- corePoolSize: 線程池核心線程的數(shù)量话瞧;
- maximumPoolSize: 線程池可創(chuàng)建的最大線程數(shù)量;
- keepAliveTime: 當(dāng)線程數(shù)量超過了corePoolSize指定的線程數(shù)寝姿,并且空閑線程空閑的時(shí)間達(dá)到當(dāng)前參數(shù)指定的時(shí)間時(shí)該線程就會(huì)被銷毀交排,如果調(diào)用過allowCoreThreadTimeOut(boolean value)方法允許核心線程過期,那么該策略針對(duì)核心線程也是生效的饵筑;
- unit: 指定了keepAliveTime的單位埃篓,可以為毫秒,秒根资,分架专,小時(shí)等;
- workQueue: 存儲(chǔ)未執(zhí)行的任務(wù)的隊(duì)列玄帕;
- threadFactory: 創(chuàng)建線程的工廠部脚,如果未指定則使用默認(rèn)的線程工廠;
- handler: 指定了當(dāng)任務(wù)隊(duì)列已滿裤纹,并且沒有可用線程執(zhí)行任務(wù)時(shí)對(duì)新添加的任務(wù)的處理策略委刘;
b.調(diào)度策略
???????當(dāng)初始化一個(gè)線程池之后,池中是沒有任何用戶執(zhí)行任務(wù)的活躍線程的鹰椒,當(dāng)新的任務(wù)到來時(shí)锡移,根據(jù)配置的參數(shù)其主要的執(zhí)行任務(wù)如下:
- 若線程池中線程數(shù)小于corePoolSize指定的線程數(shù)時(shí),每來一個(gè)任務(wù)漆际,都會(huì)創(chuàng)建一個(gè)新的線程執(zhí)行該任務(wù)淆珊,無論線程池中是否已有空閑的線程;
- 若當(dāng)前執(zhí)行的任務(wù)達(dá)到了corePoolSize指定的線程數(shù)時(shí)奸汇,也即所有的核心線程都在執(zhí)行任務(wù)時(shí)套蒂,此時(shí)來的新任務(wù)會(huì)保存在workQueue指定的任務(wù)隊(duì)列中;
- 當(dāng)所有的核心線程都在執(zhí)行任務(wù)茫蛹,并且任務(wù)隊(duì)列中存滿了任務(wù)操刀,此時(shí)若新來了任務(wù),那么線程池將會(huì)創(chuàng)建新線程執(zhí)行任務(wù)婴洼;
- 若所有的線程(maximumPoolSize指定的線程數(shù))都在執(zhí)行任務(wù)骨坑,并且任務(wù)隊(duì)列也存滿了任務(wù)時(shí),對(duì)于新添加的任務(wù),其都會(huì)使用handler所指定的方式對(duì)其進(jìn)行處理欢唾。
c.調(diào)度策略注意點(diǎn)
- 在第二步中且警,當(dāng)前核心線程都在執(zhí)行任務(wù),并且任務(wù)隊(duì)列已滿時(shí)礁遣,會(huì)創(chuàng)建新的線程執(zhí)行任務(wù)斑芜,這里需要注意的是,創(chuàng)建新線程的時(shí)候當(dāng)前總共需要執(zhí)行的任務(wù)數(shù)是(corePoolSize + workQueueSize)祟霍,并不是只有corePoolSize個(gè)任務(wù)杏头;
- 在第三步中,這里workQueue主要有三種類型:ArrayBlockingQueue沸呐、LinkedBlockingQueue醇王、SynchronousQueue,第一個(gè)是有界阻塞隊(duì)列崭添,第二個(gè)是無界阻塞隊(duì)列寓娩,當(dāng)然也可以為其指定界限大小,第三個(gè)是同步隊(duì)列呼渣,對(duì)于ArrayBlockingQueue棘伴,其是需要指定隊(duì)列大小的,當(dāng)隊(duì)列存滿了任務(wù)線程池就會(huì)創(chuàng)建新的線程執(zhí)行任務(wù)屁置,對(duì)于LinkedBlockingQueue排嫌,如果其指定界限,那么和ArrayBlockingQueue區(qū)別不大缰犁,如果其不指定界限淳地,那么其理論上是可以存儲(chǔ)無限量的任務(wù)的,實(shí)際上能夠存儲(chǔ)Integer.MAX_VALUE個(gè)任務(wù)(還是相當(dāng)于可以存儲(chǔ)無限量的任務(wù))帅容,此時(shí)由于LinkedBlockingQueue是永遠(yuǎn)無法存滿任務(wù)的颇象,因而maxPoolSize的設(shè)定將沒有意義,一般其會(huì)設(shè)定為和corePoolSize相同的值并徘,對(duì)于SynchronousQueue遣钳,其內(nèi)部是沒有任何結(jié)構(gòu)存儲(chǔ)任務(wù)的,當(dāng)一個(gè)任務(wù)添加到該隊(duì)列時(shí)麦乞,當(dāng)前線程和后續(xù)添加任務(wù)的線程都會(huì)被阻塞蕴茴,直至有一個(gè)線程從該隊(duì)列中取出任務(wù),當(dāng)前線程才會(huì)被釋放姐直,因而如果線程池使用了該隊(duì)列倦淀,那么一般corePoolSize都會(huì)設(shè)計(jì)得比較小,maxPoolSize會(huì)設(shè)計(jì)得比較大声畏,因?yàn)樵撽?duì)列比較適合大量并且執(zhí)行時(shí)間較短的任務(wù)的執(zhí)行撞叽;
- 在第四步中姻成,DiscardPolicy和DiscardOldestPolicy一般不會(huì)配合SynchronousQueue使用,因?yàn)楫?dāng)同步隊(duì)列阻塞了任務(wù)時(shí)愿棋,該任務(wù)都會(huì)被拋棄科展;對(duì)于AbortPolicy,因?yàn)槿绻?duì)列已滿糠雨,那么其會(huì)拋出異常才睹,因而使用時(shí)需要小心;對(duì)于CallerRunsPolicy甘邀,由于當(dāng)有新的任務(wù)到達(dá)時(shí)會(huì)使用調(diào)用線程執(zhí)行當(dāng)前任務(wù)琅攘,因而使用時(shí)需要考慮其對(duì)服務(wù)器響應(yīng)的影響,并且還需要注意的是鹃答,相對(duì)于其他幾個(gè)策略,該策略不會(huì)拋棄任務(wù)到達(dá)的任務(wù)突硝,因?yàn)槿绻竭_(dá)的任務(wù)使隊(duì)列滿了而只能使用調(diào)用線程執(zhí)行任務(wù)時(shí)测摔,說明線程池設(shè)計(jì)得不夠合理,如果任其發(fā)展解恰,那么所有的調(diào)用線程都可能會(huì)被需要執(zhí)行的任務(wù)所阻塞锋八,導(dǎo)致服務(wù)器出現(xiàn)問題。
3.源碼講解
a.主要屬性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 00011111 11111111 11111111 11111111
private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; // 00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
???????由于ThreadPoolExecutor需要管理多種狀態(tài)护盈,并且還要記錄當(dāng)前執(zhí)行任務(wù)的線程的數(shù)量挟纱,如果使用多個(gè)變量,并發(fā)更新時(shí)管理將會(huì)非常復(fù)雜腐宋,這里ThreadPoolExecutor則主要使用一個(gè)AtomicInteger類型的變量ctl存儲(chǔ)所有主要的信息紊服。ctl是一個(gè)32位的整形數(shù)字,初始值為0胸竞,其最高的三位用于存儲(chǔ)當(dāng)前線程池的狀態(tài)信息欺嗤,主要有RUNNING,SHUTDOWN卫枝,STOP煎饼,TIDING和TERMINATED,分別表示運(yùn)行狀態(tài)校赤,關(guān)閉狀態(tài)吆玖,終止?fàn)顟B(tài),整理狀態(tài)和結(jié)束狀態(tài)马篮。這幾種狀態(tài)對(duì)應(yīng)的具體數(shù)值信息如上述代碼所示沾乘,這里需要注意的一點(diǎn)是,在ThreadPoolExecutor中浑测,這幾種狀態(tài)在數(shù)值上是從小到大依次增大的意鲸,并且狀態(tài)流轉(zhuǎn)也是依次往下的,這就為其判斷狀態(tài)信息提供了比較便利的方式,如當(dāng)需要判斷線程池狀態(tài)是否處于SHUTDOWN狀態(tài)時(shí)怎顾,只需要判斷其代表狀態(tài)位部分的值是否等于SHUTDOWN即可读慎。在ctl中,除了最高三位用于表示狀態(tài)外槐雾,其余位所代表的數(shù)值則指定了當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)夭委。如下是操作ctl屬性的相關(guān)方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
- runStateOf(int c): 用于獲取當(dāng)前線程池的狀態(tài),c為當(dāng)前線程池工作時(shí)的ctl屬性值募强;
- workerCountOf(int c): 用于獲取當(dāng)前線程池正在工作的線程數(shù)量株灸,c為當(dāng)前線程池工作時(shí)的ctl屬性值;
- ctlOf(int rs, int wc): 這里rs表示當(dāng)前線程的工作狀態(tài)擎值,wc則表示正在工作的線程數(shù)慌烧,該方法用于將這兩個(gè)參數(shù)組裝為一個(gè)ctl屬性值;
- runStateLessThan(int c, int s): 判斷當(dāng)前線程池狀態(tài)是否未達(dá)到指定狀態(tài)鸠儿,如前所述屹蚊,狀態(tài)流轉(zhuǎn)在數(shù)值上是依次增大的,因而這里只需要判斷其大小即可进每;
- runStateAtLeast(int c, int s): 用于判斷當(dāng)前線程池狀態(tài)是否至少處于某種狀態(tài)汹粤;
- isRunning(int c): 用于判斷當(dāng)前線程池是否處于正常運(yùn)行狀態(tài);
- compareAndIncrementWorkerCount(int expect): 增加當(dāng)前線程池的工作線程數(shù)量值田晚;
- compareAndDecrementWorkerCount(int expect): 減少當(dāng)前線程池的工作線程數(shù)量值嘱兼。
b.主要方法
???????對(duì)于線程池的execute()和submit()方法,其實(shí)在底層submit()方法會(huì)將傳入的任務(wù)封裝為一個(gè)FutureTask對(duì)象贤徒,由于FutureTask對(duì)象是實(shí)現(xiàn)了Runnable接口的芹壕,因而其也可以當(dāng)做一個(gè)任務(wù)執(zhí)行,這里就是將封裝后的FutureTask對(duì)象傳遞給execute()方法執(zhí)行的接奈。我們這里則主要講解execute()方法的實(shí)現(xiàn)方式哪雕,如下是execute()方法的代碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 獲取當(dāng)前線程池狀態(tài)
if (workerCountOf(c) < corePoolSize) {
// 當(dāng)工作線程數(shù)小于核心線程數(shù)時(shí),則調(diào)用addWorker()方法創(chuàng)建線程并執(zhí)行任務(wù)
if (addWorker(command, true))
return;
c = ctl.get(); // 若添加失敗鲫趁,則更新當(dāng)前線程池狀態(tài)
}
// 執(zhí)行到此處斯嚎,則說明線程池中的工作線程要么大于等于核心線程數(shù),要么當(dāng)前線程池已經(jīng)被命令關(guān)閉了(addWorker方法添加失敗的原因)挨厚,因而這里判斷線程池是否為RUNNING狀態(tài)堡僻,是則將任務(wù)添加到任務(wù)隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 添加隊(duì)列成功后雙重驗(yàn)證,確保線程池處于正確狀態(tài)
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 若線程池中沒有線程疫剃,則創(chuàng)建一個(gè)新線程執(zhí)行添加的任務(wù)
} else if (!addWorker(command, false))
reject(command); // 線程池至少處于SHUTDOWN狀態(tài)钉疫,拒絕當(dāng)前任務(wù)的執(zhí)行
}
???????在execute()方法中,其首先判斷線程池工作線程數(shù)是否小于核心線程數(shù)巢价,是則創(chuàng)建核心線程執(zhí)行任務(wù)牲阁,添加失敗或者工作線程數(shù)大于等于核心線程數(shù)時(shí)固阁,則將任務(wù)添加到任務(wù)隊(duì)列中,添加成功后會(huì)進(jìn)行雙重驗(yàn)證確保當(dāng)前線程池處于正確的狀態(tài)城菊,并且確保當(dāng)前有可用的線程執(zhí)行新添加的任務(wù)备燃。由此可見對(duì)于execute()方法的實(shí)現(xiàn),其比較核心的方法是addWorker()方法凌唬,如下是addWorker()方法的實(shí)現(xiàn)方式:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 獲取當(dāng)前運(yùn)行狀態(tài)
// 判斷當(dāng)前線程池是否至少為SHUTDOWN狀態(tài)并齐,并且firstTask和任務(wù)隊(duì)列中沒有任務(wù),是則直接返回
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 判斷是否工作線程數(shù)大于可記錄的最大線程數(shù)客税,或者工作線程超過了指定的核心線程或者最大線程數(shù)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 走到這一步說明當(dāng)前線程池處于RUNNING狀態(tài)况褪,或者任務(wù)隊(duì)列存在任務(wù),并且工作線程數(shù)不超過
// 指定的線程數(shù)量更耻,那么就增加工作線程數(shù)量测垛,成功則繼續(xù)往下執(zhí)行,失敗則重復(fù)上述添加步驟
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// 記錄工作線程數(shù)的變量已經(jīng)更新秧均,接下來創(chuàng)建線程執(zhí)行任務(wù)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 創(chuàng)建一個(gè)工作者對(duì)象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 重新檢查線程池狀態(tài)食侮,或者是判斷當(dāng)前是SHUTDOWN狀態(tài),而firstTask為空熬北,這說明任務(wù)隊(duì)列此時(shí)不為空
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w); // 將創(chuàng)建的工作者添加到工作者集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新已使用的最大線程數(shù)
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 工作者對(duì)象成功創(chuàng)建之后疙描,調(diào)用該工作者執(zhí)行任務(wù)
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
???????在addWorker()方法中诚隙,其首先檢查當(dāng)前線程池是否處于RUNNING狀態(tài)讶隐,或者處于SHUTDOWN狀態(tài),但是任務(wù)隊(duì)列中還存在有任務(wù)久又,那么其就會(huì)創(chuàng)建一個(gè)新的Worker對(duì)象巫延,并且將其添加到工作者對(duì)象集合中,然后調(diào)用工作者對(duì)象所維護(hù)的線程執(zhí)行任務(wù)地消,如下是工作者對(duì)象的實(shí)現(xiàn)代碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread; // 當(dāng)前工作者中執(zhí)行任務(wù)的線程
Runnable firstTask; // 第一個(gè)需要執(zhí)行的任務(wù)
volatile long completedTasks; // 當(dāng)前工作者完成的任務(wù)數(shù)
Worker(Runnable firstTask) {
// 默認(rèn)設(shè)置為-1炉峰,那么如果不調(diào)用當(dāng)前工作者的run()方法,那么其狀態(tài)是不會(huì)改變的脉执,
// 其他的線程也無法使用當(dāng)前工作者執(zhí)行任務(wù)疼阔,在run()方法調(diào)用的runWorker()方法中會(huì)
// 調(diào)用unlock()方法使當(dāng)前工作者處于正常狀態(tài)
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 使用線程工廠創(chuàng)建線程
}
public void run() {
runWorker(this); // 使用當(dāng)前工作者執(zhí)行任務(wù)
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 如果當(dāng)前線程已經(jīng)在執(zhí)行任務(wù),那么將其標(biāo)記為打斷狀態(tài)半夷,待其任務(wù)執(zhí)行完畢則終止任務(wù)的執(zhí)行
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
???????在工作者對(duì)象中婆廊,其主要維護(hù)了一個(gè)工作者線程,用于執(zhí)行任務(wù)巫橄。該工作者對(duì)象繼承了AbstractQueuedSynchronizer淘邻,用于控制當(dāng)前工作者工作狀態(tài)的獲取,并且其也實(shí)現(xiàn)了Runnable接口湘换,將主要任務(wù)的執(zhí)行封裝到run()方法中宾舅。如下是runWorker()方法的具體實(shí)現(xiàn):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 重置Worker對(duì)象的狀態(tài)
boolean completedAbruptly = true;
try {
// 首先執(zhí)行工作者線程中的任務(wù)统阿,然后循環(huán)從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行
while (task != null || (task = getTask()) != null) {
w.lock();
// 檢查當(dāng)前線程池的狀態(tài),如果線程池被終止或者線程池終止并且當(dāng)前線程已被打斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 調(diào)用鉤子方法進(jìn)行預(yù)處理
Throwable thrown = null;
try {
task.run(); // 執(zhí)行任務(wù)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 調(diào)用鉤子方法進(jìn)行任務(wù)完成后的處理工作
}
} finally {
task = null; // 重置工作者的初始任務(wù)
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
???????可以看到筹我,在runWorker()方法中扶平,其首先會(huì)執(zhí)行工作者對(duì)象的初始化任務(wù),當(dāng)執(zhí)行完畢后會(huì)通過一個(gè)無限循環(huán)不斷在任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行崎溃。如下是getTask()方法的源碼:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判斷當(dāng)前線程是否處于STOP狀態(tài)蜻直,或者處于SHUTDOWN狀態(tài),并且工作隊(duì)列是空的袁串,是則不返回任務(wù)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允許空閑線程過期
// 工作線程數(shù)大于最大允許線程數(shù)概而,或者線程在指定時(shí)間內(nèi)無法從工作隊(duì)列中獲取到新任務(wù),則銷毀當(dāng)前線程
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 允許核心線程過期或者工作線程數(shù)大于corePoolSize時(shí)囱修,從任務(wù)隊(duì)列獲取任務(wù)時(shí)會(huì)指定等待時(shí)間赎瑰,
// 否則會(huì)一直等待任務(wù)隊(duì)列中新的任務(wù)
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
???????可以看到,getTask方法首先會(huì)判斷當(dāng)前線程池狀態(tài)是否為STOP狀態(tài)破镰,或者是SHUTDOWN狀態(tài)餐曼,并且任務(wù)隊(duì)列是空的,是則不返回任務(wù)鲜漩,否則會(huì)根據(jù)相關(guān)參數(shù)從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行源譬。
???????以上execute()方法的主要實(shí)現(xiàn)步驟,在ThreadPoolExecutor中另一個(gè)至關(guān)重要的方法則是shutdown()方法孕似,以下是shutdown()方法的主要代碼:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 檢查對(duì)線程狀態(tài)的控制權(quán)限
advanceRunState(SHUTDOWN); // 更新當(dāng)前線程池狀態(tài)為SHUTDOWN
interruptIdleWorkers(); // 打斷空閑的工作者
onShutdown(); // 鉤子方法踩娘,但是沒有對(duì)外公開,因?yàn)樵摲椒ㄖ挥邪L問權(quán)限
} finally {
mainLock.unlock();
}
tryTerminate();
}
???????在shutdown()方法中喉祭,其首先檢查當(dāng)前線程是否有修改線程狀態(tài)的權(quán)限养渴,然后將當(dāng)前線程池的狀態(tài)修改為SHUTDOWN,接著調(diào)用interruptIdleWorkers()方法中斷所有處于空閑狀態(tài)的線程泛烙,最后則是調(diào)用tryTerminate()方法嘗試將當(dāng)前線程池的狀態(tài)由SHUTDOWN修改為TERMINATED理卑,這里interruptIdleWorkers()方法最終會(huì)調(diào)用其重載方法interruptIdleWorkers(boolean)方法,該方法代碼如下:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
???????可以看到蔽氨,該方法會(huì)遍歷所有的工作者對(duì)象藐唠,如果其處于空閑狀態(tài),則將其終止鹉究。對(duì)于處于工作狀態(tài)的線程宇立,由于在shutdown()方法中已經(jīng)將當(dāng)前線程池的狀態(tài)設(shè)置為SHUTDOWN,那么工作狀態(tài)的線程會(huì)將任務(wù)隊(duì)列中的任務(wù)都執(zhí)行完畢之后自動(dòng)銷毀坊饶。
???????本文主要講解了ThreadPoolExecutor的主要方法泄伪,線程池的調(diào)度方式,以及其核心功能的實(shí)現(xiàn)原理匿级,如本文有任何不當(dāng)之處蟋滴,敬請(qǐng)指正染厅,謝謝!