Java技術(shù)專題「并發(fā)編程專題」Fork/Join框架基本使用和原理探究(基礎(chǔ)篇)

前提概述

Java 7開始引入了一種新的Fork/Join線程池夫啊,它可以執(zhí)行一種特殊的任務(wù):把一個大任務(wù)拆成多個小任務(wù)并行執(zhí)行刊苍。

我們舉個例子:如果要計(jì)算一個超大數(shù)組的和,最簡單的做法是用一個循環(huán)在一個線程內(nèi)完成:

算法原理介紹

相信大家此前或多或少有了解到ForkJoin框架,F(xiàn)orkJoin框架其實(shí)就是一個線程池ExecutorService的實(shí)現(xiàn),通過工作竊取(work-stealing)算法,獲取其他線程中未完成的任務(wù)來執(zhí)行移必。可以充分利用機(jī)器的多處理器優(yōu)勢毡鉴,利用空閑的線程去并行快速完成一個可拆分為小任務(wù)的大任務(wù)崔泵,類似于分治算法

實(shí)現(xiàn)達(dá)成目標(biāo)

  • ForkJoin的目標(biāo)猪瞬,就是利用所有可用的處理能力來提高程序的響應(yīng)和性能憎瘸。本文將介紹ForkJoin框架,依次介紹基礎(chǔ)特性陈瘦、案例使用幌甘、源碼剖析和實(shí)現(xiàn)亮點(diǎn)。

  • java.util.concurrent.ForkJoinPool由Java大師Doug Lea主持編寫痊项,它可以將一個大的任務(wù)拆分成多個子任務(wù)進(jìn)行并行處理锅风,最后將子任務(wù)結(jié)果合并成最后的計(jì)算結(jié)果,并進(jìn)行輸出鞍泉。

基本使用

入門例子皱埠,用Fork/Join框架使用示例,在這個示例中我們計(jì)算了1-5000累加后的值

