Fork/Join框架淺談

什么是Fork/Join框架

Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架葵擎,采用類似于分治算法原在,就是把一個(gè)復(fù)雜的問(wèn)題分成兩個(gè)或更多的相同或相似的子問(wèn)題惠险,直到最后子問(wèn)題可以簡(jiǎn)單的直接求解喉刘,原問(wèn)題的解即子問(wèn)題的解的合并休吠。

Fork/Join框架

在這個(gè)框架中值得注意的一個(gè)重要概念是在理想狀態(tài)下是沒有空閑的工作線程吼渡。 它們實(shí)現(xiàn)了一種工作竊取算法,閑置的工作線程可以從忙碌的工作線程拿工作執(zhí)行恩溅。

Fork/Join框架處理復(fù)雜的線程問(wèn)題隔箍,你只需向框架指出哪些部分工作可以分解并遞歸處理。偽代碼來(lái)自Doug Lea's的論文

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

Fork/Join框架的核心類

ForkJoinPool和ForkJoinTask是支持Fork/Join機(jī)制的核心類暴匠。

ForkJoinPool

ForkjoinPool是實(shí)現(xiàn)了ExecutorService和work-stealing(工作竊取)算法鞍恢。如下所示,新建ForkJoinPool實(shí)例每窖,指定并行等級(jí)(處理器的個(gè)數(shù))帮掉。

ForkJoinPool pool = new ForkJoinPool(numberOfProcessors);
Where numberOfProcessors = Runtime.getRunTime().availableProcessors();

如果使用無(wú)參構(gòu)造函數(shù),默認(rèn)創(chuàng)建pool的大小為上面所示的可用的處理器個(gè)數(shù)窒典。盡管你可以指定任意大小的pool蟆炊,但pool會(huì)動(dòng)態(tài)調(diào)整大小來(lái)嘗試獲得足夠的活動(dòng)線程。與ExecutorService另一個(gè)重要的不同瀑志,pool不需要在程序退出時(shí)顯式關(guān)閉涩搓,因?yàn)樗乃芯€程都處于守護(hù)進(jìn)程模式。

三種提交任務(wù)到ForkJoinPool的方法:

  1. execute():期望異步執(zhí)行劈猪,調(diào)用其fork方法在多個(gè)線程之間拆分工作昧甘。
  2. invoke():等待獲得結(jié)果。
  3. submit():完成時(shí)返回一個(gè)future對(duì)象用于檢查狀態(tài)以及運(yùn)行結(jié)果战得。
ForkJoinTask

ForkJoinTask是在ForkJoinPool創(chuàng)建工作的抽象類充边,RecursiveAction 和RecursiveTask是ForkJoinTask的直接子類,都要實(shí)現(xiàn)compute方法,兩者唯一的不同點(diǎn)是:RecursiveAction沒有返回任務(wù)的結(jié)果浇冰,而 RecursiveTask有返回任務(wù)的結(jié)果(可以自己指定類型的對(duì)象)贬媒。

ForkJoinTask類提供了幾個(gè)方法用于檢查任務(wù)運(yùn)行的狀態(tài). 無(wú)論以什么方式結(jié)束任務(wù),isDone() 方法返回true肘习;如果完成任務(wù)過(guò)程中沒有被取消或者發(fā)生異常际乘,CompletedNormally() 方法返回true;如果任務(wù)被取消漂佩, isCancelled() 方法返回true脖含;如果任務(wù)被取消或者遇到異常,isCompletedabnormally() 方法返回true投蝉。
異常處理代碼如下:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}

getException方法返回Throwable對(duì)象器赞,如果任務(wù)被取消了則返回CancellationException。如果任務(wù)沒有完成或者沒有拋出異常則返回null墓拜。

工作竊取算法

工作竊取(work-stealing)算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來(lái)執(zhí)行请契。假如我們需要做一個(gè)比較大的任務(wù)咳榜,我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng)爽锥,于是把這些子任務(wù)分別放到不同的隊(duì)列里涌韩,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來(lái)執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng)氯夷,比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)臣樱。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理腮考。干完活的線程與其等著雇毫,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來(lái)執(zhí)行踩蔚。而在這時(shí)它們會(huì)訪問(wèn)同一個(gè)隊(duì)列棚放,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列馅闽,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行飘蚯,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計(jì)算福也,并減少了線程間的競(jìng)爭(zhēng)局骤,其缺點(diǎn)是在某些情況下還是存在競(jìng)爭(zhēng),比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)暴凑。并且消耗了更多的系統(tǒng)資源峦甩,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。

