Netty 中的異步編程Future 和 promise

Netty 中大量 I/O 操作都是異步執(zhí)行瓶籽,本篇博文來聊聊 Netty 中的異步編程馍佑。

Java Future 提供的異步模型

JDK 5 引入了 Future 模式仓手。Future 接口是 Java 多線程 Future 模式的實現(xiàn)兼贸,在 java.util.concurrent包中晰绎,可以來進行異步計算寞奸。

對于異步編程呛谜,我們想要的實現(xiàn)是:提交一個任務,在任務執(zhí)行期間提交者可以做別的事情枪萄,這個任務是在異步執(zhí)行的隐岛,當任務執(zhí)行完畢通知提交者任務完成獲取結果。

那么在 Future 中是怎么實現(xiàn)的呢瓷翻?我們先看接口定義:

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

我們看一個示例:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        Future<Integer> submit = executorService.submit(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        Integer value = null;
        try {
            value = submit.get();
        } catch (Exception e) {
            e.printStackTrace();
        } 
        System.out.println(value);
        System.out.println("end");
        
    }
}

Futrue 的使用方式是:投遞一個任務到 Future 中執(zhí)行聚凹,操作完之后調用 Future#get() 或者 Future#isDone() 方法判斷是否執(zhí)行完畢。從這個邏輯上看齐帚, Future 提供的功能是:用戶線程需要主動輪詢 Future 線程是否完成當前任務妒牙,如果不通過輪詢是否完成而是同步等待獲取則會阻塞直到執(zhí)行完畢為止。所以從這里看对妄,F(xiàn)uture并不是真正的異步湘今,因為它少了一個回調,充其量只能算是一個同步非阻塞模式剪菱。

executorService.submit()方法獲取帶返回值的 Future 結果有兩種方式:

  1. 一種是通過實現(xiàn) Callable接口摩瞎;
  2. 第二種是中間變量返回。繼承 Future 的子類: FutureTask孝常,通過 FutureTask 返回異步結果而不是在主線程中獲绕烀恰(FutureTask 本質也是使用 Callable 進行創(chuàng)建)。

上面兩種方式的代碼就變?yōu)檫@樣:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        //方式1 通過 executorService 提交一個異步線程
        //Future<Integer> submit = executorService.submit(new NewCallableTask());

        //方式2 通過 FutureTask 包裝異步線程的返回构灸,返回結果在 FutureTask 中獲取而不是 在提交線程中
        FutureTask<Integer> task = new FutureTask<>(new NewCallableTask());
        executorService.submit(task);
        //-------------方式2--------------

        Integer value = null;
        try {
            value = task.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(value);
        System.out.println("end");

    }

    /**
     * 通過實現(xiàn) Callable 接口
     */
     static class NewCallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }
    }

}

一般在使用線程池創(chuàng)建線程執(zhí)行任務的時候會有兩種方式上渴,要么實現(xiàn) Runnable 接口,要么實現(xiàn) Callable 接口喜颁,它們的區(qū)別在于:

  1. Callable 可以在任務結束的時候提供一個返回值稠氮,Runnable 無法提供這個功能;
  2. Callable 的 call 方法分可以拋出異常洛巢,而 Runnable 的 run 方法不能拋出異常括袒。

而我們的異步返回自然是使用 Callable 方式。那么 Callable 是如何實現(xiàn)的呢稿茉?

從 Callable 被提交的地方入手:executorService.submit(task), ExecutorService 是一個接口锹锰,他的默認實現(xiàn)類是:AbstractExecutorService芥炭,我們看這里的 submit()實現(xiàn)方式:

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

可以看到將 Callable 又包裝成了 RunnableFuture。而這個 RunnableFuture 就比較神奇恃慧,它同時繼承了 Runnable 和 Future 园蝠,既有線程的能力又有可攜帶返回值的功能。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

所以再看 submit()方法痢士,其實是將 RunnableFuture 線程送入線程池執(zhí)行彪薛,執(zhí)行是一個新線程,只是這個執(zhí)行的對象提供了 get()方法來獲取執(zhí)行結果怠蹂。

那么 Callable 優(yōu)勢如何變?yōu)?RunnableFuture 的呢善延?我們看 newTaskFor(task)方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

