深度解讀 Java 線程池設(shè)計思想及源碼實現(xiàn)

我相信大家都看過很多的關(guān)于線程池的文章鼻疮,基本上也是面試的時候必問的琳水,如果你在看過很多文章以后,還是一知半解的吊档,那希望這篇文章能讓你真正的掌握好 Java 線程池篙议。

本文一大重點是源碼解析,同時會有少量篇幅介紹線程池設(shè)計思想以及作者 Doug Lea 實現(xiàn)過程中的一些巧妙用法怠硼。本文還是會一行行關(guān)鍵代碼進行分析鬼贱,目的是為了讓那些自己看源碼不是很理解的同學可以得到參考。

線程池是非常重要的工具香璃,如果你要成為一個好的工程師这难,還是得比較好地掌握這個知識,很多線上問題都是因為沒有用好線程池導致的葡秒。即使你為了謀生姻乓,也要知道,這基本上是面試必問的題目眯牧,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術(shù)水平蹋岩。

本文略長,建議在 pc 上閱讀炸站,邊看文章邊翻源碼(Java7 和 Java8 都一樣)星澳,建議想好好看的讀者抽出至少?30?分鐘的整塊時間來閱讀。當然旱易,如果讀者僅為面試準備禁偎,可以直接滑到最后的總結(jié)部分。

總覽

開篇來一些廢話阀坏。下圖是 java 線程池幾個相關(guān)類的繼承結(jié)構(gòu):

先簡單說說這個繼承結(jié)構(gòu)如暖,Executor 位于最頂層,也是最簡單的忌堂,就一個 execute(Runnable runnable) 接口方法定義盒至。

ExecutorService 也是接口,在 Executor 接口的基礎(chǔ)上添加了很多的接口方法士修,所以一般來說我們會使用這個接口枷遂。

然后再下來一層是 AbstractExecutorService,從名字我們就知道棋嘲,這是抽象類酒唉,這里實現(xiàn)了非常有用的一些方法供子類直接使用,之后我們再細說沸移。

然后才到我們的重點部分 ThreadPoolExecutor 類痪伦,這個類提供了關(guān)于線程池所需的非常豐富的功能侄榴。

另外,我們還涉及到下圖中的這些類:

同在并發(fā)包中的 Executors 類网沾,類名中帶字母 s癞蚕,我們猜到這個是工具類,里面的方法都是靜態(tài)方法辉哥,如以下我們最常用的用于生成 ThreadPoolExecutor 的實例的一些方法:

public static ExecutorServicenewCachedThreadPool{

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue);

}

public static ExecutorServicenewFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue);

}

另外桦山,由于線程池支持獲取線程執(zhí)行的結(jié)果,所以证薇,引入了 Future 接口度苔,RunnableFuture 繼承自此接口,然后我們最需要關(guān)心的就是它的實現(xiàn)類 FutureTask浑度。到這里寇窑,記住這個概念,在線程池的使用過程中箩张,我們是往線程池提交任務(wù)(task)甩骏,使用過線程池的都知道,我們提交的每個任務(wù)是實現(xiàn)了 Runnable 接口的先慷,其實就是先將 Runnable 的任務(wù)包裝成 FutureTask饮笛,然后再提交到線程池。這樣论熙,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(wù)(Task)福青,然后具有 Future 接口的語義,即可以在將來(Future)得到執(zhí)行的結(jié)果脓诡。

當然无午,線程池中的 BlockingQueue 也是非常重要的概念,如果線程數(shù)達到 corePoolSize祝谚,我們的每個任務(wù)會提交到等待隊列中宪迟,等待線程池中的線程來取任務(wù)并執(zhí)行。這里的 BlockingQueue 通常我們使用其實現(xiàn)類 LinkedBlockingQueue交惯、ArrayBlockingQueue 和 SynchronousQueue次泽,每個實現(xiàn)類都有不同的特征,使用場景之后會慢慢分析席爽。想要詳細了解各個 BlockingQueue 的讀者意荤,可以參考我的前面的一篇對 BlockingQueue 的各個實現(xiàn)類進行詳細分析的文章。

把事情說完整:除了上面說的這些類外只锻,還有一個很重要的類玖像,就是定時任務(wù)實現(xiàn)類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor炬藤,用于實現(xiàn)定時執(zhí)行御铃。不過本文不會介紹它的實現(xiàn),我相信讀者看完本文后可以比較容易地看懂它的源碼沈矿。

以上就是本文要介紹的知識上真,廢話不多說,開始進入正文羹膳。

Executor 接口

/*

* @since 1.5

* @author Doug Lea

*/

public interfaceExecutor{

voidexecute(Runnable command);

}

我們可以看到 Executor 接口非常簡單睡互,就一個?void execute(Runnable command)方法,代表提交一個任務(wù)陵像。為了讓大家理解 java 線程池的整個設(shè)計方案就珠,我會按照 Doug Lea 的設(shè)計思路來多說一些相關(guān)的東西。

我們經(jīng)常這樣啟動一個線程:

new Thread(new Runnable{

// do something

}).start;

用了線程池 Executor 后就可以像下面這么使用:

Executor executor = anExecutor;

executor.execute(new RunnableTask1);

executor.execute(new RunnableTask2);

如果我們希望線程池同步執(zhí)行每一個任務(wù)醒颖,我們可以這么實現(xiàn)這個接口:

classDirectExecutorimplementsExecutor{

public voidexecute(Runnable r) {

r.run;// 這里不是用的new Thread(r).start妻怎,也就是說沒有啟動任何一個新的線程。

}

}

我們希望每個任務(wù)提交進來后泞歉,直接啟動一個新的線程來執(zhí)行這個任務(wù)逼侦,我們可以這么實現(xiàn):

classThreadPerTaskExecutorimplementsExecutor{

public voidexecute(Runnable r) {

new Thread(r).start; // 每個任務(wù)都用一個新的線程來執(zhí)行

}

}

我們再來看下怎么組合兩個 Executor 來使用,下面這個實現(xiàn)是將所有的任務(wù)都加到一個 queue 中腰耙,然后從 queue 中取任務(wù)榛丢,交給真正的執(zhí)行器執(zhí)行,這里采用 synchronized 進行并發(fā)控制:

classSerialExecutorimplementsExecutor{

// 任務(wù)隊列

final Queue tasks = new ArrayDeque;

// 這個才是真正的執(zhí)行器

final Executor executor;

// 當前正在執(zhí)行的任務(wù)

Runnable active;

// 初始化的時候挺庞,指定執(zhí)行器

SerialExecutor(Executor executor) {

this.executor = executor;

}

// 添加任務(wù)到線程池: 將任務(wù)添加到任務(wù)隊列晰赞,scheduleNext 觸發(fā)執(zhí)行器去任務(wù)隊列取任務(wù)

public synchronized voidexecute(final Runnable r) {

tasks.offer(new Runnable {

public voidrun{

try {

r.run;

} finally {

scheduleNext;

}

}

});

if (active == ) {

scheduleNext;

}

}

protected synchronized voidscheduleNext{

if ((active = tasks.poll) != ) {

// 具體的執(zhí)行轉(zhuǎn)給真正的執(zhí)行器 executor

executor.execute(active);

}

}

}

當然了,Executor 這個接口只有提交任務(wù)的功能选侨,太簡單了掖鱼,我們想要更豐富的功能,比如我們想知道執(zhí)行結(jié)果侵俗、我們想知道當前線程池有多少個線程活著锨用、已經(jīng)完成了多少任務(wù)等等,這些都是這個接口的不足的地方隘谣。接下來我們要介紹的是繼承自?Executor接口的ExecutorService接口增拥,這個接口提供了比較豐富的功能,也是我們最常使用到的接口寻歧。

ExecutorService

一般我們定義一個線程池的時候掌栅,往往都是使用這個接口:

ExecutorService executor = Executors.newFixedThreadPool(args...);

ExecutorService executor = Executors.newCachedThreadPool(args...);

因為這個接口中定義的一系列方法大部分情況下已經(jīng)可以滿足我們的需要了。

那么我們簡單初略地來看一下這個接口中都有哪些方法:

public interfaceExecutorServiceextendsExecutor{

// 關(guān)閉線程池码泛,已提交的任務(wù)繼續(xù)執(zhí)行猾封,不接受繼續(xù)提交新任務(wù)

voidshutdown;

// 關(guān)閉線程池,嘗試停止正在執(zhí)行的所有任務(wù)噪珊,不接受繼續(xù)提交新任務(wù)

// 它和前面的方法相比晌缘,加了一個單詞“now”齐莲,區(qū)別在于它會去停止當前正在進行的任務(wù)

ListshutdownNow;

// 線程池是否已關(guān)閉

booleanisShutdown;

// 如果調(diào)用了 shutdown 或 shutdownNow 方法后,所有任務(wù)結(jié)束了磷箕,那么返回true

// 這個方法必須在調(diào)用shutdown或shutdownNow方法之后調(diào)用才會返回true

booleanisTerminated;

// 等待所有任務(wù)完成选酗,并設(shè)置超時時間

// 我們這么理解,實際應(yīng)用中是岳枷,先調(diào)用 shutdown 或 shutdownNow芒填,

// 然后再調(diào)這個方法等待所有的線程真正地完成,返回值意味著有沒有超時

booleanawaitTermination(long timeout, TimeUnit unit)

throws InterruptedException;

// 提交一個 Callable 任務(wù)

Futuresubmit(Callable task);

// 提交一個 Runnable 任務(wù)空繁,第二個參數(shù)將會放到 Future 中筷弦,作為返回值乒省,

// 因為 Runnable 的 run 方法本身并不返回任何東西

Futuresubmit(Runnable task, T result);

// 提交一個 Runnable 任務(wù)

Future submit(Runnable task);

// 執(zhí)行所有任務(wù)罐韩,返回 Future 類型的一個 list

List> invokeAll(Collection> tasks)

throws InterruptedException;

// 也是執(zhí)行所有任務(wù)夷蚊,但是這里設(shè)置了超時時間

List> invokeAll(Collection> tasks,

long timeout, TimeUnit unit)

throws InterruptedException;

// 只有其中的一個任務(wù)結(jié)束了,就可以返回傲诵,返回執(zhí)行完的那個任務(wù)的結(jié)果

TinvokeAny(Collection> tasks)

throws InterruptedException, ExecutionException;

// 同上一個方法蜀踏,只有其中的一個任務(wù)結(jié)束了,就可以返回掰吕,返回執(zhí)行完的那個任務(wù)的結(jié)果果覆,

// 不過這個帶超時,超過指定的時間殖熟,拋出 TimeoutException 異常

TinvokeAny(Collection> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

這些方法都很好理解局待,一個簡單的線程池主要就是這些功能,能提交任務(wù)菱属,能獲取結(jié)果钳榨,能關(guān)閉線程池,這也是為什么我們經(jīng)常用這個接口的原因纽门。

FutureTask

在繼續(xù)往下層介紹 ExecutorService 的實現(xiàn)類之前薛耻,我們先來說說相關(guān)的類 FutureTask。

Future Runnable

\ /

\ /

RunnableFuture

|

|

FutureTask

FutureTask 通過 RunnableFuture 間接實現(xiàn)了 Runnable 接口赏陵,

所以每個 Runnable 通常都先包裝成 FutureTask饼齿,

然后調(diào)用 executor.execute(Runnable command) 將其提交給線程池

我們知道,Runnable 的 void run 方法是沒有返回值的蝙搔,所以缕溉,通常,如果我們需要的話吃型,會在 submit 中指定第二個參數(shù)作為返回值:

Futuresubmit(Runnable task, T result);

其實到時候會通過這兩個參數(shù)证鸥,將其包裝成 Callable。它和 Runnable 的區(qū)別在于 run 沒有返回值,而 Callable 的 call 方法有返回值枉层,同時泉褐,如果運行出現(xiàn)異常,call 方法會拋出異常鸟蜡。

public interfaceCallable<V> {

Vcallthrows Exception;

}

在這里兴枯,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了矩欠,這里我們需要知道怎么用就行了。

下面悠夯,我們來看看?ExecutorService的抽象實現(xiàn)AbstractExecutorService癌淮。

AbstractExecutorService

AbstractExecutorService 抽象類派生自 ExecutorService 接口,然后在其基礎(chǔ)上實現(xiàn)了幾個實用的方法沦补,這些方法提供給子類進行調(diào)用乳蓄。

這個抽象類實現(xiàn)了 invokeAny 方法和 invokeAll 方法,這里的兩個 newTaskFor 方法也比較有用夕膀,用于將任務(wù)包裝成 FutureTask虚倒。定義于最上層接口 Executor中的?void execute(Runnable command)由于不需要獲取結(jié)果,不會進行 FutureTask 的包裝产舞。

需要獲取結(jié)果(FutureTask)魂奥,用 submit 方法,不需要獲取結(jié)果易猫,可以用 execute 方法耻煤。

下面,我將一行一行源碼地來分析這個類准颓,跟著源碼來看看其實現(xiàn)吧:

Tips: invokeAny 和 invokeAll 方法占了這整個類的絕大多數(shù)篇幅哈蝇,讀者可以選擇適當跳過,因為它們可能在你的實踐中使用的頻次比較低攘已,而且它們不帶有承前啟后的作用炮赦,不用擔心會漏掉什么導致看不懂后面的代碼。

public abstract classAbstractExecutorServiceimplementsExecutorService{

// RunnableFuture 是用于獲取執(zhí)行結(jié)果的样勃,我們常用它的子類 FutureTask

// 下面兩個 newTaskFor 方法用于將我們的任務(wù)包裝成 FutureTask 提交到線程池中執(zhí)行

protected RunnableFuturenewTaskFor(Runnable runnable, T value) {

return new FutureTask(runnable, value);

}

protected RunnableFuturenewTaskFor(Callable callable) {

return new FutureTask(callable);

}

// 提交任務(wù)

public Future submit(Runnable task) {

if (task == ) throw new PointerException;

// 1. 將任務(wù)包裝成 FutureTask

RunnableFuture ftask = newTaskFor(task, );

// 2. 交給執(zhí)行器執(zhí)行吠勘,execute 方法由具體的子類來實現(xiàn)

// 前面也說了,F(xiàn)utureTask 間接實現(xiàn)了Runnable 接口峡眶。

execute(ftask);

return ftask;

}

public Futuresubmit(Runnable task, T result) {

if (task == ) throw new PointerException;

// 1. 將任務(wù)包裝成 FutureTask

RunnableFuture ftask = newTaskFor(task, result);

// 2. 交給執(zhí)行器執(zhí)行

execute(ftask);

return ftask;

}

public Futuresubmit(Callable task) {

if (task == ) throw new PointerException;

// 1. 將任務(wù)包裝成 FutureTask

RunnableFuture ftask = newTaskFor(task);

// 2. 交給執(zhí)行器執(zhí)行

execute(ftask);

return ftask;

}

// 此方法目的:將 tasks 集合中的任務(wù)提交到線程池執(zhí)行看幼,任意一個線程執(zhí)行完后就可以結(jié)束了

// 第二個參數(shù) timed 代表是否設(shè)置超時機制,超時時間為第三個參數(shù)幌陕,

// 如果 timed 為 true诵姜,同時超時了還沒有一個線程返回結(jié)果,那么拋出 TimeoutException 異常

private TdoInvokeAny(Collection> tasks,

boolean timed, long nanos)

throws InterruptedException, ExecutionException, TimeoutException {

if (tasks == )

throw new PointerException;

// 任務(wù)數(shù)

int ntasks = tasks.size;

if (ntasks == 0)

throw new IllegalArgumentException;

//

List> futures= new ArrayList>(ntasks);

// ExecutorCompletionService 不是一個真正的執(zhí)行器,參數(shù) this 才是真正的執(zhí)行器

// 它對執(zhí)行器進行了包裝棚唆,每個任務(wù)結(jié)束后暇赤,將結(jié)果保存到內(nèi)部的一個 completionQueue 隊列中

// 這也是為什么這個類的名字里面有個 Completion 的原因吧。

ExecutorCompletionService ecs =

new ExecutorCompletionService(this);

try {

// 用于保存異常信息宵凌,此方法如果沒有得到任何有效的結(jié)果鞋囊,那么我們可以拋出最后得到的一個異常

ExecutionException ee = ;

long lastTime = timed ? System.nanoTime : 0;

Iterator> it = tasks.iterator;

// 首先先提交一個任務(wù),后面的任務(wù)到下面的 for 循環(huán)一個個提交

futures.add(ecs.submit(it.next));

// 提交了一個任務(wù)瞎惫,所以任務(wù)數(shù)量減 1

--ntasks;

// 正在執(zhí)行的任務(wù)數(shù)(提交的時候 +1溜腐,任務(wù)結(jié)束的時候 -1)

int active = 1;

for (;;) {

// ecs 上面說了,其內(nèi)部有一個 completionQueue 用于保存執(zhí)行完成的結(jié)果

// BlockingQueue 的 poll 方法不阻塞瓜喇,返回 代表隊列為空

Future f = ecs.poll;

// 為 挺益,說明剛剛提交的第一個線程還沒有執(zhí)行完成

// 在前面先提交一個任務(wù),加上這里做一次檢查乘寒,也是為了提高性能

if (f == ) {

if (ntasks > 0) {

--ntasks;

futures.add(ecs.submit(it.next));

++active;

}

// 這里是 else if望众,不是 if。這里說明伞辛,沒有任務(wù)了烂翰,同時 active 為 0 說明

// 任務(wù)都執(zhí)行完成了。其實我也沒理解為什么這里做一次 break蚤氏?

// 因為我認為 active 為 0 的情況甘耿,必然從下面的 f.get 返回了

// 2018-02-23 感謝讀者 newmicro 的 comment,

// 這里的 active == 0竿滨,說明所有的任務(wù)都執(zhí)行失敗棵里,那么這里是 for 循環(huán)出口

else if (active == 0)

break;

// 這里也是 else if。這里說的是姐呐,沒有任務(wù)了殿怜,但是設(shè)置了超時時間,這里檢測是否超時

else if (timed) {

// 帶等待的 poll 方法

f = ecs.poll(nanos, TimeUnit.NANOSECONDS);

// 如果已經(jīng)超時曙砂,拋出 TimeoutException 異常头谜,這整個方法就結(jié)束了

if (f == )

throw new TimeoutException;

long now = System.nanoTime;

nanos -= now - lastTime;

lastTime = now;

}

// 這里是 else。說明鸠澈,沒有任務(wù)需要提交柱告,但是池中的任務(wù)沒有完成,還沒有超時(如果設(shè)置了超時)

// take 方法會阻塞笑陈,直到有元素返回际度,說明有任務(wù)結(jié)束了

else

f = ecs.take;

}

/*

* 我感覺上面這一段并不是很好理解,這里簡單說下涵妥。

* 1. 首先乖菱,這在一個 for 循環(huán)中,我們設(shè)想每一個任務(wù)都沒那么快結(jié)束,

* 那么窒所,每一次都會進到第一個分支鹉勒,進行提交任務(wù),直到將所有的任務(wù)都提交了

* 2. 任務(wù)都提交完成后吵取,如果設(shè)置了超時禽额,那么 for 循環(huán)其實進入了“一直檢測是否超時”

這件事情上

* 3. 如果沒有設(shè)置超時機制,那么不必要檢測超時皮官,那就會阻塞在 ecs.take 方法上脯倒,

等待獲取第一個執(zhí)行結(jié)果

* 4. 如果所有的任務(wù)都執(zhí)行失敗,也就是說 future 都返回了捺氢,

但是 f.get 拋出異常藻丢,那么從 active == 0 分支出去(感謝 newmicro 提出)

// 當然,這個需要看下面的 if 分支讯沈。

*/

// 有任務(wù)結(jié)束了

if (f != ) {

--active;

try {

// 返回執(zhí)行結(jié)果,如果有異常婿奔,都包裝成 ExecutionException

return f.get;

} catch (ExecutionException eex) {

ee = eex;

} catch (RuntimeException rex) {

ee = new ExecutionException(rex);

}

}

}// 注意看 for 循環(huán)的范圍缺狠,一直到這里

if (ee == )

ee = new ExecutionException;

throw ee;

} finally {

// 方法退出之前,取消其他的任務(wù)

for (Future f : futures)

f.cancel(true);

}

}

public TinvokeAny(Collection> tasks)

throws InterruptedException, ExecutionException {

try {

return doInvokeAny(tasks, false, 0);

} catch (TimeoutException cannotHappen) {

assert false;

return ;

}

}

public TinvokeAny(Collection> tasks,

long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

return doInvokeAny(tasks, true, unit.toNanos(timeout));

}

// 執(zhí)行所有的任務(wù)萍摊,返回任務(wù)結(jié)果挤茄。

// 先不要看這個方法,我們先想想冰木,其實我們自己提交任務(wù)到線程池穷劈,也是想要線程池執(zhí)行所有的任務(wù)

// 只不過,我們是每次 submit 一個任務(wù)踊沸,這里以一個集合作為參數(shù)提交

public List> invokeAll(Collection> tasks)

throws InterruptedException {

if (tasks == )

throw new PointerException;

List> futures = new ArrayList>(tasks.size);

boolean done = false;

try {

// 這個很簡單

for (Callable t : tasks) {

// 包裝成 FutureTask

RunnableFuture f = newTaskFor(t);

futures.add(f);

// 提交任務(wù)

execute(f);

}

for (Future f : futures) {

if (!f.isDone) {

try {

// 這是一個阻塞方法歇终,直到獲取到值,或拋出了異常

// 這里有個小細節(jié)逼龟,其實 get 方法簽名上是會拋出 InterruptedException 的

// 可是這里沒有進行處理评凝,而是拋給外層去了。此異常發(fā)生于還沒執(zhí)行完的任務(wù)被取消了

f.get;

} catch (CancellationException ignore) {

} catch (ExecutionException ignore) {

}

}

}

done = true;

// 這個方法返回腺律,不像其他的場景奕短,返回 List,其實執(zhí)行結(jié)果還沒出來

// 這個方法返回是真正的返回匀钧,任務(wù)都結(jié)束了

return futures;

} finally {

// 為什么要這個翎碑?就是上面說的有異常的情況

if (!done)

for (Future f : futures)

f.cancel(true);

}

}

// 帶超時的 invokeAll,我們找不同吧

public List> invokeAll(Collection> tasks,

long timeout, TimeUnit unit)

throws InterruptedException {

if (tasks == || unit == )

throw new PointerException;

long nanos = unit.toNanos(timeout);

List> futures = new ArrayList>(tasks.size);

boolean done = false;

try {

for (Callable t : tasks)

futures.add(newTaskFor(t));

long lastTime = System.nanoTime;

Iterator> it = futures.iterator;

// 每提交一個任務(wù)之斯,檢測一次是否超時

while (it.hasNext) {

execute((Runnable)(it.next));

long now = System.nanoTime;

nanos -= now - lastTime;

lastTime = now;

// 超時

if (nanos <= 0)

return futures;

}

for (Future f : futures) {

if (!f.isDone) {

if (nanos <= 0)

return futures;

try {

// 調(diào)用帶超時的 get 方法日杈,這里的參數(shù) nanos 是剩余的時間,

// 因為上面其實已經(jīng)用掉了一些時間了

f.get(nanos, TimeUnit.NANOSECONDS);

} catch (CancellationException ignore) {

} catch (ExecutionException ignore) {

} catch (TimeoutException toe) {

return futures;

}

long now = System.nanoTime;

nanos -= now - lastTime;

lastTime = now;

}

}

done = true;

return futures;

} finally {

if (!done)

for (Future f : futures)

f.cancel(true);

}

}

}

到這里,我們發(fā)現(xiàn)达椰,這個抽象類包裝了一些基本的方法翰蠢,可是像 submit、invokeAny啰劲、invokeAll 等方法梁沧,它們都沒有真正開啟線程來執(zhí)行任務(wù),它們都只是在方法內(nèi)部調(diào)用了 execute 方法蝇裤,所以最重要的 execute(Runnable runnable) 方法還沒出現(xiàn)廷支,需要等具體執(zhí)行器來實現(xiàn)這個最重要的部分,這里我們要說的就是 ThreadPoolExecutor 類了栓辜。

鑒于本文的篇幅恋拍,我覺得看到這里的讀者應(yīng)該已經(jīng)不多了,大家都習慣了快餐文化藕甩。我寫的每篇文章都力求讓讀者可以通過我的一篇文章而對相關(guān)內(nèi)容有全面的了解施敢,所以篇幅不免長了些。

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的線程池實現(xiàn)狭莱,這個類實現(xiàn)了一個線程池需要的各個方法僵娃,它實現(xiàn)了任務(wù)提交、線程管理腋妙、監(jiān)控等等方法默怨。

我們可以基于它來進行業(yè)務(wù)上的擴展,以實現(xiàn)我們需要的其他功能骤素,比如實現(xiàn)定時任務(wù)的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor匙睹。當然,這不是本文關(guān)注的重點济竹,下面痕檬,還是趕緊進行源碼分析吧。

首先送浊,我們來看看線程池實現(xiàn)中的幾個概念和處理流程谆棺。

我們先回顧下提交任務(wù)的幾個方法:

public Future submit(Runnable task) {

if (task == ) throw new PointerException;

RunnableFuture ftask = newTaskFor(task, );

execute(ftask);

return ftask;

}

public Futuresubmit(Runnable task, T result) {

if (task == ) throw new PointerException;

RunnableFuture ftask = newTaskFor(task, result);

execute(ftask);

return ftask;

}

public Futuresubmit(Callable task) {

if (task == ) throw new PointerException;

RunnableFuture ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

一個最基本的概念是,submit 方法中罕袋,參數(shù)是 Runnable 類型(也有Callable 類型)改淑,這個參數(shù)不是用于 new Thread(runnable).start 中的,此處的這個參數(shù)不是用于啟動線程的浴讯,這里指的是任務(wù)朵夏,任務(wù)要做的事情是 run 方法里面定義的或 Callable 中的 call 方法里面定義的。

初學者往往會搞混這個榆纽,因為 Runnable 總是在各個地方出現(xiàn)仰猖,經(jīng)常把一個 Runnable 包到另一個 Runnable 中捏肢。請把它想象成有個 Task 接口,這個接口里面有一個 run 方法饥侵。

我們回過神來繼續(xù)往下看鸵赫,我畫了一個簡單的示意圖來描述線程池中的一些主要的構(gòu)件:

當然,上圖沒有考慮隊列是否有界躏升,提交任務(wù)時隊列滿了怎么辦辩棒?什么情況下會創(chuàng)建新的線程寒矿?提交任務(wù)時線程池滿了怎么辦坪圾?空閑線程怎么關(guān)掉?這些問題下面我們會一一解決稼稿。

我們經(jīng)常會使用?Executors這個工具類來快速構(gòu)造一個線程池佃却,對于初學者而言者吁,這種工具類是很有用的,開發(fā)者不需要關(guān)注太多的細節(jié)饲帅,只要知道自己需要一個線程池复凳,僅僅提供必需的參數(shù)就可以了,其他參數(shù)都采用作者提供的默認值灶泵。

public static ExecutorServicenewFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue);

}

public static ExecutorServicenewCachedThreadPool{

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue);

}

