轉自http://blog.luoyuanhang.com/2017/02/27/thread-pool-in-java-2/
這是【從0到1學習Java線程池】系列文章的第 貳 篇蔚袍,該系列文章總共三篇阴绢,介紹了 Java 線程池的使用以及原理十艾,并且最后會實現(xiàn)一個基本的線程池仙蚜。本篇文章介紹了 Java 線程池的原理椭微。
【從0到1學習Java線程池】系列文章共有3篇,目錄如下:
在上一篇文章中(【從0到1學習Java線程池】Java線程池的簡介以及使用)疤孕,我們總結了線程池的3個優(yōu)點:
- 線程復用
- 控制最大并發(fā)數(shù)
- 管理線程
這篇文章會分別從這三個方面传蹈,結合具體的代碼實現(xiàn)來剖析 Java 線程池的原理以及它的具體實現(xiàn)。
線程復用
我們知道線程池的一個作用是創(chuàng)建和銷毀線程的次數(shù)习寸,每個工作線程可以多次使用胶惰。這個功能就是線程復用。想要了解 Java 線程池是如何進行線程復用的霞溪,我們首先需要了解線程的生命周期孵滞。
線程生命周期
下圖描述了線程完整的生命周期:
在一個線程完整的生命周期中中捆,它可能經歷五種狀態(tài):新建(New)、就緒(Runnable)坊饶、運行(Running)泄伪、阻塞(Blocked)、終止(Zombie)幼东。
在 Java中臂容,Thread 通過new來新建一個線程,這個過程是是初始化一些線程信息根蟹,如線程名脓杉、id、線程所屬group等简逮,可以認為只是個普通的對象球散。調用Thread的start()
后Java虛擬機會為其創(chuàng)建方法調用棧和程序計數(shù)器,同時將hasBeenStarted
為true散庶,之后如果再次調用start()
方法就會有異常蕉堰。
處于這個狀態(tài)中的線程并沒有開始運行,只是表示該線程可以運行了悲龟。至于該線程何時開始運行屋讶,取決于 JVM 里線程調度器的調度。當線程獲取CPU后须教,run()
方法會被調用皿渗。不要自己去調用Thread的run()
方法。之后根據(jù)CPU的調度轻腺,線程就會在就緒—運行—阻塞間切換乐疆,直到run()
方法結束或其他方式停止線程,進入終止狀態(tài)贬养。
因此挤土,如果要實現(xiàn)線程的復用,我們必須要保證線程池中的線程保持存活狀態(tài)(就緒误算、運行仰美、阻塞)。接下來儿礼,我們就來看看ThreadPoolExecutor
是如何實現(xiàn)線程復用的筒占。
Worker 類
ThreadPoolExecutor
主要是通過一個類來控制線程復用的:Worker 類。
我們來看一下簡化后的 Worker 類代碼:
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}
……
}
從代碼中蜘犁,我們可以看到 Worker 實現(xiàn)了 Runnable 接口翰苫,并且它還有一個 Thread成員變量 thread,這個 thread 就是要開啟運行的線程。我們看到 Worker 的構造方法中傳遞了一個 Runnable 參數(shù)奏窑,同時它把自己作為參數(shù)傳入 newThread()
导披,這樣的話,當 Thread 的start()
方法得到調用時埃唯,執(zhí)行的其實是 Worker 的run()
方法撩匕,即runWorker()
方法。
runWorker()
方法之中有一個 while 循環(huán)墨叛,使用 getTask()
來獲取任務止毕,并執(zhí)行。接下來漠趁,我們將會看到getTask()
是如何獲取到 Runnable 對象的扁凛。
getTask()
我們來看一下簡化后的getTask()
代碼:
private Runnable getTask() {
if(一些特殊情況) {
return null;
}
Runnable r = workQueue.take();
return r;
}
我們可以看到任務是從 workQueue中獲取的,這個 workQueue 就是我們初始化 ThreadPoolExecutor 時存放任務的 BlockingQueue隊列闯传,這個隊列里的存放的都是將要執(zhí)行的 Runnable任務谨朝。因為 BlockingQueue 是個阻塞隊列,BlockingQueue.take()
返回的是空甥绿,則進入等待狀態(tài)直到 BlockingQueue 有新的對象被加入時喚醒阻塞的線程字币。所以一般情況下,Thread的run()方法不會結束共缕,而是不斷執(zhí)行workQueue里的Runnable任務洗出,這就達到了線程復用的目的了。
控制最大并發(fā)數(shù)
我們現(xiàn)在已經知道了 Java 線程池是如何做到線程復用的了图谷,但是Runnable 是什么時候被放入 workQueue 隊列中的呢翩活,Worker里的Thread的又是什么時候調用start()
開啟新線程來執(zhí)行Worker的run()方法的呢?從上面的分析中我們可以看出Worker里的runWorker()
執(zhí)行任務時是一個接一個蜓萄,串行進行的隅茎,那并發(fā)是怎么體現(xiàn)的呢澄峰?它又是如何做到控制最大并發(fā)數(shù)的呢嫉沽?
execute()
通過查看 execute()
就能解答上述的一些問題,同樣是簡化后的代碼:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 當前線程數(shù) < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接啟動新的線程俏竞。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活動線程數(shù) >= corePoolSize
// runState為RUNNING && 隊列未滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢驗是否為RUNNING狀態(tài)
// 非RUNNING狀態(tài) 則從workQueue中移除任務并拒絕
if (!isRunning(recheck) && remove(command))
reject(command);
// 采用線程池指定的策略拒絕任務
// 兩種情況:
// 1.非RUNNING狀態(tài)拒絕新的任務
// 2.隊列滿了啟動新的線程失敵袼丁(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
addWorker()
我們再來看一下addWorker()
的簡化代碼:
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
根據(jù)上面的代碼,線程池工作過程中是如何添加任務的就很清晰了:
- 如果正在運行的線程數(shù)量小于 corePoolSize魂毁,那么馬上創(chuàng)建線程運行這個任務玻佩;
- 如果正在運行的線程數(shù)量大于或等于 corePoolSize,那么將這個任務放入隊列席楚;
- 如果這時候隊列滿了咬崔,而且正在運行的線程數(shù)量小于 maximumPoolSize,那么還是要創(chuàng)建非核心線程立刻運行這個任務;
- 如果隊列滿了垮斯,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize郎仆,那么線程池會拋出異常RejectExecutionException
如果通過addWorker()
成功創(chuàng)建新的線程,則通過start()
開啟新線程兜蠕,同時將firstTask作為這個Worker里的run()
中執(zhí)行的第一個任務扰肌。雖然每個Worker的任務是串行處理,但如果創(chuàng)建了多個Worker熊杨,因為共用一個workQueue曙旭,所以就會并行處理了。所以可以根據(jù)corePoolSize和maximumPoolSize來控制最大并發(fā)數(shù)晶府。
過程如下圖所示:
一個例子
如果是做 Android 開發(fā)的桂躏,并且對 Handler 原理比較熟悉,你可能會覺得這個圖挺熟悉郊霎,其中的一些過程和Handler沼头,Looper,Meaasge使用中书劝,很相似进倍。Handler.send(Message)
相當于execute(Runnuble)
,Looper中維護的Meaasge隊列相當于BlockingQueue购对,只不過需要自己通過同步來維護這個隊列猾昆,Looper中的loop()
函數(shù)循環(huán)從Meaasge隊列取Meaasge和Worker中的runWork()
不斷從BlockingQueue取Runnable是同樣的道理。
管理線程
上邊的文章已經講了骡苞,通過線程池可以很好的管理線程的復用垂蜗,控制并發(fā)數(shù),以及銷毀等過程解幽,而線程的管理過程已經穿插在其中了贴见,也很好理解。
在 ThreadPoolExecutor 有個AtomicInteger變量 ctl躲株,這一個變量保存了兩個內容:
- 所有線程的數(shù)量
- 每個線程所處的狀態(tài)
其中低29位存線程數(shù)片部,高3位存runState,通過位運算來得到不同的值霜定。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到線程的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//得到Worker的的數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 判斷線程是否在運行
private static boolean isRunning(int c) { return c < SHUTDOWN; }
這里主要通過shutdown和shutdownNow()來分析線程池的關閉過程档悠。首先線程池有五種狀態(tài)來控制任務添加與執(zhí)行。主要介紹以下三種:
- RUNNING狀態(tài):線程池正常運行望浩,可以接受新的任務并處理隊列中的任務辖所;
- SHUTDOWN狀態(tài):不再接受新的任務,但是會執(zhí)行隊列中的任務磨德;
- STOP狀態(tài):不再接受新任務缘回,不處理隊列中的任務
shutdown()
這個方法會將runState置為SHUTDOWN,會終止所有空閑的線程,而仍在工作的線程不受影響酥宴,所以隊列中的任務人會被執(zhí)行揩环;shutdownNow()
方法將runState置為STOP。和shutdown()
方法的區(qū)別是幅虑,這個方法會終止所有的線程丰滑,所以隊列中的任務也不會被執(zhí)行了。