java8(五)并行流之分支/合并框架

并行流背后使用的基礎(chǔ)架構(gòu)是Java 7中引入的分支/合并框架植影。我們會(huì)在本文仔細(xì)研究分支/合并框架箱锐。

分支/合并框架的目的是以遞歸方式將可以并行的任務(wù)拆分成更小的任務(wù)误甚,然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果宪萄。它是 ExecutorService 接口的一個(gè)實(shí)現(xiàn),它把子任務(wù)分配給線程池(稱為 ForkJoinPool )中的工作線程屎债。

image.png

一仅政、RecursiveTask

要把任務(wù)提交到這個(gè)池垢油,必須創(chuàng)建 RecursiveTask<R> 的一個(gè)子類,其中 R 是并行化任務(wù)(以及所有子任務(wù))產(chǎn)生的結(jié)果類型圆丹,或者如果任務(wù)不返回結(jié)果滩愁,則是 RecursiveAction 類型。

要定義 RecursiveTask辫封, 只需實(shí)現(xiàn)它唯一的抽象方法compute :

protected abstract R compute();

在我們實(shí)現(xiàn)這個(gè)方法時(shí)惊楼,需要同時(shí)定義將任務(wù)拆分成子任務(wù)的邏輯,以及無(wú)法再拆分或不方便再拆分時(shí)秸讹,生成
單個(gè)子任務(wù)結(jié)果的邏輯。

這個(gè)方法的實(shí)現(xiàn)類似于下面的偽代碼:

if (任務(wù)足夠小或不可分) {
    順序計(jì)算該任務(wù)
} else {
    將任務(wù)分成兩個(gè)子任務(wù)
    遞歸調(diào)用本方法雅倒,拆分每個(gè)子任務(wù)璃诀,等待所有子任務(wù)完成
    合并每個(gè)子任務(wù)的結(jié)果
}

遞歸任務(wù)拆分過(guò)程如下所示:

ForkJoinPool.png

分支/合并框架實(shí)例:為一個(gè)數(shù)字范圍Long[]求和

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

import static com.cloud.bssp.java8.stream.TestStreamParallel.measureSumPerf;

/**
 * @description: 使用ForkJoinPool
 * @author:weirx
 * @date:2021/10/25 14:10
 * @version:3.0
 */
public class TestRecursiveTask extends RecursiveTask<Long> {

    /**
     * 要求和的數(shù)組
     */
    private final long[] numbers;

    /**
     * 子任務(wù)求和的數(shù)組的開始位置
     */
    private int start;

    /**
     * 子任務(wù)求和的數(shù)組的結(jié)束位置
     */
    private int end;

    /**
     * 私有構(gòu)造,用于以遞歸方式為主任務(wù)創(chuàng)建子任務(wù)
     *
     * @param numbers
     * @param start
     * @param end
     */
    private TestRecursiveTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    /**
     * 公共函數(shù)用于構(gòu)建主任務(wù)
     *
     * @param numbers
     */
    public TestRecursiveTask(long[] numbers) {
        this.numbers = numbers;
    }

    /**
     * 任務(wù)拆分的數(shù)組最大值
     */
    public static final long THRESHOLD = 10000L;

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            // 如果大小小于等于閾值蔑匣,則順序計(jì)算
            return computeSequentially();
        } else {
            //創(chuàng)建一個(gè)子任務(wù)劣欢,為數(shù)組的前一半求和
            TestRecursiveTask left = new TestRecursiveTask(numbers, start, start + length / 2);
            //利用另一個(gè)ForkJoinPool線程異步執(zhí)行新創(chuàng)建的子任務(wù)
            left.fork();
            //創(chuàng)建一個(gè)子任務(wù),為數(shù)組的后一半求和
            TestRecursiveTask right = new TestRecursiveTask(numbers, start + length / 2, end);
            // 同步執(zhí)行第二個(gè)子任務(wù)
            Long compute = right.compute();
            //讀取第一個(gè)子任務(wù)的結(jié)果裁良,沒(méi)有完成則等待
            Long join = left.join();
            //結(jié)果合并
            return compute + join;
        }
    }

    /**
     * 當(dāng)子任務(wù)不可拆分時(shí)計(jì)算結(jié)果的簡(jiǎn)單算法
     *
     * @return
     */
    private Long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    /**
     * 并行對(duì)前n個(gè)自然數(shù)求和
     *
     * @param n
     * @return
     */
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new TestRecursiveTask(numbers);
        return new ForkJoinPool().invoke(task);
    }

    public static void main(String[] args) {
        System.out.println("ForkJoin sum done in: " + measureSumPerf(
                TestRecursiveTask::forkJoinSum, 10000000) + " msecs");
    }

}

