Java Stream的并行實(shí)現(xiàn)

作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-03】

更新日志

日期 更新內(nèi)容 備注
2017-11-03 添加轉(zhuǎn)載標(biāo)志 持續(xù)更新

并行與并發(fā)

關(guān)于并發(fā)與并行嫉父,需要弄清楚的是,并行關(guān)注于多個(gè)任務(wù)同時(shí)進(jìn)行眼刃,而并發(fā)則通過(guò)調(diào)度來(lái)不停的切換多個(gè)任務(wù)執(zhí)行绕辖,而實(shí)質(zhì)上多個(gè)任務(wù)不是同時(shí)執(zhí)的。并發(fā)擂红,英文單詞為:Concurrent仪际。并行的英文單詞為:parallel。如果想對(duì)并發(fā)和并行有一個(gè)比較直觀的認(rèn)識(shí)昵骤,可以參考下面這張圖片:

并行與并發(fā)

Fork/Join 框架與 Java Stream API

Fork/Join框架屬于并行框架树碱,關(guān)于Fork/Join框架的一些內(nèi)容,可以參考這篇文章:Java Fork/Join并行框架变秦。簡(jiǎn)單來(lái)說(shuō)成榜,F(xiàn)ork/Join框架可以將大的任務(wù)切分為足夠小的任務(wù),然后將小任務(wù)分配給不同的線程來(lái)執(zhí)行蹦玫,而線程之間通過(guò)工作竊取算法來(lái)協(xié)調(diào)資源赎婚,提前昨晚任務(wù)的線程可以去“竊取”其他還沒(méi)有做完任務(wù)的線程的任務(wù)刘绣,而每一個(gè)線程都會(huì)持有一個(gè)雙端隊(duì)列,里面存儲(chǔ)著分配給自己的任務(wù)挣输,F(xiàn)ork/Join框架在實(shí)現(xiàn)上纬凤,為了防止線程之間的競(jìng)爭(zhēng),線程在消費(fèi)分配給自己的任務(wù)時(shí)歧焦,是從隊(duì)列頭取任務(wù)的移斩,而“竊取”線程則從隊(duì)列尾部取任務(wù)肚医。
Fork/Join框架通過(guò)fork方法來(lái)分割大任務(wù)绢馍,通過(guò)使用join來(lái)獲取小任務(wù)的結(jié)果,然后組合成大任務(wù)的結(jié)果肠套。關(guān)于Fork/Join任務(wù)模型舰涌,可以參考下面的圖片:

Fork/Join的任務(wù)模型

關(guān)于Java Stream API的相關(guān)內(nèi)容,可以參考該文章:Java Streams API你稚。

Stream在實(shí)現(xiàn)上使用了Fork/Join框架來(lái)實(shí)現(xiàn)并發(fā)瓷耙,所以使用Stream我們可以在不知不覺(jué)間就使得我們的程序跑得飛快,究其原因就是Stream使用了Fork/Join并發(fā)框架來(lái)處理任務(wù)刁赖,當(dāng)然搁痛,你需要顯示的指定Stream為parallel,否則Stream默認(rèn)都是串行流宇弛。比如對(duì)于Collection鸡典,你可以使用parallelStream來(lái)轉(zhuǎn)換為一個(gè)并發(fā)流,或者使用stream方法轉(zhuǎn)換為串行流枪芒,然后使用parallel操作使得串行流變?yōu)椴l(fā)流彻况。本文的重點(diǎn)是剖析Stream是如何使用Fork/Join來(lái)做并發(fā)的。

Stream的并發(fā)實(shí)現(xiàn)細(xì)節(jié)

在了解了Fork/Join并發(fā)框架和Java Stream之后舅踪,首要的問(wèn)題就是:Stream是如何使用Fork/Join框架來(lái)做到并發(fā)的纽甘?其實(shí)對(duì)于使用者來(lái)說(shuō),了解Stream就是通過(guò)Fork/Join框架來(lái)做的就好了抽碌,但是如果想要深入了解一下Fork/Join框架的實(shí)踐悍赢,以及Java Stream的設(shè)計(jì)方法,那么去讀一下實(shí)現(xiàn)的源碼還是很有必要的货徙,下文中的分析僅代表個(gè)人觀點(diǎn)左权!

