線程池是什么?
線程池用于多線程處理中蝎宇,它可以根據(jù)系統(tǒng)的情況,可以有效控制線程執(zhí)行的數(shù)量祷安,優(yōu)化運(yùn)行效果姥芥。線程池做的工作主要是控制運(yùn)行的線程的數(shù)量,處理過(guò)程中將任務(wù)放入隊(duì)列汇鞭,然后在線程創(chuàng)建后啟動(dòng)這些任務(wù)凉唐,如果線程數(shù)量超過(guò)了最大數(shù)量超出數(shù)量的線程排隊(duì)等候,等其它線程執(zhí)行完畢霍骄,再?gòu)年?duì)列中取出任務(wù)來(lái)執(zhí)行台囱。
線程池的作用
在面向?qū)ο蟮木幊踢^(guò)程中,創(chuàng)建對(duì)象和銷毀對(duì)象是非常消耗時(shí)間和資源的读整。因此想要最小化這種消耗的一種思想就是『池化資源』簿训。線程池就是這樣的一種思想。我們通過(guò)重用線程池中的資源來(lái)減少創(chuàng)建和銷毀線程所需要耗費(fèi)的時(shí)間和資源。
線程池的一個(gè)作用是創(chuàng)建和銷毀線程的次數(shù)强品,每個(gè)工作線程可以多次使用膘侮;另一個(gè)作用是可根據(jù)系統(tǒng)情況調(diào)整執(zhí)行的線程數(shù)量,防止消耗過(guò)多內(nèi)存的榛。另外琼了,通過(guò)線程池,能有效的控制線程的最大并發(fā)數(shù)夫晌,提高系統(tǒng)資源利用率雕薪,同時(shí)避免過(guò)多的資源競(jìng)爭(zhēng),避免堵塞晓淀。
線程池的優(yōu)點(diǎn)總結(jié)如下幾個(gè)方面:
- 線程復(fù)用
- 控制最大并發(fā)數(shù)
- 管理線程
線程池的組成
一般的線程池主要分為以下4個(gè)組成部分:
- 線程池管理器:用于創(chuàng)建并管理線程池
- 工作線程:線程池中的線程
- 任務(wù)接口:每個(gè)任務(wù)必須實(shí)現(xiàn)的接口所袁,用于工作線程調(diào)度其運(yùn)行
- 任務(wù)隊(duì)列:用于存放待處理的任務(wù),提供一種緩沖機(jī)制
線程池的常見(jiàn)應(yīng)用場(chǎng)景
許多服務(wù)器應(yīng)用常常需要處理大量而短小的請(qǐng)求(例如要糊,Web 服務(wù)器纲熏,數(shù)據(jù)庫(kù)服務(wù)器等等)妆丘,通常它們收到的請(qǐng)求數(shù)量很大锄俄,一個(gè)簡(jiǎn)單的模型是,當(dāng)服務(wù)器收到來(lái)自遠(yuǎn)程的請(qǐng)求時(shí)勺拣,為每一個(gè)請(qǐng)求開(kāi)啟一個(gè)線程奶赠,在請(qǐng)求完畢之后再對(duì)線程進(jìn)行銷毀。這樣處理帶來(lái)的問(wèn)題是药有,創(chuàng)建和銷毀線程所消耗的時(shí)間往往比任務(wù)本身所需消耗的資源要大得多毅戈。那么應(yīng)該怎么辦呢?
線程池為線程生命周期開(kāi)銷問(wèn)題和資源不足問(wèn)題提供了解決方案愤惰。我們可以通過(guò)線程池做到線程復(fù)用苇经,不需要頻繁的創(chuàng)建和銷毀線程,讓線程池中的線程一直存在于線程池中宦言,然后線程從任務(wù)隊(duì)列中取得任務(wù)來(lái)執(zhí)行扇单。而且這樣做的另一個(gè)好處有,通過(guò)適當(dāng)?shù)卣{(diào)整線程池中的線程數(shù)目奠旺,也就是當(dāng)請(qǐng)求的數(shù)目超過(guò)某個(gè)閾值時(shí)蜘澜,就強(qiáng)制其它任何新到的請(qǐng)求一直等待,直到獲得一個(gè)線程來(lái)處理為止响疚,從而可以防止資源不足鄙信。
Java線程池的簡(jiǎn)介
Java中提供了實(shí)現(xiàn)線程池的框架Executor,并且提供了許多種類的線程池忿晕,接下來(lái)的文章中將會(huì)做詳細(xì)介紹装诡。
Java線程池框架
Java中的線程池是通過(guò)Executor框架實(shí)現(xiàn)的,該框架中用到了Executor,Executors慎王,ExecutorService蚓土,ThreadPoolExecutor ,Callable和Future赖淤、FutureTask這幾個(gè)類蜀漆。
- Executor:所有線程池的接口,只有一個(gè)方法
- Executors:Executor 的工廠類咱旱,提供了創(chuàng)建各種不同線程池的方法确丢,返回的線程池都實(shí)現(xiàn)了ExecutorService 接口
- ThreadPoolExecutor:線程池的具體實(shí)現(xiàn)類,一般所有的線程池都是基于這個(gè)類實(shí)現(xiàn)的
其中ThreadPoolExecutor的構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
其中:
- corePoolSize:線程池的核心線程數(shù)
- maximumPoolSize:線程池中允許的最大線程數(shù)
- keepAliveTime:空閑線程結(jié)束的超時(shí)時(shí)間
- unit:是一個(gè)枚舉吐限,它表示的是 keepAliveTime 的單位
- workQueue:工作隊(duì)列鲜侥,用于任務(wù)的存放
Java線程池的工作過(guò)程
Java線程池的工作過(guò)程如下:
- 線程池剛創(chuàng)建時(shí),里面沒(méi)有一個(gè)線程诸典。任務(wù)隊(duì)列是作為參數(shù)傳進(jìn)來(lái)的描函。不過(guò),就算隊(duì)列里面有任務(wù)狐粱,線程池也不會(huì)馬上執(zhí)行它們舀寓。
- 當(dāng)調(diào)用 execute() 方法添加一個(gè)任務(wù)時(shí),線程池會(huì)做如下判斷:
- 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize肌蜻,那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù)互墓;
- 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize,那么將這個(gè)任務(wù)放入隊(duì)列蒋搜;
- 如果這時(shí)候隊(duì)列滿了篡撵,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize,那么還是要?jiǎng)?chuàng)建非核心線程立刻運(yùn)行這個(gè)任務(wù)豆挽;
- 如果隊(duì)列滿了育谬,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會(huì)拋出異常RejectExecutionException帮哈。
- 當(dāng)一個(gè)線程完成任務(wù)時(shí)膛檀,它會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行。
- 當(dāng)一個(gè)線程無(wú)事可做但汞,超過(guò)一定的時(shí)間(keepAliveTime)時(shí)宿刮,線程池會(huì)判斷,如果當(dāng)前運(yùn)行的線程數(shù)大于 corePoolSize私蕾,那么這個(gè)線程就被停掉僵缺。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到 corePoolSize 的大小踩叭。
常見(jiàn)的Java線程池
生成線程池使用的是Executors的工廠方法磕潮,以下是常見(jiàn)的 Java 線程池:
SingleThreadExecutor
SingleThreadExecutor是單個(gè)線程的線程池翠胰,即線程池中每次只有一個(gè)線程在運(yùn)行,單線程串行執(zhí)行任務(wù)自脯。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束之景,那么會(huì)有一個(gè)新的線程來(lái)替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行膏潮。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
FixedThreadPool
FixedThreadPool是固定數(shù)量的線程池锻狗,只有核心線程,每提交一個(gè)任務(wù)就是一個(gè)線程焕参,直到達(dá)到線程池的最大數(shù)量轻纪,然后后面進(jìn)入等待隊(duì)列,直到前面的任務(wù)完成才繼續(xù)執(zhí)行叠纷。線程池的大小一旦達(dá)到最大值就會(huì)保持不變刻帚,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程涩嚣。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
CachedThreadPool
CachedThreadPool是可緩存線程池崇众。如果線程池的大小超過(guò)了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程航厚,當(dāng)任務(wù)數(shù)增加時(shí)顷歌,此線程池又可以智能的添加新線程來(lái)處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制阶淘,線程池大小完全依賴于操作系統(tǒng)(或者說(shuō)JVM)能夠創(chuàng)建的最大線程大小衙吩。其中互妓,SynchronousQueue是一個(gè)是緩沖區(qū)為1的阻塞隊(duì)列溪窒。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ScheduledThreadPool
ScheduledThreadPool是核心線程池固定,大小無(wú)限制的線程池冯勉,支持定時(shí)和周期性的執(zhí)行線程澈蚌。創(chuàng)建一個(gè)周期性執(zhí)行任務(wù)的線程池。如果閑置,非核心線程池會(huì)在DEFAULT_KEEPALIVEMILLIS時(shí)間內(nèi)回收灼狰。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
Java 線程池的創(chuàng)建和使用
我們可以通過(guò)Executors的工廠方法來(lái)創(chuàng)建一個(gè)線程池宛瞄。但是我們?cè)撊绾巫尵€程池執(zhí)行任務(wù)呢?
線程池最常用的提交任務(wù)的方法有兩種:
- execute:
ExecutorService.execute(Runnable runable)交胚;
- submit:
FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable,T Result);
FutureTask<T> task = ExecutorService.submit(Callable<T> callable);
可以看出submit開(kāi)啟的是有返回結(jié)果的任務(wù)份汗,會(huì)返回一個(gè)FutureTask對(duì)象,這樣就能通過(guò)get()方法得到結(jié)果蝴簇。submit最終調(diào)用的也是execute(Runnable runable)杯活,submit只是將Callable對(duì)象或Runnable封裝成一個(gè)FutureTask對(duì)象,因?yàn)镕utureTask是個(gè)Runnable熬词,所以可以在execute中執(zhí)行旁钧。
下面的示例代碼演示了如何創(chuàng)建一個(gè)線程池吸重,并且使用它管理線程:
public class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running.");
}
}
public class TestSingleThreadExecutor {
public static void main(String[] args) {
//創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//創(chuàng)建實(shí)現(xiàn)了Runnable接口對(duì)象
Thread tt1 = new MyThread();
Thread tt2 = new MyThread();
Thread tt3 = new MyThread();
Thread tt4 = new MyThread();
Thread tt5 = new MyThread();
//將線程放入池中并執(zhí)行
pool.execute(tt1);
pool.execute(tt2);
pool.execute(tt3);
pool.execute(tt4);
pool.execute(tt5);
//關(guān)閉
pool.shutdown();
}
}
運(yùn)行結(jié)果:
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
Java線程池原理
這篇文章會(huì)分別從這三個(gè)方面,結(jié)合具體的代碼實(shí)現(xiàn)來(lái)剖析 Java 線程池的原理以及它的具體實(shí)現(xiàn)歪今。
線程復(fù)用
我們知道線程池的一個(gè)作用是創(chuàng)建和銷毀線程的次數(shù)嚎幸,每個(gè)工作線程可以多次使用。這個(gè)功能就是線程復(fù)用寄猩。想要了解 Java 線程池是如何進(jìn)行線程復(fù)用的嫉晶,我們首先需要了解線程的生命周期。
線程生命周期
下圖描述了線程完整的生命周期:
在一個(gè)線程完整的生命周期中田篇,它可能經(jīng)歷五種狀態(tài):新建(New)车遂、就緒(Runnable)、運(yùn)行(Running)斯辰、阻塞(Blocked)舶担、終止(Zombie)。
在 Java中彬呻,Thread 通過(guò)new來(lái)新建一個(gè)線程衣陶,這個(gè)過(guò)程是是初始化一些線程信息,如線程名闸氮、id剪况、線程所屬group等,可以認(rèn)為只是個(gè)普通的對(duì)象蒲跨。調(diào)用Thread的start()后Java虛擬機(jī)會(huì)為其創(chuàng)建方法調(diào)用棧和程序計(jì)數(shù)器译断,同時(shí)將hasBeenStarted為true,之后如果再次調(diào)用start()方法就會(huì)有異常或悲。
處于這個(gè)狀態(tài)中的線程并沒(méi)有開(kāi)始運(yùn)行孙咪,只是表示該線程可以運(yùn)行了。至于該線程何時(shí)開(kāi)始運(yùn)行巡语,取決于 JVM 里線程調(diào)度器的調(diào)度翎蹈。當(dāng)線程獲取CPU后,run()方法會(huì)被調(diào)用男公。不要自己去調(diào)用Thread的run()方法荤堪。之后根據(jù)CPU的調(diào)度,線程就會(huì)在就緒—運(yùn)行—阻塞間切換枢赔,直到run()方法結(jié)束或其他方式停止線程澄阳,進(jìn)入終止?fàn)顟B(tài)。
因此踏拜,如果要實(shí)現(xiàn)線程的復(fù)用碎赢,我們必須要保證線程池中的線程保持存活狀態(tài)(就緒籽孙、運(yùn)行怯伊、阻塞)筛婉。接下來(lái)却邓,我們就來(lái)看看ThreadPoolExecutor是如何實(shí)現(xiàn)線程復(fù)用的。
Worker 類
ThreadPoolExecutor主要是通過(guò)一個(gè)類來(lái)控制線程復(fù)用的:Worker 類峦嗤。
我們來(lái)看一下簡(jiǎn)化后的 Worker 類代碼:
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}
……
}
從代碼中蕊唐,我們可以看到 Worker 實(shí)現(xiàn)了 Runnable 接口,并且它還有一個(gè) Thread成員變量 thread烁设,這個(gè) thread 就是要開(kāi)啟運(yùn)行的線程替梨。我們看到 Worker 的構(gòu)造方法中傳遞了一個(gè) Runnable 參數(shù),同時(shí)它把自己作為參數(shù)傳入 newThread()装黑,這樣的話副瀑,當(dāng) Thread 的start()方法得到調(diào)用時(shí),執(zhí)行的其實(shí)是 Worker 的run()方法恋谭,即runWorker()方法糠睡。
runWorker()方法之中有一個(gè) while 循環(huán),使用 getTask()來(lái)獲取任務(wù)疚颊,并執(zhí)行狈孔。接下來(lái),我們將會(huì)看到getTask()是如何獲取到 Runnable 對(duì)象的材义。
getTask()
我們來(lái)看一下簡(jiǎn)化后的getTask()代碼:
private Runnable getTask() {
if(一些特殊情況) {
return null;
}
Runnable r = workQueue.take();
return r;
}
我們可以看到任務(wù)是從 workQueue中獲取的均抽,這個(gè) workQueue 就是我們初始化 ThreadPoolExecutor 時(shí)存放任務(wù)的 BlockingQueue隊(duì)列,這個(gè)隊(duì)列里的存放的都是將要執(zhí)行的 Runnable任務(wù)其掂。因?yàn)?BlockingQueue 是個(gè)阻塞隊(duì)列油挥,BlockingQueue.take()返回的是空,則進(jìn)入等待狀態(tài)直到 BlockingQueue 有新的對(duì)象被加入時(shí)喚醒阻塞的線程款熬。所以一般情況下深寥,Thread的run()方法不會(huì)結(jié)束,而是不斷執(zhí)行workQueue里的Runnable任務(wù)华烟,這就達(dá)到了線程復(fù)用的目的了翩迈。
控制最大并發(fā)數(shù)
我們現(xiàn)在已經(jīng)知道了 Java 線程池是如何做到線程復(fù)用的了持灰,但是Runnable 是什么時(shí)候被放入 workQueue 隊(duì)列中的呢盔夜,Worker里的Thread的又是什么時(shí)候調(diào)用start()開(kāi)啟新線程來(lái)執(zhí)行Worker的run()方法的呢?從上面的分析中我們可以看出Worker里的runWorker()執(zhí)行任務(wù)時(shí)是一個(gè)接一個(gè)堤魁,串行進(jìn)行的喂链,那并發(fā)是怎么體現(xiàn)的呢?它又是如何做到控制最大并發(fā)數(shù)的呢妥泉?
execute()
通過(guò)查看 execute()就能解答上述的一些問(wèn)題椭微,同樣是簡(jiǎn)化后的代碼:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 當(dāng)前線程數(shù) < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接啟動(dòng)新的線程。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活動(dòng)線程數(shù) >= corePoolSize
// runState為RUNNING && 隊(duì)列未滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢驗(yàn)是否為RUNNING狀態(tài)
// 非RUNNING狀態(tài) 則從workQueue中移除任務(wù)并拒絕
if (!isRunning(recheck) && remove(command))
reject(command);
// 采用線程池指定的策略拒絕任務(wù)
// 兩種情況:
// 1.非RUNNING狀態(tài)拒絕新的任務(wù)
// 2.隊(duì)列滿了啟動(dòng)新的線程失斆ち础(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
addWorker()
我們?cè)賮?lái)看一下addWorker()的簡(jiǎn)化代碼:
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
根據(jù)上面的代碼蝇率,線程池工作過(guò)程中是如何添加任務(wù)的就很清晰了:
- 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize迟杂,那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù);
- 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize本慕,那么將這個(gè)任務(wù)放入隊(duì)列排拷;
- 如果這時(shí)候隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize锅尘,那么還是要?jiǎng)?chuàng)建非核心線程立刻運(yùn)行這個(gè)任務(wù)监氢;
- 如果隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize藤违,那么線程池會(huì)拋出異常RejectExecutionException
如果通過(guò)addWorker()成功創(chuàng)建新的線程浪腐,則通過(guò)start()開(kāi)啟新線程,同時(shí)將firstTask作為這個(gè)Worker里的run()中執(zhí)行的第一個(gè)任務(wù)顿乒。雖然每個(gè)Worker的任務(wù)是串行處理议街,但如果創(chuàng)建了多個(gè)Worker,因?yàn)楣灿靡粋€(gè)workQueue璧榄,所以就會(huì)并行處理了傍睹。所以可以根據(jù)corePoolSize和maximumPoolSize來(lái)控制最大并發(fā)數(shù)。
過(guò)程如下圖所示:
管理線程
上邊的文章已經(jīng)講了犹菱,通過(guò)線程池可以很好的管理線程的復(fù)用拾稳,控制并發(fā)數(shù),以及銷毀等過(guò)程腊脱,而線程的管理過(guò)程已經(jīng)穿插在其中了访得,也很好理解。
在 ThreadPoolExecutor 有個(gè)AtomicInteger變量 ctl陕凹,這一個(gè)變量保存了兩個(gè)內(nèi)容:
- 所有線程的數(shù)量
- 每個(gè)線程所處的狀態(tài)
其中低29位存線程數(shù)悍抑,高3位存runState,通過(guò)位運(yùn)算來(lái)得到不同的值杜耙。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到線程的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//得到Worker的的數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 判斷線程是否在運(yùn)行
private static boolean isRunning(int c) { return c < SHUTDOWN; }
這里主要通過(guò)shutdown和shutdownNow()來(lái)分析線程池的關(guān)閉過(guò)程搜骡。首先線程池有五種狀態(tài)來(lái)控制任務(wù)添加與執(zhí)行。主要介紹以下三種:
- RUNNING狀態(tài):線程池正常運(yùn)行佑女,可以接受新的任務(wù)并處理隊(duì)列中的任務(wù)记靡;
- SHUTDOWN狀態(tài):不再接受新的任務(wù),但是會(huì)執(zhí)行隊(duì)列中的任務(wù)团驱;
- STOP狀態(tài):不再接受新任務(wù)摸吠,不處理隊(duì)列中的任務(wù)
shutdown()這個(gè)方法會(huì)將runState置為SHUTDOWN,會(huì)終止所有空閑的線程嚎花,而仍在工作的線程不受影響寸痢,所以隊(duì)列中的任務(wù)人會(huì)被執(zhí)行;shutdownNow()方法將runState置為STOP紊选。和shutdown()方法的區(qū)別是啼止,這個(gè)方法會(huì)終止所有的線程道逗,所以隊(duì)列中的任務(wù)也不會(huì)被執(zhí)行了。
Java線程池框架源碼分析
前面的文章中已經(jīng)給出了Java線程池框架中幾個(gè)重要類的關(guān)系圖:
現(xiàn)在我們基于這張圖來(lái)逐步分析献烦。
Executor
public interface Executor {
void execute(Runnable command);
}
這個(gè)接口表示向線程池中提交一個(gè)任務(wù)憔辫。
ExecutorService
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看到 ExecutorService 擴(kuò)展了 Executor 接口,在 Executor 基礎(chǔ)上提供了更多的提交任務(wù)的方式和管理線程池的一些方法仿荆。
AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
}
ThreadPoolExecutor
ThreadPoolExecutor 是線程池框架的關(guān)鍵類. 首先來(lái)看一下 ThreadPoolExecutor 中幾個(gè)重要的屬性.
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 整個(gè)線程池的狀態(tài)控制類 ctl, 是一個(gè) AtomicInteger 贰您,封裝了下面2個(gè)部分:
* workerCount 表示有效的線程數(shù)
* runState 表示線程池當(dāng)前狀態(tài),是running, shutdown還是其他狀態(tài)
*
* runState表示了線程池的整個(gè)整個(gè)生命周期拢操,可以取以下值:
* RUNNING: 接受新的task锦亦,處理隊(duì)列中的task
* SHUTDOWN: 不接受新的 task 但會(huì)處理隊(duì)列中的 task
* STOP: 不接受新的 task, 不處理隊(duì)列中的 task, 中斷正在執(zhí)行的 task
* TIDYING: 所有 task 執(zhí)行結(jié)束, workerCount 是 0, 線程過(guò)渡到TIDYING會(huì)調(diào)用 terminated()鉤子方法
* TERMINATED: terminated()執(zhí)行完成
*
* 狀態(tài)轉(zhuǎn)換關(guān)系:
*
* RUNNING -> SHUTDOWN(調(diào)用shutdown())
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP(調(diào)用shutdownNow())
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING(queue和pool均empty)
* When both queue and pool are empty
* TIDYING -> TERMINATED(調(diào)用terminated())
* When the terminated() hook method has completed
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* task 的隊(duì)列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* worker 線程的 Set, 只有持有 mainlock 時(shí)才能訪問(wèn)
*/
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
/**
* idle 線程waiting for work的超時(shí)時(shí)間
*/
private volatile long keepAliveTime;
/**
* 如果是 false,core threads將會(huì)保持 alive 即使處于 idel 狀態(tài)
* 如果是 true,core threads會(huì)keepAliveTime作為超時(shí)時(shí)間 wait for work
*/
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
workerCount, runState 使用一個(gè) AtomicInteger 進(jìn)行了封裝, runState用 int 的高3位標(biāo)書(shū),低位表示 workerCount, 所以我們能看到 ThreadPoolExecutor 中和 ctl 相關(guān)的常量和解析方法.
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor 最主要的構(gòu)造函數(shù),設(shè)置上面說(shuō)的重要屬性.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
現(xiàn)在來(lái)看一下 execute() 方法, execute()方法有3個(gè)處理步驟:
- 線程數(shù)小于 corePoolSize 時(shí),則試圖創(chuàng)建一個(gè)新的 worker 線程
- 如果上面一步失敗了,則試圖將任務(wù)添加到阻塞隊(duì)列中令境,并且要再一次判斷需要不需要回滾隊(duì)列杠园,或者說(shuō)創(chuàng)建線程
- 如果上面兩步都失敗了,則會(huì)試圖強(qiáng)行創(chuàng)建一個(gè)線程來(lái)執(zhí)行這個(gè)任務(wù)舔庶,如果還是失敗抛蚁,扔掉這個(gè)任務(wù)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.判斷有效線程數(shù)是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//創(chuàng)建新線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.分開(kāi)來(lái)看,首先判斷當(dāng)前的池子是否是處于 running 狀態(tài)
// 因?yàn)橹挥?running 狀態(tài)才可以接收新任務(wù)
// 接下來(lái)判斷能否成功添加到隊(duì)列中惕橙,如果隊(duì)列滿了或者其他情況則會(huì)跳到下一步
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查池子的狀態(tài)瞧甩,如果進(jìn)入了非 running 狀態(tài),回滾隊(duì)列弥鹦,扔掉這個(gè)任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);
//如果處于 running 狀態(tài)則檢查當(dāng)前的有效線程肚逸,如果沒(méi)有則創(chuàng)建一個(gè)線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.前兩步失敗了,就強(qiáng)行創(chuàng)建線程彬坏,成功會(huì)返回true朦促,如果失敗扔掉這個(gè)任務(wù)
else if (!addWorker(command, false))
reject(command);
}
解釋一下第二步,為什么要recheck
當(dāng)這個(gè)任務(wù)被添加到了阻塞隊(duì)列前栓始,池子處于 RUNNING 狀態(tài)务冕,但如果在添加到隊(duì)列成功后,池子進(jìn)入了 SHUTDOWN 狀態(tài)或者其他狀態(tài)幻赚,這時(shí)候是不應(yīng)該再接收新的任務(wù)的禀忆,所以需要把這個(gè)任務(wù)從隊(duì)列中移除,并且 reject
同樣坯屿,在沒(méi)有添加到隊(duì)列前油湖,可能有一個(gè)有效線程,但添加完任務(wù)后领跛,這個(gè)線程閑置超時(shí)或者因?yàn)楫惓1桓傻袅耍@時(shí)候需要?jiǎng)?chuàng)建一個(gè)新的線程來(lái)執(zhí)行任務(wù)
為了更直觀的理解一個(gè)任務(wù)的執(zhí)行過(guò)程撤奸,可以參考下面這張圖
addWorker()
前一步把 execute 的流程捋了一遍吠昭,里面多次出現(xiàn)了 addWorker() 方法喊括,前文說(shuō)到這是個(gè)創(chuàng)建線程的方法,來(lái)看看 addWorker 做了些什么矢棚,這個(gè)方法代碼比較長(zhǎng)郑什,我們拆開(kāi)來(lái)一點(diǎn)一點(diǎn)看.
- 第一部分 — 判斷各種基礎(chǔ)異常
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 檢查線程池狀態(tài),隊(duì)列狀態(tài)蒲肋,以及 firstask 蘑拯,拆開(kāi)來(lái)看
// 這段代碼看起來(lái)異常的蛋疼,轉(zhuǎn)換一下邏輯即
// rs>= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
// 總結(jié)起來(lái)就是 當(dāng)前處于非 Running 狀態(tài),并且這三種情況
// 1. 不是處于 SHUTDOWN 狀態(tài),不能再創(chuàng)建線程
// 2. 有新的任務(wù) (因?yàn)椴荒茉俳邮招碌娜蝿?wù))
// 3. 阻塞隊(duì)列中已經(jīng)沒(méi)有任務(wù) (不需要再創(chuàng)建線程)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//當(dāng)前有效線程數(shù)目
int wc = workerCountOf(c);
// 根據(jù)傳入的參數(shù)確定以核心線程數(shù)還是最大線程數(shù)作為判斷條件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 大于容量 或者指定的線程數(shù)兜粘,不允許創(chuàng)建
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
- 第二部分 — 試圖創(chuàng)建線程
創(chuàng)建一個(gè)Worker
boolean workerStarted = false; //標(biāo)記 worker 開(kāi)啟狀態(tài)
boolean workerAdded = false; //標(biāo)記 worker 添加狀態(tài)
Worker w = null;
try {
w = new Worker(firstTask); //將這個(gè)任務(wù)作為 worker 的第一個(gè)任務(wù)傳入
final Thread t = w.thread; //通過(guò) worker 獲取到一個(gè)線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// running狀態(tài)申窘,或者 shutdown 狀態(tài)但是沒(méi)有新的任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將這個(gè) worker 添加到線程池中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 標(biāo)記worker添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果 worker 創(chuàng)建成功,開(kāi)啟線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
上面代碼從邏輯層面來(lái)看不算難懂孔轴,到這里一個(gè)任務(wù)到達(dá)后剃法,ThreadPoolExecutor 的處理就結(jié)束了,那么任務(wù)又是怎么被添加到阻塞隊(duì)列中路鹰,線程是如何從隊(duì)列中取出任務(wù)贷洲,上文中的 Worker 又是什么東西?
一個(gè)一個(gè)來(lái)晋柱,先來(lái)看看 Worker 到底是什么.
Worker
Worker 是 ThreadPoolExecutor 的一個(gè)內(nèi)部類优构,實(shí)現(xiàn)了 Runnable 接口,繼承自 AbstractQueuedSynchronizer,這又是個(gè)什么鬼雁竞?俩块?? 這個(gè)就是經(jīng)常見(jiàn)到的 AQS 的全稱,這個(gè)暫時(shí)還沒(méi)有研究.~~~~
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
}
簡(jiǎn)單來(lái)說(shuō),Worker實(shí)現(xiàn)了 lock 和 unLock 方法來(lái)標(biāo)示當(dāng)前線程的狀態(tài)是否為閑置
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
上一節(jié)創(chuàng)建線程成功后調(diào)用 t.start() 而這個(gè)線程又是 Worker 的成員變量
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到這里將 Worker 作為 Runnable 參數(shù)創(chuàng)建了一個(gè)新的線程浓领,我們知道 Thread 接收一個(gè) Runnable 對(duì)象后 start 運(yùn)行的是 Runnable 的 run 方法玉凯,Worker 的 run 方法調(diào)用了 runWorker ,這個(gè)方法里面就是取出任務(wù)執(zhí)行的邏輯
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取到 worker 的第一個(gè)任務(wù)
w.firstTask = null;
w.unlock(); // 標(biāo)記為閑置,還沒(méi)有開(kāi)始任務(wù) 允許打斷
boolean completedAbruptly = true; // 異常退出標(biāo)記
try {
// 循環(huán)取出任務(wù)联贩,如果第一個(gè)任務(wù)不為空漫仆,或者從隊(duì)列中拿到了任務(wù)
// 只要這兩個(gè)條件滿足,會(huì)一直循環(huán)泪幌,直到?jīng)]有任務(wù)盲厌,正常退出,或者異常退出
while (task != null || (task = getTask()) != null) {
w.lock();// 該線程標(biāo)記為非閑置
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 翻譯注釋:1.如果線程池STOPPING狀態(tài)祸泪,需要中斷線程
// 2.Thread.interrupted()是一個(gè)native方法吗浩,返回當(dāng)前線程是否有被等待中斷的請(qǐng)求
// 3.第二個(gè)條件成立時(shí),檢查線程池狀態(tài)没隘,如果為STOP懂扼,并且沒(méi)有被中斷,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 執(zhí)行任務(wù)
try {
beforeExecute(wt, task);// 執(zhí)行前
Throwable thrown = null;
try {
task.run(); // 執(zhí)行任務(wù)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 執(zhí)行結(jié)束
}
} finally {
task = null; // 將 worker 的任務(wù)置空
w.completedTasks++;
w.unlock(); // 釋放鎖,進(jìn)入閑置狀態(tài)
}
}// 循環(huán)結(jié)束
completedAbruptly = false; // 標(biāo)記為正常退出
} finally {
// 干掉 worker
processWorkerExit(w, completedAbruptly);
}
}
這里弄清楚了一件事情阀湿,進(jìn)入循環(huán)準(zhǔn)備執(zhí)行任務(wù)時(shí)赶熟,worker 加鎖標(biāo)記為非閑置,任務(wù)執(zhí)行完畢或者出現(xiàn)異常陷嘴,worker 釋放鎖映砖,進(jìn)入閑置狀態(tài)。
也就是當(dāng)一個(gè) worker 執(zhí)行任務(wù)前或者執(zhí)行完任務(wù)灾挨,到取出下一個(gè)任務(wù)期間邑退,都是閑置狀態(tài)可以被打斷
上面取出任務(wù)調(diào)用了 getTask() ,誒~為什么有一個(gè)死循環(huán)劳澄,別著急地技,慢慢看來(lái)。上面的代碼可以知道如果 getTask 返回任務(wù)則執(zhí)行浴骂,如果返回為 null 則 worker 需要被回收
private Runnable getTask() {
// 標(biāo)記取任務(wù)是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池狀態(tài)為 STOP 或者 SHUTDOWN 并且隊(duì)列已經(jīng)為空乓土,回收 wroker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取當(dāng)前有效線程數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed 用來(lái)標(biāo)記當(dāng)前的 worker 是否設(shè)置超時(shí)時(shí)間,
// 還記得獲取線程池的時(shí)候 可以設(shè)置核心線程超時(shí)時(shí)間
//1.允許核心線程超時(shí)回收(即所有線程) 2.當(dāng)前有效線程超過(guò)核心線程數(shù)(需要回收)
// 如果timed == false 則該worker不會(huì)被回收溯警,如果沒(méi)有取到任務(wù) 會(huì)一直阻塞
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 回收線程條件
// 1. 有效線程數(shù)已經(jīng)大于了線程池的最大線程數(shù)或者設(shè)置了超時(shí)回收并且已經(jīng)超時(shí)
// 2. 有效線程數(shù)大于1或者隊(duì)列任務(wù)已經(jīng)為空
// 只有當(dāng)上面1和2 同時(shí)滿足時(shí) 則試圖回收線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 如果減少workercount成功 直接回收
if (compareAndDecrementWorkerCount(c))
return null;
// 否則重走循環(huán)趣苏,從第一個(gè)判斷條件處回收
continue;
}
// 取任務(wù)
try {
// 根據(jù)是否設(shè)置超時(shí)回收來(lái)選擇不同的取任務(wù)的方式
// poll 方法取任務(wù)會(huì)有超時(shí)時(shí)間,超過(guò)時(shí)間則返回null
// take 方法沒(méi)有超時(shí)時(shí)間梯轻,阻塞式方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果任務(wù)不為空返回任務(wù)
if (r != null)
return r;
// 否則標(biāo)記超時(shí) 進(jìn)入下一次循環(huán)等待回收
timedOut = true;
} catch (InterruptedException retry) {
// 如果出現(xiàn)異常食磕,試圖重試
timedOut = false;
}
}
}
getTask() 方法邏輯也捋得差不多了,這里又出現(xiàn)了兩個(gè)新的方法喳挑,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() 彬伦,這兩個(gè)都是阻塞隊(duì)列的方法,來(lái)看看它們又各自是怎么實(shí)現(xiàn)的
LinkedBlockingQueue — 阻塞隊(duì)列
ThreadPoolExecutor 使用的是鏈表結(jié)構(gòu)的阻塞隊(duì)列伊诵,實(shí)現(xiàn)了 BlockingQueue 接口单绑,而 BlockingQueue 則是繼承自 Queue 接口,再上層就是 Collection 接口曹宴。
因?yàn)楸酒P記主要是分析 ThreadPoolExecutor 的原理搂橙,所以不會(huì)詳細(xì)介紹 LinkedBlockingQueue 中的其它代碼,主要介紹這里所用的方法笛坦,首先來(lái)看一下上文所提到的 take()
public E take() throws InterruptedException {
E x; // 任務(wù)
int c = -1; // 取出任務(wù)后的剩余任務(wù)數(shù)量
final AtomicInteger count = this.count; // 當(dāng)前任務(wù)數(shù)量
final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
takeLock.lockInterruptibly();
try {
// 如果隊(duì)列數(shù)量為空区转,則一直循環(huán),阻塞線程
while (count.get() == 0) {
notEmpty.await();
}
// 取出任務(wù)
x = dequeue();
// 任務(wù)數(shù)量減一
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();// 標(biāo)記隊(duì)列非空
} finally {
takeLock.unlock(); // 釋放鎖
}
if (c == capacity)
signalNotFull();//標(biāo)記隊(duì)列已滿
return x;// 返回任務(wù)
}
上面的代碼可以知道 take 方法會(huì)一直阻塞直到隊(duì)列有新的任務(wù)為止
接下來(lái)是 poll 方法版扩,可以看到幾乎與 take 方法相同废离,唯一的區(qū)別是在阻塞的循環(huán)代碼塊里面加了時(shí)間判斷,如果超時(shí)則直接返回為空礁芦,不會(huì)一直阻塞下去
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null; // 存放的任務(wù)
int c = -1;
long nanos = unit.toNanos(timeout); // 超時(shí)時(shí)間
final AtomicInteger count = this.count; // 隊(duì)列中的數(shù)量
final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
takeLock.lockInterruptibly();
try {
// 如果隊(duì)列為空蜻韭,則不斷的循環(huán)
while (count.get() == 0) {
// 如果當(dāng)?shù)褂?jì)時(shí)小于0 即超時(shí)時(shí)間到 則返回空
if (nanos <= 0)
return null;
// 讓線程等待
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue(); // 取出一個(gè)任務(wù)
c = count.getAndDecrement(); // 取出后的隊(duì)列數(shù)量
if (c > 1)
notEmpty.signal(); // 標(biāo)記非空
} finally {
takeLock.unlock(); // 釋放鎖
}
if (c == capacity)
signalNotFull(); // 標(biāo)記隊(duì)列已滿
return x; // 返回任務(wù)
}
線程池的回收及終止
前一節(jié)分析了任務(wù)的執(zhí)行流程及原理,也留下了一個(gè)問(wèn)題,worker 是如何被回收的呢湘捎?線程池該如何管理呢诀豁?回到上一節(jié)的 runWorker() 方法中窄刘,還記得最后調(diào)用了一個(gè)方法
processWorkerExit(w, completedAbruptly);
這個(gè)方法傳入了兩個(gè)參數(shù)窥妇,第一個(gè)是當(dāng)前的 Woker ,第二個(gè)是標(biāo)記異常退出的標(biāo)識(shí)
首先判斷是否為異常退出,如果是異常退出的話需要手動(dòng)調(diào)整線程數(shù)量娩践,如果是正郴铘妫回收的,getTask 方法里面已經(jīng)手動(dòng)調(diào)整過(guò)了翻伺,不記得的小伙伴可以看看前文的代碼材泄,找找 decrementWorkerCount(),
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 記錄線程池完成的任務(wù)總數(shù),從 workers 中移除該 worker
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate(); // 嘗試關(guān)閉池子
int c = ctl.get();
// 以下的代碼是判斷需不需要給線程池創(chuàng)建一個(gè)新的線程
// 如果線程池的狀態(tài)是 RUNNING 或者 SHUTDOWN 進(jìn)一步判斷需不需要?jiǎng)?chuàng)建
if (runStateLessThan(c, STOP)) {
// 如果為異常退出直接創(chuàng)建吨岭,如果不是異常退出進(jìn)入判斷
if (!completedAbruptly) {
// 獲取線程池應(yīng)該存在的最小線程數(shù) 如果設(shè)置了超時(shí) 則是0拉宗,否則是核心線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果 min 是0 但是隊(duì)列又不為空,則 min 應(yīng)該是1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果當(dāng)前池中的有效線程數(shù)大于等于最小線程數(shù) 則不需要?jiǎng)?chuàng)建
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 創(chuàng)建線程
addWorker(null, false);
}
}
上面的代碼中調(diào)用了 tryTerminate() 方法辣辫,這個(gè)方法是用于終止線程池的旦事,又是一個(gè) for 循環(huán),從代碼結(jié)構(gòu)來(lái)看是異常情況的重試機(jī)制急灭。還是老方法姐浮,慢慢來(lái)看總共做了幾件事情
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果處于這三種情況不需要關(guān)閉線程池
// 1. Running 狀態(tài)
// 2. SHUTDOWN 狀態(tài)并且任務(wù)隊(duì)列不為空,不能終止
// 3. TIDYING 或者 TERMINATE 狀態(tài)葬馋,說(shuō)明已經(jīng)在關(guān)閉了 不需要重復(fù)關(guān)閉
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 進(jìn)入到關(guān)閉線程池的代碼卖鲤,如果線程池中還有線程,則需要打斷線程
if (workerCountOf(c) != 0) { // Eligible to terminate 可以關(guān)閉池子
// 打斷閑置線程畴嘶,只打斷一個(gè)
interruptIdleWorkers(ONLY_ONE);
return;
// 如果有兩個(gè)以上怎么辦蛋逾?只打斷一個(gè)?
// 這里只打斷一個(gè)是因?yàn)?worker 回收的時(shí)候都會(huì)進(jìn)入到該方法中來(lái)窗悯,可以回去再看看
// runWorker方法最后的代碼
}
// 線程已經(jīng)回收完畢区匣,準(zhǔn)備關(guān)閉線程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();// 加鎖
try {
// 將狀態(tài)改變?yōu)?TIDYING 并且即將調(diào)用 terminated
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 終止線程池
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 改變狀態(tài)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
// 如果終止失敗會(huì)重試
}
// else retry on failed CAS
}
}
嘗試終止線程池的代碼分析完了,好像就結(jié)束了~但作為好奇寶寶蟀瞧,我們是不是應(yīng)該看看如何打斷閑置線程沉颂,以及 terminated 中做了什么呢?來(lái)吧悦污,繼續(xù)裝逼
先來(lái)看打斷線程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖~
try {
// 遍歷線程池中的 wroker
for (Worker w : workers) {
Thread t = w.thread;
// 如果線程沒(méi)有被中斷铸屉,并且能夠獲取到 worker的鎖(說(shuō)明是閑置線程)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();// 中斷線程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只中斷一個(gè) worker 跳出循環(huán),否則會(huì)將所有的閑置線程都中斷
if (onlyOne)
break;
}
} finally {
mainLock.unlock();// 釋放鎖
}
有同學(xué)開(kāi)始裝逼了切端,說(shuō)我們是好奇寶寶彻坛,t.interrupt() 方法也應(yīng)該看,嗯~沒(méi)錯(cuò),但這里是調(diào)用了 native 方法昌屉,會(huì) c 的可以去看看裝逼钙蒙,我就算了~
好了,再來(lái)看看 terminate, 是不是很坑爹间驮? terminated 里面神躬厌!馬!也竞帽!沒(méi)扛施!干!屹篓。疙渣。。淡定堆巧,其實(shí)這個(gè)方法類似于 Activity 的生命周期方法妄荔,允許你在被終止時(shí)做一些事情,默認(rèn)的線程池沒(méi)有什么要做的事情谍肤,當(dāng)然什么也沒(méi)寫(xiě)啦~
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
異常處理
還記得前面講到啦租,出現(xiàn)各種異常情況,添加隊(duì)列失敗等等谣沸,只是籠統(tǒng)的說(shuō)了一句扔掉刷钢,當(dāng)然代碼實(shí)現(xiàn)不可能是簡(jiǎn)單一句扔掉就完了∪楦剑回到 execute() 方法中找到 reject() 任務(wù)内地,看看究竟是怎么處理的
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
還記得在創(chuàng)建線程池的時(shí)候,初始化了一個(gè) handler — RejectedExecutionHandler
這是一個(gè)接口赋除,只有一個(gè)方法,接收兩個(gè)參數(shù)
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
既然是一個(gè)接口阱缓,那么肯定有他的實(shí)現(xiàn)類,我們先不急著看所有實(shí)現(xiàn)類举农,先來(lái)看看這里的 handler 可能是什么荆针,記得在使用 Executors 獲取線程池調(diào)用構(gòu)造方法的時(shí)候并沒(méi)有傳入 handler 參數(shù),那么 ThreadPoolExecutor 應(yīng)該會(huì)有一個(gè)默認(rèn)的 handler
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
默認(rèn) handler 是 AbortPolicy ,這個(gè)類實(shí)現(xiàn)了 rejectedExecution() 方法颁糟,拋了一個(gè) Runtime 異常航背,也就是說(shuō)當(dāng)任務(wù)添加失敗,就會(huì)拋出異常棱貌。這個(gè)類在 AsyncTask 引發(fā)了一場(chǎng)血案~所以在 API19 以后修改了 AsyncTask 的部分代碼邏輯玖媚,這里就不細(xì)說(shuō)啦.
實(shí)際上,在 ThreadPoolExecutor 中除了 AbortPolicy 外還實(shí)現(xiàn)了三種不同類型的 handler
- CallerRunsPolicy — 在 線程池沒(méi)有 shutdown 的前提下婚脱,會(huì)直接在執(zhí)行 execute 方法的線程里執(zhí)行這個(gè)任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- DiscardPolicy — 啥也不干今魔,默默地丟掉任務(wù)~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy — 丟棄掉隊(duì)列中未執(zhí)行的勺像,最老的任務(wù),也就是任務(wù)隊(duì)列排頭的任務(wù)错森,然后再試圖在執(zhí)行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
歡迎關(guān)注我的公眾號(hào)