(十一)徹悟并發(fā)之JUC分治思想產(chǎn)物-ForkJoin分支合并框架原理剖析上篇

引言

在上篇文章《深入理解并發(fā)之Java線(xiàn)程池奥吩、工作原理时捌、復(fù)用原理及源碼分析》中亏吝,曾詳細(xì)談到了Java的線(xiàn)程池框架店诗。在其中也說(shuō)到了JDK提供的四種原生線(xiàn)程池以及自定義線(xiàn)程池裹刮,而本文則再來(lái)詳細(xì)談?wù)凧DK1.7中新推出的線(xiàn)程池:ForkJoinPool。但ForkJoinPool的出現(xiàn)并不是為了替代ThreadPoolExecutor庞瘸,而是作為它的補(bǔ)充捧弃,因?yàn)樵谀承﹫?chǎng)景下,它的性能會(huì)比ThreadPoolExecutor更好擦囊。在之前的模式中违霞,往往一個(gè)任務(wù)會(huì)分配給一條線(xiàn)程執(zhí)行,如果有個(gè)任務(wù)耗時(shí)比較長(zhǎng)瞬场,并且在處理期間也沒(méi)有新的任務(wù)到來(lái)买鸽,那么則會(huì)出現(xiàn)一種情況:線(xiàn)程池中只有一條線(xiàn)程在處理這個(gè)大任務(wù),而其他線(xiàn)程卻空閑著贯被,這會(huì)導(dǎo)致CPU負(fù)載不均衡眼五,空閑的處理器無(wú)法幫助工作,從而無(wú)法最大程度上發(fā)揮多核機(jī)器的性能彤灶。而ForkJoinPool則可以完美解決這類(lèi)問(wèn)題看幼,但ForkJoinPool更適合的是處理一些耗時(shí)的大任務(wù),如果是普通任務(wù)幌陕,反而會(huì)因?yàn)檫^(guò)多的任務(wù)拆分和多條線(xiàn)程CPU的來(lái)回切換導(dǎo)致效率下降诵姜。

一、初窺ForkJoin框架的神秘面紗

ForkJoinPool是一個(gè)建立在分治思想上的產(chǎn)物搏熄,其采用任務(wù)“大拆小”的方式以及工作竊取算法實(shí)現(xiàn)并行處理任務(wù)棚唆。通俗來(lái)說(shuō)暇赤,F(xiàn)orkJoin框架的作用主要是為了實(shí)現(xiàn)將大型復(fù)雜任務(wù)進(jìn)行遞歸的分解,直到任務(wù)小到指定閾值時(shí)才開(kāi)始執(zhí)行瑟俭,從而遞歸的返回各個(gè)小任務(wù)的結(jié)果匯集成一個(gè)大任務(wù)的結(jié)果翎卓,依次類(lèi)推最終得出最初提交的那個(gè)大型復(fù)雜任務(wù)的結(jié)果,這和方法的遞歸調(diào)用思想是一樣的摆寄。當(dāng)然ForkJoinPool線(xiàn)程池為了提高任務(wù)的并行度和吞吐量做了非常多而且復(fù)雜的設(shè)計(jì)實(shí)現(xiàn)失暴,其中最著名的就是任務(wù)竊取機(jī)制。但ForkJoinPool更適合于處理一些大型任務(wù)微饥,因此逗扒,F(xiàn)orkJoinPool的適用范圍不大,僅限于某些密集且能被分解成多個(gè)子任務(wù)的任務(wù)欠橘,同時(shí)這些子任務(wù)運(yùn)行的結(jié)果可以合并成最終結(jié)果矩肩。ForkJoin框架主體由三部分組成:

  • ①ForkJoinWorkerThread:任務(wù)的執(zhí)行者,具體的線(xiàn)程實(shí)體
  • ②ForkJoinTask:需要執(zhí)行的任務(wù)實(shí)體
  • ③ForkJoinPool:管理執(zhí)行者線(xiàn)程的管理池

后續(xù)源碼階段會(huì)詳細(xì)分析肃续!

OK~黍檩,接著先簡(jiǎn)單的來(lái)看看ForkJoin框架的使用,F(xiàn)orkJoinPool提交任務(wù)的方式也有三種始锚,分別為:

  • execute():可提交Runnbale類(lèi)型的任務(wù)
  • submit():可提交Callable類(lèi)型的任務(wù)
  • invoke():可提交ForkJoinTask類(lèi)型的任務(wù)刽酱,但ForkJoinTask存在三個(gè)子類(lèi):
    • ①RecursiveAction:無(wú)返回值型ForkJoinTask任務(wù)
    • ②RecursiveTask:有返回值型ForkJoinTask任務(wù)
    • ③CountedCompleter:任務(wù)執(zhí)行完成后可以觸發(fā)鉤子回調(diào)函數(shù)的任務(wù)

上個(gè)案例:

業(yè)務(wù)需求:需要根據(jù)ID值對(duì)某個(gè)范圍區(qū)間內(nèi)的每條數(shù)據(jù)進(jìn)行變更,變更后獲取最新數(shù)據(jù)更新緩存瞧捌。
運(yùn)行環(huán)境:四核機(jī)器

public class ForkJoinPoolDemo {
    public static void main(String[] args) {
        testFor();
        testForkJoin();
    }
    
    // 測(cè)試for循環(huán)
    private static void testFor(){
        Instant startTime = Instant.now();
        List<Integer> list = new ArrayList<Integer>();
        for (int id = 1; id <= 1000*10000; id++) {
            // ....... 模擬從數(shù)據(jù)庫(kù)根據(jù)id查詢(xún)數(shù)據(jù)
            list.add(id);
        }
        Instant endTime = Instant.now();
        System.out.println("For循環(huán)耗時(shí):"+
            Duration.between(startTime,endTime).toMillis() + "ms");
    }
    
    // 測(cè)試ForkJoin框架
    private static void testForkJoin(){
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Instant startTime = Instant.now();
        List invoke = forkJoinPool.invoke(new IdByFindUpdate(1, 1000*10000));
        Instant endTime = Instant.now();
        System.out.println("ForkJoin耗時(shí):"+
            Duration.between(startTime,endTime).toMillis() + "ms");
    }
}

class IdByFindUpdate extends RecursiveTask<List> {
    private Integer startID;
    private Integer endID;

    private static final Integer THURSHOLD = 10000; // 臨界值/閾值

    public IdByFindUpdate(Integer startID, Integer endID) {
        this.startID = startID;
        this.endID = endID;
    }

    @Override
    protected List<Integer> compute() {
        int taskSize = endID - startID;
        List<Integer> list = new ArrayList<Integer>();

        // 如果任務(wù)小于或等于拆分的最小閾值棵里,那么則直接處理任務(wù)
        if (taskSize <= THURSHOLD) {
            for (int id = startID; id <= endID; id++) {
                // ....... 模擬從數(shù)據(jù)庫(kù)根據(jù)id查詢(xún)數(shù)據(jù)
                list.add(id);
            }
            return list;
        }
        // 任務(wù)fork拆分
        IdByFindUpdate leftTask = new IdByFindUpdate(startID,
                                (startID + endID) / 2);
        leftTask.fork();
        IdByFindUpdate rightTask = new IdByFindUpdate(((startID
                                + endID) / 2) + 1, endID);
        rightTask.fork();

        // 任務(wù)join合并
        list.addAll(leftTask.join());
        list.addAll(rightTask.join());

        return list;
    }
}

案例如上,在其中模擬了數(shù)據(jù)庫(kù)查詢(xún)1000W數(shù)據(jù)后姐呐,將數(shù)據(jù)添加到集合中的操作殿怜。其中定義了任務(wù)類(lèi):IdByFindUpdate,因?yàn)樾枰祷亟Y(jié)果曙砂,所以IdByFindUpdate類(lèi)實(shí)現(xiàn)了ForkJoinTask的子類(lèi)RecursiveTask头谜,確保任務(wù)可以提交給ForkJoinPool線(xiàn)程池執(zhí)行。任務(wù)的拆分閾值設(shè)定為1W鸠澈,當(dāng)任務(wù)的查詢(xún)數(shù)量小于閾值時(shí)乔夯,則直接執(zhí)行任務(wù)。反之款侵,拆分任務(wù)直至最小(達(dá)到閾值)為止才開(kāi)始執(zhí)行侧纯,執(zhí)行結(jié)束后合并結(jié)果并返回新锈。

同時(shí),我們?yōu)榱藢?duì)比區(qū)別眶熬,也使用了普通的for循環(huán)來(lái)對(duì)比測(cè)試妹笆,結(jié)果如下:

/* 運(yùn)行結(jié)果:
 *      For循環(huán)耗時(shí):3274ms
 *      ForkJoin耗時(shí):1270ms
 */

很明顯块请,F(xiàn)orkJoin的執(zhí)行速度比普通的for循環(huán)速度快上三倍左右,但是值得一提的是:如果任務(wù)的量級(jí)太小拳缠,F(xiàn)orkJoin的處理速度反而比不上普通的For循環(huán)墩新。這是因?yàn)镕orkJoin框架在拆分任務(wù)fork階段于合并結(jié)果join階段需要時(shí)間,并且開(kāi)啟多條線(xiàn)程處理任務(wù)窟坐,CPU切換也需要時(shí)間海渊,所以當(dāng)一個(gè)任務(wù)fork/join階段以及CPU切換的時(shí)間開(kāi)銷(xiāo)大于原本任務(wù)的執(zhí)行時(shí)間時(shí),這種情況下則沒(méi)有必要使用ForkJoin框架哲鸳。

注意:ForkJoin的執(zhí)行時(shí)間跟機(jī)器硬件配置以及拆分臨界值/閾值的設(shè)定也有關(guān)系臣疑,拆分的閾值并不是越小越好,因?yàn)殚撝翟叫r(shí)徙菠,一個(gè)任務(wù)拆分的小任務(wù)也就會(huì)越多讯沈,而拆分、合并階段都是需要時(shí)間的婿奔,所以閾值需要根據(jù)機(jī)器的具體硬件設(shè)施和任務(wù)的類(lèi)型進(jìn)行合理的計(jì)算缺狠,這樣才能保證任務(wù)執(zhí)行時(shí)能夠到達(dá)最佳狀態(tài)。

ok萍摊,我也做了一個(gè)比較有意思的小測(cè)試挤茄,把單線(xiàn)程for循環(huán)的模式來(lái)處理上述任務(wù)以及ForkJoin框架處理上述任務(wù)分別分為了兩次來(lái)執(zhí)行,同時(shí)監(jiān)控了CPU的利用率狀況记餐,具體如下:

CPU利用率對(duì)比

通過(guò)上圖可以非常清晰的看見(jiàn)驮樊,當(dāng)單線(xiàn)程for循環(huán)的模式處理任務(wù)時(shí),因?yàn)槭窃诙嗪藱C(jī)器上執(zhí)行片酝,所以對(duì)于CPU的利用率最高不到50%囚衔,而當(dāng)使用ForkJoin框架處理任務(wù)時(shí),幾次觸頂達(dá)到了100%的CPU利用率雕沿。所以我們可以得出一個(gè)結(jié)論:ForkJoin框架在處理任務(wù)時(shí)练湿,能夠在最大程度上發(fā)揮機(jī)器的性能。

二审轮、ForkJoin框架原理淺析及成員構(gòu)成

在如上肥哎,對(duì)于ForkJoin框架已經(jīng)建立的初步的認(rèn)知,接著慢慢繼續(xù)分析其內(nèi)部實(shí)現(xiàn)過(guò)程疾渣。

2.1篡诽、ForkJoin框架原理

在前面提到過(guò),F(xiàn)orkJoin框架是建立在分治思想上的產(chǎn)物榴捡,而向FoorkJoinPool中傳遞一個(gè)任務(wù)時(shí)杈女,任務(wù)的執(zhí)行流程大體如下:

