一模聋、為何要使用線程池
在Java中肩民,要使用多線程,除了使用new Thread()
之外链方,還可以使用線程池ExecutorService
持痰。
// 使用Thread
Thread t = new Thread(new Runnable() {
@Override
public void run() {
// ...
}
});
t.start();
// 使用線程池
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(new Runnable() {
@Override
public void run() {
// ...
}
});
線程池主要解決了兩個(gè)問(wèn)題:
- 頻繁創(chuàng)建銷毀線程的開(kāi)銷
- 任務(wù)的管理
在異步任務(wù)比較多時(shí),創(chuàng)建祟蚀、銷毀線程會(huì)占用很多系統(tǒng)資源工窍;這時(shí)候,使用線程池前酿,就可以實(shí)現(xiàn)線程的復(fù)用患雏,讓人專注于任務(wù)的實(shí)現(xiàn),而不是管理線程罢维。
二淹仑、線程池簡(jiǎn)介
1. 什么是線程池
線程池(本文特指ThreadPoolExecutor
類)顧名思義,就是一個(gè)裝了線程的池子。線程池創(chuàng)建和管理若干線程匀借,在需要使用的時(shí)候可以直接從線程池中取出來(lái)使用颜阐,在任務(wù)結(jié)束之后閑置等待復(fù)用,或者銷毀吓肋。
線程池中的線程分為兩種:核心線程和普通線程凳怨。核心線程即線程池中長(zhǎng)期存活的線程,即使閑置下來(lái)也不會(huì)被銷毀是鬼,需要使用的時(shí)候可以直接拿來(lái)用肤舞。而普通線程則有一定的壽命,如果閑置時(shí)間超過(guò)壽命均蜜,則這個(gè)線程就會(huì)被銷毀李剖。
查看ThreadPoolExecutor
類的其中一個(gè)典型的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
線程池的具體行為和幾個(gè)參數(shù)有關(guān):
-
核心數(shù) corePoolSize
線程池中核心線程的數(shù)量。 -
最大容量 maximumPoolSize
線程池最大允許保留多少線程兆龙。 -
超時(shí)時(shí)間 keepAliveTime
線程池中普通線程的存活時(shí)間。
2. 線程池的使用
線程池的一般使用步驟如下:
- 使用
Executors
中的工廠方法來(lái)獲取ExecutorService
實(shí)例敲董; - 使用
ExecutorService
的execute(runnable)
或者submit(runnable)
方法來(lái)添加任務(wù)紫皇。
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(new Runnable() {
@Override
public void run() {
String response = new HttpUtil().get("http://littlefogcat.top");
System.out.println(response);
}
});
3. 線程池的分類
在Executors
工廠類中提供了多種線程池,典型的有以下四種:
1. SingleThreadExecutor 單線程線程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心線程數(shù)為1腋寨,最大線程數(shù)為1聪铺,也就是說(shuō)SingleThreadExecutor
這個(gè)線程池中的線程數(shù)固定為1。使用場(chǎng)景:當(dāng)多個(gè)任務(wù)都需要訪問(wèn)同一個(gè)資源的時(shí)候萄窜。
2. FixedThreadPool 固定容量線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心線程數(shù)為n铃剔,最大線程數(shù)為n。使用場(chǎng)景:明確同時(shí)執(zhí)行任務(wù)數(shù)量時(shí)查刻。
3. CachedThreadPool 緩存線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心線程數(shù)為0键兜,最大線程數(shù)無(wú)上限,線程超時(shí)時(shí)間60秒穗泵。使用場(chǎng)景:處理大量耗時(shí)較短的任務(wù)普气。
4. ScheduledThreadPool 定時(shí)線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/*
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
*/
核心線程數(shù)自定,最大線程數(shù)無(wú)上限佃延。使用場(chǎng)景:處理延時(shí)任務(wù)现诀。
可以看到,這四個(gè)方法都返回了一個(gè)ThreadPoolExecutor
對(duì)象(ScheduledThreadPoolExecutor
是其子類)履肃,僅僅是其中的參數(shù)略有不同仔沿。所以接下來(lái)就對(duì)ThreadPoolExecutor
類進(jìn)行解析。
我將這四種常見(jiàn)的線程池總結(jié)了一個(gè)表格:
三尺棋、線程池的工作流程
1. 典型的線程池使用方式
一個(gè)典型的線程池使用方式如下:
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Runnable() {
@Override
public void run() {
// do sth
}
});
這里就以ThreadPoolExecutor.execute(runnable)
方法切入封锉,分析線程池的工作流程。
在ThreadPoolExecutor.execute(runnable)
方法的注釋中寫道:
Executes the given task sometime in the future. The task
may execute in a new thread or in an existing pooled thread.
If the task cannot be submitted for execution, either because this
executor has been shutdown or because its capacity has been reached,
the task is handled by the current {@code RejectedExecutionHandler}.
簡(jiǎn)單來(lái)說(shuō)就是將這個(gè)傳入的runnable對(duì)象提交到線程池中,等待執(zhí)行烘浦;如果線程池關(guān)閉抖坪,或者容量到上限不可以執(zhí)行了,那么就無(wú)法提交闷叉,會(huì)交給線程池的RejectedExecutionHandler
進(jìn)行處理(這個(gè)RejectedExecutionHandler
在構(gòu)造方法中傳入擦俐,或者通過(guò)setRejectedExecutionHandler(handler)
方法指定)。
2. 線程池工作流程
線程池的工作流程還是比較清晰的握侧,具體的源碼分析在第四節(jié)中蚯瞧,本節(jié)只做簡(jiǎn)要說(shuō)明。
2.1 添加任務(wù)
當(dāng)調(diào)用ThreadPoolExecutor.execute(runnable)
的時(shí)候品擎,會(huì)進(jìn)行以下判斷(這里不考慮延時(shí)任務(wù)):
- 如果線程池中埋合,運(yùn)行的線程數(shù)少于核心線程數(shù)(corePoolSize),那么就新建一個(gè)線程萄传,并執(zhí)行該任務(wù)甚颂。
- 如果線程池中,運(yùn)行的線程數(shù)大于等于corePoolSize秀菱,將線程添加到待執(zhí)行隊(duì)列中振诬,等待執(zhí)行;
- 如果2中添加到隊(duì)列失敗衍菱,那么就新建一個(gè)非核心線程赶么,并在該線程執(zhí)行該任務(wù);
- 如果當(dāng)前線程數(shù)已經(jīng)達(dá)到最大線程數(shù)(maximumPoolSize)脊串,那么拒絕這個(gè)任務(wù)辫呻。
這里有個(gè)問(wèn)題,什么情況下琼锋,任務(wù)會(huì)添加失敗呢放闺?這個(gè)問(wèn)題會(huì)在下面第四節(jié)源碼分析中workQueue部分說(shuō)明。
2.2 執(zhí)行任務(wù)
在2.1添加任務(wù)中缕坎,添加失敗自然不必執(zhí)行雄人,會(huì)直接拒絕任務(wù);任務(wù)添加成功有兩種情況:
- 將任務(wù)添加到任務(wù)隊(duì)列念赶;
- 新建線程執(zhí)行任務(wù)础钠。
新建線程自不必說(shuō),主要看看添加到任務(wù)隊(duì)列中的任務(wù)是如何被執(zhí)行的叉谜。
從2.1中我們知道旗吁,每一個(gè)工作線程必然是被一個(gè)任務(wù)喚醒的,這個(gè)任務(wù)被稱作初始任務(wù)(firstTask)停局。當(dāng)一個(gè)工作線程完了它的初始任務(wù)之后很钓,會(huì)從待執(zhí)行的任務(wù)隊(duì)列(workQueue)中取新的任務(wù)香府。workQueue是一個(gè)阻塞隊(duì)列,線程會(huì)一直等待直到有新的任務(wù)到來(lái)為止码倦。對(duì)于一個(gè)設(shè)置了超時(shí)時(shí)間的線程企孩,如果在指定的時(shí)間之后仍然沒(méi)有新任務(wù)到達(dá),那么這個(gè)線程就會(huì)停止等待任務(wù)并且銷毀袁稽。
四勿璃、線程池中的一些重要概念
Worker / workers
Worker
類是ThreadPoolExecutor
類的一個(gè)內(nèi)部類,也是線程池管理操作線程的核心所在推汽。每一個(gè)worker都對(duì)應(yīng)著一個(gè)thread补疑,所以在不混淆的情況下,可以把worker理解為工作線程歹撒。
ThreadPoolExecutor
有一個(gè)名為workers
的成員變量莲组,它保存了這個(gè)線程池所有存活的worker對(duì)象。
workQueue
workQueue
是線程池內(nèi)部用來(lái)保存待執(zhí)行任務(wù)的隊(duì)列暖夭。它是一個(gè)BlockingQueue<Runnable>
類型的變量锹杈,在沒(méi)有任務(wù)的時(shí)候,它的poll()
方法會(huì)阻塞迈着。
在一個(gè)允許超時(shí)的worker執(zhí)行完任務(wù)之后竭望,會(huì)調(diào)用workQueue.poll()
取出下一個(gè)任務(wù)執(zhí)行。如果沒(méi)有任務(wù)寥假,則會(huì)在這里阻塞市框;當(dāng)阻塞時(shí)間達(dá)到超時(shí)時(shí)間后霞扬,這個(gè)工作線程會(huì)退出并銷毀糕韧。
五、通過(guò)源碼詳細(xì)分析線程池
1. ctl
ThreadPoolExecutor
通過(guò)一個(gè)原子整型ctl
來(lái)保存線程池的兩個(gè)重要字段喻圃,workerCount和runState萤彩。workerCount即線程池工作線程的數(shù)量,而runState代表了線程池當(dāng)前的狀態(tài)(如:運(yùn)行中斧拍、關(guān)閉雀扶、終止)。通過(guò)位運(yùn)算肆汹,可以從ctl
得到workerCount和runState的值愚墓,反之也可以通過(guò)workerCount和runState組合得到ctl
。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
顯然昂勉,這跟Android中的MesureSpec通過(guò)一個(gè)整數(shù)來(lái)保存兩個(gè)屬性原理是相同的浪册。
2. execute(runnable)方法
本節(jié)所有流程都以ThreadPoolExecutor.execute(runnable)
方法切入,分析線程池的源碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到岗照,這個(gè)方法很簡(jiǎn)單村象,正如三-2.1小節(jié)所說(shuō)的一樣笆环,在添加任務(wù)時(shí)做一些判斷。在ThreadPoolExecutor
中厚者,有一個(gè)隊(duì)列workQueue
保存了待執(zhí)行的任務(wù)躁劣。而當(dāng)需要新建線程的時(shí)候,則執(zhí)行addWorker(runnable, core)
方法來(lái)創(chuàng)建一個(gè)worker/線程库菲。
因?yàn)檫@個(gè)方法是線程池執(zhí)行的核心账忘,所以下面重點(diǎn)理解這個(gè)方法里面的語(yǔ)句。
3. workQueue / Worker
workQueue是ThreadPoolExecutor
類的一個(gè)非常重要的成員變量蝙昙。在2中闪萄,我們知道了,當(dāng)正在執(zhí)行的線程數(shù)量大于核心線程數(shù)奇颠,那么會(huì)優(yōu)先將任務(wù)添加到任務(wù)隊(duì)列败去,即workQueue中。
通過(guò)execute(runnable)
方法可以知道烈拒,對(duì)于一個(gè)處在運(yùn)行中的線程池圆裕,只有在當(dāng)前工作線程數(shù)量大于等于核心數(shù)時(shí),才會(huì)將任務(wù)往隊(duì)列中添加荆几。并且吓妆,如果往任務(wù)隊(duì)列添加失敗的話,就會(huì)開(kāi)啟新的工作線程吨铸。
那么回到第三節(jié)中的問(wèn)題行拢,什么情況下會(huì)添加失敗呢?注意這一句:
if (isRunning(c) && workQueue.offer(command)) {
// ...
}
很簡(jiǎn)單诞吱,當(dāng)workQueue.offer(command)
返回false的時(shí)候舟奠,則說(shuō)明添加失敗。一般來(lái)說(shuō)房维,當(dāng)隊(duì)列的容量滿了沼瘫,offer方法就會(huì)返回false。即咙俩,在線程數(shù)超過(guò)了核心數(shù)(workerCount>corePoolSize)的情況下耿戚,只有在任務(wù)隊(duì)列被填滿之后,線程池才會(huì)考慮創(chuàng)建新線程阿趁,否則只會(huì)將任務(wù)添加到任務(wù)隊(duì)列中等待執(zhí)行膜蛔。在線程池的構(gòu)造方法中傳入不同的隊(duì)列類型,就會(huì)有不同的效果脖阵≡砉桑回到Executors
工廠類中,看看四種基本的線程池分別都是使用的什么隊(duì)列独撇?
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor的構(gòu)造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
SingleThreadExecutor
核心數(shù)和最大線程數(shù)均為1屑墨,使用LinkedBlockingQueue躁锁,容量為Interger.MAX_VALUE
。也就是說(shuō)卵史,SingleThreadExecutor中永遠(yuǎn)只有一個(gè)線程战转,所有任務(wù)單線執(zhí)行,并且容量無(wú)上限以躯。CachedThreadPool
核心數(shù)為0槐秧,最大線程數(shù)Interger.MAX_VALUE,使用SynchronousQueue
忧设。這個(gè)隊(duì)列的特點(diǎn)是刁标,沒(méi)有內(nèi)部容量。也就是說(shuō)址晕,對(duì)于一個(gè)新任務(wù)膀懈,但凡是沒(méi)有空閑的線程,那么就創(chuàng)建一個(gè)新的線程谨垃。而由于核心數(shù)是0启搂,當(dāng)超過(guò)一定時(shí)間沒(méi)有新任務(wù)之后,線程池中所有線程都將被銷毀刘陶。FixedThreadPool
和SingleThreadExecutor類似胳赌,使用LinkedBlockingQueue
;不同的是核心數(shù)和最大線程數(shù)為n匙隔。ScheduledThreadPoolExecutor
使用DelayedWorkQueue
疑苫,可以實(shí)現(xiàn)延時(shí)/定時(shí)獲取任務(wù)。
看完這里纷责,就能很好的理解Executors
中的這些線程池為何能夠呈現(xiàn)出各自的特性了捍掺。
在第四節(jié)中我們知道,對(duì)于線程的操作等碰逸,不是直接通過(guò)Thread來(lái)進(jìn)行的乡小,而一般是通過(guò)Worker類進(jìn)行阔加。每一個(gè)Worker對(duì)應(yīng)了一個(gè)線程饵史,任務(wù)的添加、執(zhí)行等胜榔,都是通過(guò)Worker來(lái)實(shí)現(xiàn)的胳喷。ThreadPoolExecutor
中有一個(gè)HashSet<Worker>
類型的變量workers
,用來(lái)保存可用的Worker夭织。也就是說(shuō)吭露,我們所謂的“線程池”實(shí)際本質(zhì)上就是“Worker池”。由于Worker
和Thread
是一對(duì)一的關(guān)系尊惰,所以為了圖方便讲竿,有時(shí)候可以簡(jiǎn)單的把Worker
理解成一個(gè)工作線程泥兰,但需要知道其本質(zhì)上與真正的線程Thread
是不同的。
Worker
類是ThreadPoolExecutor
的一個(gè)內(nèi)部類题禀,繼承自AbstractQueuedSynchronizer
鞋诗,實(shí)現(xiàn)了Runnable
接口:
// ...略去一部分
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
真實(shí)運(yùn)行的線程,是worker.thread
迈嘹,在Worker
構(gòu)造方法中削彬,worker.thread
通過(guò)工廠方法創(chuàng)建。而線程肯定是要調(diào)用start()
方法運(yùn)行的秀仲,搜索一下worker.thread
的start()
方法融痛,發(fā)現(xiàn)是在ThreadPoolExecutor.addWorker()
這個(gè)方法里調(diào)用的。
在下面的第4小節(jié)中神僵,會(huì)專門分析這個(gè)addWorker(runnable, core)
方法雁刷。
另一方面,Worker本質(zhì)上又是一個(gè)Runnable對(duì)象保礼,是一個(gè)可運(yùn)行任務(wù)安券,在真實(shí)線程worker.thread
啟動(dòng)后,會(huì)調(diào)用其run()
方法:
// Worker中
public void run() {
runWorker(this);
}
4. addWorker(runnable, boolean)方法
線程池創(chuàng)建工作線程是通過(guò)addWorker
方法來(lái)進(jìn)行的氓英。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
雖然這段代碼有點(diǎn)長(zhǎng)侯勉,但是所做的事情其實(shí)只有兩件:
- 檢查是否應(yīng)該添加這個(gè)worker:只有在線程池處于正在運(yùn)行的狀態(tài)(runState==RUNNING),并且當(dāng)前worker數(shù)小于最大容量時(shí)铝阐,才能添加址貌;
- 新建worker對(duì)象并添加到workers中。
5. runWorker(Worker)方法
在3中我們得知徘键,當(dāng)Worker的線程開(kāi)始運(yùn)行之后练对,會(huì)調(diào)用其run()方法:
// Worker中
public void run() {
runWorker(this);
}
而run()又會(huì)調(diào)用ThreadPoolExecutor.runWorker(Worker)
方法。在這里看一下這個(gè)方法吹害。
// 省略一大部分
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} catch (Exception x) {
}
}
}
// 省略一大部分
private Runnable getTask() {
for (;;) {
if (/*無(wú)法獲取任務(wù)*/) {
return null;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
}
}
}
為了便于觀看螟凭,我刪去了大部分代碼,只留了核心的幾行它呀÷菽校可以看到,在Worker的任務(wù)執(zhí)行完畢之后纵穿,會(huì)再?gòu)膚orkQueue隊(duì)列中獲取新的任務(wù)下隧,按此無(wú)限循環(huán)。什么時(shí)候Worker會(huì)結(jié)束并銷毀呢谓媒?從這一句while (task != null || (task = getTask()) != null)
中淆院,即worker中沒(méi)有任務(wù),并且getTast()
返回null句惯,worker就會(huì)結(jié)束執(zhí)行土辩。什么時(shí)候返回null支救,不讓worker繼續(xù)存活了呢?
- 線程池被shutdown拷淘,并且任務(wù)隊(duì)列空了搂妻;
- 線程池超容量;
- 超時(shí)辕棚;
也就是說(shuō)欲主,如果線程池在運(yùn)行狀態(tài),容量也沒(méi)有到最大逝嚎,并且任務(wù)隊(duì)列還有任務(wù)扁瓢,這個(gè)worker就會(huì)永遠(yuǎn)運(yùn)行下去。
六补君、總結(jié)
就用圖片來(lái)總結(jié)一下引几。
下圖闡述了線程池調(diào)用execute(runnable)
之后的流程。
這張圖表示了execute
之后的調(diào)用鏈挽铁,相當(dāng)于Worker的生命周期了(不包括銷毀)伟桅。