1仰猖、由于系統(tǒng)創(chuàng)建和銷(xiāo)毀線程都會(huì)占用系統(tǒng)資源(CPU時(shí)間)芬探,如果對(duì)于某些執(zhí)行耗時(shí)很少神得,但是數(shù)量很多的任務(wù),大部分的時(shí)間都會(huì)花在創(chuàng)建和銷(xiāo)毀線程偷仿,所以引入了線程池的概念循头;其實(shí)原理和數(shù)據(jù)庫(kù)連接池(對(duì)象池)是一樣的,都是為了避免某些不必要的損耗炎疆。
2、我們可以使用構(gòu)造方法去初始化線程池国裳,
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
其中有幾個(gè)必要的參數(shù):
(1)corePoolSize: 核心線程池的大行稳搿(可以理解為空閑時(shí)期線程池中最小數(shù)目,但是需要任何時(shí)期線程數(shù)量大于等于過(guò)corePoolSize)缝左,必須大于等于0亿遂。
(2)maximumPoolSize: 線程池中最大線程數(shù)量,必須大于0渺杉。
(3)keepAliveTime: 空閑時(shí)期非核心線程最大存活時(shí)間蛇数,必須大于0。
(4)workQueue: 任務(wù)隊(duì)列是越。
(5)threadFactory: 線程創(chuàng)建工廠耳舅。
(6)handler: 飽和策略。
上面幾個(gè)參數(shù)很好理解,也是初始化線程池的必要參數(shù)浦徊;然后線程池還提供了一個(gè)比較有意思的參數(shù):
allowCoreThreadTimeOut: 是否允許核心線程超時(shí)馏予,它的默認(rèn)值是false
從字面意思理解,就是是否讓核心線程超時(shí)銷(xiāo)毀盔性,所以我們一般所理解的核心線程不會(huì)被銷(xiāo)毀霞丧,在這個(gè)值設(shè)置為true的時(shí)候,就是不正確的哦(我就被面試官問(wèn)到過(guò)冕香,然后還自信滿滿的說(shuō)核心線程不會(huì)被銷(xiāo)毀S汲ⅰ);具體使用后面解釋悉尾。
3突那、現(xiàn)在我們需要使用線程池來(lái)執(zhí)行我們的任務(wù),它提供了
// 使用Future模式焕襟,有三種重載
Future future = executor.submit(() -> System.out.println("do work"));
// 普通提交任務(wù)方式
executor.execute(() -> System.out.println("do work"));
等幾種方法提交任務(wù)陨收。但是最終都是使用execute方法去執(zhí)行任務(wù),只不過(guò)submit是對(duì)我們的任務(wù)進(jìn)行了封裝鸵赖;所以我們關(guān)注execute的邏輯务漩,究竟線程池是怎樣幫助我們完成任務(wù)的。
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 獲取當(dāng)前運(yùn)行狀態(tài)以及線程數(shù)量
int c = ctl.get();
// 如果線程數(shù)小于核心線程數(shù)它褪,則創(chuàng)建新線程(無(wú)論線程池中是否有可以執(zhí)行任務(wù)的線程)
if (workerCountOf(c) < corePoolSize) {
// 創(chuàng)建新線程并且執(zhí)行任務(wù)
if (addWorker(command, true))
return;
// 核心線程創(chuàng)建失敗饵骨,獲取當(dāng)前運(yùn)行狀態(tài)以及線程數(shù)量
c = ctl.get();
}
// 判斷是否可以接受新任務(wù)(當(dāng)前運(yùn)行狀態(tài)),以及隊(duì)列是否滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢測(cè)茫打,如果當(dāng)前運(yùn)行狀態(tài)是非RUNNING居触,而且任務(wù)移除成功,那么拒絕任務(wù)(會(huì)執(zhí)行飽和策略)
if (! isRunning(recheck) && remove(command))
reject(command);
// 當(dāng)前運(yùn)行狀態(tài)是RUNNING或者移除任務(wù)失敗老赤,再判斷未終止的線程數(shù)是否等于0轮洋,是則創(chuàng)建新線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 當(dāng)運(yùn)行狀態(tài)為非RUNNING或者隊(duì)列滿了(邏輯上如此,但是實(shí)際在addWorker中如果運(yùn)行狀態(tài)是非RUNNING并且傳入任務(wù)非空抬旺,是無(wú)法創(chuàng)建新線程的)弊予,創(chuàng)建新線程;如果創(chuàng)建失斂啤(由于運(yùn)行狀態(tài)汉柒、或者最大線程數(shù)),則拒絕任務(wù)
else if (!addWorker(command, false))
reject(command);
以上是execute的源碼责鳍,大致上的邏輯我們都清楚了碾褂,有一點(diǎn)需要我們注意,就是核心線程的創(chuàng)建历葛,它并不是有可執(zhí)行任務(wù)的核心線程就不去創(chuàng)建正塌,而是只要當(dāng)前線程數(shù)小于核心線程數(shù)的時(shí)候,有新任務(wù)添加就會(huì)直接創(chuàng)建核心線程,然后我們需要明白它是如何判斷運(yùn)行狀態(tài)的传货;線程池中提供了幾個(gè)常量:
// 位移位數(shù) 32 - 3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 計(jì)量新建線程數(shù)量的最大值 000111...111(29個(gè)1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// RUNNING狀態(tài) 111000...000(29個(gè)0)
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN狀態(tài) 000000...000(32個(gè)0)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP狀態(tài) 001000...000(29個(gè)0)
private static final int STOP = 1 << COUNT_BITS;
// TIDYING狀態(tài) 010000...000(30個(gè)0)
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED狀態(tài) 011000...000(29個(gè)0)
private static final int TERMINATED = 3 << COUNT_BITS;
以上常量就是用來(lái)表示線程池運(yùn)行狀態(tài)的屎鳍,然后記錄當(dāng)前運(yùn)行狀態(tài)以及線程數(shù)量用
// 將當(dāng)前線程池設(shè)置為RUNNING狀態(tài),并計(jì)入0個(gè)線程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
所以我們也就可以理解问裕,它用了32位的整型做狀態(tài)表示以及計(jì)數(shù)逮壁,前三位表示運(yùn)行狀態(tài),后29位用來(lái)計(jì)數(shù)粮宛。所以對(duì)于源碼里面的幾個(gè)函數(shù)我們也就可以理解了
// 判斷當(dāng)前運(yùn)行狀態(tài) c是ctl.get()獲取的當(dāng)前運(yùn)行狀態(tài)以及線程數(shù)量值窥淆,然后與上111000...000
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 計(jì)算當(dāng)前線程數(shù)目 c是ctl.get()獲取的,然后與上000111...111
private static int workerCountOf(int c) { return c & CAPACITY; }
// 運(yùn)行狀態(tài)(rs)下計(jì)入線程數(shù)量(wc)
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判斷運(yùn)行狀態(tài)大小
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 判斷是否RUNNING狀態(tài)
private static boolean isRunning(int c) { return c < SHUTDOWN; }
現(xiàn)在關(guān)于整個(gè)execute的執(zhí)行邏輯巍杈、判斷條件基本理解忧饭,所以我們需要理解它是如何添加任務(wù),如何讓線程運(yùn)行起來(lái)筷畦,執(zhí)行完任務(wù)之后如何去等待繼續(xù)去執(zhí)行新任務(wù)词裤。
以下是addWorker的源碼,我們分為兩部分分析鳖宾,先看CAS判斷:
retry:
for (;;) {
int c = ctl.get();
// 獲取當(dāng)前運(yùn)行狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//這里就是我們之前說(shuō)的吼砂,如果運(yùn)行狀態(tài)是非RUNNING并且當(dāng)前任務(wù)是非空,是無(wú)法創(chuàng)建新線程的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取當(dāng)前未終止的線程數(shù)量
int wc = workerCountOf(c);
// 判斷數(shù)量是否超過(guò)限制(計(jì)數(shù)最大限制鼎文、核心線程限制渔肩、最大線程數(shù)量限制)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 比較并增加1,如果成功拇惋,那么結(jié)束判斷周偎,進(jìn)入創(chuàng)建線程邏輯
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新判斷運(yùn)行狀態(tài),如果有變化撑帖,則重新進(jìn)入retry循環(huán)蓉坎,否則繼續(xù)當(dāng)前循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
以上是CAS算法判斷是否能夠新創(chuàng)建線程,如果成功break出retry循環(huán)胡嘿,那么就進(jìn)入創(chuàng)建線程的邏輯袍嬉。
然后我們?cè)诜治鼍€程創(chuàng)建:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建Worker(就是我們的線程),這里Worker中會(huì)帶一個(gè)Thread對(duì)象與它(Worker)做雙向引用灶平,后續(xù)分析Worker的工作原理
// firstTask就是我們真正的需要執(zhí)行的任務(wù)
w = new Worker(firstTask);
// 這就是Worker中的Thread對(duì)象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將新創(chuàng)建的Worker加入HashSet中
workers.add(w);
// 記錄至今最大的workers數(shù)量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果創(chuàng)建成功,則啟動(dòng)Worker中的線程箍土,這里很重要逢享,這也是Worker的啟動(dòng),幫助我們執(zhí)行任務(wù)的關(guān)鍵吴藻,需要結(jié)合Worker初始化的源碼分析瞒爬,才能更好理解
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果創(chuàng)建失敗,那么做失敗處理
if (! workerStarted)
addWorkerFailed(w);
}
// 返回是否成功的標(biāo)志
return workerStarted;
上面就是創(chuàng)建Worker(線程)的邏輯,比較關(guān)鍵的是Worker的初始化和啟動(dòng)侧但,現(xiàn)在我們繼續(xù)分析Worker的源碼矢空,理解它是如何與Thread做綁定,然后幫助我們執(zhí)行任務(wù)的:
Worker(Runnable firstTask) {
// 這里是標(biāo)志W(wǎng)orker狀態(tài)
setState(-1); // inhibit interrupts until runWorker
// 需要執(zhí)行的任務(wù)
this.firstTask = firstTask;
// 創(chuàng)建Thread禀横,并且Thread中的Runnable對(duì)象是Worker本身
this.thread = getThreadFactory().newThread(this);
}
Worker其實(shí)也是實(shí)現(xiàn)了Runnable接口屁药,從構(gòu)造函數(shù)我們可以知道,在初始化Worker的時(shí)候柏锄,將本身和它的Thread對(duì)象進(jìn)行雙向引用酿箭,再結(jié)合addWorker中啟動(dòng)Worker中Thread的邏輯,就明白了趾娃,t.start實(shí)際上是執(zhí)行了Worker中的run方法缭嫡,然后我們繼續(xù)分析Worker中的run方法,它執(zhí)行了runWorker方法:
final void runWorker(Worker w) {
// 獲取當(dāng)前線程抬闷,也就是Worker中的Thread
Thread wt = Thread.currentThread();
// 獲取Worker需要執(zhí)行的任務(wù)(也就是我們實(shí)際的任務(wù))
Runnable task = w.firstTask;
w.firstTask = null;
// 改變Worker的狀態(tài)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 判斷當(dāng)前任務(wù)是否為null妇蛀,如果是空,則去隊(duì)列獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 改變Worker的狀態(tài)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 執(zhí)行任務(wù)之前笤成,可以自己重寫(xiě)該方法评架,從而做響應(yīng)的預(yù)處理
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行我們實(shí)際的任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行任務(wù)之后,可以自己重寫(xiě)該方法疹启,從而在結(jié)束之后做相應(yīng)處理
afterExecute(task, thrown);
}
} finally {
task = null;
// 當(dāng)前Worker至今完成的所有任務(wù)總和
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 由于獲取任務(wù)超時(shí)終止當(dāng)前Worker古程,這里對(duì)Worker做終止處理
processWorkerExit(w, completedAbruptly);
}
}
以上是Worker的工作原理,其中最主要的是getTask方法喊崖,這里就是保證它不退出一直WAITING或者TIMED_WAITING挣磨,等待任務(wù)入隊(duì)的關(guān)鍵(這里有個(gè)面試題喲,面試官問(wèn)過(guò)我荤懂,當(dāng)線程執(zhí)行完任務(wù)之后會(huì)處于什么狀態(tài)茁裙,很多人可能認(rèn)為會(huì)處于阻塞狀態(tài),因?yàn)锽lockingQueue嘛节仿,但是是不對(duì)哦晤锥,BlockingQueue中take方法是用了LockSupport.park來(lái)使當(dāng)前線程進(jìn)入WAITING,而poll(timeout, timeunit)方法則是用LockSupport.parkNanos使線程進(jìn)入TIMED_WAITING廊宪!這里可以了解ReentrantLock矾瘾;不過(guò)我們根據(jù)線程狀態(tài)改變的條件也能推斷,這里狀態(tài)改變的情況)箭启;然后我們?cè)賮?lái)看一下getTask的源碼:
/**
* 我們需要明白一點(diǎn)壕翩,當(dāng)getTask方法返回null的時(shí)候,就表示當(dāng)前調(diào)用Worker需要終止
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果運(yùn)行狀態(tài)為SHUTDOWN并且隊(duì)列為空或者運(yùn)行狀態(tài)是STOP傅寡,那么終止Worker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 這里就是我們之前說(shuō)的放妈,那個(gè)比較有意思的屬性北救,是否讓核心線程超時(shí)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果線程數(shù)量大于最大數(shù)量或者已經(jīng)超時(shí) 并且 線程池中有線程或者隊(duì)列為空,則嘗試結(jié)束當(dāng)前Worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 判斷是否需要有超時(shí)獲取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
上面就是getTask的邏輯芜抒,然后主要就是在
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
如果timed是true珍策,也就是當(dāng)前線程數(shù)量大于核心數(shù)量或者是我們把a(bǔ)llowCoreThreadTimeOut屬性設(shè)置為true,那么就使用poll超時(shí)獲取宅倒,否則使用take一直等待獲取任務(wù)攘宙;所以其實(shí)對(duì)于線程池,核心線程也是有可能被銷(xiāo)毀的唉堪!
到這里模聋,我們基本將線程池的整個(gè)工作邏輯都串起來(lái)了,也基本明白它是如何幫助我們執(zhí)行任務(wù)唠亚;但是這僅僅是主干邏輯链方,還有很多細(xì)節(jié),比如它的shutdown處理灶搜、terminal處理以及Worker的狀態(tài)改變等等祟蚀。所以看似簡(jiǎn)單,但是要吃透割卖,還需要更深入的理解前酿。
如果有不正確的地方,請(qǐng)幫忙指正鹏溯,謝謝罢维!