首先看ThreadPoolExecutor的核心方法execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//workerCountOf(c) 就是當(dāng)前線程數(shù)
if (addWorker(command, true))//步驟1
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//步驟2
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//這里注意為傳入的task為null
}
else if (!addWorker(command, false))//步驟3
reject(command);
}
這個(gè)方法很簡明暑椰,就是線程池的基本原理:
1.線程池?cái)?shù)量小于核心線程池?cái)?shù)量,則通過addWorker(稍后分析該方法)將任務(wù)加入線程池中荐绝,如果成功則返回干茉。
2.如果步驟1返回失敗,則看線程池是否在running狀態(tài)很泊,如果在則把任務(wù)送進(jìn)等待隊(duì)列角虫。如果這一步成功,再檢查一次線程池狀態(tài)委造,如果線程池不是running狀態(tài)并且當(dāng)前任務(wù)從隊(duì)列移除成功戳鹅,則執(zhí)行拒絕策略,否則如果worker數(shù)量等于0的話昏兆,則相當(dāng)于新建一個(gè)線程枫虏。如果沒有這個(gè)調(diào)用,當(dāng)你把coreSize設(shè)置為0時(shí)爬虱,往線程池里添加任務(wù)隶债,任務(wù)會(huì)被放在任務(wù)隊(duì)列了,永遠(yuǎn)得不到執(zhí)行跑筝。
3.如果addWoker失敗死讹,即超過了最大線程池?cái)?shù)量,則執(zhí)行拒絕策略曲梗。
BTW赞警,線程池中的有個(gè)ctl變量妓忍,這個(gè)變量的定義是
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
道格老爺子用的這個(gè)變量的前3位定義線程池的狀態(tài),后29位作為worker的數(shù)量愧旦。
我們可以看出世剖,整個(gè)流程的核心方法就是addWoker,在看它的代碼前笤虫,先來看一看Worker這個(gè)類旁瘫,該類定義如下
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
先看它的成員變量和構(gòu)造方法:
Worker的變量中有thread和runnable,那么大概可以猜出Worker就是一個(gè)Thread的包裝類琼蚯,負(fù)責(zé)運(yùn)行任務(wù)境蜕。
構(gòu)造方法中,就使用線程池的ThreadFactory來new了線程凌停。
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); //不希望在runWorker之前中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
看到這個(gè)類也繼承了AQS,看一下實(shí)現(xiàn)的方法售滤,使用的互斥模式罚拟,也就是很正常的并發(fā)訪問控制。state=0代表解鎖狀態(tài)完箩,state=1代表上鎖狀態(tài)赐俗。但是要注意的是這個(gè)并不是像ReentrantLock一樣的重入鎖。因?yàn)楫?dāng)執(zhí)行interruptIdleWorkers時(shí)(shutdown等會(huì)調(diào)用)弊知,會(huì)獲取Worker的鎖阻逮,而我們不希望這時(shí)候Worker能獲取鎖中斷線程,因?yàn)闀?huì)增大線程管理和中斷控制的難度秩彤。
再看為什么初始化Worker時(shí)要setState(-1)叔扼,就是要避免開始執(zhí)行之前的Worker不會(huì)被中斷。那么什么時(shí)候會(huì)中斷Worker的線程漫雷?就是調(diào)用shutdownNow時(shí)(shutdownNow不像shutdown需要提前獲取worker的鎖才能中斷線程)瓜富,里面會(huì)調(diào)用interruptIfStarted方法,判斷了state>=0才會(huì)被中斷(見下interruptIfStarted方法)降盹。
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//state=-1不會(huì)被中斷
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
ok与柑,這個(gè)類也實(shí)現(xiàn)了Runnable接口,看下它的run方法蓄坏,一個(gè)while循環(huán)价捧,先運(yùn)行自己傳進(jìn)來的任務(wù),如果傳進(jìn)來的任務(wù)為null涡戳,則從隊(duì)列里面取任務(wù)運(yùn)行结蟋。代碼中看到如果要走出這個(gè)循環(huán)的話,要么Worker的線程被中斷渔彰,要么getTask=null椎眯。順便一提的是挠将,Worker本身不運(yùn)行run,而是里面thread通過start運(yùn)行這個(gè)方法编整。再進(jìn)去看下getTask方法舔稀。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 因?yàn)閃orker初始化state=-1,這里先設(shè)置為0掌测,否則獲取不了鎖
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//getTask是從隊(duì)列取出任務(wù)内贮,取到就繼續(xù)運(yùn)行。
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//線程池shutdownNow后中斷worker
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//運(yùn)行自己的業(yè)務(wù)方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask方法就是不斷從隊(duì)列里面取任務(wù)汞斧。如果是核心線程夜郁,就一直取任務(wù);如果是非核心線程粘勒,在keepAliveTime沒取到任務(wù)就返回null竞端。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//...省略代碼
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //看到如果當(dāng)前線程數(shù)大于coreSize,則啟動(dòng)從隊(duì)列取任務(wù)時(shí)采用超時(shí)的方法取庙睡。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {//可以看到如果線程池只有一個(gè)線程事富,那么這個(gè)線程就算是非核心線程,也不會(huì)銷毀的乘陪。
if (compareAndDecrementWorkerCount(c))
return null;//如果當(dāng)前線程數(shù)大于coreSize统台,并且隊(duì)列是空,返回null
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://這里終于看到keepAliveTime的作用了啡邑。
//如果是非核心線程贱勃,在keepAliveTime時(shí)間內(nèi)沒有任務(wù)進(jìn)來,那么根據(jù)上面的runWorker方法谤逼,取到的值是null贵扰,那么就跳出循環(huán),線程自動(dòng)銷毀流部。
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
??分析完Worker后拔鹰,又回到addWorker方法。每一步的注釋如下
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 省略代碼...
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))//這里就是看是否是添加的核心線程池?cái)?shù)的任務(wù)還是核心線程之外的任務(wù)贵涵。
return false;
// 省略代碼...
}
}
//經(jīng)過一系列校驗(yàn)后
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//這個(gè)就是線程池中的核心內(nèi)部類列肢。
final Thread t = w.thread;//取出new好的線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());//線程池的狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {//再次檢查線程池狀態(tài)
if (t.isAlive()) //如果線程池SHUTDOWN或者RUNNING,但是線程被啟動(dòng)了宾茂,拋異常
throw new IllegalThreadStateException();
workers.add(w);//緩存worker
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//更新池中最大線程數(shù)量
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {如果添加成功
t.start();//最終調(diào)用runWorker的地方瓷马。
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//如果添加失敗就會(huì)嘗試關(guān)閉線程池。
}
return workerStarted;
}
總結(jié):首先大致分析了線程池運(yùn)行的基本流程跨晴,簡單來說execute就是一直往隊(duì)列中扔任務(wù)欧聘,創(chuàng)建好的Worker不斷從中取任務(wù)運(yùn)行。這里要注意的是端盆,線程池在初始化時(shí)并沒有將核心線程數(shù)的線程一起初始化怀骤,而是來一個(gè)任務(wù)费封,創(chuàng)建一個(gè)線程。還有就是當(dāng)線程數(shù)超過核心線程數(shù)并且開始銷毀多出核心線程數(shù)的線程時(shí)蒋伦,有可能銷毀的是在小于核心線程數(shù)時(shí)創(chuàng)建出來的舊線程弓摘。
??然后是內(nèi)部類Worker,這里Worker就是thread和task的一個(gè)包裝類痕届,它的職能就是控制中斷和任務(wù)的運(yùn)行韧献。Worker是一個(gè)集成了AQS,實(shí)現(xiàn)了Runnable方法的內(nèi)部類研叫。Worker創(chuàng)建好后锤窑,通過new好的線程來運(yùn)行任務(wù)。核心Worker通過while不斷從隊(duì)列中取出任務(wù)嚷炉,任務(wù)隊(duì)列為空線程就阻塞渊啰;非核心Worker也是通過while不斷取任務(wù),只是有個(gè)取任務(wù)時(shí)keepAliveTime的超時(shí)時(shí)間申屹,在時(shí)間之內(nèi)取不到的任務(wù)的話線程就跳出循環(huán)绘证,自動(dòng)銷毀了。