Java并發(fā):線程池實現(xiàn)原理

一盹靴、總覽

線程池類ThreadPoolExecutor的相關(guān)類需要先了解:

Executor:位于最頂層,只有一個 execute(Runnable runnable) 方法掏熬,用于提交任務(wù)。

ExecutorService :在 Executor 接口的基礎(chǔ)上添加了很多的接口方法醉旦,提交任務(wù),獲取結(jié)果颅围,關(guān)閉線程池蘸秘。

AbstractExecutorService:實現(xiàn)了ExecutorService 接口进宝,然后在其基礎(chǔ)上實現(xiàn)了幾個實用的方法刻坊,這些方法提供給子類進行調(diào)用。

ThreadPoolExecutor:線程池類

Executors:最常用的用于生成 ThreadPoolExecutor 的實例的工具類

FutureTask:Runnable, Future -> RunnableFuture -> FutureTask

FutureTask 通過 RunnableFuture 間接實現(xiàn)了 Runnable 接口党晋, 所以每個 Runnable 通常都先包裝成 FutureTask谭胚, 然后調(diào)用 executor.execute(Runnable command) 將其提交給線程池

Runnable 的 void run() 方法是沒有返回值的徐块,如果我們需要的話,會在 submit 中指定第二個參數(shù)作為返回值灾而。

Callable:Callable 也是因為線程池的需要胡控,所以才有了這個接口。它和 Runnable 的區(qū)別在于 run() 沒有返回值旁趟,而 Callable 的 call() 方法有返回值

二昼激、線程池狀態(tài)

線程池中的各個狀態(tài):

RUNNING:這個沒什么好說的,這是最正常的狀態(tài):接受新的任務(wù)锡搜,處理等待隊列中的任務(wù)

SHUTDOWN:不接受新的任務(wù)提交橙困,但是會繼續(xù)處理等待隊列中的任務(wù)

STOP:不接受新的任務(wù)提交,不再處理等待隊列中的任務(wù)耕餐,中斷正在執(zhí)行任務(wù)的線程

TIDYING:所有的任務(wù)都銷毀了凡傅,workCount 為 0。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時肠缔,會執(zhí)行鉤子方法 terminated()

TERMINATED:terminated() 方法結(jié)束后夏跷,線程池的狀態(tài)就會變成這個

狀態(tài)轉(zhuǎn)換:

RUNNING -> SHUTDOWN:當調(diào)用了 shutdown() 后,會發(fā)生這個狀態(tài)轉(zhuǎn)換明未,這也是最重要的

(RUNNING or SHUTDOWN) -> STOP:當調(diào)用 shutdownNow() 后槽华,會發(fā)生這個狀態(tài)轉(zhuǎn)換,這下要清楚 shutDown() 和 shutDownNow() 的區(qū)別了

SHUTDOWN -> TIDYING:當任務(wù)隊列和線程池都清空后亚隅,會由 SHUTDOWN 轉(zhuǎn)換為 TIDYING

STOP -> TIDYING:當任務(wù)隊列清空后硼莽,發(fā)生這個轉(zhuǎn)換

TIDYING -> TERMINATED:這個前面說了庶溶,當 terminated() 方法結(jié)束后

ThreadPoolExecutor采用一個 32 位的整數(shù)來存放線程池的狀態(tài)和當前池中的線程數(shù)煮纵,其中高 3 位用于存放線程池狀態(tài),低 29 位表示線程數(shù)偏螺。


private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

? ? // 這里 COUNT_BITS 設(shè)置為 29(32-3)行疏,意味著前三位用于存放線程狀態(tài),后29位用于存放線程數(shù)

? ? // 很多初學(xué)者很喜歡在自己的代碼中寫很多 29 這種數(shù)字套像,或者某個特殊的字符串酿联,然后分布在各個地方,這是非常糟糕的

? ? private static final int COUNT_BITS = Integer.SIZE - 3;

? ? // 000 11111111111111111111111111111

? ? // 這里得到的是 29 個 1夺巩,也就是說線程池的最大線程數(shù)是 2^29-1=536870911

? ? // 以我們現(xiàn)在計算機的實際情況贞让,這個數(shù)量還是夠用的

? ? private static final int CAPACITY? = (1 << COUNT_BITS) - 1;

? ? // 我們說了,線程池的狀態(tài)存放在高 3 位中

? ? // 運算結(jié)果為 111跟29個0:111 00000000000000000000000000000

? ? private static final int RUNNING? ? = -1 << COUNT_BITS;

? ? // 000 00000000000000000000000000000

? ? private static final int SHUTDOWN? =? 0 << COUNT_BITS;

? ? // 001 00000000000000000000000000000

? ? private static final int STOP? ? ? =? 1 << COUNT_BITS;

? ? // 010 00000000000000000000000000000

? ? private static final int TIDYING? ? =? 2 << COUNT_BITS;

? ? // 011 00000000000000000000000000000

? ? private static final int TERMINATED =? 3 << COUNT_BITS;

