要深入理解CompletableFuture纵潦,就必須先搞懂Future的原理辜贵,因為CompletableFuture是在其基礎上增加了回調(diào)和JDK1.8的操作
一、線程池提交Callable夹攒,返回Future洒忧,通過future.get运授,阻塞地獲取結果
項目中使用Future烤惊,將Callable<T> task作為參數(shù)提交到線程池中,然后通過get獲取結果
<pre> ListenableFuture<String> future = listeningExecutorService
.submit(() -> save());
executeFutures.add(future);
future.get();
submit方法吁朦,這里的線程池繼承了ThreadExecutor柒室,實現(xiàn)了submit方法。在原來的submit上做了增強逗宜,能夠在線程傳遞之時雄右,把spanId傳給子線程。submit依然是交給父類ThreadExecutor的submit方法執(zhí)行
@Override
public <T> Future<T> submit(Callable<T> task) {
// 獲取父線程MDC中的內(nèi)容纺讲,必須在run方法之前擂仍,否則等異步線程執(zhí)行的時候有可能MDC里面的值已經(jīng)被清空了,這個時候就會返回null
Map<String, String> context = MDC.getCopyOfContextMap();
return super.submit(() -> run(task, context));
}
提交后熬甚,AbstractExecutorService會將Callable通過newTaskFor利用構造函數(shù)新建一個FutureTask逢渔,由于RunnableFuture繼承了Runnable和Future接口,所以excute可以執(zhí)行ftask乡括。并且可以返回結果
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
構造函數(shù)保存callable肃廓,將state初始化為NEW,而且還可以兼容Runnable的傳入诲泌。通過Executors.callable適配器模式盲赊,將Runnable適配為Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
提交到線程池之后,什么時候能夠執(zhí)行敷扫,就跟線程池的策略有關了哀蘑,具體跟核心線程數(shù)、工作隊列、最大線程數(shù)绘迁、拒絕策略等參數(shù)有關惨险。當輪到此任務執(zhí)行時,會調(diào)用FutureTask中的run()方法脊髓。
二辫愉、FutureTask的結構和狀態(tài)機
接下來先看下FutureTask的繼承體系和數(shù)據(jù)結構
/** 封裝的Callable對象,其call方法用來執(zhí)行異步任務将硝,其實就是把callable包裝在這里恭朗,執(zhí)行call()方法之后,future可以拿到結果,在get()中返回 */
private Callable<V> callable;
/** 在FutureTask里面定義一個成員變量outcome依疼,用來裝異步任務的執(zhí)行結果 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 用來執(zhí)行callable任務的線程 痰腮,判斷這個參數(shù)是否為null,可以知道當前是否有線程在執(zhí)行*/
private volatile Thread runner;
/** 線程等待節(jié)點律罢,reiber stack的一種實現(xiàn)膀值。存的是獲取任務狀態(tài)或結果的線程 */
private volatile WaitNode waiters;
/** 任務執(zhí)行狀態(tài) ,由于更新狀態(tài)需要保證線程安全误辑,因此需要用volatile和CAS保證線程安全*/
private volatile int state;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 對應成員變量state的偏移地址
private static final long stateOffset;
// 對應成員變量runner的偏移地址
private static final long runnerOffset;
// 對應成員變量waiters的偏移地址
private static final long waitersOffset;</pre>
狀態(tài)
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;</pre>
狀態(tài)的轉換如下:
-
NEW -> COMPLETING -> NORMAL
:這個狀態(tài)變化表示異步任務的正常結束沧踏,其中COMPLETING
是一個瞬間臨時的過渡狀態(tài),由set
方法設置狀態(tài)的變化巾钉; -
NEW -> COMPLETING -> EXCEPTIONAL
:這個狀態(tài)變化表示異步任務執(zhí)行過程中拋出異常翘狱,由setException
方法設置狀態(tài)的變化; -
NEW -> CANCELLED
:這個狀態(tài)變化表示被取消砰苍,即調(diào)用了cancel(false)
潦匈,由cancel
方法來設置狀態(tài)變化; -
NEW -> INTERRUPTING -> INTERRUPTED
:這個狀態(tài)變化表示被中斷赚导,即調(diào)用了cancel(true)
茬缩,由cancel
方法來設置狀態(tài)變化。
三吼旧、future的run方法:當前狀態(tài)為NEW->調(diào)用call()方法->保存異郴宋或者正常執(zhí)行的結果,并進行狀態(tài)的流轉黍少。保存結果后就可以用finishComplete喚醒阻塞的線程
然后我們就可以看看run()方法執(zhí)行過程了
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
//當前狀態(tài)不是NEW或者將當前線程設置為runner對象失敗時返回寡夹,說明有其他線程執(zhí)行過了。
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//執(zhí)行call()方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//拋出異常時厂置,保存結果
setException(ex);
}
if (ran)
//保存返回值
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)//這里有個疑問菩掏,什么時候state會是中斷的狀態(tài)?
handlePossibleCancellationInterrupt(s);
}
}
set()和setException負責保存結果到outcome和狀態(tài)的流轉昵济。調(diào)用finishCompletion將阻塞的線程喚醒
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
finishCompletion
方法的作用就是不管異步任務正常還是異常結束智绸,此時都要喚醒且移除線程等待鏈表的等待線程節(jié)點野揪,這個鏈表實現(xiàn)的是一個是Treiber stack
,因此喚醒(移除)的順序是"后進先出"即后面先來的線程先被先喚醒(移除)瞧栗,關于這個線程等待鏈表是如何成鏈的斯稳,后面再繼續(xù)分析。
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//這里為什么要把waiters設置為null
for (;;) {
Thread t = q.thread;//對于waiters的結構比較困惑迹恐≌醵瑁看不懂遍歷的過程
if (t != null) {
q.thread = null;
//喚醒等待結果的線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
四、get方法實現(xiàn):主要是調(diào)用awaitDone方法殴边,判斷當前狀態(tài)看是否可以返回結果憎茂。對于不同的狀態(tài)有不同的處理方式,而且可以設置阻塞的時間
get()方法的實現(xiàn)锤岸,當狀態(tài)<=COMPLETING竖幔,會阻塞當前線程
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}</pre>
|
awaitDone()
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//如果當前獲取任務執(zhí)行結果的線程被中斷,此時移除該線程WaitNode鏈表節(jié)點是偷,并拋出InterruptedException
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
/**【問】此時將當前WaitNode節(jié)點的線程置空拳氢,其中在任務結束時也會調(diào)用finishCompletion將WaitNode節(jié)點的thread置空,
* 這里為什么又要再調(diào)用一次q.thread = null;呢蛋铆?
*【答】因為若很多線程來獲取任務執(zhí)行結果馋评,在任務執(zhí)行完的那一刻,此時獲取任務的線程要么已經(jīng)在線程等待鏈表中戒职,要么
*此時還是一個孤立的WaitNode節(jié)點栗恩。在線程等待鏈表中的的所有WaitNode節(jié)點將由finishCompletion來移除(同時喚醒)所有
* 等待的WaitNode節(jié)點,以便垃圾回收洪燥;而孤立的線程WaitNode節(jié)點此時還未阻塞,因此不需要被喚醒乳乌,此時只要把其屬性置為
*null捧韵,然后其有沒有被誰引用,因此可以被GC汉操。**/
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//任務正在執(zhí)行再来,讓出線程
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)//若當前線程等待節(jié)點還未入線程等待隊列,此時加入到該線程等待隊列的頭部
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
五磷瘤、Future的優(yōu)點和缺點
Future的優(yōu)點就是結合Callable能夠異步執(zhí)行芒篷,然后返回結果。但是有以下局限
- 不能手動完成
當你寫了一個函數(shù)采缚,用于通過一個遠程API獲取一個電子商務產(chǎn)品最新價格针炉。因為這個 API 太耗時,你把它允許在一個獨立的線程中扳抽,并且從你的函數(shù)中返回一個 Future〈叟粒現(xiàn)在假設這個API服務宕機了殖侵,這時你想通過該產(chǎn)品的最新緩存價格手工完成這個Future 。你會發(fā)現(xiàn)無法這樣做(CompletableFuture的complete方法可以手動完成)
2. Future 的結果在非阻塞的情況下镰烧,不能執(zhí)行更進一步的操作 Future 不會通知你它已經(jīng)完成了拢军,它提供了一個阻塞的 get()
方法通知你結果。你無法給 Future 植入一個回調(diào)函數(shù)怔鳖,當 Future 結果可用的時候茉唉,用該回調(diào)函數(shù)自動的調(diào)用 Future 的結果。
2. 多個 Future 不能串聯(lián)在一起組成鏈式調(diào)用 有時候你需要執(zhí)行一個長時間運行的計算任務结执,并且當計算任務完成的時候赌渣,你需要把它的計算結果發(fā)送給另外一個長時間運行的計算任務等等。你會發(fā)現(xiàn)你無法使用 Future 創(chuàng)建這樣的一個工作流昌犹。(CompletableFuture的thenApply等方法可以鏈式操作)
3. 不能組合多個 Future 的結果 假設你有10個不同的Future坚芜,你想并行的運行,然后在它們運行未完成后運行一些函數(shù)斜姥。你會發(fā)現(xiàn)你也無法使用 Future 這樣做鸿竖。(allOf函數(shù))
4. 沒有異常處理 (CompletableFuture使用 exceptionally() 回調(diào)處理異常)