聊聊 Java 多線程(5)- 超詳細(xì)的ThreadPoolExecutor源碼解析

目前蝠猬,多線程編程可以說是在大部分平臺(tái)和應(yīng)用上都需要實(shí)現(xiàn)的一個(gè)基本需求克懊。本系列文章就來對(duì) Java 平臺(tái)下的多線程編程知識(shí)進(jìn)行講解,從概念入門焰宣、底層實(shí)現(xiàn)上層應(yīng)用都會(huì)涉及到霉囚,預(yù)計(jì)一共會(huì)有五篇文章,希望對(duì)你有所幫助 ????

本篇文章是第五篇匕积,應(yīng)該也是最后一篇了盈罐,從現(xiàn)實(shí)需求出發(fā)到源碼介紹,一步步理清楚線程池的作用和優(yōu)勢

線程池(ThreadPool)面對(duì)的是外部復(fù)雜多變的多線程環(huán)境闸天,既需要保證多線程環(huán)境下的狀態(tài)同步暖呕,也需要最大化對(duì)每個(gè)線程的利用率,還需要留給子類足夠多的余地來實(shí)現(xiàn)功能擴(kuò)展苞氮。所以說湾揽,線程池的難點(diǎn)在于如何實(shí)現(xiàn),而在概念上其實(shí)還是挺簡單的笼吟。在 Java 中库物,線程池這個(gè)概念一般都認(rèn)為對(duì)應(yīng)的是 JDK 中的 ThreadPoolExecutor 類及其各種衍生類,本篇文章就從實(shí)現(xiàn)思路出發(fā)贷帮,探索 ThreadPoolExecutor 的源碼到底是如何實(shí)現(xiàn)的以及為什么這么實(shí)現(xiàn)

一戚揭、線程池

線程是一種昂貴的系統(tǒng)資源,其“昂貴”不僅在于創(chuàng)建線程所需要的資源開銷撵枢,還在于使用過程中帶來的資源消耗民晒。一個(gè)系統(tǒng)能夠支持同時(shí)運(yùn)行的線程總數(shù)受限于該系統(tǒng)所擁有的處理器數(shù)目和內(nèi)存大小等硬件條件,線程的運(yùn)行需要占用處理器時(shí)間片锄禽,系統(tǒng)中處于運(yùn)行狀態(tài)的線程越多潜必,每個(gè)線程單位時(shí)間內(nèi)能分配到的時(shí)間片就會(huì)越少,線程調(diào)度帶來的上下文切換的次數(shù)就會(huì)越多沃但,最終導(dǎo)致處理器真正用于運(yùn)算的時(shí)間就會(huì)越少磁滚。此外,在現(xiàn)實(shí)場景中一個(gè)程序在其整個(gè)生命周期內(nèi)需要交由線程執(zhí)行的任務(wù)數(shù)量往往是遠(yuǎn)多于系統(tǒng)所能支持同時(shí)運(yùn)行的最大線程數(shù)〈谷粒基于以上原因维雇,為每個(gè)任務(wù)都創(chuàng)建一個(gè)線程來負(fù)責(zé)執(zhí)行是不太現(xiàn)實(shí)的。那么晒他,我們最直接的一個(gè)想法就是要考慮怎么來實(shí)現(xiàn)線程的復(fù)用了

線程池就是實(shí)現(xiàn)線程復(fù)用的一種有效方式吱型。線程池的思想可以看做是對(duì)資源是有限的而需要處理的任務(wù)幾乎是無限的這樣一個(gè)現(xiàn)狀的應(yīng)對(duì)措施。線程池的一般實(shí)現(xiàn)思路是:線程池內(nèi)部預(yù)先創(chuàng)建或者是先后創(chuàng)建一定數(shù)量的線程仪芒,外部將需要執(zhí)行的任務(wù)作為一個(gè)對(duì)象提交給線程池唁影,由線程池選擇某條空閑線程來負(fù)責(zé)執(zhí)行。如果所有線程都處于工作狀態(tài)且線程總數(shù)已經(jīng)達(dá)到限制條件了掂名,則先將任務(wù)緩存到任務(wù)隊(duì)列中,線程再不斷從任務(wù)隊(duì)列中取出任務(wù)并執(zhí)行哟沫。因此饺蔑,線程池可以看做是基于生產(chǎn)者-消費(fèi)者模式的一種服務(wù),內(nèi)部維護(hù)的多個(gè)線程相當(dāng)于消費(fèi)者嗜诀,提交的任務(wù)相當(dāng)于產(chǎn)品猾警,提交任務(wù)的外部就相當(dāng)于生產(chǎn)者

二、思考下

好了隆敢,既然已經(jīng)對(duì)線程池這個(gè)概念有了基本的了解发皿,那么就再來思考下線程池應(yīng)該具備的功能以及應(yīng)該如何來實(shí)現(xiàn)線程池

  1. 線程池中的線程最大數(shù)量應(yīng)該如何限定?

既然我們不可能無限制地創(chuàng)建線程拂蝎,那么在創(chuàng)建線程池前就需要為其設(shè)定一個(gè)最大數(shù)量穴墅,我們稱之為最大線程池大小(maximumPoolSize)温自,當(dāng)線程池中的當(dāng)前線程總數(shù)達(dá)到 maximumPoolSize 后就不應(yīng)該再創(chuàng)建線程了玄货。在開發(fā)中,我們需要根據(jù)運(yùn)行設(shè)備的硬件條件和任務(wù)類型(I/O 密集型或者 CPU 密集型)來實(shí)際衡量該數(shù)值的大小悼泌,但任務(wù)的提交頻率和任務(wù)的所需執(zhí)行時(shí)間是不固定的松捉,所以線程池的 maximumPoolSize 也應(yīng)該支持動(dòng)態(tài)調(diào)整

  1. 線程池中的線程應(yīng)該在什么時(shí)候被創(chuàng)建呢?

一般來說馆里,如果線程池中的線程數(shù)量還沒有達(dá)到 maximumPoolSize 時(shí)隘世,我們可以等到當(dāng)外部提交了任務(wù)時(shí)再來創(chuàng)建線程進(jìn)行處理。但是鸠踪,線程從被創(chuàng)建到被調(diào)度器選中運(yùn)行丙者,之間也是有著一定時(shí)間間隔的。從提高任務(wù)的處理響應(yīng)速度這方面考慮慢哈,我們也可以選擇預(yù)先就創(chuàng)建一批線程進(jìn)行等待

  1. 線程池中的線程可以一直存活著嗎蔓钟?

程序運(yùn)行過程中可能只是偶發(fā)性地大批量提交任務(wù),而大部分時(shí)間只是比較零散地提交少量任務(wù)卵贱,這就導(dǎo)致線程池中的線程可能會(huì)在一段時(shí)間內(nèi)處于空閑狀態(tài)滥沫。如果線程池中的線程只要?jiǎng)?chuàng)建了就可以一直存活著的話侣集,那么線程池的“性價(jià)比”就顯得沒那么高了。所以兰绣,當(dāng)線程處于空閑狀態(tài)的時(shí)間超出允許的最大空閑時(shí)間(keepAliveTime)后世分,我們就應(yīng)該將其回收,避免白白浪費(fèi)系統(tǒng)資源缀辩。而又為了避免頻繁地創(chuàng)建和銷毀線程臭埋,線程池需要緩存一定數(shù)量的線程,即使其處于空閑狀態(tài)也不會(huì)進(jìn)行回收臀玄,這類線程我們就稱之為核心線程瓢阴,相應(yīng)的線程數(shù)量就稱之為核心線程池大小(corePoolSize)健无。大于 corePoolSize 而小于等于 maximumPoolSize 的那一部分線程荣恐,就稱之為非核心線程

  1. 如何實(shí)現(xiàn)線程的復(fù)用?