? ? // 將整數(shù) c 的低 29 位修改為 0柳譬,就得到了線程池的狀態(tài)

? ? private static int runStateOf(int c)? ? { return c & ~CAPACITY; }

? ? // 將整數(shù) c 的高 3 為修改為 0喳张,就得到了線程池中的線程數(shù)

? ? private static int workerCountOf(int c)? { return c & CAPACITY; }

? ? private static int ctlOf(int rs, int wc) { return rs | wc; }

? ? private static boolean runStateLessThan(int c, int s) {

? ? ? ? return c < s;

? ? }

? ? private static boolean runStateAtLeast(int c, int s) {

? ? ? ? return c >= s;

? ? }

? ? private static boolean isRunning(int c) {

? ? ? ? return c < SHUTDOWN;

? ? }


三、線程池參數(shù)

通過ThreadPoolExecutor構(gòu)造函數(shù)來看線程池參數(shù):


public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

? ? ? ? ? ? long keepAliveTime, TimeUnit unit,

? ? ? ? ? ? BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,

? ? ? ? ? ? RejectedExecutionHandler handler) {

? ? ? ? if (corePoolSize < 0 || maximumPoolSize <= 0

? ? ? ? ? ? ? ? || maximumPoolSize < corePoolSize || keepAliveTime < 0)

? ? ? ? ? ? throw new IllegalArgumentException();

? ? ? ? if (workQueue == null || threadFactory == null || handler == null)

? ? ? ? ? ? throw new NullPointerException();

? ? ? ? this.corePoolSize = corePoolSize;

? ? ? ? this.maximumPoolSize = maximumPoolSize;

? ? ? ? this.workQueue = workQueue;

? ? ? ? this.keepAliveTime = unit.toNanos(keepAliveTime);

? ? ? ? this.threadFactory = threadFactory;

? ? ? ? this.handler = handler;

? ? }

corePoolSize:線程池中核心線程的數(shù)量美澳。當提交一個任務(wù)時销部,線程池會新建一個線程來執(zhí)行任務(wù)摸航,直到當前線程數(shù)等于corePoolSize。如果調(diào)用了線程池的prestartAllCoreThreads()方法舅桩,線程池會提前創(chuàng)建并啟動所有基本線程酱虎。

maximumPoolSize:線程池中允許的最大線程數(shù)。線程池的阻塞隊列滿了之后擂涛,如果還有任務(wù)提交读串,如果當前的線程數(shù)小于maximumPoolSize,則會新建線程來執(zhí)行任務(wù)歼指。注意爹土,如果使用的是無界隊列,該參數(shù)也就沒有什么效果了踩身。

keepAliveTime:空閑線程的闭鸵穑活時間,如果某線程的空閑時間超過這個值都沒有任務(wù)給它做挟阻,那么可以被關(guān)閉了琼娘。注意這個值并不會對所有線程起作用,如果線程池中的線程數(shù)少于等于核心線程數(shù) corePoolSize附鸽,那么這些線程不會因為空閑太長時間而被關(guān)閉脱拼,當然,也可以通過調(diào)用allowCoreThreadTimeOut(true)使核心線程數(shù)內(nèi)的線程也可以被回收

unit:keepAliveTime的單位坷备。TimeUnit

workQueue:

用來保存等待執(zhí)行的任務(wù)的阻塞隊列熄浓,等待的任務(wù)必須實現(xiàn)Runnable接口。我們可以選擇如下幾種:

ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列省撑,F(xiàn)IFO赌蔑。

LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的有界阻塞隊列,F(xiàn)IFO竟秫。

SynchronousQueue:不存儲元素的阻塞隊列娃惯,每個插入操作都必須等待一個移出操作,反之亦然肥败。

threadFactory:用于設(shè)置創(chuàng)建線程的工廠趾浅。

handler:

RejectedExecutionHandler,線程池的拒絕策略馒稍。

所謂拒絕策略皿哨,是指將任務(wù)添加到線程池中時,線程池拒絕該任務(wù)所采取的相應(yīng)策略纽谒。當向線程池中提交任務(wù)時证膨,如果此時線程池中的線程已經(jīng)飽和了,而且阻塞隊列也已經(jīng)滿了佛舱,則線程池會選擇一種拒絕策略來處理該任務(wù)椎例。

線程池提供了四種拒絕策略:(重寫RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor))

  AbortPolicy:直接拋出異常挨决,默認策略;

  CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)订歪;

  DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)脖祈,并執(zhí)行當前任務(wù);

  DiscardPolicy:直接丟棄任務(wù)刷晋;?

當然我們也可以實現(xiàn)自己的拒絕策略盖高,例如記錄日志等等,實現(xiàn)RejectedExecutionHandler接口寫rejectedExecution方法即可眼虱。

四喻奥、線程池創(chuàng)建

Executor工具類提供了三種線程池創(chuàng)建方式:

FixedThreadPool :可重用固定線程數(shù)的線程池

public static ExecutorService newFixedThreadPool(int nThreads) {

? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());

