Java進(jìn)階之并發(fā)基礎(chǔ)技術(shù)—線程池剖析

線程池的概念和定義

在服務(wù)器端的業(yè)務(wù)應(yīng)用開發(fā)中藕各,Web服務(wù)器(諸如Tomcat萎河、Jetty)需要接受并處理http請求臊岸,所以會為一個請求來分配一個線程來進(jìn)行處理橙数。如果每次請求都新創(chuàng)建一個線程的話實(shí)現(xiàn)起來非常簡便,但是存在這樣的嚴(yán)重問題:

隨著業(yè)務(wù)量的增加帅戒,如果并發(fā)的請求數(shù)量非常多灯帮,但每個線程執(zhí)行的時間很短,這樣就會頻繁的創(chuàng)建和銷毀線程(包括涉及JVM的GC),如此一來會大大降低業(yè)務(wù)系統(tǒng)的效率钟哥∮祝可能出現(xiàn)服務(wù)器在為每個請求創(chuàng)建新線程和銷毀線程上花費(fèi)的時間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶請求的時間和資源更多。

那么有沒有一種解決方案可以使線程在執(zhí)行完一個任務(wù)后瞪醋,不被銷毀忿晕,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

這就是線程池的出現(xiàn)的原因了银受,其為線程生命周期的開銷和資源不足問題提供了解決方案践盼。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟搅硕鄠€任務(wù)上宾巍。實(shí)際應(yīng)用中咕幻,上文所述Tomcat這樣的Web服務(wù)器也是利用線程池機(jī)制來接收并處理大量并發(fā)的http請求,可以通過其server.xml配置文件中的Connect節(jié)點(diǎn)的maxThreads(最大線程數(shù))/maxSpareThreads(最大空閑線程數(shù))/minSpareThreads(最小空閑線程數(shù))/acceptCount(最大等待隊(duì)列數(shù))/maxIdleTime(最大空閑時間)等參數(shù)進(jìn)行線程池調(diào)優(yōu)顶霞。

線程池的定義

線程池是一種多線程任務(wù)處理形式肄程,處理過程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動啟動這些任務(wù)选浑。這里引用wiki上面的一個圖如下:

線程池使用的場景

(a)單個任務(wù)處理時間相對短

(b)需要處理的任務(wù)數(shù)量很大

線程池的主要作用

(a)降低資源消耗蓝厌。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。

(b)提高響應(yīng)速度古徒。當(dāng)任務(wù)到達(dá)時拓提,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。

(c)提高線程的可管理性隧膘。線程是稀缺資源代态,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源疹吃,還會降低系統(tǒng)的穩(wěn)定性蹦疑,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控萨驶。

從一個簡單例子說起

在JDK 1.5后引入的Executor線程池框架的最大優(yōu)點(diǎn)是把任務(wù)的提交和執(zhí)行解耦歉摧。要執(zhí)行任務(wù)的人只需把Task描述清楚,然后提交即可篡撵。這個Task任務(wù)是怎么被執(zhí)行的判莉,被誰執(zhí)行的,什么時候執(zhí)行的育谬,提交的人就不用關(guān)心了。在最上面的Executor接口中定義了execute方法帮哈,該方法接收Runnable類型的任務(wù)命令膛檀,對用戶屏蔽底層線程的實(shí)現(xiàn)與調(diào)度細(xì)節(jié),這是一種典型命令設(shè)計模式的應(yīng)用。如下為一段非常簡單的線程池代碼例子:

// ExecutorService fixedThreadPool = ? ? ? ? ? ? ? ?Executors.newFixedThreadPool(5); ? ? ? ?fixedThreadPool.execute(new Runnable() { ? ? ? ? ? ?

????@Override ? ? ? ? ? ?

????????public void run() { ? ? ? ? ? ? ? ?

????????????????logger.info("the task is running") ? ? ? ? ? ?

????} ? ? ? ?});

在上面的例子中咖刃,生成線程池采用了工具類Executors的靜態(tài)方法泳炉,設(shè)置了線程池中核心線程數(shù)和最大線程數(shù)均為5,線程池中超過核心線程數(shù)目的空閑線程最大存活時間為0嚎杨,同時使用LinkedBlockingQueue這樣子的無界阻塞任務(wù)隊(duì)列花鹅。除了newFixedThreadPool可以生成線程數(shù)固定大小的線程池,newCachedThreadPool可以生成一個可緩存且自動回收的線程池枫浙,newSingleThreadScheduledExecutor可以生成一個單個線程的線程池刨肃。newScheduledThreadPool還可以生成支持周期任務(wù)的線程池。

ThreadPoolExecutor線程池源碼剖析

