- execute源碼
public void execute(Runnable command) {
// 任務(wù)的具體實(shí)現(xiàn)邏輯類不能為空
if (command == null)
throw new NullPointerException();
/*
* 1 如果當(dāng)前線程池中的線程數(shù)小于核心線程數(shù)corePoolSize垢啼,則創(chuàng)建一個(gè)新的線程它浅,不過該線程是封裝在Worker對(duì)象中
* 2 addWorker方法中的第一個(gè)參數(shù)是該線程的第一個(gè)任務(wù),而第二個(gè)參數(shù)就是代表是否創(chuàng)建的是核心線程
* 3 如果當(dāng)前線程池中的線程數(shù)已經(jīng)滿足了核心線程數(shù)corePoolSize罩引,那么就會(huì)通過workQueue.offer()方法將任務(wù)添加到阻塞隊(duì)列中等待執(zhí)行
* 4 如果線程數(shù)已經(jīng)達(dá)到了corePoolSize且阻塞隊(duì)列中無法插入該任務(wù)(比如已滿)嵌器,并且沒有超過最大線程數(shù)maximumPoolSize,那么線程池就會(huì)再增加一個(gè)非核心線程來執(zhí)行該任務(wù)
* 5 如果確實(shí)已經(jīng)達(dá)到了最大線程數(shù)层宫,那么就拒絕這個(gè)任務(wù)
* */
int c = ctl.get();
// 檢查當(dāng)前線程數(shù)是否達(dá)到了核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
// 未達(dá)到核心線程數(shù)杨伙,則創(chuàng)建新核心線程
// 并將傳入的任務(wù)作為該線程的第一個(gè)任務(wù)
if (addWorker(command, true))
// 添加核心線程成功則直接返回捶闸,如果沒有添加成功儡司,就該執(zhí)行上面所說的3
return;
// 因?yàn)榍懊嬲{(diào)用了耗時(shí)操作addWorker方法
// 所以線程池狀態(tài)有可能發(fā)生了改變,重新獲取狀態(tài)等信息
c = ctl.get();
}
// 走到這里說明當(dāng)前線程池中的線程數(shù)已經(jīng)滿足了核心線程數(shù)corePoolSize
// 如果線程池當(dāng)前狀態(tài)是運(yùn)行中就調(diào)用workQueue.offer方法將任務(wù)放入阻塞隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
// 因?yàn)檫@個(gè)放入操作比較耗時(shí)历极,所以在放入成功之后毁菱,又做了一些列的校驗(yàn)操作
int recheck = ctl.get();
// 如果當(dāng)前狀態(tài)變成了非運(yùn)行中(因?yàn)榫€程池的狀態(tài)只能從小到大進(jìn)行狀態(tài)遷移米死,如果不是RUNNING那么肯定是至少處于SHUTDOWN狀態(tài),從該狀態(tài)開始就不再接收新任務(wù)了)贮庞,則將剛才放入阻塞隊(duì)列的任務(wù)拿出峦筒,拿出成功后,直接拒絕這個(gè)任務(wù)
// 疑問1
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池中沒有線程了窗慎,那就創(chuàng)建一個(gè)非核心線程
// 這里沒有看明白物喷,一個(gè)線程也沒有了卤材,為什么創(chuàng)建的不是核心線程?峦失?扇丛??
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果如果放入阻塞隊(duì)列失敵杞(如隊(duì)列已滿)晕拆,則添加一個(gè)非核心線程
// 如果不是RUNNING 這里為什么還能新建線程呢?而疑問1的地方不是運(yùn)行態(tài)為什么要取出來拒絕材蹬,而不是跟這里一樣建一個(gè)非核心線程進(jìn)行處理?吝镣?堤器?
// 目測只能是addWorker邏輯中有非運(yùn)行態(tài)的判斷肯定也會(huì)創(chuàng)建不成功(具體可以查看addWorker源碼注釋)
else if (!addWorker(command, false))
// 如果添加線程失敗(如已經(jīng)達(dá)到了最大線程數(shù))末贾,則拒絕任務(wù)
reject(command);
}
- addWorker源碼
// 改方法的主要作用是新建一個(gè)線程并啟用,創(chuàng)建的線程被包裝成了Worker對(duì)象
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1 rs > SHUTDOWN 此時(shí)不再接受新的任務(wù)闸溃,直接返回添加失敗
// 2 rs = SHUTDOWN:firtTask != null 也會(huì)創(chuàng)建線程失敗(這里也正好印證了我們?cè)趀xecute函數(shù)注解中留下的疑問)拱撵,此時(shí)不再接受任務(wù)辉川,但是仍然會(huì)執(zhí)行隊(duì)列中的任務(wù)
// 3 線程池SHUTDOWN了不再接受新任務(wù),但是此時(shí)隊(duì)列不為空拴测,那么還得創(chuàng)建線程把任務(wù)給執(zhí)行完才行
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/*
* 走都這里乓旗,說明線程池狀態(tài)或者為RUNNING,
* 或者是SHUTDOWN狀態(tài)集索,但是任務(wù)隊(duì)列中還有任務(wù)屿愚,這個(gè)時(shí)候也是需要新建線程(當(dāng)然這個(gè)線程究竟能不能新建成功,還需要后續(xù)一系列條件的限制)去執(zhí)行完任務(wù)
* */
for (;;) {
int wc = workerCountOf(c);
/*
* 線程數(shù)不能超過容量限制务荆;如果是創(chuàng)建核心線程妆距,也不能超過核心線程數(shù),非核心線程也不能超過設(shè)置的最大值
* */
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 原子的把線程數(shù)+1函匕,這里只是把線程數(shù)+1娱据,但是還沒有真正的創(chuàng)建工作線程
if (compareAndIncrementWorkerCount(c))
break retry; // 操作成功就退出重試循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 如果線程池的狀態(tài)發(fā)生變化重試
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到這里就是真正的開始創(chuàng)建工作線程了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 利用Worker構(gòu)造方法中的線程池工廠創(chuàng)建線程,并把線程封裝成Worker對(duì)象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 對(duì)線程池的操作都要獲取主鎖盅惜,避免添加和啟動(dòng)線程時(shí)受到干擾
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/*
* 如果線程池沒有關(guān)閉中剩,或者 已經(jīng)關(guān)閉且任務(wù)不為空
* */
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 檢查線程是否仍然存活其實(shí)就是檢查線程是否已啟動(dòng)(線程啟動(dòng)了就是存活了)
// 正常邏輯這里線程應(yīng)該不是啟動(dòng)狀態(tài),只有在調(diào)用了start之后才是啟動(dòng)狀態(tài)
// 所以如果這里是alive就報(bào)錯(cuò)
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w); //添加到workers線程數(shù)組中
int s = workers.size();
// 整個(gè)線程池運(yùn)行期間的最大并發(fā)線程數(shù)更新
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加到線程數(shù)組成功后酷窥,才啟動(dòng)新添加的線程
if (workerAdded) {
// 啟動(dòng)線程的方法是調(diào)用Thread類的start()方法咽安,該方法會(huì)在內(nèi)部調(diào)用Runnable接口的run()方法,即會(huì)調(diào)用Worker對(duì)象中的run方法蓬推,
// 這是因?yàn)?w = new Worker(firstTask)時(shí) 其中的newThread時(shí)把this(即Worker本身傳進(jìn)去了)而worker實(shí)現(xiàn)了Runnable接口
// 所以執(zhí)行start就是執(zhí)行run就是執(zhí)行Worker對(duì)象的runWorker(this)方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 啟動(dòng)失敗就回退線程的創(chuàng)建妆棒,即釋放掉線程
addWorkerFailed(w);
}
return workerStarted;
}
我們知道一個(gè)Thread在執(zhí)行完其中的run方法之后就會(huì)退出,線程的生命周期也就結(jié)束了,那線程池中的線程是如何做到復(fù)用的呢糕珊?我們來看以下Worker
類中的runWorker(this)方法动分。該方法會(huì)在線程啟動(dòng)時(shí)被調(diào)用(具體調(diào)用邏輯請(qǐng)看addWorker的源碼注釋)
- runWorker(this)源碼分析
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//在構(gòu)造Worker對(duì)象的時(shí)候,會(huì)把一個(gè)任務(wù)添加進(jìn)Worker對(duì)象中作為新增線程的第一個(gè)任務(wù)來執(zhí)行
Runnable task = w.firstTask;
//已經(jīng)將該任務(wù)拿出來進(jìn)行執(zhí)行红选,則需要將該worker對(duì)象 即線程池中的線程對(duì)象持有的任務(wù)清空
w.firstTask = null;
//將AQS鎖資源的狀態(tài)有-1變成0澜公,運(yùn)行該線程允許進(jìn)行中斷
w.unlock(); // allow interrupts
//用來判斷執(zhí)行任務(wù)的過程中,是否出現(xiàn)了異常喇肋,默認(rèn)是異常退出
boolean completedAbruptly = true;
try {
// 這里先大概說一下getTask的大概邏輯坟乾,核心線程會(huì)一直阻塞等待任務(wù)隊(duì)列中有任務(wù)
// 非核心線程會(huì)在等待keepAliveTime的超時(shí)時(shí)間之后,如果任務(wù)隊(duì)列中還沒有任務(wù)就會(huì)獲取到null
// 然后退出whie循環(huán)蝶防,進(jìn)入到finally中 執(zhí)行processWorkerExit結(jié)束調(diào)線程
while (task != null || (task = getTask()) != null) {
//給該線程加鎖甚侣,一個(gè)線程只處理一個(gè)任務(wù)
w.lock();
// 線程池是否是STOP狀態(tài)
// 如果是,則確保當(dāng)前線程是中斷狀態(tài)
// 如果不是间学,則確保當(dāng)前線程不是 中斷狀態(tài)
// 中斷對(duì)正在運(yùn)行的線程不起作用殷费,只對(duì)阻塞的線程起作用,這里具體怎么用低葫?详羡??嘿悬?实柠??
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//擴(kuò)展使用鹊漠,在執(zhí)行任務(wù)的run方法之前執(zhí)行主到;前置鉤子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置鉤子
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 正常執(zhí)行完任務(wù)
completedAbruptly = false;
} finally {
//所有的任務(wù)都處理完后,或者執(zhí)行任務(wù)的過程中出現(xiàn)了異常
processWorkerExit(w, completedAbruptly);
}
}
- getTask()源碼
核心線程池可以復(fù)用的原理就在這個(gè)函數(shù)里面
private Runnable getTask() {
// 標(biāo)記是否獲取任務(wù)超時(shí)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果線程池的當(dāng)前狀態(tài)為>=關(guān)閉的狀態(tài) 并且 (已停止或者任務(wù)隊(duì)列中為空)
* */
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //Worker總數(shù)減1躯概,返回null 外層的runWorker就會(huì)退出循環(huán)登钥,進(jìn)入銷毀線程的邏輯
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut:是否允許core Thread超時(shí),默認(rèn)false
// workerCount是否大于核心核心線程池
// 該標(biāo)志的含義是:是否需要等待keepAliveTime時(shí)間再去隊(duì)列中獲取
// 也就是說如果隊(duì)列中沒有任務(wù)娶靡,可以等待keepAliveTime時(shí)長牧牢,如果還沒有任務(wù)的話,那么則返回null
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果設(shè)置了allowCoreThreadTimeOut姿锭,那么大于coreThread(核心線程)就會(huì)有會(huì)有超時(shí)時(shí)間(即keepAliveTime)的限制塔鳍,
// 即如果當(dāng)前的WorkerCount大于corePoolSize的話,那么超過的這些線程都會(huì)有超時(shí)的限制
//1如果線程數(shù)超過最大限制呻此,直接線程數(shù)-1轮纫,然后返回null(該線程就會(huì)銷毀)
//2如果啟用了非核心線程超時(shí)設(shè)置,那么當(dāng)獲取任務(wù)超時(shí)并且線程池中有多于1個(gè)線程焚鲜,就返回null并回收此線程
//3如果啟用了非核心線程超時(shí)設(shè)置掌唾,那么當(dāng)獲取任務(wù)超時(shí) 并且任務(wù)隊(duì)列為空放前,就減1個(gè)線程(如果只有一個(gè)線程就會(huì)減失敗)
// 仍熱會(huì)繼續(xù)該循環(huán)糯彬,也就是說線程池中始終會(huì)至少有一個(gè)線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果啟用了非核心線程超時(shí)設(shè)置凭语,就使用poll獲取任務(wù),如果隊(duì)列中沒任務(wù)撩扒,該方法會(huì)獲取超時(shí)似扔,就會(huì)在下次循環(huán)中返回null
// 核心線程會(huì)使用take方法獲取任務(wù),take方法在任務(wù)隊(duì)列中沒有任務(wù)時(shí)搓谆,會(huì)阻塞炒辉。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//判斷任務(wù)不為空返回任務(wù)
return r;
//獲取一段時(shí)間沒有獲取到,獲取超時(shí)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}