輸出結(jié)果:

ForkJoin sum done in: 64 msecs

這個(gè)性能看起來(lái)比用并行流的版本要差凿将,但這只是因?yàn)楸仨毾纫颜麄€(gè)數(shù)字流都放進(jìn)一個(gè)long[] ,之后才能在任務(wù)中使用它价脾。

二牧抵、Fork/join的最佳用法

雖然分支/合并框架還算簡(jiǎn)單易用,不幸的是它也很容易被誤用侨把。以下是幾個(gè)有效使用它的最佳做法:

1)對(duì)一個(gè)任務(wù)調(diào)用 join 方法會(huì)阻塞調(diào)用方犀变,直到該任務(wù)做出結(jié)果。因此秋柄,有必要在兩個(gè)子任務(wù)的計(jì)算都開始之后再調(diào)用它获枝。否則,你得到的版本會(huì)比原始的順序算法更慢更復(fù)雜骇笔,因?yàn)槊總€(gè)子任務(wù)都必須等待另一個(gè)子任務(wù)完成才能啟動(dòng)省店。

2)不應(yīng)該在 RecursiveTask 內(nèi)部使用 ForkJoinPool 的 invoke 方法。相反笨触,你應(yīng)該始終直接調(diào)用 compute 或 fork 方法懦傍,只有順序代碼才應(yīng)該用 invoke 來(lái)啟動(dòng)并行計(jì)算。

3) 對(duì)子任務(wù)調(diào)用 fork 方法可以把它排進(jìn) ForkJoinPool 旭旭。同時(shí)對(duì)左邊和右邊的子任務(wù)調(diào)用fork()似乎很自然谎脯,但這樣做的效率要比直接對(duì)其中一個(gè)調(diào)用 compute 低。調(diào)用compute你可以為其中一個(gè)子任務(wù)重用同一線程持寄,從而避免在線程池中多分配一個(gè)任務(wù)造成的開銷源梭。

4)調(diào)試分支/合并框架的并行計(jì)算代碼可能有點(diǎn)棘手娱俺。特別是你平常都在你喜歡的IDE里面看棧跟蹤(stack trace)來(lái)找問(wèn)題,但放在分支/合并計(jì)算上就不行了废麻,因?yàn)檎{(diào)用 compute的線程并不是概念上的調(diào)用方荠卷,后者是調(diào)用 fork 的那個(gè)。

5)和并行流一樣烛愧,你不應(yīng)理所當(dāng)然地認(rèn)為在多核處理器上使用分支/合并框架就比順序計(jì)算快油宜。一個(gè)任務(wù)可以分解成多個(gè)獨(dú)立的子任務(wù),才能讓性能在并行化時(shí)有所提升怜姿。所有這些子任務(wù)的運(yùn)行時(shí)間都應(yīng)該比分出新任務(wù)所花的時(shí)長(zhǎng)慎冤。

三、工作竊取

工作竊取為何被提出沧卢?

如前面的例子蚁堤,我們指定數(shù)組的大小是10000L,即允許任務(wù)被拆分為每個(gè)數(shù)組大小為10000但狭,共1000個(gè)任務(wù)披诗。

在理想的情況下,每個(gè)任務(wù)完成的時(shí)間應(yīng)該是相同的立磁,這樣在多核cpu的前提下呈队,我們能保證每個(gè)核處理的時(shí)間都是相同的。

實(shí)際情況中唱歧,每個(gè)子任務(wù)花費(fèi)的時(shí)間可以說(shuō)是天差地別宪摧,磁盤,網(wǎng)絡(luò)迈喉,或等等很多的因素導(dǎo)致绍刮。

Fork/Join框架為了解決這個(gè)提出,提出了工作竊劝っ(work stealing)的概念孩革。

在實(shí)際應(yīng)用中,這意味著這些任務(wù)差不多被平均分配到 ForkJoinPool 中的所有線程上得运。每個(gè)線程都為分配給它的任務(wù)保存一個(gè)雙向鏈?zhǔn)疥?duì)列膝蜈,每完成一個(gè)任務(wù),就會(huì)從隊(duì)列頭上取出下一個(gè)任務(wù)開始執(zhí)行熔掺。

