1. ThreadPoolExecutor相關(guān)框架圖
a. Executors
提供newCachedThreadPool
, newFixedThreadPool
, newScheduledThreadPool
, newSingleThreadExecutor
創(chuàng)建創(chuàng)建線程池。
b. RunnableAdapter
是Executors
的內(nèi)部類乐埠,提供Runnable
轉(zhuǎn)接為Callable
的方法;
c. DefaultThreadFactory
是Executors
內(nèi)部的線程工廠類, PrivilegedThreadFactory
重寫其newThread
方法;
d. Worker
重寫AQS獲取鎖的方法,并包裝業(yè)務(wù)邏輯; ThreadPoolExecutor::workers
保存所有的工作業(yè)務(wù), 當(dāng)其滿時緩存到workQueue
;
2.1 線程池中斷策略
ThreadPoolExecutor.AbortPolicy: 丟棄任務(wù)并拋出
RejectedExecutionException異常装盯。默認(rèn)
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)惹盼,但是不拋出異常逢倍。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)怔鳖,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
2.2 自定義中斷策略
class SelfRejected extends RejectedExecutionHandler
2.3.1 線程池狀態(tài)
//接收新任務(wù)摹菠,并且執(zhí)行緩存任務(wù)隊(duì)列中的任務(wù)
private static final int RUNNING = -1
// 不在接收新的任務(wù)盒卸,但是會執(zhí)行緩存中的任務(wù)
private static final int SHUTDOWN = 0
// 不接收新的任務(wù),不執(zhí)行緩存中的任務(wù)次氨,中斷正在運(yùn)行的任務(wù)
private static final int STOP = 1
// 所有任務(wù)已經(jīng)終止蔽介,workCount = 0;
private static final int TIDYING = 2
// terminated 方法調(diào)用完成
private static final int TERMINATED = 3
2.3.2 線程池狀態(tài)切換
RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow() // shutdownNow
SHUTDOWN -> TIDYING
* When both queue and pool are empty // 緩存隊(duì)列、線程池均空閑
STOP -> TIDYING
* When pool is empty
TIDYING -> TERMINATED
* When the terminated() hook method has completed
ctl
The main pool control state, ctl, is an atomic integer packing two conceptual fields
workerCount
indicating the effective number of threads // 有效線程數(shù)量
runState
indicating whether running, shutting down etc // 運(yùn)行狀態(tài)
2.3.3 其他成員
// 緩存任務(wù)阻塞隊(duì)列
private final BlockingQueue<Runnable> workQueue;
// 線程池主鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 工作線程
private final HashSet<Worker> workers = new HashSet<Worker>();
// mainLock上的終止條件量煮寡,用于支持awaitTermination
private final Condition termination = mainLock.newCondition();
// 記錄曾經(jīng)創(chuàng)建的最大線程數(shù)
private int largestPoolSize;
// 已經(jīng)完成任務(wù)數(shù)
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
// 設(shè)置默認(rèn)任務(wù)拒絕策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
3.1 Executors::newFixedThreadPool 創(chuàng)建固定大小線程池
newFixedThreadPool創(chuàng)建固定大小線程池虹蓄,無非核心線程,超出的線程保存在LinkedBlockingQueue中等待幸撕;
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize == maximumPoolSize, 無非核心線程
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
3.2 ThreadPoolExecutor::execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*/
public void execute(Runnable command) {
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 當(dāng)工作者線程小于核心線程薇组,則new thread, runnable作為其第一個task,
* addWorker將檢查線程運(yùn)行時狀態(tài) 和 工作者總數(shù),
* 目的是: 當(dāng)不允許添加線程時坐儿,防止假喚醒律胀;
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 當(dāng)核心線程已滿,線程池處于運(yùn)行時狀態(tài)貌矿,則向緩存隊(duì)列workQueue添加炭菌;
* 此時,需要double-check
* 因?yàn)楣渎?dāng)調(diào)用此方法后線程池可能被shutdown黑低、 線程died
* 如果,工作者線程為0尽楔,添加一個null task的worker
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 此時投储,緩存隊(duì)列已滿,則創(chuàng)建一個線程阔馋,添加到works
*/
int c = ctl.get();
// 1. 如果工作線程數(shù)小于核心線程數(shù)玛荞,則添加新的線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 3.3.1
return; // 添加成功則返回
c = ctl.get(); // 否則獲取線程池狀態(tài)
}
// 2. 工作線程數(shù)大于等于核心線程數(shù),則將任務(wù)放入緩存任務(wù)隊(duì)列
// 如果線程池正在運(yùn)行呕寝,而且成功將任務(wù)插入緩存任務(wù)隊(duì)列兩個條件
if (isRunning(c) && workQueue.offer(command)) { // 第一次檢查
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 第二次檢查
reject(command); // 如果不處于運(yùn)行狀態(tài)勋眯,則將任務(wù)從任務(wù)緩存隊(duì)列移除
else if (workerCountOf(recheck) == 0)
addWorker(null, false); //啟動無初始任務(wù)的非核心線程 // 3.3.1
}
// 3. 任務(wù)入隊(duì)失敗,說明任務(wù)緩存任務(wù)隊(duì)列已滿,嘗試添加新的線程處理
else if (!addWorker(command, false)) // 3.3.1
reject(command);
}
3.3.1 ThreadPoolExecutor::addWorker
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// 對 runState進(jìn)行循環(huán)獲取和判斷客蹋,如果不滿足添加條件則返回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 線程池狀態(tài)
// Check if queue empty only if necessary.
// 對線程池狀態(tài)進(jìn)行判斷塞蹭,是否適合添加新的線程
if (rs >= SHUTDOWN && // 線程池關(guān)閉
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 設(shè)置成功,則跳出最外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果workerCount沒有設(shè)置成功讶坯,而且runState發(fā)生變化番电,
// 則繼續(xù)最外層的循環(huán),對runState重新獲取和判斷
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 3.4
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 獲取鎖后辆琅,對runState進(jìn)行再次檢查
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // workers是HashSet<Worker>, 包含池中所有worker, 只有當(dāng)持有才可mainlock訪問漱办。
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 3.4 如果添加成功,則啟動線程婉烟,執(zhí)行worker::run
workerStarted = true;
}
}
} finally { // 因?yàn)橹型景l(fā)生異常而沒有讓添加的線程啟動娩井,則回滾
if (! workerStarted)
addWorkerFailed(w); // 3.7
}
return workerStarted;
}
- 在外循環(huán)對運(yùn)行狀態(tài)進(jìn)行判斷,內(nèi)循環(huán)通過CAS機(jī)制對
workerCount
進(jìn)行增加似袁,當(dāng)設(shè)置成功洞辣,則跳出外循環(huán),否則進(jìn)行進(jìn)行內(nèi)循環(huán)重試; - 外循環(huán)之后昙衅,獲取全局鎖扬霜,再次對運(yùn)行狀態(tài)進(jìn)行判斷,符合條件則添加新的工作線程绒尊,并啟動工作線程畜挥,如果在最后對添加線程沒有開始運(yùn)行(可能發(fā)生內(nèi)存溢出仔粥,操作系統(tǒng)無法分配線程等等)則對添加操作進(jìn)行回滾婴谱,移除之前添加的線程;
3.3.2 線程池添加任務(wù)流程
核心線程 -> 工作隊(duì)列 -> 線程池 -> 飽和策略
3.4 worker 線程封裝類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
Worker(Runnable firstTask) { // 5. worker框架參考
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 新建thread to save runable
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this); // 3.5 工作線程主函數(shù)
}
// 同步鎖,重寫AQS中相關(guān)鎖的方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
3.5 工作線程主函數(shù) ThreadPoolExecutor::runWorker
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 獲取任務(wù)躯泰,如果沒有任務(wù)可以獲取谭羔,則此循環(huán)終止,
// 這個工作線程將結(jié)束工作麦向,等待被清理前的登記工作
while (task != null || (task = getTask()) != null) { // 3.8 getTask獲取任務(wù)
w.lock(); // 執(zhí)行任務(wù)之前獲取工作線程鎖
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池關(guān)閉瘟裸,則確保線程被中斷
// 如果線程池沒有關(guān)閉,則確保線程不被中斷
// 這就要求在第二種情況下诵竭,進(jìn)行重新檢查话告,處理shutdownNow正在運(yùn)行同時清除中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 執(zhí)行前業(yè)務(wù)處理
try {
task.run(); // 業(yè)務(wù)處理
} catch (RuntimeException x) {
...
} finally {
afterExecute(task, thrown); // 執(zhí)行后業(yè)務(wù)處理
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 登記信息,移除結(jié)束線程卵慰,然后根據(jù)情況添加新的線程等
processWorkerExit(w, completedAbruptly); // 3.6
}
}
3.6 ThreadPoolExecutor::processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 釋放worker
} finally {
mainLock.unlock();
}
tryTerminate(); // 終止線程
}
3.7 ThreadPoolExecutor::addWorkerFailed
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
// 對線程回滾需要獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount(); // 減少工作線程計數(shù)
tryTerminate(); // 嘗試終止線程
} finally {
mainLock.unlock();
}
}
3.8 ThreadPoolExecutor::getTask
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* //退出業(yè)務(wù)處理
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*/
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //減少workerCount數(shù)量
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 包含對超時的判斷沙郭,如果發(fā)生超時,則說明該worker已經(jīng)空閑了
// keepAliveTime時間裳朋,則應(yīng)該返回null病线,這樣會使工作線程正常結(jié)束,
// 并被移除
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // workQueue中keepAliveTime內(nèi)獲取任務(wù)
// 如果在keepAliveTime時間內(nèi)獲取到任務(wù)則返回
if (r != null)
return r;
timedOut = true; // 否則將超時標(biāo)志設(shè)置為true
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4.1 Executors::newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 核心線程為0
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), // SynchronousQueue
threadFactory);
}
4.2 Executors::newSingleThreadExecutor
FinalizableDelegatedExecutorService
內(nèi)部封裝ThreadPoolExecutor
,且只有一個核心線程送挑;
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1/*corePoolSize*/, 1/*maximumPoolSize*/, // corePoolSize == maximumPoolSize == 1
0L/*keepAliveTime*/, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.3 Executors::FinalizableDelegatedExecutorService
private static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
FinalizableDelegatedExecutorService
繼承自DelegatedExecutorService
绑莺,并實(shí)現(xiàn)finalize
方法;
DelegatedExecutorService
封裝了ThreadPoolExecutor
5. Worker 相關(guān)框架
a. AbstractQueuedSynchronizer
提供了一個基于FIFO隊(duì)列惕耕,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架纺裁;
b. Node是AbstractQueuedSynchronizer
的內(nèi)部類,Node保存著線程引用和線程狀態(tài)的容器司澎,每個線程對同步器的訪問对扶,都可以看做是隊(duì)列中的一個節(jié)點(diǎn);
c. Worker 實(shí)現(xiàn)AbstractQueuedSynchronizer
中acquire
相關(guān)方法惭缰;
AQS 中Node節(jié)點(diǎn)