CompletableFuture避坑1——需要自定義線程池
CompletableFuture避坑2——allOf()超時時間不合理的后果
CompletableFuture避坑3——線程池的DiscardPolicy()導(dǎo)致整個程序卡死
1. 限制IO密集型任務(wù)的性能
CompletableFuture默認使用的線程池是 ForkJoinPool.commonPool()叁熔,commonPool是當(dāng)前 JVM(進程) 上的所有 CompletableFuture渴频、并行 Stream 共享的,commonPool 的目標(biāo)場景是非阻塞的 CPU 密集型任務(wù)梧宫,其線程數(shù)默認為 CPU 數(shù)量減1妄均,所以對于我們用java常做的IO密集型任務(wù)暮屡,默認線程池是遠遠不夠使用的;在雙核及以下機器上耍贾,默認線程池又會退化為為每個任務(wù)創(chuàng)建一個線程忠藤,相當(dāng)于沒有線程池挟伙。
以runAsync的代碼舉例,不指定線程池時模孩,使用的是ASYNC_POOL
尖阔,而這個ASYNC_POOL
的大小,是根據(jù) CPU 核數(shù)計算出來的(COMMON_PARALLELISM
)如果COMMON_PARALLELISM
小于1榨咐,USE_COMMON_POOL
為false(此時ForkJoinPool.commonPool()不支持并發(fā))介却,直接退化為 ThreadPerTaskExecutor,每個任務(wù)新開一個線程块茁。
下面是部分代碼及注釋齿坷。
// 這段用來計算ForkJoinPool.commonPool()的線程池大小的
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
Class<?> ensureLoaded = LockSupport.class;
int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
try {
String p = System.getProperty
("java.util.concurrent.ForkJoinPool.common.maximumSpares");
if (p != null)
commonMaxSpares = Integer.parseInt(p);
} catch (Exception ignore) {
}
COMMON_MAX_SPARES = commonMaxSpares;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
common = AccessController.doPrivileged(new PrivilegedAction<>() {
public ForkJoinPool run() {
return new ForkJoinPool((byte) 0);
}
});
COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
}
public static int getCommonPoolParallelism() {
return commonParallelism;
}
// ForkJoinPool.commonPool()線程池大小為1或0,就不使用ForkJoinPool.commonPool()了
private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
// 為每個任務(wù)開一個線程的線程工廠
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}
2. 子線程中不繼承當(dāng)前的類加載器
參考這個:https://zhuanlan.zhihu.com/p/339203275数焊,作者已經(jīng)寫得很清楚了永淌,我就不重復(fù)了。