我們知道,當(dāng) Thread.run() 方法執(zhí)行結(jié)束后線程就會(huì)被回收了,那么想要實(shí)現(xiàn)線程的復(fù)用期奔,那么就要考慮如何避免退出 Thread.run() 了。這里硼被,我們可以通過循環(huán)向任務(wù)隊(duì)列取值的方式來實(shí)現(xiàn)。上面有提到渗磅,如果外部提交的任務(wù)過多嚷硫,那么任務(wù)就需要被緩存到任務(wù)隊(duì)列中。那么夺溢,我們就可以考慮使用一個(gè)阻塞隊(duì)列來存放任務(wù)论巍。線程循環(huán)從任務(wù)隊(duì)列中取任務(wù),如果隊(duì)列不為空风响,那么就可以馬上拿到任務(wù)進(jìn)行執(zhí)行嘉汰;如果隊(duì)列為空,那么就讓線程一直阻塞等待状勤,直到外部提交了任務(wù)被該線程拿到或者由于超時(shí)退出循環(huán)鞋怀。通過這種循環(huán)獲取+阻塞等待的方式,就可以實(shí)現(xiàn)線程復(fù)用的目的

  1. 如何盡量實(shí)現(xiàn)線程的復(fù)用持搜?

這個(gè)問題和“如何實(shí)現(xiàn)線程的復(fù)用”不太一樣密似,“如何實(shí)現(xiàn)線程的復(fù)用”針對(duì)的是單個(gè)線程的復(fù)用流程,本問題針對(duì)的是整個(gè)線程池范圍的復(fù)用葫盼。線程池中需要使用到任務(wù)隊(duì)列進(jìn)行緩存残腌,那么任務(wù)隊(duì)列的使用時(shí)機(jī)可以有以下幾種:

  • 當(dāng)線程數(shù)已經(jīng)達(dá)到 maximumPoolSize ,且所有線程均處于工作狀態(tài)時(shí),此后外部提交的任務(wù)才被緩存到任務(wù)隊(duì)列中
  • 當(dāng)核心線程都已經(jīng)被創(chuàng)建了時(shí)抛猫,此后外部提交的任務(wù)就被緩存到任務(wù)隊(duì)列中蟆盹,當(dāng)任務(wù)隊(duì)列滿了后才創(chuàng)建非核心線程來循環(huán)處理任務(wù)

很明顯的,第二種方案會(huì)更加優(yōu)秀闺金。由于核心線程一般情況下是會(huì)被長久保留的逾滥,核心線程的存在保證了外部提交的任務(wù)一直有在被循環(huán)處理。如果外部提交的大部分都是耗時(shí)較短的任務(wù)或者任務(wù)的提交頻率比較低的話败匹,那么任務(wù)隊(duì)列就可能沒那么容易滿寨昙,第二種方案就可以盡量避免去創(chuàng)建非核心線程。而且對(duì)于“偶發(fā)性地大批量提交任務(wù)掀亩,而大部分時(shí)間只是比較零散地提交少量任務(wù)”這種情況舔哪,第二種方案也會(huì)更加合適。當(dāng)然槽棍,在任務(wù)的處理速度方面尸红,第一種方案就會(huì)高一些,但是如果想要盡量提高第二種方案的任務(wù)處理速度的話刹泄,也可以通過將任務(wù)隊(duì)列的容量調(diào)小的方式來實(shí)現(xiàn)

  1. 當(dāng)任務(wù)隊(duì)列滿了后該如何處理?

如果線程池實(shí)在“忙不過來”的話怎爵,那么任務(wù)隊(duì)列也是有可能滿的特石,那么就需要為這種情況指定處理策略。當(dāng)然鳖链,我們也可以選擇使用一個(gè)無界隊(duì)列來緩存任務(wù)姆蘸,但是無界隊(duì)列容易掩蓋掉一些程序異常。因?yàn)橛薪珀?duì)列之所以會(huì)滿芙委,可能是由于發(fā)生線程池死鎖或者依賴的某個(gè)基礎(chǔ)服務(wù)失效導(dǎo)致的逞敷,從而令線程池中的任務(wù)一直遲遲得不到解決。如果使用的是無界隊(duì)列的話灌侣,就可能使得當(dāng)系統(tǒng)發(fā)生異常時(shí)程序還是看起來運(yùn)轉(zhuǎn)正常推捐,從而降低了系統(tǒng)健壯性。所以侧啼,最常用的還是有界隊(duì)列

現(xiàn)實(shí)需求是多樣化的牛柒,在實(shí)現(xiàn)線程池時(shí)就需要留有交由外部自定義處理策略的余地。例如痊乾,當(dāng)隊(duì)列滿了后皮壁,我們可以選擇直接拋出異常來向外部“告知”這一異常情況。對(duì)于重要程度較低的任務(wù)哪审,可以選擇直接拋棄該任務(wù)蛾魄,也可以選擇拋棄隊(duì)列頭的任務(wù)而嘗試接納新到來的任務(wù)。如果任務(wù)必須被執(zhí)行的話,也可以直接就在提交任務(wù)的線程上進(jìn)行執(zhí)行

以上就是線程池在實(shí)現(xiàn)過程中需要主要考慮的幾個(gè)點(diǎn)滴须,下面就來看下 Java 實(shí)際上是怎么實(shí)現(xiàn)線程池的

三舌狗、ThreadPoolExecutor

java.util.concurrent.ThreadPoolExecutor 類就是 Java 對(duì)線程池的默認(rèn)實(shí)現(xiàn),下文如果沒有特別說明的話描馅,所說的線程池就是指 ThreadPoolExecutor

ThreadPoolExecutor 的繼承關(guān)系如下圖所示

ThreadPoolExecutor 實(shí)現(xiàn)的最頂層接口是 Executor把夸。Executor 提供了一種將任務(wù)的提交任務(wù)的執(zhí)行兩個(gè)操作進(jìn)行解耦的思路:客戶端無需關(guān)注執(zhí)行任務(wù)的線程是如何創(chuàng)建、運(yùn)行和回收的铭污,只需要將任務(wù)的執(zhí)行邏輯包裝為一個(gè) Runnable 對(duì)象傳遞進(jìn)來即可恋日,由 Executor 的實(shí)現(xiàn)類自己來完成最復(fù)雜的執(zhí)行邏輯

ExecutorService 接口在 Executor 的基礎(chǔ)上擴(kuò)展了一些功能:

  1. 擴(kuò)展執(zhí)行任務(wù)的能力。例如:獲取任務(wù)的執(zhí)行結(jié)果嘹狞、取消任務(wù)等功能
  2. 提供了關(guān)閉線程池岂膳、停止線程池,以及阻塞等待線程池完全終止的方法

AbstractExecutorService 則是上層的抽象類磅网,負(fù)責(zé)將任務(wù)的執(zhí)行流程串聯(lián)起來谈截,從而使得下層的實(shí)現(xiàn)類 ThreadPoolExecutor 只需要實(shí)現(xiàn)一個(gè)執(zhí)行任務(wù)的方法即可

也正如上文所說的那樣,ThreadPoolExecutor 可以看做是基于生產(chǎn)者-消費(fèi)者模式的一種服務(wù)涧偷,內(nèi)部維護(hù)的多個(gè)線程相當(dāng)于消費(fèi)者簸喂,提交的任務(wù)相當(dāng)于產(chǎn)品,提交任務(wù)的外部就相當(dāng)于生產(chǎn)者燎潮。其整個(gè)運(yùn)行流程如下圖所示

而在線程池的整個(gè)生命周期中喻鳄,以下三個(gè)關(guān)于線程數(shù)量的統(tǒng)計(jì)結(jié)果也影響著線程池的流程走向

  1. 當(dāng)前線程池大小(currentPoolSize)确封。表示當(dāng)前實(shí)時(shí)狀態(tài)下線程池中線程的數(shù)量
  2. 最大線程池大谐恰(maximumPoolSize)。表示線程池中允許存在的線程的數(shù)量上限
  3. 核心線程池大凶Υ(corePoolSize)颜曾。表示一個(gè)不大于 maximumPoolSize 的線程數(shù)量上限

三者之間的關(guān)系如下:

當(dāng)前線程池大小  ≤  核心線程池大小  ≤  最大線程池大小
or
核心線程池大小  ≤  當(dāng)前線程池大小  ≤  最大線程池大小