這里先不說有什么區(qū)別育八,它們最終都會導向這個構(gòu)造方法:

publicThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException;

// 這幾個參數(shù)都是必須要有的

if (workQueue == || threadFactory == || handler == )

throw new PointerException;

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

基本上,上面的構(gòu)造方法中列出了我們最需要關(guān)心的幾個屬性了丘逸,下面逐個介紹下構(gòu)造方法中出現(xiàn)的這幾個屬性:

corePoolSize

核心線程數(shù)单鹿,不要摳字眼掀宋,反正先記著有這么個屬性就可以了深纲。

maximumPoolSize

最大線程數(shù),線程池允許創(chuàng)建的最大線程數(shù)劲妙。

workQueue

任務(wù)隊列湃鹊,BlockingQueue 接口的某個實現(xiàn)(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

keepAliveTime

空閑線程的绷头埽活時間币呵,如果某線程的空閑時間超過這個值都沒有任務(wù)給它做,那么可以被關(guān)閉了侨颈。注意這個值并不會對所有線程起作用余赢,如果線程池中的線程數(shù)少于等于核心線程數(shù) corePoolSize,那么這些線程不會因為空閑太長時間而被關(guān)閉哈垢,當然妻柒,也可以通過調(diào)用?allowCoreThreadTimeOut(true)使核心線程數(shù)內(nèi)的線程也可以被回收。

threadFactory

用于生成線程耘分,一般我們可以用默認的就可以了举塔。通常绑警,我們可以通過它將我們的線程的名字設(shè)置得比較可讀一些,如 Message-Thread-1央渣, Message-Thread-2 類似這樣计盒。

handler:

當線程池已經(jīng)滿了,但是又有新的任務(wù)提交的時候芽丹,該采取什么策略由這個來指定北启。有幾種方式可供選擇,像拋出異常志衍、直接拒絕然后返回等暖庄,也可以自己實現(xiàn)相應(yīng)的接口實現(xiàn)自己的邏輯,這個之后再說楼肪。

除了上面幾個屬性外培廓,我們再看看其他重要的屬性。

Doug Lea 采用一個 32 位的整數(shù)來存放線程池的狀態(tài)和當前池中的線程數(shù)春叫,其中高 3 位用于存放線程池狀態(tài)肩钠,低 29 位表示線程數(shù)(即使只有 29 位,也已經(jīng)不小了暂殖,大概 5 億多价匠,現(xiàn)在還沒有哪個機器能起這么多線程的吧)。我們知道呛每,java 語言在整數(shù)編碼上是統(tǒng)一的踩窖,都是采用補碼的形式,下面是簡單的移位操作和布爾操作晨横,都是挺簡單的洋腮。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 這里 COUNT_BITS 設(shè)置為 29(32-3),意味著前三位用于存放線程狀態(tài)手形,后29位用于存放線程數(shù)

// 很多初學者很喜歡在自己的代碼中寫很多 29 這種數(shù)字啥供,或者某個特殊的字符串,然后分布在各個地方库糠,這是非常糟糕的

private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111

// 這里得到的是 29 個 1伙狐,也就是說線程池的最大線程數(shù)是 2^29-1=536870911

// 以我們現(xiàn)在計算機的實際情況,這個數(shù)量還是夠用的

private static final int CAPACITY = ( << COUNT_BITS) - 1 1;

// 我們說了瞬欧,線程池的狀態(tài)存放在高 3 位中

// 運算結(jié)果為 111跟29個0:111 00000000000000000000000000000

private static final int RUNNING = -1 << COUNT_BITS;

// 000 00000000000000000000000000000

private static final int SHUTDOWN = 0 << COUNT_BITS;

// 001 00000000000000000000000000000

private static final int STOP = 1 << COUNT_BITS;

// 010 00000000000000000000000000000

private static final int TIDYING = 2 << COUNT_BITS;

// 011 00000000000000000000000000000

private static final int TERMINATED = 3 << COUNT_BITS;

// 將整數(shù) c 的低 29 位修改為 0贷屎,就得到了線程池的狀態(tài)

private static intrunStateOf(int c) { return c & ~CAPACITY; }

// 將整數(shù) c 的高 3 為修改為 0,就得到了線程池中的線程數(shù)

private static intworkerCountOf(int c) { return c & CAPACITY; }

private static intctlOf(int rs, int wc) { return rs | wc; }

/*

* Bit field accessors that don't require unpacking ctl.

* These depend on the bit layout and on workerCount being never negative.

*/

private static booleanrunStateLessThan(int c, int s) {

return c < s;

}

private static booleanrunStateAtLeast(int c, int s) {

return c >= s;

}

private static booleanisRunning(int c) {

return c < SHUTDOWN;

}

上面就是對一個整數(shù)的簡單的位操作艘虎,幾個操作方法將會在后面的源碼中一直出現(xiàn)唉侄,所以讀者最好把方法名字和其代表的功能記住,看源碼的時候也就不需要來來回回翻了顷帖。

在這里美旧,介紹下線程池中的各個狀態(tài)和狀態(tài)變化的轉(zhuǎn)換過程:

RUNNING:這個沒什么好說的渤滞,這是最正常的狀態(tài):接受新的任務(wù),處理等待隊列中的任務(wù)

SHUTDOWN:不接受新的任務(wù)提交榴嗅,但是會繼續(xù)處理等待隊列中的任務(wù)

STOP:不接受新的任務(wù)提交妄呕,不再處理等待隊列中的任務(wù),中斷正在執(zhí)行任務(wù)的線程

TIDYING:所有的任務(wù)都銷毀了嗽测,workCount 為 0绪励。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時,會執(zhí)行鉤子方法 terminated

TERMINATED:terminated 方法結(jié)束后唠粥,線程池的狀態(tài)就會變成這個

RUNNING 定義為 -1疏魏,SHUTDOWN 定義為 0,其他的都比 0 大晤愧,所以等于 0 的時候不能提交任務(wù)大莫,大于 0 的話,連正在執(zhí)行的任務(wù)也需要中斷官份。

看了這幾種狀態(tài)的介紹只厘,讀者大體也可以猜到十之八九的狀態(tài)轉(zhuǎn)換了,各個狀態(tài)的轉(zhuǎn)換過程有以下幾種:

RUNNING -> SHUTDOWN:當調(diào)用了 shutdown 后舅巷,會發(fā)生這個狀態(tài)轉(zhuǎn)換羔味,這也是最重要的

(RUNNING or SHUTDOWN) -> STOP:當調(diào)用 shutdownNow 后,會發(fā)生這個狀態(tài)轉(zhuǎn)換钠右,這下要清楚 shutDown 和 shutDownNow 的區(qū)別了

SHUTDOWN -> TIDYING:當任務(wù)隊列和線程池都清空后赋元,會由 SHUTDOWN 轉(zhuǎn)換為 TIDYING

STOP -> TIDYING:當任務(wù)隊列清空后,發(fā)生這個轉(zhuǎn)換

TIDYING -> TERMINATED:這個前面說了飒房,當 terminated 方法結(jié)束后

上面的幾個記住核心的就可以了搁凸,尤其第一個和第二個。

另外情屹,我們還要看看一個內(nèi)部類 Worker坪仇,因為 Doug Lea 把線程池中的線程包裝成了一個個 Worker杂腰,翻譯成工人垃你,就是線程池中做任務(wù)的線程。所以到這里喂很,我們知道任務(wù)是 Runnable(內(nèi)部變量名叫 task 或 command)惜颇,線程是 Worker。

Worker 這里又用到了抽象類 AbstractQueuedSynchronizer少辣。題外話凌摄,AQS 在并發(fā)中真的是到處出現(xiàn),而且非常容易使用漓帅,寫少量的代碼就能實現(xiàn)自己需要的同步方式(對 AQS 源碼感興趣的讀者請參看我之前寫的幾篇文章)锨亏。

private final classWorker

extendsAbstractQueuedSynchronizer

implementsRunnable{

private static final long serialVersionUID = 6138294804551838833L;

// 這個是真正的線程痴怨,任務(wù)靠你啦

final Thread thread;

// 前面說了,這里的 Runnable 是任務(wù)器予。為什么叫 firstTask浪藻?因為在創(chuàng)建線程的時候,如果同時指定了

// 這個線程起來以后需要執(zhí)行的第一個任務(wù)乾翔,那么第一個任務(wù)就是存放在這里的(線程可不止執(zhí)行這一個任務(wù))

// 當然了爱葵,也可以為 ,這樣線程起來了反浓,自己到任務(wù)隊列(BlockingQueue)中取任務(wù)(getTask 方法)就行了

Runnable firstTask;

// 用于存放此線程完成的任務(wù)數(shù)萌丈,注意了,這里用了 volatile雷则,保證可見性

volatile long completedTasks;

// Worker 只有這一個構(gòu)造方法辆雾,傳入 firstTask,也可以傳

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

// 調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程

this.thread = getThreadFactory.newThread(this);

}

// 這里調(diào)用了外部類的 runWorker 方法

public voidrun{

runWorker(this);

}

...// 其他幾個方法沒什么好看的月劈,就是用 AQS 操作乾颁,來獲取這個線程的執(zhí)行權(quán),用了獨占鎖

}