JDK中的Executor線程池框架是一個根據(jù)一組執(zhí)行策略調(diào)用箩帚、調(diào)度真友、執(zhí)行和控制線程的異步任務(wù)框架,其目的是提供一種將“任務(wù)提交”與“任務(wù)運(yùn)行”分離開來的機(jī)制紧帕。作為Java線程池框架中繼承Executor接口最為核心的類—ThreadPoolExecutor盔然,有必要對其源代碼進(jìn)行深入分析。因此是嗜,本節(jié)以ThreadPoolExecutor的源代碼舉例愈案,先對以Executor接口為核心的類結(jié)構(gòu)圖進(jìn)行一個較為全面的展示,然后回歸到源代碼中鹅搪,對線程池中任務(wù)如何提交站绪、如何執(zhí)行任務(wù)等方面分別進(jìn)行闡述。為了控制篇幅涩嚣,突出主要邏輯崇众,文章中引用的代碼片段去掉了非重點(diǎn)部分。

J.U.C線程池框架類圖

從上面的類結(jié)構(gòu)圖中可以看出航厚,在JDK的J.U.C包下面主要包含了三個接口顷歌,分別是:

(a)Executor:一個運(yùn)行新任務(wù)的簡單接口;

(b)ExecutorService:擴(kuò)展了Executor接口幔睬,增加了一些用來管理線程池狀態(tài)和任務(wù)生命周期的方法以及支持Future返回值任務(wù)提交的方法眯漩;

(c)ScheduledExecutorService:擴(kuò)展了ExecutorService,增加了定期和延遲任務(wù)執(zhí)行的方法麻顶;

這三個接口定義了JDK中Executor線程池框架的標(biāo)準(zhǔn)行為赦抖,這三個接口的具體代碼可以參考JDK的代碼。限于篇幅辅肾,這里就不對各個方法進(jìn)行一一詳細(xì)敘述了队萤。類圖中AbstractExecutorService類是一個抽象類,提供了線程池框架的一些模板方法矫钓,具體實(shí)現(xiàn)由其子類要尔,ThreadPoolExecutor和ScheduledThreadPoolExecutor分別實(shí)現(xiàn)舍杜。Executors是個工具類,里面提供了很多靜態(tài)方法赵辕,根據(jù)用戶的需求選擇返回不同的線程池實(shí)例既绩。

對于類結(jié)構(gòu)圖中左邊半部分定義了ThreadPoolExecutor核心類的成員變量,包括創(chuàng)建線程的類工廠—ThreadFactory/DefaultThreadFactory还惠,用以描述具體任務(wù)執(zhí)行線程的內(nèi)部類—Worker(其繼承AQS框架和Runnable接口)和提供線程池工作飽和策略的—RejectedExecutionHandler饲握。

線程池的狀態(tài)

線程有五種狀態(tài):新建、就緒蚕键、運(yùn)行救欧、阻塞、死亡嚎幸。對于線程池來說颜矿,同樣也具有五種狀態(tài):RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED。線程池狀態(tài)轉(zhuǎn)換圖如下:

在具體說說線程池的五種狀態(tài)之前有必要結(jié)合ThreadPoolExecutor核心類的代碼進(jìn)行一些分析嫉晶,在該類的代碼中對于線程池的五種狀態(tài)定義如下:

private final AtomicInteger ctl =new AtomicInteger(ctlOf(RUNNING,0)); ? ?

private static final int COUNT_BITS = Integer.SIZE -3; ? ?

private static final int CAPACITY ? = (1<< COUNT_BITS) -1;// runState is stored in the high-order bits

private static final int RUNNING ? ?= -1<< COUNT_BITS; ? ?

private static final int SHUTDOWN ? =0<< COUNT_BITS; ? ?

private static final int STOP ? ? ? =1<< COUNT_BITS; ? ?

private static final int TIDYING ? ?=2<< COUNT_BITS; ? ?

private static final int TERMINATED =3<< COUNT_BITS;// Packing and unpacking ctl

private static int runStateOf(int c) ? ? {

????return c & ~CAPACITY;

} ? ?

private static int workerCountOf(int c) ?{

? ? return c & CAPACITY;

} ? ?

private static int ctlOf(int rs, int wc) {

return rs | wc;

}

?????? 對于上面代碼中定義的常量ctl是對線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個32位字段骑疆,它包含兩部分的信息:其中高三位表示線程池的運(yùn)行狀態(tài) (runState) ,低29位表示線程池內(nèi)有效線程的數(shù)量 (workerCount)替废。COUNT_BITS 就是29箍铭,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值椎镣,大約是5億诈火。另外這里還定義了三個靜態(tài)方法分別為,runStateOf—獲取運(yùn)行狀態(tài)状答;workerCountOf—獲取活動線程數(shù)冷守;ctlOf—獲取運(yùn)行狀態(tài)和活動線程數(shù)的值。

(a)RUNNING:處于RUNNING狀態(tài)的線程池能夠接受新任務(wù)惊科,以及對新添加的任務(wù)進(jìn)行處理拍摇。

(b)SHUTDOWN:處于SHUTDOWN狀態(tài)的線程池不可以接受新任務(wù),但是可以對已添加的任務(wù)進(jìn)行處理馆截。

