更多精彩請關(guān)注公眾號xhJaver,京東工程師和你一起成長
一庸毫、線程池狀態(tài)
首先我們要明確線程池的幾種狀態(tài)
1. RUNNING
- 這個(gè)狀態(tài)表明線程池處于正常狀態(tài),可以處理任務(wù),可以接受任務(wù)
2. SHUTDOWN
- 這個(gè)狀態(tài)表明線程池處于正常關(guān)閉狀態(tài),不再接受任務(wù),但是可以處理線程池中剩余的任務(wù)
3. STOP
- 這個(gè)狀態(tài)表明線程池處于停止?fàn)顟B(tài)几莽,不僅不會(huì)再接收新任務(wù),并且還會(huì)打斷正在執(zhí)行的任務(wù)
4. TIDYING
- 這個(gè)狀態(tài)表明線程池已經(jīng)沒有了任務(wù)宅静,所有的任務(wù)都被停掉了
5. TERMINATED
- 線程池徹底終止?fàn)顟B(tài)
他們的狀態(tài)轉(zhuǎn)換圖如下
<figcaption style="margin-top: 5px; text-align: center; color: #888; font-size: 14px;">線程池狀態(tài)</figcaption>
好了,知道了線程池的幾種狀態(tài)和他們是如何轉(zhuǎn)換的關(guān)系之后站欺,我們來看一下 當(dāng)我們提交一個(gè)任務(wù)時(shí)姨夹,線程池到底發(fā)生了什么?矾策!
我們平常使用線程池是這樣使用的
for (int i=0;i<10;i++){
//創(chuàng)建10個(gè)任務(wù)
Task task = new Task("task" + i);
//讓我們自定義的線程池去跑這些任務(wù)
threadPoolExecutor.execute(task);
}
我們來看一下 execute里面究竟有什么奇怪的東西磷账?
二、execute源碼
public void execute(Runnable command) {
//1.先判斷提交的任務(wù)是不是空的
if (command == null)
throw new NullPointerException();
//2.獲得線程池狀態(tài)
int c = ctl.get();
//3.判斷線程池?cái)?shù)量是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//4.線程池?cái)?shù)量大于等于核心線程數(shù)并且線程池處于Running狀態(tài)贾虽,這時(shí)添加任務(wù)至阻塞隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 5.走到這里說明添加阻塞隊(duì)列失敗逃糟,
// 創(chuàng)建非核心線程也失敗的話,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
然后我們來看3那里蓬豁,
//3.判斷線程池?cái)?shù)量是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//如果小于核心線程數(shù)
//3.1添加worker
if (addWorker(command, true))
//添加成功绰咽,返回
return;
//3.2添加失敗,獲取線程池狀態(tài)
c = ctl.get();
}
我用頭發(fā)想想都知道地粪,線程復(fù)用的秘密肯定藏在了addworker里取募,哦對我沒有頭發(fā) 我們再來看一看他里面有什么鬼
三、addworker源碼
private boolean addWorker(Runnable firstTask, boolean core) {
//標(biāo)志位蟆技,一會(huì)兒會(huì)跳過來
retry:
for (;;) {
//判斷線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
//如果狀態(tài)非法則返回false
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
//判斷線程池線程總數(shù)量
int wc = workerCountOf(c);
//如果數(shù)量不符合要求則返回false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//線程池?cái)?shù)量加1
if (compareAndIncrementWorkerCount(c))
//跳到開始 retry處且往下執(zhí)行玩敏,不在進(jìn)入大循環(huán)
break retry;
//數(shù)量增加失敗的話判斷當(dāng)前線程池狀態(tài)若和剛才狀態(tài)不一致則繼續(xù)執(zhí)行大循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//將提交的任務(wù)封裝進(jìn)worker
w = new Worker(firstTask);
//得到worker中的線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//得到線程池狀態(tài)
int rs = runStateOf(ctl.get());
//如果狀態(tài)合法
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//將worker添加至workers中 (這是個(gè)set集合,真正的線程池)
workers.add(w);
//判斷線程數(shù)量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//執(zhí)行worker中的線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
其中很重要的一段代碼是
//將提交的任務(wù)封裝進(jìn)worker
w = new Worker(firstTask);
//得到worker中的線程
final Thread t = w.thread;
....
.....
//執(zhí)行worker中的線程
t.start();
主要我們看這其中的worker是什么東西 (截取了worker中一部分源碼)
四质礼、Worker源碼
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
//線程池狀態(tài)設(shè)為running
setState(-1); // inhibit interrupts until runWorker
//用戶提交的任務(wù)
this.firstTask = firstTask;
//通過創(chuàng)建一個(gè)線程旺聚,傳入的this是woker自身 worker繼承了Runnable 那么這個(gè)線程在t.start就是調(diào)用重寫的run()方法了
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
我們注意到剛才的t.start(); 就是執(zhí)行woker中的run方法,run方法又執(zhí)行了runworker() 方法 我們再來看下 runworker() 方法
五眶蕉、runworker源碼
final void runWorker(Worker w) {
//得到當(dāng)前線程
Thread wt = Thread.currentThread();
//這是我們提交的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//線程復(fù)用的密碼就在這里砰粹,是一個(gè)while循環(huán),判斷如果提交的任務(wù)不為空或者隊(duì)列里有任務(wù)的話
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行前的函數(shù)妻坝,用戶可以自己拓展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任務(wù)自己的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執(zhí)行后的函數(shù)伸眶,用戶可以自己拓展
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//銷毀線程
processWorkerExit(w, completedAbruptly);
}
}
重點(diǎn)來了惊窖,我們來看一下 getTask()
六、getTask源碼
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//得到線程池線程數(shù)量
int wc = workerCountOf(c);
// 是否設(shè)置超時(shí)時(shí)間 allowCoreThreadTimeOut默認(rèn)是false
//判斷線程池?cái)?shù)量是否大于核心線程數(shù)厘贼,如果大于的話 timed為true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//這里 timed為ture的時(shí)候界酒,采用帶超時(shí)時(shí)間的獲取元素的方法, 否則采取一直阻塞的方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//獲取到任務(wù)就返回
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
由此可見 getTask里面是有超時(shí)標(biāo)志的timed的嘴秸,我們在第一篇平常說的線程池原理里面講到毁欣,若非核心線程空閑keepAliveTime分鐘則銷毀,就是在這里岳掐,keepAliveTime時(shí)間內(nèi)未獲取到任務(wù)凭疮,即為線程空閑狀態(tài),就退出了runWorker中的while循環(huán)串述,進(jìn)行銷毀線程的操作执解。
核心線程一定不會(huì)銷毀嗎? 我們注意到纲酗,這里面有一個(gè)allowCoreThreadTimeOut變量衰腌,如果他要是為true的話,那么核心線程也是可以銷毀的
threadPoolExecutor.allowCoreThreadTimeOut(true);
真的有核心線程與非核心線程之分嗎觅赊? 其實(shí)是沒有區(qū)別的右蕊,他們都是一樣的線程,線程池源碼中并沒有核心線程這個(gè)標(biāo)記吮螺,只是有一個(gè)核心線程數(shù)量饶囚,在這個(gè)數(shù)量之前創(chuàng)建先線程和在這個(gè)數(shù)量之后創(chuàng)建線程,默認(rèn)在這個(gè)數(shù)量之后創(chuàng)建的線程會(huì)在keepAliveTime空閑時(shí)間內(nèi)銷毀鸠补,我們?yōu)榱朔奖阌洃浡芊纾鴮⑵浞Q為非核心線程
七、總結(jié)
大體流程如下圖所示
我們向線程池提交任務(wù)后莫鸭,線程池會(huì)將我們的任務(wù)封裝成一個(gè)worker闹丐,這個(gè)worker里面有要執(zhí)行的線程t和要執(zhí)行的任務(wù),這個(gè)線程t的主要任務(wù)就是t.start被因,運(yùn)行runworker方法卿拴,在runworker方法中,會(huì)一直while循環(huán)獲取提交的任務(wù)若沒有提交的任務(wù)則會(huì)看隊(duì)列里有沒有任務(wù)梨与,獲取隊(duì)列任務(wù)時(shí)就會(huì)判斷超時(shí)標(biāo)志是否為true,如果為true的話堕花,則在超時(shí)時(shí)間內(nèi)未獲取到任務(wù)則返回null,然后銷毀當(dāng)前線程粥鞋,否則一直等待到獲取到任務(wù)為止缘挽,不銷毀線程,這樣就做到了線程復(fù)用。
更多精彩請關(guān)注公眾號xhJaver,京東工程師和你一起成長