并行執(zhí)行任務的Fork/Join框架

背書中(引用書上的話):Java7中提供了用于并行執(zhí)行任務的Fork/Join框架跛锌, 可以把任務分成若干個分任務产场,最終匯總每個分任務的結果得到總?cè)蝿盏慕Y果光戈。這篇我們來看看Fork/Join框架跃捣。

先舉個栗子

一個字符串數(shù)組泽论,需要把每個元素中的*字符的索引返回,并求和(自己編了個栗子,沒有撒實際意義)偷卧,用Fork/Join框架來實現(xiàn)豺瘤,可以定義一個處理字符串數(shù)組的總?cè)蝿眨缓蟀芽側(cè)蝿詹鸱痔睿褦?shù)組中每個字符串交給子任務去處理坐求,然后等待子任務執(zhí)行完畢,匯總結果晌梨,并返回:

package thread.ForkJoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @Description: .
 * @Author: ZhaoWeiNan .
 * @CreatedTime: 2017/6/21 .
 * @Version: 1.0 .
 */
public class StringTask extends RecursiveTask<Integer>{
    //要處理的字符串
    private String dest;

    public StringTask(String dest) {
        this.dest = dest;
    }
    //父類RecursiveTask是一個抽象類桥嗤,所以需要實現(xiàn)compute方法
    @Override
    protected Integer compute() {
        if (dest == null || "".equals(dest))
            return 0;
        //判斷字符串中 * 的索引,并返回
        return dest.indexOf("*");
    }
}

class ArrayTask extends RecursiveTask<Integer>{
    //需要處理的字符串數(shù)組
    private String[] array;

    public ArrayTask(String[] array) {
        this.array = array;
    }

    @Override
    protected Integer compute() {
        if (array == null || array.length < 1)
            return 0;

        //申明一個StringTask變量仔蝌,作為子任務
        StringTask stringTask;
        //定義一個子任務隊列泛领,用于任務執(zhí)行完畢后,獲取子任務的執(zhí)行結果
        List<StringTask> list = new ArrayList<>();
        int sum = 0;
        //把字符串數(shù)組的中每一個字符串分給多個StringTask子任務去處理
        for (String s : array){
            //創(chuàng)建一個變量敛惊,作為子任務去處理字符串
            stringTask = new StringTask(s);
            //執(zhí)行子任務
            stringTask.fork();
            //加入子任務隊列
            list.add(stringTask);
        }

        for (StringTask task : list){
            //等子任務執(zhí)行完畢渊鞋,獲取子任務執(zhí)行的結果,并累加
            sum += task.join();
        }

        return sum;
    }
}

class Demo{