(c)STOP:處于STOP狀態(tài)的線程池不接收新任務(wù)充活,不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)蜡娶。

(d)TIDYING:當(dāng)所有的任務(wù)已終止混卵,ctl記錄的"任務(wù)數(shù)量"為0,線程池會變?yōu)門IDYING狀態(tài)窖张。當(dāng)線程池變?yōu)門IDYING狀態(tài)時幕随,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的宿接,若用戶想在線程池變?yōu)門IDYING時合陵,進(jìn)行相應(yīng)的處理枢赔;可以通過重載terminated()函數(shù)來實(shí)現(xiàn)澄阳。

(e)TERMINATED:線程池徹底終止的狀態(tài)拥知。

創(chuàng)建Executor線程池方法

在上文第二節(jié)內(nèi)容中,已經(jīng)給出了一個創(chuàng)建簡單線程池的例子碎赢,其中調(diào)用了JDK的ThreadPoolExecutor核心類的構(gòu)造函數(shù)來創(chuàng)建的線程池實(shí)例低剔。在這一節(jié)內(nèi)容中,通過分析ThreadPoolExecutor核心類的構(gòu)造函數(shù)以及參數(shù)來看下如何創(chuàng)建一個Executor線程池以及在創(chuàng)建時候需要關(guān)注哪些要素肮塞?

(a)corePoolSize:該參數(shù)指的是線程池中核心線程的數(shù)量襟齿。當(dāng)提交一個任務(wù)時,線程池會新建一個線程來執(zhí)行任務(wù)枕赵,直到當(dāng)前線程數(shù)等于corePoolSize猜欺。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程

(b)maximumPoolSize:從參數(shù)名稱上也應(yīng)該可以明白它的意思拷窜。該參數(shù)指的是線程池中允許的最大線程數(shù)开皿。線程池的阻塞隊(duì)列滿了之后,如果還有任務(wù)提交篮昧,如果當(dāng)前的線程數(shù)小于maximumPoolSize赋荆,則會新建線程來執(zhí)行任務(wù)。這里有必要說明的是懊昨,如果使用的是無界隊(duì)列窄潭,該參數(shù)也就沒有什么效果了。

(c)keepAliveTime:該參數(shù)為線程空閑的時間酵颁。線程的創(chuàng)建和銷毀是需要代價的嫉你。線程執(zhí)行完任務(wù)后不會立即銷毀,而是繼續(xù)存活一段時間:keepAliveTime躏惋。默認(rèn)情況下幽污,該參數(shù)只有在線程數(shù)大于corePoolSize時才會生效。

(d)unit:該參數(shù)用于表示keepAliveTime的單位其掂。

(e)workQueue:該參數(shù)用來表示保存線程池中等待執(zhí)行任務(wù)的阻塞隊(duì)列油挥,等待的任務(wù)需要實(shí)現(xiàn)Runnable接口。我們可以選擇這幾種:ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列款熬,F(xiàn)IFO深寥;LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的無界阻塞隊(duì)列(如果設(shè)置初始化時的隊(duì)列大小)贤牛,F(xiàn)IFO惋鹅;SynchronousQueue:不存儲元素的阻塞隊(duì)列,每個插入操作都必須等待一個移出操作殉簸,反之亦然闰集;PriorityBlockingQueue:具有優(yōu)先級別的阻塞隊(duì)列沽讹。這里的幾種阻塞隊(duì)列都為JDK中J.U.C并發(fā)包下較為經(jīng)典的阻塞隊(duì)列,其源碼值得閱讀和學(xué)習(xí)武鲁,有興趣的朋友可以自己閱讀爽雄。

(f)threadFactory:該參數(shù)用于設(shè)置線程池中創(chuàng)建工作線程的工廠對象。該對象可以通過Executors.defaultThreadFactory()返回沐鼠。

(g)handler:該參數(shù)為線程池的拒絕策略挚瘟。所謂拒絕策略,是指將任務(wù)添加到線程池中時饲梭,線程池拒絕該任務(wù)所采取的相應(yīng)策略乘盖。當(dāng)向線程池中提交任務(wù)時,如果此時線程池中的線程已經(jīng)飽和了憔涉,而且阻塞隊(duì)列也已經(jīng)滿了订框,則線程池會選擇一種拒絕策略來處理該任務(wù)。線程池提供的拒絕策略主要有以下四種(位于上面J.U.C線程池的類結(jié)構(gòu)圖中左半部分):

AbortPolicy:直接拋出異常兜叨,默認(rèn)的線程池拒絕策略穿扳;

CallerRunsPolicy:用調(diào)用者所在的線程來完成待執(zhí)行的任務(wù);

DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù)浪腐,并執(zhí)行當(dāng)前任務(wù)纵揍;

DiscardPolicy:直接丟棄任務(wù);

下面這個圖為線程池的邏輯結(jié)構(gòu)圖:

由上面的邏輯結(jié)構(gòu)圖可以初步知道線程池主要的執(zhí)行流程:

(a)當(dāng)有任務(wù)進(jìn)入時议街,線程池創(chuàng)建線程去執(zhí)行任務(wù)泽谨,直到核心線程數(shù)滿為止;

(b)核心線程數(shù)量滿了之后特漩,任務(wù)就會進(jìn)入一個緩沖的任務(wù)隊(duì)列中吧雹;

1、當(dāng)任務(wù)隊(duì)列為無界隊(duì)列時(諸如LinkedBlockingQueue隊(duì)列涂身,通過構(gòu)造函數(shù)來配置為無界隊(duì)列)雄卷,任務(wù)就會一直放入緩沖的任務(wù)隊(duì)列中,不會和最大線程數(shù)量進(jìn)行比較蛤售。

2丁鹉、當(dāng)任務(wù)隊(duì)列為有界隊(duì)列時(諸如ArrayBlockingQueue隊(duì)列),任務(wù)先放入緩沖的任務(wù)隊(duì)列中悴能,當(dāng)任務(wù)隊(duì)列滿了之后揣钦,才會將任務(wù)放入線程池,此時會與線程池中最大的線程數(shù)量進(jìn)行比較漠酿,如果超出了冯凹,則默認(rèn)會拋出異常進(jìn)行拒絕動作。否則炒嘲,線程池才會執(zhí)行任務(wù)宇姚。當(dāng)任務(wù)執(zhí)行完匈庭,又會將緩沖隊(duì)列中的任務(wù)推入線程池中,然后重復(fù)此操作浑劳。

以下是線程池采用有界隊(duì)列來處理任務(wù)的主要流程如下圖所示:

線程池中任務(wù)提交與執(zhí)行

本節(jié)將介紹線程池中最為核心的任務(wù)提交代碼流程阱持。Executor線程池可以根據(jù)業(yè)務(wù)需求的不同提供兩種方式提交任務(wù):Executor.execute()、ExecutorService.submit()呀洲。在實(shí)際應(yīng)用中紊选,我們可以使用execute方法來提交沒有返回值的任務(wù)。因?yàn)闆]有返回值道逗,所以實(shí)際也沒有辦法知道任務(wù)是否被線程池中的線程執(zhí)行成功。通過以下的示例代碼可以提交一個沒有返回值的任務(wù):

threadpool.execute(newRunnable() {

@Override

public void run() {

//TODO

//logical code here

}});

另外献烦,我們也可以通過submit方法來提交帶有返回類型future的任務(wù)滓窍,通過這個返回值可以判斷任務(wù)是否已經(jīng)成功執(zhí)行。通過future的get方法可以獲取任務(wù)執(zhí)行的返回值巩那,這里需要說明的是get方法是阻塞的吏夯,直到任務(wù)返回為止,也可以通過get(long timeout,TimeUnit unit方法)設(shè)置超時時間即横,避免一直阻塞等待噪生。以下為submit方法提交任務(wù)的示例代碼:

Future future = executor.submit();

try{

Object ret = future.get();

}catch(InterruptedException e1){

//處理中斷異常

}catch(ExecutionException e2){

//處理無法執(zhí)行任務(wù)異常

}finally{

//最后關(guān)閉線程池executor.shutdown();

}

限于篇幅,本文僅對ThreadPoolExecutor核心類的execute方法的實(shí)現(xiàn)進(jìn)行深入分析东囚,而對submit方法則不做分析闡述跺嗽,感興趣的朋友可以自行按照同樣的方法進(jìn)行分析(其實(shí),通過看submit的源代碼可以發(fā)現(xiàn)页藻,它實(shí)際上還是調(diào)用的execute方法桨嫁,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果)。execute方法的具體代碼如下:

從以上的ThreadPoolExecutor類的execute方法的實(shí)現(xiàn)代碼中可以得出以下幾步主要的執(zhí)行流程:

Step1:如果線程池當(dāng)前線程數(shù)小于corePoolSize份帐,則調(diào)用addWorker創(chuàng)建新線程執(zhí)行任務(wù)璃吧,且方法返回,如果調(diào)用失敗執(zhí)行Step2废境;

Step2:如果線程池處于RUNNING狀態(tài)畜挨,則嘗試加入待執(zhí)行任務(wù)的阻塞隊(duì)列;如果加入該隊(duì)列成功噩凹,則嘗試進(jìn)行Double Check巴元;如果加入失敗,則執(zhí)行Step3栓始;

Step3:如果線程池當(dāng)前為非RUNNING狀態(tài)或者加入阻塞隊(duì)列失敗务冕,則嘗試創(chuàng)建新線程直到maxPoolSize;如果失敗幻赚,則調(diào)用reject()方法執(zhí)行相應(yīng)的飽和拒絕策略禀忆;

