定義類
public class ThreadPoolExecutor extends AbstractExecutorService
重要變量
//線程池控制器
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//任務(wù)隊(duì)列
private final BlockingQueue<Runnable> workQueue;
//全局鎖
private final ReentrantLock mainLock = new ReentrantLock();
//工作線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//終止條件 - 用于等待任務(wù)完成后才終止線程池
private final Condition termination = mainLock.newCondition();
//曾創(chuàng)建過(guò)的最大線程數(shù)
private int largestPoolSize;
//線程池已完成總?cè)蝿?wù)數(shù)
private long completedTaskCount;
//工作線程創(chuàng)建工廠
private volatile ThreadFactory threadFactory;
//飽和拒絕策略執(zhí)行器
private volatile RejectedExecutionHandler handler;
//工作線程活動(dòng)保持時(shí)間(超時(shí)后會(huì)被回收) - 納秒
private volatile long keepAliveTime;
/**
* 允許核心工作線程響應(yīng)超時(shí)回收
* false:核心工作線程即使空閑超時(shí)依舊存活
* true:核心工作線程一旦超過(guò)keepAliveTime仍然空閑就被回收
*/
private volatile boolean allowCoreThreadTimeOut;
//核心工作線程數(shù)
private volatile int corePoolSize;
//最大工作線程數(shù)
private volatile int maximumPoolSize;
//默認(rèn)飽和策略執(zhí)行器 - AbortPolicy -> 直接拋出異常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
1 ThreadPoolExecutor的構(gòu)造方法中各個(gè)參數(shù)
- ThreadPoolExecutor是線程的真正實(shí)現(xiàn)叛赚,通常使用工廠類Executors來(lái)創(chuàng)建慎式。
- 但它的構(gòu)造方法提供了一系列參數(shù)來(lái)配置線程池揣非,下面我們就先介紹ThreadPoolExecutor的構(gòu)造方法中各個(gè)參數(shù)的含義叛氨。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
1.1 corePoolSize
- 線程池的核心線程數(shù)
- 在創(chuàng)建了線程池后堕仔,默認(rèn)情況下擂橘,線程池中并沒(méi)有 任何線程
- 等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者 prestartCoreThread()方法摩骨,從這2個(gè)方法的名字就可以看出通贞,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建 corePoolSize個(gè)線程或者一個(gè)線程恼五。
- 默認(rèn)情況下昌罩,核心線程數(shù)會(huì)一直在線程池中存活,即使它們處理閑置狀態(tài)灾馒。
- 如果將ThreadPoolExecutor的allowCoreThreadTimeOut屬性設(shè)置為true茎用,那么閑置的核心線程在等待新任務(wù)到來(lái)時(shí)會(huì)執(zhí)行超時(shí)策略,這個(gè)時(shí)間間隔由keepAliveTime所指定睬罗,當(dāng)等待時(shí)間超過(guò)keepAliveTime所指定的時(shí)長(zhǎng)后轨功,核心線程就會(huì)被終止。
1.2 maximumPoolSize
- 線程池所能容納的最大線程數(shù)量
- 當(dāng)
(隊(duì)列已滿 && 實(shí)際工作線程數(shù) < 最大工作線程數(shù))
時(shí)容达,線程池會(huì)創(chuàng)建新的工作線程(即使此時(shí)仍有空閑的工作線程)執(zhí)行任務(wù)直到最大工作線程數(shù)為止古涧;設(shè)置無(wú)界隊(duì)列時(shí)該參數(shù)其實(shí)無(wú)效- 該值實(shí)際的可設(shè)置最大值不是Integer.MAX_VALUE,而是常量CAPACITY
1.3 keepAliveTime(工作線程最大空閑時(shí)間)
- 滿足超時(shí)條件且空閑的工作線程會(huì)被回收花盐;超時(shí)的非核心工作線程會(huì)被回收羡滑,核心工作線程不會(huì)被回收圆米;
- 當(dāng)allowCoreThreadTimeOut=true時(shí),則超時(shí)的核心工作線程也會(huì)被回收啄栓;
- 若該值沒(méi)有設(shè)置則線程會(huì)永遠(yuǎn)存活娄帖;建議當(dāng)場(chǎng)景為任務(wù)短而多時(shí),可以調(diào)高時(shí)間以提高線程利用率
- 默認(rèn)情況下昙楚,只有當(dāng)線程池中的線程數(shù)大于corePoolSize 時(shí)近速,keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize堪旧。
- 即當(dāng)線程池中的線程數(shù)大于corePoolSize 時(shí)削葱,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止淳梦,直到線程池中的線程數(shù)不超過(guò)corePoolSize析砸。
1.4 unit
- 用于指定keepAliveTime參數(shù)的時(shí)間單位
- 這是一個(gè)枚舉,常用的有
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
1.5 workQueue
- 線程池所使用的緩沖隊(duì)列爆袍,該緩沖隊(duì)列的長(zhǎng)度決定了能夠緩沖的最大數(shù)量首繁。
- 當(dāng)
(實(shí)際工作線程數(shù) >= 核心工作線程數(shù)) && (任務(wù)數(shù) < 任務(wù)隊(duì)列長(zhǎng)度)時(shí),任務(wù)會(huì)offer()入隊(duì)等待
陨囊;
1.6 threadFactory
- 線程工廠弦疮,為線程池提供創(chuàng)建新線程的功能。
- 如果沒(méi)有另外說(shuō)明蜘醋,則在同一個(gè) ThreadGroup 中一律使用 Executors.defaultThreadFactory() 創(chuàng)建線程胁塞,并且這些線程具有相同的 NORM_PRIORITY 優(yōu)先級(jí)和非守護(hù)進(jìn)程狀態(tài)。
- 通過(guò)提供不同的 ThreadFactory压语,可以改變線程的名稱啸罢、線程組、優(yōu)先級(jí)胎食、守護(hù)進(jìn)程狀態(tài)等等扰才。
- 如果從 newThread 返回 null 時(shí) ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行斥季,但不能執(zhí)行任何任務(wù)训桶。
1.7 RejectExecutionHandler
- 當(dāng)線程池和隊(duì)列都已滿,此時(shí)說(shuō)明線程已無(wú)力再接收更多的任務(wù)酣倾,即任務(wù)數(shù)飽和舵揭,沒(méi)法接單了;此時(shí)需要使用一種飽和策略處理新提交的任務(wù)躁锡,默認(rèn)是Abort(直拋Reject異常)午绳,還包括Discard(LIFO規(guī)則丟棄)、DiscardOldest(LRU規(guī)則丟棄) 以及 CallerRuns(調(diào)用者線程執(zhí)行)映之,允許自定義執(zhí)行器
- 有以下四種取值:
- ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常拦焚。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)蜡坊,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)赎败,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
2 ThreadPoolExecutor執(zhí)行規(guī)則
2.1 ThreadPoolExecutor執(zhí)行規(guī)則
2.2 執(zhí)行情況
隨著任務(wù)數(shù)增加秕衙,線程池的執(zhí)行情況主要有如下四種情況,分別對(duì)應(yīng)處理流程中的各項(xiàng)判斷:
1.若實(shí)際工作線程數(shù)workers<核心工作線程數(shù)corePoolSize僵刮,則創(chuàng)建新工作線程來(lái)執(zhí)行新任務(wù)execute(Runable)
2.若實(shí)際工作線程數(shù)workers>=核心工作線程數(shù)corePoolSize(核心工作線程們都在執(zhí)行任務(wù))且任務(wù)隊(duì)列workQueue未滿据忘,則將任務(wù)加入到任務(wù)隊(duì)列workQueue中
3.若任務(wù)隊(duì)列workQueue已滿,則創(chuàng)建新工作線程來(lái)執(zhí)行任務(wù)execute()
4.若實(shí)際工作線程數(shù)workers>=最大工作線程數(shù)maximumPoolSize(所有線程都在執(zhí)行任務(wù))搞糕,此時(shí)任務(wù)數(shù)已飽和勇吊,需要根據(jù)飽和拒絕策略rejectedExecutionHandler執(zhí)行相對(duì)應(yīng)的飽和拒絕操作
線程池的總體設(shè)計(jì)是基于性能考慮,盡可能避免獲取全局鎖:
1.由于創(chuàng)建新線程時(shí)都需要獲取全局鎖窍仰,因此步驟1和步驟3必須加鎖
2.為了避免多次獲取全局鎖(性能伸縮瓶頸)汉规,當(dāng)實(shí)際工作線程數(shù)>=核心工作線程數(shù)時(shí),之后會(huì)執(zhí)行步驟2(入隊(duì)時(shí)無(wú)須獲取全局鎖)
超時(shí)處理
若您需要處理超時(shí)的核心工作線程驹吮,選第二種针史;若不需要,選第一種:
1.若實(shí)際工作線程數(shù)workers>核心工作線程數(shù)corePoolSize钥屈,回收空閑時(shí)間超過(guò)keepAliveTime的空閑的非核心線程(減少工作線程數(shù)直到<=核心工作線程數(shù)即可)
2.若設(shè)置allowCoreThreadTimeOut為true時(shí)悟民,則超過(guò)keepAliveTime的空閑的核心工作線程也會(huì)被回收
2.2 提交和執(zhí)行任務(wù)
execute()
:適用于提交無(wú)須返回值的任務(wù)
-該方法是無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功submit()
: 適用于提交需要返回值的任務(wù)
-可以通過(guò)返回的Future
對(duì)象得知任務(wù)是否已經(jīng)執(zhí)行成功
-get()
方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,但要注意防范無(wú)限阻塞E窬汀!近忙!
-使用get(long timeout,TimeUnit unit)
方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成或超時(shí)竭业,不會(huì)有無(wú)限阻塞的發(fā)生但需要注意超時(shí)后任務(wù)可能還沒(méi)完成!<吧帷未辆!
2 線程池的關(guān)閉
- ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉锯玛,分別是shutdown()和shutdownNow()咐柜,其中:
shutdown()
: 有序地關(guān)閉線程池,已提交的任務(wù)會(huì)被執(zhí)行(包含正在執(zhí)行和任務(wù)隊(duì)列中的)攘残,但會(huì)拒絕新任務(wù)
shutdownNow()
: 立即(嘗試)停止執(zhí)行所有任務(wù)(包含正在執(zhí)行和任務(wù)隊(duì)列中的)拙友,并返回待執(zhí)行任務(wù)列表
注意:上述方法都可以通過(guò)調(diào)用awaitTermination()
等待任務(wù)完成后才終止線程池
合理配置線程池
要想合理的配置線程池的大小,首先得分析任務(wù)的特性歼郭,可以從以下幾個(gè)角度分析:
1遗契、任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)病曾、混合型任務(wù)牍蜂。
2漾根、任務(wù)的優(yōu)先級(jí):高、中鲫竞、低辐怕。
3、任務(wù)的執(zhí)行時(shí)間:長(zhǎng)从绘、中秘蛇、短。
4顶考、任務(wù)的依賴性:是否依賴其他系統(tǒng)資源赁还,如數(shù)據(jù)庫(kù)連接等。
性質(zhì)不同的任務(wù)可以交給不同規(guī)模的線程池執(zhí)行驹沿。
對(duì)于不同性質(zhì)的任務(wù)來(lái)說(shuō)艘策,
CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置CPU個(gè)數(shù)+1的線程數(shù)渊季,
IO密集型任務(wù)應(yīng)配置盡可能多的線程朋蔫,因?yàn)镮O操作不占用CPU,不要讓CPU閑下來(lái)却汉,應(yīng)加大線程數(shù)量驯妄,如配置兩倍CPU個(gè)數(shù)+1,
而對(duì)于混合型的任務(wù)合砂,如果可以拆分青扔,拆分成IO密集型和CPU密集型分別處理,前提是兩者運(yùn)行的時(shí)間是差不多的翩伪,如果處理時(shí)間相差很大微猖,則沒(méi)必要拆分了。
若任務(wù)對(duì)其他系統(tǒng)資源有依賴缘屹,如某個(gè)任務(wù)依賴數(shù)據(jù)庫(kù)的連接返回的結(jié)果凛剥,這時(shí)候等待的時(shí)間越長(zhǎng),則CPU空閑的時(shí)間越長(zhǎng)轻姿,那么線程數(shù)量應(yīng)設(shè)置得越大犁珠,才能更好的利用CPU。
高并發(fā)互亮、任務(wù)執(zhí)行時(shí)間短的業(yè)務(wù)怎樣使用線程池犁享?并發(fā)不高、任務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)怎樣使用線程池胳挎?并發(fā)高饼疙、業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)怎樣使用線程池?
(1)高并發(fā)、任務(wù)執(zhí)行時(shí)間短的業(yè)務(wù)窑眯,線程池線程數(shù)可以設(shè)置為CPU核數(shù)+1屏积,減少線程上下文的切換
(2)并發(fā)不高、任務(wù)執(zhí)行時(shí)間長(zhǎng)的業(yè)務(wù)要區(qū)分開看:
a)假如是業(yè)務(wù)時(shí)間長(zhǎng)集中在IO操作上磅甩,也就是IO密集型的任務(wù)炊林,因?yàn)镮O操作并不占用CPU,所以不要讓所有的CPU閑下來(lái)卷要,可以適當(dāng)加大線程池中的線程數(shù)目渣聚,讓CPU處理更多的業(yè)務(wù)
b)假如是業(yè)務(wù)時(shí)間長(zhǎng)集中在計(jì)算操作上,也就是計(jì)算密集型任務(wù)僧叉,這個(gè)就沒(méi)辦法了奕枝,和(1)一樣吧,線程池中的線程數(shù)設(shè)置得少一些瓶堕,減少線程上下文的切換
(3)并發(fā)高衙熔、業(yè)務(wù)執(zhí)行時(shí)間長(zhǎng)其兴,解決這種類型任務(wù)的關(guān)鍵不在于線程池而在于整體架構(gòu)的設(shè)計(jì),看看這些業(yè)務(wù)里面某些數(shù)據(jù)是否能做緩存是第一步惶看,增加服務(wù)器是 第二步者铜,至于線程池的設(shè)置窃判,設(shè)置參考
線程池狀態(tài)
狀態(tài)控制器
//線程池狀態(tài)控制器斜姥,用于保證線程池狀態(tài)和工作線程數(shù) ps:低29位為工作線程數(shù)量惑惶,高3位為線程池狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//設(shè)定偏移量 Integer.SIZE = 32 -> 即COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//確定最大的容量2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//獲取線程池狀態(tài),取高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取工作線程數(shù)量凄吏,取低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 獲取線程池狀態(tài)控制器
* @param rs 表示runState 線程池狀態(tài)
* @param wc 表示workerCount 工作線程數(shù)量
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
&:與運(yùn)算符远舅,同位都為1才為1,否則為0
|:或運(yùn)算符竞思,同位有一個(gè)為1即為1表谊,否則為0
~:非運(yùn)算符,0和1互換盖喷,即若是0變成1,1則變成0
^:異或運(yùn)算符,同位相同則為0难咕,不同則為1
線程池狀態(tài)
線程狀態(tài)的流轉(zhuǎn)遵循如下順序课梳,即由小到大順序排列:
RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED
// runState is stored in the high-order bits 用Integer的高三位表示
//高3位111,低29位為0 該狀態(tài)下線程池會(huì)接收新提交任務(wù)和執(zhí)行隊(duì)列任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//高3位000余佃,低29位為0 該狀態(tài)下線程池不再接收新任務(wù)暮刃,但還會(huì)繼續(xù)執(zhí)行隊(duì)列任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高3位001,低29位為0 該狀態(tài)下線程池不再接收新任務(wù)爆土,不會(huì)再執(zhí)行隊(duì)列任務(wù)椭懊,并會(huì)中斷正在執(zhí)行中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
//高3位010,低29位為0 該狀態(tài)下線程池的所有任務(wù)都被終止步势,工作線程數(shù)為0氧猬,期間會(huì)調(diào)用鉤子方法terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//高3位011背犯,低29位為0 該狀態(tài)下表明線程池terminated()方法已經(jīng)調(diào)用完成
private static final int TERMINATED = 3 << COUNT_BITS;
Worker
Worker是線程池的內(nèi)部類,用于封裝工作線程和任務(wù)并管理工作線程的中斷狀態(tài)等功能
組成
Worker類封裝了 ( 鎖 + 線程 + 任務(wù) ) 這三個(gè)部分盅抚,從而成為了一個(gè)多面手的存在:
1.繼承AQS類: 實(shí)現(xiàn)簡(jiǎn)單的不可重入互斥鎖漠魏,以提供便捷的鎖操作,目的用于處理中斷情況
2.實(shí)現(xiàn)Runnable接口: "投機(jī)取巧"的設(shè)計(jì)妄均,主要是借用Runnable接口的統(tǒng)一寫法柱锹,好處是不用重新寫一個(gè)同功能接口
3.工作線程: Worker會(huì)通過(guò)thread變量綁定一個(gè)真正執(zhí)行任務(wù)的工作線程(一對(duì)一),初始化時(shí)就由線程工廠分配好丰包,它會(huì)反復(fù)地獲取和執(zhí)行任務(wù)
4.任務(wù): Worker每次都會(huì)將新任務(wù)賦值給firstTask變量禁熏,工作線程每次通過(guò)該變量處理新獲取到的任務(wù)(初始化時(shí)該值允許為null,有特殊作用)
可以簡(jiǎn)單的理解工作線程等價(jià)于worker邑彪,尤其是談及數(shù)量時(shí)瞧毙,比如創(chuàng)建工作線程實(shí)際上就是創(chuàng)建一個(gè)worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** 實(shí)際上真正的工作線程 - 幕后大佬,但可能因線程工廠創(chuàng)建失敗而為null */
final Thread thread;
/** 待執(zhí)行任務(wù)锌蓄,可能為null */
Runnable firstTask;
/** 該工作線程已完成的任務(wù)數(shù) -- 論KPI的重要性 */
volatile long completedTasks;
Worker(Runnable firstTask) {
//設(shè)置鎖狀態(tài)為-1升筏,目的是為了阻止在runWorker()之前被中斷
setState(-1);
/**
* 新任務(wù),任務(wù)來(lái)源有兩個(gè):
* 1.調(diào)用addWorker()方法新建線程時(shí)傳入的第一個(gè)任務(wù)
* 2.調(diào)用runWorker()方法時(shí)內(nèi)部循環(huán)調(diào)用getTask() -- 這就是線程復(fù)用的具現(xiàn)
*/
this.firstTask = firstTask;
/**
* 創(chuàng)建一個(gè)新的線程 -> 這個(gè)是真正的工作線程
* 注意Worker本身就是個(gè)Runnable對(duì)象
* 因此newThread(this)中的this也是個(gè)Runnable對(duì)象
*/
this.thread = getThreadFactory().newThread(this);
}
}
執(zhí)行任務(wù)
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
/**
* 工作線程運(yùn)行
* runWorker方法內(nèi)部會(huì)通過(guò)輪詢的方式
* 不停地獲取任務(wù)和執(zhí)行任務(wù)直到線程被回收
*/
public void run() {
runWorker(this);
}
(重點(diǎn))這里簡(jiǎn)單介紹一下線程在線程池執(zhí)行任務(wù)的工作流程:
1.工作線程開始執(zhí)行前瘸爽,需先對(duì)worker加鎖您访,任務(wù)完成解鎖
2.任務(wù)執(zhí)行前后分別執(zhí)行beforeExecute()和afterExecute()方法
3.執(zhí)行中遇到異常會(huì)向外拋出,線程是否死亡取決于您對(duì)于異常的處理
4.每個(gè)任務(wù)執(zhí)行完后剪决,當(dāng)前工作線程任務(wù)完成數(shù)自增灵汪,同時(shí)會(huì)循環(huán)調(diào)用getTask()從任務(wù)隊(duì)列中反復(fù)獲取任務(wù)并執(zhí)行,無(wú)任務(wù)可執(zhí)行時(shí)線程會(huì)阻塞在該方法上
5.當(dāng)工作線程因各種理由退出時(shí)柑潦,會(huì)執(zhí)行processWorkerExit()回收線程(核心是將該worker從workers集合中移除享言,注意之前worker已經(jīng)退出任務(wù)循環(huán),因此已經(jīng)不再做工了渗鬼,從集合移除后就方便gc了)
鎖方法
// Lock methods
// The value 0 represents the unlocked state. 0表示未鎖定
// The value 1 represents the locked state. 1表示已鎖定
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
//鎖狀態(tài)非0即1览露,即不可重入
//特殊情況:只有初始化時(shí)才為-1,目的是防止線程初始化階段被中斷
if (compareAndSetState(0, 1)) {
//當(dāng)前線程占有鎖
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
//釋放鎖
setExclusiveOwnerThread(null);
//狀態(tài)恢復(fù)成未鎖定狀態(tài)
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null
&& !t.isInterrupted()){
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
小問(wèn):為什么不直接執(zhí)行提交的command而必須使用Worker封裝譬胎?
友情小提示:這跟worker的作用有關(guān)系
小答:主要是為了控制中斷
小問(wèn):如何控制中斷差牛?
友情小提示:Worker繼承了AQS從而是一把AQS鎖
小答:Worker對(duì)于中斷處理有如下四個(gè)準(zhǔn)則:
1.當(dāng)工作線程真正開始執(zhí)行之前,不允許被中斷
2.當(dāng)工作線程正在執(zhí)行任務(wù)時(shí)堰乔,不允許被中斷
3.當(dāng)工作線程正等待從任務(wù)隊(duì)列中獲取任務(wù)getTask()時(shí)才能被中斷
4.調(diào)用interruptIdleWorkers()中斷空閑線程時(shí)必須先獲得worker鎖
小問(wèn):為什么Worker不被設(shè)計(jì)成可重入鎖偏化?
友情小提示:禁止在動(dòng)態(tài)控制時(shí)再次獲取鎖
小答:由于在動(dòng)態(tài)控制方法中可能會(huì)中斷線程,比如調(diào)用interruptIdleWorkers()镐侯,由此該方法在執(zhí)行interrupt()之前會(huì)調(diào)用worker.tryLock()侦讨,若此時(shí)允許重入,就會(huì)導(dǎo)致線程被意外中斷,這跟當(dāng)工作線程正在執(zhí)行任務(wù)時(shí)韵卤,不允許被中斷準(zhǔn)則是相違背的
動(dòng)態(tài)控制
線程池提供了幾個(gè)公共方法用于動(dòng)態(tài)控制線程池的配置信息:
/**
* 設(shè)置核心工作線程數(shù)
* 1.若新值<當(dāng)前值時(shí)骗污,將調(diào)用interruptIdleWorkers()處理超出部分線程
* 2.若新值>當(dāng)前值時(shí),新創(chuàng)建的線程(若有必要)直接會(huì)處理隊(duì)列中的任務(wù)
*/
public void setCorePoolSize(int corePoolSize)
/**
* 設(shè)置是否響應(yīng)核心工作線程超時(shí)處理
* 1.設(shè)置false時(shí)怜俐,核心工作線程不會(huì)因?yàn)槿蝿?wù)數(shù)不足(空閑)而被終止
* 2.設(shè)置true時(shí)身堡,核心工作線程和非核心工作線程待遇一樣,會(huì)因?yàn)槌瑫r(shí)而終止
* 注意:為了禁止出現(xiàn)持續(xù)性的線程替換拍鲤,當(dāng)設(shè)置true時(shí)贴谎,超時(shí)時(shí)間必須>0
* 注意:該方法通常應(yīng)在線程池被使用之前調(diào)用
*/
public void allowCoreThreadTimeOut(boolean value)
/**
* 設(shè)置最大工作線程數(shù)
* 1.若新值<當(dāng)前值時(shí),將調(diào)用interruptIdleWorkers()處理超出部分線程
* 注意:當(dāng)新值>當(dāng)前值時(shí)是無(wú)需做任何處理的季稳,跟設(shè)置核心工作線程數(shù)不一樣
*/
public void setMaximumPoolSize(int maximumPoolSize)
/**
* 設(shè)置超時(shí)時(shí)間擅这,超時(shí)后工作線程將被終止
* 注意:若實(shí)際工作線程數(shù)只剩一個(gè),除非線程池被終止景鼠,否則無(wú)須響應(yīng)超時(shí)
*/
public void setKeepAliveTime(long time, TimeUnit unit)
任務(wù)提交與執(zhí)行
execute() - 提交任務(wù)
/**
* 在未來(lái)的某個(gè)時(shí)刻執(zhí)行給定的任務(wù)
* 這個(gè)任務(wù)由一個(gè)新線程執(zhí)行仲翎,或者用一個(gè)線程池中已經(jīng)存在的線程執(zhí)行
* 如果任務(wù)無(wú)法被提交執(zhí)行,要么是因?yàn)檫@個(gè)Executor已經(jīng)被shutdown關(guān)閉
* 要么是已經(jīng)達(dá)到其容量上限铛漓,任務(wù)會(huì)被當(dāng)前的RejectedExecutionHandler處理
*/
public void execute(Runnable command) {
//新任務(wù)不允許為空溯香,空則拋出NPE
if (command == null)
throw new NullPointerException();
/**
* 1.若實(shí)際工作線程數(shù) < 核心工作線程數(shù),會(huì)嘗試創(chuàng)建一個(gè)工作線程去執(zhí)行該
* 任務(wù)浓恶,即該command會(huì)作為該線程的第一個(gè)任務(wù)玫坛,即第一個(gè)firstTask
*
* 2.若任務(wù)入隊(duì)成功,仍需要執(zhí)行雙重校驗(yàn)包晰,原因有兩點(diǎn):
* - 第一個(gè)是去確認(rèn)是否需要新建一個(gè)工作線程湿镀,因?yàn)榭赡艽嬖? * 在上次檢查后已經(jīng)死亡died的工作線程
* - 第二個(gè)是可能在進(jìn)入該方法后線程池被關(guān)閉了,
* 比如執(zhí)行shutdown()
* 因此需要再次檢查state狀態(tài)伐憾,并分別處理以上兩種情況:
* - 若線程池中已無(wú)可用工作線程了勉痴,則需要新建一個(gè)工作線程
* - 若線程池已被關(guān)閉,則需要回滾入隊(duì)列(若有必要)
*
* 3.若任務(wù)入隊(duì)失敗(比如隊(duì)列已滿)树肃,則需要新建一個(gè)工作線程蒸矛;
* 若新建線程失敗,說(shuō)明線程池已停止或者已飽和胸嘴,必須執(zhí)行拒絕策略
*/
int c = ctl.get();
/**
* 情況一:當(dāng)實(shí)際工作線程數(shù) < 核心工作線程數(shù)時(shí)
* 執(zhí)行方案:會(huì)創(chuàng)建一個(gè)新的工作線程去執(zhí)行該任務(wù)
* 注意:此時(shí)即使有其他空閑的工作線程也還是會(huì)新增工作線程莉钙,
* 直到達(dá)到核心工作線程數(shù)為止
*/
if (workerCountOf(c) < corePoolSize) {
/**
* 新增工作線程,true表示要對(duì)比的是核心工作線程數(shù)
* 一旦新增成功就開始執(zhí)行當(dāng)前任務(wù)
* 期間也會(huì)通過(guò)自旋獲取隊(duì)列任務(wù)進(jìn)行執(zhí)行
*/
if (addWorker(command, true))
return;
/**
* 需要重新獲取控制器狀態(tài)筛谚,說(shuō)明新增線程失敗
* 線程失敗的原因可能有兩種:
* - 1.線程池已被關(guān)閉,非RUNNING狀態(tài)的線程池是不允許接收新任務(wù)的
* - 2.并發(fā)時(shí)停忿,假如都通過(guò)了workerCountOf(c) < corePoolSize校驗(yàn)驾讲,但其他線程
* 可能會(huì)在addWorker先創(chuàng)建出線程,導(dǎo)致workerCountOf(c) >= corePoolSize,
* 即實(shí)際工作線程數(shù) >= 核心工作線程數(shù)吮铭,此時(shí)需要進(jìn)入情況二
*/
c = ctl.get();
}
/**
* 情況二:當(dāng)實(shí)際工作線程數(shù)>=核心線程數(shù)時(shí)时迫,新提交任務(wù)需要入隊(duì)
* 執(zhí)行方案:一旦入隊(duì)成功,仍需要處理線程池狀態(tài)突變和工作線程死亡的情況
*/
if (isRunning(c) && workQueue.offer(command)) {
//雙重校驗(yàn)
int recheck = ctl.get();
/**
* recheck的目的是為了防止線程池狀態(tài)的突變 - 即被關(guān)閉
* 一旦線程池非RUNNING狀態(tài)時(shí)谓晌,除了從隊(duì)列中移除該任務(wù)(回滾)外
* 還需要執(zhí)行任務(wù)拒絕策略處理新提交的任務(wù)
*/
if (!isRunning(recheck) && remove(command))
//執(zhí)行任務(wù)拒絕策略
reject(command);
/**
* 若線程池還是RUNNING狀態(tài) 或 隊(duì)列移除失敗(可能正好被一個(gè)工作線程拿到處理了)
* 此時(shí)需要確保至少有一個(gè)工作線程還可以干活
* 補(bǔ)充一句:之所有無(wú)須與核心工作線程數(shù)或最大線程數(shù)相比掠拳,而只是比較0的原因是
* 只要保證有一個(gè)工作線程可以干活就行,它會(huì)自動(dòng)去獲取任務(wù)
*/
else if (workerCountOf(recheck) == 0)
/**
* 若工作線程都已死亡纸肉,需要新增一個(gè)工作線程去干活
* 死亡原因可能是線程超時(shí)或者異常等等復(fù)雜情況
*
* 第一個(gè)參數(shù)為null指的是傳入一個(gè)空任務(wù)溺欧,
* 目的是創(chuàng)建一個(gè)新工作線程去處理隊(duì)列中的剩余任務(wù)
* 第二個(gè)參數(shù)為false目的是提示可以擴(kuò)容到最大工作線程數(shù)
*/
addWorker(null, false);
}
/**
* 情況三:一旦線程池被關(guān)閉 或者 新任務(wù)入隊(duì)失敗(隊(duì)列已滿)
* 執(zhí)行方案:會(huì)嘗試創(chuàng)建一個(gè)新的工作線程,并允許擴(kuò)容到最大工作線程數(shù)
* 注意:一旦創(chuàng)建失敗柏肪,比如超過(guò)最大工作線程數(shù)姐刁,需要執(zhí)行任務(wù)拒絕策略
*/
else if (!addWorker(command, false))
//執(zhí)行任務(wù)拒絕策略
reject(command);
}
addWorker() - 新增工作線程
/**
* 新增工作線程需要遵守線程池控制狀態(tài)規(guī)定和邊界限制
*
* @param core core為true時(shí)允許擴(kuò)容到核心工作線程數(shù),否則為最大工作線程數(shù)
* @return 新增成功返回true烦味,失敗返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//重試標(biāo)簽
retry:
/***
* 外部自旋 -> 目的是確認(rèn)是否能夠新增工作線程
* 允許新增線程的條件有兩個(gè):
* 1.滿足線程池狀態(tài)條件 -> 條件一
* 2.實(shí)際工作線程滿足數(shù)量邊界條件 -> 條件二
* 不滿足條件時(shí)會(huì)直接返回false聂使,表示新增工作線程失敗
*/
for (;;) {
//讀取原子控制量 - 包含workerCount(實(shí)際工作線程數(shù))和runState(線程池狀態(tài))
int c = ctl.get();
//讀取線程池狀態(tài)
int rs = runStateOf(c);
/**
* 條件一.判斷是否滿足線程池狀態(tài)條件
* 1.只有兩種情況允許新增線程:
* 1.1 線程池狀態(tài)==RUNNING
* 1.2 線程池狀態(tài)==SHUTDOWN且firstTask為null同時(shí)隊(duì)列非空
*
* 2.線程池狀態(tài)>=SHUTDOWN時(shí)不允許接收新任務(wù),具體如下:
* 2.1 線程池狀態(tài)>SHUTDOWN谬俄,即為STOP柏靶、TIDYING、TERMINATED
* 2.2 線程池狀態(tài)==SHUTDOWN溃论,但firstTask非空
* 2.3 線程池狀態(tài)==SHUTDOWN且firstTask為空屎蜓,但隊(duì)列為空
* 補(bǔ)充:針對(duì)1.2、2.2蔬芥、2.3的情況具體請(qǐng)參加后面的"小問(wèn)答"環(huán)節(jié)
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
/***
* 內(nèi)部自旋 -> 條件二.判斷實(shí)際工作線程數(shù)是否滿足數(shù)量邊界條件
* -數(shù)量邊界條件滿足會(huì)對(duì)嘗試workerCount實(shí)現(xiàn)CAS自增梆靖,否則新增失敗
* -當(dāng)CAS失敗時(shí)會(huì)再次重新判斷是否滿足新增條件:
* 1.若此期間線程池狀態(tài)突變(被關(guān)閉),重新判斷線程池狀態(tài)條件和數(shù)量邊界條件
* 2.若此期間線程池狀態(tài)一致笔诵,則只需重新判斷數(shù)量邊界條件
*/
for (;;) {
//讀取實(shí)際工作線程數(shù)
int wc = workerCountOf(c);
/**
* 新增工作線程會(huì)因兩種實(shí)際工作線程數(shù)超標(biāo)情況而失敺滴恰:
* 1.實(shí)際工作線程數(shù) >= 最大容量
* 2.實(shí)際工作線程數(shù) > 工作線程比較邊界數(shù)(當(dāng)前最大擴(kuò)容數(shù))
* -若core = true,比較邊界數(shù) = 核心工作線程數(shù)
* -若core = false乎婿,比較邊界數(shù) = 最大工作線程數(shù)
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 實(shí)際工作線程計(jì)數(shù)CAS自增:
* 1.一旦成功直接退出整個(gè)retry循環(huán)测僵,表明新增條件都滿足
* 2.因并發(fā)競(jìng)爭(zhēng)導(dǎo)致CAS更新失敗的原因有三種:
* 2.1 線程池剛好已新增一個(gè)工作線程
* -> 計(jì)數(shù)增加,只需重新判斷數(shù)量邊界條件
* 2.2 剛好其他工作線程運(yùn)行期發(fā)生錯(cuò)誤或因超時(shí)被回收
* -> 計(jì)數(shù)減少谢翎,只需重新判斷數(shù)量邊界條件
* 2.3 剛好線程池被關(guān)閉
* -> 計(jì)數(shù)減少捍靠,工作線程被回收,
* 需重新判斷線程池狀態(tài)條件和數(shù)量邊界條件
*/
if (compareAndIncrementWorkerCount(c))
break retry;
//重新讀取原子控制量 -> 原因是在此期間可能線程池被關(guān)閉了
c = ctl.get();
/**
* 快速檢測(cè)是否發(fā)生線程池狀態(tài)突變
* 1.若狀態(tài)突變森逮,重新判斷線程池狀態(tài)條件和數(shù)量邊界條件
* 2.若狀態(tài)一致榨婆,則只需重新判斷數(shù)量邊界條件
*/
if (runStateOf(c) != rs)
continue retry;
}
}
/**
* 這里是addWorker方法的一個(gè)分割線
* 前面的代碼的作用是決定了線程池接受還是拒絕新增工作線程
* 后面的代碼的作用是真正開始新增工作線程并封裝成Worker接著執(zhí)行后續(xù)操作
* PS:雖然筆者覺(jué)得這個(gè)方法其實(shí)可以拆分成兩個(gè)方法的(在break retry的位置)
*/
//記錄新增的工作線程是否開始工作
boolean workerStarted = false;
//記錄新增的worker是否成功添加到workers集合中
boolean workerAdded = false;
Worker w = null;
try {
//將新提交的任務(wù)和當(dāng)前線程封裝成一個(gè)Worker
w = new Worker(firstTask);
//獲取新創(chuàng)建的實(shí)際工作線程
final Thread t = w.thread;
/**
* 檢測(cè)是否有可執(zhí)行任務(wù)的線程,即是否成功創(chuàng)建了新的工作線程
* 1.若存在褒侧,則選擇執(zhí)行任務(wù)
* 2.若不存在良风,則需要執(zhí)行addWorkerFailed()方法
*/
if (t != null) {
/**
* 新增工作線程需要加全局鎖
* 目的是為了確保安全更新workers集合和largestPoolSize
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* 獲得全局鎖后谊迄,需再次檢測(cè)當(dāng)前線程池狀態(tài)
* 原因在于預(yù)防兩種非法情況:
* 1.線程工廠創(chuàng)建線程失敗
* 2.在鎖被獲取之前,線程池就被關(guān)閉了
*/
int rs = runStateOf(ctl.get());
/**
* 只有兩種情況是允許添加work進(jìn)入works集合的
* 也只有進(jìn)入workers集合后才是真正的工作線程烟央,并開始執(zhí)行任務(wù)
* 1.線程池狀態(tài)為RUNNING(即rs<SHUTDOWN)
* 2.線程池狀態(tài)為SHUTDOWN且傳入一個(gè)空任務(wù)
* (理由參見(jiàn):小問(wèn)答之快速檢測(cè)線程池狀態(tài)?)
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
/**
* 若線程處于活動(dòng)狀態(tài)時(shí)统诺,說(shuō)明線程已啟動(dòng),需要立即拋出"線程狀態(tài)非法異常"
* 原因是線程是在后面才被start的疑俭,已被start的不允許再被添加到workers集合中
* 換句話說(shuō)該方法新增線程時(shí)粮呢,而線程是新的,本身應(yīng)該是初始狀態(tài)(new)
* 可能出現(xiàn)的場(chǎng)景:自定義線程工廠newThread有可能會(huì)提前啟動(dòng)線程
*/
if (t.isAlive())
throw new IllegalThreadStateException();
//由于加鎖钞艇,所以可以放心的加入集合
workers.add(w);
int s = workers.size();
//更新最大工作線程數(shù)啄寡,由于持有鎖,所以無(wú)需CAS
if (s > largestPoolSize)
largestPoolSize = s;
//確認(rèn)新建的worker已被添加到workers集合中
workerAdded = true;
}
} finally {
//千萬(wàn)不要忘記主動(dòng)解鎖
mainLock.unlock();
}
/**
* 一旦新建工作線程被加入工作線程集合中香璃,就意味著其可以開始干活了
* 有心的您肯定發(fā)現(xiàn)在線程start之前已經(jīng)釋放鎖了
* 原因在于一旦workerAdded為true時(shí)这难,說(shuō)明鎖的目的已經(jīng)達(dá)到
* 根據(jù)最小化鎖作用域的原則,線程執(zhí)行任務(wù)無(wú)須加鎖葡秒,這是種優(yōu)化
* 也希望您在使用鎖時(shí)盡量保證鎖的作用域最小化
*/
if (workerAdded) {
/**
* 啟動(dòng)線程姻乓,開始干活啦
* 若您看過(guò)筆者的"并發(fā)番@Thread一文通"肯定知道start()后,
* 一旦線程初始化完成便會(huì)立即調(diào)用run()方法
*/
t.start();
//確認(rèn)該工作線程開始干活了
workerStarted = true;
}
}
} finally {
//若新建工作線程失敗或新建工作線程后沒(méi)有成功執(zhí)行眯牧,需要做新增失敗處理
if (!workerStarted)
addWorkerFailed(w);
}
//返回結(jié)果表明新建的工作線程是否已啟動(dòng)執(zhí)行
return workerStarted;
}
小問(wèn):快速檢測(cè)線程狀態(tài)時(shí)蹋岩,情況1.2、2.1学少、2.3的意義是什么剪个?
友情小提示:讀者可以反問(wèn)自己 -> 何時(shí)新增Worker才是有意義的呢?傳入一個(gè)空任務(wù)的目的是什么版确?
小答:在闡明這個(gè)問(wèn)題之前扣囊,我們先明確兩個(gè)知識(shí)點(diǎn):
1.新增Worker的目的是處理任務(wù),任務(wù)來(lái)源分初始任務(wù)和隊(duì)列任務(wù)(即剩余的待處理任務(wù))
2.線程池在非RUNNING狀態(tài)下是不允許接收新任務(wù)的绒疗,換句話說(shuō)您都要下班了侵歇,難道還想接新需求?
針對(duì)2.1 - > 線程池狀態(tài)==SHUTDOWN吓蘑,但firstTask惕虑!= null,不允許新增Worker
當(dāng)線程池狀態(tài)為SHUTDOWN時(shí)磨镶,由于不允許接收新任務(wù)溃蔫,因此一旦firstTask!= null需要直接拒絕
針對(duì)2.2 - > 線程池狀態(tài)==SHUTDOWN且firstTask == null琳猫,但隊(duì)列為空伟叛,不允許新增Worker
當(dāng)firstTask為null時(shí),說(shuō)明調(diào)用addWorker()目的不是為了處理新增任務(wù)
那么其目的應(yīng)該是為了處理剩余任務(wù)脐嫂,即隊(duì)列中的任務(wù)痪伦,而一旦隊(duì)列為空侄榴,那也沒(méi)必要新增Worker了
針對(duì)1.2 - > 若線程池狀態(tài)==SHUTDOWN,必須滿足firstTask為null且隊(duì)列非空网沾,才允許新增Worker
當(dāng)線程池狀態(tài)為SHUTDOWN時(shí)(調(diào)用shutdown()),此時(shí)不允許接收新任務(wù)蕊爵,因此firstTask必須為null
但需要處理剩余任務(wù)辉哥,因此隊(duì)列必須非空,否則新增的工作線程就無(wú)任務(wù)可做攒射,那就沒(méi)意義了
結(jié)論:傳入一個(gè)空任務(wù)的目的是為了新增工作線程去處理任務(wù)隊(duì)列中的剩余任務(wù)
小問(wèn):線程是如何真正開始工作的醋旦,即何時(shí)開始執(zhí)行runWorker()?
友情小提示:結(jié)合Thread和Worker的構(gòu)造器考慮一下
private final class Worker
extends AbstractQueuedSynchronizer
//步驟1:實(shí)現(xiàn)Runnable接口会放,從而自身是個(gè)Runnable饲齐,可以調(diào)用run方法
implements Runnable{
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
//步驟2:newThread()的參數(shù)傳入的是this,即Worker本身咧最,注意Worker是Runnable
this.thread = getThreadFactory().newThread(this);
}
/**
* 步驟3:調(diào)用run()最終執(zhí)行runWorker()
* - 在addWorker()中會(huì)使用 worker.thread.start()啟動(dòng)線程
* - thread啟動(dòng)后會(huì)立即調(diào)用run()方法捂人,這就意味著啟動(dòng)調(diào)用會(huì)經(jīng)歷這樣的過(guò)程:
* worker = new Worker(Runnable) - > thread = newThread(worker) -> thread.start() ->
* thread.run()[JVM自動(dòng)調(diào)用] -> worker.run() -> threadPoolExecuter.runWorker(worker)
*/
public void run() {
runWorker(this);
}
}
結(jié)論之啟動(dòng)調(diào)用會(huì)經(jīng)歷一下過(guò)程:
(1) worker = new Worker(Runnable) --> (2) thread = newThread(worker) --> (3) thread.start() --> (4) thread.run()[JVM自動(dòng)調(diào)用] --> (5) worker.run() --> (6) threadPoolExecuter.runWorker(worker)
runWorker() - 執(zhí)行任務(wù)
final void runWorker(Worker w) {
//讀取當(dāng)前線程 -即調(diào)用execute()方法的線程(一般是主線程)
Thread wt = Thread.currentThread();
//讀取待執(zhí)行任務(wù)
Runnable task = w.firstTask;
//清空任務(wù) -> 目的是用來(lái)接收下一個(gè)任務(wù)
w.firstTask = null;
/**
* 注意Worker本身也是一把不可重入的互斥鎖!
* 由于Worker初始化時(shí)state=-1矢沿,因此此處的解鎖的目的是:
* 將state-1變成0滥搭,因?yàn)橹挥衧tate>=0時(shí)才允許中斷;
* 同時(shí)也側(cè)面說(shuō)明在worker調(diào)用runWorker()之前是不允許被中斷的捣鲸,
* 即運(yùn)行前不允許被中斷
*/
w.unlock();
//記錄是否因異常/錯(cuò)誤突然完成瑟匆,默認(rèn)有異常/錯(cuò)誤發(fā)生
boolean completedAbruptly = true;
try {
/**
* 獲取任務(wù)并執(zhí)行任務(wù),取任務(wù)分兩種情況:
* 1.初始任務(wù):Worker被初始化時(shí)賦予的第一個(gè)任務(wù)(firstTask)
* 2.隊(duì)列任務(wù):當(dāng)firstTask任務(wù)執(zhí)行好后栽惶,線程不會(huì)被回收愁溜,而是之后自動(dòng)自旋從任務(wù)隊(duì)列中取任務(wù)(getTask)
* 此時(shí)即體現(xiàn)了線程的復(fù)用
*/
while (task != null || (task = getTask()) != null) {
/**
* Worker加鎖的目的是為了在shutdown()時(shí)不要立即終止正在運(yùn)行的worker,
* 因?yàn)樾枰瘸钟墟i才能終止外厂,而不是為了處理并發(fā)情況(注意不是全局鎖)
* 在shutdownNow()時(shí)會(huì)立即終止worker冕象,因?yàn)槠錈o(wú)須持有鎖就能終止
* 關(guān)于關(guān)閉線程池下文會(huì)再具體詳述
*/
w.lock();
/**
* 當(dāng)線程池被關(guān)閉且主線程非中斷狀態(tài)時(shí),需要重新中斷它
* 由于調(diào)用線程一般是主線程酣衷,因此這里是主線程代指調(diào)用線程
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/**
* 每個(gè)任務(wù)執(zhí)行前都會(huì)調(diào)用"前置方法"交惯,
* 在"前置方法"可能會(huì)拋出異常,
* 結(jié)果是退出循環(huán)且completedAbruptly=true穿仪,
* 從而線程死亡席爽,任務(wù)未執(zhí)行(并被丟棄)
*/
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/**
* 任務(wù)執(zhí)行結(jié)束后,會(huì)調(diào)用"后置方法"
* 該方法也可能拋異常從而導(dǎo)致線程死亡
* 但值得注意的是任務(wù)已經(jīng)執(zhí)行完畢
*/
afterExecute(task, thrown);
}
} finally {
//清空任務(wù) help gc
task = null;
//無(wú)論成功失敗任務(wù)數(shù)都要+1啊片,由于持有鎖所以無(wú)須CAS
w.completedTasks++;
//必須要主動(dòng)釋放鎖
w.unlock();
}
}
//無(wú)異常時(shí)需要清除異常狀態(tài)
completedAbruptly = false;
} finally {
/**
* 工作線程退出循環(huán)的原因有兩個(gè):
* 1.因意外的錯(cuò)誤/異常退出
* 2.getTask()返回空 -> 原因有四種只锻,下文會(huì)詳述
* 工作線程退出循環(huán)后,需要執(zhí)行相對(duì)應(yīng)的回收處理
*/
processWorkerExit(w, completedAbruptly);
}
}
小問(wèn):為什么新任務(wù)不直接放入任務(wù)隊(duì)列而是被新線程執(zhí)行呢紫谷?
小提示:主要是為了減少不必要的開銷齐饮,從而提供性能
小答:新任務(wù)不直接放入任務(wù)隊(duì)列目的是減少任務(wù)隊(duì)列的入隊(duì)和出隊(duì)操作捐寥,因?yàn)槿蝿?wù)隊(duì)列本身是阻塞隊(duì)列,因此其入隊(duì)和出隊(duì)操作會(huì)涉及鎖操作以及并發(fā)處理
getTask() - 獲取任務(wù)
造成getTask()方法返回null的原因有5種:
1.線程池被關(guān)閉祖驱,狀態(tài)為(STOP || TIDYING || TERMINATED)
2.線程池被關(guān)閉握恳,狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列為空
3.實(shí)際工作線程數(shù)超過(guò)最大工作線程數(shù)
4.工作線程滿足超時(shí)條件后,同時(shí)符合下述的任意一種情況:
4.1 線程池中還存在至少一個(gè)其他可用的工作線程
4.2 線程池中已沒(méi)有其他可用的工作線程但任務(wù)隊(duì)列為空
private Runnable getTask() {
// 記錄任務(wù)隊(duì)列的poll()是否超時(shí)捺僻,默認(rèn)未超時(shí)
boolean timedOut = false;
//自旋獲取任務(wù)
for (;;) {
/**
* 線程池會(huì)依次判斷五種情況乡洼,滿足任意一種就返回null:
* 1.線程池被關(guān)閉,狀態(tài)為(STOP || TIDYING || TERMINATED)
* 2.線程池被關(guān)閉匕坯,狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列為空
* 3.實(shí)際工作線程數(shù)超過(guò)最大工作線程數(shù)
* 4.工作線程滿足超時(shí)條件后束昵,同時(shí)符合下述的任意一種情況:
* 4.1 線程池中還存在至少一個(gè)其他可用的工作線程
* 4.2 線程池中已沒(méi)有其他可用的工作線程但任務(wù)隊(duì)列為空
*/
int c = ctl.get();
int rs = runStateOf(c);
/**
* 判斷線程池狀態(tài)條件,有兩種情況直接返回null
* 1.線程池狀態(tài)大于SHUTDOWN(STOP||TIDYING||TERMINATED)葛峻,說(shuō)明不允許再執(zhí)行任務(wù)
* - 因?yàn)?gt;=STOP以上狀態(tài)時(shí)不允許接收新任務(wù)同時(shí)會(huì)中斷正在執(zhí)行中的任務(wù)锹雏,任務(wù)隊(duì)列的任務(wù)也不執(zhí)行了
*
* 2.線程池狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列為空,說(shuō)明已經(jīng)無(wú)任務(wù)可執(zhí)行
* - 因?yàn)镾HUTDOWN時(shí)還需要執(zhí)行任務(wù)隊(duì)列的剩余任務(wù)术奖,只有當(dāng)無(wú)任務(wù)才可退出
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
/**
* 減少一個(gè)工作線程數(shù)
* 值得注意的是工作線程的回收是放在processWorkerExit()中進(jìn)行的
* decrementWorkerCount()方法是內(nèi)部不斷循環(huán)執(zhí)行CAS的礁遵,保證最終一定會(huì)成功
* 補(bǔ)充:因線程池被關(guān)閉而計(jì)數(shù)減少可能與addWorker()的
* 計(jì)數(shù)CAS自增發(fā)生并發(fā)競(jìng)爭(zhēng)
*/
decrementWorkerCount();
return null;
}
//讀取實(shí)際工作線程數(shù)
int wc = workerCountOf(c);
/**
* 判斷是否需要處理超時(shí):
* 1.allowCoreThreadTimeOut = true 表示需要回收空閑超時(shí)的核心工作線程
* 2.wc > corePoolSize 表示存在空閑超時(shí)的非核心工作線程需要回收
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 有三種情況會(huì)實(shí)際工作線程計(jì)數(shù)-1且直接返回null
*
* 1.實(shí)際工作線程數(shù)超過(guò)最大線程數(shù)
* 2.該工作線程滿足空閑超時(shí)條件需要被回收:
* 2.1 當(dāng)線程池中還存在至少一個(gè)其他可用的工作線程
* 2.2 線程池中已沒(méi)有其他可用的工作線程但任務(wù)隊(duì)列為空
*
* 結(jié)合2.1和2.2我們可以推導(dǎo)出:
*
* 1.當(dāng)任務(wù)隊(duì)列非空時(shí),線程池至少需要維護(hù)一個(gè)可用的工作線程腰耙,
* 因此此時(shí)即使該工作線程超時(shí)也不會(huì)被回收掉而是繼續(xù)獲取任務(wù)
*
* 2.當(dāng)實(shí)際工作線程數(shù)超標(biāo)或獲取任務(wù)超時(shí)時(shí)榛丢,線程池會(huì)因?yàn)? * 一直沒(méi)有新任務(wù)可執(zhí)行,而逐漸減少線程直到核心線程數(shù)為止挺庞;
* 若設(shè)置allowCoreThreadTimeOut為true晰赞,則減少到1為止;
*
* 提示:由于wc > maximumPoolSize時(shí)必定wc > 1选侨,因此無(wú)須比較
* (wc > maximumPoolSize && workQueue.isEmpty()) 這種情況
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/**
* CAS失敗的原因還是出現(xiàn)并發(fā)競(jìng)爭(zhēng)掖鱼,具體參考上文
* 當(dāng)CAS失敗后,說(shuō)明實(shí)際工作線程數(shù)已經(jīng)發(fā)生變化援制,
* 必須重新判斷實(shí)際工作線程數(shù)和超時(shí)情況
* 因此需要countinue
*/
if (compareAndDecrementWorkerCount(c))
return null;
/**
*/
continue;
}
//若滿足獲取任務(wù)條件戏挡,根據(jù)是否需要超時(shí)獲取會(huì)調(diào)用不同方法
try {
/**
* 從任務(wù)隊(duì)列中取任務(wù)分兩種:
* 1.timed=true 表明需要處理超時(shí)情況
* -> 調(diào)用poll(),超過(guò)keepAliveTime返回null
* 2.timed=fasle 表明無(wú)須處理超時(shí)情況
* -> 調(diào)用take()晨仑,無(wú)任務(wù)則掛起等待
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//一旦獲取到任務(wù)就返回該任務(wù)并退出循環(huán)
if (r != null)
return r;
//當(dāng)任務(wù)為空時(shí)說(shuō)明poll超時(shí)
timedOut = true;
/**
* 關(guān)于中斷異常獲取簡(jiǎn)單講一些超出本章范疇的內(nèi)容
* take()和poll(long timeout, TimeUnit unit)都會(huì)throws InterruptedException
* 原因在LockSupport.park(this)不會(huì)拋出異常但會(huì)響應(yīng)中斷褐墅;
* 但ConditionObject的await()會(huì)通過(guò)reportInterruptAfterWait()響應(yīng)中斷
* 具體內(nèi)容筆者會(huì)在阻塞隊(duì)列相關(guān)番中進(jìn)一步介紹
*/
} catch (InterruptedException retry) {
/**
* 一旦該工作線程被中斷,需要清除超時(shí)標(biāo)記
* 這表明當(dāng)工作線程在獲取隊(duì)列任務(wù)時(shí)被中斷洪己,
* 若您不對(duì)中斷異常做任務(wù)處理妥凳,線程池就默認(rèn)
* 您希望線程繼續(xù)執(zhí)行,這樣就會(huì)重置之前的超時(shí)標(biāo)記
*/
timedOut = false;
}
}
}
小問(wèn):為什么當(dāng)任務(wù)為空時(shí)說(shuō)明poll超時(shí)答捕?
友情小提示:可以聯(lián)想一下阻塞隊(duì)列操作接口
關(guān)閉線程池
關(guān)閉線程池主要有兩種方式逝钥,兩者的區(qū)別是:
shutdown() : 隊(duì)列剩余任務(wù)全部執(zhí)行完畢再終止
shutdownNow() : 放棄執(zhí)行隊(duì)列剩余任務(wù),但會(huì)將它們返回
兩者的共性在于:
1.正在執(zhí)行中的任務(wù)會(huì)繼續(xù)執(zhí)行拱镐,不會(huì)被終止或放棄
2.新提交的任務(wù)會(huì)被直接拒絕
shutdown() - 有序關(guān)閉
使用shutdown()關(guān)閉線程池最主要執(zhí)行5個(gè)操作:
1.獲取全局鎖
2.CAS自旋變更線程池狀態(tài)為SHUTDOWN
3.中斷所有空閑工作線程(設(shè)置中斷標(biāo)記) -> 注意是空閑
4.釋放全局鎖
5.嘗試終止線程池
/**
* 有序關(guān)閉線程池
* 在關(guān)閉過(guò)程中艘款,之前已提交的任務(wù)將被執(zhí)行(包括正在和隊(duì)列中的)持际,
* 但新提交的任務(wù)會(huì)被拒絕
* 如果線程池已經(jīng)被關(guān)閉,調(diào)用該方法不會(huì)有任何附加效果
*/
public void shutdown() {
//1.獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//2.CAS自旋變更線程池狀態(tài)為SHUTDOWN
advanceRunState(SHUTDOWN);
//3.中斷所有空閑工作線程
interruptIdleWorkers();
//專門提供給ScheduledThreadPoolExecutor的鉤子方法
onShutdown();
} finally {
//4.釋放全局鎖
mainLock.unlock();
}
/**
* 5.嘗試終止線程池,此時(shí)線程池滿足兩個(gè)條件:
* 1.線程池狀態(tài)為SHUTDOWN
* 2.所有空閑工作線程已被中斷
*/
tryTerminate();
}
shutdownNow() - 立即關(guān)閉
使用shutdownNow()關(guān)閉線程池最主要執(zhí)行六個(gè)操作:
1.獲取全局鎖
2.CAS自旋變更線程池狀態(tài)為SHUTDOWN
3.中斷所有工作線程(設(shè)置中斷標(biāo)記)
4.將剩余任務(wù)重新放入一個(gè)list中并清空任務(wù)隊(duì)列
5.釋放全局鎖
6.嘗試終止線程池
/**
* 嘗試中斷所有工作線程,并返回待處理任務(wù)列表集合(從任務(wù)隊(duì)列中移除)
*
* 1.若想等待執(zhí)行中的線程完成任務(wù)洁段,可使用awaitTermination()
* 2.由于取消任務(wù)操作是通過(guò)Thread#interrupt實(shí)現(xiàn),因此
* 響應(yīng)中斷失敗的任務(wù)可能永遠(yuǎn)都不會(huì)被終止(謹(jǐn)慎使用!!!)
* 響應(yīng)中斷失敗指的是您選擇捕獲但不處理該中斷異常
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
//1.獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//2.CAS自旋更新線程池狀態(tài)為STOP
advanceRunState(STOP);
//3.中斷所有工作線程
interruptWorkers();
//4.將剩余任務(wù)重新放入一個(gè)list中并清空任務(wù)隊(duì)列
tasks = drainQueue();
} finally {
//5.釋放全局鎖
mainLock.unlock();
}
/**
* 6.嘗試終止線程池芒填,此時(shí)線程池滿足兩個(gè)條件:
* 1.線程池狀態(tài)為STOP
* 2.任務(wù)隊(duì)列為空
* 注意:此時(shí)不一定所有工作線程都被中斷回收,詳述見(jiàn)
* 7.3 tryTerminate
*/
tryTerminate();
//5.返回待處理任務(wù)列表集合
return tasks;
}
awaitTermination() - 等待線程池終止
當(dāng)關(guān)閉線程池時(shí)空繁,awaitTermination()會(huì)一直阻塞直到下述任一種情況的出現(xiàn):
1.所有任務(wù)執(zhí)行完畢: 線程池只有在調(diào)用tryTerminated()嘗試終止線程池并成功將狀態(tài)變更為TERMINATED后才會(huì)調(diào)用termination.signalAll(),此后阻塞線程被喚醒后會(huì)再次判斷狀態(tài)朱庆,一旦滿足TERMINATED就會(huì)退出
2.到達(dá)阻塞超時(shí)時(shí)間: termination.awaitNanos()在到達(dá)超時(shí)間后便會(huì)返回剩余時(shí)間(此時(shí)為0)盛泡,隨后會(huì)因再次判斷滿足nano==0導(dǎo)致return false,即等待失敗
3.當(dāng)前線程被中斷: 若當(dāng)前線程(主線程)被中斷,線程會(huì)拋出InterruptException中斷異常娱颊,若不做異常處理就會(huì)因異常而解除阻塞
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
//1.獲取全局鎖
mainLock.lock();
try {
for (;;) {
//2.所有任務(wù)執(zhí)行完畢傲诵,等待成功而退出
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
//3.到達(dá)阻塞超時(shí)時(shí)間,等待失敗而退出
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
//4.釋放全局鎖
mainLock.unlock();
}
}
您可以通過(guò)以下方式得知線程池是否真正被關(guān)閉:
//關(guān)閉線程池
threadPoolExecutor.shutdown();
try{
//循環(huán)調(diào)用等待任務(wù)最終全部完成
while(!threadPoolExecutor.awaitTermination(300, TimeUnit.MILLISECONDS)) {
logger.info("task executing...");
}
//此時(shí)剩余任務(wù)全部執(zhí)行完畢箱硕,開始執(zhí)行終止流程
logger.info("shutdown completed!")
} catch (InterruptedException e) {
//中斷處理
}
中斷和終止處理
interruptIdleWorkers() - 中斷空閑線程
Worker對(duì)于中斷處理有如下四個(gè)準(zhǔn)則(前面的知識(shí)我們?cè)倩仡櫼槐?:
1.當(dāng)工作線程真正開始執(zhí)行之前拴竹,不允許被中斷
2.當(dāng)工作線程正在執(zhí)行任務(wù)時(shí),不允許被中斷
3.當(dāng)工作線程正等待從任務(wù)隊(duì)列中獲取任務(wù)getTask()時(shí)才能被中斷
4.調(diào)用interruptIdleWorkers()中斷空閑線程時(shí)必須先獲得worker鎖
/**
* 中斷全部空閑線程
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
* 中斷未上鎖且在等待任務(wù)的空閑線程
* 中斷的作用在于便于處理終止線程池或動(dòng)態(tài)控制的情況
*
* @param onlyOne 為true時(shí)為中斷一個(gè)剧罩,為false時(shí)為中斷全部
*/
private void interruptIdleWorkers(boolean onlyOne) {
//加全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* 循環(huán)方式中斷工作線程
* 這里也體現(xiàn)了workers集合的核心作用之一
*/
for (Worker w : workers) {
Thread t = w.thread;
/**
* 非中斷且成功獲取到worker鎖的工作線程才允許被中斷
*
* 1.已被中斷的工作線程無(wú)須再次標(biāo)記中斷
*
* 2.w.tryLock()體現(xiàn)了Worker作為一把鎖的核心作用:
* 即控制線程中斷 -> 當(dāng)線程還在運(yùn)行中是不允許被中斷的
*
* 3.具體可以參見(jiàn)runWorker()方法栓拜,運(yùn)行前都是調(diào)用lock()
*
* 4.由于該方法只會(huì)在shutdown()中調(diào)用,間接也說(shuō)明
* shutdown()只會(huì)中斷在該方法中獲取到worker鎖
* 的空閑線程(此時(shí)線程正在獲取新任務(wù)getTask()惠昔,還沒(méi)上鎖)
*/
if (!t.isInterrupted() && w.tryLock()) {
try {
//中斷工作線程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
//注意這里釋放的是worker鎖幕与,對(duì)應(yīng)tryLock()
w.unlock();
}
}
//onlyOne為true時(shí),只隨機(jī)中斷一個(gè)空閑線程(Set可是無(wú)序的哦)
if (onlyOne)
break;
}
} finally {
//釋放全局鎖
mainLock.unlock();
}
}
interruptWorkers() - 中斷所有線程
/**
* 中斷所有線程镇防,包括正在執(zhí)行任務(wù)的線程
* 該方法只提供給shutdownNow()使用
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//循環(huán)設(shè)置中斷標(biāo)志
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* Worker實(shí)現(xiàn)的中斷方法
*/
void interruptIfStarted() {
Thread t;
/**
* 當(dāng)線程池非RUNNING狀態(tài) && 線程非空 && 線程非中斷
* 三者同時(shí)滿足時(shí)才允許中斷
*
* 為什么線程池必須非RUNNING狀態(tài)才允許中斷呢啦鸣?
* 因?yàn)樵摲椒ㄖ惶峁┙ointerruptWorkers()使用
* 而interruptWorkers()只提供給shutdownNow()使用
* 因此此時(shí)線程狀態(tài)應(yīng)為STOP
*/
if (getState() >= 0 && (t = thread) != null
&& !t.isInterrupted()) {
try {
//設(shè)置中斷標(biāo)志
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
tryTerminate() - 嘗試終止線程池
小問(wèn):為什么正在執(zhí)行任務(wù)的工作線程不允許被中斷?
友情小提示:工作線程執(zhí)行任務(wù)前需加worker鎖且該鎖非重入
小答:回顧interruptIdleWorkers()我們發(fā)現(xiàn)在(1)必須先調(diào)用tryLock()成功獲取worker鎖后才允許中斷該工作線程来氧,而因?yàn)?2)工作線程獲取到任務(wù)后并在執(zhí)行任務(wù)之前也會(huì)先加worker鎖且worker鎖是不可重入的诫给,這就意味著正在執(zhí)行任務(wù)的工作線程不允許被中斷
小問(wèn):線程中斷是如何影響線程回收的?
友情小提示:核心在于當(dāng)getTask()返回null時(shí)會(huì)退出runWorker()并執(zhí)行processWorkerExit()
造成getTask()方法返回null的原因有5種:
1.線程池被關(guān)閉啦扬,狀態(tài)為(STOP || TIDYING || TERMINATED)
2.線程池被關(guān)閉中狂,狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列為空
3.實(shí)際工作線程數(shù)超過(guò)最大工作線程數(shù)
4.工作線程滿足超時(shí)條件后,同時(shí)符合下述的任意一種情況:
4.1 線程池中還存在至少一個(gè)其他可用的工作線程
4.2 線程池中已沒(méi)有其他可用的工作線程但任務(wù)隊(duì)列為空
由上述可知考传,當(dāng)線程池關(guān)閉吃型、線程超時(shí)或動(dòng)態(tài)控制線程(比如池大小數(shù)、超時(shí)時(shí)間等)都可能造成getTask()返回null僚楞,那getTask()是如何影響回收的呢勤晚?
我們僅以關(guān)閉線程池為例(其他情況也只是條件判斷不同的區(qū)別)枉层,描述一下中斷后會(huì)發(fā)生的邏輯:
1.當(dāng)阻塞在getTask()上的工作線程被中斷后會(huì)拋出InterruptedException中斷異常,之后會(huì)解除阻塞重新獲取任務(wù)
2.重新獲取任務(wù)仍需重新校驗(yàn)任務(wù)獲取條件赐写,當(dāng)線程池關(guān)閉時(shí)鸟蜡,比如調(diào)用shutdown(),線程池狀態(tài)變?yōu)镾HUTDOWN挺邀,又因?yàn)榇藭r(shí)任務(wù)隊(duì)列為空揉忘,getTask()直接返回null;若調(diào)用shutdownNow()端铛,線程池狀態(tài)變?yōu)镾TOP泣矛,則直接返回null
3.在runWorker()方法中,當(dāng)getTask()返回null后禾蚕,會(huì)退出循環(huán)您朽,然后調(diào)用processWorkerExit()方法線程回收操作
小問(wèn):既然關(guān)閉線程池后線程池狀態(tài)變更且被中斷后的線程會(huì)被回收,為什么還要執(zhí)行tryTerminate()换淆?
小答:調(diào)用shutdown()后哗总,正在執(zhí)行任務(wù)的工作線程不會(huì)被中斷,當(dāng)它們結(jié)束任務(wù)后倍试,假設(shè)隊(duì)列非空讯屈,這些工作線程會(huì)繼續(xù)執(zhí)行剩余任務(wù)直到阻塞,隨著任務(wù)數(shù)的減少县习,實(shí)際工作線程數(shù)會(huì)不斷減少直到最小維護(hù)數(shù)量涮母;當(dāng)隊(duì)列為空時(shí),最小維護(hù)數(shù)的工作線程們會(huì)一直被阻塞在workerQueue.take()上面准颓,永遠(yuǎn)無(wú)法終止了哈蝇,且線程池被關(guān)閉后也不會(huì)再接收新提交的任務(wù)
-在任何可能導(dǎo)致線程池終止的地方都要調(diào)用tryTerminate(),該方法會(huì)判斷線程池是否已進(jìn)入終止流程攘已,若此時(shí)還有線程存在炮赦,會(huì)重新中斷一個(gè)空閑工作線程
終止流程:線程池狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列為空,或線程池狀態(tài)為STOP
/**
* 終止線程池 -> 最終會(huì)將線程池狀態(tài)變更為TERMINATED
* 只有同時(shí)滿足下面兩個(gè)條件才允許做TERMINATED的狀態(tài)轉(zhuǎn)變:
* 1.線程池狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列為空 或狀態(tài)為STOP
* 2.線程池中已沒(méi)有存活的工作線程 -> 實(shí)際工作線程為0
*/
final void tryTerminate() {
//自旋
for (;;) {
//獲取線程池控制器
int c = ctl.get();
/**
* 有4種情況是不允許執(zhí)行變更TERMINATED操作
*
* 1.線程池仍為運(yùn)行態(tài)RUNNING样勃,說(shuō)明線程池還在正常運(yùn)行中吠勘,
* 此時(shí)是不允許嘗試中斷,起碼要SHUTDOWN或STOP
* 規(guī)則參見(jiàn)shutdown()和shutdownNow()
*
* 2.線程池狀態(tài)已經(jīng)是TIDYING或TERMINATED峡眶,
* 前者說(shuō)明變更TERMINATED正在執(zhí)行中剧防,后者說(shuō)明終止已完成
* 這兩種情況都無(wú)須重復(fù)執(zhí)行終止
*
* 3.線程池狀態(tài)為SHUTDOWN且任務(wù)隊(duì)列非空,
* 說(shuō)明線程池雖然已被要求關(guān)閉辫樱,但還有任務(wù)還沒(méi)處理完
* 需要等待任務(wù)隊(duì)列中剩余任務(wù)被執(zhí)行完畢
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**
* 此時(shí)線程池狀態(tài)為SHUTDOWN狀態(tài)且隊(duì)列為空峭拘,或已是STOP狀態(tài)
* 4.若工作線程數(shù)非0,說(shuō)明還有工作線程可能正在執(zhí)行或等待任務(wù)中,
* 這種情況的原因參見(jiàn)上文中的小問(wèn)答之`為什么還要執(zhí)行tryTerminate()`
* 此時(shí)會(huì)選擇中斷一個(gè)空閑工作線程以確保SHUTDOWN信號(hào)的傳播
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
/**
* 此時(shí)已經(jīng)進(jìn)入終止流程鸡挠,為了傳播SHUTDOWN信號(hào)辉饱,
* 每次總是中斷一個(gè)空閑工作線程以避免所有線程等待
*
* 小問(wèn):此時(shí)若調(diào)用interruptIdleWorkers(false)呢?
* 小答:注意每個(gè)線程的回收都會(huì)調(diào)用processWorkerExit()
* 而該方法都會(huì)調(diào)用tryTerminate()拣展,而此時(shí)一旦
* 設(shè)置為true(表示全部)的話彭沼,由于中斷操作前必須
* 通過(guò)worker.tryLock()加鎖,因此就可能因鎖競(jìng)爭(zhēng)
* 造成不必要的大量等待备埃,還不如一個(gè)個(gè)執(zhí)行
*
* 小問(wèn):那么為什么shutdown()的時(shí)候可以為true呢姓惑?
* 小答:那是因?yàn)榭臻e線程都是沒(méi)有持有worker鎖的!
* 那么就不會(huì)出現(xiàn)鎖競(jìng)爭(zhēng)帶來(lái)的不必要的開銷
*/
interruptIdleWorkers(ONLY_ONE);
return;
}
/**
* 當(dāng)進(jìn)入終止流程且無(wú)存活的工作線程時(shí)
* 那么就可以terminate終止線程池了
*/
//1.獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* 2.先嘗試變成TIDYING狀態(tài)
* 1.一旦成功按脚,執(zhí)行??方法terminated()
* 2.CAS失敗后會(huì)重試于毙,失敗原因可能是線程池剛好
* 已被設(shè)置為TERMINATED,即線程池終止已經(jīng)完成辅搬,
* 之后在重新循環(huán)中會(huì)因runStateAtLeast(c, TIDYING)
* 而退出該方法
*/
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//3.執(zhí)行終止
terminated();
} finally {
//4.設(shè)置TERMINATED狀態(tài)
ctl.set(ctlOf(TERMINATED, 0));
/**
* 5.通過(guò)喚醒解除條件阻塞
* 當(dāng)關(guān)閉線程池后需要等待剩余任務(wù)完成才真正終止線程池望众,
* 會(huì)調(diào)用awaitTermination()方法,
* 此時(shí)主線程會(huì)被
*
*/
termination.signalAll();
}
return;
}
} finally {
//6.釋放全局鎖
mainLock.unlock();
}
// else retry on failed CAS
}
}
線程失敗和回收處理
addWorkerFailed() - 新增線程失敗處理
處理新增工作線程失敗會(huì)執(zhí)行如下操作:
1.獲得全局鎖
2.從workers集合中移除該worker
3.CAS自旋減少實(shí)際工作線程計(jì)數(shù)
4.嘗試終止線程池
5.釋放全局鎖
/**
* 新增工作線程失敗處理
*/
private void addWorkerFailed(Worker w) {
//1.獲取全局鎖 -> 目的是為了安全更新workers
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//2.從workers集合中移除該worker
if (w != null)
workers.remove(w);
/**
* 3.CAS自旋減少實(shí)際工作線程計(jì)數(shù) -> 最終會(huì)成功
* 小問(wèn):為何已經(jīng)加鎖還是使用CAS?
* 小答:workers必須在持有鎖環(huán)境下使用伞辛,ctl無(wú)須在持有鎖環(huán)境下使用
* 1.workers集合為非線程安全的HashSet,不能使用CAS只能加鎖(即外部控制方式)
* 2.ctl為AtomicInteger原子類型夯缺,因此可以直接使用CAS維護(hù)(即內(nèi)部控制方式)
* 注意:這里說(shuō)的持有鎖指的是持有全局鎖mainLock蚤氏,雖然ReentrantLock底層實(shí)現(xiàn)也是CAS
*/
decrementWorkerCount();
/**
* 4.嘗試終止線程池
*
* 小問(wèn):那么為什么此時(shí)要嘗試終止線程池呢?
* 小答:因?yàn)樾略鼍€程失敗的原因只有一個(gè)
* -> 線程池被關(guān)閉并進(jìn)入終止流程
* 具體可參見(jiàn)addWorker()方法
*/
tryTerminate();
} finally {
//5.釋放全局鎖
mainLock.unlock();
}
}
processWorkerExit() - 線程回收處理
線程回收處理主要分兩個(gè)部分:
1.回收該工作線程
2.根據(jù)需要新增工作線程
一.回收該工作線程主要有6個(gè)步驟:
1.因錯(cuò)誤異常而被突然中斷的線程踊兜,實(shí)際工作線程計(jì)數(shù)-1
2.獲取全局鎖
3.統(tǒng)計(jì)線程池總完成任務(wù)數(shù)
4.將該worker從workers集合中安全移除
5.釋放全局鎖
6.嘗試終止線程池
二.若線程池狀態(tài)為RUNNING或SHUTDOWN時(shí)竿滨,有兩種情況需要新增工作線程:
1.線程因錯(cuò)誤異常而被意外死亡
2.若非意外死亡,則至少保證有最小存活數(shù)個(gè)可用工作線程存活
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//1.因錯(cuò)誤異常而被意外死亡的線程捏境,實(shí)際工作線程計(jì)數(shù)-1
if (completedAbruptly)
// If abrupt, then workerCount wasn't adjusted 作者大佬的注釋真的沒(méi)寫錯(cuò)嗎....
decrementWorkerCount();
//2.獲取全局鎖于游,主要目的是為了安全將worker從workers集合中移除
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//3.統(tǒng)計(jì)線程池總完成任務(wù)數(shù)
completedTaskCount += w.completedTasks;
//4.將該worker從workers集合中安全移除
workers.remove(w);
} finally {
//5.釋放全局鎖
mainLock.unlock();
}
/**
* 6.嘗試終止線程池
* 小問(wèn):為什么此處需要嘗試終止線程池?
* 小答:由于processWorkerExit()方法只會(huì)在
* runWorker()中調(diào)用垫言,而調(diào)用的時(shí)機(jī)有兩個(gè):
* 1.工作線程因錯(cuò)誤異常而被中斷退出
* 2.getTask()返回null
* 根據(jù)tryTerminate()的終止條件可知贰剥,
* 前者實(shí)際上并不會(huì)終止線程池,但問(wèn)題是
* 后者的getTask()是有可能因進(jìn)入終止流程而返回null
*/
tryTerminate();
int c = ctl.get();
/**
* 若線程池狀態(tài)為RUNNING或SHUTDOWN時(shí)筷频,有兩種情況需要新增工作線程
* 1.線程因錯(cuò)誤異常而被意外死亡
* -> 目的是填補(bǔ)這個(gè)意外死亡的工作線程造成的線程缺口(填坑)
* 2.若非意外死亡蚌成,則至少保證有最小存活數(shù)個(gè)可用工作線程存活
* -> 目的是保證線程池正常運(yùn)行或SHUTDOWN時(shí)有能力完成隊(duì)列剩余任務(wù)
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
/**
* 線程最小存活數(shù)由allowCoreThreadTimeOut和隊(duì)列長(zhǎng)度共同決定
* 1.當(dāng)allowCoreThreadTimeOut為true時(shí),若隊(duì)列非空凛捏,
* 至少保證一個(gè)可用線程存活
* 2.當(dāng)allowCoreThreadTimeOut為false時(shí)担忧,實(shí)際工作線程數(shù)
* 一旦超過(guò)核心工作線程數(shù),無(wú)須再新增工作線程了
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//1.若允許響應(yīng)核心工作線程超市且隊(duì)列非空時(shí)
if (min == 0 && ! workQueue.isEmpty())
//至少保證一個(gè)可用線程可用
min = 1;
//2.實(shí)際工作線程數(shù)一旦超過(guò)核心工作線程數(shù)坯癣,無(wú)須再新增線程了
if (workerCountOf(c) >= min)
// replacement not needed
//"替換"指的是替已死亡的線程繼續(xù)填坑(完成剩余任務(wù))
return;
}
/**
* 新增工作線程根據(jù)原因區(qū)分的目的有兩個(gè):
* 1.因意外死亡的:
* -> 目的是為了填補(bǔ)線程空缺
* 2.非意外死亡正常退出且隊(duì)列非空:
* -> 處理任務(wù)隊(duì)列中的剩余任務(wù)
* 雖然目的有區(qū)別瓶盛,但實(shí)際上作用是一致的:
* -> 都是為了處理隊(duì)列任務(wù)(因?yàn)閒irstTask為null)
*/
addWorker(null, false);
}
}
任務(wù)隊(duì)列與排隊(duì)策略
任務(wù)隊(duì)列是用于存儲(chǔ)等待執(zhí)行的任務(wù)的阻塞隊(duì)列(在這里特指實(shí)現(xiàn)了BlockingQueue接口的阻塞隊(duì)列實(shí)現(xiàn)類),其目的是為了實(shí)現(xiàn)數(shù)據(jù)緩存和共享;并發(fā)包原生提供了7種阻塞隊(duì)列惩猫,根據(jù)界化可分成兩部分:
-有界隊(duì)列: 有界隊(duì)列指的是容量有限芝硬,不允許無(wú)限拓展的隊(duì)列,其最大可設(shè)置容量為Integer.MAX_VALUE帆锋,入隊(duì)和出隊(duì)都可能阻塞
有界隊(duì)列(bounded
): 必須給定出初始容量吵取,包括ArrayBlockingQueue
可配隊(duì)列(optionally-bounded
): 不設(shè)置初始容量時(shí)默認(rèn)最大容量為Integer.MAX_VALUE
,包括LinkedBlockingQueue
和LinkedBlockingDeque
-無(wú)界隊(duì)列:無(wú)界隊(duì)列指的無(wú)邊界锯厢,有兩種情況分別是0和無(wú)限制
無(wú)邊界(0):容量為0皮官,不存儲(chǔ)元素,無(wú)阻塞实辑,如SynchronousQueue
無(wú)邊界限制(unbounded
): 允許容量無(wú)限拓展捺氢,直到拋出OutOfMemoryError
,入隊(duì)不會(huì)阻塞剪撬,出隊(duì)才可能阻塞摄乒,包括DelayQueue
、LinkedTransferQueue
残黑、PriorityBlockingQueue
注意:若不特別說(shuō)明馍佑,阻塞隊(duì)列都遵循FIFO先進(jìn)先出規(guī)則
有界隊(duì)列
有界隊(duì)列指的是容量有限且固定,不容許無(wú)限拓展的阻塞隊(duì)列梨水,相對(duì)于無(wú)界隊(duì)列來(lái)說(shuō)拭荤,當(dāng)maximumPoolSizes有限時(shí)可以有效防止資源耗盡,但也增加了控制的難度 -> 有界隊(duì)列需要隊(duì)列大小和最大線程數(shù)之間相互"妥協(xié)":
-大型隊(duì)列+小型池:有效減少線程開銷但可能降低吞吐量疫诽,若任務(wù)頻繁阻塞舅世,比如頻繁I/O,
使用大型隊(duì)列和小型池可以最大限度地降低CPU 使用率奇徒、操作系統(tǒng)資源和上下文切換開銷雏亚,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如摩钙,如果它們是 I/O 邊界)罢低,則系統(tǒng)可能為超過(guò)您許可的更多線程安排時(shí)間
-使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高腺律,但是可能遇到不可接受的調(diào)度開銷奕短,這樣也會(huì)降低吞吐量
ArrayBlockingQueue
作用:
-由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
-除了定長(zhǎng)數(shù)組外還包括兩個(gè)int變量標(biāo)識(shí)頭部和尾部在數(shù)組中的位置
-入隊(duì)和出隊(duì)時(shí)不會(huì)產(chǎn)生或回收任何額外的對(duì)象
-支持公平和非公平模式,默認(rèn)非公平鎖
-內(nèi)部采用一把鎖+兩個(gè)條件的同步方式匀钧,不能真正并發(fā)
LinkedBlockingQueue
作用:
-由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
-此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE
-入隊(duì)/出隊(duì)時(shí)每次都會(huì)生成/銷毀一個(gè)額外的Node對(duì)象翎碑,用于實(shí)現(xiàn)鏈表結(jié)構(gòu)
-鏈表的吞吐通常要好于數(shù)組列表(理論上),理由自行谷歌ArrayList和LinkedList的區(qū)別
-內(nèi)部采用兩把鎖+兩個(gè)條件的同步方式之斯,真正并發(fā)
-Executors.newFixedThreadPool()所用阻塞隊(duì)列
坑點(diǎn):
使用默認(rèn)容量時(shí)日杈,若生產(chǎn)速度遠(yuǎn)超過(guò)消費(fèi)速度,可能在因隊(duì)列滿倉(cāng)被阻塞之前內(nèi)存資源就已經(jīng)耗盡了
建議:
-通常只要使用LinkedBlockingQueue和ArrayListBlockingQueue就可以滿足大部分生產(chǎn)-消費(fèi)需求
LinkedBlockingDeque
作用:
-由鏈表結(jié)構(gòu)組成的雙端阻塞隊(duì)列
-雙端隊(duì)列允許從隊(duì)列雙端入隊(duì)和出隊(duì),具體表現(xiàn)在多了很多xxFirst和xxLast方法
-當(dāng)沒(méi)設(shè)置初始容量時(shí)莉擒,此隊(duì)列默認(rèn)使用最大容量值為Integer.MAX_VALUE
-同ArrayListBlockingQueue一樣酿炸,內(nèi)部采用一把鎖+兩個(gè)條件的同步方式,不能真正并發(fā)
無(wú)界隊(duì)列
無(wú)界隊(duì)列指的是容量無(wú)限大或容量為0的阻塞隊(duì)列涨冀,在使用時(shí)需要注意:
1.當(dāng)容量為0時(shí)則要慎重設(shè)置maxPoolSize以避免拒絕新提交的任務(wù)
2.當(dāng)容量無(wú)限大時(shí)填硕,就意味著maxPoolSize失效,設(shè)置該值無(wú)意義鹿鳖,創(chuàng)建線程數(shù)不會(huì)超過(guò)corePoolSize
適用場(chǎng)景:
當(dāng)每個(gè)任務(wù)相互獨(dú)立扁眯,相互無(wú)影響時(shí)最適合無(wú)界隊(duì)列
SynchronousQueue
特點(diǎn):
-不存儲(chǔ)元素的阻塞隊(duì)列,也是線程池默認(rèn)任務(wù)隊(duì)列類型
-隊(duì)列不存儲(chǔ)任務(wù)翅帜,只能進(jìn)行線程之間的元素傳送 -> 即直接提交
-支持公平模式和非公平模式姻檀,默認(rèn)非公平(關(guān)于公平參見(jiàn)reentrantLock)
-Executors.newCachedThreadPool()所用阻塞隊(duì)列
場(chǎng)景:
此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖
坑點(diǎn):
-當(dāng)不存在可用于立即運(yùn)行的任務(wù)時(shí),加入隊(duì)列會(huì)失敗涝滴,此時(shí)會(huì)新增一個(gè)新線程绣版;但若超過(guò)maxPoolSize時(shí)會(huì)出現(xiàn)拒絕新提交的任務(wù)的問(wèn)題!
-非公共模式下若生產(chǎn)和消費(fèi)速度差距較大歼疮,很容易出現(xiàn)饑餓的情況杂抽,某些數(shù)據(jù)可能永遠(yuǎn)無(wú)法執(zhí)行
建議:
-直接提交通常要求無(wú)界maximumPoolSizes以避免拒絕新提交的任務(wù)
-當(dāng)命令以超過(guò)隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無(wú)界線程具有增長(zhǎng)的可能性
PriorityBlockingQueue
特點(diǎn):
-由數(shù)組結(jié)構(gòu)組成且具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列韩脏,默認(rèn)容量11
-默認(rèn)自然排序默怨,同時(shí)支持入隊(duì)元素自定義順序(實(shí)現(xiàn)Comparable接口)
-排序算法為堆排序,內(nèi)部線程同步使用公平鎖
-內(nèi)部使用一把鎖+一條件的同步方式:由于是無(wú)界隊(duì)列骤素,因此只需要一個(gè) notEmpty非空條件即可
-值得注意的是只有頭節(jié)點(diǎn)才保證優(yōu)先級(jí)順序,其他節(jié)點(diǎn)不保障
場(chǎng)景:
需要排序后的數(shù)組的時(shí)候
坑點(diǎn):
由于使用堆排序愚屁,因此一旦消費(fèi)速度遠(yuǎn)小于生產(chǎn)速度時(shí)济竹,時(shí)間一長(zhǎng)由于任務(wù)擠壓再加上堆排序需要,很可能會(huì)耗盡所有的堆空間霎槐,即很容易內(nèi)存溢出
建議:
土豪隨意加內(nèi)存送浊,否則需要保證任務(wù)不要積壓過(guò)多
DelayQueue
特點(diǎn):
-使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)有序且獲取延遲的阻塞無(wú)界隊(duì)列
-入隊(duì)元素必須實(shí)現(xiàn)Delayed接口,給定初識(shí)延時(shí)時(shí)間丘跌,只有到達(dá)延時(shí)時(shí)間才能從隊(duì)列中獲取到該元素袭景,該元素不允許為null
-內(nèi)部使用一把鎖+一條件+優(yōu)先級(jí)隊(duì)列的同步方式:由于延遲特性,因此只需要一個(gè)available條件標(biāo)示任務(wù)是否可用即可
場(chǎng)景:
-用于實(shí)現(xiàn)重試機(jī)制闭树,多次延遲執(zhí)行同時(shí)可支持重試次數(shù)限制
-ScheduledThreadPoolExecutor延遲線程池中的DelayedWorkQueue延遲阻塞隊(duì)列是其的優(yōu)化版本耸棒,用于定時(shí)調(diào)度等操作
-用于實(shí)現(xiàn)緩存,雖然推薦使用NoSQL
-TimerQueue的底層數(shù)據(jù)結(jié)構(gòu)
LinkedTransferQueue
特點(diǎn):
-由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列
-TransferQueue是ConcurrentLinkedQueue报辱、SynchronousQueue (公平模式下)与殃、無(wú)界的LinkedBlockingQueues等的超集
-相對(duì)于其他阻塞隊(duì)列LinkedTransferQueue多了tryTransfer()和transfer()方法
-當(dāng)沒(méi)有消費(fèi)者在等待接收元素,transfer()方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn),阻塞直到有消費(fèi)者消費(fèi)該元素才返回幅疼;否則直接傳遞給消費(fèi)者
米奸,此時(shí)不會(huì)入隊(duì)
-區(qū)別于transfer(),tryTransfer()方法無(wú)論是否有消費(fèi)者在等待接收元素爽篷,都會(huì)立即返回操作結(jié)果是成功或是失敗悴晰,此時(shí)不會(huì)入隊(duì)且非阻塞
-此類使用的復(fù)雜的雙重?cái)?shù)據(jù)結(jié)構(gòu),其方法都需要通過(guò)兩個(gè)步驟實(shí)現(xiàn):
保留:當(dāng)消費(fèi)者從隊(duì)列中獲取元素時(shí)發(fā)現(xiàn)隊(duì)列為空逐工,此時(shí)會(huì)創(chuàng)建一個(gè)數(shù)據(jù)字段為null的Node節(jié)點(diǎn)元素放入隊(duì)列铡溪,然后消費(fèi)者必須阻塞(自旋等待)直到此數(shù)據(jù)字段非空
傳遞: 當(dāng)生產(chǎn)者準(zhǔn)備向隊(duì)列存入元素時(shí)發(fā)現(xiàn)最前面的元素的數(shù)據(jù)字段為null,此時(shí)它會(huì)直接把該元素的數(shù)據(jù)賦值給最前面的元素钻弄,即完成數(shù)據(jù)的傳遞
線程池的監(jiān)控
原生監(jiān)控
監(jiān)控線程池的運(yùn)行情況是挺重要的一件事情佃却,尤其是定位問(wèn)題的時(shí)候,所幸的是線程池原生給我們提供了幾個(gè)監(jiān)控屬性可供get:
1.taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量(近似值)
2.completedTaskCount:線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量窘俺,小于或等于taskCount
3.largestPoolSize:線程池里曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)饲帅,若該值與maxPoolSize一致的話說(shuō)明線程池曾經(jīng)滿過(guò)
4.poolSize:線程池中線程數(shù)量,包括不在干活的工作線程數(shù)瘤泪;值得注意的是若線程池不關(guān)閉的話灶泵,線程池中的線程不會(huì)被自動(dòng)回收的,因此對(duì)于運(yùn)行中的線程池來(lái)說(shuō)該值只增不減
5.activeCount:正在運(yùn)行中的工作線程數(shù)(近似值)
值得一提的是這些監(jiān)控屬性的get方法內(nèi)部雖然都是使用全局鎖維護(hù)的对途,但由于線程池運(yùn)行期間的狀態(tài)和線程數(shù)可以被動(dòng)態(tài)調(diào)整赦邻,比如allowCoreThreadTimeOut()、setMaximumPoolSize()实檀、setCorePoolSize()惶洲、shutdown()等等方法,因此有些值只能近似值
拓展監(jiān)控
線程池提供了三個(gè)鉤子方法可以用于拓展功能膳犹,比如監(jiān)控任務(wù)的平均執(zhí)行時(shí)間恬吕、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間:
beforeExecute():位于runWorker()方法中,在run()方法前執(zhí)行
afterExecute():位于runWorker()方法中须床,在run()方法后執(zhí)行
terminated():位于tryTerminate()方法中铐料,狀態(tài)CAS為TIDYING之后執(zhí)行
注意:由于上述方法都為protected且線程池中默認(rèn)都是空方法,因此只能通過(guò)繼承線程池或構(gòu)造時(shí)才能重寫上述方法
飽和拒絕策略
線程池的飽和拒絕策略主要用于拒絕任務(wù)(但這并不意味著該任務(wù)不會(huì)被執(zhí)行)豺旬,線程池原生提供了四種飽和拒絕策略钠惩,基本涵蓋常見(jiàn)的飽和處理場(chǎng)景:
AbortPolicy:默認(rèn)策略,直接拋出異常
CallerRunsPolic:只用調(diào)用線程執(zhí)行該任務(wù)
DiscardPolicy:直接丟棄任務(wù)
DiscardOldestPolicy:丟棄隊(duì)尾任務(wù)并用線程池重新嘗試執(zhí)行該任務(wù)
所有的拒絕策略都需要實(shí)現(xiàn)該拒絕處理器接口族阅,以統(tǒng)一口徑:
/**
* 用于拒絕線程池任務(wù)的處理器
*/
public interface RejectedExecutionHandler {
/**
* 該方法用于拒絕接受線程池任務(wù)
*
* 有三種情況可能調(diào)用該方法:
* 1.沒(méi)有更多的工作線程可用
* 2.任務(wù)隊(duì)列已滿
* 3.關(guān)閉線程池
*
* 當(dāng)沒(méi)有其他處理選擇時(shí)篓跛,該方法會(huì)選擇拋出RejectedExecutionException異常
* 該異常會(huì)向上拋出直到execute()的調(diào)用者
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
CallerRunsPolicy
處理規(guī)則:新提交任務(wù)由調(diào)用者線程直接執(zhí)行
推薦:拒絕策略推薦使用CallerRunsPolicy,理由是該策略不會(huì)拋棄任務(wù)坦刀,也不會(huì)拋出異常举塔,而是將任務(wù)回退到調(diào)用者線程中執(zhí)行
/**
* 不會(huì)直接丟棄绑警,而是直接用調(diào)用execute()方法的線程執(zhí)行該方法
* 當(dāng)然一旦線程池已經(jīng)被關(guān)閉,還是要丟棄的
*
* 補(bǔ)充:值得注意的是所有策略類都是public的靜態(tài)內(nèi)部類央渣,
* 其目的應(yīng)該是告知使用者 -> 該類與線程池相關(guān)但無(wú)需線程池實(shí)例便可直接使用
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* 直接使用調(diào)用該方法的線程執(zhí)行任務(wù)
* 除非線程池被關(guān)閉時(shí)才會(huì)丟棄該任務(wù)
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//一旦線程池被關(guān)閉计盒,丟棄該任務(wù)
if (!e.isShutdown()) {
//注意此時(shí)不是線程池執(zhí)行該任務(wù)
r.run();
}
}
}
AbortPolicy
處理規(guī)則:直接拋出RejectedExecutionException異常
/**
* 簡(jiǎn)單、粗暴的直接拋出RejectedExecutionException異常
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/**
* 直接拋出異常芽丹,但r.toString()方法會(huì)告訴你哪個(gè)任務(wù)失敗了
* 更人性化的一點(diǎn)是 e.toString()方法還會(huì)告訴你:
* 線程池的狀態(tài)北启、工作線程數(shù)、隊(duì)列長(zhǎng)度拔第、已完成任務(wù)數(shù)
* 建議若是不處理異常起碼也要在日志里面打印一下咕村,留個(gè)案底
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException(
"Task " + r.toString() + " rejected from " + e.toString());
}
}
DiscardPolicy
處理規(guī)則:根據(jù)LIFO(后進(jìn)先出)規(guī)則直接丟棄最新提交的任務(wù)
/**
* 直接丟棄任務(wù)
* 這個(gè)太狠了,連個(gè)案底都沒(méi)有蚊俺,慎用啊
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**
* 無(wú)作為即為丟棄
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
處理規(guī)則:根據(jù)LRU(最近最少使用)規(guī)則丟棄最后一個(gè)任務(wù)懈涛,然后嘗試執(zhí)行新提交的任務(wù)
/**
* 比起直接丟棄,該類會(huì)丟棄隊(duì)列里最后一個(gè)但仍未被處理的任務(wù)泳猬,
* 然后會(huì)重新調(diào)用execute()方法處理當(dāng)前任務(wù)
* 除非線程池被關(guān)閉時(shí)才會(huì)丟棄該任務(wù)
* 此類充分證明了"來(lái)得早不如來(lái)的巧"
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**
* 丟棄隊(duì)列里最近的一個(gè)任務(wù)批钠,并執(zhí)行當(dāng)前任務(wù)
* 除非線程池被關(guān)閉時(shí)才會(huì)丟棄該任務(wù)
* 原因是隊(duì)列是遵循先進(jìn)先出FIFO原則,poll()會(huì)彈出隊(duì)尾元素
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//一旦線程池被關(guān)閉得封,直接丟棄
if (!e.isShutdown()) {
//彈出隊(duì)尾元素
e.getQueue().poll();
//直接用線程池執(zhí)行當(dāng)前任務(wù)
e.execute(r);
}
}
}
線程池的異常處理
submit()異常處理
使用submit()處理異常時(shí)有四個(gè)注意事項(xiàng):
使用submit()處理異常時(shí)有四個(gè)注意事項(xiàng):
1.異常會(huì)保存在Future對(duì)象的ExecutionException中埋心,可以在調(diào)用get()使用try-catch方式捕獲,有N個(gè)任務(wù)有異常就會(huì)拋出來(lái)N個(gè)異常忙上,但不會(huì)終止當(dāng)前工作線程
2.單獨(dú)設(shè)置UncaughtExceptionHandler沒(méi)卵用拷呆,但結(jié)合(3)使用就有效
3.允許在submit()方法內(nèi)部用try-catch捕獲該異常,同樣不會(huì)終止當(dāng)前線程
4.若想在內(nèi)部處理異常疫粥,還可以重寫afterExecute()方法茬斧,例如:
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 3, TimeUnit.SECONDS, new SynchronousQueue<>()) {
//構(gòu)造時(shí)直接重寫afterExecute()方法
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
printException(r, t);
}
};
private static void printException(Runnable r, Throwable t) {
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone())
future.get();
} catch (ExecutionException e) {
t = e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
execute()異常處理
使用execute()處理異常時(shí)有四個(gè)注意事項(xiàng):
1.默認(rèn)會(huì)在execute()方法內(nèi)部直接拋出異常,注意這不會(huì)中斷線程池運(yùn)行梗逮,但會(huì)終止當(dāng)前工作線程啥供,并重新創(chuàng)建新的工作線程執(zhí)行該任務(wù)
2.允許在execute()方法內(nèi)部用try-catch捕獲該異常,好處是不會(huì)終止當(dāng)前線程并重新創(chuàng)建一個(gè)新的線程了
3.重寫afterExecute()方法
4.還可以設(shè)置UncaughtExceptionHandler库糠,例如:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new LinkedBlockingQueue(),
//我們自定義一個(gè)線程工廠和重寫線程的setUncaughtExceptionHandler方法
new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), r, "thread-"
+ (threadNumber.getAndIncrement()));
thread.setUncaughtExceptionHandler((t,e) -> System.out.println(e));
return thread;
}
});
3 FixedThreadPool
- FixedThreadPool模式會(huì)使用一個(gè)優(yōu)先固定數(shù)目的線程來(lái)處理若干數(shù)目的任務(wù)。
- 規(guī)定數(shù)目的線程處理所有任務(wù)涮毫,一旦有線程處理完了任務(wù)就會(huì)被用來(lái)處理新的任務(wù)(如果有的話)瞬欧。
- FixedThreadPool模式下最多的線程數(shù)目是一定的。
3.1 創(chuàng)建FixedThreadPool對(duì)象代碼如:
ExecutorService fixedThreadPool=Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
- FixedThreadPool的corePoolSize和maximumPoolSize參數(shù)都被設(shè)置為nThreads罢防。
- 當(dāng)線程池中的線程數(shù)量大于corePoolSize時(shí)艘虎,keepAliveTime為非核心空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間,超過(guò)這個(gè)時(shí)間后非核心線程將被終止咒吐,這里keepAliveTime設(shè)置為0L野建,就說(shuō)明非核心線程會(huì)立即被終止属划。
- 事實(shí)上這里也沒(méi)有非核心線程創(chuàng)建,因?yàn)楹诵木€程數(shù)和最大線程數(shù)都一樣的候生。
3.2 FixedThreadPool的execute()方法的運(yùn)行流程:
- 分析:
- (1)如果當(dāng)前運(yùn)行線程數(shù)少corePoolSize同眯,則創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)。
- (2)如果當(dāng)前線程池的運(yùn)行線程數(shù)等于corePoolSize唯鸭,那么后面提交的任務(wù)將加入LinkedBlockingQueue须蜗。
- (3)線程在執(zhí)行完圖中的1后,會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來(lái)執(zhí)行目溉。
- FixedThreadPool使用的是無(wú)界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列(隊(duì)列容量為Integer.MAX_VALUE)明肮。使用該隊(duì)列作為工作隊(duì)列會(huì)對(duì)線程池產(chǎn)生如下影響
- (1)當(dāng)前線程池中的線程數(shù)量達(dá)到corePoolSize后,新的任務(wù)將在無(wú)界隊(duì)列中等待缭付。
- (2)由于我們使用的是無(wú)界隊(duì)列柿估,所以參數(shù)maximumPoolSize和keepAliveTime無(wú)效。
- (3)由于使用無(wú)界隊(duì)列陷猫,運(yùn)行中的FixedThreadPool不會(huì)拒絕任務(wù)(當(dāng)然此時(shí)是未執(zhí)行shutdown和shutdownNow方法)秫舌,所以不會(huì)去調(diào)用RejectExecutionHandler的rejectExecution方法拋出異常。
3.3 例子
public class LiftOff implements Runnable{
protected int countDown = 10; //Default
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "LiftOff!") + ") ";
}
@Override
public void run() {
while(countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
}
public class FixedThreadPool {
public static void main(String[] args) {
//三個(gè)線程來(lái)執(zhí)行五個(gè)任務(wù)
ExecutorService exec = Executors.newFixedThreadPool(3);
for(int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
4 CachedThreadPool
- 首先會(huì)按照需要?jiǎng)?chuàng)建足夠多的線程來(lái)執(zhí)行任務(wù)(Task)烙丛。
- 隨著程序執(zhí)行的過(guò)程舅巷,有的線程執(zhí)行完了任務(wù),可以被重新循環(huán)使用時(shí)河咽,才不再創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)钠右。
- 該線程池比較適合沒(méi)有固定大小并且比較快速就能完成的小任務(wù),它將為每個(gè)任務(wù)創(chuàng)建一個(gè)線程忘蟹。
- 那這樣子它與直接創(chuàng)建線程對(duì)象(new Thread())有什么區(qū)別呢飒房?看到它的第三個(gè)參數(shù)60L和第四個(gè)參數(shù)TimeUnit.SECONDS了嗎?好處就在于60秒內(nèi)能夠重用已創(chuàng)建的線程媚值。
4.1 創(chuàng)建方式
ExecutorService cachedThreadPool=Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
- CachedThreadPool的corePoolSize被設(shè)置為0狠毯,而maximumPoolSize被設(shè)置Integer.MAX_VALUE,即maximumPoolSize是無(wú)界的
- keepAliveTime被設(shè)置為60L褥芒,單位為妙嚼松。也就是空閑線程等待時(shí)間最長(zhǎng)為60秒,超過(guò)該時(shí)間將會(huì)被終止锰扶。
- CachedThreadPool使用的是沒(méi)有容量的SynchronousQueue作為線程池的工作隊(duì)列献酗,
- maximumPoolSize是無(wú)界的,也就是意味著如果主線程提交任務(wù)的速度高于maximumPoolSize中線程處理任務(wù)的速度時(shí)CachedThreadPool將會(huì)不斷的創(chuàng)建新的線程坷牛,> * 在極端情況下罕偎,CachedThreadPool會(huì)因?yàn)閯?chuàng)建過(guò)多線程而耗盡CPU和內(nèi)存資源。
4.2 CachedThreadPool的execute()方法的運(yùn)行流程
- 分析:
- (1)首先執(zhí)行SynchronousQueue.offer(Runnable task)京闰,添加一個(gè)任務(wù)颜及。如果當(dāng)前CachedThreadPool中有空閑線程正在執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),其中NANOSECONDS是毫微秒即十億分之一秒(就是微秒/1000)甩苛,那么主線程執(zhí)行offer操作與空閑線程執(zhí)行poll操作配對(duì)成功,主線程把任務(wù)交給空閑線程執(zhí)行俏站,execute()方法執(zhí)行完成讯蒲,否則進(jìn)入第(2)步。
- (2)當(dāng)CachedThreadPool初始線程數(shù)為空時(shí)乾翔,或者當(dāng)前沒(méi)有空閑線程爱葵,將沒(méi)有線程去執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這樣的情況下反浓,步驟(1)將會(huì)失敗萌丈,此時(shí)CachedThreadPool會(huì)創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù),execute()方法執(zhí)行完成雷则。
- (3)在步驟(2)中創(chuàng)建的新線程將任務(wù)執(zhí)行完成后辆雾,會(huì)執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),這個(gè)poll操作會(huì)讓空閑線程最多在SynchronousQueue中等待60秒月劈,如果60秒內(nèi)主線程提交了一個(gè)新任務(wù)度迂,那么這個(gè)空閑線程將會(huì)執(zhí)行主線程提交的新任務(wù),否則猜揪,這個(gè)空閑線程將被終止惭墓。由于空閑60秒的空閑線程會(huì)被終止,因此長(zhǎng)時(shí)間保持空閑的 CachedThreadPool是不會(huì)使用任何資源的而姐。
- 根據(jù)前面的分析我們知道SynchronousQueue是一個(gè)沒(méi)有容量的阻塞隊(duì)列(其實(shí)個(gè)人認(rèn)為是相對(duì)應(yīng)時(shí)間而已的沒(méi)有容量谈为,因?yàn)闀r(shí)間到空閑線程就會(huì)被移除)喂窟。每個(gè)插入操作必須等到一個(gè)線程與之對(duì)應(yīng)箱熬。CachedThreadPool使用SynchronousQueue春畔,把主線程的任務(wù)傳遞給空閑線程執(zhí)行。
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 10; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
5 SingleThreadExecutor
- 只會(huì)創(chuàng)建一個(gè)線程政鼠。它和FixedThreadPool比較類似风瘦,不過(guò)線程數(shù)是一個(gè)。
- 如果多個(gè)任務(wù)被提交給SingleThreadExecutor的話公般,那么這些任務(wù)會(huì)被保存在一個(gè)隊(duì)列中万搔,并且會(huì)按照任務(wù)提交的順序,一個(gè)先執(zhí)行完成再執(zhí)行另外一個(gè)線程官帘。
- SingleThreadExecutor模式可以保證只有一個(gè)任務(wù)會(huì)被執(zhí)行瞬雹。
- 這種特點(diǎn)可以被用來(lái)處理共享資源的問(wèn)題而不需要考慮同步的問(wèn)題。
5.1 創(chuàng)建方式
ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
- SingleThreadExecutor的corePoolSize和maximumPoolSize被設(shè)置為1遏佣,其他參數(shù)則與FixedThreadPool相同。
- SingleThreadExecutor使用的工作隊(duì)列也是無(wú)界隊(duì)列LinkedBlockingQueue揽浙。
- 由于SingleThreadExecutor采用無(wú)界隊(duì)列的對(duì)線程池的影響與FixedThreadPool一樣
5.2 運(yùn)行流程
- 分析:
- (1)如果當(dāng)前線程數(shù)少于corePoolSize即線程池中沒(méi)有線程運(yùn)行状婶,則創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)意敛。
- (2)在線程池的線程數(shù)量等于corePoolSize時(shí),將任務(wù)加入到LinkedBlockingQueue膛虫。
- (3)線程執(zhí)行完成(1)中的任務(wù)后草姻,會(huì)在一個(gè)無(wú)限循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來(lái)執(zhí)行。
5.3 代碼
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 2; i++) {
exec.execute(new LiftOff());
}
}
}
6 比較
- FixedThreadPool:適用于為了滿足資源管理需求稍刀,而需要限制當(dāng)前線程的數(shù)量的應(yīng)用場(chǎng)景撩独,它適用于負(fù)載比較重的服務(wù)器。
- SingleThreadExecutor:適用于需要保證執(zhí)行順序地執(zhí)行各個(gè)任務(wù)账月;并且在任意時(shí)間點(diǎn)综膀,不會(huì)有多個(gè)線程是活動(dòng)的場(chǎng)景。
- CachedThreadPool:大小無(wú)界的線程池局齿,適用于執(zhí)行很多的短期異步任務(wù)的小程序剧劝,或者負(fù)載較輕的服務(wù)器。
參考
并發(fā)番@ThreadPoolExecutor一文通(1.8版)
[如何合理設(shè)置線程池大小]https://blog.csdn.net/u011519624/article/details/69263460
如何合理地估算線程池大凶ゼ摺讥此?