Fork/Join框架和ExecutorService的區(qū)別

Fork/Join框架和ExecutorService最主要的區(qū)別是工作竊取算法搬设。與Executor框架不同穴店,當(dāng)有線程完成了自己的所有子任務(wù)撕捍,而其他正在執(zhí)行的線程(稱為工作線程)還有子任務(wù)等待處理,就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來(lái)執(zhí)行泣洞。通過(guò)這種方式忧风,線程可以充分利用其運(yùn)行時(shí)間,從而提高應(yīng)用程序的性能球凰。

Fork/Join框架實(shí)踐例子

在這個(gè)例子中狮腿,我們使用ForkJoinPool和ForkJoinTask提供的異步方法來(lái)管理任務(wù)。我們將實(shí)現(xiàn)遍歷文件夾查找指定擴(kuò)展名的文件呕诉,F(xiàn)orkJoinTask實(shí)現(xiàn)處理一個(gè)文件夾內(nèi)的查找缘厢,如果存在子文件夾,為每一個(gè)文件夾fork一個(gè)新異步任務(wù)到ForkJoinPool中去甩挫,每個(gè)子任務(wù)會(huì)查找自己文件夾的指定擴(kuò)展名的文件贴硫。一旦任務(wù)已經(jīng)處理了所有的指定文件夾的內(nèi)容,利用ForkJoinPool的join()方法等待完成所有任務(wù)伊者。join方法是等待執(zhí)行完成并返回compute()方法的計(jì)算結(jié)果英遭。任務(wù)組的所有任務(wù),都將自己的結(jié)果返回添加到結(jié)果列表中亦渗。

詳細(xì)代碼如下:

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class FolderProcessor extends RecursiveTask<List<String>> {
    private final String path;
    private final String extension;

    public FolderProcessor(String path,String extension) {
        this.extension = extension;
        this.path = path;
    }

    @Override
    protected List<String> compute() {
        List<String> list = new ArrayList<String>();
        List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
        File file = new File(path);
        File[] content= file.listFiles();
        if(content != null){
            for (File aContent : content) {
                if (aContent.isDirectory()) {
                    FolderProcessor task = 
              new FolderProcessor(aContent.getAbsolutePath(), extension);
                    task.fork();
                    tasks.add(task);
                } else {
                    if (checkFile(aContent.getName())) {
                        list.add(aContent.getAbsolutePath());
                    }
                }
            }
        }

        if (tasks.size() > 50)
        {
            System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
        }
        addResultsFromTasks(list, tasks);
        return list;
    }

    private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
        for (FolderProcessor item : tasks)
        {
            list.addAll(item.join());
        }
    }

    private boolean checkFile(String name) {
        return name.endsWith(extension);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FolderProcessor system = new FolderProcessor("/System", "log");
        FolderProcessor library = new FolderProcessor("/Library", "log");
        FolderProcessor users = new FolderProcessor("/Users", "log");
        pool.execute(system);
        pool.execute(library);
        pool.execute(users);
        do
        {
            System.out.printf("******************************************\n");
            System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
            System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
            System.out.printf("******************************************\n");
            try
            {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        } while ((!system.isDone()) || (!library.isDone()) || (!users.isDone()));
        pool.shutdown();
        List<String> results;
        results = system.join();
        System.out.printf("System: %d files found.\n", results.size());
        results = library.join();
        System.out.printf("Library: %d files found.\n", results.size());
        results = users.join();
        System.out.printf("Users: %d files found.\n", results.size());
    }
}

結(jié)果輸出類似如下:

******************************************
Main: Parallelism: 8
Main: Active Threads: 60
Main: Task Count: 62370
Main: Steal Count: 81261
******************************************
******************************************
Main: Parallelism: 8
Main: Active Threads: 0
Main: Task Count: 19295
Main: Steal Count: 160629
******************************************

JDK中的使用實(shí)現(xiàn)