基于前面所述的原因饱搏,某個(gè)線程可能早早完成了分配給它的所有任務(wù),也就是它的隊(duì)列已經(jīng)空了置逻,而其他的線程還很忙推沸。這時(shí),這個(gè)線程并沒(méi)有閑下來(lái),而是隨機(jī)選了一個(gè)別的線程鬓催,從隊(duì)列的尾巴上“偷走”一個(gè)任務(wù)肺素。這個(gè)過(guò)程一直繼續(xù)下去,直到所有的任務(wù)都執(zhí)行完畢宇驾,所有的隊(duì)列都清空倍靡。這就是為什么要?jiǎng)澇稍S多小任務(wù)而不是少數(shù)幾個(gè)大任務(wù),這有助于更好地在工作線程之間平衡負(fù)載课舍。

一般來(lái)說(shuō)塌西,這種工作竊取算法用于在池中的工作線程之間重新分配和平衡任務(wù)。如下圖展示了這個(gè)過(guò)程筝尾。當(dāng)工作線程隊(duì)列中有一個(gè)任務(wù)被分成兩個(gè)子任務(wù)時(shí)捡需,一個(gè)子任務(wù)就被閑置的工作線程“偷走”了。如前所述筹淫,這個(gè)過(guò)程可以不斷遞歸栖忠,直到規(guī)定子任務(wù)應(yīng)順序執(zhí)行的條件為真。

ForkJoinPool-工作竊取 (2).png

四贸街、Spliterator

那么Stream是如何實(shí)現(xiàn)并行的呢?我們并不需要手動(dòng)去實(shí)現(xiàn)Fork/join狸相,這就意味著薛匪,肯定有一種自動(dòng)機(jī)制來(lái)為你拆分流。這種新的自動(dòng)機(jī)制稱為 Spliterator脓鹃。

Spliterator 是Java 8中加入的另一個(gè)新接口逸尖;這個(gè)名字代表“可分迭代器”(splitableiterator)。和 Iterator 一樣瘸右, Spliterator 也用于遍歷數(shù)據(jù)源中的元素特愿,但它是為了并行執(zhí)行而設(shè)計(jì)的榛做。

public interface Spliterator<T> {

  /**
    *  tryAdvance 方法的行為類似于普通的Iterator ,因?yàn)樗鼤?huì)按順序一個(gè)一個(gè)使用 Spliterator 中的元素,
    * 并且如果有其他元素要遍歷就返回 true
    */
  boolean tryAdvance(Consumer<? super T> action);

   /**
     * 專為 Spliterator 接口設(shè)計(jì)的努释,因?yàn)樗梢园岩恍┰貏澇鋈シ?     * 給第二個(gè) Spliterator (由該方法返回),讓它們兩個(gè)并行處理须板。
     */
  Spliterator<T> trySplit();

  /**
    * estimateSize 方法估計(jì)還剩下多少元素要遍歷
    */
  long estimateSize();

  int characteristics();
}

4.1 拆分過(guò)程

將 Stream 拆分成多個(gè)部分的算法是一個(gè)遞歸過(guò)程撕予,這個(gè)框架不斷對(duì) Spliterator 調(diào)用 trySplit直到它返回 null ,表明它處理的數(shù)據(jù)結(jié)構(gòu)不能再分割做裙,流程如下描述岗憋。

1)第一步是對(duì)第一個(gè)Spliterator 調(diào)用 trySplit ,生成第二個(gè) Spliterator 锚贱。

2)第二步對(duì)這兩個(gè) Spliterator 調(diào)用trysplit 仔戈,這樣總共就有了四個(gè) Spliterator 。

3)第三步,對(duì)當(dāng)前所有的Spliterator 調(diào)用trysplit 监徘,當(dāng)所有的trysplit 都返回null晋修,則表示拆分結(jié)束。

4.2 Spliterator特性

Spliterator的拆分過(guò)程也收到其本身的特性所影響耐量,特性是通過(guò)characteristics()方法來(lái)聲明的飞蚓。

Spliterator 接口聲明的最后一個(gè)抽象方法是 characteristics ,它將返回一個(gè) int 廊蜒,代表 Spliterator 本身特性集的編碼趴拧。