前面雖然啰嗦艺栈,但是簡單英岭。有了上面的這些基礎(chǔ)后,我們終于可以看看 ThreadPoolExecutor 的 execute 方法了湿右,前面源碼分析的時候也說了诅妹,各種方法都最終依賴于 execute 方法:

public voidexecute(Runnable command) {

if (command == )

throw new PointerException;

// 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)

int c = ctl.get;

// 如果當前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù)毅人,

// 創(chuàng)建一個新的線程吭狡,并把當前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)

if (workerCountOf(c) < corePoolSize) {

// 添加任務(wù)成功,那么就結(jié)束了丈莺。提交任務(wù)嘛划煮,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了

// 至于執(zhí)行的結(jié)果缔俄,到時候會包裝到 FutureTask 中弛秋。

// 返回 false 代表線程池不允許提交任務(wù)

if (addWorker(command, true))

return;

c = ctl.get;

}

// 到這里說明,要么當前線程數(shù)大于等于核心線程數(shù)俐载,要么剛剛 addWorker 失敗了

// 如果線程池處于 RUNNING 狀態(tài)蟹略,把這個任務(wù)添加到任務(wù)隊列 workQueue 中

if (isRunning(c) && workQueue.offer(command)) {

/* 這里面說的是,如果任務(wù)進入了 workQueue遏佣,我們是否需要開啟新的線程

* 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程

* 如果線程數(shù)已經(jīng)大于等于 corePoolSize挖炬,那么將任務(wù)添加到隊列中,然后進到這里

*/

int recheck = ctl.get;

// 如果線程池已不處于 RUNNING 狀態(tài)状婶,那么移除已經(jīng)入隊的這個任務(wù)意敛,并且執(zhí)行拒絕策略

if (! isRunning(recheck) && remove(command))

reject(command);

// 如果線程池還是 RUNNING 的馅巷,并且線程數(shù)為 0,那么開啟新的線程

// 到這里草姻,我們知道了令杈,這塊代碼的真正意圖是:擔心任務(wù)提交到隊列中了,但是線程都關(guān)閉了

else if (workerCountOf(recheck) == 0)

addWorker(, false);

}

// 如果 workQueue 隊列滿了碴倾,那么進入到這個分支

// 以 maximumPoolSize 為界創(chuàng)建新的 worker逗噩,

// 如果失敗,說明當前線程數(shù)已經(jīng)達到 maximumPoolSize跌榔,執(zhí)行拒絕策略

else if (!addWorker(command, false))

reject(command);

}

