ThreadPool 之 線程池實(shí)現(xiàn)類(lèi) ThreadPoolExecutor
接上篇文章 ThreadPool 之 線程池概覽承粤。
ThreadPoolExecutor
線程池
ThreadPoolExecutor
繼承了 AbstractExecutorService
砰逻,實(shí)現(xiàn)了核心方法 execute
以及一些獲取線程池信息的方法。
ThreadPoolExecutor
有一些重要的參數(shù):
// ctl存儲(chǔ)了線程狀態(tài)以及當(dāng)前線程池的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 最多可容納 2^29 - 1 個(gè)線程
// 運(yùn)行時(shí)狀態(tài)存儲(chǔ)在高字節(jié)位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private final ReentrantLock mainLock = new ReentrantLock(); // 對(duì)線程池進(jìn)行操作的時(shí)候的鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); // 存放工作線程
private final Condition termination = mainLock.newCondition(); // 支持等待終止的等待條件
private int largestPoolSize; // 記錄線程池曾經(jīng)出現(xiàn)過(guò)的最大大小肥缔,只記錄,和容量沒(méi)有關(guān)系
private long completedTaskCount; // 已經(jīng)完成的任務(wù)數(shù)
private volatile boolean allowCoreThreadTimeOut; // 是否允許核心池設(shè)置存活時(shí)間
// 下面的參數(shù)是線程池的核心參數(shù)
private final BlockingQueue<Runnable> workQueue; // 存放等待被執(zhí)行的任務(wù)的隊(duì)列
private volatile ThreadFactory threadFactory; // 用來(lái)創(chuàng)建線程的線程工廠
private volatile RejectedExecutionHandler handler; // 任務(wù)拒絕策略
private volatile long keepAliveTime; // 線程存活時(shí)間
private volatile int corePoolSize; // 核心池大小
private volatile int maximumPoolSize; // 線程池的最大線程數(shù)
上面有幾個(gè)參數(shù)是線程池的核心參數(shù),在構(gòu)造函數(shù)中不一定需要傳入所有的值,但是 ThreadPoolExecutor
的構(gòu)造函數(shù)最終都調(diào)用了下面這個(gè)構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException(); // 先進(jìn)行參數(shù)檢驗(yàn)
if (workQueue == null || threadFactory == null || handler == null) // 判斷空指針
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面來(lái)逐一解釋一下各個(gè)參數(shù)的含義。
int corePoolSize
核心池大小谎脯,一個(gè)線程池是同時(shí)在執(zhí)行很多任務(wù)的葱跋,核心池就是正在執(zhí)行的任務(wù)池。
int maximumPoolSize
線程池的最大線程數(shù),當(dāng)核心池滿了以后娱俺,新添加的任務(wù)就會(huì)放到等待隊(duì)列中稍味,如果等待隊(duì)列滿了,線程池就想快點(diǎn)執(zhí)行任務(wù)荠卷,騰出位置給新加進(jìn)來(lái)的線程模庐,如果當(dāng)前工作線程數(shù)小于 maximumPoolSize
,那么就創(chuàng)建新的線程來(lái)執(zhí)行剛加進(jìn)來(lái)的任務(wù)油宜,可以認(rèn)為是線程池負(fù)荷過(guò)重掂碱,創(chuàng)建新的線程來(lái)減輕壓力。
long keepAliveTime
線程存活時(shí)間慎冤,如果一個(gè)線程處在空閑狀態(tài)的時(shí)間超過(guò)了這個(gè)值疼燥,就會(huì)因?yàn)槌瑫r(shí)而退出。
BlockingQueue<Runnable> workQueue
workQueue
是一個(gè)阻塞隊(duì)列蚁堤,用來(lái)存放等待被執(zhí)行的任務(wù)的隊(duì)列醉者。如果核心池滿了,就把等待執(zhí)行的線程放到這里披诗。
ThreadFactory threadFactory
用來(lái)創(chuàng)建線程的線程工廠撬即。
RejectedExecutionHandler handler
任務(wù)拒絕策略。ThreadPoolExecutor
中有四個(gè)實(shí)現(xiàn)了 RejectedExecutionHandler
接口的內(nèi)部類(lèi)呈队,分別是:
- ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常剥槐。
- ThreadPoolExecutor.DiscardPolicy:內(nèi)部什么也沒(méi)有做,也就是丟棄任務(wù)掂咒,不拋出異常才沧。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后提交當(dāng)前插入的任務(wù)绍刮。
- ThreadPoolExecutor.CallerRunsPolicy:調(diào)用策略的調(diào)用者直接在內(nèi)部執(zhí)行了任務(wù)的
run
方法温圆。
execute
方法
線程池的核心方法是 execute
方法,來(lái)看這個(gè)方法做了什么:
public void execute(Runnable command) {
if (command == null) // 先判斷輸入?yún)?shù)的合法性
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*
* 上面是 JDK 自帶的注釋?zhuān)瑏?lái)翻譯一下:
* 1. 如果核心池沒(méi)有滿孩革,嘗試執(zhí)行當(dāng)前任務(wù)岁歉, addWorker 原子性地檢查線程池狀態(tài)和線程數(shù)量,
* 通過(guò)返回 false 防止不應(yīng)該加入線程卻加入了線程這樣的錯(cuò)誤警告
* 2. 如果一個(gè)線程能夠成功插入隊(duì)列膝蜈,我們還應(yīng)該二次檢查我們是否已經(jīng)加了一個(gè)線程
* (因?yàn)榭赡苡芯€程在上次檢查過(guò)后死掉了)或者進(jìn)到這個(gè)方法以后線程池關(guān)閉了锅移。
* 因此我們?cè)俅螜z查線程池狀態(tài),如果線程池已經(jīng)關(guān)閉了我們有必要回滾進(jìn)隊(duì)操作饱搏,
* 或者如果沒(méi)有非剃,就啟動(dòng)新線程
* 3. 如果我們不能把線程插入隊(duì)列,那么我們嘗試添加一個(gè)新線程推沸,
* 如果失敗了备绽,那就是線程池飽和了或者關(guān)閉了券坞,按照之前的拒絕策略拒絕任務(wù)。
*/
int c = ctl.get(); // 獲取當(dāng)前線程池狀態(tài)信息
// 如果當(dāng)前核心池沒(méi)有滿肺素,就執(zhí)行當(dāng)前任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 如果執(zhí)行成功直接返回
return;
c = ctl.get(); // 沒(méi)執(zhí)行成功恨锚,再次獲取線程池狀態(tài)
}
// 如果線程池正在運(yùn)行并且成功加入到等待隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); // 再次檢查線程池狀態(tài),因?yàn)榫€程池在上次檢查之后可能關(guān)閉了
// 如果線程池已經(jīng)關(guān)閉并且成功從等待隊(duì)列中移除剛插入的任務(wù)倍靡,拒絕任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果添加到等待隊(duì)列失敗猴伶,拒絕任務(wù)
else if (!addWorker(command, false))
reject(command);
}
源碼中出現(xiàn)了多次 addWorker
操作,繼續(xù)查看 addWorker
的源碼塌西。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // 獲取線程池執(zhí)行狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池不是正常運(yùn)行狀態(tài)他挎,如果出現(xiàn)以下3種情況之一的,就返回 false :
// 1. 線程池不是關(guān)閉狀態(tài)
// 2. 線程池關(guān)閉了雨让,但是傳入的任務(wù)非空
// 3. 線程池關(guān)閉了雇盖,傳入的線程非空但是沒(méi)有任務(wù)正在執(zhí)行
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 如果線程池一切正常,那么執(zhí)行以下邏輯
for (;;) {
int wc = workerCountOf(c); // 獲取當(dāng)前線程池中線程數(shù)量
// 如果線程池已滿栖忠,返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果線程池沒(méi)滿崔挖,線程安全地增加線程數(shù)量,增加成功就退出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 添加失敗的話就再獲取狀態(tài)庵寞,如果線程池狀態(tài)和之前獲取的狀態(tài)不一致狸相,繼續(xù)循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 添加任務(wù)成功之后才會(huì)執(zhí)行到這里
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 當(dāng)拿到鎖以后再次檢查狀態(tài)
// 如果 ThreadFactory 失敗或者獲取鎖過(guò)程中線程池關(guān)閉,就退出
int rs = runStateOf(ctl.get());
// 如果線程池狀態(tài)正尘璐ǎ或者線程池關(guān)閉了同時(shí)任務(wù)為空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程已經(jīng)啟動(dòng)脓鹃,就拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 將 worker 加入到工作線程隊(duì)列中
int s = workers.size();
if (s > largestPoolSize) // 記錄線程池達(dá)到過(guò)的最大容量
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 如果任務(wù)添加成功,就開(kāi)始執(zhí)行任務(wù)
t.start(); // 啟動(dòng)線程古沥,也就是 worker瘸右,worker 會(huì)不斷從等待隊(duì)列中獲取任務(wù)并執(zhí)行
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在 addWorker
方法中將任務(wù)封裝成了一個(gè) Worker
類(lèi),執(zhí)行任務(wù)的時(shí)候執(zhí)行的線程是從 Worker
類(lèi)中獲取的線程岩齿,Worker
是線程池的一個(gè)內(nèi)部類(lèi)太颤,查看它的源碼。
Worker
類(lèi)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
* 為了抑制 javac 警告添加了序列化ID
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// worker 運(yùn)行的線程盹沈,如果 ThreadFactory 生成失敗的話這個(gè)值為 null
final Thread thread;
/** Initial task to run. Possibly null. */
// 運(yùn)行的初始任務(wù)龄章,可能為 null
Runnable firstTask;
/** Per-thread task counter */
// 記錄總共執(zhí)行過(guò)的任務(wù)
volatile long completedTasks;
/**
* 用傳進(jìn)來(lái)的參數(shù)作為第一個(gè)任務(wù),用 ThreadFactory 創(chuàng)建線程
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**
* 將運(yùn)行任務(wù)交給其他的方法執(zhí)行
* Worker 作為一個(gè)實(shí)現(xiàn)了 Runnable 接口的類(lèi)乞封,要實(shí)現(xiàn) run 方法做裙,
* 線程啟動(dòng)的時(shí)候調(diào)用的是 start 方法,start 方法內(nèi)部調(diào)用 run 方法肃晚,
* 所以實(shí)際運(yùn)行時(shí)候執(zhí)行的是這個(gè) run 方法
*/
public void run() {
runWorker(this);
}
/**
* Worker 繼承了 AbstractQueuedSynchronizer锚贱,下面就是需要重寫(xiě)的一些必要的方法
*/
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 是否是獨(dú)占鎖
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
從 Worker
中可以看出,它繼承了 AbstractQueuedSynchronizer
关串,方便加鎖解鎖拧廊,并且實(shí)現(xiàn)了 Runnable
接口杂穷,本身作為一個(gè)線程運(yùn)行。
這里就是線程池為什么任務(wù)執(zhí)行之后線程沒(méi)有銷(xiāo)毀卦绣,提交到線程池的線程不是調(diào)用了線程的 start
方法,而是被 Worker
中的 run
方法調(diào)用飞蚓,run
方法內(nèi)部等下再看滤港。Worker
中的屬性 thread
是將 Worker
本身封裝成為了一個(gè) Thread
,然后啟動(dòng)線程趴拧,雖然執(zhí)行的是 run
方法而不是我們所熟知的 start
方法啟動(dòng)線程溅漾,但是任務(wù)的 run
方法被 Worker
的 run
方法調(diào)用,Worker
的 run
方法又是被 start
方法所啟動(dòng)著榴,因此實(shí)現(xiàn)了線程的交互運(yùn)行添履。
接下來(lái)看一下 runWorker
方法,這個(gè)方法是線程池如何不銷(xiāo)毀線程而不斷執(zhí)行任務(wù)的脑又。
runWorker
runWorker
實(shí)際上是線程池 ThreadPoolExecutor
中的方法而不是 Worker
中的:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取 Worker 中當(dāng)前要執(zhí)行的任務(wù)
w.firstTask = null; // 已經(jīng)拿到了任務(wù)暮胧,將 Worker 中的任務(wù)置為 null
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任務(wù)不為空或者任務(wù)為空但是從隊(duì)列中獲取到了任務(wù),就執(zhí)行任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池停止了问麸,確保線程被中斷了往衷,如果線程池正在運(yùn)行,確保線程沒(méi)有被中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 開(kāi)始執(zhí)行任務(wù)之前應(yīng)該做的严卖,本身什么都不做席舍,可以子類(lèi)重寫(xiě)
Throwable thrown = null;
try {
task.run(); // 執(zhí)行任務(wù)的 run 方法,這里才真正執(zhí)行了任務(wù)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 同 beforeExecute
}
} finally { // 將要執(zhí)行的任務(wù)置為空哮笆,已完成任務(wù) +1来颤,釋放鎖
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false; // 標(biāo)記是否是正常完成,如果出現(xiàn)異常是不會(huì)執(zhí)行這一步的稠肘,直接執(zhí)行 finally
} finally {
// 結(jié)束線程福铅,在之前提到的核心池滿了等待隊(duì)列也滿了會(huì)創(chuàng)建臨時(shí)線程執(zhí)行任務(wù),執(zhí)行完銷(xiāo)毀
// 或者線程池停止了也要結(jié)束工作線程
processWorkerExit(w, completedAbruptly);
}
}
可以看出 runWorker
方法中真正執(zhí)行了任務(wù)启具,然后不停從等待隊(duì)列中獲取新的任務(wù)繼續(xù)執(zhí)行本讥。
下面看一下是怎么從等待隊(duì)列中獲取任務(wù)的。
getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 線程池已經(jīng)不在運(yùn)行而且線程池被停止或者等待隊(duì)列為空鲁冯,將工作線程數(shù)減 1
// 因?yàn)?getTask 是被一個(gè)工作線程調(diào)用的拷沸,如果返回 null,調(diào)用 getTask 方法的 Worker 就結(jié)束運(yùn)行
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 如果線程池一切正常薯演,繼續(xù)下面的邏輯
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果創(chuàng)建線程池時(shí)候設(shè)置了超時(shí)或者當(dāng)前啟用了超出核心池的線程“加班”執(zhí)行任務(wù)撞芍,timed 為 true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果當(dāng)前請(qǐng)求任務(wù)的是超出核心池大小的線程或者已經(jīng)超時(shí),同時(shí)工作線程數(shù)大于 1 或者等待隊(duì)列為空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // 嘗試將工作線程數(shù)減 1跨扮,如果成功返回 null序无,否則繼續(xù)執(zhí)行
return null;
continue;
}
try {
// 如果超時(shí)的話從等待隊(duì)列中獲取任務(wù)验毡,如果一段時(shí)間內(nèi)沒(méi)有任務(wù)就返回null
// 否則阻塞地從等待隊(duì)列中獲取任務(wù),一直到有任務(wù)返回才繼續(xù)執(zhí)行下一步帝嗡,也就是一定會(huì)返回一個(gè)任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
線程池原理小結(jié)
看了 execute
晶通、addWorker
、Worker
類(lèi)哟玷、runWorker
的源碼狮辽,可以清楚地了解線程池的原理:
每當(dāng)有任務(wù)被提交(execute
方法),如果核心池沒(méi)有滿巢寡,就創(chuàng)建一個(gè) Worker
(也就是 addWorker
方法)喉脖,創(chuàng)建后 worker
開(kāi)始工作(在 addWorker
方法中調(diào)用啟動(dòng) Worker
中的 thread
,也就是執(zhí)行了 runWorker
方法)抑月,runWorker
方法會(huì)不停地從等待隊(duì)列 workQueue
中獲取任務(wù)并執(zhí)行(之前說(shuō)過(guò)树叽,執(zhí)行任務(wù)的時(shí)候執(zhí)行的是任務(wù)的 run
方法,但是 worker
是一個(gè)線程谦絮,所以相當(dāng)于 Thread
的 start
方法間接調(diào)用了任務(wù)的run
方法)题诵。
如果核心池滿了,并且等待隊(duì)列也滿了挨稿,而且核心池大小小于線程池最大大小仇轻,就會(huì)進(jìn)行挽救措施:創(chuàng)建新的 Worker
來(lái)執(zhí)行新提交的任務(wù),這個(gè)新的 Worker
執(zhí)行完任務(wù)以后就會(huì)銷(xiāo)毀奶甘。
如果核心池滿了篷店,等待隊(duì)列也滿了,核心池大小也等于最大線程池大小臭家,那就只能拒絕任務(wù)了疲陕,根據(jù)構(gòu)造函數(shù)中傳入的拒絕策略拒絕任務(wù)。
下篇文章 ThreadPool 之 Callable钉赁、Future 和 FutureTask 將分析 Callable蹄殃、Future 和 FutureTask。