什么是CompletableFuture拱雏?
在Java 8中, 新增加了一個包含50個方法左右的類: CompletableFuture,結(jié)合了Future的優(yōu)點债朵,提供了非常強大的Future的擴展功能隐砸,可以幫助我們簡化異步編程的復雜性,提供了函數(shù)式編程的能力锄奢,可以通過回調(diào)的方式處理計算結(jié)果,并且提供了轉(zhuǎn)換和組合CompletableFuture的方法剧腻。
CompletableFuture被設(shè)計在Java中進行異步編程拘央。異步編程意味著在主線程之外創(chuàng)建一個獨立的線程,與主線程分隔開恕酸,并在上面運行一個非阻塞的任務(wù)堪滨,然后通知主線程進展胯陋,成功或者失敗蕊温。
通過這種方式,你的主線程不用為了任務(wù)的完成而阻塞/等待遏乔,你可以用主線程去并行執(zhí)行其他的任務(wù)义矛。 使用這種并行方式,極大地提升了程序的表現(xiàn)盟萨。
什么是Supplier凉翻?
DK提供了大量常用的函數(shù)式接口以豐富Lambda的典型使用場景,它們主要在 java.util.function 包中被提供捻激。
java.util.function.Supplier<T> 接口僅包含一個無參的方法: T get() 制轰。用來獲取一個泛型參數(shù)指定類型的對象數(shù)據(jù)。
下面通過2個Demo列出疑問點和最終問題的解答
- 首先定義一個線程池,嚴謹點不要使用JDK自帶的工具類
/**
* OMS訂單線程池
*/
public static final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
new ThreadPoolExecutor(
50,
100,
60L,
TimeUnit.SECONDS,new LinkedBlockingQueue(20000),
new DefaultThreadFactory("omsOrder"),
new ThreadPoolExecutor.CallerRunsPolicy()));
}
- 為了方便驗證胞谭,我重寫了DefaultThreadFactory線程池工廠自定義線程名稱垃杖,這樣方便測試
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 自定義線程名稱
* @param threadName
*/
DefaultThreadFactory(String threadName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-"+threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
- Demo1:
@Override
public CommonResp sendSms() {
//準備基礎(chǔ)數(shù)據(jù),一個訂單集合丈屹,多少個訂單就發(fā)送多個條短信
List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
//批量發(fā)送
List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
CompletableFuture[] completableFutures = omsOrderDOList.stream().map(omsOrderDO ->
CompletableFuture.supplyAsync(() -> test(omsOrderDO, this::send), ExecutorConfig.executorService).whenCompleteAsync((val, e) -> {
if (val != null) {
resultList.add(val);
}
}).exceptionally(e -> {
log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms:e{}", e);
return null;
})
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();
System.out.println(Thread.currentThread().getName()+"主線程執(zhí)行完成");
resultList.forEach(System.out::println);
return CommonResp.ok(resultList);
}
上面幾行代碼的邏輯是:到數(shù)據(jù)庫查詢訂單集合调俘,遍歷通過CompletableFuture線程池批量調(diào)用send方法發(fā)送短信,然后打印主線程的線程名旺垒,send方面也打印了當前執(zhí)行的線程名稱彩库,代碼如下:
private Pair<Long,String> send(OmsOrderDO omsOrderDO){
//調(diào)用短信服務(wù)發(fā)送短信
MessageCmd messageCmd = new MessageCmd();
messageCmd.setTos(Arrays.asList(omsOrderDO.getBillReceiverEmail()));
messageCmd.setSubject(omsOrderDO.getOrderSn());
messageCmd.setText(omsOrderDO.getReceiverName());
CommonResp smsCaptcha = commonClient.sendBatchSMS(messageCmd);
//打印當前線程的名稱
System.out.println("我是線程:"+Thread.currentThread().getName());
return new Pair(omsOrderDO.getId(),smsCaptcha.getCode());
}
然后執(zhí)行,看結(jié)果:
我是線程:pool-1-thread-omsOrder1
我是線程:pool-1-thread-omsOrder2
我是線程:pool-1-thread-omsOrder3
http-nio-18086-exec-3主線程執(zhí)行完成
- 下面來看Demo2:
@Override
public CommonResp sendSms2() {
//基礎(chǔ)數(shù)據(jù)
List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
//遍歷基礎(chǔ)數(shù)據(jù)的同事去調(diào)用send方法發(fā)送短信先蒋,將返回的結(jié)果放到集合
List<Supplier<Pair<Long,String>>> omsOrderDOSupplierList = new ArrayList<>();
for (OmsOrderDO omsOrderDO:omsOrderDOList){
omsOrderDOSupplierList.add(()->send(omsOrderDO));
}
//遍歷集合骇钦,將結(jié)果放入到CompletableFuture,這里我就有一個疑問了竞漾,這里放入的是一個發(fā)送結(jié)果眯搭,
// 那是不是代碼走到這里的時候就sen方法就已經(jīng)同步的執(zhí)行完了皇忿,沒有走線程池呢?
List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
CompletableFuture[] completableFutures = omsOrderDOSupplierList.stream().map(sup ->
CompletableFuture.supplyAsync(sup, ExecutorConfig.executorService).whenCompleteAsync((r, e) -> {
if (r != null){
resultList.add(r);
}
}).exceptionally(e -> {
log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms2:e{}", e);
return null;
})
).toArray(CompletableFuture[]::new);
System.out.println(Thread.currentThread().getName()+"主線程執(zhí)行完成");
CompletableFuture.allOf(completableFutures).join();
return CommonResp.ok(resultList);
}
上面注釋我寫明我的疑問點坦仍,我一直認為這樣寫send方法是沒有經(jīng)過線程池的鳍烁,為了證明send方式是走的同步還是異步,執(zhí)行輸出線程的名稱就能確定了繁扎,輸入結(jié)果如下:
我是線程:pool-1-thread-omsOrder4
http-nio-18086-exec-1主線程執(zhí)行完成
我是線程:pool-1-thread-omsOrder5
我是線程:pool-1-thread-omsOrder6
從上面執(zhí)行的結(jié)果來看幔荒,send方法就是通過線程池執(zhí)行的,都快自閉了梳玫,怎么都想不明白爹梁,后來請教我同事龍哥,他給我說了情況提澎,我才理解:首先Supplier是一個函數(shù)姚垃,這個函數(shù)有一個特征類似延遲加載,也可以理解成一個回調(diào)函數(shù)盼忌,只有執(zhí)行Supplier.get()的時候這個函數(shù)才是真正的執(zhí)行积糯,所以我上面 omsOrderDOSupplierList.add(()->send(omsOrderDO));這行代碼只是放了一個函數(shù)進去,沒有真正的執(zhí)行send方法谦纱,為了驗證我同事的這個說法看成,我把線程池那一段代碼注釋掉在執(zhí)行,看下send方法里面的日志是否會打印跨嘉,代碼如下:
@Override
public CommonResp sendSms2() {
//基礎(chǔ)數(shù)據(jù)
List<OmsOrderDO> omsOrderDOList = iOmsOrderRepository.queryListOmsOrderDO();
//遍歷基礎(chǔ)數(shù)據(jù)的同事去調(diào)用send方法發(fā)送短信川慌,將返回的結(jié)果放到集合
List<Supplier<Pair<Long,String>>> omsOrderDOSupplierList = new ArrayList<>();
for (OmsOrderDO omsOrderDO:omsOrderDOList){
omsOrderDOSupplierList.add(()->send(omsOrderDO));
}
//遍歷集合,將結(jié)果放入到CompletableFuture祠乃,這里我就有一個疑問了梦重,這里放入的是一個發(fā)送結(jié)果,
// 那是不是代碼走到這里的時候就sen方法就已經(jīng)同步的執(zhí)行完了亮瓷,沒有走線程池呢琴拧?
List<Pair<Long,String>> resultList = new CopyOnWriteArrayList<>();
/* CompletableFuture[] completableFutures = omsOrderDOSupplierList.stream().map(sup ->
CompletableFuture.supplyAsync(sup, ExecutorConfig.executorService).whenCompleteAsync((r, e) -> {
if (r != null){
resultList.add(r);
}
}).exceptionally(e -> {
log.error("com.formssi.mall.order.application.impl.OmsOrderItemServiceImpl.sendSms2:e{}", e);
return null;
})
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures).join();*/
System.out.println(Thread.currentThread().getName()+"主線程執(zhí)行完成");
return CommonResp.ok(resultList);
}
- 重啟服務(wù),然后看下結(jié)果:
2022-05-11 17:21:00.908 INFO 1300 --- [ main] c.a.c.n.registry.NacosServiceRegistry : nacos registry, DEFAULT_GROUP oms-server 10.31.3.154:18086 register finished
2022-05-11 17:21:00.995 INFO 1300 --- [ main] com.formssi.mall.order.OrderApplication : Started OrderApplication in 9.394 seconds (JVM running for 10.426)
2022-05-11 17:21:13.794 INFO 1300 --- [io-18086-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-05-11 17:21:13.794 INFO 1300 --- [io-18086-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2022-05-11 17:21:13.804 INFO 1300 --- [io-18086-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 10 ms
http-nio-18086-exec-1主線程執(zhí)行完成
果然沒有打印send里面的線程日志寺庄,說明send方法確實沒有執(zhí)行艾蓝,最后在看下omsOrderDOSupplierList.add(()->send(omsOrderDO));中的這個omsOrderDOSupplierList這個對象里面是不是放的對應的函數(shù),如圖:
從上圖來看斗塘,omsOrderDOSupplierList確實存放3個函數(shù)赢织,好了收工!