前言
前兩天和粉絲聊天的時(shí)候,粉絲問(wèn)了我一個(gè)挺有意思的問(wèn)題悉稠,說(shuō)他之前在面試的時(shí)候被問(wèn)到線程池的線程復(fù)用原理宫蛆,當(dāng)時(shí)我跟他簡(jiǎn)單的說(shuō)了一下,沒(méi)想到過(guò)了幾天又來(lái)問(wèn)我這個(gè)問(wèn)題了的猛,說(shuō)他最近又被問(wèn)到了這個(gè)問(wèn)題.......想了想耀盗,干脆寫篇文章把這個(gè)東西講清楚吧想虎,滿滿的干貨都放在下面了
1.什么是線程復(fù)用?
在線程池中叛拷,通過(guò)同一個(gè)線程去執(zhí)行不同的任務(wù)舌厨,這就是線程復(fù)用。
假設(shè)現(xiàn)在有 100 個(gè)任務(wù)忿薇,我們創(chuàng)建一個(gè)固定線程的線程池(FixedThreadPool)裙椭,核心線程數(shù)和最大線程數(shù)都是 3,那么當(dāng)這個(gè) 100 個(gè)任務(wù)執(zhí)行完署浩,都只會(huì)使用三個(gè)線程揉燃。
示例:
public class FixedThreadPoolDemo {
static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "-> 執(zhí)行");
});
}
// 關(guān)閉線程池
executorService.shutdown();
}
}
執(zhí)行結(jié)果:
pool-1-thread-1-> 執(zhí)行
pool-1-thread-2-> 執(zhí)行
pool-1-thread-3-> 執(zhí)行
pool-1-thread-1-> 執(zhí)行
pool-1-thread-3-> 執(zhí)行
pool-1-thread-2-> 執(zhí)行
pool-1-thread-3-> 執(zhí)行
pool-1-thread-1-> 執(zhí)行
...
2.線程復(fù)用的原理
線程池將線程和任務(wù)進(jìn)行解耦,線程是線程瑰抵,任務(wù)是任務(wù)你雌,擺脫了之前通過(guò) Thread 創(chuàng)建線程時(shí)的一個(gè)線程必須對(duì)應(yīng)一個(gè)任務(wù)的限制。
在線程池中二汛,同一個(gè)線程可以從阻塞隊(duì)列中不斷獲取新任務(wù)來(lái)執(zhí)行婿崭,其核心原理在于線程池對(duì) Thread 進(jìn)行了封裝,并不是每次執(zhí)行任務(wù)都會(huì)調(diào)用 Thread.start() 來(lái)創(chuàng)建新線程肴颊,而是讓每個(gè)線程去執(zhí)行一個(gè)“循環(huán)任務(wù)”氓栈,在這個(gè)“循環(huán)任務(wù)”中不停的檢查是否有任務(wù)需要被執(zhí)行,如果有則直接執(zhí)行婿着,也就是調(diào)用任務(wù)中的 run 方法授瘦,將 run 方法當(dāng)成一個(gè)普通的方法執(zhí)行,通過(guò)這種方式將只使用固定的線程就將所有任務(wù)的 run 方法串聯(lián)起來(lái)竟宋。
3.線程池執(zhí)行流程
這部分內(nèi)容在 Java 線程池的各個(gè)參數(shù)的含義 討論過(guò)提完,這里我們?cè)購(gòu)?fù)習(xí)一次,再?gòu)闹腥チ私饩€程復(fù)用丘侠。
3.1 流程圖
3.2 線程創(chuàng)建的流程
當(dāng)任務(wù)提交之后徒欣,線程池首先會(huì)檢查當(dāng)前線程數(shù),如果當(dāng)前的線程數(shù)小于核心線程數(shù)(corePoolSize)蜗字,比如最開始創(chuàng)建的時(shí)候線程數(shù)為 0打肝,則新建線程并執(zhí)行任務(wù)。
當(dāng)提交的任務(wù)不斷增加挪捕,創(chuàng)建的線程數(shù)等于核心線程數(shù)(corePoolSize)粗梭,新增的任務(wù)會(huì)被添加到 workQueue 任務(wù)隊(duì)列中,等待核心線程執(zhí)行完當(dāng)前任務(wù)后断医,重新從 workQueue 中獲取任務(wù)執(zhí)行。
假設(shè)任務(wù)非常多孩锡,達(dá)到了 workQueue 的最大容量,但是當(dāng)前線程數(shù)小于最大線程數(shù)(maximumPoolSize)躬窜,線程池會(huì)在核心線程數(shù)(corePoolSize)的基礎(chǔ)上繼續(xù)創(chuàng)建線程來(lái)執(zhí)行任務(wù)浇垦。
假設(shè)任務(wù)繼續(xù)增加,線程池的線程數(shù)達(dá)到最大線程數(shù)(maximumPoolSize)荣挨,如果任務(wù)繼續(xù)增加,這個(gè)時(shí)候線程池就會(huì)采用拒絕策略來(lái)拒絕這些任務(wù)此虑。
在任務(wù)不斷增加的過(guò)程中,線程池會(huì)逐一進(jìn)行以下 4 個(gè)方面的判斷
核心線程數(shù)(corePoolSize)
任務(wù)隊(duì)列(workQueue)
最大線程數(shù)(maximumPoolSize)
拒絕策略
3.3 ThreadPoolExecutor#execute 源碼分析
java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
// 如果傳入的Runnable的空口锭,就拋出異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 線程池中的線程比核心線程數(shù)少
if (workerCountOf(c) < corePoolSize) {
// 新建一個(gè)核心線程執(zhí)行任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 核心線程已滿朦前,但是任務(wù)隊(duì)列未滿,添加到隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任務(wù)成功添加到隊(duì)列以后鹃操,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
if (! isRunning(recheck) && remove(command))
// 如果線程池處于非運(yùn)行狀態(tài)恩伺,并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功椰拒,則拒絕該任務(wù)
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
addWorker(null, false);
}
// 核心線程池已滿燃观,隊(duì)列已滿,嘗試創(chuàng)建一個(gè)非核心新的線程
else if (!addWorker(command, false))
// 如果創(chuàng)建新線程失敗缆毁,說(shuō)明線程池關(guān)閉或者線程池滿了,拒絕任務(wù)
reject(command);
}
3.4 逐行分析
//如果傳入的Runnable的空积锅,就拋出異常
if (command == null)
throw new NullPointerException();
execute 方法中通過(guò) if 語(yǔ)句判斷 command 缚陷,也就是 Runnable 任務(wù)是否等于 null,如果為 null 就拋出異常往核。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
判斷當(dāng)前線程數(shù)是否小于核心線程數(shù),如果小于核心線程數(shù)就調(diào)用 addWorker() 方法增加一個(gè) Worker虎锚,這里的 Worker 就可以理解為一個(gè)線程。
addWorker 方法的主要作用是在線程池中創(chuàng)建一個(gè)線程并執(zhí)行傳入的任務(wù)窜护,如果返回 true 代表添加成功,如果返回 false 代表添加失敗柱徙。
第一個(gè)參數(shù)表示傳入的任務(wù)
第二個(gè)參數(shù)是個(gè)布爾值,如果布爾值傳入 true 代表增加線程時(shí)判斷當(dāng)前線程是否少于 corePoolSize敌完,小于則增加新線程(核心線程)羊初,大于等于則不增加滨溉;同理长赞,如果傳入 false 代表增加線程時(shí)判斷當(dāng)前線程是否少于 maximumPoolSize,小于則增加新線程(非核心線程)涧卵,大于等于則不增加,所以這里的布爾值的含義是以核心線程數(shù)為界限還是以最大線程數(shù)為界限進(jìn)行是否新增非核心線程的判斷
這一段判斷相關(guān)源碼如下
private boolean addWorker(Runnable firstTask, boolean core) {
...
int wc = workerCountOf(c);//當(dāng)前工作線程數(shù)
//判斷當(dāng)前工作線程數(shù)>=最大線程數(shù) 或者 >=核心線程數(shù)(當(dāng)core = true)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
...
最核心的就是 core ? corePoolSize : maximumPoolSize 這個(gè)三目運(yùn)算伐脖。
// 核心線程已滿乐设,但是任務(wù)隊(duì)列未滿,添加到隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任務(wù)成功添加到隊(duì)列以后蠕啄,再次檢查是否需要添加新的線程戈锻,因?yàn)橐汛嬖诘木€程可能被銷毀了
if (! isRunning(recheck) && remove(command))
// 如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功格遭,則拒絕該任務(wù)
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果之前的線程已經(jīng)被銷毀完拒迅,新建一個(gè)非核心線程
addWorker(null, false);
}
如果代碼執(zhí)行到這里她倘,說(shuō)明當(dāng)前線程數(shù)大于或等于核心線程數(shù)或者 addWorker 失敗了作箍,那么就需要通過(guò)
if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態(tài)是否為 Running,如果線程池狀態(tài)是 Running 就通過(guò) workQueue.offer(command) 將任務(wù)放入任務(wù)隊(duì)列中胞得,
任務(wù)成功添加到隊(duì)列以后,再次檢查線程池狀態(tài)罩息,如果線程池不處于 Running 狀態(tài)个扰,說(shuō)明線程池被關(guān)閉,那么就移除剛剛添加到任務(wù)隊(duì)列中的任務(wù)递宅,并執(zhí)行拒絕策略,代碼如下:
if (! isRunning(recheck) && remove(command))
// 如果線程池處于非運(yùn)行狀態(tài)烘绽,并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功俐填,則拒絕該任務(wù)
reject(command);
下面我們?cè)賮?lái)看后一個(gè) else 分支:
else if (workerCountOf(recheck) == 0)
// 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
addWorker(null, false);
進(jìn)入這個(gè) else 說(shuō)明前面判斷到線程池狀態(tài)為 Running盏檐,那么當(dāng)任務(wù)被添加進(jìn)來(lái)之后就需要防止沒(méi)有可執(zhí)行線程的情況發(fā)生(比如之前的線程被回收了或意外終止了)驶悟,所以此時(shí)如果檢查當(dāng)前線程數(shù)為 0,也就是 workerCountOf(recheck) == 0痕鳍,那就執(zhí)行 addWorker() 方法新建一個(gè)非核心線程。
我們?cè)賮?lái)看最后一部分代碼:
// 核心線程池已滿熊响,隊(duì)列已滿诗赌,嘗試創(chuàng)建一個(gè)非核心新的線程
else if (!addWorker(command, false))
// 如果創(chuàng)建新線程失敗,說(shuō)明線程池關(guān)閉或者線程池滿了剔难,拒絕任務(wù)
reject(command);
執(zhí)行到這里奥喻,說(shuō)明線程池不是 Running 狀態(tài),又或者線程數(shù) >= 核心線程數(shù)并且任務(wù)隊(duì)列已經(jīng)滿了环鲤,根據(jù)規(guī)則,此時(shí)需要添加新線程吵冒,直到線程數(shù)達(dá)到“最大線程數(shù)”西剥,所以此時(shí)就會(huì)再次調(diào)用 addWorker 方法并將第二個(gè)參數(shù)傳入 false,傳入 false 代表增加非核心線程瞭空。
addWorker 方法如果返回 true 代表添加成功,如果返回 false 代表任務(wù)添加失敗南捂,說(shuō)明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize旧找,然后執(zhí)行拒絕策略 reject 方法。
如果執(zhí)行到這里線程池的狀態(tài)不是 Running鞭缭,那么 addWorker 會(huì)失敗并返回 false愿卒,所以也會(huì)執(zhí)行拒絕策略 reject 方法。
4.線程復(fù)用源碼分析
java.util.concurrent.ThreadPoolExecutor#runWorker
省略掉部分和復(fù)用無(wú)關(guān)的代碼之后琼开,代碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 釋放鎖 設(shè)置work的state=0 允許中斷
boolean completedAbruptly = true;
try {
//一直執(zhí)行 如果task不為空 或者 從隊(duì)列中獲取的task不為空
while (task != null || (task = getTask()) != null) {
task.run();//執(zhí)行task中的run方法
}
}
completedAbruptly = false;
} finally {
//1.將 worker 從數(shù)組 workers 里刪除掉
//2.根據(jù)布爾值 allowCoreThreadTimeOut 來(lái)決定是否補(bǔ)充新的 Worker 進(jìn)數(shù)組 workers
processWorkerExit(w, completedAbruptly);
}
}
可以看到柜候,實(shí)現(xiàn)線程復(fù)用的邏輯主要在一個(gè)不停循環(huán)的 while 循環(huán)體中。
通過(guò)獲取 Worker 的 firstTask 或者通過(guò) getTask 方法從 workQueue 中獲取待執(zhí)行的任務(wù)
直接通過(guò) task.run() 來(lái)執(zhí)行具體的任務(wù)(而不是新建線程)
在這里渣刷,我們找到了線程復(fù)用最終的實(shí)現(xiàn),通過(guò)取 Worker 的 firstTask 或者 getTask 方法從 workQueue 中取出了新任務(wù)箩溃,并直接調(diào)用 Runnable 的 run 方法來(lái)執(zhí)行任務(wù),也就是如之前所說(shuō)的涣旨,每個(gè)線程都始終在一個(gè)大循環(huán)中霹陡,反復(fù)獲取任務(wù)和蚪,然后執(zhí)行任務(wù)烹棉,從而實(shí)現(xiàn)了線程的復(fù)用。
總結(jié)
這篇關(guān)于線程池的線程復(fù)用原理的文章就到這里了催束,大家看完有什么不懂的歡迎在下方留言評(píng)論辅髓,也可以私信問(wèn)我,我看到了一般都會(huì)回復(fù)的!