Java并行計(jì)算框架之ForkJoin

使用線程池框架可以大大提高資源利用率陨帆,做到任務(wù)的提交與執(zhí)行分離,每一個(gè)worker線程執(zhí)行完自己的任務(wù)就算結(jié)束了责静,這里面有一種情況,假如某一個(gè)線程執(zhí)行的特別慢齿税,因?yàn)樗鼉?nèi)部又不斷的生成新的task牵舱,所以任務(wù)比較大,在這種情況下就算其他線程池里面有空閑的線程也幫不上忙窖铡,在這類問題下TheadPoolEexcutor處理起來就比較乏力箍铲,所以在Java7之后,新增了一種并行計(jì)算框架ForkJoinPool今穿,當(dāng)然這個(gè)類也歸屬Java線程池里面的一個(gè)實(shí)現(xiàn),作為ForkJoinPool不會(huì)為每個(gè)子任務(wù)創(chuàng)建單獨(dú)的線程。相反毫缆,池中的每個(gè)線程都有自己的雙端隊(duì)列(或deque,發(fā)音甲板)撇贺,用于存儲(chǔ)任務(wù)。
TheadPoolEexcutor一個(gè)有力補(bǔ)充,其在解決分治問題場景下的效率比較好禁舷,因?yàn)檫@個(gè)框架采用了work-steal算法,可以充分利用線程池里面的cpu資源。

什么是ForkJoin

ForkJoin框架的核心思想分治思想,對于非常大的問題错沃,把它的切分成多個(gè)小問題醒叁,然后在多個(gè)小問題里面繼續(xù)切分成更小的部分篮奄,遞歸切分直到問題的粒度足夠小,可以被直接處理他匪。這種切分任務(wù)的操作我們就叫fork,然后對于這些小問題因?yàn)槭强梢圆⑿羞\(yùn)行在多核處理上夸研,所以我們需要聯(lián)合這些問題的結(jié)果從而得到最終的計(jì)算結(jié)果邦蜜,這種方式我們叫做join,這就是Fork/Join的名字含義亥至。


采用偽代碼來描述這一過程如下:


ForkJoin的API介紹

在Java的api里面ForkJoinPool線程池是ForkJoin框架的核心悼沈,這個(gè)線程池首先維護(hù)了一個(gè)總的任務(wù)隊(duì)列贱迟,每個(gè)線程會(huì)從總的任務(wù)隊(duì)列里面獲取任務(wù)執(zhí)行,得到自己的任務(wù)之后絮供,由該任務(wù)產(chǎn)生的子任務(wù)會(huì)放在自己線程維護(hù)的一個(gè)雙端隊(duì)列里面衣吠,然后從頭部獲取任務(wù)依次處理,直到完畢壤靶,此時(shí)如果別的線程處理完了自己隊(duì)列的任務(wù)缚俏,那么會(huì)幫著還沒有處理完任務(wù)的線程處理任務(wù),具體是從尾部讀取任務(wù)萍肆,依次處理袍榆,這樣就達(dá)到了work-steal的理念,從而有效的利用了cpu資源塘揣。

work stealing算法的優(yōu)點(diǎn):
利用了線程進(jìn)行并行計(jì)算包雀,減少了線程間的競爭。

work stealing算法的缺點(diǎn):
如果雙端隊(duì)列中只有一個(gè)任務(wù)時(shí)亲铡,線程間會(huì)存在競爭才写。
額外的開銷,例如雙端隊(duì)列

ForkJoin框架的api支持:

  • ForkJoinPool: 管理ForkJoin框架里面線程的執(zhí)行
  • ForkJoinTask:一個(gè)抽象的類定義了task在ForkJoinPool里面的運(yùn)行
  • RecursiveAction: ForkJoinTask的子類運(yùn)行一個(gè)沒有返回值的任務(wù)
  • RecursiveTask: ForkJoinTask的子類運(yùn)行一個(gè)有返回值的任務(wù)
    其中ForkJoinTask類代表了抽象的切成的任務(wù)奖蔓,而不是實(shí)際的執(zhí)行的線程赞草,
    這種機(jī)制允許一小部分?jǐn)?shù)量的線程去管理執(zhí)行一大部分任務(wù)。

ForkJoinTask的關(guān)鍵方法有:

 final   ForkJoinTask < V >  fork () 
 final  V join ()
 final  V invoke ()