? ? }

corePoolSize 和 maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads,意味著當線程池滿時且阻塞隊列也已經(jīng)滿時捏悬,如果繼續(xù)提交任務(wù)撞蚕,則會直接走拒絕策略,該線程池不會再新建線程來執(zhí)行任務(wù)过牙,而是直接走拒絕策略甥厦。FixedThreadPool使用的是默認的拒絕策略,即AbortPolicy寇钉,則直接拋出異常刀疙。

keepAliveTime設(shè)置為0L,表示空閑的線程會立刻終止扫倡。

workQueue則是使用LinkedBlockingQueue谦秧,但是沒有設(shè)置范圍,那么則是最大值(Integer.MAX_VALUE)撵溃,這基本就相當于一個無界隊列了疚鲤。使用該“無界隊列”則會帶來哪些影響呢?當線程池中的線程數(shù)量等于corePoolSize 時征懈,如果繼續(xù)提交任務(wù)石咬,該任務(wù)會被添加到阻塞隊列workQueue中揩悄,當阻塞隊列也滿了之后卖哎,則線程池會新建線程執(zhí)行任務(wù)直到maximumPoolSize。由于FixedThreadPool使用的是“無界隊列”LinkedBlockingQueue删性,那么maximumPoolSize參數(shù)無效亏娜,同時指定的拒絕策略AbortPolicy也將無效。而且該線程池也不會拒絕提交的任務(wù)蹬挺,如果客戶端提交任務(wù)的速度快于任務(wù)的執(zhí)行维贺,那么keepAliveTime也是一個無效參數(shù)。

SingleThreadExecutor:只有一個線程的固定線程池


public static ExecutorService newSingleThreadExecutor() {

? ? ? ? return new FinalizableDelegatedExecutorService

? ? ? ? ? ? (new ThreadPoolExecutor(1, 1,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>()));

? ? }


為單一worker線程的線程池巴帮,SingleThreadExecutor把corePool和maximumPoolSize均被設(shè)置為1溯泣,和FixedThreadPool一樣使用的是無界隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣虐秋。

CachedThreadPool:根據(jù)需要創(chuàng)建新線程的線程池

public static ExecutorService newCachedThreadPool() {

? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());

? ? }

CachedThreadPool的corePool為0,maximumPoolSize為Integer.MAX_VALUE垃沦,這就意味著所有的任務(wù)一提交就會加入到阻塞隊列中客给。

keepAliveTime這是為60L,unit設(shè)置為TimeUnit.SECONDS肢簿,意味著空閑線程等待新任務(wù)的最長時間為60秒靶剑,空閑線程超過60秒后將會被終止。

阻塞隊列采用的SynchronousQueue池充,SynchronousQueue是一個沒有元素的阻塞隊列桩引,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE收夸,這樣就會存在一個問題坑匠,如果主線程提交任務(wù)的速度遠遠大于CachedThreadPool的處理速度,則CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù)卧惜,這樣有可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源笛辟,所以在使用該線程池是,一定要注意控制并發(fā)的任務(wù)數(shù)序苏,否則創(chuàng)建大量的線程可能導(dǎo)致嚴重的性能問題手幢。

五、執(zhí)行過程

提交任務(wù):

線程池根據(jù)業(yè)務(wù)不同的需求提供了兩種方式提交任務(wù):Executor.execute()忱详、ExecutorService.submit()围来。其中ExecutorService.submit()可以獲取該任務(wù)執(zhí)行的Future。

execute()

執(zhí)行流程如下:

(1)如果線程池當前線程數(shù)小于corePoolSize匈睁,則調(diào)用addWorker創(chuàng)建新線程執(zhí)行任務(wù)监透,成功返回true,失敗執(zhí)行步驟2航唆。

(2)如果線程池處于RUNNING狀態(tài)胀蛮,則嘗試加入阻塞隊列,如果加入阻塞隊列成功糯钙,則嘗試進行Double Check粪狼,如果加入失敗,則執(zhí)行步驟3任岸。

  如果加入阻塞隊列成功了再榄,則會進行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊里中的線程是否可以被執(zhí)行享潜。如果線程池不是RUNNING狀態(tài)困鸥,則調(diào)用remove()方法從阻塞隊列中刪除該任務(wù),然后調(diào)用reject()方法處理任務(wù)剑按。否則需要確保還有線程執(zhí)行疾就。

(3)如果線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗澜术,則嘗試創(chuàng)建新線程直到maxPoolSize,如果失敗猬腰,則調(diào)用reject()方法運行相應(yīng)的拒絕策略瘪板。

