下面看一個服務(wù)的調(diào)用鏈
設(shè)想一下這5個查詢服務(wù)曲秉,平均每次消耗50ms二蓝,那么本次調(diào)用至少是250ms翘盖,我們細(xì)想一下,在這個這五個服務(wù)其實并沒有任何的依賴瓶摆,誰先獲取誰后獲取都可以凉逛,那么我們可以想想,是否可以用多重影分身之術(shù)群井,同時獲取這五個服務(wù)的信息呢状飞?優(yōu)化如下:
將這五個查詢服務(wù)并行查詢,在理想情況下可以優(yōu)化至50ms书斜。當(dāng)然說起來簡單叭爱,我們真正如何落地呢造寝?
1.CountDownLatch
CountDownLatch钳垮,可以將其看成是一個計數(shù)器蝗拿,await()方法可以阻塞至超時或者計數(shù)器減至0,其他線程當(dāng)完成自己目標(biāo)的時候可以減少1样屠,利用這個機制我們可以將其用來做并發(fā)穿撮。 可以用如下的代碼實現(xiàn)我們上面的下訂單的需求:
public class CountDownTask {
private static final int CORE_POOL_SIZE = 4;
private static final int MAX_POOL_SIZE = 12;
private static final long KEEP_ALIVE_TIME = 5L;
private final static int QUEUE_SIZE = 1600;
protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE)
);
public static void main(String[] args) throws InterruptedException {
// 新建一個為5的計數(shù)器
CountDownLatch countDownLatch = new CountDownLatch(5);
OrderInfo orderInfo = new OrderInfo();
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Customer,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setCustomerInfo(new CustomerInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Discount,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setDiscountInfo(new DiscountInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Food,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setFoodListInfo(new FoodListInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)Tenant,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setTenantInfo(new TenantInfo());
countDownLatch.countDown();
});
THREAD_POOL.execute(() -> {
System.out.println("當(dāng)前任務(wù)OtherInfo,線程名字為:"+ Thread.currentThread().getName());
orderInfo.setOtherInfo(new OtherInfo());
countDownLatch.countDown();
});
countDownLatch.await(1, TimeUnit.SECONDS);
System.out.println("主線程:"+ Thread.currentThread().getName());
}
}
建立一個線程池(具體配置根據(jù)具體業(yè)務(wù),具體機器配置)痪欲,進行并發(fā)的執(zhí)行我們的任務(wù)(生成用戶信息悦穿,菜品信息等),最后利用await方法阻塞等待結(jié)果成功返回勤揩。
2.CompletableFuture
CountDownLatch雖然能實現(xiàn)我們需要滿足的功能但是其任然有個問題是,在我們的業(yè)務(wù)代碼需要耦合CountDownLatch的代碼秘蛔,比如在我們獲取用戶信息之后我們會執(zhí)行countDownLatch.countDown()陨亡,很明顯我們的業(yè)務(wù)代碼顯然不應(yīng)該關(guān)心這一部分邏輯傍衡,并且在開發(fā)的過程中萬一寫漏了,那我們的await方法將只會被各種異常喚醒负蠕。
在JDK1.8中提供了一個類CompletableFuture蛙埂,它是一個多功能的非阻塞的Future。(什么是Future:用來代表異步結(jié)果遮糖,并且提供了檢查計算完成绣的,等待完成,檢索結(jié)果完成等方法欲账。)
我們將每個任務(wù)的計算完成的結(jié)果都用CompletableFuture來表示屡江,利用CompletableFuture.allOf匯聚成一個大的CompletableFuture,那么利用get()方法就可以阻塞赛不。
public class CompletableFutureParallel {
private static final int CORE_POOL_SIZE = 4;
private static final int MAX_POOL_SIZE = 12;
private static final long KEEP_ALIVE_TIME = 5L;
private final static int QUEUE_SIZE = 1600;
protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_SIZE)
);
public static void main(String[] args) throws Exception {
OrderInfo orderInfo = new OrderInfo();
List futures = new ArrayList<>();
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Customer,線程名字為:" + Thread.currentThread().getName());
orderInfo.setCustomerInfo(new CustomerInfo());
}, THREAD_POOL));
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Discount,線程名字為:" + Thread.currentThread().getName());
orderInfo.setDiscountInfo(new DiscountInfo());
}, THREAD_POOL));
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Food,線程名字為:" + Thread.currentThread().getName());
orderInfo.setFoodListInfo(new FoodListInfo());
}, THREAD_POOL));
futures.add(CompletableFuture.runAsync(() -> {
System.out.println("當(dāng)前任務(wù)Other,線程名字為:" + Thread.currentThread().getName());
orderInfo.setOtherInfo(new OtherInfo());
}, THREAD_POOL));
CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
allDoneFuture.get(10, TimeUnit.SECONDS);
System.out.println(orderInfo);
}
}
可以看見我們使用CompletableFuture能很快的完成的需求惩嘉,當(dāng)然這還不夠。
3.Fork/Join
我們上面用CompletableFuture完成了我們對多組任務(wù)并行執(zhí)行踢故,但是其依然是依賴我們的線程池文黎,在我們的線程池中使用的是阻塞隊列,也就是當(dāng)我們某個線程執(zhí)行完任務(wù)的時候需要通過這個阻塞隊列進行殿较,那么肯定會發(fā)生競爭耸峭,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。
ForkJoinPool中每個線程都有自己的工作隊列淋纲,并且采用Work-Steal算法防止線程饑餓劳闹。 Worker線程用LIFO的方法取出任務(wù),但是會用FIFO的方法去偷取別人隊列的任務(wù)帚戳,這樣就減少了鎖的沖突玷或。
網(wǎng)上這個框架的例子很多,我們看看如何使用代碼其完成我們上面的下訂單需求:
public class OrderTask extends RecursiveTask {
@Override
protected OrderInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
CustomerTask customerTask = new CustomerTask();
TenantTask tenantTask = new TenantTask();
DiscountTask discountTask = new DiscountTask();
FoodTask foodTask = new FoodTask();
OtherTask otherTask = new OtherTask();
invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);
OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());
returnorderInfo;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1);
System.out.println(forkJoinPool.invoke(new OrderTask()));
}
}
class CustomerTask extends RecursiveTask {
@Override
protected CustomerInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new CustomerInfo();
}
}
class TenantTask extends RecursiveTask {
@Override
protected TenantInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new TenantInfo();
}
}
class DiscountTask extends RecursiveTask {
@Override
protected DiscountInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new DiscountInfo();
}
}
class FoodTask extends RecursiveTask {
@Override
protected FoodListInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new FoodListInfo();
}
}
class OtherTask extends RecursiveTask {
@Override
protected OtherInfocompute() {
System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
return new OtherInfo();
}
}
我們定義一個OrderTask并且定義五個獲取信息的任務(wù)片任,在compute中分別fork執(zhí)行這五個任務(wù)偏友,最后在將這五個任務(wù)的結(jié)果通過Join獲得,最后完成我們的并行化的需求对供。
4. parallelStream
在jdk1.8中提供了并行流的API位他,當(dāng)我們使用集合的時候能很好的進行并行處理,下面舉了一個簡單的例子從1加到100:
public class ParallelStream {
public static void main(String[] args) {
ArrayList list = new ArrayList();
for (int i = 1; i <= 100; i++) {
list.add(i);
}
LongAdder sum = new LongAdder();
list.parallelStream().forEach(integer -> {
System.out.println("當(dāng)前線程" + Thread.currentThread().getName());
sum.add(integer);
});
System.out.println(sum);
}
}
parallelStream中底層使用的那一套也是Fork/Join的那一套,默認(rèn)的并發(fā)程度是可用CPU數(shù)-1产场。
5.分片
可以想象有這么一個需求鹅髓,每天定時對id在某個范圍之間的用戶發(fā)券,比如這個范圍之間的用戶有幾百萬京景,如果給一臺機器發(fā)的話窿冯,可能全部發(fā)完需要很久的時間,所以分布式調(diào)度框架比如:elastic-job确徙。都提供了分片的功能醒串,比如你用50臺機器执桌,那么id%50=0的在第0臺機器上,=1的在第1臺機器上發(fā)券芜赌,那么我們的執(zhí)行時間其實就分?jǐn)偟搅瞬煌臋C器上了仰挣。
并行化注意事項
線程安全:在parallelStream中我們列舉的代碼中使用的是LongAdder,并沒有直接使用我們的Integer和Long缠沈,這個是因為在多線程環(huán)境下Integer和Long線程不安全膘壶。所以線程安全我們需要特別注意。
合理參數(shù)配置:可以看見我們需要配置的參數(shù)比較多洲愤,比如我們的線程池的大小颓芭,等待隊列大小,并行度大小以及我們的等待超時時間等等禽篱,我們都需要根據(jù)自己的業(yè)務(wù)不斷的調(diào)優(yōu)防止出現(xiàn)隊列不夠用或者超時時間不合理等等畜伐。
CompletableFuture詳細(xì)介紹
JDK8以前的Future
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
doSomethingElse();//在我們異步操作的同時一樣可以做其他操作
try {
String res = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
上面展示了我們的線程可以并發(fā)方式調(diào)用另一個線程去做我們耗時的操作。當(dāng)我們必須依賴我們的異步結(jié)果的時候我們就可以調(diào)用get方法去獲得躺率。當(dāng)我們調(diào)用get方法的時候如果我們的任務(wù)完成就可以立馬返回玛界,但是如果任務(wù)沒有完成就會阻塞,直到超時為止悼吱。
Future底層是怎么實現(xiàn)的呢慎框? 我們首先來到我們ExecutorService的代碼中submit方法這里會返回一個Future
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
在sumbmit中會對我們的Callable進行包裝封裝成我們的FutureTask,我們最后的Future其實也是Future的實現(xiàn)類FutureTask后添,F(xiàn)utureTask實現(xiàn)了Runnable接口所以這里直接調(diào)用execute笨枯。在FutureTask代碼中的run方法代碼如下:
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
.......
}
可以看見當(dāng)我們執(zhí)行完成之后會set(result)來通知我們的結(jié)果完成了。set(result)代碼如下:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
首先用CAS置換狀態(tài)為完成遇西,以及替換結(jié)果馅精,當(dāng)替換結(jié)果完成之后,才會替換為我們的最終狀態(tài)粱檀,這里主要是怕我們設(shè)置完COMPLETING狀態(tài)之后最終值還沒有真正的賦值出去洲敢,而我們的get就去使用了,所以還會有個最終狀態(tài)茄蚯。我們的get()方法的代碼如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
首先獲得當(dāng)前狀態(tài)压彭,然后判斷狀態(tài)是否完成,如果沒有完成則進入awaitDone循環(huán)等待渗常,這也是我們阻塞的代碼壮不,然后返回我們的最終結(jié)果。
缺陷
我們的Future使用很簡單皱碘,這也導(dǎo)致了如果我們想完成一些復(fù)雜的任務(wù)可能就比較難询一。比如下面一些例子:
將兩個異步計算合成一個異步計算,這兩個異步計算互相獨立,同時第二個又依賴第一個的結(jié)果健蕊。
當(dāng)Future集合中某個任務(wù)最快結(jié)束時缓醋,返回結(jié)果。
等待Future結(jié)合中的所有任務(wù)都完成绊诲。
通過編程方式完成一個Future任務(wù)的執(zhí)行。
應(yīng)對Future的完成時間褪贵。也就是我們的回調(diào)通知掂之。
CompletableFuture
CompletableFuture是JDK8提出的一個支持非阻塞的多功能的Future,同樣也是實現(xiàn)了Future接口脆丁。
下面會寫一個比較簡單的例子:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.complete(Thread.currentThread().getName());
}).start();
doSomethingelse();//做你想做的其他操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
用法上來說和Future有一點不同世舰,我們這里fork了一個新的線程來完成我們的異步操作,在異步操作中我們會設(shè)置值槽卫,然后在外部做我們其他操作跟压。在complete中會用CAS替換result,然后當(dāng)我們get如果可以獲取到值得時候就可以返回了歼培。
錯誤處理
上面介紹了正常情況下但是當(dāng)我們在我們異步線程中產(chǎn)生了錯誤的話就會非常的不幸震蒋,錯誤的異常不會告知給你,會被扼殺在我們的異步線程中躲庄,而我們的get方法會被阻塞查剖。
對于我們的CompletableFuture提供了completeException方法可以讓我們返回我們異步線程中的異常,代碼如下:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(()->{
completableFuture.completeExceptionally(new RuntimeException("error"));
completableFuture.complete(Thread.currentThread().getName());
}).start();
doSomethingelse();//做你想做的耗時操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
--------------
輸出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
at futurepackge.jdk8Future.main(jdk8Future.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
在我們新建的異步線程中直接New一個異常拋出,在我們客戶端中依然可以獲得異常噪窘。
工廠方法創(chuàng)建CompletableFuture
我們的上面的代碼雖然不復(fù)雜笋庄,但是我們的java8依然對其提供了大量的工廠方法,用這些方法更容易完成整個流程倔监。如下面的例子:
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
return Thread.currentThread().getName();
});
doSomethingelse();//做你想做的耗時操作
try {
System.out.println(completableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
---------
輸出:
ForkJoinPool.commonPool-worker-1
上面的例子通過工廠方法supplyAsync提供了一個Completable直砂,在異步線程中的輸出是ForkJoinPool可以看出當(dāng)我們不指定線程池的時候會使用ForkJoinPool,而我們上面的compelte的操作在我們的run方法中做了,源代碼如下:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null;
fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
上面代碼中通過d.completeValue(f.get());設(shè)置了我們的值浩习。同樣的構(gòu)造方法還有runasync等等静暂。
計算結(jié)果完成時的處理
當(dāng)CompletableFuture計算結(jié)果完成時,我們需要對結(jié)果進行處理,或者當(dāng)CompletableFuture產(chǎn)生異常的時候需要對異常進行處理瘦锹。有如下幾種方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面的四種方法都返回了CompletableFuture籍嘹,當(dāng)我們Action執(zhí)行完畢的時候,future返回的值和我們原始的CompletableFuture的值是一樣的弯院。上面以Async結(jié)尾的會在新的線程池中執(zhí)行辱士,上面沒有一Async結(jié)尾的會在之前的CompletableFuture執(zhí)行的線程中執(zhí)行。例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(Thread.currentThread().getName());
System.out.println(v);
});
System.out.println("Main" + Thread.currentThread().getName());
System.out.println(f.get());
}
exceptionally方法返回一個新的CompletableFuture听绳,當(dāng)原始的CompletableFuture拋出異常的時候颂碘,就會觸發(fā)這個CompletableFuture的計算,調(diào)用function計算值,否則如果原始的CompletableFuture正常計算完后头岔,這個新的CompletableFuture也計算完成塔拳,它的值和原始的CompletableFuture的計算的值相同。也就是這個exceptionally方法用來處理異常的情況峡竣。
上面我們討論了如何計算結(jié)果完成時進行的處理靠抑,接下來我們討論如何對計算結(jié)果完成時,對結(jié)果進行轉(zhuǎn)換适掰。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
這里同樣也是返回CompletableFuture颂碧,但是這個結(jié)果會由我們自定義返回去轉(zhuǎn)換他,同樣的不以Async結(jié)尾的方法由原來的線程計算类浪,以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運行载城。Java的CompletableFuture類總是遵循這樣的原則,下面就不一一贅述了费就。
例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
System.out.println(f.get());
}
上面的最終結(jié)果會輸出11诉瓦,我們成功將其用兩個thenApply轉(zhuǎn)換為String。
計算結(jié)果完成時的消費
上面已經(jīng)講了結(jié)果完成時的處理和轉(zhuǎn)換力细,他們最后的CompletableFuture都會返回對應(yīng)的值睬澡,這里還會有一個只會對計算結(jié)果消費不會返回任何結(jié)果的方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
函數(shù)接口為Consumer眠蚂,就知道了只會對函數(shù)進行消費猴贰,例子代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
future.thenAccept(System.out::println);
}
這個方法用法很簡單我就不多說了.Accept家族還有個方法是用來合并結(jié)果當(dāng)兩個CompletionStage都正常執(zhí)行的時候就會執(zhí)行提供的action,它用來組合另外一個異步的結(jié)果河狐。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth是當(dāng)兩個CompletionStage都正常完成計算的時候,執(zhí)行一個Runnable米绕,這個Runnable并不使用計算的結(jié)果。 示例代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> System.out.println(x+y)).get());
}
CompletableFuture也提供了執(zhí)行Runnable的辦法馋艺,這里我們就不能使用我們future中的值了栅干。
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
對計算結(jié)果的組合
首先是介紹一下連接兩個future的方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
對于Compose可以連接兩個CompletableFuture,其內(nèi)部處理邏輯是當(dāng)?shù)谝粋€CompletableFuture處理沒有完成時會合并成一個CompletableFuture,如果處理完成捐祠,第二個future會緊接上一個CompletableFuture進行處理碱鳞。
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
}
我們上面的thenAcceptBoth講了合并兩個future,但是沒有返回值這里將介紹一個有返回值的方法,如下:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
例子比較簡單如下:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
});
CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}),(x,y) -> {return "計算結(jié)果:"+x+y;});
System.out.println(f.get());
}
上面介紹了兩個future完成的時候應(yīng)該完成的工作踱蛀,接下來介紹任意一個future完成時需要執(zhí)行的工作窿给,方法如下:
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
上面兩個是一個是純消費不返回結(jié)果,一個是計算后返回結(jié)果率拒。
其他方法
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計算崩泡。
anyOf方法是當(dāng)任意一個CompletableFuture執(zhí)行完后就會執(zhí)行計算,計算的結(jié)果相同猬膨。