在一個(gè)應(yīng)用程序中攀隔,我們需要多次使用線程皂贩,也就意味著需要多次創(chuàng)建并銷(xiāo)毀線程。而創(chuàng)建并銷(xiāo)毀線程的過(guò)程勢(shì)必會(huì)消耗內(nèi)存昆汹。而在Java中明刷,內(nèi)存資源是及其寶貴的,所以满粗,就提出了線程池的概念
構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize, // 核心線程數(shù)
int maximumPoolSize, // 最大線程數(shù)(包含核心線程數(shù)量)
long keepAliveTime, // 大于核心線程數(shù)時(shí) 淘汰線程的時(shí)間
TimeUnit unit, // 時(shí)間單位
BlockingQueue<Runnable> workQueue, // 傳輸和保存等然任務(wù)的阻塞隊(duì)列
ThreadFactory threadFactory, // 用于創(chuàng)建新線程
RejectedExecutionHandler handler) { // 線程的淘汰策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) // 這里會(huì)對(duì)線程數(shù)量以及等待時(shí)間做一些基本校驗(yàn)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
線程池中的狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 初始值 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
可以看到線程池使用一個(gè)AtomicInteger來(lái)表示當(dāng)前線程的情況辈末,其中高三位表示當(dāng)前線程池的狀態(tài),而低29位表示線程池的
需要注意的是 -1的二進(jìn)制表示 是 1的二進(jìn)制 取反在加1:
1: 0000 0000 0000 0000 0000 0000 0000 0001
取反: 1111 1111 1111 1111 1111 1111 1111 1110
加1: 1111 1111 1111 1111 1111 1111 1111 1111
線程狀態(tài)流轉(zhuǎn):
運(yùn)行狀態(tài) 狀態(tài)描述
RUNNING 接收新任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù)本冲。
SHUTDOWN 不接收新任務(wù)准脂,但是卻可以繼續(xù)處理阻塞隊(duì)列中的任務(wù)。
STOP 不接收新任務(wù)檬洞,同時(shí)也不處理隊(duì)列任務(wù)狸膏,并且中斷正在進(jìn)行的任務(wù)。
TIDYING 所有任務(wù)都已終止添怔,workercount(有效線程數(shù))為0湾戳,線程轉(zhuǎn)向 TIDYING 狀態(tài)將會(huì)運(yùn)行 terminated() 鉤子方法。
TERMINATED terminated() 方法調(diào)用完成后變成此狀態(tài)广料。
execute
execute 是我們提交一個(gè)線程的核心方法砾脑, 我們以該方法為入口看看線程池的實(shí)現(xiàn):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 當(dāng)前池中線程數(shù)量小于corePoolSize,基于傳入的command創(chuàng)建新的線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 創(chuàng)建新線程
return;
c = ctl.get();
}
// 大于corePoolSize的話 則入隊(duì)(阻塞隊(duì)列的offer方法是不會(huì)阻塞的 插入失敗直接返回false)
if (isRunning(c) && workQueue.offer(command)) {
// 當(dāng)前線程池是running狀態(tài) 并且入隊(duì)成功
int recheck = ctl.get();
// recheck下如果不是running狀態(tài) 刪除掉這個(gè)命令
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) // 池中沒(méi)有線程了 創(chuàng)建一個(gè)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 嘗試創(chuàng)建非核心線程
reject(command); // 創(chuàng)建失敗 則執(zhí)行reject 策略
}
addWorker
首先我們來(lái)看下 worker是個(gè)什么東西:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask;
/**
* Per-thread task counter
*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
可以看出艾杏, worker 本身繼承Runnable韧衣,并且也是AQS的子類,所以worker本身也可以完成AQS的一些阻塞操作购桑,這里自己繼承AQS而不是使用ReentrangLock的目的就是防止重入畅铭。并且,在worker的構(gòu)造方法中勃蜘,會(huì)使用ThreadFactory創(chuàng)建一個(gè)新的線程硕噩,傳入的Runnable就是自己,所以說(shuō)線程池中跑的就是一個(gè)個(gè)Worker缭贡,然后通過(guò)firstTask保存用戶傳進(jìn)來(lái)的任務(wù)炉擅,然后通過(guò)run方法,調(diào)用到runWorker阳惹,從而執(zhí)行用戶任務(wù)谍失, 關(guān)于runWorker,后面解析莹汤。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 獲取線程池當(dāng)前狀態(tài)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
整個(gè)方法可以拆分成兩部分 :
第一部分:
retry: // 標(biāo)識(shí)一個(gè)循環(huán) 如果有多層嵌套循環(huán)袱贮, 可以直接跳到retry標(biāo)識(shí)的某個(gè)循環(huán)
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 獲取線程池當(dāng)前狀態(tài)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
在外層的for循環(huán)里,有一個(gè)判斷:
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
轉(zhuǎn)換為 :
rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
這個(gè)判斷的意思是:
- rs > shoutdown , 也就是STOP,TIDYING,TERMINATED狀態(tài)直接返回失敗
- rs >= shutdown && firstTask != null体啰,線程池狀態(tài)處于 SHUTDOWN,STOP嗽仪,TIDYING荒勇,TERMINATED狀態(tài)且worker的首個(gè)任務(wù)不為空時(shí),添加工作線程失敗闻坚,不接受新任務(wù)
- rs >= shutdown && workQueue.isEmpty:線程池狀態(tài)處于 SHUTDOWN沽翔,STOP,TIDYING,TERMINATED狀態(tài)且阻塞隊(duì)列為空時(shí)仅偎,添加工作線程失敗跨蟹,不接受新任務(wù)。
所以橘沥,最外層的 for 循環(huán)是不斷的校驗(yàn)當(dāng)前的線程池狀態(tài)是否能接受新任務(wù)窗轩,如果校驗(yàn)通過(guò)了之后才能繼續(xù)往下運(yùn)行。
首先做判斷:
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
如果已經(jīng)大于最大容量 或 根據(jù)本次操作類型 判斷是否已經(jīng)大于核心線程數(shù) 或 最大線程數(shù) 如果已經(jīng)超過(guò) 則返回失敗座咆。
if (compareAndIncrementWorkerCount(c))
break retry;
這里會(huì)增加ctl中記錄的線程數(shù)痢艺,如果增加成功,則跳出外層循環(huán)介陶,執(zhí)行下面要將的第二部分堤舒。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
上面增加ctl線程數(shù)量失敗的話,再次檢查當(dāng)前狀態(tài) 哺呜,從外層循環(huán)再次執(zhí)行舌缤。
當(dāng)上面增加ctl中記錄的線程個(gè)數(shù)后,就該執(zhí)行第二部分了:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 創(chuàng)建一個(gè)work對(duì)象
final Thread t = w.thread; // 拿到work對(duì)象中的線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 這邊會(huì)操作workers(HashSet) 所以要加鎖
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 獲取線程池運(yùn)行狀態(tài)
if (rs < SHUTDOWN || // RUNNING狀態(tài)
(rs == SHUTDOWN && firstTask == null)) { // 這里為了傳入任務(wù)為空的情況也可以創(chuàng)建
if (t.isAlive()) // 如果線程已經(jīng)被啟動(dòng)了 直接拋出異常
throw new IllegalThreadStateException();
workers.add(w); // 將新創(chuàng)建的worker添加到workersSet中
int s = workers.size();
if (s > largestPoolSize) // 維護(hù)下largestPoolSize 這個(gè)標(biāo)識(shí)
largestPoolSize = s;
workerAdded = true; // 標(biāo)識(shí)位設(shè)置為true
}
} finally {
mainLock.unlock();
}
if (workerAdded) {// 如果本次添加worker成功
t.start(); // 讓worker跑起來(lái)
workerStarted = true; // 設(shè)置啟動(dòng)成功的標(biāo)志位
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted; // 返回啟動(dòng)標(biāo)志位
這邊還有個(gè)addWorkerFailed方法:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 從set 中刪除該worker
decrementWorkerCount(); // cas 將ctl中記錄的線程數(shù)減1
tryTerminate(); // 判斷是否需要終止整個(gè)線程池 需要的話 就終止
} finally {
mainLock.unlock();
}
}
runWorker
這里我們接著上面的Worker 某残,看下runWroker干了什么国撵,也就是Worker是如何從隊(duì)列中獲取任務(wù)執(zhí)行的(執(zhí)行worker.thread.start()其實(shí)就是執(zhí)行了這里):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // worker 一開(kāi)始被啟動(dòng)的時(shí)候 會(huì)傳入這個(gè)任務(wù)
w.firstTask = null;
//由于Worker初始化時(shí)AQS中state設(shè)置為-1,這里要先做一次解鎖把state更新為 0驾锰,允許線程中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task為空 或者 從隊(duì)列中獲取任務(wù)為空
// 注意 這里getTask就是從隊(duì)列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池運(yùn)行狀態(tài)是stopping, 確保線程是中斷狀態(tài);
// 如果不是stopping, 確保線程是非中斷狀態(tài).
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 鉤子方法 方便子類做一些標(biāo)識(shí)
Throwable thrown = null;
try {
task.run(); // 執(zhí)行任務(wù)的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 維護(hù)下總完成任務(wù)標(biāo)識(shí)
w.unlock();
}
}
completedAbruptly = false;
} finally
// 如果能執(zhí)行到這里 就說(shuō)明 該worker是時(shí)候被廢棄了
processWorkerExit(w, completedAbruptly);
}
}
// 從隊(duì)列中獲取任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) { // 循環(huán)拿任務(wù)
int c = ctl.get();
int rs = runStateOf(c); // 得到當(dāng)前線程池狀態(tài)
// Check if queue empty only if necessary.
// 針對(duì)當(dāng)前線程池狀態(tài)判斷是否要直接停止執(zhí)行隊(duì)列中的任務(wù)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 將worker數(shù)量減1
return null;
}
int wc = workerCountOf(c); // 當(dāng)前線程池中worker總數(shù)量
// Are workers subject to culling?
// 如果設(shè)置了allowCoreThreadTimeOut = true 那么核心線程也是可以被淘汰的
// timed 用于表示是否需要表示是否需要校驗(yàn)時(shí)間(是否已經(jīng)大于核心線程數(shù))
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) // 當(dāng)前worker數(shù)量大于最大線程數(shù)量 或者 已經(jīng)超時(shí)
&& (wc > 1 || workQueue.isEmpty())) { // 線程數(shù)量大于1 并且隊(duì)列已經(jīng)空了
if (compareAndDecrementWorkerCount(c)) // 線程數(shù)量減1 成功 返回空
return null;
continue; // 否則繼續(xù)循環(huán)
}
try {
// 如果需要使用時(shí)間判斷 嘖使用poll 否則使用take
// 對(duì)列為空的話 take會(huì)無(wú)止境阻塞 而poll等待時(shí)間后 直接返回空
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) // 拿到任務(wù)的話直接返回
return r;
timedOut = true; // 否則下次循環(huán)
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
submit
然后來(lái)看下submit方法:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
可以看出 不管是哪種卸留,都是通過(guò)RunnableFuture 將其包裹起來(lái),然后通過(guò)execute執(zhí)行椭豫,最后將Future對(duì)象返回出去耻瑟。
動(dòng)態(tài)修改核心線程數(shù)以及最大線程數(shù)
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers(); // 當(dāng)前已存在的線程數(shù)大于新設(shè)置的值的話
else if (delta > 0) { // 增大核心線程數(shù)
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 防止任務(wù)很多 ,這里提前創(chuàng)建一些worker 并start
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) // 如果等待隊(duì)列為空 則停止添加worker
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize) // 將最大線程數(shù)減小的話
interruptIdleWorkers();
}
private void interruptIdleWorkers() { // 中斷閑置線程
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 線程不是中斷狀態(tài) 并且 拿到了worker的鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
這里需要注意的是在worker 中含有任務(wù)并且在運(yùn)行的時(shí)候赏酥,會(huì)上鎖(參看runWorker方法)喳整,也就是說(shuō) ,只有目前沒(méi)有執(zhí)行任務(wù)的worker 裸扶,這里才能拿到鎖框都, 進(jìn)而設(shè)置線程已中斷。
參考:
https://www.cnblogs.com/jajian/p/11442929.html
https://mp.weixin.qq.com/s/hduWrrK4B8x8Z3C7RnIhjw
https://mp.weixin.qq.com/s/FJQ5MhB1kMp8lP1NA6q4Vg
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html