前言
JAVA通過多線程的方式實(shí)現(xiàn)并發(fā)俐筋,為了方便線程池的管理知纷,JAVA采用線程池的方式對(duì)線線程的整個(gè)生命周期進(jìn)行管理免猾。1.5后引入的Executor框架的最大優(yōu)點(diǎn)是把任務(wù)的提交和執(zhí)行解耦岂嗓。
要執(zhí)行任務(wù)的人只需把Task描述清楚互站,然后提交即可私蕾。這個(gè)Task是怎么被執(zhí)行的,被誰(shuí)執(zhí)行的胡桃,什么時(shí)候執(zhí)行的踩叭,提交的人就不用關(guān)心了。
具體點(diǎn)講翠胰,提交一個(gè)Callable對(duì)象給ExecutorService(如最常用的線程池ThreadPoolExecutor)容贝,將得到一個(gè)Future對(duì)象,調(diào)用Future對(duì)象的get方法等待執(zhí)行結(jié)果就好了之景。
一個(gè)簡(jiǎn)單的例子
// 一個(gè)有5個(gè)作業(yè)線程的線程池斤富,
//老大的老大找到一個(gè)管5個(gè)人的小團(tuán)隊(duì)的老大
ExecutorService executorService = Executors.newFixedThreadPool(5);
//提交作業(yè)給老大,作業(yè)內(nèi)容封裝在Callable中锻狗,約定好了輸出的類型是String满力。
String outputs = executorService.submit(
new Callable<String>() {
public String call() throws Exception {
return "I am a task";
}
//提交后就等著結(jié)果吧,到底是手下5個(gè)作業(yè)中誰(shuí)領(lǐng)到任務(wù)了屋谭,老大是不關(guān)心的脚囊。
}).get();
System.out.println(outputs);
使用上非常簡(jiǎn)單,通過一個(gè)工場(chǎng)類Executors創(chuàng)建了一個(gè)工作類桐磁,工場(chǎng)類返回一個(gè)ExecutorService對(duì)象悔耘。
執(zhí)行過程
任務(wù)提交
ExecutorService是一個(gè)接口,沒有具體實(shí)現(xiàn)我擂,最后的具體實(shí)現(xiàn)應(yīng)該由ThreadPoolExecutor實(shí)現(xiàn)的衬以。
Executor 定義了一個(gè)execute接口,ExecutorService繼承了Executor校摩,并定義了管理線程生命周期的接口看峻,可以接受提交任務(wù)、執(zhí)行任務(wù)衙吩、關(guān)閉服務(wù)互妓。
抽象類AbstractExecutorService 實(shí)現(xiàn)了ExecutorService接口,也實(shí)現(xiàn)了接口定義的默認(rèn)行為;ThreadPoolExecutor繼承了AbstractExecutorService。
AbstractExecutorService任務(wù)提交的submit方法有三個(gè)實(shí)現(xiàn)冯勉。
-
接收一個(gè)Runnable的Task澈蚌,沒有執(zhí)行結(jié)果;
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
-
接收兩個(gè)參數(shù):一個(gè)任務(wù)灼狰,一個(gè)執(zhí)行結(jié)果宛瞄;
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
-
接收一個(gè)Callable,本身就包含執(zhí)任務(wù)內(nèi)容和執(zhí)行結(jié)果交胚。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
submit方法的返回結(jié)果是Future類型份汗,調(diào)用該接口定義的get方法即可獲得執(zhí)行結(jié)果。 V get() 方法的返回值類型V是在提交任務(wù)時(shí)就約定好了的蝴簇。
分析
-
看AbstractExecutorService中submit(Callable<T> task)杯活,構(gòu)造好一個(gè)FutureTask對(duì)象后,調(diào)用execute()方法執(zhí)行任務(wù)熬词。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
} -
Submit傳入的參數(shù)都被封裝成了FutureTask類型來execute的轩猩,對(duì)應(yīng)前面三個(gè)不同的參數(shù)類型都會(huì)封裝成FutureTask。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
} Executor接口中定義的void execute(Runnable command)方法的作用就是執(zhí)行提交的任務(wù)荡澎,該方法在抽象類AbstractExecutorService的子類ThreadPoolExecutor中實(shí)現(xiàn)。
一個(gè)任務(wù)的執(zhí)行過程
先補(bǔ)充下ThreadPoolExecutor有兩個(gè)最重要的集合屬性晤锹,分別是存儲(chǔ)接收任務(wù)的任務(wù)隊(duì)列和用來干活的作業(yè)集合摩幔。
//任務(wù)隊(duì)列
private final BlockingQueue<Runnable> workQueue;
//作業(yè)線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
execute源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 如果當(dāng)前正在執(zhí)行的Worker數(shù)量比corePoolSize(核心線程大小)要小。
* 直接創(chuàng)建一個(gè)新的Worker執(zhí)行任務(wù)鞭铆,會(huì)調(diào)用addWorker方法
*
* 2. 如果當(dāng)前正在執(zhí)行的Worker數(shù)量大于等于corePoolSize(核心線程大小)或衡。
* 將任務(wù)放到阻塞隊(duì)列里,如果阻塞隊(duì)列沒滿并且狀態(tài)是RUNNING的話车遂,直接丟到阻塞隊(duì)列封断,否則執(zhí)行第3步。
* 丟到阻塞隊(duì)列之后舶担,還需要再做一次驗(yàn)證(丟到阻塞隊(duì)列之后可能另外一個(gè)線程關(guān)閉了線程池或者剛剛加入到隊(duì)列的線程死了)坡疼。
* 如果這個(gè)時(shí)候線程池不在RUNNING狀態(tài),把剛剛丟入隊(duì)列的任務(wù)remove掉衣陶,調(diào)用reject方法柄瑰,
* 否則查看Worker數(shù)量,如果Worker數(shù)量為0剪况,起一個(gè)新的Worker去阻塞隊(duì)列里拿任務(wù)執(zhí)行
*
* 3. 丟到阻塞失敗的話教沾,會(huì)調(diào)用addWorker方法嘗試起一個(gè)新的Worker去阻塞隊(duì)列拿任務(wù)并執(zhí)行任務(wù),
* 如果這個(gè)新的Worker創(chuàng)建失敗译断,調(diào)用reject方法
*/
int c = ctl.get();
// 第一個(gè)步驟授翻,滿足線程池中的線程大小比核心線程大小要小
if (workerCountOf(c) < corePoolSize) {
// addWorker方法第二個(gè)參數(shù)true表示使用基本大小
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二個(gè)步驟,線程池的線程大小比核心線程大小要大,
// 并且線程池還在RUNNING狀態(tài)堪唐,阻塞隊(duì)列也沒滿的情況巡语,加阻塞隊(duì)列里
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 雖然滿足了第二個(gè)步驟,但是這個(gè)時(shí)候可能突然線程池關(guān)閉了羔杨,所以再做一層判斷
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三個(gè)步驟捌臊,直接使用線程池最大大小。addWorker方法第二個(gè)參數(shù)false表示使用最大大小
else if (!addWorker(command, false))
reject(command);
}
在前面方法中都會(huì)調(diào)用addWorker(Runnable firstTask, boolean core)方法創(chuàng)建一個(gè)工作線程兜材,差別是創(chuàng)建的有些工作線程上面關(guān)聯(lián)接收到的任務(wù)firstTask理澎,有些沒有。該方法為當(dāng)前接收到的任務(wù)firstTask創(chuàng)建Worker曙寡,并將Worker添加到作業(yè)集合HashSet<Worker> workers中糠爬,并啟動(dòng)作業(yè)。
addWorker源碼分析
// 返回值是boolean類型举庶,true表示新任務(wù)被接收了执隧,并且執(zhí)行了。否則是false
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 線程池當(dāng)前狀態(tài)
// 這個(gè)判斷轉(zhuǎn)換成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)户侥。
// 概括為3個(gè)條件:
// 1. 線程池不在RUNNING狀態(tài)并且狀態(tài)是STOP镀琉、TIDYING或TERMINATED中的任意一種狀態(tài)
// 2. 線程池不在RUNNING狀態(tài),線程池接受了新的任務(wù)
// 3. 線程池不在RUNNING狀態(tài)蕊唐,阻塞隊(duì)列為空屋摔。 滿足這3個(gè)條件中的任意一個(gè)的話,拒絕執(zhí)行任務(wù)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 線程池線程個(gè)數(shù)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果線程池線程數(shù)量超過線程池最大容量或者線程數(shù)量超過了基本大小(core參數(shù)為true替梨,core參數(shù)為false的話判斷超過最大大小)
return false; // 超過直接返回false
if (compareAndIncrementWorkerCount(c)) // 沒有超過各種大小的話钓试,cas操作線程池線程數(shù)量+1,cas成功的話跳出循環(huán)
break retry;
c = ctl.get(); // 重新檢查狀態(tài)
if (runStateOf(c) != rs) // 如果狀態(tài)改變了副瀑,重新循環(huán)操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 參數(shù) firstTask != null, core = true
// 驗(yàn)證可以滿足可新增線程的條件
boolean workerStarted = false; // 任務(wù)是否成功啟動(dòng)標(biāo)識(shí)
boolean workerAdded = false; // 任務(wù)是否添加成功標(biāo)識(shí)
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; // 得到線程池的可重入鎖
w = new Worker(firstTask); // 基于任務(wù)firstTask構(gòu)造worker
final Thread t = w.thread;
if (t != null) {
mainLock.lock(); // 鎖住弓熏,防止并發(fā)
try {
// 在鎖住之后再重新檢測(cè)一下狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果線程池在RUNNING狀態(tài)或者線程池在SHUTDOWN狀態(tài)并且任務(wù)是個(gè)null
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; // 標(biāo)識(shí)一下任務(wù)已經(jīng)添加成功
}
} finally {
mainLock.unlock(); // 解鎖
}
if (workerAdded) {
t.start(); // 線程啟動(dòng),調(diào)用worker.run
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 任務(wù)啟動(dòng)失敗
addWorkerFailed(w);
}
return workerStarted;
}
任務(wù)執(zhí)行
private boolean addWorker(Runnable firstTask, boolean core) {
...
...
if (workerAdded) {
t.start(); // 線程啟動(dòng)糠睡,調(diào)用worker.run
workerStarted = true;
}
...
return workerStarted;
}
Worker中的線程start的時(shí)候挽鞠,調(diào)用Worker本身run方法
run()內(nèi)部調(diào)用runWorker(Worker w),就是在一直調(diào)用getTask()方法獲取任務(wù),然后調(diào)用 runTask(task)方法執(zhí)行任務(wù)铜幽。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
...
...
//循環(huán)從線程池的任務(wù)隊(duì)列獲取任務(wù)
while (task != null || (task = getTask()) != null) {
...
...
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
...
...
} finally {
//執(zhí)行正常完成
afterExecute(task, thrown);
}
}finally {
//調(diào)用processWorkerExit方法進(jìn)行回收
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法是ThreadPoolExecutor提供給其內(nèi)部類Worker的的方法滞谢。作用就是一個(gè),從任務(wù)隊(duì)列中取任務(wù)除抛,源源不斷地輸出任務(wù)狮杨。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
...
for (;;) {
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//從任務(wù)隊(duì)列的頭部取任務(wù)
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
如果發(fā)生了以下四件事中的任意一件,那么Worker需要被回收:
Worker個(gè)數(shù)比線程池最大大小要大
線程池處于STOP狀態(tài)
線程池處于SHUTDOWN狀態(tài)并且阻塞隊(duì)列為空
使用超時(shí)時(shí)間從阻塞隊(duì)列里拿數(shù)據(jù)到忽,并且超時(shí)之后沒有拿到數(shù)據(jù)(allowCoreThreadTimeOut || workerCount > corePoolSize)
如果getTask返回的是null橄教,那說明阻塞隊(duì)列已經(jīng)沒有任務(wù)了清寇,那么會(huì)調(diào)用processWorkerExit方法進(jìn)行回收:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 集合移除掉需要回收的Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試結(jié)束線程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//如果Worker不是異常而死亡結(jié)束流程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//異常結(jié)束,新開啟一個(gè)Worker
//開啟條件:由于用戶任務(wù)異常退出; Worker任務(wù)少于corePoolSize或者工作正在運(yùn)行;阻塞隊(duì)列不為空,但沒有workers护蝶。
addWorker(null, false);
}
}