FutureTask介紹
FutureTask是一種可以取消的異步的計(jì)算任務(wù)坝橡。它的計(jì)算是通過(guò)Callable實(shí)現(xiàn)的,可以把它理解為是可以返回結(jié)果的Runnable。
使用FutureTask的優(yōu)勢(shì)有:
- 可以獲取線程執(zhí)行后的返回結(jié)果;
- 提供了超時(shí)控制功能。
它實(shí)現(xiàn)了Runnable接口和Future接口:
什么是異步計(jì)算呢权薯?也就是說(shuō)侄非,在讓該任務(wù)執(zhí)行時(shí)浪读,不需要一直等待其運(yùn)行結(jié)束返回結(jié)果,而是可以先去處理其他的事情妓肢,然后再獲取返回結(jié)果捌省。例如你想下載一個(gè)很大的文件,這時(shí)很耗時(shí)的操作碉钠,沒(méi)必要一直等待著文件下載完纲缓,你可以先去吃個(gè)飯,然后再回來(lái)看下文件是否下載完成喊废,如果下載完成就可以使用了祝高,否則還需要繼續(xù)等待。
FutureTask的實(shí)現(xiàn)
FutureTask的狀態(tài)
FutureTask內(nèi)部有這樣幾種狀態(tài):
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
看名字應(yīng)該很好理解了污筷,當(dāng)創(chuàng)建一個(gè)FutureTask對(duì)象是工闺,初始的狀態(tài)是NEW,在運(yùn)行時(shí)狀態(tài)會(huì)轉(zhuǎn)換颓屑,有4中狀態(tài)的轉(zhuǎn)換過(guò)程:
- NEW -> COMPLETING -> NORMAL:正常執(zhí)行并返回斤寂;
- NEW -> COMPLETING -> EXCEPTIONAL:執(zhí)行過(guò)程中出現(xiàn)了異常;
- NEW -> CANCELLED揪惦;執(zhí)行前被取消;
- NEW -> INTERRUPTING -> INTERRUPTED:取消時(shí)被中斷罗侯。
使用FutureTask
下面看一下具體的使用過(guò)程:
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
result += i;
}
return result;
}
});
executor.execute(future);
System.out.println(future.get());
}
}
FutureTask內(nèi)部結(jié)構(gòu)
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執(zhí)行callable的線程 **/
private volatile Thread runner;
/**
* Treiber stack of waiting threads
* 使用Treiber算法實(shí)現(xiàn)的無(wú)阻塞的Stack器腋,
* 用于存放等待的線程
*/
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
public V get() throws InterruptedException, ExecutionException {
...
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
...
}
...
這里的waiters理解為一個(gè)stack,因?yàn)樵谡{(diào)用get方法時(shí)任務(wù)可能還沒(méi)有執(zhí)行完钩杰,這時(shí)需要將調(diào)用get方法的線程放入waiters中纫塌。
最重要的兩個(gè)get方法,用于獲取返回結(jié)果讲弄,第二種提供了超時(shí)控制功能措左。
FutureTask構(gòu)造方法
FutureTask有兩個(gè)構(gòu)造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
第二種構(gòu)造方法傳入一個(gè)Runnable對(duì)象和一個(gè)返回值對(duì)象,因?yàn)镽unnable是沒(méi)有返回值的避除,所以要通過(guò)result參數(shù)在執(zhí)行完之后返回結(jié)果怎披。
run方法
FutureTask實(shí)現(xiàn)了Runnable接口胸嘁,所以需要實(shí)現(xiàn)run方法,代碼如下:
public void run() {
/*
* 首先判斷狀態(tài)凉逛,如果不是初始狀態(tài)性宏,說(shuō)明任務(wù)已經(jīng)被執(zhí)行或取消;
* runner是FutureTask的一個(gè)屬性状飞,用于保存執(zhí)行任務(wù)的線程毫胜,
* 如果不為空則表示已經(jīng)有線程正在執(zhí)行,這里用CAS來(lái)設(shè)置诬辈,失敗則返回酵使。
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 只有初始狀態(tài)才會(huì)執(zhí)行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執(zhí)行任務(wù)
result = c.call();
// 如果沒(méi)出現(xiàn)異常,則說(shuō)明執(zhí)行成功了
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 設(shè)置異常
setException(ex);
}
// 如果執(zhí)行成功焙糟,則設(shè)置返回結(jié)果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 無(wú)論是否執(zhí)行成功口渔,把runner設(shè)置為null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果被中斷,則說(shuō)明調(diào)用的cancel(true)酬荞,
// 這里要保證在cancel方法中把state設(shè)置為INTERRUPTED
// 否則可能在cancel方法中還沒(méi)執(zhí)行中斷搓劫,造成中斷的泄露
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
總結(jié)一下run方法的執(zhí)行過(guò)程
- 只有state為NEW的時(shí)候才執(zhí)行任務(wù);
- 執(zhí)行前要設(shè)置runner為當(dāng)前線程混巧,使用CAS來(lái)設(shè)置是為了防止競(jìng)爭(zhēng)枪向;
- 如果任務(wù)執(zhí)行成功,任務(wù)狀態(tài)從NEW轉(zhuǎn)換為COMPLETING咧党,如果執(zhí)行正常秘蛔,設(shè)置最終狀態(tài)為NORMAL;如果執(zhí)行中出現(xiàn)了異常傍衡,設(shè)置最終狀態(tài)為EXCEPTIONAL深员;
- 喚醒并刪除Treiber Stack中的所有節(jié)點(diǎn);
- 如果調(diào)用了cancel(true)方法進(jìn)行了中斷蛙埂,要確保在run方法執(zhí)行結(jié)束前的狀態(tài)是INTERRUPTED倦畅。
這里涉及到3個(gè)比較重要的方法:setException,set和handlePossibleCancellationInterrupt绣的。
setException方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
如果在執(zhí)行過(guò)程中(也就是調(diào)用call方法時(shí))出現(xiàn)了異常叠赐,則要把狀態(tài)先設(shè)置為COMPLETING,如果成功屡江,設(shè)置outcome = t
芭概,outcome對(duì)象是Object類型的,用來(lái)保存異吵图危或者返回結(jié)果對(duì)象罢洲,也就是說(shuō),在正常的執(zhí)行過(guò)程中(沒(méi)有異常文黎,沒(méi)有調(diào)用cancel方法)惹苗,outcome保存著返回結(jié)果對(duì)象殿较,會(huì)被返回,如果出現(xiàn)了異掣敕郏或者中斷斜脂,則不會(huì)返回并拋出異常,這個(gè)在介紹report方法時(shí)會(huì)講到触机。
接著設(shè)置狀態(tài)為EXCEPTIONAL帚戳,這也是最終的狀態(tài)。
finishCompletion方法稍后再分析儡首。
set方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
很簡(jiǎn)單片任,與setException類似,只不過(guò)這里的outcome是返回結(jié)果對(duì)象蔬胯,狀態(tài)先設(shè)置為COMPLETING对供,然后再設(shè)置為MORMAL。
handlePossibleCancellationInterrupt方法
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
handlePossibleCancellationInterrupt方法要確保cancel(true)產(chǎn)生的中斷發(fā)生在run或runAndReset方法執(zhí)行的過(guò)程中氛濒。這里會(huì)循環(huán)的調(diào)用Thread.yield()來(lái)確保狀態(tài)在cancel方法中被設(shè)置為INTERRUPTED产场。
這里不能夠清除中斷標(biāo)記,因?yàn)椴荒艽_定中斷一定來(lái)自于cancel方法舞竿。
finishCompletion方法
private void finishCompletion() {
// assert state > COMPLETING;
// 執(zhí)行該方法時(shí)state必須大于COMPLETING
// 逐個(gè)喚醒waiters中的線程
for (WaitNode q; (q = waiters) != null;) {
// 設(shè)置棧頂節(jié)點(diǎn)為null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
// 喚醒線程
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 如果next為空京景,說(shuō)明棧空了骗奖,跳出循環(huán)
WaitNode next = q.next;
if (next == null)
break;
// 方便gc回收
q.next = null; // unlink to help gc
// 重新設(shè)置棧頂node
q = next;
}
break;
}
}
// 鉤子方法
done();
callable = null; // to reduce footprint
}
在調(diào)用get方法時(shí)确徙,如果任務(wù)還沒(méi)有執(zhí)行結(jié)束,則會(huì)阻塞調(diào)用的線程执桌,然后把調(diào)用的線程放入waiters中鄙皇,這時(shí),如果任務(wù)執(zhí)行完畢仰挣,也就是調(diào)用了finishCompletion方法伴逸,waiters會(huì)依次出棧并逐個(gè)喚醒對(duì)應(yīng)的線程。
由此可以想到膘壶,WaitNode一定是在get方法中被添加到棧中的违柏,下面來(lái)看下get方法的實(shí)現(xiàn)。
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
這兩個(gè)方法類似香椎,首先判斷狀態(tài),如果s <= COMPLETING
禽篱,說(shuō)明任務(wù)已經(jīng)執(zhí)行完畢畜伐,但set方法或setException方法還未執(zhí)行結(jié)束(還未設(shè)置狀態(tài)為NORMAL或EXCEPTIONAL),這時(shí)需要將當(dāng)前線程添加到waiters中并阻塞躺率。
第二種get提供了超時(shí)功能玛界,如果在規(guī)定時(shí)間內(nèi)任務(wù)還未執(zhí)行完畢或者狀態(tài)還是COMPLETING万矾,則獲取結(jié)果超時(shí),拋出TimeoutException慎框。而第一種get會(huì)一直阻塞直到state > COMPLETING
良狈。
awaitDone方法
awaitDone方法的工作是根據(jù)狀態(tài)來(lái)判斷是否能夠返回結(jié)果,如果任務(wù)還未執(zhí)行完畢笨枯,要添加到waiters中并阻塞薪丁,否則返回狀態(tài)。代碼如下:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 計(jì)算到期時(shí)間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 如果被中斷馅精,刪除節(jié)點(diǎn)严嗜,拋出異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果任務(wù)執(zhí)行完畢并且設(shè)置了最終狀態(tài)或者被取消,則返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// s == COMPLETING時(shí)通過(guò)Thread.yield();讓步其他線程執(zhí)行洲敢,
// 主要是為了讓狀態(tài)改變
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 創(chuàng)建一個(gè)WaitNode
else if (q == null)
q = new WaitNode();
// CAS設(shè)置棧頂節(jié)點(diǎn)
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果設(shè)置了超時(shí)漫玄,則計(jì)算是否已經(jīng)到了開(kāi)始設(shè)置的到期時(shí)間
else if (timed) {
nanos = deadline - System.nanoTime();
// 如果已經(jīng)到了到期時(shí)間,刪除節(jié)點(diǎn)压彭,返回狀態(tài)
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞到到期時(shí)間
LockSupport.parkNanos(this, nanos);
}
// 如果沒(méi)有設(shè)置超時(shí)睦优,會(huì)一直阻塞,直到被中斷或者被喚醒
else
LockSupport.park(this);
}
}
removeWaiter方法
private void removeWaiter(WaitNode node) {
if (node != null) {
// 將thread設(shè)置為null是因?yàn)橄旅嬉鶕?jù)thread是否為null判斷是否要把node移出
node.thread = null;
// 這里自旋保證刪除成功
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// q.thread != null說(shuō)明該q節(jié)點(diǎn)不需要移除
if (q.thread != null)
pred = q;
// 如果q.thread == null壮不,且pred != null汗盘,需要?jiǎng)h除q節(jié)點(diǎn)
else if (pred != null) {
// 刪除q節(jié)點(diǎn)
pred.next = s;
// pred.thread == null時(shí)說(shuō)明在并發(fā)情況下被其他線程修改了;
// 返回第一個(gè)for循環(huán)重試
if (pred.thread == null) // check for race
continue retry;
}
// 如果q.thread != null且pred == null忆畅,說(shuō)明q是棧頂節(jié)點(diǎn)
// 設(shè)置棧頂元素為s節(jié)點(diǎn)衡未,如果失敗則返回重試
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
cancel方法
cancel方法用于取消任務(wù),這里可能有兩種情況家凯,一種是任務(wù)已經(jīng)執(zhí)行了缓醋,另一種是還未執(zhí)行,代碼如下:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// mayInterruptIfRunning參數(shù)表示是否要進(jìn)行中斷
if (mayInterruptIfRunning) {
try {
// runner保存著當(dāng)前執(zhí)行任務(wù)的線程
Thread t = runner;
// 中斷線程
if (t != null)
t.interrupt();
} finally { // final state
// 設(shè)置最終狀態(tài)為INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
第一個(gè)if判斷可能有些不好理解绊诲,其實(shí)等價(jià)于如下代碼:
if (!state == NEW ||
!UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
如果狀態(tài)不是NEW送粱,或者設(shè)置狀態(tài)為INTERRUPTING或CANCELLED失敗,則取消失敗掂之,返回false抗俄。
簡(jiǎn)單來(lái)說(shuō)有一下兩種情況:
- 如果當(dāng)前任務(wù)還沒(méi)有執(zhí)行,那么state == NEW世舰,那么會(huì)嘗試設(shè)置狀態(tài)动雹,如果設(shè)置狀態(tài)失敗會(huì)返回false,表示取消失敻埂胰蝠;
- 如果當(dāng)前任務(wù)已經(jīng)被執(zhí)行了,那么state > NEW,也就是!state == NEW為true茸塞,直接返回false躲庄。
也就是說(shuō),如果任務(wù)一旦開(kāi)始執(zhí)行了(state != NEW)钾虐,那么就不能被取消噪窘。
如果mayInterruptIfRunning為true,要中斷當(dāng)前執(zhí)行任務(wù)的線程效扫。
report方法
get方法在調(diào)用awaitDone方法后倔监,會(huì)調(diào)用report方法進(jìn)行返回:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
很簡(jiǎn)單,可以看到有3中執(zhí)行情況:
- 如果
s == NORMAL
為true荡短,說(shuō)明是正常執(zhí)行結(jié)束丐枉,那么根據(jù)上述的分析,在正常執(zhí)行結(jié)束時(shí)outcome存放的是返回結(jié)果掘托,把outcome返回瘦锹; - 如果
s >= CANCELLED
為true,說(shuō)明是被取消了闪盔,拋出CancellationException弯院; - 如果
s < CANCELLED
,那么狀態(tài)只能是是EXCEPTIONAL泪掀,表示在執(zhí)行過(guò)程中出現(xiàn)了異常听绳,拋出ExecutionException。
runAndReset方法
該方法和run方法類似异赫,區(qū)別在于這個(gè)方法不會(huì)設(shè)置任務(wù)的執(zhí)行結(jié)果值椅挣,所以在正常執(zhí)行時(shí),不會(huì)修改state塔拳,除非發(fā)生了異呈笾ぃ或者中斷,最后返回是否正確的執(zhí)行并復(fù)位:
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 不獲取和設(shè)置返回值
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 是否正確的執(zhí)行并復(fù)位
return ran && s == NEW;
}
總結(jié)
本文分析了FutureTask的執(zhí)行過(guò)程和獲取返回值的過(guò)程靠抑,要注意以下幾個(gè)地方:
- FutureTask是線程安全的量九,在多線程下任務(wù)也只會(huì)被執(zhí)行一次;
- 注意在執(zhí)行時(shí)各種狀態(tài)的切換颂碧;
- get方法調(diào)用時(shí)荠列,如果任務(wù)沒(méi)有結(jié)束,要阻塞當(dāng)前線程载城,法阻塞的線程會(huì)保存在一個(gè)Treiber Stack中肌似;
- get方法超時(shí)功能如果超時(shí)未獲取成功,會(huì)拋出TimeoutException诉瓦;
- 注意在取消時(shí)的線程中斷锈嫩,在run方法中一定要保證結(jié)束時(shí)的狀態(tài)是INTERRUPTED受楼,否則在cancel方法中可能沒(méi)有執(zhí)行interrupt,造成中斷的泄露呼寸。