如果要使stream中的操作并行,使用起來非常簡單,只要加parallel()就可以了
public static void main(String[] args) throws Exception {
/**
* 并行操作:parallel()
* 順序操作:sequential()
*/
Optional<Integer> max = Stream.iterate(1, x -> x + 1).limit(20000).parallel().peek(x -> {
System.out.println(Thread.currentThread().getName());
}).max(Integer::compare);
System.out.println(max.get());
}
反之如果,想要讓并行轉(zhuǎn)換為串行也只要加sequential()
public static void main(String[] args) throws Exception {
/**
* 并行操作:parallel()
* 順序操作:sequential()
*/
Optional<Integer> max = Stream.iterate(1, x -> x + 1).limit(20000).parallel().sequential().peek(x -> {
System.out.println(Thread.currentThread().getName());
}).max(Integer::compare);
System.out.println(max.get());
}
sequential()和parallel()的前后位置決定了最后誰生效,最后一個聲明會生效.
此外,stream api流用的線程池是
ForkJoinPool.commonPool()
內(nèi)部默認核心線程數(shù)量是cpu的核心數(shù)
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
* java.lang.Runtime#availableProcessors}, using the {@linkplain
* #defaultForkJoinWorkerThreadFactory default thread factory},
* no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
如果需要手動執(zhí)行線程數(shù)的話可以使用
//設(shè)置為5個線程加上主線程就是6個線程在同時執(zhí)行
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","5");
當然也可以在java程序啟動時,給定參數(shù)
-D java.util.concurrent.ForkJoinPool.common.parallelism=5
也可以達到相同的效果