當(dāng)中,除了“當(dāng)前線程池大小”是對(duì)線程池現(xiàn)有的工作者線程進(jìn)行實(shí)時(shí)計(jì)數(shù)的結(jié)果秉剑,其它兩個(gè)值都是對(duì)線程池配置的參數(shù)值泛豪。三個(gè)值的作用在上文也都已經(jīng)介紹了

ThreadPoolExecutor 中參數(shù)最多的一個(gè)構(gòu)造函數(shù)的聲明如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize :用于指定線程池的核心線程數(shù)大小
  • maximumPoolSize:用于指定最大線程池大小
  • keepAliveTime、unit :一起用于指定線程池中空閑線程的最大存活時(shí)間
  • workQueue :任務(wù)隊(duì)列秃症,相當(dāng)于生產(chǎn)者-消費(fèi)者模式中的傳輸管道候址,用于存放待處理的任務(wù)
  • threadFactory:用于指定創(chuàng)建線程的線程工廠
  • handler:用于指定當(dāng)任務(wù)隊(duì)列已滿且線程數(shù)量達(dá)到 maximumPoolSize 時(shí)任務(wù)的處理策略

下面就以點(diǎn)見面,從細(xì)節(jié)處來把握整個(gè)線程池的流程走向

1种柑、線程池的狀態(tài)如何保存

這里所說的“狀態(tài)”指的是一個(gè)復(fù)合數(shù)據(jù)岗仑,包含“線程池生命周期狀態(tài)”和“線程池當(dāng)前線程數(shù)量”這兩項(xiàng)。線程池從啟動(dòng)到最終終止聚请,其內(nèi)部需要記錄其當(dāng)前狀態(tài)來決定流程走向荠雕。而線程池的當(dāng)前線程數(shù)量稳其,也關(guān)乎著線程是否需要進(jìn)行回收以及是否需要執(zhí)行任務(wù)的拒絕策略

線程池一共包含以下五種生命周期狀態(tài),涵蓋了線程池從啟動(dòng)到終止的這整個(gè)范圍炸卑。線程池的生命周期狀態(tài)可以按順序躍遷既鞠,但無法反向回轉(zhuǎn),每個(gè)狀態(tài)的數(shù)值大小也是逐步遞增的

//運(yùn)行狀態(tài)盖文,線程池正處于運(yùn)行中
private static final int RUNNING    = -1 << COUNT_BITS;
//關(guān)閉狀態(tài)嘱蛋,當(dāng)調(diào)用 shutdown() 方法后處于這個(gè)狀態(tài),任務(wù)隊(duì)列中的任務(wù)會(huì)繼續(xù)處理五续,但不再接受新任務(wù)洒敏,
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止?fàn)顟B(tài),當(dāng)調(diào)用 shutdownNow() 方法后處于這個(gè)狀態(tài)
//任務(wù)隊(duì)列中的任務(wù)也不再處理且作為方法返回值返回疙驾,此后不再接受新任務(wù)
private static final int STOP       =  1 << COUNT_BITS;
//TERMINATED 之前的臨時(shí)狀態(tài)凶伙,當(dāng)線程都被回收且任務(wù)隊(duì)列已清空后就會(huì)處于這個(gè)狀態(tài)
private static final int TIDYING    =  2 << COUNT_BITS;
//終止?fàn)顟B(tài),在處于 TIDYING 狀態(tài)后會(huì)立即調(diào)用 terminated() 方法它碎,調(diào)用完成就會(huì)馬上轉(zhuǎn)到此狀態(tài)
private static final int TERMINATED =  3 << COUNT_BITS;

在日常開發(fā)中函荣,如果我們想要用一個(gè) int 類型的 state 變量來表示這五種狀態(tài)的話,那么就可能是通過讓 state 分別取值 1扳肛,2傻挂,3,4挖息,5 來進(jìn)行標(biāo)識(shí)踊谋,而 state 作為一個(gè) int 類型是一共有三十二位的,那其實(shí)上僅需要占用三位就足夠標(biāo)識(shí)了旋讹,即 2 x 2 x 2 = 8 種可能。那還剩下 29 位可以用來存放其它數(shù)據(jù)

實(shí)際上 ThreadPoolExecutor 就是通過將一個(gè) 32 位的 int 類型變量分割為兩段轿衔,高 3 位用來表示線程池的當(dāng)前生命周期狀態(tài)沉迹,低 29 位就拿來表示線程池的當(dāng)前線程數(shù)量,從而做到用一個(gè)變量值來維護(hù)兩份數(shù)據(jù)害驹,這個(gè)變量值就是 ctl鞭呕。從 ctl 的初始值就可以知道線程池的初始生命周期狀態(tài)( runState )是 RUNNING,初始線程數(shù)量 ( workerCount )是 0宛官。這種用一個(gè)變量去存儲(chǔ)兩個(gè)值的做法葫松,可以避免在做相關(guān)決策時(shí)出現(xiàn)不一致的情況,且不必為了維護(hù)兩者的一致而使用鎖底洗,后續(xù)需要獲取線程池的當(dāng)前生命周期狀態(tài)線程數(shù)量的時(shí)候腋么,也可以直接采用位運(yùn)算的方式獲取,在速度上相比基本運(yùn)算會(huì)快很多

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ThreadPoolExecutor 還聲明了以下兩個(gè)常量用來參與位運(yùn)算

//用來表示線程數(shù)量的位數(shù)亥揖,即 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池所能表達(dá)的最大線程數(shù)珊擂,即一個(gè)“高3位全是0圣勒,低29位全是1”的數(shù)值
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

相應(yīng)的,那么就需要有幾個(gè)方法可以來分別取“生命周期狀態(tài)”和“線程數(shù)”這兩個(gè)值摧扇,以及將這兩個(gè)值合并保存的方法圣贸,這幾個(gè)方法都使用到了位運(yùn)算

// Packing and unpacking ctl

//通過按位取反 + 按位與運(yùn)算,將 c 的高3位保留扛稽,舍棄低29位吁峻,從而得到線程池狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//通過按位與運(yùn)算,將 c 的高3位舍棄在张,保留低29位用含,從而得到工作線程數(shù)
private static int workerCountOf(int c)  { return c & CAPACITY; }

//rs,即 runState瞧掺,線程池的生命周期狀態(tài)
//wc耕餐,即 workerCount,工作線程數(shù)量
//通過按位或運(yùn)算來合并值
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

//用于判斷線程池是否處于 RUNNING 狀態(tài)
//由于五個(gè)狀態(tài)值的大小是依次遞增的辟狈,所以只需要和 SHUTDOWN 比較即可
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

public boolean isShutdown() {
    return !isRunning(ctl.get());
}

//用于判斷當(dāng)前狀態(tài)是否處于 SHUTDOWN肠缔、STOP、TIDYING 三者中的一個(gè)
public boolean isTerminating() {
    int c = ctl.get();
    return !isRunning(c) && runStateLessThan(c, TERMINATED);
}

//用于判斷當(dāng)前狀態(tài)是否處于 TERMINATED
public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}

2哼转、線程的創(chuàng)建流程

在初始狀態(tài)下明未,客戶端每提交一個(gè)任務(wù),線程池就會(huì)通過 threadFactory 創(chuàng)建線程來處理該任務(wù)壹蔓,如果開發(fā)者沒有指定 threadFactory 的話趟妥,則會(huì)使用 Executors.DefaultThreadFactory。線程池在最先開始創(chuàng)建的線程屬于核心線程佣蓉,線程數(shù)量在大于 corePoolSize 而小于等于 maximumPoolSize 這個(gè)范圍內(nèi)的線程屬于非核心線程披摄。需要注意的是,核心線程和非核心線程并非是兩種不同類型的線程對(duì)象勇凭,這兩個(gè)概念只是對(duì)不同數(shù)量范圍內(nèi)的線程進(jìn)行的區(qū)分疚膊,實(shí)質(zhì)上這兩者指向的都是同一類型

線程的創(chuàng)建流程可以通過任務(wù)的提交流程來了解,任務(wù)的提交流程圖如下所示

