異步回調(diào)詳解

簡介

隨著移動互聯(lián)網(wǎng)的蓬勃發(fā)展欠啤,業(yè)務架構(gòu)也隨之變得錯綜復雜刀诬,業(yè)務系統(tǒng)越來越多咽扇。通常,我們處理方法是異步去調(diào)取這些接口陕壹。隨著高并發(fā)系統(tǒng)越來越多质欲,異步回調(diào)模式也越來越重要。

問題就來了糠馆,如何獲取處理異步調(diào)用的結(jié)果呢 嘶伟?讓我們一起來探討一下吧~~

Java Future的異步回調(diào)

Callable接口

在聊Callable接口之前,先提一下Runnable接口又碌。Runnable接口是在Java多線程中表示線程的業(yè)務代碼的抽象接口奋早。但是Runnable沒有返回值,為了解決這個問題赠橙,Java定義了一個和Runnable類似的接口 --- Callable接口耽装。并將業(yè)務處理方法名為call

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable接口是一個范型接口,也聲明為了函數(shù)式接口期揪。唯一的抽象方法call有返回值掉奄,返回值類型為范型形參的實際類型

初探FutureTask類

故名思意,F(xiàn)utureTask類代表一個未來執(zhí)行的任務,表示新線程執(zhí)行的操作姓建。同時也位于java.util.concurrent包中诞仓。源碼如下:

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
}

FutureTask類就像一座搭在Callable實例與Thread線程實例之間的橋。FutureTask內(nèi)部封裝了一個Callable實例速兔,然后自身又作為Thread線程的target墅拭。

Future接口

Future接口并不復雜,主要是對并發(fā)任務的執(zhí)行及獲取其結(jié)果的一些操作涣狗。主要有三大功能谍婉。

  • 判斷并發(fā)任務是否執(zhí)行完。
  • 獲取并發(fā)的任務完成后的結(jié)果
  • 取消并發(fā)執(zhí)行的任務
package java.util.concurrent;
public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled();
  boolean isDone();
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

詳細說法如下:

  • V get():獲取并發(fā)任務執(zhí)行的結(jié)果镀钓。這個方法是阻塞的穗熬,如果并發(fā)任務沒有執(zhí)行完成調(diào)用此方法的線程會一直阻塞
  • V get(Long timeout, TimeUtil unit):獲取并發(fā)任務執(zhí)行的結(jié)果。也是阻塞的丁溅,但是有阻塞的時間限制唤蔗,如果阻塞時間超過設定的時間,該方法將會拋出異常
  • boolean isDone():獲取并發(fā)任務的狀態(tài)是否結(jié)束
  • boolean isCancelled():獲取并發(fā)任務的取消狀態(tài)窟赏。如果任務被取消返回true
  • boolean cancel(boolean mayInterruptIfRunning):取消并發(fā)任務的執(zhí)行

再探FutureTask類

在FutureTask類中妓柜,有一個Callable的私有成員,F(xiàn)uturetask內(nèi)部有一個run方法涯穷。這個run方法是Runable接口的抽象方法领虹,在FutureTask類的內(nèi)部提供了自己的實現(xiàn)。在Thread線程實例執(zhí)行時求豫,會將這個run方法作為target目標去異步執(zhí)行塌衰。在FutureTask內(nèi)部的run方法中實際是會執(zhí)行Callable的call方法。

執(zhí)行完后結(jié)果會保存在私有成員-- outcome屬性中

private Object outcome; // non-volatile, protected by state reads/writes

outcome負責保存結(jié)果蝠嘉。然后FutureTask通過get方法獲取這個object的值最疆,那這個FutureTask的任務也就能成功完成了。

public void run() {
  if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                   null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);
      }
      if (ran)
        // 將result保存到outcome中
        set(result);
    }
  } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
      handlePossibleCancellationInterrupt(s);
  }
}

實例

package com.zou;

import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;


public class futureTask {
    public static void main(String[] args) throws Exception {
        FutureTask<Boolean> TaskA = new FutureTask<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("A任務準備好了??");
            } catch (Exception e) {
                System.out.println("A任務出問題了");
                return false;
            }
            System.out.println("A任務運行結(jié)束");
            return true;
        });

        FutureTask<Boolean> TaskB = new FutureTask<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(4);
                System.out.println("B任務準備好了??");
            } catch (Exception e) {
                System.out.println("B任務出問題了");
                return false;
            }
            System.out.println("B任務運行結(jié)束");
            return true;
        });

        Thread threadA = new Thread(TaskA);
        Thread threadB = new Thread(TaskB);

        threadA.start();
        threadB.start();

        Thread.currentThread().setName("主線程");
        try {
            boolean a = TaskA.get();
            boolean b = TaskB.get();
            isReady(a, b);
        } catch (Exception e) {
            System.out.println("發(fā)生了中斷");
        }
        System.out.println("運行結(jié)束");
    }
    public static void isReady(boolean a, boolean b) {
        if (a && b) {
            System.out.println("都準備好了");
        } else if (!a) {
            System.out.println("A沒準備好");
        } else {
            System.out.println("B沒準備好");
        }
    }
}