將 Callable 包裝為 FutureTask 對象,看到這里又關聯(lián)到 FutureTask 城侧, :

public class FutureTask<V> implements RunnableFuture<V> {
  
}

可以看到 FutureTask 是 RunnableFuture 的子類易遣,這也就解釋了上面的示例為什么在線程池中可以提交 FutureTask 實例。

更詳細的執(zhí)行過程這里就不再分析嫌佑,重點剖析 Future 的實現(xiàn)過程豆茫,它并不是真正的異步,沒有實現(xiàn)回調屋摇。所以在Java8 中又新增了一個真正的異步函數(shù):CompletableFuture揩魂。

CompletableFuture 非阻塞異步編程模型

Java 8 中新增加了一個類:CompletableFuture,它提供了非常強大的 Future 的擴展功能炮温,最重要的是實現(xiàn)了回調的功能火脉。

使用示例:

public class CallableFutureTest {

        
    public static void main(String[] args) {
        System.out.println("start");
        /**
         * 異步非阻塞
         */
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("sleep done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("done");
    }
}

CompletableFuture.runAsync()方法提供了異步執(zhí)行無返回值任務的功能。

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // do something
    return "result";
}, executorService);

CompletableFuture.supplyAsync()方法提供了異步執(zhí)行有返回值任務的功能柒啤。

CompletableFuture源碼中有四個靜態(tài)方法用來執(zhí)行異步任務:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}

public static CompletableFuture<Void> runAsync(Runnable runnable){..}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}

前面兩個可以看到是帶返回值的方法忘分,后面兩個是不帶返回值的方法。同時支持傳入自定義的線程池白修,如果不傳入線程池的話默認是使用 ForkJoinPool.commonPool()作為它的線程池執(zhí)行異步代碼。

合并兩個異步任務

如果有兩個任務需要異步執(zhí)行重斑,且后面需要對這兩個任務的結果進行合并處理兵睛,CompletableFuture 也支持這種處理:

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
    return task1 + task2; // return "Task1Task2" String
});

通過 CompletableFuture.thenCombineAsync()方法獲取兩個任務的結果然后進行相應的操作。

下一個依賴上一個的結果

如果第二個任務依賴第一個任務的結果:

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
    return CompletableFuture.supplyAsync(() -> {
        return task1 + "Task2"; // return "Task1Task2" String
    });
}, executorService);

CompletableFuture.thenComposeAsync()支持將第一個任務的結果傳入第二個任務中窥浪。

常用 API 介紹

  1. 拿到上一個任務的結果做后續(xù)操作祖很,上一個任務完成后的動作
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面四個方法表示在當前階段任務完成之后下一步要做什么。whenComplete 表示在當前線程內繼續(xù)做下一步漾脂,帶 Async 后綴的表示使用新線程去執(zhí)行假颇。

  1. 拿到上一個任務的結果做后續(xù)操作,使用 handler 來處理邏輯骨稿,可以返回與第一階段處理的返回類型不一樣的返回類型笨鸡。

    public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
    

    Handler 與 whenComplete 的區(qū)別是 handler 是可以返回一個新的 CompletableFuture 類型的姜钳。

    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        return "hahaha";
    }).handle((r, e) -> {
        return 1;
    });
    
  2. 拿到上一個任務的結果做后續(xù)操作, thenApply方法

    public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    

    注意到 thenApply 方法的參數(shù)中是沒有 Throwable形耗,這就意味著如有有異常就會立即失敗哥桥,不能在處理邏輯內處理。且 thenApply 返回的也是新的 CompletableFuture激涤。 這就是它與前面兩個的區(qū)別拟糕。

  3. 拿到上一個任務的結果做后續(xù)操作,可以不返回任何值倦踢,thenAccept方法

    public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)
    

    看這里的示例:

    CompletableFuture.supplyAsync(() -> {
      return "result";
    }).thenAccept(r -> {
      System.out.println(r);
    }).thenAccept(r -> {
      System.out.println(r);
    });
    

    執(zhí)行完畢是不會返回任何值的送滞。

