并發(fā)編程-ForkJoinPool

轉(zhuǎn)自: Java 并發(fā)編程筆記:如何使用 ForkJoinPool 以及原理

前言

Java 1.7 引入了一種新的并發(fā)框架—— Fork/Join Framework。

本文的主要目的是介紹 ForkJoinPool 的適用場景,實現(xiàn)原理察郁,以及示例代碼袜硫。

TLDR; 如果覺得文章太長的話,以下就是結(jié)論

  • ForkJoinPool 不是為了替代 ExecutorService,而是它的補充合瓢,在某些應(yīng)用場景下性能比 ExecutorService 更好市俊。(見 Java Tip: When to use ForkJoinPool vs ExecutorService
  • ForkJoinPool 主要用于實現(xiàn)“分而治之”的算法杨凑,特別是分治之后遞歸調(diào)用的函數(shù),例如 quick sort 等秕衙。
  • ForkJoinPool 最適合的是計算密集型的任務(wù)蠢甲,如果存在 I/O,線程間同步据忘,sleep() 等會造成線程長時間阻塞的情況時鹦牛,最好配合使用 ManagedBlocker
    一個任務(wù)會join其他小任務(wù)勇吊,如果一個小任務(wù)耗時比較長曼追,會造成很多大任務(wù)線程等待。

使用

首先介紹的是大家最關(guān)心的 Fork/Join Framework 的使用方法汉规,如果對使用方法已經(jīng)很熟悉的話礼殊,可以跳過這一節(jié),直接閱讀原理针史。

用一個特別簡單的求整數(shù)數(shù)組所有元素之和來作為我們現(xiàn)在需要解決的問題吧晶伦。

問題

計算1至1000的正整數(shù)之和。

解決方法

For-loop

最簡單的啄枕,顯然是不使用任何并行編程的手段婚陪,只用最直白的 for-loop 來實現(xiàn)。下面就是具體的實現(xiàn)代碼频祝。

不過為了便于橫向?qū)Ρ让诓危矠榱俗尨a更加 Java Style脆淹,首先我們先定義一個 interface。

public interface Calculator {
    long sumUp(long[] numbers);
}

這個 interface 非常簡單沽一,只有一個函數(shù) sumUp盖溺,就是返回數(shù)組內(nèi)所有元素的和。

再寫一個 main 方法铣缠。

public class Main {
    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new MyCalculator();
        System.out.println(calculator.sumUp(numbers)); // 打印結(jié)果500500
    }
}

接下來就是我們的 Plain Old For-loop Calculator烘嘱,簡稱 POFLC 的實現(xiàn)了。(這其實是個段子蝗蛙,和主題完全無關(guān)拙友,感興趣的請見文末的彩蛋

public class ForLoopCalculator implements Calculator {
    public long sumUp(long[] numbers) {
        long total = 0;
        for (long i : numbers) {
            total += i;
        }
        return total;
    }
}

這段代碼毫無出奇之處,也就不多解釋了歼郭,直接跳入下一節(jié)——并行計算遗契。

ExecutorService

在 Java 1.5 引入 ExecutorService 之后,基本上已經(jīng)不推薦直接創(chuàng)建 Thread 對象病曾,而是統(tǒng)一使用 ExecutorService牍蜂。畢竟從接口的易用程度上來說 ExecutorService 就遠勝于原始的 Thread,更不用提 java.util.concurrent 提供的數(shù)種線程池泰涂,F(xiàn)uture 類鲫竞,Lock 類等各種便利工具。

使用 ExecutorService 的實現(xiàn)

public class ExecutorServiceCalculator implements Calculator {
    private int parallism;
    private ExecutorService pool;

    public ExecutorServiceCalculator() {
        parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心數(shù)
        pool = Executors.newFixedThreadPool(parallism);
    }

    private static class SumTask implements Callable<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() throws Exception {
            long total = 0;
            for (int i = from; i <= to; i++) {
                total += numbers[i];
            }
            return total;
        }
    }

    @Override
    public long sumUp(long[] numbers) {
        List<Future<Long>> results = new ArrayList<>();

        // 把任務(wù)分解為 n 份逼蒙,交給 n 個線程處理
        int part = numbers.length / parallism;
        for (int i = 0; i < parallism; i++) {
            int from = i * part;
            int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1;
            results.add(pool.submit(new SumTask(numbers, from, to)));
        }

        // 把每個線程的結(jié)果相加从绘,得到最終結(jié)果
        long total = 0L;
        for (Future<Long> f : results) {
            try {
                total += f.get();
            } catch (Exception ignore) {}
        }

        return total;
    }
}

如果對 ExecutorService 不太熟悉的話,推薦閱讀《七天七并發(fā)模型》的第二章是牢,對 Java 的多線程編程基礎(chǔ)講解得比較清晰僵井。當(dāng)然著名的《Java并發(fā)編程實戰(zhàn)》也是不可多得的好書。