ForkJoin框架執(zhí)行流程

從圖中可以很輕易的明白:提交的任務(wù)會(huì)被分割成一個(gè)個(gè)小的左/右任務(wù),當(dāng)分割到最小時(shí),會(huì)分別執(zhí)行每個(gè)小的任務(wù)达椰,執(zhí)行完成后翰蠢,會(huì)將每個(gè)左/右任務(wù)的結(jié)果進(jìn)行,從而合并出父級(jí)任務(wù)的結(jié)果啰劲,依次類(lèi)推梁沧,直至最終計(jì)算出整個(gè)任務(wù)的最終結(jié)果。

工作竊扔恪:在引言中曾提到過(guò)ForkJoin框架是基于分治和工作竊取思想實(shí)現(xiàn)的廷支,那么何謂工作竊取呢?先舉個(gè)例子帶大家簡(jiǎn)單理解一下這個(gè)思想猖辫,具體的實(shí)現(xiàn)會(huì)在后面的源碼分析中詳細(xì)談到酥泞。
例子:我是開(kāi)了個(gè)工廠當(dāng)老板,一條流水線(xiàn)上招聘八個(gè)年輕小伙做事啃憎,每個(gè)人安排了五十個(gè)任務(wù)芝囤,并且對(duì)他們說(shuō):“你們是一個(gè)團(tuán)隊(duì),必須等到每個(gè)人做完了自己的任務(wù)之后才能一起下班辛萍!”悯姊。但是在這八個(gè)小伙里面,有手比較靈巧做的比較快的贩毕,也有做的比較慢悯许、效率比較低的人。那么當(dāng)一段時(shí)間過(guò)后辉阶,代號(hào)③的小伙率先完成了分配給自己的任務(wù)后先壕,為了早些下班,會(huì)跑到一些做的比較慢的小伙哪兒去拿一些任務(wù)過(guò)來(lái)幫助完成進(jìn)度谆甜,這便是工作竊取思想垃僚。在ForkJoin框架同樣存在這樣的情況,某條線(xiàn)程已經(jīng)執(zhí)行完成了分配給自己的任務(wù)后规辱,有些線(xiàn)程卻還在執(zhí)行并且堆積著很多任務(wù)谆棺,那么這條已經(jīng)處理完自己任務(wù)的線(xiàn)程則會(huì)去“竊取”其他線(xiàn)程的任務(wù)執(zhí)行。

2.2罕袋、ForkJoin框架成員分析

在前面說(shuō)過(guò)改淑,F(xiàn)orkJoin框架是由三部分組成,分別為:執(zhí)行者線(xiàn)程浴讯、任務(wù)實(shí)體以及線(xiàn)程池朵夏。接下來(lái)我們依次分析這些成員。

2.2.1榆纽、ForkJoinWorkerThread:任務(wù)的執(zhí)行者

ForkJoinWorkerThread繼承了Thread線(xiàn)程類(lèi)侍郭,作為T(mén)hread的子類(lèi)询吴,但是卻并沒(méi)有對(duì)線(xiàn)程的調(diào)度、執(zhí)行做改變亮元,只是僅僅增加了一些額外功能。ForkJoinWorkerThread線(xiàn)程被創(chuàng)建出來(lái)后都交由ForkJoinPool線(xiàn)程池管理唠摹,并且設(shè)置為了守護(hù)線(xiàn)程爆捞,而ForkJoinWorkerThread線(xiàn)程創(chuàng)建出來(lái)之后都是被注冊(cè)到FrokJoinPool線(xiàn)程池,由這些線(xiàn)程來(lái)執(zhí)行用戶(hù)提交的任務(wù)勾拉,所以ForkJoinWorkerThread也被稱(chēng)為任務(wù)的執(zhí)行者煮甥。

ForkJoinPool線(xiàn)程池與之前的線(xiàn)程池有一點(diǎn)區(qū)別在于:之前的線(xiàn)程池中,總共只有一個(gè)任務(wù)隊(duì)列藕赞,而ForkJoinPool中成肘,每個(gè)ForkJoinWorkerThread線(xiàn)程在創(chuàng)建時(shí),都會(huì)為它分配一個(gè)任務(wù)隊(duì)列斧蜕。同時(shí)為了實(shí)現(xiàn)工作竊取機(jī)制双霍,該隊(duì)列被設(shè)計(jì)為雙向隊(duì)列,線(xiàn)程執(zhí)行自身隊(duì)列中的任務(wù)時(shí)批销,采用LIFO的方式獲取任務(wù)洒闸,當(dāng)其他線(xiàn)程竊取任務(wù)時(shí),采用FIFO的方式獲取任務(wù)均芽。ForkJoinWorkerThread線(xiàn)程的主要工作為執(zhí)行自身隊(duì)列中的任務(wù)丘逸,其次是竊取其他線(xiàn)程隊(duì)列中的任務(wù)執(zhí)行。源碼如下:

public class ForkJoinWorkerThread extends Thread {
    final ForkJoinPool pool;    // 當(dāng)前線(xiàn)程所屬的線(xiàn)程池
    final ForkJoinPool.WorkQueue workQueue; // 當(dāng)前線(xiàn)程的雙向任務(wù)隊(duì)列
    
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // 調(diào)用Thread父類(lèi)的構(gòu)造函數(shù)創(chuàng)建線(xiàn)程實(shí)體對(duì)象
        // 在這里是先暫時(shí)使用aFJT作為線(xiàn)程名稱(chēng)掀宋,當(dāng)外部傳遞線(xiàn)程名稱(chēng)時(shí)會(huì)替換
        super("aForkJoinWorkerThread");
        // 當(dāng)前設(shè)置線(xiàn)程池
        this.pool = pool;
        // 向ForkJoinPool線(xiàn)程池中注冊(cè)當(dāng)前線(xiàn)程深纲,為當(dāng)前線(xiàn)程分配任務(wù)隊(duì)列
        this.workQueue = pool.registerWorker(this);
    }
    
    // ForkJoinWorkerThread類(lèi) → run方法
    public void run() {
        // 如果隊(duì)列中有任務(wù)
        if (workQueue.array == null) { 
            // 定義異常對(duì)象,方便后續(xù)記錄異常
            Throwable exception = null;
            try {
                // 執(zhí)行前置鉤子函數(shù)(預(yù)留方法劲妙,內(nèi)部未實(shí)現(xiàn))
                onStart();
                // 執(zhí)行工作隊(duì)列中的任務(wù)
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                // 記錄捕獲的異常信息
                exception = ex;
            } finally {
                try {
                    // 對(duì)外寫(xiě)出捕獲的異常信息
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    // 調(diào)用 deregisterWorker 方法進(jìn)行清理
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }
    // 省略其他代碼.....
}

很明顯的可以看到湃鹊,ForkJoinWorkerThread的構(gòu)造函數(shù)中,在初始化時(shí)會(huì)將自身注冊(cè)進(jìn)線(xiàn)程池中是趴,然后由線(xiàn)程池給每個(gè)線(xiàn)程對(duì)象分配一個(gè)隊(duì)列涛舍。

2.2.2、ForkJoinTask:任務(wù)實(shí)體

ForkJoinTask與FutrueTask一樣唆途,是Futrue接口的子類(lèi)富雅,ForkJoinTask是一種可以將任務(wù)進(jìn)行遞歸分解執(zhí)行,從而提高執(zhí)行并行度的任務(wù)類(lèi)型肛搬,執(zhí)行結(jié)束后也可以支持結(jié)果返回没佑。但ForkJoinTask僅是一個(gè)抽象類(lèi),子類(lèi)有三個(gè):

  • ①RecursiveAction:無(wú)返回值型ForkJoinTask任務(wù)
  • ②RecursiveTask:有返回值型ForkJoinTask任務(wù)
  • ③CountedCompleter:任務(wù)執(zhí)行完成后可以觸發(fā)鉤子回調(diào)函數(shù)的任務(wù)

ForkJoinTask的作用就是根據(jù)任務(wù)的分解實(shí)現(xiàn)温赔,將任務(wù)進(jìn)行拆分蛤奢,以及等待子任務(wù)的執(zhí)行結(jié)果合并成父任務(wù)的結(jié)果。ForkJoinTask內(nèi)部存在一個(gè)整數(shù)類(lèi)型的成員status,該成員高16位記錄任務(wù)的執(zhí)行狀態(tài)啤贩,如:如NORMAL待秃、CANCELLED或EXCEPTIONAL,低16位預(yù)留用于記錄用戶(hù)自定義的任務(wù)標(biāo)簽痹屹。ForkJoinTask源碼具體如下:

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    // 表示任務(wù)的執(zhí)行狀態(tài)章郁,總共有如下幾種值
    volatile int status;
    // 獲取任務(wù)狀態(tài)的掩碼,后續(xù)用于位計(jì)算志衍,判斷任務(wù)是否正常執(zhí)行結(jié)束
    static final int DONE_MASK   = 0xf0000000;
    // 表示任務(wù)正常執(zhí)行結(jié)束
    static final int NORMAL      = 0xf0000000; 
    // 表示任務(wù)被取消
    static final int CANCELLED   = 0xc0000000;
    // 表示任務(wù)出現(xiàn)異常結(jié)束
    static final int EXCEPTIONAL = 0x80000000;
    // 表示當(dāng)前任務(wù)被別的任務(wù)依賴(lài)暖庄,在結(jié)束前會(huì)通知其他任務(wù)join結(jié)果
    static final int SIGNAL      = 0x00010000; 
    // 低16位掩碼,預(yù)留占位(short mask)
    // setForkJoinTaskTag方法中應(yīng)用了該成員楼肪,但這個(gè)方法沒(méi)實(shí)現(xiàn)/應(yīng)用
    static final int SMASK       = 0x0000ffff;

    // 異常哈希鏈表數(shù)組(異常哈希表培廓,類(lèi)似于hashmap1.8之前的實(shí)現(xiàn))
    // 因?yàn)槿蝿?wù)拆分之后會(huì)很多,異常信息要么都沒(méi)有春叫,要么都會(huì)出現(xiàn)
    // 所以不直接記錄在ForkJoinTask對(duì)象中肩钠,而是采用哈希表結(jié)構(gòu)存儲(chǔ)弱引用類(lèi)型的節(jié)點(diǎn)
    // 注意這些都是 static 類(lèi)屬性,所有的ForkJoinTask共用的
    private static final ExceptionNode[] exceptionTable;        
    private static final ReentrantLock exceptionTableLock;
    // 在ForkJoinTask的node被GC回收之后象缀,相應(yīng)的異常節(jié)點(diǎn)對(duì)象的引用隊(duì)列
    private static final ReferenceQueue<Object> exceptionTableRefQueue; 

    /**
    * 固定容量的exceptionTable(代表數(shù)組長(zhǎng)度為32蔬将,下標(biāo)存儲(chǔ)鏈表頭節(jié)點(diǎn))
    */
    private static final int EXCEPTION_MAP_CAPACITY = 32;

    // 內(nèi)部節(jié)點(diǎn)類(lèi):異常數(shù)組存儲(chǔ)的元素:
    //      數(shù)組是固定長(zhǎng)度,這樣方便外部訪(fǎng)問(wèn)
    //      但是為了保證內(nèi)存可用性央星,所以是弱引用類(lèi)型
    //      因?yàn)椴荒艽_定任務(wù)的最后一個(gè)join何時(shí)完成霞怀,所以在下次GC發(fā)生時(shí)會(huì)被回收
    //      在GC回收后,這些異常信息會(huì)被轉(zhuǎn)存到exceptionTableRefQueue隊(duì)列
    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // 拋出異常的線(xiàn)程id
        final int hashCode;  // 在弱引用消失之前存儲(chǔ)hashCode
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
            // //在ForkJoinTask被GC回收之后莉给,會(huì)將該節(jié)點(diǎn)加入隊(duì)列exceptionTableRefQueue
            super(task, exceptionTableRefQueue); 
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task);
        }
    }

    /* 抽象方法:用于拓展 */
    // 任務(wù)執(zhí)行完成后返回結(jié)果毙石,未完成返回null
    public abstract V getRawResult();
    // 強(qiáng)制性的給定返回結(jié)果
    protected abstract void setRawResult(V value);
    // 執(zhí)行任務(wù)
    // 如果執(zhí)行過(guò)程拋出異常則記錄捕獲的異常并更改任務(wù)狀態(tài)為EXCEPTIONAL
    // 如果執(zhí)行正常結(jié)束,設(shè)置任務(wù)狀態(tài)為NORMAL正常結(jié)束狀態(tài)
    // 如果當(dāng)前是子任務(wù)颓遏,設(shè)置為SIGNAL狀態(tài)并通知其他需要join該任務(wù)的線(xiàn)程
    protected abstract boolean exec();
    
    /* 實(shí)現(xiàn)Future接口的方法 */
    // 阻塞等待任務(wù)執(zhí)行結(jié)果
    public final V get();
    // 在給定時(shí)間內(nèi)等待返回結(jié)果徐矩,超出給定時(shí)間則中斷線(xiàn)程
    public final V get(long timeout, TimeUnit unit);
    // 阻塞非工作線(xiàn)程直至任務(wù)結(jié)束或者中斷(該過(guò)程可能會(huì)發(fā)生竊取動(dòng)作),返回任務(wù)的status值
    private int externalInterruptibleAwaitDone();
    // 嘗試取消任務(wù)叁幢,成功返回true滤灯,反之false
    public boolean cancel(boolean mayInterruptIfRunning);
    // 判斷任務(wù)是否已執(zhí)行結(jié)束
    public final boolean isDone();
    // 判斷任務(wù)是否被取消
    public final boolean isCancelled();
    
    
    /* 一些重要的方法 */
    // 執(zhí)行任務(wù)的方法
    final int doExec();
    // 修改任務(wù)狀態(tài)的方法
    private int setcompletion (int completion);
    // 取消任務(wù)的方法
    public boolean cancel(boolean mayInterruptIfRunning);
    
    
    // 將新創(chuàng)建的子任務(wù)放入當(dāng)前線(xiàn)程的任務(wù)(工作)隊(duì)列
    public final ForkJoinTask<V> fork();
    // 將當(dāng)前線(xiàn)程阻塞,直到對(duì)應(yīng)的子任務(wù)完成運(yùn)行并返回執(zhí)行結(jié)果
    public final V join();
    // 獲取任務(wù)執(zhí)行狀態(tài)曼玩,如果還未結(jié)束鳞骤,當(dāng)前線(xiàn)程獲取任務(wù)幫助執(zhí)行
    private int doJoin();
    // 執(zhí)行任務(wù),正常結(jié)束則返回結(jié)果黍判,異常結(jié)束則報(bào)告異常
    public final V invoke();
    // 使用當(dāng)前線(xiàn)程執(zhí)行任務(wù)
    private int doInvoke();
    // 阻塞線(xiàn)程直至任務(wù)執(zhí)行結(jié)束豫尽,如果未執(zhí)行完成嫌拣,外部線(xiàn)程嘗試幫助執(zhí)行
    private int externalAwaitDone();
    // 同時(shí)執(zhí)行兩個(gè)任務(wù)卢鹦,第一個(gè)任務(wù)由當(dāng)前線(xiàn)程執(zhí)行,第二個(gè)交由工作線(xiàn)程執(zhí)行
    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2);
    // 執(zhí)行多個(gè)任務(wù)渔肩,入?yún)槿我鈧€(gè)任務(wù)對(duì)象,除開(kāi)第一個(gè)任務(wù)榴嗅,其他交由工作線(xiàn)程執(zhí)行
    public static void invokeAll(ForkJoinTask<?>... tasks);
    // 入?yún)镃ollection集合妄呕,可以支持返回結(jié)果
    public static <T extends ForkJoinTask<?>> 
        Collection<T> invokeAll(Collection<T> tasks);
    
    /* 異常相關(guān)的方法 */
    // 記錄異常信息以及設(shè)置任務(wù)狀態(tài)
    final int recordExceptionalCompletion(Throwable ex);
    // 刪除異常結(jié)點(diǎn)并清理狀態(tài)
    private void clearExceptionalCompletion();
    // 刪除哈希表中過(guò)期的異常信息引用
    private static void expungeStaleExceptions();
    // 獲取任務(wù)異常判斷與當(dāng)前線(xiàn)程堆棧關(guān)系是否相關(guān),
    // 不相關(guān)則構(gòu)建一個(gè)相同類(lèi)型的異常录肯,作為記錄
    // 這樣做的原因是為了提供準(zhǔn)確的堆棧跟蹤
    private Throwable getThrowableException();
}