有如下特性:

    /**
     * 元素有既定的順序(例如 List ),因此 Spliterator 在遍歷和劃分時(shí)也會(huì)遵循這一順序
     */
    public static final int ORDERED    = 0x00000010;

    /**
     * 對(duì)于任意一對(duì)遍歷過(guò)的元素 x 和 y 山叮, x.equals(y) 返回 false
     */
    public static final int DISTINCT   = 0x00000001;

    /**
     * 遍歷的元素按照一個(gè)預(yù)定義的順序排序
     */
    public static final int SORTED     = 0x00000004;

    /**
     * 該 Spliterator 由一個(gè)已知大小的源建立(例如 Set )著榴,因此 estimatedSize() 返回的是準(zhǔn)確值
     */
    public static final int SIZED      = 0x00000040;

    /**
     * 保證遍歷的元素不會(huì)為 null
     */
    public static final int NONNULL    = 0x00000100;

    /**
     * Spliterator 的數(shù)據(jù)源不能修改。這意味著在遍歷時(shí)不能添加屁倔、刪除或修改任何元素
     */
    public static final int IMMUTABLE  = 0x00000400;

    /**
     * 該 Spliterator 的數(shù)據(jù)源可以被其他線程同時(shí)修改而無(wú)需同步
     */
    public static final int CONCURRENT = 0x00001000;

    /**
     * 該 Spliterator 和所有從它拆分出來(lái)的 Spliterator 都是 SIZED
     */
    public static final int SUBSIZED = 0x00004000;
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末脑又,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子锐借,更是在濱河造成了極大的恐慌问麸,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,000評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件钞翔,死亡現(xiàn)場(chǎng)離奇詭異严卖,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)布轿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門哮笆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人汰扭,你說(shuō)我怎么就攤上這事稠肘。” “怎么了萝毛?”我有些...
    開封第一講書人閱讀 168,561評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵项阴,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我笆包,道長(zhǎng)鲁冯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,782評(píng)論 1 298
  • 正文 為了忘掉前任色查,我火速辦了婚禮薯演,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘秧了。我一直安慰自己跨扮,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,798評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著衡创,像睡著了一般帝嗡。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上璃氢,一...
    開封第一講書人閱讀 52,394評(píng)論 1 310
  • 那天哟玷,我揣著相機(jī)與錄音,去河邊找鬼一也。 笑死巢寡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的椰苟。 我是一名探鬼主播抑月,決...
    沈念sama閱讀 40,952評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼舆蝴!你這毒婦竟也來(lái)了谦絮?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,852評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤洁仗,失蹤者是張志新(化名)和其女友劉穎层皱,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體赠潦,經(jīng)...
    沈念sama閱讀 46,409評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡奶甘,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,483評(píng)論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了祭椰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,615評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡疲陕,死狀恐怖方淤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蹄殃,我是刑警寧澤携茂,帶...
    沈念sama閱讀 36,303評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站诅岩,受9級(jí)特大地震影響讳苦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜吩谦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,979評(píng)論 3 334
  • 文/蒙蒙 一鸳谜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧式廷,春花似錦咐扭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)袜爪。三九已至,卻和暖如春薛闪,著一層夾襖步出監(jiān)牢的瞬間辛馆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工豁延, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留昙篙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,041評(píng)論 3 377
  • 正文 我出身青樓术浪,卻偏偏與公主長(zhǎng)得像瓢对,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子胰苏,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,630評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 在Java 7之前硕蛹,并行處理數(shù)據(jù)集合非常麻煩。第一硕并,你得明確地把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干子部分法焰。第二,你要給每個(gè)...
    夏與清風(fēng)閱讀 889評(píng)論 0 1
  • Java8 in action 沒(méi)有共享的可變數(shù)據(jù)倔毙,將方法和函數(shù)即代碼傳遞給其他方法的能力就是我們平常所說(shuō)的函數(shù)式...
    鐵牛很鐵閱讀 1,235評(píng)論 1 2
  • 本文主要總結(jié)了《Java8實(shí)戰(zhàn)》埃仪,適用于學(xué)習(xí) Java8 的同學(xué),也可以作為一個(gè) API 手冊(cè)文檔適用陕赃,平時(shí)使用時(shí)...
    _曉__閱讀 1,790評(píng)論 2 7
  • 點(diǎn)贊+收藏 就學(xué)會(huì)系列卵蛉,文章收錄在 GitHub JavaEgg ,N線互聯(lián)網(wǎng)開發(fā)必備技能兵器譜 Java8早在2...
    JavaKeeper_海星閱讀 326評(píng)論 0 0
  • 緒論 之前的幾章中么库,我們已經(jīng)看到了新的Stream接口可以讓你以聲明性方式處理數(shù)據(jù)集傻丝。我們還解釋了將外部迭代換為內(nèi)...
    潯它芉咟渡閱讀 3,232評(píng)論 0 2