需要注意的一點(diǎn)是,Java Stream的操作分為兩類破婆,也可以分為三類涮总,具體的細(xì)節(jié)可以參考該文章:Java Streams API。一個(gè)簡(jiǎn)單的判斷一個(gè)操作是否是Terminal操作還是Intermediate操作的方法是祷舀,如果操作返回的是一個(gè)新的Stream瀑梗,那么就是一個(gè)Intermediate操作烹笔,否則就是一個(gè)Terminal操作。

  • Intermediate:一個(gè)流可以后面跟隨零個(gè)或多個(gè) intermediate 操作抛丽。其目的主要是打開(kāi)流谤职,做出某種程度的數(shù)據(jù)操作,然后返回一個(gè)新的流亿鲜,交給下一個(gè)操作使用允蜈。這類操作都是惰性化的(lazy),就是說(shuō)蒿柳,僅僅調(diào)用到這類方法饶套,并沒(méi)有真正開(kāi)始流的遍歷。

  • Terminal:一個(gè)流只能有一個(gè) terminal 操作垒探,當(dāng)這個(gè)操作執(zhí)行后妓蛮,流就被使用“光”了,無(wú)法再被操作圾叼。所以這必定是流的最后一個(gè)操作蛤克。Terminal 操作的執(zhí)行,才會(huì)真正開(kāi)始流的遍歷夷蚊,并且會(huì)生成一個(gè)結(jié)果构挤,或者一個(gè) side effect。

  • 還有一種操作被稱為 short-circuiting惕鼓。用以指:

    • 對(duì)于一個(gè) intermediate 操作筋现,如果它接受的是一個(gè)無(wú)限大(infinite/unbounded)的 Stream,但返回一個(gè) 有限的新 Stream呜笑。
    • 對(duì)于一個(gè) terminal 操作夫否,如果它接受的是一個(gè)無(wú)限大的 Stream,但能在有限的時(shí)間計(jì)算出結(jié)果叫胁。

Java Stream對(duì)四種類型的Terminal操作使用了Fork/Join實(shí)現(xiàn)了并發(fā)操作凰慈,下面的圖片展示了這四種操作類型:

支持并行的四種Stream操作

我們首先來(lái)走一遍Stream操作的執(zhí)行路徑,下面的代碼是我們想要做的操作流驼鹅,下文會(huì)根據(jù)該代碼示例來(lái)跟蹤Stream的執(zhí)行路徑:

        Stream.of(1,2,3,4)
                .parallel()
                .map(n -> n*2)
                .collect(Collectors.toCollection(ArrayList::new));

解釋一下微谓,上面的代碼想要實(shí)現(xiàn)的功能是將(1,2输钩,3豺型,4)這四個(gè)數(shù)字每一個(gè)都變?yōu)槠渥陨淼膬杀叮缓笫占@些元素到一個(gè)ArrayList中返回买乃。這是一個(gè)非常簡(jiǎn)單的功能姻氨,下面是上面的操作流的執(zhí)行路徑:


    step 1:
    
    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }
    
    step 2:
    
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
    
    step 3:
    
        public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
            ...
            container = evaluate(ReduceOps.makeRef(collector));
            ...
    }
    
    step 4:
    
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    
    step 5:
    
    使用Fork/Join框架執(zhí)行操作。
    

上面的五個(gè)步驟是經(jīng)過(guò)一些省略的剪验,需要注意的一點(diǎn)是肴焊,intermediate類型的操作僅僅將操作加到一個(gè)upstream里面前联,具體的原文描述如下:


Construct a new Stream by appending a stateless intermediate operation to an existing stream.

比如上面我們的操作中的map操作,實(shí)際上只是將操作加到一個(gè)intermediate鏈條上面娶眷,不會(huì)立刻執(zhí)行似嗤。重點(diǎn)是第五步,Stream是如何使用Fork/Join來(lái)實(shí)現(xiàn)并發(fā)的届宠。evaluate這個(gè)方法至關(guān)重要烁落,在方法里面會(huì)分開(kāi)處理,對(duì)于設(shè)置了并發(fā)標(biāo)志的操作流豌注,會(huì)使用Fork/Join來(lái)并發(fā)執(zhí)行操作任務(wù)伤塌,而對(duì)于沒(méi)有打開(kāi)并發(fā)標(biāo)志的操作流,則串行執(zhí)行操作幌羞。

