簡介
隨著移動互聯(lián)網(wǎng)的蓬勃發(fā)展欠啤,業(yè)務架構(gòu)也隨之變得錯綜復雜刀诬,業(yè)務系統(tǒng)越來越多咽扇。通常,我們處理方法是異步去調(diào)取這些接口陕壹。隨著高并發(fā)系統(tǒng)越來越多质欲,異步回調(diào)模式也越來越重要。
問題就來了糠馆,如何獲取處理異步調(diào)用的結(jié)果呢 嘶伟?讓我們一起來探討一下吧~~
Java Future的異步回調(diào)
Callable接口
在聊Callable接口之前,先提一下Runnable接口又碌。Runnable接口是在Java多線程中表示線程的業(yè)務代碼的抽象接口奋早。但是Runnable沒有返回值,為了解決這個問題赠橙,Java定義了一個和Runnable類似的接口 --- Callable
接口耽装。并將業(yè)務處理方法名為call
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable接口是一個范型接口,也聲明為了函數(shù)式接口
期揪。唯一的抽象方法call有返回值掉奄,返回值類型為范型形參的實際類型
初探FutureTask類
故名思意,F(xiàn)utureTask類代表一個未來執(zhí)行的任務,表示新線程執(zhí)行的操作姓建。同時也位于java.util.concurrent
包中诞仓。源碼如下:
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
}
FutureTask類就像一座搭在Callable實例與Thread線程實例之間的橋。FutureTask內(nèi)部封裝了一個Callable實例速兔,然后自身又作為Thread線程的target墅拭。
Future接口
Future接口并不復雜,主要是對并發(fā)任務的執(zhí)行及獲取其結(jié)果的一些操作涣狗。主要有三大功能谍婉。
- 判斷并發(fā)任務是否執(zhí)行完。
- 獲取并發(fā)的任務完成后的結(jié)果
- 取消并發(fā)執(zhí)行的任務
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
詳細說法如下:
- V get():獲取并發(fā)任務執(zhí)行的結(jié)果镀钓。這個方法是阻塞的穗熬,如果并發(fā)任務沒有執(zhí)行完成調(diào)用此方法的線程會一直阻塞
- V get(Long timeout, TimeUtil unit):獲取并發(fā)任務執(zhí)行的結(jié)果。也是阻塞的丁溅,但是有阻塞的時間限制唤蔗,如果阻塞時間超過設定的時間,該方法將會拋出異常
- boolean isDone():獲取并發(fā)任務的狀態(tài)是否結(jié)束
- boolean isCancelled():獲取并發(fā)任務的取消狀態(tài)窟赏。如果任務被取消返回true
- boolean cancel(boolean mayInterruptIfRunning):取消并發(fā)任務的執(zhí)行
再探FutureTask類
在FutureTask類中妓柜,有一個Callable的私有成員,F(xiàn)uturetask內(nèi)部有一個run方法涯穷。這個run方法是Runable接口的抽象方法领虹,在FutureTask類的內(nèi)部提供了自己的實現(xiàn)。在Thread線程實例執(zhí)行時求豫,會將這個run方法作為target目標去異步執(zhí)行塌衰。在FutureTask內(nèi)部的run方法中實際是會執(zhí)行Callable的call方法。
執(zhí)行完后結(jié)果會保存在私有成員-- outcome屬性中
private Object outcome; // non-volatile, protected by state reads/writes
outcome負責保存結(jié)果蝠嘉。然后FutureTask通過get方法獲取這個object的值最疆,那這個FutureTask的任務也就能成功完成了。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 將result保存到outcome中
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
實例
package com.zou;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class futureTask {
public static void main(String[] args) throws Exception {
FutureTask<Boolean> TaskA = new FutureTask<>(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("A任務準備好了??");
} catch (Exception e) {
System.out.println("A任務出問題了");
return false;
}
System.out.println("A任務運行結(jié)束");
return true;
});
FutureTask<Boolean> TaskB = new FutureTask<>(() -> {
try {
TimeUnit.SECONDS.sleep(4);
System.out.println("B任務準備好了??");
} catch (Exception e) {
System.out.println("B任務出問題了");
return false;
}
System.out.println("B任務運行結(jié)束");
return true;
});
Thread threadA = new Thread(TaskA);
Thread threadB = new Thread(TaskB);
threadA.start();
threadB.start();
Thread.currentThread().setName("主線程");
try {
boolean a = TaskA.get();
boolean b = TaskB.get();
isReady(a, b);
} catch (Exception e) {
System.out.println("發(fā)生了中斷");
}
System.out.println("運行結(jié)束");
}
public static void isReady(boolean a, boolean b) {
if (a && b) {
System.out.println("都準備好了");
} else if (!a) {
System.out.println("A沒準備好");
} else {
System.out.println("B沒準備好");
}
}
}
運行結(jié)果為:
A任務準備好了??
A任務運行結(jié)束
B任務準備好了??
B任務運行結(jié)束
都準備好了
運行結(jié)束
要是將上面的代碼跑一下會發(fā)現(xiàn)蚤告,這里的FutureTask
類的get方法努酸,異步獲取結(jié)果的同時,主線程是阻塞的杜恰。所以可以將其歸為異步阻塞模式获诈。
異步阻塞的效率往往是比較低的,被阻塞的主線程不能干任何事心褐。并沒有實現(xiàn)非阻塞的異步結(jié)果獲取方法舔涎。如果需要用到獲取異步結(jié)果,則需要引入一些框架逗爹,先介紹一下Google的Guava框架
Guava的異步回調(diào)
Guava是Google提供的Java擴展包亡嫌,提供一種異步回調(diào)的解決方案。相關(guān)的源碼在com.google.common.util.concurrent
包中。包中很多類挟冠,都是對java.util.concurrent
能力的擴展和增強于购。比如Guava的異步任務接口ListenableFuture
, 實現(xiàn)了非阻塞獲取異步結(jié)果的功能。
對于異步回調(diào)知染,Guava主要做了以下增強:
- 引入一個新的接口
ListenableFuture
肋僧,繼承了Java的Future接口,使得Java的Future異步任務控淡,在Guava中能被監(jiān)控和獲得非阻塞異步執(zhí)行的結(jié)果嫌吠。 - 引入一個新的接口
FutureCallback
,這是一個獨立的新接口逸寓。該接口的目的是在異步任務執(zhí)行完成后,根據(jù)異步結(jié)果覆山,完成不同的回調(diào)處理竹伸,并且可以處理回調(diào)結(jié)果。
詳解FutureCallback
FutureCallback
是一個新增的接口簇宽,用來填寫異步任務執(zhí)行完后的監(jiān)聽邏輯勋篓。有兩個回調(diào)方法:
- onSuccess方法,在異步任務執(zhí)行成功后回調(diào)魏割;調(diào)用時譬嚣,異步任務的執(zhí)行結(jié)果作為onSuccess方法的參數(shù)傳入
- onFailure方法,在異步任務執(zhí)行過程中钞它,拋出異常時被回調(diào)拜银;調(diào)用時異步任務所拋出的異常,作為onFailure方法的參數(shù)傳入遭垛。
FutureCallback的源代碼如下:
public interface FutureCallback<V> {
/** Invoked with the result of the {@code Future} computation when it is successful. */
void onSuccess(@Nullable V result);
/**
* Invoked when a {@code Future} computation fails or is canceled.
*
* <p>If the future's {@link Future#get() get} method throws an {@link ExecutionException}, then
* the cause is passed to this method. Any other thrown object is passed unaltered.
*/
void onFailure(Throwable t);
}
注意尼桶,Guava的FutureCallable與Java的Callable,名字相近锯仪,但實質(zhì)不同泵督,存在本質(zhì)的區(qū)別:
- Java的Callable接口,代表的是一部執(zhí)行的邏輯
- Guava的FutureCallback接口庶喜,代表的是Callable異步邏輯執(zhí)行完之后小腊,根據(jù)成功或者失敗的兩種情況的善后工作
那么問題來了,Guava如何實現(xiàn)異步任務Callable和FutureCallable結(jié)果回調(diào)之間的監(jiān)控關(guān)系呢久窟?Guava引入了一個新接口ListenableFuture秩冈,它繼承了Java的Future接口,增強了監(jiān)控能力斥扛。
詳解ListenableFuture
Guava
的ListenableFuture
接口是對Java的Future接口的擴展漩仙,可以理解為異步任務的實例。源代碼如下:
public interface ListenableFuture<V> extends Future<V> {
void addListener(Runnable listener, Executor executor);
}
ListenableFuture
僅僅增加了一個方法 -- addListener方法。他的作用是將前一小節(jié)的FutureCallback善后回調(diào)工作队他,封裝成一個內(nèi)部的Runnable異步回調(diào)任務卷仑,在Callable異步任務完成后,回調(diào)FutureCallback進行善后處理
在實際編程中麸折,如何將FutureCallback回調(diào)邏輯綁定到異步的ListenableFuture任務呢锡凝?可以使用Guaba的Futures工具類,他有一個addCallback靜態(tài)方法垢啼,可以將FutureCallback的回調(diào)實例綁定到ListenableFuture異步任務窜锯。類似這樣的綁定:
Futures.addCallback(ListenableFuture, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {
}
@Override
public void onFailure(Throwable t) {
}
}, executors);
同時,問題來了芭析,Guava都是對Future異步任務的擴展锚扎,但是Guava的異步任務從何而來?
獲取ListenableFuture異步任務
要獲取Guava的ListenableFuture異步任務實例馁启,主要是通過向線程池提交Callable任務的方式來獲取驾孔。不過這里所說的線程池不是Java的線程池,而是Guava自己的線程池惯疙。
Guava線程池是對Java線程池的一種裝飾翠勉,創(chuàng)建Guava線程池的方式如下:
ExecutorService jPool = Executors.newFixedThreadPool(10);
ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);
先創(chuàng)建一個Java的線程池,在作為Guava線程池的參數(shù)傳進去得到Guava的線程池霉颠,然后我們通過subimt提交任務就可以獲得ListenableFuture異步任務實例了
ListenableFuture<Boolean> task = Pool.submit(() -> {
return true;
});
Futures.addCallback(task, new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean result) {
}
@Override
public void onFailure(Throwable t) {
}
}, jPool);
實例
package com.zou;
import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class guava {
public static void main(String[] args) throws Exception {
// 創(chuàng)建 Guava線程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);
// 獲取ListenableFuture異步任務
ListenableFuture<Boolean> task = Pool.submit(() -> {
TimeUnit.SECONDS.sleep(2);
System.out.println("副線程開始執(zhí)行");
return true;
});
// 做回調(diào)
Futures.addCallback(task, new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean result) {
System.out.println("執(zhí)行成功");
}
@Override
public void onFailure(Throwable t) {
System.out.println("執(zhí)行失敗");
t.printStackTrace();
}
}, jPool);
TimeUnit.SECONDS.sleep(1);
System.out.println("主線程執(zhí)行完成");
}
}
結(jié)果為:
主線程執(zhí)行完成
副線程開始執(zhí)行
執(zhí)行成功
可以發(fā)現(xiàn)程序已經(jīng)是異步非阻塞了对碌。
Guava異步回調(diào)和Java的FutureTask異步回調(diào),本質(zhì)的不同在于蒿偎;
- Guava是非阻塞的異步回調(diào)朽们,調(diào)用線程是不阻塞的,可以繼續(xù)執(zhí)行自己的業(yè)務邏輯
- FutureTask是阻塞的異步回調(diào)诉位,調(diào)用線程是阻塞的华坦,在獲取異步結(jié)果的過程中,一直阻塞不从,等待異步線程返回結(jié)果
Netty的異步回調(diào)
Netty官方文檔中指出Netty的網(wǎng)絡操作都是異步的惜姐。在Netty源碼中,大量使用異步回調(diào)處理模式椿息。在Netty的業(yè)務開發(fā)層面歹袁,Netty應用的Handler處理器中的業(yè)務處理代碼,也都是異步執(zhí)行的寝优。所以条舔,了解Netty的異步回調(diào)是很有必要而且很重要的。
同樣乏矾,Netty繼承和擴展了JDK Future系列異步回調(diào)的API孟抗,定義了自身的Future系列接口和類迁杨,實現(xiàn)了異步任務的監(jiān)控,異步執(zhí)行結(jié)果的獲取凄硼。
總體來說铅协,Netty對JavaFuture異步任務擴展如下:
- 繼承Java的Future接口,得到一個新的屬于Netty自己的Future異步任務接口摊沉,該接口對原有的接口進行了增強狐史,使得Netty異步任務能夠以非阻塞的方式處理回調(diào)的結(jié)果。Netty沒有修改Future的名稱说墨,只是調(diào)整了所在的包名骏全。
- 引入了一個新接口 -- GenericFutureListener, 用于表示異步執(zhí)行完的監(jiān)聽器。Netty使用了監(jiān)聽器模式尼斧,異步任務的執(zhí)行完成后的回調(diào)邏輯抽象成了Listener監(jiān)聽器接口姜贡。可以將Netty的GenericFutureListener監(jiān)聽器接口加入Netty異步任務Future中棺棵,實現(xiàn)對異步任務執(zhí)行狀態(tài)的事件監(jiān)聽楼咳。
總體來說設計思路和Guava差不多。對應關(guān)系為:
- Netty的Future接口律秃,可以對應到Guava的ListenableFuture接口爬橡。
- Netty的GenericFutureListener接口治唤,可以對應到Guava的FutureCallback接口棒动。
詳解GenericFutureListener接口
前面提到,和Guava的FutureCallback一樣宾添,Netty新增了一個接口來封裝異步非阻塞回調(diào)的邏輯 ----- 它就是GenericFutureListener接口船惨。
GenericFutureListener位于io.netty.util.concurrent
包中,源碼如下:
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
GenericFutureListener擁有一個回調(diào)方法:operationComplete缕陕,表示異步任務操作完成粱锐。在Future異步任務執(zhí)行完成后,將回調(diào)此方法扛邑。
這里的EventListener
是一個空接口怜浅,沒有任何抽象方法,是一個僅僅具有表示作用的接口蔬崩。
詳解Netty的Future接口
Netty的future接口對一系列的方法做了擴展恶座,對執(zhí)行的過程進行了監(jiān)控,對異步回調(diào)完成事件進行了監(jiān)聽沥阳。Netty的Future接口的源代碼如下:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 增加異步任務是否完成的監(jiān)聽器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 移除異步任務是否執(zhí)行完成的監(jiān)聽器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
.........
}
Netty的Future接口一般不會直接使用跨琳,而是會使用子接口。Netty有一系列的子接口桐罕,代表不同類型的異步任務脉让,如ChannelFuture接口桂敛。
ChannelFuture子接口表示通道IO操作的異步任務;如果在通道的異步IO操作完成后溅潜,需要執(zhí)行回調(diào)操作术唬,就需要使用到ChannelFuture。
ChannelFuture的使用
在Netty網(wǎng)絡編程中伟恶,網(wǎng)絡連接通道的輸入和輸出處理都是異步進行的碴开,都會返回一個ChannelFuture接口的實例。通過返回的異步任務實例博秫,可以為它增加異步回調(diào)的監(jiān)聽器潦牛。在異步任務真正結(jié)束后,回調(diào)才執(zhí)行挡育。
Netty的網(wǎng)絡連接的異步回調(diào)巴碗,實例代碼如下:
Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.connect("localhost", 6666);
connect.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// 成功
System.out.println("yes");
} else {
// 失敗
System.out.println("exception");
future.cause().printStackTrace();
}
}
});
GenericFutureListener接口在Netty中是一個基礎(chǔ)類型接口即寒。在網(wǎng)絡編程的異步回調(diào)中,一般使用Netty中提供的某個子接口母赵,如ChannelFutureListener接口。
總結(jié)
好啦凹嘲,異步回調(diào)的部分基本就到這里了师倔。隨著高并發(fā)系統(tǒng)越來越多周蹭,異步回調(diào)模式也越來越重要趋艘。我們來回憶一下主要講了那些異步回調(diào)吧~
Java自帶的異步回調(diào):
- Future作為接口凶朗,對應的FutureTask中的get方法作為結(jié)果的回調(diào)瓷胧。但此是異步阻塞的
Guava異步回調(diào):
- ListenableFuture作為接口,對應的FutureCallback做結(jié)果的異步回調(diào)棚愤。異步非阻塞
Netty異步回調(diào):
- Future作為接口(和Java自帶的同名不同包),對應的GenericFutureListener做結(jié)果的異步回調(diào)宛畦。異步非阻塞