CompletableFuture 的特性提現(xiàn)在執(zhí)行完 runAsync 或者 supplyAsync 之后的操作上。CompletableFuture 能夠將回調放到與任務不同的線程中執(zhí)行辱挥,也能將回調作為繼續(xù)執(zhí)行的同步函數(shù)犁嗅,在與任務相同的線程中執(zhí)行。它避免了傳統(tǒng)回調最大的問題般贼,那就是能夠將控制流分離到不同的事件處理器中愧哟。

另外當你依賴 CompletableFuture 的計算結果才能進行下一步的時候,無需手動判斷當前計算是否完成哼蛆,可以通過 CompletableFuture 的事件監(jiān)聽自動去完成蕊梧。

Netty 中的異步編程

說 Netty 中的異步編程之前先說一個異步編程模型:Future/Promise異步模型。

future和promise起源于函數(shù)式編程和相關范例(如邏輯編程 )腮介,目的是將值(future)與其計算方式(promise)分離肥矢,從而允許更靈活地進行計算,特別是通過并行化叠洗。

Future 表示目標計算的返回值甘改,Promise 表示計算的方式,這個模型將返回結果和計算邏輯分離灭抑,目的是為了讓計算邏輯不影響返回結果十艾,從而抽象出一套異步編程模型。那計算邏輯如何與結果關聯(lián)呢腾节?它們之間的紐帶就是 callback忘嫉。

引用自:https://zh.wikipedia.org/wiki/Future%E4%B8%8Epromise

在 Netty 中的異步編程就是基于該模型來實現(xiàn)。Netty 中非常多的異步調用案腺,最簡單的例子就是我們 Server 和 Client 端啟動的例子:

Server:

1.png

Client:

2.png

Netty 中使用了一個 ChannelFuture 來實現(xiàn)異步操作庆冕,看似與 Java 中的 Future 相似,我們看一下代碼:

public interface ChannelFuture extends Future<Void> {
  
}

這里 ChannelFuture 繼承了一個 Future劈榨,這是 Java 中的 Future 嗎访递?跟下去發(fā)現(xiàn)并不是 JDK 的,而是 Netty 自己實現(xiàn)的同辣。該類位于:io.netty.util.concurrent包中:

public interface Future<V> extends java.util.concurrent.Future<V> {
  
  // 只有IO操作完成時才返回true
  boolean isSuccess();
  // 只有當cancel(boolean)成功取消時才返回true
  boolean isCancellable();
  // IO操作發(fā)生異常時拷姿,返回導致IO操作以此的原因默赂,如果沒有異常暇赤,返回null
  Throwable cause();
  // 向Future添加事件栅组,future完成時宴猾,會執(zhí)行這些事件,如果add時future已經完成抵乓,會立即執(zhí)行監(jiān)聽事件
  Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  // 移除監(jiān)聽事件伴挚,future完成時,不會觸發(fā)
  Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  // 等待future done
  Future<V> sync() throws InterruptedException;
  // 等待future done灾炭,不可打斷
  Future<V> syncUninterruptibly();
  // 等待future完成
  Future<V> await() throws InterruptedException;
  // 等待future 完成茎芋,不可打斷
  Future<V> awaitUninterruptibly();
  boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  boolean await(long timeoutMillis) throws InterruptedException;
  boolean awaitUninterruptibly(long timeout, TimeUnit unit);
  boolean awaitUninterruptibly(long timeoutMillis);
  // 立刻獲得結果,如果沒有完成蜈出,返回null
  V getNow();
  // 如果成功取消田弥,future會失敗,導致CancellationException
  @Override
  boolean cancel(boolean mayInterruptIfRunning);
  
}

Netty 自己實現(xiàn)的 Future 繼承了 JDK 的 Future铡原,新增了 sync()await() 用于阻塞等待偷厦,還加了 Listeners,只要任務結束去回調 Listener 就可以了燕刻,那么我們就不一定要主動調用 isDone()來獲取狀態(tài)只泼,或通過 get()阻塞方法來獲取值。

