java concurrent 之 ForkJoinPool

java concurrent 之 ForkJoinPool

ForkJoinPool在Java 7中被引入徙硅。ForkJoinPool類(lèi)似于Java ExecutorService狡逢,但有一個(gè)區(qū)別傲隶。 ForkJoinPool可以輕松將任務(wù)分解成較小的任務(wù)遂赠,然后將其提交給ForkJoinPool艾少。 任務(wù)可以將其工作分成較小的子任務(wù)疚漆,只要它能夠分解任務(wù)即可。 它可能聽(tīng)起來(lái)有點(diǎn)抽象奈附,所以在這個(gè)fork和join教程中,我將解釋ForkJoinPool如何工作煮剧,以及分裂任務(wù)如何工作斥滤。

解釋Fork和Join

在我們看看ForkJoinPool之前将鸵,我想解釋一下fork和join的原理。

Fork和Join原則由遞歸執(zhí)行的兩個(gè)步驟組成佑颇。 這兩個(gè)步驟是fork步驟和join步驟顶掉。

Fork

使用fork和join原理的任務(wù)可以將(自己)分割成更小的子任務(wù)

image

通過(guò)將其自身分解為子任務(wù),每個(gè)子任務(wù)可以由不同的CPU或同一CPU上的不同線程并行執(zhí)行挑胸。

如果任務(wù)給出的工作足夠大痒筒,任務(wù)只會(huì)分解成子任務(wù),這樣才有意義茬贵。 將任務(wù)分解為子任務(wù)有一個(gè)開(kāi)銷(xiāo)簿透,因此對(duì)于少量工作,此開(kāi)銷(xiāo)可能大于通過(guò)并發(fā)執(zhí)行子任務(wù)而實(shí)現(xiàn)的加速解藻。

將任務(wù)分解為子任務(wù)的時(shí)間限制也稱(chēng)為閾值老充。 每個(gè)任務(wù)都由決定一個(gè)明智的門(mén)檻決定。 這在很大程度上取決于正在做的工作螟左。

其實(shí)閾值的問(wèn)題就是在遞歸算法中的退出條件相似

Join

當(dāng)一個(gè)任務(wù)已經(jīng)分裂成子任務(wù)時(shí)啡浊,任務(wù)等待直到子任務(wù)完成執(zhí)行。

子任務(wù)完成執(zhí)行后胶背,任務(wù)可以將所有結(jié)果加入(合并)為一個(gè)結(jié)果巷嚣。 如下圖所示:

image

ForkJoinPool

ForkJoinPool是一個(gè)特殊的線程池,旨在使用fork-and-join任務(wù)拆分工作钳吟。 ForkJoinPool位于java.util.concurrent包中涂籽,因此完整的類(lèi)名稱(chēng)為java.util.concurrent.ForkJoinPool。

創(chuàng)建ForkJoinPool


ForkJoinPool forkJoinPool = new ForkJoinPool(4);

ForkJoinTask任務(wù)分為兩類(lèi)

  • RecursiveAction 用于沒(méi)有返回結(jié)果的任務(wù)砸抛。

  • RecursiveTask 用于有返回結(jié)果的任務(wù)

ForkJoinTask需要通過(guò)ForkJoinPool來(lái)執(zhí)行评雌,任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部直焙。當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒(méi)有任務(wù)時(shí)景东,它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)。

使用Fork/Join框架 Demo

package com.viashare.forkjoin;

import java.util.concurrent.*;

/**
 * Created by Jeffy on 16/01/12.
 */
public class ForkJoinMain {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> future = forkJoinPool.submit(new CountTask(1, 5));
        System.err.println(future.get());

    }

    static class CountTask extends RecursiveTask<Integer> {

        private static final int Threshold = 3;

        private int start;

        private int end;

        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            int temp = end - start;
            System.err.println(temp);
            if (temp <= Threshold) {
                for (int i = start; i <=end; i++) {
                    sum += i;
                }
            } else {
                int millde = (end+start)/Threshold;
                CountTask task1 = new CountTask(start, millde);
                CountTask task2 = new CountTask(millde+1, end);
                task1.fork();
                task2.fork();

                int sum1 = task1.join();
                int sum2 = task2.join();
                System.err.println("sum1  "+sum1);
                System.err.println("sum2  "+sum2);
                sum = sum1 + sum2;
            }

            return sum;
        }
    }
}


Demo2


package com.viashare.forkjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * Created by Jeffy on 16/01/12.
 */
