在JDK1.8中提供了CompletableFuture類來進行異步編程,下面我們一起看看怎么實現
1.創建異步任務
package com.wwj.test.thread;
import java.util.Optional;
import java.util.concurrent.*;
/**
 * Demonstrates creating asynchronous tasks with {@link CompletableFuture}:
 * one task that returns a value (post-processed with {@code handle}) and one
 * task that returns nothing.
 */
public class CompletableFutureTest {

    /**
     * Runs both demo tasks on a fixed thread pool and prints the result of the
     * value-returning one.
     *
     * @param args unused
     * @throws ExecutionException   if the value-returning task completed exceptionally
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create the thread pool that runs the tasks.
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        try {
            // Asynchronous task WITH a return value.
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                // Business logic.
                int a = 10 / 5;
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool can observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                return a;
            }, executorService).handle((value, thr) -> {
                // handle() is used for post-processing here; the alternatives are:
                //  - whenComplete(): observes the result/exception but cannot change the result.
                //  - exceptionally(): runs only when the task threw, and supplies a replacement result.
                //  - handle(): receives both the result and the exception and may replace the result.
                if (thr != null) {
                    return -1;
                }
                System.out.println("后續(xù)線程處理");
                System.out.println(Thread.currentThread().getName());
                // No failure occurred, so the supplier's value is passed through unchanged.
                return value;
            });

            // Asynchronous task WITHOUT a return value.
            CompletableFuture.runAsync(() -> {
                // Business logic.
                int a = 10 / 2;
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                System.out.println(a);
            }, executorService);

            System.out.println("main 線程工作");
            int result = integerCompletableFuture.get();
            System.out.println(result);
        } finally {
            // Pool threads are non-daemon: without shutdown() the JVM never exits.
            // shutdown() lets the already-submitted runAsync task finish normally.
            executorService.shutdown();
        }
    }
}
2.多異步任務進行組合
2.1多異步任務串行
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates chaining asynchronous tasks serially: the second task starts
 * only after the first has completed and receives its result.
 */
public class CompletableFutureTest1 {

    /**
     * Runs task 1, feeds its result into task 2 and prints task 2's result.
     *
     * @param args unused
     * @throws ExecutionException   if either task completed exceptionally
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create the thread pool used by the dependent stage.
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        try {
            // Task 1: returns a value. No executor is passed, so it runs on
            // CompletableFuture's default asynchronous executor.
            CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
                // Business logic.
                System.out.println(Thread.currentThread().getName());
                System.out.println("任務1執(zhí)行");
                return 0;
            });

            // Task 2: runs after task 1 has completed (serial execution) on the pool.
            CompletableFuture<Integer> integerCompletableFuture1 = integerCompletableFuture.thenApplyAsync(value -> {
                System.out.println(Thread.currentThread().getName());
                System.out.println("任務1的返回值" + value);
                return 2;
            }, executorService);

            System.out.println("main 線程工作");
            int result = integerCompletableFuture1.get();
            System.out.println(result);
        } finally {
            // Pool threads are non-daemon: without shutdown() the JVM never exits.
            executorService.shutdown();
        }

        // Reference: methods for serial chaining.
        //
        // No input, no return value:
        //   public CompletableFuture<Void> thenRun(Runnable action);
        //   public CompletableFuture<Void> thenRunAsync(Runnable action);
        //   public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
        //
        // Takes the previous result, no return value:
        //   public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
        //   public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
        //   public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
        //
        // Takes the previous result and returns a value:
        //   public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
        //   public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
        //   public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
    }
}
2.2兩個任務并行執行完成后再執行下一個任務
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates running two tasks in parallel and starting a third task only
 * after BOTH of them have completed.
 */
public class CompletableFutureTest2 {