ForkJoinTask內(nèi)部成員主要由兩部分構(gòu)成趴腋,一個(gè)是表示任務(wù)狀態(tài)的int成員:status,其他的成員則都是跟任務(wù)異常信息記錄相關(guān)的论咏。不過(guò)值得注意一提的是:ForkJoinTask內(nèi)部有關(guān)異常信息記錄的成員都是static關(guān)鍵字修飾的,也就代表著這些成員是所有ForkJoinTask對(duì)象共享的颁井,ForkJoinTask使用類(lèi)似與HashMap的實(shí)現(xiàn)結(jié)構(gòu):固定長(zhǎng)度32的數(shù)組+單向鏈表實(shí)現(xiàn)了一個(gè)哈希表結(jié)構(gòu)厅贪,用于記錄所有ForkJoinTask執(zhí)行過(guò)程中出現(xiàn)的異常,所有異常信息都會(huì)被封裝成ExceptionNode節(jié)點(diǎn)加入哈希表中存儲(chǔ)雅宾,但是ExceptionNode節(jié)點(diǎn)是一種弱引用的實(shí)現(xiàn)养涮,當(dāng)程序下次GC發(fā)生時(shí)會(huì)被GC機(jī)制回收,GC時(shí)這些已捕獲的異常則會(huì)被轉(zhuǎn)移到exceptionTableRefQueue隊(duì)列中存儲(chǔ)眉抬。

而成員status代表任務(wù)的執(zhí)行狀態(tài)贯吓,成員類(lèi)型為int,從最大程度上減少了內(nèi)存占用蜀变,為了保證原子性悄谐,該成員使用了volatile修飾以及操作時(shí)都是CAS操作。而當(dāng)任務(wù)未結(jié)束時(shí)库北,status都會(huì)大于0爬舰,任務(wù)執(zhí)行結(jié)束后,status都會(huì)小于0寒瓦。在ForkJoinTask也定義了如下幾種狀態(tài):

  • ①DONE_MASK狀態(tài):屏蔽非完成位標(biāo)志情屹,與NORMAL值相同,主要后續(xù)用于位運(yùn)算判斷任務(wù)是否正常執(zhí)行結(jié)束
    • 二進(jìn)制值:1111 0000 0000 0000 0000 0000 0000 0000
  • ②NORMAL狀態(tài):表示任務(wù)正常執(zhí)行結(jié)束
    • 二進(jìn)制值:1111 0000 0000 0000 0000 0000 0000 0000
  • ③CANCELLED狀態(tài):表示任務(wù)被取消
    • 二進(jìn)制值:1100 0000 0000 0000 0000 0000 0000 0000
  • ④EXCEPTIONAL狀態(tài):表示任務(wù)執(zhí)行過(guò)程中出現(xiàn)異常杂腰,導(dǎo)致任務(wù)執(zhí)行終止結(jié)束
    • 二進(jìn)制值:1000 0000 0000 0000 0000 0000 0000 0000
  • ⑤SIGNAL狀態(tài):表示傳遞信號(hào)狀態(tài)垃你,代表著當(dāng)前任務(wù)存在依賴(lài)關(guān)系,執(zhí)行結(jié)束后需要通知其他任務(wù)join合并結(jié)果
    • 二進(jìn)制值:0000 0000 0000 0001 0000 0000 0000 0000
  • ⑥SMASK狀態(tài):低十六位的預(yù)留占位
    • 二進(jìn)制值:0000 0000 0000 0000 1111 1111 1111 1111
  • PS:②③④⑤為任務(wù)狀態(tài)喂很,其他的只是輔助標(biāo)識(shí)

ForkJoinTask中的所有方法也可以分為三大類(lèi):

  • ①基于status狀態(tài)成員操作以及維護(hù)方法
  • ②執(zhí)行任務(wù)以及等待完成方法
  • ③附加對(duì)外報(bào)告結(jié)果的用戶(hù)級(jí)方法

重點(diǎn)來(lái)看一下fork()以及join()方法:

// ForkJoinTask類(lèi) → fork方法
public final ForkJoinTask<V> fork() {
    Thread t;
    // 判斷當(dāng)前執(zhí)行的線(xiàn)程是否為池中的工作線(xiàn)程
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        // 如果是的則直接將任務(wù)壓入當(dāng)前線(xiàn)程的任務(wù)隊(duì)列
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // 如果不是則壓入common池中的某個(gè)工作線(xiàn)程的任務(wù)隊(duì)列中
        ForkJoinPool.common.externalPush(this);
    // 返回當(dāng)前ForkJoinTask對(duì)象惜颇,方便遞歸拆分
    return this;
}

// ForkJoinTask類(lèi) → join方法
public final V join() {
    int s;
    // 判斷任務(wù)執(zhí)行狀態(tài)如果是非正常結(jié)束狀態(tài)
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        // 拋出相關(guān)的異常堆棧信息
        reportException(s);
    // 正常執(zhí)行結(jié)束則返回執(zhí)行結(jié)果
    return getRawResult();
}
// ForkJoinTask類(lèi) → doJoin方法
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // status<0則直接返回status值
    return (s = status) < 0 ? s :
      // 判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // 是則取出線(xiàn)程任務(wù)隊(duì)列中的當(dāng)前task執(zhí)行,執(zhí)行完成返回status值
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        // 執(zhí)行未完成則調(diào)用awaitJoin方法等待執(zhí)行完成
        wt.pool.awaitJoin(w, this, 0L) :
      // 不是則調(diào)用externalAwaitDone()方法阻塞掛起當(dāng)前線(xiàn)程
        externalAwaitDone();
}
  • fork方法邏輯:
    • ①判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程類(lèi)型
      • 是:將當(dāng)前任務(wù)壓入當(dāng)前線(xiàn)程的任務(wù)隊(duì)列中
      • 不是:將當(dāng)前任務(wù)壓入common池中某個(gè)工作線(xiàn)程的任務(wù)隊(duì)列中
    • ②返回當(dāng)前的ForkJoinTask任務(wù)對(duì)象恤筛,方便遞歸拆分
  • doJoin&join方法邏輯:
    • ①判斷任務(wù)狀態(tài)status是否小于0:
      • 小于:代表任務(wù)已經(jīng)結(jié)束官还,返回status值
      • 不小于:判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程:
        • 是:取出線(xiàn)程任務(wù)隊(duì)列的當(dāng)前task執(zhí)行,判斷執(zhí)行是否結(jié)束:
          • 結(jié)束:返回執(zhí)行結(jié)束的status值
          • 未結(jié)束:調(diào)用awaitJoin方法等待執(zhí)行結(jié)束
        • 不是:調(diào)用externalAwaitDone()方法阻塞掛起當(dāng)前線(xiàn)程
    • ②判斷任務(wù)執(zhí)行狀態(tài)是否為非正常結(jié)束狀態(tài)毒坛,是則拋出異常堆棧信息
      • 任務(wù)狀態(tài)為被取消望伦,拋出CancellationException異常
      • 任務(wù)狀態(tài)為異常結(jié)束林说,拋出對(duì)應(yīng)的執(zhí)行異常信息
    • ③如果status為正常結(jié)束狀態(tài),則直接返回執(zhí)行結(jié)果

