并行流背后使用的基礎(chǔ)架構(gòu)是Java 7中引入的分支/合并框架植影。我們會(huì)在本文仔細(xì)研究分支/合并框架箱锐。
分支/合并框架的目的是以遞歸方式將可以并行的任務(wù)拆分成更小的任務(wù)误甚,然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果宪萄。它是 ExecutorService 接口的一個(gè)實(shí)現(xiàn),它把子任務(wù)分配給線程池(稱為 ForkJoinPool )中的工作線程屎债。
一仅政、RecursiveTask
要把任務(wù)提交到這個(gè)池垢油,必須創(chuàng)建 RecursiveTask<R> 的一個(gè)子類,其中 R 是并行化任務(wù)(以及所有子任務(wù))產(chǎn)生的結(jié)果類型圆丹,或者如果任務(wù)不返回結(jié)果滩愁,則是 RecursiveAction 類型。
要定義 RecursiveTask辫封, 只需實(shí)現(xiàn)它唯一的抽象方法compute :
protected abstract R compute();
在我們實(shí)現(xiàn)這個(gè)方法時(shí)惊楼,需要同時(shí)定義將任務(wù)拆分成子任務(wù)的邏輯,以及無(wú)法再拆分或不方便再拆分時(shí)秸讹,生成
單個(gè)子任務(wù)結(jié)果的邏輯。
這個(gè)方法的實(shí)現(xiàn)類似于下面的偽代碼:
if (任務(wù)足夠小或不可分) {
順序計(jì)算該任務(wù)
} else {
將任務(wù)分成兩個(gè)子任務(wù)
遞歸調(diào)用本方法雅倒,拆分每個(gè)子任務(wù)璃诀,等待所有子任務(wù)完成
合并每個(gè)子任務(wù)的結(jié)果
}
遞歸任務(wù)拆分過(guò)程如下所示:
分支/合并框架實(shí)例:為一個(gè)數(shù)字范圍Long[]求和
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
import static com.cloud.bssp.java8.stream.TestStreamParallel.measureSumPerf;
/**
* @description: 使用ForkJoinPool
* @author:weirx
* @date:2021/10/25 14:10
* @version:3.0
*/
public class TestRecursiveTask extends RecursiveTask<Long> {
/**
* 要求和的數(shù)組
*/
private final long[] numbers;
/**
* 子任務(wù)求和的數(shù)組的開始位置
*/
private int start;
/**
* 子任務(wù)求和的數(shù)組的結(jié)束位置
*/
private int end;
/**
* 私有構(gòu)造,用于以遞歸方式為主任務(wù)創(chuàng)建子任務(wù)
*
* @param numbers
* @param start
* @param end
*/
private TestRecursiveTask(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
/**
* 公共函數(shù)用于構(gòu)建主任務(wù)
*
* @param numbers
*/
public TestRecursiveTask(long[] numbers) {
this.numbers = numbers;
}
/**
* 任務(wù)拆分的數(shù)組最大值
*/
public static final long THRESHOLD = 10000L;
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
// 如果大小小于等于閾值蔑匣,則順序計(jì)算
return computeSequentially();
} else {
//創(chuàng)建一個(gè)子任務(wù)劣欢,為數(shù)組的前一半求和
TestRecursiveTask left = new TestRecursiveTask(numbers, start, start + length / 2);
//利用另一個(gè)ForkJoinPool線程異步執(zhí)行新創(chuàng)建的子任務(wù)
left.fork();
//創(chuàng)建一個(gè)子任務(wù),為數(shù)組的后一半求和
TestRecursiveTask right = new TestRecursiveTask(numbers, start + length / 2, end);
// 同步執(zhí)行第二個(gè)子任務(wù)
Long compute = right.compute();
//讀取第一個(gè)子任務(wù)的結(jié)果裁良,沒(méi)有完成則等待
Long join = left.join();
//結(jié)果合并
return compute + join;
}
}
/**
* 當(dāng)子任務(wù)不可拆分時(shí)計(jì)算結(jié)果的簡(jiǎn)單算法
*
* @return
*/
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
/**
* 并行對(duì)前n個(gè)自然數(shù)求和
*
* @param n
* @return
*/
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new TestRecursiveTask(numbers);
return new ForkJoinPool().invoke(task);
}
public static void main(String[] args) {
System.out.println("ForkJoin sum done in: " + measureSumPerf(
TestRecursiveTask::forkJoinSum, 10000000) + " msecs");
}
}
輸出結(jié)果:
ForkJoin sum done in: 64 msecs
這個(gè)性能看起來(lái)比用并行流的版本要差凿将,但這只是因?yàn)楸仨毾纫颜麄€(gè)數(shù)字流都放進(jìn)一個(gè)long[] ,之后才能在任務(wù)中使用它价脾。
二牧抵、Fork/join的最佳用法
雖然分支/合并框架還算簡(jiǎn)單易用,不幸的是它也很容易被誤用侨把。以下是幾個(gè)有效使用它的最佳做法:
1)對(duì)一個(gè)任務(wù)調(diào)用 join 方法會(huì)阻塞調(diào)用方犀变,直到該任務(wù)做出結(jié)果。因此秋柄,有必要在兩個(gè)子任務(wù)的計(jì)算都開始之后再調(diào)用它获枝。否則,你得到的版本會(huì)比原始的順序算法更慢更復(fù)雜骇笔,因?yàn)槊總€(gè)子任務(wù)都必須等待另一個(gè)子任務(wù)完成才能啟動(dòng)省店。
2)不應(yīng)該在 RecursiveTask 內(nèi)部使用 ForkJoinPool 的 invoke 方法。相反笨触,你應(yīng)該始終直接調(diào)用 compute 或 fork 方法懦傍,只有順序代碼才應(yīng)該用 invoke 來(lái)啟動(dòng)并行計(jì)算。
3) 對(duì)子任務(wù)調(diào)用 fork 方法可以把它排進(jìn) ForkJoinPool 旭旭。同時(shí)對(duì)左邊和右邊的子任務(wù)調(diào)用fork()似乎很自然谎脯,但這樣做的效率要比直接對(duì)其中一個(gè)調(diào)用 compute 低。調(diào)用compute你可以為其中一個(gè)子任務(wù)重用同一線程持寄,從而避免在線程池中多分配一個(gè)任務(wù)造成的開銷源梭。
4)調(diào)試分支/合并框架的并行計(jì)算代碼可能有點(diǎn)棘手娱俺。特別是你平常都在你喜歡的IDE里面看棧跟蹤(stack trace)來(lái)找問(wèn)題,但放在分支/合并計(jì)算上就不行了废麻,因?yàn)檎{(diào)用 compute的線程并不是概念上的調(diào)用方荠卷,后者是調(diào)用 fork 的那個(gè)。
5)和并行流一樣烛愧,你不應(yīng)理所當(dāng)然地認(rèn)為在多核處理器上使用分支/合并框架就比順序計(jì)算快油宜。一個(gè)任務(wù)可以分解成多個(gè)獨(dú)立的子任務(wù),才能讓性能在并行化時(shí)有所提升怜姿。所有這些子任務(wù)的運(yùn)行時(shí)間都應(yīng)該比分出新任務(wù)所花的時(shí)長(zhǎng)慎冤。
三、工作竊取
工作竊取為何被提出沧卢?
如前面的例子蚁堤,我們指定數(shù)組的大小是10000L,即允許任務(wù)被拆分為每個(gè)數(shù)組大小為10000但狭,共1000個(gè)任務(wù)披诗。
在理想的情況下,每個(gè)任務(wù)完成的時(shí)間應(yīng)該是相同的立磁,這樣在多核cpu的前提下呈队,我們能保證每個(gè)核處理的時(shí)間都是相同的。
實(shí)際情況中唱歧,每個(gè)子任務(wù)花費(fèi)的時(shí)間可以說(shuō)是天差地別宪摧,磁盤,網(wǎng)絡(luò)迈喉,或等等很多的因素導(dǎo)致绍刮。
Fork/Join框架為了解決這個(gè)提出,提出了工作竊劝っ(work stealing)的概念孩革。
在實(shí)際應(yīng)用中,這意味著這些任務(wù)差不多被平均分配到 ForkJoinPool 中的所有線程上得运。每個(gè)線程都為分配給它的任務(wù)保存一個(gè)雙向鏈?zhǔn)疥?duì)列膝蜈,每完成一個(gè)任務(wù),就會(huì)從隊(duì)列頭上取出下一個(gè)任務(wù)開始執(zhí)行熔掺。
基于前面所述的原因饱搏,某個(gè)線程可能早早完成了分配給它的所有任務(wù),也就是它的隊(duì)列已經(jīng)空了置逻,而其他的線程還很忙推沸。這時(shí),這個(gè)線程并沒(méi)有閑下來(lái),而是隨機(jī)選了一個(gè)別的線程鬓催,從隊(duì)列的尾巴上“偷走”一個(gè)任務(wù)肺素。這個(gè)過(guò)程一直繼續(xù)下去,直到所有的任務(wù)都執(zhí)行完畢宇驾,所有的隊(duì)列都清空倍靡。這就是為什么要?jiǎng)澇稍S多小任務(wù)而不是少數(shù)幾個(gè)大任務(wù),這有助于更好地在工作線程之間平衡負(fù)載课舍。
一般來(lái)說(shuō)塌西,這種工作竊取算法用于在池中的工作線程之間重新分配和平衡任務(wù)。如下圖展示了這個(gè)過(guò)程筝尾。當(dāng)工作線程隊(duì)列中有一個(gè)任務(wù)被分成兩個(gè)子任務(wù)時(shí)捡需,一個(gè)子任務(wù)就被閑置的工作線程“偷走”了。如前所述筹淫,這個(gè)過(guò)程可以不斷遞歸栖忠,直到規(guī)定子任務(wù)應(yīng)順序執(zhí)行的條件為真。
四贸街、Spliterator
那么Stream是如何實(shí)現(xiàn)并行的呢?我們并不需要手動(dòng)去實(shí)現(xiàn)Fork/join狸相,這就意味著薛匪,肯定有一種自動(dòng)機(jī)制來(lái)為你拆分流。這種新的自動(dòng)機(jī)制稱為 Spliterator脓鹃。
Spliterator 是Java 8中加入的另一個(gè)新接口逸尖;這個(gè)名字代表“可分迭代器”(splitableiterator)。和 Iterator 一樣瘸右, Spliterator 也用于遍歷數(shù)據(jù)源中的元素特愿,但它是為了并行執(zhí)行而設(shè)計(jì)的榛做。
public interface Spliterator<T> {
/**
* tryAdvance 方法的行為類似于普通的Iterator ,因?yàn)樗鼤?huì)按順序一個(gè)一個(gè)使用 Spliterator 中的元素,
* 并且如果有其他元素要遍歷就返回 true
*/
boolean tryAdvance(Consumer<? super T> action);
/**
* 專為 Spliterator 接口設(shè)計(jì)的努释,因?yàn)樗梢园岩恍┰貏澇鋈シ? * 給第二個(gè) Spliterator (由該方法返回),讓它們兩個(gè)并行處理须板。
*/
Spliterator<T> trySplit();
/**
* estimateSize 方法估計(jì)還剩下多少元素要遍歷
*/
long estimateSize();
int characteristics();
}
4.1 拆分過(guò)程
將 Stream 拆分成多個(gè)部分的算法是一個(gè)遞歸過(guò)程撕予,這個(gè)框架不斷對(duì) Spliterator 調(diào)用 trySplit直到它返回 null ,表明它處理的數(shù)據(jù)結(jié)構(gòu)不能再分割做裙,流程如下描述岗憋。
1)第一步是對(duì)第一個(gè)Spliterator 調(diào)用 trySplit ,生成第二個(gè) Spliterator 锚贱。
2)第二步對(duì)這兩個(gè) Spliterator 調(diào)用trysplit 仔戈,這樣總共就有了四個(gè) Spliterator 。
3)第三步,對(duì)當(dāng)前所有的Spliterator 調(diào)用trysplit 监徘,當(dāng)所有的trysplit 都返回null晋修,則表示拆分結(jié)束。
4.2 Spliterator特性
Spliterator的拆分過(guò)程也收到其本身的特性所影響耐量,特性是通過(guò)characteristics()方法來(lái)聲明的飞蚓。
Spliterator 接口聲明的最后一個(gè)抽象方法是 characteristics ,它將返回一個(gè) int 廊蜒,代表 Spliterator 本身特性集的編碼趴拧。
有如下特性:
/**
* 元素有既定的順序(例如 List ),因此 Spliterator 在遍歷和劃分時(shí)也會(huì)遵循這一順序
*/
public static final int ORDERED = 0x00000010;
/**
* 對(duì)于任意一對(duì)遍歷過(guò)的元素 x 和 y 山叮, x.equals(y) 返回 false
*/
public static final int DISTINCT = 0x00000001;
/**
* 遍歷的元素按照一個(gè)預(yù)定義的順序排序
*/
public static final int SORTED = 0x00000004;
/**
* 該 Spliterator 由一個(gè)已知大小的源建立(例如 Set )著榴,因此 estimatedSize() 返回的是準(zhǔn)確值
*/
public static final int SIZED = 0x00000040;
/**
* 保證遍歷的元素不會(huì)為 null
*/
public static final int NONNULL = 0x00000100;
/**
* Spliterator 的數(shù)據(jù)源不能修改。這意味著在遍歷時(shí)不能添加屁倔、刪除或修改任何元素
*/
public static final int IMMUTABLE = 0x00000400;
/**
* 該 Spliterator 的數(shù)據(jù)源可以被其他線程同時(shí)修改而無(wú)需同步
*/
public static final int CONCURRENT = 0x00001000;
/**
* 該 Spliterator 和所有從它拆分出來(lái)的 Spliterator 都是 SIZED
*/
public static final int SUBSIZED = 0x00004000;