public void execute(Runnable command) {

? ? if (command == null)

? ? ? ? throw new NullPointerException();

? ? // 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)

? ? int c = ctl.get();

? ? // 如果當前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù)漆诽,

? ? // 創(chuàng)建一個新的線程侮攀,并把當前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)

? ? if (workerCountOf(c) < corePoolSize) {

? ? ? ? // 添加任務(wù)成功,那么就結(jié)束了厢拭。提交任務(wù)嘛兰英,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了

? ? ? ? // 至于執(zhí)行的結(jié)果供鸠,到時候會包裝到 FutureTask 中畦贸。

? ? ? ? // 返回 false 代表線程池不允許提交任務(wù)

? ? ? ? if (addWorker(command, true))

? ? ? ? ? ? return;

? ? ? ? c = ctl.get();

? ? }

? ? // 到這里說明,要么當前線程數(shù)大于等于核心線程數(shù)楞捂,要么剛剛 addWorker 失敗了

? ? // 如果線程池處于 RUNNING 狀態(tài)薄坏,把這個任務(wù)添加到任務(wù)隊列 workQueue 中

? ? if (isRunning(c) && workQueue.offer(command)) {

? ? ? ? /* 這里面說的是,如果任務(wù)進入了 workQueue寨闹,我們是否需要開啟新的線程

? ? ? ? * 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程

? ? ? ? * 如果線程數(shù)已經(jīng)大于等于 corePoolSize胶坠,那么將任務(wù)添加到隊列中,然后進到這里

? ? ? ? */

? ? ? ? int recheck = ctl.get();

? ? ? ? // 如果線程池已不處于 RUNNING 狀態(tài)繁堡,那么移除已經(jīng)入隊的這個任務(wù)沈善,并且執(zhí)行拒絕策略

? ? ? ? if (! isRunning(recheck) && remove(command))

? ? ? ? ? ? reject(command);

? ? ? ? // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0椭蹄,那么開啟新的線程

? ? ? ? // 到這里闻牡,我們知道了,這塊代碼的真正意圖是:擔心任務(wù)提交到隊列中了绳矩,但是線程都關(guān)閉了

? ? ? ? else if (workerCountOf(recheck) == 0)

? ? ? ? ? ? addWorker(null, false);

? ? }

? ? // 如果 workQueue 隊列滿了罩润,那么進入到這個分支

? ? // 以 maximumPoolSize 為界創(chuàng)建新的 worker,

? ? // 如果失敗翼馆,說明當前線程數(shù)已經(jīng)達到 maximumPoolSize割以,執(zhí)行拒絕策略

? ? else if (!addWorker(command, false))

? ? ? ? reject(command);

}

addWorker

在這里需要好好理論addWorker中的參數(shù),在execute()方法中写妥,有三處調(diào)用了該方法:

第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true)拳球,這個很好理解审姓,當然線程池的線程數(shù)量小于 corePoolSize ,則新建線程執(zhí)行任務(wù)即可魔吐,在執(zhí)行過程core == true莱找,內(nèi)部與corePoolSize比較即可奥溺。

第二次:加入阻塞隊列進行Double Check時,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)浮定。如果線程池中的線程==0层亿,按照道理應(yīng)該該任務(wù)應(yīng)該新建線程執(zhí)行任務(wù)桦卒,但是由于已經(jīng)該任務(wù)已經(jīng)添加到了阻塞隊列,那么就在線程池中新建一個空線程匿又,然后從阻塞隊列中取線程即可方灾。

第三次:線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗:else if (!addWorker(command, false))碌更,這里core == fase裕偿,則意味著是與maximumPoolSize比較。

執(zhí)行流程:

(1)判斷當前線程是否可以添加任務(wù)痛单,如果可以則進行下一步嘿棘,否則return false;

rs >= SHUTDOWN 旭绒,表示當前線程處于SHUTDOWN 蔫巩,STOP、TIDYING快压、TERMINATED狀態(tài)

rs == SHUTDOWN , firstTask != null時不允許添加線程圆仔,因為線程處于SHUTDOWN 狀態(tài),不允許添加任務(wù)

rs == SHUTDOWN , firstTask == null蔫劣,但workQueue.isEmpty() == true坪郭,不允許添加線程,因為firstTask == null是為了添加一個沒有任務(wù)的線程然后再從workQueue中獲取任務(wù)的脉幢,如果workQueue == null歪沃,則說明添加的任務(wù)沒有任何意義。

(2)內(nèi)嵌循環(huán)嫌松,通過CAS worker + 1

(3)獲取主鎖mailLock沪曙,如果線程池處于RUNNING狀態(tài)獲取處于SHUTDOWN狀態(tài)且 firstTask == null,則將任務(wù)添加到workers Queue中萎羔,然后釋放主鎖mainLock液走,然后啟動線程缘眶,然后return true该抒,如果中途失敗導(dǎo)致workerStarted= false凑保,則調(diào)用addWorkerFailed()方法進行處理愉适。


// 第一個參數(shù)是準備提交給這個線程執(zhí)行的任務(wù)维咸,之前說了癌蓖,可以為 null

// 第二個參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界線,也就說創(chuàng)建這個線程的時候用僧,

//? ? ? ? 如果線程池中的線程總數(shù)已經(jīng)達到 corePoolSize责循,那么不能響應(yīng)這次創(chuàng)建線程的請求

