java多線程線程池钦铁,線程池原理

第一部分

來看一下線程池的框架圖,如下:?

1廓啊、Executor任務(wù)提交接口與Executors工具類

  Executor框架同java.util.concurrent.Executor 接口在Java 5中被引入。Executor框架是一個(gè)根據(jù)一組執(zhí)行策略調(diào)用封豪,調(diào)度谴轮,執(zhí)行和控制的異步任務(wù)的框架。Executor存在的目的是提供一種將”任務(wù)提交”與”任務(wù)如何運(yùn)行”分離開來的機(jī)制吹埠。定義如下:

publicinterfaceExecutor{voidexecute(Runnable command);? }

1

2

3

雖然只有一個(gè)方法第步,但是卻為靈活且強(qiáng)大的異步任務(wù)執(zhí)行框架提供了基礎(chǔ)。它提供了一種標(biāo)準(zhǔn)的方法將任務(wù)的提交過程與執(zhí)行過程解耦開來藻雌,并用Runnable來表示任務(wù)雌续。那么我們怎么得到Executor對象呢?這就是接下來要介紹的Exectors了胯杭。?

Executors為Executor驯杜,ExecutorService,ScheduledExecutorService做个,ThreadFactory和Callable類提供了一些工具方法鸽心,類似于集合中的Collections類的功能。Executors方便的創(chuàng)建線程池居暖。

1>newCachedThreadPool?:該線程池比較適合沒有固定大小并且比較快速就能完成的小任務(wù)顽频,它將為每個(gè)任務(wù)創(chuàng)建一個(gè)線程。那這樣子它與直接創(chuàng)建線程對象(new Thread())有什么區(qū)別呢太闺?看到它的第三個(gè)參數(shù)60L和第四個(gè)參數(shù)TimeUnit.SECONDS了嗎糯景?好處就在于60秒內(nèi)能夠重用已創(chuàng)建的線程。下面是Executors中的newCachedThreadPool()的源代碼:

publicstaticExecutorServicenewCachedThreadPool() {returnnewThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,newSynchronousQueue());? ? ? }

1

2

3

2> newFixedThreadPool使用的Thread對象的數(shù)量是有限的,如果提交的任務(wù)數(shù)量大于限制的最大線程數(shù)省骂,那么這些任務(wù)講排隊(duì)蟀淮,然后當(dāng)有一個(gè)線程的任務(wù)結(jié)束之后,將會(huì)根據(jù)調(diào)度策略繼續(xù)等待執(zhí)行下一個(gè)任務(wù)钞澳。下面是Executors中的newFixedThreadPool()的源代碼:

publicstaticExecutorServicenewFixedThreadPool(intnThreads) {returnnewThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue());? ? }

1

2

3

3>newSingleThreadExecutor就是線程數(shù)量為1的FixedThreadPool,如果提交了多個(gè)任務(wù)怠惶,那么這些任務(wù)將會(huì)排隊(duì),每個(gè)任務(wù)都會(huì)在下一個(gè)任務(wù)開始之前運(yùn)行結(jié)束轧粟,所有的任務(wù)將會(huì)使用相同的線程策治。下面是Executors中的newSingleThreadExecutor()的源代碼:

publicstaticExecutorServicenewSingleThreadExecutor() {returnnewFinalizableDelegatedExecutorService? ? ? ? ? ? ? (newThreadPoolExecutor(1,1,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue()));? ? ? }

1

2

3

4

4>newScheduledThreadPool創(chuàng)建一個(gè)固定長度的線程池,而且以延遲或定時(shí)的方式來執(zhí)行任務(wù)兰吟。?

通過如上配置的線程池的創(chuàng)建方法源代碼通惫,我們可以發(fā)現(xiàn):?

1> 除了CachedThreadPool使用的是直接提交策略的緩沖隊(duì)列以外,其余兩個(gè)用的采用的都是無界緩沖隊(duì)列混蔼,也就說讽膏,F(xiàn)ixedThreadPool和SingleThreadExecutor創(chuàng)建的線程數(shù)量就不會(huì)超過 corePoolSize。?

2> 我們可以再來看看三個(gè)線程池采用的ThreadPoolExecutor構(gòu)造方法都是同一個(gè)拄丰,使用的都是默認(rèn)的ThreadFactory和handler:

privatestaticfinalRejectedExecutionHandler defaultHandler =newAbortPolicy();publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,? ? ? ? ? Executors.defaultThreadFactory(), defaultHandler);? }

1

2

3

4

5

6

7

8

9

10

  也就說三個(gè)線程池創(chuàng)建的線程對象都是同組府树,優(yōu)先權(quán)等級(jí)為正常的Thread.NORM_PRIORITY(5)的非守護(hù)線程,使用的被拒絕任務(wù)處理方式是直接拋出異常的AbortPolicy策略(前面有介紹)料按。

2奄侠、ExecutorService任務(wù)周期管理接口

  Executor的實(shí)現(xiàn)通常都會(huì)創(chuàng)建線程來執(zhí)行任務(wù),但是使用異步方式來執(zhí)行任務(wù)時(shí)载矿,由于之前提交任務(wù)的狀態(tài)不是立即可見的垄潮,所以如果要關(guān)閉應(yīng)用程序時(shí),就需要將受影響的任務(wù)狀態(tài)反饋給應(yīng)用程序闷盔。

  為了解決執(zhí)行服務(wù)的生命周期問題弯洗,Executor擴(kuò)展了EecutorService接口,添加了一些用于生命周期管理的方法逢勾。如下:

publicinterfaceExecutorServiceextendsExecutor{voidshutdown();? ? ? List shutdownNow();booleanisShutdown();booleanisTerminated();booleanawaitTermination(longtimeout, TimeUnit unit)throwsInterruptedException;// 省略部分方法? }

1

2

3

4

5

6

7

8

3牡整、ThreadPoolExecutor線程池實(shí)現(xiàn)類

  先來看一下這個(gè)類中定義的重要變量,如下:

privatefinalBlockingQueue workQueue;// 阻塞隊(duì)列? privatefinalReentrantLock mainLock =newReentrantLock();// 互斥鎖? privatefinalHashSet workers =newHashSet();// 線程集合.一個(gè)Worker對應(yīng)一個(gè)線程? privatefinalCondition termination = mainLock.newCondition();// 終止條件? privateintlargestPoolSize;// 線程池中線程數(shù)量曾經(jīng)達(dá)到過的最大值溺拱。? privatelongcompletedTaskCount;// 已完成任務(wù)數(shù)量? privatevolatileThreadFactory threadFactory;// ThreadFactory對象逃贝,用于創(chuàng)建線程。? privatevolatileRejectedExecutionHandler handler;// 拒絕策略的處理句柄? privatevolatilelongkeepAliveTime;// 線程池維護(hù)線程所允許的空閑時(shí)間? privatevolatilebooleanallowCoreThreadTimeOut;privatevolatileintcorePoolSize;// 線程池維護(hù)線程的最小數(shù)量迫摔,哪怕是空閑的? privatevolatileintmaximumPoolSize;// 線程池維護(hù)的最大線程數(shù)量?

1

2

3

4

5

6

7

8

9

10

11

12

其中有幾個(gè)重要的規(guī)則需要說明一下:?

1> corePoolSize與maximumPoolSize?由于ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize設(shè)置的邊界自動(dòng)調(diào)整池大小沐扳,當(dāng)新任務(wù)在方法 execute(java.lang.Runnable) 中提交時(shí):?

(1)如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求句占,即使其他輔助線程是空閑的沪摄;?

(2)如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池是大小固定的纱烘,如果運(yùn)行的線程與corePoolSize相同杨拐,當(dāng)有新請求過來時(shí),若workQueue未滿凹炸,則將請求放入workQueue中戏阅,等待有空閑的線程去從workQueue中取任務(wù)并處理?

(3)如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程才創(chuàng)建新的線程去處理請求啤它;?

(4)如果運(yùn)行的線程多于corePoolSize 并且等于maximumPoolSize奕筐,若隊(duì)列已經(jīng)滿了,則通過handler所指定的策略來處理新請求变骡;?

(5)如果將 maximumPoolSize 設(shè)置為基本的無界值(如 Integer.MAX_VALUE)离赫,則允許池適應(yīng)任意數(shù)量的并發(fā)任務(wù)?


也就是說,處理任務(wù)的優(yōu)先級(jí)為:?

(1) 核心線程corePoolSize > 任務(wù)隊(duì)列workQueue > 最大線程maximumPoolSize塌碌,如果三者都滿了渊胸,使用handler處理被拒絕的任務(wù)。?

(2)當(dāng)池中的線程數(shù)大于corePoolSize的時(shí)候台妆,多余的線程會(huì)等待keepAliveTime長的時(shí)間翎猛,如果無請求可處理就自行銷毀胖翰。?

2> workQueue?線程池所使用的緩沖隊(duì)列,該緩沖隊(duì)列的長度決定了能夠緩沖的最大數(shù)量切厘,緩沖隊(duì)列有三種通用策略:?

1) 直接提交萨咳。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們疫稿。在此培他,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗遗座,因此會(huì)構(gòu)造一個(gè)新的線程舀凛。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時(shí)出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)途蒋。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí)猛遍,此策略允許無界線程具有增長的可能性;?


2) 無界隊(duì)列。使用無界隊(duì)列(例如碎绎,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待螃壤。這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize筋帖。(因此奸晴,maximumPoolSize 的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù)日麸,即任務(wù)執(zhí)行互不影響時(shí)寄啼,適合于使用無界隊(duì)列;例如代箭,在 Web 頁服務(wù)器中墩划。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí)嗡综,此策略允許無界線程具有增長的可能性;?

3>ThreadFactory?使用 ThreadFactory 創(chuàng)建新線程乙帮。如果沒有另外說明,則在同一個(gè) ThreadGroup 中一律使用 Executors.defaultThreadFactory() 創(chuàng)建線程极景,并且這些線程具有相同的 NORM_PRIORITY 優(yōu)先級(jí)和非守護(hù)進(jìn)程狀態(tài)察净。通過提供不同的 ThreadFactory,可以改變線程的名稱盼樟、線程組氢卡、優(yōu)先級(jí)、守護(hù)進(jìn)程狀態(tài)等等晨缴。如果從 newThread 返回 null 時(shí) ThreadFactory 未能創(chuàng)建線程译秦,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行,但不能執(zhí)行任何任務(wù)。

publicinterfaceThreadFactory{Thread newThread(Runnable r);? }

1

2

3

  而構(gòu)造方法中的threadFactory對象筑悴,是通過 Executors.defaultThreadFactory()返回的们拙。Executors.java中的defaultThreadFactory()源碼如下:

publicstaticThreadFactorydefaultThreadFactory() {returnnewDefaultThreadFactory();? }

1

2

3

  在DefaultThreadFactory類中實(shí)現(xiàn)了ThreadFactory接口并對其中定義的方法進(jìn)行了實(shí)現(xiàn),如下:

staticclass DefaultThreadFactory implements ThreadFactory {privatestaticfinalAtomicInteger poolNumber =newAtomicInteger(1);privatefinalThreadGroup group;privatefinalAtomicInteger threadNumber =newAtomicInteger(1);privatefinalString namePrefix;? ? ? DefaultThreadFactory() {? ? ? ? ? SecurityManager s = System.getSecurityManager();? ? ? ? ? group = (s !=null) ? s.getThreadGroup() :? Thread.currentThread().getThreadGroup();? ? ? ? ? namePrefix ="pool-"+? poolNumber.getAndIncrement() +"-thread-";? ? ? }// 為線程池創(chuàng)建新的任務(wù)執(zhí)行線程? publicThreadnewThread(Runnable r) {// 線程對應(yīng)的任務(wù)是Runnable對象r? Thread t =newThread(group, r,namePrefix + threadNumber.getAndIncrement(),0);// 設(shè)為非守護(hù)線程? if(t.isDaemon())? ? ? ? ? ? ? t.setDaemon(false);// 將優(yōu)先級(jí)設(shè)為Thread.NORM_PRIORITY? if(t.getPriority() != Thread.NORM_PRIORITY)? ? ? ? ? ? ? t.setPriority(Thread.NORM_PRIORITY);returnt;? ? ? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

4>RejectedExecutionHandler?

當(dāng)Executor已經(jīng)關(guān)閉(即執(zhí)行了executorService.shutdown()方法后)阁吝,并且Executor將有限邊界用于最大線程和工作隊(duì)列容量睛竣,且已經(jīng)飽和時(shí),在方法execute()中提交的新任務(wù)將被拒絕.?

在以上述情況下求摇,execute 方法將調(diào)用其 RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預(yù)定義的處理程序策略:?

1) 在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 處理程序遭到拒絕將拋出運(yùn)行時(shí) RejectedExecutionException;?