Fork/Join框架的核心方法是一個(gè)叫做compute的方法寸谜,下面分析一個(gè)forEach操作如何通過(guò)Fork/Join框架來(lái)實(shí)現(xiàn)并發(fā),通過(guò)追蹤代碼属桦,可以發(fā)現(xiàn)forEach的并發(fā)版本其實(shí)是一個(gè)交由一個(gè)ForEachTask對(duì)象來(lái)做,而ForEachTask類中實(shí)現(xiàn)了compute方法:

// Similar to AbstractTask but doesn't need to track child tasks
        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }
    }

在上面的代碼中將大任務(wù)拆成成了小任務(wù)他爸,那哪里收集了這些小任務(wù)呢聂宾?看下面的代碼:

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

可以看到調(diào)用了invoke方法,而對(duì)invoke的描述如下:

     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.

不是說(shuō)Fork/Join框架嘛诊笤?那有了fork為什么沒(méi)有join而是invoke呢系谐?下面是對(duì)join方法的描述:


     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     

根據(jù)join的描述,我們知道還可以使用get方法來(lái)獲取結(jié)果讨跟,但是get方法會(huì)拋出異常而join和invoke方法都不會(huì)拋出異常纪他,而是將異常報(bào)告給ForkJoinTask,讓ForkJoinTask來(lái)拋出異常晾匠。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末茶袒,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子凉馆,更是在濱河造成了極大的恐慌薪寓,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件澜共,死亡現(xiàn)場(chǎng)離奇詭異向叉,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)嗦董,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)母谎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人京革,你說(shuō)我怎么就攤上這事奇唤」╄担” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵冻记,是天一觀的道長(zhǎng)睡毒。 經(jīng)常有香客問(wèn)我,道長(zhǎng)冗栗,這世上最難降的妖魔是什么演顾? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮隅居,結(jié)果婚禮上钠至,老公的妹妹穿的比我還像新娘。我一直安慰自己胎源,他們只是感情好棉钧,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著涕蚤,像睡著了一般宪卿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上万栅,一...
    開(kāi)封第一講書(shū)人閱讀 51,541評(píng)論 1 305
  • 那天佑钾,我揣著相機(jī)與錄音,去河邊找鬼烦粒。 笑死休溶,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扰她。 我是一名探鬼主播兽掰,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼徒役!你這毒婦竟也來(lái)了孽尽?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤廉涕,失蹤者是張志新(化名)和其女友劉穎泻云,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體狐蜕,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡宠纯,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了层释。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片婆瓜。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出廉白,到底是詐尸還是另有隱情个初,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布猴蹂,位于F島的核電站院溺,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏磅轻。R本人自食惡果不足惜珍逸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望聋溜。 院中可真熱鬧谆膳,春花似錦、人聲如沸撮躁。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)把曼。三九已至杨帽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間祝迂,已是汗流浹背睦尽。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留型雳,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓山害,卻偏偏與公主長(zhǎng)得像纠俭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子浪慌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)自: Java 8 中的 Streams API 詳解 為什么需要 Stream Stream 作為 Java ...
    普度眾生的面癱青年閱讀 2,918評(píng)論 0 11
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理冤荆,服務(wù)發(fā)現(xiàn),斷路器权纤,智...
    卡卡羅2017閱讀 134,657評(píng)論 18 139
  • 為什么需要 Stream Stream 作為 Java 8 的一大亮點(diǎn)钓简,它與 java.io 包里的 InputS...
    鐵鋼0閱讀 143評(píng)論 0 0
  • 本文采用實(shí)例驅(qū)動(dòng)的方式,對(duì)JAVA8的stream API進(jìn)行一個(gè)深入的介紹汹想。雖然JAVA8中的stream AP...
    浮梁翁閱讀 25,754評(píng)論 3 50
  • 伴隨著一場(chǎng)淋漓的大雨外邓,呼吸著新鮮的空氣,開(kāi)始了小歐的第一天的正課古掏。最開(kāi)始是入學(xué)儀式损话,陽(yáng)光挺拔的男生、端莊美...
    王延旭閱讀 549評(píng)論 0 2