[轉(zhuǎn)]Java Fork/Join 框架

簡(jiǎn)介

從JDK1.7開(kāi)始叽奥,Java提供Fork/Join框架用于并行執(zhí)行任務(wù)俱两,它的思想就是講一個(gè)大任務(wù)分割成若干小任務(wù)饱狂,最終匯總每個(gè)小任務(wù)的結(jié)果得到這個(gè)大任務(wù)的結(jié)果。
這種思想和MapReduce很像(input --> split --> map --> reduce --> output)
主要有兩步:

  • 第一宪彩、任務(wù)切分休讳;
  • 第二、結(jié)果合并
    它的模型大致是這樣的:線程池中的每個(gè)線程都有自己的工作隊(duì)列(PS:這一點(diǎn)和ThreadPoolExecutor不同尿孔,ThreadPoolExecutor是所有線程公用一個(gè)工作隊(duì)列俊柔,所有線程都從這個(gè)工作隊(duì)列中取任務(wù)),當(dāng)自己隊(duì)列中的任務(wù)都完成以后活合,會(huì)從其它線程的工作隊(duì)列中偷一個(gè)任務(wù)執(zhí)行雏婶,這樣可以充分利用資源。

工作竊劝字浮(work-stealing)

工作竊攘敉怼(work-stealing)算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來(lái)執(zhí)行。工作竊取的運(yùn)行流程圖如下:


工作竊取流程圖

那么為什么需要使用工作竊取算法呢告嘲?

假如我們需要做一個(gè)比較大的任務(wù)错维,我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng)橄唬,于是把這些子任務(wù)分別放到不同的隊(duì)列里赋焕,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來(lái)執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng)仰楚,比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)隆判。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理缸血。干完活的線程與其等著蜜氨,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來(lái)執(zhí)行捎泻。而在這時(shí)它們會(huì)訪問(wèn)同一個(gè)隊(duì)列飒炎,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列笆豁,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行郎汪,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行赤赊。

工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計(jì)算,并減少了線程間的競(jìng)爭(zhēng)煞赢,其缺點(diǎn)是在某些情況下還是存在競(jìng)爭(zhēng)抛计,比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并且消耗了更多的系統(tǒng)資源照筑,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列吹截。

API介紹

ForkJoinPool

An ExecutorService for running ForkJoinTasks.
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

ForkJoinPool與其它的ExecutorService區(qū)別主要在于它使用“工作竊取”:線程池中的所有線程都企圖找到并執(zhí)行提交給線程池的任務(wù)。當(dāng)大量的任務(wù)產(chǎn)生子任務(wù)的時(shí)候凝危,或者同時(shí)當(dāng)有許多小任務(wù)被提交到線程池中的時(shí)候波俄,這種處理是非常高效的。特別的蛾默,當(dāng)在構(gòu)造方法中設(shè)置asyncMode為true的時(shí)候這種處理更加高效懦铺。

ForkJoinTask

ForkJoinTask代表運(yùn)行在ForkJoinPool中的任務(wù)。

主要方法

  • fork() 在當(dāng)前線程運(yùn)行的線程池中安排一個(gè)異步執(zhí)行支鸡。簡(jiǎn)單的理解就是再創(chuàng)建一個(gè)子任務(wù)冬念。
  • join() 當(dāng)任務(wù)完成的時(shí)候返回計(jì)算結(jié)果。
  • invoke() 開(kāi)始執(zhí)行任務(wù)牧挣,如果必要急前,等待計(jì)算完成。

子類

  • RecursiveAction 一個(gè)遞歸無(wú)結(jié)果的ForkJoinTask(沒(méi)有返回值)
  • RecursiveTask 一個(gè)遞歸有結(jié)果的ForkJoinTask(有返回值)

ForkJoinWorkerThread

A thread managed by a ForkJoinPool, which executes ForkJoinTasks.
ForkJoinWorkerThread代表ForkJoinPool線程池中的一個(gè)執(zhí)行任務(wù)的線程瀑构。


類圖


代碼分析

接下來(lái)叔汁,簡(jiǎn)略的看一下關(guān)鍵代碼來(lái)加深對(duì)Fork/Join的理解。

ForkJoinPool

WorkQueue是一個(gè)ForkJoinPool中的內(nèi)部類检碗,它是線程池中線程的工作隊(duì)列的一個(gè)封裝据块,支持任務(wù)竊取。