OK~屯伞,最后再看看ForkJoinTask的內(nèi)部類(lèi):

static final class ExceptionNode extends 
        WeakReference<ForkJoinTask<?>> {}
        
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
        implements RunnableFuture<T> {}

static final class AdaptedRunnableAction extends ForkJoinTask<Void>
        implements RunnableFuture<Void> {}
        
static final class RunnableExecuteAction extends ForkJoinTask<Void> {}

static final class AdaptedCallable<T> extends ForkJoinTask<T>
        implements RunnableFuture<T> {}
  • ①ExceptionNode:用于記錄任務(wù)執(zhí)行過(guò)程中拋出的異常信息腿箩,是ForkJoinTask的弱引用
  • ②AdaptedRunnableAction:用于封裝Runable類(lèi)型任務(wù)的適配器,抽象方法實(shí)現(xiàn):
    • getRawResult()方法:直接返回null
    • setRawResult()方法:空實(shí)現(xiàn)
    • exec()方法:直接調(diào)用的run()方法
  • ③AdaptedCallable:用于封裝Callable類(lèi)型任務(wù)的適配器劣摇,抽象方法實(shí)現(xiàn):
    • getRawResult()方法:返回call方法的執(zhí)行結(jié)果
    • setRawResult()方法:設(shè)置Callable執(zhí)行后的返回值
    • exec()方法:調(diào)用的call()方法
  • ④AdaptedRunnable:用于封裝Runable類(lèi)型任務(wù)的適配器珠移,可以通過(guò)構(gòu)造器設(shè)置返回集
  • ⑤RunnableExecuteAction:同②類(lèi)似,區(qū)別在于它可以?huà)伋霎惓?/li>

2.2.3末融、ForkJoinPool:線(xiàn)程池

ForkJoinPool也是實(shí)現(xiàn)了ExecutorService的線(xiàn)程池钧惧,但ForkJoinPool不同于其他類(lèi)型的線(xiàn)程池,因?yàn)槠鋬?nèi)部實(shí)現(xiàn)了工作竊取機(jī)制勾习,所有線(xiàn)程在執(zhí)行完自己的任務(wù)之后都會(huì)嘗試竊取其他線(xiàn)程的任務(wù)執(zhí)行浓瞪,只有當(dāng)竊取不到任務(wù)的情況下才會(huì)發(fā)生阻塞等待工作。ForkJoinPool主要是為了執(zhí)行ForkJoinTask而存在的巧婶,ForkJoinPool是整個(gè)ForkJoin框架的核心乾颁,負(fù)責(zé)整個(gè)框架的核心管理、檢查監(jiān)控與資源調(diào)度艺栈。

2.2.3.1英岭、ForkJoinPool構(gòu)造器

先來(lái)看看ForkJoinPool的構(gòu)造函數(shù)源碼:

// 構(gòu)造器1:使用默認(rèn)的參數(shù)配置創(chuàng)建
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
// 構(gòu)造器2:可指定并行度
public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
// 構(gòu)造器3:可指定并行度、線(xiàn)程工廠湿右、異常策略以及調(diào)度模式
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
// 私有全參構(gòu)造函數(shù):提供給內(nèi)部其他三個(gè)構(gòu)造器調(diào)用
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

ForkJoinPool對(duì)外提供了三個(gè)構(gòu)造器诅妹,但是這三個(gè)構(gòu)造器都是基于內(nèi)部的私有構(gòu)造完成的,所以直接分析最后一個(gè)全參的私有構(gòu)造器诅需,該構(gòu)造器共有五個(gè)參數(shù):

  • ①parallelism并行度:默認(rèn)為CPU核數(shù)漾唉,最小為1。相當(dāng)于工作線(xiàn)程數(shù)堰塌,但會(huì)有些不同
  • ②factory線(xiàn)程工廠:用于創(chuàng)建ForkJoinWorkerThread線(xiàn)程
  • ③handler異常捕獲策略:默認(rèn)為null赵刑,執(zhí)行任務(wù)出現(xiàn)異常從中被拋出時(shí),就會(huì)被handler捕獲
  • ④mode調(diào)度模式:對(duì)應(yīng)前三個(gè)構(gòu)造中的asyncMode參數(shù)场刑,默認(rèn)為0般此,也就是false
    • false:使用LIFO_QUEUE成員,mode=0牵现,使用先進(jìn)后出的模式調(diào)度工作
    • true:使用FIFO_QUEUE成員铐懊,mode=1<<16,使用先進(jìn)先出的模式調(diào)度工作
  • ⑤workerNamePrefix工作名稱(chēng)前綴:工作線(xiàn)程的名稱(chēng)前綴瞎疼,有默認(rèn)值科乎,不需要傳遞該參數(shù)

創(chuàng)建ForkJoinPool線(xiàn)程池除開(kāi)通過(guò)構(gòu)造函數(shù)的方式之外,在JDK1.8中還提供了一個(gè)靜態(tài)方法:commonPool()贼急,該方法可以通過(guò)指定系統(tǒng)參數(shù)的方式(System.setProperty(?,?))定義“并行度茅茂、線(xiàn)程工廠和異常處理策略”捏萍,但是該方法是一個(gè)靜態(tài)方法,也就代表著通過(guò)該方法創(chuàng)建出來(lái)的線(xiàn)程池對(duì)象只會(huì)有一個(gè)空闲,調(diào)用commonPool()方法獲取到的ForkJoinPool對(duì)象是整個(gè)程序通用的令杈。

2.2.3.2、ForkJoinPool內(nèi)部成員

前面我們了解了ForkJoinPool的構(gòu)造器碴倾,現(xiàn)在再簡(jiǎn)單看看它的成員構(gòu)成:

// 線(xiàn)程池的ctl控制變量(與上篇中分析的ctl性質(zhì)相同)
volatile long ctl;      
// 線(xiàn)程池的運(yùn)行狀態(tài)逗噩,值為常量中對(duì)應(yīng)的值
volatile int runState;
// 將并行度和mode參數(shù)放到了一個(gè)int中,便于后續(xù)通過(guò)位操作計(jì)算
final int config;
// 隨機(jī)種子跌榔,與SEED_INCREMENT魔數(shù)配合使用
int indexSeed;
// 組成WorkQueue數(shù)組异雁,是線(xiàn)程池的核心數(shù)據(jù)結(jié)構(gòu)
volatile WorkQueue[] workQueues;
// 創(chuàng)建線(xiàn)程的線(xiàn)程工廠
final ForkJoinWorkerThreadFactory factory;
// 任務(wù)在執(zhí)行過(guò)程中出現(xiàn)拋出異常時(shí)的處理策略,類(lèi)似于之前線(xiàn)程池的拒絕策略
final UncaughtExceptionHandler ueh;
// 創(chuàng)建線(xiàn)程時(shí)僧须,線(xiàn)程名稱(chēng)的前綴
final String workerNamePrefix;
// 任務(wù)竊取的原子計(jì)數(shù)器
volatile AtomicLong stealCounter;

// 默認(rèn)創(chuàng)建工作線(xiàn)程的工廠類(lèi)
public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;
// 線(xiàn)程修改許可片迅,用于檢測(cè)代碼是否具備修改線(xiàn)程狀態(tài)的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 通用的ForkJoinPool線(xiàn)程池,用于commonPool()方法
static final ForkJoinPool common;
// 通用線(xiàn)程池的并行數(shù)
static final int commonParallelism;
// 通用線(xiàn)程池的最大線(xiàn)程數(shù)
private static int commonMaxSpares;
// 用于記錄已創(chuàng)建的ForkJoinPool線(xiàn)程池的個(gè)數(shù)
private static int poolNumberSequence;

// 當(dāng)線(xiàn)程執(zhí)行完成自己的任務(wù)且池中沒(méi)有活躍線(xiàn)程時(shí)皆辽,用于計(jì)算阻塞時(shí)間,默認(rèn)2s
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L;
// 平衡計(jì)數(shù)芥挣,通過(guò)IDLE_TIMEOUT會(huì)減去TIMEOUT_SLOP驱闷,
// 主要為了平衡系統(tǒng)定時(shí)器喚醒時(shí)帶來(lái)的延時(shí)時(shí)間,默認(rèn)20ms
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;
// 通用線(xiàn)程池默認(rèn)的最大線(xiàn)程數(shù) 256
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
 * 自旋次數(shù):阻塞之前旋轉(zhuǎn)等待的次數(shù)空免,目前使用的是隨機(jī)旋轉(zhuǎn)
 * 在awaitRunStateLock空另、awaitWork以及awaitRunstateLock方法中使用,
 * 當(dāng)前設(shè)置為零蹋砚,以減少自旋帶來(lái)的CPU開(kāi)銷(xiāo)
 * 如果大于零扼菠,則SPINS的值必須為2的冪,至少為 4
 */
private static final int SPINS  = 0;
// 這個(gè)是產(chǎn)生隨機(jī)性的魔數(shù)坝咐,用于掃描的時(shí)候進(jìn)行計(jì)算(與ThreadLocal類(lèi)似)
private static final int SEED_INCREMENT = 0x9e3779b9;


// runState的狀態(tài):處于SHUTDOWN時(shí)值必須為負(fù)數(shù)循榆,其他狀態(tài)只要是2的次冪即可
// 鎖定狀態(tài):線(xiàn)程池被某條線(xiàn)程獲取了鎖
private static final int  RSLOCK     = 1;
// 信號(hào)狀態(tài):線(xiàn)程阻塞前需要設(shè)置RSIGNAL,告訴其他線(xiàn)程在釋放鎖時(shí)要叫醒我
private static final int  RSIGNAL    = 1 << 1;
// 啟動(dòng)狀態(tài):表示線(xiàn)程池正常墨坚,可以創(chuàng)建線(xiàn)程且接受任務(wù)處理
private static final int  STARTED    = 1 << 2;
// 停止?fàn)顟B(tài):線(xiàn)程池已停止秧饮,不能創(chuàng)建線(xiàn)程且不接受新任務(wù),同時(shí)會(huì)取消未處理的任務(wù)
private static final int  STOP       = 1 << 29;
// 死亡狀態(tài):表示線(xiàn)程池內(nèi)所有任務(wù)已取消泽篮,所有工作線(xiàn)程已銷(xiāo)毀
private static final int  TERMINATED = 1 << 30;
// 關(guān)閉狀態(tài):嘗試關(guān)閉線(xiàn)程池盗尸,不再接受新的任務(wù),但依舊處理已接受的任務(wù)
private static final int  SHUTDOWN   = 1 << 31;


// CTL變量的一些掩碼
// 低32位的掩碼
private static final long SP_MASK    = 0xffffffffL;
// 高32位的掩碼
private static final long UC_MASK    = ~SP_MASK;

// 有效(活躍/存活)計(jì)數(shù):正在處理任務(wù)的活躍線(xiàn)程數(shù)
// 高16位的偏移量帽撑,用高16位記錄活躍線(xiàn)程數(shù)
private static final int  AC_SHIFT   = 48;
// 高16位:活躍線(xiàn)程的計(jì)數(shù)單元泼各,高16位+1
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
// 高16位:活躍線(xiàn)程數(shù)的掩碼
private static final long AC_MASK    = 0xffffL << AC_SHIFT;

// 總計(jì)數(shù):整個(gè)池中存在的所有線(xiàn)程數(shù)量
// 總線(xiàn)程數(shù)量的偏移量,使用高32位中的低16位記錄
private static final int  TC_SHIFT   = 32;
// 總線(xiàn)程數(shù)的計(jì)數(shù)單元
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
// 總線(xiàn)程數(shù)的掩碼
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
// 最大總線(xiàn)程數(shù)的掩碼亏拉,用于判斷線(xiàn)程數(shù)量是否已達(dá)上限
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); 


