引言
關(guān)于Runnable恼除、Callable接口大家可能在最開始學(xué)習(xí)Java多線程編程時豁辉,都曾學(xué)習(xí)過一個概念:在Java中創(chuàng)建多線程的方式有三種:繼承Thread類徽级、實現(xiàn)Runnable接口以及實現(xiàn)Callable接口餐抢。但是實則不然旷痕,真正創(chuàng)建多線程的方式只有一種:繼承Thread類欺抗,因為只有new Thread().start()
這種方式才能真正的映射一條OS的內(nèi)核線程執(zhí)行绞呈,而關(guān)于實現(xiàn)Runnable接口以及實現(xiàn)Callable接口創(chuàng)建出的Runnable佃声、Callable對象在我看來只能姑且被稱為“多線程任務(wù)”圾亏,因為無論是Runnable對象還是Callable對象召嘶,最終執(zhí)行都要交由Thread對象來執(zhí)行甲喝。
一、Runnable與Callable接口淺析
在Java中淳玩,一般開啟多線程的方式都是使用創(chuàng)建Thread對象的方式蜕着,如下:
new Thread(
@Override
public void run() {}
).start();
但是這種方式顯而易見的存在一個致命的缺陷:創(chuàng)建對象時就將線程執(zhí)行者與要執(zhí)行的任務(wù)綁定在了一塊兒承匣,使用起來不怎么靈活韧骗。所以可以通過實現(xiàn)Runnable接口的實現(xiàn)類創(chuàng)建出多線程任務(wù)對象袍暴,也可以直接通過創(chuàng)建Runnable匿名內(nèi)部類對象達(dá)到同樣的效果政模,如下:
public class Task implements Runnable{
@Override
public void run() {}
public static void main(String[] args){
Task task = new Task();
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();t2.start();
// 或者也可以這樣览徒!
Runnable taskRunnable = new Runnable(){
@Override
public void run() {}
};
Thread tA = new Thread(task);
Thread tB = new Thread(task);
tA.start();tB.start();
}
}
總的來說习蓬,上面的兩種方式都將執(zhí)行者線程實體Thread對象
和任務(wù)Runnable對象
分開,在實際編碼過程中枫慷,可以選擇多條線程同時執(zhí)行一個task
任務(wù)或听,這種方式會使得多線程編程更加靈活誉裆。
但是在實際開發(fā)過程中足丢,往往有時候的多線程任務(wù)執(zhí)行完成之后是需要返回值的斩跌,但是Runnable
接口的run()
方法是void
無返回類型的耀鸦,那么當(dāng)需要返回值時又該怎么辦呢袖订?此時我們就可以用到Callable
揪漩。先來看看Callable
接口的定義:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
顯而易見奄容,Callable
接口與Runnable
接口一樣昂勒,都是被定義為一個函數(shù)式接口,且與Runnable
接口一樣僅提供了一個方法call()
塘娶。與Runnable
接口的不同點在于:
-
call()
方法可以有返回值刁岸,返回類型為泛型V
虹曙,代表著支持所有類型的返回值。 -
call()
方法定義時聲明了可以拋出異常:throws Exception
疏哗,而run()
則不行沃斤。
但是需要注意的是:雖然Thread類提供了很多構(gòu)造器方法衡瓶,但是沒有一個構(gòu)造方法是可以接收Callable對象的,如下:
public Thread()
public Thread(Runnable target)
Thread(Runnable target, AccessControlContext acc)
public Thread(ThreadGroup group, Runnable target)
public Thread(String name)
public Thread(ThreadGroup group, String name)
public Thread(Runnable target, String name)
public Thread(ThreadGroup group, Runnable target, String name)
public Thread(ThreadGroup group, Runnable target,
String name,long stackSize)
如上便是Thread類提供的所有構(gòu)造器十厢,如前面所說的那樣,并沒有提供一個可以直接接收Callable對象的構(gòu)造器包颁,那么在使用Callable時到底是怎么交給線程執(zhí)行的呢娩嚼?肯定需要依賴于別的東西才能交由線程執(zhí)行岳悟,哪這個Callable依賴的東西到底是什么?FutureTask滔灶!先上個案例:
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<>(() ->
Thread.currentThread().getName() + "-竹子愛熊貓......");
new Thread(futureTask, "T1").start();
System.out.println("main線程獲取異步執(zhí)行結(jié)果:"
+ futureTask.get());
}
// 執(zhí)行結(jié)果:main線程獲取異步執(zhí)行結(jié)果:T1-竹子愛熊貓......
嗯宽气?怎么實現(xiàn)的萄涯?下面我們可以簡單的看看源碼(后續(xù)會詳細(xì)分析):
// FutureTask類
public class FutureTask<V> implements RunnableFuture<V> {
// FutureTask類 → 構(gòu)造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 將當(dāng)前的任務(wù)狀態(tài)設(shè)置為NEW新建狀態(tài)
}
}
// RunnableFuture接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
ok~,到目前為止我們可以發(fā)現(xiàn)燃逻,FutureTask
構(gòu)造器中可以接收一個Callable
類型的對象猿涨,而FutureTask
實現(xiàn)了Runnable,Future
兩個接口叛赚,所以當(dāng)我們創(chuàng)建了一個Callable
類型的任務(wù)時俺附,可以先封裝成一個FutureTask
對象,再將封裝好的FutureTask
傳遞給線程執(zhí)行即可璃哟。
二沮稚、Callable接口與FutureTask結(jié)構(gòu)詳解
上面我們簡單的對于Runnable、Callable接口以及FutureTask進(jìn)行介紹盛杰,接下來我們可以從源碼角度分析一下具體的實現(xiàn)即供,先簡單看看大體的類結(jié)構(gòu):
// Callable接口
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
// FutureTask類(省略其他代碼,后續(xù)會詳細(xì)介紹)
public class FutureTask<V> implements RunnableFuture<V>{}
// RunnableFuture接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
// Runnable接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
// Future接口
public interface Future<V> {
// 嘗試取消Callable任務(wù)驱证,取消成功返回true抹锄,反之false
boolean cancel(boolean mayInterruptIfRunning);
// 判斷Callable任務(wù)是否被取消
boolean isCancelled();
// 判斷call()是否執(zhí)行結(jié)束获高,結(jié)束返回true念秧,反之false
// 返回true的情況一共有三種:
// ①正常執(zhí)行完成返回結(jié)果
// ②執(zhí)行過程中拋出異常中斷了執(zhí)行
// ③Callable任務(wù)被取消導(dǎo)致執(zhí)行結(jié)束
boolean isDone();
// 獲取call()執(zhí)行結(jié)果庄吼,執(zhí)行完成則返回总寻,未完成則阻塞至完成為止
V get() throws InterruptedException, ExecutionException;
// get()方法的升級版渐行,如果未完成會阻塞直至執(zhí)行完畢或超時
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
如上肴沫,Callable
任務(wù)依賴FutureTask
類執(zhí)行颤芬,而FutureTask
類實現(xiàn)了RunnableFuture
接口,RunnableFuture
接口實則沒有提供新的方法菱魔,只是簡單的繼承整合了Runnable,Future
兩個接口澜倦,所以大體類圖關(guān)系如下:
在
FutureTask
中,與AQS一樣栋艳,存在一個用volatile
關(guān)鍵字修飾的int變量state
晴叨,FutureTask
通過它對任務(wù)的執(zhí)行狀態(tài)進(jìn)行管理兼蕊。如下:
/*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state
存在四種狀態(tài)轉(zhuǎn)換變化,如源碼中的注釋:
/*
* Possible state transitions:
* ①:NEW -> COMPLETING -> NORMAL
* ②:NEW -> COMPLETING -> EXCEPTIONAL
* ③:NEW -> CANCELLED
* ④:NEW -> INTERRUPTING -> INTERRUPTED
*/
-
NEW
:初始化狀態(tài)牵啦,任務(wù)剛被創(chuàng)建出來處于的狀態(tài) -
COMPLETING
:終止中間狀態(tài),任務(wù)從NEW變?yōu)镹ORMAL/EXCEPTIONAL時會經(jīng)歷的短暫狀態(tài) -
NORMAL
:正常終止?fàn)顟B(tài)裳瘪,任務(wù)正常執(zhí)行完成后處于的狀態(tài) -
EXCEPTIONAL
:異常終止?fàn)顟B(tài),任務(wù)執(zhí)行過程中拋出了異常后處于的狀態(tài) -
CANCELLED
:取消狀態(tài)派殷,任務(wù)被成功取消后處于的狀態(tài) -
INTERRUPTING
:中斷中間狀態(tài)愈腾,任務(wù)還未執(zhí)行或在執(zhí)行中時調(diào)用cancel(true)處于的中間狀態(tài) -
INTERRUPTED
:中斷最終狀態(tài),執(zhí)行任務(wù)被取消且執(zhí)行線程被中斷后處于的狀態(tài)
如上便是FutureTask
在不同情況下會經(jīng)歷的所有狀態(tài)橱乱,當(dāng)我們創(chuàng)建一個FutureTask
時,FutureTask
對象的state
一定是處于NEW
新建狀態(tài)的危纫,因為在FutureTask
的構(gòu)造方法中會執(zhí)行this.state = NEW;
操作契耿。
同時搪桂,隨著任務(wù)開始執(zhí)行之后,FutureTask
的狀態(tài)會開始發(fā)生轉(zhuǎn)換内列,在NEW→NORMAL
與NEW→EXCEPTIONAL
兩種狀態(tài)轉(zhuǎn)換過程中,還會出現(xiàn)COMPLETING
中間狀態(tài),但這種中間狀態(tài)存在的時間非常短暫会油,也會馬上變成相應(yīng)的最終狀態(tài)。不過值得我們注意的是:嫂冻,FutureTask
的狀態(tài)轉(zhuǎn)換是不可逆的桨仿,并且同時只要狀態(tài)不處于NEW
初始化狀態(tài),那么就可以認(rèn)為該任務(wù)已經(jīng)結(jié)束吹零,例如FutureTask
判斷任務(wù)是否執(zhí)行結(jié)束的isDone()
方法:
public boolean isDone() {
// 只要任務(wù)狀態(tài)不是NEW都返回true
return state != NEW;
}
ok~,到目前為止我們已經(jīng)簡單了解了FutureTask
的類圖結(jié)構(gòu)以及任務(wù)狀態(tài)state
茫蛹,接下來再簡單的看看FutureTask
的整體成員結(jié)構(gòu)馍刮。在FutureTask
中存在兩類線程:
①執(zhí)行者:執(zhí)行異步任務(wù)的線程卡啰,只存在一條線程
②等待者:等待獲取執(zhí)行結(jié)果的線程,可能存在多條線程
后續(xù)我們都以“執(zhí)行者”亡脸、“等待者”來稱呼這兩類線程浅碾,下面我們來看看FutureTask
的成員:
// 任務(wù)的執(zhí)行狀態(tài)
private volatile int state;
// 異步任務(wù):Callable對象
private Callable<V> callable;
// 任務(wù)執(zhí)行結(jié)果(因為是Object類型,所以異常也是可以保存的)
private Object outcome;
// 執(zhí)行者線程
private volatile Thread runner;
// 等待者線程:由WaitNode內(nèi)部類構(gòu)成的鏈表
private volatile WaitNode waiters;
// 靜態(tài)內(nèi)部類:WaitNode
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
前面我們曾提到過,Future
接口的get()
方法作用是獲取異步執(zhí)行之后的結(jié)果徙邻,若執(zhí)行還未完成缰犁,獲取執(zhí)行結(jié)果的等待者則會阻塞。那么FutureTask
作為Future
接口的實現(xiàn)者丰嘉,自然也對該方法進(jìn)行了實現(xiàn)耍贾。而在FutureTask
是怎么在"執(zhí)行者"未執(zhí)行完成的情況下,將想要獲取結(jié)果的“等待者”阻塞的呢晃听?這是因為FutureTask
內(nèi)部在邏輯上存在一個由WaitNode
節(jié)點組成的單向鏈表,當(dāng)一條線程嘗試獲取執(zhí)行結(jié)果但是還未執(zhí)行結(jié)束時初斑,FutureTask
則會每個等待者封裝成一個WaitNode
節(jié)點见秤,并將其加入該鏈表中,直至執(zhí)行者的任務(wù)執(zhí)行完成后,再喚醒鏈表的每個節(jié)點中的線程。(有些類似于ReetrantLock的Condition等待隊列實現(xiàn)模式)
而因為FutureTask
內(nèi)部的鏈表因為僅僅只是邏輯鏈表的原因修噪,所以FutureTask
本身只會將鏈表的head
節(jié)點存儲在成員變量waiters
中,其余的節(jié)點通過WaitNode
中的next
后繼指針連接脏款。
ok~,到目前為止剃盾,對FutureTask
的內(nèi)部結(jié)構(gòu)已經(jīng)有了基本的全面認(rèn)識衰伯,接下里我們再看看FutureTask
的具體實現(xiàn)過程意鲸。
三、深入源碼分析FutureTask執(zhí)行流程及等待原理
在前面的類圖結(jié)構(gòu)中,可以看到FutureTask
是實現(xiàn)了RunnableFuture
接口的蚜退,而RunnableFuture
接口繼承了Runnable,Future
兩個接口,那么接下來分別從兩個接口的實現(xiàn)來分析FutureTask
接口的實現(xiàn)幅恋。
3.1泵肄、Runnable接口的執(zhí)行流程實現(xiàn)分析
// Runnable接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Runnable
接口中僅提供了一個run()
方法捆交,所以我們先看看FutureTask
中的run()
實現(xiàn):
// FutureTask類 → run()方法
public void run() {
// ①判斷state是否為NEW,如果不是代表任務(wù)已經(jīng)執(zhí)行過或被取消
// ②判斷執(zhí)行者位置上是否有線程腐巢,有則代表著當(dāng)前任務(wù)正在執(zhí)行
// 如果state不為NEW或者執(zhí)行者不為空都會直接終止當(dāng)前線程執(zhí)行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// 如果state==NEW且runner==currentThread,接著繼續(xù)往下執(zhí)行
try {
// 獲取需要執(zhí)行的異步任務(wù)
Callable<V> c = callable;
// 檢查任務(wù)是否為空并再次檢查state是否處于初始化狀態(tài)
if (c != null && state == NEW) {
// 接收結(jié)果
V result;
// 接收終止?fàn)顟B(tài):true為正常結(jié)束冯丙,false則為異常結(jié)束
boolean ran;
try {
// 調(diào)用call()執(zhí)行任務(wù)并獲取執(zhí)行結(jié)果
result = c.call();
// 終止?fàn)顟B(tài)改為正常結(jié)束
ran = true;
} catch (Throwable ex) {
// 返回結(jié)果置為空
result = null;
// 終止?fàn)顟B(tài)改為異常結(jié)束
ran = false;
// CAS-將捕獲的異常設(shè)置給outcome全局變量
setException(ex);
}
// 如果執(zhí)行狀態(tài)為正常結(jié)束
if (ran)
// CAS-將執(zhí)行結(jié)果設(shè)置給outcome全局變量
set(result);
}
} finally {
// 將執(zhí)行者線程的引用置為null
runner = null;
// 檢查state是否為INTERRUPTING肉瓦、INTERRUPTED中斷狀態(tài)
int s = state;
if (s >= INTERRUPTING)
// 如果是則響應(yīng)線程中斷
handlePossibleCancellationInterrupt(s);
}
}
如上便是FutureTask
對run()
方法的實現(xiàn),簡單來說胃惜,在FutureTask.run()
方法中主要做了如下四步:
- ①判斷任務(wù)執(zhí)行狀態(tài)泞莉,如果正在執(zhí)行或被執(zhí)行過則直接
return
,反之則繼續(xù)執(zhí)行任務(wù) - ②如果任務(wù)執(zhí)行過程中出現(xiàn)異常船殉,則調(diào)用
setException()
寫出捕獲的異常信息 - ③如果任務(wù)執(zhí)行成功后鲫趁,獲取執(zhí)行返回值并調(diào)用
set()
寫出任務(wù)執(zhí)行完成后的返回值 - ④任務(wù)執(zhí)行結(jié)束時,判斷任務(wù)狀態(tài)是否需要中斷捺弦,需要則調(diào)用
handlePossibleCancellationInterrupt()
進(jìn)行中斷處理
ok~饮寞,接下來再詳細(xì)看看setException()
與set()
方法:
// FutureTask類 → setException()方法
protected void setException(Throwable t) {
// 利用CAS機制將state改為COMPLETING中間狀態(tài)
if (UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)) {
// 將捕獲的異常寫出給outcome成員
outcome = t;
// 再次利用CAS修改state為EXCEPTIONAL異常終止?fàn)顟B(tài)
UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL); // 最終態(tài)
// 喚醒等待隊列中的等待者線程
finishCompletion();
}
}
// FutureTask類 → set()方法
protected void set(V v) {
// 利用CAS機制修改state為COMPLETING中間狀態(tài)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 將執(zhí)行結(jié)果寫出給outcome成員
outcome = v;
// 再次利用CAS修改state為NORMAL正常終止?fàn)顟B(tài)
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終態(tài)
// 喚醒等待隊列中的等待者線程
finishCompletion();
}
}
如源碼所示孝扛,當(dāng)任務(wù)執(zhí)行過程中拋出異常時會調(diào)用setException()
方法而該方法的邏輯也分為四步:
- ①先使用CAS操作將
state
修改為COMPLETING
中間狀態(tài) - ②將捕獲的異常寫出給
outcome
成員 - ③寫出捕獲的異常后再次使用CAS將
state
改為EXCEPTIONAL
異常終止?fàn)顟B(tài) - ④調(diào)用
finishCompletion()
方法喚醒等待列表中的等待者線程
不過當(dāng)任務(wù)正常執(zhí)行結(jié)束后會調(diào)用set()
方法,而set()
方法與setException()
方法同理幽崩,也分為四步執(zhí)行:
- ①先使用CAS操作將
state
修改為COMPLETING
中間狀態(tài) - ②將任務(wù)正常執(zhí)行結(jié)束的返回值寫出給
outcome
成員 - ③寫出后再次使用CAS將
state
改為NORMAL
正常終止?fàn)顟B(tài) - ④調(diào)用
finishCompletion()
方法喚醒等待列表中的等待者線程
ok~苦始,緊接著再來看看喚醒等待鏈表中“等待者”線程的finishCompletion()
方法:
// FutureTask類 → finishCompletion()方法
private void finishCompletion() {
// 該方法調(diào)用前state一定要為最終態(tài)
// 獲取waiters中保存的head節(jié)點,根據(jù)head遍歷整個邏輯鏈表
for (WaitNode q; (q = waiters) != null;) {
// 利用cas操作將原本的head節(jié)點置null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 獲取q節(jié)點的線程
Thread t = q.thread;
if (t != null) {
q.thread = null; // 置空線程引用信息
LockSupport.unpark(t);// 喚醒節(jié)點中的線程
}
// 獲取鏈表中的下一個節(jié)點
WaitNode next = q.next;
// 如果下一個節(jié)點為空則代表著已經(jīng)是鏈表尾部了
if (next == null)
// 那么則終止循環(huán)
break;
// 置空當(dāng)前節(jié)點的后繼節(jié)點引用信息方便GC
q.next = null;
// 將獲取到的后繼節(jié)點賦值給q
q = next;
}
// 當(dāng)遍歷完成整個鏈表后退出循環(huán)
break;
}
}
// done()方法沒有具體實現(xiàn)慌申,留給使用者自身用于拓展的
// 可以根據(jù)需求讓執(zhí)行者線程在執(zhí)行完畢前多做一點善后工作
done();
callable = null; // to reduce footprint
}
finishCompletion()
方法的邏輯相對來說就比較簡單了陌选,通過成員變量waiters
中保存的head
頭節(jié)點與next
后繼節(jié)點指針,遍歷喚醒整個邏輯鏈表所有節(jié)點中的等待者線程蹄溉。
到目前為止咨油,任務(wù)執(zhí)行的邏輯都已經(jīng)分析完畢了,但是在FutureTask.run()
的最后柒爵,還存在finally
語句塊役电,這代表著不管任務(wù)執(zhí)行過程中有沒有出現(xiàn)異常都會執(zhí)行的邏輯,如下:
finally {
// 將執(zhí)行者線程的引用置為null
runner = null;
// 檢查state是否為INTERRUPTING棉胀、INTERRUPTED中斷狀態(tài)
int s = state;
if (s >= INTERRUPTING)
// 如果是則響應(yīng)線程中斷
handlePossibleCancellationInterrupt(s);
}
// FutureTask類 → handlePossibleCancellationInterrupt()方法
private void handlePossibleCancellationInterrupt(int s) {
// 1.如果state==INTERRUPTING中斷中間狀態(tài)
if (s == INTERRUPTING)
// 3.如果線程再次從就緒狀態(tài)獲取到cpu資源回到執(zhí)行狀態(tài)
// 循環(huán)調(diào)用yield()方法讓當(dāng)前線程持續(xù)處于就緒狀態(tài)
// 直至線程被中斷且state==INTERRUPTED為止
while (state == INTERRUPTING)
// 2.調(diào)用yield()讓當(dāng)前線程讓出cpu資源退出執(zhí)行狀態(tài)
// 回到就緒狀態(tài)方便響應(yīng)線程中斷
Thread.yield();
}
在finally
語句塊中法瑟,會判斷state
是否為為INTERRUPTING、INTERRUPTED
中斷狀態(tài)唁奢,如果是會調(diào)用handlePossibleCancellationInterrupt()
方法響應(yīng)線程中斷操作霎挟,該方法的具體邏輯請參考如上源碼我寫的注釋。整體執(zhí)行流程如下:
至此麻掸,整個FutureTask執(zhí)行任務(wù)的流程分析結(jié)束酥夭。
3.2、FutureTask對Future接口的實現(xiàn)分析
在前面曾提到FutureTask
實現(xiàn)了RunnableFuture
接口脊奋,而RunnableFuture
除開繼承了Runnable
外熬北,還繼承了Future
接口,接著首先來看看Future接口的定義:
// Future接口
public interface Future<V> {
// 嘗試取消Callable任務(wù)诚隙,取消成功返回true蒜埋,反之false
boolean cancel(boolean mayInterruptIfRunning);
// 判斷Callable任務(wù)是否被取消
boolean isCancelled();
// 判斷call()是否執(zhí)行結(jié)束,結(jié)束返回true最楷,反之false
// 返回true的情況一共有三種:
// ①正常執(zhí)行完成返回結(jié)果
// ②執(zhí)行過程中拋出異常中斷了執(zhí)行
// ③Callable任務(wù)被取消導(dǎo)致執(zhí)行結(jié)束
boolean isDone();
// 獲取call()執(zhí)行結(jié)果,執(zhí)行完成則返回待错,未完成則阻塞至完成為止
V get() throws InterruptedException, ExecutionException;
// get()方法的升級版籽孙,如果未完成會阻塞直至執(zhí)行完畢或超時
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Future
接口中主要提供了兩類方法:獲取任務(wù)執(zhí)行結(jié)果的get
方法以及取消任務(wù)的cancel
方法,我們依次從get -> cancel
逐步分析火俄。先來看看FutureTask
類對get
方法的實現(xiàn):
// FutureTask類 → get()方法
public V get() throws InterruptedException, ExecutionException {
// 獲取任務(wù)狀態(tài)
int s = state;
// 如果任務(wù)狀態(tài)不大于終止中間狀態(tài)則阻塞線程
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 如果state大于終止中間狀態(tài)則代表是最終態(tài)了犯建,則返回執(zhí)行結(jié)果
return report(s);
}
// FutureTask類 → 超時版get()方法
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 如果任務(wù)還沒執(zhí)行完成,那么調(diào)用awaitDone傳遞給定的時間阻塞線程
// 如果時間到了狀態(tài)還是不大于COMPLETING則代表任務(wù)還未執(zhí)行結(jié)束
// 那么拋出TimeoutException強制中斷當(dāng)前方法執(zhí)行瓜客,退出等待狀態(tài)
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 如果任務(wù)執(zhí)行完成或在給定時間內(nèi)執(zhí)行完成則返回執(zhí)行結(jié)果
return report();
}
FutureTask.get()
方法的邏輯相對來說就比較簡單了:
- ①首先判斷了一下任務(wù)有沒有執(zhí)行結(jié)束(state是否<=COMPLETING适瓦,如果<=代表還沒結(jié)束)
- ②如果任務(wù)還未執(zhí)行或還在執(zhí)行過程中竿开,調(diào)用
awaitDone()
方法阻塞當(dāng)前等待者線程 - ③如果任務(wù)已經(jīng)執(zhí)行結(jié)束(狀態(tài)變?yōu)榱俗罱K態(tài)),調(diào)用
report()
返回執(zhí)行的結(jié)果
而超時等待版的FutureTask.get()
方法則是在第二步的時候有些不一致玻熙,在超時版本的get方法中否彩,當(dāng)任務(wù)還未執(zhí)行或者還在執(zhí)行過程中的情況下,會調(diào)用awaitDone(true,unit.toNanos(timeout)))
方法在給定的時間內(nèi)等待嗦随,如果時間到了任務(wù)還未執(zhí)行結(jié)束則會拋出TimeoutException
異常強制線程退出等待狀態(tài)列荔。
ok~,接著再來看看awaitDone()
和report()
方法:
// FutureTask類 → awaitDone()方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 計算等待的截止時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 如果出現(xiàn)線程中斷信息則移除等待鏈表節(jié)點信息
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 再次獲取任務(wù)狀態(tài)枚尼,如果大于COMPLETING
// 代表任務(wù)已經(jīng)執(zhí)行結(jié)束贴浙,直接返回最新的state值
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果state==中間狀態(tài)代表任務(wù)已經(jīng)快執(zhí)行完成了
// 那么則讓當(dāng)前線程讓出cpu資源進(jìn)入就緒狀態(tài)稍微等待
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 如果任務(wù)還在執(zhí)行或還未執(zhí)行則構(gòu)建waitnode節(jié)點
else if (q == null)
q = new WaitNode();
// 利用cas機制將構(gòu)建好的waitnode節(jié)點加入邏輯鏈表
// 注意:該鏈表是棧的結(jié)構(gòu),所以并不是將新的節(jié)點
// 變?yōu)橹肮?jié)點的next署恍,而是新節(jié)點變?yōu)閔ead節(jié)點
// 舊節(jié)點變?yōu)閚ext后繼節(jié)點(這樣方便維護(hù)邏輯鏈表結(jié)構(gòu))
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果是超時等待的模式崎溃,則先判斷時間有沒有超時
// 如果已經(jīng)超時則刪除對應(yīng)節(jié)點并返回最新的state值
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 如果還沒超時則將線程掛起阻塞指定時間
LockSupport.parkNanos(this, nanos);
}
else
// 如果不是超時版本則直接掛起阻塞線程
LockSupport.park(this);
}
}
// FutureTask類 → report()方法
private V report(int s) throws ExecutionException {
// 獲取成員變量outcome的值
Object x = outcome;
// 如果state為正常終止?fàn)顟B(tài)則返回執(zhí)行結(jié)果
if (s == NORMAL)
return (V)x;
// 如果任務(wù)被取消或線程被中斷則拋出CancellationException異常
if (s >= CANCELLED)
throw new CancellationException();
// 如果state為異常終止?fàn)顟B(tài)則拋出捕獲的異常信息
throw new ExecutionException((Throwable)x);
}
ok~,源碼如上盯质,先來看看略微復(fù)雜的awaitDone()
方法袁串,在awaitDone()
方法首先會計算等待的截止時間(如果不需要超時等待則沒有截止時間),計算好等待的截止時間之后會開啟一個死循環(huán)唤殴,在循環(huán)中每次都會執(zhí)行如下步驟:
- ①判斷當(dāng)前等待者線程是否被其他線程中斷般婆,如果是則移除對應(yīng)節(jié)點信息并拋出
InterruptedException
異常強制中斷執(zhí)行 - ②判斷state是否大于COMPLETING,大于則代表任務(wù)執(zhí)行結(jié)束朵逝,置空當(dāng)前節(jié)點線程信息并返回最新state值
- ③判斷state是否等于COMPLETING蔚袍,等于則代表執(zhí)行即將結(jié)束,當(dāng)前線程讓出CPU資源配名,退出執(zhí)行狀態(tài)并進(jìn)入就緒狀態(tài)啤咽,等待最終態(tài)的出現(xiàn)
- ④如果任務(wù)還未執(zhí)行結(jié)束,則判斷當(dāng)前等待者線程是否已經(jīng)構(gòu)建waitnode節(jié)點渠脉,如果沒有則為當(dāng)前線程構(gòu)建節(jié)點信息
- ⑤如果新構(gòu)建出的節(jié)點還未加入等待鏈表宇整,則利用cas機制將構(gòu)建的節(jié)點設(shè)置為head節(jié)點,老head變?yōu)楫?dāng)前節(jié)點的next節(jié)點
- ⑥使用
LockSupport
類將當(dāng)前線程掛起阻塞芋膘,如果是超時等待的get()
則掛起阻塞特定時間
如上即是整個awaitDone()
方法的執(zhí)行流程鳞青,上個例子理解一下:
假設(shè)目前一個FutureTask任務(wù)狀態(tài)為NEW,并且成員waiter=null(相當(dāng)于鏈表為空为朋,head=null)臂拓。這時候兩條線程:T1先調(diào)用get()獲取執(zhí)行結(jié)果,緊接著T2也調(diào)用get()嘗試獲取執(zhí)行結(jié)果,同時此時任務(wù)還未開始執(zhí)行或者還在執(zhí)行過程中习寸,那么這兩條線程的執(zhí)行流程如下:
- T1:第一次循環(huán)時胶惰,
q==null且!queued
條件成立,為T1構(gòu)建節(jié)點信息并利用cas機制將waiter=T1
,T1節(jié)點成為head節(jié)點霞溪,第一次循環(huán)結(jié)束 - T2:第一次循環(huán)時孵滞,
q==null且!queued
條件成立中捆,為T2構(gòu)建節(jié)點信息并利用cas機制將waiter=T2,next=T1
,T2成為head節(jié)點,T1成為后繼節(jié)點坊饶,第一次循環(huán)結(jié)束 - T1:第二次循環(huán)時泄伪,假設(shè)任務(wù)還在執(zhí)行或還未執(zhí)行,所有條件都不成立幼东,只有最后的阻塞條件成立臂容,阻塞T1線程,T1掛起直至執(zhí)行結(jié)束
- T2:第二次循環(huán)時根蟹,假設(shè)任務(wù)還在執(zhí)行或還未執(zhí)行脓杉,所有條件都不成立,只有最后的阻塞條件成立简逮,阻塞T2線程球散,T2掛起直至執(zhí)行結(jié)束
- T1:任務(wù)執(zhí)行結(jié)束后調(diào)用
finishCompletion()
喚醒鏈表中的線程,T1恢復(fù)執(zhí)行散庶,第三次循環(huán)開始蕉堰,state>COMPLETING
成立,清空節(jié)點線程信息并返回最新state值 - T2:任務(wù)執(zhí)行結(jié)束后調(diào)用
finishCompletion()
喚醒鏈表中的線程悲龟,T2恢復(fù)執(zhí)行屋讶,第三次循環(huán)開始,state>COMPLETING
成立须教,清空節(jié)點線程信息并返回最新state值
ok~皿渗,F(xiàn)uture接口的get
方法分析到此結(jié)束了,接著繼續(xù)來看看cancel()
方法:
// FutureTask類 → cancel()方法
// 入?yún)閠rue代表中斷執(zhí)行者線程執(zhí)行
// 入?yún)閒alse代表取消任務(wù)執(zhí)行
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果state!=NEW代表任務(wù)已經(jīng)執(zhí)行結(jié)束轻腺,直接返回false
// 如果入?yún)閒alse則CAS修改state=CANCELLED乐疆,返回true即可
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
// 因為call()方法執(zhí)行過程中可能會拋出異常
// 所以需要try finally語句塊保證等待者線程喚醒
try {
// 如果入?yún)閠rue,則中斷執(zhí)行者線程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// interrupt()不能保證一定中斷執(zhí)行線程
// 因為強制中斷線程不安全贬养,所以java棄用了stop()方法
// 而是換成了協(xié)調(diào)式中斷挤土,線程調(diào)用interrupt()后
// 只會發(fā)出中斷信號,由被中斷線程決定響不響應(yīng)中斷操作
t.interrupt();
} finally { // final state
// 中斷后利用CAS修改state=INTERRUPTED最終態(tài)
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 不管是取消任務(wù)還是中斷執(zhí)行误算,完成后都會喚醒等待鏈表的所有線程
finishCompletion();
}
return true;
}
當(dāng)某條調(diào)用FutureTask.cancel()
方法后仰美,會根據(jù)傳入的boolean
值決定是進(jìn)行中斷執(zhí)行者線程操作還是取消任務(wù)執(zhí)行,具體執(zhí)行邏輯如下:
-
mayInterruptIfRunning
傳入true
的情況下:- ①判斷任務(wù)是否已經(jīng)執(zhí)行結(jié)束儿礼,是則直接返回
false
筒占,反之則利用cas機制將state=INTERRUPTING
中斷中間狀態(tài) - ②獲取執(zhí)行者線程
runner
成員,調(diào)用執(zhí)行者的interrupt()
方法向執(zhí)行者線程發(fā)出一個中斷信號 - ③利用CAS機制將
state=INTERRUPTED
中斷最終態(tài) - 調(diào)用
finishCompletion()
方法喚醒鏈表中的所有等待者線程
- ①判斷任務(wù)是否已經(jīng)執(zhí)行結(jié)束儿礼,是則直接返回
-
mayInterruptIfRunning
傳入false
的情況下:- ①判斷任務(wù)是否已經(jīng)執(zhí)行結(jié)束蜘犁,是則直接返回
false
- ②如果任務(wù)還未執(zhí)行或還在執(zhí)行則利用cas機制將
state=CANCELLED
取消狀態(tài)
- ①判斷任務(wù)是否已經(jīng)執(zhí)行結(jié)束蜘犁,是則直接返回
但是不難發(fā)現(xiàn),在前面的源碼注釋中止邮,我曾寫到t.interrupt()
方法是無法強制中斷線程執(zhí)行的这橙,具體原因并不是因為無法做到強制中斷奏窑,這個是可以實現(xiàn)的,在JDK的早期版本中Thread
也提供了stop()
方法可以用來強制停止執(zhí)行線程屈扎,但是因為強制關(guān)閉執(zhí)行線程會導(dǎo)致一系列的問題產(chǎn)生埃唯,如:安全性問題闷哆、數(shù)據(jù)一致性問題等是目,所以在JDK后續(xù)的版本中便棄用了stop()
這類強制式中斷線程的方法,而是換成interrupt()
這種協(xié)調(diào)式中斷的方法尝盼。當(dāng)一條線程調(diào)用另外一條線程的interrupt()
方法后并不會直接強制中斷線程模蜡,而是僅僅給需要中斷的線程發(fā)送一個中斷信號漠趁,而是否中斷執(zhí)行則是由執(zhí)行線程本身決定的,而關(guān)于執(zhí)行線程是否能檢測到這個發(fā)出的中斷信號則需要取決于執(zhí)行線程運行的代碼忍疾。
ok~闯传,哪有人可能會疑惑:不能保證一定中斷執(zhí)行者線程,那么這個cancel()
方法還有作用嗎卤妒?
答案是肯定有用的甥绿,因為調(diào)用cancel()
方法之后,只要任務(wù)沒有執(zhí)行結(jié)束的情況下则披,state
一定會被修改成CANCELLED
或者INTERRUPTING
狀態(tài)共缕,那么后續(xù)等待者線程在執(zhí)行get
時則會檢測到任務(wù)狀態(tài)已經(jīng)不是NEW
了,那么則不會awaitDone()
方法阻塞等待士复。
到目前為止图谷,F(xiàn)uture接口的兩個核心方法已經(jīng)分析結(jié)束了,接著來看看兩個is
判斷類型的方法:isCancelled()判没、isDone()
:
// FutureTask類 → isCancelled()方法
public boolean isCancelled() {
// 如果state>=CANCELLED都代表狀態(tài)不是執(zhí)行終止?fàn)顟B(tài)
// 那么則代表著肯定是中斷或者取消狀態(tài)蜓萄,所以:
// state>=CANCELLED代表任務(wù)被取消,反之則是沒有
return state >= CANCELLED;
}
// FutureTask類 → isDone()方法
public boolean isDone() {
// 如果state!=NEW初始化狀態(tài)則代表任務(wù)執(zhí)行結(jié)束
// 因為就算是中間狀態(tài)也會很快變?yōu)樽罱K態(tài)
return state != NEW;
}
最后再上個簡單的FutureTask.get()
執(zhí)行流程:
3.3澄峰、FutureTask總結(jié)
在FutureTask出現(xiàn)之前嫉沽,Java中的多線程編程執(zhí)行任務(wù)后是不能獲取執(zhí)行結(jié)果的,當(dāng)我們需要多線程執(zhí)行后的結(jié)果時則需要自己經(jīng)過復(fù)雜的實現(xiàn)(如寫到緩存或者全局變量中主線程再去讀取)俏竞。而FutureTask整合了Runnable绸硕、Callable、Future三個接口魂毁,使得我們的多線程任務(wù)執(zhí)行后可以異步獲取到多線程的執(zhí)行結(jié)果玻佩。FutureTask會將執(zhí)行結(jié)束后的結(jié)果保存在成員變量:outcome中,等待獲取執(zhí)行結(jié)果的線程則讀取outcome成員值即可席楚。
四咬崔、CompletableFuture類詳解
在之前的FutureTask中,如果想要獲取到多線程執(zhí)行的結(jié)果,有兩種辦法垮斯,一種是輪詢FutureTask.isDone()
方法郎仆,當(dāng)結(jié)果為true的時候獲取執(zhí)行結(jié)果,第二種則是調(diào)用FutureTask.get()
方法兜蠕。但是無論那種方式都無法實現(xiàn)真正意義上的異步回調(diào)扰肌,因為任務(wù)執(zhí)行需要時間,所以都會使得主線程被迫阻塞熊杨,等待執(zhí)行結(jié)果返回后才能接著往下執(zhí)行曙旭,最多只能在一定程度上減少等待方面開銷的時間,如:
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<>(() ->
Thread.currentThread().getName() + "-竹子愛熊貓......");
new Thread(futureTask, "T1").start();
// 可以在這里先完成別的工作晶府,因為任務(wù)執(zhí)行需要時間
// 最后再獲取執(zhí)行結(jié)果
System.out.println("main線程獲取異步執(zhí)行結(jié)果:"
+ futureTask.get());
}
這種方式可以在一定程度上利用異步任務(wù)執(zhí)行的時間來完成別的工作桂躏,但是總歸來說與“異步獲取執(zhí)行結(jié)果”這個設(shè)計的初衷還是存在出入。而CompletableFuture的出現(xiàn)則可以實現(xiàn)真正意義上的實現(xiàn)異步郊霎,不會在使用時因為任務(wù)還沒執(zhí)行完成導(dǎo)致獲取執(zhí)行結(jié)果的線程也被迫阻塞沼头,CompletableFuture將處理執(zhí)行結(jié)果的過程也放到異步線程里去完成,采用回調(diào)函數(shù)的概念解決問題书劝。
CompletableFuture是JDK8支持函數(shù)式編程后引入的一個類进倍,實現(xiàn)了Future與CompletionStage接口,利用CompletionStage接口中提供的方法去支持任務(wù)完成時觸發(fā)的函數(shù)和操作购对,用then猾昆,when等操作來防止FutureTask的get阻塞和輪詢isDone的現(xiàn)象出現(xiàn)。不過因為CompletableFuture類是JDK8中新加入的骡苞,所以它在使用的時候垂蜗,會配合大量的函數(shù)編程、鏈?zhǔn)骄幊探庥摹ambda表達(dá)式等一些JDK1.8的特性贴见,所以如果對于JDK1.8特性不太了解的可以參考之前的文章:Java8新特性。
因為CompletableFuture類中提供的方法有大概五六十個躲株,同時也存在非常多的內(nèi)部類片部,所以本次我們就一個個的依次分析一些重要的方法,先簡單的認(rèn)識后再逐步深入的分析原理實現(xiàn)霜定。
4.1档悠、CompletableFuture異步任務(wù)的創(chuàng)建方式
CompletableFuture中創(chuàng)建一個異步任務(wù)的方式總歸有三種:
- ①與之前的FutureTask一樣的使用方式,通過new對象完成創(chuàng)建
- ②通過CompletableFuture提供的靜態(tài)方法完成創(chuàng)建
- ③通過CompletableFuture提供的成員方法完成創(chuàng)建
下面逐步來一個個演示:
4.1.1望浩、創(chuàng)建CompletableFuture對象的方式創(chuàng)建異步任務(wù)
CompletableFuture類因為是作為FutureTask的優(yōu)化版本辖所,所以除開Java8語法的寫法之外,也保留了FutureTask的用法磨德,如下:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture completableFuture = new CompletableFuture();
new Thread(()->{
System.out.println("異步任務(wù)......");
// 執(zhí)行完成后可以往CompletableFuture對象里面寫出返回值
completableFuture.complete(Thread.currentThread().getName());
}).start();
// 主線程獲取異步任務(wù)執(zhí)行結(jié)果
System.out.println("main線程獲取執(zhí)行結(jié)果:" + completableFuture.get());
for (int i = 1; i <= 3; i++){
System.out.println("main線程 - 輸出:"+i);
}
}
/**
* 執(zhí)行結(jié)果:
* 異步任務(wù)......
* main線程獲取執(zhí)行結(jié)果:Thread-0
* main線程 - 輸出:1
* main線程 - 輸出:2
* main線程 - 輸出:3
*/
}
這種方式比較簡單缘回,也比較容易理解,創(chuàng)建一條線程執(zhí)行異步操作,執(zhí)行完成后往completableFuture
對象中寫入需要返回的值切诀,而主線程則調(diào)用completableFuture.get()
方法獲取異步線程寫回的值揩环。單身顯而易見,這種方式與之前的FutureTask
沒任何區(qū)別幅虑,在主線程獲取到執(zhí)行結(jié)果之前,因為任務(wù)還在執(zhí)行顾犹,所以主線程會被迫阻塞倒庵,等待任務(wù)執(zhí)行結(jié)束后才能繼續(xù)往下執(zhí)行。
4.1.2炫刷、通過CompletableFuture靜態(tài)方法完成異步任務(wù)創(chuàng)建
CompletableFuture類提供了五個靜態(tài)方法可以完成創(chuàng)建異步任務(wù)的操作擎宝,如下:
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
public static <U> CompletableFuture<U> completedFuture(U value);
在這四個方法中,run
開頭的代表創(chuàng)建一個沒有返回值的異步任務(wù)浑玛,supply
開頭的方法代表創(chuàng)建一個具備返回值的異步任務(wù)绍申。同時這兩類方法都支持指定執(zhí)行線程池,如果不指定執(zhí)行線程池顾彰,CompletableFuture則會默認(rèn)使用ForkJoinPool.commonPool()
線程池內(nèi)的線程執(zhí)行創(chuàng)建出的異步任務(wù)极阅。 ok~,上個案例理解一下:
創(chuàng)建一個異步任務(wù)完成100內(nèi)的偶數(shù)求和涨享,執(zhí)行完成后返回求和結(jié)果筋搏,代碼如下:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建有返回值的異步任務(wù) ::為方法引用的語法
CompletableFuture<String> supplyCF = CompletableFuture
.supplyAsync(CompletableFutureDemo::evenNumbersSum);
// 執(zhí)行成功的回調(diào)
supplyCF.thenAccept(System.out::println);
// 執(zhí)行過程中出現(xiàn)異常的回調(diào)
supplyCF.exceptionally((e)->{
e.printStackTrace();
return "異步任務(wù)執(zhí)行過程中出現(xiàn)異常....";
});
// 主線程執(zhí)行打印1234...操作
// 因為如果不為CompletableFuture指定線程池執(zhí)行任務(wù)的情況下,
// CompletableFuture默認(rèn)是使用ForkJoinPool.commonPool()的線程
// 同時是作為main線程的守護(hù)線程進(jìn)行的厕隧,如果main掛了奔脐,執(zhí)行異步任
// 務(wù)的線程也會隨之終止結(jié)束,并不會繼續(xù)執(zhí)行異步任務(wù)
for (int i = 1; i <= 10; i++){
System.out.println("main線程 - 輸出:"+i);
Thread.sleep(50);
}
}
// 求和100內(nèi)的偶數(shù)
private static String evenNumbersSum() {
int sum = 0;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) sum += i;
}
return Thread.currentThread().getName()+"線程 - 100內(nèi)偶數(shù)之和:"+sum;
}
/**
* 執(zhí)行結(jié)果:
* main線程 - 輸出:1
* main線程 - 輸出:2
* ForkJoinPool.commonPool-worker-1線程 - 100內(nèi)偶數(shù)之和:2550
* main線程 - 輸出:3
* main線程 - 輸出:4
* main線程 - 輸出:5
* main線程 - 輸出:6
* main線程 - 輸出:7
* main線程 - 輸出:8
* main線程 - 輸出:9
* main線程 - 輸出:10
**/
}
如上案例吁讨,通過了CompletableFuture.supplyAsync
創(chuàng)建了一個帶返回值的異步任務(wù)supplyCF
髓迎,因為沒有指定線程池則使用默認(rèn)的ForkJoinPool.commonPool()
線程池來完成該任務(wù)的執(zhí)行,同時采用supplyCF.thenAccept
作為成功的回調(diào)方法建丧,采用supplyCF.exceptionally
作為執(zhí)行過程中拋出異常時的回調(diào)方法排龄,同時主線程main
創(chuàng)建完成異步任務(wù)后,寫好了成功和失敗的回調(diào)函數(shù)后茶鹃,繼續(xù)執(zhí)行打印1涣雕、2、3闭翩、4...
的邏輯挣郭。從上面的執(zhí)行結(jié)果我們可以看出,當(dāng)main線程創(chuàng)建好異步任務(wù)以及相關(guān)后續(xù)處理后疗韵,其實并沒有阻塞等待任務(wù)的完成兑障,而是繼續(xù)執(zhí)行接下來的邏輯,當(dāng)任務(wù)執(zhí)行結(jié)束時則會觸發(fā)提前定義好的回調(diào)函數(shù),返回任務(wù)執(zhí)行結(jié)果(執(zhí)行出現(xiàn)異常則會將捕獲的異常信息返回給exceptionally回調(diào)函數(shù))流译。顯而易見逞怨,CompletableFuture
任務(wù)對比之前的FutureTask
任務(wù),在執(zhí)行上以及執(zhí)行結(jié)果返回上實現(xiàn)了真正意義上的“異步”福澡。
ok~叠赦,接下來再看看其他幾種創(chuàng)建CompletableFuture
異步任務(wù)的靜態(tài)方法使用。如下:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建有返回值的異步任務(wù)
CompletableFuture<String> supplyCF = CompletableFuture
.supplyAsync(CompletableFutureDemo::evenNumbersSum);
// 執(zhí)行成功的回調(diào)
supplyCF.thenAccept(System.out::println);
// 執(zhí)行過程中出現(xiàn)異常的回調(diào)
supplyCF.exceptionally((e)->{
e.printStackTrace();
return "異步任務(wù)執(zhí)行過程中出現(xiàn)異常....";
});
// 主線程執(zhí)行打印1234...操作
// 因為如果不為CompletableFuture指定線程池執(zhí)行任務(wù)的情況下革砸,
// CompletableFuture默認(rèn)是使用ForkJoinPool.commonPool()的線程
// 同時是作為main線程的守護(hù)線程進(jìn)行的除秀,如果main掛了,執(zhí)行異步任
// 務(wù)的線程也會隨之終止結(jié)束算利,并不會繼續(xù)執(zhí)行異步任務(wù)
for (int i = 1; i <= 10; i++){
System.out.println("main線程 - 輸出:"+i);
Thread.sleep(50);
}
/***************************************************/
// 創(chuàng)建一個異步任務(wù)册踩,已經(jīng)給定返回值了
CompletableFuture c = CompletableFuture.completedFuture("竹子");
c.thenApply(r -> {
System.out.println("上個任務(wù)結(jié)果:"+r);
return r+"...熊貓";
});
c.thenAccept(System.out::println);
/***************************************************/
// 創(chuàng)建一個沒有返回值的異步任務(wù)
CompletableFuture runCF = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+"沒有返回值的異步任務(wù)");
});
/***************************************************/
// 創(chuàng)建單例的線程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 創(chuàng)建一個有返回值的異步任務(wù)并指定執(zhí)行的線程池
CompletableFuture<String> supplyCFThreadPool =
CompletableFuture.supplyAsync(CompletableFutureDemo::oddNumbersSum,executor);
// // 執(zhí)行過程中出現(xiàn)異常的回調(diào)
supplyCFThreadPool.thenAccept(System.out::println);
// 執(zhí)行過程中出現(xiàn)異常的回調(diào)
supplyCF.exceptionally((e)->{
e.printStackTrace();
return "異步任務(wù)執(zhí)行過程中出現(xiàn)異常....";
});
// 關(guān)閉線程池
executor.shutdown();
}
// 求和100內(nèi)的偶數(shù)
private static String evenNumbersSum() {
int sum = 0;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) sum += i;
}
return Thread.currentThread().getName()+"線程 - 100內(nèi)偶數(shù)之和:"+sum;
}
// 求和100內(nèi)的奇數(shù)
private static String oddNumbersSum() {
int sum = 0;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 != 0) sum += i;
}
return Thread.currentThread().getName()+"線程 - 100內(nèi)奇數(shù)之和:"+sum;
}
}
在上述的案例中,分別創(chuàng)建了四個異步任務(wù)效拭,第一個則是前面分析的案例暂吉,不再贅述。
- 第二個則是創(chuàng)建了一個有返回值的異步任務(wù)缎患,同時與第一個任務(wù)不同的是:我們指定執(zhí)行的線程池
executor
慕的,那么該任務(wù)在執(zhí)行的時候則不會使用默認(rèn)的ForkJoinPool.commonPool()
線程池,不過在使用這種方式的時候较锡,一定要記住需要關(guān)閉自己創(chuàng)建的線程池业稼。 - 第三個異步任務(wù)則是通過
CompletableFuture.runAsync
方法創(chuàng)建了一個沒有返回值的異步任務(wù),傳遞的參數(shù)是一個Runnable
對象蚂蕴,與最開始的new Thread()
方式區(qū)別不大低散,但是與之不同的是:該任務(wù)的執(zhí)行因為沒有指定線程池,所以也是通過默認(rèn)的ForkJoinPool.commonPool()
線程池執(zhí)行的骡楼,而不會另起線程執(zhí)行熔号。 - 第四個任務(wù)則是創(chuàng)建了一個已經(jīng)提前指定了返回值的
CompletableFuture
任務(wù),很多人可能會感覺這種方式很雞肋鸟整,但是可以配合CompletableFuture成員方法完成鏈?zhǔn)絼?chuàng)建引镊。
4.1.3、通過CompletableFuture成員方法完成異步任務(wù)創(chuàng)建
在使用這種方式創(chuàng)建任務(wù)的前提是需要建立在已經(jīng)創(chuàng)建出一個CompletableFuture
對象上篮条〉芡罚總歸來說這類成員方法創(chuàng)建異步任務(wù)的方式屬于串行化的形式創(chuàng)建的,下一個任務(wù)依賴于上一個任務(wù)的執(zhí)行結(jié)果時涉茧,就可以采用這種方式赴恨。CompletableFuture中提供很多這類方法:
// 可以基于CompletableFuture對象接著創(chuàng)建一個有返回任務(wù)
public <U> CompletableFuture<U> thenApply(Function<? super T,
? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,
? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,
? extends U> fn, Executor executor)
// 可以在上一個任務(wù)執(zhí)行失敗的情況下接著執(zhí)行
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable,
? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable,
? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable,
? extends U> fn,Executor executor);
// 可以基于CompletableFuture對象接著創(chuàng)建一個無返回任務(wù)
CompletionStage<Void> thenRun(Runnable action);
CompletionStage<Void> thenRunAsync(Runnable action);
CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
// 與thenApply方法類似,但是thenApply方法操作的是同一個CompletableFuture
// 而該類方法則是生產(chǎn)新的CompletableFuture<返回值>對象進(jìn)行操作
public <U> CompletableFuture<U> thenCompose(Function<? super T,
? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,
? extends CompletionStage<U>> fn,Executor executor)
如上列出來了一些常用的方法伴栓,方法總體分為四類伦连,這四類方法都可以使得任務(wù)串行化執(zhí)行:
- ①thenApply類:此類方法可以基于上一個任務(wù)再創(chuàng)建一個新的有返回型任務(wù)雨饺。
- ②handle類:與thenApply類作用相同,不同點在于thenApply類方法只能在上一個任務(wù)執(zhí)行正常的情況下才能執(zhí)行惑淳,當(dāng)上一個任務(wù)執(zhí)行拋出異常后則不會執(zhí)行额港。而handle類在上個任務(wù)出現(xiàn)異常的情況下也可以接著執(zhí)行。
- ③thenRun類:此類方法可以基于上一個任務(wù)再創(chuàng)建一個新的無返回型任務(wù)歧焦。
- ④thenCompose類:與thenApply類大致相同移斩,不同點在于每次向下傳遞都是新的CompletableFuture對象,而thenApply向下傳遞的都是同一個CompletableFuture對象對象
但是不難發(fā)現(xiàn)绢馍,不管是哪類方法叹哭,其實方法名都會有后面跟了Async
的和沒跟Async
的,那么這種跟了Async
代表什么意思呢痕貌?如果調(diào)用方法名不帶Async
的方法創(chuàng)建出的任務(wù)都是由上一個任務(wù)的執(zhí)行線程來執(zhí)行的,在上一個任務(wù)沒有執(zhí)行完成的情況下糠排,當(dāng)前創(chuàng)建出來的任務(wù)會等待上一個任務(wù)執(zhí)行完成后再執(zhí)行舵稠。而如果是通過Async
這類方法創(chuàng)建出來的任務(wù)則不受到這個限制,通過調(diào)用方法名帶Async
的方法創(chuàng)建出的任務(wù)入宦,具體的執(zhí)行線程會根據(jù)實際情況來決定哺徊,主要會分為如下三種情況:
- ①上一個任務(wù)已經(jīng)執(zhí)行結(jié)束了,那么當(dāng)前創(chuàng)建出的任務(wù)會交給上個任務(wù)的執(zhí)行線程來執(zhí)行
- ②上一個任務(wù)還沒有執(zhí)行結(jié)束乾闰,那么則會另啟一條線程來執(zhí)行
- ③如果創(chuàng)建任務(wù)時指定了執(zhí)行線程池落追,則會使用指定線程池的線程來執(zhí)行
ok~,對這些方法有了基本了解之后再接著來看個案例:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture cf =
CompletableFuture.supplyAsync(CompletableFutureDemo::evenNumbersSum)
// 鏈?zhǔn)骄幊蹋夯谏蟼€任務(wù)的返回繼續(xù)執(zhí)行新的任務(wù)
.thenApply(r -> {
System.out.println("獲取上個任務(wù)的執(zhí)行結(jié)果:" + r);
// 通過上個任務(wù)的執(zhí)行結(jié)果完成計算:求和100所有數(shù)
return r + oddNumbersSum();
}).thenApplyAsync(r -> {
System.out.println("獲取上個任務(wù)的執(zhí)行結(jié)果:" + r);
Integer i = r / 0; // 拋出異常
return r;
}).handle((param, throwable) -> {
if (throwable == null) {
return param * 2;
}
// 獲取捕獲的異常
System.out.println(throwable.getMessage());
System.out.println("我可以在上個任務(wù)" +
"拋出異常時依舊執(zhí)行....");
return -1;
}).thenCompose(x ->
CompletableFuture.supplyAsync(() -> x+1
)).thenRun(() -> {
System.out.println("我是串行無返回任務(wù)....");
});
// 主線程執(zhí)行休眠一段時間
// 因為如果不為CompletableFuture指定線程池執(zhí)行任務(wù)的情況下涯肩,
// CompletableFuture默認(rèn)是使用ForkJoinPool.commonPool()的線程
// 同時是作為main線程的守護(hù)線程進(jìn)行的轿钠,如果main掛了,執(zhí)行異步任
// 務(wù)的線程也會隨之終止結(jié)束病苗,并不會繼續(xù)執(zhí)行異步任務(wù)
Thread.sleep(2000);
}
// 求和100內(nèi)的偶數(shù)
private static int evenNumbersSum() {
int sum = 0;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) sum += i;
}
return sum;
}
// 求和100內(nèi)的奇數(shù)
private static int oddNumbersSum() {
int sum = 0;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 != 0) sum += i;
}
return sum;
}
}
在案例中疗垛,我們創(chuàng)建了六個異步任務(wù):
- ①求和100內(nèi)的所有偶數(shù)
- ②基于第一個任務(wù)的結(jié)果再加上100內(nèi)奇數(shù)總值計算100內(nèi)所有數(shù)字的總和
- ③基于第二個任務(wù)的結(jié)果除0拋出一個異常
- ④使用handle創(chuàng)建一個可以在上個任務(wù)拋出異常時依舊執(zhí)行的任務(wù)
- ⑤使用thenCompose創(chuàng)建一個基于上個任務(wù)返回值+1的任務(wù)
- ⑥使用thenRun創(chuàng)建了一個沒有返回值的任務(wù)
執(zhí)行結(jié)果如下:
/* *
* 執(zhí)行結(jié)果:
* 獲取上個任務(wù)的執(zhí)行結(jié)果:2550
* 獲取上個任務(wù)的執(zhí)行結(jié)果:5050
* java.lang.ArithmeticException: / by zero
* 我可以在上個任務(wù)拋出異常時依舊執(zhí)行....
* 我是串行無返回任務(wù)....
* */
至此,三種創(chuàng)建CompletableFuture異步任務(wù)的方式介紹完畢硫朦,接下來再看看CompletableFuture中一些其他的常用操作贷腕。
4.2、CompletableFuture異步任務(wù)執(zhí)行結(jié)果獲取及回調(diào)處理
在之前的FutureTask
獲取執(zhí)行結(jié)果是通過FutureTask.get()
獲得咬展,而在CompletableFuture中則提供了多樣化的方式泽裳,可以與之前的FutureTask阻塞式獲取,也可以通過回調(diào)函數(shù)的方式異步通知破婆,接下來可以看看CompletableFuture中提供的一些相關(guān)方法:
// 阻塞主線程獲取執(zhí)行結(jié)果(與FutureTask.get()一致)
public T get();
// 上個方法的超時版本
public T get(long timeout,TimeUnit unit);
// 嘗試獲取執(zhí)行結(jié)果涮总,執(zhí)行完成返回執(zhí)行結(jié)果,未完成返回任務(wù)參數(shù)
public T getNow(T valueIfAbsent);
// 阻塞主線程等待任務(wù)執(zhí)行結(jié)束
public T join();
// 無返回的異步任務(wù)正常執(zhí)行完成后可以通過此方法寫出返回值
public boolean complete(T value);
// 無返回的異步任務(wù)執(zhí)行異常后可以通過此方法寫出捕獲的異常信息
public boolean completeExceptionally(Throwable ex);
// 任務(wù)正常執(zhí)行完成后的回調(diào)函數(shù):默認(rèn)由執(zhí)行任務(wù)的線程執(zhí)行回調(diào)邏輯
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
// 任務(wù)正常執(zhí)行完成后的回調(diào)函數(shù):另啟一條線程執(zhí)行回調(diào)邏輯
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
// 任務(wù)正常執(zhí)行完成后的回調(diào)函數(shù):指定線程池執(zhí)行回調(diào)邏輯
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor);
// 執(zhí)行過程中出現(xiàn)異常時執(zhí)行的回調(diào)方法
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
// 執(zhí)行結(jié)束后會執(zhí)行的回調(diào)邏輯
public CompletableFuture<T> whenComplete(BiConsumer<? super T,
? super Throwable> action)
// 上個方法的異步版本
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,
? super Throwable> action)
// 上個方法的指定線程池版本
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,
? super Throwable> action, Executor executor)
上面列出的這些方法總歸分為如下幾種:
- ①主線程調(diào)用直接獲取執(zhí)行結(jié)果的
get荠割、getNow
方法 - ②可以為無返回值的異步任務(wù)寫出執(zhí)行結(jié)果的:
complete
開頭的方法 - ②任務(wù)正常執(zhí)行成功的回調(diào):
thenAccept
開頭的方法 - ④任務(wù)執(zhí)行拋出異常的回調(diào):
exceptionally
方法 - ⑤任務(wù)執(zhí)行結(jié)束的回調(diào):
whenComplete
開頭的方法
ok~妹卿,這些方法使用起來比較簡單旺矾,就不再上案例演示了,因為本章篇幅已經(jīng)夠長了夺克,下面還有內(nèi)容要談箕宙,所以感興趣的可以自己寫個簡單的Demo
調(diào)試。
4.3铺纽、CompletionStage任務(wù)時序關(guān)系:串行柬帕、并行、匯聚實現(xiàn)
CompletableFuture是同時實現(xiàn)了CompletionStage
與Future
接口的狡门,而在CompletionStage接口中提供了一些方法可以很好的描述任務(wù)之間串行陷寝、并行、匯聚等時序關(guān)系其馏。
4.3.1凤跑、CompletionStage任務(wù)之間的時序關(guān)系 - 串行
CompletableFuture類中的實現(xiàn)中,總歸有五大類方法用來描述任務(wù)之間串行關(guān)系的方法叛复,它們分別為:thenApply
仔引、thenAccept
、thenRun
褐奥、handle
以及thenCompose
咖耘,在CompletableFuture類中方法名以這五個開頭的函數(shù)都是用來描述任務(wù)之間的串行關(guān)系。
關(guān)于CompletableFuture串行的解讀:比如目前創(chuàng)建三個任務(wù)①②③撬码,這三個任務(wù)都是異步的方式執(zhí)行的儿倒,但是任務(wù)②依賴于①的執(zhí)行結(jié)果,任務(wù)③依賴于②的執(zhí)行結(jié)果呜笑,執(zhí)行順序總為①→②→③夫否,這種關(guān)系便被稱為串行關(guān)系。
ok~蹈垢,關(guān)于串行的案例可以參考4.1.3慷吊、通過CompletableFuture成員方法完成異步任務(wù)創(chuàng)建
段落。
4.3.2曹抬、CompletionStage任務(wù)之間的時序關(guān)系 - 并行
在CompletionStage中并沒有關(guān)于描述任務(wù)之間并行并行關(guān)系的方法溉瓶,因為也沒有必要,畢竟并行只是串行的多執(zhí)行谤民。
可能有人會不解我上面最后一句話:“畢竟并行只是串行的多執(zhí)行”堰酿。那我們舉個例子理解:比如main主線程創(chuàng)建三個任務(wù)①②③,全部交由T1線程執(zhí)行张足,那么執(zhí)行關(guān)系便是T1執(zhí)行①→②→③触创,這種情況被稱為串行異步。而并行則是指main主線程創(chuàng)建三個任務(wù)①②③为牍,并將這三個任務(wù)分別交由:T1哼绑、T2岩馍、T3三條不同的線程執(zhí)行,三條線程同時執(zhí)行三個不同的任務(wù)抖韩,執(zhí)行則是T1→①蛀恩,T2→②,T3→③茂浮,這種情況便被稱為并行異步双谆。
ok~,上個例子:假設(shè)目前需要分別求和100內(nèi)的所有奇數(shù)與偶數(shù)之和席揽,實現(xiàn)如下:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture cf1 =
CompletableFuture.supplyAsync(CompletableFutureDemo::evenNumbersSum);
CompletableFuture cf2 =
CompletableFuture.supplyAsync(CompletableFutureDemo::oddNumbersSum);
// 防止main線程死亡導(dǎo)致執(zhí)行異步任務(wù)的線程也終止執(zhí)行
Thread.sleep(3000);
}
// 求和100內(nèi)的偶數(shù)
private static int evenNumbersSum() {
int sum = 0;
System.out.println(Thread.currentThread().getName()
+"線程...執(zhí)行了求和偶數(shù)....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) sum += i;
}
return sum;
}
// 求和100內(nèi)的奇數(shù)
private static int oddNumbersSum() {
int sum = 0;
System.out.println(Thread.currentThread().getName()
+"線程...執(zhí)行了求和奇數(shù)....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 != 0) sum += i;
}
return sum;
}
/* *
* 執(zhí)行結(jié)果:
* ForkJoinPool.commonPool-worker-1線程...執(zhí)行了求和偶數(shù)....
* ForkJoinPool.commonPool-worker-2線程...執(zhí)行了求和奇數(shù)....
* */
}
如上案例中顽馋,創(chuàng)建了兩個任務(wù)cf1、cf2
分別執(zhí)行求和偶數(shù)盒求和奇數(shù)的邏輯幌羞,同時為了可以觀察到并行執(zhí)行的效果寸谜,在兩個計算方法:evenNumbersSum
、oddNumbersSum
中分別休眠1秒鐘模擬任務(wù)執(zhí)行耗時属桦。執(zhí)行如上案例后的結(jié)果也可以明顯觀測到程帕,這兩個任務(wù)分別是由兩條不同的線程worker-1
、worker-2
并行執(zhí)行地啰。
4.3.3、CompletionStage任務(wù)之間的時序關(guān)系 - 匯聚
CompletionStage接口中描述任務(wù)之間匯聚關(guān)系的方法總供有兩類讲逛,一類是and類型的亏吝,代表著任務(wù)都要執(zhí)行完成后才開始處理的方法。另一類則是or類型的盏混,代表著任務(wù)只要有任意一個執(zhí)行完成就開始處理的方法蔚鸥。提供的方法如下:
- AND類型:
- thenCombine系列:可以接收前面任務(wù)的結(jié)果進(jìn)行匯聚計算,并且計算后可以返回值
- thenAcceptBoth系列:可以接收前面任務(wù)的結(jié)果進(jìn)行匯聚計算许赃,但計算后沒有返回值
- runAfterBoth系列:不可以接收前面任務(wù)的結(jié)果且無返回止喷,但可以在任務(wù)結(jié)束后進(jìn)行匯聚計算
- CompletableFuture類的allOf系列:不可接收之前任務(wù)的結(jié)果,但可以匯聚多個任務(wù)混聊,但是要配合回調(diào)處理方法一起使用
- OR類型:
- applyToEither系列:接收最先完成的任務(wù)結(jié)果進(jìn)行處理弹谁,處理完成后可以返回值
- acceptEither系列:接收最先完成的任務(wù)結(jié)果進(jìn)行處理,但是處理完成后不能返回
- runAfterEither系列:不能接收前面任務(wù)的返回值且無返回句喜,單可以為最先完成的任務(wù)進(jìn)行后繼處理
- CompletableFuture類的anyOf系列:可以同時匯聚任意個任務(wù)预愤,并接收最先執(zhí)行完成的任務(wù)結(jié)果進(jìn)行處理,處理完成后沒有返回值咳胃,需要配合回調(diào)方法一起使用
ok~植康,在如上,無論是描述任務(wù)之間AND類型還是OR類型的匯聚關(guān)系展懈,CompletionStage接口都分別提供了三個系列的方法销睁,而這三個系列的方法作用大體相同供璧,區(qū)別則是在于入?yún)㈩愋偷牟煌鋵嵄举|(zhì)上也只是BiFunction冻记、BiConsumer睡毒、Runnable
三個函數(shù)式接口的不同。而除開CompletionStage接口都提供的三個系列外檩赢,CompletableFuture類還分別拓展了兩個方法:anyOf吕嘀、allOf
,這兩個方法的則是可以同時匯聚多個任務(wù)贞瞒,之前的只能同時匯聚兩個任務(wù)進(jìn)行處理偶房。接下來看看案例:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
/*--------------------創(chuàng)建兩個異步任務(wù)CF1/CF2------------------*/
CompletableFuture<Integer> cf1 =
CompletableFuture.supplyAsync(CompletableFutureDemo::evenNumbersSum);
CompletableFuture<Integer> cf2 =
CompletableFuture.supplyAsync(CompletableFutureDemo::oddNumbersSum);
/*--------------------測試AND類型匯聚方法------------------*/
CompletableFuture<Integer> cfThenCombine = cf1.thenCombine(cf2, (r1, r2) -> {
System.out.println("cf1任務(wù)計算結(jié)果:" + r1);
System.out.println("cf2任務(wù)計算結(jié)果:" + r2);
return r1 + r2;
});
System.out.println("cf1,cf2任務(wù)ThenCombine匯聚處理結(jié)果:" + cfThenCombine.get());
// thenAcceptBoth、runAfterBoth系列與thenCombine差不多相同
// 區(qū)別就在于入?yún)iFunction军浆、BiConsumer棕洋、Runnable三個函數(shù)式接口的不同
// 使用allOf匯聚兩個任務(wù)(可以匯聚多個)
CompletableFuture cfAllOf = CompletableFuture.allOf(cf1, cf2);
// 配合thenAccept成功回調(diào)函數(shù)使用
cfAllOf.thenAccept( o -> System.out.println("所有任務(wù)完成后進(jìn)行斷后處理...."));
//分割線
Thread.sleep(2000);
System.err.println("--------------------------------------");
/*--------------------測試OR類型匯聚方法------------------*/
CompletableFuture<Integer> cfApplyToEither = cf1.applyToEither(cf2, r -> {
System.out.println("最先執(zhí)行完成的任務(wù)結(jié)果:" + r);
return r * 10;
});
System.out.println("cf1,cf2任務(wù)applyToEither匯聚處理結(jié)果:"+cfApplyToEither.get());
// acceptEither、runAfterEither系列與applyToEither系列也差不多相同
// 區(qū)別就也是在于入?yún)iFunction乒融、BiConsumer掰盘、Runnable三個函數(shù)式接口的不同
// 使用anyOf匯聚兩個任務(wù),誰先執(zhí)行完成就處理誰的執(zhí)行結(jié)果
CompletableFuture cfAnyOf = CompletableFuture.anyOf(cf1, cf2);
// 配合thenAccept成功回調(diào)函數(shù)使用
cfAnyOf.thenAccept(r -> {
System.out.println("最先執(zhí)行完成的任務(wù)結(jié)果:" + r);
System.out.println("對先完成的任務(wù)結(jié)果進(jìn)行后續(xù)處理....");
});
}
// 求和100內(nèi)的偶數(shù)
private static int evenNumbersSum() {
int sum = 0;
try {
Thread.sleep(800); // 模擬耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 == 0) sum += i;
}
return sum;
}
// 求和100內(nèi)的奇數(shù)
private static int oddNumbersSum() {
int sum = 0;
try {
Thread.sleep(1000); // 模擬耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i <= 100; i++) {
if (i % 2 != 0) sum += i;
}
return sum;
}
}
簡單分析一下如上案例赞季。首先分別創(chuàng)建了兩個異步任務(wù):①求和100內(nèi)的所有偶數(shù)愧捕、②求和100內(nèi)的所有奇數(shù)。
and類型測試:等到兩個任務(wù)都執(zhí)行完成后申钩,奇數(shù)和與偶數(shù)和累加求和100內(nèi)所有數(shù)字次绘。
or類型測試:任意一個任務(wù)執(zhí)行完成后,都對該任務(wù)的執(zhí)行結(jié)果x10倍撒遣。
為了能夠觀察到結(jié)果邮偎,我們在偶數(shù)求和的方法中休眠0.8s,奇數(shù)求和方法中休眠1s义黎,使得兩個任務(wù)執(zhí)行時間存在差距禾进,這可以很清晰的讓我們觀察到or類型的測試結(jié)果。
ok~廉涕,執(zhí)行如上案例結(jié)果如下:
/* *
* 執(zhí)行結(jié)果:
* cf1任務(wù)計算結(jié)果:2550
* cf2任務(wù)計算結(jié)果:2500
* cf1,cf2任務(wù)ThenCombine匯聚處理結(jié)果:5050
* 所有任務(wù)完成后進(jìn)行斷后處理....
* --------------------------------------
* 最先執(zhí)行完成的任務(wù)結(jié)果:2550
* cf1,cf2任務(wù)applyToEither匯聚處理結(jié)果:25500
* 最先執(zhí)行完成的任務(wù)結(jié)果:2550
* 對先完成的任務(wù)結(jié)果進(jìn)行后續(xù)處理....
* */
五泻云、CompletableFuture異步回調(diào)原理淺析
CompletableFuture的原理如若要深究,則比較復(fù)雜狐蜕,因為需要結(jié)合線程池一塊兒理解壶愤。同時本章的篇幅夠長了,所以我們關(guān)于原理的深究則放在后續(xù)的文章中再從源碼角度窺探馏鹤,下面簡單的談?wù)凜ompletableFuture的原理:
- ①為什么CompletableFuture可以實現(xiàn)異步回調(diào)征椒,將結(jié)果實現(xiàn)異步通知?
因為創(chuàng)建出的異步任務(wù)在執(zhí)行時湃累,會由一條新的線程執(zhí)行勃救,任務(wù)執(zhí)行完成后碍讨,回調(diào)函數(shù)的邏輯也是由執(zhí)行任務(wù)的線程處理的。 - 為什么CompletableFuture可以實現(xiàn)鏈?zhǔn)骄幊堂擅耄瓿扇蝿?wù)的串行創(chuàng)建且執(zhí)行勃黍?
因為CompletableFuture實現(xiàn)了CompletionStage接口,在每個任務(wù)執(zhí)行完成后又回返回一個CompletableFuture對象晕讲,使用時我們可以接著基于該對象繼續(xù)創(chuàng)建新的任務(wù)覆获,同時每個CompletableFuture對象中存在一個鏈表,一個新創(chuàng)建的任務(wù)到來瓢省,如果線程還未執(zhí)行完當(dāng)前任務(wù)弄息,則會將新到來的任務(wù)加入鏈表等待,線程處理完當(dāng)前任務(wù)則會接著執(zhí)行鏈表中的任務(wù)勤婚。