線程源碼分析
上一篇中已經(jīng)講了線程池的原理讯柔。這一次來(lái)說(shuō)說(shuō)源碼執(zhí)行過(guò)程孵延。建議先看看細(xì)說(shuō)線程池---入門(mén)篇 細(xì)說(shuō)線程池---中級(jí)篇
依然使用newFixedThreadPool()方法創(chuàng)建線程池女淑。
看源碼從execute(Runnable runable)開(kāi)始扯再。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
///1.當(dāng)前池中線程比核心數(shù)少焊唬,新建一個(gè)線程執(zhí)行任務(wù)
//workerCountOf計(jì)算出線程個(gè)數(shù)
if (workerCountOf(c) < corePoolSize) {
//線程池中創(chuàng)建一個(gè)線程worker
if (addWorker(command, true))
return;
c = ctl.get();
}
////2.核心池已滿,但任務(wù)隊(duì)列未滿,添加到隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程缎谷,
// 因?yàn)橐汛嬖诘木€程可能被銷毀了
if (! isRunning(recheck) && remove(command))
////如果線程池處于非運(yùn)行狀態(tài),
// 并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中
//移除成功灶似,則拒絕該任務(wù)
reject(command);
////如果之前的線程已被銷毀完列林,新建一個(gè)線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
////3.核心池已滿瑞你,隊(duì)列已滿,試著創(chuàng)建一個(gè)新線程
else if (!addWorker(command, false))
////如果創(chuàng)建新線程失敗了希痴,說(shuō)明線程池被關(guān)閉或者線程池完全滿了者甲, 拒絕任務(wù)
reject(command);
}
ctl 的作用
在線程池中,ctl 貫穿在線程池的整個(gè)生命周期中
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
它是一個(gè)原子類砌创,主要作用是用來(lái)保存線程數(shù)量和線程池的狀態(tài)虏缸。我們來(lái)分析一下這段代碼,其實(shí)比較有意思嫩实,他用到了位運(yùn)算一個(gè) int 數(shù)值是 32 個(gè) bit 位刽辙,這里采用高 3 位來(lái)保存運(yùn)行狀態(tài),低 29 位來(lái)保存線程數(shù)量舶赔。我們來(lái)分析默認(rèn)情況下扫倡,也就是 ctlOf(RUNNING)運(yùn)行狀態(tài),調(diào)用了 ctlOf(int rs,int wc)方法竟纳;其中
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中 RUNNING =-1 << COUNT_BITS 撵溃;-1 左移 29 位,
-1 的二進(jìn)制是 32 個(gè) 1(1111 1111 11111111 1111 1111 1111 1111)锥累;</pre>
-1 的二進(jìn)制計(jì)算方法原碼是 1000…001 . 高位 1 表示符號(hào)位缘挑。然后對(duì)原碼取反,高位不變得到 1111…110然后對(duì)反碼進(jìn)行+1 桶略,也就是補(bǔ)碼操作语淘, 最后得到 1111…1111
那么-1 <<左移 29 位, 也就是 【111】 表示际歼;rs | wc 惶翻。二進(jìn)制的 111 | 000 。
得到的結(jié)果仍然是 111鹅心。
那么同理可得其他的狀態(tài)的 bit 位表示
//32-3
private static final int COUNT_BITS = Integer.SIZE - 3;
//將 1 的二進(jìn)制向右位移 29 位,再減 1 表示最大線程容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 運(yùn)行狀態(tài)保存在 int 值的高 3 位 ( 所有數(shù)值左移 29 位 )
// 接收新任務(wù),并執(zhí)行隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
// 不接收新任務(wù),但是執(zhí)行隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接收新任務(wù),不執(zhí)行隊(duì)列中的任務(wù),中斷正在執(zhí)行中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
// 所有的任務(wù)都已結(jié)束,線程數(shù)量為 0,處于該狀態(tài)的線程池即將調(diào)用 terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法執(zhí)行完成
private static final int TERMINATED = 3 << COUNT_BITS;</pre>
線程池狀態(tài)變化圖
addWorker
再回到上面源碼中吕粗,當(dāng)線程池中線程數(shù)小于核心線程數(shù)的時(shí)候:會(huì)調(diào)用 addWorker,顧名思義旭愧,其實(shí)就是要?jiǎng)?chuàng)建一個(gè)工作線程颅筋。我們來(lái)看看源碼的實(shí)現(xiàn)源碼比較長(zhǎng),看起來(lái)比較唬人输枯,其實(shí)就做了兩件事议泵。
才用循環(huán)
CAS
操作來(lái)將線程數(shù)加 1新建一個(gè)線程并啟用
private boolean addWorker(Runnable firstTask, boolean core) {
//goto 語(yǔ)句,避免死循環(huán)
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果線程處于非運(yùn)行狀態(tài),并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
// workQueue 為空桃熄,直接返回 false (表示不可添加 work 狀態(tài))
// 1\. 線程池已經(jīng) shutdown 后先口,還要添加新的任務(wù),拒絕
// 2\. (第二個(gè)判斷) SHUTDOWN 狀態(tài)不接受新任務(wù),但仍然會(huì)執(zhí)行已經(jīng)加入任務(wù)隊(duì)列的任
// 務(wù)池充,所以當(dāng)進(jìn)入 SHUTDOWN 狀態(tài)桩引,而傳進(jìn)來(lái)的任務(wù)為空,并且任務(wù)隊(duì)列不為空的時(shí)候收夸,是允許添加
// 新線程的 , 如果把這個(gè)條件取反,就表示不允許添加 worker
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { //自旋
int wc = workerCountOf(c);//獲得 Worker 工作線程數(shù)
//如果工作線程數(shù)大于默認(rèn)容量大小或者大于核心線程數(shù)大小血崭,
// 則直接返回 false 表示不能再添加 worker卧惜。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通過(guò) cas 來(lái)增加工作線程數(shù),
if (compareAndIncrementWorkerCount(c))
//如果 cas 失敗夹纫,則直接重試
break retry;
// 再次獲取 ctl 的值
c = ctl.get();
//這里如果不想等咽瓷,說(shuō)明線程的狀態(tài)發(fā)生了變化,繼續(xù)重試
if (runStateOf(c) != rs)
continue retry;
}
}
//上面這段代碼主要是對(duì) worker 數(shù)量做原子+1 操作,
// 下面的邏輯才是正式構(gòu)建一個(gè) worker
boolean workerStarted = false; //工作線程是否啟動(dòng)的標(biāo)識(shí)
boolean workerAdded = false; //工作線程是否已經(jīng)添加成功的標(biāo)識(shí)
Worker w = null;
try {
//構(gòu)建一個(gè) Worker舰讹,這個(gè) worker 是什么呢茅姜?
//我們 可以看到構(gòu)造方法里面?zhèn)魅肓艘粋€(gè) Runnable 對(duì)象
w = new Worker(firstTask);
final Thread t = w.thread; //從 worker 對(duì)象中取出線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //這里有個(gè)重入鎖,避免并發(fā)問(wèn)題
try {
int rs = runStateOf(ctl.get());
//只有當(dāng)前線程池是正在運(yùn)行狀態(tài)月匣,[或是 SHUTDOWN
// 且 firstTask 為空]钻洒,才能添加到 workers 集合中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//任務(wù)剛封裝到 work 里面,還沒(méi) start,你封裝的線程就是 alive锄开,
// 幾個(gè)意思素标?肯定是要拋異常出去的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //將新創(chuàng)建的 Worker 添加到 workers 集合中
int s = workers.size();
//如果集合中的工作線程數(shù)大于最大線程數(shù),
// 這個(gè)最大線程數(shù)表示線程池曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù)
if (s > largestPoolSize)
largestPoolSize = s; //更新線程池出現(xiàn)過(guò)的最大線程數(shù)
workerAdded = true;//表示工作線程創(chuàng)建成功了
}
} finally {
mainLock.unlock(); //釋放鎖
}
if (workerAdded) {//如果 worker 添加成功
t.start();//啟動(dòng)線程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果添加失敗萍悴,就需要做一件事头遭,就是遞減實(shí)際工作線
//程數(shù)(還記得我們最開(kāi)始的時(shí)候增加了工作線程數(shù)嗎)
addWorkerFailed(w);
}
//返回結(jié)果
return workerStarted;
}
Worker說(shuō)明
我們發(fā)現(xiàn) addWorker 方法只是構(gòu)造了一個(gè) Worker,并且把 firstTask 封裝到 worker 中癣诱,它是做什么的呢计维?我們來(lái)看看
每個(gè) worker,都是一條線程,同時(shí)里面包含了一個(gè) firstTask,即初始化時(shí)要被首先執(zhí)行的任務(wù).
最終執(zhí)行任務(wù)的,是 runWorker()方法Worker 類繼承了 AQS,并實(shí)現(xiàn)了 Runnable 接口撕予,注意其中的 firstTask 和 thread 屬性:firstTask 用它來(lái)保存?zhèn)魅氲娜蝿?wù)鲫惶;thread 是在調(diào)用構(gòu)造方法時(shí)通過(guò) ThreadFactory 來(lái)創(chuàng)建的線程,是用來(lái)處理任務(wù)的線程嗅蔬。
在調(diào)用構(gòu)造方法時(shí)剑按,需要傳入任務(wù),這里通過(guò) getThreadFactory().newThread(this);來(lái)新建一個(gè)線程澜术,newThread 方法傳入的參數(shù)是 this艺蝴,因?yàn)?Worker 本身繼承了 Runnable 接口,也就是一個(gè)線程鸟废,所以一個(gè) Worker 對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用 Worker 類中的 run 方法猜敢。Worker 繼承了 AQS,使用 AQS 來(lái)實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用 ReentrantLock 來(lái)實(shí)現(xiàn)呢缩擂?可以看到 tryAcquire 方法鼠冕,它是不允許重入的,而 ReentrantLock 是允許重入的:lock 方法一旦獲取了獨(dú)占鎖胯盯,表示當(dāng)前線程正在執(zhí)行任務(wù)中懈费;那么它會(huì)有以下幾個(gè)作用
如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程博脑;
如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài)憎乙,也就是空閑的狀態(tài),說(shuō)明它沒(méi)有在處理任務(wù)叉趣,這時(shí)可以對(duì)該線程進(jìn)行中斷泞边;
線程池在執(zhí)行 shutdown 方法或 tryTerminate 方法時(shí)會(huì)調(diào)用 interruptIdleWorkers 方法來(lái)中斷空閑的線程,interruptIdleWorkers 方法會(huì)使用 tryLock 方法來(lái)判斷線程池中的線程是否是空閑狀態(tài)
之所以設(shè)置為不可重入疗杉,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像 setCorePoolSize 這樣的線程池控制方法時(shí)重新獲取鎖阵谚,這樣會(huì)中斷正在運(yùn)行的線程
//繼承了AQS還實(shí)現(xiàn)了Runnable
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
//注意了,這才是真正執(zhí)行task的線程烟具,從構(gòu)造函數(shù)可知是由
//ThreadFactury 創(chuàng)建的
final Thread thread;
//這就是需要執(zhí)行的 task
Runnable firstTask;
/** 每個(gè)線程完成任務(wù)的計(jì)數(shù)器*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//// 初始狀態(tài) -1, 防止在調(diào)用 runWorker() 梢什,
// 也就是真正執(zhí)行task前中斷thread 。
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
addWorkerFailed
方法
addWorker
方法中净赴,如果添加 Worker 并且啟動(dòng)線程失敗绳矩,則會(huì)做失敗后的處理。這個(gè)方法主要做兩件事
如果 worker 已經(jīng)構(gòu)造好了玖翅,則從 workers 集合中移除這個(gè) worker
原子遞減核心線程數(shù)(因?yàn)樵?addWorker 方法中先做了原子增加)
嘗試結(jié)束線程池
private void addWorkerFailed(java.util.concurrent.ThreadPoolExecutor.Worker w) {
final ReentrantLock mainLock = this.mainLock;
//上鎖
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
//釋放鎖
mainLock.unlock();
}
}</pre>
runWorker 方法
前面已經(jīng)了解了 ThreadPoolExecutor
的核心方法 addWorker
翼馆,主要作用是增加工作線程,而 Worker 簡(jiǎn)單理解其實(shí)就是一個(gè)線程金度,里面重新了 run 方法应媚,這塊是線程池中執(zhí)行任務(wù)的真正處理邏輯,也就是 runWorker
方法猜极,這個(gè)方法主要做幾件事:
如果 task 不為空,則開(kāi)始執(zhí)行 task
如果 task 為空,則通過(guò)
getTask()
再去取任務(wù),并賦值給 task,如果取到的 Runnable 不為空,則執(zhí)行該任務(wù)執(zhí)行完畢后,通過(guò) while 循環(huán)繼續(xù)
getTask()
取任務(wù)如果
getTask()
取到的任務(wù)依然是空,那么整個(gè)runWorker()
方法執(zhí)行完畢