| 一宿崭、如何創(chuàng)建線程池? |
1、七大參數(shù)介紹
| 1)corePoolSize |
核心線程數(shù),一直存在線程池中(除非設(shè)置了allowCoreThreadTimeOut)垃你,創(chuàng)建好就等待就緒趾娃,去執(zhí)行任務(wù)
| 2)maximumPoolSize |
最大線程數(shù)缭嫡,設(shè)置最大線程數(shù)是為了控制資源
| 3)keepAliveTime |
存活時(shí)間,如果當(dāng)前的線程數(shù)大于核心線程數(shù)抬闷,并且線程空閑的時(shí)間大于存活時(shí)間了妇蛀,則會(huì)執(zhí)行釋放線程的操作。(釋放的數(shù)量為:maximumPoolSize - corePoolSize)
| 4)unit |
時(shí)間單位
| 5)workQueue |
阻塞隊(duì)列笤成,如果任務(wù)有很多评架,就會(huì)將目前多的任務(wù)放到隊(duì)列中,當(dāng)有空閑的線程時(shí)炕泳,就會(huì)從隊(duì)列中取出新的任務(wù)繼續(xù)執(zhí)行纵诞。
| 6)threadFactory |
線程的創(chuàng)建工廠
| 7)handler |
拒絕策略,如果隊(duì)列滿了培遵,按照我們指定的拒絕策略拒絕執(zhí)行任務(wù)
有哪些拒絕策略浙芙?
DiscardOldestPolicy
如果有新的任務(wù)進(jìn)來就會(huì)丟去最舊的未執(zhí)行的任務(wù)AbortPolicy
直接丟棄新任務(wù)登刺,拋出異常CallerRunsPolicy
如果有新任務(wù)進(jìn)來,直接調(diào)用run()方法茁裙,同步執(zhí)行操作
-
DiscardPolicy
直接丟棄新進(jìn)來的任務(wù)塘砸,不會(huì)拋出異常
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
// 常見的創(chuàng)建線程的方式
// 1)Executors . newCachedThreadApol() // 核心為0,所有都可回收的線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 2)Ехесutоr? . nеwF?хеdТhrеаdРооl(xiāng)() 固定大小的線程池晤锥,不會(huì)過期
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 3)Executors . newScheduledThreadPool() 定時(shí)任務(wù)的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 4)Executors . newSingleThreadExecutor() 單線程的線程池掉蔬,后臺(tái)從隊(duì)列中獲取任務(wù),一個(gè)一個(gè)執(zhí)行
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
問:一個(gè)corePoolSize=7 maximumPoolSize=20 workQueue=50的線程池矾瘾,如果本次有100個(gè)并發(fā)進(jìn)來女轿,是如何執(zhí)行的?
答:7個(gè)會(huì)立即被執(zhí)行壕翩,50個(gè)會(huì)進(jìn)入隊(duì)列蛉迹,然后會(huì)另外開13個(gè)新的線程,剩余的30個(gè)線程就需要看當(dāng)前線程池的拒絕策略了放妈。
| 二北救、CompletableFeture異步編排 |
1、runAsync 創(chuàng)建異步對(duì)象的方式
// 1)無返回值的異步操作
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
// 2)無返回值的異步操作,可指定線程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
// 3)有返回值的異步操作
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
// 4)有返回值的異步操作,可指定線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
2芜抒、whenComplete 計(jì)算完成時(shí)回調(diào)的方法
1)方法介紹
// 上一個(gè)異步完成時(shí)執(zhí)行該方法珍策,和上一個(gè)任務(wù)用同一個(gè)線程
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
// 上一個(gè)異步完成時(shí)執(zhí)行該方法,異步的方式執(zhí)行
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
// 上一個(gè)異步完成時(shí)執(zhí)行該方法宅倒,異步的方式執(zhí)行攘宙,可以自己指定線程池
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
// 處理異常
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
2)示例代碼
// 示例代碼線程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10 / 0;
return n;
}, executor).whenComplete((result,excption) -> {
System.out.println("運(yùn)行結(jié)果:" + result + "異常:" + excption);
}).exceptionally(throwable -> {
// 出現(xiàn)異常 exceptionally感知并處理異常,返回最終結(jié)果
return 10;
});
Integer integer = future.get();
System.out.println("最終運(yùn)行結(jié)果:" + integer); // 10
3拐迁、handleAsync 方法
1)方法介紹
// 上一個(gè)方法執(zhí)行后作出的處理
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
2)示例代碼
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10 / 0;
return n;
}, executor).handle((res, exception) -> {
if (res != null) {
// 如果上一個(gè)任務(wù)沒出現(xiàn)異常蹭劈,修改返回結(jié)果
return res * 10;
}
if (exception != null) {
// 上一個(gè)任務(wù)出現(xiàn)了異常
return 0;
}
return 0;
});
4、線程串行化方法
1)方法介紹
thenApply方法:當(dāng)一個(gè)線程依賴另一個(gè) 線程時(shí)线召,獄取上一個(gè)任務(wù)返回的結(jié)果铺韧,開返回當(dāng)前任務(wù)的返回值。
thenAccept方法:消費(fèi)處理結(jié)果缓淹。接收任務(wù)的處理結(jié)果祟蚀,并消費(fèi)處理,無返回結(jié)果割卖。
thenRun方法:只要上面的任務(wù)執(zhí)行完成前酿,就開始執(zhí)行thenRun,只是處理完任務(wù)后,執(zhí)行thenRun的后續(xù)操作
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
2)示例代碼
| ① thenRunAsync |
thenRunAsync 不能獲取上一步執(zhí)行結(jié)果
// thenRunAsync 不能獲取上一步執(zhí)行結(jié)果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10 / 0;
return n;
}, executor).thenRunAsync(() -> {
System.out.println("線程2運(yùn)行了鹏溯!");
}, executor);
| ② thenAcceptAsync |
thenAcceptAsync可以獲取上一個(gè)任務(wù)執(zhí)行的結(jié)果罢维,但是無法對(duì)其進(jìn)行修改
// thenAcceptAsync可以獲取上一個(gè)任務(wù)執(zhí)行的結(jié)果,但是無法對(duì)其進(jìn)行修改
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10;
return n;
}, executor).thenAcceptAsync((res) -> {
// 如果上一個(gè)任務(wù)產(chǎn)生異常或者執(zhí)行失敗肺孵,則不執(zhí)行該任務(wù)
System.out.println("上一個(gè)任務(wù)獲取的結(jié)果:" + res);
}, executor);
| ③ thenApplyAsync |
thenApplyAsync 可以獲取上一個(gè)任務(wù)返回的結(jié)果匀借,并對(duì)其進(jìn)行修改再返回
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10;
return n;
}, executor).thenApplyAsync((res) -> {
return res * 2;
}, executor);
Integer result = future.get();
System.out.println("最終返回結(jié)果:" + result);
5、組合任務(wù)平窘,一個(gè)完成
1)方法介紹
applyToEitherAsync:阻塞等待吓肋,只要有一個(gè)任務(wù)完成了,就執(zhí)行該任務(wù)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
2)示例代碼
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 5;
// 模擬這個(gè)任務(wù)比較慢完成瑰艘,讓future2先完成是鬼,測(cè)試applyToEitherAsync 只要有一個(gè)任務(wù)完成就執(zhí)行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return n;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10;
return n;
}, executor);
future1.applyToEitherAsync(future2, res -> {
System.out.println(res);
return res + 1;
}, executor);
6、組合任務(wù)紫新,所有的完成
1)方法介紹
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
2)示例代碼
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)1當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 5;
return n;
}, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)2當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
int n = 10;
return n;
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
// 阻塞等待所有的任務(wù)執(zhí)行完成
allOf.get();
Integer result1 = future1.get();
Integer result2 = future2.get();
讓我們來試試項(xiàng)目中如何使用異步編排吧均蜜!
| 三、異步編排實(shí)際開發(fā) |
1芒率、配置線程池
@ConfigurationProperties(prefix = "coke.thread")
@Component
@Data
public class ThreadPoolProperties {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveTime;
}
//@EnableConfigurationProperties(ThreadPoolProperties.class) 如果沒有把線程池的常量配置類放到容器中囤耳,則使用該注解
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolProperties pool) {
return new ThreadPoolExecutor(
pool.getCoreSize(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
3、示例代碼
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
// supplyAsync 需要返回結(jié)果 因?yàn)?3 4 5 依賴1
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
// 1偶芍、獲取sku基本信息 pms_sku_info
SkuInfoEntity skuInfoEntity = getById(skuId);
skuItemVo.setInfo(skuInfoEntity);
return skuInfoEntity;
}, executor);
CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((res) -> {
// 3充择、獲取spu的銷售屬性組合
List<SkuItemSaleAttrVo> skuItemSaleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
skuItemVo.setSaleAttrs(skuItemSaleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
// 4、獲取spu的介紹 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesp(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
// 5匪蟀、獲取spu的規(guī)格參數(shù)信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getCatalogId(), res.getSpuId());
skuItemVo.setAttrGroups(attrGroupVos);
}, executor);
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
// 2椎麦、獲取sku的圖片信息 pms_spu_images
List<SkuImagesEntity> skuImagesEntities = skuImagesService.getImageBySkuId(skuId);
skuItemVo.setImages(skuImagesEntities);
}, executor);
// 6、查詢當(dāng)前sku是否參與秒殺優(yōu)惠
CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
R skuSecKillInfo = secKillFeignService.getSkuSecKillInfo(skuId);
if (skuSecKillInfo.getCode() == 0) {
SecKillInfoVo skuSecKillInfoData = skuSecKillInfo.getData(new TypeReference<SecKillInfoVo>() {
});
skuItemVo.setSecKillInfoVo(skuSecKillInfoData);
}
}, executor);
// 等到所有任務(wù)都完成
CompletableFuture.allOf(saleFuture, descFuture, baseAttrFuture, imageFuture, secKillFuture).get();
return skuItemVo;
}
結(jié)尾
本文到這里就結(jié)束了萄窜,感謝看到最后的朋友铃剔,都看到最后了點(diǎn)個(gè)贊再走啦撒桨,如有不對(duì)之處還請(qǐng)多多指正查刻。