對創(chuàng)建線程的錯誤理解:如果線程數(shù)少于 corePoolSize异雁,創(chuàng)建一個線程,如果線程數(shù)在 [corePoolSize, maximumPoolSize] 之間那么可以創(chuàng)建線程或復用空閑線程僧须,keepAliveTime 對這個區(qū)間的線程有效纲刀。

從上面的幾個分支,我們就可以看出担平,上面的這段話是錯誤的示绊。

上面這些一時半會也不可能全部消化搞定,我們先繼續(xù)往下吧暂论,到時候再回頭看幾遍面褐。

這個方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎么創(chuàng)建新的線程的:

// 第一個參數(shù)是準備提交給這個線程執(zhí)行的任務(wù)取胎,之前說了展哭,可以為

// 第二個參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界限,也就說創(chuàng)建這個線程的時候闻蛀,

// 如果線程池中的線程總數(shù)已經(jīng)達到 corePoolSize匪傍,那么不能響應(yīng)這次創(chuàng)建線程的請求

// 如果是 false,代表使用最大線程數(shù) maximumPoolSize 作為界限

private booleanaddWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get;

int rs = runStateOf(c);

// 這個非常不好理解

// 如果線程池已關(guān)閉觉痛,并滿足以下條件之一役衡,那么不創(chuàng)建新的 worker:

// 1. 線程池狀態(tài)大于 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED

// 2. firstTask !=

// 3. workQueue.isEmpty

// 簡單分析下:

// 還是狀態(tài)控制的問題薪棒,當線程池處于 SHUTDOWN 的時候手蝎,不允許提交任務(wù),但是已有的任務(wù)繼續(xù)執(zhí)行

// 當狀態(tài)大于 SHUTDOWN 時盗尸,不允許提交任務(wù)柑船,且中斷正在執(zhí)行的任務(wù)

