本文總結(jié)一下對線程池源碼的學(xué)習(xí)逛绵,基于jdk 1.8
什么是線程池
顧名思義線程池就是一個可以提供一組可復(fù)用線程的對象郑气。線程池內(nèi)部有阻塞隊列柠并,用來存放等待執(zhí)行的任務(wù)弛饭。然后內(nèi)部的線程來執(zhí)行這些任務(wù)澜公,線程會不斷的從阻塞隊列中獲取任務(wù)來執(zhí)行姆另,而不是執(zhí)行完一個任務(wù)就銷毀。
線程池的作用
在高并發(fā)場景下坟乾,如果給每個任務(wù)都去創(chuàng)建一個線程來執(zhí)行迹辐,結(jié)果就是大量的線程創(chuàng)建與銷毀,系統(tǒng)的開銷將會很大甚侣,影響應(yīng)用的執(zhí)行效率明吩。
同時,線程池可以有效的限制應(yīng)用程序中同一時刻運行的線程數(shù)量殷费,避免CPU資源不足印荔,造成阻塞。
線程池的使用
定義一個線程池
ExecutorService executor = new ThreadPoolExecutor(1, 4, 20,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
該線程池详羡,核心線程數(shù)為1仍律,最大線程數(shù)為4,非核心線程空閑存活時間20s实柠,阻塞隊列是長度為10的 ArrayBlockingQueue染苛,線程工廠和飽和拒絕策略沒有定義,采用默認實現(xiàn)
線程池中添加任務(wù)
Executor接口提供了execute方法,傳入Runnable接口的實現(xiàn)(任務(wù))茶行,線程池將會調(diào)度執(zhí)行這些任務(wù)
for (int i = 0; i < 12; i++) {
executor.execute(() ->
System.out.println(Thread.currentThread().getName()));
}
ThreadPoolExecutor源碼分析
定義線程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
這是線程池最終執(zhí)行的構(gòu)造方法,共有7個參數(shù)登钥,分別是
- 核心線程數(shù)
- 最大線程數(shù)(核心線程+非核心線程)
- 非核心線程空閑存活時間
- 空閑存活時間單位
- 阻塞隊列
- 線程工廠
- 飽和拒絕策略
在定義時前5個參數(shù)是必須傳遞的畔师,后兩個參數(shù)不傳遞表示使用默認提供
注意看第三個參數(shù)呕童,默認它是作用在非核心線程上的春缕,如果希望同時作用在核心線程上久又,可以調(diào)用如下方法設(shè)置
allowCoreThreadTimeOut(true);
線程池的狀態(tài)
下面來看一下線程池內(nèi)部的一些狀態(tài)杏节,以及工作線程數(shù)的封裝
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable.
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
節(jié)選了部分關(guān)鍵注釋說明
ctl是一個原子的Integer類型米者,包含了workerCount和runState柒爸,為了把這個兩個值拼到一個int中崎坊,限制了workerCount最大為2^29 -1宿百,大約為500多萬轮纫,而不是2^31-1腔寡。
也就是說作者把工作線程數(shù)和狀態(tài)值拼接到了一個int中,這些屬性含義如下
屬性 | 含義 | 值 |
---|---|---|
COUNT_BITS | 2進制計數(shù)位數(shù) | 29 |
CAPACITY | 線程數(shù)容量 | (2^29)-1 |
RUNNING | 運行狀態(tài) | -2^29 |
SHUTDOWN | 關(guān)閉狀態(tài)(不接受新任務(wù)掌唾,把已有任務(wù)執(zhí)行完) | 0 |
STOP | 停止(不接受新任務(wù)放前,終止正在執(zhí)行的) | 2^29 |
TIDYING | 所有任務(wù)終止,工作線程數(shù)為0 | 2^30 |
TERMINATED | terminated()方法執(zhí)行完成 | 2^29 + 2^30 |
通過上表可以看到糯彬,線程池的5個狀態(tài)數(shù)值是遞增的
所以只要狀態(tài)是>=SHUTDOWN凭语,就代表線程池不會再接受新的任務(wù)
三個靜態(tài)方法解釋
- ctlOf(int rs, int wc)
線程池狀態(tài)與線程數(shù)拼成一個int,高3位為狀態(tài)撩扒,低29位為工作線程數(shù) - runStateOf(int c)
獲取線程池狀態(tài) - workerCountOf(int c)
獲取工作線程數(shù)
執(zhí)行任務(wù)—execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到似扔,Doug Lea老爺子已經(jīng)將該方法的流程注釋的很清晰了,我這里就通俗的描述一下:
- 如果運行的線程數(shù)小于核心線程數(shù)搓谆,那么就新啟動一個線程炒辉,并將該任務(wù)作為此線程的firstTask
- 若線程池的核心線程數(shù)已經(jīng)滿了,就將任務(wù)添加到阻塞隊列中挽拔,需要二次檢查(因為有可能在上一次檢查之后死掉辆脸,或者是進入該方法時線程池被關(guān)閉),若線程池不是運行狀態(tài)螃诅,則將該任務(wù)從隊列中移除啡氢,并進行拒絕處理。如果二次檢查后沒有工作的線程了术裸,那么就新啟動一個線程執(zhí)行該任務(wù)
- 如果阻塞隊列也滿了倘是,就新啟動一個非核心線程,如果失敗的話袭艺,說明線程池被shutdown或者是隊列容量和最大線程數(shù)都已達到上限搀崭,將此任務(wù)拒絕掉
添加工作線程—addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取工作線程數(shù)
int wc = workerCountOf(c);
// 如果大于CAPACITY最大容量,或者core為true,與corePoolSize比瘤睹,
// 否則與maximumPoolSize比較升敲,如果大于允許的線程數(shù)則返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// worker + 1成功,跳出retry外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// cas操作失敗轰传,如果線程池狀態(tài)改變驴党,跳出內(nèi)層循環(huán),繼續(xù)判斷狀態(tài)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 拿到鎖以后二次檢查
int rs = runStateOf(ctl.get());
// 如果在運行狀態(tài)获茬,或者是SHUTDOWN狀態(tài)且firstTask為空(取queue中任務(wù))
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 線程已經(jīng)啟動港庄,拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到workers中
workers.add(w);
// 記錄最大的worker數(shù)量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 啟動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 添加失敗
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
firstTask
addWorker方法的第一個參數(shù)是firstTask,firstTask是線程池中Worker對象的一個屬性恕曲,該對象代表新啟動線程的第一個任務(wù)鹏氧。
在execute方法源碼中可以看到,只有在新增線程時才會給firstTask賦值佩谣,如果任務(wù)被添加到queue中把还,將其置為null,線程會去阻塞隊列中獲取任務(wù)執(zhí)行稿存。
在添加worker前笨篷,會在有必要的情況下檢查阻塞隊列是否為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
1、如果狀態(tài)大于SHUTDOWN瓣履,不接受新任務(wù)率翅,直接返回false;
2袖迎、如果狀態(tài)等于SHUTDOWN冕臭,firstTask != null,返回false燕锥,不允許新增任務(wù)辜贵;
2、如果狀態(tài)等于SHUTDOWN归形,firstTask == null托慨,說明該線程會去隊列中取任務(wù)執(zhí)行,如果此時workQueue.isEmpty()暇榴,則返回false厚棵;
addWorkerFailed
添加線程失敗時,會將剛創(chuàng)建的worker對象移除掉
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// HashSet中移除worker
if (w != null)
workers.remove(w);
// 線程數(shù)減一
decrementWorkerCount();
// 嘗試關(guān)閉線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
內(nèi)部線程包裝對象—Worker
Worker是ThreadPoolExecutor中的內(nèi)部類蔼紧,包含了執(zhí)行任務(wù)的線程(節(jié)選了部分屬性和方法)
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
/** 執(zhí)行任務(wù)的線程 */
final Thread thread;
/** 初始化運行的任務(wù)婆硬,可能為空 */
Runnable firstTask;
/** 每個worker完成任務(wù)的計數(shù)器 */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 無法獲取鎖,從而禁止 interrupt worker
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 線程工廠初始化線程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
Worker對象通過繼承AbstractQueuedSynchronizer隊列同步器奸例,來控制worker的同步狀態(tài)彬犯,
新建worker時,setState(-1) ,設(shè)置狀態(tài)為 -1 使得其他線程無法獲取到worker的鎖谐区,禁止interrupt該線程(只有當前狀態(tài)為 0 時才有機會獲得鎖)
執(zhí)行任務(wù)——runWorker
線程池真正執(zhí)行任務(wù)的地方就在這里了
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 標記線程是否是因為發(fā)生異常中斷的
boolean completedAbruptly = true;
try {
// 獲取要執(zhí)行的任務(wù), firstTask為空就從隊列中取
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子函數(shù)湖蜕,可以在執(zhí)行前自定義些操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子函數(shù),任務(wù)執(zhí)行完成后調(diào)用的方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 執(zhí)行成功卢佣,將異常標記置為 false
completedAbruptly = false;
} finally {
// 執(zhí)行worker退出操作
processWorkerExit(w, completedAbruptly);
}
}
線程池狀態(tài)檢查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
1重荠、如果runState >= stop(stop狀態(tài)線程池要中斷正在運行的任務(wù)),且線程未被設(shè)置為中斷虚茶,則interrupt線程
2、如果runState < stop仇参,進行二次檢查(有可能在第一次獲取狀態(tài)后嘹叫,調(diào)用了shutdownNow方法),此時線程如果有中斷標記诈乒,則清除(Thread.interrupted()返回線程中斷狀態(tài)罩扇,并將其清除),再次查看狀態(tài)怕磨,runSate >= stop 則interrupt線程
獲取任務(wù)——getTask
private Runnable getTask() {
// 上一次循環(huán)取task是否超時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查線程池狀態(tài)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 線程空閑了是否需要退出
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 檢查線程池workerCount
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
1喂饥、循環(huán)取任務(wù),直到取到任務(wù)肠鲫,或者是不需要返回任務(wù)為止员帮;
2、如果線程池是 > stop狀態(tài)导饲,則workerCount減1捞高,返回null,
如果是shutdown狀態(tài)渣锦,且隊列為空硝岗,則workerCount減1,返回null
3袋毙、(wc > maximumPoolSize || (timed && timedOut))
wc > 最大線程數(shù) 或者是 線程空閑了keepAliveTime 且 空閑需被銷毀
(wc > 1 || workQueue.isEmpty())
wc > 1 或者 隊列為空
同時滿足上述兩個條件型檀,說明該線程不需要獲取任務(wù)來執(zhí)行,則workerCount減1听盖,返回null
- timedOut代表上一次循環(huán)中胀溺,取task時候是否超時(代表了該線程空閑了keepAliveTime時間)
- timed代表該線程空閑了是否需要銷毀
4、如果timed == true媳溺,則調(diào)用poll方法月幌,等待keepAliveTime時間,
否則調(diào)用take方法阻塞直到獲取到任務(wù)(到達這一步悬蔽,說明線程池狀態(tài)為running或者是shutdown且workQueue不為空)
worker退出——processWorkerExit
任務(wù)執(zhí)行完成后扯躺,在finally語句中執(zhí)行worker的退出操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果線程異常退出,則workerCount減 1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// set 中異常執(zhí)行完的worker對象
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試停止線程池
tryTerminate();
int c = ctl.get();
// 狀態(tài)為running或者shutdown
if (runStateLessThan(c, STOP)) {
// 異常退出直接新增加一個worker
if (!completedAbruptly) {
// 計算最小線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 當前工作線程大于min,則無需新增录语,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
該方法中有個很重要的操作就是調(diào)用tryTerminate方法倍啥,嘗試終止線程池
接下來就來分析線程池的關(guān)閉操作
tryTerminate + awaitTermination
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 1、stop狀態(tài)澎埠,則往下執(zhí)行
// 2虽缕、shutdown且隊列為空則往下執(zhí)行,其余情況直接return
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作線程數(shù)不為空蒲稳,則中斷一個worker線程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 執(zhí)行到這里氮趋,說明worker為0,且沒有任務(wù)需要執(zhí)行
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 設(shè)置線程池狀態(tài)為tidying
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 調(diào)用鉤子函數(shù)江耀,需繼承在子類中實現(xiàn)
terminated();
} finally {
// 設(shè)置線程池狀態(tài)為terminated
ctl.set(ctlOf(TERMINATED, 0));
// 線程池終止完成信號通知剩胁,通知awaitTermination方法
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 設(shè)置的阻塞超時時間
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果線程池已經(jīng)關(guān)閉,直接返回
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 阻塞祥国,如果tryTerminate方法關(guān)閉成功的話昵观,會喚醒這里
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
該方法中比較重要的一步操作就是中斷空閑線程interruptIdleWorkers(ONLY_ONE)
/**
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果線程未中斷,且可以獲取到鎖舌稀,則interrupt
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果該值為true啊犬,則跳出循環(huán)
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
粗略翻譯一下方法上面的注釋,如果onlyOne參數(shù)被設(shè)置為true的話壁查,該方法最多只會中斷一個worker線程觉至,為了把shutdown信號傳播下去,保證線程池最終的關(guān)閉潮罪,最多就只中斷一個空閑線程康谆。
線程阻塞的話就是阻塞在getTask方法中,這里中斷一個線程后嫉到,getTask --> processWorkerExit --> tryTerminate --> interruptIdleWorkers --> getTask
其實tryTerminate方法中沃暗,為什么要設(shè)置onlyOne為true,如果那個地方是false會是什么結(jié)果何恶,沒有思考的很明白孽锥,后續(xù)多查閱些資料實踐一下。
其實上面已經(jīng)涉及到了線程池的關(guān)閉流程细层,下面還有兩個比較重要的方法來分析一下
關(guān)閉線程池——shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 將線程池狀態(tài)設(shè)置為shutdown
advanceRunState(SHUTDOWN);
// 中斷所有空閑的線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試關(guān)閉線程池
tryTerminate();
}
這個地方的巧妙之處就在于最后的tryTerminate方法惜辑,因為線程池shutdown狀態(tài)時,是要把剩下的任務(wù)執(zhí)行完的疫赎,如果調(diào)shutdown方法的時候恰好所有線程都在執(zhí)行任務(wù)盛撑,那么就無法中斷。
關(guān)閉線程池——shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為stop
advanceRunState(STOP);
// 中斷所有worker
interruptWorkers();
// 取出隊列中的任務(wù)并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試關(guān)閉線程池
tryTerminate();
return tasks;
}
stop狀態(tài)需要把所有線程中斷捧搞,任務(wù)也放棄抵卫,所有shutdownNow會中斷所有worker線程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// 此方法是內(nèi)部類Worker中的方法狮荔,提到這里來便于閱讀
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
持有worker鎖時,state 為 1介粘,未持有鎖時為 0殖氏,所以這里就可以看出區(qū)別,shutdown方法是只能中斷空閑的worker線程姻采,而shutdownNow則是把所有worker線程都中斷雅采。
線程池的基本流程就到這里了,如果有理解的不對的地方慨亲,或者需要補充的地方婚瓜,還望各位小伙伴不吝賜教 ^-^ ~