前言
Java 5在concurrency包中引入了java.util.concurrent.Callable
接口,它和Runnable接口很相似奕纫,但它可以返回一個對象或者拋出一個異常提陶。
Callable接口使用泛型去定義它的返回類型。Executors類提供了一些有用的方法在線程池中執(zhí)行Callable內(nèi)的任務(wù)匹层。由于Callable任務(wù)是并行的隙笆,我們必須等待它返回的結(jié)果。而線程是屬于異步計算模型又固,所以不可能直接從別的線程中得到函數(shù)返回值仲器。
java.util.concurrent.Future
對象為我們解決了這個問題。在線程池提交Callable任務(wù)后返回了一個Future對象仰冠,使用它可以知道Callable任務(wù)的狀態(tài)和得到Callable返回的執(zhí)行結(jié)果乏冀。Future提供了get()方法讓我們可以等待Callable結(jié)束并獲取它的執(zhí)行結(jié)果。
Future的作用
當(dāng)做一定運算的時候洋只,運算過程可能比較耗時辆沦,有時會去查數(shù)據(jù)庫昼捍,或是繁重的計算,比如壓縮肢扯、加密等妒茬,在這種情況下,如果我們一直在原地等待方法返回蔚晨,顯然是不明智的乍钻,整體程序的運行效率會大大降低。
我們可以把運算的過程放到子線程去執(zhí)行铭腕,再通過 Future 去控制子線程執(zhí)行的計算過程银择,最后獲取到計算結(jié)果。
這樣一來就可以把整個程序的運行效率提高累舷,是一種異步的思想浩考。
同時在JDK 1.8的doc中,對Future的描述如下:
A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.
大概意思就是Future是一個用于異步計算的接口被盈。
舉個例子:
比如去吃早點時析孽,點了包子和涼菜,包子需要等3分鐘只怎,涼菜只需1分鐘袜瞬,如果是串行的一個執(zhí)行,在吃上早點的時候需要等待4分鐘身堡,但是如果你在準(zhǔn)備包子的時候吞滞,可以同時準(zhǔn)備涼菜,這樣只需要等待3分鐘盾沫。
Future就是后面這種執(zhí)行模式。
之前我寫的一篇文章:實現(xiàn)異步編程殿漠,這個工具類你得掌握赴精!,詳細(xì)的寫了關(guān)于Future這個的應(yīng)用
創(chuàng)建Future
線程池
class Task implements Callable<String> {
public String call() throws Exception {
return longTimeCalculation();
}
}
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定義任務(wù):
Callable<String> task = new Task();
// 提交任務(wù)并獲得Future:
Future<String> future = executor.submit(task);
// 從Future獲取異步執(zhí)行返回的結(jié)果:
String result = future.get(); // 可能阻塞
復(fù)制代碼
當(dāng)我們提交一個Callable任務(wù)后绞幌,我們會同時獲得一個Future對象蕾哟,然后,我們在主線程某個時刻調(diào)用Future對象的get()方法莲蜘,就可以獲得異步執(zhí)行的結(jié)果谭确。
在調(diào)用get()時,如果異步任務(wù)已經(jīng)完成票渠,我們就直接獲得結(jié)果逐哈。如果異步任務(wù)還沒有完成,那么get()會阻塞问顷,直到任務(wù)完成后才返回結(jié)果
FutureTask
除了用線程池的 submit 方法會返回一個 future 對象之外昂秃,同樣還可以用 FutureTask 來獲取 Future 類和任務(wù)的結(jié)果禀梳。
我們來看一下 FutureTask 的代碼實現(xiàn):
public class FutureTask<V> implements RunnableFuture<V>{
...
}
復(fù)制代碼
可以看到,它實現(xiàn)了一個接口肠骆,這個接口叫作 RunnableFuture算途。
我們再來看一下 RunnableFuture 接口的代碼實現(xiàn):
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
復(fù)制代碼
既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口,而 FutureTask 又實現(xiàn)了 RunnableFuture 接口蚀腿,所以 FutureTask 既可以作為 Runnable 被線程執(zhí)行嘴瓤,又可以作為 Future 得到 Callable 的返回值。
典型用法是莉钙,把 Callable 實例當(dāng)作 FutureTask 構(gòu)造函數(shù)的參數(shù)廓脆,生成 FutureTask 的對象,然后把這個對象當(dāng)作一個 Runnable 對象胆胰,放到線程池中或另起線程去執(zhí)行狞贱,最后還可以通過 FutureTask 獲取任務(wù)執(zhí)行的結(jié)果。
下面我們就用代碼來演示一下:
public class FutureTaskDemo {
public static void main(String[] args) {
Task task = new Task();
FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
new Thread(integerFutureTask).start();
try {
System.out.println("task運行結(jié)果:"+integerFutureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子線程正在計算");
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
復(fù)制代碼
在這段代碼中可以看出蜀涨,首先創(chuàng)建了一個實現(xiàn)了 Callable 接口的 Task瞎嬉,然后把這個 Task 實例傳入到 FutureTask 的構(gòu)造函數(shù)中去,創(chuàng)建了一個 FutureTask 實例厚柳,并且把這個實例當(dāng)作一個 Runnable 放到 new Thread() 中去執(zhí)行氧枣,最后再用 FutureTask 的 get 得到結(jié)果,并打印出來别垮。
Future常用方法
方法名 | 返回值 | 入?yún)?/th> | 備注 | 總結(jié) |
---|---|---|---|---|
cancel | boolean | (boolean mayInterruptIfRunning) | 用來取消任務(wù)便监,如果取消任務(wù)成功則返回true,如果取消任務(wù)失敗則返回false碳想。 | 也就是說Future提供了三種功能:判斷任務(wù)是否完成烧董,能夠中斷任務(wù),能夠獲取任務(wù)執(zhí)行結(jié)果 |
isCancelled | boolean | 無 | 方法表示任務(wù)是否被取消成功胧奔,如果在任務(wù)正常完成前被取消成功逊移,則返回 true。 | |
isDone | boolean | 無 | 方法表示任務(wù)是否已經(jīng)完成龙填,若任務(wù)完成胳泉,則返回true; | |
get | V | 無 | 方法用來獲取執(zhí)行結(jié)果岩遗,這個方法會產(chǎn)生阻塞扇商,會一直等到任務(wù)執(zhí)行完畢才返回 | |
get | V | (long timeout, TimeUnit unit) | 用來獲取執(zhí)行結(jié)果,如果在指定時間內(nèi)宿礁,還沒獲取到結(jié)果案铺,就直接返回null |
get()方法
get方法最主要的作用就是獲取任務(wù)執(zhí)行的結(jié)果
我們來看一個代碼示例:
public class FutureTest {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
Future<Integer> future = service.submit(new CallableTask());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
}
}
}
復(fù)制代碼
在這段代碼中,main 方法新建了一個 10 個線程的線程池梆靖,并且用 submit 方法把一個任務(wù)提交進(jìn)去红且。
這個任務(wù)它所做的內(nèi)容就是先休眠三秒鐘坝茎,然后返回一個隨機數(shù)。
接下來我們就直接把future.get
結(jié)果打印出來暇番,其結(jié)果是正常打印出一個隨機數(shù)嗤放,比如 9527 等。
isDone()方法
該方法是用來判斷當(dāng)前這個任務(wù)是否執(zhí)行完畢了壁酬。
需要注意的是次酌,這個方法如果返回 true 則代表執(zhí)行完成了;如果返回 false 則代表還沒完成舆乔。
但這里如果返回 true岳服,并不代表這個任務(wù)是成功執(zhí)行的,比如說任務(wù)執(zhí)行到一半拋出了異常希俩。那么在這種情況下吊宋,對于這個 isDone 方法而言,它其實也是會返回 true 的颜武,因為對它來說璃搜,雖然有異常發(fā)生了,但是這個任務(wù)在未來也不會再被執(zhí)行鳞上,它確實已經(jīng)執(zhí)行完畢了这吻。
所以 isDone 方法在返回 true 的時候,不代表這個任務(wù)是成功執(zhí)行的篙议,只代表它執(zhí)行完畢了唾糯。
我們用一個代碼示例來看一看蛔垢,代碼如下所示:
public class GetException {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(20);
Future<Integer> future = service.submit(new CallableTask());
try {
for (int i = 0; i < 5; i++) {
System.out.println(i);
Thread.sleep(500);
}
System.out.println(future.isDone());
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
throw new IllegalArgumentException("Callable拋出異常");
}
}
}
復(fù)制代碼
在這段代碼中腊徙,可以看到有一個線程池,并且往線程池中去提交任務(wù)和橙,這個任務(wù)會直接拋出一個異常这难。
那么接下來我們就用一個 for 循環(huán)去休眠芋酌,同時讓它慢慢打印出 0 ~ 4 這 5 個數(shù)字,這樣做的目的是起到了一定的延遲作用雁佳。
在這個執(zhí)行完畢之后,再去調(diào)用 isDone() 方法同云,并且把這個結(jié)果打印出來糖权,然后再去調(diào)用 future.get()
cancel方法
如果不想執(zhí)行某個任務(wù)了,則可以使用 cancel 方法炸站,會有以下三種情況:
- 第一種情況最簡單星澳,那就是當(dāng)任務(wù)還沒有開始執(zhí)行時,一旦調(diào)用 cancel旱易,這個任務(wù)就會被正常取消禁偎,未來也不會被執(zhí)行腿堤,那么 cancel 方法返回 true。
- 第二種情況也比較簡單如暖。如果任務(wù)已經(jīng)完成笆檀,或者之前已經(jīng)被取消過了,那么執(zhí)行 cancel 方法則代表取消失敗盒至,返回 false酗洒。因為任務(wù)無論是已完成還是已經(jīng)被取消過了,都不能再被取消了枷遂。
- 第三種情況就是這個任務(wù)正在執(zhí)行樱衷,這個時候會根據(jù)我們傳入的參數(shù)mayInterruptIfRunning做判斷,如果傳入的參數(shù)是 true酒唉,執(zhí)行任務(wù)的線程就會收到一個中斷的信號矩桂,正在執(zhí)行的任務(wù)可能會有一些處理中斷的邏輯,進(jìn)而停止痪伦,如果傳入的是 false 則就代表不中斷正在運行的任務(wù)
isCancelled()方法
判斷是否被取消侄榴,它和 cancel 方法配合使用,比較簡單流妻。
應(yīng)用場景
目前對于Future方式牲蜀,我們經(jīng)常使用的有這么幾類:
Guava
ListenableFutrue,通過增加監(jiān)聽器的方式绅这,計算完成時立即得到結(jié)果涣达,而無需一直循環(huán)查詢
CompletableFuture
Java8的CompletableFuture,使用thenApply证薇,thenApplyAsync可以達(dá)到和Guava類似的鏈?zhǔn)秸{(diào)用效果度苔。
不同的是,對于Java8浑度,如果thenApplyAsync不傳入線程池寇窑,則會使用ForkJoinPools線程池來執(zhí)行對應(yīng)的方法,如此可以避免對其他線程產(chǎn)生影響箩张。
之前我寫的一篇文章:實現(xiàn)異步編程甩骏,這個工具類你得掌握!先慷,詳細(xì)的寫了關(guān)于Future這個的應(yīng)用
Netty
Netty解決的問題:
- 原生Future的isDone()方法判斷一個異步操作是否完成饮笛,但是定義比較模糊:正常終止、拋出異常论熙、用戶取消都會使isDone方法返回true福青。
- 對于一個異步操作,我們有些時候更關(guān)注的是這個異步操作觸發(fā)或者結(jié)束后能否再執(zhí)行一系列的動作。
與JDK相比无午,增加了完成狀態(tài)的細(xì)分媒役,增加了監(jiān)聽者,異步線程結(jié)束之后能夠觸發(fā)一系列的動作宪迟。
注意事項
添加超時機制
假設(shè)一共有四個任務(wù)需要執(zhí)行酣衷,我們都把它放到線程池中,然后它獲取的時候是按照從 1 到 4 的順序踩验,也就是執(zhí)行 get() 方法來獲取的
代碼如下所示:
public class FutureDemo {
public static void main(String[] args) {
//創(chuàng)建線程池
ExecutorService service = Executors.newFixedThreadPool(10);
//提交任務(wù)鸥诽,并用 Future 接收返回結(jié)果
ArrayList<Future> allFutures = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Future<String> future;
if (i == 0 || i == 1) {
future = service.submit(new SlowTask());
} else {
future = service.submit(new FastTask());
}
allFutures.add(future);
}
for (int i = 0; i < 4; i++) {
Future<String> future = allFutures.get(i);
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
service.shutdown();
}
static class SlowTask implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "速度慢的任務(wù)";
}
}
static class FastTask implements Callable<String> {
@Override
public String call() throws Exception {
return "速度快的任務(wù)";
}
}
}
復(fù)制代碼
可以看出,在代碼中我們新建了線程池箕憾,并且用一個 list 來保存 4 個 Future牡借。
其中,前兩個 Future 所對應(yīng)的任務(wù)是慢任務(wù)袭异,也就是代碼下方的 SlowTask钠龙,而后兩個 Future 對應(yīng)的任務(wù)是快任務(wù)。
慢任務(wù)在執(zhí)行的時候需要 5 秒鐘的時間才能執(zhí)行完畢御铃,而快任務(wù)很快就可以執(zhí)行完畢碴里,幾乎不花費時間。
在提交完這 4 個任務(wù)之后上真,我們用 for 循環(huán)對它們依次執(zhí)行 get 方法咬腋,來獲取它們的執(zhí)行結(jié)果,然后再把這個結(jié)果打印出來睡互。
實際上在執(zhí)行的時候會先等待 5 秒根竿,然后再很快打印出這 4 行語句。
所以問題是:
第三個的任務(wù)量是比較小的就珠,它可以很快返回結(jié)果寇壳,緊接著第四個任務(wù)也會返回結(jié)果。
但是由于前兩個任務(wù)速度很慢妻怎,所以我們在利用 get 方法執(zhí)行時壳炎,會卡在第一個任務(wù)上。也就是說逼侦,雖然此時第三個和第四個任務(wù)很早就得到結(jié)果了匿辩,但我們在此時使用這種 for 循環(huán)的方式去獲取結(jié)果,依然無法及時獲取到第三個和第四個任務(wù)的結(jié)果榛丢。直到 5 秒后铲球,第一個任務(wù)出結(jié)果了,我們才能獲取到涕滋,緊接著也可以獲取到第二個任務(wù)的結(jié)果,然后才輪到第三挠阁、第四個任務(wù)溯饵。
假設(shè)由于網(wǎng)絡(luò)原因锨用,第一個任務(wù)可能長達(dá) 1 分鐘都沒辦法返回結(jié)果,那么這個時候增拥,我們的主線程會一直卡著,影響了程序的運行效率掌栅。
此時我們就可以用 Future 的帶超時參數(shù)的get(long timeout, TimeUnit unit)
方法來解決這個問題秩仆。
這個方法的作用是,如果在限定的時間內(nèi)沒能返回結(jié)果的話猾封,那么便會拋出一個 TimeoutException 異常澄耍,隨后就可以把這個異常捕獲住,或者是再往上拋出去晌缘,這樣就不會一直卡著了齐莲。
源碼分析
超時實現(xiàn)原理
具體實現(xiàn)類:FutureTask
get()方法可以分為兩步:
- 判斷當(dāng)前任務(wù)的執(zhí)行狀態(tài),如果不是COMPLETING磷箕,就調(diào)用awaitDone()方法開始進(jìn)行死循環(huán)輪旋选酗,如果任務(wù)還沒有執(zhí)行完成會使用
nanos = deadline - System.nanoTime()
檢查是否超時,如果方法已經(jīng)超時岳枷,則會返回芒填,在返回后如果任務(wù)的狀態(tài)仍然<=COMPLETING
,就會拋出TimeoutException()嫩舟。 - 如果調(diào)用時任務(wù)沒有執(zhí)行完成氢烘,會調(diào)用parkNanos(),調(diào)用線程會阻塞在這里家厌。
接下來分兩種情況:
- 在阻塞時間完以后任務(wù)的執(zhí)行狀態(tài)仍然沒有改變?yōu)橥瓿刹ゾ粒M(jìn)入下一次循環(huán),直接返回饭于。
- 如果在輪詢中狀態(tài)已經(jīng)改變蜀踏,任務(wù)完成,則會中斷死循環(huán)掰吕,返回任務(wù)執(zhí)行的返回值果覆。