基本概念
-
任務
: 就是你自己實現(xiàn)的任務邏輯,一般為Runnable
實現(xiàn)類或Callable
實現(xiàn)類,不過在線程池中已經(jīng)被封裝成一個FutureTask
. 在我們向線程池中提交一個任務的時候,會先判斷目前線程池中的workerCount
是否小于核心線程數(shù)
,如果小于則將這個任務封裝成一個Worker
,然后啟動一個新線程;如果不小于則將這個任務添加到工作隊列
-
工作隊列
:工作隊列
是BlockQueue
的實現(xiàn)類,它的作用就是用來緩存任務
的,因為它本身是線程安全的,所以在向工作隊列
的時候不需要格外處理線程安全問題 -
Worker
: 可以認為每個Worker
對應一個線程,在我們創(chuàng)建Worker
的時候,會傳入一個任務
,這個任務就是這個Worker
首次要執(zhí)行的邏輯,執(zhí)行完之后它就會去工作隊列
拿任務執(zhí)行. 所有的Worker
都保存在一個HashSet
數(shù)據(jù)結構中,所以在向HashSet
添加Worker
的時候需要去處理線程安全問題,線程池中是通過ReentrantLock
來保證線程安全
工作流程
其實在說這個之前我們可以先考慮一下線程池出現(xiàn)的目的: 因為創(chuàng)建線程需要比較大的開銷,并且線程數(shù)太多的情況下上下文切換比較頻繁,所以我們希望有一種機制來改善它,這就是線程池,改善的核心就是控制線程的數(shù)量,通過暴露接口,可以滿足用戶創(chuàng)建不同場景下的線程池
- 來任務了,先創(chuàng)建幾個線程
核心線程數(shù)
- 任務太多了,處理不過來,總不能一直創(chuàng)建線程吧,這時候就將任務緩存到
工作隊列
- 任務實在是太多,
工作隊列
都滿了,那就再創(chuàng)建幾個線程吧最大線程數(shù)
- 任務真的真的太多了,還是處理不過來,拒絕吧,提供了幾種
拒絕策略
- 其他: 一段時間后,任務太少了,那些一直不工作的線程怎么處理?
空閑時間
使用示例
ExecutorService executorService = new ThreadPoolExecutor(
1,
1,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
(r) -> new Thread(r),
(r, executor) -> System.out.println("拒絕"));
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
sleep(TimeUnit.MILLISECONDS, 50);
});
}
----------------------------------------------執(zhí)行結果----------------------------------------------
拒絕
拒絕
Thread-0
Thread-0
Thread-0
ThreadPoolExecutor
線程池的核心實現(xiàn)類,基于ThreadPoolExecutor
可以實現(xiàn)滿足不同場景的線程池
-
acl
: 類型為AtomicInteger
,該變量包括兩部分內(nèi)容: 低29位用于表示workerCount
,即線程池中的線程數(shù),高3位用于表示線程池的狀態(tài),即RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED
- 狀態(tài)之間的轉換
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue and pool are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
構造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize
: 核心線程數(shù),提交一個任務時,如果線程池中的線程數(shù)沒有達到核心線程數(shù),則會創(chuàng)建一個新的線程 -
maximumPoolSize
: 最大線程池,工作隊列滿了的情況下,如果線程池中的線程數(shù)沒有達到最大線程數(shù),則會創(chuàng)建一個新線程,否則使用拒絕策略 -
keepAliveTime
: 空閑線程存活時間,工作對立中沒有任務時,線程最大等待時間,其實就是工作隊列的帶時間阻塞 -
workQueue
: 工作隊列,存放任務的 -
threadFactory
: 創(chuàng)建線程工廠類 -
handler
: 線程池滿了情況下,提交任務時對應的拒絕策略,可以自己實現(xiàn),默認提供了幾種
提交任務
// AbstractExecutorService#submit
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 創(chuàng)建一個FutureTask對象
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 執(zhí)行 ThreadPoolExecutor#execute
execute(ftask);
return ftask;
}
- 基于
Runnable
創(chuàng)建一個FutureTask
對象,這樣可以獲取返回值了,因為Runnable
沒有返回值,所以這里直接傳null
- 調(diào)用
ThreadPoolExecutor#execute
方法
ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果`workerCount < corePoolSize`,則嘗試創(chuàng)建一個新線程,創(chuàng)建成功就直接返回,失敗繼續(xù)下面的流程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果線程池正在運行并且將該任務添加到工作隊列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查線程池是否正在運行,如果不在運行了就將該任務移除并執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果`workerCount >= corePoolSize && 工作隊列放不下了`,再次嘗試添加一個新線程,如果添加失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
- 如果
workerCount < corePoolSize
,則嘗試創(chuàng)建一個新線程,創(chuàng)建成功就直接返回,失敗繼續(xù)下面的流程 -
workerCount >= corePoolSize
,再次檢查線程池是否正在運行,如果不在運行了就將該任務移除并執(zhí)行拒絕策略 - 如果
workerCount >= corePoolSize && 工作隊列放不下了
,再次嘗試添加一個新線程,如果添加失敗則執(zhí)行拒絕策略
ThreadPoolExecutor#addWorker
該方法用于嘗試向線程池中添加一個新的線程,如果線程池運行狀態(tài)不正常,則會添加失敗
1. `firstTask`: 任務的具體邏輯,這里是一個`FutureTask`對象
2. `core`: 如果為true,這和`corePoolSize`比較,否則和`maximumPoolSize`比較. 因為執(zhí)行`addWorker`方法只有兩種情況:一種是`workerCount<corePoolSize`;一種是工作隊列已滿,這時需要和`maximumPoolSize`比較
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary. 僅在必要時檢查隊列是否為空, 狀態(tài) 第二個括號里的條件估計和SHUTDOWN語義有關,后面再看吧
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 通過自旋操作更新`workerCount`的值,即:加1
for (;;) {
// 獲取目前線程池中的線程數(shù)
int wc = workerCountOf(c);
// 因為執(zhí)行`addWorker`方法只有兩種情況:一種是`workerCount<corePoolSize`,這時需要和`corePoolSize`比較; 一種是工作隊列已滿,這時需要和`maximumPoolSize`比較
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 通過CAS操作嘗試對`workerCount`加1,如果成功就跳出最外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果在自旋(內(nèi)循環(huán)更新`workerCount`值)期間,線程池的狀態(tài)發(fā)生變化,重新進入外循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 這里的`ReentrantLock`主要作用是保證添加`Worker`到`workers`時是線程安全的,因為`workers`是`HashSet`結構,其本身不是線程安全的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果線程池正在運行
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// 如果t.isAlive()這說明線程已經(jīng)被啟動了,這時候直接拋異常,一般不會出現(xiàn)
if (t.isAlive()) // precheck that t is startable
throw new fIllegalThreadStateException();
// 將該任務添加到`workers`中,`workers`是一個`HashSet`結構,不過這里通過`ReentrantLock`保證它是線程安全的
workers.add(w);
int s = workers.size();
// 更新`largestPoolSize`,該值用于表示線程池中曾經(jīng)達到的最大線程數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 最后啟動線程
t.start();
workerStarted = true;
}
}
} finally {
// 根據(jù)上面的代碼來判斷,如果線程池運行狀態(tài)不正常的時候,會添加`Worker`失敗,然后執(zhí)行`addWorkerFailed`方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
注釋已經(jīng)寫的很詳細了,總結一下:
- 先通過自旋更新
workerCount
的值 - 添加
Worker
到workers
中時需要通過ReentrantLock
保證線程安全,因為workers
是HashSet
結構,其本身不是線程安全的 - 線程池運行狀態(tài)不正常時,會添加
Worker
失敗,此時需要執(zhí)行ThreadPoolExecutor#addWorkerFailed
方法
ThreadPoolExecutor#addWorkerFailed
執(zhí)行到這里,說明線程池可能已經(jīng)出現(xiàn)了問題,這時候需要回滾之氣那的操作.即恢復workerCount
的值,然后將該Worker
從workers
中移除,并嘗試停止線程池
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 從`HashSet`中移除
workers.remove(w);
// 對`workerCount`減1
decrementWorkerCount();
// 嘗試停止線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
- 從
HashSet
中移除剛剛添加的Worker
- 對
workerCount
減1 - 嘗試停止線程池
執(zhí)行任務
通過上面的代碼可以發(fā)現(xiàn),提交任務的時候,如果創(chuàng)建了一個新的Worker
實例,就相當于創(chuàng)建了一個新的線程,并且會啟動該線程. 那線程啟動之后主要做了什么?
Thread#start
=> Worker#run
=> ThreadPoolExecutor#runWorker
Worker
- 實現(xiàn)了
Runnable
接口,因此當線程啟動之后,就會執(zhí)行Worker#run
方法 - 繼承自
AbstractQueuedSynchronizer
,說明它具有鎖的功能,但它是不可重入鎖 - 在構造函數(shù)中,已自己為參數(shù),創(chuàng)建一個線程,并將該線程作為自己的一個屬性
thread
Worker(Runnable firstTask) {
// 設置state=-1,則無法獲取鎖, 在runWorker中會先執(zhí)行unlock方法,然后再執(zhí)行l(wèi)ock方法獲取鎖
setState(-1); // inhibit interrupts until runWorker
// 最開始執(zhí)行的那個任務,之后的任務去隊列里面拿
this.firstTask = firstTask;
// 以自己為參數(shù)創(chuàng)建一個線程
this.thread = getThreadFactory().newThread(this);
}
在Worker#run
方法中,直接調(diào)用了ThreadPoolExecutor#runWorker
方法
ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 先取第一個任務執(zhí)行,這是在構造函數(shù)中傳入的
Runnable task = w.firstTask;
w.firstTask = null;
// 因為在Worker構造函數(shù)中默認設置了state為-1,需要先執(zhí)行`Worker#unlock`將state設置為0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果首個任務不為null并且工作隊列里面還有任務
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 有點不太懂
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
// 前置處理,默認為空
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置處理,默認為空
afterExecute(task, thrown);
}
} finally {
// 設置task為null, 用于取下一個任務
task = null;
// 完成任務數(shù)加1
w.completedTasks++;
w.unlock();
}
}
// 工作隊列里面沒任務了,并且在獲取任務時在工作隊列上阻塞的時候大于空閑時間,并且時正常結束的,即沒有發(fā)生什么異常
completedAbruptly = false;
} finally {
// 將該Worker從HashSet中移除,執(zhí)行一些銷毀操作
processWorkerExit(w, completedAbruptly);
}
}
- 先執(zhí)行
firstTask
,該任務在創(chuàng)建Worker
時傳入 - 再從
工作隊列
中取任務執(zhí)行 - 執(zhí)行完成之后,說明
runWorker
將要退出了,這時候同時需要將該Worker
從HashSet中移除
todo 最核心的,中斷處理,即那個判斷條件,也就是Worker
實現(xiàn)AQS的目的
空閑線程清理
在創(chuàng)建線程池的時候,有提到一個參數(shù):空閑時間
,這個空閑時間
是什么意思呢?
Worker
執(zhí)行完firstTask
之后,就會去工作隊列
中拿任務繼續(xù)執(zhí)行,工作隊列
是一個阻塞隊列,當工作隊列
中沒有任務時,線程就會阻塞,直到有提交了新的任務. 這個空閑時間
其實就可以理解成該線程的阻塞時間,這部分邏輯在ThreadPoolExecutor#getTask
方法中
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut表示線程是否永久存貨, 默認是永久存活, 結合下面的代碼說明在這兩種情況下,空閑時間生效: 1.allowCoreThreadTimeOut==true 2.工作線程數(shù)大于corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
-
空閑時間
其實就是線程在阻塞隊列上阻塞的最大時間,即通過阻塞隊列實現(xiàn) - 在這兩種情況下,空閑時間才會生效:
allowCoreThreadTimeOut==true
或者工作線程數(shù)大于corePoolSize
常見線程池
通過Executors
可以快速的創(chuàng)建一些不同類型的線程池
ExecutorService#newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
-
corePoolSize
為1摇零,maximumPoolSize
為1,意味著線程池中最多只有一個工作線程 -
空閑時間
為0,表示沒任務立即銷毀該線程 -
工作隊列
為LinkedBlockingQueue
,這其實是一個有界的阻塞隊列,但是由于這里沒有在創(chuàng)建LinkedBlockingQueue
的時設置容量,所以默認為Integer.MAX_VALUE
優(yōu)缺點
- 對阻塞隊列的長度沒有限制,可能會造成OOM
ExecutorService#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
corePoolSize
為0,maximumPoolSize
為Integer.MAX_VALUE地技,意味著線程數(shù)可以為
Integer.MAX_VALUE -
空閑時間
為60s,也就是一分鐘
3.工作隊列
為SynchronousQueue
,這是一個比較特殊的阻塞隊列,當一個生產(chǎn)者線程向隊列中存數(shù)據(jù)時,生產(chǎn)者線程將被阻塞直到有另一個消費者線程從隊列中取數(shù)據(jù)(即take),反之亦然
優(yōu)缺點
- 適合執(zhí)行時間比較短的任務,這種情況下,很多線程可以被復用,避免每次都創(chuàng)建大量線程的開銷
- 但在任務執(zhí)行時間比較長的情況,由于該線程池對線程數(shù)沒有限制,可能會創(chuàng)建非常多的線程.
ExecutorService#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
corePoolSize
為入?yún)ⅲ?code>maximumPoolSize為入?yún)?/li> -
空閑時間
為0,表示沒任務立即銷毀該線程 -
工作隊列
為LinkedBlockingQueue
,這其實是一個有界的阻塞隊列,但是由于這里沒有在創(chuàng)建LinkedBlockingQueue
的時設置容量,所以默認為Integer.MAX_VALUE
這個其實和newSingleThreadExecutor
有點像,只不過newSingleThreadExecutor
中只有一個線程,而newFixedThreadPool
是固定的線程
優(yōu)缺點
- 對阻塞隊列的長度沒有限制,可能會造成OOM
ExecutorService#ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(),threadFactory);
}
-
corePoolSize
為入?yún)ⅲ?code>maximumPoolSize為Integer.MAX_VALUE -
空閑時間
為0,表示沒任務立即銷毀該線程 -
工作隊列
為DelayedWorkQueue
,這是一個有界的阻塞隊列
優(yōu)缺點
- 對阻塞隊列的長度沒有限制,可能會造成OOM
總結
還是推薦根據(jù)具體場景,基于ThreadPoolExecutor
定制自己的線程池