// 低16位掩碼扣蜻,表示workQueue在數(shù)組中的最大索引值
static final int SMASK        = 0xffff;
// 最大工作線(xiàn)程數(shù)
static final int MAX_CAP      = 0x7fff;
// 偶數(shù)掩碼逆巍,第一個(gè)bit為0,任何值與它進(jìn)行 與 計(jì)算弱贼,結(jié)果都為偶數(shù)
static final int EVENMASK     = 0xfffe;
// 用于計(jì)算偶數(shù)值下標(biāo)蒸苇,SQMASK值為126
// 0~126之間只存在64個(gè)偶數(shù),所以偶數(shù)位的槽數(shù)只有64個(gè)
static final int SQMASK       = 0x007e;

// Masks and units for WorkQueue.scanState and ctl sp subfield
// 用于檢測(cè)工作線(xiàn)程是否在執(zhí)行任務(wù)的掩碼
static final int SCANNING     = 1;
// 負(fù)數(shù)吮旅,用于workQueue.scanState溪烤,與scanState進(jìn)行位或可將scanState變成負(fù)數(shù),
// 表示工作線(xiàn)程掃描不到任務(wù)庇勃,進(jìn)入不活躍狀態(tài)檬嘀,將可能被阻塞
static final int INACTIVE     = 1 << 31;
// 版本計(jì)數(shù),用于workQueue.scanState责嚷,解決ABA問(wèn)題
static final int SS_SEQ       = 1 << 16;

// Mode bits for ForkJoinPool.config and WorkQueue.config
// 獲取隊(duì)列的工作調(diào)度模式
static final int MODE_MASK    = 0xffff << 16;
// 表示先進(jìn)后出模式
static final int LIFO_QUEUE   = 0;
// 表示先進(jìn)先出模式
static final int FIFO_QUEUE   = 1 << 16;
// 表示共享模式
static final int SHARED_QUEUE = 1 << 31;


// Unsafe類(lèi)對(duì)象:用于直接操作內(nèi)存
private static final sun.misc.Unsafe U;
// ForkJoinTask[]數(shù)組的基準(zhǔn)偏移量
// 使用這個(gè)值+元素的大小可以直接定位一個(gè)內(nèi)存位置
private static final int  ABASE;
// ForkJoinTask[]數(shù)組兩個(gè)元素之間的間距的冪 → log(間距) 底數(shù)為2
private static final int  ASHIFT;
// ctl成員的內(nèi)存偏移量地址
private static final long CTL;
// runState成員的內(nèi)存偏移量地址
private static final long RUNSTATE;
// stealCounter成員的內(nèi)存偏移量地址
private static final long STEALCOUNTER;
// parkBlocker成員的內(nèi)存偏移量地址
private static final long PARKBLOCKER;
// WorkQueue隊(duì)列的top元素偏移量地址
private static final long QTOP;
// WorkQueue隊(duì)列的qlock偏移量地址
private static final long QLOCK;
// WorkQueue隊(duì)列的scanState偏移量地址
private static final long QSCANSTATE;
// WorkQueue隊(duì)列的parker偏移量地址
private static final long QPARKER;
// WorkQueue隊(duì)列的CurrentSteal偏移量地址
private static final long QCURRENTSTEAL;
// WorkQueue隊(duì)列的CurrentJoin偏移量地址
private static final long QCURRENTJOIN;

如上既是ForkJoinPool中的所有成員變量鸳兽,總的可以分為四類(lèi):

  • ForkJoinPool線(xiàn)程池運(yùn)行過(guò)程中的成員結(jié)構(gòu)
  • ②通用線(xiàn)程池common以及所有線(xiàn)程池對(duì)象的默認(rèn)配置
  • ③線(xiàn)程池運(yùn)行狀態(tài)以及ctl成員的位存儲(chǔ)記錄
  • ④直接操作內(nèi)存的Unsafe相關(guān)成員及內(nèi)存偏移量

具體的解釋可以參考我源碼中的注釋?zhuān)覀兙吞糁攸c(diǎn)說(shuō),先來(lái)說(shuō)說(shuō)核心成員:ctl罕拂,這個(gè)成員比較特殊揍异,我估計(jì)DougLea在設(shè)計(jì)它的時(shí)候是抱著"一分錢(qián)掰成兩份用"的心思寫(xiě)的,它是一個(gè)long類(lèi)型的成員爆班,占位8byte/64bit衷掷,是不是有些疑惑?用int類(lèi)型不是更省內(nèi)存嘛柿菩?如果你這樣想戚嗅,那你就錯(cuò)了。因?yàn)?code>ctl不是只存儲(chǔ)一個(gè)數(shù)據(jù)枢舶,而是同時(shí)記錄四個(gè)懦胞,64Bit被拆分為四個(gè)16位的子字段,分別記錄:

  • ①1-16bit/AC:記錄池中的活躍線(xiàn)程數(shù)
  • ②17-32bit/TC:記錄池中總線(xiàn)程數(shù)
  • ③33-48bit/SS:記錄WorkQueue狀態(tài)凉泄,第一位表示active還是inactive躏尉,其余15位表示版本號(hào)(避免ABA問(wèn)題)
  • ④49-64bit/ID:記錄第一個(gè)WorkQueue在數(shù)組中的下標(biāo),和其他worker通過(guò)字段stackPred組成的一個(gè)Treiber堆棧
  • 低32位可以直接獲取旧困,如SP=(int)ctl醇份,如果為負(fù)則代表存在空閑worker

ok~,了解了ctl成員之后吼具,再來(lái)看看runState成員僚纷,它和上篇分析的線(xiàn)程池中的runState不同,它除開(kāi)用于表示線(xiàn)程池的運(yùn)行狀態(tài)之外拗盒,同時(shí)也作為鎖資源保護(hù)WorkQueues[]數(shù)組的更新怖竭。

2.2.3.3、ForkJoinPool內(nèi)部類(lèi)WorkQueue工作隊(duì)列

WorkQueue是整個(gè)Fork/Join框架的橋接者陡蝇,每個(gè)執(zhí)行者ForkJoinWorkerThread對(duì)象中存在各自的工作隊(duì)列痊臭,F(xiàn)orkJoinTask被存儲(chǔ)在工作隊(duì)列中哮肚,而ForkJoinPool使用一個(gè)WorkQueue數(shù)組管理協(xié)調(diào)所有執(zhí)行者線(xiàn)程的隊(duì)列。接著再看看WorkQueue工作隊(duì)列的實(shí)現(xiàn):

// 使用@Contended防止偽共享
@sun.misc.Contended
static final class WorkQueue {
    // 隊(duì)列可存放任務(wù)數(shù)的初始容量:8192
    // 初始化時(shí)分配這么大容量的原因是為了減少哈希沖突導(dǎo)致的擴(kuò)容
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
    //  隊(duì)列可存放任務(wù)數(shù)的最大容量
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    // 隊(duì)列的掃描狀態(tài)广匙。高十六位用于版本計(jì)數(shù)允趟,低16位用于記錄掃描狀態(tài)
    // 偶數(shù)位的WorkQueue,該值為負(fù)數(shù)時(shí)表示不活躍
    // 奇數(shù)位的WorkQueue鸦致,該值一般為workQueues數(shù)組中的下標(biāo)值潮剪,表示當(dāng)前線(xiàn)程在執(zhí)行
    // 如果scanState為負(fù)數(shù),代表線(xiàn)程沒(méi)有找到任務(wù)執(zhí)行分唾,被掛起了抗碰,處于不活躍狀態(tài)
    // 如果scanState是奇數(shù),代表線(xiàn)程在尋找任務(wù)過(guò)程中绽乔,如果變成了偶數(shù)弧蝇,代表線(xiàn)程在執(zhí)行任務(wù)
    volatile int scanState;
    // 前一個(gè)棧頂?shù)腸tl值
    int stackPred;
    // 竊取任務(wù)的數(shù)量統(tǒng)計(jì)
    int nsteals;
    // 用于記錄隨機(jī)選擇竊取任務(wù),被竊取任務(wù)workQueue在數(shù)組中的下標(biāo)值
    int hint;
    // 記錄當(dāng)前隊(duì)列在數(shù)組中的下標(biāo)和工作模式
    // 高十六位記錄工作模式折砸,低十六位記錄數(shù)組下標(biāo)
    int config;
    // 鎖標(biāo)識(shí)(類(lèi)似于AQS是state鎖標(biāo)識(shí)) 
    // 為1表示隊(duì)列被鎖定看疗,為0表示未鎖定
    // 小于0代表當(dāng)前隊(duì)列注銷(xiāo)或線(xiàn)程池關(guān)閉(terminate狀態(tài)時(shí)為-1)
    volatile int qlock;
    // 下一個(gè)pool操作的索引值(棧底/隊(duì)列頭部)
    volatile int base;
    // 下一個(gè)push操作的索引值(棧頂/隊(duì)列尾部)
    int top;
    // 存放任務(wù)的數(shù)組,初始化時(shí)不會(huì)分配空間睦授,采用懶加載形式初始化空間
    ForkJoinTask<?>[] array;
    // 所屬線(xiàn)程池的引用指向
    final ForkJoinPool pool;
    // 當(dāng)前隊(duì)列所屬線(xiàn)程的引用鹃觉,如果為外部提交任務(wù)的共享隊(duì)列則為null
    final ForkJoinWorkerThread owner;
    // 線(xiàn)程在執(zhí)行過(guò)程中如果被掛起阻塞,該成員保存被掛起的線(xiàn)程睹逃,否則為空
    volatile Thread parker;
    // 等待join合并的任務(wù)
    volatile ForkJoinTask<?> currentJoin;
    // 竊取過(guò)來(lái)的任務(wù)
    volatile ForkJoinTask<?> currentSteal;

    // 構(gòu)造器
    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // 開(kāi)始的時(shí)候都指向棧頂
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }
    
    /* 重點(diǎn)方法 */
    // 添加方法:將一個(gè)任務(wù)添加進(jìn)隊(duì)列中
    // 注意:索引不是通過(guò)數(shù)組下標(biāo)計(jì)算的,而是通過(guò)計(jì)算內(nèi)存偏移量定位
    final void push(ForkJoinTask<?> task);
    // 擴(kuò)容方法:隊(duì)列元素?cái)?shù)量達(dá)到容量時(shí)祷肯,擴(kuò)容兩倍并移動(dòng)元素到新數(shù)組
    final ForkJoinTask<?>[] growArray();
    // 獲取方法:從棧頂(LIFO)彈出一個(gè)任務(wù)
    final ForkJoinTask<?> pop();
    // 獲取方法:從棧底(FIFO)彈出一個(gè)任務(wù)
    final ForkJoinTask<?> poll();
}

WorkQueue內(nèi)部采用一個(gè)數(shù)組存儲(chǔ)所有分配的任務(wù)沉填,線(xiàn)程執(zhí)行時(shí)會(huì)從該隊(duì)列中獲取任務(wù),如果數(shù)組為空佑笋,那么則會(huì)嘗試竊取其他線(xiàn)程的任務(wù)翼闹。

至此,結(jié)合前面談到的ForkJoinPool線(xiàn)程池的結(jié)構(gòu)一同理解蒋纬,在ForkJoinPool中存在一個(gè)由WorkQueue構(gòu)成的數(shù)組成員workQueues猎荠,而在每個(gè)WorkQueue中又存在一個(gè)ForkJoinTask構(gòu)成的數(shù)組成員array,所以Fork/Join框架中存儲(chǔ)任務(wù)的結(jié)構(gòu)如下:

ForkJoin任務(wù)存儲(chǔ)結(jié)構(gòu)

  • 重點(diǎn):
    • workQueues數(shù)組的容量必須為2的整次冪蜀备。下標(biāo)為偶數(shù)的用于存儲(chǔ)外部提交的任務(wù)关摇,奇數(shù)位置存儲(chǔ)內(nèi)部fork出的子任務(wù)
    • 偶數(shù)位置的任務(wù)屬于共享任務(wù),由工作線(xiàn)程競(jìng)爭(zhēng)獲取碾阁,模式為FIFO
    • 奇數(shù)位置的任務(wù)屬于某個(gè)工作線(xiàn)程输虱,一般是fork產(chǎn)生的子任務(wù)
    • 工作線(xiàn)程在處理完自身任務(wù)時(shí)會(huì)竊取其他線(xiàn)程的任務(wù),竊取方式為FIFO
    • 工作線(xiàn)程執(zhí)行自己隊(duì)列中任務(wù)的模式默認(rèn)為L(zhǎng)IFO(可以改成FIFO脂凶,不推薦)

關(guān)于ForkJoinWorkerThreadFactory線(xiàn)程工廠以及ManagedBlocker就不再闡述了宪睹,有興趣的可以自己去參考上篇文章研究一下愁茁。

三、ForkJoin框架任務(wù)提交原理

在前面給出的ForkJoin使用案例中亭病,我們使用invoke()方法將自己定義的任務(wù)提交給了ForkJoinPool線(xiàn)程池執(zhí)行鹅很。前面曾提到過(guò),提交任務(wù)的方式有三種:invoke()罪帖、execute()以及submit()促煮,但它們?nèi)N方式的最終實(shí)現(xiàn)都大致相同,所以我們從invoke()方法開(kāi)始胸蛛,以它作為入口分析ForkJoin框架任務(wù)提交任務(wù)的原理實(shí)現(xiàn)污茵。源碼如下:

// ForkJoinPool類(lèi) → invoke()方法
public <T> T invoke(ForkJoinTask<T> task) {
    // 如果任務(wù)為空,拋出空指針異常
    if (task == null)
        throw new NullPointerException();
    // 如果不為空則提交任務(wù)執(zhí)行
    externalPush(task);
    // 等待任務(wù)執(zhí)行結(jié)果返回
    return task.join();
}

// ForkJoinPool類(lèi) → externalPush()方法
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    // 獲取線(xiàn)程的探針哈希值以及線(xiàn)程池運(yùn)行狀態(tài)
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    // 判斷線(xiàn)程池是否具備了任務(wù)提交的環(huán)境
    // 如果工作隊(duì)列數(shù)組已經(jīng)初始化
    //      并且數(shù)組以及數(shù)組中偶數(shù)位的工作隊(duì)列不為空
    //      并且線(xiàn)程池狀態(tài)正常
    //      并且獲取隊(duì)列鎖成功
    // 滿(mǎn)足條件則開(kāi)始提交任務(wù)
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        // 判斷隊(duì)列中的任務(wù)數(shù)組是否初始化并且數(shù)組是否還有空位
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            // 通過(guò)計(jì)算內(nèi)存偏移量得到任務(wù)要被的存儲(chǔ)索引
            int j = ((am & s) << ASHIFT) + ABASE;
            // 通過(guò)Unsafe類(lèi)將任務(wù)寫(xiě)入到數(shù)組中
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            // 如果隊(duì)列任務(wù)很多
            if (n <= 1)
                // 喚醒或者新啟一條線(xiàn)程幫忙處理
                signalWork(ws, q);
            return;
        }
        // 釋放隊(duì)列鎖
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    // 提交執(zhí)行
    externalSubmit(task);
}

// ForkJoinPool類(lèi) → externalSubmit()方法
private void externalSubmit(ForkJoinTask<?> task) {
    int r; 
    // 如果當(dāng)前提交任務(wù)的線(xiàn)程的探針哈希值為0葬项,
    // 則初始化當(dāng)前線(xiàn)程的探針哈希值
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    // 開(kāi)啟死循環(huán)直至成功提交任務(wù)為止
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        // 定義競(jìng)爭(zhēng)標(biāo)識(shí)
        boolean move = false;
        // 如果runState小于0代表為負(fù)數(shù)泞当,代表線(xiàn)程池已經(jīng)要關(guān)閉了
        if ((rs = runState) < 0) {
            // 嘗試關(guān)閉線(xiàn)程池
            tryTerminate(false, false);     // help terminate
            // 線(xiàn)程池關(guān)閉后同時(shí)拋出異常
            throw new RejectedExecutionException();
        }
        // 如果線(xiàn)程池還未初始化,先對(duì)線(xiàn)程池進(jìn)行初始化操作
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            // 獲取池鎖民珍,沒(méi)獲取鎖的線(xiàn)程則會(huì)自旋或者阻塞掛起
            rs = lockRunState();
            try {
                // 再次檢測(cè)是否已初始化
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // 獲取并行數(shù)
                    int p = config & SMASK; 
                    // 通過(guò)如下計(jì)算得到最接近2次冪的值
                    // 找到之后對(duì)該值 * 2倍
                    // 原理:將p中最高位的那個(gè)1以后的位都設(shè)置為1襟士,
                    // 最后加1得到最接近的二次冪的值
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                // 釋放鎖,并更改運(yùn)行狀態(tài)為STARTED
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        // r:隨機(jī)值,m:工作隊(duì)列的容量減1,SQMASK:偶數(shù)位最大的64個(gè)的掩碼
        // r&m計(jì)算出了下標(biāo)嚷量,位與SQMASK之后會(huì)變成一個(gè)<=126的偶數(shù)
        // 如果隨機(jī)出來(lái)的偶數(shù)位下標(biāo)位置隊(duì)列不為空
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            // 先獲取隊(duì)列鎖
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                // 獲取隊(duì)列的任務(wù)數(shù)組
                ForkJoinTask<?>[] a = q.array;
                // 記錄隊(duì)列原本的棧頂/隊(duì)列尾部的數(shù)組下標(biāo)
                int s = q.top;
                // 提交標(biāo)識(shí)
                boolean submitted = false;
                try {
                    // 如果數(shù)組不為空并且數(shù)組中還有空位
                    // (a.length > s+1-q.base如果不成立則代表空位不足)
                    // 隊(duì)列元素?cái)?shù)量達(dá)到容量陋桂,沒(méi)有空位時(shí)調(diào)用growArray進(jìn)行擴(kuò)容
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        // 通過(guò)計(jì)算內(nèi)存偏移量得到棧頂/隊(duì)列尾部位置
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        // 將新的任務(wù)放在棧頂/隊(duì)列尾部位置
                        U.putOrderedObject(a, j, task);
                        // 更新棧頂/隊(duì)列尾部
                        U.putOrderedInt(q, QTOP, s + 1);
                        // 提交標(biāo)識(shí)改為true
                        submitted = true;
                    }
                } finally {
                    // 釋放隊(duì)列鎖
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                // 如果任務(wù)已經(jīng)提交到了工作隊(duì)列
                if (submitted) {
                    // 創(chuàng)建新的線(xiàn)程處理,如果線(xiàn)程數(shù)已滿(mǎn)蝶溶,喚醒線(xiàn)程處理
                    signalWork(ws, q);
                    return;
                }
            }
            // 能執(zhí)行到這里則代表前面沒(méi)有獲取到鎖嗜历,該位置的隊(duì)列有其他線(xiàn)程在操作
            // 將競(jìng)爭(zhēng)標(biāo)識(shí)改為true
            move = true;            
        }
        // 如果隨機(jī)出來(lái)的偶數(shù)下標(biāo)位置的隊(duì)列為空
        // 那么則在該位置上新建工作隊(duì)列,然后將任務(wù)放進(jìn)去
        else if (((rs = runState) & RSLOCK) == 0) {
            // 新建一個(gè)工作隊(duì)列抖所,第二個(gè)參數(shù)是所屬線(xiàn)程
            // 現(xiàn)在創(chuàng)建的第二個(gè)參數(shù)為null梨州,因?yàn)榕紨?shù)位的隊(duì)列是共享的
            q = new WorkQueue(this, null);
            // 隊(duì)列記錄一下前面的隨機(jī)值
            q.hint = r;
            // k是前面計(jì)算出的偶數(shù)位置索引,SHARED_QUEUE是共享隊(duì)列模式
            // 使用高16位存儲(chǔ)隊(duì)列模式田轧,低16位存儲(chǔ)數(shù)組索引
            q.config = k | SHARED_QUEUE;
            // 掃描狀態(tài)為失活狀態(tài)(負(fù)數(shù)暴匠,因?yàn)楣蚕黻?duì)列
            // 不屬于任何一個(gè)工作線(xiàn)程,它不需要標(biāo)記工作線(xiàn)程狀態(tài))
            q.scanState = INACTIVE;
            // 獲取池鎖
            rs = lockRunState();
            // 將新創(chuàng)建的工作隊(duì)列放入數(shù)組中
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;
            // 釋放池鎖
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;
        // 如果計(jì)算出的偶數(shù)位置有其他線(xiàn)程在操作傻粘,為了減少競(jìng)爭(zhēng)每窖,
        // 獲取下一個(gè)隨機(jī)值,重新定位一個(gè)新的位置處理
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

源碼如上弦悉,可以很明顯的看到窒典,流程比較長(zhǎng),總結(jié)一下核心稽莉,如下:

  • ①判斷任務(wù)是否為空崇败,為空拋出異常,不為空則開(kāi)始提交任務(wù)
  • ②調(diào)用externalPush()方法嘗試快速提交任務(wù)
    • 快速提交條件:探針哈希值已初始化、池中隊(duì)列數(shù)組已初始化后室、隨機(jī)的偶數(shù)位置隊(duì)列不為空缩膝、線(xiàn)程池已初始化并狀態(tài)正常、能成功獲取隊(duì)列鎖
    • 如上條件全部成立則快速提交任務(wù)岸霹,提交成功直接返回疾层,結(jié)束執(zhí)行
    • 如果不成立則調(diào)用externalSubmit()提交任務(wù),流程如下述步驟
  • ③初始化線(xiàn)程的探針哈希值并開(kāi)啟死循環(huán)提交任務(wù)(這個(gè)循環(huán)會(huì)直至提交成功才終止)
  • ④檢查線(xiàn)程池狀態(tài)是否正常贡避,如果狀態(tài)為關(guān)閉狀態(tài)則拒絕任務(wù)痛黎,拋出異常
  • ⑤檢查線(xiàn)程池是否已啟動(dòng),如果還未啟動(dòng)則獲取池鎖刮吧,初始化線(xiàn)程池
  • ⑥如果通過(guò)探針值+隊(duì)列數(shù)組容量減一的值+掩碼計(jì)算出的偶數(shù)位隊(duì)列不為空:
    • 嘗試獲取隊(duì)列鎖:
      • 成功:將任務(wù)添加到計(jì)算出的偶數(shù)位隊(duì)列的任務(wù)數(shù)組中湖饱,如過(guò)數(shù)組長(zhǎng)度不夠則先擴(kuò)容,任務(wù)添加成功后新建或喚醒一條線(xiàn)程杀捻,然后返回
      • 失斁帷:代表有其他線(xiàn)程在操作這個(gè)偶數(shù)位置的隊(duì)列,將move標(biāo)識(shí)改為true
  • ⑦如果計(jì)算出的偶數(shù)位置隊(duì)列還未初始化致讥,那么則先嘗試獲取池鎖
    • 成功:在該位置上創(chuàng)建一個(gè)共享隊(duì)列仅仆,最后再釋放池鎖
    • 失敗:代表有其他線(xiàn)程也在操作池中的隊(duì)列數(shù)組垢袱,將move標(biāo)識(shí)改為true
  • ⑧如果move競(jìng)爭(zhēng)標(biāo)識(shí)為true墓拜,代表本次操作存在線(xiàn)程競(jìng)爭(zhēng),為了減少競(jìng)爭(zhēng)请契,重新獲取一個(gè)新的探針哈希值咳榜,計(jì)算出一個(gè)新的偶數(shù)位進(jìn)行操作
  • ⑨當(dāng)任務(wù)第一次執(zhí)行沒(méi)有添加成功時(shí),會(huì)繼續(xù)重復(fù)這些步驟爽锥,直至任務(wù)成功入列
    • 第一次添加失敗的情況:
      • 隊(duì)列存在其他線(xiàn)程操作沒(méi)有獲取到隊(duì)列鎖
      • 計(jì)算出的偶數(shù)索引位的隊(duì)列為空贿衍,第一次執(zhí)行會(huì)先初始化隊(duì)列
  • ⑩任務(wù)成功提交到工作隊(duì)列后,join等待任務(wù)執(zhí)行完成救恨,返回結(jié)果合并

對(duì)于上述步驟中提及到的一些名詞解釋?zhuān)?br> 探針哈希值:Thread類(lèi)threadLocalRandomProbe成員的值,ThreadLocalRandom.getProbe()通過(guò)Unsafe計(jì)算當(dāng)前線(xiàn)程的內(nèi)存偏移量來(lái)獲取
池鎖:runState的RSLOCK狀態(tài)释树,如果要對(duì)池中的隊(duì)列數(shù)組進(jìn)行操作則要先獲取這個(gè)鎖
隊(duì)列鎖:隊(duì)列中的qlock成員肠槽,如果要對(duì)隊(duì)列的任務(wù)數(shù)組進(jìn)行操作則要先獲取這個(gè)鎖
偶數(shù)位隊(duì)列:前面提到過(guò),池中的隊(duì)列數(shù)組workQueues下標(biāo)為偶數(shù)的位置用來(lái)存儲(chǔ)用戶(hù)提交的任務(wù)奢啥,屬于共享隊(duì)列秸仙,不屬于任何一條線(xiàn)程,里面的任務(wù)需要線(xiàn)程競(jìng)爭(zhēng)獲取