//? ? ? ? 如果是 false院仿,代表使用最大線程數(shù) maximumPoolSize 作為界線

private boolean addWorker(Runnable firstTask, boolean core) {

? ? retry:

? ? for (;;) {

? ? ? ? int c = ctl.get();

? ? ? ? int rs = runStateOf(c);

? ? ? ? // 這個非常不好理解

? ? ? ? // 如果線程池已關(guān)閉歹垫,并滿足以下條件之一,那么不創(chuàng)建新的 worker:

? ? ? ? // 1. 線程池狀態(tài)大于 SHUTDOWN暮芭,其實也就是 STOP, TIDYING, 或 TERMINATED

? ? ? ? // 2. firstTask != null

? ? ? ? // 3. workQueue.isEmpty()

? ? ? ? // 簡單分析下:

? ? ? ? // 還是狀態(tài)控制的問題,當線程池處于 SHUTDOWN 的時候蠢沿,不允許提交任務(wù),但是已有的任務(wù)繼續(xù)執(zhí)行

? ? ? ? // 當狀態(tài)大于 SHUTDOWN 時面哼,不允許提交任務(wù)魔策,且中斷正在執(zhí)行的任務(wù)

? ? ? ? // 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null政敢,且 workQueue 非空喷户,那么是允許創(chuàng)建 worker 的

? ? ? ? if (rs >= SHUTDOWN &&

? ? ? ? ? ? ! (rs == SHUTDOWN &&

? ? ? ? ? ? ? firstTask == null &&

? ? ? ? ? ? ? ! workQueue.isEmpty()))

? ? ? ? ? ? return false;

? ? ? ? for (;;) {

? ? ? ? ? ? int wc = workerCountOf(c);

? ? ? ? ? ? if (wc >= CAPACITY ||

? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))

? ? ? ? ? ? ? ? return false;

? ? ? ? ? ? // 如果成功,那么就是所有創(chuàng)建線程前的條件校驗都滿足了河哑,準備創(chuàng)建線程執(zhí)行任務(wù)了

? ? ? ? ? ? // 這里失敗的話灾馒,說明有其他線程也在嘗試往線程池中創(chuàng)建線程

? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))

? ? ? ? ? ? ? ? break retry;

? ? ? ? ? ? // 由于有并發(fā)睬罗,重新再讀取一下 ctl

? ? ? ? ? ? c = ctl.get();

? ? ? ? ? ? // 正常如果是 CAS 失敗的話容达,進到下一個里層的for循環(huán)就可以了

? ? ? ? ? ? // 可是如果是因為其他線程的操作羡滑,導(dǎo)致線程池的狀態(tài)發(fā)生了變更柒昏,如有其他線程關(guān)閉了這個線程池

? ? ? ? ? ? // 那么需要回到外層的for循環(huán)

? ? ? ? ? ? if (runStateOf(c) != rs)

? ? ? ? ? ? ? ? continue retry;

? ? ? ? ? ? // else CAS failed due to workerCount change; retry inner loop

? ? ? ? }

? ? }

? ? /*

? ? * 到這里职祷,我們認為在當前這個時刻有梆,可以開始創(chuàng)建線程來執(zhí)行任務(wù)了,

? ? * 因為該校驗的都校驗了痰催,至于以后會發(fā)生什么夸溶,那是以后的事,至少當前是滿足條件的

? ? */

? ? // worker 是否已經(jīng)啟動

? ? boolean workerStarted = false;

? ? // 是否已將這個 worker 添加到 workers 這個 HashSet 中

? ? boolean workerAdded = false;

? ? Worker w = null;

? ? try {

? ? ? ? final ReentrantLock mainLock = this.mainLock;

? ? ? ? // 把 firstTask 傳給 worker 的構(gòu)造方法

? ? ? ? w = new Worker(firstTask);

? ? ? ? // 取 worker 中的線程對象压语,之前說了胎食,Worker的構(gòu)造方法會調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程

? ? ? ? final Thread t = w.thread;

? ? ? ? if (t != null) {

? ? ? ? ? ? // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操作“順理成章”粥航,

? ? ? ? ? ? // 因為關(guān)閉一個線程池需要這個鎖,至少我持有鎖的期間缀程,線程池不會被關(guān)閉

? ? ? ? ? ? mainLock.lock();

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? int c = ctl.get();

? ? ? ? ? ? ? ? int rs = runStateOf(c);

? ? ? ? ? ? ? ? // 小于 SHUTTDOWN 那就是 RUNNING杨凑,這個自不必說撩满,是最正常的情況

? ? ? ? ? ? ? ? // 如果等于 SHUTDOWN搞糕,前面說了曼追,不接受新的任務(wù),但是會繼續(xù)執(zhí)行等待隊列中的任務(wù)

? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||

? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {

? ? ? ? ? ? ? ? ? ? // worker 里面的 thread 可不能是已經(jīng)啟動的

? ? ? ? ? ? ? ? ? ? if (t.isAlive())

? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();

? ? ? ? ? ? ? ? ? ? // 加到 workers 這個 HashSet 中

? ? ? ? ? ? ? ? ? ? workers.add(w);

? ? ? ? ? ? ? ? ? ? int s = workers.size();

? ? ? ? ? ? ? ? ? ? // largestPoolSize 用于記錄 workers 中的個數(shù)的最大值

? ? ? ? ? ? ? ? ? ? // 因為 workers 是不斷增加減少的针史,通過這個值可以知道線程池的大小曾經(jīng)達到的最大值

? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)

? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;

? ? ? ? ? ? ? ? ? ? workerAdded = true;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? mainLock.unlock();

? ? ? ? ? ? }

? ? ? ? ? ? // 添加成功的話,啟動這個線程

? ? ? ? ? ? if (workerAdded) {

? ? ? ? ? ? ? ? // 啟動線程

? ? ? ? ? ? ? ? t.start();

? ? ? ? ? ? ? ? workerStarted = true;

? ? ? ? ? ? }

? ? ? ? }

