JDK線程池相關(guān)一
jdk中將計(jì)算任務(wù)(task)和計(jì)算任務(wù)執(zhí)行本身解耦身弊。
基礎(chǔ)接口與類
與計(jì)算任務(wù)相關(guān)的兩個(gè)接口和
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
兩個(gè)接口的區(qū)別從代碼和注釋中顯而易見杈抢。
Callable帶返回值纸型,且可能拋異常(受檢)盆顾;Runnable不帶返回值类咧,且不拋異常(受檢)丐谋。
而任務(wù)的執(zhí)行則由另一個(gè)接口Executor
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
注意还惠,這個(gè)接口只描述了可以執(zhí)行任務(wù)饲握,并沒(méi)有線程池的概念,當(dāng)然線程池是一定可以執(zhí)行任務(wù)的蚕键,因此線程池需要實(shí)現(xiàn)該接口救欧。
為了追蹤任務(wù)的異步計(jì)算,比如提交完一個(gè)任務(wù)之后需要知道任務(wù)是否已完成锣光,或者是取消該任務(wù)笆怠,jdk提供了一個(gè)名為Future的接口
public interface Future<V> {
/**
* 取消與之關(guān)聯(lián)的任務(wù)。如果任務(wù)已經(jīng)完成或者已經(jīng)取消嫉晶,或者是因?yàn)槟承┰虿荒苋∠麆t返回false
* 如果任務(wù)還未開始骑疆,且取消成功,則該任務(wù)永遠(yuǎn)不會(huì)再被執(zhí)行
* 如果任務(wù)已經(jīng)開始替废,則輸入?yún)?shù)mayInterruptIfRunning決定是否采用中斷的方式叫停任務(wù)
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 任務(wù)完成前被取消 則返回true
*/
boolean isCancelled();
/**
* 返回任務(wù)是否完成
* 任務(wù)完成包括以下情況
* 正常結(jié)束箍铭、異常或者取消
* 以上任何一種情況下都返回true
*/
boolean isDone();
/**
* 阻塞到任務(wù)結(jié)束椎镣,返回任務(wù)執(zhí)行結(jié)果
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待一段時(shí)間诈火,還沒(méi)結(jié)束則拋超時(shí)異常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
至此,我們了解了三個(gè)概念:
- 任務(wù):Callable或者Runnable實(shí)現(xiàn)状答,定義任務(wù)做什么
- 任務(wù)執(zhí)行器:Executor冷守,用于執(zhí)行任務(wù)
- 任務(wù)狀態(tài):Future刀崖,用于追蹤任務(wù)的執(zhí)行,任務(wù)的一次執(zhí)行
但是這幾部分是怎么聯(lián)系起來(lái)的呢拍摇?線程池在哪里呢亮钦?
先回答第二個(gè)問(wèn)題。
jdk提供了另一個(gè)接口充活,該接口定義了執(zhí)行器的一系列行為(方法)蜂莉,那就是ExecutorService。
public interface ExecutorService extends Executor {
/**
* 該方法會(huì)以一種比較平滑的方式關(guān)閉執(zhí)行器:
* 1.已經(jīng)提交的任務(wù)會(huì)被執(zhí)行(但不保證執(zhí)行完畢)
* 2.不再接收新的任務(wù)
* 如果該執(zhí)行器已經(jīng)關(guān)閉了混卵,再調(diào)用此方法沒(méi)有任何作用
* 該方法不會(huì)等到已經(jīng)提交的任務(wù)執(zhí)行完畢
*/
void shutdown();
/**
* 比較粗暴的關(guān)閉執(zhí)行器映穗,直接試圖中止所有的任務(wù),掛起所有等待執(zhí)行的任務(wù)
* 返回所有等待執(zhí)行的任務(wù)
* 該方法不會(huì)等待正在執(zhí)行中的任務(wù)執(zhí)行完畢
*/
List<Runnable> shutdownNow();
/**
* 執(zhí)行器已經(jīng)被關(guān)閉時(shí)返回true
*
*/
boolean isShutdown();
/**
* 關(guān)閉執(zhí)行器后如果所有的任務(wù)都已執(zhí)行完畢則返回true
* 這意味著該方法只可能在調(diào)用shutdown或者shutdownNow后返回true
*/
boolean isTerminated();
/**
* 執(zhí)行器關(guān)閉后阻塞至所有的任務(wù)執(zhí)行完畢或者超時(shí)幕随、中斷
* 當(dāng)執(zhí)行器已經(jīng)終止時(shí)返回true蚁滋;超過(guò)指定時(shí)間還沒(méi)終止則返回false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一個(gè)帶返回值的任務(wù)給執(zhí)行器,并返回一個(gè)Future對(duì)象用于跟蹤任務(wù)的執(zhí)行
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一個(gè)沒(méi)有返回值的任務(wù)給執(zhí)行器赘淮,并返回一個(gè)Futrue對(duì)象用于跟蹤任務(wù)的執(zhí)行
* 任務(wù)執(zhí)行完畢后使用get可以得到指定的result
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個(gè)沒(méi)有返回值的任務(wù)給執(zhí)行器辕录,并返回一個(gè)Futrue對(duì)象用于跟蹤任務(wù)的執(zhí)行
* 任務(wù)執(zhí)行完畢后使用get返回null
*/
Future<?> submit(Runnable task);
/**
* 提交多個(gè)任務(wù),該方法是阻塞的拥知,只有所有的任務(wù)完成后才返回與這些任務(wù)關(guān)聯(lián)的Future列表
* Future列表中的每一個(gè)對(duì)象調(diào)用isDone都返回true
* 任務(wù)正常結(jié)束或者拋異常才成為完成
* 輸入的列表在執(zhí)行該方法時(shí)被修改時(shí)踏拜,則返回的結(jié)果未定義
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 指定執(zhí)行的時(shí)長(zhǎng)碎赢,到時(shí)間后任務(wù)要么執(zhí)行完畢要么超時(shí)
* 所有的Future對(duì)象調(diào)用isDone都返回true
* 返回前低剔,所有未能完成的任務(wù)都會(huì)被取消掉
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 只要任意一個(gè)任務(wù)完成,則返回該任務(wù)的返回值肮塞,其他未完成的任務(wù)都會(huì)被取消
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 只要任意一個(gè)任務(wù)在指定的時(shí)間點(diǎn)前返回襟齿,則返回該任務(wù)的返回值揽咕,其他未完成的任務(wù)都會(huì)被取消
* 如果沒(méi)有任何一個(gè)任務(wù)及時(shí)完成缘回,則拋出超時(shí)異常
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
再來(lái)回答第一個(gè)問(wèn)題瓷胧。
jdk提供了一個(gè)接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
該接口直接就將任務(wù)和任務(wù)追蹤關(guān)聯(lián)起來(lái)了摧阅。
接下來(lái)我們看看執(zhí)行器的實(shí)現(xiàn)极颓。jdk中提供了一個(gè)執(zhí)行器(其實(shí)是線程池)的抽象類丽旅,該類實(shí)現(xiàn)了其上層接口的很多方法锡足,留給一些必要的方法給子類去實(shí)現(xiàn)漓帚。
public abstract class AbstractExecutorService implements ExecutorService
這里我們只看看這個(gè)類的submit方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
可以看到這兩個(gè)方法是對(duì)ExecutorService接口中方法的實(shí)現(xiàn)篮昧。
可以看到這兩個(gè)方法中都是將任務(wù)封裝成一個(gè)RunnableFuture對(duì)象赋荆,然后扔給executor執(zhí)行,最后返回該RunnableFuture對(duì)象懊昨。
來(lái)看看這個(gè)newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
返回了一個(gè)我們尚未提及的一個(gè)類示例:FutureTask窄潭。顯然這個(gè)FutureTask是RunnableFuture的實(shí)現(xiàn)類。
目前為止酵颁,還有一個(gè)疑問(wèn):既然任務(wù)是執(zhí)行器來(lái)執(zhí)行的嫉你,任務(wù)的狀態(tài)是通過(guò)Future來(lái)查詢的月帝,那Future中的狀態(tài)是什么時(shí)候設(shè)置的呢?那就得看FutrueTask的源碼了幽污。
我們挑選它的一個(gè)構(gòu)造函數(shù)來(lái)看
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
可以看到FutureTask包含了傳遞進(jìn)去的Callable嚷辅。而AbstractExecutorService中的execute執(zhí)行的實(shí)際上是包含Callable對(duì)象的FutureTask。因此執(zhí)行器(Executor)執(zhí)行的是FutureTask的run方法距误。對(duì)于執(zhí)行器來(lái)說(shuō)潦蝇,它并不知道FutureTask內(nèi)部的狀態(tài),它只負(fù)責(zé)調(diào)用FutureTask的run方法深寥,該run方法會(huì)完成FutureTask的狀態(tài)變更攘乒。
FutureTask有一個(gè)成員變量
private volatile Thread runner;
這個(gè)成員變量是用于執(zhí)行FutureTask的線程。對(duì)于線程池而言惋鹅,這個(gè)線程就是線程池分配給它的则酝。來(lái)看看run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
這個(gè)run方法執(zhí)行時(shí)其實(shí)已經(jīng)是由某一個(gè)線程來(lái)執(zhí)行了,對(duì)于線程池而言就是分配給它的線程闰集。而第一個(gè)if是為了將當(dāng)前線程賦值給成員變量runner沽讹。而cancel方法中會(huì)嘗試中斷該線程。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask我會(huì)單獨(dú)寫一篇源碼分析武鲁,此處大致了解即可爽雄。
回到AbstractExecutorService的兩個(gè)submit方法,注意到execute方法沐鼠,該方法其實(shí)是Executor接口中的方法挚瘟,抽象類并沒(méi)有實(shí)現(xiàn)該方法,這說(shuō)明需要子類去實(shí)現(xiàn)饲梭。
目前為止還沒(méi)有出現(xiàn)過(guò)線程池的概念乘盖。其實(shí)jdk中的線程池就是用AbstractExecutorService來(lái)實(shí)現(xiàn)的。該類就是ThreadPoolExecutor憔涉《┛颍看名字就能知道是線程池執(zhí)行器。這個(gè)類會(huì)另寫文章單獨(dú)分析兜叨。
總結(jié)
jdk線程池的設(shè)計(jì)將任務(wù)(靜態(tài))穿扳,任務(wù)的一次執(zhí)行(動(dòng)態(tài))以及任務(wù)的執(zhí)行解耦。定義了不同的接口分別去完成這些事情国旷。
任務(wù)由Callable或者Runnable來(lái)定義矛物,只定義了任務(wù)需要做什么,這是個(gè)靜態(tài)的概念议街。
任務(wù)的一次執(zhí)行則由Future來(lái)表示泽谨,通過(guò)Future可以知道與之關(guān)聯(lián)的任務(wù)的執(zhí)行狀態(tài),這是個(gè)動(dòng)態(tài)的概念。
任務(wù)的執(zhí)行則由執(zhí)行器來(lái)完成吧雹,執(zhí)行器只負(fù)責(zé)執(zhí)行任務(wù)骨杂,并不直接對(duì)外提供查詢某個(gè)任務(wù)是否完成的功能(由Future來(lái)提供)。
任務(wù)被封裝成FutureTask后交由執(zhí)行器執(zhí)行雄卷,F(xiàn)utureTask對(duì)任務(wù)(Callable或者Runnable)進(jìn)行封裝搓蚪,加上了一些狀態(tài)(未執(zhí)行、執(zhí)行中丁鹉、執(zhí)行完成等)妒潭。執(zhí)行器執(zhí)行FutureTask的run方法最終會(huì)調(diào)用任務(wù)的run或者call方法,在調(diào)用任務(wù)的run或者call的前后揣钦,F(xiàn)uturTask負(fù)責(zé)更改自身的狀態(tài)雳灾。因此,對(duì)于執(zhí)行器來(lái)說(shuō)并不關(guān)心任務(wù)的狀態(tài)冯凹,它只負(fù)責(zé)調(diào)用FutureTask的run方法谎亩,至于FutureTask的run方法中怎么處理,那就是FutureTask自己的事情了宇姚。