這里需要注意的是臊旭,在Step2中如果加入隊(duì)列成功,則會進(jìn)行一個雙重校驗(yàn)的過程箩退。其主要目的是判斷加入到阻塞隊(duì)列中的任務(wù)是否可以被執(zhí)行离熏。如果線程池不是RUNNING狀態(tài),則調(diào)用remove()方法從阻塞隊(duì)列中刪除該任務(wù)戴涝,然后調(diào)用reject()方法處理任務(wù)滋戳。否則需要確保還有線程執(zhí)行。

在上面的executor方法中多次調(diào)用了addWorker方法啥刻,可能已經(jīng)有同學(xué)在默默關(guān)注這個方法了奸鸯。addWorker方法的主要工作是在線程池中創(chuàng)建一個新的線程并執(zhí)行,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個任務(wù)可帽,core參數(shù)為true表示在新增線程時會判斷當(dāng)前活動線程數(shù)是否少于corePoolSize娄涩,false表示新增線程前需要判斷當(dāng)前活動線程數(shù)是否少于maximumPoolSize。下面我們主要來看下這個addWork方法里面的具體實(shí)現(xiàn)代碼:

private boolean addWorker(Runnable firstTask, boolean core) { ? ? ? ?

retry: for(;;) { ? ? ? ? ? ?

int c = ctl.get();// 獲取當(dāng)前線程池的狀態(tài)

int rs = runStateOf(c);

if(rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask ==null&&! workQueue.isEmpty()))

return false;// 內(nèi)層循環(huán)映跟,worker + 1

for(;;) {

// 線程數(shù)量

int wc = workerCountOf(c);

// 如果當(dāng)前線程數(shù)大于線程最大上限CAPACITY ?return false// 若core == true蓄拣,則與corePoolSize 比較,否則與maximumPoolSize 努隙,大于 return false

if(wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;// worker + 1,成功跳出retry循環(huán)

if(compareAndIncrementWorkerCount(c)) break retry; ? ? ? ? ? ? ? ?

c = ctl.get();// Re-read ctl

// 如果狀態(tài)不等于之前獲取的state球恤,跳出內(nèi)層循環(huán),繼續(xù)去外層循環(huán)判斷

if(runStateOf(c) != rs) continue retry; ? ? ? ? ? ?

} ? ? ?

?} ? ? ? ?

boolean workerStarted =false; ? ? ?

?boolean workerAdded =false; ? ? ? ?

Worker w =null;

try{

// 新建線程:Worker

final ReentrantLock mainLock =this.mainLock; ? ? ? ? ? ?

w =new Worker(firstTask); ? ? ? ? ? ?

final Thread t = w.thread;

if(t !=null) { ? ? ? ? ? ? ? ?

mainLock.lock();

try{ ? ? ? ? ? ? ? ? ? ?

int c = ctl.get(); ? ? ? ? ? ? ? ? ? ?

int rs = runStateOf(c);

// rs < SHUTDOWN ==> 線程處于RUNNING狀態(tài)// 或者線程處于SHUTDOWN狀態(tài)荸镊,且firstTask == null(可能是workQueue中仍有未執(zhí)行完成的任務(wù)咽斧,創(chuàng)建沒有初始任務(wù)的worker線程執(zhí)行)

if(rs < SHUTDOWN || (rs == SHUTDOWN && firstTask ==null)) {

// 當(dāng)前線程已經(jīng)啟動,拋出異常

if(t.isAlive())// precheck that t is startable

throw new IllegalThreadStateException(); ? ? ? ? ? ? ? ? ? ? ? ?

workers.add(w);// 設(shè)置最大的池大小largestPoolSize贷洲,workerAdded設(shè)置為true

int s = workers.size();

if(s > largestPoolSize)?

largestPoolSize = s; ? ? ? ? ? ? ? ? ? ? ? ?

workerAdded =true; ? ? ? ? ? ? ? ? ? ?

} ? ? ? ? ? ? ? ?

}finally{ ? ? ? ? ? ? ? ? ? ?

mainLock.unlock(); ? ? ? ? ? ? ? ?

}

// 啟動線程

if(workerAdded) { ? ? ? ? ? ? ? ? ?

?t.start(); ? ? ? ? ? ? ? ? ?

?workerStarted =true; ? ? ? ? ? ? ? ?

} ? ? ? ? ? ?

} ? ? ? ?

}finally{

if(! workerStarted) ? ? ? ? ? ? ? ?

addWorkerFailed(w); ? ? ? ?

}

return workerStarted; ? ?

}

如果對Executor線程池本身就有所了解的同學(xué)可能可以以上面的代碼加上注釋就可以明白addWorker方法中的具體含義了收厨。但是這里仍然有必要再說下,在上面的addWorker方法的代碼中优构,主要完成了以下幾步流程:

Step1.首先诵叁,獲取線程池的狀態(tài)后先進(jìn)行條件的判斷,如果rs >= SHUTDOWN钦椭,則表示此時不再接收新任務(wù)拧额;其次,判斷以下三個條件只要有一個不滿足則addWorker方法返回false彪腔。這三個條件分別為:

(a)rs == SHUTDOWN侥锦,此時表示線程池處于關(guān)閉狀態(tài),不再接受新提交的任務(wù)德挣,但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)恭垦;

(b)firsTask為空;

(c)阻塞隊(duì)列不為空;

這里,首先考慮rs == SHUTDOWN的情況,在這種情況下不會接受新提交的任務(wù)番挺,所以在firstTask不為空的時候會返回false唠帝;其次,如果firstTask為空玄柏,并且workQueue也為空襟衰,則返回false,因?yàn)殛?duì)列中已經(jīng)沒有任務(wù)了粪摘,不需要再添加線程了瀑晒。

Step2.這里先獲取線程數(shù),根據(jù)addWorker方法的第二個參數(shù)為true或者false進(jìn)行判斷徘意。如果為true表示根據(jù)corePoolSize來比較苔悦,如果為false則根據(jù)maximumPoolSize來比較。然后映砖,通過CAS進(jìn)行worker + 1间坐。

Step3.獲取主鎖mailLock,隨后再次判斷線程池的狀態(tài)邑退。如果線程池處于RUNNING狀態(tài)或者是處于SHUTDOWN狀態(tài)且 firstTask == null,則向線程池中添加線程劳澄,然后釋放主鎖mainLock并啟動線程地技,最后return true。如果中途失敗導(dǎo)致workerStarted= false秒拔,則調(diào)用addWorkerFailed()方法進(jìn)行處理莫矗。這里需要注意的是,t.start()這個語句砂缩,啟動時會調(diào)用Worker類中的run方法作谚,Worker本身實(shí)現(xiàn)了Runnable接口,所以一個Worker類型的對象也是一個線程庵芭。

仔細(xì)的同學(xué)會發(fā)現(xiàn)上面addWorker方法的代碼中還有一個Worker對象妹懒,在線程池中每一個線程被封裝成一個Worker對象,線程池中維護(hù)的其實(shí)就是一組Worker對象双吆,那么我們來看一下Worker的定義:

private final class Worker? ? ?extends AbstractQueuedSynchronizer ? ? ? ?implements Runnable{

//省略代碼......

/** Thread this worker is running in. ?Null if factory fails. */

final Thread thread;

/** Initial task to run. ?Possibly null. */

Runnable firstTask;

/** Per-thread task counter */

volatile long completedTasks;

/**

? ? * Creates with given first task and thread from ThreadFactory.

? ? * @param firstTask the first task (null if none)

? ? */

Worker(Runnable firstTask) { ? ? ? ?

setState(-1);// inhibit interrupts until run Worker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this); ? ?

}

/** Delegates main run loop to outer runWorker ?*/

public void run() { ? ? ? ?

runWorker(this); ? ?

}

//省略代碼......

protected boolean tryAcquire(int unused) {

if(compareAndSetState(0,1)) { ? ? ? ? ? ?

setExclusiveOwnerThread(Thread.currentThread());

return true; ? ? ? ?

}

return false; ? ?

} ? ?

protected boolean tryRelease(int unused) { ? ? ? ?

setExclusiveOwnerThread(null); ? ? ? ?

setState(0);

return true; ? ?

} ? ?

public void lock() ? ? ? ?{

acquire(1);

} ? ?

public boolean tryLock() ?{

returntryAcquire(1);

} ? ?

public void unlock() ? ? ?{

release(1);

} ? ?

public boolean isLocked() {

return isHeldExclusively();

}

//省略代碼......

}

內(nèi)部Worker類繼承了AQS眨唬,并實(shí)現(xiàn)了Runnable接口,其中firstTask用它來保存?zhèn)魅氲娜蝿?wù)好乐;thread是在調(diào)用構(gòu)造方法時通過ThreadFactory來創(chuàng)建的線程匾竿,是用來處理任務(wù)的線程。在調(diào)用構(gòu)造方法時蔚万,需要把任務(wù)傳入岭妖,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數(shù)是this,因?yàn)閃orker本身繼承了Runnable接口昵慌,也就是一個線程假夺,所以一個Worker對象在啟動的時候會調(diào)用Worker類中的run方法。同時废离,該類繼承了AQS框架侄泽,使用其來實(shí)現(xiàn)獨(dú)占鎖的功能。這里可能有同學(xué)會問為什么不使用ReentrantLock來實(shí)現(xiàn)呢蜻韭?可以看到tryAcquire方法衫樊,它是不允許重入的植康,而ReentrantLock是允許重入的。這里,之所以設(shè)置為不可重入鲁沥,是因?yàn)椴幌M蝿?wù)在調(diào)用類似像setCorePoolSize這樣的線程池控制方法時重新獲取鎖,而去中斷正在運(yùn)行的線程排抬。

講到這里還沒有看到線程池任務(wù)運(yùn)行的代碼两踏,對Java線程Runnable接口比較熟悉的同學(xué)可能知道應(yīng)該是在上面的run方法來執(zhí)行具體的任務(wù)運(yùn)行,那么下面我們再進(jìn)一步的看下runWorker方法里面究竟是干什么的艰垂?

