作為Executor框架中最核心的類,ThreadPoolExecutor代表著鼎鼎大名的線程池黔州,它給了我們足夠的理由來弄清楚它耍鬓。
下面我們就通過源碼來一步一步弄清楚它。
內(nèi)部狀態(tài)
線程有五種狀態(tài):新建流妻,就緒牲蜀,運(yùn)行,阻塞绅这,死亡涣达,
線程池同樣有五種狀態(tài):Running, SHUTDOWN, STOP, TIDYING, TERMINATED。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
變量ctl定義為AtomicInteger ,其功能非常強(qiáng)大度苔,記錄了“線程池中的任務(wù)數(shù)量”和“線程池的狀態(tài)”兩個信息匆篓。共32位,其中高3位表示”線程池狀態(tài)”寇窑,低29位表示”線程池中的任務(wù)數(shù)量”鸦概。
RUNNING -- 對應(yīng)的高3位值是111。
SHUTDOWN -- 對應(yīng)的高3位值是000甩骏。
STOP -- 對應(yīng)的高3位值是001窗市。
TIDYING -- 對應(yīng)的高3位值是010。
TERMINATED -- 對應(yīng)的高3位值是011饮笛。
RUNNING:處于RUNNING狀態(tài)的線程池能夠接受新任務(wù)咨察,以及對新添加的任務(wù)進(jìn)行處理。
SHUTDOWN:處于SHUTDOWN狀態(tài)的線程池不可以接受新任務(wù)福青,但是可以對已添加的任務(wù)進(jìn)行處理摄狱。
STOP:處于STOP狀態(tài)的線程池不接收新任務(wù),不處理已添加的任務(wù)素跺,并且會中斷正在處理的任務(wù)二蓝。
TIDYING:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0指厌,線程池會變?yōu)門IDYING狀態(tài)刊愚。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()踩验。terminated()在ThreadPoolExecutor類中是空的鸥诽,若用戶想在線程池變?yōu)門IDYING時,進(jìn)行相應(yīng)的處理箕憾;可以通過重載terminated()函數(shù)來實現(xiàn)牡借。
TERMINATED:線程池徹底終止的狀態(tài)。
各個狀態(tài)的轉(zhuǎn)換如下:
創(chuàng)建線程池
我們可以通過ThreadPoolExecutor構(gòu)造函數(shù)來創(chuàng)建一個線程池:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
共有七個參數(shù)袭异,每個參數(shù)含義如下:
corePoolSize
線程池中核心線程的數(shù)量钠龙。當(dāng)提交一個任務(wù)時,線程池會新建一個線程來執(zhí)行任務(wù)御铃,直到當(dāng)前線程數(shù)等于corePoolSize碴里。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程上真。
maximumPoolSize
線程池中允許的最大線程數(shù)咬腋。線程池的阻塞隊列滿了之后,如果還有任務(wù)提交睡互,如果當(dāng)前的線程數(shù)小于maximumPoolSize根竿,則會新建線程來執(zhí)行任務(wù)陵像。注意,如果使用的是無界隊列寇壳,該參數(shù)也就沒有什么效果了醒颖。
keepAliveTime
線程空閑的時間。線程的創(chuàng)建和銷毀是需要代價的九巡。線程執(zhí)行完任務(wù)后不會立即銷毀图贸,而是繼續(xù)存活一段時間:keepAliveTime蹂季。默認(rèn)情況下冕广,該參數(shù)只有在線程數(shù)大于corePoolSize時才會生效。
unit
keepAliveTime的單位偿洁。TimeUnit
workQueue
用來保存等待執(zhí)行的任務(wù)的阻塞隊列撒汉,等待的任務(wù)必須實現(xiàn)Runnable接口。我們可以選擇如下幾種:
- ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列涕滋,F(xiàn)IFO睬辐。【死磕Java并發(fā)】—-J.U.C之阻塞隊列:ArrayBlockingQueue
- LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的有界阻塞隊列,F(xiàn)IFO宾肺。
- SynchronousQueue:不存儲元素的阻塞隊列溯饵,每個插入操作都必須等待一個移出操作,反之亦然锨用。【死磕Java并發(fā)】—-J.U.C之阻塞隊列:SynchronousQueue
- PriorityBlockingQueue:具有優(yōu)先界別的阻塞隊列丰刊。【死磕Java并發(fā)】—-J.U.C之阻塞隊列:PriorityBlockingQueue
threadFactory
用于設(shè)置創(chuàng)建線程的工廠。該對象可以通過Executors.defaultThreadFactory()增拥,如下:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
返回的是DefaultThreadFactory對象啄巧,源碼如下:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
ThreadFactory的左右就是提供創(chuàng)建線程的功能的線程工廠。他是通過newThread()方法提供創(chuàng)建線程的功能掌栅,newThread()方法創(chuàng)建的線程都是“非守護(hù)線程”而且“線程優(yōu)先級都是Thread.NORM_PRIORITY”秩仆。
handler
RejectedExecutionHandler,線程池的拒絕策略猾封。所謂拒絕策略澄耍,是指將任務(wù)添加到線程池中時,線程池拒絕該任務(wù)所采取的相應(yīng)策略晌缘。當(dāng)向線程池中提交任務(wù)時齐莲,如果此時線程池中的線程已經(jīng)飽和了,而且阻塞隊列也已經(jīng)滿了枚钓,則線程池會選擇一種拒絕策略來處理該任務(wù)铅搓。
線程池提供了四種拒絕策略:
- AbortPolicy:直接拋出異常,默認(rèn)策略搀捷;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)星掰;
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)多望,并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:直接丟棄任務(wù)氢烘;
當(dāng)然我們也可以實現(xiàn)自己的拒絕策略怀偷,例如記錄日志等等,實現(xiàn)RejectedExecutionHandler接口即可播玖。
線程池
Executor框架提供了三種線程池椎工,他們都可以通過工具類Executors來創(chuàng)建。
FixedThreadPool
FixedThreadPool蜀踏,可重用固定線程數(shù)的線程池维蒙,其定義如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize 和 maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads,意味著當(dāng)線程池滿時且阻塞隊列也已經(jīng)滿時果覆,如果繼續(xù)提交任務(wù)颅痊,則會直接走拒絕策略,該線程池不會再新建線程來執(zhí)行任務(wù)局待,而是直接走拒絕策略斑响。FixedThreadPool使用的是默認(rèn)的拒絕策略,即AbortPolicy钳榨,則直接拋出異常舰罚。
keepAliveTime設(shè)置為0L,表示空閑的線程會立刻終止薛耻。
workQueue則是使用LinkedBlockingQueue营罢,但是沒有設(shè)置范圍,那么則是最大值(Integer.MAX_VALUE)昭卓,這基本就相當(dāng)于一個無界隊列了愤钾。使用該“無界隊列”則會帶來哪些影響呢?當(dāng)線程池中的線程數(shù)量等于corePoolSize 時候醒,如果繼續(xù)提交任務(wù)能颁,該任務(wù)會被添加到阻塞隊列workQueue中,當(dāng)阻塞隊列也滿了之后倒淫,則線程池會新建線程執(zhí)行任務(wù)直到maximumPoolSize伙菊。由于FixedThreadPool使用的是“無界隊列”LinkedBlockingQueue,那么maximumPoolSize參數(shù)無效敌土,同時指定的拒絕策略AbortPolicy也將無效镜硕。而且該線程池也不會拒絕提交的任務(wù),如果客戶端提交任務(wù)的速度快于任務(wù)的執(zhí)行返干,那么keepAliveTime也是一個無效參數(shù)兴枯。
其運(yùn)行圖如下(參考《Java并發(fā)編程的藝術(shù)》):
SingleThreadExecutor
SingleThreadExecutor是使用單個worker線程的Executor,定義如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
作為單一worker線程的線程池矩欠,SingleThreadExecutor把corePool和maximumPoolSize均被設(shè)置為1财剖,和FixedThreadPool一樣使用的是無界隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣悠夯。
CachedThreadPool
CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池 ,他定義如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePool為0躺坟,maximumPoolSize為Integer.MAX_VALUE沦补,這就意味著所有的任務(wù)一提交就會加入到阻塞隊列中。keepAliveTime這是為60L咪橙,unit設(shè)置為TimeUnit.SECONDS夕膀,意味著空閑線程等待新任務(wù)的最長時間為60秒,空閑線程超過60秒后將會被終止美侦。阻塞隊列采用的SynchronousQueue产舞,而我們在【死磕Java并發(fā)】—-J.U.C之阻塞隊列:SynchronousQueue中了解到SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 音榜,maximumPoolSize = Integer.MAX_VALUE庞瘸,這樣就會存在一個問題捧弃,如果主線程提交任務(wù)的速度遠(yuǎn)遠(yuǎn)大于CachedThreadPool的處理速度赠叼,則CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù),這樣有可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源违霞,所以在使用該線程池是嘴办,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題买鸽。
任務(wù)提交
線程池根據(jù)業(yè)務(wù)不同的需求提供了兩種方式提交任務(wù):Executor.execute()涧郊、ExecutorService.submit()。其中ExecutorService.submit()可以獲取該任務(wù)執(zhí)行的Future眼五。
我們以Executor.execute()為例妆艘,來看看線程池的任務(wù)提交經(jīng)歷了那些過程。
定義:
public interface Executor {
void execute(Runnable command);
}
ThreadPoolExecutor提供實現(xiàn):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
執(zhí)行流程如下:
- 如果線程池當(dāng)前線程數(shù)小于corePoolSize看幼,則調(diào)用addWorker創(chuàng)建新線程執(zhí)行任務(wù)批旺,成功返回true,失敗執(zhí)行步驟2诵姜。
- 如果線程池處于RUNNING狀態(tài)汽煮,則嘗試加入阻塞隊列,如果加入阻塞隊列成功棚唆,則嘗試進(jìn)行Double Check暇赤,如果加入失敗,則執(zhí)行步驟3宵凌。
- 如果線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗鞋囊,則嘗試創(chuàng)建新線程直到maxPoolSize,如果失敗瞎惫,則調(diào)用reject()方法運(yùn)行相應(yīng)的拒絕策略溜腐。
在步驟2中如果加入阻塞隊列成功了坯门,則會進(jìn)行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊里中的線程是否可以被執(zhí)行逗扒。如果線程池不是RUNNING狀態(tài)古戴,則調(diào)用remove()方法從阻塞隊列中刪除該任務(wù),然后調(diào)用reject()方法處理任務(wù)矩肩。否則需要確保還有線程執(zhí)行现恼。
addWorker
當(dāng)線程中的當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker()創(chuàng)建新線程執(zhí)行任務(wù)黍檩,當(dāng)前線程數(shù)則是根據(jù)ctl變量來獲取的叉袍,調(diào)用workerCountOf(ctl)獲取低29位即可:
private static int workerCountOf(int c) { return c & CAPACITY; }
addWorker(Runnable firstTask, boolean core)方法用于創(chuàng)建線程執(zhí)行任務(wù),源碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取當(dāng)前線程狀態(tài)
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 內(nèi)層循環(huán)刽酱,worker + 1
for (;;) {
// 線程數(shù)量
int wc = workerCountOf(c);
// 如果當(dāng)前線程數(shù)大于線程最大上限CAPACITY return false
// 若core == true喳逛,則與corePoolSize 比較,否則與maximumPoolSize 棵里,大于 return false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// worker + 1,成功跳出retry循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS add worker 失敗润文,再次讀取ctl
c = ctl.get();
// 如果狀態(tài)不等于之前獲取的state,跳出內(nèi)層循環(huán)殿怜,繼續(xù)去外層循環(huán)判斷
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建線程:Worker
w = new Worker(firstTask);
// 當(dāng)前線程
final Thread t = w.thread;
if (t != null) {
// 獲取主鎖:mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 線程狀態(tài)
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 線程處于RUNNING狀態(tài)
// 或者線程處于SHUTDOWN狀態(tài)典蝌,且firstTask == null(可能是workQueue中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的worker線程執(zhí)行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 當(dāng)前線程已經(jīng)啟動头谜,拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個HashSet<Worker>
workers.add(w);
// 設(shè)置最大的池大小largestPoolSize骏掀,workerAdded設(shè)置為true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// 啟動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 線程啟動失敗
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
-
判斷當(dāng)前線程是否可以添加任務(wù),如果可以則進(jìn)行下一步柱告,否則return false截驮;
- rs >= SHUTDOWN ,表示當(dāng)前線程處于SHUTDOWN 际度,STOP葵袭、TIDYING、TERMINATED狀態(tài)
- rs == SHUTDOWN , firstTask != null時不允許添加線程甲脏,因為線程處于SHUTDOWN 狀態(tài)眶熬,不允許添加任務(wù)
- rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true块请,不允許添加線程娜氏,因為firstTask == null是為了添加一個沒有任務(wù)的線程然后再從workQueue中獲取任務(wù)的,如果workQueue == null墩新,則說明添加的任務(wù)沒有任何意義贸弥。
內(nèi)嵌循環(huán),通過CAS worker + 1
獲取主鎖mailLock海渊,如果線程池處于RUNNING狀態(tài)獲取處于SHUTDOWN狀態(tài)且 firstTask == null绵疲,則將任務(wù)添加到workers Queue中哲鸳,然后釋放主鎖mainLock,然后啟動線程盔憨,然后return true徙菠,如果中途失敗導(dǎo)致workerStarted= false,則調(diào)用addWorkerFailed()方法進(jìn)行處理郁岩。
在這里需要好好理論addWorker中的參數(shù)婿奔,在execute()方法中,有三處調(diào)用了該方法:
第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true)问慎,這個很好理解萍摊,當(dāng)然線程池的線程數(shù)量小于 corePoolSize ,則新建線程執(zhí)行任務(wù)即可如叼,在執(zhí)行過程core == true冰木,內(nèi)部與corePoolSize比較即可。
第二次:加入阻塞隊列進(jìn)行Double Check時笼恰,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)硬毕。如果線程池中的線程==0人乓,按照道理應(yīng)該該任務(wù)應(yīng)該新建線程執(zhí)行任務(wù)镇辉,但是由于已經(jīng)該任務(wù)已經(jīng)添加到了阻塞隊列佛掖,那么就在線程池中新建一個空線程,然后從阻塞隊列中取線程即可猴仑。
第三次:線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗:else if (!addWorker(command, false))肥哎,這里core == fase辽俗,則意味著是與maximumPoolSize比較。
在新建線程執(zhí)行任務(wù)時篡诽,將講Runnable包裝成一個Worker崖飘,Woker為ThreadPoolExecutor的內(nèi)部類
Woker內(nèi)部類
Woker的源碼如下:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
// task 的thread
final Thread thread;
// 運(yùn)行的任務(wù)task
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
//設(shè)置AQS的同步狀態(tài)private volatile int state,是一個計數(shù)器杈女,大于0代表鎖已經(jīng)被獲取
setState(-1);
this.firstTask = firstTask;
// 利用ThreadFactory和 Worker這個Runnable創(chuàng)建的線程對象
this.thread = getThreadFactory().newThread(this);
}
// 任務(wù)執(zhí)行
public void run() {
runWorker(this);
}
}
從Worker的源碼中我們可以看到Woker繼承AQS朱浴,實現(xiàn)Runnable接口,所以可以認(rèn)為Worker既是一個可以執(zhí)行的任務(wù)达椰,也可以達(dá)到獲取鎖釋放鎖的效果翰蠢。這里繼承AQS主要是為了方便線程的中斷處理。這里注意兩個地方:構(gòu)造函數(shù)啰劲、run()梁沧。構(gòu)造函數(shù)主要是做三件事:1.設(shè)置同步狀態(tài)state為-1,同步狀態(tài)大于0表示就已經(jīng)獲取了鎖蝇裤,2.設(shè)置將當(dāng)前任務(wù)task設(shè)置為firstTask廷支,3.利用Worker本身對象this和ThreadFactory創(chuàng)建線程對象频鉴。
當(dāng)線程thread啟動(調(diào)用start()方法)時,其實就是執(zhí)行Worker的run()方法恋拍,內(nèi)部調(diào)用runWorker()垛孔。
runWorker
final void runWorker(Worker w) {
// 當(dāng)前線程
Thread wt = Thread.currentThread();
// 要執(zhí)行的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
// 釋放鎖,運(yùn)行中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// worker 獲取鎖
w.lock();
// 確保只有當(dāng)線程是stoping時施敢,才會被設(shè)置為中斷似炎,否則清楚中斷標(biāo)示
// 如果線程池狀態(tài) >= STOP ,且當(dāng)前線程沒有設(shè)置中斷狀態(tài),則wt.interrupt()
// 如果線程池狀態(tài) < STOP悯姊,但是線程已經(jīng)中斷了羡藐,再次判斷線程池是否 >= STOP,如果是 wt.interrupt()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 自定義方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任務(wù)數(shù) + 1
w.completedTasks++;
// 釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
運(yùn)行流程
- 根據(jù)worker獲取要執(zhí)行的任務(wù)task悯许,然后調(diào)用unlock()方法釋放鎖仆嗦,這里釋放鎖的主要目的在于中斷,因為在new Worker時先壕,設(shè)置的state為-1瘩扼,調(diào)用unlock()方法可以將state設(shè)置為0,這里主要原因就在于interruptWorkers()方法只有在state >= 0時才會執(zhí)行垃僚;
- 通過getTask()獲取執(zhí)行的任務(wù)集绰,調(diào)用task.run()執(zhí)行,當(dāng)然在執(zhí)行之前會調(diào)用worker.lock()上鎖谆棺,執(zhí)行之后調(diào)用worker.unlock()放鎖栽燕;
- 在任務(wù)執(zhí)行前后,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute() 和 afterExecute()方法改淑,則兩個方法在ThreadPoolExecutor中是空實現(xiàn)碍岔;
- 如果線程執(zhí)行完成,則會調(diào)用getTask()方法從阻塞隊列中獲取新任務(wù)朵夏,如果阻塞隊列為空蔼啦,則根據(jù)是否超時來判斷是否需要阻塞;
- task == null或者拋出異常(beforeExecute()仰猖、task.run()捏肢、afterExecute()均有可能)導(dǎo)致worker線程終止,則調(diào)用processWorkerExit()方法處理worker退出流程饥侵。
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// 線程池中狀態(tài) >= STOP 或者 線程池狀態(tài) == SHUTDOWN且阻塞隊列為空鸵赫,則worker - 1,return null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判斷是否需要超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 從阻塞隊列中獲取task
// 如果需要超時控制爆捞,則調(diào)用poll()奉瘤,否則調(diào)用take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
timed == true,調(diào)用poll()方法,如果在keepAliveTime時間內(nèi)還沒有獲取task的話盗温,則返回null藕赞,繼續(xù)循環(huán)。timed == false卖局,則調(diào)用take()方法斧蜕,該方法為一個阻塞方法,沒有任務(wù)時會一直阻塞掛起砚偶,直到有任務(wù)加入時對該線程喚醒批销,返回任務(wù)。
在runWorker()方法中染坯,無論最終結(jié)果如何均芽,都會執(zhí)行processWorkerExit()方法對worker進(jìn)行退出處理。
processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// true:用戶線程運(yùn)行異常,需要扣減
// false:getTask方法中扣減線程數(shù)量
if (completedAbruptly)
decrementWorkerCount();
// 獲取主鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從HashSet中移出worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 有worker線程移除单鹿,可能是最后一個線程退出需要嘗試終止線程池
tryTerminate();
int c = ctl.get();
// 如果線程為running或shutdown狀態(tài)掀宋,即tryTerminate()沒有成功終止線程池,則判斷是否有必要一個worker
if (runStateLessThan(c, STOP)) {
// 正常退出仲锄,計算min:需要維護(hù)的最小線程數(shù)量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默認(rèn)false:是否需要維持核心線程的數(shù)量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min ==0 或者workerQueue為空劲妙,min = 1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果線程數(shù)量大于最少數(shù)量min,直接返回儒喊,不需要新增線程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一個沒有firstTask的worker
addWorker(null, false);
}
}
首先completedAbruptly的值來判斷是否需要對線程數(shù)-1處理镣奋,如果completedAbruptly == true,說明在任務(wù)運(yùn)行過程中出現(xiàn)了異常怀愧,那么需要進(jìn)行減1處理侨颈,否則不需要,因為減1處理在getTask()方法中處理了掸驱。然后從HashSet中移出該worker肛搬,過程需要獲取mainlock。然后調(diào)用tryTerminate()方法處理毕贼,該方法是對最后一個線程退出做終止線程池動作。如果線程池沒有終止蛤奢,那么線程池需要保持一定數(shù)量的線程鬼癣,則通過addWorker(null,false)新增一個空的線程。
addWorkerFailed()
在addWorker()方法中啤贩,如果線程t==null待秃,或者在add過程出現(xiàn)異常,會導(dǎo)致workerStarted == false痹屹,那么在最后會調(diào)用addWorkerFailed()方法:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 從HashSet中移除該worker
if (w != null)
workers.remove(w);
// 線程數(shù) - 1
decrementWorkerCount();
// 嘗試終止線程
tryTerminate();
} finally {
mainLock.unlock();
}
}
整個邏輯顯得比較簡單章郁。
tryTerminate()
當(dāng)線程池涉及到要移除worker時候都會調(diào)用tryTerminate(),該方法主要用于判斷線程池中的線程是否已經(jīng)全部移除了,如果是的話則關(guān)閉線程池暖庄。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 線程池處于Running狀態(tài)
// 線程池已經(jīng)終止了
// 線程池處于ShutDown狀態(tài)聊替,但是阻塞隊列不為空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 執(zhí)行到這里,就意味著線程池要么處于STOP狀態(tài)培廓,要么處于SHUTDOWN且阻塞隊列為空
// 這時如果線程池中還存在線程惹悄,則會嘗試中斷線程
if (workerCountOf(c) != 0) {
// /線程池還有線程,但是隊列沒有任務(wù)了肩钠,需要中斷喚醒等待任務(wù)的線程
// (runwoker的時候首先就通過w.unlock設(shè)置線程可中斷泣港,getTask最后面的catch處理中斷)
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 嘗試終止線程池
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 線程池狀態(tài)轉(zhuǎn)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
在關(guān)閉線程池的過程中,如果線程池處于STOP狀態(tài)或者處于SHUDOWN狀態(tài)且阻塞隊列為null价匠,則線程池會調(diào)用interruptIdleWorkers()方法中斷所有線程当纱,注意ONLY_ONE== true,表示僅中斷一個線程踩窖。
interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
onlyOne==true僅終止一個線程坡氯,否則終止所有線程。
線程終止
線程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于關(guān)閉線程池毙石。
shutdown():按過去執(zhí)行已提交任務(wù)的順序發(fā)起一個有序的關(guān)閉廉沮,但是不接受新任務(wù)。
shutdownNow() :嘗試停止所有的活動執(zhí)行任務(wù)徐矩、暫停等待任務(wù)的處理滞时,并返回等待執(zhí)行的任務(wù)列表。
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 推進(jìn)線程狀態(tài)
advanceRunState(SHUTDOWN);
// 中斷空閑的線程
interruptIdleWorkers();
// 交給子類實現(xiàn)
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中斷所有線程
interruptWorkers();
// 返回等待執(zhí)行的任務(wù)列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
與shutdown不同滤灯,shutdownNow會調(diào)用interruptWorkers()方法中斷所有線程坪稽。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
同時會調(diào)用drainQueue()方法返回等待執(zhí)行到任務(wù)列表。
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}