觀看本文章之前遏考,最好看一下這篇文章熟悉下ThreadPoolExecutor基礎(chǔ)知識(shí)殴蓬。
1.關(guān)于Java多線程的一些常考知識(shí)點(diǎn)
2.看ThreadPoolExecutor源碼前的騷操作
講解本篇文章從下面一個(gè)例子開始,test1()和test2()方法都會(huì)拋出RejectedExecutionException異常趟章,ThreadPoolExecutor默認(rèn)的拒絕任務(wù)策略是AbortPolicy。test1()中線程池中corePoolSize和maximumPoolSize都為2慎王,阻塞隊(duì)列的長度是10蚓土,線程池最多能處理12個(gè)任務(wù)。當(dāng)超過12個(gè)任務(wù)時(shí)赖淤,就會(huì)拒絕新的任務(wù)蜀漆,拋出RejectedExecutionException。而test2()中的任務(wù)沒有超過線程池的閥值咱旱,但是在線程池調(diào)用shutdown()后嗜愈,線程池的狀態(tài)會(huì)變成shutdown,此時(shí)不接收新任務(wù)莽龟,但會(huì)處理正在運(yùn)行的任務(wù)和在阻塞隊(duì)列中等待處理的任務(wù)蠕嫁。所以我們?cè)趕hutdown()之后再調(diào)用submit(),會(huì)拋出RejectedExecutionException異常毯盈。有了這個(gè)例子的基礎(chǔ)剃毒,我們?cè)賮矸治鲈创a,會(huì)好過一點(diǎn)搂赋。
/**
* @author cmazxiaoma
* @version V1.0
* @Description: 分析拋出RejectedExecutionException問題
* @date 2018/8/16 14:35
*/
public class RejectedExecutionExceptionTest {
public static void main(String[] args) {
// test1();
test2();
}
/**
* 提交的任務(wù)數(shù)量超過其本身最大能處理的任務(wù)量
*/
public static void test1() {
CustomThreadPoolExecutor customThreadPoolExecutor =
new CustomThreadPoolExecutor(2, 2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 13; i++) {
CustomThreadPoolExecutor.CustomTask customTask
= new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(60 * 60);
System.out.println("線程" + Thread.currentThread().getName()
+ "正在執(zhí)行...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}, "success");
if (i == 12) {
// throw RejectedExectionException
customThreadPoolExecutor.submit(customTask);
} else {
customThreadPoolExecutor.submit(customTask);
}
}
customThreadPoolExecutor.shutdown();
}
/**
* 當(dāng)線程池shutdown()后赘阀,會(huì)中斷空閑線程。但是正在運(yùn)行的線程和處于阻塞隊(duì)列等待執(zhí)行的線程不會(huì)中斷脑奠。
* shutdown(),不會(huì)接收新的線程基公。
*/
public static void test2() {
CustomThreadPoolExecutor customThreadPoolExecutor =
new CustomThreadPoolExecutor(2, 2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 2; i++) {
CustomThreadPoolExecutor.CustomTask customTask
= new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(60 * 60);
System.out.println("線程" + Thread.currentThread().getName()
+ "正在執(zhí)行...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}, "success");
customThreadPoolExecutor.submit(customTask);
}
customThreadPoolExecutor.shutdown();
CustomThreadPoolExecutor.CustomTask customTask
= new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(60 * 60);
System.out.println("線程" + Thread.currentThread().getName()
+ "正在執(zhí)行...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}, "success");
customThreadPoolExecutor.submit(customTask);
}
}
源碼分析
線程池執(zhí)行過程
關(guān)于線程池執(zhí)行過程,我們看下面一幅圖宋欺,就能明白個(gè)大概轰豆。
1.當(dāng)線程池中的線程數(shù)量小于corePoolSize,就會(huì)創(chuàng)建新的線程來處理添加的任務(wù)直至線程數(shù)量等于corePoolSize胰伍。
2.當(dāng)線程池中的線程數(shù)量大于等于corePoolSize且阻塞隊(duì)列(workQueue)未滿,就會(huì)把新添加的任務(wù)放到阻塞隊(duì)列中酸休。
3.當(dāng)線程池中的線程數(shù)量大于等于corePoolSize且阻塞隊(duì)列滿了骂租,就會(huì)創(chuàng)建線程來處理添加的任務(wù)直到線程數(shù)量等于maximumPoolSize
4.如果線程池的數(shù)量大于maximumPoolSize,會(huì)根據(jù)RejectedExecutionHandler策略來拒絕任務(wù)。AbortPolicy就是其中的一種拒絕任務(wù)策略斑司。
submit
submit()相比于execute()而言渗饮,多了RunnableFuture<Void> ftask = newTaskFor(task, null);
這一步,把task包裝成RunnableFuture類型的ftask宿刮。所以submit()有返回值互站,返回值類型是Future<?>,可以通過get()獲取線程執(zhí)行完畢后返回的值。還可以通過isDone()
僵缺、isCancelled()
云茸、cancel(boolean mayInterruptIfRunning)
這些方法進(jìn)行某些操作。比如判斷線程是否執(zhí)行完畢谤饭、判斷線程是否被取消标捺,顯式取消啟動(dòng)的線程的操作。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
execute
線程池去處理被提交的任務(wù)揉抵,很明顯通過execute()方法提交的任務(wù)必須要實(shí)現(xiàn)Runnable接口亡容。
我們來仔細(xì)看下execute()注釋,發(fā)現(xiàn)它說到:如果任務(wù)不能被成功提交得到執(zhí)行冤今,因?yàn)榫€程池已經(jīng)處于shutdown狀態(tài)或者是任務(wù)數(shù)量已經(jīng)達(dá)到容器上限闺兢,任務(wù)會(huì)被RejectedExecutionHandler處理進(jìn)行拒絕操作。很明顯戏罢,注釋已經(jīng)告訴上文拋出RejectedExecutionException異常的答案了屋谭。有時(shí)候真的要仔細(xì)看注釋!9旮狻桐磁!多看注釋,事半功倍讲岁。
我們來看execute()中做了什么操作我擂。
1.獲取線程池的狀態(tài),如果線程池中的線程數(shù)量小于corePoolSize缓艳,調(diào)用addWorker(command, true)
創(chuàng)建新的線程去處理command任務(wù)校摩。如果addWorker()返回失敗,我們?cè)俅潍@取線程池的狀態(tài)阶淘。因?yàn)閍ddWorker()失敗的原因可能有:線程池已經(jīng)處于shutdown狀態(tài)不接收新的任務(wù)或者是存在并發(fā)衙吩,在workerCountOf(c) < corePoolSize
這塊代碼后,有其他的線程創(chuàng)建了worker線程溪窒,導(dǎo)致worker線程的數(shù)量大于等于corePoolSize
2.如果線程池的數(shù)量大于等于corePoolSize坤塞,且線程池的狀態(tài)處于RUNNING狀態(tài)冯勉,我們將任務(wù)放到阻塞隊(duì)列中。當(dāng)任務(wù)成功放入阻塞隊(duì)列中尺锚,我們?nèi)匀恍枰粋€(gè)雙重校驗(yàn)的機(jī)制去判斷是否應(yīng)該創(chuàng)建新的線程去處理任務(wù)珠闰。
因?yàn)闀?huì)存在這些情況:有些線程在我們上次校驗(yàn)后已經(jīng)死掉惜浅、線程池在上次校驗(yàn)后突然關(guān)閉處于shutdown狀態(tài)瘫辩。考慮到這些原因坛悉,我們必須再次校驗(yàn)線程池的狀態(tài)伐厌。如果線程池的狀態(tài)不處于RUNNING狀態(tài),那么就行回滾操作裸影,把剛才入隊(duì)的任務(wù)移除掉挣轨,后續(xù)通過reject(command)
執(zhí)行拒絕任務(wù)策略。
如果線程池處于RUNNING狀態(tài)且線程池中線程數(shù)量等于0或者從阻塞隊(duì)列中刪除任務(wù)失敗(意味著:這個(gè)任務(wù)已經(jīng)被其他線程處理掉了)且線程池中線程數(shù)量等于0轩猩,那么調(diào)用addWorker(null, false)
新建一個(gè)worker線程卷扮,去消費(fèi)workQueue中里面的任務(wù)
3.如果線程池不處于RUNNING狀態(tài)或者任務(wù)無法成功入隊(duì)(此時(shí)阻塞隊(duì)列已經(jīng)滿了),此時(shí)需要?jiǎng)?chuàng)建新的線程擴(kuò)容至maximumPoolSize均践。如果addWorker(command, false)
返回false晤锹,那么通過reject(command)
執(zhí)行拒絕任務(wù)策略。
這里再嘮叨幾句彤委,調(diào)用addWorker()有這4種傳參的方式鞭铆,適用于不同場(chǎng)景。
1.addWorker(command, true)
當(dāng)線程池中的線程數(shù)量少于corePoolSize焦影,會(huì)把command包裝成worker并且放入到workers集合中车遂。如果線程池中的線程數(shù)量超過了corePoolSize,會(huì)返回false斯辰。
2.addWorker(command, false)
當(dāng)阻塞隊(duì)列滿了舶担,同樣會(huì)把command包裝成worker并且放入到worker集合中。如果線程池中的線程數(shù)量超過了maximumPoolSize,會(huì)返回false彬呻。
3.addWorker(null, false)
說明firstTask是個(gè)空任務(wù)柄沮,同樣把它包裝成worker并且放入到worker集合中。如果線程池中的數(shù)量超過了maximumPoolSize,會(huì)返回false废岂。這樣firstTask為空的worker在線程執(zhí)行的時(shí)候祖搓,也可以從阻塞隊(duì)列中獲取任務(wù)去處理。
4.addWorker(null, true)
:和上面一樣湖苞,只是線程池的線程數(shù)量限制在corePoolSize拯欧,超過也是返回false。使用它的有prestartAllCoreThreads()
和prestartCoreThread()
這2個(gè)方法财骨,其使用目的是預(yù)加載線程池中的核心線程镐作。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker
addWorker()主要是創(chuàng)建新的線程藏姐,然后執(zhí)行任務(wù)。
1.首先判斷線程池的狀態(tài)是否滿足創(chuàng)建worker線程的要求该贾。
如果線程池的狀態(tài)大于SHUTDOWN狀態(tài)羔杨,那么此時(shí)處于STOP、TIDYING杨蛋、TERMINATE狀態(tài)兜材,不能創(chuàng)建worker線程,返回false。
如果線程池處于shutdown狀態(tài)且firstTask不等于null,此時(shí)也無法創(chuàng)建worker線程杯矩。因?yàn)樘幱趕hutdown狀態(tài)的線程池不會(huì)去接收新的任務(wù)雨让。
如果線程池處于shutdown狀態(tài)且firstTask等于null且workQueue阻塞隊(duì)列為空,此時(shí)就更沒有必要?jiǎng)?chuàng)建worker線程了。因?yàn)閒irstTask為null,就是為了創(chuàng)建一個(gè)沒有任務(wù)的worker線程去阻塞隊(duì)列里面獲取任務(wù)。而阻塞隊(duì)列都已經(jīng)為空户侥,那么再創(chuàng)建一個(gè)firstTask為null的worker線程顯然沒有什么意思,返回false即可峦嗤。
- 判斷線程池中的線程數(shù)量是否超過最大值蕊唐。當(dāng)core為true,最大值為corePoolSize。當(dāng)core為false寻仗,最大值為maximumPoolSize刃泌。如果超過最大值,也無法創(chuàng)建worker線程署尤,直接返回false即可耙替。如果沒有超過最大值,通過CAS操作讓當(dāng)前線程數(shù)加1,然后通過標(biāo)簽跳轉(zhuǎn)跳出循環(huán)體至
retry:
位置曹体。如果CAS操作失敗俗扇,說明workerCount被其他線程修改過。我們?cè)俅潍@取ctl箕别,判斷當(dāng)前線程池狀態(tài)和之前的狀態(tài)是否匹配铜幽。如果不匹配,說明線程池狀態(tài)發(fā)生變更串稀,繼續(xù)循環(huán)操作除抛。
3.通過傳入來的firstTask創(chuàng)建worker線程。Worker的構(gòu)造方法中通過setState(-1)
設(shè)置state(同步狀態(tài))為-1母截。Worker繼承了AbstractQueuedSynchronizer到忽,其本身是一把不可重入鎖。getThreadFactory().newThread(this)
創(chuàng)建新線程清寇,因?yàn)閃orker實(shí)現(xiàn)了Runnable接口喘漏,其本身也是一個(gè)可執(zhí)行的任務(wù)护蝶。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
4.我們往workers添加worker線程時(shí),通過ReentrantLock保證線程安全翩迈。只有在當(dāng)前線程池處于RUNNING狀態(tài)或者是處于SHUTDOWN狀態(tài)且firstTask等于null的情況下持灰,才可以添加worker線程。如果worker線程已經(jīng)處于啟動(dòng)且未死亡的狀態(tài)负饲,會(huì)拋出IllegalThreadStateException異常堤魁。
添加完畢后,啟動(dòng)worker線程绽族。如果worker線程啟動(dòng)成功返回true姨涡,啟動(dòng)失敗調(diào)用addWorkerFailed()進(jìn)行回滾操作衩藤。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker
我們來看下ThreadPoolExecutor的內(nèi)部類Worker吧慢,上文已經(jīng)說到Worker繼承了AbstractQueuedSynchronizer類且實(shí)現(xiàn)了Runnable接口。所以說是一個(gè)可執(zhí)行的任務(wù)赏表,也是一把不可重入鎖检诗,具有排他性。
1.我們創(chuàng)建Worker對(duì)象時(shí)瓢剿,默認(rèn)的state為-1逢慌。我們中斷的時(shí)候,要獲取worker對(duì)象的鎖(state從0 CAS到1)间狂。獲取鎖成功后攻泼,才能進(jìn)行中斷。這說明了在初始化worker對(duì)象階段鉴象,不允許中斷忙菠。只有調(diào)用了runWorker()
之后,將state置為0纺弊,才能中斷牛欢。
2.shutdown()中調(diào)用interruptIdleWorkers()中斷空閑線程和shutdownNow()中調(diào)用interruptWorkers()中斷所有線程。
interruptIdleWorkers()中中斷空閑線程的前提是要獲取worker對(duì)象的鎖淆游。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
interruptWorkers()中中斷所有線程時(shí)傍睹,不用調(diào)用tryLock()獲取worker對(duì)象的鎖,最終是通過worker中的interruptIfStarted()來中斷線程犹菱。在這個(gè)方法中只有state大于等于0且線程不等于null且線程沒有被中斷過拾稳,才能進(jìn)行中斷操作。說明只有經(jīng)過了runworker()
階段才能進(jìn)行中斷操作腊脱。
這也是Worker為什么要設(shè)計(jì)成不可重入的原因访得,就是為了防止中斷在運(yùn)行中的任務(wù),只會(huì)中斷在等待從workQueue中通過getTask()獲取任務(wù)的線程(因?yàn)樗麄儧]有上鎖虑椎,此時(shí)state為0)震鹉。
以下這5種方法都會(huì)調(diào)用到interruptIdleWorkers()去中斷空閑線程俱笛。
setCorePoolSize()
setKeepAliveTime(long time, TimeUnit unit)
setMaximumPoolSize(int maximumPoolSize)
shutdown()
allowCoreThreadTimeOut(boolean value)
還有一點(diǎn)必須強(qiáng)調(diào)。Task沒有真正的被執(zhí)行传趾,執(zhí)行的是Work線程迎膜。Work線程中只是調(diào)用到了Task中的run()方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker
1.work線程啟動(dòng)后浆兰,會(huì)調(diào)用其run()方法磕仅。run()方法再去調(diào)用runWorker(this)方法。
2.執(zhí)行任務(wù)之前簸呈,獲取work線程中的task榕订,然后釋放worker的鎖。讓state狀態(tài)從-1 CAS到0蜕便。當(dāng)state為0劫恒,說明可以去中斷此線程。
3.以輪詢的方式通過getTask()從阻塞隊(duì)列中獲取task轿腺,當(dāng)task為null两嘴,跳出輪詢。
4.開始執(zhí)行任務(wù)的時(shí)候族壳,通過lock()獲取鎖憔辫,將state從0 CAS到1。任務(wù)執(zhí)行完畢時(shí)仿荆,通過unlock()釋放鎖贰您。
5.如果線程池處于STOP、TIDYING拢操、TERMINATE狀態(tài)锦亦,要中斷worker線程。
6.通過beforeExecute(wt, task)和afterExecute(task, thrown)對(duì)task進(jìn)行前置和后置處理庐冯。
7.在task.run()孽亲、beforeExecute(wt, task)、afterExecute(task, thrown)發(fā)生異常時(shí)都會(huì)導(dǎo)致worker線程終止展父。通過調(diào)用processWorkerExit(w, completedAbruptly)
來進(jìn)行worker退出操作返劲。
8.在getTask()獲取阻塞隊(duì)列中的任務(wù),如果隊(duì)列中沒有任務(wù)或者是獲取任務(wù)超時(shí)栖茉,都會(huì)調(diào)用processWorkerExit(w, completedAbruptly)
來進(jìn)行worker退出操作篮绿。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
上文已經(jīng)提起過getTask()方法,主要是從阻塞隊(duì)列獲取task的吕漂。那么條件下task會(huì)返回null呢亲配?我們可以通過注釋得到一些信息。
- 超過了maximumPoolSize設(shè)置的線程數(shù)量,因?yàn)檎{(diào)用了setMaximumPoolSize()方法吼虎。
- 線程池處于stop狀態(tài)犬钢。
- 線程池處于shutdown狀態(tài)且workQueue為空.
- 獲取任務(wù)等待超時(shí)。
1.首先獲取線程池運(yùn)行狀態(tài)思灰,如果線程池的狀態(tài)處于shutdown狀態(tài)且workQueue為空玷犹,或者處于stop狀態(tài)。然后調(diào)用decrementWorkerCount()遞減workerCount洒疚,最后返回null歹颓。
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2.allowCoreThreadTimeOut默認(rèn)為false。為false的時(shí)候油湖,核心線程即時(shí)在空閑時(shí)也會(huì)保持活躍巍扛。為true的時(shí)候,核心線程在keepAliveTime時(shí)間范圍內(nèi)等待工作乏德。如果線程池的數(shù)量超過maximumPoolSize或者等待任務(wù)超時(shí)或者workQueue為空撤奸,那么直接通過CAS減少workerCount數(shù)量,返回null鹅经。
3.如果timed為true寂呛,通過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
獲取task怎诫,等待時(shí)間超過了keepAliveTime還沒獲取到task瘾晃,直接返回null。如果timed為false幻妓,通過workQueue.take()
獲取task蹦误。如果沒有獲取到task,會(huì)一直阻塞當(dāng)前線程直到獲取到task(當(dāng)阻塞隊(duì)列中加入了新的任務(wù)肉津,會(huì)喚醒當(dāng)前線程)為止强胰。
4.如果獲取task成功,就直接返回妹沙。如果獲取task超時(shí)偶洋,timedOut會(huì)置為true,會(huì)在下一次循環(huán)中以返回null告終距糖。
再強(qiáng)調(diào)一點(diǎn)玄窝,只有當(dāng)線程池中的線程數(shù)量大于corePoolSize才會(huì)進(jìn)行獲取任務(wù)超時(shí)檢查,這也體現(xiàn)線程池中的一種策略:當(dāng)線程池中線程數(shù)量達(dá)到maximumPoolSize大小后悍引,如果一直沒有任務(wù)進(jìn)來恩脂,會(huì)逐漸減少workerCount直到線程數(shù)量等于corePoolSize。
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit
1.completedAbruptly為true趣斤,說明worker線程時(shí)突然終止俩块,說明執(zhí)行task.run()發(fā)生了異常,所以要通過CAS減少workerCount的數(shù)量。
2.completedAbruptly為false玉凯,說明worker線程是正常終止势腮,不需要對(duì)workerCount進(jìn)行減少的操作。因?yàn)樵趃etTask()中已經(jīng)做了此操作漫仆。
3.對(duì)worker完成的任務(wù)數(shù)進(jìn)行統(tǒng)計(jì)嫉鲸,并且從workers集合中移出。
4.調(diào)用tryTerminate()方法歹啼,嘗試終止線程池玄渗。如果狀態(tài)滿足的話,線程池還存在線程狸眼,會(huì)調(diào)用interruptIdleWorkers(ONLY_ONE)
進(jìn)行中斷處理藤树,使其進(jìn)入退出流程。如果線程池中的線程數(shù)量等于0的話拓萌,通過CAS把線程池的狀態(tài)更新到TIDYING岁钓。然后通過terminated()進(jìn)行一些結(jié)束的處理,最后通過CAS把線程池狀態(tài)更新到TERMINATED微王。最后的最后屡限,調(diào)用termination.signalAll()
喚醒等待的線程,通知它們線程池已經(jīng)終止炕倘。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
5.獲取線程池的狀態(tài)钧大。如果線程池的狀態(tài)還處于RUNNING、SHUTDOWN罩旋,說明tryTerminate()
沒有成功啊央。如果worker線程是突然終止的話,通過addWorker(null, false)
再創(chuàng)建一個(gè)沒有task的worker線程去處理任務(wù)涨醋。
6.如果worker線程是正常終止的話瓜饥,且當(dāng)前線程池中的線程數(shù)量小于需要維護(hù)的數(shù)量,我們也會(huì)通過addWorker(null, false)
再創(chuàng)建一個(gè)沒有task的worker線程去處理任務(wù)浴骂。
7.默認(rèn)情況下allowCoreThreadTimeOut為false乓土,那么min就等于corePoolSize。那么線程池需要維護(hù)的線程數(shù)量就是corePoolSize個(gè)溯警。如果allowCoreThreadTimeOut為true趣苏,min就等于0。在workQueue不等于空的情況愧膀,min會(huì)被賦值成1拦键。此時(shí)線程池需要維護(hù)的線程池?cái)?shù)量是1。
如果線程池處于shutdown狀態(tài)檩淋,在workQueue不為空的情況下芬为,線程池始終會(huì)維護(hù)corePoolSize個(gè)線程萄金。當(dāng)workQueue為空的話,線程池會(huì)逐漸銷毀這corePoolSize個(gè)線程媚朦。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
尾言
大家好氧敢,我是cmazxiaoma(寓意是沉夢(mèng)昂志的小馬),感謝各位閱讀本文章询张。
小弟不才孙乖。
如果您對(duì)這篇文章有什么意見或者錯(cuò)誤需要改進(jìn)的地方,歡迎與我討論。
如果您覺得還不錯(cuò)的話,希望你們可以點(diǎn)個(gè)贊份氧。
希望我的文章對(duì)你能有所幫助唯袄。
有什么意見、見解或疑惑蜗帜,歡迎留言討論恋拷。
最后送上:心之所向,素履以往厅缺。生如逆旅蔬顾,一葦以航。