任何一種語言豹芯、框架悄雅,線程都是非常重要的一部分。要想實現(xiàn)異步就需要通過異步線程铁蹈,但是頻繁地創(chuàng)建銷毀線程會帶來較大的性能開銷宽闲,而線程池就是為解決這一問題而出現(xiàn)的。簡單來說線程池有以下幾大優(yōu)勢:
- 降低資源開銷:通過復(fù)用已經(jīng)創(chuàng)建的線程,降低線程頻繁創(chuàng)建容诬、銷毀帶來的資源開銷和性能損耗
- 快速啟動任務(wù):通過復(fù)用已有線程娩梨,快速啟動任務(wù)
- 易于管理:線程池可以統(tǒng)一管理、分配览徒、調(diào)優(yōu)和監(jiān)控
Java中的線程池是基于ThreadPoolExecutor
實現(xiàn)的狈定,我們使用的ExecutorService
的各種線程池策略都是基于ThreadPoolExecutor
實現(xiàn)的,所以ThreadPoolExecutor
十分重要吱殉。要弄明白各種線程池策略掸冤,必須先弄明白ThreadPoolExecutor
。
1 創(chuàng)建線程池
首先來看下線程池的創(chuàng)建:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize 核心線程池大小
- maximumPoolSize 線程池最大容量大小
- keepAliveTime 線程池空閑時友雳,線程存活的時間
- TimeUnit 時間單位
- ThreadFactory 線程工廠
- BlockingQueue任務(wù)隊列
- RejectedExecutionHandler 線程拒絕策略
ThreadPoolExecutor的基本流程如下:
- 當(dāng)用戶通過submit或者execute提交任務(wù)時稿湿,如果當(dāng)前線程池中線程數(shù)小于
corePoolSize
,直接創(chuàng)建一個線程執(zhí)行任務(wù) - 如果當(dāng)前線程數(shù)大于
corePoolSize
,則將任務(wù)加入到BlockingQueue
中 - 如果
BlockingQueue
也滿了押赊,在小于MaxPoolSize
的情況下創(chuàng)建線程執(zhí)行任務(wù) - 如果線程數(shù)大于等于
MaxPoolSize
饺藤,那么執(zhí)行拒絕策略RejectedExecutionHandler
- 當(dāng)線程池中超過
corePoolSiz
e線程,空閑時間達到keepAliveTime
時流礁,關(guān)閉空閑線程
2 線程池狀態(tài)
ThreadPoolExecutor
內(nèi)部有多個狀態(tài)涕俗,理解線程池內(nèi)部狀態(tài)對于理解線程池原理至關(guān)重要,所以接下來看下線程池的狀態(tài):
/*
* runState是整個線程池的運行生命周期狀態(tài)神帅,有如下取值:
* 1. RUNNING:可以新加線程再姑,同時可以處理queue中的線程。
* 2. SHUTDOWN:不增加新線程找御,但是處理queue中的線程元镀。
* 3. STOP 不增加新線程,同時不處理queue中的線程霎桅。
* 4. TIDYING 所有的線程都終止了(queue中)栖疑,同時workerCount為0,那么此時進入TIDYING
* 5. terminated()方法結(jié)束滔驶,變?yōu)門ERMINATED
* The runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* 狀態(tài)的轉(zhuǎn)化主要是:
* RUNNING -> SHUTDOWN(調(diào)用shutdown())
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP(調(diào)用shutdownNow())
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING(queue和pool均empty)
* When both queue and pool are empty
* STOP -> TIDYING(pool empty遇革,此時queue已經(jīng)為empty)
* When pool is empty
* TIDYING -> TERMINATED(調(diào)用terminated())
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
runState的存儲也值得一說,它并不是用一個單獨的int或者enum進行存儲揭糕,而是和線程數(shù)workerCount共同保存到一個原子量ctl中:
//利用ctl來保證當(dāng)前線程池的狀態(tài)和當(dāng)前的線程的數(shù)量萝快。ps:低29位為線程池容量,高3位為線程狀態(tài)著角。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//設(shè)定偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
//確定最大的容量2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//幾個狀態(tài)揪漩,用Integer的高三位表示
// runState is stored in the high-order bits
//111
private static final int RUNNING = -1 << COUNT_BITS;
//000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001
private static final int STOP = 1 << COUNT_BITS;
//010
private static final int TIDYING = 2 << COUNT_BITS;
//011
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取線程池狀態(tài),取前三位
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取當(dāng)前正在工作的worker,主要是取后面29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//獲取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 通過調(diào)用
runStateOf()
方法獲取當(dāng)前線程池狀態(tài) - 通過調(diào)用
workerCountOf()
獲取當(dāng)前線程數(shù)
3 添加任務(wù)
向線程池添加任務(wù)一般通過execute
或者submit
方法添加雇寇,接下來通過execute
方法介紹下添加任務(wù)的原理:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//當(dāng)前的Worker的數(shù)量小于核心線程池大小時氢拥,新建一個Worker線程執(zhí)行該任務(wù)蚌铜。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果worker數(shù)量已經(jīng)大于核心線程數(shù),嘗試將任務(wù)添加到任務(wù)隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//recheck防止線程池狀態(tài)的突變嫩海,如果突變冬殃,那么將reject線程,防止workQueue中增加新線程
reject(command);
else if (workerCountOf(recheck) == 0)//上下兩個操作都有addWorker的操作叁怪,但是如果在workQueue.offer的時候Worker變?yōu)?审葬,
//那么將沒有Worker執(zhí)行新的task,所以增加一個Worker.
addWorker(null, false);
}
//如果workQueue滿了奕谭,那么這時候可能還沒到線程池的maximum涣觉,所以嘗試增加一個Worker
else if (!addWorker(command, false))
reject(command);//如果Worker數(shù)量到達上限,那么就拒絕此線程
}
可以看到execute
方法內(nèi)部的核心邏輯在于添加工作線程addWorker
方法血柳,所以接下來看下addWorker
:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* rs!=Shutdown || fistTask官册!=null || workCount.isEmpty
* 如果當(dāng)前的線程池的狀態(tài)>SHUTDOWN 那么拒絕Worker的add
* 如果=SHUTDOWN,那么此時不能新加入不為null的Task促王,如果在WorkCount為empty的時候不能加入任何 * 類型的Worker农曲,
* 如果不為empty可以加入task為null的Worker,增加消費的Worker
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果當(dāng)前線程數(shù)已經(jīng)超標,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果線程數(shù)沒有超標欧聘,則嘗試通過CAS將workercount加一根吁,如果成功直接跳出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
//如果失敗员淫,對狀態(tài)進行double check,如果狀態(tài)已改變則重試
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//接下來開始真正創(chuàng)建新的線程
//創(chuàng)建一個新的worker線程
Worker w = new Worker(firstTask);
Thread t = w.thread;
//獲取鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
/**
* rs!=SHUTDOWN ||firstTask!=null
*
* 同樣檢測當(dāng)rs>SHUTDOWN時直接拒絕減小Wc击敌,同時Terminate介返,如果為SHUTDOWN同時firstTask不為null的時候也要Terminate
*/
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}
//將新建的worker線程加入到workers數(shù)組中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
//新建線程開始執(zhí)行
t.start();
// It is possible (but unlikely) for a thread to have been
// added to workers, but not yet started, during transition to
// STOP, which could result in a rare missed interrupt,
// because Thread.interrupt is not guaranteed to have any effect
// on a non-yet-started Thread (see Thread#interrupt).
//若此時線程池狀態(tài)變?yōu)镾TOP,但當(dāng)前線程并未interrupt,執(zhí)行interrupt
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}
整個addWorker方法大致分為兩大階段:
- workerCount++:此時并不創(chuàng)建真正的線程沃斤,而僅僅是通過CAS操作把workerCount加一
- 創(chuàng)建線程:創(chuàng)建worker線程圣蝎,將其加入到workers隊列中,并根據(jù)狀態(tài)對線程進行不同操作
3.1 workerCount++
workerCount
++操作主要涉及上述代碼中標號retry
覆蓋的代碼轰枝,主要邏輯有以下三大部分:
- 根據(jù)線程池當(dāng)前狀態(tài)判斷是否可以添加線程捅彻,如果不能添加直接返回false:
- 如果當(dāng)前的線程池的狀態(tài)>SHUTDOWN 那么拒絕Worker的add
- 如果=SHUTDOWN,那么此時不能新加入不為null的Task组去,如果在WorkCount為empty的時候不能加入任何類型的Worker
- 如果不為empty可以加入task為null的Worker,增加消費的Worker
- 根據(jù)當(dāng)前worker數(shù)判斷是否可以添加線程:
- 如果core為true鞍陨,且當(dāng)前worker數(shù)超過corePoolSize則不允許添加線程
- 如果core為fasle,且worker數(shù)超過maximumPoolSize則不允許添加線程
- 通過
compareAndIncrementWorkerCount
執(zhí)行workerCount++操作从隆,如果成功跳出循環(huán)诚撵;如果失敗對當(dāng)前狀態(tài)進行doubleCheck,如果狀態(tài)改變重新回到步驟1键闺,如果狀態(tài)不變重新回到步驟2
3.2 創(chuàng)建線程
創(chuàng)建線程的操作主要分為以下幾個步驟:
- 創(chuàng)建一個worker線程實例
- 獲取當(dāng)前線程池鎖進行互斥操作
- 對線程池狀態(tài)再次進行判斷寿烟。同樣檢測當(dāng)rs>SHUTDOWN時直接拒絕減小Wc,同時Terminate辛燥,如果為SHUTDOWN同時firstTask不為null的時候也要Terminate
- 將線程加入線程隊列中筛武,釋放鎖
- 執(zhí)行線程
- 若此時線程池狀態(tài)變?yōu)镾TOP,但當(dāng)前線程并未interrupt缝其,執(zhí)行interrupt
4 Worker
在第3節(jié)中看到添加的線程是通過Worker實現(xiàn)的,所以接下來看下Worker這個類:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable var2) {
this.setState(-1);
this.firstTask = var2;
this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
}
......
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
}
可以看到Worker實現(xiàn)了Runnable接口徘六,并在內(nèi)部維護了一個線程變量内边,看到這里其實Worker的大致邏輯明顯了,無非是維護一個線程實例待锈,執(zhí)行添加的runnable實例漠其。
4.1runWorker
在addWorker方法中,Worker實例創(chuàng)建好后會就會執(zhí)行其thread變量的start方法竿音,進而也就會執(zhí)行Worker的run方法:
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
所以接下來看下ThreadPoolExecutor的runWorker方法:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
//標識線程是不是異常終止的
boolean completedAbruptly = true;
try {
//task不為null情況是初始化worker時和屎,如果task為null,則去隊列中取線程--->getTask()
while (task != null || (task = getTask()) != null) {
w.lock();
//獲取woker的鎖春瞬,防止線程被其他線程中斷
clearInterruptsForTaskRun();//清楚所有中斷標記
try {
beforeExecute(w.thread, task);//線程開始執(zhí)行之前執(zhí)行此方法柴信,可以實現(xiàn)Worker未執(zhí)行退出,本類中未實現(xiàn)
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//線程執(zhí)行后執(zhí)行宽气,可以實現(xiàn)標識Worker異常中斷的功能颠印,本類中未實現(xiàn)
}
} finally {
task = null;//運行過的task標null,方便GC
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//處理worker退出的邏輯
processWorkerExit(w, completedAbruptly);
}
}
整個方法的邏輯比較簡單:
- task不為null情況是初始化worker時抹竹,如果task為null线罕,則去隊列中取線程--->getTask()
- 獲取woker的鎖,防止線程被其他線程中斷
- 線程開始執(zhí)行之前執(zhí)行beforeExecute方法窃判,可以實現(xiàn)Worker未執(zhí)行退出钞楼,本類中未實現(xiàn)
- 執(zhí)行任務(wù)
- 線程執(zhí)行后執(zhí)行,可以實現(xiàn)標識Worker異常中斷的功能袄琳,本類中未實現(xiàn)
- 處理worker退出的邏輯
4.2 getTask
接下來再來看看runWorker中的getTask方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//當(dāng)前狀態(tài)為>stop時询件,不處理workQueue中的任務(wù),同時減小worker的數(shù)量所以返回null唆樊,如果為shutdown 同時workQueue已經(jīng)empty了宛琅,同樣減小worker數(shù)量并返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這段代碼十分關(guān)鍵,首先看幾個局部變量:
boolean timedOut = false;//主要是判斷后面的poll是否要超時
boolean timed;//主要是標識著當(dāng)前Worker超時是否要退出
wc > corePoolSize時需要減小空閑的Worker數(shù)逗旁,那么timed為true嘿辟,但是wc <= corePoolSize時,不能減小核心線程數(shù)timed為false片效。
timedOut初始為false红伦,如果timed為true那么使用poll取線程。如果正常返回淀衣,那么返回取到的task昙读。如果超時,證明worker空閑膨桥,同時worker超過了corePoolSize蛮浑,需要刪除唠叛。返回r=null。則 timedOut = true沮稚。此時循環(huán)到wc <= maximumPoolSize && ! (timedOut && timed)
時玻墅,減小worker數(shù),并返回null壮虫,導(dǎo)致worker退出澳厢。如果線程數(shù)<= corePoolSize,那么此時調(diào)用 workQueue.take()囚似,沒有線程獲取到時將一直阻塞剩拢,直到獲取到線程或者中斷。