1、線程池介紹
在web開(kāi)發(fā)中毕莱,服務(wù)器需要接受并處理請(qǐng)求片任,所以會(huì)為一個(gè)請(qǐng)求來(lái)分配一個(gè)線程來(lái)進(jìn)行處理偏友。如果每次請(qǐng)求都新創(chuàng)建一個(gè)線程的話(huà)實(shí)現(xiàn)起來(lái)非常簡(jiǎn)便,但是存在一個(gè)問(wèn)題:
如果并發(fā)的請(qǐng)求數(shù)量非常多对供,但每個(gè)線程執(zhí)行的時(shí)間很短位他,這樣就會(huì)頻繁的創(chuàng)建和銷(xiāo)毀線程,如此一來(lái)會(huì)大大降低系統(tǒng)的效率产场《焖瑁可能出現(xiàn)服務(wù)器在為每個(gè)請(qǐng)求創(chuàng)建新線程和銷(xiāo)毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶(hù)請(qǐng)求的時(shí)間和資源更多。
所以線程池就出現(xiàn)了京景。線程池為線程生命周期的開(kāi)銷(xiāo)和資源不足問(wèn)題提供了解決方案窿冯。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷(xiāo)被分?jǐn)偟搅硕鄠€(gè)任務(wù)上确徙。
使用線程池的好處:
- 降低資源消耗醒串。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷(xiāo)毀造成的消耗执桌。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí)芜赌,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行仰挣。
- 提高線程的可管理性。線程是稀缺資源缠沈,如果無(wú)限制的創(chuàng)建椎木,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性博烂,使用線程池可以進(jìn)行統(tǒng)一的分配香椎,調(diào)優(yōu)和監(jiān)控。
Java中的線程池是用ThreadPoolExecutor類(lèi)來(lái)實(shí)現(xiàn)的. 本文就結(jié)合JDK 1.8對(duì)該類(lèi)的源碼來(lái)分析一下這個(gè)類(lèi)內(nèi)部對(duì)于線程的創(chuàng)建, 管理以及后臺(tái)任務(wù)的調(diào)度等方面的執(zhí)行原理禽篱。
2畜伐、繼承關(guān)系
我們首先來(lái)看一下線程池的類(lèi)圖:
Executor接口
public interface Executor {
/**
* 在將來(lái)的某個(gè)時(shí)候執(zhí)行傳入的命令,執(zhí)行命令可以在實(shí)現(xiàn)類(lèi)里通過(guò)新創(chuàng)建的線程躺率、線程池玛界、當(dāng)前線程來(lái)完成。
*/
void execute(Runnable command);
}
ExecutorService接口
public interface ExecutorService extends Executor {
/**
* 啟動(dòng)先前提交的任務(wù)被執(zhí)行的有序關(guān)閉悼吱,但不接受新的任務(wù)慎框。 如果已經(jīng)關(guān)閉,則調(diào)用沒(méi)有其他影響后添。
*/
void shutdown();
/**
* 嘗試停止所有正在執(zhí)行的任務(wù)笨枯,停止等待任務(wù)的處理,并返回正在等待執(zhí)行的任務(wù)的列表遇西。
* 該方法不能等待之前提交的任務(wù)執(zhí)行完馅精,如果需要等待執(zhí)行,可以使用{@link #awaitTermination awaitTermination}
* 從這個(gè)方法返回后粱檀,這些任務(wù)從任務(wù)隊(duì)列中排出(移除)洲敢。 除了竭盡全力地停止處理主動(dòng)執(zhí)行任務(wù)之外,沒(méi)有任何保證茄蚯。
*/
List<Runnable> shutdownNow();
/**
* 線程池有沒(méi)有被關(guān)閉压彭,關(guān)閉返回true,否則false
*/
boolean isShutdown();
/**
* 如果所有任務(wù)在關(guān)閉后都完成了渗常。返回true
* 提示:如果沒(méi)有在調(diào)用該方法前調(diào)用shutdown或者shutdownNow方法壮不,此方法永遠(yuǎn)不會(huì)返回true
*/
boolean isTerminated();
/**
* 在指定時(shí)間內(nèi)阻塞等待任務(wù)全部完成,完成了返回true凳谦,否則false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一個(gè)有返回值的任務(wù)
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一個(gè)任務(wù)來(lái)執(zhí)行忆畅,返回一個(gè)有返回值的結(jié)果,返回值為傳入的result
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個(gè)任務(wù)來(lái)執(zhí)行尸执,返回一個(gè)有返回值的結(jié)果家凯,返回值為null
*/
Future<?> submit(Runnable task);
/**
* 執(zhí)行一批有返回值的任務(wù)
* 返回的結(jié)果調(diào)用{@link Future#isDone}都是true
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 執(zhí)行給定的任務(wù),當(dāng)全部完成或者超時(shí)返回一個(gè)有狀態(tài)和結(jié)果的Future集合如失。
* 返回的結(jié)果調(diào)用{@link Future#isDone}都是true
* 返回時(shí)绊诲,尚未完成的任務(wù)將被取消。
* 如果在進(jìn)行此操作時(shí)修改了給定的集合褪贵,則此方法的結(jié)果是不確定的掂之。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 執(zhí)行給定的任務(wù),返回一個(gè)成功完成任務(wù)的結(jié)果(即脆丁,沒(méi)有拋出異常)世舰,
* 如果有的話(huà)。 在正巢畚溃或異常返回時(shí)跟压,尚未完成的任務(wù)將被取消。
* 如果在進(jìn)行此操作時(shí)修改了給定的集合歼培,則此方法的結(jié)果是不確定的震蒋。
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 執(zhí)行給定的任務(wù),返回一個(gè)成功完成任務(wù)的結(jié)果(即躲庄,沒(méi)有拋出異常)查剖,
* 如果有的話(huà)。 在正吃刖剑或異常返回時(shí)笋庄,尚未完成的任務(wù)將被取消。
* 如果在進(jìn)行此操作時(shí)修改了給定的集合倔监,則此方法的結(jié)果是不確定的液斜。
* 超時(shí)沒(méi)有成功結(jié)果拋出TimeoutException
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService接口
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// ...
}
3净当、ThreadPoolExecutor分析
想要深入理解ThreadPoolExecutor,就要先理解其中最重要的幾個(gè)參數(shù):
3.1、核心變量與方法(狀態(tài)轉(zhuǎn)換)
// 狀態(tài)|工作數(shù)的一個(gè)32bit的值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 0001-1111-1111-1111-1111-1111-1111-1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 1110-0000-0000-0000-0000-0000-0000-0000
private static final int RUNNING = -1 << COUNT_BITS;
// 0000-0000-0000-0000-0000-0000-0000-0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0010-0000-0000-0000-0000-0000-0000-0000
private static final int STOP = 1 << COUNT_BITS;
// 0100-0000-0000-0000-0000-0000-0000-0000
private static final int TIDYING = 2 << COUNT_BITS;
// 0110-0000-0000-0000-0000-0000-0000-0000
private static final int TERMINATED = 3 << COUNT_BITS;
// ~CAPACITY就是前3位狀態(tài)位兰伤,和c進(jìn)行&就能得到當(dāng)前的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 和c進(jìn)行&就能得到當(dāng)前的工作數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
// rs就是狀態(tài)值,wc就是工作數(shù)逗物,這兩個(gè)進(jìn)行或操作方库,就能得到ctl的值(32bit的值)
private static int ctlOf(int rs, int wc) { return rs | wc; }
可能很多人看到上面的寫(xiě)法都蒙圈了。我其實(shí)基礎(chǔ)也不太好弯院,所以我看到這里的時(shí)候索性寫(xiě)了個(gè)工具類(lèi)去測(cè)試他們的輸出結(jié)果辱士,如下:
public class ExecutorTest {
private final static int COUNT_BITS = Integer.SIZE - 3;
private final static int RUNNING = -1 << COUNT_BITS;
private final static int SHUTDOWN = 0 << COUNT_BITS;
private final static int STOP = 1 << COUNT_BITS;
private final static int TIDYING = 2 << COUNT_BITS;
private final static int TERMINATED = 3 << COUNT_BITS;
private final static int CAPACITY = (1 << COUNT_BITS) - 1;
public static void main(String[] args) {
System.out.println("狀態(tài)位===");
System.out.println(getFormatStr(RUNNING));
System.out.println(getFormatStr(SHUTDOWN));
System.out.println(getFormatStr(STOP));
System.out.println(getFormatStr(TIDYING));
System.out.println(getFormatStr(TERMINATED));
System.out.println(getFormatStr(CAPACITY));
}
private static String getFormatStr(int n) {
String integerMaxValueStr = Integer.toBinaryString(n);
int a = 32;
StringBuilder sb = new StringBuilder();
int l = integerMaxValueStr.length();
int i = 0;
for (; a > 0; --a) {
if (--l >= 0) {
sb.append(integerMaxValueStr.charAt(l));
} else {
sb.append("0");
}
if (++i % 4 == 0) {
if (a > 1) {
sb.append("-");
}
i = 0;
}
}
return sb.reverse().toString();
}
}
輸出結(jié)果為:
狀態(tài)位===
1110-0000-0000-0000-0000-0000-0000-0000
0000-0000-0000-0000-0000-0000-0000-0000
0010-0000-0000-0000-0000-0000-0000-0000
0100-0000-0000-0000-0000-0000-0000-0000
0110-0000-0000-0000-0000-0000-0000-0000
0001-1111-1111-1111-1111-1111-1111-1111
通過(guò)上面的注釋以及測(cè)試用例可以發(fā)現(xiàn),源碼的作者巧妙的運(yùn)用一個(gè)值代表了2種意思(前3bit位是狀態(tài)听绳,后29bit是工作數(shù))颂碘,下面我們來(lái)看看線程池最重要的5種狀態(tài):
- RUNNING:能接受新提交的任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù)椅挣;
- SHUTDOWN:關(guān)閉狀態(tài)头岔,不再接受新提交的任務(wù)塔拳,但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)。在線程池處于 RUNNING 狀態(tài)時(shí)峡竣,調(diào)用 shutdown()方法會(huì)使線程池進(jìn)入到該狀態(tài)靠抑。(finalize() 方法在執(zhí)行過(guò)程中也會(huì)調(diào)用shutdown()方法進(jìn)入該狀態(tài));
- STOP:不能接受新任務(wù)适掰,也不處理隊(duì)列中的任務(wù)颂碧,會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí)类浪,調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài)载城;
- TIDYING:如果所有的任務(wù)都已終止了,workerCount (有效線程數(shù)) 為0费就,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)用 terminated() 方法進(jìn)入TERMINATED 狀態(tài)诉瓦。
- TERMINATED:在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài),默認(rèn)terminated()方法中什么也沒(méi)有做受楼。
下圖為線程池的狀態(tài)轉(zhuǎn)換過(guò)程:
3.2垦搬、構(gòu)造方法
/**
* @param corePoolSize 保留在線程池中的線程數(shù),即使它們處于空閑狀態(tài)艳汽,除非設(shè)置了{(lán)@code allowCoreThreadTimeOut}
* @param maximumPoolSize 線程池中允許的最大線程數(shù)
* @param keepAliveTime 當(dāng)線程數(shù)大于corePoolSize時(shí)猴贰,這是多余空閑線程在終止之前等待新任務(wù)的最大時(shí)間。
* @param unit {@code keepAliveTime}參數(shù)的時(shí)間單位
* @param workQueue 在執(zhí)行任務(wù)之前用于保存任務(wù)的隊(duì)列河狐。 這個(gè)隊(duì)列將只保存{@code execute}方法提交的{@code Runnable}任務(wù)米绕。
* @param threadFactory 用來(lái)執(zhí)行的時(shí)候創(chuàng)建線程的線程工廠
* @param handler 在執(zhí)行被阻塞時(shí)使用的處理程序,因?yàn)檫_(dá)到了線程邊界和隊(duì)列容量
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
對(duì)于參數(shù)handler:線程池提供了4種策略:
- AbortPolicy:直接拋出異常馋艺,這是默認(rèn)策略栅干;
- CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù)捐祠,并執(zhí)行當(dāng)前任務(wù)碱鳞;
- DiscardPolicy:直接丟棄任務(wù);
3.3踱蛀、核心方法
execute方法
線程池最核心的方法莫過(guò)于execute了窿给,execute()方法用來(lái)提交任務(wù),下面我們順著這個(gè)方法看看其實(shí)現(xiàn)原理:
/**
* 在未來(lái)的某個(gè)時(shí)刻執(zhí)行給定的任務(wù)率拒。這個(gè)任務(wù)用一個(gè)新線程執(zhí)行崩泡,或者用一個(gè)線程池中已經(jīng)存在的線程執(zhí)行
* 如果任務(wù)無(wú)法被提交執(zhí)行,要么是因?yàn)檫@個(gè)Executor已經(jīng)被shutdown關(guān)閉猬膨,要么是已經(jīng)達(dá)到其容量上限角撞,任務(wù)會(huì)被當(dāng)前的RejectedExecutionHandler處理
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 執(zhí)行分以下3步:
*
* 1. 如果運(yùn)行的線程少于corePoolSize,嘗試開(kāi)啟一個(gè)新線程去運(yùn)行command,command作為這個(gè)線程的第一個(gè)任務(wù)
*
* 2. 如果線程入隊(duì)成功谒所,然后還是要進(jìn)行double-check的热康,因?yàn)榫€程池在入隊(duì)之后狀態(tài)是可能會(huì)發(fā)生變化的
*
* 3. 如果無(wú)法將任務(wù)入隊(duì)列(可能隊(duì)列滿(mǎn)了),需要新開(kāi)一個(gè)線程
* 如果失敗了百炬,說(shuō)明線程池shutdown 或者 飽和了褐隆,所以我們拒絕任務(wù)污它。
*/
int c = ctl.get();
/**
* 1剖踊、如果當(dāng)前線程數(shù)少于corePoolSize,開(kāi)啟一個(gè)線程執(zhí)行命令
*(可能是由于addWorker()操作已經(jīng)包含對(duì)線程池狀態(tài)的判斷衫贬,如此處沒(méi)加德澈,而入workQueue前加了)
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
/**
* 沒(méi)有成功addWorker(),再次獲取c(凡是需要再次用ctl做判斷時(shí)固惯,都會(huì)再次調(diào)用ctl.get())
* 失敗的原因可能是:
* 1梆造、線程池已經(jīng)shutdown,shutdown的線程池不再接收新任務(wù)
* 2葬毫、workerCountOf(c) < corePoolSize 判斷后镇辉,由于并發(fā),別的線程先創(chuàng)建了worker線程贴捡,導(dǎo)致workerCount>=corePoolSize
*/
c = ctl.get();
}
/**
* 2忽肛、如果線程池RUNNING狀態(tài),且入隊(duì)列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//再次校驗(yàn)位
//如果再次校驗(yàn)過(guò)程中烂斋,線程池不是RUNNING狀態(tài)屹逛,并且remove(command)--workQueue.remove()成功,拒絕當(dāng)前command
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 新建一個(gè)worker線程汛骂,沒(méi)有指定firstTask罕模,因?yàn)槊钜呀?jīng)放入queue里了
addWorker(null, false);
}
/**
* 3、如果線程池不是running狀態(tài) 或者 無(wú)法入隊(duì)列
* 嘗試開(kāi)啟新線程帘瞭,擴(kuò)容至maxPoolSize淑掌,如果addWork(command, false)失敗了,拒絕當(dāng)前command
*/
else if (!addWorker(command, false))
reject(command);
}
在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING時(shí)蝶念,的執(zhí)行過(guò)程如下:
- 如果workerCount < corePoolSize抛腕,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù);
- 如果workerCount >= corePoolSize祸轮,且線程池內(nèi)的阻塞隊(duì)列未滿(mǎn)兽埃,則將任務(wù)添加到該阻塞隊(duì)列中;
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize适袜,且線程池內(nèi)的阻塞隊(duì)列已滿(mǎn)柄错,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù);
- 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿(mǎn), 則根據(jù)拒絕策略來(lái)處理該任務(wù), 默認(rèn)的處理方式是直接拋異常售貌。
addWorker方法
addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行给猾,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù)。core為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize颂跨,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize敢伸,代碼如下:
/**
* 檢查是否可以針對(duì)當(dāng)前的池狀態(tài)和給定的界限(核心或最大值)添加新的工作者。相應(yīng)地調(diào)整工人數(shù)量恒削,并且如果可能的話(huà)池颈,創(chuàng)建并開(kāi)始新的工作者,運(yùn)行firstTask作為其第一個(gè)任務(wù)钓丰。
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 只有當(dāng)下面兩種情況會(huì)繼續(xù)執(zhí)行躯砰,其他直接返回false(添加失敗)
* 1携丁、rs == RUNNING
* 2琢歇、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() (執(zhí)行了shutdown方法,但是阻塞隊(duì)列還有任務(wù)沒(méi)有執(zhí)行)
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 當(dāng)wc超過(guò)最大限制 || 如果是核心線程梦鉴,超過(guò)了核心數(shù)李茫,否則超過(guò)了最大線程數(shù),直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
// count累加成功肥橙,直接跳出兩層for循環(huán)魄宏,執(zhí)行下面的邏輯
break retry;
/**
* 能執(zhí)行到這里,都是因?yàn)槎嗑€程競(jìng)爭(zhēng)快骗,只有兩種情況
* 1娜庇、workCount發(fā)生變化,compareAndIncrementWorkerCount失敗方篮,這種情況不需要重新獲取ctl名秀,繼續(xù)for循環(huán)即可。
* 2藕溅、runState發(fā)生變化匕得,可能執(zhí)行了shutdown或者shutdownNow,這種情況重新走retry巾表,取得最新的ctl并判斷狀態(tài)汁掠。
*/
c = ctl.get(); // 重新讀取ctl,可能狀態(tài)發(fā)生了變化
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 獲取鎖后重新檢測(cè)runState集币,因?yàn)橛锌赡躶hutdown了
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
// 線程不能是活躍狀態(tài)
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //記錄最大線程數(shù)
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 失敗回退,從 wokers 移除 w, 線程數(shù)減一考阱,嘗試結(jié)束線程池(調(diào)用tryTerminate 方法)
addWorkerFailed(w);
}
return workerStarted;
}
注意一下這里的t.start()這個(gè)語(yǔ)句,啟動(dòng)時(shí)會(huì)調(diào)用Worker類(lèi)中的run方法鞠苟,Worker本身實(shí)現(xiàn)了Runnable接口乞榨,所以一個(gè)Worker類(lèi)型的對(duì)象也是一個(gè)線程秽之。
Worker類(lèi)
線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是一組Worker對(duì)象吃既,看一下Worker的定義:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
// 還沒(méi)有執(zhí)行任務(wù)時(shí)考榨,這時(shí)就不應(yīng)該被中斷,設(shè)置狀態(tài)為-1
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 調(diào)用runWorker方法執(zhí)行
runWorker(this);
}
// Lock methods
//
// 0代表沒(méi)有鎖定狀態(tài)
// 1代表鎖定狀態(tài)
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker繼承了AQS鹦倚,使用AQS來(lái)實(shí)現(xiàn)獨(dú)占鎖的功能河质。為什么不使用ReentrantLock來(lái)實(shí)現(xiàn)呢?可以看到tryAcquire方法震叙,它是不允許重入的掀鹅,而ReentrantLock是允許重入的:
- lock方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中捐友;
- 如果正在執(zhí)行任務(wù)淫半,則不應(yīng)該中斷線程溃槐;
- 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài)匣砖,也就是空閑的狀態(tài),說(shuō)明它沒(méi)有在處理任務(wù)昏滴,這時(shí)可以對(duì)該線程進(jìn)行中斷猴鲫;
- 線程池在執(zhí)行shutdown方法或tryTerminate方法時(shí)會(huì)調(diào)用interruptIdleWorkers方法來(lái)中斷空閑的線程,interruptIdleWorkers方法會(huì)使用tryLock方法來(lái)判斷線程池中的線程是否是空閑狀態(tài)谣殊;
- 之所以設(shè)置為不可重入拂共,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像setCorePoolSize這樣的線程池控制方法時(shí)重新獲取鎖。如果使用ReentrantLock姻几,它是可重入的宜狐,這樣如果在任務(wù)中調(diào)用了如setCorePoolSize這類(lèi)線程池控制的方法,會(huì)中斷正在運(yùn)行的線程蛇捌。
所以抚恒,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷络拌。
runWorker方法
在Worker類(lèi)中的run方法調(diào)用了runWorker方法來(lái)執(zhí)行任務(wù)俭驮,runWorker方法的代碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允許中斷
boolean completedAbruptly = true;
try {
// 如果task為空,則通過(guò)getTask來(lái)獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock(); // 開(kāi)始運(yùn)行春贸,不允許中斷
/**
* 確保只有在線程STOP時(shí)混萝,才會(huì)被設(shè)置中斷標(biāo)示,否則清除中斷標(biāo)示
* 1萍恕、如果線程池狀態(tài)>=STOP逸嘀,且當(dāng)前線程沒(méi)有設(shè)置中斷狀態(tài),wt.interrupt()
* 2允粤、如果一開(kāi)始判斷線程池狀態(tài)<STOP崭倘,但Thread.interrupted()為true屯蹦,即線程已經(jīng)被中斷,又清除了中斷標(biāo)示绳姨,再次判斷線程池狀態(tài)是否>=STOP
* 是登澜,再次設(shè)置中斷標(biāo)示,wt.interrupt()
* 否飘庄,不做操作脑蠕,清除中斷標(biāo)示后進(jìn)行后續(xù)步驟
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 用戶(hù)自己實(shí)現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 用戶(hù)自己實(shí)現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
// worker已經(jīng)完成的任務(wù)數(shù) + 1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/**
* getTask方法用來(lái)從阻塞隊(duì)列中取任務(wù)
* 以下情況會(huì)返回null(被回收)
* 1、超過(guò)了maximumPoolSize設(shè)置的線程數(shù)量(因?yàn)檎{(diào)用了setMaximumPoolSize())
* 2跪削、線程池被stop
* 3谴仙、線程池被shutdown,并且workQueue空了
* 4碾盐、線程等待任務(wù)超時(shí)
* 返回null表示這個(gè)worker要結(jié)束了晃跺,這種情況下workerCount-1
*/
private Runnable getTask() {
boolean timedOut = false; // 上一次poll()是否超時(shí)
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 滿(mǎn)足以下幾點(diǎn),wc - 1毫玖,返回null
* 1掀虎、rs >= STOP
* 2、rs == SHUTDOWN && workQueue.isEmpty()
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 隊(duì)列獲取值是否要阻塞等待
// allowCoreThreadTimeOut默認(rèn)是false付枫,也就是核心線程不允許進(jìn)行超時(shí)烹玉;
// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量阐滩;
// 對(duì)于超過(guò)核心線程數(shù)量的這些線程二打,需要進(jìn)行超時(shí)控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 滿(mǎn)足以下幾點(diǎn),wc - 1掂榔,返回null
* 1继效、wc > maximumPoolSize
* 2、1 < wc <= maximumPoolSize && timed && timedOut
* 3装获、wc <= 1 && workQueue.isEmpty() && timed && timedOut
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 獲取Runnable
Runnable r = timed ?
// 超時(shí)會(huì)被回收
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 阻塞等待瑞信,默認(rèn)設(shè)置最后最多會(huì)有corePoolSize的線程一起阻塞。
// 如果設(shè)置allowCoreThreadTimeOut=true的話(huà)饱溢,最后所有線程都會(huì)被回收喧伞。
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* @param completedAbruptly true:worker執(zhí)行的時(shí)候異常了
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1、worker數(shù)量-1
* 如果是突然終止绩郎,說(shuō)明是task執(zhí)行時(shí)異常情況導(dǎo)致潘鲫,即run()方法執(zhí)行時(shí)發(fā)生了異常,那么正在工作的worker線程數(shù)量需要-1
* 如果不是突然終止肋杖,說(shuō)明是worker線程沒(méi)有task可執(zhí)行了溉仑,不用-1,因?yàn)橐呀?jīng)在getTask()方法中-1了
*/
if (completedAbruptly)
decrementWorkerCount();
/**
* 2状植、從Workers Set中移除worker
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 統(tǒng)計(jì)完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
// 從workers中移除浊竟,也就表示著從線程池中移除了一個(gè)工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
/**
* 3怨喘、在對(duì)線程池有負(fù)效益的操作時(shí),都需要“嘗試終止”線程池
* 主要是判斷線程池是否滿(mǎn)足終止的狀態(tài)
* 如果狀態(tài)滿(mǎn)足振定,但還有線程池還有線程必怜,嘗試對(duì)其發(fā)出中斷響應(yīng),使其能進(jìn)入退出流程
* 沒(méi)有線程了后频,更新?tīng)顟B(tài)為tidying->terminated
*/
tryTerminate();
/**
* 4梳庆、是否需要增加worker線程
* 線程池狀態(tài)是running 或 shutdown
* 如果當(dāng)前線程是突然終止的,addWorker()
* 如果當(dāng)前線程不是突然終止的卑惜,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量膏执,addWorker()
* 故如果調(diào)用線程池shutdown(),直到workQueue為空前露久,線程池都會(huì)維持corePoolSize個(gè)線程更米,然后再逐漸銷(xiāo)毀這corePoolSize個(gè)線程
*/
int c = ctl.get();
/**
* 以下情況會(huì)增加一個(gè)worker addWorker(null, false);
* 1、completedAbruptly == true
* 2毫痕、completedAbruptly == false && allowCoreThreadTimeOut == true && wc < 1
* 3征峦、completedAbruptly == false && allowCoreThreadTimeOut == false && wc < corePoolSize
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
getTask重要的地方是第二個(gè)if判斷,目的是控制線程池的有效線程數(shù)量镇草。由上文中的分析可以知道眶痰,在執(zhí)行execute方法時(shí),如果當(dāng)前線程池的線程數(shù)量超過(guò)了corePoolSize且小于maximumPoolSize梯啤,并且workQueue已滿(mǎn)時(shí),則可以增加工作線程存哲,但這時(shí)如果超時(shí)沒(méi)有獲取到任務(wù)因宇,也就是timedOut為true的情況,說(shuō)明workQueue已經(jīng)為空了祟偷,也就說(shuō)明了當(dāng)前線程池中不需要那么多線程來(lái)執(zhí)行任務(wù)了察滑,可以把多于corePoolSize數(shù)量的線程銷(xiāo)毀掉,保持線程數(shù)量在corePoolSize即可修肠。
processWorkerExit執(zhí)行完之后贺辰,工作線程被銷(xiāo)毀,以上就是整個(gè)工作線程的生命周期嵌施,從execute方法開(kāi)始饲化,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過(guò)getTask獲取任務(wù)吗伤,然后執(zhí)行任務(wù)吃靠,如果getTask返回null,進(jìn)入processWorkerExit方法足淆,整個(gè)線程結(jié)束
下面是從execute到線程銷(xiāo)毀的整個(gè)流程圖:
3.4巢块、其他外部調(diào)用方法
下面的方法都是用戶(hù)可以自己進(jìn)行調(diào)用的:
/**
* 狀態(tài)改為SHUTDOWN
* 啟動(dòng)先前提交的任務(wù)被執(zhí)行的有序關(guān)閉礁阁,但不接受新的任務(wù)。 如果已經(jīng)關(guān)閉族奢,則調(diào)用沒(méi)有其他影響姥闭。
* 該方法不能等待之前提交的任務(wù)執(zhí)行完,如果需要等待執(zhí)行越走,可以使用{@link #awaitTermination awaitTermination}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* 狀態(tài)改為SHUTDOWN
* 嘗試停止所有正在執(zhí)行的任務(wù)泣栈,停止等待任務(wù)的處理,并返回正在等待執(zhí)行的任務(wù)的列表弥姻。
* 該方法不能等待之前提交的任務(wù)執(zhí)行完南片,如果需要等待執(zhí)行,可以使用{@link #awaitTermination awaitTermination}
* 從這個(gè)方法返回后庭敦,這些任務(wù)從任務(wù)隊(duì)列中排出(移除)疼进。 除了竭盡全力地停止處理主動(dòng)執(zhí)行任務(wù)之外,沒(méi)有任何保證秧廉。
* 這個(gè)實(shí)現(xiàn)通過(guò){@link Thread#interrupt}來(lái)取消任務(wù)伞广,所以任何不能響應(yīng)中斷的任務(wù)都不會(huì)終止。
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 執(zhí)行任務(wù)前的hook
protected void beforeExecute(Thread t, Runnable r) { }
// 執(zhí)行任務(wù)后的hook
protected void afterExecute(Runnable r, Throwable t) { }
/**
* 什么都不做疼电,交給子類(lèi)實(shí)現(xiàn)嚼锄,注意實(shí)現(xiàn)的時(shí)候使用super.terminated();
*/
protected void terminated() { }
/**
* 判斷狀態(tài) >= SHUTDOWN
*/
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
/**
* 判斷 SHUTDOWN <= 狀態(tài) < TERMINATED
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
/**
* 判斷狀態(tài) == TERMINATED
*/
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
/**
* 在指定的超時(shí)時(shí)間范圍內(nèi)等待狀態(tài)變?yōu)門(mén)ERMINATED
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
/**
* 1、當(dāng)前workCount > 傳入的corePoolSize蔽豺,中斷空閑worker
* 2区丑、傳入的corePoolSize比之前的要大,選出差值和queue的大小做比較修陡,比較小的作為要增加的線程數(shù)沧侥,調(diào)用addWorker,如果中途遇到workQueue為空魄鸦,就不增加了宴杀。
*/
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
* 提前準(zhǔn)備一個(gè)core的線程
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* 提前準(zhǔn)備所有的core線程
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
// 設(shè)置coreThreadTimeOut的值
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
// 設(shè)置maximumPoolSize
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
// 從隊(duì)列里面移除任務(wù)
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
/**
* 清除隊(duì)列里所有唄cancel的Future類(lèi)型的任務(wù),此方法可用作存儲(chǔ)回收操作
* 該方法可能存在其他線程的干擾拾因,導(dǎo)致清除失敗
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// 如果在遍歷期間遇到干擾旺罢,請(qǐng)采取慢速路徑。進(jìn)行遍歷復(fù)制并調(diào)用remove取消條目绢记。慢路徑更可能是O(N * N)扁达。
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
/**
* 返回線程池大小
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// runState == TIDYING 或者 runState == TERMINATED 返回0
// 否則返回workers的大小
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* 獲取活躍線程數(shù):根據(jù)isLocked來(lái)判斷是不是在執(zhí)行任務(wù)
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* 返回最大線程池的大小
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* 返回任務(wù)總數(shù)(包括已經(jīng)完成的和未完成的)
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* 返回已完成任務(wù)總數(shù)
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
3.5、內(nèi)部方法以及空方法
下面的方法都是用戶(hù)自己調(diào)用不了的方法庭惜,這里也做一下說(shuō)明:
/**
* 替換狀態(tài)
* 如果現(xiàn)在的ctl狀態(tài) >= targetState罩驻,什么都不做
* 如果現(xiàn)在的ctl狀態(tài) < targetState,嘗試替換狀態(tài)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
// 前3位替換护赊,后29位保持ctl原來(lái)的數(shù)目
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* 嘗試終止惠遏,只有當(dāng)以下幾種情況才把狀態(tài)改為T(mén)ERMINATED
* 1砾跃、SHUTDOWN狀態(tài) && queue是空的 && wc == 0
* 2、STOP狀態(tài) && wc == 0
* workCount如果不是0节吮,這時(shí)候就中斷其中一個(gè)idle的worker來(lái)傳播關(guān)閉信號(hào)
* 該方法必須在執(zhí)行任何可能會(huì)終止的操作之后調(diào)用此方法 - 在關(guān)閉期間減少工作人員數(shù)量或從隊(duì)列中刪除任務(wù)抽高。
* ScheduledThreadPoolExecutor里面也用到了,所以這里修飾符沒(méi)用private
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// c是運(yùn)行時(shí)的狀態(tài)
if (isRunning(c) ||
// c的狀態(tài)值 >= TIDYING
runStateAtLeast(c, TIDYING) ||
// c的狀態(tài)是SHUTDOWN && 隊(duì)列不是空
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// worker數(shù)不是0
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 設(shè)置ctl的狀態(tài)為T(mén)IDYING透绩,為中間過(guò)渡狀態(tài)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 終止方法翘骂,空方法什么都不做,子類(lèi)去實(shí)現(xiàn)
terminated();
} finally {
// 設(shè)置ctl的狀態(tài)為T(mén)ERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
/**
* 中斷worker的空閑線程
* @param onlyOne 是否僅僅中斷worker中的第一個(gè)
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 嘗試獲取鎖帚豪,這里只有當(dāng)線程沒(méi)有運(yùn)行的時(shí)候才能夠tryLock成功
if (!t.isInterrupted() && w.tryLock()) {
try {
// 設(shè)置worker線程的中斷變量
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// true碳竟,只中斷隊(duì)列的第一個(gè)就退出
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* 中斷所有worker的線程
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* 中斷所有worker的空閑線程
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
* 根據(jù)拒絕策略拒絕執(zhí)行命令
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* 移除隊(duì)列中的Runnable到一個(gè)新list中,使用的是阻塞隊(duì)列的drainTo方法
* 但是如果隊(duì)列是DelayQueue或者其他能夠讓poll或者drainTo失敗移除元素的隊(duì)列的話(huà)狸臣,遍歷隊(duì)列并刪除它
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
/**
* 預(yù)留方法莹桅,ScheduledThreadPoolExecutor重寫(xiě)了此方法
*/
void onShutdown() {
}
// ScheduledThreadPoolExecutor進(jìn)行調(diào)用,判斷是不是running或shutdown狀態(tài)
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
// ScheduledThreadPoolExecutor進(jìn)行調(diào)用烛亦,確認(rèn)都提前準(zhǔn)備好了
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
4诈泼、線程池的監(jiān)控
通過(guò)線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時(shí)候可以使用
- getTaskCount:線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù)煤禽;
- getCompletedTaskCount:線程池已完成的任務(wù)數(shù)量铐达,該值小于等于taskCount;
- getLargestPoolSize:線程池曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量檬果。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否滿(mǎn)過(guò)瓮孙,也就是達(dá)到了maximumPoolSize;
- getPoolSize:線程池當(dāng)前的線程數(shù)量汁汗;
- getActiveCount:當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量衷畦。
通過(guò)這些方法,可以對(duì)線程池進(jìn)行監(jiān)控知牌,在ThreadPoolExecutor類(lèi)中提供了幾個(gè)空方法,如beforeExecute方法斤程,afterExecute方法和terminated方法角寸,可以擴(kuò)展這些方法在執(zhí)行前或執(zhí)行后增加一些新的操作,例如統(tǒng)計(jì)線程池的執(zhí)行任務(wù)的時(shí)間等忿墅,可以繼承自ThreadPoolExecutor來(lái)進(jìn)行擴(kuò)展扁藕。
5、合理的配置線程池
要想合理的配置線程池疚脐,就必須首先分析任務(wù)特性亿柑,可以從以下幾個(gè)角度來(lái)進(jìn)行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)棍弄。
- 任務(wù)的優(yōu)先級(jí):高望薄,中和低疟游。
- 任務(wù)的執(zhí)行時(shí)間:長(zhǎng),中和短痕支。
- 任務(wù)的依賴(lài)性:是否依賴(lài)其他系統(tǒng)資源颁虐,如數(shù)據(jù)庫(kù)連接。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理卧须。CPU密集型任務(wù)配置盡可能少的線程數(shù)量另绩,如配置Ncpu+1個(gè)線程的線程池。IO密集型任務(wù)則由于需要等待IO操作花嘶,線程并不是一直在執(zhí)行任務(wù)笋籽,則配置盡可能多的線程,如2*Ncpu椭员〕岛#混合型的任務(wù),如果可以拆分拆撼,則將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù)容劳,只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率闸度,如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大竭贩,則沒(méi)必要進(jìn)行分解。我們可以通過(guò)Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)莺禁。
優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來(lái)處理留量。它可以讓優(yōu)先級(jí)高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里哟冬,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能執(zhí)行楼熄。
執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來(lái)處理,或者也可以使用優(yōu)先級(jí)隊(duì)列浩峡,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行可岂。
依賴(lài)數(shù)據(jù)庫(kù)連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫(kù)返回結(jié)果翰灾,如果等待的時(shí)間越長(zhǎng)CPU空閑時(shí)間就越長(zhǎng)缕粹,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU纸淮。
建議使用有界隊(duì)列平斩,有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn)咽块,比如幾千绘面。有一次我們組使用的后臺(tái)任務(wù)線程池的隊(duì)列和線程池全滿(mǎn)了,不斷的拋出拋棄任務(wù)的異常,通過(guò)排查發(fā)現(xiàn)是數(shù)據(jù)庫(kù)出現(xiàn)了問(wèn)題揭璃,導(dǎo)致執(zhí)行SQL變得非常緩慢晚凿,因?yàn)楹笈_(tái)任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫(kù)查詢(xún)和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞住塘辅,任務(wù)積壓在線程池里晃虫。如果當(dāng)時(shí)我們?cè)O(shè)置成無(wú)界隊(duì)列,線程池的隊(duì)列就會(huì)越來(lái)越多扣墩,有可能會(huì)撐滿(mǎn)內(nèi)存哲银,導(dǎo)致整個(gè)系統(tǒng)不可用,而不只是后臺(tái)任務(wù)出現(xiàn)問(wèn)題呻惕。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨(dú)的服務(wù)器部署的荆责,而我們使用不同規(guī)模的線程池跑不同類(lèi)型的任務(wù),但是出現(xiàn)這樣問(wèn)題時(shí)也會(huì)影響到其他任務(wù)亚脆。
我參考了:如何合理地估算線程池大凶鲈骸? 這篇文章里的使用程序評(píng)估線程池大小濒持。
6键耕、結(jié)論
本文比較詳細(xì)的分析了線程池的工作流程,總體來(lái)說(shuō)有如下幾個(gè)內(nèi)容:
- 分析了線程的創(chuàng)建柑营,任務(wù)的提交屈雄,狀態(tài)的轉(zhuǎn)換以及線程池的關(guān)閉;
- 這里通過(guò)execute方法來(lái)展開(kāi)線程池的工作流程官套,execute方法通過(guò)corePoolSize酒奶,maximumPoolSize以及阻塞隊(duì)列的大小來(lái)判斷決定傳入的任務(wù)應(yīng)該被立即執(zhí)行,還是應(yīng)該添加到阻塞隊(duì)列中奶赔,還是應(yīng)該拒絕任務(wù)惋嚎。
- 介紹了線程池關(guān)閉時(shí)的過(guò)程,也分析了shutdown方法與getTask方法存在競(jìng)態(tài)條件站刑;
- 在獲取任務(wù)時(shí)另伍,要通過(guò)線程池的狀態(tài)來(lái)判斷應(yīng)該結(jié)束工作線程還是阻塞線程等待新的任務(wù),也解釋了為什么關(guān)閉線程池時(shí)要中斷工作線程以及為什么每一個(gè)worker都需要lock绞旅。
在向線程池提交任務(wù)時(shí)质况,除了execute方法,還有一個(gè)submit方法玻靡,submit方法會(huì)返回一個(gè)Future對(duì)象用于獲取返回值,有關(guān)Future和Callable請(qǐng)自行了解一下相關(guān)的文章中贝,這里就不介紹了囤捻。
7、擴(kuò)展
一般開(kāi)發(fā)中core線程數(shù)量是很難確定的邻寿,可以參考上面提到的如何合理的估算線程池的大小蝎土,但是一般都是開(kāi)發(fā)者自己經(jīng)過(guò)壓測(cè)后得到的數(shù)據(jù)视哑,之后到真正的線程環(huán)境驗(yàn)證,得出一個(gè)合理的core數(shù)字誊涯。假設(shè)是5挡毅,但是為了預(yù)防某些瞬時(shí)大流量(我們也無(wú)法預(yù)知到底流量會(huì)有多大),通常會(huì)再設(shè)置一個(gè)比core線程數(shù)要大的max線程暴构,假設(shè)是10跪呈。那么當(dāng)這種瞬時(shí)流量真的發(fā)生了,如果希望服務(wù)器能盡快的提高處理速度取逾,當(dāng)然是需要讓MAX線程盡快啟動(dòng)起來(lái)耗绿,幫著處理任務(wù)。這時(shí)候我們就可以自己擴(kuò)展線程池砾隅。
8误阻、參考
聊聊并發(fā)(三)Java線程池的分析和使用
深入理解Java線程池:ThreadPoolExecutor
Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理