一合砂、引言
1.?FutureTask在高并發(fā)場景下能確保任務(wù)只執(zhí)行一次嗎?
2.?任務(wù)還在執(zhí)行的時候用戶調(diào)用cancel能否讓任務(wù)停止執(zhí)行?
二德绿、功能簡介
FutureTask是一種異步任務(wù)(或異步計算)岔帽,舉個栗子玫鸟,主線程的邏輯中需要使用某個值,但這個值需要負責(zé)的運算得來犀勒,那么主線程可以提前建立一個異步任務(wù)來計算這個值(在其他的線程中計算)屎飘,然后去做其他事情,當(dāng)需要這個值的時候再通過剛才建立的異步任務(wù)來獲取這個值贾费,有點并行的意思钦购,這樣可以縮短整個主線程邏輯的執(zhí)行時間。
與1.6版本不同褂萧,1.7的FutureTask不再基于AQS來構(gòu)建押桃,而是在內(nèi)部采用簡單的Treiber Stack來保存等待線程。
三导犹、前置知識
LockSupport
LockSupport是用來創(chuàng)建鎖及其他同步類的基本線程阻塞元素唱凯,它的park和 unpark能夠分別阻塞線程和解除線程阻塞。它提供了可以指定阻塞時長的park方法谎痢。park和unpark的基本接口為:
public static void park() {
? ? unsafe.park(false, 0L);
}
public static void unpark(Thread thread) {
? ? if (thread != null)
? ? ? ? unsafe.unpark(thread);
}
Unsafe
Java不能夠直接訪問操作系統(tǒng)底層磕昼,而是通過本地方法來訪問。Unsafe提供了硬件級別的原子訪問节猿,主要提供一下功能:
1. 分配釋放內(nèi)存
2. 定位某個字段的內(nèi)存位置
3. 掛起一個線程和恢復(fù)票从,更多的是通過LockSupport來訪問。park和unpark
4. CAS操作滨嘱,比較一個對象的某個位置的內(nèi)存值是否與期望值一致峰鄙,一致則更新對應(yīng)值,此更新是不可中斷的太雨。主要方法是compareAndSwap*
并發(fā)工具三板斧
狀態(tài)吟榴,隊列,CAS
四囊扳、源碼分析
1. FutureTask介紹
FutureTask是一種可以取消的異步的計算任務(wù)煤墙。它的計算是通過Callable實現(xiàn)的,可以把它理解為是可以返回結(jié)果的Runnable宪拥。
使用FutureTask的優(yōu)勢有:
可以獲取線程執(zhí)行后的返回結(jié)果仿野;
提供了超時控制功能。
它實現(xiàn)了Runnable接口和Future接口:
2.?FutureTask的狀態(tài)
在FutureTask中她君,狀態(tài)是由state屬性來表示的脚作,不出所料,它是volatile類型的,確保了不同線程對它修改的可見性:
private volatile int state;
private static final int NEW? ? ? ? ? =0;
private static final int COMPLETING? =1;
private static final int NORMAL? ? ? =2;
private static final int EXCEPTIONAL? =3;
private static final int CANCELLED? ? =4;
private static final int INTERRUPTING =5;
private static final int INTERRUPTED? =6;
狀態(tài)轉(zhuǎn)換路徑
棧結(jié)構(gòu)
run方法
public void run(){
/*
? ? * 首先判斷狀態(tài)球涛,如果不是初始狀態(tài)劣针,說明任務(wù)已經(jīng)被執(zhí)行或取消;
? ? * runner是FutureTask的一個屬性亿扁,用于保存執(zhí)行任務(wù)的線程捺典,
? ? * 如果不為空則表示已經(jīng)有線程正在執(zhí)行,這里用CAS來設(shè)置从祝,失敗則返回襟己。
? ? */
if(state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
?return;
try{
? ? ? ? Callable<V> c = callable;
? ? ? ? ? // 只有初始狀態(tài)才會執(zhí)行
? ? ? ? ?if(c !=null&& state == NEW) {
? ? ? ? ? ? V result;
? ? ? ? ? ?boolean ran;
try{
? ? ? ? ? ? ? ?// 執(zhí)行任務(wù)
? ? ? ? ? ? ? ? result = c.call();
? ? ? ? ? ? ? ?// 如果沒出現(xiàn)異常牍陌,則說明執(zhí)行成功了
? ? ? ? ? ? ? ?ran =true;
}catch(Throwable ex) {
result =null;
ran =false;
// 設(shè)置異常
setException(ex);
? }
// 如果執(zhí)行成功擎浴,則設(shè)置返回結(jié)果
if(ran)
? ? ? ? ? ? ? ? set(result);
? ? ? ? }
}finally{
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 無論是否執(zhí)行成功,把runner設(shè)置為null
runner =null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
ints = state;
// 如果被中斷毒涧,則說明調(diào)用的cancel(true)贮预,
// 這里要保證在cancel方法中把state設(shè)置為INTERRUPTED
// 否則可能在cancel方法中還沒執(zhí)行中斷,造成中斷的泄露
if(s >= INTERRUPTING)
? ? ? ? ? ? handlePossibleCancellationInterrupt(s);
? ? }
}
run方法總結(jié)
校驗當(dāng)前任務(wù)狀態(tài)是否為NEW以及runner是否已賦值契讲。這一步是防止任務(wù)被取消仿吞。
double-check任務(wù)狀態(tài)state
執(zhí)行業(yè)務(wù)邏輯,也就是c.call()方法被執(zhí)行
如果業(yè)務(wù)邏輯異常捡偏,則調(diào)用setException方法將異常對象賦給outcome唤冈,并且更新state值
如果業(yè)務(wù)正常,則調(diào)用set方法將執(zhí)行結(jié)果賦給outcome霹琼,并且更新state值
awaitDone方法
//? 第一次循環(huán):創(chuàng)建棧頭結(jié)點
// 第二次循環(huán): 入棧操作
// 第三次循環(huán): 開始阻塞务傲,等待通知
//? 隊列針對一個FutureTask示例
// 一個FutureTask示例的多個線程凉当,依次入棧枣申,通過next節(jié)點鏈接,后面再依次喚醒
private int awaitDone(boolean timed, long nanos)throws InterruptedException {
// 計算到期時間
? ? final long deadline = timed ? System.nanoTime() + nanos :0L;
? ? WaitNode q =null;
? ? boolean queued =false;
? ? for (; ; ) {
// 如果被中斷闽巩,刪除節(jié)點缸匪,拋出異常
? ? ? ? if (Thread.interrupted()) {
? ? ? ? ? ? removeWaiter(q);
? ? ? ? ? ? throw new InterruptedException();
? ? ? ? }
? ? ? ?int s = state;
? ? ? ? // 如果任務(wù)執(zhí)行完畢并且設(shè)置了最終狀態(tài)或者被取消被环,則返回
? ? ? ? if (s > COMPLETING) {
? ? ? ? ? if (q !=null)
? ? ? ? ? q.thread =null;
? ? ? ? ? ? return s;
? ? ? ? }
// s == COMPLETING時通過Thread.yield();讓步其他線程執(zhí)行,
// 主要是為了讓狀態(tài)改變
? ? ? ? else if (s == COMPLETING)// cannot time out yet
? ? ? ? ? ? Thread.yield(); // 主動讓出CPU
? ? ? ? ? ? // 創(chuàng)建一個WaitNode
? ? ? ? else if (q ==null)
? ? ? ? ? q =new WaitNode();? ?// 第一次頭節(jié)點
? ? ? ? ? ? // CAS設(shè)置棧頂節(jié)點
? ? ? ? else if (!queued)
? ? ? ? ?queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);? // 入棧操作
? ? ? ? ? ? // 如果設(shè)置了超時模孩,則計算是否已經(jīng)到了開始設(shè)置的到期時間
? ? ? ? else if (timed) {
? ? ? ? ? ? ?nanos = deadline - System.nanoTime();
? ? ? ? ? ? // 如果已經(jīng)到了到期時間,刪除節(jié)點贮缅,返回狀態(tài)
? ? ? ? ? ? if (nanos <=0L) {
? ? ? ? ? ? ? removeWaiter(q);
? ? ? ? ? ? ? ? return state;
? ? ? ? ? ? }
? ? ? ? ? ? // 阻塞到到期時間
? ? ? ? ? ? LockSupport.parkNanos(this, nanos);
? ? ? ? }
? ? ? ? // 如果沒有設(shè)置超時榨咐,會一直阻塞,直到被中斷或者被喚醒
? ? ? ? else
? ? ? ? ? ? LockSupport.park(this);
? ? }
}
awaitDone方法總結(jié)
計算deadline谴供,也就是到某個時間點后如果還沒有返回結(jié)果块茁,那么就超時了。
進入自旋,也就是死循環(huán)数焊。
首先判斷是否響應(yīng)線程中斷永淌。對于線程中斷的響應(yīng)往往會放在線程進入阻塞之前,這里也印證了這一點佩耳。
判斷state值遂蛀,如果>COMPLETING表明任務(wù)已經(jīng)取消或者已經(jīng)執(zhí)行完畢,就可以直接返回了干厚。
如果任務(wù)還在執(zhí)行李滴,則為當(dāng)前線程初始化一個等待節(jié)點WaitNode,入等待隊列萍诱。這里和AQS的等待隊列類似悬嗓,只不過Node只關(guān)聯(lián)線程,而沒有狀態(tài)裕坊。AQS里面的等待節(jié)點是有狀態(tài)的包竹。
計算nanos,判斷是否已經(jīng)超時籍凝。如果已經(jīng)超時周瞎,則移除所有等待節(jié)點,直接返回state饵蒂。超時的話声诸,state的值仍然還是COMPLETING。
如果還未超時退盯,就通過LockSupprot類提供的方法在指定時間內(nèi)掛起當(dāng)前線程彼乌,等待任務(wù)線程喚醒或者超時喚醒。
入棧示意圖
五渊迁、總結(jié)
FutureTask是線程安全的慰照,在多線程下任務(wù)也只會被執(zhí)行一次;
注意在執(zhí)行時各種狀態(tài)的切換琉朽;
get方法調(diào)用時毒租,如果任務(wù)沒有結(jié)束,要阻塞當(dāng)前線程箱叁,法阻塞的線程會保存在一個Treiber Stack中墅垮;
get方法超時功能如果超時未獲取成功,會拋出TimeoutException耕漱;
注意在取消時的線程中斷算色,在run方法中一定要保證結(jié)束時的狀態(tài)是INTERRUPTED,否則在cancel方法中可能沒有執(zhí)行interrupt螟够,造成中斷的泄露灾梦。