Java線程池實(shí)現(xiàn)原理和源碼分析
前言
本文章是從2019年11月下旬開始打開寫的善绎,一直拖到2020年的年尾才開始寫屯蹦,直到2021年年初才寫完搁胆。
時(shí)間太快也太慢~夺溢!
依稀記得2019年10月份的時(shí)候某東從創(chuàng)業(yè)公司離職打算面試找工作阔逼,他問我線程池你會(huì)么兆衅?然后給我他發(fā)了一篇我2017年寫的筆記《Java并發(fā)編程之線程池必用知識點(diǎn)》,他說就這么點(diǎn)嗜浮?我當(dāng)時(shí)想線程池也差不多就這么多吧~羡亩!
2019年11月9號我和某東一起從大望路做815公交去燕郊。當(dāng)時(shí)只是因?yàn)槲艺趯W(xué)習(xí)一部分多線程相關(guān)的知識點(diǎn)危融,剛好公交車上沒啥事情我倆就嘮了嘮畏铆。當(dāng)時(shí)他問了我一些線程池的問題,我覺得在平時(shí)的工作線程池知道該怎么用就行頂多優(yōu)化一下核心線程數(shù)量吉殃。主要討論的還是多線程并發(fā)和鎖的相關(guān)問題辞居。
年底工作一般比較忙也就很少進(jìn)行自我學(xué)習(xí)了,隔了一周想起了某東問我的問題“線程池中線程是怎么產(chǎn)生的蛋勺,任務(wù)是怎么等待執(zhí)行瓦灶?”。
自己也不是很清楚這塊的邏輯抱完,臨時(shí)把這個(gè)TODO項(xiàng)紀(jì)錄了下來贼陶,想以后有時(shí)間了研究一下。結(jié)果這個(gè)以后跨越了2019年和2020年直接來到了2021年。
原諒我的啰里啰嗦碉怔,關(guān)鍵這個(gè)篇文章時(shí)間跨度太長了烘贴,給我的印象太深了。不得不說道說道撮胧,下面開始進(jìn)去正題~桨踪!
JDK1.8
的源碼來分析Java線程池的核心設(shè)計(jì)與實(shí)現(xiàn)。
本文參考了Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐這篇文章中的部分內(nèi)容趴樱。
Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐這篇文章寫的非常好馒闷,除過本文內(nèi)容之外這篇文章還講述了的關(guān)于線程池的背景,線程池在業(yè)務(wù)中的實(shí)踐和動(dòng)態(tài)化線程池等叁征,所以想了解線程池關(guān)于這些類容的可以閱讀Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐這篇文章纳账。
如果讀者為做服務(wù)端開發(fā)的同學(xué)那么強(qiáng)烈建議閱讀Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐。
外觀
外觀主要是我們平常使用線程池的時(shí)候所看到的一些點(diǎn)捺疼。
- 繼承關(guān)系疏虫;
- 構(gòu)造函數(shù);
- 構(gòu)造函數(shù)中的參數(shù)啤呼;
- 構(gòu)造函數(shù)中的阻塞隊(duì)列卧秘;
- 線程池的創(chuàng)建;
- 構(gòu)造函數(shù)中的拒絕策略官扣;
線程池繼承關(guān)系
ThreadPoolExecutor
實(shí)現(xiàn)的頂層接口是Executor
翅敌,在接口Executor
中用戶無需關(guān)注如何創(chuàng)建線程,如何調(diào)度線程來執(zhí)行任務(wù)惕蹄,用戶只需提供Runnable
對象蚯涮,將任務(wù)的運(yùn)行邏輯提交到執(zhí)行器Executor
中,由Executor
框架完成線程的調(diào)配和任務(wù)的執(zhí)行部分卖陵。
ExecutorService
接口增加了一些能力:
- 擴(kuò)充執(zhí)行任務(wù)的能力遭顶,補(bǔ)充可以為一個(gè)或一批異步任務(wù)生成
Future
的方法; - 提供了管控線程池的方法泪蔫,比如停止線程池的運(yùn)行棒旗。
AbstractExecutorService
則是上層的抽象類,將執(zhí)行任務(wù)的流程串聯(lián)了起來撩荣,保證下層的實(shí)現(xiàn)只需關(guān)注一個(gè)執(zhí)行任務(wù)的方法即可铣揉。
最下層的實(shí)現(xiàn)類ThreadPoolExecutor
實(shí)現(xiàn)最復(fù)雜的運(yùn)行部分:
可以自動(dòng)創(chuàng)建、管理和復(fù)用指定數(shù)量的一組線程餐曹,適用方只需提交任務(wù)即可
線程安全老速,
ThreadPoolExecutor
內(nèi)部有狀態(tài)、核心線程數(shù)凸主、非核心線程等屬性橘券,廣泛使用了CAS和AQS鎖機(jī)制避免并發(fā)帶來的沖突問題提供了核心線程、緩沖阻塞隊(duì)列、非核心線程旁舰、拋棄策略的概念锋华,可以根據(jù)實(shí)際應(yīng)用場景進(jìn)行組合使用
提供了
beforeExecute
和afterExecute()
可以支持對線程池的功能進(jìn)行擴(kuò)展
構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:線程池的核心線程數(shù),一般情況下不管有沒有任務(wù)都會(huì)一直在線程池中一直存活箭窜,只有在
ThreadPoolExecutor
中的方法allowCoreThreadTimeOut(boolean value)
設(shè)置為true
時(shí)毯焕,閑置的核心線程會(huì)存在超時(shí)機(jī)制,如果在指定時(shí)間沒有新任務(wù)來時(shí)磺樱,核心線程也會(huì)被終止纳猫,而這個(gè)時(shí)間間隔由第3個(gè)屬性keepAliveTime
指定。 - maximumPoolSize:線程池所能容納的最大線程數(shù)竹捉,當(dāng)活動(dòng)的線程數(shù)達(dá)到這個(gè)值后芜辕,后續(xù)的新任務(wù)將會(huì)被阻塞。
-
keepAliveTime:控制線程閑置時(shí)的超時(shí)時(shí)長块差,超過則終止該線程侵续。一般情況下用于非核心線程,只有在
ThreadPoolExecutor
中的方法allowCoreThreadTimeOut(boolean value)
設(shè)置為true
時(shí)憨闰,也作用于核心線程状蜗。 -
unit:用于指定
keepAliveTime
參數(shù)的時(shí)間單位,TimeUnit
是個(gè)enum
枚舉類型鹉动,常用的有:TimeUnit.HOURS(小時(shí))
轧坎、TimeUnit.MINUTES(分鐘)
、TimeUnit.SECONDS(秒)
和TimeUnit.MILLISECONDS(毫秒)
等泽示。 -
workQueue:線程池的任務(wù)隊(duì)列眶根,通過線程池的
execute(Runnable command)
方法會(huì)將任務(wù)Runnable
存儲在隊(duì)列中。 - threadFactory:線程工廠边琉,它是一個(gè)接口,用來為線程池創(chuàng)建新線程的记劝。
- handler:拒絕策略变姨,所謂拒絕策略,是指將任務(wù)添加到線程池中時(shí)厌丑,線程池拒絕該任務(wù)所采取的相應(yīng)策略定欧。
成員變量
/**
* 任務(wù)阻塞隊(duì)列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 非公平的互斥鎖(可重入鎖)
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 線程集合一個(gè)Worker對應(yīng)一個(gè)線程,沒有核心線程的說話怒竿,只有核心線程數(shù)
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 配合mainLock通過Condition能夠更加精細(xì)的控制多線程的休眠與喚醒
*/
private final Condition termination = mainLock.newCondition();
/**
* 線程池中線程數(shù)量曾經(jīng)達(dá)到過的最大值砍鸠。
*/
private int largestPoolSize;
/**
* 已完成任務(wù)數(shù)量
*/
private long completedTaskCount;
/**
* ThreadFactory對象,用于創(chuàng)建線程耕驰。
*/
private volatile ThreadFactory threadFactory;
/**
* 拒絕策略的處理句柄
* 現(xiàn)在默認(rèn)提供了CallerRunsPolicy爷辱、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
*/
private volatile RejectedExecutionHandler handler;
/**
* 線程池維護(hù)線程(超過核心線程數(shù))所允許的空閑時(shí)間
*/
private volatile long keepAliveTime;
/**
* 允許線程池中的核心線程超時(shí)進(jìn)行銷毀
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 線程池維護(hù)線程的最小數(shù)量饭弓,哪怕是空閑的
*/
private volatile int corePoolSize;
/**
* 線程池維護(hù)的最大線程數(shù)量双饥,線程數(shù)超過這個(gè)數(shù)量之后新提交的任務(wù)就需要進(jìn)入阻塞隊(duì)列
*/
private volatile int maximumPoolSize;
創(chuàng)建線程池
Executors
提供獲取幾種常用的線程池的方法:
- 緩存程線程池
newCachedThreadPool
是一個(gè)可根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時(shí)將重用它們弟断。對于執(zhí)行很多短期異步任務(wù)的程序而言咏花,這些線程池通常可提高程序性能阀趴。調(diào)用execute()
將重用以前構(gòu)造的線程(如果線程可用)昏翰。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個(gè)新線程并添加到池中刘急。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程棚菊。因此,長時(shí)間保持空閑的線程池不會(huì)使用任何資源排霉。注意窍株,可以使用ThreadPoolExecutor
構(gòu)造方法創(chuàng)建具有類似屬性但細(xì)節(jié)不同(例如超時(shí)參數(shù))的線程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 單線程線程池
newSingleThreadExecutor
創(chuàng)建是一個(gè)單線程池攻柠,也就是該線程池只有一個(gè)線程在工作球订,所有的任務(wù)是串行執(zhí)行的,如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束瑰钮,那么會(huì)有一個(gè)新的線程來替代它冒滩,此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 固定大小線程池
newFixedThreadPool
創(chuàng)建固定大小的線程池浪谴,每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程开睡,直到線程達(dá)到線程池的最大大小,線程池的大小一旦達(dá)到最大值就會(huì)保持不變苟耻,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束篇恒,那么線程池會(huì)補(bǔ)充一個(gè)新線程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 單線程線程池
newScheduledThreadPool
創(chuàng)建一個(gè)大小無限的線程池凶杖,此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求胁艰。
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
我們可以看出來上面的方法一共使用了DelayedWorkQueue
、LinkedBlockingQueue
和 SynchronousQueue
智蝠。這個(gè)就是線程核心之一的阻塞隊(duì)列腾么。
任務(wù)阻塞隊(duì)列
它一般分為直接提交隊(duì)列、有界任務(wù)隊(duì)列杈湾、無界任務(wù)隊(duì)列解虱、優(yōu)先任務(wù)隊(duì)列;
SynchronousQueue
1漆撞、直接提交隊(duì)列:設(shè)置為SynchronousQueue
隊(duì)列殴泰,SynchronousQueue
是一個(gè)特殊的BlockingQueue
寂纪,它沒有容量倡蝙,每執(zhí)行一個(gè)插入操作就會(huì)阻塞,需要再執(zhí)行一個(gè)刪除操作才會(huì)被喚醒,反之每一個(gè)刪除操作也都要等待對應(yīng)的插入操作凯傲。
使用SynchronousQueue
隊(duì)列妇斤,提交的任務(wù)不會(huì)被保存响鹃,總是會(huì)馬上提交執(zhí)行瞧掺。如果用于執(zhí)行任務(wù)的線程數(shù)量小于maximumPoolSize
,則嘗試創(chuàng)建新的進(jìn)程健霹,如果達(dá)到maximumPoolSize
設(shè)置的最大值旺上,則根據(jù)你設(shè)置的handler
執(zhí)行拒絕策略。因此這種方式你提交的任務(wù)不會(huì)被緩存起來糖埋,而是會(huì)被馬上執(zhí)行宣吱,在這種情況下,你需要對你程序的并發(fā)量有個(gè)準(zhǔn)確的評估瞳别,才能設(shè)置合適的maximumPoolSize
數(shù)量征候,否則很容易就會(huì)執(zhí)行拒絕策略;
ArrayBlockingQueue
2祟敛、有界的任務(wù)隊(duì)列:有界的任務(wù)隊(duì)列可以使用ArrayBlockingQueue
實(shí)現(xiàn)疤坝,如下所示:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockingQueue
有界任務(wù)隊(duì)列,若有新的任務(wù)需要執(zhí)行時(shí)馆铁,線程池會(huì)創(chuàng)建新的線程跑揉,直到創(chuàng)建的線程數(shù)量達(dá)到corePoolSize
時(shí),則會(huì)將新的任務(wù)加入到等待隊(duì)列中埠巨。若等待隊(duì)列已滿历谍,即超過ArrayBlockingQueue
初始化的容量,則繼續(xù)創(chuàng)建線程辣垒,直到線程數(shù)量達(dá)到maximumPoolSize
設(shè)置的最大線程數(shù)量望侈,若大于maximumPoolSize
,則執(zhí)行拒絕策略勋桶。在這種情況下脱衙,線程數(shù)量的上限與有界任務(wù)隊(duì)列的狀態(tài)有直接關(guān)系,如果有界隊(duì)列初始容量較大或者沒有達(dá)到超負(fù)荷的狀態(tài)哥遮,線程數(shù)將一直維持在corePoolSiz
e以下,反之當(dāng)任務(wù)隊(duì)列已滿時(shí)陵究,則會(huì)以maximumPoolSize
為最大線程數(shù)上限眠饮。
LinkedBlockingQueue
3、無界的任務(wù)隊(duì)列:無界任務(wù)隊(duì)列可以使用LinkedBlockingQueue
實(shí)現(xiàn)铜邮,如下所示:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用無界任務(wù)隊(duì)列仪召,線程池的任務(wù)隊(duì)列可以無限制的添加新的任務(wù)寨蹋,而線程池創(chuàng)建的最大線程數(shù)量就是你corePoolSize
設(shè)置的數(shù)量,也就是說在這種情況下maximumPoolSize
這個(gè)參數(shù)是無效的扔茅,哪怕你的任務(wù)隊(duì)列中緩存了很多未執(zhí)行的任務(wù)已旧,當(dāng)線程池的線程數(shù)達(dá)到corePoolSize
后,就不會(huì)再增加了召娜;若后續(xù)有新的任務(wù)加入运褪,則直接進(jìn)入隊(duì)列等待,當(dāng)使用這種任務(wù)隊(duì)列模式時(shí)玖瘸,一定要注意你任務(wù)提交與處理之間的協(xié)調(diào)與控制秸讹,不然會(huì)出現(xiàn)隊(duì)列中的任務(wù)由于無法及時(shí)處理導(dǎo)致一直增長,直到最后資源耗盡的問題雅倒。
PriorityBlockingQueue
4璃诀、優(yōu)先任務(wù)隊(duì)列:優(yōu)先任務(wù)隊(duì)列通過PriorityBlockingQueue
實(shí)現(xiàn):
任務(wù)會(huì)按優(yōu)先級重新排列執(zhí)行,且線程池的線程數(shù)一直為corePoolSize
蔑匣,也就是只有一個(gè)劣欢。
PriorityBlockingQueue
其實(shí)是一個(gè)特殊的無界隊(duì)列,它其中無論添加了多少個(gè)任務(wù)裁良,線程池創(chuàng)建的線程數(shù)也不會(huì)超過corePoolSize
的數(shù)量凿将,只不過其他隊(duì)列一般是按照先進(jìn)先出的規(guī)則處理任務(wù),而PriorityBlockingQueue
隊(duì)列可以自定義規(guī)則根據(jù)任務(wù)的優(yōu)先級順序先后執(zhí)行趴久。
其實(shí)LinkedBlockingQueue
也是可以設(shè)置界限的丸相,它默認(rèn)的界限是Integer.MAX_VALUE
。同時(shí)也支持也支持構(gòu)造的時(shí)候設(shè)置隊(duì)列大小彼棍。
拒絕策略
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
當(dāng)Executor
已經(jīng)關(guān)閉灭忠,即執(zhí)行了executorService.shutdown()
方法后,或者Executor
將有限邊界用于最大線程和工作隊(duì)列容量座硕,且已經(jīng)飽和時(shí)弛作。使用方法execute()
提交的新任務(wù)將被拒絕.
在以上述情況下,execute
方法將調(diào)用其RejectedExecutionHandler
的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
方法华匾。
AbortPolicy 默認(rèn)的拒絕策略
也稱為終止策略映琳,遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException
。業(yè)務(wù)方能通過捕獲異常及時(shí)得到對本次任務(wù)提交的結(jié)果反饋蜘拉。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy
擁有自主反饋控制萨西,讓提交者執(zhí)行提交任務(wù),能夠減緩新任務(wù)的提交速度旭旭。這種情況是需要讓所有的任務(wù)都執(zhí)行完畢谎脯。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
DiscardPolicy
拒絕任務(wù)的處理程序,靜默丟棄任務(wù)持寄。使用此策略源梭,我們可能無法感知系統(tǒng)的異常狀態(tài)娱俺。慎用~!
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
丟棄隊(duì)列中最前面的任務(wù)废麻,然后重新提交被拒絕的任務(wù)荠卷。是否要使用此策略需要看業(yè)務(wù)是否需要新老的替換,慎用~烛愧!
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
內(nèi)核
前面講了線程池的外觀油宜,接下來講述它的內(nèi)核屑彻。
線程池在內(nèi)部實(shí)際上構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型验庙,將線程
和任務(wù)
兩者解耦,并不直接關(guān)聯(lián)社牲,從而良好的緩沖任務(wù)粪薛,復(fù)用線程。
線程池的運(yùn)行主要分成兩部分:任務(wù)管理搏恤、線程管理违寿。
任務(wù)管理部分充當(dāng)生產(chǎn)者的角色,當(dāng)任務(wù)提交后熟空,線程池會(huì)判斷該任務(wù)后續(xù)的流轉(zhuǎn):
- 直接申請線程執(zhí)行該任務(wù)藤巢;
- 緩沖到隊(duì)列中等待線程執(zhí)行;
- 拒絕該任務(wù)息罗。
線程管理部分是消費(fèi)者掂咒,它們被統(tǒng)一維護(hù)在線程池內(nèi),根據(jù)任務(wù)請求進(jìn)行線程的分配迈喉,當(dāng)線程執(zhí)行完任務(wù)后則會(huì)繼續(xù)獲取新的任務(wù)去執(zhí)行绍刮,最終當(dāng)線程獲取不到任務(wù)的時(shí)候,線程就會(huì)被回收挨摸。
接下來孩革,我們會(huì)按照以下三個(gè)部分去詳細(xì)講解線程池運(yùn)行機(jī)制:
- 線程池如何維護(hù)自身狀態(tài)。
- 線程池如何管理任務(wù)得运。
- 線程池如何管理線程膝蜈。
線程池的生命周期
線程池運(yùn)行的狀態(tài),并不是用戶顯式設(shè)置的熔掺,而是伴隨著線程池的運(yùn)行饱搏,由內(nèi)部來維護(hù)。
線程池內(nèi)部使用一個(gè)變量維護(hù)兩個(gè)值:運(yùn)行狀態(tài)(runState
)和線程數(shù)量 (workerCount
)置逻。
在具體實(shí)現(xiàn)中推沸,線程池將運(yùn)行狀態(tài)(runState
)、線程數(shù)量 (workerCount
)兩個(gè)關(guān)鍵參數(shù)的維護(hù)放在了一起:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl
這個(gè)AtomicInteger
類型诽偷,是對線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段.
它同時(shí)包含兩部分的信息:線程池的運(yùn)行狀態(tài) (runState
) 和線程池內(nèi)有效線程的數(shù)量 (workerCount
)坤学,高3位保存runState
,低29位保存workerCount
报慕,兩個(gè)變量之間互不干擾深浮。
用一個(gè)變量去存儲兩個(gè)值,可避免在做相關(guān)決策時(shí)眠冈,出現(xiàn)不一致的情況飞苇,不必為了維護(hù)兩者的一致,而占用鎖資源蜗顽。通過閱讀線程池源代碼也可以發(fā)現(xiàn)布卡,經(jīng)常出現(xiàn)要同時(shí)判斷線程池運(yùn)行狀態(tài)和線程數(shù)量的情況。線程池也提供了若干方法去供用戶獲得線程池當(dāng)前的運(yùn)行狀態(tài)雇盖、線程個(gè)數(shù)忿等。這里都使用的是位運(yùn)算的方式,相比于基本運(yùn)算崔挖,速度也會(huì)快很多(PS:這種用法在許多源代碼中都可以看到)贸街。
關(guān)于內(nèi)部封裝的獲取生命周期狀態(tài)、獲取線程池線程數(shù)量的計(jì)算方法如以下代碼所示:
private static final int COUNT_BITS = Integer.SIZE - 3;//32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//低29位都為1狸相,高位都為0
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//111
private static final int SHUTDOWN = 0 << COUNT_BITS;//000
private static final int STOP = 1 << COUNT_BITS;//001
private static final int TIDYING = 2 << COUNT_BITS;//010
private static final int TERMINATED = 3 << COUNT_BITS;//011
// Packing and unpacking ctl
//計(jì)算當(dāng)前運(yùn)行狀態(tài)薛匪,取高三位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//計(jì)算當(dāng)前線程數(shù)量,取低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//通過狀態(tài)和線程數(shù)生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor
的運(yùn)行狀態(tài)有5種脓鹃,分別為:
運(yùn)行狀態(tài) | 狀態(tài)描述 |
---|---|
RUNNING | 能接受新提交的任務(wù)逸尖,并且也能處理阻塞隊(duì)列中的任務(wù) |
SHUTDOWN | 不能接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中的任務(wù) |
STOP | 不能接受新任務(wù)瘸右,也不能處理隊(duì)列中的任務(wù)同時(shí)會(huì)中斷正在處理的任務(wù)線程 |
TIDYING | 所有的任務(wù)都已經(jīng)終止娇跟,workCount(有效線程數(shù))為0 |
TERMINATED | 在terminated方法執(zhí)行完之后進(jìn)入該狀態(tài) |
任務(wù)調(diào)度機(jī)制
任務(wù)調(diào)度是線程池的主要入口,當(dāng)用戶提交了一個(gè)任務(wù)尊浓,接下來這個(gè)任務(wù)將如何執(zhí)行都是由這個(gè)階段決定的逞频。了解這部分就相當(dāng)于了解了線程池的核心運(yùn)行機(jī)制。
首先栋齿,所有任務(wù)的調(diào)度都是由execute
方法完成的苗胀,這部分完成的工作是:檢查現(xiàn)在線程池的運(yùn)行狀態(tài)、運(yùn)行線程數(shù)瓦堵、運(yùn)行策略基协,決定接下來執(zhí)行的流程,是直接申請線程執(zhí)行菇用,或是緩沖到隊(duì)列中執(zhí)行澜驮,亦或是直接拒絕該任務(wù)。其執(zhí)行過程如下:
- 首先檢測線程池運(yùn)行狀態(tài)惋鸥,如果不是
RUNNING
杂穷,則直接拒絕悍缠,線程池要保證在RUNNING
的狀態(tài)下執(zhí)行任務(wù)。 - 如果
workerCount < corePoolSize
耐量,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)飞蚓。 - 如果
workerCount >= corePoolSize
,且線程池內(nèi)的阻塞隊(duì)列未滿廊蜒,則將任務(wù)添加到該阻塞隊(duì)列中趴拧。 - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且線程池內(nèi)的阻塞隊(duì)列已滿山叮,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)著榴。 - 如果
workerCount >= maximumPoolSize
,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常屁倔。
接下來進(jìn)入源代碼分析時(shí)間~脑又!
提交任務(wù)
//AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//獲取ctl
//檢查當(dāng)前核心線程數(shù),是否小于核心線程數(shù)的大小限制
if (workerCountOf(c) < corePoolSize) {
//沒有達(dá)到核心線程數(shù)的大小限制锐借,那么添家核心線程執(zhí)行該任務(wù)
if (addWorker(command, true))
return;
//如果添加失敗挂谍,刷新ctl值
c = ctl.get();
}
//再次檢查線程池的運(yùn)行狀態(tài),將任務(wù)添加到等待隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//刷新ctl值
//如果當(dāng)前線程池的裝不是運(yùn)行狀態(tài)瞎饲,那么移除剛才添加的任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);//移除成功后口叙,使用拒絕策略處理該任務(wù);
else if (workerCountOf(recheck) == 0)//當(dāng)前工作線程數(shù)為0
//線程池正在運(yùn)行嗅战,或者移除任務(wù)失敗妄田。
//添加一個(gè)非核心線程,并不指定該線程的運(yùn)行任務(wù)驮捍。
//等線程創(chuàng)建完成之后疟呐,會(huì)從等待隊(duì)列中獲取任務(wù)執(zhí)行。
addWorker(null, false);
}
//邏輯到這里說明線程池已經(jīng)不是RUNNING狀態(tài)东且,或者等待隊(duì)列已滿启具,需要?jiǎng)?chuàng)建一個(gè)新的非核心線程執(zhí)行該任務(wù);
//如果創(chuàng)建失敗珊泳,那么非核心線程已滿鲁冯,使用拒絕策略處理該任務(wù);
else if (!addWorker(command, false))
reject(command);
}
添加工作線程和執(zhí)行任務(wù)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;//初始化的任務(wù)色查,可以為null
this.thread = getThreadFactory().newThread(this);//Worker持有的線程
}
/**部分代碼省略*/
public void run() {
runWorker(this);
}
}
添加工作線程和執(zhí)行任務(wù):總體就是創(chuàng)建Worker
薯演,并且為它找到匹配的Runnable
。
添加工作線程
增加線程是通過線程池中的addWorker
方法秧了,該方法的功能就是增加一個(gè)線程跨扮,該方法不考慮線程池是在哪個(gè)階段增加的該線程,這個(gè)分配線程的策略是在上個(gè)步驟完成的,該步驟僅僅完成增加線程衡创,并使它運(yùn)行帝嗡,最后返回是否成功這個(gè)結(jié)果。
addWorker
方法有兩個(gè)參數(shù):firstTask
璃氢、core
丈探。
firstTask
參數(shù)用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),該參數(shù)可以為空拔莱;
core
參數(shù)為true
表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize
,false
表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize
隘竭。
private boolean addWorker(Runnable firstTask, boolean core) {
retry://break和continue的跳出標(biāo)簽
for (;;) {
int c = ctl.get();//獲取ctl的值
int rs = runStateOf(c);//獲取當(dāng)前線程池的狀態(tài)塘秦;
/**
* 1、如果當(dāng)前的線程池狀態(tài)不是RUNNING
* 2动看、當(dāng)前線程池是RUNNING而且沒有添加新任務(wù)尊剔,而且等待隊(duì)列不為空。這種情況下是需要?jiǎng)?chuàng)建執(zhí)行線程的菱皆。
* 所以滿足1须误,但不滿足2就創(chuàng)建執(zhí)行線程失敗,返回false仇轻。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/**進(jìn)入內(nèi)層循環(huán) */
for (;;) {
int wc = workerCountOf(c);//獲取當(dāng)前執(zhí)行線程的數(shù)量
/**
* 1、工作線程數(shù)量大于或等于計(jì)數(shù)器的最大閾值,那么創(chuàng)建執(zhí)行線程失敗裤唠,返回false花椭。
* 2、如果當(dāng)前創(chuàng)建的核心線程疲陕,那么工作線程數(shù)大于corePoolSize的話方淤,創(chuàng)建執(zhí)行線程失敗,返回false蹄殃。
* 3携茂、如果當(dāng)前創(chuàng)建的是非核心線程,那么工作線程數(shù)大于maximumPoolSize的話诅岩,創(chuàng)建執(zhí)行線程失敗讳苦,返回false。
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//用CAS操作讓線程數(shù)加1吩谦,如果成功跳出整個(gè)循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//線程狀態(tài)前后不一樣医吊,重新執(zhí)行外循環(huán)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//如果CAS操作由于工作線程數(shù)的增加失敗,那么重新進(jìn)行內(nèi)循環(huán)
}
}
/**就現(xiàn)在逮京,線程數(shù)已經(jīng)增加了卿堂。但是真正的線程對象還沒有創(chuàng)建出來。*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/**
* 再次檢查線程池的運(yùn)行狀態(tài)
* 1、如果是RUNNING狀態(tài)草描,那么可以創(chuàng)建览绿;
* 2、如果是SHUTDOWN狀態(tài)穗慕,但沒有執(zhí)行線程饿敲,可以創(chuàng)建(創(chuàng)建后執(zhí)行等待隊(duì)列中的任務(wù))
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//檢測該線程是否已經(jīng)開啟
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//將改工作線程添加到集合中
int s = workers.size();
if (s > largestPoolSize)//更新線程池的運(yùn)行時(shí)的最大線程數(shù)
largestPoolSize = s;
workerAdded = true;//標(biāo)識工作線程添加成功
}
} finally {//釋放鎖
mainLock.unlock();
}
if (workerAdded) {//如果工作線程添加成功,那么開啟線程
t.start();
workerStarted = true;
}
}
} finally {
//如果工作線程添加失敗逛绵,那么進(jìn)行失敗處理
//將已經(jīng)增加的線程數(shù)減少怀各,將添加到集合的工作線程刪除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
執(zhí)行任務(wù)
在添加工作線程部分我們看到了,添加成功之后會(huì)開啟線程執(zhí)行任務(wù)术浪。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//解鎖瓢对,允許中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果當(dāng)前的工作線程已經(jīng)有執(zhí)行任務(wù),或者可以從等待隊(duì)列中獲取到執(zhí)行任務(wù)
//getTask獲取任務(wù)時(shí)候會(huì)進(jìn)行阻塞
while (task != null || (task = getTask()) != null) {
w.lock();//開始執(zhí)行胰苏,上鎖
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//判斷線程是否需要中斷
//如果線程池狀態(tài)是否為STOP\TIDYING\TERMINATED,同時(shí)當(dāng)前線程沒有被中斷那么將當(dāng)前線程進(jìn)行中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;//異常處理
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;//工作線程的當(dāng)前任務(wù)置空
w.completedTasks++;//當(dāng)前工作線程執(zhí)行完成的線程數(shù)+1
w.unlock();//執(zhí)行完成解鎖
}
}
completedAbruptly = false;//完成了所有任務(wù)硕蛹,正常退出
} finally {//執(zhí)行工作線程的退出操作
processWorkerExit(w, completedAbruptly);
}
}
工作線程獲取任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();//獲取ctl的值
int rs = runStateOf(c);//獲取線程池狀態(tài)
// Check if queue empty only if necessary.
/**
* 1、rs為STOP\TIDYING\TERMINATED硕并,標(biāo)識無法繼續(xù)執(zhí)行任務(wù)
* 2法焰、等待隊(duì)列中沒有任務(wù)可以被執(zhí)行
* 工作線程數(shù)量減一
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);//獲取工作線程數(shù)量
// Are workers subject to culling?
//如果允許核心線程超時(shí),或者當(dāng)前工作線程數(shù)量大于核心線程數(shù)量倔毙。標(biāo)識需要進(jìn)行超時(shí)檢測
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 1埃仪、如果當(dāng)前工作線程數(shù)是否大于線程池可允許的最大工作線程數(shù)(maximumPoolSize可以動(dòng)態(tài)設(shè)置)
* ,或者當(dāng)前需要進(jìn)行超時(shí)控制并且上次從等待隊(duì)列中獲取執(zhí)行任務(wù)發(fā)生了超時(shí)陕赃。
* 2贵试、如果當(dāng)前不是唯一的線程,并且等待隊(duì)列中沒有需要執(zhí)行的任務(wù)凯正。
* 這兩種情況下一起存在就表示毙玻,工作線程發(fā)生了超時(shí)需要回收,所以對線程數(shù)進(jìn)行-1廊散;
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))//線程數(shù)量減少成功桑滩,否則重新執(zhí)行本次循環(huán)
return null;
continue;
}
try {
//如果設(shè)置有超時(shí),那么設(shè)定超時(shí)時(shí)間允睹。否則進(jìn)行無限的阻塞等待執(zhí)行任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;//獲取超時(shí),設(shè)置標(biāo)記
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
工作線程的退出
線程池中線程的銷毀依賴JVM自動(dòng)的回收运准,線程池做的工作是根據(jù)當(dāng)前線程池的狀態(tài)維護(hù)一定數(shù)量的線程引用,防止這部分線程被JVM回收缭受,當(dāng)線程池決定哪些線程需要回收時(shí)胁澳,只需要將其引用消除即可。Worker
被創(chuàng)建出來后米者,就會(huì)不斷地進(jìn)行輪詢韭畸,然后獲取任務(wù)去執(zhí)行宇智,核心線程可以無限等待獲取任務(wù),非核心線程要限時(shí)獲取任務(wù)胰丁。當(dāng)Worker
無法獲取到任務(wù)随橘,也就是獲取的任務(wù)為空時(shí),循環(huán)會(huì)結(jié)束锦庸,Worker
會(huì)主動(dòng)消除自身在線程池內(nèi)的引用机蔗。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//completedAbruptly為true,標(biāo)識該工作線程執(zhí)行出現(xiàn)了異常甘萧,將工作線程數(shù)減一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//否則標(biāo)識該工作線程為正常結(jié)束萝嘁,這種情況下getTask方法中已經(jīng)對工作線程進(jìn)行了減一
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖
try {
completedTaskCount += w.completedTasks;//更新線程池的,線程執(zhí)行完成數(shù)量
workers.remove(w);//工作線程容器移除該工作線程
} finally {
mainLock.unlock();//解鎖
}
//嘗試結(jié)束線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//如果當(dāng)前線程池的運(yùn)行狀態(tài)是RUNNING\SHUTDOWN
if (!completedAbruptly) {//如果該工作線程為正常結(jié)束
/**
* 判斷當(dāng)前需要的最少的核心線程數(shù)(如果允許核心線程超時(shí)扬卷,那么最小的核心線程數(shù)為0牙言,否則為corePoolSize)
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果允許核心線程超時(shí),而且等待隊(duì)列不為空邀泉,那么工作線程的最小值為1,否則為0钝鸽。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//當(dāng)前工作線程數(shù)汇恤,是否滿足最先的核心線程數(shù)
if (workerCountOf(c) >= min)
//如果滿足那么直接return
return; // replacement not needed
}
//如果是異常結(jié)束,或者當(dāng)前線程數(shù)不滿足最小的核心線程數(shù)拔恰,那么添加一個(gè)非核心線程
//核心線程和非核心線程沒有什么不同因谎,只是在創(chuàng)建的時(shí)候判斷邏輯不同
addWorker(null, false);
}
}
特需
線程池的監(jiān)控
通過線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時(shí)候可以使用
-
getTaskCount
:線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù)颜懊; -
getCompletedTaskCount
:線程池已完成的任務(wù)數(shù)量财岔,該值小于等于taskCount
; -
getLargestPoolSize
:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量河爹。通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過匠璧,也就是達(dá)到了maximumPoolSize
; -
getPoolSize
:線程池當(dāng)前的線程數(shù)量咸这; -
getActiveCount
:當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量夷恍。
動(dòng)態(tài)調(diào)整線程池的大小
JDK
允許線程池使用方通過ThreadPoolExecutor
的實(shí)例來動(dòng)態(tài)設(shè)置線程池的核心策略,以setCorePoolSize
為方法例;
在運(yùn)行期線程池使用方調(diào)用此方法設(shè)置
corePoolSize
之后媳维,線程池會(huì)直接覆蓋原來的corePoolSize
值酿雪,并且基于當(dāng)前值和原始值的比較結(jié)果采取不同的處理策略。對于當(dāng)前值小于當(dāng)前工作線程數(shù)的情況侄刽,說明有多余的
worker
線程指黎,此時(shí)會(huì)向當(dāng)前idle
的worker
線程發(fā)起中斷請求以實(shí)現(xiàn)回收,多余的worker
在下次idel
的時(shí)候也會(huì)被回收州丹;對于當(dāng)前值大于原始值且當(dāng)前隊(duì)列中有待執(zhí)行任務(wù)醋安,則線程池會(huì)創(chuàng)建新的worker
線程來執(zhí)行隊(duì)列任務(wù)(PS:idel
狀態(tài)為worker
線程釋放鎖之后的狀態(tài),因?yàn)樗谶\(yùn)行期間都是上鎖的)。
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
//計(jì)算增量
int delta = corePoolSize - this.corePoolSize;
//覆蓋原有的corePoolSize
this.corePoolSize = corePoolSize;
//如果當(dāng)前的工作線程數(shù)量大于線程池的最大可運(yùn)行核心線程數(shù)量茬故,那么進(jìn)行中斷工作線程處理
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {//如果增量大于0
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
//等待隊(duì)列非空盖灸,獲取等待任務(wù)和增量的最小值
int k = Math.min(delta, workQueue.size());
//循環(huán)創(chuàng)建核心工作線程執(zhí)行等待隊(duì)列中的任務(wù)
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖
try {
//遍歷工作線程的集合
for (Worker w : workers) {
Thread t = w.thread;
//如果當(dāng)前線程沒有被中斷,而且能獲取到鎖磺芭,那么嘗試進(jìn)行中斷赁炎,最后釋放鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//是否僅僅中斷一個(gè)工作線程
if (onlyOne)
break;
}
} finally {//釋放鎖
mainLock.unlock();
}
}
優(yōu)雅的關(guān)閉線程池
從《線程池聲明周期》圖上還可以看到,當(dāng)我們執(zhí)行 ThreadPoolExecutor#shutdown
方法將會(huì)使線程池狀態(tài)從 RUNNING 轉(zhuǎn)變?yōu)?SHUTDOWN钾腺。而調(diào)用 ThreadPoolExecutor#shutdownNow
之后線程池狀態(tài)將會(huì)從 RUNNING 轉(zhuǎn)變?yōu)?STOP徙垫。
shutdown
停止接收新任務(wù),原來的任務(wù)繼續(xù)執(zhí)行
- 停止接收新的submit的任務(wù)放棒;
- 已經(jīng)提交的任務(wù)(包括正在跑的和隊(duì)列中等待的),會(huì)繼續(xù)執(zhí)行完成姻报;
- 等到第2步完成后,才真正停止间螟;
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();// 檢查權(quán)限
advanceRunState(SHUTDOWN);// 設(shè)置線程池狀態(tài)
interruptIdleWorkers();// 中斷空閑線程
// 鉤子函數(shù)吴旋,主要用于清理一些資源
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdown
方法首先加鎖,其次先檢查系統(tǒng)安裝狀態(tài)厢破。接著就會(huì)將線程池狀態(tài)變?yōu)?SHUTDOWN荣瑟,在這之后線程池不再接受提交的新任務(wù)。此時(shí)如果還繼續(xù)往線程池提交任務(wù)摩泪,將會(huì)使用線程池拒絕策略響應(yīng)笆焰,默認(rèn)情況下將會(huì)使用 ThreadPoolExecutor.AbortPolicy
,拋出 RejectedExecutionException
異常见坑。
interruptIdleWorkers
方法在動(dòng)態(tài)調(diào)整線程池大小部分有源碼講述嚷掠,它只會(huì)中斷空閑的線程,不會(huì)中斷正在執(zhí)行任務(wù)的的線程荞驴〔唤裕空閑的線程將會(huì)阻塞在線程池的阻塞隊(duì)列上。
shutdownNow
停止接收新任務(wù)熊楼,原來的任務(wù)停止執(zhí)行
- 跟
shutdown()
一樣粟焊,先停止接收新submit
的任務(wù); - 忽略隊(duì)列里等待的任務(wù)孙蒙;
- 嘗試將正在執(zhí)行的任務(wù)
interrupt
中斷项棠; - 返回未執(zhí)行的任務(wù)列表;
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();// 檢查狀態(tài)
advanceRunState(STOP);// 將線程池狀態(tài)變?yōu)?STOP
interruptWorkers();// 中斷所有線程挎峦,包括工作線程以及空閑線程
tasks = drainQueue();// 丟棄工作隊(duì)列中存量任務(wù)
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//如果工作線程已經(jīng)開始香追,那么調(diào)用interrupt進(jìn)行中斷
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
//從此隊(duì)列中刪除所有可用的元素,并將它們添加到給定的集合中坦胶。
q.drainTo(taskList);
//如果隊(duì)列是DelayQueue或其他類型的隊(duì)列透典,而poll或drainTo可能無法刪除某些元素晴楔,則會(huì)將它們逐個(gè)刪除。
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
shutdownNow
試圖終止線程的方法是通過調(diào)用 Thread.interrupt()
方法來實(shí)現(xiàn)的峭咒,這種方法的作用有限税弃,如果線程中沒有sleep
、wait
凑队、Condition
则果、定時(shí)鎖等應(yīng)用, interrupt()
方法是無法中斷當(dāng)前的線程的。所以漩氨,shutdownNow()
并不代表線程池就一定立即就能退出西壮,它也可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。但是大多數(shù)時(shí)候是能立即退出的叫惊。
線程中斷機(jī)制:
thread#interrupt
只是設(shè)置一個(gè)中斷標(biāo)志款青,不會(huì)立即中斷正常的線程。如果想讓中斷立即生效霍狰,必須在線程 內(nèi)調(diào)用Thread.interrupted()
判斷線程的中斷狀態(tài)抡草。 對于阻塞的線程,調(diào)用中斷時(shí)蔗坯,線程將會(huì)立刻退出阻塞狀態(tài)并拋出InterruptedException
異常康震。所以對于阻塞線程需要正確處理InterruptedException
異常。
awaitTermination
線程池 shutdown
與 shutdownNow
方法都不會(huì)主動(dòng)等待執(zhí)行任務(wù)的結(jié)束步悠,如果需要等到線程池任務(wù)執(zhí)行結(jié)束签杈,需要調(diào)用 awaitTermination
主動(dòng)等待任務(wù)調(diào)用結(jié)束瘫镇。
- 等所有已提交的任務(wù)(包括正在跑的和隊(duì)列中等待的)執(zhí)行完鼎兽;
- 等超時(shí)時(shí)間到了;
- 線程被中斷铣除,拋出
InterruptedException
;
如果線程池任務(wù)執(zhí)行結(jié)束谚咬,awaitTermination
方法將會(huì)返回 true
,否則當(dāng)?shù)却龝r(shí)間超過指定時(shí)間后將會(huì)返回 false
尚粘。
// 關(guān)閉線程池的鉤子函數(shù)
private static void shutdown(ExecutorService executorService) {
// 第一步:使新任務(wù)無法提交
executorService.shutdown();
try {
// 第二步:等待未完成任務(wù)結(jié)束
if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
// 第三步:取消當(dāng)前執(zhí)行的任務(wù)
executorService.shutdownNow();
// 第四步:等待任務(wù)取消的響應(yīng)
if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Thread pool did not terminate");
}
}
} catch(InterruptedException ie) {
// 第五步:出現(xiàn)異常后择卦,重新取消當(dāng)前執(zhí)行的任務(wù)
executorService.shutdownNow();
Thread.currentThread().interrupt(); // 設(shè)置本線程中斷狀態(tài)
}
}
其他
感覺內(nèi)容好多,寫不完啊~郎嫁!
說的線程池就不得不說多線程并發(fā)操作秉继,同步
,異步
泽铛,CSA
尚辑,AQS
,公平鎖和非公平鎖
盔腔,可重入鎖和非可重入鎖
等各種并發(fā)控制需要的知識點(diǎn)杠茬。
平常工作中使用比較少月褥,自己有沒有系統(tǒng)的知識體系結(jié)構(gòu)。導(dǎo)致好多學(xué)過之后忘掉瓢喉,然后又學(xué)習(xí)又忘記宁赤。
希望我以后有機(jī)會(huì)能逐步進(jìn)行學(xué)習(xí)和分享。
文章到這里就全部講述完啦栓票,若有其他需要交流的可以留言哦决左!!