    public static void main(String[] args){
        //初始化字符串數(shù)組
        String[] array = new String[]{"#####*####","##*########","###*#######","#*############"};
        //創(chuàng)建一個總?cè)蝿涨萍罚幚碜址當?shù)組
        ArrayTask arrayTask = new ArrayTask(array);
        //創(chuàng)建執(zhí)行任務的線程池ForkJoinPool對象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //執(zhí)行總?cè)蝿?        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(arrayTask);

        //返回任務的結果
        try {
            System.out.println(forkJoinTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

代碼放到了開源中國:http://git.oschina.net/zhaoweinan/forkjoin锡宋,有興趣的小伙伴可以拿去
總的來說,F(xiàn)ork/Join框架就是一個用來并行執(zhí)行任務的框架特恬,可以把一個大任務执俩,分成若干個子任務,等各個子任務執(zhí)行完畢癌刽,可以把他們的執(zhí)行結果獲取到役首,并匯聚,起到了并行執(zhí)行任務作用显拜。

Fork/Join框架的構成

1.ForkJoinPool

ForkJoinPool類圖

ForkJoinPool繼承了AbstractExecutorService抽象類衡奥,AbstractExecutorService實現(xiàn)了ExecutorService接口,由此看來ForkJoinPool也是線程池家族的一員讼油,


過濾了下方法杰赛,只顯示了公共方法,并截取了一下

ForkJoinPool使用invoke矮台、execute乏屯、submit用來執(zhí)行任務。

2.ForkJoinTask

ForkJoinTask類圖

ForkJoinTask是Fork/Join框架使用的任務類瘦赫,實現(xiàn)了Future接口辰晕,我們一般使用它的兩個子類RecursiveTask和RecursiveAction,


RecursiveAction類圖
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

RecursiveAction適用于沒有返回結果的任務确虱,

RecursiveTask類圖
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

RecursiveTask適用于有返回值的任務含友。

Work-Stealing (工作竊取)

粗略說一下Work-Stealing,F(xiàn)orkJoinPool具有 Work-Stealing (工作竊染轿省)的能力辆童,什么意思呢?就拿文章開頭的栗子來說惠赫,把處理字符串數(shù)組的大任務把鉴,分成了若干個處理字符串的子任務,這些子任務線程執(zhí)行完畢后儿咱,不會閑著庭砍,回去執(zhí)行別的子任務,通俗的來說混埠,Work-Stealing (工作竊鹊「住)就是線程從其他隊列里面獲取任務來執(zhí)行。

Work-Stealing的優(yōu)點

充分利用了線程钳宪,提高了線程并行執(zhí)行任務的效率揭北,并減少了線程間競爭帶來的系統(tǒng)開銷。

Work-Stealing的缺點

存在競爭的情況使套,而且占用了更多的系統(tǒng)資源罐呼。

Fork/Join框架原理

ForkJoinPool分析

貼一張ForkJoinPool的類圖


大小不好調(diào)整,就截取一般吧

注意箭頭所指的兩個屬性侦高,
ForkJoinTask<?>數(shù)組submissionQueue,存放程序加到ForkJoinPool的任務

    private ForkJoinTask<?>[] submissionQueue;

ForkJoinWorkerThread類繼承了Thread厌杜,是一個線程類奉呛, ForkJoinWorkerThread[] workers就是一個線程數(shù)組,負責去執(zhí)行submissionQueue中的任務

    ForkJoinWorkerThread[] workers;

    .....
    public class ForkJoinWorkerThread extends Thread

ForkJoinTask分析

fork方法

獲取當前ForkJoinWorkerThread線程夯尽,調(diào)用ForkJoinWorkerThread的pushTask方法執(zhí)行ForkJoinTask任務

   public final ForkJoinTask<V> fork() {
        //獲取當前ForkJoinWorkerThread線程瞧壮,調(diào)用ForkJoinWorkerThread的pushTask方法執(zhí)行任務
        ((ForkJoinWorkerThread) Thread.currentThread())
                .pushTask(this);
        return this;
    }

再來看看ForkJoinWorkerThread的pushTask方法:

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                //調(diào)用線程池ForkJoinPool的signalWork方法
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

ForkJoinWorkerThread的pushTask方法把任務ForkJoinTask加到了ForkJoinTask[]任務數(shù)組中,并調(diào)用了ForkJoinPool線程池的signalWork方法喚醒線程或者創(chuàng)建一個線程去執(zhí)行任務匙握,粗略的貼一下signalWork的關鍵代碼:

private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }

最終調(diào)用到了這里咆槽,來執(zhí)行任務。

join方法

從文章開頭的栗子來看圈纺,join方法會阻塞當前線程秦忿,等待獲取任務執(zhí)行的結果

    //百度了這四種狀態(tài)的含義
    private static final int NORMAL      = -1;   //NORMAL已完成
    private static final int CANCELLED   = -2;  //CANCELLED已取消
    private static final int EXCEPTIONAL = -3;  //EXCEPTIONAL出現(xiàn)異常
    private static final int SIGNAL      =  1;  //SIGNAL信號

     public final V join() {
        //先調(diào)用doJoin方法判斷上面定義的四個狀態(tài)
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

join方法先調(diào)用doJoin方法判斷任務的狀態(tài),看看doJoin方法蛾娶,

   private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        //獲取當前ForkJoinWorkerThread線程
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            //如果狀態(tài)是小于0 也就是 -1灯谣,-2,-3 分別代表已完成蛔琅、已取消胎许、出現(xiàn)異常
            //直接返回狀態(tài)
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                //如果狀態(tài)為1|SIGNAL|信號
                //執(zhí)行任務
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    //出現(xiàn)異常,把狀態(tài)改為-3|EXCEPTIONAL|出現(xiàn)異常,返回
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    //執(zhí)行成功辜窑,把狀態(tài)改為-1|NORMAL|已完成钩述,返回
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

doJoin查看任務的狀態(tài),如果狀態(tài)是-1|NORMAL|已完成穆碎,-2|CANCELLED|已取消牙勘,-3|EXCEPTIONAL|出現(xiàn)異常,證明任務已經(jīng)執(zhí)行完畢惨远,返回狀態(tài)位谜悟,如果狀態(tài)是 1|SIGNAL|信號,則去執(zhí)行任務北秽,如果執(zhí)行成功返回-1|NORMAL|已完成葡幸,出現(xiàn)異常返回-3|EXCEPTIONAL|出現(xiàn)異常。
再來看看返回結果的reportResult方法和getRawResult方法:

private V reportResult() {
        int s; Throwable ex;
        //如果狀態(tài)為-2|CANCELLED|已取消贺氓,拋出一個CancellationException異常
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        //調(diào)用getRawResult方法返回結果
        return getRawResult();
    }

reportResult方法蔚叨,先會判斷狀態(tài),如果狀態(tài)為-2|CANCELLED|已取消辙培,則拋出一個CancellationException異常蔑水,否則調(diào)用getRawResult方法返回結果:

    public abstract V getRawResult();

getRawResult方法在ForkJoinTask類是抽象方法,具體實現(xiàn)在他的兩子類中扬蕊。
RecursiveAction子類:

    public final Void getRawResult() { return null; }

所以說RecursiveAction子類使用于沒有返回值的任務搀别。
RecursiveTask子類:

public final V getRawResult() {
        return result;
    }

RecursiveTask子類適用于有返回值的任務。

并行執(zhí)行任務的Fork/Join框架是說完了尾抑。
歡迎大家來交流歇父,指出文中一些說錯的地方,讓我加深認識再愈。
謝謝大家榜苫!

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市翎冲,隨后出現(xiàn)的幾起案子垂睬,更是在濱河造成了極大的恐慌,老刑警劉巖抗悍,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件驹饺,死亡現(xiàn)場離奇詭異,居然都是意外死亡檐春,警方通過查閱死者的電腦和手機逻淌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來疟暖,“玉大人卡儒,你說我怎么就攤上這事田柔。” “怎么了骨望?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵硬爆,是天一觀的道長。 經(jīng)常有香客問我擎鸠,道長缀磕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任劣光,我火速辦了婚禮袜蚕,結果婚禮上,老公的妹妹穿的比我還像新娘绢涡。我一直安慰自己牲剃,他們只是感情好,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布雄可。 她就那樣靜靜地躺著凿傅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪数苫。 梳的紋絲不亂的頭發(fā)上聪舒,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機與錄音虐急,去河邊找鬼箱残。 笑死,一個胖子當著我的面吹牛止吁,可吹牛的內(nèi)容都是我干的疚宇。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼赏殃,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了间涵?” 一聲冷哼從身側(cè)響起仁热,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎勾哩,沒想到半個月后抗蠢,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡思劳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年迅矛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片潜叛。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡秽褒,死狀恐怖壶硅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情销斟,我是刑警寧澤庐椒,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站蚂踊,受9級特大地震影響约谈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜犁钟,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一棱诱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧涝动,春花似錦迈勋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至昔穴,卻和暖如春镰官,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背吗货。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工泳唠, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宙搬。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓笨腥,卻偏偏與公主長得像,于是被迫代替她去往敵國和親勇垛。 傳聞我的和親對象是個殘疾皇子脖母,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務的框架,采用類似于分治...
    碼農(nóng)歷險記閱讀 2,208評論 0 2
  • 一闲孤、多線程 說明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)谆级。 NEW:這種情況指的是,通過 New 關鍵字創(chuàng)...
    Java旅行者閱讀 4,662評論 0 44
  • 摘要 這篇論文描述了Fork/Join框架的設計讼积、實現(xiàn)以及性能肥照。這個框架通過(遞歸的)把問題劃分為子任務,然后并行...
    itonyli閱讀 1,157評論 0 5
  • 導讀目錄 線程組(ThreadGroup) 線程池(Thread Pool) Fork/Join框架和Execut...
    ql2012jz閱讀 1,448評論 0 0
  • 安老師今天讓我們制作小卡片勤众,小卡片長是21厘米寬度7厘米舆绎。為了讓我們練習五以內(nèi)的加減法,媽媽給我找了許多煙盒们颜,讓我...
    張余蔚閱讀 471評論 0 0