fork方法提交一個(gè)任務(wù)異步的去執(zhí)行吆鹤,這個(gè)方法返回this會(huì)持續(xù)的調(diào)用線程去運(yùn)行厨疙。
join方法,會(huì)等待直到任務(wù)結(jié)束并返回其結(jié)果疑务,對于沒有返回值的任務(wù)返回的值Void類型沾凄。
invoke方法,包裝了fork和join方法知允,我們只需要調(diào)用invoke方法撒蟀,它就會(huì)啟動(dòng)任務(wù)然后返回結(jié)果。
額外的這里面還有兩個(gè)靜態(tài)方法:

static   void  invokeAll ( ForkJoinTask <?>  task1 ,   ForkJoinTask <?>  task2 ):  static   void  invokeAll ( ForkJoinTask <?>…  taskList )

前者執(zhí)行兩個(gè)任務(wù)温鸽,后者執(zhí)行一個(gè)任務(wù)列表保屯。
ForkJoinTask還有一個(gè)compute方法,這個(gè)方法用來編寫主要的計(jì)算邏輯:
ForkJoinPool類主要有兩個(gè)提交任務(wù)的方法:
第一個(gè)是invoke會(huì)同步等待直到所有的任務(wù)返回并得到執(zhí)行結(jié)果涤垫。
第二個(gè)是異步的提交使用execute方法姑尺,對于結(jié)果的獲取需要調(diào)用任務(wù)的get方法來阻塞獲取◎疴或者使用get超時(shí)方法來輪詢獲取結(jié)果股缸。
最后值得一提的是,對于ForkJoinPool方法吱雏,我們不需要顯式的調(diào)用shutdown來關(guān)閉敦姻,其使用的是守護(hù)線程瘾境,只要所有的線程運(yùn)行完畢,線程池會(huì)自動(dòng)退出镰惦。

ForkJoinPool組成類:

ForkJoinPool

充當(dāng)fork/join框架里面的管理者迷守,最原始的任務(wù)都要交給它才能處理。它負(fù)責(zé)控制整個(gè)fork/join有多少個(gè)workerThread旺入,workerThread的創(chuàng)建兑凿,激活都是由它來掌控。它還負(fù)責(zé)workQueue隊(duì)列的創(chuàng)建和分配茵瘾,每當(dāng)創(chuàng)建一個(gè)workerThread礼华,它負(fù)責(zé)分配相應(yīng)的workQueue。然后它把接到的活都交給workerThread去處理拗秘,它可以說是整個(gè)frok/join的容器圣絮。

ForkJoinWorkerThread

fork/join里面真正干活的"工人",本質(zhì)是一個(gè)線程雕旨。里面有一個(gè)ForkJoinPool.WorkQueue的隊(duì)列存放著它要干的活扮匠,接活之前它要向ForkJoinPool注冊(registerWorker),拿到相應(yīng)的workQueue凡涩。然后就從workQueue里面拿任務(wù)出來處理棒搜。它是依附于ForkJoinPool而存活,如果ForkJoinPool的銷毀了,它也會(huì)跟著結(jié)束活箕。

ForkJoinPool.WorkQueue

雙端隊(duì)列就是它力麸,它負(fù)責(zé)存儲(chǔ)接收的任務(wù)。
這是一個(gè)雙端隊(duì)列(Deque)育韩,里面存放的對象是任務(wù)(ForkJoinTask)克蚂。
每個(gè)工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),會(huì)放入工作隊(duì)列的隊(duì)尾座慰,并且工作線程在處理自己的工作隊(duì)列時(shí),使用的是 LIFO 方式翠拣,也就是說每次從隊(duì)尾取出任務(wù)來執(zhí)行版仔。
每個(gè)工作線程在處理自己的工作隊(duì)列同時(shí),會(huì)嘗試竊取一個(gè)任務(wù)(或是來自于剛剛提交到 pool的任務(wù)误墓,或是來自于其他工作線程的工作隊(duì)列)蛮粮,竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首,也就是說工作線程在竊取其他工作線程的任務(wù)時(shí)谜慌,使用的是 FIFO 方式然想。

 Mode bits for ForkJoinPool.config and WorkQueue.config
    static final int MODE_MASK    = 0xffff << 16;  // top half of int
    static final int LIFO_QUEUE   = 0;
    static final int FIFO_QUEUE   = 1 << 16;
    static final int SHARED_QUEUE = 1 << 31;       // must be negative

控制是FIFO還是LIFO

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return (config & FIFO_QUEUE) == 0 ? pop() : poll();
        }
  • ForkJoinTask:代表fork/join里面任務(wù)類型,我們一般用它的兩個(gè)子類RecursiveTask欣范、RecursiveAction变泄。這兩個(gè)區(qū)別在于RecursiveTask任務(wù)是有返回值令哟,RecursiveAction沒有返回值。任務(wù)的處理邏輯包括任務(wù)的切分都集中在compute()方法里面妨蛹。