Netty的 Future 與 Java 的 Future 雖然類名相同卵洗,但功能上略有不同请唱,Netty 中引入了 Promise 機制。在 Java 的 Future 中过蹂,業(yè)務邏輯為一個 Callable 或 Runnable 實現(xiàn)類十绑,該類的 call()run()執(zhí)行完畢意味著業(yè)務邏輯的完結,在 Promise 機制中酷勺,可以在業(yè)務邏輯中人工設置業(yè)務邏輯的成功與失敗本橙,這樣更加方便的監(jiān)控自己的業(yè)務邏輯。

public interface Promise<V> extends Future<V> {
    // 設置future執(zhí)行結果為成功
    Promise<V> setSuccess(V result);
    
    // 嘗試設置future執(zhí)行結果為成功,返回是否設置成功
    boolean trySuccess(V result);
    // 設置失敗
    Promise<V> setFailure(Throwable cause);
    // 嘗試設置future執(zhí)行結果為失敗,返回是否設置成功 
    boolean tryFailure(Throwable cause);
    // 設置為不能取消
    boolean setUncancellable();
    
    // 源碼中脆诉,以下為覆蓋了Future的方法勋功,例如;
    
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}

Promise 接口繼承自 Future 接口库说,重點添加了上述幾個方法,可以人工設置 future 的執(zhí)行成功與失敗片择,并通知所有監(jiān)聽的 listener潜的。

從 Future 和 Promise 提供的方法來看,F(xiàn)uture 都是 get 類型的方法字管,主要用來判斷當前任務的狀態(tài)啰挪。而 Promise 中是 set 類型的方法信不,主要來對任務的狀態(tài)來進行操作。這里就體現(xiàn)出來將 結果和操作過程分離的設計亡呵。

Promise 實現(xiàn)類是DefaultPromise類抽活,該類十分重要,F(xiàn)uture 的 listener 機制也是由它實現(xiàn)的锰什,所以我們先來分析一下該類下硕。先來看一下它的重要屬性:

// 可以嵌套的Listener的最大層數(shù),可見最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
                                                           SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
// 異步操作不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
// 異步操作失敗時保存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
  new CancellationException(), DefaultPromise.class, "cancel(...)"));

第一個套 listener汁胆,是指在 listener 的 operationComplete() 方法中梭姓,可以再次使用 future.addListener() 繼續(xù)添加 listener,Netty 限制的最大層數(shù)是8嫩码,用戶可使用系統(tǒng)變量io.netty.defaultPromise.maxListenerStackDepth設置誉尖。

為了更好的說明,先寫了一個示例铸题,Netty 中的 Future/Promise模型是可以單獨拿出來使用的铡恕。


import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.TimeUnit;

/**
 * @author rickiyang
 * @date 2020-04-19
 * @Desc TODO
 */
public class PromiseTest {

    public static void main(String[] args) {
        PromiseTest testPromise = new PromiseTest();
        Promise<String> promise = testPromise.doSomething("哈哈");
        promise.addListener(future -> System.out.println(promise.get()+", something is done"));

    }

    /**
     * 創(chuàng)建一個DefaultPromise并返回,將業(yè)務邏輯放入線程池中執(zhí)行
     * @param value
     * @return
     */
    private Promise<String> doSomething(String value) {
        NioEventLoopGroup loop = new NioEventLoopGroup();
        DefaultPromise<String> promise = new DefaultPromise<>(loop.next());
        loop.schedule(() -> {
            try {
                Thread.sleep(1000);
                promise.setSuccess("執(zhí)行成功丢间。" + value);
                return promise;
            } catch (InterruptedException ignored) {
                promise.setFailure(ignored);
            }
            return promise;
        }, 0, TimeUnit.SECONDS);
        return promise;
    }

}

通過這個例子可以看到探熔,Promise 能夠在業(yè)務邏輯線程中通知 Future 成功或失敗,由于 Promise 繼承了 Netty 的 Future千劈,因此可以加入監(jiān)聽事件祭刚。而 Future 和 Promise 的好處在于,獲取到 Promise 對象后可以為其設置異步調用完成后的操作墙牌,然后立即繼續(xù)去做其他任務涡驮。

來看一下 addListener() 方法:

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
  checkNotNull(listener, "listener");
    //并發(fā)控制,保證多線程情況下只有一個線程執(zhí)行添加操作
  synchronized (this) {
    addListener0(listener);
  }
    // 操作完成喜滨,通知監(jiān)聽者
  if (isDone()) {
    notifyListeners();
  }

  return this;
}



