網(wǎng)上一搜取消正在執(zhí)行的異步任務,會出現(xiàn)很多
Future
戚嗅,FutureTask
相關的文章雨涛,最近我也用了一下FutureTask
枢舶,這里記錄一下使用中遇到的問題,最后結合源碼分析一下替久。
- FutureTask的用法凉泄。
- 開發(fā)中我遇到的問題。
- 結合FutureTask的源碼分析問題蚯根。
1. FutureTask的用法
在Java中后众,一般是通過繼承
Thread
類或者實現(xiàn)Runnable
接口來創(chuàng)建多線程,Runnable
接口不能返回結果颅拦,如果要獲取子線程的執(zhí)行結果蒂誉,一般都是在子線程執(zhí)行結束之后,通過Handler
將結果返回到調(diào)用線程距帅,jdk1.5之后右锨,Java提供了Callable
接口來封裝子任務,Callable
接口可以獲取返回結果碌秸。
與FutureTask
相關的類或接口绍移,有Runnable
,Callable
讥电,Future
蹂窖,直接從Callable
開始。
Callable接口
下面可以看一下Callable
接口的定義:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable
接口很簡單恩敌,是一個泛型接口瞬测,就是定義了一個call()方法,與Runnable
的run()
方法相比纠炮,這個有返回值月趟,泛型V
就是要返回的結果類型,可以返回子任務的執(zhí)行結果抗碰。
Future接口
Future
接口表示異步計算的結果狮斗,通過Future
接口提供的方法绽乔,可以很方便的查詢異步計算任務是否執(zhí)行完成弧蝇,獲取異步計算的結果,取消未執(zhí)行的異步任務折砸,或者中斷異步任務的執(zhí)行看疗,接口定義如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
-
cancel(boolean mayInterruptIfRunning)
:取消子任務的執(zhí)行,如果這個子任務已經(jīng)執(zhí)行結束睦授,或者已經(jīng)被取消两芳,或者不能被取消,這個方法就會執(zhí)行失敗并返回false
去枷;如果子任務還沒有開始執(zhí)行怖辆,那么子任務會被取消是复,不會再被執(zhí)行;如果子任務已經(jīng)開始執(zhí)行了竖螃,但是還沒有執(zhí)行結束淑廊,根據(jù)mayInterruptIfRunning
的值,如果mayInterruptIfRunning = true
特咆,那么會中斷執(zhí)行任務的線程季惩,然后返回true,如果參數(shù)為false腻格,會返回true画拾,不會中斷執(zhí)行任務的線程。這個方法在FutureTask
的實現(xiàn)中有很多值得關注的地方菜职,后面再細說青抛。
需要注意,這個方法執(zhí)行結束酬核,返回結果之后脂凶,再調(diào)用isDone()
會返回true。 -
isCancelled()
愁茁,判斷任務是否被取消蚕钦,如果任務執(zhí)行結束(正常執(zhí)行結束和發(fā)生異常結束,都算執(zhí)行結束)前被取消鹅很,也就是調(diào)用了cancel()
方法嘶居,并且cancel()
返回true,則該方法返回true促煮,否則返回false. -
isDone()
:判斷任務是否執(zhí)行結束邮屁,正常執(zhí)行結束,或者發(fā)生異常結束菠齿,或者被取消佑吝,都屬于結束,該方法都會返回true. -
V get()
:獲取結果绳匀,如果這個計算任務還沒有執(zhí)行結束芋忿,該調(diào)用線程會進入阻塞狀態(tài)。如果計算任務已經(jīng)被取消疾棵,調(diào)用get()
會拋出CancellationException
戈钢,如果計算過程中拋出異常,該方法會拋出ExecutionException
是尔,如果當前線程在阻塞等待的時候被中斷了殉了,該方法會拋出InterruptedException
。 -
V get(long timeout, TimeUnit unit)
:帶超時限制的get()拟枚,等待超時之后薪铜,該方法會拋出TimeoutException
众弓。
FutureTask
FutureTask
可以像Runnable
一下,封裝異步任務隔箍,然后提交給Thread
或線程池執(zhí)行田轧,然后獲取任務執(zhí)行結果。原因在于FutureTask
實現(xiàn)了RunnableFuture
接口鞍恢,RunnableFuture
是什么呢傻粘,其實就是Runnable
和Callable
的結合,它繼承自Runnable
和Callable
帮掉。繼承關系如下:
public class FutureTask<V> implements RunnableFuture<V> {
public interface RunnableFuture<V> extends Runnable, Future<V> {
FutureTask使用
- FutureTask + Thread
上面介紹過弦悉,F(xiàn)utureTask有Runnable接口和Callable接口的特征,可以被Thread執(zhí)行蟆炊。
//step1:封裝一個計算任務稽莉,實現(xiàn)Callable接口
class Task implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
for (int i = 0; i < 10; i++) {
Log.d(TAG, "task......." + Thread.currentThread().getName() + "...i = " + i);
//模擬耗時操作
Thread.sleep(100);
}
} catch (InterruptedException e) {
Log.e(TAG, " is interrupted when calculating, will stop...");
return false; // 注意這里如果不return的話,線程還會繼續(xù)執(zhí)行涩搓,所以任務超時后在這里處理結果然后返回
}
return true;
}
}
//step2:創(chuàng)建計算任務污秆,作為參數(shù),傳入FutureTask
Task task = new Task();
FutureTask futureTask = new FutureTask(task);
//step3:將FutureTask提交給Thread執(zhí)行
Thread thread1 = new Thread(futureTask);
thread1.setName("task thread 1");
thread1.start();
//step4:獲取執(zhí)行結果昧甘,由于get()方法可能會阻塞當前調(diào)用線程良拼,如果子任務執(zhí)行時間不確定,最好在子線程中獲取執(zhí)行結果
try {
// boolean result = (boolean) futureTask.get();
boolean result = (boolean) futureTask.get(5, TimeUnit.SECONDS);
Log.d(TAG, "result:" + result);
} catch (InterruptedException e) {
Log.e(TAG, "守護線程阻塞被打斷...");
e.printStackTrace();
} catch (ExecutionException e) {
Log.e(TAG, "執(zhí)行任務時出錯...");
e.printStackTrace();
} catch (TimeoutException e) {
Log.e(TAG, "執(zhí)行超時...");
futureTask.cancel(true);
e.printStackTrace();
} catch (CancellationException e) {
//如果線程已經(jīng)cancel了充边,再執(zhí)行get操作會拋出這個異常
Log.e(TAG, "future已經(jīng)cancel了...");
e.printStackTrace();
}
- Future + ExecutorService
//step1 ......
//step2:創(chuàng)建計算任務
Task task = new Task();
//step3:創(chuàng)建線程池庸推,將Callable類型的task提交給線程池執(zhí)行,通過Future獲取子任務的執(zhí)行結果
ExecutorService executorService = Executors.newCachedThreadPool();
final Future<Boolean> future = executorService.submit(task);
//step4:通過future獲取執(zhí)行結果
boolean result = (boolean) future.get();
- FutureTask + ExecutorService
//step1 ......
//step2 ......
//step3:將FutureTask提交給線程池執(zhí)行
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(futureTask);
//step4 ......
2. 開發(fā)中我遇到的問題浇冰。
FutureTask使用還是比較簡單的贬媒,FutureTask
與Runnable
,最大的區(qū)別有兩個肘习,一個是可以獲取執(zhí)行結果际乘,另一個是可以取消,使用方法可以參考以上步驟漂佩,不過我在項目中使用FutureTask出現(xiàn)了以下兩個問題:
有的情況下脖含,使用
futuretask.cancel(true)
方法并不能真正的結束子任務執(zhí)行。FutureTask
的get(long timeout, TimeUnit unit)
方法仅仆,是等待timeout時間后器赞,獲取子線程的執(zhí)行結果垢袱,但是如果子任務執(zhí)行結束了墓拜,但是超時時間還沒有到,這個方法也會返回結果请契。
3. 結合FutureTask的源碼分析問題咳榜。
成員變量
下面夏醉,結合FutureTask的源碼,分析一下以上兩個問題涌韩。在此之前畔柔,先看一下FutureTask內(nèi)部比較值得關注的幾個成員變量。
-
private volatile int state
臣樱,state
用來標識當前任務的運行狀態(tài)靶擦。FutureTask
的所有方法都是圍繞這個狀態(tài)進行的,需要注意雇毫,這個值用volatile
(易變的)來標記玄捕,如果有多個子線程在執(zhí)行FutureTask,那么它們看到的都會是同一個state棚放,有如下幾個值:private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
NEW
:表示這是一個新的任務枚粘,或者還沒有執(zhí)行完的任務,是初始狀態(tài)飘蚯。
COMPLETING
:表示任務執(zhí)行結束(正常執(zhí)行結束馍迄,或者發(fā)生異常結束),但是還沒有將結果保存到outcome
中局骤。是一個中間狀態(tài)攀圈。
NORMAL
:表示任務正常執(zhí)行結束,并且已經(jīng)把執(zhí)行結果保存到outcome
字段中峦甩。是一個最終狀態(tài)量承。
EXCEPTIONAL
:表示任務發(fā)生異常結束,異常信息已經(jīng)保存到outcome
中穴店,這是一個最終狀態(tài)撕捍。
CANCELLED
:任務在新建之后,執(zhí)行結束之前被取消了泣洞,但是不要求中斷正在執(zhí)行的線程忧风,也就是調(diào)用了cancel(false)
,任務就是CANCELLED
狀態(tài)球凰,這時任務狀態(tài)變化是NEW -> CANCELLED狮腿。
INTERRUPTING
:任務在新建之后,執(zhí)行結束之前被取消了呕诉,并要求中斷線程的執(zhí)行缘厢,也就是調(diào)用了cancel(true)
,這時任務狀態(tài)就是INTERRUPTING
甩挫。這是一個中間狀態(tài)贴硫。
INTERRUPTED
:調(diào)用cancel(true)
取消異步任務,會調(diào)用interrupt()
中斷線程的執(zhí)行,然后狀態(tài)會從INTERRUPTING
變到INTERRUPTED
英遭。
狀態(tài)變化有如下4種情況:
NEW -> COMPLETING -> NORMAL --------------------------------------- 正常執(zhí)行結束的流程
NEW -> COMPLETING -> EXCEPTIONAL ---------------------執(zhí)行過程中出現(xiàn)異常的流程
NEW -> CANCELLED -------------------------------------------被取消间护,即調(diào)用了cancel(false)
NEW -> INTERRUPTING -> INTERRUPTED -------------被中斷,即調(diào)用了cancel(true)
private Callable<V> callable
挖诸,一個Callable
類型的變量汁尺,封裝了計算任務,可獲取計算結果多律。從上面的用法中可以看到痴突,FutureTask
的構造函數(shù)中,我們傳入的就是實現(xiàn)了Callable的接口的計算任務狼荞。private Object outcome
苞也,Object
類型的變量outcome
,用來保存計算任務的返回結果粘秆,或者執(zhí)行過程中拋出的異常如迟。private volatile Thread runner
,指向當前在運行Callable
任務的線程攻走,runner
在FutureTask中的賦值變化很值得關注殷勘,后面源碼會詳細介紹這個。private volatile WaitNode waiters
昔搂,WaitNode
是FutureTask的內(nèi)部類玲销,表示一個阻塞隊列,如果任務還沒有執(zhí)行結束摘符,那么調(diào)用get()獲取結果的線程會阻塞贤斜,在這個阻塞隊列中排隊等待。
成員函數(shù)
下面從構造函數(shù)說起逛裤,看一下FutureTask的源碼瘩绒。
1. 構造函數(shù)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
FutureTask的第一個構造函數(shù),參數(shù)是Callable類型的變量带族。將傳入的參數(shù)賦值給this.callable锁荔,然后設置state
狀態(tài)為NEW,表示這是新任務蝙砌。
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask還有一個構造函數(shù)阳堕,接收Runnable類型的參數(shù),通過Executors.callable(runnable, result)
將傳入的Runnable
和result
轉換成Callable
類型择克。使用該構造方法恬总,可以定制返回結果。
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
可以看一下Executors.callable(runnable, result)
方法肚邢,這里通過適配器模式進行適配壹堰,創(chuàng)建一個RunnableAdapter
適配器。
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
RunnableAdapter
是Executors
的內(nèi)部類,實現(xiàn)也比較簡單缀旁,實現(xiàn)了適配對象Callable
接口记劈,在call()
方法中執(zhí)行Runnable
的run()
勺鸦,然后返回result
并巍。
2. 任務被執(zhí)行——run()
FutureTask封裝了計算任務,無論是提交給Thread執(zhí)行换途,或者線程池執(zhí)行懊渡,調(diào)用的都是FutureTask
的run()
。
public void run() {
//1.判斷狀態(tài)是否是NEW军拟,不是NEW剃执,說明任務已經(jīng)被其他線程執(zhí)行,甚至執(zhí)行結束懈息,或者被取消了肾档,直接返回
//2.調(diào)用CAS方法,判斷RUNNER為null的話辫继,就將當前線程保存到RUNNER中怒见,設置RUNNER失敗,就直接返回
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//3.執(zhí)行Callable任務姑宽,結果保存到result中
result = c.call();
ran = true;
} catch (Throwable ex) {
//3.1 如果執(zhí)行任務過程中發(fā)生異常遣耍,將調(diào)用setException()設置異常
result = null;
ran = false;
setException(ex);
}
//3.2 任務正常執(zhí)行結束調(diào)用set(result)保存結果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
//4. 任務執(zhí)行結束,runner設置為null炮车,表示當前沒有線程在執(zhí)行這個任務了
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
//5. 讀取狀態(tài)舵变,判斷是否在執(zhí)行的過程中,被中斷了瘦穆,如果被中斷纪隙,處理中斷
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 首先,判斷state的值是不是NEW扛或,如果不是NEW瘫拣,說明線程已經(jīng)被執(zhí)行了,可能已經(jīng)執(zhí)行結束告喊,或者被取消了麸拄,直接返回。
- 這里其實是調(diào)用了
Unsafe
的CAS方法黔姜,讀取并設置runner
的值拢切,將當前線程保存到runner
中,表示當前正在執(zhí)行任務的線程秆吵』匆可以看到,這里設置的其實是RUNNER
,和前面介紹的Thread
類型的runner
變量不一樣的主穗,那為什么還說設置的是runner
的值泻拦?RUNNER在FutureTask中定義如下:
private static final long RUNNER;
//RUNNER是一個long類型的變量,指向runner字段的偏移地址忽媒,相當于指針
RUNNER = U.objectFieldOffset
(FutureTask.class.getDeclaredField("runner"));
關于Unsafe
的CAS方法争拐,簡單介紹一下,它提供了一種對runner
進行原子操作的方法晦雨,原子操作架曹,意味著,這個操作不會被打斷闹瞧。runner
被volatile
字段修飾绑雄,只能保證,當多個子線程在執(zhí)行FutureTask的時候奥邮,它們讀取到的runner
的值是同一個万牺,但是不能保證原子操作,所以很容易讀到臟數(shù)據(jù)(舉個例子:線程A準備對runner進行讀和寫操作洽腺,讀取到runner的值為null脚粟,這是,cpu切換執(zhí)行線程B已脓,線程B讀取到runner的值也是null珊楼,然后又切換到線程A執(zhí)行,線程A對runner賦值thread-A度液,此時runner的值已經(jīng)不再是null深浮,線程B讀取到的runner=null就是臟數(shù)據(jù))烤蜕,用Unsafe
的CAS方法孕锄,來對runner進行讀寫座掘,就能保證原子操作。多個線程訪問run()方法時霹购,會在這里同步佑惠。
- 讀取
callable
變量,執(zhí)行call()
齐疙,并獲取執(zhí)行結果膜楷。
如果執(zhí)行call()的過程中發(fā)生異常,就調(diào)用setException()
設置異常贞奋,setException()
定義如下:
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
//a. 調(diào)用Unsafe的CAS方法赌厅,state從NEW --> COMPLETING,這里的STATE和上面的RUNNER定義類似轿塔,指向state字段的偏移地址特愿。
//b. 將異常信息保存到outcome字段仲墨,state變成EXCEPTIONAL。
//c. 調(diào)用finishCompletion()揍障。
//NEW --> COMPLETING --> EXCEPTIONAL目养。
如果任務正常執(zhí)行結束,就調(diào)用set(result)保存結果毒嫡,定義如下:
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
//a. 和setException()類似癌蚁,state從NEW --> COMPLETING。
//b. 將正常執(zhí)行的結果result保存到outcome审胚,state變成NORMAL匈勋。
//c. 調(diào)用finishCompletion()礼旅。NEW --> COMPLETING --> NORMAL膳叨。
- 任務執(zhí)行結束,
runner
設置為null痘系,表示當前沒有線程在執(zhí)行這個任務了菲嘴。 - 讀取
state
狀態(tài),判斷是否在執(zhí)行的過程中被中斷了汰翠,如果被中斷龄坪,處理中斷,看一下這個中斷處理:
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
如果狀態(tài)是INTERRUPTING
复唤,表示正在被中斷健田,這時就讓出線程的執(zhí)行權,給其他線程來執(zhí)行佛纫。
3. 獲取任務的執(zhí)行結果——get()
一般情況下妓局,執(zhí)行任務的線程和獲取結果的線程不會是同一個,當我們在主線程或者其他線程中呈宇,獲取計算任務的結果時好爬,就會調(diào)用get方法,如果這時計算任務還沒有執(zhí)行完成甥啄,調(diào)用get()的線程就會阻塞等待存炮。get()
實現(xiàn)如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
- 讀取任務的執(zhí)行狀態(tài) state ,如果
state <= COMPLETING
蜈漓,說明線程還沒有執(zhí)行完(run()中可以看到穆桂,只有任務執(zhí)行結束,或者發(fā)生異常的時候融虽,state才會被設置成COMPLETING)享完。 - 調(diào)用
awaitDone(false, 0L)
,進入阻塞狀態(tài)衣形⊥障溃看一下awaitDone(false, 0L)
的實現(xiàn):
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
//1. 讀取狀態(tài)
//1.1 如果s > COMPLETING姿鸿,表示任務已經(jīng)執(zhí)行結束,或者發(fā)生異常結束了倒源,就不會阻塞苛预,直接返回
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//1.2 如果s == COMPLETING,表示任務結束(正常/異常)笋熬,但是結果還沒有保存到outcome字段热某,當前線程讓出執(zhí)行權,給其他線程先執(zhí)行
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
//2. 如果調(diào)用get()的線程被中斷了胳螟,就從等待的線程棧中移除這個等待節(jié)點昔馋,然后拋出中斷異常
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//3. 如果等待節(jié)點q=null,就創(chuàng)建一個等待節(jié)點
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
//4. 如果這個等待節(jié)點還沒有加入等待隊列,就加入隊列頭
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
//5. 如果設置了超時等待時間
else if (timed) {
//5.1 設置startTime,用于計算超時時間糖耸,如果超時時間到了秘遏,就等待隊列中移除當前節(jié)點
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
//5.2 如果超時時間還沒有到,而且任務還沒有結束嘉竟,就阻塞特定時間
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
//6. 阻塞邦危,等待喚醒
else
LockSupport.park(this);
}
}
這里主要有幾個步驟:
a. 讀取state,如果s > COMPLETING舍扰,表示任務已經(jīng)執(zhí)行結束倦蚪,或者發(fā)生異常結束了,此時边苹,調(diào)用get()的線程就不會阻塞陵且;如果s == COMPLETING,表示任務結束(正常/異常)个束,但是結果還沒有保存到outcome字段慕购,當前線程讓出執(zhí)行權,給其他線程先執(zhí)行播急。
b. 判斷Thread.interrupted()
脓钾,如果調(diào)用get()
的線程被中斷了,就從等待的線程棧(其實就是一個WaitNode節(jié)點隊列或者說是棧)中移除這個等待節(jié)點桩警,然后拋出中斷異常可训。
c. 判斷q == null,如果等待節(jié)點q為null捶枢,就創(chuàng)建等待節(jié)點握截,這個節(jié)點后面會被插入阻塞隊列。
d. 判斷queued烂叔,這里是將c中創(chuàng)建節(jié)點q加入隊列頭谨胞。使用Unsafe
的CAS方法,對waiters
進行賦值蒜鸡,waiters
也是一個WaitNode節(jié)點胯努,相當于隊列頭牢裳,或者理解為隊列的頭指針。通過WaitNode可以遍歷整個阻塞隊列叶沛。
e. 之后蒲讯,判斷timed
,這是從get()
傳入的值灰署,表示是否設置了超時時間判帮。設置超時時間之后,調(diào)用get()
的線程最多阻塞nanos溉箕,就會從阻塞狀態(tài)醒過來晦墙。如果沒有設置超時時間,就直接進入阻塞狀態(tài)肴茄,等待被其他線程喚醒晌畅。
awaitDone()
方法內(nèi)部有一個無限循環(huán),看似有很多判斷独郎,比較難理解踩麦,其實這個循環(huán)最多循環(huán)3次枚赡。
假設Thread A執(zhí)行了get()獲取計算任務執(zhí)行結果氓癌,但是子任務還沒有執(zhí)行完,而且Thread A沒有被中斷贫橙,它會進行以下步驟贪婉。
step1:Thread A執(zhí)行了awaitDone()
,1卢肃,2兩次判斷都不成立疲迂,Thread A判斷q=null,會創(chuàng)建一個WaitNode節(jié)點q莫湘,然后進入第二次循環(huán)尤蒿。
step2:第二次循環(huán),判斷4不成立幅垮,此時將step1創(chuàng)建的節(jié)點q加入隊列頭腰池。
step3:第三次循環(huán),判斷是否設置了超時時間忙芒,如果設置了超時時間示弓,就阻塞特定時間,否則呵萨,一直阻塞奏属,等待被其他線程喚醒。
- 從
awaitDone()
返回潮峦,最后調(diào)用report(int s)
囱皿,這個后面再介紹勇婴。
4. 取消任務——cancel(boolean mayInterruptIfRunning)
通常調(diào)用cancel()
的線程和執(zhí)行子任務的線程不會是同一個。當FutureTask
的cancel(boolean mayInterruptIfRunning)
方法被調(diào)用時嘱腥,如果子任務還沒有執(zhí)行咆耿,那么這個任務就不會執(zhí)行了,如果子任務已經(jīng)執(zhí)行爹橱,且mayInterruptIfRunning=true
萨螺,那么執(zhí)行子任務的線程會被中斷(注意:這里說的是線程被中斷,不是任務被取消)愧驱,下面看一下這個方法的實現(xiàn):
public boolean cancel(boolean mayInterruptIfRunning) {
//1.判斷state是否為NEW慰技,如果不是NEW,說明任務已經(jīng)結束或者被取消了组砚,該方法會執(zhí)行返回false
//state=NEW時吻商,判斷mayInterruptIfRunning,如果mayInterruptIfRunning=true糟红,說明要中斷任務的執(zhí)行艾帐,NEW->INTERRUPTING
//如果mayInterruptIfRunning=false,不需要中斷,狀態(tài)改為CANCELLED
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
//2.讀取當前正在執(zhí)行子任務的線程runner,調(diào)用t.interrupt()盆偿,中斷線程執(zhí)行
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//3.修改狀態(tài)為INTERRUPTED
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
cancel()
分析:
- 判斷
state
柒爸,保證state = NEW
才能繼續(xù)cancel()
的后續(xù)操作。state=NEW
且mayInterruptIfRunning=true
事扭,說明要中斷任務的執(zhí)行捎稚,此時,NEW->INTERRUPTING求橄。然后讀取當前執(zhí)行任務的線程runner
今野,調(diào)用t.interrupt()
,中斷線程執(zhí)行罐农,NEW->INTERRUPTING->INTERRUPTED条霜,最后調(diào)用finishCompletion()
。 - 如果
NEW->INTERRUPTING
涵亏,那么cancel()
方法宰睡,只是修改了狀態(tài),NEW->CANCELLED溯乒,然后直接調(diào)用finishCompletion()
夹厌。
所以cancel(true)
方法,只是調(diào)用t.interrupt()
裆悄,此時矛纹,如果t
因為sleep(),wait()
等方法進入阻塞狀態(tài)光稼,那么阻塞的地方會拋出InterruptedException
或南;如果線程正常運行孩等,需要結合Thread
的interrupted()
方法進行判斷,才能結束采够,否則肄方,cancel(true)
不能結束正在執(zhí)行的任務。
這也就可以解釋前面我遇到的問題蹬癌,有的情況下权她,使用 futuretask.cancel(true)方法并不能真正的結束子任務執(zhí)行。
5. 子線程返回結果前的最后一步——finishCompletion()
前面多次出現(xiàn)過這個方法逝薪,set(V v)
(保存執(zhí)行結果隅要,設置狀態(tài)為NORMAL),setException(Throwable t)
(保存結果董济,設置狀態(tài)為EXCEPTIONAL)和cancel(boolean mayInterruptIfRunning)
(設置狀態(tài)為CANCELLED/INTERRUPTED)步清,該方法在state變成最終態(tài)之后,會被調(diào)用虏肾。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
finishCompletion()
主要做了三件事情:
- 遍歷
waiters
等待隊列廓啊,調(diào)用LockSupport.unpark(t)
喚醒等待返回結果的線程,釋放資源封豪。 - 調(diào)用
done()
谴轮,這個方法什么都沒有做,不過子類可以實現(xiàn)這個方法撑毛,做一些額外的操作书聚。 - 設置
callable
為null,callable
是FutureTask
封裝的任務藻雌,任務執(zhí)行完,釋放資源斩个。
這里可以解答上面的第二個問題了胯杭。FutureTask的get(long timeout, TimeUnit unit)方法,表示阻塞timeout時間后受啥,獲取子線程的執(zhí)行結果做个,但是如果子任務執(zhí)行結束了,但是超時時間還沒有到滚局,這個方法也會返回結果居暖。因為任務執(zhí)行完之后,會遍歷阻塞隊列藤肢,喚醒阻塞的線程太闺。LockSupport.unpark(t)
執(zhí)行之后,阻塞的線程會從LockSupport.park(this)/LockSupport.parkNanos(this, parkNanos)
醒來嘁圈,然后會繼續(xù)進入awaitDone(boolean timed, long nanos)
的while
循環(huán)省骂,此時蟀淮,state >= COMPLETING
,然后從awaitDone()
返回钞澳。此時怠惶,get()/get(long timeout, TimeUnit unit)
會繼續(xù)執(zhí)行,return report(s)
轧粟,上面介紹get()的時候沒介紹的方法策治。看一下report(int s)
:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
其實就是讀取outcome兰吟,將state映射到最后返回的結果中览妖,s == NORMAL
說明任務正常結束,返回正常結果揽祥,s >= CANCELLED
讽膏,就拋出CancellationException
。
6.其他方法
FutureTask
的還有兩個方法isCancelled()
和isDone()
拄丰,其實就是判斷state府树,沒有過多的步驟。
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
總結
到此FutureTask分析完畢料按,其中感受最深的是Unsafe
的用法奄侠,對于多線程共享的對象,采用volatile
+ Unsafe
的方法载矿,代替鎖操作垄潮,進行同步;其次闷盔,是LockSupport
的park(Object blocker)
和unpark(Thread thread)
的使用
-
park(Object blocker)
:線程進入阻塞狀態(tài)弯洗,告訴線程調(diào)度,當前線程不可用逢勾,直到線程再次獲取permit
(允許)牡整;如果在調(diào)用park(Object blocker)
之前,線程已經(jīng)獲得了permit
(比如說溺拱,已經(jīng)調(diào)用了unpark(t))逃贝,那么該方法會返回。 -
unpark(Thread thread)
:使得傳入的線程再次獲得permit
.這里的permit
可以理解為一個信號量迫摔。
LockSupport
在這里的作用沐扳,類似于wait()
,notify()/notifyAll()
,關于二者的區(qū)別句占,可以看一下
Java的LockSupport.park()實現(xiàn)分析 沪摄。