![Threadpool](http://icdn.apigo.cn/blog/threadpool-start01.png)
本文你將獲得以下信息:
- 線程池源碼解讀
- 線程池執(zhí)行流程分析
- 帶返回值的線程池實(shí)現(xiàn)
- 延遲線程池實(shí)現(xiàn)
為了方便讀者理解烫幕,本文會(huì)由淺入深,先從線程池的使用開(kāi)始再延伸到源碼解讀和源碼分析等高級(jí)內(nèi)容,讀者可根據(jù)自己的情況自主選擇閱讀順序和需要了解的章節(jié)。
一、線程池優(yōu)點(diǎn)
線程池能夠更加充分的利用CPU票顾、內(nèi)存、網(wǎng)絡(luò)帆调、IO等系統(tǒng)資源奠骄,線程池的主要作用如下:
- 利用線程池可以復(fù)用線程,控制最大并發(fā)數(shù)番刊;
- 實(shí)現(xiàn)任務(wù)緩存策略和拒絕機(jī)制含鳞;
- 實(shí)現(xiàn)延遲執(zhí)行
阿里巴巴Java開(kāi)發(fā)手冊(cè)強(qiáng)制規(guī)定:線程資源必須通過(guò)線程池提供,如下圖:
![線程池規(guī)定](http://icdn.apigo.cn/blog/threadpool-ali01.png)
二芹务、線程池使用
本節(jié)會(huì)介紹7種線程池的創(chuàng)建與使用蝉绷,線程池的狀態(tài)介紹鸭廷,ThreadPoolExecutor參數(shù)介紹等。
2.1 線程池創(chuàng)建
線程池可以使用Executors和ThreadPoolExecutor熔吗,其中使用Executors有六種創(chuàng)建線程池的方法辆床,如下圖:
![線程池創(chuàng)建](http://icdn.apigo.cn/blog/threadpool-cmethed.png)
// 使用Executors方式創(chuàng)建
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
ExecutorService workStealingPool = Executors.newWorkStealingPool();
// 原始創(chuàng)建方式
ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
2.1.1 線程池解讀
- newSingleThreadExecutor(),它的特點(diǎn)在于工作線程數(shù)目被限制為 1桅狠,操作一個(gè)無(wú)界的工作隊(duì)列讼载,所以它保證了所有任務(wù)的都是被順序執(zhí)行,最多會(huì)有一個(gè)任務(wù)處于活動(dòng)狀態(tài)中跌,并且不允許使用者改動(dòng)線程池實(shí)例咨堤,因此可以避免其改變線程數(shù)目。
- newCachedThreadPool()漩符,它是一種用來(lái)處理大量短時(shí)間工作任務(wù)的線程池一喘,具有幾個(gè)鮮明特點(diǎn):它會(huì)試圖緩存線程并重用,當(dāng)無(wú)緩存線程可用時(shí)嗜暴,就會(huì)創(chuàng)建新的工作線程凸克;如果線程閑置的時(shí)間超過(guò) 60 秒,則被終止并移出緩存灼伤;長(zhǎng)時(shí)間閑置時(shí)触徐,這種線程池咪鲜,不會(huì)消耗什么資源狐赡。其內(nèi)部使用 SynchronousQueue 作為工作隊(duì)列。
- newFixedThreadPool(int nThreads)疟丙,重用指定數(shù)目(nThreads)的線程颖侄,其背后使用的是無(wú)界的工作隊(duì)列,任何時(shí)候最多有 nThreads 個(gè)工作線程是活動(dòng)的享郊。這意味著览祖,如果任務(wù)數(shù)量超過(guò)了活動(dòng)隊(duì)列數(shù)目,將在工作隊(duì)列中等待空閑線程出現(xiàn)炊琉;如果有工作線程退出展蒂,將會(huì)有新的工作線程被創(chuàng)建,以補(bǔ)足指定的數(shù)目 nThreads苔咪。
- newSingleThreadScheduledExecutor() 創(chuàng)建單線程池锰悼,返回 ScheduledExecutorService,可以進(jìn)行定時(shí)或周期性的工作調(diào)度团赏。
- newScheduledThreadPool(int corePoolSize)和newSingleThreadScheduledExecutor()類似箕般,創(chuàng)建的是個(gè) ScheduledExecutorService,可以進(jìn)行定時(shí)或周期性的工作調(diào)度舔清,區(qū)別在于單一工作線程還是多個(gè)工作線程丝里。
- newWorkStealingPool(int parallelism)曲初,這是一個(gè)經(jīng)常被人忽略的線程池,Java 8 才加入這個(gè)創(chuàng)建方法杯聚,其內(nèi)部會(huì)構(gòu)建<a >ForkJoinPool</a>臼婆,利用<a >Work-Stealing</a>算法,并行地處理任務(wù)幌绍,不保證處理順序目锭。
- ThreadPoolExecutor是最原始的線程池創(chuàng)建,上面1-3創(chuàng)建方式都是對(duì)ThreadPoolExecutor的封裝纷捞。
總結(jié): 其中newSingleThreadExecutor痢虹、newCachedThreadPool、newFixedThreadPool是對(duì)ThreadPoolExecutor的封裝實(shí)現(xiàn)主儡,newSingleThreadScheduledExecutor奖唯、newScheduledThreadPool則為ThreadPoolExecutor子類ScheduledThreadPoolExecutor的封裝,用于執(zhí)行延遲任務(wù)糜值,newWorkStealingPool則為Java 8新加的方法丰捷。
2.1.2 單線程池的意義
從以上代碼可以看出newSingleThreadExecutor和newSingleThreadScheduledExecutor創(chuàng)建的都是單線程池,那么單線程池的意義是什么呢寂汇?
雖然是單線程池病往,但提供了工作隊(duì)列,生命周期管理骄瓣,工作線程維護(hù)等功能停巷。
2.2 ThreadPoolExecutor解讀
ThreadPoolExecutor作為線程池的核心方法,我們來(lái)看一下ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)榕栏,以及封裝類是怎么調(diào)用ThreadPoolExecutor的畔勤。
先從構(gòu)造函數(shù)說(shuō)起,構(gòu)造函數(shù)源碼如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
參數(shù)說(shuō)明:
- corePoolSize:所謂的核心線程數(shù)扒磁,可以大致理解為長(zhǎng)期駐留的線程數(shù)目(除非設(shè)置了 allowCoreThreadTimeOut)庆揪。對(duì)于不同的線程池,這個(gè)值可能會(huì)有很大區(qū)別妨托,比如 newFixedThreadPool 會(huì)將其設(shè)置為 nThreads缸榛,而對(duì)于 newCachedThreadPool 則是為 0。
- maximumPoolSize:顧名思義兰伤,就是線程不夠時(shí)能夠創(chuàng)建的最大線程數(shù)内颗。同樣進(jìn)行對(duì)比,對(duì)于 newFixedThreadPool医清,當(dāng)然就是 nThreads起暮,因?yàn)槠湟笫枪潭ù笮。?newCachedThreadPool 則是 Integer.MAX_VALUE。
- keepAliveTime:空閑線程的备号常活時(shí)間筒捺,如果線程的空閑時(shí)間超過(guò)這個(gè)值,那么將會(huì)被關(guān)閉纸厉。注意此值生效條件必須滿足:空閑時(shí)間超過(guò)這個(gè)值系吭,并且線程池中的線程數(shù)少于等于核心線程數(shù)corePoolSize。當(dāng)然核心線程默認(rèn)是不會(huì)關(guān)閉的颗品,除非設(shè)置了allowCoreThreadTimeOut(true)那么核心線程也可以被回收肯尺。
- TimeUnit:時(shí)間單位。
- BlockingQueue:任務(wù)丟列躯枢,用于存儲(chǔ)線程池的待執(zhí)行任務(wù)的则吟。
- threadFactory:用于生成線程,一般我們可以用默認(rèn)的就可以了锄蹂。
- handler:當(dāng)線程池已經(jīng)滿了氓仲,但是又有新的任務(wù)提交的時(shí)候,該采取什么策略由這個(gè)來(lái)指定得糜。有幾種方式可供選擇敬扛,像拋出異常、直接拒絕然后返回等朝抖,也可以自己實(shí)現(xiàn)相應(yīng)的接口實(shí)現(xiàn)自己的邏輯啥箭。
來(lái)看一下線程池封裝類對(duì)于ThreadPoolExecutor的調(diào)用:
newSingleThreadExecutor對(duì)ThreadPoolExecutor的封裝源碼如下:
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool對(duì)ThreadPoolExecutor的封裝源碼如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool對(duì)ThreadPoolExecutor的封裝源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ScheduledExecutorService對(duì)ThreadPoolExecutor的封裝源碼如下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
newSingleThreadScheduledExecutor使用的是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,如下圖所示:
![Threadpool](http://icdn.apigo.cn/blog/threadpool-start01.png)
newScheduledThreadPool對(duì)ThreadPoolExecutor的封裝源碼如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newScheduledThreadPool使用的也是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor治宣。
2.3 線程池狀態(tài)
查看ThreadPoolExecutor源碼可知線程的狀態(tài)如下:
![Threadpool](http://icdn.apigo.cn/blog/threadpool-runstate.png)
線程狀態(tài)解讀(以下內(nèi)容來(lái)源于:https://javadoop.com/post/java-thread-pool):
- RUNNING:這個(gè)沒(méi)什么好說(shuō)的急侥,這是最正常的狀態(tài):接受新的任務(wù),處理等待隊(duì)列中的任務(wù)炼七;
- SHUTDOWN:不接受新的任務(wù)提交缆巧,但是會(huì)繼續(xù)處理等待隊(duì)列中的任務(wù);
- STOP:不接受新的任務(wù)提交豌拙,不再處理等待隊(duì)列中的任務(wù),中斷正在執(zhí)行任務(wù)的線程题暖;
- TIDYING:所有的任務(wù)都銷毀了按傅,workCount 為 0。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時(shí)胧卤,會(huì)執(zhí)行鉤子方法 terminated()唯绍;
- TERMINATED:terminated() 方法結(jié)束后,線程池的狀態(tài)就會(huì)變成這個(gè)枝誊;
RUNNING 定義為 -1况芒,SHUTDOWN 定義為 0,其他的都比 0 大叶撒,所以等于 0 的時(shí)候不能提交任務(wù)绝骚,大于 0 的話耐版,連正在執(zhí)行的任務(wù)也需要中斷。
看了這幾種狀態(tài)的介紹压汪,讀者大體也可以猜到十之八九的狀態(tài)轉(zhuǎn)換了粪牲,各個(gè)狀態(tài)的轉(zhuǎn)換過(guò)程有以下幾種:
- RUNNING -> SHUTDOWN:當(dāng)調(diào)用了 shutdown() 后,會(huì)發(fā)生這個(gè)狀態(tài)轉(zhuǎn)換止剖,這也是最重要的腺阳;
- (RUNNING or SHUTDOWN) -> STOP:當(dāng)調(diào)用 shutdownNow() 后,會(huì)發(fā)生這個(gè)狀態(tài)轉(zhuǎn)換穿香,這下要清楚 shutDown() 和 shutDownNow() 的區(qū)別了亭引;
- SHUTDOWN -> TIDYING:當(dāng)任務(wù)隊(duì)列和線程池都清空后,會(huì)由 SHUTDOWN 轉(zhuǎn)換為 TIDYING皮获;
- STOP -> TIDYING:當(dāng)任務(wù)隊(duì)列清空后痛侍,發(fā)生這個(gè)轉(zhuǎn)換;
- TIDYING -> TERMINATED:這個(gè)前面說(shuō)了魔市,當(dāng) terminated() 方法結(jié)束后主届;
2.4 線程池執(zhí)行
說(shuō)了那么多下來(lái)一起來(lái)看線程池的是怎么執(zhí)行任務(wù)的,線程池任務(wù)提交有兩個(gè)方法:
- execute
- submit
其中execute只能接受Runnable類型的任務(wù)待德,使用如下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
submit可以接受Runnable或Callable類型的任務(wù)君丁,使用如下:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
2.4.1 帶返回值的線程池實(shí)現(xiàn)
使用submit傳遞Callable類可以獲取執(zhí)行任務(wù)的返回值,Callable是JDK 1.5 添加的特性用于補(bǔ)充Runnable無(wú)返回的情況将宪。
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> result = executorService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
return new Date().getTime();
}
});
try {
System.out.println("運(yùn)行結(jié)果:" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
2.4.2 延遲線程池實(shí)現(xiàn)
在線程池中newSingleThreadScheduledExecutor和newScheduledThreadPool返回的是ScheduledExecutorService绘闷,用于執(zhí)行延遲線程池的,代碼如下:
// 延遲線程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("time:" + new Date().getTime());
}
}, 10, TimeUnit.SECONDS);
完整示例下載地址: https://github.com/vipstone/java-core-example
三较坛、線程池源碼解讀
閱讀線程池的源碼有一個(gè)小技巧印蔗,可以按照線程池執(zhí)行的順序進(jìn)行串連關(guān)聯(lián)閱讀,這樣更容易理解線程池的實(shí)現(xiàn)丑勤。
源碼閱讀流程解讀
我們先從線程池的任務(wù)提交方法execute()開(kāi)始閱讀华嘹,從execute()我們會(huì)發(fā)現(xiàn)線程池執(zhí)行的核心方法是addWorker(),在addWorker()中我們發(fā)現(xiàn)啟動(dòng)線程調(diào)用了start()方法法竞,調(diào)用start()方法之后會(huì)執(zhí)行Worker類的run()方法耙厚,run里面調(diào)用runWorker(),運(yùn)行程序的關(guān)鍵在于getTask()方法岔霸,getTask()方法之后就是此線程的關(guān)閉薛躬,整個(gè)線程池的工作流程也就完成了,下來(lái)一起來(lái)看吧(如果本段文章沒(méi)看懂的話也可以看完源碼之后呆细,回過(guò)頭來(lái)再看一遍)型宝。
3.1 execute() 源碼解讀
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個(gè) worker 來(lái)執(zhí)行任務(wù),
// 創(chuàng)建一個(gè)新的線程趴酣,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任務(wù)成功梨树,那么就結(jié)束了。提交任務(wù)嘛价卤,線程池已經(jīng)接受了這個(gè)任務(wù)劝萤,這個(gè)方法也就可以返回了
// 至于執(zhí)行的結(jié)果,到時(shí)候會(huì)包裝到 FutureTask 中慎璧。
// 返回 false 代表線程池不允許提交任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到這里說(shuō)明床嫌,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了
// 如果線程池處于 RUNNING 狀態(tài)胸私,把這個(gè)任務(wù)添加到任務(wù)隊(duì)列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 這里面說(shuō)的是厌处,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開(kāi)啟新的線程
* 因?yàn)榫€程數(shù)在 [0, corePoolSize) 是無(wú)條件開(kāi)啟新的線程
* 如果線程數(shù)已經(jīng)大于等于 corePoolSize岁疼,那么將任務(wù)添加到隊(duì)列中阔涉,然后進(jìn)到這里
*/
int recheck = ctl.get();
// 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊(duì)的這個(gè)任務(wù)捷绒,并且執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池還是 RUNNING 的瑰排,并且線程數(shù)為 0,那么開(kāi)啟新的線程
// 到這里暖侨,我們知道了椭住,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊(duì)列中了,但是線程都關(guān)閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 隊(duì)列滿了字逗,那么進(jìn)入到這個(gè)分支
// 以 maximumPoolSize 為界創(chuàng)建新的 worker京郑,
// 如果失敗,說(shuō)明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize葫掉,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
3.2 addWorker() 源碼解讀
// 第一個(gè)參數(shù)是準(zhǔn)備提交給這個(gè)線程執(zhí)行的任務(wù)些举,之前說(shuō)了,可以為 null
// 第二個(gè)參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界線俭厚,也就說(shuō)創(chuàng)建這個(gè)線程的時(shí)候户魏,
// 如果線程池中的線程總數(shù)已經(jīng)達(dá)到 corePoolSize,那么不能響應(yīng)這次創(chuàng)建線程的請(qǐng)求
// 如果是 false套腹,代表使用最大線程數(shù) maximumPoolSize 作為界線
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 這個(gè)非常不好理解
// 如果線程池已關(guān)閉绪抛,并滿足以下條件之一,那么不創(chuàng)建新的 worker:
// 1. 線程池狀態(tài)大于 SHUTDOWN电禀,其實(shí)也就是 STOP, TIDYING, 或 TERMINATED
// 2. firstTask != null
// 3. workQueue.isEmpty()
// 簡(jiǎn)單分析下:
// 還是狀態(tài)控制的問(wèn)題,當(dāng)線程池處于 SHUTDOWN 的時(shí)候笤休,不允許提交任務(wù)尖飞,但是已有的任務(wù)繼續(xù)執(zhí)行
// 當(dāng)狀態(tài)大于 SHUTDOWN 時(shí),不允許提交任務(wù),且中斷正在執(zhí)行的任務(wù)
// 多說(shuō)一句:如果線程池處于 SHUTDOWN政基,但是 firstTask 為 null贞铣,且 workQueue 非空,那么是允許創(chuàng)建 worker 的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功沮明,那么就是所有創(chuàng)建線程前的條件校驗(yàn)都滿足了辕坝,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù)了
// 這里失敗的話,說(shuō)明有其他線程也在嘗試往線程池中創(chuàng)建線程
if (compareAndIncrementWorkerCount(c))
break retry;
// 由于有并發(fā)荐健,重新再讀取一下 ctl
c = ctl.get();
// 正常如果是 CAS 失敗的話酱畅,進(jìn)到下一個(gè)里層的for循環(huán)就可以了
// 可是如果是因?yàn)槠渌€程的操作,導(dǎo)致線程池的狀態(tài)發(fā)生了變更江场,如有其他線程關(guān)閉了這個(gè)線程池
// 那么需要回到外層的for循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
* 到這里纺酸,我們認(rèn)為在當(dāng)前這個(gè)時(shí)刻,可以開(kāi)始創(chuàng)建線程來(lái)執(zhí)行任務(wù)了址否,
* 因?yàn)樵撔r?yàn)的都校驗(yàn)了餐蔬,至于以后會(huì)發(fā)生什么,那是以后的事佑附,至少當(dāng)前是滿足條件的
*/
// worker 是否已經(jīng)啟動(dòng)
boolean workerStarted = false;
// 是否已將這個(gè) worker 添加到 workers 這個(gè) HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 把 firstTask 傳給 worker 的構(gòu)造方法
w = new Worker(firstTask);
// 取 worker 中的線程對(duì)象樊诺,之前說(shuō)了,Worker的構(gòu)造方法會(huì)調(diào)用 ThreadFactory 來(lái)創(chuàng)建一個(gè)新的線程
final Thread t = w.thread;
if (t != null) {
// 這個(gè)是整個(gè)類的全局鎖音同,持有這個(gè)鎖才能讓下面的操作“順理成章”词爬,
// 因?yàn)殛P(guān)閉一個(gè)線程池需要這個(gè)鎖,至少我持有鎖的期間瘟斜,線程池不會(huì)被關(guān)閉
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// 小于 SHUTTDOWN 那就是 RUNNING缸夹,這個(gè)自不必說(shuō),是最正常的情況
// 如果等于 SHUTDOWN螺句,前面說(shuō)了虽惭,不接受新的任務(wù),但是會(huì)繼續(xù)執(zhí)行等待隊(duì)列中的任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker 里面的 thread 可不能是已經(jīng)啟動(dòng)的
if (t.isAlive())
throw new IllegalThreadStateException();
// 加到 workers 這個(gè) HashSet 中
workers.add(w);
int s = workers.size();
// largestPoolSize 用于記錄 workers 中的個(gè)數(shù)的最大值
// 因?yàn)?workers 是不斷增加減少的蛇尚,通過(guò)這個(gè)值可以知道線程池的大小曾經(jīng)達(dá)到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功的話芽唇,啟動(dòng)這個(gè)線程
if (workerAdded) {
// 啟動(dòng)線程
t.start();
workerStarted = true;
}
}
} finally {
// 如果線程沒(méi)有啟動(dòng),需要做一些清理工作取劫,如前面 workCount 加了 1匆笤,將其減掉
if (! workerStarted)
addWorkerFailed(w);
}
// 返回線程是否啟動(dòng)成功
return workerStarted;
}
在這段代碼可以看出,調(diào)用了t.start();
3.3 runWorker() 源碼解讀
根據(jù)上面代碼可知谱邪,調(diào)用了Worker的t.start()之后炮捧,緊接著會(huì)調(diào)用Worker的run()方法,run()源碼如下:
public void run() {
runWorker(this);
}
runWorker()源碼如下:
// worker 線程啟動(dòng)后調(diào)用,while 循環(huán)(即自旋!)不斷從等待隊(duì)列獲取任務(wù)并執(zhí)行
// worker 初始化時(shí)惦银,可指定 firstTask咆课,那么第一個(gè)任務(wù)也就可以不需要從隊(duì)列中獲取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 該線程的第一個(gè)任務(wù)(若有)
Runnable task = w.firstTask;
w.firstTask = null;
// 允許中斷
w.unlock();
boolean completedAbruptly = true;
try {
// 循環(huán)調(diào)用 getTask 獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// 若線程池狀態(tài)大于等于 STOP末誓,那么意味著該線程也要中斷
/**
* 若線程池STOP,請(qǐng)確保線程 已被中斷
* 如果沒(méi)有书蚪,請(qǐng)確保線程未被中斷
* 這需要在第二種情況下進(jìn)行重新檢查喇澡,以便在關(guān)中斷時(shí)處理shutdownNow競(jìng)爭(zhēng)
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這是一個(gè)鉤子方法,留給需要的子類實(shí)現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到這里終于可以執(zhí)行任務(wù)了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 這里不允許拋出 Throwable殊校,所以轉(zhuǎn)換為 Error
thrown = x; throw new Error(x);
} finally {
// 也是一個(gè)鉤子方法晴玖,將 task 和異常作為參數(shù),留給需要的子類實(shí)現(xiàn)
afterExecute(task, thrown);
}
} finally {
// 置空 task为流,準(zhǔn)備 getTask 下一個(gè)任務(wù)
task = null;
// 累加完成的任務(wù)數(shù)
w.completedTasks++;
// 釋放掉 worker 的獨(dú)占鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 到這里呕屎,需要執(zhí)行線程關(guān)閉
// 1. 說(shuō)明 getTask 返回 null,也就是說(shuō)艺谆,這個(gè) worker 的使命結(jié)束了榨惰,執(zhí)行關(guān)閉
// 2. 任務(wù)執(zhí)行過(guò)程中發(fā)生了異常
// 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1静汤,這個(gè)在 getTask 方法分析中說(shuō)
// 第二種情況琅催,workCount 沒(méi)有進(jìn)行處理,所以需要在 processWorkerExit 中處理
processWorkerExit(w, completedAbruptly);
}
}
3.4 getTask() 源碼解讀
runWorker里面的有g(shù)etTask()虫给,來(lái)看下具體的實(shí)現(xiàn):
// 此方法有三種可能
// 1. 阻塞直到獲取到任務(wù)返回藤抡。默認(rèn) corePoolSize 之內(nèi)的線程是不會(huì)被回收的,它們會(huì)一直等待任務(wù)
// 2. 超時(shí)退出抹估。keepAliveTime 起作用的時(shí)候缠黍,也就是如果這么多時(shí)間內(nèi)都沒(méi)有任務(wù),那么應(yīng)該執(zhí)行關(guān)閉
// 3. 如果發(fā)生了以下條件药蜻,須返回 null
// 池中有大于 maximumPoolSize 個(gè) workers 存在(通過(guò)調(diào)用 setMaximumPoolSize 進(jìn)行設(shè)置)
// 線程池處于 SHUTDOWN瓷式,而且 workQueue 是空的,前面說(shuō)了语泽,這種不再接受新的任務(wù)
// 線程池處于 STOP贸典,不僅不接受新的線程,連 workQueue 中的線程也不再執(zhí)行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 允許核心線程數(shù)內(nèi)的線程回收踱卵,或當(dāng)前線程數(shù)超過(guò)了核心線程數(shù)廊驼,那么有可能發(fā)生超時(shí)關(guān)閉
// 這里 break,是為了不往下執(zhí)行后一個(gè) if (compareAndDecrementWorkerCount(c))
// 兩個(gè) if 一起看:如果當(dāng)前線程數(shù) wc > maximumPoolSize惋砂,或者超時(shí)妒挎,都返回 null
// 那這里的問(wèn)題來(lái)了,wc > maximumPoolSize 的情況西饵,為什么要返回 null酝掩?
// 換句話說(shuō),返回 null 意味著關(guān)閉線程眷柔。
// 那是因?yàn)橛锌赡荛_(kāi)發(fā)者調(diào)用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調(diào)小了
// 如果此 worker 發(fā)生了中斷庸队,采取的方案是重試
// 解釋下為什么會(huì)發(fā)生中斷积蜻,這個(gè)讀者要去看 setMaximumPoolSize 方法闯割,
// 如果開(kāi)發(fā)者將 maximumPoolSize 調(diào)小了彻消,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量,
// 那么意味著超出的部分線程要被關(guān)閉宙拉。重新進(jìn)入 for 循環(huán)宾尚,自然會(huì)有部分線程會(huì)返回 null
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// CAS 操作振定,減少工作線程數(shù)
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 如果此 worker 發(fā)生了中斷闭专,采取的方案是重試
// 解釋下為什么會(huì)發(fā)生中斷,這個(gè)讀者要去看 setMaximumPoolSize 方法说榆,
// 如果開(kāi)發(fā)者將 maximumPoolSize 調(diào)小了锥忿,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量牛郑,
// 那么意味著超出的部分線程要被關(guān)閉。重新進(jìn)入 for 循環(huán)敬鬓,自然會(huì)有部分線程會(huì)返回 null
timedOut = false;
}
}
}
四淹朋、線程池執(zhí)行流程
線程池的執(zhí)行流程如下圖:
![Threadpool](http://icdn.apigo.cn/blog/threadpool-010.png)
五、總結(jié)
本文總結(jié)以問(wèn)答的形式展示钉答,引自《深度解讀 java 線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)》础芍,最下方附參考地址。
1数尿、線程池有哪些關(guān)鍵屬性仑性?
corePoolSize 到 maximumPoolSize 之間的線程會(huì)被回收,當(dāng)然 corePoolSize 的線程也可以通過(guò)設(shè)置而得到回收(allowCoreThreadTimeOut(true))右蹦。
workQueue 用于存放任務(wù)诊杆,添加任務(wù)的時(shí)候,如果當(dāng)前線程數(shù)超過(guò)了 corePoolSize何陆,那么往該隊(duì)列中插入任務(wù)晨汹,線程池中的線程會(huì)負(fù)責(zé)到隊(duì)列中拉取任務(wù)。
keepAliveTime 用于設(shè)置空閑時(shí)間甲献,如果線程數(shù)超出了 corePoolSize宰缤,并且有些線程的空閑時(shí)間超過(guò)了這個(gè)值,會(huì)執(zhí)行關(guān)閉這些線程的操作
rejectedExecutionHandler 用于處理當(dāng)線程池不能執(zhí)行此任務(wù)時(shí)的情況晃洒,默認(rèn)有拋出 RejectedExecutionException 異常慨灭、忽略任務(wù)、使用提交任務(wù)的線程來(lái)執(zhí)行此任務(wù)和將隊(duì)列中等待最久的任務(wù)刪除球及,然后提交此任務(wù)這四種策略氧骤,默認(rèn)為拋出異常。
2吃引、線程池中的線程創(chuàng)建時(shí)機(jī)筹陵?
如果當(dāng)前線程數(shù)少于 corePoolSize刽锤,那么提交任務(wù)的時(shí)候創(chuàng)建一個(gè)新的線程,并由這個(gè)線程執(zhí)行這個(gè)任務(wù)朦佩;
如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize并思,那么將提交的任務(wù)添加到隊(duì)列中,等待線程池中的線程去隊(duì)列中取任務(wù)语稠;
如果隊(duì)列已滿宋彼,那么創(chuàng)建新的線程來(lái)執(zhí)行任務(wù),需要保證池中的線程數(shù)不會(huì)超過(guò) maximumPoolSize仙畦,如果此時(shí)線程數(shù)超過(guò)了 maximumPoolSize输涕,那么執(zhí)行拒絕策略。
3慨畸、任務(wù)執(zhí)行過(guò)程中發(fā)生異常怎么處理莱坎?
如果某個(gè)任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會(huì)被關(guān)閉寸士,而不是繼續(xù)接收其他任務(wù)檐什。然后會(huì)啟動(dòng)一個(gè)新的線程來(lái)代替它。
4碉京、什么時(shí)候會(huì)執(zhí)行拒絕策略厢汹?
- workers 的數(shù)量達(dá)到了 corePoolSize,任務(wù)入隊(duì)成功谐宙,以此同時(shí)線程池被關(guān)閉了烫葬,而且關(guān)閉線程池并沒(méi)有將這個(gè)任務(wù)出隊(duì),那么執(zhí)行拒絕策略凡蜻。這里說(shuō)的是非常邊界的問(wèn)題搭综,入隊(duì)和關(guān)閉線程池并發(fā)執(zhí)行,讀者仔細(xì)看看 execute 方法是怎么進(jìn)到第一個(gè) reject(command) 里面的划栓。
- workers 的數(shù)量大于等于 corePoolSize兑巾,準(zhǔn)備入隊(duì),可是隊(duì)列滿了忠荞,任務(wù)入隊(duì)失敗蒋歌,那么準(zhǔn)備開(kāi)啟新的線程,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize委煤,那么執(zhí)行拒絕策略堂油。
六、參考資料
書(shū)籍:《碼出高效:Java開(kāi)發(fā)手冊(cè)》
Java核心技術(shù)36講:http://t.cn/EwUJvWA
深度解讀 java 線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn):https://javadoop.com/post/java-thread-pool
Java線程池-ThreadPoolExecutor源碼解析(基于Java8):https://www.imooc.com/article/42990
課程推薦:
![線程池創(chuàng)建](http://icdn.apigo.cn/blog/java-yangxuefeng3.png)