Netty 中大量 I/O 操作都是異步執(zhí)行瓶籽,本篇博文來聊聊 Netty 中的異步編程馍佑。
Java Future 提供的異步模型
JDK 5 引入了 Future 模式仓手。Future 接口是 Java 多線程 Future 模式的實現(xiàn)兼贸,在 java.util.concurrent
包中晰绎,可以來進行異步計算寞奸。
對于異步編程呛谜,我們想要的實現(xiàn)是:提交一個任務,在任務執(zhí)行期間提交者可以做別的事情枪萄,這個任務是在異步執(zhí)行的隐岛,當任務執(zhí)行完畢通知提交者任務完成獲取結果。
那么在 Future 中是怎么實現(xiàn)的呢瓷翻?我們先看接口定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
我們看一個示例:
public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
Future<Integer> submit = executorService.submit(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
Integer value = null;
try {
value = submit.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(value);
System.out.println("end");
}
}
Futrue 的使用方式是:投遞一個任務到 Future 中執(zhí)行聚凹,操作完之后調用 Future#get()
或者 Future#isDone()
方法判斷是否執(zhí)行完畢。從這個邏輯上看齐帚, Future 提供的功能是:用戶線程需要主動輪詢 Future 線程是否完成當前任務妒牙,如果不通過輪詢是否完成而是同步等待獲取則會阻塞直到執(zhí)行完畢為止。所以從這里看对妄,F(xiàn)uture并不是真正的異步湘今,因為它少了一個回調,充其量只能算是一個同步非阻塞模式剪菱。
executorService.submit()
方法獲取帶返回值的 Future 結果有兩種方式:
- 一種是通過實現(xiàn)
Callable
接口摩瞎; - 第二種是中間變量返回。繼承 Future 的子類: FutureTask孝常,通過 FutureTask 返回異步結果而不是在主線程中獲绕烀恰(FutureTask 本質也是使用
Callable
進行創(chuàng)建)。
上面兩種方式的代碼就變?yōu)檫@樣:
public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
//方式1 通過 executorService 提交一個異步線程
//Future<Integer> submit = executorService.submit(new NewCallableTask());
//方式2 通過 FutureTask 包裝異步線程的返回构灸,返回結果在 FutureTask 中獲取而不是 在提交線程中
FutureTask<Integer> task = new FutureTask<>(new NewCallableTask());
executorService.submit(task);
//-------------方式2--------------
Integer value = null;
try {
value = task.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(value);
System.out.println("end");
}
/**
* 通過實現(xiàn) Callable 接口
*/
static class NewCallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
}
}
一般在使用線程池創(chuàng)建線程執(zhí)行任務的時候會有兩種方式上渴,要么實現(xiàn) Runnable 接口,要么實現(xiàn) Callable 接口喜颁,它們的區(qū)別在于:
- Callable 可以在任務結束的時候提供一個返回值稠氮,Runnable 無法提供這個功能;
- Callable 的 call 方法分可以拋出異常洛巢,而 Runnable 的 run 方法不能拋出異常括袒。
而我們的異步返回自然是使用 Callable 方式。那么 Callable 是如何實現(xiàn)的呢稿茉?
從 Callable 被提交的地方入手:executorService.submit(task)
, ExecutorService 是一個接口锹锰,他的默認實現(xiàn)類是:AbstractExecutorService芥炭,我們看這里的 submit()
實現(xiàn)方式:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到將 Callable 又包裝成了 RunnableFuture。而這個 RunnableFuture
就比較神奇恃慧,它同時繼承了 Runnable 和 Future 园蝠,既有線程的能力又有可攜帶返回值的功能。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
所以再看 submit()
方法痢士,其實是將 RunnableFuture 線程送入線程池執(zhí)行彪薛,執(zhí)行是一個新線程,只是這個執(zhí)行的對象提供了 get()
方法來獲取執(zhí)行結果怠蹂。
那么 Callable 優(yōu)勢如何變?yōu)?RunnableFuture 的呢善延?我們看 newTaskFor(task)
方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
將 Callable 包裝為 FutureTask 對象,看到這里又關聯(lián)到 FutureTask 城侧, :
public class FutureTask<V> implements RunnableFuture<V> {
}
可以看到 FutureTask 是 RunnableFuture 的子類易遣,這也就解釋了上面的示例為什么在線程池中可以提交 FutureTask 實例。
更詳細的執(zhí)行過程這里就不再分析嫌佑,重點剖析 Future 的實現(xiàn)過程豆茫,它并不是真正的異步,沒有實現(xiàn)回調屋摇。所以在Java8 中又新增了一個真正的異步函數(shù):CompletableFuture揩魂。
CompletableFuture 非阻塞異步編程模型
Java 8 中新增加了一個類:CompletableFuture,它提供了非常強大的 Future 的擴展功能炮温,最重要的是實現(xiàn)了回調的功能火脉。
使用示例:
public class CallableFutureTest {
public static void main(String[] args) {
System.out.println("start");
/**
* 異步非阻塞
*/
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done");
}
}
CompletableFuture.runAsync()
方法提供了異步執(zhí)行無返回值任務的功能。
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// do something
return "result";
}, executorService);
CompletableFuture.supplyAsync()
方法提供了異步執(zhí)行有返回值任務的功能柒啤。
CompletableFuture源碼中有四個靜態(tài)方法用來執(zhí)行異步任務:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}
前面兩個可以看到是帶返回值的方法忘分,后面兩個是不帶返回值的方法。同時支持傳入自定義的線程池白修,如果不傳入線程池的話默認是使用 ForkJoinPool.commonPool()
作為它的線程池執(zhí)行異步代碼。
合并兩個異步任務
如果有兩個任務需要異步執(zhí)行重斑,且后面需要對這兩個任務的結果進行合并處理兵睛,CompletableFuture 也支持這種處理:
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
return task1 + task2; // return "Task1Task2" String
});
通過 CompletableFuture.thenCombineAsync()
方法獲取兩個任務的結果然后進行相應的操作。
下一個依賴上一個的結果
如果第二個任務依賴第一個任務的結果:
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
return CompletableFuture.supplyAsync(() -> {
return task1 + "Task2"; // return "Task1Task2" String
});
}, executorService);
CompletableFuture.thenComposeAsync()
支持將第一個任務的結果傳入第二個任務中窥浪。
常用 API 介紹
- 拿到上一個任務的結果做后續(xù)操作祖很,上一個任務完成后的動作
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面四個方法表示在當前階段任務完成之后下一步要做什么。whenComplete 表示在當前線程內繼續(xù)做下一步漾脂,帶 Async 后綴的表示使用新線程去執(zhí)行假颇。
-
拿到上一個任務的結果做后續(xù)操作,使用 handler 來處理邏輯骨稿,可以返回與第一階段處理的返回類型不一樣的返回類型笨鸡。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
Handler 與 whenComplete 的區(qū)別是 handler 是可以返回一個新的 CompletableFuture 類型的姜钳。
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { return "hahaha"; }).handle((r, e) -> { return 1; });
-
拿到上一個任務的結果做后續(xù)操作, thenApply方法
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
注意到 thenApply 方法的參數(shù)中是沒有 Throwable形耗,這就意味著如有有異常就會立即失敗哥桥,不能在處理邏輯內處理。且 thenApply 返回的也是新的 CompletableFuture激涤。 這就是它與前面兩個的區(qū)別拟糕。
-
拿到上一個任務的結果做后續(xù)操作,可以不返回任何值倦踢,thenAccept方法
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
看這里的示例:
CompletableFuture.supplyAsync(() -> { return "result"; }).thenAccept(r -> { System.out.println(r); }).thenAccept(r -> { System.out.println(r); });
執(zhí)行完畢是不會返回任何值的送滞。
CompletableFuture 的特性提現(xiàn)在執(zhí)行完 runAsync 或者 supplyAsync 之后的操作上。CompletableFuture 能夠將回調放到與任務不同的線程中執(zhí)行辱挥,也能將回調作為繼續(xù)執(zhí)行的同步函數(shù)犁嗅,在與任務相同的線程中執(zhí)行。它避免了傳統(tǒng)回調最大的問題般贼,那就是能夠將控制流分離到不同的事件處理器中愧哟。
另外當你依賴 CompletableFuture 的計算結果才能進行下一步的時候,無需手動判斷當前計算是否完成哼蛆,可以通過 CompletableFuture 的事件監(jiān)聽自動去完成蕊梧。
Netty 中的異步編程
說 Netty 中的異步編程之前先說一個異步編程模型:Future/Promise異步模型。
future和promise起源于函數(shù)式編程和相關范例(如邏輯編程 )腮介,目的是將值(future)與其計算方式(promise)分離肥矢,從而允許更靈活地進行計算,特別是通過并行化叠洗。
Future 表示目標計算的返回值甘改,Promise 表示計算的方式,這個模型將返回結果和計算邏輯分離灭抑,目的是為了讓計算邏輯不影響返回結果十艾,從而抽象出一套異步編程模型。那計算邏輯如何與結果關聯(lián)呢腾节?它們之間的紐帶就是 callback忘嫉。
在 Netty 中的異步編程就是基于該模型來實現(xiàn)。Netty 中非常多的異步調用案腺,最簡單的例子就是我們 Server 和 Client 端啟動的例子:
Server:
Client:
Netty 中使用了一個 ChannelFuture 來實現(xiàn)異步操作庆冕,看似與 Java 中的 Future 相似,我們看一下代碼:
public interface ChannelFuture extends Future<Void> {
}
這里 ChannelFuture 繼承了一個 Future劈榨,這是 Java 中的 Future 嗎访递?跟下去發(fā)現(xiàn)并不是 JDK 的,而是 Netty 自己實現(xiàn)的同辣。該類位于:io.netty.util.concurrent
包中:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 只有IO操作完成時才返回true
boolean isSuccess();
// 只有當cancel(boolean)成功取消時才返回true
boolean isCancellable();
// IO操作發(fā)生異常時拷姿,返回導致IO操作以此的原因默赂,如果沒有異常暇赤,返回null
Throwable cause();
// 向Future添加事件栅组,future完成時宴猾,會執(zhí)行這些事件,如果add時future已經完成抵乓,會立即執(zhí)行監(jiān)聽事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除監(jiān)聽事件伴挚,future完成時,不會觸發(fā)
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待future done
Future<V> sync() throws InterruptedException;
// 等待future done灾炭,不可打斷
Future<V> syncUninterruptibly();
// 等待future完成
Future<V> await() throws InterruptedException;
// 等待future 完成茎芋,不可打斷
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 立刻獲得結果,如果沒有完成蜈出,返回null
V getNow();
// 如果成功取消田弥,future會失敗,導致CancellationException
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
Netty 自己實現(xiàn)的 Future 繼承了 JDK 的 Future铡原,新增了 sync()
和await()
用于阻塞等待偷厦,還加了 Listeners,只要任務結束去回調 Listener 就可以了燕刻,那么我們就不一定要主動調用 isDone()
來獲取狀態(tài)只泼,或通過 get()
阻塞方法來獲取值。
Netty的 Future 與 Java 的 Future 雖然類名相同卵洗,但功能上略有不同请唱,Netty 中引入了 Promise 機制。在 Java 的 Future 中过蹂,業(yè)務邏輯為一個 Callable 或 Runnable 實現(xiàn)類十绑,該類的 call()
或 run()
執(zhí)行完畢意味著業(yè)務邏輯的完結,在 Promise 機制中酷勺,可以在業(yè)務邏輯中人工設置業(yè)務邏輯的成功與失敗本橙,這樣更加方便的監(jiān)控自己的業(yè)務邏輯。
public interface Promise<V> extends Future<V> {
// 設置future執(zhí)行結果為成功
Promise<V> setSuccess(V result);
// 嘗試設置future執(zhí)行結果為成功,返回是否設置成功
boolean trySuccess(V result);
// 設置失敗
Promise<V> setFailure(Throwable cause);
// 嘗試設置future執(zhí)行結果為失敗,返回是否設置成功
boolean tryFailure(Throwable cause);
// 設置為不能取消
boolean setUncancellable();
// 源碼中脆诉,以下為覆蓋了Future的方法勋功,例如;
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
Promise 接口繼承自 Future 接口库说,重點添加了上述幾個方法,可以人工設置 future 的執(zhí)行成功與失敗片择,并通知所有監(jiān)聽的 listener潜的。
從 Future 和 Promise 提供的方法來看,F(xiàn)uture 都是 get 類型的方法字管,主要用來判斷當前任務的狀態(tài)啰挪。而 Promise 中是 set 類型的方法信不,主要來對任務的狀態(tài)來進行操作。這里就體現(xiàn)出來將 結果和操作過程分離的設計亡呵。
Promise 實現(xiàn)類是DefaultPromise類抽活,該類十分重要,F(xiàn)uture 的 listener 機制也是由它實現(xiàn)的锰什,所以我們先來分析一下該類下硕。先來看一下它的重要屬性:
// 可以嵌套的Listener的最大層數(shù),可見最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
// 異步操作不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
// 異步操作失敗時保存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
第一個套 listener汁胆,是指在 listener 的 operationComplete()
方法中梭姓,可以再次使用 future.addListener()
繼續(xù)添加 listener,Netty 限制的最大層數(shù)是8嫩码,用戶可使用系統(tǒng)變量io.netty.defaultPromise.maxListenerStackDepth
設置誉尖。
為了更好的說明,先寫了一個示例铸题,Netty 中的 Future/Promise模型是可以單獨拿出來使用的铡恕。
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
/**
* @author rickiyang
* @date 2020-04-19
* @Desc TODO
*/
public class PromiseTest {
public static void main(String[] args) {
PromiseTest testPromise = new PromiseTest();
Promise<String> promise = testPromise.doSomething("哈哈");
promise.addListener(future -> System.out.println(promise.get()+", something is done"));
}
/**
* 創(chuàng)建一個DefaultPromise并返回,將業(yè)務邏輯放入線程池中執(zhí)行
* @param value
* @return
*/
private Promise<String> doSomething(String value) {
NioEventLoopGroup loop = new NioEventLoopGroup();
DefaultPromise<String> promise = new DefaultPromise<>(loop.next());
loop.schedule(() -> {
try {
Thread.sleep(1000);
promise.setSuccess("執(zhí)行成功丢间。" + value);
return promise;
} catch (InterruptedException ignored) {
promise.setFailure(ignored);
}
return promise;
}, 0, TimeUnit.SECONDS);
return promise;
}
}
通過這個例子可以看到探熔,Promise 能夠在業(yè)務邏輯線程中通知 Future 成功或失敗,由于 Promise 繼承了 Netty 的 Future千劈,因此可以加入監(jiān)聽事件祭刚。而 Future 和 Promise 的好處在于,獲取到 Promise 對象后可以為其設置異步調用完成后的操作墙牌,然后立即繼續(xù)去做其他任務涡驮。
來看一下 addListener() 方法:
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//并發(fā)控制,保證多線程情況下只有一個線程執(zhí)行添加操作
synchronized (this) {
addListener0(listener);
}
// 操作完成喜滨,通知監(jiān)聽者
if (isDone()) {
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 如果當前Promise實例持有l(wèi)isteners的是DefaultFutureListeners類型捉捅,則調用它的add()方法進行添加
((DefaultFutureListeners) listeners).add(listener);
} else {
// 步入這里說明當前Promise實例持有l(wèi)isteners為單個GenericFutureListener實例,需要轉換為DefaultFutureListeners實例
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
這里看到有一個全局變量listeners
虽风,我們看到他的定義:
private Object listeners;
為啥會是一個 Object 類型的對象呢棒口,不是應該是 List 或者是數(shù)組才對嘛。Netty之所以這樣設計辜膝,是因為大多數(shù)情況下 listener 只有一個无牵,用集合和數(shù)組都會造成浪費。當只有一個 listener 時厂抖,該字段為一個 GenericFutureListener 對象茎毁;當多于一個 listener 時,該字段為 DefaultFutureListeners ,可以儲存多個 listener七蜘。
我們再來看 notifyListeners()
方法:
private void notifyListeners() {
EventExecutor executor = executor();
//當前EventLoop線程需要檢查listener嵌套
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
//這里是當前l(fā)istener的嵌套層數(shù)
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//外部線程直接提交給新線程執(zhí)行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
這里有個疑問就是為什么要設置當前的調用棧深度+1谭溉。
接著看真正執(zhí)行通知的方法:
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// 正在通知或已沒有監(jiān)聽者(外部線程刪除)直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
//只有一個listener
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
//有多個listener
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 執(zhí)行完畢且外部線程沒有再添加監(jiān)聽者
notifyingListeners = false;
return;
}
//外部線程添加了新的監(jiān)聽者繼續(xù)執(zhí)行
listeners = this.listeners;
this.listeners = null;
}
}
}
Netty 中 DefalutPromise 是一個非常常用的類,這是 Promise 實現(xiàn)的基礎橡卤。DefaultChannelPromise DefalutPromise 的子類扮念,加入了 channel 這個屬性。
Promise 目前支持兩種類型的監(jiān)聽器:
- GenericFutureListener:支持泛型的 Future 碧库;
- GenericProgressiveFutureListener:它是
GenericFutureListener
的子類柜与,支持進度表示和支持泛型的Future 監(jiān)聽器(有些場景需要多個步驟實現(xiàn),類似于進度條那樣)谈为。
為了讓 Promise 支持多個監(jiān)聽器旅挤,Netty 添加了一個默認修飾符修飾的DefaultFutureListeners
類用于保存監(jiān)聽器實例數(shù)組:
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
// 這個構造相對特別,是為了讓Promise中的listeners(Object類型)實例由單個GenericFutureListener實例轉換為DefaultFutureListeners類型
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
// 注意這里伞鲫,每次擴容數(shù)組長度是原來的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
// 把當前的GenericFutureListener加入數(shù)組中
listeners[size] = l;
// 監(jiān)聽器總數(shù)量加1
this.size = size + 1;
// 如果為GenericProgressiveFutureListener粘茄,則帶進度指示的監(jiān)聽器總數(shù)量加1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
// 計算需要需要移動的監(jiān)聽器的下標
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
// listenersToMove后面的元素全部移動到數(shù)組的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 當前監(jiān)聽器總量的最后一個位置設置為null,數(shù)量減1
listeners[-- size] = null;
this.size = size;
// 如果監(jiān)聽器是GenericProgressiveFutureListener秕脓,則帶進度指示的監(jiān)聽器總數(shù)量減1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
// 返回監(jiān)聽器實例數(shù)組
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
// 返回監(jiān)聽器總數(shù)量
public int size() {
return size;
}
// 返回帶進度指示的監(jiān)聽器總數(shù)量
public int progressiveSize() {
return progressiveSize;
}
}
以上就是關于 Promise 和監(jiān)聽器相關的實現(xiàn)分析柒瓣,再回到之前的啟動類,是不是還有一個 sync() 方法:
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
// 異步操作已經完成吠架,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死鎖檢測
checkDeadLock();
// 同步使修改waiters的線程只有一個
synchronized (this) {
while (!isDone()) { // 等待直到異步操作完成
incWaiters(); // ++waiters;
try {
wait(); // JDK方法
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}
這里其實就是一個同步檢測當前事件是否完成的過程芙贫。
以上就是 Netty 中實現(xiàn)的 Future/Promise 異步回調機制。實現(xiàn)并不是很難懂傍药,代碼很值得學習磺平。除了 Netty 中實現(xiàn)了 Future/Promise模型,在Guava中也有相關的實現(xiàn)拐辽,大家日常使用可以看習慣引用相關的包拣挪。
Guava實現(xiàn):
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return 100;
}
});
Futures.addCallback(future, new FutureCallback<Integer>() {
public void onSuccess(Integer result) {
System.out.println("success:" + result);
}
public void onFailure(Throwable throwable) {
System.out.println("fail, e = " + throwable);
}
});
Thread.currentThread().join();