CompletableFuture
簡介
擴(kuò)展 Future
功能舱权,通過 CompletionStage
提供函數(shù)式編程的能力,簡化異步編程
基本用法
-
CompletableFuture
,實現(xiàn)了Future<T>
,CompletionStage<T>
兩個接口。 -
Executor
參數(shù)可以手動指定線程池,否則默認(rèn)ForkJoinPool.commonPool()
系統(tǒng)級公共線程池。這些線程都是守護(hù)線程,主線程結(jié)束守護(hù)線程不結(jié)束势就,只有JVM關(guān)閉時,生命周期終止脉漏。 - 以 Async 結(jié)尾的方法都是異步執(zhí)行苞冯,即到線程池取個新線程執(zhí)行
創(chuàng)建
-
supplyAsync
創(chuàng)建有返回值的任務(wù) 參數(shù)Supplier
-
runAsync
建沒有返回值的任務(wù) 參數(shù)Runnable
-
completedFuture
創(chuàng)建已完成的任務(wù) 參數(shù)為值
消費(fèi)
-
whenComplete
whenCompleteAsync
消費(fèi)任務(wù)結(jié)果 參數(shù)BiConsumer
-
exceptionally
消費(fèi)任務(wù)異常 參數(shù)Function
-
handle
handleAsync
消費(fèi)任務(wù)結(jié)果和異常 參數(shù)BiFunction
變換
thenApply
thenApplyAsync
有入?yún)⒂蟹祷刂?參數(shù) FunctionthenAccept
thenAcceptAsync
有入?yún)o返回值 參數(shù)Consumer
thenRun
thenRunAsync
無入?yún)o返回值 參數(shù)Runnable
thenCompose
thenComposeAsync
構(gòu)成新任務(wù) 參數(shù)Function
組合
thenCombine
thenCombineAsync
雙線有入?yún)⒂蟹祷刂?參數(shù)CompletionStage
BiFunction
thenAcceptBoth
thenAcceptBothAsync
雙線有入?yún)o返回值 參數(shù)CompletionStage
BiConsumer
runAfterBoth
runAfterBothAsync
雙線無入?yún)o返回值 參數(shù)CompletionStage
Runnable
applyToEither
applyToEitherAsync
二選快有入?yún)⒂蟹祷刂?參數(shù)CompletionStage
Function
acceptEither
acceptEitherAsync
二選快有入?yún)o返回值 參數(shù)CompletionStage
Consumer
runAfterEither
runAfterEitherAsync
二選快無入?yún)o返回值 參數(shù)CompletionStage
Runnable
群體
-
anyOf
只要有完成即觸發(fā)有返回值 參數(shù)CompletableFuture...
-
allOf
全部完成才觸發(fā)無返回值,可接收異常 參數(shù)CompletableFuture...
fun testAnyOfAndAllOf() {
val f1: CompletableFuture<String> = CompletableFuture.supplyAsync {
TimeUnit.SECONDS.sleep(2)
return@supplyAsync Thread.currentThread().name
}.whenComplete { t, u ->
println("t1: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}
val f2: CompletableFuture<String> = CompletableFuture.supplyAsync {
TimeUnit.SECONDS.sleep(1)
return@supplyAsync Thread.currentThread().name
}.whenCompleteAsync { t, u ->
println("t2: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}
CompletableFuture.anyOf(f1, f2)
.whenComplete { t, u ->
println("anyOf: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}
CompletableFuture.allOf(f1, f2)
.whenComplete { t, u ->
println("allOf: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}.join()
}
t2: 1550851312825: ForkJoinPool.commonPool-worker-2: ForkJoinPool.commonPool-worker-2 null
anyOf: 1550851312825: ForkJoinPool.commonPool-worker-2: ForkJoinPool.commonPool-worker-2 null
t1: 1550851313822: ForkJoinPool.commonPool-worker-1: ForkJoinPool.commonPool-worker-1 null
allOf: 1550851313822: ForkJoinPool.commonPool-worker-1: null null
完成
-
complete
completeExceptionally
cancel
觸發(fā)完成 -
isDone
isCompletedExceptionally
isCancelled
判斷任務(wù)是否已經(jīng)結(jié)束
獲取結(jié)果
-
get
阻塞等待鸠删,獲取任務(wù)結(jié)果抱完,會拋出ExecutionException
InterruptedException
CancellationException
-
join
阻塞等待,獲取任務(wù)結(jié)果刃泡,異常CompletionException
CancellationException
-
getNow
立即取到結(jié)果巧娱,任務(wù)未結(jié)束則返回傳入?yún)?shù)碉怔,異常CompletionException
CancellationException
切換線程
使用 Executor
切換執(zhí)行任務(wù)線程,例如在 Android 中在主線程和子線程切換
fun testExecutor() {
CompletableFuture.supplyAsync(Supplier {
TimeUnit.SECONDS.sleep(2)
println("t1: ${Thread.currentThread().name}") // t1: pool-1-thread-1
return@Supplier "t1"
}, AppExecutors.io).thenApplyAsync(Function<String, String> {
println("t2: ${Thread.currentThread().name}") // t2: main
return@Function "$it.t2"
}, AppExecutors.main).thenApplyAsync(Function<String, String> {
println("t3: ${Thread.currentThread().name}") // t3: pool-2-thread-1
return@Function "$it.t3"
}, AppExecutors.single).whenCompleteAsync(BiConsumer { t, u ->
println("t4: ${Thread.currentThread().name}: $t $u") // t4: main: t1.t2.t3 null
}, AppExecutors.main)
}
object AppExecutors {
val io: ExecutorService by lazy {
Executors.newCachedThreadPool()
}
val compute: ExecutorService by lazy {
val threadCount = 3
Executors.newFixedThreadPool(threadCount)
}
val single: ExecutorService by lazy {
Executors.newSingleThreadExecutor()
}
val main: Executor by lazy {
MainThreadExecutor()
}
private class MainThreadExecutor : Executor {
private val mainThreadHandler: Handler = Handler(Looper.getMainLooper())
override fun execute(command: Runnable?) {
mainThreadHandler.post(command);
}
}
}
參考
CompletableFuture
Java8新特性整理之CompletableFuture:組合式禁添、異步編程(七)
CompletableFuture 使用詳解
個人愚見:分布式框架中使用CompletableFuture提高效率
Java并發(fā)編程系列一:Future和CompletableFuture解析與使用