原文地址:https://mp.weixin.qq.com/s/n2cZxqHqaMJM2mh9Tk13Bg
目錄
- TreadPoolexecutor源碼解析
- 類關系圖
- Executor接口
- ExecutorService接口
- AbstractExecutorService
- 成員變量
- 構造函數(shù)
- Worker類解析
- Worker簡介
- 成員變量
- 構造方法
- execute()解析
- addWorker()解析
- runWorker()解析
- getTask()解析
- processWorkerExit()解析
- tryTerminate()解析
- interruptIdleWorker()解析
- 監(jiān)控線程池
TreadPoolexecutor源碼解析
1.類關系圖
image
Executor接口
Executor提供execute()用來啟動任務
public interface Executor {
//用來啟動任務
void execute(Runnable command);
}
2.ExecutorService接口
ExecutorService提供了一些管理線程池方法和任務執(zhí)行的方法
public interface ExecutorService extends Executor {
//關閉線程池钝荡,隊列已經(jīng)存在的任務可以繼續(xù)執(zhí)行
void shutdown();
//關閉線程池傀顾,中斷未執(zhí)行的任務
List<Runnable> shutdownNow();
//判斷是否關閉
boolean isShutdown();
//判斷是否終止
boolean isTerminated();
//設置超時終止
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交Callable任務
<T> Future<T> submit(Callable<T> task);
//提交Runable任務只祠,帶返回值
<T> Future<T> submit(Runnable task, T result);
//提交Runnable任務不帶返回值
Future<?> submit(Runnable task);
//invokeAll()是同步的修然,其需要等待任務的完成非区,才能返回郭膛。submit()是異步的
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//invokeAny()取第一個任務的返回值麸拄,然后調用interrupt方法中斷其它任務惜索。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService
AbstractExecutorService類重寫某些方法特笋。自定義了newTaskFor()用于構建RunnableFuture。
成員變量
/**
* 線程池使用一個int變量存儲線程池狀態(tài)和工作線程數(shù)
* int4個字節(jié),32位猎物,用高三位存儲線程池狀態(tài)虎囚,低29位存儲工作線程數(shù)
* 為什么使用一個變量來同時表示線程狀態(tài)和線程數(shù)?就是節(jié)省空間蔫磨。咨詢了一下寫c的朋友淘讥,他們經(jīng)常這么寫
**/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
//理論上線程池最大線程數(shù)量CAPACITY=536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 線程池狀態(tài)轉換
* RUNNING -> SHUTDOWN
* RUNNING or SHUTDOWN -> STOP
* SHUTDOWN or STOP -> TIDYING
* TIDYING -> TERMINATED terminated()執(zhí)行完后變?yōu)樵揟ERMINATED
*/
//接受新任務,可以處理阻塞隊列里的任務
private static final int RUNNING = -1 << COUNT_BITS;
//不接受新任務堤如,可以處理阻塞隊列里的任務蒲列。執(zhí)行shutdown()會變?yōu)镾HUTDOWN
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接受新的任務,不處理阻塞隊列里的任務搀罢,中斷正在處理的任務蝗岖。執(zhí)行shutdownNow()會變?yōu)镾TOP
private static final int STOP = 1 << COUNT_BITS;
//臨時過渡狀態(tài),所有的任務都執(zhí)行完了榔至,當前線程池有效的線程數(shù)量為0抵赢,這個時候線程池的狀態(tài)是TIDYING,執(zhí)行terminated()變?yōu)門ERMINATED
private static final int TIDYING = 2 << COUNT_BITS;
//終止狀態(tài)唧取,terminated()調用完成后的狀態(tài)
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取線程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取工作線程數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
//初始化ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
//用于保存等待執(zhí)行的任務的阻塞隊列铅鲤。比如LinkedBlockQueue,SynchronousQueue等
private final BlockingQueue<Runnable> workQueue;
//重入鎖枫弟,更新線程池核心大小彩匕、線程池最大大小等都有用到
private final ReentrantLock mainLock = new ReentrantLock();
//用于存儲woker
private final HashSet<Worker> workers = new HashSet<Worker>();
//用于終止線程池
private final Condition termination = mainLock.newCondition();
//記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)
private int largestPoolSize;
//完成任務數(shù)量
private long completedTaskCount;
//線程工廠
private volatile ThreadFactory threadFactory;
/**
* rejectedExecutionHandler:任務拒絕策略
* DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執(zhí)行當前任務
* AbortPolicy:拋出異常媒区。這也是默認的策略
* CallerRunsPolicy:用調用者所在線程來運行任務
* DiscardPolicy:不處理驼仪,丟棄掉
*/
private volatile RejectedExecutionHandler handler;
/**
* 線程空閑時間
* 當線程空閑時間達到keepAliveTime,該線程會退出袜漩,直到線程數(shù)量等于corePoolSize绪爸。
* 如果allowCoreThreadTimeout設置為true,則所有線程均會退出宙攻。
*/
private volatile long keepAliveTime;
//是否允許核心線程空閑超時退出奠货,默認值為false。
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心線程數(shù)
* 核心線程會一直存活座掘,即使沒有任務需要處理递惋,當線程數(shù)小于核心線程數(shù)時。
* 即使現(xiàn)有的線程空閑溢陪,線程池也會優(yōu)先創(chuàng)建新線程來處理任務萍虽,而不是直接交給現(xiàn)有的線程處理。
* 核心線程數(shù)在初始化時不會創(chuàng)建形真,只有提交任務的時候才會創(chuàng)建杉编。核心線程在allowCoreThreadTimeout為true的時候超時會退出。
*/
private volatile int corePoolSize;
/** 最大線程數(shù)
* 當線程數(shù)大于或者等于核心線程,且任務隊列已滿時邓馒,線程池會創(chuàng)建新的線程嘶朱,直到線程數(shù)量達到maxPoolSize。
* 如果線程數(shù)已等于maxPoolSize光酣,且任務隊列已滿疏遏,則已超出線程池的處理能力,線程池會采取拒絕操作救军。
*/
private volatile int maximumPoolSize;
//默認的拒絕策略:拋出異常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
構造函數(shù)
//直接提供了一個最終調用的構造函數(shù)
//大致邏輯就是給線程池核心參數(shù)賦值
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
//轉為納秒
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Worker類解析
Worker簡介
Woeker類是ThreadPoolExecutor一個內部類财异。此類繼AbstractQueuedSynchronizer,
目的在于判斷線程是否空閑以及是否可以被中斷缤言。實現(xiàn)Runnable宝当,在run()中調用了runWorker()视事。
Worker類中firstTask用來保存?zhèn)魅氲娜蝿盏ㄏ簦瑃hread是在調用構造方法時通過ThreadFactory來創(chuàng)建的線程,是用來處理任務的線程俐东。注意:這個線程并不是task線程跌穗。
成員變量
/** 工作線程空間,由線程池中所設置的線程工廠創(chuàng)建*/
final Thread thread;
/** 在構造方法中傳入的任務*/
Runnable firstTask;
/** 執(zhí)行完任務的總數(shù)*/
volatile long completedTasks;
構造方法
Worker(Runnable firstTask) {
//state設置為-1是為了禁止在執(zhí)行任務前對任務進行中斷虏辫。
setState(-1);
//提交的task
this.firstTask = firstTask;
//從線程工廠獲取的線程蚌吸,注意這個thread并不是用戶線程
this.thread = getThreadFactory().newThread(this);
}
execute()解析
public void execute(Runnable command) {
//判斷提交的任務是否為空
if (command == null)
throw new NullPointerException();
//獲取線程池狀態(tài)和工作線程數(shù)量結合體(下文統(tǒng)稱為ctl)
int c = ctl.get();
//判斷工作線程數(shù)量是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//把任務添加到worker,添加成功則返回
if (addWorker(command, true))
return;
//再次獲取ctl
c = ctl.get();
}
//如果線程池狀態(tài)是正在運行并且添加一個任務到隊列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次校驗線程池狀態(tài)砌庄,如果狀態(tài)不是RUNNING則需要從隊列中移除任務
if (! isRunning(recheck) && remove(command))
//執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
//進入這里說明空閑核心線程數(shù)都超時退出啦
//因為任務已經(jīng)放入隊列了羹唠,所以此處不需要傳入任務
//注意事項,網(wǎng)上很多說這里創(chuàng)建一個線程不啟動這是錯誤的娄昆。博主親測這里創(chuàng)建了一個線程并且需要啟動
addWorker(null, false);
}
/*
* 如果執(zhí)行到這里有兩種情況:
* 線程池已經(jīng)不是RUNNING狀態(tài)
* 線程池是RUNNING狀態(tài)佩微,workerCount>=corePoolSize并且workQueue已滿
*/
//調用addWorker(),傳入false代表把線程池線程數(shù)量設置maximumPoolSize,如果添加失敗則執(zhí)行拒絕策略萌焰。
else if (!addWorker(command, false))
reject(command);
}
addWorker()解析
/**
*@param firstTask 表示執(zhí)行的任務哺眯;
*@param core 表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷;
* 如果為true扒俯,根據(jù)corePoolSize來判斷奶卓;如果為false,則根據(jù)maximumPoolSize來判斷
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取線程池狀態(tài)
int rs = runStateOf(c);
/**
* 只要滿足下面任一條就直接返回false
* 線程池狀態(tài)為STOP, TYDING 或 TERMINATD 狀態(tài)
* 線程池狀態(tài)為SHUTDOWN撼玄,并且firstTask != null 或者workQueue為空
*/
//這里為什么不直接判斷線程池狀態(tài)呢夺姑?是因為有可能在線程池狀態(tài)為RUNNING時已經(jīng)把任務放入隊列中,放入完成以后狀態(tài)變?yōu)镾HUTDOWN
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&
firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
//得到工作的線程數(shù)量
int wc = workerCountOf(c);
//如果工作線程數(shù)量大于理論上線程池容量掌猛;或者工作線程數(shù)大于(corePoolSize or maximumPoolSize) 直接返回false瑟幕,添加失敗
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas修改工作線程數(shù),工作線程數(shù)+1。如果修改失敗需要重新執(zhí)行只盹;成功退出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//如果線程池狀態(tài)變化則需要重新執(zhí)行
if (runStateOf(c) != rs)
continue retry;
}
}
//worker是否已經(jīng)啟動
boolean workerStarted = false;
//worker是否添加成功
boolean workerAdded = false;
Worker w = null;
try {
//構建worker
w = new Worker(firstTask);
//注意辣往,這個thread不是firstTask,是從線程工廠造出來的
final Thread t = w.thread;
if (t != null) {
//獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲取線程池狀態(tài)
int rs = runStateOf(ctl.get());
//如果線程池狀態(tài)是RUNNING或者狀態(tài)是SHUTDOWN但是隊列里面還有任務
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果t.isAlive()=true殖卑,說明是有問題的站削,都沒有啟動,t怎么會是活的呢孵稽。所以拋出異常许起。
if (t.isAlive())
throw new IllegalThreadStateException();
//把worker添加到set集合
workers.add(w);
//記錄線程池中出現(xiàn)的最大線程數(shù)
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//添加成功標識
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
//添加成功啟動線程,啟動線程是調用了runWorker()
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//啟動失敗
if (! workerStarted)
//啟動失敗需要從workers中移除當前構造的woker菩鲜;工作線程數(shù)減1了园细;執(zhí)行tryTerminate()判斷是否終止線程池。
addWorkerFailed(w);
}
return workerStarted;
}
runWorker()解析
final void runWorker(Worker w) {
//獲取當前線程
Thread wt = Thread.currentThread();
//需要執(zhí)行的任務
Runnable task = w.firstTask;
//置為null
w.firstTask = null;
//這里是為了把之前的state=-1設置為state=0接校,此時允許中斷
w.unlock();
//是否異常退出循環(huán)
boolean completedAbruptly = true;
try {
//如果任務不為null或者getTask()不為null
while (task != null || (task = getTask()) != null) {
//獲取鎖猛频。這里使用鎖的目的在于標識正在處理任務
w.lock();
//線程池=SHUWDOWN,要保證當前線程是中斷狀態(tài)
//線程池!=SHUWDOWN蛛勉,要保證當前線程不是中斷狀態(tài)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//留給子類實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//子類執(zhí)行
afterExecute(task, thrown);
}
} finally {
task = null;
//更新任務完成數(shù)
w.completedTasks++;
w.unlock();
}
}
//沒有出現(xiàn)異常
completedAbruptly = false;
} finally {
//一定要注意鹿寻。執(zhí)行到這里說明getTask()返回null。說明當前線程池中不需要那么多線程來執(zhí)行任務了诽凌,可以把多于corePoolSize數(shù)量的工作線程干掉
processWorkerExit(w, completedAbruptly);
}
}
getTask()解析
//什么情況才會執(zhí)行getTask呢毡熏?說明工作線程數(shù)已經(jīng)大于核心線程數(shù)才會執(zhí)行getTask()。一定要記住這一點
private Runnable getTask() {
//表示上次從阻塞隊列中取任務時是否超時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
//獲取線程池狀態(tài)
int rs = runStateOf(c);
//如果線程池狀態(tài)rs >= SHUTDOWN并且(rs >= STOP侣诵,或者阻塞隊列為空)痢法。則將workerCount減1并返回null。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取工作線程數(shù)
int wc = workerCountOf(c);
//表示是否需要超時控制杜顺。allowCoreThreadTimeOut默認false财搁;如果線程池數(shù)量超過核心線程數(shù)也是需要超時控制的
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//校驗工作線程數(shù)量和任務隊列是否為空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//工作線程數(shù)量-1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//根據(jù)是否超時從隊列中獲取任務
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
//說明發(fā)生了中斷
timedOut = false;
}
}
}
processWorkerExit()解析
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//是否異常,如果異常工作線程數(shù)量-1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//增加線程池完成任務數(shù)量
completedTaskCount += w.completedTasks;
//從worker中移除完成任務
workers.remove(w);
} finally {
mainLock.unlock();
}
//根據(jù)線程池狀態(tài)進行判斷是否結束線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate()解析
//根據(jù)線程池狀態(tài)判斷是否結束線程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//RUNNING不能結束線程池
//線程池狀態(tài)是TIDYING或TERMINATED說明線程池已經(jīng)處于正在終止的路上
//狀態(tài)為SHUTDOWN哑舒,但是任務隊列不為空不能結束線程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//工作線程數(shù)量不等于0妇拯,中斷一個空閑的工作線程,并返回
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 設置線程池狀態(tài)為TIDYING洗鸵,如果設置成功越锈,則調用terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//子類實現(xiàn)
terminated();
} finally {
// 設置狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
interruptIdleWorker()解析
//該方法用于給空閑工作線程一個中斷標識
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍歷worker,根據(jù)onlyOne判斷膘滨,如果為ture只中斷一個空閑線程
for (Worker w : workers) {
Thread t = w.thread;
//線程沒有被中斷并且線程是空閑狀態(tài)tryLock()判斷是否空閑
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
監(jiān)控線程池
通過線程池提供的參數(shù)進行監(jiān)控
getTaskCount:線程池任務總數(shù)甘凭。
getCompletedTaskCount:線程池已完成的任務數(shù)量,小于等于completedTaskCount火邓。
getPoolSize:線程池當前的核心線程數(shù)量丹弱。
getLargestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量德撬。
getActiveCount:當前線程池中正在執(zhí)行任務的線程數(shù)量。