介紹
線程池的作用就是提供一種對線程的管理,避免由于過多的創(chuàng)建和銷毀線程所造成的開銷。在一個“池”中維護著一定數(shù)量的線程角寸,達到可重復(fù)利用的效果乖酬。在Java中死相,線程池的實現(xiàn)主要是通過ThreadPoolExecutor來實現(xiàn)的。接下來先從類圖結(jié)構(gòu)來分析一下咬像。
類結(jié)構(gòu)
- Executor
public interface Executor {
void execute(Runnable command);
}
在這看得出Executor是一個頂層的接口算撮,里邊只有execute方法双肤。這個接口的目的就是表明需要執(zhí)行一個Runnable任務(wù)。
- ExecutorService
它直接繼承至Executor钮惠,但是依然是一個接口茅糜,只是在此基礎(chǔ)上增加了其他的方法。
<T> Future<T> submit(Callable<T> task);
submit方法主要就是接受Callable接口的參數(shù)素挽,不再是Runnable參數(shù)了蔑赘,而且增加了返回值Future。
在ExecutorService類中還有好幾個重載函數(shù)预明。這幾個方法的設(shè)計主要是為了讓執(zhí)行任務(wù)者能夠得到任務(wù)的運行結(jié)果缩赛。
void shutdown();
這個方法主要是提供了關(guān)閉線程池的操作,調(diào)用此方法后撰糠,線程池不再接收新的任務(wù)酥馍,但是會把當(dāng)前緩存隊列的任務(wù)全部執(zhí)行完畢。
List<Runnable> shutdownNow();
這個方法調(diào)用后阅酪,不但不能接收新的任務(wù)旨袒,也會嘗試中斷正在執(zhí)行的任務(wù),同時不再執(zhí)行緩存隊列中的任務(wù)术辐。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
這個方法提供執(zhí)行一系列的任務(wù)的功能砚尽,最后返回所有任務(wù)的Future對象,用于活動任務(wù)的執(zhí)行結(jié)果辉词。
- AbstractExecutorService
它是一個抽象類必孤,實現(xiàn)了ExecutorService接口。對其中絕大部分的方法進行了實現(xiàn)瑞躺。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
任務(wù)為空直接返回空指針異常敷搪,新建一個ftask對象,最終還是調(diào)用了execute方法去執(zhí)行任務(wù)〈鄙冢現(xiàn)在追蹤一下newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
返回的是一個FutureTask對象赡勘,這個類即實現(xiàn)了Runnable接口又實現(xiàn)了Callable接口,這樣它既可以當(dāng)作線程的執(zhí)行對象又可以對任務(wù)執(zhí)行后的結(jié)果進行獲取嘱么。(為了不脫離主線狮含,暫時就不再分析FutureTask的源碼了)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
首先創(chuàng)建用于存儲結(jié)果的集合futures,大小為傳入的任務(wù)數(shù)(一個任務(wù)對應(yīng)一個future對象)曼振。然后遍歷所有的Callable對象几迄,把它們封裝到RunnableFuture中(實際傳入的是futureTask對象),然后把創(chuàng)建的futureTask對象加入到結(jié)果集futures中冰评,然后調(diào)用execute方法去依次執(zhí)行傳入的任務(wù)映胁。
接下來又是一個for循環(huán),在其中去保證每個任務(wù)已經(jīng)執(zhí)行完畢甲雅,當(dāng)判斷某一個任務(wù)if (!f.isDone())沒有完成時解孙,會調(diào)用f.get()坑填,這個方法是一個阻塞方法,也就是說當(dāng)前線程會一直等到任務(wù)執(zhí)行結(jié)束才會返回弛姜。這樣保證了所有的任務(wù)都會在這個for循環(huán)中全部執(zhí)行完畢脐瑰,然后返回futures結(jié)果集。
此抽象類中并沒有實現(xiàn)execute廷臼、shutdown苍在、shutdownNow等方法,具體的實現(xiàn)放在了ThreadPoolExecutor中荠商。
- ThreadPoolExecutor
這個類繼承自AbstractExecutorService 寂恬,實現(xiàn)了execute方法。引入了線程池的管理莱没。
源碼分析
先看一下構(gòu)造函數(shù)初肉,因為線程池最重要的就是其中的一些參數(shù),不同的參數(shù)配置饰躲,就可以實現(xiàn)不同的線程池牙咏。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
.....
}
corePoolSize
指的就是線程池的核心線程數(shù)。當(dāng)當(dāng)前線程池中的線程個數(shù)小于corePoolSize時属铁,對新來的任務(wù)眠寿,直接開啟一個新的線程去執(zhí)行它。maximumPoolSize
代表最大能夠容納的線程數(shù)量焦蘑。當(dāng)線程池中的線程個數(shù)大于等于corePoolSize后,當(dāng)需要執(zhí)行一個新的任務(wù)時會先把任務(wù)放入緩存隊列中盒发,等待后續(xù)空閑的線程去執(zhí)行例嘱。如果此時緩存隊列已滿,那么就會新啟一個線程去執(zhí)行它宁舰,如果線程數(shù)量已經(jīng)超過了maximumPoolSize拼卵,那么就會調(diào)用reject方法,拒絕執(zhí)行該次任務(wù)(后邊會分析reject方法)蛮艰。keepAliveTime
用于指定線程存活的時間腋腮,當(dāng)線程池中的線程大于corePoolSize后,會監(jiān)控每一個線程的空閑時間壤蚜,如果某個線程的空閑時間大于keepAliveTime即寡,那么就會銷毀該線程,釋放資源袜刷。unit
這個是keepAliveTime的單位聪富,可以為秒著蟹、毫秒等等梢莽。workQueue
這個就是我們的任務(wù)緩存隊列了奸披。是一個阻塞隊列的類型,常用的有ArrayBlockingQueue阵面、LinkedBlockingQueue(默認(rèn)容量是Integer.MAX_VALUE)和SynchronousQueue轻局。threadFactory
這個就是創(chuàng)建線程的工廠類膜钓。用于新建線程實體。-
handler
這是拒絕某個任務(wù)的回調(diào)颂斜。當(dāng)線程池不能夠處理某個任務(wù)時,會通過調(diào)用handler.rejectedExecution()去處理沃疮。內(nèi)置了四種策略
AbortPolicy(默認(rèn)情況):直接丟棄,并且拋出RejectedExecutionException異常司蔬。
DiscardPolicy:直接丟棄邑茄,不做任何處理俊啼。
DiscardOldestPolicy:從緩存隊列丟棄最老的任務(wù)肺缕,然后調(diào)用execute立刻執(zhí)行該任務(wù)。
CallerRunsPolicy:在調(diào)用者的當(dāng)前線程去執(zhí)行這個任務(wù)授帕。分析完了構(gòu)造函數(shù)同木,想必大家對其中的參數(shù)有了一定的認(rèn)識,可以看出跛十,不同的參數(shù)組合就可以實現(xiàn)不同需求的線程池彤路,當(dāng)然Java中已經(jīng)為我們內(nèi)置了很多常用的線程池,它們?nèi)课挥贓xecutors類當(dāng)中芥映。如果沒有特殊需求洲尊,建議直接使用其中的線程池。
看完了構(gòu)造函數(shù)奈偏,接下來分析最重要的execute函數(shù)坞嘀,看看到底是怎么一個執(zhí)行流程。英語好的直接閱讀注釋霎苗,已經(jīng)寫的非常仔細(xì)了姆吭。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
不喜歡看文字的直接看下邊的流程圖:
既不想看注釋也不喜歡流程圖的,咱們直接分析代碼:
首先是判斷
workerCountOf(c) < corePoolSize
workerCountOf(c)其實就是計算當(dāng)前線程池中的線程數(shù)唁盏,如果小于核心線程數(shù)那么直接
addWorker(command, true)
如果添加成功内狸,那么方法就結(jié)束了检眯。添加失敗,則需要
isRunning(c) && workQueue.offer(command)
其實就是判斷線程池是否處于運行狀態(tài)昆淡,且嘗試往緩存隊列添加任務(wù)锰瘸,如果同時為真,則:
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
進入if的語句塊昂灵,前邊不是已經(jīng)判斷過是否處于運行狀態(tài)了嗎避凝?為什么還要再次判斷呢?因為考慮多線程訪問眨补,而且沒有進行同步措施管削,所以需要再次進行double-check。
如果添加任務(wù)到緩存隊列失敗撑螺,那么就嘗試添加新的線程:
else if (!addWorker(command, false))
reject(command);
如果添加失敗就需要拒絕此次任務(wù)了含思。
次方法中關(guān)鍵調(diào)用的幾個方法為:addWorker(),workQueue.offer()甘晤,reject()含潘,接下來一個一個分析。