Java SE中有一些通用的功能挖诸,它們已經(jīng)使用fork/join框架來(lái)實(shí)現(xiàn)。
1.在Java 8的java.util.Arrays中的parallelSort方法采用了fork/join框架法精,在多處理器系統(tǒng)上多律,并行排序大量數(shù)據(jù)比順序排序更快
2.在Stream.parallel()中使用并行,更多請(qǐng)參考parallel stream operation in java 8搂蜓。

總結(jié):

設(shè)計(jì)優(yōu)秀的多線程算法是非常困難的狼荞,fork/join框架并不適用于所有情況,但是在它的適用范圍之內(nèi),能夠輕松的利用多個(gè)CPU提供的計(jì)算資源來(lái)協(xié)作完成一個(gè)復(fù)雜的計(jì)算任務(wù)洛勉。最終還是看你的問(wèn)題是否符合框架特性粘秆,若不符合,你可以使用基于java.util.concurrent包基礎(chǔ)工具方法實(shí)現(xiàn)自己的解決方案收毫。

參考

  1. Fork/Join Framework Tutorial: ForkJoinPool Example
  2. 聊聊并發(fā)(八)——Fork/Join框架介紹
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末攻走,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子此再,更是在濱河造成了極大的恐慌昔搂,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件输拇,死亡現(xiàn)場(chǎng)離奇詭異摘符,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門逛裤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)瘩绒,“玉大人,你說(shuō)我怎么就攤上這事带族∷螅” “怎么了?”我有些...
    開封第一講書人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵蝙砌,是天一觀的道長(zhǎng)阳堕。 經(jīng)常有香客問(wèn)我,道長(zhǎng)择克,這世上最難降的妖魔是什么恬总? 我笑而不...
    開封第一講書人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮肚邢,結(jié)果婚禮上壹堰,老公的妹妹穿的比我還像新娘。我一直安慰自己骡湖,他們只是感情好缀旁,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著勺鸦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪目木。 梳的紋絲不亂的頭發(fā)上换途,一...
    開封第一講書人閱讀 52,696評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音刽射,去河邊找鬼军拟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛誓禁,可吹牛的內(nèi)容都是我干的懈息。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼摹恰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼辫继!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起俗慈,我...
    開封第一講書人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤姑宽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后闺阱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體炮车,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了瘦穆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纪隙。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖扛或,靈堂內(nèi)的尸體忽然破棺而出绵咱,到底是詐尸還是另有隱情,我是刑警寧澤告喊,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布麸拄,位于F島的核電站,受9級(jí)特大地震影響黔姜,放射性物質(zhì)發(fā)生泄漏拢切。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一秆吵、第九天 我趴在偏房一處隱蔽的房頂上張望淮椰。 院中可真熱鬧,春花似錦纳寂、人聲如沸主穗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)忽媒。三九已至,卻和暖如春腋粥,著一層夾襖步出監(jiān)牢的瞬間晦雨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工隘冲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留闹瞧,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓展辞,卻偏偏與公主長(zhǎng)得像奥邮,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子罗珍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • 作者: 一字馬胡 轉(zhuǎn)載標(biāo)志 【2017-11-01】 更新日志 日期更新內(nèi)容備注2017-11-01新建文章V1...
    一字馬胡閱讀 7,376評(píng)論 9 134
  • 摘要 這篇論文描述了Fork/Join框架的設(shè)計(jì)洽腺、實(shí)現(xiàn)以及性能。這個(gè)框架通過(guò)(遞歸的)把問(wèn)題劃分為子任務(wù)覆旱,然后并行...
    itonyli閱讀 1,166評(píng)論 0 5
  • 一已脓、多線程 說(shuō)明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)。 NEW:這種情況指的是通殃,通過(guò) New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,688評(píng)論 0 44
  • 秋天我想起了關(guān)羽 風(fēng)揚(yáng)起麥子 有三粒 落在盆地 漂在湖面 飛向雪山 秋天度液, 麥城還有馬尾 掃著蒼蠅 秋天厕宗,我想起了...
    西紅柿tomato閱讀 216評(píng)論 0 1
  • 用時(shí):1h 總評(píng): 如果你看過(guò)其他的時(shí)間管理的書籍,那么本書很值得你快速的看一看堕担,里面提到的時(shí)間模式化已慢、格式化的觀...
    tuionf閱讀 851評(píng)論 0 2