OK~桩盲,至此任務(wù)提交的流程分析完畢寂纪,關(guān)于execute()、submit()方法則不再分析,最終實(shí)現(xiàn)都是相同的捞蛋,只是入口不同孝冒。最后再來(lái)個(gè)圖理解一下:

ForkJoin任務(wù)提交流程

四、ForkJoin框架任務(wù)工作原理

在前面的提交原理分析中可以得知拟杉,任務(wù)的執(zhí)行都是通過(guò)調(diào)用signalWork()執(zhí)行的庄涡,而這個(gè)方法會(huì)新創(chuàng)建一條線(xiàn)程處理任務(wù),但當(dāng)線(xiàn)程數(shù)量已經(jīng)達(dá)到線(xiàn)程池的最大線(xiàn)程數(shù)時(shí)搬设,則會(huì)嘗試喚醒一條線(xiàn)程執(zhí)行穴店。下面則以signalWork()做為入口來(lái)分析ForkJoin框架任務(wù)工作原理,先來(lái)看看signalWork()源碼:

// ForkJoinPool類(lèi) → signalWork()方法
final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    // ctl初始化時(shí)為負(fù)數(shù)拿穴,如果ctl<0則代表有任務(wù)需要處理
    while ((c = ctl) < 0L) {
        // sp==0代表不存在空閑的線(xiàn)程
        // 前面分析成員構(gòu)成的時(shí)候提到過(guò):
        //      低32位可以直接獲取泣洞,如SP=(int)ctl,
        //      如果sp為負(fù)則代表存在空閑worker
        if ((sp = (int)c) == 0) {
            // 如果池中線(xiàn)程數(shù)量還未達(dá)到最大線(xiàn)程數(shù)
            if ((c & ADD_WORKER) != 0L)
                // 創(chuàng)建一條新的線(xiàn)程來(lái)處理工作
                tryAddWorker(c);
            // 新建線(xiàn)程完成后退出循環(huán)并返回
            break;
        }
        // 下面這三個(gè)判斷都是在檢測(cè)線(xiàn)程池狀態(tài)是否正常
        // 因?yàn)閟ignalWork只能框架內(nèi)部調(diào)用默色,所以傳入的隊(duì)列不可能為空球凰,
        // 除非是處于unstarted/terminated狀態(tài),代表線(xiàn)程池即將關(guān)閉该窗,
        // 嘗試中斷未執(zhí)行任務(wù)弟蚀,直接清空了任務(wù),所以此時(shí)直接中斷執(zhí)行
        if (ws == null)
            break;
        if (ws.length <= (i = sp & SMASK)) 
            break;
        if ((v = ws[i]) == null)
            break;
        // 下述代碼是獲取所有阻塞線(xiàn)程鏈中的top線(xiàn)程并喚醒它
        // 但是在喚醒之前需要先把top線(xiàn)程的stackPerd標(biāo)識(shí)放在ctl中
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        // 利用CAS機(jī)制修改ctl值
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;
            if ((p = v.parker) != null)
                // 喚醒線(xiàn)程
                U.unpark(p);
            // 喚醒后退出
            break;
        }
        // 如果隊(duì)列為空或者沒(méi)有task酗失,退出執(zhí)行
        if (q != null && q.base == q.top)
            break;
    }
}
// ForkJoinPool類(lèi) → tryAddWorker()方法
private void tryAddWorker(long c) {
    // 定義新增標(biāo)識(shí)
    boolean add = false;
    do {
        // 添加活躍線(xiàn)程數(shù)和總線(xiàn)程數(shù)
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        // 如果ctl值沒(méi)有被其他線(xiàn)程修改
        if (ctl == c) {
            int rs, stop; 
            // 獲取鎖并檢測(cè)線(xiàn)程池狀態(tài)是否正常
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                // 只有當(dāng)線(xiàn)程池沒(méi)有停止才可以創(chuàng)建線(xiàn)程
                add = U.compareAndSwapLong(this, CTL, c, nc);
            // 釋放池鎖
            unlockRunState(rs, rs & ~RSLOCK);
            // 如果線(xiàn)程池狀態(tài)已經(jīng)stop义钉,那么則退出執(zhí)行
            if (stop != 0)
                break;
            // 如果沒(méi)有stop
            if (add) {
                // 則新建線(xiàn)程
                createWorker();
                // 退出
                break;
            }
        }
    // ADD_WORKER的第48位是1,和ctl位與運(yùn)算是為了檢查總線(xiàn)程是否已滿(mǎn)
    // (int)c == 0代表池中不存在空閑線(xiàn)程數(shù)
    // 只有當(dāng)總線(xiàn)程數(shù)未滿(mǎn)時(shí)以及池中不存在空閑線(xiàn)程數(shù)才會(huì)創(chuàng)建線(xiàn)程
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

如上兩個(gè)方法的邏輯比較簡(jiǎn)單:

  • ①如果有任務(wù)需要處理并且池中目前不存在空閑線(xiàn)程并且池中線(xiàn)程還未滿(mǎn)规肴,調(diào)用tryAddWorker()方法嘗試創(chuàng)建線(xiàn)程
  • ②獲取池鎖更改ctl值并檢測(cè)線(xiàn)程池的狀態(tài)是否正常捶闸,正常則調(diào)用createWorker()創(chuàng)建線(xiàn)程
  • ③ryAddWorker()是一個(gè)自旋方法,在池中線(xiàn)程數(shù)未滿(mǎn)且沒(méi)有出現(xiàn)空閑線(xiàn)程的情況下拖刃,會(huì)一直循環(huán)至成功創(chuàng)建線(xiàn)程或者池關(guān)閉
  • ④如果池中存在空閑線(xiàn)程或者線(xiàn)程數(shù)已滿(mǎn)删壮,那么則會(huì)嘗試喚醒阻塞鏈上的第一條線(xiàn)程

4.1、工作線(xiàn)程創(chuàng)建及注冊(cè)原理

接著繼續(xù)看看createWorker()方法:

// ForkJoinPool類(lèi) → createWorker()方法
private boolean createWorker() {
    // 獲取池中的線(xiàn)程工廠
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        // 通過(guò)線(xiàn)程工廠的newThread方法創(chuàng)建一條新線(xiàn)程
        if (fac != null && (wt = fac.newThread(this)) != null) {
            // 創(chuàng)建成功后返回true
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        // 如果出現(xiàn)異常則記錄異常信息
        ex = rex;
    }
    // 然后注銷(xiāo)線(xiàn)程以及將之前tryAddWorker()方法中修改的ctl值改回去
    deregisterWorker(wt, ex);
    return false;
}

// DefaultForkJoinWorkerThreadFactory類(lèi) → newThread()方法
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    // 直接創(chuàng)建了一條工作線(xiàn)程
    return new ForkJoinWorkerThread(pool);
}

創(chuàng)建工作線(xiàn)程的源碼比較簡(jiǎn)單兑牡,首先會(huì)獲取池中采用的線(xiàn)程工廠央碟,然后通過(guò)線(xiàn)程工廠創(chuàng)建一條ForkJoinWorkerThread工作線(xiàn)程。ok均函,再回到最開(kāi)始的ForkJoin框架成員構(gòu)成分析中的ForkJoinWorkerThread構(gòu)造函數(shù):

// ForkJoinWorkerThread類(lèi) → 構(gòu)造函數(shù)
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // 調(diào)用Thread父類(lèi)的構(gòu)造函數(shù)創(chuàng)建線(xiàn)程實(shí)體對(duì)象
    // 在這里是先暫時(shí)使用aFJT作為線(xiàn)程名稱(chēng)亿虽,當(dāng)外部傳遞線(xiàn)程名稱(chēng)時(shí)會(huì)替換
    super("aForkJoinWorkerThread");
    // 當(dāng)前設(shè)置線(xiàn)程池
    this.pool = pool;
    // 向ForkJoinPool線(xiàn)程池中注冊(cè)當(dāng)前線(xiàn)程,為當(dāng)前線(xiàn)程分配任務(wù)隊(duì)列
    this.workQueue = pool.registerWorker(this);
}