2) 在 ThreadPoolExecutor.CallerRunsPolicy 線程調(diào)用運(yùn)行該任務(wù)的 execute 本身殊者。此策略提供簡單的反饋控制機(jī)制与境,能夠減緩新任務(wù)的提交速度?

3) 在 ThreadPoolExecutor.DiscardPolicy 不能執(zhí)行的任務(wù)將被刪除;?

4) 在 ThreadPoolExecutor.DiscardOldestPolicy 如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除猖吴,然后重試執(zhí)行程序(如果再次失敗摔刁,則重復(fù)此過程)。?

線程池默認(rèn)會(huì)采用的是defaultHandler策略海蔽。首先看defaultHandler的定義:

privatestaticfinalRejectedExecutionHandler defaultHandler =newAbortPolicy();// 使用默認(rèn)的拒絕策略?

1

publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{publicAbortPolicy() { }// 拋出異常? publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) {thrownewRejectedExecutionException("Task "+ r.toString() +" rejected from "+? e.toString());? ? ? }? }

1

2

3

4

5

6

7

  看一下其他拒絕策略的具體實(shí)現(xiàn)共屈。

class MyRunnable implements Runnable {privateString name;publicMyRunnable(String name) {this.name = name;? ? ? }@Overridepublicvoidrun() {try{? ? ? ? ? ? ? System.out.println(this.name +" is running.");? ? ? ? ? ? ? Thread.sleep(100);? ? ? ? ? }catch(Exception e) {? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? }? ? ? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

如上是一個(gè)測試任務(wù)的例子,下面編寫4個(gè)測試用例來測試党窜。?


1.DiscardPolicy 示例

publicclassDiscardPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池拗引。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊(duì)列容量為1(CAPACITY)幌衣。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"丟棄"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardPolicy());// 新建10個(gè)任務(wù)矾削,并將它們添加到線程池中。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? }// 關(guān)閉線程池? pool.shutdown();? ? ? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

線程池pool的”最大池大小”和”核心池大小”都為1(THREADS_SIZE)豁护,這意味著”線程池能同時(shí)運(yùn)行的任務(wù)數(shù)量最大只能是1”哼凯。?

線程池pool的阻塞隊(duì)列是ArrayBlockingQueue,ArrayBlockingQueue是一個(gè)有界的阻塞隊(duì)列楚里,ArrayBlockingQueue的容量為1断部。這也意味著線程池的阻塞隊(duì)列只能有一個(gè)線程池阻塞等待。?

根據(jù)”“中分析的execute()代碼可知:線程池中共運(yùn)行了2個(gè)任務(wù)班缎。第1個(gè)任務(wù)直接放到Worker中充易,通過線程去執(zhí)行;第2個(gè)任務(wù)放到阻塞隊(duì)列中等待滥朱。其他的任務(wù)都被丟棄了妆兑!

2.DiscardOldestPolicy 示例

publicclassDiscardOldestPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE)苏携,"線程池"的阻塞隊(duì)列容量為1(CAPACITY)做瞪。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"DiscardOldestPolicy"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.DiscardOldestPolicy());// 新建10個(gè)任務(wù),并將它們添加到線程池中。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? }// 關(guān)閉線程池? pool.shutdown();? ? ? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

運(yùn)行結(jié)果:

task-0isrunning.task-9isrunning.

1

2

將”線程池的拒絕策略”由DiscardPolicy修改為DiscardOldestPolicy之后装蓬,當(dāng)有任務(wù)添加到線程池被拒絕時(shí)著拭,線程池會(huì)丟棄阻塞隊(duì)列中末尾的任務(wù),然后將被拒絕的任務(wù)添加到末尾牍帚。

3.AbortPolicy 示例

publicclassAbortPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池儡遮。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊(duì)列容量為1(CAPACITY)暗赶。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"拋出異常"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy());try{// 新建10個(gè)任務(wù)鄙币,并將它們添加到線程池中。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? ? ? }? ? ? ? ? }catch(RejectedExecutionException e) {? ? ? ? ? ? ? e.printStackTrace();// 關(guān)閉線程池? pool.shutdown();? ? ? ? ? }? ? ? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

(某一次)運(yùn)行結(jié)果:

java.util.concurrent.RejectedExecutionExceptionat java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)? ? at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)? ? at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)? ? at AbortPolicyDemo.main(AbortPolicyDemo.java:27)task-0is running.task-1is running.

1

2

3

4

5

6

7

8

  將”線程池的拒絕策略”由DiscardPolicy修改為AbortPolicy之后蹂随,當(dāng)有任務(wù)添加到線程池被拒絕時(shí)十嘿,會(huì)拋出RejectedExecutionException。

4.CallerRunsPolicy 示例