配置參數(shù)

通過代碼指定屏富,必須得在commonPool初始化之前(parallel的stream被調(diào)用之前,一般可在系統(tǒng)啟動(dòng)后設(shè)置)注入進(jìn)去蛙卤,否則無法生效狠半。
通過啟動(dòng)參數(shù)指定無此限制,較為安全

  • parallelism(即配置線程池個(gè)數(shù))
    可以通過java.util.concurrent.ForkJoinPool.common.parallelism進(jìn)行配置颤难,最大值不能超過MAX_CAP,即32767.
    static final int MAX_CAP = 0x7fff; //32767
    如果沒有指定神年,則默認(rèn)為Runtime.getRuntime().availableProcessors() - 1.
    代碼指定(必須得在commonPool初始化之前注入進(jìn)去,否則無法生效)

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

或者參數(shù)指定

-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

Fork Join注意的點(diǎn)

protected Long compute() {
    if (任務(wù)足夠小?) {
        return computeDirect();
    }
    // 任務(wù)太大,一分為二:
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分別對子任務(wù)調(diào)用fork():
    subtask1.fork();
    subtask2.fork();
    // 合并結(jié)果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}


很遺憾行嗤,這種寫法是錯(cuò)已日!誤!的昂验!這樣寫沒有正確理解Fork/Join模型的任務(wù)執(zhí)行邏輯捂敌。

JDK用來執(zhí)行Fork/Join任務(wù)的工作線程池大小等于CPU核心數(shù)。在一個(gè)4核CPU上既琴,最多可以同時(shí)執(zhí)行4個(gè)子任務(wù)占婉。對400個(gè)元素的數(shù)組求和,執(zhí)行時(shí)間應(yīng)該為1秒甫恩。但是逆济,換成上面的代碼,執(zhí)行時(shí)間卻是兩秒磺箕。

這是因?yàn)閳?zhí)行compute()方法的線程本身也是一個(gè)Worker線程奖慌,當(dāng)對兩個(gè)子任務(wù)調(diào)用fork()時(shí),這個(gè)Worker線程就會(huì)把任務(wù)分配給另外兩個(gè)Worker松靡,但是它自己卻停下來等待不干活了简僧!這樣就白白浪費(fèi)了Fork/Join線程池中的一個(gè)Worker線程,導(dǎo)致了4個(gè)子任務(wù)至少需要7個(gè)線程才能并發(fā)執(zhí)行雕欺。

正確的寫法:

SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;

其實(shí)岛马,我們查看JDK的invokeAll()方法的源碼就可以發(fā)現(xiàn),invokeAll的N個(gè)任務(wù)中屠列,其中N-1個(gè)任務(wù)會(huì)使用fork()交給其它線程執(zhí)行啦逆,但是,它還會(huì)留一個(gè)任務(wù)自己執(zhí)行笛洛,這樣夏志,就充分利用了線程池,保證沒有空閑的不干活的線程苛让。

https://www.liaoxuefeng.com/article/1146802219354112

例子

下面的代碼是測試forkjoin和for循環(huán)的耗時(shí)沟蔑。
測試結(jié)果:首次forkjoin的耗時(shí)會(huì)遠(yuǎn)遠(yuǎn)比for循環(huán)的耗時(shí)高湿诊,這是因?yàn)槭状涡枰コ跏蓟Y源,比如創(chuàng)建線程池等溉贿。所以耗時(shí)會(huì)遠(yuǎn)遠(yuǎn)比for循環(huán)高枫吧,但是第二次的耗時(shí)會(huì)比for循環(huán)少很多,這是因?yàn)樗枰馁Y源都已經(jīng)創(chuàng)建好了宇色,所以耗時(shí)會(huì)很低九杂。
結(jié)論:forkjoin首次運(yùn)行會(huì)需要?jiǎng)?chuàng)建線程池等資源所以會(huì)比實(shí)際運(yùn)行任務(wù)的時(shí)間會(huì)高一些,接下里在使用forkjoin運(yùn)行任務(wù)就會(huì)和運(yùn)行任務(wù)本身耗時(shí)差不多宣蠕。