運行結(jié)果為:

A任務準備好了??
A任務運行結(jié)束
B任務準備好了??
B任務運行結(jié)束
都準備好了
運行結(jié)束

要是將上面的代碼跑一下會發(fā)現(xiàn)蚤告,這里的FutureTask類的get方法努酸,異步獲取結(jié)果的同時,主線程是阻塞的杜恰。所以可以將其歸為異步阻塞模式获诈。

異步阻塞的效率往往是比較低的,被阻塞的主線程不能干任何事心褐。并沒有實現(xiàn)非阻塞的異步結(jié)果獲取方法舔涎。如果需要用到獲取異步結(jié)果,則需要引入一些框架逗爹,先介紹一下Google的Guava框架

Guava的異步回調(diào)

Guava是Google提供的Java擴展包亡嫌,提供一種異步回調(diào)的解決方案。相關(guān)的源碼在com.google.common.util.concurrent包中。包中很多類挟冠,都是對java.util.concurrent能力的擴展和增強于购。比如Guava的異步任務接口ListenableFuture, 實現(xiàn)了非阻塞獲取異步結(jié)果的功能。

對于異步回調(diào)知染,Guava主要做了以下增強:

  • 引入一個新的接口ListenableFuture肋僧,繼承了Java的Future接口,使得Java的Future異步任務控淡,在Guava中能被監(jiān)控和獲得非阻塞異步執(zhí)行的結(jié)果嫌吠。
  • 引入一個新的接口FutureCallback,這是一個獨立的新接口逸寓。該接口的目的是在異步任務執(zhí)行完成后,根據(jù)異步結(jié)果覆山,完成不同的回調(diào)處理竹伸,并且可以處理回調(diào)結(jié)果

詳解FutureCallback

FutureCallback是一個新增的接口簇宽,用來填寫異步任務執(zhí)行完后的監(jiān)聽邏輯勋篓。有兩個回調(diào)方法:

  • onSuccess方法,在異步任務執(zhí)行成功后回調(diào)魏割;調(diào)用時譬嚣,異步任務的執(zhí)行結(jié)果作為onSuccess方法的參數(shù)傳入
  • onFailure方法,在異步任務執(zhí)行過程中钞它,拋出異常時被回調(diào)拜银;調(diào)用時異步任務所拋出的異常,作為onFailure方法的參數(shù)傳入遭垛。

FutureCallback的源代碼如下:

public interface FutureCallback<V> {
  /** Invoked with the result of the {@code Future} computation when it is successful. */
  void onSuccess(@Nullable V result);

  /**
   * Invoked when a {@code Future} computation fails or is canceled.
   *
   * <p>If the future's {@link Future#get() get} method throws an {@link ExecutionException}, then
   * the cause is passed to this method. Any other thrown object is passed unaltered.
   */
  void onFailure(Throwable t);
}

注意尼桶,Guava的FutureCallable與Java的Callable,名字相近锯仪,但實質(zhì)不同泵督,存在本質(zhì)的區(qū)別:

  1. Java的Callable接口,代表的是一部執(zhí)行的邏輯
  2. Guava的FutureCallback接口庶喜,代表的是Callable異步邏輯執(zhí)行完之后小腊,根據(jù)成功或者失敗的兩種情況的善后工作

那么問題來了,Guava如何實現(xiàn)異步任務Callable和FutureCallable結(jié)果回調(diào)之間的監(jiān)控關(guān)系呢久窟?Guava引入了一個新接口ListenableFuture秩冈,它繼承了Java的Future接口,增強了監(jiān)控能力斥扛。

詳解ListenableFuture

GuavaListenableFuture接口是對Java的Future接口的擴展漩仙,可以理解為異步任務的實例。源代碼如下:

public interface ListenableFuture<V> extends Future<V> {
  void addListener(Runnable listener, Executor executor);
}

ListenableFuture僅僅增加了一個方法 -- addListener方法。他的作用是將前一小節(jié)的FutureCallback善后回調(diào)工作队他,封裝成一個內(nèi)部的Runnable異步回調(diào)任務卷仑,在Callable異步任務完成后,回調(diào)FutureCallback進行善后處理

在實際編程中麸折,如何將FutureCallback回調(diào)邏輯綁定到異步的ListenableFuture任務呢锡凝?可以使用Guaba的Futures工具類,他有一個addCallback靜態(tài)方法垢啼,可以將FutureCallback的回調(diào)實例綁定到ListenableFuture異步任務窜锯。類似這樣的綁定:

Futures.addCallback(ListenableFuture, new FutureCallback<Object>() {

  @Override
  public void onSuccess(@Nullable Object result) {

  }

  @Override
  public void onFailure(Throwable t) {

  }
}, executors);

同時,問題來了芭析,Guava都是對Future異步任務的擴展锚扎,但是Guava的異步任務從何而來?

獲取ListenableFuture異步任務

要獲取Guava的ListenableFuture異步任務實例馁启,主要是通過向線程池提交Callable任務的方式來獲取驾孔。不過這里所說的線程池不是Java的線程池,而是Guava自己的線程池惯疙。

Guava線程池是對Java線程池的一種裝飾翠勉,創(chuàng)建Guava線程池的方式如下:

ExecutorService jPool = Executors.newFixedThreadPool(10);
ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);

先創(chuàng)建一個Java的線程池,在作為Guava線程池的參數(shù)傳進去得到Guava的線程池霉颠,然后我們通過subimt提交任務就可以獲得ListenableFuture異步任務實例了

ListenableFuture<Boolean> task = Pool.submit(() -> {
    return true;
});

Futures.addCallback(task, new FutureCallback<Boolean>() {
    @Override
    public void onSuccess(@Nullable Boolean result) {
      
    }

    @Override
    public void onFailure(Throwable t) {

    }
}, jPool);

實例

package com.zou;

import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class guava {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建 Guava線程池
        ExecutorService jPool = Executors.newFixedThreadPool(10);
        ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);

        // 獲取ListenableFuture異步任務
        ListenableFuture<Boolean> task = Pool.submit(() -> {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("副線程開始執(zhí)行");
            return true;
        });
                
        // 做回調(diào)
        Futures.addCallback(task, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(@Nullable Boolean result) {
                System.out.println("執(zhí)行成功");
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("執(zhí)行失敗");
                t.printStackTrace();
            }
        }, jPool);

        TimeUnit.SECONDS.sleep(1);
        System.out.println("主線程執(zhí)行完成");

    }
}

結(jié)果為:

主線程執(zhí)行完成
副線程開始執(zhí)行
執(zhí)行成功

可以發(fā)現(xiàn)程序已經(jīng)是異步非阻塞了对碌。

Guava異步回調(diào)和Java的FutureTask異步回調(diào),本質(zhì)的不同在于蒿偎;

  • Guava是非阻塞的異步回調(diào)朽们,調(diào)用線程是不阻塞的,可以繼續(xù)執(zhí)行自己的業(yè)務邏輯
  • FutureTask是阻塞的異步回調(diào)诉位,調(diào)用線程是阻塞的华坦,在獲取異步結(jié)果的過程中,一直阻塞不从,等待異步線程返回結(jié)果

Netty的異步回調(diào)

Netty官方文檔中指出Netty的網(wǎng)絡操作都是異步的惜姐。在Netty源碼中,大量使用異步回調(diào)處理模式椿息。在Netty的業(yè)務開發(fā)層面歹袁,Netty應用的Handler處理器中的業(yè)務處理代碼,也都是異步執(zhí)行的寝优。所以条舔,了解Netty的異步回調(diào)是很有必要而且很重要的。

同樣乏矾,Netty繼承和擴展了JDK Future系列異步回調(diào)的API孟抗,定義了自身的Future系列接口和類迁杨,實現(xiàn)了異步任務的監(jiān)控,異步執(zhí)行結(jié)果的獲取凄硼。

總體來說铅协,Netty對JavaFuture異步任務擴展如下:

  • 繼承Java的Future接口,得到一個新的屬于Netty自己的Future異步任務接口摊沉,該接口對原有的接口進行了增強狐史,使得Netty異步任務能夠以非阻塞的方式處理回調(diào)的結(jié)果。Netty沒有修改Future的名稱说墨,只是調(diào)整了所在的包名骏全。
  • 引入了一個新接口 -- GenericFutureListener, 用于表示異步執(zhí)行完的監(jiān)聽器。Netty使用了監(jiān)聽器模式尼斧,異步任務的執(zhí)行完成后的回調(diào)邏輯抽象成了Listener監(jiān)聽器接口姜贡。可以將Netty的GenericFutureListener監(jiān)聽器接口加入Netty異步任務Future中棺棵,實現(xiàn)對異步任務執(zhí)行狀態(tài)的事件監(jiān)聽楼咳。

總體來說設計思路和Guava差不多。對應關(guān)系為:

  • Netty的Future接口律秃,可以對應到Guava的ListenableFuture接口爬橡。
  • Netty的GenericFutureListener接口治唤,可以對應到Guava的FutureCallback接口棒动。

詳解GenericFutureListener接口

前面提到,和Guava的FutureCallback一樣宾添,Netty新增了一個接口來封裝異步非阻塞回調(diào)的邏輯 ----- 它就是GenericFutureListener接口船惨。

