線程池的執(zhí)行任務(wù)有兩種方法可款,一種是 submit、一種是 execute末盔;這兩個(gè)方法是有區(qū)別的筑舅,那么基于這個(gè)區(qū)別我們?cè)賮?lái)看看。
execute 和 submit 區(qū)別
execute 只可以接收一個(gè) Runnable 的參數(shù)
execute 如果出現(xiàn)異常會(huì)拋出
execute 沒(méi)有返回值
submit 可以接收 Runable 和 Callable 這兩種類型的參數(shù)陨舱,
對(duì)于 submit 方法翠拣,如果傳入一個(gè) Callable,可以得到一個(gè) Future 的返回值
submit 方法調(diào)用不會(huì)拋異常游盲,除非調(diào)用 Future.get
這里误墓,我們重點(diǎn)了解一下 Callable/Future,可能很多同學(xué)知道他是一個(gè)帶返回值的線程益缎,但是具體的實(shí)現(xiàn)可能不清楚谜慌。
Callable/Future 案例演示
Callable/Future 和 Thread 之類的線程構(gòu)建最大的區(qū)別在于,能夠很方便的獲取線程執(zhí)行完以后的結(jié)果莺奔。首先來(lái)看一個(gè)簡(jiǎn)單的例子
public class CallableDemo implements Callable<String> {
public String call() throws Exception {
Thread.sleep(3000);//阻塞案例演示
return "hello world";
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
CallableDemo callableDemo = new CallableDemo();
FutureTask futureTask = new FutureTask(callableDemo);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
想一想我們?yōu)槭裁葱枰褂没卣{(diào)呢欣范?那是因?yàn)榻Y(jié)果值是由另一線程計(jì)算的,當(dāng)前線程是不知道結(jié)果值什么時(shí)候計(jì)算完成令哟,所以它傳遞一個(gè)回調(diào)接口給計(jì)算線程恼琼,當(dāng)計(jì)算完成時(shí),調(diào)用這個(gè)回調(diào)接口屏富,回傳結(jié)果值晴竞。
這個(gè)在很多地方有用到,比如 Dubbo 的異步調(diào)用狠半,比如消息中間件的異步通信等等… 利用 FutureTask噩死、 Callable、 Thread 對(duì)耗時(shí)任務(wù)(如查詢數(shù)據(jù)庫(kù))做預(yù)處理神年,在需要計(jì)算結(jié)果之前就啟動(dòng)計(jì)算已维。
所以我們來(lái)看一下 Future/Callable 是如何實(shí)現(xiàn)的
Callable/Future 原理分析
在剛剛實(shí)現(xiàn)的 demo 中,我們用到了兩個(gè) api已日,分別是 Callable 和 FutureTask垛耳。Callable 是一個(gè)函數(shù)式接口,里面就只有一個(gè) call 方法。子類可以重寫(xiě)這個(gè)方法艾扮,并且這個(gè)方法會(huì)有一個(gè)返回值
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
FutureTask
FutureTask 的類關(guān)系圖如下,它實(shí)現(xiàn) RunnableFuture 接口占婉,那么這個(gè) RunnableFuture 接口的作用是什么呢泡嘴。
在講解 FutureTask 之前,先看看 Callable, Future, FutureTask 它們之間的關(guān)系圖逆济,如下:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture 是一個(gè)接口酌予,它繼承了 Runnable 和 Future 這兩個(gè)接口, Runnable 太熟悉了奖慌, 那么 Future 是什么呢抛虫?
Future 表示一個(gè)任務(wù)的生命周期,并提供了相應(yīng)的方法來(lái)判斷是否已經(jīng)完成或取消简僧,以及獲取任務(wù)的結(jié)果和取消任務(wù)等建椰。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
// 當(dāng)前的 Future 是否被取消,返回 true 表示已取消
boolean isCancelled();
// 當(dāng)前 Future 是否已結(jié)束岛马。包括運(yùn)行完成棉姐、拋出異常以及取消,都表示當(dāng)前 Future 已結(jié)束
boolean isDone();
// 獲取 Future 的結(jié)果值啦逆。如果當(dāng)前 Future 還沒(méi)有結(jié)束伞矩,那么當(dāng)前線程就等待,
// 直到 Future 運(yùn)行結(jié)束夏志,那么會(huì)喚醒等待結(jié)果值的線程的乃坤。
V get() throws InterruptedException, ExecutionException;
// 獲取 Future 的結(jié)果值。與 get()相比較多了允許設(shè)置超時(shí)時(shí)間
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
分析到這里我們其實(shí)有一些初步的頭緒了沟蔑, FutureTask 是 Runnable 和 Future 的結(jié)合湿诊,如果我們把 Runnable 比作是生產(chǎn)者, Future 比作是消費(fèi)者溉贿,那么 FutureTask 是被這兩者共享的枫吧,生產(chǎn)者運(yùn)行 run 方法計(jì)算結(jié)果,消費(fèi)者通過(guò) get 方法獲取結(jié)果宇色。
作為生產(chǎn)者消費(fèi)者模式九杂,有一個(gè)很重要的機(jī)制,就是如果生產(chǎn)者數(shù)據(jù)還沒(méi)準(zhǔn)備的時(shí)候宣蠕,消費(fèi)者會(huì)被阻塞例隆。當(dāng)生產(chǎn)者數(shù)據(jù)準(zhǔn)備好了以后會(huì)喚醒消費(fèi)者繼續(xù)執(zhí)行。
這個(gè)有點(diǎn)像我們上次可分析的阻塞隊(duì)列抢蚀,那么在 FutureTask 里面是基于什么方式實(shí)現(xiàn)的呢镀层?
state 的含義
表示 FutureTask 當(dāng)前的狀態(tài),分為七種狀態(tài)
private static final int NEW = 0; // NEW 新建狀態(tài),表示這個(gè) FutureTask還沒(méi)有開(kāi)始運(yùn)行
// COMPLETING 完成狀態(tài)唱逢, 表示 FutureTask 任務(wù)已經(jīng)計(jì)算完畢了
// 但是還有一些后續(xù)操作吴侦,例如喚醒等待線程操作,還沒(méi)有完成坞古。
private static final int COMPLETING = 1;
// FutureTask 任務(wù)完結(jié)备韧,正常完成,沒(méi)有發(fā)生異常
private static final int NORMAL = 2;
// FutureTask 任務(wù)完結(jié)痪枫,因?yàn)榘l(fā)生異常织堂。
private static final int EXCEPTIONAL = 3;
// FutureTask 任務(wù)完結(jié),因?yàn)槿∠蝿?wù)
private static final int CANCELLED = 4;
// FutureTask 任務(wù)完結(jié)奶陈,也是取消任務(wù)易阳,不過(guò)發(fā)起了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
private static final int INTERRUPTING = 5;
// FutureTask 任務(wù)完結(jié),也是取消任務(wù)吃粒,已經(jīng)完成了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
private static final int INTERRUPTED = 6;
run 方法
public void run() {
// 如果狀態(tài) state 不是 NEW潦俺,或者設(shè)置 runner 值失敗
// 表示有別的線程在此之前調(diào)用 run 方法,并成功設(shè)置了 runner 值
// 保證了只有一個(gè)線程可以運(yùn)行 try 代碼塊中的代碼声搁。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
//只有c不為null且狀態(tài)state為NEW的情況
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//調(diào)用callable的call方法黑竞,并獲得返回結(jié)果
result = c.call();
//運(yùn)行成功
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//設(shè)置結(jié)果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其實(shí) run 方法作用非常簡(jiǎn)單,就是調(diào)用 callable 的 call 方法返回結(jié)果值 result疏旨,根據(jù)是否發(fā)生異常很魂,調(diào)用 set(result)或 setException(ex)方法表示 FutureTask 任務(wù)完結(jié)。
不過(guò)因?yàn)?FutureTask 任務(wù)都是在多線程環(huán)境中使用檐涝,所以要注意并發(fā)沖突問(wèn)題遏匆。注意在 run方法中,我們沒(méi)有使用 synchronized 代碼塊或者 Lock 來(lái)解決并發(fā)問(wèn)題谁榜,而是使用了 CAS 這個(gè)樂(lè)觀鎖來(lái)實(shí)現(xiàn)并發(fā)安全幅聘,保證只有一個(gè)線程能運(yùn)行 FutureTask 任務(wù)。
get 方法
get 方法就是阻塞獲取線程執(zhí)行結(jié)果窃植,這里主要做了兩個(gè)事情
- 判斷當(dāng)前的狀態(tài)帝蒿,如果狀態(tài)小于等于 COMPLETING,表示 FutureTask 任務(wù)還沒(méi)有完結(jié)巷怜,所以調(diào)用 awaitDone 方法葛超,讓當(dāng)前線程等待。
- report 返回結(jié)果值或者拋出異常
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone
如果當(dāng)前的結(jié)果還沒(méi)有被執(zhí)行完延塑,把當(dāng)前線程線程和插入到等待隊(duì)列
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
FutureTask.WaitNode q = null;
boolean queued = false;// 節(jié)點(diǎn)是否已添加
for (;;) {
// 如果當(dāng)前線程中斷標(biāo)志位是 true绣张,
// 那么從列表中移除節(jié)點(diǎn) q,并拋出 InterruptedException 異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 當(dāng)狀態(tài)大于 COMPLETING 時(shí)关带,表示FutureTask任務(wù)已結(jié)束
if (s > COMPLETING) {
if (q != null)
// 將節(jié)點(diǎn) q 線程設(shè)置為 null侥涵,因?yàn)榫€程沒(méi)有阻塞等待
q.thread = null;
return s;
}
// 表示還有一些后序操作沒(méi)有完成,那么當(dāng)前線程讓出執(zhí)行權(quán)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//表示狀態(tài)是 NEW,那么就需要將當(dāng)前線程阻塞等待芜飘。
// 就是將它插入等待線程鏈表中
else if (q == null)
q = new FutureTask.WaitNode();
// 使用 CAS 函數(shù)將新節(jié)點(diǎn)添加到鏈表中务豺,如果添加失敗,那么queued 為 false嗦明,
// 下次循環(huán)時(shí)冲呢,會(huì)繼續(xù)添加,知道成功招狸。
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// timed 為 true 表示需要設(shè)置超時(shí)
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 讓當(dāng)前線程等待 nanos 時(shí)間
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
被阻塞的線程,會(huì)等到 run 方法執(zhí)行結(jié)束之后被喚醒
report
report 方法就是根據(jù)傳入的狀態(tài)值 s邻薯,來(lái)決定是拋出異常裙戏,還是返回結(jié)果值。 這個(gè)兩種情況都表示 FutureTask 完結(jié)了
private V report(int s) throws ExecutionException {
Object x = outcome;//表示 call 的返回值
if (s == NORMAL) // 表示正常完結(jié)狀態(tài)厕诡,所以返回結(jié)果值
return (V)x;
// 大于或等于 CANCELLED累榜,都表示手動(dòng)取消 FutureTask 任務(wù),
// 所以拋出 CancellationException 異常
if (s >= CANCELLED)
throw new CancellationException();
// 否則就是運(yùn)行過(guò)程中灵嫌,發(fā)生了異常壹罚,這里就拋出這個(gè)異常
throw new ExecutionException((Throwable)x);
}
線程池對(duì)于 Future/Callable 的執(zhí)行
我們現(xiàn)在再來(lái)看線程池里面的 submit 方法,就會(huì)很清楚了寿羞。
public class CallableDemo implements Callable<String> {
public String call() throws Exception {
Thread.sleep(3000);//阻塞案例演示
return "hello world";
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService es= Executors.newFixedThreadPool(1);
CallableDemo callableDemo = new CallableDemo();
FutureTask futureTask = new FutureTask(callableDemo);
Future future=es.submit(callableDemo);
System.out.println(futureTask.get());
}
}
AbstractExecutorService.submit
調(diào)用抽象類中的 submit 方法猖凛,這里其實(shí)相對(duì)于 execute 方法來(lái)說(shuō),只多做了一步操作绪穆,就是封裝了一個(gè) RunnableFuture
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
newTaskFor
更簡(jiǎn)單
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
然后調(diào)用 execute 方法辨泳,這里面的邏輯前面分析過(guò)了,會(huì)通過(guò) worker 線程來(lái)調(diào)用過(guò) ftask 的run 方法玖院。而這個(gè) ftask 其實(shí)就是 FutureTask 里面最終實(shí)現(xiàn)的邏輯菠红。