public class TestForkAndJoinPlus {
    private static final Integer MAX = 400;
    static class WorkTask extends RecursiveTask<Integer> {
        // 子任務(wù)開始計(jì)算的值
        private Integer startValue;
        // 子任務(wù)結(jié)束計(jì)算的值
        private Integer endValue;
        public WorkTask(Integer startValue , Integer endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }
        @Override
        protected Integer compute() {
            // 如果小于最小分片閾值咖驮,則說明要進(jìn)行相關(guān)的數(shù)據(jù)操作
            // 可以正式進(jìn)行累加計(jì)算了
            if(endValue - startValue < MAX) {
                System.out.println("開始計(jì)算的部分:startValue = " + startValue + ";endValue = " + endValue);
                Integer totalValue = 0;
                for(int index = this.startValue ; index <= this.endValue  ; index++) {
                    totalValue += index;
                }
                return totalValue;
            }
            // 否則再進(jìn)行任務(wù)拆分边器,拆分成兩個任務(wù)
            else {
                 // 因?yàn)椴捎枚址ㄑ凳啵鸱郑赃M(jìn)行1/2切分?jǐn)?shù)據(jù)量
                WorkTask subTask1 = new WorkTask(startValue, (startValue + endValue) / 2);
                subTask1.fork();//進(jìn)行拆分機(jī)制控制
                WorkTask subTask2 = new WorkTask((startValue + endValue) / 2 + 1 , endValue);
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }
    public static void main(String[] args) {
        // 這是Fork/Join框架的線程池
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture =  pool.submit(new MyForkJoinTask(1,1001));
        try {
            Integer result = taskFuture.get();
            System.out.println("result = " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
    }
}

對此我封裝了一個框架集合忘巧,基于JDK1.8+中的Fork/Join框架實(shí)現(xiàn)恒界,參考的Fork/Join框架主要源代碼也基于JDK1.8+。

WorkTaskCallable實(shí)現(xiàn)抽象模型層次操作轉(zhuǎn)換

@Accessors(chain = true)
public class WorkTaskCallable<T> extends RecursiveTask<T> {

    /**
     * 斷言操作控制
     */
    @Getter
    private Predicate<T> predicate;

    /**
     * 執(zhí)行參數(shù)化分割條件
     */
    @Getter
    private T splitParam;

    /**
     * 操作拆分方法操作機(jī)制
     */
    @Getter
    private Function<Object,Object[]> splitFunction;

    /**
     * 操作合并方法操作機(jī)制
     */
    @Getter
    private BiFunction<Object,Object,T> mergeFunction;

    /**
     * 操作處理機(jī)制
     */
    @Setter
    @Getter
    private Function<T,T> processHandler;


    /**
     * 構(gòu)造器是否進(jìn)行分割操作
     * @param predicate 判斷是否進(jìn)行下一步分割的條件關(guān)系
     * @param splitParam 分割參數(shù)
     * @param splitFunction 分割方法
     * @param mergeFunction 合并數(shù)據(jù)操作
     */
    public WorkTaskCallable(Predicate predicate,T splitParam,Function<Object,Object[]> splitFunction,BiFunction<Object,Object,T> mergeFunction,Function<T,T> processHandler){
        this.predicate = predicate;
        this.splitParam = splitParam;
        this.splitFunction = splitFunction;
        this.mergeFunction = mergeFunction;
        this.processHandler = processHandler;
    }

    /**
     * 實(shí)際執(zhí)行調(diào)用操作機(jī)制
     * @return
     */
    @Override
    protected T compute() {
        if(predicate.test(splitParam)){
            Object[] result = splitFunction.apply(splitParam);
            WorkTaskCallable workTaskCallable1 = new WorkTaskCallable(predicate,result[0],splitFunction,mergeFunction,processHandler);
            workTaskCallable1.fork();
            WorkTaskCallable workTaskCallable2 = new WorkTaskCallable(predicate,result[1],splitFunction,mergeFunction,processHandler);
            workTaskCallable2.fork();
            return mergeFunction.apply(workTaskCallable1.join(),workTaskCallable2.join());
        }else{
            return processHandler.apply(splitParam);
        }
    }
}

ArrayListWorkTaskCallable實(shí)現(xiàn)List集合層次操作轉(zhuǎn)換


/**
 * @project-name:wiz-shrding-framework
 * @package-name:com.wiz.sharding.framework.boot.common.thread.forkjoin
 * @author:LiBo/Alex
 * @create-date:2021-09-09 17:26
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description:
 */
public class ArrayListWorkTaskCallable extends WorkTaskCallable<List>{



    static Predicate<List> predicateFunction = param->param.size() > 3;


    static Function<List,List[]> splitFunction = (param)-> {
        if(predicateFunction.test(param)){
            return new List[]{param.subList(0,param.size()/ 2),param.subList(param.size()/2,param.size())};
        }else{
            return new List[]{param.subList(0,param.size()+1),Lists.newArrayList()};
        }
    };

    static BiFunction<List,List,List> mergeFunction = (param1,param2)->{
        List datalist = Lists.newArrayList();
        datalist.addAll(param2);
        datalist.addAll(param1);
        return datalist;
    };


    /**
     * 構(gòu)造器是否進(jìn)行分割操作
     * @param predicate     判斷是否進(jìn)行下一步分割的條件關(guān)系
     * @param splitParam    分割參數(shù)
     * @param splitFunction 分割方法
     * @param mergeFunction 合并數(shù)據(jù)操作
     */
    public ArrayListWorkTaskCallable(Predicate<List> predicate, List splitParam, Function splitFunction, BiFunction mergeFunction,
                                     Function<List,List> processHandler) {
        super(predicate, splitParam, splitFunction, mergeFunction,processHandler);
    }




    public ArrayListWorkTaskCallable(List splitParam, Function splitFunction, BiFunction mergeFunction,
                                     Function<List,List> processHandler) {
        super(predicateFunction, splitParam, splitFunction, mergeFunction,processHandler);
    }


    public ArrayListWorkTaskCallable(Predicate<List> predicate,List splitParam,Function<List,List> processHandler) {
        this(predicate, splitParam, splitFunction, mergeFunction,processHandler);
    }


    public ArrayListWorkTaskCallable(List splitParam,Function<List,List> processHandler) {
        this(predicateFunction, splitParam, splitFunction, mergeFunction,processHandler);
    }



    public static void main(String[] args){
        List dataList = Lists.newArrayList(0,1,2,3,4,5,6,7,8,9);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        ForkJoinTask<List> forkJoinResult = forkJoinPool.submit(new ArrayListWorkTaskCallable(dataList,param->Lists.newArrayList(param.size())));
        try {
            System.out.println(forkJoinResult.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

ForkJoin代碼分析

ForkJoinPool構(gòu)造函數(shù)
  /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    /**
     * Creates a {@code ForkJoinPool} with the given parameters, without
     * any security checks or parameter validation.  Invoked directly by
     * makeCommonPool.
     */
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.mode = (short)mode;
        this.parallelism = (short)parallelism;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
  • parallelism:可并行級別砚嘴,F(xiàn)ork/Join框架將依據(jù)這個并行級別的設(shè)定仗处,決定框架內(nèi)并行執(zhí)行的線程數(shù)量。并行的每一個任務(wù)都會有一個線程進(jìn)行處理枣宫,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數(shù)量婆誓。

  • factory:當(dāng)Fork/Join框架創(chuàng)建一個新的線程時,同樣會用到線程創(chuàng)建工廠也颤。只不過這個線程工廠不再需要實(shí)現(xiàn)ThreadFactory接口洋幻,而是需要實(shí)現(xiàn)ForkJoinWorkerThreadFactory接口。后者是一個函數(shù)式接口翅娶,只需要實(shí)現(xiàn)一個名叫newThread的方法文留。

在Fork/Join框架中有一個默認(rèn)的ForkJoinWorkerThreadFactory接口實(shí)現(xiàn):DefaultForkJoinWorkerThreadFactory。

  • handler:異常捕獲處理器竭沫。當(dāng)執(zhí)行的任務(wù)中出現(xiàn)異常燥翅,并從任務(wù)中被拋出時,就會被handler捕獲蜕提。

  • asyncMode:這個參數(shù)也非常重要森书,從字面意思來看是指的異步模式,它并不是說Fork/Join框架是采用同步模式還是采用異步模式工作谎势。Fork/Join框架中為每一個獨(dú)立工作的線程準(zhǔn)備了對應(yīng)的待執(zhí)行任務(wù)隊(duì)列凛膏,這個任務(wù)隊(duì)列是使用數(shù)組進(jìn)行組合的雙向隊(duì)列。即是說存在于隊(duì)列中的待執(zhí)行任務(wù)脏榆,即可以使用先進(jìn)先出的工作模式猖毫,也可以使用后進(jìn)先出的工作模式。

    • 先進(jìn)先出


      image
    • 后進(jìn)先出


      image
    • 當(dāng)asyncMode設(shè)置為true的時候须喂,隊(duì)列采用先進(jìn)先出方式工作吁断;反之則是采用后進(jìn)先出的方式工作,該值默認(rèn)為false

      • asyncMode ? FIFO_QUEUE : LIFO_QUEUE,

需要注意點(diǎn)

  • ForkJoinPool 一個構(gòu)造函數(shù)只帶有parallelism參數(shù)坞生,既是可以設(shè)定Fork/Join框架的最大并行任務(wù)數(shù)量仔役;另一個構(gòu)造函數(shù)則不帶有任何參數(shù),對于最大并行任務(wù)數(shù)量也只是一個默認(rèn)值——當(dāng)前操作系統(tǒng)可以使用的CPU內(nèi)核數(shù)量(Runtime.getRuntime().availableProcessors())恨胚。實(shí)際上ForkJoinPool還有一個私有的骂因、原生構(gòu)造函數(shù)炎咖,之上提到的三個構(gòu)造函數(shù)都是對這個私有的赃泡、原生構(gòu)造函數(shù)的調(diào)用寒波。

  • 如果你對Fork/Join框架沒有特定的執(zhí)行要求,可以直接使用不帶有任何參數(shù)的構(gòu)造函數(shù)升熊。也就是說推薦基于當(dāng)前操作系統(tǒng)可以使用的CPU內(nèi)核數(shù)作為Fork/Join框架內(nèi)最大并行任務(wù)數(shù)量俄烁,這樣可以保證CPU在處理并行任務(wù)時,盡量少發(fā)生任務(wù)線程間的運(yùn)行狀態(tài)切換(實(shí)際上單個CPU內(nèi)核上的線程間狀態(tài)切換基本上無法避免级野,因?yàn)椴僮飨到y(tǒng)同時運(yùn)行多個線程和多個進(jìn)程)页屠。


image
  • 從上面的的類關(guān)系圖可以看出來,F(xiàn)orkJoin框架的核心是ForkJoinPool類蓖柔,基于AbstractExecutorService擴(kuò)展(@sun.misc.Contended注解)辰企。

  • ForkJoinPool中維護(hù)了一個隊(duì)列數(shù)組WorkQueue[],每個WorkQueue維護(hù)一個ForkJoinTask數(shù)組和當(dāng)前工作線程。ForkJoinPool實(shí)現(xiàn)了工作竊取(work-stealing)算法并執(zhí)行ForkJoinTask况鸣。

ForkJoinPool類的屬性介紹
  • ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用來配合ctl在控制線程數(shù)量時使用

  • ctl: 控制ForkJoinPool創(chuàng)建線程數(shù)量牢贸,(ctl & ADD_WORKER) != 0L 時創(chuàng)建線程,也就是當(dāng)ctl的第16位不為0時镐捧,可以繼續(xù)創(chuàng)建線程

  • defaultForkJoinWorkerThreadFactory: 默認(rèn)線程工廠潜索,默認(rèn)實(shí)現(xiàn)是DefaultForkJoinWorkerThreadFactory

  • runState: 全局鎖控制,全局運(yùn)行狀態(tài)

  • workQueues: 工作隊(duì)列數(shù)組WorkQueue[]

  • config: 記錄并行數(shù)量和ForkJoinPool的模式(異步或同步)

WorkQueue類
  • qlock: 并發(fā)控制懂酱,put任務(wù)時的鎖控制

  • array: 任務(wù)數(shù)組ForkJoinTask<?>[]

  • pool: ForkJoinPool竹习,所有線程和WorkQueue共享,用于工作竊取列牺、任務(wù)狀態(tài)和工作狀態(tài)同步

  • base: array數(shù)組中取任務(wù)的下標(biāo)

  • top: array數(shù)組中放置任務(wù)的下標(biāo)

  • owner: 所屬線程整陌,F(xiàn)orkJoin框架中,只有一個WorkQueue是沒有owner的瞎领,其他的均有具體線程owner


image

ForkJoinTask是能夠在ForkJoinPool中執(zhí)行的任務(wù)抽象類蔓榄,父類是Future,具體實(shí)現(xiàn)類有很多默刚,這里主要關(guān)注RecursiveAction和RecursiveTask甥郑。

  • RecursiveAction是沒有返回結(jié)果的任務(wù)
  • RecursiveTask是需要返回結(jié)果的任務(wù)。
ForkJoinTask類屬性的介紹

status: 任務(wù)的狀態(tài)荤西,對其他工作線程和pool可見澜搅,運(yùn)行正常則status為負(fù)數(shù),異常情況為正數(shù)邪锌。

ForkJoinTask功能介紹
  • ForkJoinTask任務(wù)是一種能在Fork/Join框架中運(yùn)行的特定任務(wù)勉躺,也只有這種類型的任務(wù)可以在Fork/Join框架中被拆分運(yùn)行和合并運(yùn)行。

  • ForkJoinWorkerThread線程是一種在Fork/Join框架中運(yùn)行的特性線程觅丰,它除了具有普通線程的特性外饵溅,最主要的特點(diǎn)是每一個ForkJoinWorkerThread線程都具有一個獨(dú)立的任務(wù)等待隊(duì)列(work queue),這個任務(wù)隊(duì)列用于存儲在本線程中被拆分的若干子任務(wù)妇萄。


只需要實(shí)現(xiàn)其compute()方法蜕企,在compute()中做最小任務(wù)控制咬荷,任務(wù)分解(fork)和結(jié)果合并(join)。

image

ForkJoinPool中執(zhí)行的默認(rèn)線程是ForkJoinWorkerThread轻掩,由默認(rèn)工廠產(chǎn)生幸乒,可以自己重寫要實(shí)現(xiàn)的工作線程。同時會將ForkJoinPool引用放在每個工作線程中唇牧,供工作竊取時使用罕扎。

ForkJoinWorkerThread類屬性介紹
  • pool: ForkJoinPool,所有線程和WorkQueue共享丐重,用于工作竊取腔召、任務(wù)狀態(tài)和工作狀態(tài)同步。
  • workQueue: 當(dāng)前線程的任務(wù)隊(duì)列扮惦,與WorkQueue的owner呼應(yīng)宴咧。

簡易執(zhí)行圖

image

實(shí)際上Fork/Join框架的內(nèi)部工作過程要比這張圖復(fù)雜得多,例如如何決定某一個recursive task是使用哪條線程進(jìn)行運(yùn)行径缅;再例如如何決定當(dāng)一個任務(wù)/子任務(wù)提交到Fork/Join框架內(nèi)部后掺栅,是創(chuàng)建一個新的線程去運(yùn)行還是讓它進(jìn)行隊(duì)列等待。

邏輯模型圖(盜一張圖:)

盜一張圖:

()

fork方法和join方法

Fork/Join框架中提供的fork方法和join方法纳猪,可以說是該框架中提供的最重要的兩個方法氧卧,它們和parallelism“可并行任務(wù)數(shù)量”配合工作。

Fork方法介紹
  • Fork就是一個不斷分枝的過程氏堤,在當(dāng)前任務(wù)的基礎(chǔ)上長出n多個子任務(wù)沙绝,他將新創(chuàng)建的子任務(wù)放入當(dāng)前線程的work queue隊(duì)列中,F(xiàn)ork/Join框架將根據(jù)當(dāng)前正在并發(fā)執(zhí)行ForkJoinTask任務(wù)的ForkJoinWorkerThread線程狀態(tài)鼠锈,決定是讓這個任務(wù)在隊(duì)列中等待闪檬,還是創(chuàng)建一個新的ForkJoinWorkerThread線程運(yùn)行它,又或者是喚起其它正在等待任務(wù)的ForkJoinWorkerThread線程運(yùn)行它购笆。

當(dāng)一個ForkJoinTask任務(wù)調(diào)用fork()方法時粗悯,當(dāng)前線程會把這個任務(wù)放入到queue數(shù)組的queueTop位置,然后執(zhí)行以下兩句代碼:

if ((s -= queueBase) <= 2)
    pool.signalWork();
else if (s == m)
    growQueue();

當(dāng)調(diào)用signalWork()方法同欠。signalWork()方法做了兩件事:1样傍、喚配當(dāng)前線程;2、當(dāng)沒有活動線程時或者線程數(shù)較少時铺遂,添加新的線程衫哥。


Join方法介紹

Join是一個不斷等待,獲取任務(wù)執(zhí)行結(jié)果的過程襟锐。

private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        if ((s = status) < 0)
            return s;
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                return setCompletion(NORMAL);
        }
        return w.joinTask(this);
    }
    else
        return externalAwaitDone();
}
  • 第4行撤逢,(s=status)<0表示這個任務(wù)被執(zhí)行完,直接返回執(zhí)行結(jié)果狀態(tài),上層捕獲到狀態(tài)后蚊荣,決定是要獲取結(jié)果還是進(jìn)行錯誤處理初狰;
  • 第6行,從queue中取出這個任務(wù)來執(zhí)行妇押,如果執(zhí)行完了,就設(shè)置狀態(tài)為NORMAL姓迅;
  • 前面unpushTask()方法在隊(duì)列中沒有這個任務(wù)時會返回false敲霍,15行調(diào)用joinTask等待這個任務(wù)完成。
  • 由于ForkJoinPool中有一個數(shù)組叫submissionQueue丁存,通過submit方法調(diào)用而且非ForkJoinTask這種任務(wù)會被放到這個隊(duì)列中肩杈。這種任務(wù)有可能被非ForkJoinWorkerThread線程執(zhí)行,第18行表示如果是這種任務(wù)解寝,等待它執(zhí)行完成扩然。
    下面來看joinTask方法
final int joinTask(ForkJoinTask<?> joinMe) {
    ForkJoinTask<?> prevJoin = currentJoin;
    currentJoin = joinMe;
    for (int s, retries = MAX_HELP;;) {
        if ((s = joinMe.status) < 0) {
            currentJoin = prevJoin;
            return s;
        }
        if (retries > 0) {
            if (queueTop != queueBase) {
                if (!localHelpJoinTask(joinMe))
                    retries = 0;           // cannot help
            }
            else if (retries == MAX_HELP >>> 1) {
                --retries;                 // check uncommon case
                if (tryDeqAndExec(joinMe) >= 0)
                    Thread.yield();        // for politeness
            }
            else
                retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
        }
        else {
            retries = MAX_HELP;           // restart if not done
            pool.tryAwaitJoin(joinMe);
        }
    }
}
  • (1)這里有個常量MAX_HELP=16,表示幫助join的次數(shù)。第11行聋伦,queueTop!=queueBase表示本地隊(duì)列中有任務(wù)夫偶,如果這個任務(wù)剛好在隊(duì)首,則嘗試自己執(zhí)行觉增;否則返回false兵拢。這時retries被設(shè)置為0,表示不能幫助,因?yàn)樽砸殃?duì)列不為空逾礁,自己并不空閑说铃。在下一次循環(huán)就會進(jìn)入第24行,等待這個任務(wù)執(zhí)行完成嘹履。
  • (2)第20行helpJoinTask()方法返回false時腻扇,retries-1,連續(xù)8次都沒有幫到忙砾嫉,就會進(jìn)入第14行幼苛,調(diào)用yield讓權(quán)等待。沒辦法人口太差焕刮,想做點(diǎn)好事都不行蚓峦,只有停下來休息一下。
  • (3)當(dāng)執(zhí)行到第20行济锄,表示自己隊(duì)列為空暑椰,可以去幫助這個任務(wù)了,下面來看是怎么幫助的荐绝?
outer:for (ForkJoinWorkerThread thread = this;;) {
    // Try to find v, the stealer of task, by first using hint
    ForkJoinWorkerThread v = ws[thread.stealHint & m];
    if (v == null || v.currentSteal != task) {
        for (int j = 0; ;) {        // search array
            if ((v = ws[j]) != null && v.currentSteal == task) {
                thread.stealHint = j;
                break;              // save hint for next time
            }
            if (++j > m)
                break outer;        // can't find stealer
        }
    }
    // Try to help v, using specialized form of deqTask
    for (;;) {
        ForkJoinTask<?>[] q; int b, i;
        if (joinMe.status < 0)
            break outer;
        if ((b = v.queueBase) == v.queueTop ||
            (q = v.queue) == null ||
            (i = (q.length-1) & b) < 0)
            break;                  // empty
        long u = (i << ASHIFT) + ABASE;
        ForkJoinTask<?> t = q[i];
        if (task.status < 0)
            break outer;            // stale
        if (t != null && v.queueBase == b &&
            UNSAFE.compareAndSwapObject(q, u, t, null)) {
            v.queueBase = b + 1;
            v.stealHint = poolIndex;
            ForkJoinTask<?> ps = currentSteal;
            currentSteal = t;
            t.doExec();
            currentSteal = ps;
            helped = true;
        }
    }
    // Try to descend to find v's stealer
    ForkJoinTask<?> next = v.currentJoin;
    if (--levels > 0 && task.status >= 0 &&
        next != null && next != task) {
        task = next;
        thread = v;
    }
}
  • (1)通過查看stealHint這個字段的注釋可以知道一汽,它表示最近一次誰來偷過我的queue中的任務(wù)。因此通過stealHint并不能找到當(dāng)前任務(wù)被誰偷了?所以第4行v.currentSteal != task完全可能召夹。這時還有一個辦法找到這個任務(wù)被誰偷了岩喷,看看currentSteal這個字段的注釋表示最近偷的哪個任務(wù)。這里掃描所有偷來的任務(wù)與當(dāng)前任務(wù)比較监憎,如果相等纱意,就是這個線程偷的。如果這兩種方法都不能找到小偷鲸阔,只能等待了偷霉。
  • (2)當(dāng)找到了小偷后,以其人之身還之其人之道褐筛,從小偷那里偷任務(wù)過來类少,相當(dāng)于你和小偷共同執(zhí)行你的任務(wù),會加速你的任務(wù)完成渔扎。
  • (3)小偷也是爺硫狞,如果小偷也在等待一個任務(wù)完成氛雪,權(quán)利反轉(zhuǎn)(小偷等待的這個任務(wù)做為當(dāng)前任務(wù)乒躺,小偷扮演當(dāng)事人角色把前面的流程走一遍)房铭,這是一個遞歸的過程榆骚。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末掸哑,一起剝皮案震驚了整個濱河市获印,隨后出現(xiàn)的幾起案子杆怕,更是在濱河造成了極大的恐慌糠溜,老刑警劉巖笤虫,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件旁瘫,死亡現(xiàn)場離奇詭異,居然都是意外死亡琼蚯,警方通過查閱死者的電腦和手機(jī)酬凳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來遭庶,“玉大人宁仔,你說我怎么就攤上這事÷退” “怎么了翎苫?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長榨了。 經(jīng)常有香客問我煎谍,道長,這世上最難降的妖魔是什么龙屉? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任呐粘,我火速辦了婚禮满俗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘作岖。我一直安慰自己唆垃,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布痘儡。 她就那樣靜靜地躺著辕万,像睡著了一般。 火紅的嫁衣襯著肌膚如雪沉删。 梳的紋絲不亂的頭發(fā)上渐尿,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天,我揣著相機(jī)與錄音丑念,去河邊找鬼涡戳。 笑死结蟋,一個胖子當(dāng)著我的面吹牛脯倚,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播嵌屎,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼推正,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了宝惰?” 一聲冷哼從身側(cè)響起植榕,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎尼夺,沒想到半個月后尊残,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡淤堵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年寝衫,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拐邪。...
    茶點(diǎn)故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡慰毅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出扎阶,到底是詐尸還是另有隱情汹胃,我是刑警寧澤,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布东臀,位于F島的核電站着饥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏惰赋。R本人自食惡果不足惜贱勃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧贵扰,春花似錦仇穗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至舞丛,卻和暖如春耘子,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背球切。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工谷誓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吨凑。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓捍歪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鸵钝。 傳聞我的和親對象是個殘疾皇子糙臼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內(nèi)容