這里總結(jié)一下runWorker方法的執(zhí)行過程:

Step1.外層循環(huán)不斷地通過getTask()方法獲取任務(wù)泡仗,其中g(shù)etTask()方法從阻塞隊(duì)列中取任務(wù);

Step2. 如果線程池當(dāng)前正在停止猜憎,那么要保證當(dāng)前線程是中斷狀態(tài)娩怎,否則要保證當(dāng)前線程不是中斷狀態(tài);

Step3.調(diào)用task.run()去執(zhí)行任務(wù)胰柑,這里就是真正去執(zhí)行任務(wù)了截亦;

Step4.如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法柬讨;

上面代碼中的getTask()方法是用于從阻塞隊(duì)列中獲取任務(wù)的崩瓤;processWorkerExit()方法則進(jìn)行收尾工作,統(tǒng)計完成的任務(wù)數(shù)目并從線程池中移除一個工作線程以銷毀工作線程踩官。講到這里基本可以總結(jié)下整個工作線程的生命周期:從execute方法開始却桶,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過getTask獲取任務(wù)卖鲤,然后執(zhí)行任務(wù)肾扰,如果getTask返回null,進(jìn)入processWorkerExit方法蛋逾,整個線程結(jié)束集晚。執(zhí)行流程圖如下:

限于篇幅所限,對于ThreadPoolExecutor核心類中的tryTerminate区匣、shutdown偷拔、shutdownNow和interruptIdleWorkers等方法就不再在此進(jìn)行贅述了蒋院。感興趣的同學(xué)可以自己再閱讀這幾個類的源代碼。

如何合理應(yīng)用線程池

一般在我們自己的Spring-boot工程中都用如下的配置方式來注入線程池的Bean(Spring-MVC的工程是基于XML莲绰,配置大同小異)欺旧。不注意其中細(xì)節(jié)的同學(xué)覺得這里的幾個參數(shù)按照自己的感覺隨便設(shè)置即可,如果是自己做練習(xí)或者一些demo樣例的話蛤签,確實(shí)是無所謂辞友,但是要應(yīng)用于生產(chǎn)實(shí)際環(huán)境的話,還是需要經(jīng)過一番分析和思考來設(shè)置線程池的參數(shù)震肮,才能滿足業(yè)務(wù)需求達(dá)到優(yōu)化系統(tǒng)性能的目標(biāo)称龙。

@Bean(name ="taskAsyncPool")

public Executor taskAsyncPool() { ? ? ? ? ?

ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor(); ? ? ? ? ? executor.setCorePoolSize(corePoolSize); ? ? ? ? ?

executor.setMaxPoolSize(maxPoolSize); ? ? ? ? ?

executor.setQueueCapacity(queueCapacity); ? ? ? ? ? executor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut); ? ? ? ? ? executor.setKeepAliveSeconds(keepAliveSeconds); ? ? ? ? ? executor.setThreadNamePrefix("ExecutorThread-");// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時候,如何處理新任務(wù)// CALLER_RUNS:不在新線程中執(zhí)行任務(wù)戳晌,而是有調(diào)用者所在的線程來執(zhí)行executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy()); ? ? ? ? ? executor.initialize();

return executor;

}

要合理地應(yīng)用線程池技術(shù)鲫尊,就需要首先對任務(wù)的特性有足夠的了解,可以從以下的幾個方面來分析:

(a)任務(wù)性質(zhì):提交的任務(wù)到底是CPU密集型沦偎,IO密集型還是兼有兩者的混合型疫向;

(b)任務(wù)優(yōu)先級:任務(wù)是否具備低、中豪嚎、高的優(yōu)先級搔驼;

(c)任務(wù)執(zhí)行時間:任務(wù)執(zhí)行時間需要較長、中侈询、還是較短匙奴;

(d)任務(wù)依賴性:是否依賴其他系統(tǒng),比如數(shù)據(jù)庫連接妄荔;

首先,對于(a)我們可以根據(jù)任務(wù)性質(zhì)的不同用不同規(guī)模的線程池來進(jìn)行處理谍肤。CPU密集型的任務(wù)可以盡可能配置核心線程數(shù)較小的線程池啦租,如配置Ncpu+1個線程的線程池。而對于IO密集型的任務(wù)由于任務(wù)長時間處于IO等待中荒揣,因此可以配置較多的線程篷角,如2*Ncpu線程的線程池。如果對于混合型則通過適當(dāng)?shù)貙⑵洳鸱殖蒀PU密集型和IO密集型的分別處理系任。如果編程者不確定當(dāng)前機(jī)器的CPU核數(shù)恳蹲,JDK提供了Runtime.getRuntime().availableProcessors()方法進(jìn)行獲取。

對于(b)可以通過J.U.C并發(fā)包中的優(yōu)先級隊(duì)列—PriorityBlockingQueue來進(jìn)行任務(wù)處理俩滥。它可以讓高優(yōu)先級的任務(wù)先執(zhí)行嘉蕾,低優(yōu)先級任務(wù)延遲執(zhí)行。