publicclassCallerRunsPolicyDemo{privatestaticfinalintTHREADS_SIZE =1;privatestaticfinalintCAPACITY =1;publicstaticvoidmain(String[] args)throwsException {// 創(chuàng)建線程池岳锁。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE)绩衷,"線程池"的阻塞隊(duì)列容量為1(CAPACITY)。? ThreadPoolExecutor pool =newThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,0, TimeUnit.SECONDS,newArrayBlockingQueue(CAPACITY));// 設(shè)置線程池的拒絕策略為"CallerRunsPolicy"? pool.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());// 新建10個(gè)任務(wù)激率,并將它們添加到線程池中咳燕。? for(inti =0; i <10; i++) {? ? ? ? ? ? ? Runnable myrun =newMyRunnable("task-"+i);? ? ? ? ? ? ? pool.execute(myrun);? ? ? ? ? }// 關(guān)閉線程池? pool.shutdown();? ? ? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

(某一次)運(yùn)行結(jié)果:

task-2isrunning.task-3isrunning.task-4isrunning.task-5isrunning.task-6isrunning.task-7isrunning.task-8isrunning.task-9isrunning.task-0isrunning.task-1isrunning.

1

2

3

4

5

6

7

8

9

10

  將”線程池的拒絕策略”由DiscardPolicy修改為CallerRunsPolicy之后,當(dāng)有任務(wù)添加到線程池被拒絕時(shí)乒躺,線程池會(huì)將被拒絕的任務(wù)添加到”線程池正在運(yùn)行的線程”中取運(yùn)行招盲。

第二部分

轉(zhuǎn)自這里

線程池能夠復(fù)用線程,減少線程創(chuàng)建聪蘸,銷毀宪肖,恢復(fù)等狀態(tài)切換的開銷,提高程序的性能健爬。一個(gè)線程池管理了一組工作線程控乾,同時(shí)它還包括了一個(gè)用于放置等待執(zhí)行的任務(wù)的隊(duì)列。?

ThreadPoolExecutor類中定義了一些與線程狀態(tài)與活動(dòng)線程數(shù)相關(guān)的一些變量娜遵,如下:

privatefinalAtomicInteger ctl =newAtomicInteger(ctlOf(RUNNING,0));// 將整型的24位分為高3位和低29位蜕衡,高3位表示線程池的狀態(tài),低29位表示活動(dòng)的線程數(shù)? privatestaticfinalintCOUNT_BITS = Integer.SIZE -3;privatestaticfinalintCAPACITY? = (1<< COUNT_BITS) -1;// 29位能表示的最大二進(jìn)制整數(shù),也就是活動(dòng)線程數(shù)? // 高3位數(shù)值代表的線程池狀態(tài)? privatestaticfinalintRUNNING? ? = -1<< COUNT_BITS;// running 線程池能接受新任務(wù)? privatestaticfinalintSHUTDOWN? =0<< COUNT_BITS;// shutdown 線程池不再接受新任務(wù)? privatestaticfinalintSTOP? ? ? =1<< COUNT_BITS;// stop 線程池不再接受新任務(wù)设拟,不再執(zhí)行隊(duì)列中的任務(wù)慨仿,而且要中斷正在處理的任務(wù)? privatestaticfinalintTIDYING? ? =2<< COUNT_BITS;// tidying 線程池所有任務(wù)均已終止? privatestaticfinalintTERMINATED =3<< COUNT_BITS;// terminated terminated()方法執(zhí)行結(jié)束?

1

2

3

4

5

6

7

8

9

10

11

由如上可知:?

ctl是一個(gè)AtomicInteger類型的原子對象。ctl記錄了”線程池中的任務(wù)數(shù)量”和”線程池狀態(tài)”2個(gè)信息纳胧。ctl共包括32位镰吆。其中,高3位表示”線程池狀態(tài)”跑慕,低29位表示”線程池中的任務(wù)數(shù)量”万皿。

RUNNING--對應(yīng)的高3位值是111SHUTDOWN--對應(yīng)的高3位值是000STOP--對應(yīng)的高3位值是001TIDYING--對應(yīng)的高3位值是010TERMINATED--對應(yīng)的高3位值是011

1

2

3

4

5

線程池各個(gè)狀態(tài)之間的切換如下圖所示:?


線程池各個(gè)狀態(tài)間的轉(zhuǎn)換的詳細(xì)解釋如下所示摧找。?

1> RUNNING(111) -> SHUTDOWN(000) : 調(diào)用了shutdown方法,線程池實(shí)現(xiàn)了finalize方法牢硅,在里面調(diào)用了shutdown方法蹬耘,因此shutdown可能是在finalize中被隱式調(diào)用的?

2> (RUNNING(111) or SHUTDOWN(000)) -> STOP(001) 調(diào)用了shutdownNow方法

  3> SHUTDOWN(000) -> TIDYING(010) : 當(dāng)隊(duì)列和線程池均為空的時(shí)候

  4> STOP(001) -> TIDYING(010) : 當(dāng)線程池為空的時(shí)候

  5> TIDYING(010) -> TERMINATED(011) : terminated()方法調(diào)用完畢

  說明:擴(kuò)號(hào)后的3位數(shù)字表示ctl的高3位二進(jìn)制值,并不關(guān)注低29位二進(jìn)制的值

  還有一些對常量的操作方法减余,只說明部分综苔,其他的有興趣自己可以去查看,如下:

privatestaticintrunStateOf(intc)? ? {returnc & ~CAPACITY; }// 得到線程運(yùn)行狀態(tài)? privatestaticintworkerCountOf(intc)? {returnc & CAPACITY; }// 得到活動(dòng)線程數(shù)? privatestaticintctlOf(intrs,intwc) {returnrs | wc; }// 得到兩者表示的值?

1

2

3

  來看一下ThreadPoolExecutor()中最主要的一個(gè)構(gòu)造函數(shù),如下:

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue,? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {if(corePoolSize <0|| maximumPoolSize <=0|| maximumPoolSize < corePoolSize ||? keepAliveTime <0)thrownewIllegalArgumentException();if(workQueue ==null|| threadFactory ==null|| handler ==null)thrownewNullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

  調(diào)用Executors方法中的幾個(gè)方法位岔,如newCachedThreadPool()如筛、newFixedThreadPool()時(shí),都會(huì)間接調(diào)用上面的構(gòu)造方法來初始化所有的線程池相關(guān)變量抒抬。

1妙黍、創(chuàng)建線程池并執(zhí)行任務(wù)

有了Executor對象后,就可以調(diào)用execute()方法執(zhí)行任務(wù)了瞧剖。方法的源代碼如下:

publicvoidexecute(Runnable command) {if(command ==null)// 任務(wù)為null,則拋出異常? thrownewNullPointerException();intc = ctl.get();// 取出記錄著runState和workerCount 的 ctl的當(dāng)前值? /*

? ? ? ? *? 通過workerCountOf方法從ctl所表示的int值中提取出低29位的值,也就是當(dāng)前活動(dòng)的線程數(shù)。如果當(dāng)前

? ? ? ? *? 活動(dòng)的線程數(shù)少于corePoolSize,則通過addWorker(command, true)新建一個(gè)線程,并將任務(wù)(command)

? ? ? ? *? 添加到該線程中

? ? ? ? */if(workerCountOf(c) < corePoolSize) {/*

? ? ? ? ? ? * addWorker()返回值表示:

? ? ? ? ? ? * 1可免、true 表示需要檢測當(dāng)前運(yùn)行的線程是否小于corePoolSize

? ? ? ? ? ? * 2抓于、false 表示需要檢測當(dāng)前運(yùn)行的線程數(shù)量是否小于maxPoolSize

? ? ? ? ? ? */if(addWorker(command,true))return;// 新線程創(chuàng)建成功,終止該方法的執(zhí)行? c = ctl.get();// 任務(wù)添加到線程失敗,取出記錄著runState和workerCount 的 ctl的當(dāng)前值? }/*

? ? ? ? * 方法解釋:

? ? ? ? * isRunning(c) 當(dāng)前線程池是否處于運(yùn)行狀態(tài)。源代碼是通過判斷c < SHUTDOWN 來確定返回值浇借。由于RUNNING才會(huì)接收新任務(wù)捉撮,且只有這個(gè)值-1才小于SHUTDOWN

? ? ? ? * workQueue.offer(command) 任務(wù)添加到緩沖隊(duì)列

? ? ? ? */if(isRunning(c) && workQueue.offer(command)) {// 當(dāng)前線程處于運(yùn)行狀態(tài)且成功添加到緩沖隊(duì)列? intrecheck = ctl.get();/*

? ? ? ? ? ? * 如果 線程池已經(jīng)處于非運(yùn)行狀態(tài),則從緩沖隊(duì)列中移除任務(wù)然后采用線程池指定的策略拒絕任務(wù)

? ? ? ? ? ? * 如果 線程池中任務(wù)數(shù)量為0,則通過addWorker(null, false)嘗試新建一個(gè)線程,新建線程對應(yīng)的任務(wù)為null

? ? ? ? ? ? */if(! isRunning(recheck) && remove(command))? ? ? ? ? ? ? ? ? ? ? reject(command);elseif(workerCountOf(recheck) ==0)// 得到活動(dòng)線程數(shù)為0? ? ? ? ? addWorker(null,false);? ? ? ? ? }/*

? ? ? ? * 當(dāng)不滿足以下兩個(gè)條件時(shí)執(zhí)行如下代碼:

? ? ? ? *? 1. 當(dāng)前線程池并不處于Running狀態(tài)? ?

? ? ? ? *? 2. 當(dāng)前線程池處于Running狀態(tài),但是緩沖隊(duì)列已經(jīng)滿了?

? ? ? ? */elseif(!addWorker(command,false))? ? ? ? ? ? ? reject(command);// 采用線程池指定的策略拒絕任務(wù)? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

  當(dāng)前活動(dòng)的線程小于corePoolSize了,那么等于和大于corePoolSize怎么處理呢妇垢?

  1> 當(dāng)前活動(dòng)的線程數(shù)量 >= corePoolSize 的時(shí)候巾遭,都是優(yōu)先添加到隊(duì)列中,直到隊(duì)列滿了才會(huì)去創(chuàng)建新的線程闯估,在這里第27 行的if語句已經(jīng)體現(xiàn)出來了灼舍。這里利用了&&的特性,只有當(dāng)?shù)谝粋€(gè)條件會(huì)真時(shí)才會(huì)去判斷第二個(gè)條件涨薪,第一個(gè)條件是isRunning()骑素,判斷線程池是否處于RUNNING狀態(tài),因?yàn)橹挥性谶@個(gè)狀態(tài)下才會(huì)接受新任務(wù)刚夺,否則就拒絕献丑,如果正處于RUNNING狀態(tài),那么就加入隊(duì)列侠姑,如果加入失敗可能就是隊(duì)列已經(jīng)滿了创橄,這時(shí)候直接執(zhí)行第29行。

  2> 在execute()方法中莽红,當(dāng) 當(dāng)前活動(dòng)的線程數(shù)量 < corePoolSize 時(shí)妥畏,會(huì)執(zhí)行addWorker()方法,關(guān)于addWorker(),它是用來直接新建線程用的咖熟,之所以叫addWorker而不是addThread是因?yàn)樵诰€程池中圃酵,所有的線程都用一個(gè)Worker對象包裝著,來看一下這個(gè)方法:

/**? * 創(chuàng)建并執(zhí)行新線程? * @paramfirstTack 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù)? *? * @paramcore? ? ? true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize馍管,? *? ? ? ? ? ? ? ? ? false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize? *? * @return是否成功新增一個(gè)線程? */privatebooleanaddWorker(Runnable firstTask,booleancore) {? ? ? ? retry:for(;;) {intc = ctl.get();// 獲取記錄著runState和workCount的int變量的當(dāng)前值? intrs = runStateOf(c);// 獲取當(dāng)前線程池運(yùn)行的狀態(tài)? /*

? ? ? ? ? ? 這個(gè)條件代表著以下幾個(gè)情景郭赐,就直接返回false說明線程創(chuàng)建失敗:

? ? ? ? ? ? 1.rs > SHUTDOWN确沸; 此時(shí)不再接收新任務(wù)捌锭,且所有的任務(wù)已經(jīng)執(zhí)行完畢

? ? ? ? ? ? 2.rs = SHUTDOWN; 此時(shí)不再接收新任務(wù)罗捎,但是會(huì)執(zhí)行隊(duì)列中的任務(wù)观谦,在后面的或語句中,第一個(gè)不成立桨菜,firstTask != null成立

? ? ? ? ? ? 3.rs = SHUTDOWN豁状;此時(shí)不再接收新任務(wù),fistTask == null,任務(wù)隊(duì)列workQueue已經(jīng)空了

? ? ? ? ? */if(rs >= SHUTDOWN &&? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&? ? ? ? ? ? ? ? ? firstTask ==null&&? ? ? ? ? ? ? ? ? ! workQueue.isEmpty()))returnfalse;for(;;) {//獲取當(dāng)前活動(dòng)的線程數(shù)? intwc = workerCountOf(c);//先判斷當(dāng)前活動(dòng)的線程數(shù)是否大于最大值倒得,如果超過了就直接返回false說明線程創(chuàng)建失敗? //如果沒有超過再根據(jù)core的值再進(jìn)行以下判斷? /*

? ? ? ? ? ? ? ? ? 1.core為true泻红,則判斷當(dāng)前活動(dòng)的線程數(shù)是否大于corePoolSize

? ? ? ? ? ? ? ? ? 2.core為false,則判斷當(dāng)前活動(dòng)線程數(shù)是否大于maximumPoolSize

? ? ? ? ? ? ? */if(wc >= CAPACITY ||? ? ? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))returnfalse;//比較當(dāng)前值是否和c相同霞掺,如果相同谊路,則改為c+1,并且跳出大循環(huán)菩彬,直接執(zhí)行Worker進(jìn)行線程創(chuàng)建? if(compareAndIncrementWorkerCount(c))breakretry;? ? ? ? ? ? ? ? c = ctl.get();// 獲取ctl的當(dāng)前值? if(runStateOf(c) != rs)//檢查下當(dāng)前線程池的狀態(tài)是否已經(jīng)發(fā)生改變? continueretry;//如果已經(jīng)改變了缠劝,則進(jìn)行外層retry大循環(huán),否則只進(jìn)行內(nèi)層的循環(huán)? // else CAS failed due to workerCount change; retry inner loop? }? ? ? ? }//下面這里就是開始創(chuàng)建新的線程了? //Worker的也是Runnable的實(shí)現(xiàn)類? Worker w =newWorker(firstTask);//因?yàn)椴豢梢灾苯釉赪orker的構(gòu)造方法中進(jìn)行線程創(chuàng)建? //所以要把它的引用賦給t方便后面進(jìn)行線程創(chuàng)建? Thread t = w.thread;finalReentrantLock mainLock =this.mainLock;? ? ? ? mainLock.lock();try{//再次取出ctl的當(dāng)前值骗灶,用于進(jìn)行狀態(tài)的檢查惨恭,防止線程池的已經(jīng)狀態(tài)改變了? intc = ctl.get();intrs = runStateOf(c);//將if語句中的條件轉(zhuǎn)換為一個(gè)等價(jià)實(shí)現(xiàn) :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null))? //有個(gè)t == null是因?yàn)槿绻褂玫氖悄J(rèn)的ThreadFactory的話,那么它的newThread()可能會(huì)返回null? /*

? ? ? ? ? ? 1. 如果t == null, 則減少一個(gè)線程數(shù)耙旦,如果線程池處于的狀態(tài) > SHUTDOWN,則嘗試終止線程池

? ? ? ? ? ? 2. 如果t 喉恋!= null,且rs == SHUTDOWN母廷,則不再接收新任務(wù)轻黑,若firstTask != null,則此時(shí)也是返回false琴昆,創(chuàng)建線程失敗

? ? ? ? ? ? 3. 如果t 氓鄙!= null, 且rs > SHUTDOWN,同樣不再接受新任務(wù)业舍,此時(shí)也是返回false抖拦,創(chuàng)建線程失敗

? ? ? ? ? */if(t ==null||? ? ? ? ? ? ? ? (rs >= SHUTDOWN &&? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&? ? ? ? ? ? ? ? ? ? firstTask ==null))) {? ? ? ? ? ? ? ? decrementWorkerCount();//減少一個(gè)活動(dòng)的當(dāng)前線程數(shù)? tryTerminate();//嘗試終止線程池? returnfalse;//返回線程創(chuàng)建失敗? }? ? ? ? ? ? workers.add(w);//將創(chuàng)建的線程添加到workers容器中? ints = workers.size();//獲取當(dāng)前線程活動(dòng)的數(shù)量? if(s > largestPoolSize)//判斷當(dāng)前線程活動(dòng)的數(shù)量是否超過線程池最大的線程數(shù)量? largestPoolSize = s;//當(dāng)池中的工作線程創(chuàng)新高時(shí)升酣,會(huì)將這個(gè)數(shù)記錄到largestPoolSize字段中。然后就可以啟動(dòng)這個(gè)線程t了? }finally{? ? ? ? ? ? mainLock.unlock();? ? ? ? }? ? ? ? t.start();//開啟線程? //若start后态罪,狀態(tài)又變成了SHUTDOWN狀態(tài)(如調(diào)用了shutdownNow方法)且新建的線程沒有被中斷過噩茄,? //就要中斷該線程(shutdownNow方法要求中斷正在執(zhí)行的線程),? //shutdownNow方法本身也會(huì)去中斷存儲(chǔ)在workers中的所有線程? if(runStateOf(ctl.get()) == STOP && ! t.isInterrupted())? ? ? ? ? ? t.interrupt();returntrue;? ? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

  那么在創(chuàng)建線程的時(shí)候复颈,線程執(zhí)行的是什么的呢绩聘?

  我們前面提到Worker繼承的其實(shí)也是Runnable,它在創(chuàng)建線程的時(shí)候是以自身作為任務(wù)傳進(jìn)先創(chuàng)建的線程中的耗啦,這段比較簡單凿菩,我就不一一注釋了,只是給出源代碼給大家看吧帜讲。

Worker(Runnable firstTask) {this.firstTask = firstTask;//this指的是worker對象本身this.thread = getThreadFactory().newThread(this); }

1

2

3

4

5

  它以自身的對象作為線程任務(wù)傳進(jìn)去衅谷,那么它的run方法又是怎樣的呢?

publicvoidrun() {? ? runWorker(this);}

1

2

3

竟然只有一句話調(diào)用runWorker()方法似将,這個(gè)可是重頭戲获黔,我們來看看,究竟運(yùn)行的是什么在验。

/**? * 執(zhí)行Worker中的任務(wù)肢执,它的執(zhí)行流程是這樣的:? * 若存在第一個(gè)任務(wù),則先執(zhí)行第一個(gè)任務(wù)译红,否則,從隊(duì)列中拿任務(wù)兴溜,不斷的執(zhí)行侦厚,? * 直到getTask()返回null或執(zhí)行任務(wù)出錯(cuò)(中斷或任務(wù)本身拋出異常),就退出while循環(huán)拙徽。? * @paramw woker? */finalvoidrunWorker(Worker w) {? ? ? ? ? Runnable task = w.firstTask;//將當(dāng)前Worker中的任務(wù)取出來交給task刨沦,并釋放掉w.firstTask占用的內(nèi)存? w.firstTask =null;//用于判斷線程是否由于異常終止,如果不是異常終止膘怕,在后面將會(huì)將該變量的值改為false? //該變量的值在processWorkerExit()會(huì)使用來判斷線程是否由于異常終止? booleancompletedAbruptly =true;try{//執(zhí)行任務(wù)想诅,直到getTask()返回的值為null,在此處就相當(dāng)于復(fù)用了線程岛心,讓線程執(zhí)行了多個(gè)任務(wù)? while(task !=null|| (task = getTask()) !=null) {? ? ? ? ? ? ? ? ? ? ? w.lock();? ? ? ? ? ? ? ? ? clearInterruptsForTaskRun();//對線程池狀態(tài)進(jìn)行一次判斷来破,后面我們會(huì)講解一下該方法? try{? ? ? ? ? ? ? ? ? ? ? beforeExecute(w.thread, task);//在任務(wù)執(zhí)行前需要做的邏輯方法,該方面可以由用戶進(jìn)行重寫自定義? Throwable thrown =null;try{? ? ? ? ? ? ? ? ? ? ? ? ? task.run();//開始執(zhí)行任務(wù)? }catch(RuntimeException x) {? ? ? ? ? ? ? ? ? ? ? ? ? thrown = x;throwx;? ? ? ? ? ? ? ? ? ? ? }catch(Error x) {? ? ? ? ? ? ? ? ? ? ? ? ? thrown = x;throwx;? ? ? ? ? ? ? ? ? ? ? }catch(Throwable x) {? ? ? ? ? ? ? ? ? ? ? ? ? thrown = x;thrownewError(x);? ? ? ? ? ? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);//在任務(wù)執(zhí)行后需要做的邏輯方法忘古,該方面可以由用戶進(jìn)行重寫自定義? }? ? ? ? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? ? ? ? task =null;? ? ? ? ? ? ? ? ? ? ? ? ? w.completedTasks++;//增加該線程完成的任務(wù)? w.unlock();? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? }? ? ? ? ? ? ? completedAbruptly =false;//線程不是異常終止? }finally{? ? ? ? ? ? ? processWorkerExit(w, completedAbruptly);//結(jié)束該線程? }? ? ? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

  下面就是線程在執(zhí)行任務(wù)之前對線程池狀態(tài)的一次判斷:

/**? ? * 對線程的結(jié)束做一些清理和數(shù)據(jù)同步? ? * @paramw 封裝線程的Worker? ? * @paramcompletedAbruptly 表示該線程是否結(jié)束于異常? ? */privatevoidprocessWorkerExit(Worker w,booleancompletedAbruptly) {// 如果completedAbruptly值為true徘禁,則說明線程是結(jié)束于異常? //如果不是結(jié)束于異常,那么它降在runWorker方法的while循環(huán)中的getTask()方法中已經(jīng)減一了? if(completedAbruptly)? ? ? ? ? ? decrementWorkerCount();//此時(shí)將線程數(shù)量減一? finalReentrantLock mainLock =this.mainLock;? ? ? ? mainLock.lock();try{? ? ? ? ? ? completedTaskCount += w.completedTasks;//統(tǒng)計(jì)總共完成的任務(wù)數(shù)? workers.remove(w);//將該線程數(shù)從workers容器中移除? }finally{? ? ? ? ? ? mainLock.unlock();? ? ? ? }? ? ? ? tryTerminate();//嘗試終止線程池? intc = ctl.get();//接下來的這個(gè)if塊要做的事兒了髓堪。當(dāng)池的狀態(tài)還是RUNNING送朱,? //又要分兩種情況娘荡,一種是異常結(jié)束,一種是正常結(jié)束驶沼。異常結(jié)束比較好弄炮沐,直接加個(gè)線程替換死掉的線程就好了,? //也就是最后的addWorker操作? if(runStateLessThan(c, STOP)) {//如果當(dāng)前運(yùn)行狀態(tài)為RUNNING,SHUTDOWN? if(!completedAbruptly) {//如果線程不是結(jié)束于異常? intmin = allowCoreThreadTimeOut ?0: corePoolSize;//是否允許線程超時(shí)結(jié)束? if(min ==0&& ! workQueue.isEmpty())//如果允許把那個(gè)且隊(duì)列不為空? min =1;//至少要保留一個(gè)線程來完成任務(wù)? //如果當(dāng)前活動(dòng)的線程數(shù)大于等于最小的值? // 1.不允許核心線程超時(shí)結(jié)束回怜,則必須要使得活動(dòng)線程數(shù)超過corePoolSize數(shù)才可以? // 2. 允許核心線程超時(shí)結(jié)束大年,但是隊(duì)列中有任務(wù),必須留至少一個(gè)線程? if(workerCountOf(c) >= min)return;// replacement not needed? }//直接加個(gè)線程? addWorker(null,false);? ? ? ? ? ? }? ? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

  前面我們的方法遇見過很多次tryTerminate()方法鹉戚,到底他是怎樣嘗試結(jié)束線程池的呢鲜戒?

/**

* 執(zhí)行該方法,根據(jù)線程池狀態(tài)進(jìn)行? 判斷是否結(jié)束線程池

*/finalvoidtryTerminate() {for(;;) {intc = ctl.get();if(isRunning(c) ||//線程池正在運(yùn)行中抹凳,自然不能結(jié)束線程池啦? runStateAtLeast(c, TIDYING) ||//如果狀態(tài)為TIDYING或TERMINATED遏餐,池中的活動(dòng)線程數(shù)已經(jīng)是0,自然也不需要做什么操作了? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//線程池出于SHUTDOWN狀態(tài)赢底,但是任務(wù)隊(duì)列不為空失都,自然不能結(jié)束線程池啦? return;if(workerCountOf(c) !=0) {// Eligible to terminate? /*

? ? ? ? ? ? ? 調(diào)用這個(gè)方法的目的是將shutdown信號(hào)傳播給其它線程。

? ? ? ? ? ? ? 調(diào)用shutdown方法的時(shí)候會(huì)去中斷所有空閑線程幸冻,如果這時(shí)候池中所有的線程都正在執(zhí)行任務(wù)粹庞,

? ? ? ? ? ? ? 那么就不會(huì)有線程被中斷,調(diào)用shutdown方法只是設(shè)置了線程池的狀態(tài)為SHUTDOWN洽损,

? ? ? ? ? ? ? 在取任務(wù)(getTask,后面會(huì)細(xì)說)的時(shí)候庞溜,假如很多線程都發(fā)現(xiàn)隊(duì)列里還有任務(wù)(沒有使用鎖,存在競態(tài)條件)碑定,

? ? ? ? ? ? ? 然后都去調(diào)用take流码,如果任務(wù)數(shù)小于池中的線程數(shù),那么必然有方法調(diào)用take后會(huì)一直等待(shutdown的時(shí)候這些線程正在執(zhí)行任務(wù)延刘,

? ? ? ? ? ? ? 所以沒能調(diào)用它的interrupt漫试,其中斷狀態(tài)沒有被設(shè)置),那么在沒有任務(wù)且線程池的狀態(tài)為SHUTDWON的時(shí)候碘赖,

? ? ? ? ? ? ? 這些等待中的空閑線程就需要被終止iinterruptIdleWorkers(ONLY_ONE)回去中斷一個(gè)線程驾荣,讓其從take中退出,

? ? ? ? ? ? ? 然后這個(gè)線程也進(jìn)入同樣的邏輯普泡,去終止一個(gè)其它空閑線程播掷,直到池中的活動(dòng)線程數(shù)為0。

? ? ? ? ? ? */interruptIdleWorkers(ONLY_ONE);return;? ? ? ? ? }finalReentrantLock mainLock =this.mainLock;? ? ? ? ? mainLock.lock();try{/*

? ? ? ? ? ? ? 當(dāng)狀態(tài)為SHUTDOWN撼班,且活動(dòng)線程數(shù)為0的時(shí)候叮趴,就可以進(jìn)入TIDYING狀態(tài)了,

? ? ? ? ? ? ? 進(jìn)入TIDYING狀態(tài)就可以執(zhí)行方法terminated()权烧,

? ? ? ? ? ? ? 該方法執(zhí)行結(jié)束就進(jìn)入了TERMINATED狀態(tài)(參考前文中各狀態(tài)的含義以及可能的狀態(tài)轉(zhuǎn)變)

? ? ? ? ? ? */if(ctl.compareAndSet(c, ctlOf(TIDYING,0))) {try{? ? ? ? ? ? ? ? ? ? ? terminated();//執(zhí)行該方法眯亦,結(jié)束線程池? }finally{? ? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED,0));/*

? ? ? ? ? ? ? ? ? ? ? 當(dāng)線程池shutdown后伤溉,外部可能還有很多線程在等待線程池真正結(jié)束,

? ? ? ? ? ? ? ? ? ? ? 即調(diào)用了awaitTermination方法妻率,該方法中乱顾,外部線程就是在termination上await的,

? ? ? ? ? ? ? ? ? ? ? 所以宫静,線程池關(guān)閉之前要喚醒這些等待的線程走净,告訴它們線程池關(guān)閉結(jié)束了。

? ? ? ? ? ? ? ? ? ? */termination.signalAll();? ? ? ? ? ? ? ? ? }return;? ? ? ? ? ? ? }? ? ? ? ? }finally{? ? ? ? ? ? ? mainLock.unlock();? ? ? ? ? }// else retry on failed CAS? }? }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

2孤里、關(guān)閉線程池

  關(guān)閉時(shí)使用shutdown()方法伏伯,源碼如下:

publicvoidshutdown() {finalReentrantLock mainLock =this.mainLock;? ? ? ? mainLock.lock();try{? ? ? ? ? ? checkShutdownAccess();// 檢查終止線程池的線程是否有權(quán)限。? advanceRunState(SHUTDOWN);// 設(shè)置線程池的狀態(tài)為關(guān)閉狀態(tài)捌袜。? interruptIdleWorkers();// 中斷線程池中空閑的線程? onShutdown();// 鉤子函數(shù)说搅,在ThreadPoolExecutor中沒有任何動(dòng)作? }finally{? ? ? ? ? ? mainLock.unlock();? ? ? ? }? ? ? ? tryTerminate();// 嘗試終止線程池? }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市虏等,隨后出現(xiàn)的幾起案子弄唧,更是在濱河造成了極大的恐慌,老刑警劉巖霍衫,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件候引,死亡現(xiàn)場離奇詭異,居然都是意外死亡敦跌,警方通過查閱死者的電腦和手機(jī)澄干,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來柠傍,“玉大人麸俘,你說我怎么就攤上這事⌒” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵搂誉,是天一觀的道長徐紧。 經(jīng)常有香客問我,道長炭懊,這世上最難降的妖魔是什么并级? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮侮腹,結(jié)果婚禮上嘲碧,老公的妹妹穿的比我還像新娘。我一直安慰自己父阻,他們只是感情好愈涩,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布望抽。 她就那樣靜靜地躺著,像睡著了一般履婉。 火紅的嫁衣襯著肌膚如雪煤篙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天毁腿,我揣著相機(jī)與錄音辑奈,去河邊找鬼。 笑死已烤,一個(gè)胖子當(dāng)著我的面吹牛鸠窗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播胯究,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼稍计,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了唐片?” 一聲冷哼從身側(cè)響起丙猬,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎费韭,沒想到半個(gè)月后茧球,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡星持,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年抢埋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片督暂。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡揪垄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出逻翁,到底是詐尸還是另有隱情饥努,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布八回,位于F島的核電站酷愧,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏缠诅。R本人自食惡果不足惜溶浴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望管引。 院中可真熱鬧士败,春花似錦、人聲如沸褥伴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至戏自,卻和暖如春邦投,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背擅笔。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國打工志衣, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人猛们。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓念脯,卻偏偏與公主長得像,于是被迫代替她去往敵國和親弯淘。 傳聞我的和親對象是個(gè)殘疾皇子绿店,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 在我們的開發(fā)中“池”的概念并不罕見,有數(shù)據(jù)庫連接池庐橙、線程池假勿、對象池、常量池等等态鳖。下面我們主要針對線程池來一步一步揭...
    Alukar閱讀 897評(píng)論 2 1
  • 在我們的開發(fā)中“池”的概念并不罕見转培,有數(shù)據(jù)庫連接池、線程池浆竭、對象池浸须、常量池等等。下面我們主要針對線程池來一步一步揭...
    Java架構(gòu)閱讀 4,064評(píng)論 4 87
  • 為什么使用線程池 當(dāng)我們在使用線程時(shí)邦泄,如果每次需要一個(gè)線程時(shí)都去創(chuàng)建一個(gè)線程删窒,這樣實(shí)現(xiàn)起來很簡單,但是會(huì)有一個(gè)問題...
    閩越布衣閱讀 4,278評(píng)論 10 45
  • 楚城人間四月天顺囊, 湖邊垂柳低照顏肌索。 澄黃一片菜花田, 爭相怒放惹人念特碳。 “《墻頭馬上》――我在洛陽的陽春三月诚亚,牡丹...
    o0嘴角微翹0o閱讀 203評(píng)論 0 0
  • 01 “愛笑的女孩運(yùn)氣不會(huì)太差”亡电,這是快被用濫的一句話届巩。 我曾把它當(dāng)做座右銘硅瞧。臉都快笑爛了,也沒中彩票恕汇,反倒收獲了...
    俐緣閱讀 1,301評(píng)論 2 4