前言
原以為線程池還挺簡單的(平時常用钮蛛,也分析過原理)洒忧,這次是想自己動手寫一個線程池來更加深入的了解它;但在動手寫的過程中落地到細節(jié)時發(fā)現(xiàn)并沒想的那么容易咏尝。結(jié)合源碼對比后確實不得不佩服 Doug Lea
。
我覺得大部分人直接去看 java.util.concurrent.ThreadPoolExecutor
的源碼時都是看一個大概啸罢,因為其中涉及到了許多細節(jié)處理编检,還有部分 AQS
的內(nèi)容,所以想要理清楚具體細節(jié)并不是那么容易扰才。
與其挨個分析源碼不如自己實現(xiàn)一個簡版允懂,當(dāng)然簡版并不意味著功能缺失,需要保證核心邏輯一致衩匣。
所以也是本篇文章的目的:
自己動手寫一個五臟俱全的線程池蕾总,同時會了解到線程池的工作原理,以及如何在工作中合理的利用線程池琅捏。
再開始之前建議對線程池不是很熟悉的朋友看看這幾篇:
這里我截取了部分內(nèi)容生百,也許可以埋個伏筆(坑)。
具體請看這兩個鏈接柄延。
由于篇幅限制蚀浆,本次可能會分為上下兩篇。
創(chuàng)建線程池
現(xiàn)在進入正題搜吧,新建了一個 CustomThreadPool
類,它的工作原理如下:
簡單來說就是往線程池里邊丟任務(wù)市俊,丟的任務(wù)會緩沖到隊列里;線程池里存儲的其實就是一個個的 Thread
滤奈,他們會一直不停的從剛才緩沖的隊列里獲取任務(wù)執(zhí)行摆昧。
流程還是挺簡單。
先來看看我們這個自創(chuàng)的線程池的效果如何吧:
初始化了一個核心為3僵刮、最大線程數(shù)為5据忘、隊列大小為 4 的線程池鹦牛。
先往其中丟了 10 個任務(wù),由于阻塞隊列的大小為 4 勇吊,最大線程數(shù)為 5 曼追,所以由于隊列里緩沖不了最終會創(chuàng)建 5 個線程(上限)。
過段時間沒有任務(wù)提交后(sleep
)則會自動縮容到三個線程(保證不會小于核心線程數(shù))汉规。
構(gòu)造函數(shù)
來看看具體是如何實現(xiàn)的礼殊。
下面則是這個線程池的構(gòu)造函數(shù):
會有以下幾個核心參數(shù):
-
miniSize
最小線程數(shù),等效于ThreadPool
中的核心線程數(shù)针史。 -
maxSize
最大線程數(shù)晶伦。 -
keepAliveTime
線程保活時間啄枕。 -
workQueue
阻塞隊列婚陪。 -
notify
通知接口。
大致上都和 ThreadPool
中的參數(shù)相同频祝,并且作用也是類似的泌参。
需要注意的是其中初始化了一個 workers
成員變量:
/**
* 存放線程池
*/
private volatile Set<Worker> workers;
public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
workers = new ConcurrentHashSet<>();
}
workers
是最終存放線程池中運行的線程,在 j.u.c
源碼中是一個 HashSet
所以對他所有的操作都是需要加鎖常空。
我這里為了簡便起見就自己定義了一個線程安全的 Set
稱為 ConcurrentHashSet
沽一。
其實原理也非常簡單,和 HashSet
類似也是借助于 HashMap
來存放數(shù)據(jù)漓糙,利用其 key
不可重復(fù)的特性來實現(xiàn) set
铣缠,只是這里的 HashMap
是用并發(fā)安全的 ConcurrentHashMap
來實現(xiàn)的。
這樣就能保證對它的寫入昆禽、刪除都是線程安全的蝗蛙。
不過由于 ConcurrentHashMap
的 size()
函數(shù)并不準(zhǔn)確,所以我這里單獨利用了一個 AtomicInteger
來統(tǒng)計容器大小为狸。
創(chuàng)建核心線程
往線程池中丟一個任務(wù)的時候其實要做的事情還蠻多的歼郭,最重要的事情莫過于創(chuàng)建線程存放到線程池中了。
當(dāng)然我們不能無限制的創(chuàng)建線程辐棒,不然拿線程池來就沒任何意義了病曾。于是 miniSize maxSize
這兩個參數(shù)就有了它的意義。
但這兩個參數(shù)再哪一步的時候才起到作用呢漾根?這就是首先需要明確的泰涂。
從這個流程圖可以看出第一步是需要判斷是否大于核心線程數(shù),如果沒有則創(chuàng)建辐怕。
結(jié)合代碼可以發(fā)現(xiàn)在執(zhí)行任務(wù)的時候會判斷是否大于核心線程數(shù)逼蒙,從而創(chuàng)建線程。
worker.startTask()
執(zhí)行任務(wù)部分放到后面分析寄疏。
這里的 miniSize
由于會在多線程場景下使用是牢,所以也用 volatile
關(guān)鍵字來保證可見性僵井。
隊列緩沖
結(jié)合上面的流程圖,第二步自然是要判斷隊列是否可以存放任務(wù)(是否已滿)驳棱。
優(yōu)先會往隊列里存放批什。
上至封頂
一旦寫入失敗則會判斷當(dāng)前線程池的大小是否大于最大線程數(shù),如果沒有則繼續(xù)創(chuàng)建線程執(zhí)行社搅。
不然則執(zhí)行會嘗試阻塞寫入隊列(j.u.c
會在這里執(zhí)行拒絕策略)
以上的步驟和剛才那張流程圖是一樣的驻债,這樣大家是否有看出什么坑嘛?
時刻小心
從上面流程圖的這兩步可以看出會直接創(chuàng)建新的線程形葬。
這個過程相對于中間直接寫入阻塞隊列的開銷是非常大的合呐,主要有以下兩個原因:
- 創(chuàng)建線程會加鎖,雖說最終用的是 ConcurrentHashMap 的寫入函數(shù)笙以,但依然存在加鎖的可能淌实。
- 會創(chuàng)建新的線程,創(chuàng)建線程還需要調(diào)用操作系統(tǒng)的 API 開銷較大源织。
所以理想情況下我們應(yīng)該避免這兩步翩伪,盡量讓丟入線程池中的任務(wù)進入阻塞隊列中。
執(zhí)行任務(wù)
任務(wù)是添加進來了谈息,那是如何執(zhí)行的?
在創(chuàng)建任務(wù)的時候提到過 worker.startTask()
函數(shù):
/**
* 添加任務(wù)凛剥,需要加鎖
* @param runnable 任務(wù)
*/
private void addWorker(Runnable runnable) {
Worker worker = new Worker(runnable, true);
worker.startTask();
workers.add(worker);
}
也就是在創(chuàng)建線程執(zhí)行任務(wù)的時候會創(chuàng)建 Worker
對象侠仇,利用它的 startTask()
方法來執(zhí)行任務(wù)。
所以先來看看 Worker
對象是長啥樣的:
其實他本身也是一個線程犁珠,將接收到需要執(zhí)行的任務(wù)存放到成員變量 task
處逻炊。
而其中最為關(guān)鍵的則是執(zhí)行任務(wù) worker.startTask()
這一步驟。
public void startTask() {
thread.start();
}
其實就是運行了 worker
線程自己犁享,下面來看 run
方法余素。
- 第一步是將創(chuàng)建線程時傳過來的任務(wù)執(zhí)行(
task.run
),接著會一直不停的從隊列里獲取任務(wù)執(zhí)行,直到獲取不到新任務(wù)了炊昆。 - 任務(wù)執(zhí)行完畢后將內(nèi)置的計數(shù)器 -1 桨吊,方便后面任務(wù)全部執(zhí)行完畢進行通知。
- worker 線程獲取不到任務(wù)后退出凤巨,需要將自己從線程池中釋放掉(
workers.remove(this)
)视乐。
從隊列里獲取任務(wù)
其實 getTask
也是非常關(guān)鍵的一個方法,它封裝了從隊列中獲取任務(wù)敢茁,同時對不需要庇拥恚活的線程進行回收。
很明顯彰檬,核心作用就是從隊列里獲取任務(wù)伸刃;但有兩個地方需要注意:
- 當(dāng)線程數(shù)超過核心線程數(shù)時谎砾,在獲取任務(wù)的時候需要通過保活時間從隊列里獲取任務(wù)捧颅;一旦獲取不到任務(wù)則隊列肯定是空的景图,這樣返回
null
之后在上文的run()
中就會退出這個線程;從而達到了回收線程的目的隘道,也就是我們之前演示的效果
- 這里需要加鎖症歇,加鎖的原因是這里肯定會出現(xiàn)并發(fā)情況,不加鎖會導(dǎo)致
workers.size() > miniSize
條件多次執(zhí)行谭梗,從而導(dǎo)致線程被全部回收完畢忘晤。
關(guān)閉線程池
最后來談?wù)劸€程關(guān)閉的事;
還是以剛才那段測試代碼為例,如果提交任務(wù)后我們沒有關(guān)閉線程激捏,會發(fā)現(xiàn)即便是任務(wù)執(zhí)行完畢后程序也不會退出设塔。
從剛才的源碼里其實也很容易看出來,不退出的原因是 Worker
線程一定還會一直阻塞在 task = workQueue.take();
處远舅,即便是線程縮容了也不會小于核心線程數(shù)闰蛔。
通過堆棧也能證明:
恰好剩下三個線程阻塞于此處。
而關(guān)閉線程通常又有以下兩種:
- 立即關(guān)閉:執(zhí)行關(guān)閉方法后不管現(xiàn)在線程池的運行狀況图柏,直接一刀切全部停掉序六,這樣會導(dǎo)致任務(wù)丟失。
- 不接受新的任務(wù)蚤吹,同時等待現(xiàn)有任務(wù)執(zhí)行完畢后退出線程池例诀。
立即關(guān)閉
我們先來看第一種立即關(guān)閉
:
/**
* 立即關(guān)閉線程池,會造成任務(wù)丟失
*/
public void shutDownNow() {
isShutDown.set(true);
tryClose(false);
}
/**
* 關(guān)閉線程池
*
* @param isTry true 嘗試關(guān)閉 --> 會等待所有任務(wù)執(zhí)行完畢
* false 立即關(guān)閉線程池--> 任務(wù)有丟失的可能
*/
private void tryClose(boolean isTry) {
if (!isTry) {
closeAllTask();
} else {
if (isShutDown.get() && totalTask.get() == 0) {
closeAllTask();
}
}
}
/**
* 關(guān)閉所有任務(wù)
*/
private void closeAllTask() {
for (Worker worker : workers) {
//LOGGER.info("開始關(guān)閉");
worker.close();
}
}
public void close() {
thread.interrupt();
}
很容易看出裁着,最終就是遍歷線程池里所有的 worker
線程挨個執(zhí)行他們的中斷函數(shù)繁涂。
我們來測試一下:
可以發(fā)現(xiàn)后面丟進去的三個任務(wù)其實是沒有被執(zhí)行的。
完事后關(guān)閉
而正常關(guān)閉則不一樣:
/**
* 任務(wù)執(zhí)行完畢后關(guān)閉線程池
*/
public void shutdown() {
isShutDown.set(true);
tryClose(true);
}
他會在這里多了一個判斷二驰,需要所有任務(wù)都執(zhí)行完畢之后才會去中斷線程扔罪。
同時在線程需要回收時都會嘗試關(guān)閉線程:
來看看實際效果:
回收線程
上文或多或少提到了線程回收的事情,其實總結(jié)就是以下兩點:
- 一旦執(zhí)行了
shutdown/shutdownNow
方法都會將線程池的狀態(tài)置為關(guān)閉狀態(tài)桶雀,這樣只要worker
線程嘗試從隊列里獲取任務(wù)時就會直接返回空矿酵,導(dǎo)致worker
線程被回收。
- 一旦線程池大小超過了核心線程數(shù)就會使用北撤福活時間來從隊列里獲取任務(wù)坏瘩,所以一旦獲取不到返回
null
時就會觸發(fā)回收。
但如果我們的隊列足夠大漠魏,導(dǎo)致線程數(shù)都不會超過核心線程數(shù)倔矾,這樣是不會觸發(fā)回收的。
比如這里我將隊列大小調(diào)為 10 ,這樣任務(wù)就會累計在隊列里哪自,不會創(chuàng)建五個 worker
線程丰包。
所以一直都是 Thread-1~3
這三個線程在反復(fù)調(diào)度任務(wù)。
總結(jié)
本次實現(xiàn)了線程池里大部分核心功能壤巷,我相信只要看完并動手敲一遍一定會對線程池有不一樣的理解邑彪。
結(jié)合目前的內(nèi)容來總結(jié)下:
- 線程池、隊列大小要設(shè)計的合理胧华,盡量的讓任務(wù)從隊列中獲取執(zhí)行寄症。
- 慎用
shutdownNow()
方法關(guān)閉線程池,會導(dǎo)致任務(wù)丟失(除非業(yè)務(wù)允許)矩动。 - 如果任務(wù)多有巧,線程執(zhí)行時間短可以調(diào)大
keepalive
值,使得線程盡量不被回收從而可以復(fù)用線程悲没。
同時下次會分享一些線程池的新特性篮迎,如:
- 執(zhí)行帶有返回值的線程。
- 異常處理怎么辦示姿?
- 所有任務(wù)執(zhí)行完怎么通知我甜橱?
本文所有源碼:
你的點贊與分享是對我最大的支持