線程池開放了多個(gè)讓外部提交任務(wù)的方法虾标,這里主要看 execute(Runnable command) 方法寓盗。該方法需要先后多次校驗(yàn)狀態(tài)值,因?yàn)榫€程池面對(duì)的調(diào)用方可以來自于多個(gè)不同的線程璧函】觯可能在當(dāng)前線程提交任務(wù)的同時(shí),其它線程就剛好關(guān)閉了線程池或者是調(diào)整了線程池的線程大小參數(shù)蘸吓,需要考慮當(dāng)前的線程數(shù)量是否已經(jīng)達(dá)到限制了

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //如果當(dāng)前線程數(shù)還未達(dá)到 corePoolSize善炫,則嘗試創(chuàng)建一個(gè)核心線程來處理任務(wù)
        //addWorker 可能會(huì)因?yàn)榫€程池被關(guān)閉了、線程數(shù)量超出限制等原因返回 false
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    if (isRunning(c) && workQueue.offer(command)) { //線程池還處于運(yùn)行狀態(tài)且成功添加任務(wù)到任務(wù)隊(duì)列
        //需要重新檢查下運(yùn)行狀態(tài)
        //因?yàn)榈葓?zhí)行到這里時(shí)库继,線程池可能被其它線程關(guān)閉了
        int recheck = ctl.get();

        //1销部、如果線程池已經(jīng)處于非運(yùn)行狀態(tài)了
        //1.1摸航、如果移除 command 成功,則走拒絕策略
        //1.2舅桩、如果移除 command 失斀椿ⅰ(因?yàn)?command 可能已經(jīng)被其它線程拿去執(zhí)行了),則走第 3 步
        //2擂涛、如果線程池還處于運(yùn)行狀態(tài)读串,則走第 3 步
        //3、如果當(dāng)前線程數(shù)量為 0撒妈,則創(chuàng)建線程進(jìn)行處理
        //第 3 步的意義在于:corePoolSize 可以被設(shè)為 0恢暖,所以這里需要檢查下,在需要的時(shí)候創(chuàng)建一個(gè)非核心線程
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果線程池處于非運(yùn)行狀態(tài)了狰右,或者是處于運(yùn)行狀態(tài)但隊(duì)列已滿了杰捂,此時(shí)就會(huì)走到這里
    //在這里嘗試創(chuàng)建一個(gè)非核心線程
    //如果線程創(chuàng)建失敗,說明要么是線程池當(dāng)前狀態(tài)大于等于 STOP棋蚌,或者是任務(wù)隊(duì)列已滿且線程總數(shù)達(dá)到 maximumPoolSize 了
    //此時(shí)就走拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

當(dāng)中嫁佳,addWorker(Runnable firstTask, boolean core) 方法用于嘗試創(chuàng)建并啟動(dòng)線程,同時(shí)將線程保存到 workers谷暮。第一個(gè)參數(shù)用于指定線程啟動(dòng)時(shí)需要執(zhí)行的第一個(gè)任務(wù)蒿往,可以為 null。第二個(gè)參數(shù)用于指定要?jiǎng)?chuàng)建的是否是核心線程湿弦,這個(gè)參數(shù)會(huì)關(guān)系到線程是否能被成功創(chuàng)建

該方法在實(shí)際創(chuàng)建線程前瓤漏,都需要先通過 CAS 來更新(遞增加一)當(dāng)前的線程總數(shù),通過 for 循環(huán)來不斷進(jìn)行重試颊埃。當(dāng) CAS 成功后蔬充,則會(huì)再來實(shí)際進(jìn)行線程的創(chuàng)建操作。但在這時(shí)候線程也未必能夠創(chuàng)建成功班利,因?yàn)樵?CAS 成功后線程池可能被關(guān)閉了娃惯,或者是在創(chuàng)建線程時(shí)拋出異常了,此時(shí)就需要回滾對(duì) workerCount 的修改

該方法如果返回 true肥败,意味著新創(chuàng)建了一個(gè) Worker 線程,同時(shí)線程也被啟動(dòng)了

該方法如果返回 false愕提,則可能是由于以下情況:

  1. 生命周期狀態(tài)大于等于 STOP
  2. 生命周期狀態(tài)等于 SHUTDOWN馒稍,但 firstTask 不為 null,或者任務(wù)隊(duì)列為空
  3. 當(dāng)前線程數(shù)已經(jīng)超出限制
  4. 符合創(chuàng)建線程的條件浅侨,但創(chuàng)建過程中或啟動(dòng)線程的過程中拋出了異常
private boolean addWorker(Runnable firstTask, boolean core) {
    //下面的 for 主要邏輯:
    //在創(chuàng)建線程前通過 CAS 原子性地將“工作者線程數(shù)量遞增加一”
    //由于 CAS 可能會(huì)失敗纽谒,所以將之放到 for 循環(huán)中進(jìn)行循環(huán)重試
    //每次循環(huán)前后都需要檢查下當(dāng)前狀態(tài)是否允許創(chuàng)建線程
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))   
            //當(dāng)外部調(diào)用 shutdown() 方法后,線程池狀態(tài)會(huì)變遷為 SHUTDOWN
            //此時(shí)依然允許創(chuàng)建線程來對(duì)隊(duì)列中的任務(wù)進(jìn)行處理如输,但是不會(huì)再接受新任務(wù)
            //除這種情況之外不允許在非 RUNNING 的時(shí)候還創(chuàng)建線程
            return false;

        for (;;) {  
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                //當(dāng)前線程數(shù)已經(jīng)超出最大限制
                return false;
            if (compareAndIncrementWorkerCount(c))
                //通過 CAS 更新工作者線程數(shù)成功后就跳出循環(huán)鼓黔,去實(shí)際創(chuàng)建線程
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                //循環(huán)過程中線程池狀態(tài)被改變了央勒,重新循環(huán)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    //更新線程池曾經(jīng)達(dá)到的最大線程數(shù)
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果線程沒有被成功啟動(dòng),則需要將該任務(wù)從隊(duì)列中移除并重新更新工作者線程數(shù)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3澳化、線程的執(zhí)行流程

上面所講的線程其實(shí)指的是 ThreadPoolExecutor 的內(nèi)部類 Worker 崔步,Worker 內(nèi)部包含了一個(gè) Thread 對(duì)象,所以本文就把 Worker 實(shí)例也看做線程來對(duì)待

Worker 繼承于 AbstractQueuedSynchronizer缎谷,意味著 Worker 就相當(dāng)于一個(gè)鎖井濒。之沒有使用 synchronized 或者 ReentrantLock,是因?yàn)樗鼈兌际?strong>可重入鎖列林,Worker 繼承于 AQS 為的就是自定義實(shí)現(xiàn)不可重入的特性來輔助判斷線程是否處于執(zhí)行任務(wù)的狀態(tài):在開始執(zhí)行任務(wù)前進(jìn)行加鎖瑞你,在任務(wù)執(zhí)行結(jié)束后解鎖,以便在后續(xù)通過判斷 Worker 是否處于鎖定狀態(tài)來得知其是否處于執(zhí)行階段

  1. Worker 在開始執(zhí)行任務(wù)前會(huì)執(zhí)行 Worker.lock() 希痴,表明線程正在執(zhí)行任務(wù)
  2. 如果 Worker 處于鎖定狀態(tài)者甲,則不應(yīng)該對(duì)其進(jìn)行中斷,避免任務(wù)執(zhí)行一半就被打斷
  3. 如果 Worker 處于非鎖定狀態(tài)砌创,說明其當(dāng)前是處于阻塞獲取任務(wù)的狀態(tài)虏缸,此時(shí)才允許對(duì)其進(jìn)行中斷
  4. 線程池在執(zhí)行 shutdown() 方法或 shutdownNow() 方法時(shí)會(huì)調(diào)用 interruptIdleWorkers() 方法來回收空閑的線程,interruptIdleWorkers() 方法會(huì)使用Worker.tryLock() 方法來嘗試獲取鎖纺铭,由于 Worker 是不可重入鎖寇钉,所以如果鎖獲取成功就說明線程處于空閑狀態(tài),此時(shí)才可以進(jìn)行回收

