1、簡介
FutureTask是一種異步任務(或異步計算)幕随,舉個栗子竟趾,主線程的邏輯中需要使用某個值,但這個值需要負責的運算得來泼疑,那么主線程可以提前建立一個異步任務來計算這個值(在其他的線程中計算)德绿,然后去做其他事情,當需要這個值的時候再通過剛才建立的異步任務來獲取這個值退渗,有點并行的意思移稳,這樣可以縮短整個主線程邏輯的執(zhí)行時間。
與1.6版本不同会油,1.7的FutureTask不再基于AQS來構建个粱,而是在內部采用簡單的Treiber Stack來保存等待線程。
2翻翩、框架
我們先來看看類圖:
可以看到FutureTask實現了Runnable接口和Future接口都许,因此FutureTask可以傳遞到線程對象Thread或Excutor(線程池)來執(zhí)行。
如果在當前線程中需要執(zhí)行比較耗時的操作嫂冻,但又不想阻塞當前線程時胶征,可以把這些作業(yè)交給FutureTask,另開一個線程在后臺完成桨仿,當當前線程將來需要時睛低,就可以通過FutureTask對象獲得后臺作業(yè)的計算結果或者執(zhí)行狀態(tài)。
我們來看看它的構造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可見,構造一個FutureTask很簡單钱雷,可以通過一個Callable來構建骂铁,也可以通過一個Runnable和一個result來構建。
這里要注意的是必須把state的寫放到最后急波,因為state本身由volatile修飾从铲,所以可以保證callable的可見性。(因為后續(xù)讀callable之前會先讀state澄暮,還記得這個volatile寫讀的HB規(guī)則吧)名段。
接下來我們看一下它的內部結構:
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 內部狀態(tài)可能得遷轉過程:
* NEW -> COMPLETING -> NORMAL //正常完成
* NEW -> COMPLETING -> EXCEPTIONAL //發(fā)生異常
* NEW -> CANCELLED //取消
* NEW -> INTERRUPTING -> INTERRUPTED //中斷
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 內部的callable,運行完成后設置為null */
private Callable<V> callable;
/** 如果正常完成泣懊,就是執(zhí)行結果伸辟,通過get方法獲取馍刮;如果發(fā)生異常信夫,就是具體的異常對象,通過get方法拋出卡啰。 */
private Object outcome; // 本身沒有volatile修飾, 依賴state的讀寫來保證可見性静稻。
/** 執(zhí)行內部callable的線程。 */
private volatile Thread runner;
/** 存放等待線程的Treiber Stack*/
private volatile WaitNode waiters;
}
內部結構很明確匈辱,重點看下WaitNode的結構吧:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
這個也很簡單振湾,就是包含了當前線程對象,并有指向下一個WaitNode的指針亡脸,所謂的Treiber Stack就是由WaitNode組成的(一個單向鏈表)押搪。
經常使用FutureTask的話一定會非常熟悉它的運行過程:
- 創(chuàng)建任務,實際使用時浅碾,一般會結合線程池(ThreadPoolExecutor)使用大州,所以是在線程池內部創(chuàng)建FutureTask。
- 執(zhí)行任務垂谢,一般會有由工作線程(對于我們當前線程來說的其他線程)調用FutureTask的run方法厦画,完成執(zhí)行。
- 獲取結果滥朱,一般會有我們的當前線程去調用get方法來獲取執(zhí)行結果苛白,如果獲取時,任務并沒有被執(zhí)行完畢焚虱,當前線程就會被阻塞购裙,直到任務被執(zhí)行完畢,然后獲取結果鹃栽。
- 取消任務躏率,某些情況下會放棄任務的執(zhí)行躯畴,進行任務取消。
接下來我們從源碼的角度看下執(zhí)行任務過程薇芝,也就是運行相關方法吧 蓬抄。
3、源碼分析
run()
public void run() {
// 如果state是NEW夯到,設置線程為當前線程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調用Callable的call方法嚷缭,得到結果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 處理異常狀態(tài)和結果
result = null;
ran = false;
setException(ex);
}
if (ran)
// 正常處理設置狀態(tài)和結果
set(result);
}
} finally {
// runner必須在設置了state之后再置空,避免run方法出現并發(fā)問題耍贾。
runner = null;
// 這里還必須再讀一次state阅爽,避免丟失中斷。
int s = state;
if (s >= INTERRUPTING)
// 處理可能發(fā)生的取消中斷(cancel(true))荐开。
handlePossibleCancellationInterrupt(s);
}
}
看下run過程中付翁,正常完成后調用的set方法:
/**
* 設置結果,狀態(tài)從 NEW 變?yōu)?COMPLETING
* 設置返回結果為t(正常結果)
* 改變狀態(tài)從 COMPLETING 到 NORMAL
* 調用finishCompletion完成收尾工作
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
set過程中晃听,首先嘗試將當前任務狀態(tài)state從NEW改為COMPLETING百侧。如果成功的話,再設置執(zhí)行結果到outcome能扒。然后將state再次設置為NORMAL佣渴,注意這次使用的是putOrderedInt,其實就是原子量的LazySet內部使用的方法初斑。為什么使用這個方法观话?首先LazySet相對于Volatile-Write來說更廉價,因為它沒有昂貴的Store/Load屏障越平,只有Store/Store屏障(x86下Store/Store屏障是一個空操作),其次灵迫,后續(xù)線程不會及時的看到state從COMPLETING變?yōu)镹ORMAL秦叛,但這沒什么關系,而且NORMAL是state的最終狀態(tài)之一瀑粥,以后不會在變化了挣跋。
上述過程最后還調用了一個finishCompletion方法:
/**
* 遍歷waiters的next節(jié)點,喚醒節(jié)點的線程并把引用變?yōu)閚ull狞换,等待GC
*/
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
// 嘗試將waiters設置為null避咆。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 然后將waiters中的等待線程全部喚醒。
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 喚醒線程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 回調下鉤子方法修噪。
done();
// 置空callable查库,減少內存占用
callable = null;
}
可見,finishCompletion主要就是在任務執(zhí)行完畢后黄琼,移除Treiber Stack樊销,并將Treiber Stack中所有等待獲取任務結果的線程喚醒,然后回調下done鉤子方法。
看完了set围苫,再看下run過程中如果發(fā)生異常裤园,調用的setException方法:
/**
* 發(fā)生異常,狀態(tài)從 NEW 變?yōu)?COMPLETING
* 設置返回結果為t(異常結果)
* 改變狀態(tài)從 COMPLETING 到 EXCEPTIONAL
* 調用finishCompletion完成收尾工作
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
和set方法一個套路剂府。
最后看下run過程中最后調用的handlePossibleCancellationInterrupt方法:
/**
* 確保cancel(true)產生的中斷發(fā)生在run或runAndReset方法過程中拧揽。
*/
private void handlePossibleCancellationInterrupt(int s) {
// 如果當前正在中斷過程中,自旋等待一下腺占,等中斷完成淤袜。
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// 這里的state狀態(tài)一定是INTERRUPTED;
// 這里不能清除中斷標記,因為沒辦法區(qū)分來自cancel(true)的中斷湾笛。
// Thread.interrupted();
}
這里總結一下run方法:
- 只有state為NEW的時候才執(zhí)行任務(調用內部callable的run方法)饮怯。執(zhí)行前會原子的設置執(zhí)行線程(runner),防止競爭嚎研。
- 如果任務執(zhí)行成功蓖墅,設置執(zhí)行結果,狀態(tài)變更:NEW -> COMPLETING -> NORMAL临扮。
- 如果任務執(zhí)行發(fā)生異常论矾,設置異常結果,狀態(tài)變更:NEW -> COMPLETING -> EXCEPTIONAL杆勇。
- 將Treiber Stack中等待當前任務執(zhí)行結果的等待節(jié)點中的線程全部喚醒贪壳,同時刪除這些等待節(jié)點,將整個Treiber Stack置空蚜退。
- 最后別忘了等一下可能發(fā)生的cancel(true)中引起的中斷闰靴,讓這些中斷發(fā)生在執(zhí)行任務過程中(別泄露出去)。
runAndReset()
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
該方法和run方法的區(qū)別是钻注,run方法只能被運行一次任務蚂且,而該方法可以多次運行任務。而runAndReset這個方法不會設置任務的執(zhí)行結果值,如果該任務成功執(zhí)行完成后幅恋,不修改state的狀態(tài)杏死,還是可運行(NEW)狀態(tài),如果取消任務或出現異常捆交,則不會再次執(zhí)行淑翼。
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); // 如果任務還沒執(zhí)行完畢,等待任務執(zhí)行完畢品追。
return report(s); // 如果任務執(zhí)行完畢玄括,獲取執(zhí)行結果色难。
}
看下awaitDone方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 先算出到期時間瞻佛。
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
// 如果當前線程被中斷,移除等待節(jié)點q徒仓,然后拋出中斷異常。
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 如果任務已經執(zhí)行完畢
if (q != null)
q.thread = null; // 如果q不為null洁墙,將q中的thread置空蛹疯。
return s; // 返回任務狀態(tài)。
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield(); // 如果當前正在完成過程中热监,出讓CPU捺弦。
else if (q == null)
q = new WaitNode(); // 創(chuàng)建一個等待節(jié)點。
else if (!queued)
// 將q(包含當前線程的等待節(jié)點)入隊孝扛。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//如果超時列吼,移除等待節(jié)點q
removeWaiter(q);
//返回任務狀態(tài)。
return state;
}
//超時的話苦始,就阻塞給定時間寞钥。
LockSupport.parkNanos(this, nanos);
}
else
//沒設置超時的話,就阻塞當前線程陌选。
LockSupport.park(this);
}
}
再看下awaitDone方法中調用的removeWaiter:
private void removeWaiter(WaitNode node) {
if (node != null) {
//將node的thread域置空理郑。
node.thread = null;
//下面過程中會將node從等待隊列中移除,以thread域為null為依據咨油,
//如果過程中發(fā)生了競爭您炉,重試。
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
再看下get方法中獲取結果時調用的report:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report如果是正常狀態(tài)役电,就返回結果赚爵。否則拋出異常。
看完了get方法法瑟,再看下get(long timeout, TimeUnit unit)方法:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
小結一下get方法:
- 首先檢查當前任務的狀態(tài)冀膝,如果狀態(tài)表示執(zhí)行完成,進入第2步霎挟。
- 獲取執(zhí)行結果窝剖,也可能得到取消或者執(zhí)行異常,get過程結束氓扛。
- 如果當前任務狀態(tài)表示未執(zhí)行或者正在執(zhí)行,那么當前線程放入一個新建的等待節(jié)點论笔,然后進入Treiber Stack進行阻塞等待采郎。
- 如果任務被工作線程(對當前線程來說是其他線程)執(zhí)行完畢,執(zhí)行完畢時工作線程會喚醒Treiber Stack上等待的所有線程狂魔,所以當前線程被喚醒蒜埋,清空當前等待節(jié)點上的線程域,然后進入第2步最楷。
- 當前線程在阻塞等待結果過程中可能被中斷整份,如果被中斷待错,那么會移除當前線程在Treiber Stack上對應的等待節(jié)點,然后拋出中斷異常烈评,get過程結束火俄。
- 當前線程也可能執(zhí)行帶有超時時間的阻塞等待,如果超時時間過了讲冠,還沒得到執(zhí)行結果瓜客,那么會除當前線程在Treiber Stack上對應的等待節(jié)點,然后拋出超時異常竿开,get過程結束谱仪。
cancel(boolean)
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// mayInterruptIfRunning并且有正在運行的線程,調用interrupt中斷否彩,最后設置狀態(tài)為INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
cancel分為兩種情況:
- mayInterruptIfRunning == true疯攒。這個時候狀態(tài)從 NEW 變?yōu)?INTERRUPTING ,如果有正在運行的線程列荔,調用interrupt中斷敬尺,最后把狀態(tài)從 INTERRUPTING 變?yōu)?INTERRUPTED。
- mayInterruptIfRunning == false肌毅。這個時候狀態(tài)從 NEW 變?yōu)?CANCELLED筷转。
- 最后都會執(zhí)行finishCompletion方法,完成結束的收尾工作悬而。喚醒所有在get()方法等待的線程呜舒。
4、jdk1.6不同的地方
為什么jdk 1.6以后的FutureTask不像1.6那樣基于AQS構建了笨奠?
首先袭蝗,前面貼代碼了時候故意去掉了一些注釋,避免讀代碼的時候受影響般婆,現在我們來看一下關鍵的一段:
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races.
*/
主要是這句:mainly to avoid surprising users about retaining interrupt status during cancellation races到腥。
大概意思是:使用AQS的方式,可能會在取消發(fā)生競爭過程中詭異的保留了中斷狀態(tài)蔚袍。這里之所以沒有采用這種方式乡范,是為了避免這種情況的發(fā)生。
具體什么情況下會發(fā)生呢啤咽?
ThreadPoolExecutor executor = ...;
executor.submit(task1).cancel(true);
executor.submit(task2);
看上面的代碼晋辆,雖然中斷的是task1,但可能task2得到中斷信號宇整。
原因是什么呢瓶佳?看下JDK1.6的FutureTask的中斷代碼:
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null) //第1行
r.interrupt(); //第2行
}
releaseShared(0);
done();
return true;
}
結合上面代碼例子看一下,如果主線程執(zhí)行到第1行的時候鳞青,線程池可能會認為task1已經執(zhí)行結束(被取消)霸饲,然后讓之前執(zhí)行task1工作線程去執(zhí)行task2为朋,工作線程開始執(zhí)行task2之后,然后主線程執(zhí)行第2行(我們會發(fā)現并沒有任何同步機制來阻止這種情況的發(fā)生)厚脉,這樣就會導致task2被中斷了习寸。更多的相關信息參考這個Bug說明。
所以現在就能更好的理解JDK1.7 FutureTask的handlePossibleCancellationInterrupt中為什么要將cancel(true)中的中斷保留在當前run方法運行范圍內了吧!
JDK1.7的FutureTask的代碼解析完畢器仗!