? ? } finally {

? ? ? ? // 如果線程沒有啟動泌参,需要做一些清理工作沽一,如前面 workCount 加了 1,將其減掉

? ? ? ? if (! workerStarted)

? ? ? ? ? ? addWorkerFailed(w);

? ? }

? ? // 返回線程是否啟動成功

? ? return workerStarted;

Woker內(nèi)部類

從Worker的源碼中我們可以看到Woker繼承AQS蝗蛙,實現(xiàn)Runnable接口,所以可以認為Worker既是一個可以執(zhí)行的任務(wù)牍蜂,也可以達到獲取鎖釋放鎖的效果泰涂。這里繼承AQS主要是為了方便線程的中斷處理鲫竞。這里注意兩個地方:構(gòu)造函數(shù)、run()逼蒙。構(gòu)造函數(shù)主要是做三件事:1.設(shè)置同步狀態(tài)state為-1从绘,同步狀態(tài)大于0表示就已經(jīng)獲取了鎖,2.設(shè)置將當前任務(wù)task設(shè)置為firstTask是牢,3.利用Worker本身對象this和ThreadFactory創(chuàng)建線程對象僵井。

private final class Worker extends AbstractQueuedSynchronizer

? ? ? ? ? ? implements Runnable {

? ? ? ? private static final long serialVersionUID = 6138294804551838833L;

? ? ? ? // task 的thread

? ? ? ? final Thread thread;

? ? ? ? // 運行的任務(wù)task

? ? ? ? Runnable firstTask;

? ? ? ? volatile long completedTasks;

? ? ? ? Worker(Runnable firstTask) {

? ? ? ? ? ? //設(shè)置AQS的同步狀態(tài)private volatile int state,是一個計數(shù)器驳棱,大于0代表鎖已經(jīng)被獲取

? ? ? ? ? ? setState(-1);

? ? ? ? ? ? this.firstTask = firstTask;

? ? ? ? ? ? // 利用ThreadFactory和 Worker這個Runnable創(chuàng)建的線程對象

? ? ? ? ? ? this.thread = getThreadFactory().newThread(this);

? ? ? ? }

? ? ? ? // 任務(wù)執(zhí)行

? ? ? ? public void run() {

? ? ? ? ? ? runWorker(this);

? ? ? ? }

? ? }

runWorker

運行流程:

(1)根據(jù)worker獲取要執(zhí)行的任務(wù)task合呐,然后調(diào)用unlock()方法釋放鎖,這里釋放鎖的主要目的在于中斷,因為在new Worker時,設(shè)置的state為-1豹休,調(diào)用unlock()方法可以將state設(shè)置為0敢茁,這里主要原因就在于interruptWorkers()方法只有在state >= 0時才會執(zhí)行逢倍;

(2)通過getTask()獲取執(zhí)行的任務(wù)忘晤,調(diào)用task.run()執(zhí)行闰蛔,當然在執(zhí)行之前會調(diào)用worker.lock()上鎖,執(zhí)行之后調(diào)用worker.unlock()放鎖;

(3)在任務(wù)執(zhí)行前后矿酵,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute() 和 afterExecute()方法妄均,則兩個方法在ThreadPoolExecutor中是空實現(xiàn)寄症;

(4)如果線程執(zhí)行完成,則會調(diào)用getTask()方法從阻塞隊列中獲取新任務(wù)岂傲,如果阻塞隊列為空,則根據(jù)是否超時來判斷是否需要阻塞苟翻;

(5)task == null或者拋出異常(beforeExecute()涕烧、task.run()、afterExecute()均有可能)導(dǎo)致worker線程終止肝集,則調(diào)用processWorkerExit()方法處理worker退出流程勉痴。

// 此方法由 worker 線程啟動后調(diào)用雏掠,這里用一個 while 循環(huán)來不斷地從等待隊列中獲取任務(wù)并執(zhí)行

// 前面說了屋群,worker 在初始化的時候,可以指定 firstTask痘昌,那么第一個任務(wù)也就可以不需要從隊列中獲取

