一、前言
線程池是開發(fā)中繞不開的一個知識點 改衩。
對于移動開發(fā)而言岖常,網(wǎng)絡框架、圖片加載燎字、AsyncTask腥椒、RxJava, 都和線程池有關。
正因為線程池應用如此廣泛候衍,所以也成了面試的高頻考點笼蛛。
我們今天就來講講線程池的基本原理和周邊知識。
先從線程的生命周期開始蛉鹿。
二滨砍、線程生命周期
線程是程序執(zhí)行流的最小單元。
Java線程可分為五個階段:
- 新建(New): 創(chuàng)建Thread對象妖异,并且未調用start()惋戏;
- 就緒(Runnable): 調用start()之后, 等待操作系統(tǒng)調度;
- 運行(Running): 獲取CPU時間分片他膳,執(zhí)行 run()方法中的代碼响逢;
- 阻塞(Blocked): 線程讓出CPU,進入等待(就緒)棕孙;
- 終止(Terminated): 自然退出或者被終止舔亭。
線程的創(chuàng)建和銷毀代價較高,當有大量的任務時蟀俊,可復用線程钦铺,以提高執(zhí)行任務的時間占比。
如上圖肢预,不斷地 Runnable->Runing->Blocked->Runnable, 就可避免過多的線程創(chuàng)建和銷毀矛洞。
此外,線程的上下文切換也是開銷比較大的烫映,若要使用線程池沼本,需注意設置合理的參數(shù)噩峦,控制線程并發(fā)。
三抽兆、ThreadPoolExecutor
JDK提供了一個很好用的線程池的封裝:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心線程大小
maximumPoolSize:線程池最大容量(需大于等于corePoolSize壕探,否則會拋異常)
keepAliveTime:線程執(zhí)行任務結束之后的存活時間
unit:時間單位
workQueue:任務隊列
threadFactory:線程工廠
handler:拒絕策略
線程池中有兩個任務容器:
private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
前者用于存儲Worker,后者用于緩沖任務(Runnable)郊丛。
下面是execute方法的簡要代碼:
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
}
// 若workQueue已滿,offer會返回false
if (isRunning(c) && workQueue.offer(command)) {
// ...
} else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize))
return false;
Worker w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
t.start();
}
一個任務到來瞧筛,假設此時容器workers中Worker數(shù)的數(shù)量為c厉熟,則
- 1、當c < corePoolSize時较幌,創(chuàng)建Worker來執(zhí)行這個任務揍瑟,并放入workers;
- 當c >= corePoolSize時,
- 2乍炉、若workQueue未滿绢片,則將任務放入workQueue
- 若workQueue已滿,
- 3岛琼、若c < maximumPoolSize,創(chuàng)建Worker來執(zhí)行這個任務底循,并放入workers;
- 4槐瑞、若c >= maximumPoolSize, 執(zhí)行拒絕策略熙涤。
很多人在講線程池的時候,干脆把workers說成“線程池”困檩,將Worker和線程混為一談祠挫;
不過這也無妨,能幫助理解就好悼沿,就像看到一杯水等舔,說“這是水”一樣,很少人會說這是“杯子裝著水”糟趾。
Worker和線程慌植,好比汽車和引擎:汽車裝著引擎,但汽車的行駛拉讯,其實是引擎在做功涤浇。
Worker本身實現(xiàn)了Runnable,然后有一個Thread和Runnable的成員魔慷;
構造函數(shù)中只锭,將自身(this)委托給自己的成員thread;
當thread.start()院尔, Worker的run()函數(shù)被回調蜻展,從而開啟 “執(zhí)行任務-獲取任務”的輪回喉誊。
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
}
}
private Runnable getTask() {
for (;;) {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
}
}
當線程執(zhí)行完任務(task.run()結束),會嘗試去workQueue取下一個任務纵顾,
如果workQueue已經(jīng)清空伍茄,則線程進入阻塞態(tài):workQueue是阻塞隊列,如果取不到元素會block當前線程施逾。
此時敷矫,allowCoreThreadTimeOut為true, 或者 n > corePoolSize,workQueue等待keepAliveTime的時間汉额,
如果時間到了還沒有任務進來曹仗, 則退出循環(huán), 線程銷毀蠕搜;
否則怎茫,一直等待,直到新的任務到來(或者線程池關閉)妓灌。
這就是線程池可以保留corePoolSize個線程存活的原理轨蛤。
從線程的角度,要么執(zhí)行任務虫埂,要么阻塞等待祥山,或者銷毀;
從任務的角度掉伏,要么馬上被執(zhí)行枪蘑,要么進入隊列等待被執(zhí)行,或者被拒絕執(zhí)行岖免。
上圖第2步岳颇,任務進入workQueue, 如果隊列為空且有空閑的Worker的話,可馬上得到執(zhí)行颅湘。
關于workQueue话侧,常用的有以下隊列:
-
LinkedBlockingQueue(capacity)
傳入capacity(大于0), 則LinkedBlockingQueue的容量為capacity;
如果不傳闯参,默認為Integer.MAX_VALUE瞻鹏,相當于無限容量(不考慮內存因素),多少元素都裝不滿鹿寨。 -
SynchronousQueue
除非另一個線程試圖獲取元素新博,否則不能添加元素。 -
DelayedWorkQueue
DelayedWorkQueue是一個優(yōu)先級的隊列脚草,用于ScheduledThreadPoolExecutor(執(zhí)行定時任務)赫悄;
其實現(xiàn)用到了最小堆,取出任務時,能夠從對頂獲取最早需要執(zhí)行的任務埂淮。
四姑隅、 ExecutorService
為了方便使用,JDK還封裝了一些常用的ExecutorService:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
值得注意的, 最后類 ExecutorService 返回的是ScheduledThreadPoolExecutor(ThreadPoolExecutor的子類)倔撞。
ScheduledThreadPoolExecutor的阻塞隊列是DelayedWorkQueue:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
}
類型 | 最大并發(fā) | 適用場景 |
---|---|---|
newFixedThreadPool | nThreads | 計算密集型任務 |
newSingleThreadExecutor | 1 | 串行執(zhí)行的任務 |
newCachedThreadPool | Integer.MAX_VALUE | IO密集型任務 |
newScheduledThreadPool | Integer.MAX_VALUE | 定時任務讲仰,周期任務 |
newSingleThreadExecutor 其實是 newFixedThreadPool的特例 (nThreads=1),
寫日志等任務痪蝇,比較適合串行執(zhí)行鄙陡,一者不會占用太多資源,二者為保證日志有序與完整躏啰,同一時間一個線程寫入即可柔吼。
眾多方法中,newCachedThreadPool() 是比較特別的丙唧,
1、corePoolSize = 0觅玻,
2想际、maximumPoolSize = Integer.MAX_VALUE,
3溪厘、workQueue 為 SynchronousQueue胡本。
結合上一節(jié)的分析:
當一個任務提交過來,由于corePoolSize = 0畸悬,任務會嘗試放入workQueue侧甫;
如果沒有線程在嘗試從workQueue獲取任務,offer()會返回false蹋宦,然后會創(chuàng)建線程執(zhí)行任務披粟;
如果有空閑線程在等待任務,任務可以放進workQueue冷冗,但是放進去后馬上就被等待任務的線程取走執(zhí)行了守屉。
總的來說,就是有空閑線程則交給空閑線程執(zhí)行蒿辙,沒有則創(chuàng)建線程執(zhí)行拇泛;
SynchronousQueue類型workQueue并不保存任務,只是一個傳遞者思灌。
所以俺叭,最終效果為:所有任務立即調度,無容量限制泰偿,無并發(fā)限制熄守。
這樣的特點比較適合網(wǎng)絡請求任務。
OkHttp的異步請求所用線程池與此類似(除了ThreadFactory ,其他參數(shù)一模一樣)柠横。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
五窃款、 任務并發(fā)的估算
一臺設備上,給定一批任務牍氛,要想最快時間完成所有任務晨继,并發(fā)量應該如何控制?
并發(fā)量太小搬俊,CPU利用率不高紊扬;
并發(fā)量太大,CPU 滿負荷唉擂,但是花在線程切換的時間增加餐屎,用于執(zhí)行任務的時間反而減少。
一些文章提到如下估算公式:
M:并發(fā)數(shù)玩祟;
C:任務占用CPU的時間键兜;
I:等待IO完成的時間(為簡化討論,且只考慮IO)凡泣;
N:CPU核心數(shù)赔嚎。
代入特定參數(shù)驗證這條公式:
1、比方說 I 接近于0转锈,則M≈N盘寡,一個線程對應一個CPU,剛好滿負荷且較少線程切換撮慨;
2竿痰、假如 I=C,則M = 2N砌溺,兩個線程對應一個CPU影涉,每個線程一半時間在等待IO,一半時間在計算规伐,也是剛好常潮。
遺憾的是,對于APP而言這條公式并不適用:
-
任務占用CPU時間和IO時間無法估算
APP上的異步任務通常是碎片化的楷力,而不同的任務性質不一樣喊式,有的計算耗時多,有的IO耗時多萧朝;
然后同樣是IO任務岔留,比方說網(wǎng)絡請求,IO時間也是不可估計的(受服務器和網(wǎng)速影響)检柬。 -
可用CPU核心可能會變化
有的設備可能會考慮省電或者熱量控制而關閉一些核心献联;
大家經(jīng)常吐槽的“一核有難竖配,九核圍觀”映射的就是這種現(xiàn)象。
雖然該公式不能直接套用來求解最大并發(fā)里逆,但仍有一些指導意義:
IO等待時間較多进胯,則需要高的并發(fā),來達到高的吞吐率原押;
CPU計算部分較多胁镐,則需要降低并發(fā),來提高CPU的利用率诸衔。
換言之盯漂,就是:
做計算密集型任務時控制并發(fā)小一點;
做IO密集型任務時控制并發(fā)大一點笨农。
問題來了就缆,小一點是多小,大一點又是多大呢谒亦?
說實話這個只能憑經(jīng)驗了竭宰,跟“多吃水果”,“加鹽少許”一樣份招,看實際情況而定切揭。
比如RxJava就提供了Schedulers.computation()和Schedulers.io(),
前者默認情況下為最大并發(fā)為CPU核心數(shù)脾还,后者最大并發(fā)為Integer.MAX_VALUE(相當于不限制并發(fā))。
可能是作者也不知道多少才合適入愧,所以干脆就不限制了鄙漏。
這樣其實很危險的,JVM對進程有最大線程數(shù)限制棺蛛,超過則會拋OutOfMemoryError怔蚌。
六、總結
回顧文章的內容旁赊,大概有這些點:
- 介紹了線程的生命周期桦踊;
- 從線程池的參數(shù)入手,分析這些參數(shù)是如何影響線程池的運作终畅;
- 列舉常用的ExecutorService籍胯,介紹其各自特點和適用場景;
- 對并發(fā)估算的一些理解离福。
文章沒有對Java線程池做太過深入的探討杖狼,而是從使用的角度講述基本原理和周邊知識;
第二節(jié)有結合關鍵代碼作簡要分析妖爷,也是點到為止蝶涩,目的在于加深對線程池相關參數(shù)的理解,
以便在平時使用線程池的時候合理斟酌,在閱讀涉及線程池的開源代碼時也能“知其所以然”绿聘。