什么叫線程的任務(wù)竊取呢折剃?就是說(shuō)你和你的一個(gè)伙伴一起吃水果另假,你的那份吃完了,他那份沒(méi)吃完怕犁,那你就偷偷的拿了他的一些水果吃了边篮。存在執(zhí)行2個(gè)任務(wù)的子線程,這里要講成存在A,B兩個(gè)個(gè)WorkQueue在執(zhí)行任務(wù)奏甫,A的任務(wù)執(zhí)行完了戈轿,B的任務(wù)沒(méi)執(zhí)行完,那么A的WorkQueue就從B的WorkQueue的ForkJoinTask數(shù)組中拿走了一部分尾部的任務(wù)來(lái)執(zhí)行阵子,可以合理的提高運(yùn)行和計(jì)算效率思杯。


submit()



可以看到:

  1. 同樣是提交任務(wù),submit會(huì)返回ForkJoinTask挠进,而execute不會(huì)
  2. 任務(wù)提交給線程池以后色乾,會(huì)將這個(gè)任務(wù)加入到當(dāng)前提交者的任務(wù)隊(duì)列中誊册。

前面我們說(shuō)過(guò),每個(gè)線程都有一個(gè)WorkQueue暖璧,而WorkQueue中有執(zhí)行任務(wù)的線程(ForkJoinWorkerThread owner)案怯,還有這個(gè)線程需要處理的任務(wù)(ForkJoinTask<?>[] array)。那么這個(gè)新提交的任務(wù)就是加到array中澎办。

ForkJoinWorkerThread


從代碼中我們可以清楚地看到嘲碱,F(xiàn)orkJoinWorkThread持有ForkJoinPool和ForkJoinPool.WorkQueue的引用,以表明該線程屬于哪個(gè)線程池局蚀,它的工作隊(duì)列是哪個(gè)

ForkJoinTask

fork()


可以看到悍汛,如果是ForkJoinWorkerThread運(yùn)行過(guò)程中fork(),則直接加入到它的工作隊(duì)列中至会,否則,重新提交任務(wù)谱俭。

join()和invoke()


可以看到它們都會(huì)等待計(jì)算完成


圖形化處理過(guò)程

下面盜兩張圖


使用示例

批量發(fā)送消息

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolDemo {

    class SendMsgTask extends RecursiveAction {

        private final int THRESHOLD = 10;

        private int start;
        private int end;
        private List<String> list;

        public SendMsgTask(int start, int end, List<String> list) {
            this.start = start;
            this.end = end;
            this.list = list;
        }

        @Override
        protected void compute() {
            if ((end - start) <= THRESHOLD) {
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
                }
            }else {
                int middle = (start + end) / 2;
                invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 123; i++) {
            list.add(String.valueOf(i+1));
        }

        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list));
        pool.awaitTermination(10, TimeUnit.SECONDS);
        pool.shutdown();
    }
}

求和

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTaskDemo {

    private class SumTask extends RecursiveTask<Integer> {

        private static final int THRESHOLD = 20;

        private int arr[];
        private int start;
        private int end;

        public SumTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        /**
         * 小計(jì)
         */
        private Integer subtotal() {
            Integer sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
            return sum;
        }

        @Override
        protected Integer compute() {

            if ((end - start) <= THRESHOLD) {
                return subtotal();
            }else {
                int middle = (start + end) / 2;
                SumTask left = new SumTask(arr, start, middle);
                SumTask right = new SumTask(arr, middle, end);
                left.fork();
                right.fork();

                return left.join() + right.join();
            }
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] arr = new int[100];
        for (int i = 0; i < 100; i++) {
            arr[i] = i + 1;
        }

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> result = pool.submit(new ForkJoinTaskDemo().new SumTask(arr, 0, arr.length));
        System.out.println("最終計(jì)算結(jié)果: " + result.invoke());
        pool.shutdown();
    }
}
ForkJoinPool.commonPool-worker-2: ∑(50~62)=678
ForkJoinPool.commonPool-worker-2: ∑(62~75)=897
ForkJoinPool.commonPool-worker-2: ∑(75~87)=978
ForkJoinPool.commonPool-worker-2: ∑(87~100)=1222
ForkJoinPool-1-worker-1: ∑(0~12)=78
ForkJoinPool-1-worker-1: ∑(12~25)=247
ForkJoinPool-1-worker-1: ∑(25~37)=378
ForkJoinPool-1-worker-1: ∑(37~50)=572
ForkJoinPool-1-worker-2: ∑(75~87)=978
ForkJoinPool-1-worker-3: ∑(50~62)=678
ForkJoinPool-1-worker-5: ∑(62~75)=897
ForkJoinPool.commonPool-worker-7: ∑(0~12)=78
ForkJoinPool.commonPool-worker-3: ∑(37~50)=572
ForkJoinPool-1-worker-4: ∑(87~100)=1222
ForkJoinPool.commonPool-worker-2: ∑(25~37)=378
ForkJoinPool.commonPool-worker-5: ∑(12~25)=247
最終計(jì)算結(jié)果: 5050