ForkJoinPool

前面花了點時間講解了 ForkJoinPool 之前的實現(xiàn)方法驳棱,主要為了在代碼的編寫難度上進行一下對比∨玻現(xiàn)在就列出本篇文章的重點——ForkJoinPool 的實現(xiàn)方法。

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 當(dāng)需要計算的數(shù)字小于6時社搅,直接計算結(jié)果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否則驻债,把任務(wù)一分為二,遞歸計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

可以看出形葬,使用了 ForkJoinPool 的實現(xiàn)邏輯全部集中在了 compute() 這個函數(shù)里合呐,僅用了14行就實現(xiàn)了完整的計算過程。特別是笙以,在這段代碼里沒有顯式地“把任務(wù)分配給線程”淌实,只是分解了任務(wù),而把具體的任務(wù)到線程的映射交給了 ForkJoinPool 來完成。

原理

如果你除了 ForkJoinPool 的用法以外翩伪,對 ForkJoinPoll 的原理也感興趣的話,那么請接著閱讀這一節(jié)谈息。在這一節(jié)中缘屹,我會結(jié)合 ForkJoinPool 的作者 Doug Lea 的論文——《A Java Fork/Join Framework》,盡可能通俗地解釋 Fork/Join Framework 的原理侠仇。

我一直以為轻姿,要理解一樣?xùn)|西的原理,最好就是自己嘗試著去實現(xiàn)一遍逻炊。根據(jù)上面的示例代碼互亮,可以看出 fork()join() 是 Fork/Join Framework “魔法”的關(guān)鍵。我們可以根據(jù)函數(shù)名假設(shè)一下 fork()join() 的作用:

  • fork():開啟一個新線程(或是重用線程池內(nèi)的空閑線程)余素,將任務(wù)交給該線程處理豹休。
  • join():等待該任務(wù)的處理線程處理完畢,獲得返回值桨吊。

以上模型似乎可以(威根?)解釋 ForkJoinPool 能夠多線程執(zhí)行的事實,但有一個很明顯的問題

當(dāng)任務(wù)分解得越來越細時视乐,所需要的線程數(shù)就會越來越多洛搀,而且大部分線程處于等待狀態(tài)。

但是如果我們在上面的示例代碼加入以下代碼

System.out.println(pool.getPoolSize());

這會顯示當(dāng)前線程池的大小佑淀,在我的機器上這個值是4留美,也就是說只有4個工作線程。甚至即使我們在初始化 pool 時指定所使用的線程數(shù)為1時伸刃,上述程序也沒有任何問題——除了變成了一個串行程序以外谎砾。

public ForkJoinCalculator() {
    pool = new ForkJoinPool(1);
}

這個矛盾可以導(dǎo)出,我們的假設(shè)是錯誤的捧颅,并不是每個 fork() 都會促成一個新線程被創(chuàng)建棺榔,而每個 join() 也不是一定會造成線程被阻塞。Fork/Join Framework 的實現(xiàn)算法并不是那么“顯然”隘道,而是一個更加復(fù)雜的算法——這個算法的名字就叫做 work stealing 算法症歇。

work stealing 算法在 Doung Lea 的論文中有詳細的描述,以下是我在結(jié)合 Java 1.8 代碼的閱讀以后——現(xiàn)有代碼的實現(xiàn)有一部分相比于論文中的描述發(fā)生了變化——得到的相對通俗的解釋:

基本思想

image
  • ForkJoinPool 的每個工作線程都維護著一個工作隊列WorkQueue)谭梗,這是一個雙端隊列(Deque)忘晤,里面存放的對象是任務(wù)ForkJoinTask)。
  • 每個工作線程在運行中產(chǎn)生新的任務(wù)(通常是因為調(diào)用了 fork())時激捏,會放入工作隊列的隊尾设塔,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式远舅,也就是說每次從隊尾取出任務(wù)來執(zhí)行闰蛔。
  • 每個工作線程在處理自己的工作隊列同時痕钢,會嘗試竊取一個任務(wù)(或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊列)序六,竊取的任務(wù)位于其他線程的工作隊列的隊首任连,也就是說工作線程在竊取其他工作線程的任務(wù)時,使用的是 FIFO 方式例诀。
  • 在遇到 join() 時随抠,如果需要 join 的任務(wù)尚未完成,則會先處理其他任務(wù)繁涂,并等待其完成拱她。
  • 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時扔罪,進入休眠秉沼。

下面來介紹一下關(guān)鍵的兩個函數(shù):fork()join() 的實現(xiàn)細節(jié),相比來說 fork()join() 簡單很多矿酵,所以先來介紹 fork()氧猬。

fork

fork() 做的工作只有一件事,既是把任務(wù)推入當(dāng)前工作線程的工作隊列里坏瘩≈迅В可以參看以下的源代碼:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join