public class ForkJoinmain2 {

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);

        long mergedResult = forkJoinPool.invoke(myRecursiveTask);

        System.out.println("mergedResult = " + mergedResult);
    }

    static class MyRecursiveTask extends RecursiveTask<Long> {

        private long workLoad = 0;

        public MyRecursiveTask(long workLoad) {
            this.workLoad = workLoad;
        }

        protected Long compute() {
            //if work is above threshold, break tasks up into smaller tasks
            if (this.workLoad > 16) {
                System.out.println("Splitting workLoad : " + this.workLoad);
                List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
                subtasks.addAll(createSubtasks());

                for (MyRecursiveTask subtask : subtasks) {
                    subtask.fork();
                }

                long result = 0;
                for (MyRecursiveTask subtask : subtasks) {
                    result += subtask.join();
                }
                return result;

            } else {
                System.out.println("Doing workLoad myself: " + this.workLoad);
                return workLoad * 3;
            }
        }

        private List<MyRecursiveTask> createSubtasks() {
            List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();

            MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
            MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

            subtasks.add(subtask1);
            subtasks.add(subtask2);

            return subtasks;
        }
    }
}

Fork/Join框架的異常處理

ForkJoinTask在執(zhí)行的時(shí)候可能會(huì)拋出異常奔誓,但是我們沒(méi)辦法在主線程里直接捕獲異常斤吐,所以ForkJoinTask提供了isCompletedAbnormally()方法來(lái)檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被取消了厨喂,并且可以通過(guò)ForkJoinTask的getException方法獲取異常和措。使用如下代碼:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}

getException方法返回Throwable對(duì)象,如果任務(wù)被取消了則返回CancellationException蜕煌。如果任務(wù)沒(méi)有完成或者沒(méi)有拋出異常則返回null派阱。

Fork/Join框架的實(shí)現(xiàn)原理

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市斜纪,隨后出現(xiàn)的幾起案子贫母,更是在濱河造成了極大的恐慌文兑,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腺劣,死亡現(xiàn)場(chǎng)離奇詭異绿贞,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)橘原,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)籍铁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人趾断,你說(shuō)我怎么就攤上這事拒名。” “怎么了歼冰?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵靡狞,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我隔嫡,道長(zhǎng)甸怕,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任腮恩,我火速辦了婚禮梢杭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘秸滴。我一直安慰自己武契,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布荡含。 她就那樣靜靜地躺著咒唆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪释液。 梳的紋絲不亂的頭發(fā)上全释,一...
    開(kāi)封第一講書(shū)人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音误债,去河邊找鬼浸船。 笑死,一個(gè)胖子當(dāng)著我的面吹牛寝蹈,可吹牛的內(nèi)容都是我干的李命。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼箫老,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼封字!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤周叮,失蹤者是張志新(化名)和其女友劉穎辩撑,沒(méi)想到半個(gè)月后界斜,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體仿耽,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年各薇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了项贺。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡峭判,死狀恐怖开缎,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情林螃,我是刑警寧澤奕删,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布睬澡,位于F島的核電站孕暇,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏盗痒。R本人自食惡果不足惜横漏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一谨设、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧缎浇,春花似錦扎拣、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至指厌,卻和暖如春刊愚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背仑乌。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工百拓, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人晰甚。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓衙传,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親厕九。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蓖捶,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 一、多線程 說(shuō)明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)扁远。 NEW:這種情況指的是俊鱼,通過(guò) New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,676評(píng)論 0 44
  • 進(jìn)程和線程 進(jìn)程 所有運(yùn)行中的任務(wù)通常對(duì)應(yīng)一個(gè)進(jìn)程,當(dāng)一個(gè)程序進(jìn)入內(nèi)存運(yùn)行時(shí),即變成一個(gè)進(jìn)程.進(jìn)程是處于運(yùn)行過(guò)程中...
    小徐andorid閱讀 2,807評(píng)論 3 53
  • 雞年第一場(chǎng)雪來(lái)得如此早刻像,潤(rùn)雪兆豐年,頗有大吉的征兆并闲。 韓愈的一首春雪细睡,把那種二月雪的欣喜與驚奇刻畫(huà)得妙...
    4af85c91ffc8閱讀 223評(píng)論 0 1
  • 如果你已經(jīng)很好的記下你的發(fā)現(xiàn),以及對(duì)產(chǎn)生的想法進(jìn)行批判性的思考帝火,為了交流而進(jìn)行的寫(xiě)作會(huì)變得更簡(jiǎn)單也會(huì)更享受溜徙。...
    張?zhí)硌?/span>閱讀 266評(píng)論 0 1
  • 永遠(yuǎn)都不會(huì)對(duì)你說(shuō) 我愛(ài)你 亦或者,我只說(shuō)一次犀填。 我不怕錯(cuò)過(guò)你 因?yàn)槊鎸?duì)你蠢壹,我從里到外滿是羞澀 如果你懂我, 你知道...
    小小的田閱讀 96評(píng)論 0 1