對于(c)(d)可以根據(jù)任務(wù)不同執(zhí)行時間霜旧,分別建立不同規(guī)模類型的線程池來進(jìn)行任務(wù)處理错忱。

另外,需要說明的是在實(shí)際應(yīng)用中,還是建議在線程池中設(shè)置有界隊(duì)列來初始化以清。因?yàn)橛薪珀?duì)列可以增加系統(tǒng)的穩(wěn)定性和預(yù)警設(shè)置儿普,如果遇到線程池中線程數(shù)和任務(wù)隊(duì)列均滿的情況,可以直接執(zhí)行飽和拒絕策略并拋出異常告警掷倔。若是采用了無界隊(duì)列眉孩,則線程池中的任務(wù)隊(duì)列的任務(wù)數(shù)目會積壓得越來越多,最后撐爆服務(wù)器的內(nèi)存勒葱,造成整個系統(tǒng)不可用浪汪。

總結(jié)

本文從線程池的概念和定義出發(fā),簡單介紹了其使用場景和主要作用错森。然后從一個簡單的線程池樣例說起吟宦,闡述了Executor線程池框架的任務(wù)的提交和執(zhí)行解耦機(jī)制。通過給出J.U.C包下與線程池相關(guān)的類結(jié)構(gòu)圖涩维,簡要介紹了以ThreadPoolExecutor為核心的相關(guān)接口和類殃姓,并結(jié)合代碼給出線程池幾種狀態(tài)定義以及轉(zhuǎn)換圖,詳細(xì)描述了創(chuàng)建線程池方法的ThreadPoolExecutor構(gòu)造函數(shù)瓦阐。最后蜗侈,以ThreadPoolExecutor類的execute方法為入口深入分析addWorker和runWorker核心方法的源代碼,梳理了線程池整體工作原理睡蟋、生命周期和運(yùn)行機(jī)制踏幻。在向線程池提交任務(wù)時,除本文敘述的execute方法外戳杀,還有一個submit方法该面,submit方法會返回一個Future對象用于獲取返回值,限于篇幅信卡,有關(guān)Future和Callable將在其他篇幅中進(jìn)行介紹隔缀。限于筆者的才疏學(xué)淺,對JDK的Executor線程池可能還有理解不到位的地方傍菇,如有闡述不合理之處還望留言一起探討猾瘸。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市丢习,隨后出現(xiàn)的幾起案子牵触,更是在濱河造成了極大的恐慌,老刑警劉巖咐低,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件揽思,死亡現(xiàn)場離奇詭異,居然都是意外死亡渊鞋,警方通過查閱死者的電腦和手機(jī)绰更,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門瞧挤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人儡湾,你說我怎么就攤上這事特恬。” “怎么了徐钠?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵癌刽,是天一觀的道長。 經(jīng)常有香客問我尝丐,道長显拜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任爹袁,我火速辦了婚禮远荠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘失息。我一直安慰自己譬淳,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布盹兢。 她就那樣靜靜地躺著邻梆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪绎秒。 梳的紋絲不亂的頭發(fā)上浦妄,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機(jī)與錄音见芹,去河邊找鬼剂娄。 笑死,一個胖子當(dāng)著我的面吹牛玄呛,可吹牛的內(nèi)容都是我干的宜咒。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼把鉴,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了儿咱?” 一聲冷哼從身側(cè)響起庭砍,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎混埠,沒想到半個月后怠缸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡钳宪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年揭北,在試婚紗的時候發(fā)現(xiàn)自己被綠了扳炬。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡搔体,死狀恐怖恨樟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情疚俱,我是刑警寧澤劝术,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站呆奕,受9級特大地震影響养晋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜梁钾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一绳泉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧姆泻,春花似錦零酪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至潜秋,卻和暖如春蛔琅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背峻呛。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工罗售, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人钩述。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓寨躁,卻偏偏與公主長得像,于是被迫代替她去往敵國和親牙勘。 傳聞我的和親對象是個殘疾皇子职恳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 距離國考只有二十多天的時間了,之前一直說要復(fù)習(xí)方面,到現(xiàn)在為止父母幫忙找來的資料還是一眼都沒有看放钦。每天只是推脫白...
    好女孩閱讀 207評論 0 0
  • 反面教材 冬天的早晨格外地冷,噼里啪啦的鞭...
    Ariel一本閱讀 283評論 0 3
  • 心之所往恭金,總是在我們不經(jīng)意的瞬間操禀。
    be54d1574ebd閱讀 180評論 1 0
  • 那鼓鼓的東西只有男人才有—— 某晚,和女友約會横腿,突然女友用手輕輕的揉著我的那個部位問道:“親愛的颓屑,你這個地方怎么鼓...
    范末末閱讀 534評論 0 0
  • 人這一生啊
    Nii閱讀 209評論 0 0