CompletableFuture常見用法,CompletableFuture使用示例责球,CompletableFuture等待所有異步任務(wù)返回焦履,包括常見的用法,結(jié)合自定義線程池使用雏逾。結(jié)合實(shí)際業(yè)務(wù)Demo代碼運(yùn)行效果如下嘉裤,可直接運(yùn)行查看效果,具體請(qǐng)參考如下代碼
1.創(chuàng)建CompletableFuture對(duì)象
方法名 功能描述
completedFuture(U value) 返回一個(gè)已經(jīng)計(jì)算好的CompletableFuture
runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作為線程池執(zhí)行任務(wù)栖博,沒有返回值
runAsync(Runnable runnable, Executor executor) 使用指定的線程池執(zhí)行任務(wù)屑宠,沒有返回值
supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作為線程池執(zhí)行任務(wù),有返回值
supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的線程池執(zhí)行任務(wù)仇让,有返回值
各種使用示例如下(可直接復(fù)制運(yùn)行):
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.創(chuàng)建CompletableFuture對(duì)象
// 方法名 功能描述
// completedFuture(U value) 返回一個(gè)已經(jīng)計(jì)算好的CompletableFuture
// runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作為線程池執(zhí)行任務(wù)典奉,沒有返回值
// runAsync(Runnable runnable, Executor executor) 使用指定的線程池執(zhí)行任務(wù)躺翻,沒有返回值
// supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作為線程池執(zhí)行任務(wù),有返回值
// supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的線程池執(zhí)行任務(wù)卫玖,有返回值
CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());
CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println("stringFuture="+stringFuture.get());
// 2.計(jì)算結(jié)果完成時(shí).可以執(zhí)行的方法
// 方法名
// whenComplete(BiConsumer<? super T,? super Throwable> action)
// whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
// whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).whenComplete((v, e) -> {
// hello
System.out.println(v);
});
// hello
System.out.println(future.get());
// 3.轉(zhuǎn)換公你,消費(fèi),執(zhí)行
// 方法名 功能描述
// thenApply 獲取上一個(gè)任務(wù)的返回假瞬,并返回當(dāng)前任務(wù)的值
// thenAccept 獲取上一個(gè)任務(wù)的返回陕靠,單純消費(fèi),沒有返回值
// thenRun 上一個(gè)任務(wù)執(zhí)行完成后脱茉,開始執(zhí)行thenRun中的任務(wù)
CompletableFuture.supplyAsync(() -> {
return "hello ";
}).thenAccept(str -> {
// hello world
System.out.println(str + "world");
}).thenRun(() -> {
// task finish
System.out.println("task finish");
});
// 4.組合(兩個(gè)任務(wù)都完成)
// 方法名 描述
// thenCombine 組合兩個(gè)future剪芥,獲取兩個(gè)future的返回結(jié)果,并返回當(dāng)前任務(wù)的返回值
// thenAcceptBoth 組合兩個(gè)future琴许,獲取兩個(gè)future任務(wù)的返回結(jié)果税肪,然后處理任務(wù),沒有返回值
// runAfterBoth 組合兩個(gè)future榜田,不需要獲取future的結(jié)果益兄,只需兩個(gè)future處理完任務(wù)后,處理該任務(wù)
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
return "歡迎關(guān)注 ";
}).thenApply(t -> {
return t + "微信公眾號(hào) ";
}).thenCombine(CompletableFuture.completedFuture("Java"), (t, u) -> {
return t + u;
}).whenComplete((t, e) -> {
// 歡迎關(guān)注 微信公眾號(hào) Java
System.out.println(t);
});
// 5.組合(只需要一個(gè)任務(wù)完成)
// 方法名 描述
// applyToEither 兩個(gè)任務(wù)有一個(gè)執(zhí)行完成串慰,獲取它的返回值偏塞,處理任務(wù)并返回當(dāng)前任務(wù)的返回值
// acceptEither 兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值邦鲫,處理任務(wù)灸叼,沒有返回值
// runAfterEither 兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,不需要獲取future的結(jié)果庆捺,處理任務(wù)古今,也沒有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "歡迎關(guān)注微信公眾號(hào)";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Java";
});
CompletableFuture<String> future3 = future1.applyToEither(future2, str -> str);
// 歡迎關(guān)注微信公眾號(hào) Java 隨機(jī)輸出
System.out.println(future3.get());
// 6.多任務(wù)組合
// 方法名 描述
// allOf 當(dāng)所有的CompletableFuture完成后執(zhí)行計(jì)算
// anyOf 任意一個(gè)CompletableFuture完成后執(zhí)行計(jì)算
// allOf的使用
CompletableFuture<String> future1A = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "歡迎關(guān)注";
});
CompletableFuture<String> future2A = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公眾號(hào)";
});
CompletableFuture<String> future3A = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java識(shí)堂";
});
// 歡迎關(guān)注 微信公眾號(hào) Java
CompletableFuture.allOf(future1A, future2A, future3A)
.thenApply(v -> {
List<Object> collect = Stream.of(future1, future2, future3A)
.map(CompletableFuture::join)
.collect(Collectors.toList());
return collect;
}
)
.thenAccept(System.out::print);
// anyOf的使用
CompletableFuture<String> future1C = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "歡迎關(guān)注";
});
CompletableFuture<String> future2C = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公眾號(hào)";
});
CompletableFuture<String> future3C = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1C, future2C, future3C);
// 歡迎關(guān)注 微信公眾號(hào) Java 隨機(jī)輸出
System.out.println(resultFuture.get());
// 7.異常處理
// exceptionally 捕獲異常嚷兔,進(jìn)行處理
CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
return 100 / 0;
}).thenApply(num -> {
return num + 10;
}).exceptionally(throwable -> {
return 0;
});
// 0
System.out.println(future.get());
// 當(dāng)然有一些接口能捕獲異常
CompletableFuture futureAAA = CompletableFuture.supplyAsync(() -> {
String str = null;
return str.length();
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("正常結(jié)果為" + v);
} else {
// 發(fā)生異常了java.util.concurrent.CompletionException: java.lang.NullPointerException
System.out.println("發(fā)生異常了" + e.toString());
}
});
// 8.集合業(yè)務(wù)使用示例举瑰,假設(shè)stringList為業(yè)務(wù)執(zhí)行集合
ExecutorService executor = new ThreadPoolExecutor(10, 16, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(1000));
List<String> stringList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
stringList.add("a" + i);
}
CompletableFuture<Void> all = null;
CompletableFuture<Void> all1 = null;
// 開始我們的業(yè)務(wù)處理
for (String personName : stringList) {
CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return null;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("正常結(jié)果為" + v);
} else {
// 發(fā)生異常了java.util.concurrent.CompletionException: java.lang.NullPointerException
System.out.println("發(fā)生異常了" + e.toString());
}
});
all = CompletableFuture.allOf(stringCompletableFuture);
CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
// 模擬業(yè)務(wù)邏輯,say hello world
System.out.println(personName + ": Hello World!");
return "task finished!";
});
all1 = CompletableFuture.allOf(stringCompletableFuture1);
}
// 開始等待所有任務(wù)執(zhí)行完成
all.join();
all1.join();
// 使用JDK 1.8的特性何陆,stream()和Lambda表達(dá)式: (參數(shù)) -> {表達(dá)式}
long start = System.currentTimeMillis();
if (CollectionUtils.isEmpty(stringList)) {
return;
}
final CompletableFuture[] completableFutures = stringList.stream().
map(t -> CompletableFuture
.supplyAsync(() -> pause(t), executor)
.whenComplete((result, th) -> {
System.out.println("hello" + result);
})).toArray(CompletableFuture[]::new);
// 開始等待所有任務(wù)執(zhí)行完成
System.out.println("start block");
CompletableFuture.allOf(completableFutures).join();
System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
stringList.forEach(name -> CompletableFuture.supplyAsync(() -> {
// 封裝了業(yè)務(wù)邏輯
System.out.println("name = " + name);
return "success";
}).exceptionally(e -> {
System.out.println(e);
return "false";
}));
// 關(guān)閉線程池
executor.shutdown();
}
private static void sleepRandom() {
System.out.println("測(cè)試方法");
}
public static String pause(String name) {
try {
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
}
return name;
}
}
簡(jiǎn)約寫法CompletableFuture執(zhí)行任務(wù)并返回
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* @author lisanwei24282
*/
public class CompletableFutureProblem {
public void batchOperation() {
/* 集合任務(wù)你画,統(tǒng)一處理 */
List<String> stringList = new ArrayList<>();
stringList.add("task1");
stringList.add("task2");
/* 任務(wù)提交匯總 */
List<CompletableFuture<String>> futures = new ArrayList<>();
stringList.parallelStream().forEach(str -> {
/* 調(diào)用業(yè)務(wù)方法 */
CompletableFuture<String> response = restApiCall(str);
futures.add(response);
});
/* 等待所有返回 */
CompletableFuture<Void> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
CompletableFuture<List<String>> convertedResult = result.thenApply(v ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
/* 獲取返回?cái)?shù)據(jù)結(jié)果 */
try {
List<String> finishedTask = convertedResult.get();
System.out.println(finishedTask.toString());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/* 假設(shè)執(zhí)行具體業(yè)務(wù)邏輯功能 */
public CompletableFuture<String> restApiCall(String str) {
return CompletableFuture.supplyAsync(() -> {
return "Complete-" + str;
});
}
public static void main(String[] args) {
CompletableFutureProblem problem = new CompletableFutureProblem();
problem.batchOperation();
}
}