private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
  if (listeners == null) {
    listeners = listener;
  } else if (listeners instanceof DefaultFutureListeners) {
    // 如果當前Promise實例持有l(wèi)isteners的是DefaultFutureListeners類型捉捅,則調用它的add()方法進行添加
    ((DefaultFutureListeners) listeners).add(listener);
  } else {
    // 步入這里說明當前Promise實例持有l(wèi)isteners為單個GenericFutureListener實例,需要轉換為DefaultFutureListeners實例
    listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
  }
}

這里看到有一個全局變量listeners虽风,我們看到他的定義:

private Object listeners;

為啥會是一個 Object 類型的對象呢棒口,不是應該是 List 或者是數(shù)組才對嘛。Netty之所以這樣設計辜膝,是因為大多數(shù)情況下 listener 只有一個无牵,用集合和數(shù)組都會造成浪費。當只有一個 listener 時厂抖,該字段為一個 GenericFutureListener 對象茎毁;當多于一個 listener 時,該字段為 DefaultFutureListeners ,可以儲存多個 listener七蜘。

我們再來看 notifyListeners() 方法:

private void notifyListeners() {
  EventExecutor executor = executor();
  //當前EventLoop線程需要檢查listener嵌套
  if (executor.inEventLoop()) {
    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    //這里是當前l(fā)istener的嵌套層數(shù)
    final int stackDepth = threadLocals.futureListenerStackDepth();
    if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
      threadLocals.setFutureListenerStackDepth(stackDepth + 1);
      try {
        notifyListenersNow();
      } finally {
        threadLocals.setFutureListenerStackDepth(stackDepth);
      }
      return;
    }
  }
    //外部線程直接提交給新線程執(zhí)行
  safeExecute(executor, new Runnable() {
    @Override
    public void run() {
      notifyListenersNow();
    }
  });
}

這里有個疑問就是為什么要設置當前的調用棧深度+1谭溉。

接著看真正執(zhí)行通知的方法:

private void notifyListenersNow() {
  Object listeners;
  synchronized (this) {
     // 正在通知或已沒有監(jiān)聽者(外部線程刪除)直接返回
    if (notifyingListeners || this.listeners == null) {
      return;
    }
    notifyingListeners = true;
    listeners = this.listeners;
    this.listeners = null;
  }
  for (;;) {
    //只有一個listener
    if (listeners instanceof DefaultFutureListeners) {
      notifyListeners0((DefaultFutureListeners) listeners);
    } else {
      //有多個listener
      notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
    }
    synchronized (this) {
      if (this.listeners == null) {
        // 執(zhí)行完畢且外部線程沒有再添加監(jiān)聽者
        notifyingListeners = false;
        return;
      }
      //外部線程添加了新的監(jiān)聽者繼續(xù)執(zhí)行
      listeners = this.listeners;
      this.listeners = null;
    }
  }
}

Netty 中 DefalutPromise 是一個非常常用的類,這是 Promise 實現(xiàn)的基礎橡卤。DefaultChannelPromise DefalutPromise 的子類扮念,加入了 channel 這個屬性。

Promise 目前支持兩種類型的監(jiān)聽器:

  • GenericFutureListener:支持泛型的 Future 碧库;
  • GenericProgressiveFutureListener:它是GenericFutureListener的子類柜与,支持進度表示和支持泛型的Future 監(jiān)聽器(有些場景需要多個步驟實現(xiàn),類似于進度條那樣)谈为。

為了讓 Promise 支持多個監(jiān)聽器旅挤,Netty 添加了一個默認修飾符修飾的DefaultFutureListeners類用于保存監(jiān)聽器實例數(shù)組:

