基礎(chǔ)
學(xué)習(xí)一個(gè)類昼钻,我們應(yīng)該先從其字段開始。首先看看ThreadPoolExecutor對(duì)應(yīng)的屬性有哪些仅财。
private volatile int corePoolSize; // 核心線程數(shù)盏求,線程池在阻塞獲取任務(wù)時(shí)可以保持永久存活的線程的最大值亿眠。當(dāng)線程池內(nèi)的線程超過(guò)此值的線程會(huì)通過(guò)poll(keepAliveTime)獲取任務(wù)
private volatile int maximumPoolSize; // 線程池中允許的最大的線程數(shù)纳像,這里使用volatile修飾竟趾,保證多線程下的可見性
private volatile long keepAliveTime; // Woker從workQueue獲取任務(wù)的最大等待時(shí)間岔帽,超過(guò)這個(gè)時(shí)間后,worker會(huì)被回收掉(run方法執(zhí)行完畢鞋邑,線程不可復(fù)生)
private final BlockingQueue<Runnable> workQueue; // 提交的任務(wù)的排隊(duì)隊(duì)列枚碗,這是一個(gè)接口铸本,通過(guò)不同的策略實(shí)現(xiàn)不同的線程池機(jī)制
private int largestPoolSize; // 線程池中最大的pool size箱玷,只會(huì)增加不會(huì)減少,其是一個(gè)統(tǒng)計(jì)信息
private final HashSet<Worker> workers = new HashSet<Worker>(); // 內(nèi)部運(yùn)行的Worker存放的地方波丰,通過(guò)mainLock保證線程安全
private final ReentrantLock mainLock = new ReentrantLock(); //內(nèi)部的一個(gè)獨(dú)占鎖掰烟,主要保證線程池的一些統(tǒng)計(jì)信息(最大的線程數(shù)纫骑、完成的任務(wù)數(shù))和worker添加到集合的安全性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程安全類型,最高位為符號(hào)位发框,次高3位為狀態(tài)值梅惯,低28位為當(dāng)前的線程數(shù)
private volatile boolean allowCoreThreadTimeOut; // 是否允許核心線程從阻塞隊(duì)列獲取任務(wù)時(shí)銷毀个唧。默認(rèn)為false
private volatile ThreadFactory threadFactory; // 內(nèi)部為worker提供任務(wù)執(zhí)行的線程的生成工廠设预。我們通過(guò)自定義的工廠來(lái)使得業(yè)務(wù)日志更為清晰或者執(zhí)行不同的業(yè)務(wù)邏輯
private volatile RejectedExecutionHandler handler; // 拒絕策略鳖枕,默認(rèn)拒絕策略為拋出異常宾符。線程池的拒絕策略是策略模式在JDK中的一個(gè)應(yīng)用點(diǎn)灭翔「蜗洌可以自定義拒絕策略煌张,在生產(chǎn)者的速度遠(yuǎn)遠(yuǎn)大于消費(fèi)者時(shí)將超出的任務(wù)持久化到外部存儲(chǔ)。
其中corePoolSize链嘀、maximumPoolSize怀泊、keepAliveTime等變量使用volatile修飾,是因?yàn)榫€程池提供了public的set方法讓我們可以對(duì)其進(jìn)行修改刷允,這里需要使用volatile來(lái)使得修改對(duì)多線程可見树灶。
其他屬性的修改在mainLock的控制下進(jìn)行天通。
線程池狀態(tài)
了解線程池必須了解其狀態(tài)機(jī)制熄驼。線程池內(nèi)部使用AtomicInteger類型的clt屬性來(lái)進(jìn)行狀態(tài)控制瓜贾。其中次高三位分別表示running祭芦、shutdown、stop胃夏、tidying仰禀、teminated這5種狀態(tài)
常用的方法
- 任務(wù)提交
public void execute(Runnable command) {
// NPE檢查答恶,線程池不允許提交NULL任務(wù)
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 獲取當(dāng)前的clt悬嗓,AtomicInteger類型保證線程安全
if (workerCountOf(c) < corePoolSize) { //如果當(dāng)前運(yùn)行的線程數(shù)小于核心線程數(shù)
if (addWorker(command, true)) //如果添加核心線程數(shù)成功則方法返回
return;
c = ctl.get();//執(zhí)行到這里必定是添加核心線程失敗烫扼,重新讀取最新的clt
}
/**
* 這里分析一下添加核心態(tài)worker失敗的幾種場(chǎng)景:
* 1映企、線程池為shutdown以上的狀態(tài)
* 2、當(dāng)前線程池中運(yùn)行的worker的數(shù)量超過(guò)其本身最大限制(2^29 -1 )
* 3挤渐、當(dāng)前線程池中運(yùn)行的worker的數(shù)量超過(guò)corePoolSize
*/
// 如果線程池處于running狀態(tài)浴麻,則將當(dāng)前提交的任務(wù)提交到內(nèi)部的阻塞隊(duì)列進(jìn)行排隊(duì)等待worker處理
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/**
* double check是否線程池仍在運(yùn)行中
* 如果線程池不在running狀態(tài)則將剛才進(jìn)行排隊(duì)的任務(wù)移除软免,并拒絕此次提交的任務(wù)
* 如果此時(shí)在線程池中運(yùn)行的worker數(shù)量減少到0(corePoolSize為0的線程池在并發(fā)的情況下會(huì)出現(xiàn)此場(chǎng)景)
* 則添加一個(gè)不攜帶任何任務(wù)的非核心態(tài)的worker去處理剛才排隊(duì)成功的任務(wù)
*/
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//如果排隊(duì)失敻嘞簟(有界的阻塞隊(duì)列)則添加一個(gè)非核心態(tài)的worker
//添加失旈环骸:當(dāng)前運(yùn)行的worker數(shù)量超過(guò)maximumPoolSize或者本身最大的限制噩斟;線程池狀態(tài)在shutdown以上
reject(command);
}
- 新增處理線程(worker)
private boolean addWorker(Runnable firstTask, boolean core) {
//自旋進(jìn)行線程狀態(tài)check
retry:
for (;;) {
int c = ctl.get(); //讀取最新的clt剃允,其本身具有可見性
int rs = runStateOf(c);
// 檢查線程池狀態(tài)是否在shutdown以上
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
/**
* 自旋進(jìn)行worker數(shù)量自增
* 如果當(dāng)前新增的是核心態(tài)的worker則與corePoolSize進(jìn)行比較
* 如果當(dāng)期新增的是非核心態(tài)的worker則與maximumPoolSize進(jìn)行比較
* 不滿足數(shù)量限制則直接添加失敗硅急,進(jìn)入后續(xù)的排隊(duì) or 拒絕流程
*/
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 通過(guò)CAS進(jìn)行worker數(shù)量+1营袜。為什么不直接調(diào)用AtomicInteger提供的incrementAndGet() 方法荚板?
* 因?yàn)槲覀兪切枰獙orker數(shù)量+1,而后者并不能提供單純的+1功能跪另。將c-> c+1而不是變成c -> c + N
*/
if (compareAndIncrementWorkerCount(c))
break retry; //如果CAS成功則跳出自旋
c = ctl.get(); // 重新讀clt免绿,代碼執(zhí)行到這里意味著clt的值必定被其他線程修改擦盾,本次讀會(huì)從主存讀取最新的值到工作內(nèi)存
if (runStateOf(c) != rs)// 如果線程池狀態(tài)發(fā)生變化(只有running狀態(tài)才接受新任務(wù)),則跳到外層循環(huán)執(zhí)行拒絕
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 代碼執(zhí)行到此處徒仓,意味著worker的數(shù)量成功+1掉弛,則可以進(jìn)行worker的構(gòu)造過(guò)程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// new 一個(gè)worker殃饿,將本次提交的任務(wù)封裝到其內(nèi)部
w = new Worker(firstTask);
final Thread t = w.thread; // worker內(nèi)部真正用來(lái)執(zhí)行任務(wù)的線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
/**
* 進(jìn)行線程池狀態(tài)檢查乎芳,thread狀態(tài)檢查秒咐,進(jìn)行運(yùn)行的最大線程數(shù)(largestPoolSize)統(tǒng)計(jì)
* 將worker添加到wokrers容器(HashSet)中
* 修改workerAdded為true
*/
try {
...省略此處代碼
} finally {
mainLock.unlock();
}
//在這里workerAdded為false:thread已經(jīng)調(diào)用該start方法携取;線程池狀態(tài)為shutdown以上
if (workerAdded) {
// 啟動(dòng)worker內(nèi)部的線程帮孔,其會(huì)調(diào)用worker內(nèi)部的run方法
t.start();
// 添加成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 執(zhí)行任務(wù) (worker的工作流程)
什么是worker文兢?
private final class Worker extends AbstractQueuedSynchronizer implements Runnable Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; //外部提交的任務(wù) this.thread = getThreadFactory().newThread(this); // 真實(shí)執(zhí)行任務(wù)的線程 }
從這里我們可以看出其實(shí)際是一個(gè)Runnable姆坚,并且是AQS的子類兼呵,那么我們可以簡(jiǎn)單的猜測(cè)到其能夠進(jìn)行并發(fā)的控制(lock击喂、unlock)
final void runWorker(Worker w) {
//在添加worker的流程中執(zhí)行thread.start()之后真實(shí)執(zhí)行的方法
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取當(dāng)前worker攜帶的任務(wù)
w.firstTask = null;
/**
* 直接unlock懂昂??沸柔?在unlock之前一定要lock嗎勉失?從這里我們可以看出不一定
*/
w.unlock(); // 修改state為0乱凿,將占用鎖的線程設(shè)為null(第一次執(zhí)行之前沒(méi)有線程占用)
boolean completedAbruptly = true;
try {
// 自旋徒蟆。先執(zhí)行自己攜帶的任務(wù)段审,然后從阻塞隊(duì)列中獲取一個(gè)任務(wù)直到無(wú)法獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 將state修改為1,設(shè)置占有鎖的線程為自己
w.lock();
/**
* check線程池的狀態(tài)抑淫,如果狀態(tài)為stop以上(stop以上不執(zhí)行任務(wù))始苇,則中斷當(dāng)前線程
* 如果當(dāng)前線程已被中斷(其他線程并發(fā)的調(diào)用線程池的shutdown()或shutdownNow()方法)催式,則check線程池狀態(tài)是否為stop以上
* 最后如果當(dāng)前線程未被中斷則中斷當(dāng)前線程(不可能荣月!筆者還未想到此種場(chǎng)景)
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);// 空方法哺窄,留給子類實(shí)現(xiàn)
Throwable thrown = null;
try {
task.run(); //執(zhí)行外部提交的任務(wù)堂氯,通過(guò)try-catch來(lái)保證異常不會(huì)影響線程池本身的功能
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);// 空方法,留給子類實(shí)現(xiàn)
}
} finally {
task = null;
w.completedTasks++; //已完成任務(wù)數(shù)量統(tǒng)計(jì)
w.unlock();
}
}
// 如果執(zhí)行到這里代表非核心線程在keepAliveTime內(nèi)無(wú)法獲取任務(wù)而退出
completedAbruptly = false;
} finally {
/**
* 從上面可以看出如果實(shí)際業(yè)務(wù)(外部提交的Runnable)出現(xiàn)異常會(huì)導(dǎo)致當(dāng)前worker終止
* completedAbruptly 此時(shí)為true意味著worker是突然完成啤握,不是正常退出
*/
processWorkerExit(w, completedAbruptly);// 執(zhí)行worker退出收尾工作
}
}
- 獲取任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 自旋獲取任務(wù)(因?yàn)槭嵌嗑€程環(huán)境)
for (;;) {
int c = ctl.get();// 讀取最新的clt
int rs = runStateOf(c);
/**
* 1鸟缕、線程池狀態(tài)為shutdown并且任務(wù)隊(duì)列為空
* 2、線程池狀態(tài)為stop狀態(tài)以上
* 這2種情況直接減少worker數(shù)量,并且返回null從而保證外部獲取任務(wù)的worker進(jìn)行正常退出
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* 1懂从、允許核心線程退出
* 2授段、當(dāng)前的線程數(shù)量超過(guò)核心線程數(shù)
* 這時(shí)獲取任務(wù)的機(jī)制切換為poll(keepAliveTime)
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 1、線程數(shù)大于maximumPoolSize(什么時(shí)候會(huì)出現(xiàn)這種情況番甩? 當(dāng)maximumPoolSize初始設(shè)置為0或者其他線程通過(guò)set方法對(duì)其進(jìn)行修改)
* 2侵贵、線程數(shù)未超過(guò)maximumPoolSize但是timed為true(允許核心線程退出或者線程數(shù)量超過(guò)核心線程)
* 并且上次獲取任務(wù)超時(shí)(沒(méi)獲取到任務(wù),我們推測(cè)本次依舊會(huì)超時(shí))
* 3、在滿足條件1或者條件2的情況下進(jìn)行check:運(yùn)行線程數(shù)大于1或者任務(wù)隊(duì)列沒(méi)有任務(wù)
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // CAS進(jìn)行worker數(shù)量-1缘薛,成功則返回null進(jìn)行worker退出流程,失敗則繼續(xù)自旋
return null;
continue;
}
try {
// 如果允許超時(shí)退出宴胧,則調(diào)用poll(keepAliveTime)獲取任務(wù)漱抓,否則則通過(guò)tack()一直阻塞等待直到有任務(wù)提交到隊(duì)列
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;// 當(dāng)?shù)却^(guò)keepAliveTime時(shí)間未獲取到任務(wù)時(shí),標(biāo)記為true恕齐。在下次自旋時(shí)會(huì)進(jìn)入銷毀流程
} catch (InterruptedException retry) {
// 什么時(shí)候會(huì)拋出異常乞娄?當(dāng)調(diào)用shutdown或者shutdownNow方法觸發(fā)worker內(nèi)的Thread調(diào)用interrupt方法時(shí)會(huì)執(zhí)行到此處
timedOut = false;
}
}
}
- 關(guān)閉線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 利用排它鎖進(jìn)行上鎖,保證只有一個(gè)線程執(zhí)行關(guān)閉流程
mainLock.lock();
try {
// 安全檢查
checkShutdownAccess();
// 內(nèi)部通過(guò)自旋+CAS修改線程池狀態(tài)為shutdown
advanceRunState(SHUTDOWN);
// 遍歷所有的worker显歧,進(jìn)行線程中斷通知
interruptIdleWorkers();
// 鉤子函數(shù)
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 進(jìn)行最后的整理工作
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
...和shutdown類似仪或,將狀態(tài)修改為stop并返回在任務(wù)隊(duì)列排隊(duì)的任務(wù) ...
return tasks;
}
總結(jié)
線程池能為我們減少線程創(chuàng)建的開銷,但是相應(yīng)參數(shù)的設(shè)置需要不斷測(cè)試從而到達(dá)一個(gè)相對(duì)最優(yōu)的配置
- 過(guò)大的線程數(shù)可能導(dǎo)致CPU切換過(guò)于頻繁從而導(dǎo)致效率降低
- 過(guò)小的線程數(shù)可能導(dǎo)致CPU利用率不高
- 有界隊(duì)列可以防止資源耗盡士骤,但是我們需要考慮在生產(chǎn)速度大于消費(fèi)速度時(shí)提交任務(wù)帶來(lái)的拒絕問(wèn)題
- 無(wú)界隊(duì)列在消費(fèi)速度小于生產(chǎn)隊(duì)列時(shí)可能導(dǎo)致頻繁的GC從而降低系統(tǒng)響應(yīng)速度
以上所述都是個(gè)人學(xué)習(xí)源碼之中的一點(diǎn)心得體會(huì)范删,如果不實(shí)之處,望大家諒解和指正