一渊啰、前言
- Java并行流探橱,方便了 并發(fā)操作,但是不注意可能會導(dǎo)致問題绘证。
- 如 最大線程數(shù)隧膏,怎么控制并發(fā)數(shù),類加載器嚷那,線程上下文變化胞枕,F(xiàn)orkJoinPool 的 execute、submit魏宽、invoke 方法的區(qū)別 等腐泻。
- 注意:本文以 openjdk 11.0.10 為例,沒有特殊說明時队询,都是指 ForkJoinPool.commonPool()
二派桩、注意點(diǎn)
1. 并行度
- 并行度 不等于 最大線程數(shù)(maximumPoolSize),下圖 commonPool 有49個線程蚌斩,但是 并行度為1
- 默認(rèn)的 并行度為 CPU核數(shù) - 1铆惑,最小為 1
-
可通過 -Djava.util.concurrent.ForkJoinPool.common.parallelism=數(shù)量 設(shè)置
c22ac0cc03c53b78fd0c6d60b174c877.png
2. 容器里面的并行度
- 下圖中,/sys/fs/cgroup/cpu/cpu.cfs_quota_us 除以 /sys/fs/cgroup/cpu/cpu.cfs_period_us = cpu核數(shù)
-
不等于 nproc送膳,更不等于 獲得宿主機(jī)的 lscpu | grep 'CPU(s):'
7bc5992947992d55e4bb0dc1a0d05f5e.png
3. 最大線程數(shù)
- 并行度 不等于 最大線程數(shù)(maximumPoolSize)
- 即使 并行度 parallelism 為1员魏,還有 備用線程(maximumPoolSize、COMMON_MAX_SPARES)
-
commonPool 默認(rèn) 256肠缨,自定義 ForkJoinPool() 默認(rèn) 32767逆趋。這樣看,比較少會出現(xiàn) 線程數(shù)不夠的情況晒奕。
8d8f505acccb54054c30e0fa62222e3f.png
4. 并發(fā)太大闻书,壓垮后端
- 假如 ForkJoinPool.commonPool() 線程比較多名斟,并行流集合的元素也比較多時,給下游較大壓力
- jstack pid | grep -c commonPool
5. 線程上下文變化
如:獲取不到用戶信息了魄眉,可以獲取到用戶信息以后砰盐,傳到并行流使用
final String deviceUdid = RequestUtils.getDeviceUdid();
data.parallelStream().forEach(d -> {
// use deviceUdid instead of RequestUtils.getDeviceUdid() do something
});
6. ForkJoinPool 的 execute、submit坑律、invoke 方法的區(qū)別
- 有些簡單的任務(wù)岩梳,不想單獨(dú)創(chuàng)建線程池,可以用 ForkJoinPool.commonPool()
- execute():異步執(zhí)行晃择,沒有返回值冀值,不能等待執(zhí)行完成
- submit():異步執(zhí)行,返回 ForkJoinTask宫屠,需增加 .join() 等待完成
- invoke():等于 submit() + join()
7. spring boot使用Java并行流發(fā)送kafka消息報錯
- 類加載器不一樣列疗,詳見 spring boot 使用 Java 并行流發(fā)送 kafka 消息報錯
- 使用 spring-boot-maven-plugin 打包以后,依賴在 jar里面自定義位置(BOOT-INF/lib/)浪蹂,使用 org.springframework.boot.loader.LaunchedURLClassLoader 加載
- ForkJoinPool.commonPool 默認(rèn)使用 DefaultForkJoinWorkerThreadFactory抵栈,用的 系統(tǒng)ClassLoader,所以 并行流加載不到依賴的 class
-
可通過 -Djava.util.concurrent.ForkJoinPool.common.threadFactory 設(shè)置 自定義線程工廠坤次,使用當(dāng)前 ClassLoader 解決
9a05845206e25efa91f762f5ed4b4847.png
8. 自定義并行流線程池
參考 concurrency - Custom thread pool in Java 8 parallel stream - Stack Overflow
- 方案一(各種情況都有效)
CompletableFuture.runAsync(runnable, new ForkJoinPool(2)).join()
- 方案二(部分場景似乎沒有效果)
// 第4個參數(shù) asyncMode古劲,默認(rèn) false,設(shè)置為 true 適用于 FIFO
ForkJoinPool forkJoinPool = new ForkJoinPool(2, pool -> new ForkJoinWorkerThread(pool) {
}, null, false);
forkJoinPool.invoke(() -> list.parallelStream().forEach());
9. 控制并發(fā)數(shù)
- 可考慮把 集合切分成需要的份數(shù)缰猴,然后 parallelStream()
List<String> list = List.of("a", "b", "c");
CollUtil.split(list, list.size() / 2 + 1).parallelStream().forEach(b -> {
b.stream().forEach(System.out::println);
});
10. 順序消費(fèi)
- 如 forEachOrdered 會導(dǎo)致沒有并發(fā)效果
- 需要并行产艾,還要使用輸入順序的,可考慮把 集合切分成需要的份數(shù)滑绒,然后 parallelStream()
三胰舆、總結(jié)
- Java并行流,方便了 并發(fā)操作蹬挤,同時需要 了解底層實(shí)現(xiàn)缚窿、限制,避免不注意可能導(dǎo)致的問題
- 參考:ForkJoinPool 源碼分析 - 竺旭東 - 博客園 (cnblogs.com)
- https://github.com/agile6v/container_cpu_detection
- https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream
本文遵守【CC BY-NC】協(xié)議焰扳,轉(zhuǎn)載請保留原文出處及本版權(quán)聲明倦零,否則將追究法律責(zé)任。