// ForkJoinPool類(lèi) → registerWorker()方法
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    // 異常策略
    UncaughtExceptionHandler handler;
    // 設(shè)置為守護(hù)線(xiàn)程
    wt.setDaemon(true);
    // 為創(chuàng)建的線(xiàn)程設(shè)置異常匯報(bào)策略
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    // 為創(chuàng)建的線(xiàn)程分配任務(wù)隊(duì)列
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;   // 池中隊(duì)列數(shù)組的索引
    int mode = config & MODE_MASK; // 獲取隊(duì)列模式
    int rs = lockRunState(); // 獲取池鎖
    try {
        WorkQueue[] ws; int n;   // skip if no array
        // 如果池中隊(duì)列數(shù)組不為空并且已經(jīng)初始化
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            // 獲取用于計(jì)算數(shù)組下標(biāo)隨機(jī)的索引種子
            int s = indexSeed += SEED_INCREMENT;
            int m = n - 1; // 獲取隊(duì)列數(shù)組的最大索引值
            // 計(jì)算出一個(gè)奇數(shù)位索引
            // 與1位或苞也,就是將第1個(gè)bit位設(shè)為1洛勉,此時(shí)這個(gè)數(shù)必然是奇數(shù)
            // 與m位與,為了保證得到的值是在m以?xún)?nèi)奇數(shù)下標(biāo)值
            i = ((s << 1) | 1) & m; 
            // 如果計(jì)算出的位置不為空則代表已經(jīng)有隊(duì)列了如迟,
            // 代表此時(shí)發(fā)生了碰撞沖突收毫,那么此時(shí)則需要換個(gè)位置
            if (ws[i] != null) {
                int probes = 0;
                // 計(jì)算步長(zhǎng)攻走,步長(zhǎng)是一個(gè)不能為2以及2次冪值的偶數(shù)
                // 為了保證計(jì)算出的值不為2的次冪值,會(huì)在最后進(jìn)行+2操作
                // 后續(xù)會(huì)用原本的索引值+步長(zhǎng)得到一個(gè)新的奇數(shù)索引值
                // 奇數(shù)+偶數(shù)=奇數(shù)此再,所以不需要擔(dān)心會(huì)成為偶數(shù)位索引
                // 這里計(jì)算步長(zhǎng)是通過(guò)長(zhǎng)度n來(lái)計(jì)算的昔搂,
                // 因?yàn)椴介L(zhǎng)大一些,避免沖突的概念就會(huì)小一些
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                // 如果新計(jì)算出的奇數(shù)位索引位置依舊不為空
                while (ws[i = (i + step) & m] != null) {
                    // 從下標(biāo)0開(kāi)始遍歷整個(gè)數(shù)組
                    if (++probes >= n) {
                        // 如果所有奇數(shù)位值都不為空引润,代表數(shù)組滿(mǎn)了巩趁,
                        // 那么擴(kuò)容兩倍,擴(kuò)容后重新再遍歷一次新數(shù)組
                        // 直至找出為空的奇數(shù)位下標(biāo)
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            // 記錄這個(gè)隨機(jī)出來(lái)的索引種子
            w.hint = s;          // use as random seed
            // 將前面計(jì)算得到的奇數(shù)位索引值以及工作模式記錄在config
            w.config = i | mode;
            // 掃描狀態(tài)隊(duì)列在數(shù)組中的下標(biāo)淳附,為正數(shù)表示正在掃描任務(wù)狀態(tài)
            w.scanState = i;    // publication fence
            // 將前面創(chuàng)建的隊(duì)列放在隊(duì)列數(shù)組的i位置上
            ws[i] = w;
        }
    } finally {
        // 釋放池鎖
        unlockRunState(rs, rs & ~RSLOCK);
    }
    // 在這里再設(shè)置線(xiàn)程的名稱(chēng)
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

工作線(xiàn)程的創(chuàng)建與注冊(cè)原理:

  • ①將線(xiàn)程設(shè)置為守護(hù)線(xiàn)程议慰,同時(shí)為新線(xiàn)程創(chuàng)建工作隊(duì)列和設(shè)置異常處理策略
  • ②嘗試獲取池鎖成功后,先獲取一個(gè)隨機(jī)生成的用于計(jì)算數(shù)組下標(biāo)的索引種子奴曙,然后通過(guò)種子和數(shù)組最大下標(biāo)計(jì)算出一個(gè)奇數(shù)索引值
  • ③如果計(jì)算出的奇數(shù)位值不為空别凹,則通過(guò)偶數(shù)掩碼+數(shù)組最大下標(biāo)計(jì)算出一個(gè)偶數(shù)步長(zhǎng),然后通過(guò)這個(gè)步長(zhǎng)循環(huán)整個(gè)數(shù)組找一個(gè)空的位置洽糟,如果找完了整個(gè)數(shù)組還是沒(méi)有奇數(shù)空位炉菲,則對(duì)數(shù)組發(fā)生兩倍擴(kuò)容,然后再次依照步長(zhǎng)遍歷新數(shù)組找空位坤溃,直至找到奇數(shù)空位為止
  • ④為隊(duì)列設(shè)置hint拍霜、config、scanState值并將隊(duì)列放到計(jì)算出的奇數(shù)位置上
  • ⑤釋放池鎖并設(shè)置工作線(xiàn)程名字

工作線(xiàn)程注冊(cè)的原理實(shí)則不難理解薪介,難點(diǎn)在于計(jì)算奇數(shù)位索引有些玄妙祠饺,不理解的小伙伴可以看看下面這個(gè)案例:

// 模擬線(xiàn)程池中的隊(duì)列數(shù)組和EVENMASK偶數(shù)掩碼值
private static final int EVENMASK = 0xfffe;
private static Object[] ws = new Object[8];

public static void main(String[] args) {
    // 先將所有位置填滿(mǎn)
    for (int i = 0; i < ws.length; i++) {
        ws[i] = new Object();
    }
    // 然后開(kāi)始查找
    findOddNumberIdenx();
}

 private static void findOddNumberIdenx() {
    int n, m, i, probes;
    m = (n = ws.length) - 1;
    // 模擬第一次計(jì)算出的奇數(shù)位索引為3
    i = 3;
    probes = 0;
    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
    while (ws[i = (i + step) & m] != null) {
        System.out.println("查找奇數(shù)位:" + i);
        if (++probes >= n) {
            System.out.println("擴(kuò)容兩倍....");
            ws = Arrays.copyOf(ws, n <<= 1);
            m = n - 1;
            probes = 0;
        }
    }
     System.out.println("最終確定索引值:" + i);
}

/*
 運(yùn)行結(jié)果:
    查找奇數(shù)位:1
    查找奇數(shù)位:7
    查找奇數(shù)位:5
    查找奇數(shù)位:3
    查找奇數(shù)位:1
    查找奇數(shù)位:7
    查找奇數(shù)位:5
    查找奇數(shù)位:3
    擴(kuò)容兩倍....
    最終確定索引值:9
*/

4.2、工作線(xiàn)程注銷(xiāo)原理

// ForkJoinPool類(lèi) → deregisterWorker()方法
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    // 如果工作線(xiàn)程以及它的工作隊(duì)列不為空
    if (wt != null && (w = wt.workQueue) != null) {
        WorkQueue[] ws;
        // 獲取隊(duì)列在池中數(shù)組的下標(biāo)
        int idx = w.config & SMASK;
        // 獲取池鎖
        int rs = lockRunState();
        // 移除隊(duì)列數(shù)組中idx位置的隊(duì)列
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;
        // 釋放池鎖
        unlockRunState(rs, rs & ~RSLOCK);
    }
    long c;
    // 在CTL成員中減去一個(gè)線(xiàn)程數(shù)
    do {} while (!U.compareAndSwapLong
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (SP_MASK & c))));
    if (w != null) {
        // 標(biāo)識(shí)這個(gè)隊(duì)列已停止工作
        w.qlock = -1; 
        // 將當(dāng)前工作隊(duì)列的偷取任務(wù)數(shù)加到ForkJoinPool#stealCounter中
        w.transferStealCount(this);
        // 取消隊(duì)列中剩余的任務(wù)
        w.cancelAll();
    }
    for (;;) {                                  
        WorkQueue[] ws; int m, sp;
        // 如果線(xiàn)程池是要關(guān)閉了汁政,那么直接退出
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)    
            break;
        // 如果線(xiàn)程池不是要關(guān)閉道偷,那么先通過(guò)ctl看看有沒(méi)有阻塞的線(xiàn)程
        if ((sp = (int)(c = ctl)) != 0) {  
            // 如果有則喚醒它來(lái)代替被銷(xiāo)毀的線(xiàn)程工作
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        // 如果池中不存在阻塞掛起的線(xiàn)程,則先判斷池內(nèi)線(xiàn)程是否已滿(mǎn)
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            // 如果沒(méi)滿(mǎn)則新建一條線(xiàn)程代替被銷(xiāo)毀的線(xiàn)程工作
            tryAddWorker(c);                  
            break;
        }
        // 如果池運(yùn)行正常记劈,不存在線(xiàn)程阻塞勺鸦,線(xiàn)程數(shù)已滿(mǎn)
        else
            // 那么直接退出
            break;
    }
    if (ex == null) 
        // 清理異常哈希表中當(dāng)前線(xiàn)程的異常節(jié)點(diǎn)信息
        ForkJoinTask.helpExpungeStaleExceptions();
    else
        // 拋出異常
        ForkJoinTask.rethrow(ex);
}

線(xiàn)程注銷(xiāo)的邏輯相對(duì)比較簡(jiǎn)單,如下:

  • ①獲取池鎖之后將工作線(xiàn)程的任務(wù)隊(duì)列從數(shù)組中移除目木,移除后釋放池鎖
  • ②將偷竊的任務(wù)數(shù)加到stealCounter成員换途,然后取消自身隊(duì)列中的所有任務(wù)
  • ③判斷當(dāng)前線(xiàn)程池的情況,判斷當(dāng)前銷(xiāo)毀線(xiàn)程是否是因?yàn)榫€(xiàn)程池要關(guān)閉了:
    • 如果是:直接退出
    • 如果不是:再判斷池中是否存在掛起阻塞的線(xiàn)程
      • 存在:?jiǎn)拘炎枞€(xiàn)程來(lái)代替被銷(xiāo)毀的線(xiàn)程工作
      • 不存在:判斷池中線(xiàn)程是否已滿(mǎn)
        • 沒(méi)滿(mǎn):新建一條線(xiàn)程代替被銷(xiāo)毀的線(xiàn)程工作
        • 滿(mǎn)了:直接退出
  • ④清除異常哈希表中當(dāng)前線(xiàn)程的異常節(jié)點(diǎn)信息刽射,然后拋出異常

總的來(lái)說(shuō)军拟,在銷(xiāo)毀線(xiàn)程時(shí),會(huì)先注銷(xiāo)已注冊(cè)的工作隊(duì)列柄冲,注銷(xiāo)之后會(huì)根據(jù)情況選擇喚醒或新建一條線(xiàn)程來(lái)補(bǔ)償線(xiàn)程池。

至此整個(gè)篇幅就較長(zhǎng)了忠蝗,關(guān)于工作線(xiàn)程的執(zhí)行现横、任務(wù)結(jié)果的合并以及任務(wù)竊取的實(shí)現(xiàn)原理則在下篇中為大家分析:(十二)《徹悟并發(fā)之JUC分治思想產(chǎn)物-ForkJoin分支合并框架原理剖析下篇》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子戒祠,更是在濱河造成了極大的恐慌骇两,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,013評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件姜盈,死亡現(xiàn)場(chǎng)離奇詭異低千,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)馏颂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)示血,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人救拉,你說(shuō)我怎么就攤上這事难审〉溃” “怎么了暑脆?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,370評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵往枷,是天一觀的道長(zhǎng)牺氨。 經(jīng)常有香客問(wèn)我实柠,道長(zhǎng)傻丝,這世上最難降的妖魔是什么粗悯? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,168評(píng)論 1 278
  • 正文 為了忘掉前任巾兆,我火速辦了婚禮蒂萎,結(jié)果婚禮上秆吵,老公的妹妹穿的比我還像新娘。我一直安慰自己岖是,他們只是感情好帮毁,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,153評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著豺撑,像睡著了一般烈疚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上聪轿,一...
    開(kāi)封第一講書(shū)人閱讀 48,954評(píng)論 1 283
  • 那天爷肝,我揣著相機(jī)與錄音,去河邊找鬼陆错。 笑死灯抛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的音瓷。 我是一名探鬼主播对嚼,決...
    沈念sama閱讀 38,271評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼绳慎!你這毒婦竟也來(lái)了纵竖?” 一聲冷哼從身側(cè)響起漠烧,我...
    開(kāi)封第一講書(shū)人閱讀 36,916評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎靡砌,沒(méi)想到半個(gè)月后已脓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,382評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡通殃,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,877評(píng)論 2 323
  • 正文 我和宋清朗相戀三年度液,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片画舌。...
    茶點(diǎn)故事閱讀 37,989評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡堕担,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出骗炉,到底是詐尸還是另有隱情照宝,我是刑警寧澤,帶...
    沈念sama閱讀 33,624評(píng)論 4 322
  • 正文 年R本政府宣布句葵,位于F島的核電站厕鹃,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏乍丈。R本人自食惡果不足惜剂碴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,209評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望轻专。 院中可真熱鬧忆矛,春花似錦、人聲如沸请垛。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,199評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)宗收。三九已至漫拭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間混稽,已是汗流浹背采驻。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,418評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留匈勋,地道東北人礼旅。 一個(gè)月前我還...
    沈念sama閱讀 45,401評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像洽洁,于是被迫代替她去往敵國(guó)和親痘系。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,700評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容