final void runWorker(Worker w) {

? ? //

? ? Thread wt = Thread.currentThread();

? ? // 該線程的第一個任務(wù)(如果有的話)

? ? Runnable task = w.firstTask;

? ? w.firstTask = null;

? ? w.unlock(); // allow interrupts

? ? boolean completedAbruptly = true;

? ? try {

? ? ? ? // 循環(huán)調(diào)用 getTask 獲取任務(wù)

? ? ? ? while (task != null || (task = getTask()) != null) {

? ? ? ? ? ? w.lock();? ? ? ? ?

? ? ? ? ? ? // 如果線程池狀態(tài)大于等于 STOP钥勋,那么意味著該線程也要中斷

? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||

? ? ? ? ? ? ? ? (Thread.interrupted() &&

? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&

? ? ? ? ? ? ? ? !wt.isInterrupted())

? ? ? ? ? ? ? ? wt.interrupt();

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? // 這是一個鉤子方法,留給需要的子類實現(xiàn)

? ? ? ? ? ? ? ? beforeExecute(wt, task);

? ? ? ? ? ? ? ? Throwable thrown = null;

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? // 到這里終于可以執(zhí)行任務(wù)了

? ? ? ? ? ? ? ? ? ? task.run();

? ? ? ? ? ? ? ? } catch (RuntimeException x) {

? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

? ? ? ? ? ? ? ? } catch (Error x) {

? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

? ? ? ? ? ? ? ? } catch (Throwable x) {

? ? ? ? ? ? ? ? ? ? // 這里不允許拋出 Throwable辆苔,所以轉(zhuǎn)換為 Error

? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? // 也是一個鉤子方法算灸,將 task 和異常作為參數(shù),留給需要的子類實現(xiàn)

? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? // 置空 task驻啤,準備 getTask 獲取下一個任務(wù)

? ? ? ? ? ? ? ? task = null;

? ? ? ? ? ? ? ? // 累加完成的任務(wù)數(shù)

? ? ? ? ? ? ? ? w.completedTasks++;

? ? ? ? ? ? ? ? // 釋放掉 worker 的獨占鎖

? ? ? ? ? ? ? ? w.unlock();

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? completedAbruptly = false;

? ? } finally {

? ? ? ? // 如果到這里菲驴,需要執(zhí)行線程關(guān)閉:

? ? ? ? // 1. 說明 getTask 返回 null,也就是說骑冗,這個 worker 的使命結(jié)束了赊瞬,執(zhí)行關(guān)閉

? ? ? ? // 2. 任務(wù)執(zhí)行過程中發(fā)生了異常

? ? ? ? // 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1贼涩,這個在 getTask 方法分析中會說

? ? ? ? // 第二種情況巧涧,workCount 沒有進行處理,所以需要在 processWorkerExit 中處理

? ? ? ? // 限于篇幅遥倦,我不準備分析這個方法了谤绳,感興趣的讀者請自行分析源碼

? ? ? ? processWorkerExit(w, completedAbruptly);

? ? }

}

getTask()

// 此方法有三種可能:

// 1. 阻塞直到獲取到任務(wù)返回。我們知道,默認 corePoolSize 之內(nèi)的線程是不會被回收的闷供,

//? ? ? 它們會一直等待任務(wù)

// 2. 超時退出烟央。keepAliveTime 起作用的時候,也就是如果這么多時間內(nèi)都沒有任務(wù)歪脏,那么應(yīng)該執(zhí)行關(guān)閉

????在此我向大家推薦一個架構(gòu)學(xué)習(xí)交流圈:609164807 ?幫助突破瓶頸 提升思維能力

// 3. 如果發(fā)生了以下條件疑俭,此方法必須返回 null:

//? ? - 池中有大于 maximumPoolSize 個 workers 存在(通過調(diào)用 setMaximumPoolSize 進行設(shè)置)

//? ? - 線程池處于 SHUTDOWN,而且 workQueue 是空的婿失,前面說了钞艇,這種不再接受新的任務(wù)

//? ? - 線程池處于 STOP,不僅不接受新的線程豪硅,連 workQueue 中的線程也不再執(zhí)行

private Runnable getTask() {

? ? boolean timedOut = false; // Did the last poll() time out?

? ? retry:

? ? for (;;) {

? ? ? ? int c = ctl.get();

? ? ? ? int rs = runStateOf(c);

? ? ? ? // 兩種可能

? ? ? ? // 1. rs == SHUTDOWN && workQueue.isEmpty()

? ? ? ? // 2. rs >= STOP

? ? ? ? if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

? ? ? ? ? ? // CAS 操作哩照,減少工作線程數(shù)

? ? ? ? ? ? decrementWorkerCount();

? ? ? ? ? ? return null;

? ? ? ? }

? ? ? ? boolean timed;? ? ? // Are workers subject to culling?

? ? ? ? for (;;) {

? ? ? ? ? ? int wc = workerCountOf(c);

? ? ? ? ? ? // 允許核心線程數(shù)內(nèi)的線程回收,或當前線程數(shù)超過了核心線程數(shù)懒浮,那么有可能發(fā)生超時關(guān)閉

? ? ? ? ? ? timed = allowCoreThreadTimeOut || wc > corePoolSize;

? ? ? ? ? ? // 這里 break飘弧,是為了不往下執(zhí)行后一個 if (compareAndDecrementWorkerCount(c))

? ? ? ? ? ? // 兩個 if 一起看:如果當前線程數(shù) wc > maximumPoolSize,或者超時砚著,都返回 null

? ? ? ? ? ? // 那這里的問題來了次伶,wc > maximumPoolSize 的情況,為什么要返回 null稽穆?

? ? ? ? ? ? //? ? 換句話說冠王,返回 null 意味著關(guān)閉線程。

? ? ? ? ? ? // 那是因為有可能開發(fā)者調(diào)用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調(diào)小了

? ? ? ? ? ? if (wc <= maximumPoolSize && ! (timedOut && timed))

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? if (compareAndDecrementWorkerCount(c))

? ? ? ? ? ? ? ? return null;

? ? ? ? ? ? c = ctl.get();? // Re-read ctl

? ? ? ? ? ? // compareAndDecrementWorkerCount(c) 失敗舌镶,線程池中的線程數(shù)發(fā)生了改變

? ? ? ? ? ? if (runStateOf(c) != rs)

? ? ? ? ? ? ? ? continue retry;

? ? ? ? ? ? // else CAS failed due to workerCount change; retry inner loop

? ? ? ? }

? ? ? ? // wc <= maximumPoolSize 同時沒有超時

? ? ? ? try {

? ? ? ? ? ? // 到 workQueue 中獲取任務(wù)

? ? ? ? ? ? Runnable r = timed ?

? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

? ? ? ? ? ? ? ? workQueue.take();

? ? ? ? ? ? if (r != null)

? ? ? ? ? ? ? ? return r;

? ? ? ? ? ? timedOut = true;

? ? ? ? } catch (InterruptedException retry) {

? ? ? ? ? ? // 如果此 worker 發(fā)生了中斷柱彻,采取的方案是重試

? ? ? ? ? ? // 解釋下為什么會發(fā)生中斷,這個讀者要去看 setMaximumPoolSize 方法餐胀,

? ? ? ? ? ? // 如果開發(fā)者將 maximumPoolSize 調(diào)小了哟楷,導(dǎo)致其小于當前的 workers 數(shù)量,

? ? ? ? ? ? // 那么意味著超出的部分線程要被關(guān)閉骂澄。重新進入 for 循環(huán)吓蘑,自然會有部分線程會返回 null

? ? ? ? ? ? timedOut = false;

? ? ? ? }

? ? }

}