// 多說一句:如果線程池處于 SHUTDOWN帽撑,但是 firstTask 為 泼各,且 workQueue 非空,那么是允許創(chuàng)建 worker 的

// 這是因為 SHUTDOWN 的語義:不允許提交新的任務(wù)亏拉,但是要把已經(jīng)進入到 workQueue 的任務(wù)執(zhí)行完扣蜻,所以在滿足條件的基礎(chǔ)上逆巍,是允許創(chuàng)建新的 Worker 的

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == &&

! workQueue.isEmpty))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

// 如果成功,那么就是所有創(chuàng)建線程前的條件校驗都滿足了莽使,準備創(chuàng)建線程執(zhí)行任務(wù)了

// 這里失敗的話锐极,說明有其他線程也在嘗試往線程池中創(chuàng)建線程

if (compareAndIncrementWorkerCount(c))

break retry;

// 由于有并發(fā),重新再讀取一下 ctl

c = ctl.get;

// 正常如果是 CAS 失敗的話芳肌,進到下一個里層的for循環(huán)就可以了

// 可是如果是因為其他線程的操作灵再,導致線程池的狀態(tài)發(fā)生了變更,如有其他線程關(guān)閉了這個線程池

// 那么需要回到外層的for循環(huán)

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

/*

* 到這里亿笤,我們認為在當前這個時刻翎迁,可以開始創(chuàng)建線程來執(zhí)行任務(wù)了,

* 因為該校驗的都校驗了净薛,至于以后會發(fā)生什么汪榔,那是以后的事,至少當前是滿足條件的

*/

// worker 是否已經(jīng)啟動

boolean workerStarted = false;

// 是否已將這個 worker 添加到 workers 這個 HashSet 中

boolean workerAdded = false;

Worker w = ;

try {

final ReentrantLock mainLock = this.mainLock;

// 把 firstTask 傳給 worker 的構(gòu)造方法

w = new Worker(firstTask);

// 取 worker 中的線程對象肃拜,之前說了痴腌,Worker的構(gòu)造方法會調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程

final Thread t = w.thread;

if (t != ) {

// 這個是整個線程池的全局鎖,持有這個鎖才能讓下面的操作“順理成章”,

// 因為關(guān)閉一個線程池需要這個鎖,至少我持有鎖的期間沿侈,線程池不會被關(guān)閉

mainLock.lock;

try {

int c = ctl.get;

int rs = runStateOf(c);

// 小于 SHUTTDOWN 那就是 RUNNING初斑,這個自不必說,是最正常的情況

// 如果等于 SHUTDOWN侥猬,前面說了,不接受新的任務(wù),但是會繼續(xù)執(zhí)行等待隊列中的任務(wù)

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == )) {

// worker 里面的 thread 可不能是已經(jīng)啟動的

if (t.isAlive)

throw new IllegalThreadStateException;

// 加到 workers 這個 HashSet 中

workers.add(w);

int s = workers.size;

// largestPoolSize 用于記錄 workers 中的個數(shù)的最大值

// 因為 workers 是不斷增加減少的懦胞,通過這個值可以知道線程池的大小曾經(jīng)達到的最大值

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock;

}

// 添加成功的話,啟動這個線程

if (workerAdded) {

// 啟動線程

t.start;

workerStarted = true;

}

}

} finally {

// 如果線程沒有啟動凉泄,需要做一些清理工作躏尉,如前面 workCount 加了 1,將其減掉

if (! workerStarted)

addWorkerFailed(w);

}

// 返回線程是否啟動成功

return workerStarted;

}

簡單看下 addWorkFailed 的處理:

// workers 中刪除掉相應(yīng)的 worker

// workCount 減 1

private voidaddWorkerFailed(Worker w) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock;

try {

if (w != )

workers.remove(w);

decrementWorkerCount;

// rechecks for termination, in case the existence of this worker was holding up termination

tryTerminate;

} finally {

mainLock.unlock;

}

}

回過頭來后众,繼續(xù)往下走胀糜。我們知道,worker 中的線程 start 后蒂誉,其 run 方法會調(diào)用 runWorker 方法:

// Worker 類的 run 方法

public voidrun{

runWorker(this);

}

繼續(xù)往下看 runWorker 方法:

// 此方法由 worker 線程啟動后調(diào)用教藻,這里用一個 while 循環(huán)來不斷地從等待隊列中獲取任務(wù)并執(zhí)行

// 前面說了,worker 在初始化的時候右锨,可以指定 firstTask括堤,那么第一個任務(wù)也就可以不需要從隊列中獲取

final voidrunWorker(Worker w) {

//

Thread wt = Thread.currentThread;

// 該線程的第一個任務(wù)(如果有的話)

Runnable task = w.firstTask;

w.firstTask = ;

w.unlock; // allow interrupts

boolean completedAbruptly = true;

try {

// 循環(huán)調(diào)用 getTask 獲取任務(wù)

while (task != || (task = getTask) != ) {

w.lock;

// 如果線程池狀態(tài)大于等于 STOP,那么意味著該線程也要中斷

if ((runStateAtLeast(ctl.get, STOP) ||

(Thread.interrupted &&

runStateAtLeast(ctl.get, STOP))) &&

!wt.isInterrupted)

wt.interrupt;

try {

// 這是一個鉤子方法,留給需要的子類實現(xiàn)

beforeExecute(wt, task);

Throwable thrown = ;

try {

// 到這里終于可以執(zhí)行任務(wù)了

task.run;

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

// 這里不允許拋出 Throwable悄窃,所以轉(zhuǎn)換為 Error

thrown = x; throw new Error(x);

} finally {

// 也是一個鉤子方法讥电,將 task 和異常作為參數(shù),留給需要的子類實現(xiàn)

afterExecute(task, thrown);

}

} finally {

// 置空 task轧抗,準備 getTask 獲取下一個任務(wù)

task = ;

//累加完成的任務(wù)數(shù)

w.completedTasks++;

// 釋放掉 worker 的獨占鎖

w.unlock;

}

}

completedAbruptly = false;

} finally {

// 如果到這里恩敌,需要執(zhí)行線程關(guān)閉:

// 1. 說明 getTask 返回 ,也就是說横媚,隊列中已經(jīng)沒有任務(wù)需要執(zhí)行了纠炮,執(zhí)行關(guān)閉

// 2. 任務(wù)執(zhí)行過程中發(fā)生了異常

// 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1灯蝴,這個在 getTask 方法分析中會說

// 第二種情況抗碰,workCount 沒有進行處理,所以需要在 processWorkerExit 中處理

// 限于篇幅绽乔,我不準備分析這個方法了弧蝇,感興趣的讀者請自行分析源碼

processWorkerExit(w, completedAbruptly);

}

}

我們看看 getTask 是怎么獲取任務(wù)的,這個方法寫得真的很好折砸,每一行都很簡單看疗,組合起來卻所有的情況都想好了:

// 此方法有三種可能:

// 1. 阻塞直到獲取到任務(wù)返回。我們知道睦授,默認 corePoolSize 之內(nèi)的線程是不會被回收的两芳,

// 它們會一直等待任務(wù)

// 2. 超時退出。keepAliveTime 起作用的時候去枷,也就是如果這么多時間內(nèi)都沒有任務(wù)怖辆,那么應(yīng)該執(zhí)行關(guān)閉

// 3. 如果發(fā)生了以下條件,此方法必須返回 :

// - 池中有大于 maximumPoolSize 個 workers 存在(通過調(diào)用 setMaximumPoolSize 進行設(shè)置)

// - 線程池處于 SHUTDOWN删顶,而且 workQueue 是空的竖螃,前面說了,這種不再接受新的任務(wù)

// - 線程池處于 STOP逗余,不僅不接受新的線程特咆,連 workQueue 中的線程也不再執(zhí)行

private RunnablegetTask{

boolean timedOut = false; // Did the last poll time out?

retry:

for (;;) {

int c = ctl.get;

int rs = runStateOf(c);

// 兩種可能

// 1. rs == SHUTDOWN && workQueue.isEmpty

// 2. rs >= STOP

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty)) {

// CAS 操作,減少工作線程數(shù)

decrementWorkerCount;

return ;

}