Worker 同時(shí)也是 Runnable 類型舶赔,thread 是通過 getThreadFactory().newThread(this) 來創(chuàng)建的扫倡,即將 Worker 本身作為構(gòu)造參數(shù)傳給 Thread 進(jìn)行初始化,所以在 thread 啟動(dòng)的時(shí)候 Worker 的 run() 方法就會(huì)被執(zhí)行

private final class Worker extends AbstractQueuedSynchronizer implements Runnable  {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;

    //線程要執(zhí)行的第一個(gè)任務(wù)竟纳,可能為 null
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;

    //用于標(biāo)記 Worker 執(zhí)行過的任務(wù)數(shù)(不管成功與否都記錄)
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    //只有在 state 值為 0 的時(shí)候才能獲取到鎖撵溃,以此實(shí)現(xiàn)不可重入的特性
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    //向線程發(fā)起中斷請求
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker(Worker) 方法就是線程正式進(jìn)行任務(wù)執(zhí)行的地方。該方法通過 while 循環(huán)不斷從任務(wù)隊(duì)列中取出任務(wù)來進(jìn)行執(zhí)行锥累,如果 getTask()方法返回了 null缘挑,那此時(shí)就需要將此線程進(jìn)行回收。如果在任務(wù)執(zhí)行過程中拋出了異常桶略,那也需要回收此線程

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;

    //因?yàn)?Worker 的默認(rèn)值是 -1语淘,而 Worker 的 interruptIfStarted() 方法只有在 state >=0 的時(shí)候才允許進(jìn)行中斷
    //所以這里調(diào)用 unlock() 并不是為了解鎖,而是為了讓 Worker 的 state 值變?yōu)?0际歼,讓 Worker 允許外部進(jìn)行中斷
    //所以惶翻,即使客戶端調(diào)用了 shutdown 或者 shutdownNow 方法,在 Worker 線程還未執(zhí)行到這里前鹅心,無法在 interruptWorkers() 方法里發(fā)起中斷請求
    w.unlock(); // allow interrupts

    //用于標(biāo)記是否由于被打斷而非正常結(jié)束導(dǎo)致的線程終止
    //為 true 表示非正常結(jié)束
    boolean completedAbruptly = true;

