再次回顧這幅圖费就,在上一章中禾进,我們分析了Reactor的完整實現(xiàn)。由于Java NIO事件驅(qū)動的模型殿怜,要求Netty的事件處理采用異步的方式典蝌,異步處理則需要表示異步操作的結(jié)果。Future正是用來表示異步操作結(jié)果的對象头谜,F(xiàn)uture的類簽名為:
public interface Future<V>;
其中的泛型參數(shù)V即表示異步結(jié)果的類型骏掀。
5.1 總述
也許你已經(jīng)使用過JDK的Future對象,該接口的方法如下:
// 取消異步操作
boolean cancel(boolean mayInterruptIfRunning);
// 異步操作是否取消
boolean isCancelled();
// 異步操作是否完成柱告,正常終止截驮、異常、取消都是完成
boolean isDone();
// 阻塞直到取得異步操作結(jié)果
V get() throws InterruptedException, ExecutionException;
// 同上末荐,但最長阻塞時間為timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
我們的第一印象會覺得這樣的設(shè)計并不壞侧纯,但仔細(xì)思考,便會發(fā)現(xiàn)問題:
(1).接口中只有isDone()方法判斷一個異步操作是否完成甲脏,但是對于完成的定義過于模糊眶熬,JDK文檔指出正常終止妹笆、拋出異常、用戶取消都會使isDone()方法返回真娜氏。在我們的使用中拳缠,我們極有可能是對這三種情況分別處理,而JDK這樣的設(shè)計不能滿足我們的需求贸弥。
(2).對于一個異步操作窟坐,我們更關(guān)心的是這個異步操作觸發(fā)或者結(jié)束后能否再執(zhí)行一系列動作。比如說绵疲,我們?yōu)g覽網(wǎng)頁時點擊一個按鈕后實現(xiàn)用戶登錄哲鸳。在javascript中,處理代碼如下:
$("#login").click(function(){
login();
});
可見在這樣的情況下盔憨,JDK中的Future便不能處理徙菠,所以,Netty擴展了JDK的Future接口郁岩,使其能解決上面的兩個問題婿奔。擴展的方法如下(類似方法只列出一個):
// 異步操作完成且正常終止
boolean isSuccess();
// 異步操作是否可以取消
boolean isCancellable();
// 異步操作失敗的原因
Throwable cause();
// 添加一個監(jiān)聽者,異步操作完成時回調(diào)问慎,類比javascript的回調(diào)函數(shù)
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到異步操作完成
Future<V> await() throws InterruptedException;
// 同上萍摊,但異步操作失敗時拋出異常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回異步結(jié)果,如果尚未完成返回null
V getNow();
如果你對Future的狀態(tài)還有疑問如叼,放上代碼注釋中的ascii圖打消你的疑慮:
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------------------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
可知冰木,F(xiàn)uture對象有兩種狀態(tài)尚未完成和已完成,其中已完成又有三種狀態(tài):成功薇正、失敗片酝、用戶取消囚衔。各狀態(tài)的狀態(tài)斷言請在此圖中查找挖腰。
仔細(xì)看完上面的圖并聯(lián)系Future接口中的方法,你是不是也會和我有相同的疑問:Future接口中的方法都是getter方法而沒有setter方法练湿,也就是說這樣實現(xiàn)的Future子類的狀態(tài)是不可變的猴仑,如果我們想要變化,那該怎么辦呢肥哎?Netty提供的解決方法是:使用可寫的Future即Promise辽俗。Promise接口擴展的方法如下:
// 標(biāo)記異步操作結(jié)果為成功,如果已被設(shè)置(不管成功還是失敶鄯獭)則拋出異常IllegalStateException
Promise<V> setSuccess(V result);
// 同上崖飘,只是結(jié)果已被設(shè)置時返回False
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 設(shè)置結(jié)果為不可取消,結(jié)果已被取消返回False
boolean setUncancellable();
需要注意的是:Promise接口繼承自Future接口杈女,它提供的setter方法與常見的setter方法大為不同朱浴。Promise從Uncompleted-->Completed的狀態(tài)轉(zhuǎn)變有且只能有一次吊圾,也就是說setSuccess和setFailure方法最多只會成功一個,此外翰蠢,在setSuccess和setFailure方法中會通知注冊到其上的監(jiān)聽者项乒。為了加深對Future和Promise的理解,我們可以將Future類比于定額發(fā)票梁沧,Promise類比于機打發(fā)票檀何。當(dāng)商戶拿到稅務(wù)局的發(fā)票時,如果是定額發(fā)票廷支,則已經(jīng)確定好金額是100還是50或其他频鉴,商戶再也不能更改;如果是機打發(fā)票恋拍,商戶相當(dāng)于拿到了一個發(fā)票模板砚殿,需要多少金額按實際情況填到模板指定處。顯然芝囤,不能兩次使用同一張機打發(fā)票打印似炎,這會使發(fā)票失效,而Promise做的更好悯姊,它使第二次調(diào)用setter方法失敗羡藐。
至此,我們從總體上了解了Future和Promise的原理悯许。我們再看一下類圖:
類圖給我們的第一印象是:繁雜仆嗦。我們抓住關(guān)鍵點:Future和Promise兩條分支,分而治之先壕。我們使用自頂向下的方法分析其實現(xiàn)細(xì)節(jié)瘩扼,使用兩條線索:
AbstractFuture<--CompleteFuture<--CompleteChannelFuture<--Succeeded/FailedChannelFuture
DefaultPromise<--DefaultChannelPromise
5.2 Future
5.2.1 AbstractFuture
AbstractFuture主要實現(xiàn)Future的get()方法,取得Future關(guān)聯(lián)的異步操作結(jié)果:
@Override
public V get() throws InterruptedException, ExecutionException {
await(); // 阻塞直到異步操作完成
Throwable cause = cause();
if (cause == null) {
return getNow(); // 成功則返回關(guān)聯(lián)結(jié)果
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause; // 由用戶取消
}
throw new ExecutionException(cause); // 失敗拋出異常
}
其中的實現(xiàn)簡單明了垃僚,但關(guān)鍵調(diào)用方法的具體實現(xiàn)并沒有集绰,我們將在子類實現(xiàn)中分析。對應(yīng)的加入超時時間的get(long timeout, TimeUnit unit)實現(xiàn)也類似谆棺,不再列出栽燕。
5.2.2 CompleteFuture
Complete表示操作已完成,所以CompleteFuture表示一個異步操作已完成的結(jié)果改淑,由此可推知:該類的實例在異步操作完成時創(chuàng)建碍岔,返回給用戶,用戶則使用addListener()方法定義一個異步操作朵夏。如果你熟悉javascript蔼啦,將Listener類比于回調(diào)函數(shù)callback()可方便理解。
我們首先看其中的字段和構(gòu)造方法:
// 執(zhí)行器仰猖,執(zhí)行Listener中定義的操作
private final EventExecutor executor;
// 這有一個構(gòu)造方法捏肢,可知executor是必須的
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}
CompleteFuture類定義了一個EventExecutor掠河,可視為一個線程,用于執(zhí)行Listener中的操作猛计。我們再看addListener()和removeListener()方法:
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 由于這是一個已完成的Future唠摹,所以立即通知Listener執(zhí)行
DefaultPromise.notifyListener(executor(), this, listener);
return this;
}
public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 由于已完成,Listener中的操作已完成奉瘤,沒有需要刪除的Listener
return this;
}
其中的實現(xiàn)也很簡單勾拉,我們看一下GenericFutureListener接口,其中只定義了一個方法:
// 異步操作完成是調(diào)用
void operationComplete(F future) throws Exception;
關(guān)于Listener我們再關(guān)注一下ChannelFutureListener盗温,它并沒有擴展GenericFutureListener接口藕赞,所以類似于一個標(biāo)記接口。我們看其中實現(xiàn)的三個通用ChannelFutureListener:
ChannelFutureListener CLOSE = (future) --> {
future.channel().close(); //操作完成時關(guān)閉Channel
};
ChannelFutureListener CLOSE_ON_FAILURE = (future) --> {
if (!future.isSuccess()) {
future.channel().close(); // 操作失敗時關(guān)閉Channel
}
};
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = (future) --> {
if (!future.isSuccess()) {
// 操作失敗時觸發(fā)一個ExceptionCaught事件
future.channel().pipeline().fireExceptionCaught(future.cause());
}
};
這三個Listener對象定義了對Channel處理時常用的操作卖局,如果符合需求斧蜕,可以直接使用。
由于CompleteFuture表示一個已完成的異步操作砚偶,所以可推知sync()和await()方法都將立即返回批销。此外,可推知線程的狀態(tài)如下染坯,不再列出代碼:
isDone() = true; isCancelled() = false;
5.2.3 CompleteChannelFuture
CompleteChannelFuture的類簽名如下:
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture
ChannelFuture是不是覺得很親切均芽?你肯定已經(jīng)使用過ChannelFuture。ChannelFuture接口相比于Future只擴展了一個方法channel()用于取得關(guān)聯(lián)的Channel對象单鹿。CompleteChannelFuture還繼承了CompleteFuture<Void>掀宋,尖括號中的泛型表示Future關(guān)聯(lián)的結(jié)果,此結(jié)果為Void仲锄,意味著CompleteChannelFuture不關(guān)心這個特定結(jié)果即get()相關(guān)方法返回null劲妙。也就是說,我們可以將CompleteChannelFuture純粹的視為一種回調(diào)函數(shù)機制儒喊。
CompleteChannelFuture的字段只有一個:
private final Channel channel; // 關(guān)聯(lián)的Channel對象
CompleteChannelFuture的大部分方法實現(xiàn)中镣奋,只是將方法返回的Future覆蓋為ChannelFuture對象(ChannelFuture接口的要求),代碼不在列出澄惊。我們看一下executor()方法:
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor(); // 構(gòu)造方法指定
if (e == null) {
return channel().eventLoop(); // 構(gòu)造方法未指定使用channel注冊到的eventLoop
} else {
return e;
}
}
5.2.4 Succeeded/FailedChannelFuture
Succeeded/FailedChannelFuture為特定的兩個異步操作結(jié)果唆途,回憶總述中關(guān)于Future狀態(tài)的講解富雅,成功意味著
Succeeded: isSuccess() == true, cause() == null;
Failed: isSuccess() == false, cause() == non-null
代碼中的實現(xiàn)也很簡單掸驱,不再列出。需要注意的是没佑,其中的構(gòu)造方法不建議用戶調(diào)用毕贼,一般使用Channel對象的方法newSucceededFuture()和newFailedFuture(Throwable)代替。
5.3 Promise
5.3.1 DefaultPromise
我們首先看其中的static字段:
// 可以嵌套的Listener的最大層數(shù)蛤奢,可見最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
// 此處的Signal是Netty定義的類鬼癣,繼承自Error陶贼,異步操作成功且結(jié)果為null時設(shè)置為改值
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
// 異步操作不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(...);
// 異步操作失敗時保存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(...);
嵌套的Listener,是指在listener的operationComplete方法中待秃,可以再次使用future.addListener()繼續(xù)添加listener拜秧,Netty限制的最大層數(shù)是8,用戶可使用系統(tǒng)變量io.netty.defaultPromise.maxListenerStackDepth設(shè)置章郁。
再看其中的私有字段:
// 異步操作結(jié)果
private volatile Object result;
// 執(zhí)行l(wèi)istener操作的執(zhí)行器
private final EventExecutor executor;
// 監(jiān)聽者
private Object listeners;
// 阻塞等待該結(jié)果的線程數(shù)
private short waiters;
// 通知正在進行標(biāo)識
private boolean notifyingListeners;
也許你已經(jīng)注意到枉氮,listeners是一個Object類型。這似乎不合常理暖庄,一般情況下我們會使用一個集合或者一個數(shù)組聊替。Netty之所以這樣設(shè)計,是因為大多數(shù)情況下listener只有一個培廓,用集合和數(shù)組都會造成浪費惹悄。當(dāng)只有一個listener時,該字段為一個GenericFutureListener對象肩钠;當(dāng)多余一個listener時泣港,該字段為DefaultFutureListeners,可以儲存多個listener价匠。明白了這些爷速,我們分析關(guān)鍵方法addListener():
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener); // 保證多線程情況下只有一個線程執(zhí)行添加操作
}
if (isDone()) {
notifyListeners(); // 異步操作已經(jīng)完成通知監(jiān)聽者
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener; // 只有一個
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener); // 大于兩個
} else {
// 從一個擴展為兩個
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
從代碼中可以看出,在添加Listener時霞怀,如果異步操作已經(jīng)完成惫东,則會notifyListeners():
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) { //執(zhí)行線程為指定線程
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth(); // 嵌套層數(shù)
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 執(zhí)行前增加嵌套層數(shù)
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 執(zhí)行完畢,無論如何都要回滾嵌套層數(shù)
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 外部線程則提交任務(wù)給執(zhí)行線程
safeExecute(executor, () -> { notifyListenersNow(); });
}
private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}
所以毙石,外部線程不能執(zhí)行監(jiān)聽者Listener中定義的操作廉沮,只能提交任務(wù)到指定Executor,其中的操作最終由指定Executor執(zhí)行徐矩。我們再看notifyListenersNow()方法:
private void notifyListenersNow() {
Object listeners;
// 此時外部線程可能會執(zhí)行添加Listener操作滞时,所以需要同步
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
// 正在通知或已沒有監(jiān)聽者(外部線程刪除)直接返回
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) { // 通知單個
notifyListeners0((DefaultFutureListeners) listeners);
} else { // 通知多個(遍歷集合調(diào)用單個)
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
// 執(zhí)行完畢且外部線程沒有再添加監(jiān)聽者
if (this.listeners == null) {
notifyingListeners = false;
return;
}
// 外部線程添加了監(jiān)聽者繼續(xù)執(zhí)行
listeners = this.listeners;
this.listeners = null;
}
}
}
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
到此為止,我們分析完了Promise最重要的addListener()和notifyListener()方法滤灯。在源碼中還有static的notifyListener()方法坪稽,這些方法是CompleteFuture使用的,對于CompleteFuture鳞骤,添加監(jiān)聽者的操作不需要緩存窒百,直接執(zhí)行Listener中的方法即可,執(zhí)行線程為調(diào)用線程豫尽,相關(guān)代碼可回顧CompleteFuture篙梢。addListener()相對的removeListener()方法實現(xiàn)簡單,我們不再分析美旧。
回憶result字段渤滞,修飾符有volatile贬墩,所以使用RESULT_UPDATER更新,保證更新操作為原子操作妄呕。Promise不攜帶特定的結(jié)果(即攜帶Void)時陶舞,成功時設(shè)置為靜態(tài)字段的Signal對象SUCCESS;如果攜帶泛型參數(shù)結(jié)果绪励,則設(shè)置為泛型一致的結(jié)果吊说。對于Promise,設(shè)置成功优炬、設(shè)置失敗颁井、取消操作,三個操作至多只能調(diào)用一個且同一個方法至多生效一次蠢护,再次調(diào)用會拋出異常(set)或返回失斞疟觥(try)。這些設(shè)置方法原理相同葵硕,我們以setSuccess()為例分析:
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners(); // 可以設(shè)置結(jié)果說明異步操作已完成眉抬,故通知監(jiān)聽者
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 為空設(shè)置為Signal對象Success
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// 只有結(jié)果為null或者UNCANCELLABLE時才可設(shè)置且只可以設(shè)置一次
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters(); // 通知等待的線程
return true;
}
return false;
}
checkNotifyWaiters()方法喚醒調(diào)用await()和sync()方法等待該異步操作結(jié)果的線程,代碼如下:
private synchronized void checkNotifyWaiters() {
// 確實有等待的線程才notifyAll
if (waiters > 0) {
notifyAll(); // JDK方法
}
}
有了喚醒操作懈凹,那么sync()和await()的實現(xiàn)是怎么樣的呢蜀变?我們首先看sync()的代碼:
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed(); // 異步操作失敗拋出異常
return this;
}
可見,sync()和await()很類似介评,區(qū)別只是sync()調(diào)用库北,如果異步操作失敗,則會拋出異常们陆。我們接著看await()的實現(xiàn):
public Promise<V> await() throws InterruptedException {
// 異步操作已經(jīng)完成寒瓦,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死鎖檢測
checkDeadLock();
// 同步使修改waiters的線程只有一個
synchronized (this) {
while (!isDone()) { // 等待直到異步操作完成
incWaiters(); // ++waiters;
try {
wait(); // JDK方法
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}
其中的實現(xiàn)簡單明了,其他await()方法也類似坪仇,不再分析杂腰。我們注意其中的checkDeadLock()方法用來進行死鎖檢測:
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
也就是說,不能在同一個線程中調(diào)用await()相關(guān)的方法椅文。為了更好的理解這句話喂很,我們使用代碼注釋中的例子來解釋。Handler中的channelRead()方法是由Channel注冊到的eventLoop執(zhí)行的皆刺,其中的Future的Executor也是這個eventLoop少辣,所以不能在channelRead()方法中調(diào)用await這一類(包括sync)方法。
// 錯誤的例子
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.awaitUninterruptibly();
// ...
}
// 正確的做法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// ... 使用異步操作
}
});
}
到了這里芹橡,我們已經(jīng)分析完Future和Promise的主要實現(xiàn)毒坛。剩下的DefaultChannelPromise、VoidChannelPromise實現(xiàn)都很簡單林说,我們不再分析煎殷。ProgressivePromise表示異步的進度結(jié)果,也不再進行分析腿箩。