    /**
     * Starts task 1 and task 2 on a fixed thread pool, combines their results
     * in task 3 and prints task 3's result.
     *
     * @param args unused
     * @throws ExecutionException   if any task completed exceptionally
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create the thread pool that runs all three tasks.
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        try {
            // Task 1
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任務1線程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool can observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println("任務1結束:");
                return i;
            }, executorService);

            // Task 2
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任務2線程:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println("任務2結束");
                return 0;
            }, executorService);

            // Reference: methods that wait for BOTH stages before running a new action.
            //
            // New action has no input and no return value:
            //   public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
            //   public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
            //   public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
            //
            // New action takes both results, no return value:
            //   public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
            //   public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
            //   public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
            //
            // New action takes both results and returns a value:
            //   public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
            //   public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
            //   public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

            // Task 3: runs on the pool once task 1 and task 2 have both completed.
            CompletableFuture<Integer> integerCompletableFuture = completableFuture1.thenCombineAsync(completableFuture2, (value1, value2) -> {
                System.out.println("接收前任務參數:" + value1 + ":" + value2);
                System.out.println("任務3線程:" + Thread.currentThread().getName());
                return 3;
            }, executorService);

            System.out.println("main 線程工作");
            int result = integerCompletableFuture.get();
            System.out.println(result);
        } finally {
            // Pool threads are non-daemon: without shutdown() the JVM never exits.
            executorService.shutdown();
        }
    }
}
2.3 兩個異步任務只要其中一個執行完就執行下一個異步任務
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates running two tasks in parallel and starting a third task as
 * soon as EITHER of them has completed.
 */
public class CompletableFutureTest3 {

    /**
     * Starts task 1 and task 2 on a fixed thread pool, feeds the result of
     * whichever finishes first into task 3 and prints task 3's result.
     *
     * @param args unused
     * @throws ExecutionException   if the awaited stage completed exceptionally
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create the thread pool that runs all three tasks.
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        try {
            // Task 1
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任務1線程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool can observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println("任務1結束:");
                return i;
            }, executorService);

            // Task 2
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任務2線程:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println("任務2結束");
                return 0;
            }, executorService);

            // Reference: methods that run a new action as soon as EITHER stage completes.
            //
            // New action has no input and no return value:
            //   public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);
            //   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
            //   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
            //
            // New action takes the first available result, no return value. The argument
            // is typed T (not Object): because it is unknown which stage finishes first,
            // the other stage must produce a value compatible with T.
            //   public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
            //   public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
            //   public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
            //
            // New action takes the first available result (typed T, as above) and returns a value:
            //   public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
            //   public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
            //   public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);

            // Task 3: runs on the pool as soon as task 1 or task 2 has completed.
            CompletableFuture<Integer> integerCompletableFuture = completableFuture1.applyToEitherAsync(completableFuture2, (value) -> {
                System.out.println("接收前任務參數:" + value.toString());
                System.out.println("任務3線程:" + Thread.currentThread().getName());
                return 3;
            }, executorService);

            System.out.println("main 線程工作");
            int result = integerCompletableFuture.get();
            System.out.println(result);
        } finally {
            // Pool threads are non-daemon: without shutdown() the JVM never exits.
            // shutdown() does not interrupt the slower task; it still runs to completion.
            executorService.shutdown();
        }
    }
}
2.4 多任務組合(超過兩個任務)
package com.wwj.test.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates combining more than two tasks with
 * {@link CompletableFuture#anyOf} (and, for reference, {@code allOf}).
 */
public class CompletableFutureTest4 {

    /**
     * Starts three tasks on a fixed thread pool and prints the result of
     * whichever completes first.
     *
     * @param args unused
     * @throws ExecutionException   if the first task to complete did so exceptionally
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create the thread pool that runs the tasks.
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        try {
            // Task 1
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> 1, executorService);
            // Task 2
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2, executorService);
            // Task 3
            CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> 3, executorService);

            // Completes with the result of whichever of the three tasks finishes first.
            // anyOf is a static method, so it is called on the class, not on an instance.
            CompletableFuture<Object> anyOf =
                    CompletableFuture.anyOf(completableFuture1, completableFuture2, completableFuture3);

            // Two ways to read the same result: get() throws checked exceptions,
            // join() throws an unchecked CompletionException on failure.
            Object o1 = anyOf.get();
            Object o2 = anyOf.join();

            // To wait for all three tasks and then read each result individually:
            // CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);

            System.out.println("main 線程工作");
            System.out.println(o1 + ":" + o2);
        } finally {
            // Pool threads are non-daemon: without shutdown() the JVM never exits.
            executorService.shutdown();
        }
    }
}