在此我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號874811168 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring坟冲,MyBatis,Netty源碼分析溃蔫,高并發(fā)健提、高性能、分布式伟叛、微服務(wù)架構(gòu)的原理私痹,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。還能領(lǐng)取免費的學(xué)習(xí)資源紊遵,目前受益良多

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末账千,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子暗膜,更是在濱河造成了極大的恐慌匀奏,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件学搜,死亡現(xiàn)場離奇詭異娃善,居然都是意外死亡,警方通過查閱死者的電腦和手機瑞佩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門聚磺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人炬丸,你說我怎么就攤上這事瘫寝。” “怎么了稠炬?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵焕阿,是天一觀的道長。 經(jīng)常有香客問我酸纲,道長捣鲸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任闽坡,我火速辦了婚禮栽惶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘疾嗅。我一直安慰自己外厂,他們只是感情好,可當我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布代承。 她就那樣靜靜地躺著汁蝶,像睡著了一般。 火紅的嫁衣襯著肌膚如雪论悴。 梳的紋絲不亂的頭發(fā)上掖棉,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天,我揣著相機與錄音膀估,去河邊找鬼皂吮。 笑死蹬铺,一個胖子當著我的面吹牛拼余,可吹牛的內(nèi)容都是我干的褐桌。 我是一名探鬼主播针肥,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼香伴!你這毒婦竟也來了慰枕?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤即纲,失蹤者是張志新(化名)和其女友劉穎具帮,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體崇裁,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡匕坯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拔稳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片葛峻。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖巴比,靈堂內(nèi)的尸體忽然破棺而出术奖,到底是詐尸還是另有隱情,我是刑警寧澤轻绞,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布采记,位于F島的核電站,受9級特大地震影響政勃,放射性物質(zhì)發(fā)生泄漏唧龄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一奸远、第九天 我趴在偏房一處隱蔽的房頂上張望既棺。 院中可真熱鬧,春花似錦懒叛、人聲如沸丸冕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽胖烛。三九已至,卻和暖如春诅迷,著一層夾襖步出監(jiān)牢的瞬間佩番,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工罢杉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留答捕,地道東北人。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓屑那,卻偏偏與公主長得像拱镐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子持际,可洞房花燭夜當晚...
    茶點故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內(nèi)容