4、 ThreadPoolExecutor 線程池
- 降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷(xiāo)毀造成的消耗。
- 提高響應(yīng)速度绘盟。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行悯仙。
- 提高線程的可管理性龄毡。線程是稀缺資源,如果無(wú)限制地創(chuàng)建雁比,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性撤嫩,使用線程池可以進(jìn)行統(tǒng)一分配偎捎、調(diào)優(yōu)和監(jiān)控。但是序攘,要做到合理利用線程池茴她,必須對(duì)其實(shí)現(xiàn)原理了如指掌。
4.1程奠、創(chuàng)建ThreadPoolExecutor丈牢、參數(shù)、提交任務(wù)
// 1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
}
// 2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {}
// 3
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {}
- 參數(shù)解釋?zhuān)?
- corePoolSize(線程池的基本大小):當(dāng)提交一個(gè)任務(wù)到線程池時(shí)瞄沙,線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù)己沛,即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程慌核,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法申尼,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程垮卓。
- maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了师幕,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù)粟按,則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是霹粥,如果使用了無(wú)界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒(méi)什么效果灭将。
- keepAliveTime (線程活動(dòng)保持時(shí)間):線程池的工作線程空閑后,保持存活的時(shí)間后控。所以庙曙,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短忆蚀,可以調(diào)大時(shí)間矾利,提高線程的利用率。
- unit (線程活動(dòng)保持時(shí)間的單位):可選的單位有天(DAYS)馋袜、小時(shí)(HOURS)男旗、分鐘 (MINUTES)、毫秒(MILLISECONDS)欣鳖、微秒(MICROSECONDS察皇,千分之一毫秒)和納秒。
- workQueue runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列泽台∈踩伲可以選擇以下幾個(gè)阻塞隊(duì)列。
- ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列怀酷,此隊(duì)列按FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序稻爬。
- LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO排序元素蜕依,吞吐量通常要高于ArrayBlockingQueue桅锄。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列。
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列样眠。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用 移除操作友瘤,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue檐束,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列辫秧。
- PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列。
- handler (飽和策略):當(dāng)隊(duì)列和線程池都滿了被丧,說(shuō)明線程池處于飽和狀態(tài)盟戏,那么必須采取一種策略處理提交的新任務(wù)绪妹。這個(gè)策略默認(rèn)情況下是AbortPolicy,表示無(wú)法處理新任務(wù)時(shí)拋出異常抓半。在JDK1.5中Java線程池框架提供了以下4種策略喂急。
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)笛求。
- DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù)廊移,并執(zhí)行當(dāng)前任務(wù)。
- DiscardPolicy:不處理探入,丟棄掉狡孔。
- threadFactory 用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字蜂嗽。
- 提交任務(wù)
- execute() 方法用于提交不需要返回值的任務(wù)苗膝,所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。
- submit() 方法用于提交需要返回值的任務(wù)植旧。線程池會(huì)返回一個(gè)future類(lèi)型的對(duì)象辱揭,通過(guò)這個(gè)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過(guò)future的get()方法來(lái)獲取返回值病附,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成问窃,而使用get(long timeout,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線 程一段時(shí)間后立即返回完沪,這時(shí)候有可能任務(wù)沒(méi)有執(zhí)行完域庇。
摘抄自《Java并發(fā)編程的藝術(shù)》
4.2、線程池的生命周期及其他狀態(tài)
了解線程池的生命周期對(duì)理解線程池的實(shí)現(xiàn)原理至關(guān)重要覆积。線程池的生命周期通過(guò)AtomicInteger類(lèi)型的變量ctl來(lái)維護(hù)听皿。
// ctl變量維護(hù)線程池的生命周期并提供一些額外方法
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
生命周期:
- RUNNING:接受新任務(wù)并處理隊(duì)列中已有的任務(wù)。
- SHUTDOWN:不接受新任務(wù)宽档,但會(huì)處理隊(duì)列中已有任務(wù)尉姨。
- STOP:不接受新任務(wù)、不處理隊(duì)列中已有任務(wù)吗冤,且中斷正在運(yùn)行的任務(wù)又厉。
- TIDYING:線程池中所有的線程都已終止,并將workerCount清零欣孤。
- TERMINATED:線程池完全停止馋没。
-
生命周期轉(zhuǎn)換:
-
RUNNING -> SHUTDOWN
昔逗,調(diào)用shutdown()方法降传。 -
(RUNNING or SHUTDOWN) -> STOP
,調(diào)用shutdownNow()方法勾怒。 -
SHUTDOWN -> TIDYING
婆排,線程池和任務(wù)隊(duì)列同時(shí)為空声旺。 -
STOP -> TIDYING
,線程池為空段只。 -
TIDYING -> TERMINATED
腮猖,調(diào)用terminated()鉤子方法。
-
了解了線程池的狀態(tài)赞枕,以及狀態(tài)之間的轉(zhuǎn)換之后澈缺,下面我們著手開(kāi)始分析線程池的源碼。
4.3炕婶、以execute方式提交任務(wù)
execute()方法用于提交不需要返回值的任務(wù)姐赡,所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。通過(guò)分析execute()方法柠掂,我們將了解任務(wù)提交之后项滑,線程池是如何創(chuàng)建線程、如何將線程池的線程數(shù)逐步擴(kuò)大到maximumPoolSize涯贞、以及如何處理隊(duì)列中的任務(wù)枪狂。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// ① 線程池中的線程數(shù)小于corePoolSize,創(chuàng)建線程宋渔。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// ② 線程池中的線程數(shù)大于corePoolSize州疾。
// 第二步的邏輯稍有些復(fù)雜,分為三個(gè)步驟:
// 2.1傻谁、若阻塞隊(duì)列未滿孝治,將任務(wù)加入阻塞隊(duì)列,而非創(chuàng)建線程
if (isRunning(c) && workQueue.offer(command)) {
// 因?yàn)樽枞?duì)列和線程池使用的不是同一把鎖审磁,任務(wù)入隊(duì)之后谈飒,
// 線程池的狀態(tài)可能會(huì)發(fā)生變化,從而導(dǎo)致任務(wù)無(wú)法正常執(zhí)行态蒂,所以這里要做一次二次校驗(yàn)
int recheck = ctl.get();
// 2.2杭措、若線程池非運(yùn)行狀態(tài),則將任務(wù)移出阻塞隊(duì)列
//(注意:!isRunning(recheck) = (STOP 或 TIDYING 或 TERMINATED))
// 處于這三種狀態(tài)之一的線程池即不接受新任務(wù)钾恢、也不處理隊(duì)列中的任務(wù)
if (!isRunning(recheck) && remove(command))
// 根據(jù)RejectedExecutionHandler策略回絕該任務(wù)
reject(command);
// 2.3 若線程池中的worker數(shù)為0
else if (workerCountOf(recheck) == 0)
// 創(chuàng)建一個(gè)任務(wù)為null的worker手素,初看之下較為晦澀難懂。
// 分析:雖然worker數(shù)為0瘩蚪,但是阻塞隊(duì)列不一定為空泉懦,
// 而阻塞隊(duì)列中任務(wù)的執(zhí)行又依賴worker來(lái)完成,所以這里加入一個(gè)任務(wù)為空的worker
// 以便繼續(xù)執(zhí)行阻塞隊(duì)列中的任務(wù)
addWorker(null, false);
}
// ③ 執(zhí)行到此處疹瘦,線程池中的線程數(shù)大于corePoolSize崩哩、且隊(duì)列已滿。
// 3.1、若線程池中的線程數(shù)小于maximumPoolSize邓嘹,繼續(xù)創(chuàng)建線程酣栈;
// 3.2、否則根據(jù)飽和策略汹押,回絕該任務(wù)矿筝。
else if (!addWorker(command, false))
reject(command);
}
以上就是通過(guò)execute()方法提交任務(wù)之后,線程池對(duì)任務(wù)的接受處理過(guò)程棚贾。主要分為三種情況:
- 線程池中線程數(shù)窖维,小于corePoolSize,創(chuàng)建線程妙痹。
- 線程池中線程數(shù)陈辱,大于corePoolSize,阻塞隊(duì)列未滿细诸,將任務(wù)加入阻塞隊(duì)列沛贪。
- 二次判斷線程池狀態(tài),若非運(yùn)行狀態(tài)震贵,回絕利赋;若運(yùn)行狀態(tài),再判斷worker數(shù)猩系。
- worker數(shù)小于0媚送,創(chuàng)建任務(wù)為空的worker,以便執(zhí)行阻塞隊(duì)列中的任務(wù)寇甸。
- 線程池中線程數(shù)塘偎,大于corePoolSize,小于maximumPoolSize拿霉,阻塞隊(duì)列已滿吟秩,創(chuàng)建線程。
4.3.1绽淘、addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// ① 快速檢查線程池涵防、阻塞隊(duì)列的狀態(tài),若不滿足添加條件沪铭,快速返回失敗
// 第一個(gè)條件:rs >= SHUTDOWN 根據(jù)線程池生命周期的定義壮池,若狀態(tài)大于等于SHUTDOWN不接受新任務(wù)
// 第二個(gè)條件:rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() 三者有一個(gè)為不成立
// 綜合第一個(gè)條件:
// 1.1、若rs == SHUTDOWN不成立杀怠,rs為STOP椰憋、TIDYING、TERMINATED其中之一赔退,不接受新任務(wù)橙依、不處理隊(duì)列任務(wù),返回false;
// 1.2票编、若firstTask == null不成立,rs為SHUTDOWN卵渴,不接受新任務(wù)慧域,返回false;
// 1.3浪读、若!workQueue.isEmpty()不成立昔榴,rs為SHUTDOWN、且firstTask == null碘橘、且隊(duì)列為空
// 第三個(gè)條件較為晦澀互订,結(jié)合前文分析,此條件是為了保證線程池中依然有worker可以處理阻塞隊(duì)列的任務(wù)痘拆,
// 若阻塞隊(duì)列為空仰禽,則阻塞隊(duì)列無(wú)任務(wù)需要處理,返回false纺蛆。
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
// ② 自旋吐葵,嘗試將worker數(shù)加一
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// ③ 將任務(wù)包裝成Worker(此步非常重要,下文單獨(dú)分析)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// ④ 為防止線程池在獲取鎖之前關(guān)閉桥氏,這里要進(jìn)行二次檢查温峭。
// rs < SHUTDOWN 線程池的狀態(tài)為運(yùn)行中,可接受新任務(wù)字支、可以處理隊(duì)列中的任務(wù)凤藏。
// rs == SHUTDOWN && firstTask == null ?堕伪?揖庄? 此處尚不清晰
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將任務(wù)添加至worker集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// ④ 開(kāi)啟線程(這一步最終會(huì)執(zhí)行到runWorker()方法,下文單獨(dú)分析其調(diào)用過(guò)程)
t.start();
workerStarted = true;
}
}
} finally {
// ⑤ 線程啟動(dòng)失敗欠雌,回滾
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法是線程池中較為核心的方法之一:
- 狀態(tài)檢查抠艾,快速返回失敗。如果不了解線程池的狀態(tài)桨昙、線程池運(yùn)行上下文检号,則較為晦澀難懂。
- 將任務(wù)包裝成Worker對(duì)象蛙酪,以便委托worker執(zhí)行任務(wù)齐苛。
- 開(kāi)啟線程。開(kāi)啟線程是指開(kāi)啟worker(worker實(shí)現(xiàn)了Runnable接口)線程桂塞,并通過(guò)run方法調(diào)用runWorker方法來(lái)執(zhí)行提交的任務(wù)凹蜂。
- worker添加失敗,回滾。
4.3.2玛痊、 將任務(wù)包裝成Worker對(duì)象
線程池將任務(wù)包裝成Worker對(duì)象汰瘫,并在其構(gòu)造方法中通過(guò)this
將任務(wù)指向其本身。由于Worker類(lèi)繼承了Runnable接口擂煞,線程啟動(dòng)后混弥,即可通過(guò)run()方法調(diào)用runWorker()方法來(lái)完成任務(wù)的執(zhí)行。
// Worker類(lèi)部分源碼
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// 注意:前文addWorker方法中啟動(dòng)的線程即該線程对省,而該線程即worker本身
final Thread thread;
// 任務(wù)
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 注意這里傳入的參數(shù)為this蝗拿,即Worker本身
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
4.3.3、runWorker
// 委托run()方法來(lái)執(zhí)行runWorker()方法
public void run() {
runWorker(this);
}
// runWorker()方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// ① 標(biāo)記任務(wù)執(zhí)行過(guò)程中蒿涎,是否拋出異常
boolean completedAbruptly = true;
try {
// ②
// 2.1哀托、任務(wù)不為空,直接執(zhí)行
// 2.2劳秋、任務(wù)為空仓手,通過(guò)getTask()方法從阻塞隊(duì)列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 模板方法,任務(wù)開(kāi)始執(zhí)行之前玻淑,可在該方法中做一些額外處理
beforeExecute(wt, task);
Throwable thrown = null;
try {
// ③ 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 模板方法俗或,任務(wù)結(jié)束執(zhí)行之后,可在該方法中做一些額外處理
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// ④ 處理worker退出
processWorkerExit(w, completedAbruptly);
}
}
- 定義變量completedAbruptly岁忘,以標(biāo)記任務(wù)執(zhí)行過(guò)程中辛慰,是否拋出異常。
- 獲取任務(wù)
- 任務(wù)不為空干像,直接執(zhí)行帅腌。
- 否則,通過(guò)getTask()方法從隊(duì)列中獲取任務(wù)麻汰。若當(dāng)前任務(wù)隊(duì)列中無(wú)任務(wù)速客,則阻塞。直至獲取到任務(wù)或返回null五鲫。
- 執(zhí)行任務(wù)溺职。
- 調(diào)用processWorkerExit()方法,處理worker退出位喂。
4.3.4浪耘、 getTask()
以阻塞或超時(shí)(allowCoreThreadTimeOut為true)的方式從隊(duì)列中獲取任務(wù),返回任務(wù)或null塑崖。比較特殊的是七冲,在該方法有可能導(dǎo)致worker退出:
- 線程池中線程數(shù)大于maximumPoolSize(調(diào)用setMaximumPoolSize()導(dǎo)致)。
- 線程池狀態(tài)為STOP规婆。
- 線程池SHUTDOWN且任務(wù)隊(duì)列為空澜躺。
- 獲取任務(wù)超時(shí)蝉稳。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 快速判斷線程池、任務(wù)隊(duì)列狀態(tài)
// 1掘鄙、rs >= STOP 線程池不接受新任務(wù)耘戚、不處理隊(duì)列任務(wù),返回null
// 2操漠、rs = SHUTDOWN && workQueue.isEmpty() 隊(duì)列中無(wú)待處理任務(wù)收津,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 當(dāng)allowCoreThreadTimeOut為true或線程池中線程數(shù)大于corePoolSize時(shí),可銷(xiāo)毀線程
// 1颅夺、allowCoreThreadTimeOut:false(默認(rèn)):即使核心線程處于空閑狀態(tài)也不會(huì)被銷(xiāo)毀;
// 2蛹稍、allowCoreThreadTimeOut:true: 核心任務(wù)超過(guò)keepAliveTime還未獲取到任務(wù)吧黄,則銷(xiāo)毀。
// 當(dāng)然是否銷(xiāo)毀線程唆姐,還需根據(jù)后續(xù)獲取任務(wù)的狀況判斷
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 若滿足銷(xiāo)毀條件拗慨,返回null
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 獲取任務(wù)
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
// 如獲取到任務(wù),則返回奉芦;否則將timedOut標(biāo)記為true赵抢,作為下一輪自旋銷(xiāo)毀線程的判斷條件
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
小結(jié):代碼執(zhí)行到這里,任務(wù)正常執(zhí)行声功、通過(guò)getTask()方法獲取任務(wù)并執(zhí)行兩種方式都已分析過(guò)烦却。接著分析runWorker方法,還剩下最后一步processWorkerExit()
4.3.5先巴、 processWorkerExit
processWorkerExit執(zhí)行條件:
- 任務(wù)在執(zhí)行過(guò)程中拋出異常
- getTask()方法返回null
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移除worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 若線程池的狀態(tài)以及任務(wù)隊(duì)列的狀態(tài)符合線程池終止的條件其爵,則終止線程池
tryTerminate();
// 嘗試終止線程池后,線程池的狀態(tài)可能發(fā)生了變化
// 接下來(lái)還要對(duì)線程池狀態(tài)為RUNNING伸蚯、SHUTDOWN做一些判斷
// 防止不合理的清除核心線程摩渺、防止清除全部線程而任務(wù)隊(duì)列依然有待執(zhí)行任務(wù)的情況
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// worker在執(zhí)行任務(wù)時(shí)未出異常
if (!completedAbruptly) {
// 定義變量min,記錄線程池中可以保留的最小線程數(shù)
// 若allowCoreThreadTimeOut為true剂邮,則線程池保留最小線程數(shù)可以為0摇幻;否則為corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 核心線程池保留最小線程數(shù)為0,則要考慮任務(wù)隊(duì)列是否為空
// 若任務(wù)隊(duì)列不為空挥萌,則至少要保留一個(gè)線程绰姻,以便繼續(xù)執(zhí)行任務(wù)隊(duì)列中的任務(wù)
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 若線程池中線程數(shù)大于等于min,則直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// worker在執(zhí)行任務(wù)時(shí)出現(xiàn)異常引瀑,則向線程池中加入一個(gè)任務(wù)為空的線程
addWorker(null, false);
}
}
// 嘗試終止線程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 檢查線程池的狀態(tài)龙宏,以下三種情況,不能終止線程池
// 1伤疙、線程池狀態(tài)為RUNNING
// 2寓调、線程池的狀態(tài)為T(mén)IDYING或TERMINATED
// 3、線程池的狀態(tài)為SHUTDOWN朦肘,但是任務(wù)隊(duì)列中有未處理的任務(wù)
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 檢查線程池中線程數(shù)是否為0灯抛,并中斷線程池中空閑的線程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將線程池的狀態(tài)設(shè)置為T(mén)IDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 模板方法,可以在該方法中做一些額外的事情
terminated();
} finally {
// 將線程池的狀態(tài)設(shè)置為T(mén)ERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
processWorkerExit()方法在嘗試終止線程池之后嫉鲸,對(duì)線程池、任務(wù)隊(duì)列的狀態(tài)做了一些判斷。再結(jié)合completedAbruptly次慢、allowCoreThreadTimeOut屬性以確定是否真的銷(xiāo)毀線程、是否維護(hù)一個(gè)執(zhí)行任務(wù)隊(duì)列的線程翔曲。從代碼中可以看到迫像,即使是核心線程,也可能被銷(xiāo)毀瞳遍。該段代碼雖短闻妓,涉及的知識(shí)點(diǎn)很多,若分析有不正確的地方掠械,還希望大家可以指正由缆。
分析到此,關(guān)于線程池的創(chuàng)建猾蒂、運(yùn)行均唉、銷(xiāo)毀等核心代碼,都以分析完畢肚菠。下面再分析一些其他較為重要的代碼舔箭。
4.4、關(guān)閉線程池
關(guān)閉線程池有以下兩種方式:
- shutdownNow()
- 將線程池狀態(tài)更改為STOP蚊逢。
- 中斷所有已經(jīng)啟動(dòng)的線程限嫌。
- 返回等待執(zhí)行的任務(wù)列表。
- shutdown()
- 將線程池的狀態(tài)更改為SHUTDOWN
- 中斷所有空閑線程
shutdownNow()和shutdown()雖然同為關(guān)閉線程池时捌,但區(qū)別還是很大的怒医。在關(guān)閉線程池時(shí),如需要執(zhí)行完等待任務(wù)可使用shutdown()奢讨,若不考慮稚叹,可以使用shutdownNow()拿诸。兩者大體思路均是通過(guò)循環(huán)并中斷線程來(lái)實(shí)現(xiàn),所以任務(wù)要盡量可以響應(yīng)中斷亩码,否則有可能導(dǎo)致,雖然線程池關(guān)閉了飒泻,但是線程依然在運(yùn)行的情況鞭光。
4.4.1惰许、shutdown()
// 關(guān)閉線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 將線程池的狀態(tài)更改為SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷所有空閑線程
interruptIdleWorkers();
// ScheduledThreadPoolExecutor 鉤子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試徹底關(guān)閉線程池
tryTerminate();
}
// 中斷空閑線程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 循環(huán)所有worker
for (Worker w : workers) {
Thread t = w.thread;
// 若線程未被中斷過(guò)史辙,且線程空閑
// “空閑” 是指:線程未獲取到鎖。所以這里嘗試去獲取鎖聊倔,一旦能獲取成功,則表明該線程確實(shí)是空閑線程耙蔑。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
4.4.2、shutdownNow()
// 關(guān)閉線程池
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 將線程池的狀態(tài)更改為STOP
advanceRunState(STOP);
// 中斷所有worker
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 中斷所有已啟動(dòng)線程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
4.5徐鹤、以submit方式提交任務(wù)
前文已經(jīng)分析了以execute()方式提交任務(wù)的過(guò)程邀层。下面分析一下另一種方式,以submit()方式提交任務(wù)寥院。與execute()方法不同涛目,submit()具有返回值。
// 方式一
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 方式二
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 方式三
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
submit()有三種提交任務(wù)的方式霹肝,可再歸納為兩種估蹄,一種提交Runnable任務(wù);一種提交Callable任務(wù)沫换。submit()較execute()方法只是將任務(wù)封裝成了FutureTask對(duì)象臭蚁,再調(diào)用execute()方法執(zhí)行任務(wù)而已。相對(duì)比較簡(jiǎn)單讯赏,不做過(guò)多的分析垮兑。
接下來(lái)介紹一下ThreadPoolExecutor的擴(kuò)展ScheduledThreadPoolExecutor。