final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners
    
    // 這個構造相對特別,是為了讓Promise中的listeners(Object類型)實例由單個GenericFutureListener實例轉換為DefaultFutureListeners類型
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // 注意這里伞鲫,每次擴容數(shù)組長度是原來的2倍
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        // 把當前的GenericFutureListener加入數(shù)組中
        listeners[size] = l;
        // 監(jiān)聽器總數(shù)量加1
        this.size = size + 1;
        // 如果為GenericProgressiveFutureListener粘茄,則帶進度指示的監(jiān)聽器總數(shù)量加1
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                // 計算需要需要移動的監(jiān)聽器的下標
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    // listenersToMove后面的元素全部移動到數(shù)組的前端
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                // 當前監(jiān)聽器總量的最后一個位置設置為null,數(shù)量減1
                listeners[-- size] = null;
                this.size = size;
                // 如果監(jiān)聽器是GenericProgressiveFutureListener秕脓,則帶進度指示的監(jiān)聽器總數(shù)量減1
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }
    
    // 返回監(jiān)聽器實例數(shù)組
    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }
    
    // 返回監(jiān)聽器總數(shù)量
    public int size() {
        return size;
    }
    
    // 返回帶進度指示的監(jiān)聽器總數(shù)量
    public int progressiveSize() {
        return progressiveSize;
    }
}

以上就是關于 Promise 和監(jiān)聽器相關的實現(xiàn)分析柒瓣,再回到之前的啟動類,是不是還有一個 sync() 方法:

@Override
public Promise<V> sync() throws InterruptedException {
  await();
  rethrowIfFailed();
  return this;
}


public Promise<V> await() throws InterruptedException {
  // 異步操作已經完成吠架,直接返回
  if (isDone()) {
    return this;    
  }
  if (Thread.interrupted()) {
    throw new InterruptedException(toString());
  }
  // 死鎖檢測
  checkDeadLock();
  // 同步使修改waiters的線程只有一個
  synchronized (this) {
    while (!isDone()) { // 等待直到異步操作完成
      incWaiters();   // ++waiters;
      try {
        wait(); // JDK方法
      } finally {
        decWaiters(); // --waiters
      }
    }
  }
  return this;
}

這里其實就是一個同步檢測當前事件是否完成的過程芙贫。

以上就是 Netty 中實現(xiàn)的 Future/Promise 異步回調機制。實現(xiàn)并不是很難懂傍药,代碼很值得學習磺平。除了 Netty 中實現(xiàn)了 Future/Promise模型,在Guava中也有相關的實現(xiàn)拐辽,大家日常使用可以看習慣引用相關的包拣挪。

Guava實現(xiàn):

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>21.0</version>
</dependency>
  
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
Futures.addCallback(future, new FutureCallback<Integer>() {
    public void onSuccess(Integer result) {
        System.out.println("success:" + result);
    }

    public void onFailure(Throwable throwable) {
        System.out.println("fail, e = " + throwable);
    }
});

Thread.currentThread().join();
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市俱诸,隨后出現(xiàn)的幾起案子菠劝,更是在濱河造成了極大的恐慌,老刑警劉巖睁搭,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赶诊,死亡現(xiàn)場離奇詭異,居然都是意外死亡园骆,警方通過查閱死者的電腦和手機舔痪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來锌唾,“玉大人锄码,你說我怎么就攤上這事。” “怎么了巍耗?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長渐排。 經常有香客問我炬太,道長,這世上最難降的妖魔是什么驯耻? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任亲族,我火速辦了婚禮,結果婚禮上可缚,老公的妹妹穿的比我還像新娘霎迫。我一直安慰自己,他們只是感情好帘靡,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布知给。 她就那樣靜靜地躺著,像睡著了一般描姚。 火紅的嫁衣襯著肌膚如雪涩赢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天轩勘,我揣著相機與錄音筒扒,去河邊找鬼。 笑死绊寻,一個胖子當著我的面吹牛花墩,可吹牛的內容都是我干的。 我是一名探鬼主播澄步,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼冰蘑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了驮俗?” 一聲冷哼從身側響起懂缕,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎王凑,沒想到半個月后搪柑,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡索烹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年工碾,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片百姓。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡渊额,死狀恐怖,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情旬迹,我是刑警寧澤火惊,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站奔垦,受9級特大地震影響屹耐,放射性物質發(fā)生泄漏。R本人自食惡果不足惜椿猎,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一惶岭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧犯眠,春花似錦按灶、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至嗜浮,卻和暖如春羡亩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背危融。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工畏铆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吉殃。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓辞居,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蛋勺。 傳聞我的和親對象是個殘疾皇子瓦灶,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355