異步&線程池&異步編排
初始化線程的4種方式
- 繼承Thread
- 實現(xiàn)Runnable接口
- 實現(xiàn)Callable接口+FutureTask(可以拿到返回結(jié)果,可以處理異常)
- 線程池(推薦使用)
創(chuàng)建線程池
- 使用Executors創(chuàng)建
// 當前系統(tǒng)中池只有一兩個,每個異步任務溢陪,提交給線程池去執(zhí)行
public static ExecutorService service = Executors.newFixedThreadPool(10);
- new原生的ThreadPoolExecutor
/**
* int corePoolSize:核心線程數(shù) 線程池創(chuàng)建好之后就準備就緒的線程數(shù)量來接受異步任務執(zhí)行
* int maximumPoolSize:最大線程數(shù)量棍厌,控制資源
* long keepAliveTime:存活時間泵琳,釋放空閑線程(maximumPoolSize-corePoolSize)的存活時間
* TimeUnit unit:時間單位
* BlockingQueue<Runnable> workQueue:阻塞隊列,異步任務過多將多的任務放在隊只要有線程空閑 就去隊列里面取出新的任務繼續(xù)執(zhí)行
* 1. new LinkedBlockingQueue<>():默認是Integer的最大值蜈七,根據(jù)服務器并發(fā)量決定,要傳入指定的數(shù)量
* ThreadFactory threadFactory:線程的創(chuàng)建工廠
* RejectedExecutionHandler handler:如果阻塞隊列滿了按照指定的拒絕策略拒絕執(zhí)行任務
* 1. DiscardOldestPolicy 丟棄最老的任務
* 2. CallerRunsPolicy 直接調(diào)用線程自己的run方法乱凿,同步調(diào)用
* 3. AbortPolicy 丟棄新來的任務拋出異常
* 4. DiscardPolicy 丟棄新來的任務不拋異常
*/
new ThreadPoolExecutor(5, 200, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
運行流程
- 線程池創(chuàng)建,準備好core數(shù)量的核心線程础倍,準備接受任務
- 新的任務進來烛占,用core準備好的空閑線程執(zhí)行
- core滿了,就將再進來的任務放入阻塞隊列沟启,空閑的core就會自己去阻塞隊列獲取任務執(zhí)行
- 阻塞隊列滿了忆家,就直接開新線程執(zhí)行,最大只能開到max指定的數(shù)量
- max都執(zhí)行好了德迹,max-core數(shù)量的空閑線程會在keepAliveTime指定的時間之后自動銷毀芽卿,最終保持到core大小
- 如果線程數(shù)開到了max數(shù)量,還有新的任務進來浦辨,就會使用reject指定的拒絕策略進行處理
- 所有的線程創(chuàng)建都是由指定的factory創(chuàng)建
常見線程池
- newCachedThreadPool:創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要沼沈,可靈活回收空閑線程流酬,若無可回收,則新建線程
- newFixedThreadPool:創(chuàng)建一個定長線程池列另,可控制線程池最大并發(fā)數(shù)芽腾,超出的線程會在隊列中等待
- newScheduledThreadPool:創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行
- newSingleThreadExecutor:創(chuàng)建一個單線程化的線程池页衙,它只會用唯一的工作線程來執(zhí)行任務摊滔,保證所有任務有序執(zhí)行
CompletableFuture異步編排
創(chuàng)建異步對象
- runAsync無返回值
/**
* 異步執(zhí)行沒有返回值
* Runnable runnable:要異步執(zhí)行的線程
* Executor executor:執(zhí)行該線程的線程池
*/
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("thread: " + Thread.currentThread().getId());
int i = 10 / 2;
}, executor);
- supplyAsync有返回值
/**
* 異步執(zhí)行有返回值
* Supplier<U> supplier:要異步執(zhí)行的線程
* Executor executor:執(zhí)行該線程的線程池
*/
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("thread: " + Thread.currentThread().getId());
int i = 10 / 2;
return i;
}, executor);
Integer i = completableFuture.get();
System.out.println(i);
完成回調(diào)和異步感知
- whenComplete可以處理正常和異常的計算結(jié)果阴绢,exceptionally處理異常情況
whenComplete和whenCompleteAsync的區(qū)別
1. whenComplete:是執(zhí)行當前任務的線程繼續(xù)執(zhí)行whenComplete任務
2. whenCompleteAsync:是把whenCompleteAsync這個任務提交給線程池來執(zhí)行
方法不以Async結(jié)尾,意味著Action使用相同的線程執(zhí)行艰躺,而Async可能會使用其他線程執(zhí)行
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("thread: " + Thread.currentThread().getId());
int i = 10 / 0;
return i;
}, executor).whenComplete((res, exception) -> {
// 能夠感知異常信息呻袭,但是沒法修改返回數(shù)據(jù)
System.out.println("異步任務完成,結(jié)果是: " + res);
System.out.println("異常是: " + exception);
}).exceptionally(throwable -> {
// 能夠感知異常信息腺兴,同時返回默認值
return 10;
});
handle最終處理
方法執(zhí)行完成后的處理 可以接受返回參數(shù)左电,感知異常,有返回值
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("thread: " + Thread.currentThread().getId());
int i = 10 / 0;
return i;
}, executor).handle((res, exception) -> {
if (res != null) {
return res;
}
if (exception != null) {
return 0;
}
return 1;
});
線程串行化
帶有Async默認是異步執(zhí)行页响,串行化需要前置任務成功完成
- thenApply方法:當一個線程依賴另一個線程時篓足,獲取上一個任務返回的結(jié)果,并返回當前任務的返回值
- thenAccept方法:消費處理結(jié)果闰蚕。接收任務的處理結(jié)果栈拖,并消費處理,無返回結(jié)果
- thenRun方法:只要上面的任務執(zhí)行完成没陡,就開始執(zhí)行thenRun涩哟,只是處理完任務后,執(zhí)行后續(xù)操作
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行任務一");
return 1;
}, executor).thenApply((res) -> {
System.out.println("執(zhí)行任務二");
return 2;
});
組合任務--多任務組合
- allOf:等待所有任務完成
- anyOf:只要一個任務完成
// all Of
CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
// 等待所有結(jié)果執(zhí)行完成 阻塞式等待
allOf.get();
// 全部執(zhí)行完畢诗鸭,獲取執(zhí)行結(jié)果
System.out.println(completableFuture1.get() + ";" + completableFuture2.get() + ";" + completableFuture3.get());
// any Of
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(completableFuture1, completableFuture2, completableFuture3);
// 等待其中一個結(jié)果執(zhí)行完成 阻塞式等待
anyOf.get();
// 其中一個執(zhí)行完畢染簇,獲取執(zhí)行結(jié)果
System.out.println(anyOf.get());
組合任務--兩個任務必須都完成,觸發(fā)該任務
- thenCombine:組合兩個future强岸,獲取兩個future的返回結(jié)果锻弓,并返回當前任務的返回值
- thenAcceptBoth:組合兩個future规揪,獲取兩個future任務的返回結(jié)果竿滨,任何處理結(jié)果,沒有返回值
- runAfterBoth:組合兩個future辽旋,不需要獲取future的結(jié)果妓盲,只需要兩個future處理完任務后杂拨,處理該任務
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行任務一");
return 1;
}, executor);
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行任務二");
return 2;
}, executor);
completableFuture1.runAfterBoth(completableFuture2, () -> {
System.out.println("執(zhí)行任務三");
});
completableFuture1.thenCombineAsync(completableFuture2, (f1, f2) -> {
int f3 = f1 + f2;
System.out.println(f3);
return f3;
}, executor);
組合任務--兩個任務完成一個,觸發(fā)該任務
- applyToEither:兩個任務有一個執(zhí)行完成悯衬,獲取它的返回值弹沽,處理任務并有新的返回值
- acceptEither:兩個任務有一個執(zhí)行完成,獲取它的返回值筋粗,處理任務策橘,沒有新的返回值
- runAfterEither:兩個任務有一個執(zhí)行完成,不需要獲取future的結(jié)果娜亿,處理任務丽已,也沒有返回值
completableFuture1.applyToEitherAsync(completableFuture2, (res) -> {
System.out.println("執(zhí)行任務三");
return res;
}, executor);
completableFuture1.acceptEitherAsync(completableFuture2, (res) -> {
System.out.println("執(zhí)行任務三" + res);
}, executor);
completableFuture1.runAfterEitherAsync(completableFuture2, () -> {
System.out.println("執(zhí)行任務三");
}, executor);
SpringBoot中使用線程池
- 定制線程池配置屬性
@ConfigurationProperties(prefix = "czs.thread")
@Configuration
@Data
public class ThreadPoolConfigProperties {
// 核心線程數(shù)
private Integer corePoolSize;
// 最大線程數(shù)量
private Integer maximumPoolSize;
// 存活時間
private Integer keepAliveTime;
}
- 配置線程池參數(shù)
czs.thread.core-pool-size=20
czs.thread.maximum-pool-size=200
czs.thread.keep-alive-time=10
- 將定制化線程池注入容器
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
return new ThreadPoolExecutor(pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
}
}