GenericFutureListener位于io.netty.util.concurrent包中,源碼如下:

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

GenericFutureListener擁有一個回調(diào)方法:operationComplete缕陕,表示異步任務操作完成粱锐。在Future異步任務執(zhí)行完成后,將回調(diào)此方法扛邑。

這里的EventListener是一個空接口怜浅,沒有任何抽象方法,是一個僅僅具有表示作用的接口蔬崩。

詳解Netty的Future接口

Netty的future接口對一系列的方法做了擴展恶座,對執(zhí)行的過程進行了監(jiān)控,對異步回調(diào)完成事件進行了監(jiān)聽沥阳。Netty的Future接口的源代碼如下:

public interface Future<V> extends java.util.concurrent.Future<V> {
  // 增加異步任務是否完成的監(jiān)聽器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  // 移除異步任務是否執(zhí)行完成的監(jiān)聽器
  Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  .........
}

Netty的Future接口一般不會直接使用跨琳,而是會使用子接口。Netty有一系列的子接口桐罕,代表不同類型的異步任務脉让,如ChannelFuture接口桂敛。

ChannelFuture子接口表示通道IO操作的異步任務;如果在通道的異步IO操作完成后溅潜,需要執(zhí)行回調(diào)操作术唬,就需要使用到ChannelFuture。

ChannelFuture的使用

在Netty網(wǎng)絡編程中伟恶,網(wǎng)絡連接通道的輸入和輸出處理都是異步進行的碴开,都會返回一個ChannelFuture接口的實例。通過返回的異步任務實例博秫,可以為它增加異步回調(diào)的監(jiān)聽器潦牛。在異步任務真正結(jié)束后,回調(diào)才執(zhí)行挡育。

Netty的網(wǎng)絡連接的異步回調(diào)巴碗,實例代碼如下:

Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.connect("localhost", 6666);

connect.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
      // 成功
      System.out.println("yes");
    } else {
      // 失敗
      System.out.println("exception");
      future.cause().printStackTrace();
    }
  }
});

GenericFutureListener接口在Netty中是一個基礎(chǔ)類型接口即寒。在網(wǎng)絡編程的異步回調(diào)中,一般使用Netty中提供的某個子接口母赵,如ChannelFutureListener接口。

總結(jié)

好啦凹嘲,異步回調(diào)的部分基本就到這里了师倔。隨著高并發(fā)系統(tǒng)越來越多周蹭,異步回調(diào)模式也越來越重要趋艘。我們來回憶一下主要講了那些異步回調(diào)吧~

Java自帶的異步回調(diào)

  • Future作為接口凶朗,對應的FutureTask中的get方法作為結(jié)果的回調(diào)瓷胧。但此是異步阻塞的

Guava異步回調(diào)

  • ListenableFuture作為接口,對應的FutureCallback做結(jié)果的異步回調(diào)棚愤。異步非阻塞

Netty異步回調(diào)

  • Future作為接口(和Java自帶的同名不同包),對應的GenericFutureListener做結(jié)果的異步回調(diào)宛畦。異步非阻塞
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末刃永,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子斯够,更是在濱河造成了極大的恐慌喧锦,老刑警劉巖抓督,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異阵具,居然都是意外死亡定铜,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門帘皿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來畸陡,“玉大人,你說我怎么就攤上這事曹动∩溃” “怎么了墓陈?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長竭恬。 經(jīng)常有香客問我熬的,道長,這世上最難降的妖魔是什么岔绸? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任橡伞,我火速辦了婚禮,結(jié)果婚禮上刚盈,老公的妹妹穿的比我還像新娘挂脑。我一直安慰自己欲侮,他們只是感情好肋联,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布橄仍。 她就那樣靜靜地躺著,像睡著了一般侮繁。 火紅的嫁衣襯著肌膚如雪宪哩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天育勺,我揣著相機與錄音罗岖,去河邊找鬼。 笑死桑包,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的哑了。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼窄陡,長吁一口氣:“原來是場噩夢啊……” “哼拆火!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起币叹,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤模狭,失蹤者是張志新(化名)和其女友劉穎嚼鹉,沒想到半個月后九妈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體雾鬼,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年晶疼,在試婚紗的時候發(fā)現(xiàn)自己被綠了又憨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蠢莺。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖锄弱,靈堂內(nèi)的尸體忽然破棺而出祸憋,到底是詐尸還是另有隱情,我是刑警寧澤蚯窥,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布拦赠,位于F島的核電站,受9級特大地震影響荷鼠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜务甥,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望态辛。 院中可真熱鬧,春花似錦炊邦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至忍啸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間悄晃,已是汗流浹背凿滤。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工翁脆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人鹃祖。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓恬口,卻偏偏與公主長得像,于是被迫代替她去往敵國和親祖能。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容