????在JDK1.5已經(jīng)提供了Future和Callable的實(shí)現(xiàn),可以用于阻塞式獲取結(jié)果,如果想要異步獲取結(jié)果,通常都會(huì)以輪詢的方式去獲取結(jié)果,如下:
//定義一個(gè)異步任務(wù)
Future<String> future = executor.submit(()->{
Thread.sleep(2000);
return "hello world";
});
//輪詢獲取結(jié)果
while (true){
if(future.isDone()) {
System.out.println(future.get());
break;
}
}
????從上面的形式看來(lái)輪詢的方式會(huì)耗費(fèi)無(wú)謂的CPU資源,而且也不能及時(shí)地得到計(jì)算結(jié)果.所以要實(shí)現(xiàn)真正的異步,上述這樣是完全不夠的,在Netty中,我們隨處可見(jiàn)異步編程
ChannelFuture f = serverBootstrap.bind(port).sync();
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
System.out.println("complete");
}
});
????而JDK1.8中的CompletableFuture
就為我們提供了異步函數(shù)式編程,CompletableFuture
提供了非常強(qiáng)大的Future
的擴(kuò)展功能,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性绢涡,提供了函數(shù)式編程的能力抡蛙,可以通過(guò)回調(diào)的方式處理計(jì)算結(jié)果妇智,并且提供了轉(zhuǎn)換和組合CompletableFuture
的方法齐蔽。
1. 創(chuàng)建CompletableFuture對(duì)象
????CompletableFuture
提供了四個(gè)靜態(tài)方法用來(lái)創(chuàng)建CompletableFuture對(duì)象:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
????Asynsc
表示異步,而supplyAsync
與runAsync
不同在與前者異步返回一個(gè)結(jié)果,后者是void.第二個(gè)函數(shù)第二個(gè)參數(shù)表示是用我們自己創(chuàng)建的線程池,否則采用默認(rèn)的ForkJoinPool.commonPool()
作為它的線程池.其中Supplier
是一個(gè)函數(shù)式接口,代表是一個(gè)生成者的意思,傳入0個(gè)參數(shù),返回一個(gè)結(jié)果.(更詳細(xì)的可以看我另一篇文章)
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello world";
});
System.out.println(future.get()); //阻塞的獲取結(jié)果 ''helllo world"
2. 主動(dòng)計(jì)算
????以下4個(gè)方法用于獲取結(jié)果
//同步獲取結(jié)果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
getNow
有點(diǎn)特殊杠巡,如果結(jié)果已經(jīng)計(jì)算完則返回結(jié)果或者拋出異常剑勾,否則返回給定的valueIfAbsent值埃撵。join()
與get()
區(qū)別在于join()
返回計(jì)算的結(jié)果或者拋出一個(gè)unchecked異常(CompletionException),而get()
返回一個(gè)具體的異常.
- 主動(dòng)觸發(fā)計(jì)算.
public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)
上面方法表示當(dāng)調(diào)用CompletableFuture.get()
被阻塞的時(shí)候,那么這個(gè)方法就是結(jié)束阻塞,并且get()
獲取設(shè)置的value.
public static CompletableFuture<Integer> compute() {
final CompletableFuture<Integer> future = new CompletableFuture<>();
return future;
}
public static void main(String[] args) throws Exception {
final CompletableFuture<Integer> f = compute();
class Client extends Thread {
CompletableFuture<Integer> f;
Client(String threadName, CompletableFuture<Integer> f) {
super(threadName);
this.f = f;
}
@Override
public void run() {
try {
System.out.println(this.getName() + ": " + f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
new Client("Client1", f).start();
new Client("Client2", f).start();
System.out.println("waiting");
//設(shè)置Future.get()獲取到的值
f.complete(100);
//以異常的形式觸發(fā)計(jì)算
//f.completeExceptionally(new Exception());
Thread.sleep(1000);
}
3. 計(jì)算結(jié)果完成時(shí)的處理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面4個(gè)方法是當(dāng)計(jì)算階段結(jié)束的時(shí)候觸發(fā),BiConsumer
有兩個(gè)入?yún)?分別代表計(jì)算返回值,另外一個(gè)是異常.無(wú)返回值.方法不以Async結(jié)尾虽另,意味著Action使用相同的線程執(zhí)行暂刘,而Async可能會(huì)使用其它的線程去執(zhí)行(如果使用相同的線程池,也可能會(huì)被同一個(gè)線程選中執(zhí)行)捂刺。
future.whenCompleteAsync((v,e)->{
System.out.println("return value:"+v+" exception:"+e);
});
- handle()
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
與whenComplete()
不同的是這個(gè)函數(shù)返回CompletableFuture
并不是原始的CompletableFuture
返回的值,而是BiFunction
返回的值.
4. CompletableFuture的組合
- thenApply
當(dāng)計(jì)算結(jié)算完成之后,后面可以接繼續(xù)一系列的thenApply,來(lái)完成值的轉(zhuǎn)化.
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
它們與handle方法的區(qū)別在于handle方法會(huì)處理正常計(jì)算值和異常谣拣,因此它可以屏蔽異常,避免異常繼續(xù)拋出族展。而thenApply方法只是用來(lái)處理正常值森缠,因此一旦有異常就會(huì)拋出。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello world";
});
CompletableFuture<String> future3 = future.thenApply((element)->{
return element+" addPart";
}).thenApply((element)->{
return element+" addTwoPart";
});
System.out.println(future3.get());//hello world addPart addTwoPart
5. CompletableFuture的Consumer
只對(duì)CompletableFuture
的結(jié)果進(jìn)行消費(fèi),無(wú)返回值,也就是最后的CompletableFuture
是void.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
//入?yún)樵嫉腃ompletableFuture的結(jié)果.
CompletableFuture future4 = future.thenAccept((e)->{
System.out.println("without return value");
});
future4.get();
- thenAcceptBoth
這個(gè)方法用來(lái)組合兩個(gè)CompletableFuture
,其中一個(gè)CompletableFuture
等待另一個(gè)CompletableFuture
的結(jié)果.
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello world";
});
CompletableFuture future5 = future.thenAcceptBoth(CompletableFuture.completedFuture("compose"),
(x, y) -> System.out.println(x+y));//hello world compose
6. Either和ALL
????thenAcceptBoth
是當(dāng)兩個(gè)CompletableFuture
都計(jì)算完成仪缸,而我們下面要了解的方法applyToEither
是當(dāng)任意一個(gè)CompletableFuture計(jì)算完成的時(shí)候就會(huì)執(zhí)行贵涵。
Random rand = new Random();
CompletableFuture<Integer> future9 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future10 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
//兩個(gè)中任意一個(gè)計(jì)算完成,那么觸發(fā)Runnable的執(zhí)行
CompletableFuture<String> f = future10.applyToEither(future9,i -> i.toString());
//兩個(gè)都計(jì)算完成,那么觸發(fā)Runnable的執(zhí)行
CompletableFuture f1 = future10.acceptEither(future9,(e)->{
System.out.println(e);
});
System.out.println(f.get());
????如果想組合超過(guò)2個(gè)以上的CompletableFuture
,allOf
和anyOf
可能會(huì)滿足你的要求.allOf
方法是當(dāng)所有的CompletableFuture
都執(zhí)行完后執(zhí)行計(jì)算。anyOf
方法是當(dāng)任意一個(gè)CompletableFuture
執(zhí)行完后就會(huì)執(zhí)行計(jì)算,計(jì)算的結(jié)果相同宾茂。
總結(jié)
????有了CompletableFuture
之后,我們自己實(shí)現(xiàn)異步編程變得輕松很多,這個(gè)類(lèi)也提供了許多方法來(lái)組合CompletableFuture
.結(jié)合Lambada表達(dá)式來(lái)用,變得很輕松.