api文檔中的兩個(gè)示例

import java.util.Arrays;
import java.util.concurrent.*;

public class RecursiveActionDemo {

    private static class SortTask extends RecursiveAction {

        static final int THRESHOLD = 100;

        final long[] array;
        final int lo, hi;

        public SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        public SortTask(long[] array) {
            this(array, 0, array.length);
        }

        public void sortSequentially(int lo, int hi) {
            Arrays.sort(array, lo, hi);
        }

        public void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
                array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
            }
        }

        @Override
        protected void compute() {
            if (hi - lo < THRESHOLD) {
                sortSequentially(lo, hi);
            }else {
                int mid = (lo + hi) >>> 1;
                invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
                merge(lo, mid, hi);
            }
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long[] array = new long[120];
        for (int i = 0; i < array.length; i++) {
            array[i] = (long) (Math.random() * 1000);
        }
        System.out.println(Arrays.toString(array));

        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new SortTask(array));
        pool.awaitTermination(5, TimeUnit.SECONDS);
        pool.shutdown();
    }
}
import java.util.concurrent.*;

public class RecursiveTaskDemo {

    private static class Fibonacci extends RecursiveTask<Integer> {

        final int n;

        public Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n;
            }else {
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                Fibonacci f2 = new Fibonacci(n - 1);
                return f2.compute() + f1.join();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        Future<Integer> future = pool.submit(new Fibonacci(10));
        System.out.println(future.get());
        pool.shutdown();
    }

}

參考

http://gee.cs.oswego.edu/dl/papers/fj.pdf
http://ifeve.com/talk-concurrency-forkjoin/
https://www.cnblogs.com/senlinyang/p/7885964.html
https://blog.csdn.net/u012403290/article/details/70917810

原文鏈接:https://www.cnblogs.com/cjsblog/p/9078341.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末奉件,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子昆著,更是在濱河造成了極大的恐慌县貌,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凑懂,死亡現(xiàn)場(chǎng)離奇詭異煤痕,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)接谨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門摆碉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人脓豪,你說(shuō)我怎么就攤上這事巷帝。” “怎么了扫夜?”我有些...
    開(kāi)封第一講書人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵楞泼,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我笤闯,道長(zhǎng)堕阔,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任颗味,我火速辦了婚禮超陆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘浦马。我一直安慰自己侥猬,他們只是感情好例驹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著退唠,像睡著了一般鹃锈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瞧预,一...
    開(kāi)封第一講書人閱讀 51,573評(píng)論 1 305
  • 那天屎债,我揣著相機(jī)與錄音,去河邊找鬼垢油。 笑死盆驹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的滩愁。 我是一名探鬼主播躯喇,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼硝枉!你這毒婦竟也來(lái)了廉丽?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤妻味,失蹤者是張志新(化名)和其女友劉穎正压,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體责球,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡焦履,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了雏逾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嘉裤。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖栖博,靈堂內(nèi)的尸體忽然破棺而出价脾,到底是詐尸還是另有隱情,我是刑警寧澤笛匙,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布侨把,位于F島的核電站,受9級(jí)特大地震影響妹孙,放射性物質(zhì)發(fā)生泄漏秋柄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一蠢正、第九天 我趴在偏房一處隱蔽的房頂上張望骇笔。 院中可真熱鬧,春花似錦、人聲如沸笨触。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)芦劣。三九已至粗俱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間虚吟,已是汗流浹背寸认。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留串慰,地道東北人偏塞。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像邦鲫,于是被迫代替她去往敵國(guó)和親灸叼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355