join() 的工作則復(fù)雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()倔矾。

  1. 檢查調(diào)用 join() 的線程是否是 ForkJoinThread 線程妄均。如果不是(例如 main 線程),則阻塞當(dāng)前線程哪自,等待任務(wù)完成丰包。如果是,則不阻塞壤巷。
  2. 查看任務(wù)的完成狀態(tài)邑彪,如果已經(jīng)完成,直接返回結(jié)果胧华。
  3. 如果任務(wù)尚未完成寄症,但處于自己的工作隊列內(nèi),則完成它矩动。
  4. 如果任務(wù)已經(jīng)被其他的工作線程偷走有巧,則竊取這個小偷的工作隊列內(nèi)的任務(wù)(以 FIFO 方式),執(zhí)行悲没,以期幫助它早日完成欲 join 的任務(wù)篮迎。
  5. 如果偷走任務(wù)的小偷也已經(jīng)把自己的任務(wù)全部做完,正在等待需要 join 的任務(wù)時,則找到小偷的小偷甜橱,幫助它完成它的任務(wù)逊笆。
  6. 遞歸地執(zhí)行第5步。

將上述流程畫成序列圖的話就是這個樣子:

image

以上就是 fork()join() 的原理岂傲,這可以解釋 ForkJoinPool 在遞歸過程中的執(zhí)行邏輯难裆,但還有一個問題

最初的任務(wù)是 push 到哪個線程的工作隊列里的?

這就涉及到 submit() 函數(shù)的實現(xiàn)方法了

submit

其實除了前面介紹過的每個工作線程自己擁有的工作隊列以外譬胎,ForkJoinPool 自身也擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務(wù)命锄,而這些工作隊列被稱為 submitting queue 堰乔。

submit()fork() 其實沒有本質(zhì)區(qū)別,只是提交對象變成了 submitting queue 而已(還有一些同步脐恩,初始化的操作)镐侯。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象驶冒,因此當(dāng)其中的任務(wù)被一個工作線程成功竊取時苟翻,就意味著提交的任務(wù)真正開始進入執(zhí)行階段。

總結(jié)

在了解了 Fork/Join Framework 的工作原理之后骗污,相信很多使用上的注意事項就可以從原理中找到原因崇猫。例如:為什么在 ForkJoinTask里最好不要存在 I/O 等會阻塞線程的行為狱从?刃鳄,這個我姑且留作思考題吧 :)

還有一些延伸閱讀的內(nèi)容,在此僅提及一下:

  1. ForkJoinPool 有一個 Async Mode 赂乐,效果是工作線程在處理本地任務(wù)時也使用 FIFO 順序屋厘。這種模式下的 ForkJoinPool 更接近于是一個消息隊列涕烧,而不是用來處理遞歸式的任務(wù)。
  2. 在需要阻塞工作線程時汗洒,可以使用 ManagedBlocker议纯。
  3. Java 1.8 新增加的 CompletableFuture 類可以實現(xiàn)類似于 Javascript 的 promise-chain,內(nèi)部就是使用 ForkJoinPool 來實現(xiàn)的溢谤。
  4. 采取雙端隊列的一個原因是為了減小競爭(還有其他原因瞻凤?)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末世杀,一起剝皮案震驚了整個濱河市鲫构,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌玫坛,老刑警劉巖结笨,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡炕吸,警方通過查閱死者的電腦和手機伐憾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赫模,“玉大人树肃,你說我怎么就攤上這事∑俾蓿” “怎么了胸嘴?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長斩祭。 經(jīng)常有香客問我劣像,道長,這世上最難降的妖魔是什么摧玫? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任耳奕,我火速辦了婚禮,結(jié)果婚禮上诬像,老公的妹妹穿的比我還像新娘屋群。我一直安慰自己,他們只是感情好坏挠,可當(dāng)我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布芍躏。 她就那樣靜靜地躺著,像睡著了一般降狠。 火紅的嫁衣襯著肌膚如雪纸肉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天喊熟,我揣著相機與錄音柏肪,去河邊找鬼。 笑死芥牌,一個胖子當(dāng)著我的面吹牛烦味,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播壁拉,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼谬俄,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了弃理?” 一聲冷哼從身側(cè)響起溃论,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎痘昌,沒想到半個月后钥勋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體炬转,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年算灸,在試婚紗的時候發(fā)現(xiàn)自己被綠了扼劈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡菲驴,死狀恐怖荐吵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情赊瞬,我是刑警寧澤先煎,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站巧涧,受9級特大地震影響薯蝎,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜褒侧,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一良风、第九天 我趴在偏房一處隱蔽的房頂上張望谊迄。 院中可真熱鬧闷供,春花似錦、人聲如沸统诺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粮呢。三九已至婿失,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間啄寡,已是汗流浹背豪硅。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留挺物,地道東北人懒浮。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像识藤,于是被迫代替她去往敵國和親砚著。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容