    try {
        // 如果 getTask() 為 null吕粗,說明線程池已經(jīng)被停止或者需要進(jìn)行線程回收
        while (task != null || (task = getTask()) != null) {

            //在開始執(zhí)行任務(wù)前進(jìn)行加鎖,在任務(wù)執(zhí)行結(jié)束后解鎖
            //以便在后續(xù)通過判斷 Worker 是否處于鎖定狀態(tài)來得知其是否處于執(zhí)行階段
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //確保當(dāng)狀態(tài)值大于等于 STOP 時(shí)有向線程發(fā)起過中斷請求
            if ((runStateAtLeast(ctl.get(), STOP) 
                ||
                (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //回收此線程
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //如何當(dāng)前狀態(tài)大于等于 STOP旭愧,則返回 null
        //如何當(dāng)前狀態(tài)是 SHUTDOWN 且任務(wù)隊(duì)列為空颅筋,則返回 null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        //timed 用于標(biāo)記從任務(wù)隊(duì)列中取任務(wù)時(shí)是否需要進(jìn)行超時(shí)控制
        //如果允許回收空閑核心線程或者是當(dāng)前的線程總數(shù)已經(jīng)超出 corePoolSize 了宙暇,那么就需要進(jìn)行超時(shí)控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //1. 線程總數(shù)超出 maximumPoolSize
        //2. 允許回收核心線程,且核心線程的空閑時(shí)間已達(dá)到限制了
        //如果以上兩種情況之一有一個(gè)滿足议泵,且當(dāng)前線程數(shù)大于 1 或者任務(wù)隊(duì)列為空時(shí)就返回 null(如果 CAS 更新 WorkerCount 成功的話)
        //避免在任務(wù)隊(duì)列不為空且只有一個(gè)線程時(shí)還回收線程導(dǎo)致任務(wù)沒人處理
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            //如果 r 為 null占贫,說明是由于超時(shí)導(dǎo)致 poll 返回了 null
            //在下一次循環(huán)時(shí)將判斷是否回收此線程
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask() 方法獲取任務(wù)的流程圖如下所示:

4、線程的回收流程

當(dāng)外部調(diào)用了線程池的以下幾個(gè)方法之一時(shí)肢簿,就會(huì)觸發(fā)到線程的回收機(jī)制:

  1. 允許回收核心線程:allowCoreThreadTimeOut()
  2. 重置核心線程池大邪薪!:setCorePoolSize()
  3. 重置最大線程池大小:setMaximumPoolSize()
  4. 重置線程最大空閑時(shí)間:setKeepAliveTime()
  5. 關(guān)閉線程池:shutdown()
  6. 停止線程池:shutdownNow()
/**
 * 用于控制核心線程是否可以由于空閑時(shí)間超時(shí)而被回收
 * 超時(shí)時(shí)間和非核心線程一樣由 keepAliveTime 來指定
 *
 * @param value
 */
public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            //回收掉空閑線程
            interruptIdleWorkers();
    }
}

/**
 * 重置 corePoolSize
 *
 * @param corePoolSize
 */
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        //如果當(dāng)前的線程總數(shù)已經(jīng)超出新的 corePoolSize 的話那就進(jìn)行線程回收
        interruptIdleWorkers();
    else if (delta > 0) {
        //會(huì)走進(jìn)這里池充,說明新的 corePoolSize 比原先的大桩引,但當(dāng)前線程總數(shù)還小于等于新的 corePoolSize
        //此時(shí)如果任務(wù)隊(duì)列不為空的話,那么就需要?jiǎng)?chuàng)建一批新的核心線程來處理任務(wù)
        //delta 和 workQueueSize 中的最小值就是需要啟動(dòng)的線程數(shù)
        //而如果在創(chuàng)建過程中任務(wù)隊(duì)列已經(jīng)空了(被其它線程拿去處理了)收夸,那就不再創(chuàng)建線程
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

/**
 * 用于設(shè)置線程池允許存在的最大活躍線程數(shù)
 *
 * @param maximumPoolSize
 */
public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        //回收掉空閑線程
        interruptIdleWorkers();
}

/**
 * 用于設(shè)置非核心線程在空閑狀態(tài)能夠存活的時(shí)間
 *
 * @param time
 * @param unit
 */
public void setKeepAliveTime(long time, TimeUnit unit) {
    if (time < 0)
        throw new IllegalArgumentException();
    //為了避免頻繁創(chuàng)建線程坑匠,核心線程如果允許超時(shí)回收的話,超時(shí)時(shí)間不能為 0
    if (time == 0 && allowsCoreThreadTimeOut())
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    long keepAliveTime = unit.toNanos(time);
    long delta = keepAliveTime - this.keepAliveTime;
    this.keepAliveTime = keepAliveTime;
    if (delta < 0) //如果新設(shè)置的值比原先的超時(shí)時(shí)間小的話卧惜,那就需要去回收掉空閑線程
        interruptIdleWorkers();
}

 /**
 * 關(guān)閉線程池
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //將當(dāng)前狀態(tài)設(shè)置為 SHUTDOWN
        advanceRunState(SHUTDOWN);
        //回收掉空閑線程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //嘗試看是否能把線程池狀態(tài)置為 TERMINATED
    tryTerminate();
}

/**
 * 停止線程池
 *
 * @return 任務(wù)隊(duì)列中緩存的所有任務(wù)
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //將當(dāng)前狀態(tài)設(shè)置為 STOP
        advanceRunState(STOP);
        //回收掉空閑線程
        interruptWorkers();
        //獲取任務(wù)隊(duì)列中緩存的所有任務(wù)
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //嘗試看是否能把線程池狀態(tài)置為 TERMINATED
    tryTerminate();
    return tasks;
}

上述的幾個(gè)方法最終都會(huì)調(diào)用 interruptIdleWorkers(boolean onlyOne) 方法來回收空閑線程厘灼。該方法通過向線程發(fā)起中斷請求來使 Worker 退出 runWorker(Worker w) 方法,最終會(huì)調(diào)用 processWorkerExit(Worker w, boolean completedAbruptly) 方法來完成實(shí)際的線程回收操作

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //僅在線程的中斷標(biāo)記為 false 時(shí)才發(fā)起中斷咽瓷,避免重復(fù)發(fā)起中斷請求
            //且僅在 w.tryLock() 能成功(即 Worker 并非處于執(zhí)行任務(wù)的階段)才發(fā)起中斷设凹,避免任務(wù)還未執(zhí)行完就被打斷
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

/**
 * 回收線程
 *
 * @param w                 Worker
 * @param completedAbruptly 是否是由于任務(wù)執(zhí)行過程拋出異常導(dǎo)致需要來回收線程
 *                          true:由于任務(wù)拋出異常
 *                          false:由于線程空閑時(shí)間達(dá)到限制條件
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //更新線程池總共處理過的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        //移除此線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        //在任務(wù)隊(duì)列不為空的時(shí)候,需要確保至少有一個(gè)線程可以來處理任務(wù)茅姜,否則就還是需要再創(chuàng)建一個(gè)新線程
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

除了上述幾個(gè)方法會(huì)主動(dòng)觸發(fā)到線程回收機(jī)制外闪朱,當(dāng)線程池滿足以下幾種情況之一時(shí),也會(huì)進(jìn)行線程的回收:

  1. 非核心線程的空閑時(shí)間超出了 keepAliveTime
  2. allowCoreThreadTimeOut 為 true 且核心線程的空閑時(shí)間超出了 keepAliveTime

以上幾種情況其觸發(fā)時(shí)機(jī)主要看 getTask() 方法就可以钻洒。在向任務(wù)隊(duì)列 workQueue 獲取任務(wù)前奋姿,通過判斷當(dāng)前線程池的 allowCoreThreadTimeOut、corePoolSize素标、workerCount 等參數(shù)來決定是否需要對(duì)“從任務(wù)隊(duì)列獲取任務(wù)”這個(gè)操作進(jìn)行限時(shí)称诗。如果需要進(jìn)行限時(shí)且獲取任務(wù)的時(shí)間超出 keepAliveTime 的話,那就說明此線程的空閑時(shí)間已經(jīng)達(dá)到限制了头遭,需要對(duì)其進(jìn)行回收

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        //timed 用于標(biāo)記從任務(wù)隊(duì)列中取任務(wù)時(shí)是否需要進(jìn)行超時(shí)控制
        //如果允許回收空閑核心線程或者是當(dāng)前的線程總數(shù)已經(jīng)超出 corePoolSize 了寓免,那么就需要進(jìn)行超時(shí)控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //1. 線程總數(shù)超出 maximumPoolSize
        //2. 允許回收核心線程,且核心線程的空閑時(shí)間已達(dá)到限制了
        //如果以上兩種情況之一有一個(gè)滿足计维,且當(dāng)前線程數(shù)大于 1 或者任務(wù)隊(duì)列為空時(shí)就返回 null(如果 CAS 更新 WorkerCount 成功的話)
        //避免在任務(wù)隊(duì)列不為空且只有一個(gè)線程時(shí)還回收線程導(dǎo)致任務(wù)沒人處理
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
            //如果能執(zhí)行到 timedOut = true 說明是由于超時(shí)導(dǎo)致 poll 返回了 null
            //之所以不在判斷到 r 不為 null 的時(shí)候就直接 return 出去
            //是因?yàn)榭赡茉讷@取任務(wù)的過程中外部又重新修改了 allowCoreThreadTimeOut 和 corePoolSize 等配置
            //導(dǎo)致此時(shí)又不需要回收此線程了袜香,所以就在下一次循環(huán)時(shí)再判斷是否回收此線程
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

以上就是線程池基本所有的線程回收流程。線程回收機(jī)制有助于節(jié)約系統(tǒng)資源享潜,但如果 corePoolSize、keepAliveTime 等參數(shù)設(shè)置得和系統(tǒng)的實(shí)際運(yùn)行情況不符的話嗅蔬,反而會(huì)導(dǎo)致線程頻繁地被創(chuàng)建和回收剑按,反而加大了資源開銷

5疾就、線程池的關(guān)閉流程

shutdown()shutdownNow() 方法可以用來關(guān)閉和停止線程池

  • shutdown()。使用該方法艺蝴,已提交的任務(wù)會(huì)被繼續(xù)執(zhí)行猬腰,而后續(xù)新提交的任務(wù)則會(huì)走拒絕策略。該方法返回時(shí)猜敢,線程池可能尚未走向終止?fàn)顟B(tài) TERMINATED姑荷,即線程池中可能還有線程還在執(zhí)行任務(wù)
  • shutdownNow()。使用該方法缩擂,正在運(yùn)行的線程會(huì)嘗試停止鼠冕,任務(wù)隊(duì)列中的任務(wù)也不會(huì)執(zhí)行而是作為方法返回值返回。由于該方法是通過調(diào)用 Thread.interrupt() 方法來停止正在執(zhí)行的任務(wù)的胯盯,因此某些無法響應(yīng)中斷的任務(wù)可能需要等到任務(wù)完成后才能停止線程

由于這兩個(gè)方法調(diào)用過后線程池都不會(huì)再接收新任務(wù)了懈费,所以在回收空閑線程后,還需要檢查下線程是否都已經(jīng)回收完畢了博脑,是的話則需要將線程池的生命周期狀態(tài)向 TIDYING 和 TERMINATED 遷移

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //在以下幾種情況不需要終止線程池:
        //1.還處于運(yùn)行狀態(tài)
        //2.已經(jīng)處于 TIDYING 或 TERMINATED 狀態(tài)
        //3.處于 SHUTDOWN 狀態(tài)且還有待處理的任務(wù)
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //在達(dá)到 TIDYING 狀態(tài)前需要確保所有線程都被關(guān)閉了
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //terminated() 方法執(zhí)行完畢后憎乙,線程池狀態(tài)就從 TIDYING 轉(zhuǎn)為 TERMINATED 了,此時(shí)線程池就走向終止了
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    //喚醒所有在等待線程池 TERMINATED 的線程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

6叉趣、任務(wù)隊(duì)列的選擇

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列泞边。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取數(shù)據(jù)的線程會(huì)阻塞等待直到從隊(duì)列獲取到任務(wù)疗杉。當(dāng)隊(duì)列已滿時(shí)阵谚,存儲(chǔ)數(shù)據(jù)的線程會(huì)阻塞等待直到隊(duì)列空出位置可以存入數(shù)據(jù)。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景乡数,生產(chǎn)者是往隊(duì)列里添加數(shù)據(jù)的線程椭蹄,消費(fèi)者是從隊(duì)列里取出數(shù)據(jù)的線程。阻塞隊(duì)列就是生產(chǎn)者存放數(shù)據(jù)的容器净赴,而消費(fèi)者也只從容器里取數(shù)據(jù)

線程池實(shí)現(xiàn)解耦的關(guān)鍵就是有了 任務(wù)隊(duì)列/阻塞隊(duì)列 的存在绳矩。線程池中是以生產(chǎn)者消費(fèi)者模式+阻塞隊(duì)列來實(shí)現(xiàn)的,任務(wù)隊(duì)列負(fù)責(zé)緩存外部提交的任務(wù)玖翅,線程負(fù)責(zé)從任務(wù)隊(duì)列取出任務(wù)翼馆,這樣客戶端提交的任務(wù)就避免了和線程直接關(guān)聯(lián)

選擇不同的阻塞隊(duì)列可以實(shí)現(xiàn)不一樣的任務(wù)存取策略:

7、任務(wù)的拒絕策略

隨著客戶端不斷地提交任務(wù)金度,當(dāng)前線程池大小也會(huì)不斷增加应媚。在當(dāng)前線程池大小達(dá)到 corePoolSize 的時(shí)候,新提交的任務(wù)會(huì)被緩存到任務(wù)隊(duì)列之中猜极,由線程后續(xù)不斷從隊(duì)列中取出任務(wù)并執(zhí)行中姜。當(dāng)任務(wù)隊(duì)列滿了之后,線程池就會(huì)創(chuàng)建非核心線程。當(dāng)線程總數(shù)達(dá)到 maximumPoolSize 且所有線程都處于工作狀態(tài)丢胚,同時(shí)任務(wù)隊(duì)列也滿了后翩瓜,客戶端再次提交任務(wù)時(shí)就會(huì)被拒絕。而被拒絕的任務(wù)具體的處理策略則由 RejectedExecutionHandler 來進(jìn)行定義

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    
}

當(dāng)客戶端提交的任務(wù)被拒絕時(shí)携龟,線程池關(guān)聯(lián)的 RejectedExecutionHandler 對(duì)象的 rejectedExecution 方法就會(huì)被調(diào)用兔跌,相應(yīng)的拒絕策略可以由客戶端來指定

ThreadPoolExecutor 提供了以下幾種拒絕策略,默認(rèn)使用的是 AbortPolicy

實(shí)現(xiàn)類 策略
AbortPolicy 直接拋出異常峡蟋,是 ThreadPoolExecutor 的默認(rèn)策略
DiscardPolicy 直接丟棄該任務(wù)坟桅,不做任何響應(yīng)也不會(huì)拋出異常
DiscardOldestPolicy 如果線程池未被停止,則將工作隊(duì)列中最老的任務(wù)丟棄蕊蝗,然后嘗試接納該任務(wù)
CallerRunsPolicy 如果線程池未被停止仅乓,則直接在客戶端線程上執(zhí)行該任務(wù)

任務(wù)的拒絕策略只會(huì)在提交任務(wù)的時(shí)候被觸發(fā),即只在 execute(Runnable command) 方法中被觸發(fā)到匿又。execute(Runnable command) 方法會(huì)判斷當(dāng)前狀態(tài)是否允許接受該任務(wù)方灾,如果不允許的話則會(huì)走拒絕任務(wù)的流程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    if (isRunning(c) && workQueue.offer(command)) { //線程池還處于運(yùn)行狀態(tài)且成功添加任務(wù)到任務(wù)隊(duì)列
        //需要重新檢查下運(yùn)行狀態(tài)
        //因?yàn)榈葓?zhí)行到這里時(shí),線程池可能被其它線程關(guān)閉了
        int recheck = ctl.get();

        //1碌更、如果線程池已經(jīng)處于非運(yùn)行狀態(tài)了
        //1.1裕偿、如果移除 command 成功,則走拒絕策略
        //1.2痛单、如果移除 command 失敽偌(因?yàn)?command 可能已經(jīng)被其它線程拿去執(zhí)行了),則走第 3 步
        //2旭绒、如果線程池還處于運(yùn)行狀態(tài)鸟妙,則走第 3 步
        //3、如果當(dāng)前線程數(shù)量為 0挥吵,則創(chuàng)建線程進(jìn)行處理
        //第 3 步的意義在于:corePoolSize 可以被設(shè)為 0重父,所以這里需要檢查下,在需要的時(shí)候創(chuàng)建一個(gè)非核心線程
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果線程池處于非運(yùn)行狀態(tài)了忽匈,或者是處于運(yùn)行狀態(tài)但隊(duì)列已滿了房午,此時(shí)就會(huì)走到這里
    //在這里嘗試創(chuàng)建一個(gè)非核心線程
    //如果線程創(chuàng)建失敗,說明要么是線程池當(dāng)前狀態(tài)大于等于 STOP丹允,或者是任務(wù)隊(duì)列已滿且線程總數(shù)達(dá)到 maximumPoolSize 了
    //此時(shí)就走拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

8郭厌、監(jiān)控線程池的運(yùn)行狀態(tài)

ThreadPoolExecutor 提供了多個(gè)配置參數(shù)以便滿足多種不同的需求,這些配置參數(shù)包含:corePoolSize雕蔽、maximumPoolSize折柠、keepAliveTime、allowCoreThreadTimeOut 等批狐。但很多時(shí)候我們一開始使用線程池時(shí)并不知道該如何配置參數(shù)才最為適應(yīng)當(dāng)前需求扇售,那么就只能通過監(jiān)控線程池的運(yùn)行狀態(tài)來進(jìn)行考察,最終得到一份最合理的配置參數(shù)

可以通過 ThreadPoolExecutor 的以下幾個(gè)屬性來監(jiān)控線程池的運(yùn)行狀態(tài):

  1. taskCount:線程池已執(zhí)行結(jié)束(不管成功與否)的任務(wù)數(shù)加上任務(wù)隊(duì)列中目前包含的任務(wù)數(shù)
  2. completedTaskCount:線程池已執(zhí)行結(jié)束(不管成功與否)的任務(wù)數(shù),小于等于 taskCount
  3. largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量承冰。如果該數(shù)值等于 maximumPoolSize 那就說明線程池曾經(jīng)滿過
  4. getPoolSize():獲取當(dāng)前線程總數(shù)
  5. getActiveCount():獲取當(dāng)前正在執(zhí)行任務(wù)的線程總數(shù)

此外嘱根,ThreadPoolExecutor 也預(yù)留了幾個(gè)鉤子方法可以由子類去實(shí)現(xiàn)。通過以下幾個(gè)方法巷懈,就可以實(shí)現(xiàn)每個(gè)任務(wù)開始執(zhí)行前和執(zhí)行后,以及線程池走向終止時(shí)插入一些自定義的監(jiān)控代碼慌洪,以此來實(shí)現(xiàn):計(jì)算任務(wù)的平均執(zhí)行時(shí)間顶燕、最小執(zhí)行時(shí)間和最大執(zhí)行時(shí)間等功能

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

protected void terminated() { }

四、Executors

Executors 是 JDK 提供的一個(gè)線程池創(chuàng)建工具類冈爹,封裝了很多個(gè)創(chuàng)建 ExecutorService 實(shí)例的方法涌攻,這里就來介紹下這幾個(gè)方法,這些線程池的差別主要都是由于選擇了不同的任務(wù)隊(duì)列導(dǎo)致的频伤,讀者需要先認(rèn)識(shí)下以下幾種任務(wù)隊(duì)列

newFixedThreadPool方法創(chuàng)建的線程池吹由,核心線程數(shù)和最大線程數(shù)都是 nThreads咒林,所以線程池在任何時(shí)候最多也只會(huì)有 nThreads 個(gè)線程在同時(shí)運(yùn)行,且在停止線程池前所有線程都不會(huì)被回收。LinkedBlockingQueue 的默認(rèn)容量是 Integer.MAX_VALUE供屉,近乎無限,在線程繁忙的情況下有可能導(dǎo)致等待處理的任務(wù)持續(xù)堆積秕豫,使得系統(tǒng)頻繁 GC硕并,最終導(dǎo)致 OOM

此類線程池適合用于希望所有任務(wù)都能夠被執(zhí)行的情況

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

newSingleThreadExecutor方法創(chuàng)建的線程池,核心線程數(shù)和最大線程數(shù)都是 1怎炊,所以線程池在任何時(shí)候最多也只會(huì)有 1 個(gè)線程在同時(shí)運(yùn)行谭企,且在停止線程池前所有線程都不會(huì)被回收。由于使用了 LinkedBlockingQueue评肆,所以在極端情況下也是有發(fā)生 OOM 的可能

此類線程池適合用于執(zhí)行需要串行處理的任務(wù)债查,或者是任務(wù)的提交間隔比任務(wù)的執(zhí)行時(shí)間長的情況

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

newCachedThreadPool方法創(chuàng)建的線程池,核心線程數(shù)是 0瓜挽,最大線程數(shù)是 Integer.MAX_VALUE盹廷,所以允許同時(shí)運(yùn)行的線程數(shù)量近乎無限。再加上 SynchronousQueue 是一個(gè)不儲(chǔ)存元素的阻塞隊(duì)列秸抚,每當(dāng)有新任務(wù)到來時(shí)速和,如果當(dāng)前沒有空閑線程的話就會(huì)馬上啟動(dòng)一個(gè)新線程來執(zhí)行任務(wù),這使得任務(wù)總是能夠很快被執(zhí)行剥汤,提升了響應(yīng)速度颠放,但同時(shí)也存在由于要執(zhí)行的任務(wù)過多導(dǎo)致一直創(chuàng)建線程的可能性,這在任務(wù)耗時(shí)過長且任務(wù)量過多的情況下也可能導(dǎo)致 OOM

此類線程池適合用于對(duì)任務(wù)的處理速度要求比較高的情況

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

newScheduledThreadPool方法創(chuàng)建的線程池對(duì)應(yīng)的是 ScheduledThreadPoolExecutor吭敢,其繼承于 ThreadPoolExecutor 并實(shí)現(xiàn)了 ScheduledExecutorService 接口碰凶,在線程池的基礎(chǔ)上擴(kuò)展實(shí)現(xiàn)了執(zhí)行定時(shí)任務(wù)的能力。ScheduledThreadPoolExecutor 的核心線程數(shù)由入?yún)?corePoolSize 決定,最大線程數(shù)是 Integer.MAX_VALUE欲低,keepAliveTime 是 0 秒辕宏,所以該線程池可能同時(shí)運(yùn)行近乎無限的線程,但一旦當(dāng)前沒有待執(zhí)行的任務(wù)的話砾莱,線程就會(huì)馬上被回收

此類線程池適合用于需要定時(shí)多次執(zhí)行特定任務(wù)的情況

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

newSingleThreadScheduledExecutor方法 newScheduledThreadPool 方法基本一樣瑞筐,只是直接指定了核心線程數(shù)為 1

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

五、線程池故障

1腊瑟、線程池死鎖

多個(gè)線程會(huì)因?yàn)檠h(huán)等待對(duì)方持有的排他性資源而導(dǎo)致死鎖聚假,線程池也可能會(huì)因?yàn)槎鄠€(gè)任務(wù)間的相互依賴而導(dǎo)致線程池死鎖。例如闰非,如果在線程池中執(zhí)行的任務(wù) A 在其執(zhí)行過程中又向同個(gè)線程池提交了任務(wù) B膘格,且任務(wù) A 的執(zhí)行結(jié)束又依賴于任務(wù) B 的執(zhí)行結(jié)果,那么就可能出現(xiàn)這樣的一種極端情形:線程池中的所有正在執(zhí)行任務(wù)的線程都在等待其它任務(wù)的處理結(jié)果财松,而這些任務(wù)均在任務(wù)隊(duì)列中處于待執(zhí)行狀態(tài)瘪贱,且由于線程總數(shù)已經(jīng)達(dá)到線程池的最大線程數(shù)量限制,所以任務(wù)隊(duì)列中的任務(wù)就會(huì)一直無法被執(zhí)行辆毡,最終導(dǎo)致所有任務(wù)都無法完成菜秦,從而形成線程池死鎖

因此,提交給同一個(gè)線程池的任務(wù)必須是沒有互相依賴關(guān)系的舶掖。對(duì)于有依賴關(guān)系的任務(wù)喷户,應(yīng)該提交給不同的線程池,以此來避免死鎖的發(fā)生

2访锻、線程泄漏

線程泄漏指由于某種原因?qū)е戮€程池中實(shí)際可用的線程變少的一種異常情況褪尝。如果線程泄漏持續(xù)存在,那么線程池中的線程會(huì)越來越少期犬,最終使得線程池再也無法處理任務(wù)河哑。導(dǎo)致線程泄露的原因可能有兩種:由于線程異常自動(dòng)終止或者由于程序缺陷導(dǎo)致線程處于非有效運(yùn)行狀態(tài)。前者通常是由于 Thread.run() 方法中沒有捕獲到任務(wù)拋出的 Exception 或者 Error 導(dǎo)致的龟虎,使得相應(yīng)線程被提前終止而沒有相應(yīng)更新線程池當(dāng)前的線程數(shù)量璃谨,ThreadPoolExecutor 內(nèi)部已經(jīng)對(duì)這種情形進(jìn)行了預(yù)防。后者可能是由于客戶端提交的任務(wù)包含阻塞操作(Object.wait() 等操作)鲤妥,而該操作又沒有相應(yīng)的時(shí)間或者條件方面的限制佳吞,那么就有可能導(dǎo)致線程一直處于等待狀態(tài)而無法執(zhí)行其它任務(wù),這樣最終也是形成了線程泄漏

六棉安、總結(jié)

線程池通過復(fù)用一定數(shù)量的線程來執(zhí)行不斷被提交的任務(wù)底扳,除了可以節(jié)約線程這種有限而昂貴的資源外,還包含以下好處:

  • 提高響應(yīng)速度贡耽。ThreadPoolExecutor 提供了預(yù)先創(chuàng)建一定數(shù)量的線程的功能衷模,使得后續(xù)提交的任務(wù)可以立即被執(zhí)行而無須等待線程被創(chuàng)建鹊汛,從而提高了系統(tǒng)的響應(yīng)速度
  • 抵消線程創(chuàng)建的開銷。一個(gè)線程可以先后用于執(zhí)行多個(gè)任務(wù)阱冶,那創(chuàng)建線程帶來的成本(資源和時(shí)間)就可以看做是被平攤到其執(zhí)行的所有任務(wù)中刁憋。一個(gè)線程執(zhí)行的任務(wù)越多,那么創(chuàng)建該線程的“性價(jià)比”就越高
  • 封裝了任務(wù)的具體執(zhí)行過程木蹬。線程池封裝了每個(gè)線程在創(chuàng)建至耻、管理、復(fù)用镊叁、回收等各個(gè)階段的邏輯有梆,使得客戶端代碼只需要提交任務(wù)和獲取任務(wù)的執(zhí)行結(jié)果,而無須關(guān)心任務(wù)的具體執(zhí)行過程意系。即使后續(xù)想要將任務(wù)的執(zhí)行方式從并發(fā)改為串行,往往也只需要修改線程池內(nèi)部的處理邏輯即可饺汹,而無需修改客戶端代碼
  • 減少銷毀線程的開銷蛔添。JVM 在銷毀一個(gè)已經(jīng)停止的線程時(shí)也有著資源和時(shí)間方面的開銷,采用線程池可以避免頻繁地創(chuàng)建線程兜辞,從而減少了銷毀線程的次數(shù)迎瞧,減少了相應(yīng)開銷

七、參考資料

Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逸吵,一起剝皮案震驚了整個(gè)濱河市凶硅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌扫皱,老刑警劉巖足绅,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異韩脑,居然都是意外死亡氢妈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門段多,熙熙樓的掌柜王于貴愁眉苦臉地迎上來首量,“玉大人,你說我怎么就攤上這事进苍〖釉担” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵觉啊,是天一觀的道長拣宏。 經(jīng)常有香客問我,道長杠人,這世上最難降的妖魔是什么蚀浆? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任缀程,我火速辦了婚禮,結(jié)果婚禮上市俊,老公的妹妹穿的比我還像新娘杨凑。我一直安慰自己,他們只是感情好摆昧,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布撩满。 她就那樣靜靜地躺著,像睡著了一般绅你。 火紅的嫁衣襯著肌膚如雪伺帘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天忌锯,我揣著相機(jī)與錄音伪嫁,去河邊找鬼。 笑死偶垮,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的似舵。 我是一名探鬼主播脚猾,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼砚哗!你這毒婦竟也來了龙助?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤蛛芥,失蹤者是張志新(化名)和其女友劉穎提鸟,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仅淑,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沽一,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了漓糙。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铣缠。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖昆禽,靈堂內(nèi)的尸體忽然破棺而出蝗蛙,到底是詐尸還是另有隱情,我是刑警寧澤醉鳖,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布捡硅,位于F島的核電站,受9級(jí)特大地震影響盗棵,放射性物質(zhì)發(fā)生泄漏壮韭。R本人自食惡果不足惜北发,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望喷屋。 院中可真熱鬧琳拨,春花似錦、人聲如沸屯曹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽恶耽。三九已至密任,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間偷俭,已是汗流浹背浪讳。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留涌萤,地道東北人淹遵。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像形葬,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子暮的,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容