開篇
?部門以前有一個(gè)很跳的程序員寫了一個(gè)神奇的代碼猪落,通過不停創(chuàng)建ThreadPoolExecutor而不調(diào)用shutdown導(dǎo)致線程過多溢出囚衔,然后想想畢竟我還是一個(gè)不跳的程序員睛蛛,所以還是研究研究比較合適痴怨。這篇文章就是說明線程池的退出過程棵癣。
java源碼-ThreadPoolExecutor(1)
java源碼-ThreadPoolExecutor(2)
java源碼-ThreadPoolExecutor(3)
shutdown源碼解析
- 1苞七、上鎖藐守,mainLock是線程池的主鎖,是可重入鎖蹂风,當(dāng)要操作workers set這個(gè)保持線程的HashSet時(shí)卢厂,需要先獲取mainLock,還有當(dāng)要處理largestPoolSize惠啄、completedTaskCount這類統(tǒng)計(jì)數(shù)據(jù)時(shí)需要先獲取mainLock
- 2慎恒、判斷調(diào)用者是否有權(quán)限shutdown線程池
- 3、使用CAS操作將線程池狀態(tài)設(shè)置為shutdown撵渡,shutdown之后將不再接收新任務(wù)
- 4融柬、中斷所有空閑線程 interruptIdleWorkers()
- 5、onShutdown()姥闭,ScheduledThreadPoolExecutor中實(shí)現(xiàn)了這個(gè)方法丹鸿,可以在shutdown()時(shí)做一些處理
- 6、解鎖
- 7棚品、嘗試終止線程池 tryTerminate()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 權(quán)限檢查
checkShutdownAccess();
// 設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN靠欢,如果已經(jīng)是SHUTDOWN則直接返回
advanceRunState(SHUTDOWN);
// 設(shè)置中斷標(biāo)志,中斷所有等待任務(wù)的空閑線程
interruptIdleWorkers();
// ThreadPoolExecutor沒什么操作
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試狀態(tài)變?yōu)門ERMINATED
tryTerminate();
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 設(shè)置執(zhí)行狀態(tài)為SHUTDOWN
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// onlyOne在這里傳入為false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍歷所有worker铜跑,通過判斷能否獲取鎖來判定worker是否空閑
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
// 針對(duì)能夠獲取到鎖的線程门怪,線程正阻塞在獲取任務(wù)的過程中
// 通過中斷線程然后迫使線程退出阻塞然后工作線程退出工作
// 然后工作線程就自然的被回收了
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow源碼解析
?shutdownNow() 和 shutdown()的大體流程相似,差別是:
- 1锅纺、將線程池更新為stop狀態(tài)
- 2掷空、調(diào)用 interruptWorkers() 中斷所有線程,包括正在運(yùn)行的線程
- 3、將workQueue中待處理的任務(wù)移到一個(gè)List中坦弟,并在方法最后返回护锤,說明shutdownNow()后不會(huì)再處理workQueue中的任務(wù)
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 權(quán)限檢查
checkShutdownAccess();
// 設(shè)置線程池狀態(tài)為stop
advanceRunState(STOP);
// 中斷所有線程
interruptWorkers();
// 移除所有待消費(fèi)的任務(wù)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試狀態(tài)變?yōu)門ERMINATED
tryTerminate();
return tasks;
}
awaitTermination源碼解析
?等待線程池狀態(tài)變?yōu)門ERMINATED則返回,或者時(shí)間超時(shí)酿傍。由于整個(gè)過程獨(dú)占鎖烙懦,所以一般調(diào)用shutdown或者shutdownNow后使用。
- 等待過程中如果發(fā)現(xiàn)未超時(shí)那么通過for循環(huán)繼續(xù)等待超時(shí)
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
工作線程Worker退出邏輯
?如果獲取task為null就會(huì)退出while循環(huán)從而執(zhí)行finally部分的邏輯赤炒,也就是processWorkerExit()方法執(zhí)行清了工作.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果獲取任務(wù)為null氯析,那么就會(huì)退出當(dāng)前工作線程
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 執(zhí)行清了工作
processWorkerExit(w, completedAbruptly);
}
}
?getTask()方法(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))當(dāng)中我們?cè)O(shè)置狀態(tài)為SHUTDOWN,同時(shí)workQueue為空的情況下就會(huì)返回null莺褒,在外層循環(huán)中就會(huì)退出線程的工作掩缓,實(shí)現(xiàn)線程退出。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
參考文章
Java中線程池ThreadPoolExecutor原理探究
Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理