boolean timed; // Are workers subject to culling?

for (;;) {

int wc = workerCountOf(c);

// 允許核心線程數(shù)內(nèi)的線程回收录粱,或當前線程數(shù)超過了核心線程數(shù)腻格,那么有可能發(fā)生超時關(guān)閉

timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 這里 break,是為了不往下執(zhí)行后一個 if (compareAndDecrementWorkerCount(c))

// 兩個 if 一起看:如果當前線程數(shù) wc > maximumPoolSize啥繁,或者超時菜职,都返回

// 那這里的問題來了,wc > maximumPoolSize 的情況旗闽,為什么要返回 酬核?

// 換句話說蜜另,返回 意味著關(guān)閉線程。

// 那是因為有可能開發(fā)者調(diào)用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調(diào)小了愁茁,那么多余的 Worker 就需要被關(guān)閉

if (wc <= maximumPoolSize && ! (timedOut && timed))

break;

if (compareAndDecrementWorkerCount(c))

return ;

c = ctl.get; // Re-read ctl

// compareAndDecrementWorkerCount(c) 失敗蚕钦,線程池中的線程數(shù)發(fā)生了改變

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

// wc <= maximumPoolSize 同時沒有超時

try {

// 到 workQueue 中獲取任務(wù)

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take;

if (r != )

return r;

timedOut = true;

} catch (InterruptedException retry) {

// 如果此 worker 發(fā)生了中斷亭病,采取的方案是重試

// 解釋下為什么會發(fā)生中斷鹅很,這個讀者要去看 setMaximumPoolSize 方法。

// 如果開發(fā)者將 maximumPoolSize 調(diào)小了罪帖,導致其小于當前的 workers 數(shù)量促煮,

// 那么意味著超出的部分線程要被關(guān)閉。重新進入 for 循環(huán)整袁,自然會有部分線程會返回

timedOut = false;

}

}

}

到這里菠齿,基本上也說完了整個流程,讀者這個時候應(yīng)該回到 execute(Runnable command) 方法坐昙,看看各個分支绳匀,我把代碼貼過來一下:

public voidexecute(Runnable command) {

if (command == )

throw new PointerException;

// 前面說的那個表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)

int c = ctl.get;

// 如果當前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù)炸客,

// 創(chuàng)建一個新的線程疾棵,并把當前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)

if (workerCountOf(c) < corePoolSize) {

// 添加任務(wù)成功,那么就結(jié)束了痹仙。提交任務(wù)嘛是尔,線程池已經(jīng)接受了這個任務(wù),這個方法也就可以返回了

// 至于執(zhí)行的結(jié)果开仰,到時候會包裝到 FutureTask 中拟枚。

// 返回 false 代表線程池不允許提交任務(wù)

if (addWorker(command, true))

return;

c = ctl.get;

}

// 到這里說明,要么當前線程數(shù)大于等于核心線程數(shù)众弓,要么剛剛 addWorker 失敗了

// 如果線程池處于 RUNNING 狀態(tài)恩溅,把這個任務(wù)添加到任務(wù)隊列 workQueue 中

if (isRunning(c) && workQueue.offer(command)) {

/* 這里面說的是,如果任務(wù)進入了 workQueue谓娃,我們是否需要開啟新的線程

* 因為線程數(shù)在 [0, corePoolSize) 是無條件開啟新的線程

* 如果線程數(shù)已經(jīng)大于等于 corePoolSize暴匠,那么將任務(wù)添加到隊列中,然后進到這里

*/

int recheck = ctl.get;

// 如果線程池已不處于 RUNNING 狀態(tài)傻粘,那么移除已經(jīng)入隊的這個任務(wù)每窖,并且執(zhí)行拒絕策略

if (! isRunning(recheck) && remove(command))

reject(command);

// 如果線程池還是 RUNNING 的,并且線程數(shù)為 0弦悉,那么開啟新的線程

// 到這里窒典,我們知道了,這塊代碼的真正意圖是:擔心任務(wù)提交到隊列中了稽莉,但是線程都關(guān)閉了

else if (workerCountOf(recheck) == 0)

addWorker(, false);

}

// 如果 workQueue 隊列滿了瀑志,那么進入到這個分支

// 以 maximumPoolSize 為界創(chuàng)建新的 worker,

// 如果失敗,說明當前線程數(shù)已經(jīng)達到 maximumPoolSize劈猪,執(zhí)行拒絕策略

else if (!addWorker(command, false))

reject(command);

}

上面各個分支中昧甘,有兩種情況會調(diào)用 reject(command) 來處理任務(wù),因為按照正常的流程战得,線程池此時不能接受這個任務(wù)充边,所以需要執(zhí)行我們的拒絕策略。接下來常侦,我們說一說 ThreadPoolExecutor 中的拒絕策略浇冰。

final voidreject(Runnable command) {

// 執(zhí)行拒絕策略

handler.rejectedExecution(command, this);

}

此處的 handler 我們需要在構(gòu)造線程池的時候就傳入這個參數(shù),它是 RejectedExecutionHandler 的實例聋亡。

RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經(jīng)定義好的實現(xiàn)類可供我們直接使用肘习,當然,我們也可以實現(xiàn)自己的策略坡倔,不過一般也沒有必要漂佩。

// 只要線程池沒有被關(guān)閉,那么由提交任務(wù)的線程自己來執(zhí)行這個任務(wù)罪塔。

public static classCallerRunsPolicyimplementsRejectedExecutionHandler{

publicCallerRunsPolicy{ }

public voidrejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown) {

r.run;

}

}

}

// 不管怎樣投蝉,直接拋出 RejectedExecutionException 異常

// 這個是默認的策略,如果我們構(gòu)造線程池的時候不傳相應(yīng)的 handler 的話垢袱,那就會指定使用這個

public static classAbortPolicyimplementsRejectedExecutionHandler{

publicAbortPolicy{ }

public voidrejectedExecution(Runnable r, ThreadPoolExecutor e) {

throw new RejectedExecutionException("Task " + r.toString +

" rejected from " +

e.toString);

}

}

// 不做任何處理墓拜,直接忽略掉這個任務(wù)

public static classDiscardPolicyimplementsRejectedExecutionHandler{

publicDiscardPolicy{ }

public voidrejectedExecution(Runnable r, ThreadPoolExecutor e) {

}

}

// 這個相對霸道一點,如果線程池沒有被關(guān)閉的話请契,

// 把隊列隊頭的任務(wù)(也就是等待了最長時間的)直接扔掉咳榜,然后提交這個任務(wù)到等待隊列中

public static classDiscardOldestPolicyimplementsRejectedExecutionHandler{

publicDiscardOldestPolicy{ }

public voidrejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown) {

e.getQueue.poll;

e.execute(r);

}

}

}

到這里,ThreadPoolExecutor 的源碼算是分析結(jié)束了爽锥。單純從源碼的難易程度來說涌韩,ThreadPoolExecutor 的源碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了氯夷。

Executors

這節(jié)其實也不是分析 Executors 這個類臣樱,因為它僅僅是工具類,它的所有方法都是 static 的腮考。

生成一個固定大小的線程池:

public static ExecutorServicenewFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue);

}

最大線程數(shù)設(shè)置為與核心線程數(shù)相等雇毫,此時 keepAliveTime 設(shè)置為 0(因為這里它是沒用的,即使不為 0踩蔚,線程池默認也不會回收 corePoolSize 內(nèi)的線程)棚放,任務(wù)隊列采用 LinkedBlockingQueue,無界隊列馅闽。

過程分析:剛開始飘蚯,每提交一個任務(wù)都創(chuàng)建一個 worker馍迄,當 worker 的數(shù)量達到 nThreads 后,不再創(chuàng)建新的線程局骤,而是把任務(wù)提交到 LinkedBlockingQueue 中攀圈,而且之后線程數(shù)始終為 nThreads。

生成只有一個線程的固定線程池峦甩,這個更簡單赘来,和上面的一樣,只要設(shè)置線程數(shù)為 1 就可以了:

public static ExecutorServicenewSingleThreadExecutor{

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(, 1 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue));

}

生成一個需要的時候就創(chuàng)建新的線程穴店,同時可以復用之前創(chuàng)建的線程(如果這個線程當前沒有任務(wù))的線程池:

public static ExecutorServicenewCachedThreadPool{

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue);

}

核心線程數(shù)為 0撕捍,最大線程數(shù)為 Integer.MAX_VALUE拿穴,keepAliveTime 為 60 秒泣洞,任務(wù)隊列采用 SynchronousQueue。

這種線程池對于任務(wù)可以比較快速地完成的情況有比較好的性能默色。如果線程空閑了 60 秒都沒有任務(wù)球凰,那么將關(guān)閉此線程并從線程池中移除。所以如果線程池空閑了很長時間也不會有問題腿宰,因為隨著所有的線程都會被關(guān)閉呕诉,整個線程池不會占用任何的系統(tǒng)資源。

過程分析:我把 execute 方法的主體黏貼過來吃度,讓大家看得明白些甩挫。鑒于 corePoolSize 是 0,那么提交任務(wù)的時候椿每,直接將任務(wù)提交到隊列中伊者,由于采用了 SynchronousQueue,所以如果是第一個任務(wù)提交的時候间护,offer 方法肯定會返回 false亦渗,因為此時沒有任何 worker 對這個任務(wù)進行接收,那么將進入到最后一個分支來創(chuàng)建第一個 worker汁尺。之后再提交任務(wù)的話法精,取決于是否有空閑下來的線程對任務(wù)進行接收,如果有痴突,會進入到第二個 if 語句塊中搂蜓,否則就是和第一個任務(wù)一樣,進到最后的 else if 分支創(chuàng)建新線程辽装。

int c = ctl.get;

// corePoolSize 為 0帮碰,所以不會進到這個 if 分支

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get;

}

// offer 如果有空閑線程剛好可以接收此任務(wù),那么返回 true如迟,否則返回 false

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get;

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(, false);

}

else if (!addWorker(command, false))

reject(command);

SynchronousQueue 是一個比較特殊的 BlockingQueue收毫,其本身不儲存任何元素攻走,它有一個虛擬隊列(或虛擬棧),不管讀操作還是寫操作此再,如果當前隊列中存儲的是與當前操作相同模式的線程昔搂,那么當前操作也進入隊列中等待;如果是相反模式输拇,則配對成功摘符,從當前隊列中取隊頭節(jié)點。具體的信息策吠,可以看我的另一篇關(guān)于 BlockingQueue 的文章逛裤。

總結(jié)

我一向不喜歡寫總結(jié),因為我把所有需要表達的都寫在正文中了猴抹,寫小篇幅的總結(jié)并不能真正將話說清楚带族,本文的總結(jié)部分為準備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者蟀给。

java 線程池有哪些關(guān)鍵屬性蝙砌?

corePoolSize,maximumPoolSize跋理,workQueue择克,keepAliveTime,rejectedExecutionHandler

corePoolSize 到 maximumPoolSize 之間的線程會被回收前普,當然 corePoolSize 的線程也可以通過設(shè)置而得到回收(allowCoreThreadTimeOut(true))肚邢。

workQueue 用于存放任務(wù),添加任務(wù)的時候拭卿,如果當前線程數(shù)超過了 corePoolSize骡湖,那么往該隊列中插入任務(wù),線程池中的線程會負責到隊列中拉取任務(wù)记劈。

keepAliveTime 用于設(shè)置空閑時間勺鸦,如果線程數(shù)超出了 corePoolSize,并且有些線程的空閑時間超過了這個值目木,會執(zhí)行關(guān)閉這些線程的操作

rejectedExecutionHandler 用于處理當線程池不能執(zhí)行此任務(wù)時的情況换途,默認有拋出 RejectedExecutionException 異常、忽略任務(wù)刽射、使用提交任務(wù)的線程來執(zhí)行此任務(wù)和將隊列中等待最久的任務(wù)刪除军拟,然后提交此任務(wù)這四種策略,默認為拋出異常誓禁。

說說線程池中的線程創(chuàng)建時機懈息?

* 注意:如果將隊列設(shè)置為無界隊列,那么線程數(shù)達到 corePoolSize 后摹恰,其實線程數(shù)就不會再增長了辫继。因為后面的任務(wù)直接往隊列塞就行了怒见,此時 maximumPoolSize 參數(shù)就沒有什么意義。

如果當前線程數(shù)少于 corePoolSize姑宽,那么提交任務(wù)的時候創(chuàng)建一個新的線程遣耍,并由這個線程執(zhí)行這個任務(wù);

如果當前線程數(shù)已經(jīng)達到 corePoolSize炮车,那么將提交的任務(wù)添加到隊列中舵变,等待線程池中的線程去隊列中取任務(wù);

如果隊列已滿瘦穆,那么創(chuàng)建新的線程來執(zhí)行任務(wù)纪隙,需要保證池中的線程數(shù)不會超過 maximumPoolSize,如果此時線程數(shù)超過了 maximumPoolSize扛或,那么執(zhí)行拒絕策略绵咱。

Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool 構(gòu)造出來的線程池有什么差別?

細說太長,往上滑一點點,在 Executors 的小節(jié)進行了詳盡的描述氛悬。

任務(wù)執(zhí)行過程中發(fā)生異常怎么處理剃浇?

如果某個任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會被關(guān)閉蒂萎,而不是繼續(xù)接收其他任務(wù)秆吵。然后會啟動一個新的線程來代替它。

什么時候會執(zhí)行拒絕策略五慈?

workers 的數(shù)量達到了 corePoolSize(任務(wù)此時需要進入任務(wù)隊列)纳寂,任務(wù)入隊成功,與此同時線程池被關(guān)閉了泻拦,而且關(guān)閉線程池并沒有將這個任務(wù)出隊毙芜,那么執(zhí)行拒絕策略。這里說的是非常邊界的問題争拐,入隊和關(guān)閉線程池并發(fā)執(zhí)行腋粥,讀者仔細看看 execute 方法是怎么進到第一個 reject(command) 里面的。

workers 的數(shù)量大于等于 corePoolSize架曹,將任務(wù)加入到任務(wù)隊列隘冲,可是隊列滿了,任務(wù)入隊失敗绑雄,那么準備開啟新的線程展辞,可是線程數(shù)已經(jīng)達到 maximumPoolSize,那么執(zhí)行拒絕策略万牺。

因為本文實在太長了罗珍,所以我沒有說執(zhí)行結(jié)果是怎么獲取的洽腺,也沒有說關(guān)閉線程池相關(guān)的部分,這個就留給讀者吧覆旱。

本文篇幅是有點長已脓,如果讀者發(fā)現(xiàn)什么不對的地方,或者有需要補充的地方通殃,請不吝提出度液,謝謝。

(全文完)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末画舌,一起剝皮案震驚了整個濱河市堕担,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌曲聂,老刑警劉巖霹购,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異朋腋,居然都是意外死亡齐疙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門旭咽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贞奋,“玉大人,你說我怎么就攤上這事穷绵〗嗡” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵仲墨,是天一觀的道長勾缭。 經(jīng)常有香客問我,道長目养,這世上最難降的妖魔是什么俩由? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮癌蚁,結(jié)果婚禮上幻梯,老公的妹妹穿的比我還像新娘。我一直安慰自己匈勋,他們只是感情好礼旅,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著洽洁,像睡著了一般痘系。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上饿自,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天汰翠,我揣著相機與錄音龄坪,去河邊找鬼。 笑死复唤,一個胖子當著我的面吹牛健田,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播佛纫,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼妓局,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了呈宇?” 一聲冷哼從身側(cè)響起好爬,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎甥啄,沒想到半個月后存炮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡蜈漓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年穆桂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片融虽。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡享完,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出衣形,到底是詐尸還是另有隱情驼侠,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布谆吴,位于F島的核電站,受9級特大地震影響苛预,放射性物質(zhì)發(fā)生泄漏句狼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一热某、第九天 我趴在偏房一處隱蔽的房頂上張望腻菇。 院中可真熱鬧,春花似錦昔馋、人聲如沸筹吐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽丘薛。三九已至,卻和暖如春邦危,著一層夾襖步出監(jiān)牢的瞬間洋侨,已是汗流浹背舍扰。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留希坚,地道東北人边苹。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像裁僧,于是被迫代替她去往敵國和親个束。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容