Future
Future是Java5增加的類荣恐,它用來描述一個(gè)異步計(jì)算的結(jié)果叠穆。你可以使用 isDone 方法檢查計(jì)算是否完成硼被,或者使用 get 方法阻塞住調(diào)用線程嚷硫,直到計(jì)算完成返回結(jié)果仔掸。你也可以使用 cancel 方法停止任務(wù)的執(zhí)行起暮。下面來一個(gè)栗子:
public class FutureDemo {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
Future<Integer> f = es.submit(() ->{
Thread.sleep(10000);
// 結(jié)果
return 100;
});
// do something
Integer result = f.get();
System.out.println(result);
// while (f.isDone()) {
// System.out.println(result);
// }
}
}
在這個(gè)例子中,我們往線程池中提交了一個(gè)任務(wù)并立即返回了一個(gè)Future對(duì)象柏腻,接著可以做一些其他操作五嫂,最后利用它的 get 方法阻塞等待結(jié)果或 isDone 方法輪詢等待結(jié)果(關(guān)于Future的原理可以參考之前的文章:【并發(fā)編程】Future模式及JDK中的實(shí)現(xiàn))
雖然這些方法提供了異步執(zhí)行任務(wù)的能力贫导,但是對(duì)于結(jié)果的獲取卻還是很不方便孩灯,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果峰档。
阻塞的方式顯然和我們的異步編程的初衷相違背讥巡,輪詢的方式又會(huì)耗費(fèi)無謂的CPU資源欢顷,而且也不能及時(shí)的得到計(jì)算結(jié)果抬驴,為什么不能用觀察者設(shè)計(jì)模式當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽者呢布持?
很多語言题暖,比如Node.js胧卤,采用Callback的方式實(shí)現(xiàn)異步編程灌侣。Java的一些框架侧啼,比如Netty痊乾,自己擴(kuò)展了Java的 Future 接口哪审,提供了 addListener 等多個(gè)擴(kuò)展方法湿滓。Google的guava也提供了通用的擴(kuò)展Future:ListenableFuture 叽奥、 SettableFuture 以及輔助類 Futures 等朝氓,方便異步編程赵哲。為此枫夺,Java終于在JDK1.8這個(gè)版本中增加了一個(gè)能力更強(qiáng)的Future類:CompletableFuture 橡庞。它提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力扼倘,可以通過回調(diào)的方式處理計(jì)算結(jié)果再菊。下面來看看這幾種方式纠拔。
Netty-Future
引入Maven依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
public class NettyFutureDemo {
public static void main(String[] args) throws InterruptedException {
EventExecutorGroup group = new DefaultEventExecutorGroup(4);
System.out.println("開始:" + DateUtils.getNow());
Future<Integer> f = group.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("開始耗時(shí)計(jì)算:" + DateUtils.getNow());
Thread.sleep(10000);
System.out.println("結(jié)束耗時(shí)計(jì)算:" + DateUtils.getNow());
return 100;
}
});
f.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> objectFuture) throws Exception {
System.out.println("計(jì)算結(jié)果:" + objectFuture.get());
}
});
System.out.println("結(jié)束:" + DateUtils.getNow());
// 不讓守護(hù)線程退出
new CountDownLatch(1).await();
}
}
輸出結(jié)果:
開始:2019-05-16 08:25:40:779
結(jié)束:2019-05-16 08:25:40:788
開始耗時(shí)計(jì)算:2019-05-16 08:25:40:788
結(jié)束耗時(shí)計(jì)算:2019-05-16 08:25:50:789
計(jì)算結(jié)果:100
從結(jié)果可以看出稠诲,耗時(shí)計(jì)算結(jié)束后自動(dòng)觸發(fā)Listener的完成方法,避免了主線程無謂的阻塞等待略水,那么它究竟是怎么做到的呢渊涝?下面看源碼
DefaultEventExecutorGroup 實(shí)現(xiàn)了 EventExecutorGroup 接口跨释,而 EventExecutorGroup 則是實(shí)現(xiàn)了JDK ScheduledExecutorService 接口的線程組接口厌处,所以它擁有線程池的所有方法蚯姆。然而它卻把所有返回 java.util.concurrent.Future 的方法重寫為返回 io.netty.util.concurrent.Future 洒敏,把所有返回 java.util.concurrent.ScheduledFuture 的方法重寫為返回 io.netty.util.concurrent.ScheduledFuture 凶伙。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
/**
- 返回一個(gè)EventExecutor
*/
EventExecutor next();
Iterator<EventExecutor> iterator();
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
EventExecutorGroup 的submit方法因?yàn)?newTaskFor 的重寫導(dǎo)致返回了netty的 Future 實(shí)現(xiàn)類显押,而這個(gè)實(shí)現(xiàn)類正是 PromiseTask 傻挂。
@Override
public <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
}
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new PromiseTask<T>(this, callable);
}
PromiseTask 的實(shí)現(xiàn)很簡(jiǎn)單兽肤,它緩存了要執(zhí)行的 Callable 任務(wù)资铡,并在run方法中完成了任務(wù)調(diào)用和Listener的通知幢码。
@Override
public void run() {
try {
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
}
} catch (Throwable e) {
setFailureInternal(e);
}
}
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
任務(wù)調(diào)用成功或者失敗都會(huì)調(diào)用 notifyListeners 來通知Listener店雅,所以大家得在回調(diào)的函數(shù)里調(diào)用 isSuccess 方法來檢查狀態(tài)底洗。
這里有一個(gè)疑惑,會(huì)不會(huì) Future 在調(diào)用 addListener 方法的時(shí)候任務(wù)已經(jīng)執(zhí)行完成了珊擂,這樣子會(huì)不會(huì)通知就會(huì)失敗了按萆取扛稽?
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
可以發(fā)現(xiàn)在张,在Listener添加成功之后矮慕,會(huì)立即檢查狀態(tài),如果任務(wù)已經(jīng)完成立刻進(jìn)行回調(diào)瘟斜,所以這里不用擔(dān)心啦螺句。OK蛇尚,下面看看Guava-Future的實(shí)現(xiàn)取劫。
Guava-Future
首先引入guava的Maven依賴:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
public class GuavaFutureDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("開始:" + DateUtils.getNow());
ExecutorService executorService = Executors.newFixedThreadPool(10);
ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("開始耗時(shí)計(jì)算:" + DateUtils.getNow());
Thread.sleep(10000);
System.out.println("結(jié)束耗時(shí)計(jì)算:" + DateUtils.getNow());
return 100;
}
});
future.addListener(new Runnable() {
@Override
public void run() {
System.out.println("調(diào)用成功");
}
}, executorService);
System.out.println("結(jié)束:" + DateUtils.getNow());
new CountDownLatch(1).await();
}
}
ListenableFuture 可以通過 addListener 方法增加回調(diào)函數(shù)疚膊,一般用于不在乎執(zhí)行結(jié)果的地方寓盗。如果需要在執(zhí)行成功時(shí)獲取結(jié)果或者執(zhí)行失敗時(shí)獲取異常信息璧函,需要用到 Futures 工具類的 addCallback 方法:
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
System.out.println("成功,計(jì)算結(jié)果:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("失敗");
}
}, executorService);
前面提到除了 ListenableFuture 外库继,還有一個(gè) SettableFuture 類也支持回調(diào)能力宪萄。它實(shí)現(xiàn)自 ListenableFuture 拜英,所以擁有 ListenableFuture 的所有能力居凶。
public class GuavaFutureDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("開始:" + DateUtils.getNow());
ExecutorService executorService = Executors.newFixedThreadPool(10);
ListenableFuture<Integer> future = submit(executorService);
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
System.out.println("成功侠碧,計(jì)算結(jié)果:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("失敗:" + t.getMessage());
}
}, executorService);
Thread.sleep(1000);
System.out.println("結(jié)束:" + DateUtils.getNow());
new CountDownLatch(1).await();
}
private static ListenableFuture<Integer> submit(Executor executor) {
SettableFuture<Integer> future = SettableFuture.create();
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("開始耗時(shí)計(jì)算:" + DateUtils.getNow());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結(jié)束耗時(shí)計(jì)算:" + DateUtils.getNow());
// 返回值
future.set(100);
// 設(shè)置異常信息
// future.setException(new RuntimeException("custom error!"));
}
});
return future;
}
}
看起來用法上沒有太多差別,但是有一個(gè)很容易被忽略的重要問題嫁佳。當(dāng) SettableFuture 的這種方式最后調(diào)用了 cancel 方法后盛垦,線程池中的任務(wù)還是會(huì)繼續(xù)執(zhí)行瓤漏,而通過 submit 方法返回的 ListenableFuture 方法則會(huì)立即取消執(zhí)行蔬充,這點(diǎn)尤其要注意。下面看看源碼:
和Netty的Future一樣榨呆,Guava也是通過實(shí)現(xiàn)了自定義的 ExecutorService 實(shí)現(xiàn)類 ListeningExecutorService 來重寫了 submit 方法积蜻。
public interface ListeningExecutorService extends ExecutorService {
<T> ListenableFuture<T> submit(Callable<T> task);
ListenableFuture<?> submit(Runnable task);
<T> ListenableFuture<T> submit(Runnable task, T result);
}
同樣的竿拆,newTaskFor 方法也被進(jìn)行了重寫丙笋,返回了自定義的Future類:TrustedListenableFutureTask
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return TrustedListenableFutureTask.create(runnable, value);
}
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return TrustedListenableFutureTask.create(callable);
}
任務(wù)調(diào)用會(huì)走 TrustedFutureInterruptibleTask 的run方法:
@Override
public void run() {
TrustedFutureInterruptibleTask localTask = task;
if (localTask != null) {
localTask.run();
}
}
@Override
public final void run() {
if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
return; // someone else has run or is running.
}
try {
// 抽象方法澳化,子類進(jìn)行重寫
runInterruptibly();
} finally {
if (wasInterrupted()) {
while (!doneInterrupting) {
Thread.yield();
}
}
}
}
最終還是調(diào)用到 TrustedFutureInterruptibleTask 的 runInterruptibly 方法缎谷,等待任務(wù)完成后調(diào)用 set 方法列林。
@Override
void runInterruptibly() {
if (!isDone()) {
try {
set(callable.call());
} catch (Throwable t) {
setException(t);
}
}
}
protected boolean set(@Nullable V value) {
Object valueToSet = value == null ? NULL : value;
// CAS設(shè)置值
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
return true;
}
return false;
}
在 complete 方法的最后會(huì)獲取到Listener進(jìn)行回調(diào)。
上面提到的 SettableFuture 和 ListenableFuture 的 cancel 方法效果不同酪惭,原因在于一個(gè)重寫了 afterDone 方法而一個(gè)沒有希痴。
下面是 ListenableFuture 的 afterDone 方法:
@Override
protected void afterDone() {
super.afterDone();
if (wasInterrupted()) {
TrustedFutureInterruptibleTask localTask = task;
if (localTask != null) {
localTask.interruptTask();
}
}
this.task = null;
}
wasInterrupted 用來判斷是否調(diào)用了 cancel (cancel方法會(huì)設(shè)置一個(gè)取消對(duì)象Cancellation到value中)
protected final boolean wasInterrupted() {
final Object localValue = value;
return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
}
interruptTask 方法通過線程的 interrupt 方法真正取消線程任務(wù)的執(zhí)行:
final void interruptTask() {
Thread currentRunner = runner;
if (currentRunner != null) {
currentRunner.interrupt();
}
doneInterrupting = true;
}
由 Callback Hell 引出 Promise 模式
如果你對(duì) ES6 有所接觸,就不會(huì)對(duì) Promise 這個(gè)模式感到陌生春感,如果你對(duì)前端不熟悉砌创,也不要緊,我們先來看看回調(diào)地獄(Callback Hell)是個(gè)什么概念鲫懒。
回調(diào)是一種我們推崇的異步調(diào)用方式嫩实,但也會(huì)遇到問題,也就是回調(diào)的嵌套窥岩。當(dāng)需要多個(gè)異步回調(diào)一起書寫時(shí)颂翼,就會(huì)出現(xiàn)下面的代碼(以 js 為例):
asyncFunc1(opt, (...args1) => {
asyncFunc2(opt, (...args2) => {
asyncFunc3(opt, (...args3) => {
asyncFunc4(opt, (...args4) => {
// some operation
});
});
});
});
雖然在 JAVA 業(yè)務(wù)代碼中很少出現(xiàn)回調(diào)的多層嵌套,但總歸是個(gè)問題,這樣的代碼不易讀,嵌套太深修改也麻煩。于是 ES6 提出了 Promise 模式來解決回調(diào)地獄的問題∈淇荩可能就會(huì)有人想問:java 中存在 Promise 模式嗎瞳收?答案是肯定的界弧。
前面提到了 Netty 和 Guava 的擴(kuò)展都提供了 addListener 這樣的接口,用于處理 Callback 調(diào)用,但其實(shí) jdk1.8 已經(jīng)提供了一種更為高級(jí)的回調(diào)方式:CompletableFuture。首先嘗試用 CompletableFuture 來重寫上面回調(diào)的問題。
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException {
System.out.println("開始:" + DateUtils.getNow());
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("開始耗時(shí)計(jì)算:" + DateUtils.getNow());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結(jié)束耗時(shí)計(jì)算:" + DateUtils.getNow());
return 100;
});
completableFuture.whenComplete((result, e) -> {
System.out.println("回調(diào)結(jié)果:" + result);
});
System.out.println("結(jié)束:" + DateUtils.getNow());
new CountDownLatch(1).await();
}
}
使用CompletableFuture耗時(shí)操作沒有占用主線程的時(shí)間片癣诱,達(dá)到了異步調(diào)用的效果。我們也不需要引入任何第三方的依賴,這都是依賴于 java.util.concurrent.CompletableFuture 的出現(xiàn)。CompletableFuture 提供了近 50 多個(gè)方法六水,大大便捷了 java 多線程操作胯盯,和異步調(diào)用的寫法叉趣。
使用 CompletableFuture 解決回調(diào)地獄問題:
public class CompletableFutureDemo {
public static void main(String[] args) throws InterruptedException {
long l = System.currentTimeMillis();
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("在回調(diào)中執(zhí)行耗時(shí)操作...");
Thread.sleep(10000);
return 100;
});
completableFuture = completableFuture.thenCompose(i -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println("在回調(diào)的回調(diào)中執(zhí)行耗時(shí)操作...");
Thread.sleep(10000);
return i + 100;
});
});
completableFuture.whenComplete((result, e) -> {
System.out.println("計(jì)算結(jié)果:" + result);
});
System.out.println("主線程運(yùn)算耗時(shí):" + (System.currentTimeMillis() - l) + " ms");
new CountDownLatch(1).await();
}
}
輸出:
在回調(diào)中執(zhí)行耗時(shí)操作...主線程運(yùn)算耗時(shí):58 ms在回調(diào)的回調(diào)中執(zhí)行耗時(shí)操作...計(jì)算結(jié)果:200
使用 thenCompose 或者 thenComposeAsync 等方法可以實(shí)現(xiàn)回調(diào)的回調(diào)奠蹬,且寫出來的方法易于維護(hù)。
總的看來僻他,為Future模式增加回調(diào)功能就不需要阻塞等待結(jié)果的返回并且不需要消耗無謂的CPU資源去輪詢處理狀態(tài)翩瓜,JDK8之前使用Netty或者Guava提供的工具類,JDK8之后則可以使用自帶的 CompletableFuture 類。Future 有兩種模式:將來式和回調(diào)式。而回調(diào)式會(huì)出現(xiàn)回調(diào)地獄的問題宾抓,由此衍生出了 Promise 模式來解決這個(gè)問題。這才是 Future 模式和 Promise 模式的相關(guān)性。