CompletableFuture實(shí)現(xiàn)異步并阻塞獲取返回結(jié)果,巧用CompletableFuture返回值解決性能瓶頸,線程池,異步編排
參考: https://blog.csdn.net/LUOHUAPINGWIN/article/details/122222011
? ? ? https://blog.csdn.net/sunquan291/article/details/103991184
配置:
gulimall.thread.coreSize=20
gulimall.thread.maxSize=200
gulimall.thread.keepAliveTime=10
讀取配置:
package com.xunqi.gulimall.order.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @Description:
* @Created: with IntelliJ IDEA.
* @author: 夏沫止水
* @createTime: 2020-06-23 20:28
**/
@ConfigurationProperties(prefix = "gulimall.thread")
// @Component
@Data
public class ThreadPoolConfigProperties {
? ? private Integer coreSize;
? ? private Integer maxSize;
? ? private Integer keepAliveTime;
}
注入線程池:
package com.xunqi.gulimall.order.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: 線程池配置類
* @Created: with IntelliJ IDEA.
* @author: 夏沫止水
* @createTime: 2020-06-23 20:24
**/
@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {
? ? @Bean
? ? public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
? ? ? ? return new ThreadPoolExecutor(
? ? ? ? ? ? ? ? pool.getCoreSize(),
? ? ? ? ? ? ? ? pool.getMaxSize(),
? ? ? ? ? ? ? ? pool.getKeepAliveTime(),
? ? ? ? ? ? ? ? TimeUnit.SECONDS,
? ? ? ? ? ? ? ? new LinkedBlockingDeque<>(100000),
? ? ? ? ? ? ? ? Executors.defaultThreadFactory(),
? ? ? ? ? ? ? ? new ThreadPoolExecutor.AbortPolicy()
? ? ? ? );
? ? }
}
使用:
? ? @Autowired
? ? private ThreadPoolExecutor threadPoolExecutor;
@Override
? ? public List<WxUserInfo> getWxUserInfoByUid(String appid, List<Long> uidList) {
? ? ? ? // 數(shù)據(jù)太多了.分片執(zhí)行
? ? ? ? List<List<Long>> uidListGroupList = CollectionUtil.split(uidList, 500);
? ? ? ? List<CompletableFuture<List<WxUserInfo>>> futures = uidListGroupList.stream().map(list -> {
? ? ? ? ? ? return CompletableFuture.supplyAsync(() -> {
? ? ? ? ? ? ? ? RestResult<List<WxUserInfo>> wxUserInfoByAppIdUid = passportFeignService.getWxUserInfoByAppIdUid(appid, list, appName);
? ? ? ? ? ? ? ? return wxUserInfoByAppIdUid.getData();
? ? ? ? ? ? }, threadPoolExecutor);
? ? ? ? }).collect(Collectors.toList());
? ? ? ? // List<WxUserInfo> collect = futures.stream().map(p -> {
? ? ? ? //? ? try {
? ? ? ? //? ? ? ? return p.get();
? ? ? ? //? ? } catch (InterruptedException e) {
? ? ? ? //? ? ? ? e.printStackTrace();
? ? ? ? //? ? } catch (ExecutionException e) {
? ? ? ? //? ? ? ? e.printStackTrace();
? ? ? ? //? ? }
? ? ? ? //? ? return null;
? ? ? ? // }).filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList());
? ? ? ? List<WxUserInfo> biddingList = futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList());
? ? ? ? return biddingList;
? ? }