第一部分
來看一下線程池的框架圖,如下:?
1廓啊、Executor任務(wù)提交接口與Executors工具類
Executor框架同java.util.concurrent.Executor 接口在Java 5中被引入。Executor框架是一個(gè)根據(jù)一組執(zhí)行策略調(diào)用封豪,調(diào)度谴轮,執(zhí)行和控制的異步任務(wù)的框架。Executor存在的目的是提供一種將”任務(wù)提交”與”任務(wù)如何運(yùn)行”分離開來的機(jī)制吹埠。定義如下:
publicinterfaceExecutor{voidexecute(Runnable command);? }
1
2
3
雖然只有一個(gè)方法第步,但是卻為靈活且強(qiáng)大的異步任務(wù)執(zhí)行框架提供了基礎(chǔ)。它提供了一種標(biāo)準(zhǔn)的方法將任務(wù)的提交過程與執(zhí)行過程解耦開來藻雌,并用Runnable來表示任務(wù)雌续。那么我們怎么得到Executor對象呢?這就是接下來要介紹的Exectors了胯杭。?
Executors為Executor驯杜,ExecutorService,ScheduledExecutorService做个,ThreadFactory和Callable類提供了一些工具方法鸽心,類似于集合中的Collections類的功能。Executors方便的創(chuàng)建線程池居暖。
1>newCachedThreadPool?:該線程池比較適合沒有固定大小并且比較快速就能完成的小任務(wù)顽频,它將為每個(gè)任務(wù)創(chuàng)建一個(gè)線程。那這樣子它與直接創(chuàng)建線程對象(new Thread())有什么區(qū)別呢太闺?看到它的第三個(gè)參數(shù)60L和第四個(gè)參數(shù)TimeUnit.SECONDS了嗎糯景?好處就在于60秒內(nèi)能夠重用已創(chuàng)建的線程。下面是Executors中的newCachedThreadPool()的源代碼:
publicstaticExecutorServicenewCachedThreadPool() {returnnewThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,newSynchronousQueue());? ? ? }
1
2
3
2> newFixedThreadPool使用的Thread對象的數(shù)量是有限的,如果提交的任務(wù)數(shù)量大于限制的最大線程數(shù)省骂,那么這些任務(wù)講排隊(duì)蟀淮,然后當(dāng)有一個(gè)線程的任務(wù)結(jié)束之后,將會(huì)根據(jù)調(diào)度策略繼續(xù)等待執(zhí)行下一個(gè)任務(wù)钞澳。下面是Executors中的newFixedThreadPool()的源代碼:
publicstaticExecutorServicenewFixedThreadPool(intnThreads) {returnnewThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue());? ? }
1
2
3
3>newSingleThreadExecutor就是線程數(shù)量為1的FixedThreadPool,如果提交了多個(gè)任務(wù)怠惶,那么這些任務(wù)將會(huì)排隊(duì),每個(gè)任務(wù)都會(huì)在下一個(gè)任務(wù)開始之前運(yùn)行結(jié)束轧粟,所有的任務(wù)將會(huì)使用相同的線程策治。下面是Executors中的newSingleThreadExecutor()的源代碼:
publicstaticExecutorServicenewSingleThreadExecutor() {returnnewFinalizableDelegatedExecutorService? ? ? ? ? ? ? (newThreadPoolExecutor(1,1,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue()));? ? ? }
1
2
3
4
4>newScheduledThreadPool創(chuàng)建一個(gè)固定長度的線程池,而且以延遲或定時(shí)的方式來執(zhí)行任務(wù)兰吟。?
通過如上配置的線程池的創(chuàng)建方法源代碼通惫,我們可以發(fā)現(xiàn):?
1> 除了CachedThreadPool使用的是直接提交策略的緩沖隊(duì)列以外,其余兩個(gè)用的采用的都是無界緩沖隊(duì)列混蔼,也就說讽膏,F(xiàn)ixedThreadPool和SingleThreadExecutor創(chuàng)建的線程數(shù)量就不會(huì)超過 corePoolSize。?
2> 我們可以再來看看三個(gè)線程池采用的ThreadPoolExecutor構(gòu)造方法都是同一個(gè)拄丰,使用的都是默認(rèn)的ThreadFactory和handler:
privatestaticfinalRejectedExecutionHandler defaultHandler =newAbortPolicy();publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,? ? ? ? ? Executors.defaultThreadFactory(), defaultHandler);? }
1
2
3
4
5
6
7
8
9
10
也就說三個(gè)線程池創(chuàng)建的線程對象都是同組府树,優(yōu)先權(quán)等級(jí)為正常的Thread.NORM_PRIORITY(5)的非守護(hù)線程,使用的被拒絕任務(wù)處理方式是直接拋出異常的AbortPolicy策略(前面有介紹)料按。
2奄侠、ExecutorService任務(wù)周期管理接口
Executor的實(shí)現(xiàn)通常都會(huì)創(chuàng)建線程來執(zhí)行任務(wù),但是使用異步方式來執(zhí)行任務(wù)時(shí)载矿,由于之前提交任務(wù)的狀態(tài)不是立即可見的垄潮,所以如果要關(guān)閉應(yīng)用程序時(shí),就需要將受影響的任務(wù)狀態(tài)反饋給應(yīng)用程序闷盔。
為了解決執(zhí)行服務(wù)的生命周期問題弯洗,Executor擴(kuò)展了EecutorService接口,添加了一些用于生命周期管理的方法逢勾。如下:
publicinterfaceExecutorServiceextendsExecutor{voidshutdown();? ? ? List shutdownNow();booleanisShutdown();booleanisTerminated();booleanawaitTermination(longtimeout, TimeUnit unit)throwsInterruptedException;// 省略部分方法? }
1
2
3
4
5
6
7
8
3牡整、ThreadPoolExecutor線程池實(shí)現(xiàn)類
先來看一下這個(gè)類中定義的重要變量,如下:
privatefinalBlockingQueue workQueue;// 阻塞隊(duì)列? privatefinalReentrantLock mainLock =newReentrantLock();// 互斥鎖? privatefinalHashSet workers =newHashSet();// 線程集合.一個(gè)Worker對應(yīng)一個(gè)線程? privatefinalCondition termination = mainLock.newCondition();// 終止條件? privateintlargestPoolSize;// 線程池中線程數(shù)量曾經(jīng)達(dá)到過的最大值溺拱。? privatelongcompletedTaskCount;// 已完成任務(wù)數(shù)量? privatevolatileThreadFactory threadFactory;// ThreadFactory對象逃贝,用于創(chuàng)建線程。? privatevolatileRejectedExecutionHandler handler;// 拒絕策略的處理句柄? privatevolatilelongkeepAliveTime;// 線程池維護(hù)線程所允許的空閑時(shí)間? privatevolatilebooleanallowCoreThreadTimeOut;privatevolatileintcorePoolSize;// 線程池維護(hù)線程的最小數(shù)量迫摔,哪怕是空閑的? privatevolatileintmaximumPoolSize;// 線程池維護(hù)的最大線程數(shù)量?
1
2
3
4
5
6
7
8
9
10
11
12
其中有幾個(gè)重要的規(guī)則需要說明一下:?
1> corePoolSize與maximumPoolSize?由于ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize設(shè)置的邊界自動(dòng)調(diào)整池大小沐扳,當(dāng)新任務(wù)在方法 execute(java.lang.Runnable) 中提交時(shí):?
(1)如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求句占,即使其他輔助線程是空閑的沪摄;?
(2)如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池是大小固定的纱烘,如果運(yùn)行的線程與corePoolSize相同杨拐,當(dāng)有新請求過來時(shí),若workQueue未滿凹炸,則將請求放入workQueue中戏阅,等待有空閑的線程去從workQueue中取任務(wù)并處理?
(3)如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程才創(chuàng)建新的線程去處理請求啤它;?
(4)如果運(yùn)行的線程多于corePoolSize 并且等于maximumPoolSize奕筐,若隊(duì)列已經(jīng)滿了,則通過handler所指定的策略來處理新請求变骡;?
(5)如果將 maximumPoolSize 設(shè)置為基本的無界值(如 Integer.MAX_VALUE)离赫,則允許池適應(yīng)任意數(shù)量的并發(fā)任務(wù)?
也就是說,處理任務(wù)的優(yōu)先級(jí)為:?
(1) 核心線程corePoolSize > 任務(wù)隊(duì)列workQueue > 最大線程maximumPoolSize塌碌,如果三者都滿了渊胸,使用handler處理被拒絕的任務(wù)。?
(2)當(dāng)池中的線程數(shù)大于corePoolSize的時(shí)候台妆,多余的線程會(huì)等待keepAliveTime長的時(shí)間翎猛,如果無請求可處理就自行銷毀胖翰。?
2> workQueue?線程池所使用的緩沖隊(duì)列,該緩沖隊(duì)列的長度決定了能夠緩沖的最大數(shù)量切厘,緩沖隊(duì)列有三種通用策略:?
1) 直接提交萨咳。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們疫稿。在此培他,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗遗座,因此會(huì)構(gòu)造一個(gè)新的線程舀凛。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時(shí)出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)途蒋。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí)猛遍,此策略允許無界線程具有增長的可能性;?
2) 無界隊(duì)列。使用無界隊(duì)列(例如碎绎,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待螃壤。這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize筋帖。(因此奸晴,maximumPoolSize 的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù)日麸,即任務(wù)執(zhí)行互不影響時(shí)寄啼,適合于使用無界隊(duì)列;例如代箭,在 Web 頁服務(wù)器中墩划。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí)嗡综,此策略允許無界線程具有增長的可能性;?
3>ThreadFactory?使用 ThreadFactory 創(chuàng)建新線程乙帮。如果沒有另外說明,則在同一個(gè) ThreadGroup 中一律使用 Executors.defaultThreadFactory() 創(chuàng)建線程极景,并且這些線程具有相同的 NORM_PRIORITY 優(yōu)先級(jí)和非守護(hù)進(jìn)程狀態(tài)察净。通過提供不同的 ThreadFactory,可以改變線程的名稱盼樟、線程組氢卡、優(yōu)先級(jí)、守護(hù)進(jìn)程狀態(tài)等等晨缴。如果從 newThread 返回 null 時(shí) ThreadFactory 未能創(chuàng)建線程译秦,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行,但不能執(zhí)行任何任務(wù)。
publicinterfaceThreadFactory{Thread newThread(Runnable r);? }
1
2
3
而構(gòu)造方法中的threadFactory對象筑悴,是通過 Executors.defaultThreadFactory()返回的们拙。Executors.java中的defaultThreadFactory()源碼如下:
publicstaticThreadFactorydefaultThreadFactory() {returnnewDefaultThreadFactory();? }
1
2
3
在DefaultThreadFactory類中實(shí)現(xiàn)了ThreadFactory接口并對其中定義的方法進(jìn)行了實(shí)現(xiàn),如下:
staticclass DefaultThreadFactory implements ThreadFactory {privatestaticfinalAtomicInteger poolNumber =newAtomicInteger(1);privatefinalThreadGroup group;privatefinalAtomicInteger threadNumber =newAtomicInteger(1);privatefinalString namePrefix;? ? ? DefaultThreadFactory() {? ? ? ? ? SecurityManager s = System.getSecurityManager();? ? ? ? ? group = (s !=null) ? s.getThreadGroup() :? Thread.currentThread().getThreadGroup();? ? ? ? ? namePrefix ="pool-"+? poolNumber.getAndIncrement() +"-thread-";? ? ? }// 為線程池創(chuàng)建新的任務(wù)執(zhí)行線程? publicThreadnewThread(Runnable r) {// 線程對應(yīng)的任務(wù)是Runnable對象r? Thread t =newThread(group, r,namePrefix + threadNumber.getAndIncrement(),0);// 設(shè)為非守護(hù)線程? if(t.isDaemon())? ? ? ? ? ? ? t.setDaemon(false);// 將優(yōu)先級(jí)設(shè)為Thread.NORM_PRIORITY? if(t.getPriority() != Thread.NORM_PRIORITY)? ? ? ? ? ? ? t.setPriority(Thread.NORM_PRIORITY);returnt;? ? ? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
4>RejectedExecutionHandler?
當(dāng)Executor已經(jīng)關(guān)閉(即執(zhí)行了executorService.shutdown()方法后)阁吝,并且Executor將有限邊界用于最大線程和工作隊(duì)列容量睛竣,且已經(jīng)飽和時(shí),在方法execute()中提交的新任務(wù)將被拒絕.?
在以上述情況下求摇,execute 方法將調(diào)用其 RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預(yù)定義的處理程序策略:?
1) 在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionException;?
2) 在 ThreadPoolExecutor.CallerRunsPolicy 線程調(diào)用運(yùn)行該任務(wù)的 execute 本身殊者。此策略提供簡單的反饋控制機(jī)制与境,能夠減緩新任務(wù)的提交速度?
3) 在 ThreadPoolExecutor.DiscardPolicy 不能執(zhí)行的任務(wù)將被刪除;?
4) 在 ThreadPoolExecutor.DiscardOldestPolicy 如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除猖吴,然后重試執(zhí)行程序(如果再次失敗摔刁,則重復(fù)此過程)。?
線程池默認(rèn)會(huì)采用的是defaultHandler策略海蔽。首先看defaultHandler的定義:
privatestaticfinalRejectedExecutionHandler defaultHandler =newAbortPolicy();// 使用默認(rèn)的拒絕策略?
1
publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{publicAbortPolicy() { }// 拋出異常? publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) {thrownewRejectedExecutionException("Task "+ r.toString() +" rejected from "+? e.toString());? ? ? }? }
1
2
3
4
5
6
7
看一下其他拒絕策略的具體實(shí)現(xiàn)共屈。
class MyRunnable implements Runnable {privateString name;publicMyRunnable(String name) {this.name = name;? ? ? }@Overridepublicvoidrun() {try{? ? ? ? ? ? ? System.out.println(this.name +" is running.");? ? ? ? ? ? ? Thread.sleep(100);? ? ? ? ? }catch(Exception e) {? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? }? ? ? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如上是一個(gè)測試任務(wù)的例子,下面編寫4個(gè)測試用例來測試党窜。?
1.DiscardPolicy 示例
publicclassDiscardPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池拗引。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊(duì)列容量為1(CAPACITY)幌衣。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"丟棄"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardPolicy());// 新建10個(gè)任務(wù)矾削,并將它們添加到線程池中。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? }// 關(guān)閉線程池? pool.shutdown();? ? ? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
線程池pool的”最大池大小”和”核心池大小”都為1(THREADS_SIZE)豁护,這意味著”線程池能同時(shí)運(yùn)行的任務(wù)數(shù)量最大只能是1”哼凯。?
線程池pool的阻塞隊(duì)列是ArrayBlockingQueue,ArrayBlockingQueue是一個(gè)有界的阻塞隊(duì)列楚里,ArrayBlockingQueue的容量為1断部。這也意味著線程池的阻塞隊(duì)列只能有一個(gè)線程池阻塞等待。?
根據(jù)”“中分析的execute()代碼可知:線程池中共運(yùn)行了2個(gè)任務(wù)班缎。第1個(gè)任務(wù)直接放到Worker中充易,通過線程去執(zhí)行;第2個(gè)任務(wù)放到阻塞隊(duì)列中等待滥朱。其他的任務(wù)都被丟棄了妆兑!
2.DiscardOldestPolicy 示例
publicclassDiscardOldestPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE)苏携,"線程池"的阻塞隊(duì)列容量為1(CAPACITY)做瞪。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"DiscardOldestPolicy"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardOldestPolicy());// 新建10個(gè)任務(wù),并將它們添加到線程池中。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? }// 關(guān)閉線程池? pool.shutdown();? ? ? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
運(yùn)行結(jié)果:
task-0isrunning.task-9isrunning.
1
2
將”線程池的拒絕策略”由DiscardPolicy修改為DiscardOldestPolicy之后装蓬,當(dāng)有任務(wù)添加到線程池被拒絕時(shí)著拭,線程池會(huì)丟棄阻塞隊(duì)列中末尾的任務(wù),然后將被拒絕的任務(wù)添加到末尾牍帚。
3.AbortPolicy 示例
publicclassAbortPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池儡遮。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊(duì)列容量為1(CAPACITY)暗赶。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"拋出異常"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy());try{// 新建10個(gè)任務(wù)鄙币,并將它們添加到線程池中。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? ? ? }? ? ? ? ? }catch(RejectedExecutionException e) {? ? ? ? ? ? ? e.printStackTrace();// 關(guān)閉線程池? pool.shutdown();? ? ? ? ? }? ? ? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
(某一次)運(yùn)行結(jié)果:
java.util.concurrent.RejectedExecutionExceptionat java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)? ? at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)? ? at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)? ? at AbortPolicyDemo.main(AbortPolicyDemo.java:27)task-0is running.task-1is running.
1
2
3
4
5
6
7
8
將”線程池的拒絕策略”由DiscardPolicy修改為AbortPolicy之后蹂随,當(dāng)有任務(wù)添加到線程池被拒絕時(shí)十嘿,會(huì)拋出RejectedExecutionException。
4.CallerRunsPolicy 示例
publicclassCallerRunsPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池岳锁。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE)绩衷,"線程池"的阻塞隊(duì)列容量為1(CAPACITY)。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"CallerRunsPolicy"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());// 新建10個(gè)任務(wù)激率,并將它們添加到線程池中咳燕。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? }// 關(guān)閉線程池? pool.shutdown();? ? ? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(某一次)運(yùn)行結(jié)果:
task-2isrunning.task-3isrunning.task-4isrunning.task-5isrunning.task-6isrunning.task-7isrunning.task-8isrunning.task-9isrunning.task-0isrunning.task-1isrunning.
1
2
3
4
5
6
7
8
9
10
將”線程池的拒絕策略”由DiscardPolicy修改為CallerRunsPolicy之后,當(dāng)有任務(wù)添加到線程池被拒絕時(shí)乒躺,線程池會(huì)將被拒絕的任務(wù)添加到”線程池正在運(yùn)行的線程”中取運(yùn)行招盲。
轉(zhuǎn)自這里:
線程池能夠復(fù)用線程,減少線程創(chuàng)建聪蘸,銷毀宪肖,恢復(fù)等狀態(tài)切換的開銷,提高程序的性能健爬。一個(gè)線程池管理了一組工作線程控乾,同時(shí)它還包括了一個(gè)用于放置等待執(zhí)行的任務(wù)的隊(duì)列。?
ThreadPoolExecutor類中定義了一些與線程狀態(tài)與活動(dòng)線程數(shù)相關(guān)的一些變量娜遵,如下:
privatefinalAtomicInteger ctl =newAtomicInteger(ctlOf(RUNNING,0));// 將整型的24位分為高3位和低29位蜕衡,高3位表示線程池的狀態(tài),低29位表示活動(dòng)的線程數(shù)? privatestaticfinalintCOUNT_BITS = Integer.SIZE -3;privatestaticfinalintCAPACITY? = (1<< COUNT_BITS) -1;// 29位能表示的最大二進(jìn)制整數(shù),也就是活動(dòng)線程數(shù)? // 高3位數(shù)值代表的線程池狀態(tài)? privatestaticfinalintRUNNING? ? = -1<< COUNT_BITS;// running 線程池能接受新任務(wù)? privatestaticfinalintSHUTDOWN? =0<< COUNT_BITS;// shutdown 線程池不再接受新任務(wù)? privatestaticfinalintSTOP? ? ? =1<< COUNT_BITS;// stop 線程池不再接受新任務(wù)设拟,不再執(zhí)行隊(duì)列中的任務(wù)慨仿,而且要中斷正在處理的任務(wù)? privatestaticfinalintTIDYING? ? =2<< COUNT_BITS;// tidying 線程池所有任務(wù)均已終止? privatestaticfinalintTERMINATED =3<< COUNT_BITS;// terminated terminated()方法執(zhí)行結(jié)束?
1
2
3
4
5
6
7
8
9
10
11
由如上可知:?
ctl是一個(gè)AtomicInteger類型的原子對象。ctl記錄了”線程池中的任務(wù)數(shù)量”和”線程池狀態(tài)”2個(gè)信息纳胧。ctl共包括32位镰吆。其中,高3位表示”線程池狀態(tài)”跑慕,低29位表示”線程池中的任務(wù)數(shù)量”万皿。
RUNNING--對應(yīng)的高3位值是111SHUTDOWN--對應(yīng)的高3位值是000STOP--對應(yīng)的高3位值是001TIDYING--對應(yīng)的高3位值是010TERMINATED--對應(yīng)的高3位值是011
1
2
3
4
5
線程池各個(gè)狀態(tài)之間的切換如下圖所示:?
線程池各個(gè)狀態(tài)間的轉(zhuǎn)換的詳細(xì)解釋如下所示摧找。?
1> RUNNING(111) -> SHUTDOWN(000) : 調(diào)用了shutdown方法,線程池實(shí)現(xiàn)了finalize方法牢硅,在里面調(diào)用了shutdown方法蹬耘,因此shutdown可能是在finalize中被隱式調(diào)用的?
2> (RUNNING(111) or SHUTDOWN(000)) -> STOP(001) 調(diào)用了shutdownNow方法
3> SHUTDOWN(000) -> TIDYING(010) : 當(dāng)隊(duì)列和線程池均為空的時(shí)候
4> STOP(001) -> TIDYING(010) : 當(dāng)線程池為空的時(shí)候
5> TIDYING(010) -> TERMINATED(011) : terminated()方法調(diào)用完畢
說明:擴(kuò)號(hào)后的3位數(shù)字表示ctl的高3位二進(jìn)制值,并不關(guān)注低29位二進(jìn)制的值
還有一些對常量的操作方法减余,只說明部分综苔,其他的有興趣自己可以去查看,如下:
privatestaticintrunStateOf(intc)? ? {returnc & ~CAPACITY; }// 得到線程運(yùn)行狀態(tài)? privatestaticintworkerCountOf(intc)? {returnc & CAPACITY; }// 得到活動(dòng)線程數(shù)? privatestaticintctlOf(intrs,intwc) {returnrs | wc; }// 得到兩者表示的值?
1
2
3
來看一下ThreadPoolExecutor()中最主要的一個(gè)構(gòu)造函數(shù),如下:
publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue,? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {if(corePoolSize <0|| maximumPoolSize <=0|| maximumPoolSize < corePoolSize ||? keepAliveTime <0)thrownewIllegalArgumentException();if(workQueue ==null|| threadFactory ==null|| handler ==null)thrownewNullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
調(diào)用Executors方法中的幾個(gè)方法位岔,如newCachedThreadPool()如筛、newFixedThreadPool()時(shí),都會(huì)間接調(diào)用上面的構(gòu)造方法來初始化所有的線程池相關(guān)變量抒抬。
1妙黍、創(chuàng)建線程池并執(zhí)行任務(wù)
有了Executor對象后,就可以調(diào)用execute()方法執(zhí)行任務(wù)了瞧剖。方法的源代碼如下:
publicvoidexecute(Runnable command) {if(command ==null)// 任務(wù)為null,則拋出異常? thrownewNullPointerException();intc = ctl.get();// 取出記錄著runState和workerCount 的 ctl的當(dāng)前值? /*
? ? ? ? *? 通過workerCountOf方法從ctl所表示的int值中提取出低29位的值,也就是當(dāng)前活動(dòng)的線程數(shù)。如果當(dāng)前
? ? ? ? *? 活動(dòng)的線程數(shù)少于corePoolSize,則通過addWorker(command, true)新建一個(gè)線程,并將任務(wù)(command)
? ? ? ? *? 添加到該線程中
? ? ? ? */if(workerCountOf(c) < corePoolSize) {/*
? ? ? ? ? ? * addWorker()返回值表示:
? ? ? ? ? ? * 1可免、true 表示需要檢測當(dāng)前運(yùn)行的線程是否小于corePoolSize
? ? ? ? ? ? * 2抓于、false 表示需要檢測當(dāng)前運(yùn)行的線程數(shù)量是否小于maxPoolSize
? ? ? ? ? ? */if(addWorker(command,true))return;// 新線程創(chuàng)建成功,終止該方法的執(zhí)行? c = ctl.get();// 任務(wù)添加到線程失敗,取出記錄著runState和workerCount 的 ctl的當(dāng)前值? }/*
? ? ? ? * 方法解釋:
? ? ? ? * isRunning(c) 當(dāng)前線程池是否處于運(yùn)行狀態(tài)。源代碼是通過判斷c < SHUTDOWN 來確定返回值浇借。由于RUNNING才會(huì)接收新任務(wù)捉撮,且只有這個(gè)值-1才小于SHUTDOWN
? ? ? ? * workQueue.offer(command) 任務(wù)添加到緩沖隊(duì)列
? ? ? ? */if(isRunning(c) && workQueue.offer(command)) {// 當(dāng)前線程處于運(yùn)行狀態(tài)且成功添加到緩沖隊(duì)列? intrecheck = ctl.get();/*
? ? ? ? ? ? * 如果 線程池已經(jīng)處于非運(yùn)行狀態(tài),則從緩沖隊(duì)列中移除任務(wù)然后采用線程池指定的策略拒絕任務(wù)
? ? ? ? ? ? * 如果 線程池中任務(wù)數(shù)量為0,則通過addWorker(null, false)嘗試新建一個(gè)線程,新建線程對應(yīng)的任務(wù)為null
? ? ? ? ? ? */if(! isRunning(recheck) && remove(command))? ? ? ? ? ? ? ? ? ? ? reject(command);elseif(workerCountOf(recheck) ==0)// 得到活動(dòng)線程數(shù)為0? ? ? ? ? addWorker(null,false);? ? ? ? ? }/*
? ? ? ? * 當(dāng)不滿足以下兩個(gè)條件時(shí)執(zhí)行如下代碼:
? ? ? ? *? 1. 當(dāng)前線程池并不處于Running狀態(tài)? ?
? ? ? ? *? 2. 當(dāng)前線程池處于Running狀態(tài),但是緩沖隊(duì)列已經(jīng)滿了?
? ? ? ? */elseif(!addWorker(command,false))? ? ? ? ? ? ? reject(command);// 采用線程池指定的策略拒絕任務(wù)? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
當(dāng)前活動(dòng)的線程小于corePoolSize了,那么等于和大于corePoolSize怎么處理呢妇垢?
1> 當(dāng)前活動(dòng)的線程數(shù)量 >= corePoolSize 的時(shí)候巾遭,都是優(yōu)先添加到隊(duì)列中,直到隊(duì)列滿了才會(huì)去創(chuàng)建新的線程闯估,在這里第27 行的if語句已經(jīng)體現(xiàn)出來了灼舍。這里利用了&&的特性,只有當(dāng)?shù)谝粋€(gè)條件會(huì)真時(shí)才會(huì)去判斷第二個(gè)條件涨薪,第一個(gè)條件是isRunning()骑素,判斷線程池是否處于RUNNING狀態(tài),因?yàn)橹挥性谶@個(gè)狀態(tài)下才會(huì)接受新任務(wù)刚夺,否則就拒絕献丑,如果正處于RUNNING狀態(tài),那么就加入隊(duì)列侠姑,如果加入失敗可能就是隊(duì)列已經(jīng)滿了创橄,這時(shí)候直接執(zhí)行第29行。
2> 在execute()方法中莽红,當(dāng) 當(dāng)前活動(dòng)的線程數(shù)量 < corePoolSize 時(shí)妥畏,會(huì)執(zhí)行addWorker()方法,關(guān)于addWorker(),它是用來直接新建線程用的咖熟,之所以叫addWorker而不是addThread是因?yàn)樵诰€程池中圃酵,所有的線程都用一個(gè)Worker對象包裝著,來看一下這個(gè)方法:
/**? * 創(chuàng)建并執(zhí)行新線程? * @paramfirstTack 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù)? *? * @paramcore? ? ? true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize馍管,? *? ? ? ? ? ? ? ? ? false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize? *? * @return是否成功新增一個(gè)線程? */privatebooleanaddWorker(Runnable firstTask,booleancore) {? ? ? ? retry:for(;;) {intc = ctl.get();// 獲取記錄著runState和workCount的int變量的當(dāng)前值? intrs = runStateOf(c);// 獲取當(dāng)前線程池運(yùn)行的狀態(tài)? /*
? ? ? ? ? ? 這個(gè)條件代表著以下幾個(gè)情景郭赐,就直接返回false說明線程創(chuàng)建失敗:
? ? ? ? ? ? 1.rs > SHUTDOWN确沸; 此時(shí)不再接收新任務(wù)捌锭,且所有的任務(wù)已經(jīng)執(zhí)行完畢
? ? ? ? ? ? 2.rs = SHUTDOWN; 此時(shí)不再接收新任務(wù)罗捎,但是會(huì)執(zhí)行隊(duì)列中的任務(wù)观谦,在后面的或語句中,第一個(gè)不成立桨菜,firstTask != null成立
? ? ? ? ? ? 3.rs = SHUTDOWN豁状;此時(shí)不再接收新任務(wù),fistTask == null,任務(wù)隊(duì)列workQueue已經(jīng)空了
? ? ? ? ? */if(rs >= SHUTDOWN &&? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&? ? ? ? ? ? ? ? ? firstTask ==null&&? ? ? ? ? ? ? ? ? ! workQueue.isEmpty()))returnfalse;for(;;) {//獲取當(dāng)前活動(dòng)的線程數(shù)? intwc = workerCountOf(c);//先判斷當(dāng)前活動(dòng)的線程數(shù)是否大于最大值倒得,如果超過了就直接返回false說明線程創(chuàng)建失敗? //如果沒有超過再根據(jù)core的值再進(jìn)行以下判斷? /*
? ? ? ? ? ? ? ? ? 1.core為true泻红,則判斷當(dāng)前活動(dòng)的線程數(shù)是否大于corePoolSize
? ? ? ? ? ? ? ? ? 2.core為false,則判斷當(dāng)前活動(dòng)線程數(shù)是否大于maximumPoolSize
? ? ? ? ? ? ? */if(wc >= CAPACITY ||? ? ? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))returnfalse;//比較當(dāng)前值是否和c相同霞掺,如果相同谊路,則改為c+1,并且跳出大循環(huán)菩彬,直接執(zhí)行Worker進(jìn)行線程創(chuàng)建? if(compareAndIncrementWorkerCount(c))breakretry;? ? ? ? ? ? ? ? c = ctl.get();// 獲取ctl的當(dāng)前值? if(runStateOf(c) != rs)//檢查下當(dāng)前線程池的狀態(tài)是否已經(jīng)發(fā)生改變? continueretry;//如果已經(jīng)改變了缠劝,則進(jìn)行外層retry大循環(huán),否則只進(jìn)行內(nèi)層的循環(huán)? // else CAS failed due to workerCount change; retry inner loop? }? ? ? ? }//下面這里就是開始創(chuàng)建新的線程了? //Worker的也是Runnable的實(shí)現(xiàn)類? Worker w =newWorker(firstTask);//因?yàn)椴豢梢灾苯釉赪orker的構(gòu)造方法中進(jìn)行線程創(chuàng)建? //所以要把它的引用賦給t方便后面進(jìn)行線程創(chuàng)建? Thread t = w.thread;finalReentrantLock mainLock =this.mainLock;? ? ? ? mainLock.lock();try{//再次取出ctl的當(dāng)前值骗灶,用于進(jìn)行狀態(tài)的檢查惨恭,防止線程池的已經(jīng)狀態(tài)改變了? intc = ctl.get();intrs = runStateOf(c);//將if語句中的條件轉(zhuǎn)換為一個(gè)等價(jià)實(shí)現(xiàn) :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))? //有個(gè)t == null是因?yàn)槿绻褂玫氖悄J(rèn)的ThreadFactory的話,那么它的newThread()可能會(huì)返回null? /*
? ? ? ? ? ? 1. 如果t == null, 則減少一個(gè)線程數(shù)耙旦,如果線程池處于的狀態(tài) > SHUTDOWN,則嘗試終止線程池
? ? ? ? ? ? 2. 如果t 喉恋!= null,且rs == SHUTDOWN母廷,則不再接收新任務(wù)轻黑,若firstTask != null,則此時(shí)也是返回false琴昆,創(chuàng)建線程失敗
? ? ? ? ? ? 3. 如果t 氓鄙!= null, 且rs > SHUTDOWN,同樣不再接受新任務(wù)业舍,此時(shí)也是返回false抖拦,創(chuàng)建線程失敗
? ? ? ? ? */if(t ==null||? ? ? ? ? ? ? ? (rs >= SHUTDOWN &&? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&? ? ? ? ? ? ? ? ? ? firstTask ==null))) {? ? ? ? ? ? ? ? decrementWorkerCount();//減少一個(gè)活動(dòng)的當(dāng)前線程數(shù)? tryTerminate();//嘗試終止線程池? returnfalse;//返回線程創(chuàng)建失敗? }? ? ? ? ? ? workers.add(w);//將創(chuàng)建的線程添加到workers容器中? ints = workers.size();//獲取當(dāng)前線程活動(dòng)的數(shù)量? if(s > largestPoolSize)//判斷當(dāng)前線程活動(dòng)的數(shù)量是否超過線程池最大的線程數(shù)量? largestPoolSize = s;//當(dāng)池中的工作線程創(chuàng)新高時(shí)升酣,會(huì)將這個(gè)數(shù)記錄到largestPoolSize字段中。然后就可以啟動(dòng)這個(gè)線程t了? }finally{? ? ? ? ? ? mainLock.unlock();? ? ? ? }? ? ? ? t.start();//開啟線程? //若start后态罪,狀態(tài)又變成了SHUTDOWN狀態(tài)(如調(diào)用了shutdownNow方法)且新建的線程沒有被中斷過噩茄,? //就要中斷該線程(shutdownNow方法要求中斷正在執(zhí)行的線程),? //shutdownNow方法本身也會(huì)去中斷存儲(chǔ)在workers中的所有線程? if(runStateOf(ctl.get()) == STOP && ! t.isInterrupted())? ? ? ? ? ? t.interrupt();returntrue;? ? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
那么在創(chuàng)建線程的時(shí)候复颈,線程執(zhí)行的是什么的呢绩聘?
我們前面提到Worker繼承的其實(shí)也是Runnable,它在創(chuàng)建線程的時(shí)候是以自身作為任務(wù)傳進(jìn)先創(chuàng)建的線程中的耗啦,這段比較簡單凿菩,我就不一一注釋了,只是給出源代碼給大家看吧帜讲。
Worker(Runnable firstTask) {this.firstTask = firstTask;//this指的是worker對象本身this.thread = getThreadFactory().newThread(this); }
1
2
3
4
5
它以自身的對象作為線程任務(wù)傳進(jìn)去衅谷,那么它的run方法又是怎樣的呢?
publicvoidrun() {? ? runWorker(this);}
1
2
3
竟然只有一句話調(diào)用runWorker()方法似将,這個(gè)可是重頭戲获黔,我們來看看,究竟運(yùn)行的是什么在验。
/**? * 執(zhí)行Worker中的任務(wù)肢执,它的執(zhí)行流程是這樣的:? * 若存在第一個(gè)任務(wù),則先執(zhí)行第一個(gè)任務(wù)译红,否則,從隊(duì)列中拿任務(wù)兴溜,不斷的執(zhí)行侦厚,? * 直到getTask()返回null或執(zhí)行任務(wù)出錯(cuò)(中斷或任務(wù)本身拋出異常),就退出while循環(huán)拙徽。? * @paramw woker? */finalvoidrunWorker(Worker w) {? ? ? ? ? Runnable task = w.firstTask;//將當(dāng)前Worker中的任務(wù)取出來交給task刨沦,并釋放掉w.firstTask占用的內(nèi)存? w.firstTask =null;//用于判斷線程是否由于異常終止,如果不是異常終止膘怕,在后面將會(huì)將該變量的值改為false? //該變量的值在processWorkerExit()會(huì)使用來判斷線程是否由于異常終止? booleancompletedAbruptly =true;try{//執(zhí)行任務(wù)想诅,直到getTask()返回的值為null,在此處就相當(dāng)于復(fù)用了線程岛心,讓線程執(zhí)行了多個(gè)任務(wù)? while(task !=null|| (task = getTask()) !=null) {? ? ? ? ? ? ? ? ? ? ? w.lock();? ? ? ? ? ? ? ? ? clearInterruptsForTaskRun();//對線程池狀態(tài)進(jìn)行一次判斷来破,后面我們會(huì)講解一下該方法? try{? ? ? ? ? ? ? ? ? ? ? beforeExecute(w.thread, task);//在任務(wù)執(zhí)行前需要做的邏輯方法,該方面可以由用戶進(jìn)行重寫自定義? Throwable thrown =null;try{? ? ? ? ? ? ? ? ? ? ? ? ? task.run();//開始執(zhí)行任務(wù)? }catch(RuntimeException x) {? ? ? ? ? ? ? ? ? ? ? ? ? thrown = x;throwx;? ? ? ? ? ? ? ? ? ? ? }catch(Error x) {? ? ? ? ? ? ? ? ? ? ? ? ? thrown = x;throwx;? ? ? ? ? ? ? ? ? ? ? }catch(Throwable x) {? ? ? ? ? ? ? ? ? ? ? ? ? thrown = x;thrownewError(x);? ? ? ? ? ? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);//在任務(wù)執(zhí)行后需要做的邏輯方法忘古,該方面可以由用戶進(jìn)行重寫自定義? }? ? ? ? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? ? ? ? task =null;? ? ? ? ? ? ? ? ? ? ? ? ? w.completedTasks++;//增加該線程完成的任務(wù)? w.unlock();? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? }? ? ? ? ? ? ? completedAbruptly =false;//線程不是異常終止? }finally{? ? ? ? ? ? ? processWorkerExit(w, completedAbruptly);//結(jié)束該線程? }? ? ? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
下面就是線程在執(zhí)行任務(wù)之前對線程池狀態(tài)的一次判斷:
/**? ? * 對線程的結(jié)束做一些清理和數(shù)據(jù)同步? ? * @paramw 封裝線程的Worker? ? * @paramcompletedAbruptly 表示該線程是否結(jié)束于異常? ? */privatevoidprocessWorkerExit(Worker w,booleancompletedAbruptly) {// 如果completedAbruptly值為true徘禁,則說明線程是結(jié)束于異常? //如果不是結(jié)束于異常,那么它降在runWorker方法的while循環(huán)中的getTask()方法中已經(jīng)減一了? if(completedAbruptly)? ? ? ? ? ? decrementWorkerCount();//此時(shí)將線程數(shù)量減一? finalReentrantLock mainLock =this.mainLock;? ? ? ? mainLock.lock();try{? ? ? ? ? ? completedTaskCount += w.completedTasks;//統(tǒng)計(jì)總共完成的任務(wù)數(shù)? workers.remove(w);//將該線程數(shù)從workers容器中移除? }finally{? ? ? ? ? ? mainLock.unlock();? ? ? ? }? ? ? ? tryTerminate();//嘗試終止線程池? intc = ctl.get();//接下來的這個(gè)if塊要做的事兒了髓堪。當(dāng)池的狀態(tài)還是RUNNING送朱,? //又要分兩種情況娘荡,一種是異常結(jié)束,一種是正常結(jié)束驶沼。異常結(jié)束比較好弄炮沐,直接加個(gè)線程替換死掉的線程就好了,? //也就是最后的addWorker操作? if(runStateLessThan(c, STOP)) {//如果當(dāng)前運(yùn)行狀態(tài)為RUNNING,SHUTDOWN? if(!completedAbruptly) {//如果線程不是結(jié)束于異常? intmin = allowCoreThreadTimeOut ?0: corePoolSize;//是否允許線程超時(shí)結(jié)束? if(min ==0&& ! workQueue.isEmpty())//如果允許把那個(gè)且隊(duì)列不為空? min =1;//至少要保留一個(gè)線程來完成任務(wù)? //如果當(dāng)前活動(dòng)的線程數(shù)大于等于最小的值? // 1.不允許核心線程超時(shí)結(jié)束回怜,則必須要使得活動(dòng)線程數(shù)超過corePoolSize數(shù)才可以? // 2. 允許核心線程超時(shí)結(jié)束大年,但是隊(duì)列中有任務(wù),必須留至少一個(gè)線程? if(workerCountOf(c) >= min)return;// replacement not needed? }//直接加個(gè)線程? addWorker(null,false);? ? ? ? ? ? }? ? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
前面我們的方法遇見過很多次tryTerminate()方法鹉戚,到底他是怎樣嘗試結(jié)束線程池的呢鲜戒?
/**
* 執(zhí)行該方法,根據(jù)線程池狀態(tài)進(jìn)行? 判斷是否結(jié)束線程池
*/finalvoidtryTerminate() {for(;;) {intc = ctl.get();if(isRunning(c) ||//線程池正在運(yùn)行中抹凳,自然不能結(jié)束線程池啦? runStateAtLeast(c, TIDYING) ||//如果狀態(tài)為TIDYING或TERMINATED遏餐,池中的活動(dòng)線程數(shù)已經(jīng)是0,自然也不需要做什么操作了? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//線程池出于SHUTDOWN狀態(tài)赢底,但是任務(wù)隊(duì)列不為空失都,自然不能結(jié)束線程池啦? return;if(workerCountOf(c) !=0) {// Eligible to terminate? /*
? ? ? ? ? ? ? 調(diào)用這個(gè)方法的目的是將shutdown信號(hào)傳播給其它線程。
? ? ? ? ? ? ? 調(diào)用shutdown方法的時(shí)候會(huì)去中斷所有空閑線程幸冻,如果這時(shí)候池中所有的線程都正在執(zhí)行任務(wù)粹庞,
? ? ? ? ? ? ? 那么就不會(huì)有線程被中斷,調(diào)用shutdown方法只是設(shè)置了線程池的狀態(tài)為SHUTDOWN洽损,
? ? ? ? ? ? ? 在取任務(wù)(getTask,后面會(huì)細(xì)說)的時(shí)候庞溜,假如很多線程都發(fā)現(xiàn)隊(duì)列里還有任務(wù)(沒有使用鎖,存在競態(tài)條件)碑定,
? ? ? ? ? ? ? 然后都去調(diào)用take流码,如果任務(wù)數(shù)小于池中的線程數(shù),那么必然有方法調(diào)用take后會(huì)一直等待(shutdown的時(shí)候這些線程正在執(zhí)行任務(wù)延刘,
? ? ? ? ? ? ? 所以沒能調(diào)用它的interrupt漫试,其中斷狀態(tài)沒有被設(shè)置),那么在沒有任務(wù)且線程池的狀態(tài)為SHUTDWON的時(shí)候碘赖,
? ? ? ? ? ? ? 這些等待中的空閑線程就需要被終止iinterruptIdleWorkers(ONLY_ONE)回去中斷一個(gè)線程驾荣,讓其從take中退出,
? ? ? ? ? ? ? 然后這個(gè)線程也進(jìn)入同樣的邏輯普泡,去終止一個(gè)其它空閑線程播掷,直到池中的活動(dòng)線程數(shù)為0。
? ? ? ? ? ? */interruptIdleWorkers(ONLY_ONE);return;? ? ? ? ? }finalReentrantLock mainLock =this.mainLock;? ? ? ? ? mainLock.lock();try{/*
? ? ? ? ? ? ? 當(dāng)狀態(tài)為SHUTDOWN撼班,且活動(dòng)線程數(shù)為0的時(shí)候叮趴,就可以進(jìn)入TIDYING狀態(tài)了,
? ? ? ? ? ? ? 進(jìn)入TIDYING狀態(tài)就可以執(zhí)行方法terminated()权烧,
? ? ? ? ? ? ? 該方法執(zhí)行結(jié)束就進(jìn)入了TERMINATED狀態(tài)(參考前文中各狀態(tài)的含義以及可能的狀態(tài)轉(zhuǎn)變)
? ? ? ? ? ? */if(ctl.compareAndSet(c, ctlOf(TIDYING,0))) {try{? ? ? ? ? ? ? ? ? ? ? terminated();//執(zhí)行該方法眯亦,結(jié)束線程池? }finally{? ? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED,0));/*
? ? ? ? ? ? ? ? ? ? ? 當(dāng)線程池shutdown后伤溉,外部可能還有很多線程在等待線程池真正結(jié)束,
? ? ? ? ? ? ? ? ? ? ? 即調(diào)用了awaitTermination方法妻率,該方法中乱顾,外部線程就是在termination上await的,
? ? ? ? ? ? ? ? ? ? ? 所以宫静,線程池關(guān)閉之前要喚醒這些等待的線程走净,告訴它們線程池關(guān)閉結(jié)束了。
? ? ? ? ? ? ? ? ? ? */termination.signalAll();? ? ? ? ? ? ? ? ? }return;? ? ? ? ? ? ? }? ? ? ? ? }finally{? ? ? ? ? ? ? mainLock.unlock();? ? ? ? ? }// else retry on failed CAS? }? }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
關(guān)閉時(shí)使用shutdown()方法伏伯,源碼如下:
publicvoidshutdown() {finalReentrantLock mainLock =this.mainLock;? ? ? ? mainLock.lock();try{? ? ? ? ? ? checkShutdownAccess();// 檢查終止線程池的線程是否有權(quán)限。? advanceRunState(SHUTDOWN);// 設(shè)置線程池的狀態(tài)為關(guān)閉狀態(tài)捌袜。? interruptIdleWorkers();// 中斷線程池中空閑的線程? onShutdown();// 鉤子函數(shù)说搅,在ThreadPoolExecutor中沒有任何動(dòng)作? }finally{? ? ? ? ? ? mainLock.unlock();? ? ? ? }? ? ? ? tryTerminate();// 嘗試終止線程池? }