public static void main(String[] args) {

        //測試forkjoin耗時(shí)
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            list.add(i);
        }
        long s = System.currentTimeMillis();
        list.parallelStream().forEach(i -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long e = System.currentTimeMillis();
        System.out.println("forkjoin:" + (e - s));

        //測試for循環(huán)耗時(shí)
        s = System.currentTimeMillis();
        for (int i : list) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        e = System.currentTimeMillis();
        System.out.println("for:" + (e - s));

        //再次測試forkjoin耗時(shí)
        s = System.currentTimeMillis();
        list.parallelStream().forEach(i -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
        });
        e = System.currentTimeMillis();
        System.out.println("forkjoin:" + (e - s));
    }

結(jié)果

forkjoin:81
for:69
forkjoin:12

上面的代碼有一個(gè)細(xì)節(jié)要注意的是例隆,由于我本機(jī)的機(jī)器是8核,通過以下代碼可以獲取forkjoin默認(rèn)啟動(dòng)線程池大小:

ForkJoinPool.getCommonPoolParallelism();

可以得出結(jié)果是7抢蚀,也就是可以同時(shí)運(yùn)行7個(gè)任務(wù)镀层。
但是我上面的循環(huán)次數(shù)確實(shí)8,也就是有8個(gè)任務(wù)同時(shí)運(yùn)行皿曲,按理來說任務(wù)耗時(shí)要大于20ms唱逢,可實(shí)際打印結(jié)果確實(shí)12ms。這是為什么呢屋休?原因是因?yàn)閙ain線程也會(huì)運(yùn)行任務(wù)坞古,因?yàn)樽铋_始分解任務(wù)的線程是main線程,main線程分解任務(wù)后自己也會(huì)運(yùn)行任務(wù)劫樟,這樣就不會(huì)白白浪費(fèi)一個(gè)線程痪枫,所以這也就是為什么 ForkJoinPool.getCommonPoolParallelism();得到可以同時(shí)并行運(yùn)行任務(wù)的是 = 本機(jī)CPU核數(shù)-1 的原因,這是因?yàn)閙ain線程已經(jīng)占用了一個(gè)線程運(yùn)行任務(wù)叠艳。

我把上面的循環(huán)次數(shù)改為9奶陈,然后再次運(yùn)行,得到結(jié)果:

forkjoin:116
for:98
forkjoin:23

可以看到第二次forkjoin耗費(fèi)了23s附较。因?yàn)槲业腃PU是8核吃粒,每次最多可以運(yùn)行8個(gè)任務(wù),這次有9個(gè)任務(wù)拒课,所以只能等其他8個(gè)任務(wù)運(yùn)行完了之后徐勃,第9個(gè)任務(wù)在運(yùn)行,所以耗時(shí)會(huì)是23ms捕发。

參考

https://mp.weixin.qq.com/s/yvmWS4cCwTPzyEB3AxK5MQ

https://kaimingwan.com/post/java/forkjoinpooljie-du

https://segmentfault.com/a/1190000008470012

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末疏旨,一起剝皮案震驚了整個(gè)濱河市很魂,隨后出現(xiàn)的幾起案子扎酷,更是在濱河造成了極大的恐慌,老刑警劉巖遏匆,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件法挨,死亡現(xiàn)場離奇詭異谁榜,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)凡纳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門窃植,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人荐糜,你說我怎么就攤上這事巷怜。” “怎么了暴氏?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵延塑,是天一觀的道長。 經(jīng)常有香客問我答渔,道長关带,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任沼撕,我火速辦了婚禮宋雏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘务豺。我一直安慰自己磨总,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布冲呢。 她就那樣靜靜地躺著舍败,像睡著了一般。 火紅的嫁衣襯著肌膚如雪敬拓。 梳的紋絲不亂的頭發(fā)上邻薯,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音乘凸,去河邊找鬼厕诡。 笑死,一個(gè)胖子當(dāng)著我的面吹牛营勤,可吹牛的內(nèi)容都是我干的灵嫌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼葛作,長吁一口氣:“原來是場噩夢啊……” “哼寿羞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起赂蠢,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤绪穆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玖院,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡菠红,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了难菌。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片试溯。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖郊酒,靈堂內(nèi)的尸體忽然破棺而出遇绞,到底是詐尸還是另有隱情,我是刑警寧澤燎窘,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布试读,位于F島的核電站,受9級(jí)特大地震影響荠耽,放射性物質(zhì)發(fā)生泄漏钩骇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一铝量、第九天 我趴在偏房一處隱蔽的房頂上張望倘屹。 院中可真熱鬧,春花似錦慢叨、人聲如沸纽匙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烛缔。三九已至,卻和暖如春轩拨,著一層夾襖步出監(jiān)牢的瞬間践瓷,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工亡蓉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留晕翠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓砍濒,卻偏偏與公主長得像淋肾,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子爸邢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容