JUC并行計算框架 Fork/Join 原理圖文詳解&代碼示例

關(guān)鍵詞:divide and conquer algorithm际起,work-stealing,WorkQueue

ForkJoinPool 是什么?

ForkJoinPool 是 JDK 7 中妓局,@author Doug Lea 加入的一個線程池類萌焰。Fork/Join 框架的核心原理就是分治算法(Divide-and-Conquer)和工作竊取算法(work-stealing algorithm)哺眯。

Fork分解任務(wù)成獨立的子任務(wù),用多線程去執(zhí)行這些子任務(wù)扒俯,Join合并子任務(wù)的結(jié)果奶卓。這樣就能使用多線程的方式來執(zhí)行一個任務(wù)一疯。

JDK7引入的Fork/Join有三個核心類:

  • ForkJoinPool,執(zhí)行任務(wù)的線程池
  • ForkJoinWorkerThread夺姑,執(zhí)行任務(wù)的工作線程
  • ForkJoinTask窿撬,一個用于ForkJoinPool的任務(wù)抽象類迎瞧。

ForkJoinPool是框架的核心,不同于其他線程池,它的構(gòu)建不需要提供核心線程數(shù)刹碾,最大線程數(shù),阻塞隊列等旬薯,還增加了未捕獲異常處理器般又,而該處理器會交給工作線程,由該線程處理丐黄,這樣的好處在于當(dāng)一個線程的工作隊列上的某個任務(wù)出現(xiàn)異常時斋配,不至于結(jié)束掉線程,而是讓它繼續(xù)運行隊列上的其他任務(wù)灌闺。它會依托于并行度(或默認(rèn)根據(jù)核數(shù)計算)來決定最大線程數(shù)艰争,它內(nèi)部維護(hù)了WorkQueue數(shù)組ws取代了阻塞隊列,ws中下標(biāo)為奇數(shù)的為工作線程的所屬隊列桂对,偶數(shù)的為共享隊列甩卓,雖然名稱有所區(qū)分,但重要的區(qū)別只有一點:共享隊列不存在工作線程蕉斜。

ForkJoinPool 的狀態(tài)控制變量有:

    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
    private static final int  RSLOCK     = 1;
    private static final int  RSIGNAL    = 1 << 1;
    private static final int  STARTED    = 1 << 2;
    private static final int  STOP       = 1 << 29;
    private static final int  TERMINATED = 1 << 30;
    private static final int  SHUTDOWN   = 1 << 31;

    // Instance fields
    volatile long ctl;                   // main pool control
    volatile int runState;               // lockable status
    final int config;                    // parallelism, mode
    int indexSeed;                       // to generate worker index
    volatile WorkQueue[] workQueues;     // main registry
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final String workerNamePrefix;       // to create worker name string
    volatile AtomicLong stealCounter;    // also used as sync monitor

ForkJoinPool維護(hù)了一個ctl控制信號猛频,前16位表示活躍worker數(shù),33至48位表示worker總數(shù)蛛勉,后32位可以粗略理解用于表示worker等待隊列的棧頂鹿寻。ForkJoinPool利用這個ctl,WorkQueue的scanState和stackPred以及ws的索引算法維護(hù)了一個類似隊列(或者叫棧更貼切一些)的數(shù)據(jù)結(jié)構(gòu)诽凌。每當(dāng)有一個線程偷不到任務(wù)毡熏,就會存放此前的ctl后置標(biāo)記位到pred,并將自己的索引交給ctl作為棧頂侣诵。相應(yīng)的喚醒操作則由棧頂起痢法。相應(yīng)的方法在進(jìn)行嘗試添加worker時,會綜合當(dāng)前是否有阻塞等待任務(wù)的線程杜顺。

當(dāng)所有線程都不能竊取到新的任務(wù)财搁,進(jìn)入等待隊列時,稱之為“靜寂態(tài)”躬络。

ForkJoinPool對全局全狀的修改需要加鎖進(jìn)行尖奔,這些操作如修改ctl(改變棧頂,增刪活躍數(shù)或總數(shù)等),處理ws中的元素提茁,擴容ws淹禾,關(guān)閉線程池,初始化(包含ws的初始化)茴扁,注冊線程入池等铃岔。而這個鎖就是runState,它除了當(dāng)鎖峭火,也間接表示了運行狀態(tài)毁习,相應(yīng)的線程池的SHUTDOWN,STOP,TERMINATED等狀態(tài)均與其相應(yīng)的位有關(guān)。

線程池的并行度保存在config字段的后16位卖丸,config的第17位決定了是FIFO還是LIFO纺且。而這個并行度也通過間接地取反并計入到ctl的前32位,線程池中判斷是否當(dāng)前有活躍的線程坯苹,或者是否已進(jìn)入寂靜態(tài),都是用保存在config的并行度和保存在ctl前32位的活躍數(shù)與并行度的運算結(jié)果進(jìn)行相加摇天,判斷是否會溢出(正數(shù))來決定的粹湃。

ForkJoinPool還提供了補償機制,用于在線程將要阻塞在執(zhí)行過程中前釋放掉一個正在空閑的工作線程或創(chuàng)建一個新的工作線程泉坐,從而保證了并行度为鳄。

因為ForkJoinTask比較復(fù)雜,抽象方法比較多腕让,日常使用時一般不會繼承ForkJoinTask來實現(xiàn)自定義的任務(wù)孤钦,而是繼承ForkJoinTask的兩個子類:

  • RecursiveTask:子任務(wù)帶返回結(jié)果時使用
  • RecursiveAction:子任務(wù)不帶返回結(jié)果時使用

關(guān)于Fork/Join框架的原理,可參考:Doug Lea的文章:A Java Fork/Join Framework http://gee.cs.oswego.edu/dl/papers/fj.pdf

框架模型

ForkJoinPool 不是為了替代 ExecutorService纯丸,而是它的補充偏形,在某些應(yīng)用場景下性能比 ExecutorService 更好。

ForkJoinPool 主要用于實現(xiàn)“分而治之”的算法觉鼻,特別是分治之后遞歸調(diào)用的函數(shù)俊扭,例如 quick sort 等。

ForkJoinPool 最適合的是計算密集型的任務(wù)坠陈,如果存在 I/O萨惑,線程間同步,sleep() 等會造成線程長時間阻塞的情況時仇矾,最好配合使用 ManagedBlocker庸蔼。

ForkJoinPool 分治算法思想

分治(divide and conquer),也就是把一個復(fù)雜的問題分解成相似的子問題贮匕,然后子問題再分子問題姐仅,直到問題分的很簡單不必再劃分了。然后層層返回子問題的結(jié)果,最終合并返回問題結(jié)果萍嬉。

分治在算法上有很多應(yīng)用乌昔,類似大數(shù)據(jù)的MapReduce,歸并算法壤追、快速排序算法等磕道。JUC中的Fork/Join的并行計算框架類似于單機版的 MapReduce。

Fork/Join 行冰,從字面上我們就可以理解溺蕉,分治的過程分為兩個階段,第一個階段分解任務(wù)(fork)悼做,把任務(wù)分解為一個個小任務(wù)直至小任務(wù)可以簡單的計算返回結(jié)果疯特。

第二階段合并結(jié)果(join),把每個小任務(wù)的結(jié)果合并返回得到最終結(jié)果肛走。而Fork就是分解任務(wù)漓雅,Join就是合并結(jié)果。

Fork/Join框架主要包含兩部分:ForkJoinPool朽色、ForkJoinTask邻吞。

ForkJoinPool就是治理分治任務(wù)的線程池。它和在之前的文章提到ThreadPoolExecutor線程池葫男,共同點都是消費者-生產(chǎn)者模式的實現(xiàn)抱冷,但是有一些不同。

ThreadPoolExecutor的線程池是只有一個任務(wù)隊列的梢褐,而ForkJoinPool有多個任務(wù)隊列旺遮。通過ForkJoinPool的invoke或submit或execute提交任務(wù)的時候會根據(jù)一定規(guī)則分配給不同的任務(wù)隊列,并且任務(wù)隊列的雙端隊列盈咳。

ForkJoinPool 的每個工作線程都維護(hù)著一個工作隊列(WorkQueue)耿眉,這是一個雙端隊列(Deque),里面存放的對象是任務(wù)(ForkJoinTask)鱼响。 其中ForkJoinTask代表一個可以并行跷敬、合并的任務(wù)。ForkJoinTask是一個抽象類热押,它還有兩個抽象子類:RecusiveAction和RecusiveTask西傀。其中RecusiveTask代表有返回值的任務(wù),而RecusiveAction代表沒有返回值的任務(wù)桶癣。

ForkJoinPool 與 ExecutorService

ForkJoinPool是ExecutorService的實現(xiàn)類拥褂,因此是一種特殊的線程池。
使用方法:創(chuàng)建了ForkJoinPool實例之后牙寞,就可以調(diào)用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法來執(zhí)行指定任務(wù)了饺鹃。

那么莫秆,使用ThreadPoolExecutor或者ForkJoinPool,會有什么性能的差異呢悔详?

使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù)镊屎,比如使用4個線程來完成超過200萬個任務(wù)。但是茄螃,使用ThreadPoolExecutor時缝驳,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務(wù)归苍,需要完成200萬個具有父子關(guān)系的任務(wù)時用狱,也需要200萬個線程,顯然這是不可行的拼弃。

這正是工作竊取模式的優(yōu)點夏伊。

ForkJoinPool is an advanced version of ThreadPoolExecutor with concepts like work-stealing which enable faster and efficient solving of divide and conquer algorithms.

我們常用的數(shù)組工具類 Arrays 在JDK 8之后新增的并行排序方法(parallelSort)就運用了 ForkJoinPool 的特性,還有 ConcurrentHashMap 在JDK 8之后添加的函數(shù)式方法(如forEach等)也有運用吻氧。在整個JUC框架中溺忧,F(xiàn)orkJoinPool 相對其他類會復(fù)雜很多。

創(chuàng)建 ForkJoinPool 對象

ForkJoinPool作者Doug Lea 在ForkJoinPool主類的注釋說明中盯孙,有這樣一句話:

A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool.
Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

以上描述大致的中文解釋是:ForkJoinPools類有一個靜態(tài)方法commonPool()鲁森,這個靜態(tài)方法所獲得的ForkJoinPools實例是由整個應(yīng)用進(jìn)程共享的,并且它適合絕大多數(shù)的應(yīng)用系統(tǒng)場景镀梭。使用commonPool通车渡可以幫助應(yīng)用程序中多種需要進(jìn)行歸并計算的任務(wù)共享計算資源踱启,從而使后者發(fā)揮最大作用(ForkJoinPools中的工作線程在閑置時會被緩慢回收报账,并在隨后需要使用時被恢復(fù)),而這種獲取ForkJoinPools實例的方式埠偿,才是Doug Lea推薦的使用方式透罢。代碼如下:

ForkJoinPool commonPool =  ForkJoinPool.commonPool();

我們可以看到:

    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}.  Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }

其中,common 是:

    /**
     * Common (static) pool. Non-null for public use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically [偏執(zhí)地] avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    static final ForkJoinPool common;

而這個 common 成員變量是在 ForkJoinPool 源代碼冠蒋, 用一段靜態(tài)代碼初始化的:

static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
            QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }

        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }

    /**
     * Creates and returns the common pool, respecting user settings
     * specified via system properties.
     */
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores 處理器核數(shù)
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

工作線程和工作隊列

工作線程

ForkJoinWorkerThread是運行在ForkJoinPool中的線程羽圃,它內(nèi)部會維護(hù)一個存放ForkJoinTask的WorkQueue隊列,而WorkQueue是ForkJoinPool的內(nèi)部類抖剿。

一個WorkQueue工作隊列所屬的歸并計算工作線程ForkJoinWorkerThread朽寞,用來執(zhí)行 ForkJoinTask 。 注意斩郎,工作隊列也可能不屬于任何工作線程脑融。

/**
 * A thread managed by a {@link ForkJoinPool}, which executes
 * {@link ForkJoinTask}s.
 * This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution.  However, you can override initialization
 * and termination methods surrounding the main task processing loop.
 * If you do create such a subclass, you will also need to supply a
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
 * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. For explanation, see the internal documentation
     * of class ForkJoinPool.
     *
     * This class just maintains links to its pool and WorkQueue.  The
     * pool field is set immediately upon construction, but the
     * workQueue field is not set until a call to registerWorker
     * completes. This leads to a visibility race, that is tolerated
     * by requiring that the workQueue field is only accessed by the
     * owning thread.
     *
     * Support for (non-public) subclass InnocuousForkJoinWorkerThread
     * requires that we break quite a lot of encapsulation (via Unsafe)
     * both here and in the subclass to access and set Thread fields.
     */

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
}

工作隊列(WorkQueue)

為啥要雙端隊列呢?因為ForkJoinPool有一個機制缩宜,當(dāng)某個工作線程對應(yīng)消費的任務(wù)隊列空閑的時候它會去別的忙的任務(wù)隊列的尾部分擔(dān)(stealing)任務(wù)過來執(zhí)行(好伙伴啊)肘迎。然后那個忙的任務(wù)隊列還是頭部出任務(wù)給它對應(yīng)的工作線程消費甥温。這樣雙端就井然有序,不會有任務(wù)爭搶的情況妓布∫鲵荆看,這就是樸素的大師級的設(shè)計思想啊匣沼。

以下代碼片段示例了WorkQueue類中定義的一些重要屬性:

static final class WorkQueue {
    ......
    // 隊列狀態(tài)
    volatile int qlock;        // 1: locked, < 0: terminate; else 0
    // 下一個出隊元素的索引位(主要是為線程竊取準(zhǔn)備的索引位置)
    volatile int base;         // index of next slot for poll
    // 下一個入隊元素準(zhǔn)備的索引位
    int top;                   // index of next slot for push
    // 隊列中使用數(shù)組存儲元素
    ForkJoinTask<?>[] array;   // the elements (initially unallocated)
    // 隊列所屬的ForkJoinPool(可能為空)
    // 注意狰挡,一個ForkJoinPool中會有多個執(zhí)行線程,還會有比執(zhí)行線程更多的(或一樣多的)隊列
    final ForkJoinPool pool;   // the containing pool (may be null)
    // 這個隊列所屬的歸并計算工作線程肛著。注意圆兵,工作隊列也可能不屬于任何工作線程
    final ForkJoinWorkerThread owner; // owning thread or null if shared
    // 記錄當(dāng)前正在進(jìn)行join等待的其它任務(wù)
    volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
    // 當(dāng)前正在偷取的任務(wù)
    volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
    ......
}

入隊

當(dāng)工作隊列的 owner ,也就是ForkJoinWorkerThread 需要向雙端隊列中放入一個新的待執(zhí)行子任務(wù)時枢贿,會調(diào)用WorkQueue中的push方法殉农。

        /**
         * Pushes a task. Call only by owner in unshared queues.  (The
         * shared-queue version is embedded in method externalPush.)
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
// base: 下一個出隊元素的索引位(主要是為線程竊取準(zhǔn)備的索引位置)
// top: 下一個入隊元素準(zhǔn)備的索引位
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
// 在指定的對象 ForkJoinTask<?>[] a 中,指定的內(nèi)存偏移量 ((m & s) << ASHIFT) + ABASE 的位置局荚,賦予一個新的元素
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
// 將workQueue對象本身中的top標(biāo)示的位置 + 1
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

這里使用Unsafe.putOrderedObject方法超凳,這個方法在對低延遲代碼是很有用的,它能夠?qū)崿F(xiàn)非堵塞的寫入耀态,這些寫入不會被Java的JIT重新排序指令(instruction reordering)轮傍,
這樣它使用快速的存儲-存儲(store-store) barrier,
而不是較慢的存儲-加載(store-load) barrier, (用在volatile的寫操作上)
這種性能提升是有代價的,雖然便宜首装,也就是寫后結(jié)果并不會被其他線程看到创夜,甚至是自己的線程,通常是幾納秒后被其他線程看到仙逻,這個時間比較短驰吓,所以代價可以忍受。

類似Unsafe.putOrderedObject還有unsafe.putOrderedLong等方法系奉,unsafe.putOrderedLong比使用 volatile long要快3倍左右檬贰。

sun.misc.Unsafe操作類直接基于操作系統(tǒng)控制層在硬件層面上進(jìn)行原子操作,它是ForkJoinPool高效性能的一大保證缺亮,類似的編程思路還體現(xiàn)在java.util.concurrent包中相當(dāng)規(guī)模的類功能實現(xiàn)中翁涤。實際上sun.misc.Unsafe操作類在Java中有著舉足輕重的地位,例如基于這個類實現(xiàn)的Java樂觀鎖機制萌踱。

出隊

當(dāng)ForkJoinWorkerThread需要從雙端隊列中取出下一個待執(zhí)行子任務(wù)葵礼,就會根據(jù)設(shè)定的asyncMode調(diào)用雙端隊列的不同方法,代碼概要如下所示:


    /**
     * If the current thread is operating in a ForkJoinPool,
     * unschedules and returns, without executing, the next task
     * queued by the current thread but not yet executed, if one is
     * available, or if not available, a task that was forked by some
     * other thread, if available. Availability may be transient, so a
     * {@code null} result does not necessarily imply quiescence of
     * the pool this task is operating in.  This method is designed
     * primarily to support extensions, and is unlikely to be useful
     * otherwise.
     *
     * @return a task, or {@code null} if none are available
     */
    protected static ForkJoinTask<?> pollTask() {
        Thread t; ForkJoinWorkerThread wt;
        return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
            null;
    }

    /**
     * Gets and removes a local or stolen task for the given worker.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
        for (ForkJoinTask<?> t;;) {
            WorkQueue q; int b;
            if ((t = w.nextLocalTask()) != null)
                return t;
            if ((q = findNonEmptyStealQueue()) == null)
                return null;
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
                return t;
        }
    }


        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return (config & FIFO_QUEUE) == 0 ? pop() : poll();
        }

    // Specialized scanning

    /**
     * Returns a (probably) non-empty steal queue, if one is found
     * during a scan, else null.  This method must be retried by
     * caller if, by the time it tries to use the queue, it is empty.
     */
    private WorkQueue findNonEmptyStealQueue() {
        WorkQueue[] ws; int m;  // one-shot version of scan loop
        int r = ThreadLocalRandom.nextSecondarySeed();
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; int b;
                if ((q = ws[k]) != null) {
                    if ((b = q.base) - q.top < 0)
                        return q;
                    checkSum += b;
                }
                if ((k = (k + 1) & m) == origin) {
                    if (oldSum == (oldSum = checkSum))
                        break;
                    checkSum = 0;
                }
            }
        }
        return null;
    }

工作竊取算法(work-stealing)

ForkJoinPool 的核心特性是它使用了work-stealing(工作竊炔⑼摇)算法:線程池內(nèi)的所有工作線程都嘗試找到并執(zhí)行已經(jīng)提交的任務(wù)鸳粉,或者是被其他活動任務(wù)創(chuàng)建的子任務(wù)(如果不存在就阻塞等待)。

這種特性使得 ForkJoinPool 在運行多個可以產(chǎn)生子任務(wù)的任務(wù)能真,或者是提交的許多小任務(wù)時效率更高赁严。尤其是構(gòu)建異步模型的 ForkJoinPool 時扰柠,對不需要合并(join)的事件類型任務(wù)也非常適用。

在 ForkJoinPool 中疼约,線程池中每個工作線程(ForkJoinWorkerThread)都對應(yīng)一個任務(wù)隊列(WorkQueue)卤档,工作線程優(yōu)先處理來自自身隊列的任務(wù)(LIFO或FIFO順序,參數(shù) mode 決定)程剥,然后以FIFO的順序隨機竊取其他隊列中的任務(wù)劝枣。


ForkJoinPool 中的任務(wù)分為兩種:一種是本地提交的任務(wù)(Submission task,如 execute织鲸、submit 提交的任務(wù))舔腾;另外一種是 fork 出的子任務(wù)(Worker task)。兩種任務(wù)都會存放在 WorkQueue 數(shù)組中搂擦,但是這兩種任務(wù)并不會混合在同一個隊列里稳诚,F(xiàn)orkJoinPool 內(nèi)部使用了一種隨機哈希算法(有點類似 ConcurrentHashMap 的桶隨機算法)將工作隊列與對應(yīng)的工作線程關(guān)聯(lián)起來,Submission 任務(wù)存放在 WorkQueue 數(shù)組的偶數(shù)索引位置瀑踢,Worker 任務(wù)存放在奇數(shù)索引位扳还。實質(zhì)上,Submission 與 Worker 一樣橱夭,只不過他它們被限制只能執(zhí)行它們提交的本地任務(wù)氨距,在后面的源碼解析中,我們統(tǒng)一稱之為“Worker”棘劣。
任務(wù)的分布情況如下圖:

兩種策略:
Helping-幫助運行:如果偷取還未開始俏让,為這些 joiner 安排一些它可以執(zhí)行的其它任務(wù)。
Compensating-補償運行:如果沒有足夠的活動線程茬暇,tryCompensate()可能創(chuàng)建或重新激活一個備用的線程來為被阻塞的 joiner 補償運行首昔。
第三種形式(在方法 tryRemoveAndExec 中實現(xiàn))相當(dāng)于幫助一個假想的補償線程來運行任務(wù):如果補償線程竊取并執(zhí)行的是被join的任務(wù),那么 join 線程不需要補償線程就可以直接執(zhí)行它(盡管犧牲了更大的運行時堆棧,但這種權(quán)衡通常是值得的)而钞。設(shè)想一下這種互相幫助的場景:補償線程幫助 join 線程執(zhí)行任務(wù)沙廉,反過來 join 線程也會幫助補償線程執(zhí)行任務(wù)拘荡。

helpStealer(補償執(zhí)行)使用了一種“線性幫助(linear helping)”的算法臼节。每個工作線程都記錄了最近一個從其他工作隊列(或 submission 隊列)偷取過來的任務(wù)("currentSteal"引用),同樣也記錄了當(dāng)前被 join 的任務(wù)(currentJoin 引用)珊皿。helpStealer 方法使用這些標(biāo)記去嘗試找到偷取者并幫助它執(zhí)行任務(wù)网缝,(也就是說,從偷取任務(wù)中拿到任務(wù)并執(zhí)行蟋定,“偷取者偷我的任務(wù)執(zhí)行粉臊,我去偷偷取者的任務(wù)執(zhí)行”),這樣就可以加速任務(wù)的執(zhí)行驶兜。這種算法在 ForkJoinPool 中的大概實現(xiàn)方式如下:

從 worker 到 steal 之間我們只保存依賴關(guān)系扼仲,而不是記錄每個 steal 任務(wù)远寸。有時可能需要對 workQueues 進(jìn)行線性掃描來定位偷取者,但是一般不需要屠凶,因為偷取者在偷取任務(wù)時會把他的索引存放在在 hint 引用里驰后。一個 worker 可能進(jìn)行了多個偷取操作,但只記錄了其中一個偷取者的索引(通常是最近的那個)矗愧,為了節(jié)省開銷灶芝,hint 在需要時才會記錄。
它是相對“淺層的”唉韭,忽略了嵌套和可能發(fā)生的循環(huán)相互偷取夜涕。
"currentJoin"引用只有在 join 的時候被更新,這意味著我們在執(zhí)行生命周期比較長的任務(wù)時會丟失鏈接属愤,導(dǎo)致GC停轉(zhuǎn)(在這種情況下利用阻塞通常是一個好的方案)女器。
我們使用 checksum 限制查找任務(wù)的次數(shù),然后掛起工作線程住诸,必要時使用其他工作線程替換它晓避。
注意:CountedCompleter 的幫助動作不需要追蹤"currentJoin":helpComplete 方法會獲取并執(zhí)行在同一個父節(jié)點下等待的所有任務(wù)。不過只壳,這仍然需要對 completer 的鏈表進(jìn)行遍歷俏拱,所以使用 CountedCompleters 相對于直接定位"currentJoin"效率要低。

補償執(zhí)行的目的不在于在任何給定的時間內(nèi)保持未阻塞線程的目標(biāo)并行數(shù)吼句。這個類之前的版本為所有阻塞的join任務(wù)都提供即時補償锅必,然而,在實踐中惕艳,絕大多數(shù)阻塞都是由GC和其他JVM或OS活動產(chǎn)生的瞬時的附帶影響搞隐,這種情況下使用補償線程替換工作線程會使情況變得更糟。現(xiàn)在远搪,通過檢查 WorkQueue.scanState 的狀態(tài)確認(rèn)所有活動線程都正在運行劣纲,然后使用補償操作消除多余的活躍線程數(shù)。補償操作通常情況下是被忽略運行的(容忍少數(shù)線程)谁鳍,因為它帶來的利益很少:當(dāng)一個有著空等待隊列的工作線程在 join 時阻塞癞季,它仍然會有足夠的線程來保證活性,所以不需要進(jìn)行補償倘潜。補償機制可能是有界限的绷柒。commonPool的界限(commonMaxSpares)使 JVM 在資源耗盡之前能更好的處理程序錯誤和資源濫用。用戶可能通過自定義工廠來限制線程的構(gòu)造涮因,所以界限的作用在這種 pool 中是不精確的废睦。當(dāng)線程撤銷(deregister)時,工作線程的總數(shù)會隨之減少养泡,不用等到他們退出并且資源被 JVM 和 OS 回收時才減少工作線程數(shù)嗜湃,所以活躍線程在此瞬間可能會超過界限奈应。

簡單應(yīng)用

這樣分治思想用遞歸實現(xiàn)的經(jīng)典案例就是斐波那契數(shù)列了。

斐波那契數(shù)列:1购披、1钥组、2、3今瀑、5程梦、8、13橘荠、21屿附、34、……
公式 :F(1)=1哥童,F(xiàn)(2)=1, F(n)=F(n-1)+F(n-2)(n>=3挺份,n∈N*)

源代碼:

package i

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.RecursiveTask

/**
 * @author: Jack
 * 2020-05-03 16:08
 * 斐波那契數(shù)列 計算第20個斐波那契數(shù)列
 * 以遞歸的方法定義:F(1)=1,F(xiàn)(2)=1, F(n)=F(n-1)+F(n-2)(n>=2贮懈,n∈N*)
 */


class FibonacciComputation(var n: Int) : RecursiveTask<Long>() {
    override fun compute(): Long {
        return when {
            n <= 1 -> 1L
            n == 2 -> 1L
            else -> {
                val f1 = FibonacciComputation(n - 1)
                val f2 = FibonacciComputation(n - 2)
                ForkJoinTask.invokeAll(f1, f2)
                Thread.sleep(10)
                f1.join() + f2.join()
            }
        }
    }
}

fun main() {

    val n = 10

    run {
        val pool = ForkJoinPool.commonPool()

        val s = System.currentTimeMillis()
        val fn = pool.invoke(FibonacciComputation(n))
        val t = System.currentTimeMillis()

        println(pool)
        println("fn=$fn")
        println("Time=${t - s}ms")
    }

    run {
        val s = System.currentTimeMillis()
        val fn = fib(n)
        val t = System.currentTimeMillis()
        println("fn=$fn")
        println("Time=${t - s}ms")
    }

}


fun fib(n: Int): Long {
    return when {
        n <= 1 -> 1L
        n == 2 -> 1L
        else -> {
            val f1 = fib(n - 1)
            val f2 = fib(n - 2)
            // 為了模擬計算密集型任務(wù),我們在這里sleep 10ms
            Thread.sleep(10)
            f2 + f1
        }
    }
}

運行測試的結(jié)果:

java.util.concurrent.ForkJoinPool@4b1210ee[Running, parallelism = 11, size = 11, active = 0, running = 0, steals = 24, tasks = 0, submissions = 0]
fn=55
Time=116ms
fn=55
Time=603ms

可以發(fā)現(xiàn)在有CPU密集計算任務(wù)的場景的時候匀泊,并行計算框架FJ表現(xiàn)的性能非常棒。

機器環(huán)境:

硬件概覽:

  型號名稱: MacBook Pro
  型號標(biāo)識符:    MacBookPro15,1
  處理器名稱:    Intel Core i7
  處理器速度:    2.6 GHz
  處理器數(shù)目:    1
  核總數(shù):  6
  L2 緩存(每個核):   256 KB
  L3 緩存:    12 MB
  超線程技術(shù):    已啟用
  內(nèi)存:   16 GB
  Boot ROM 版本:  220.260.171.0.0 (iBridge: 16.16.5200.0.0,0)
  序列號(系統(tǒng)):  C02Z43JXLVCF
  硬件 UUID:  FAAEE2DB-8F7C-54B1-A0B7-F286C27EA35F

另外朵你,我們再舉一個計算大量元素數(shù)組元素的例子:

package i

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.RecursiveTask
import java.util.stream.LongStream

/**
 * @author: Jack
 * 2020-05-03 16:08
 * 斐波那契數(shù)列 計算第20個斐波那契數(shù)列
 * 以遞歸的方法定義:F(1)=1各聘,F(xiàn)(2)=1, F(n)=F(n-1)+F(n-2)(n>=2,n∈N*)
 */


class Calculator(var numbers: LongArray, var start: Int, var end: Int) : RecursiveTask<Long>() {
    override fun compute(): Long {
        // 當(dāng)需要計算的數(shù)字個數(shù)小于6時抡医,直接采用for loop方式計算結(jié)果
        if (end - start < 6) {
            var sum = 0L
            for (i in start..end) {
                sum += numbers[i]
                Thread.sleep(10)
            }
            return sum
        }

        // 把任務(wù)一分為二躲因,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而
        val middle = (start + end) / 2
        val left = Calculator(numbers, start, middle)
        val right = Calculator(numbers, middle + 1, end)
        ForkJoinTask.invokeAll(left, right)
        // 為了模擬計算密集型任務(wù),我們在這里sleep 10ms
        return left.join() + right.join()
    }
}

fun sum(numbers: LongArray): Long {
    var sum = 0L
    for (i in numbers) {
        sum += i
        // 為了模擬計算密集型任務(wù),我們在這里sleep 10ms
        Thread.sleep(10)
    }
    return sum
}


fun main() {

    val numbers = LongStream.rangeClosed(1, 100).toArray()

    run {
        val calculator = Calculator(numbers, 0, numbers.size - 1)
        val pool = ForkJoinPool.commonPool()

        val s = System.currentTimeMillis()
        val fn = pool.invoke(calculator)
        val t = System.currentTimeMillis()

        println(pool)
        println("fn=$fn")
        println("Time=${t - s}ms")
    }

    run {
        val s = System.currentTimeMillis()
        val fn = sum(numbers)
        val t = System.currentTimeMillis()

        println("fn=$fn")
        println("Time=${t - s}ms")
    }

}

運行結(jié)果:

java.util.concurrent.ForkJoinPool@3941a79c[Running, parallelism = 11, size = 11, active = 0, running = 0, steals = 17, tasks = 0, submissions = 0]
fn=5050
Time=137ms
fn=5050
Time=1169ms

參考鏈接

http://www.reibang.com/p/32a15ef2f1bf
http://www.reibang.com/p/9ca3a95cb61a
https://blog.csdn.net/tyrroo/article/details/81483608
http://www.reibang.com/p/32a15ef2f1bf
http://gee.cs.oswego.edu/dl/papers/fj.pdf
https://segmentfault.com/a/1190000019635250


Kotlin開發(fā)者社區(qū)

專注分享 Java、 Kotlin忌傻、Spring/Spring Boot大脉、MySQL、redis水孩、neo4j镰矿、NoSQL、Android俘种、JavaScript秤标、React、Node安疗、函數(shù)式編程抛杨、編程思想够委、"高可用荐类,高性能,高實時"大型分布式系統(tǒng)架構(gòu)設(shè)計主題茁帽。

High availability, high performance, high real-time large-scale distributed system architecture design玉罐。

分布式框架:Zookeeper屈嗤、分布式中間件框架等
分布式存儲:GridFS、FastDFS吊输、TFS饶号、MemCache、redis等
分布式數(shù)據(jù)庫:Cobar季蚂、tddl茫船、Amoeba、Mycat
云計算扭屁、大數(shù)據(jù)算谈、AI算法
虛擬化、云原生技術(shù)
分布式計算框架:MapReduce料滥、Hadoop然眼、Storm、Flink等
分布式通信機制:Dubbo葵腹、RPC調(diào)用高每、共享遠(yuǎn)程數(shù)據(jù)、消息隊列等
消息隊列MQ:Kafka践宴、MetaQ鲸匿,RocketMQ
怎樣打造高可用系統(tǒng):基于硬件、軟件中間件阻肩、系統(tǒng)架構(gòu)等一些典型方案的實現(xiàn):HAProxy晒骇、基于Corosync+Pacemaker的高可用集群套件中間件系統(tǒng)
Mycat架構(gòu)分布式演進(jìn)
大數(shù)據(jù)Join背后的難題:數(shù)據(jù)、網(wǎng)絡(luò)磺浙、內(nèi)存和計算能力的矛盾和調(diào)和
Java分布式系統(tǒng)中的高性能難題:AIO洪囤,NIO,Netty還是自己開發(fā)框架撕氧?
高性能事件派發(fā)機制:線程池模型瘤缩、Disruptor模型等等。伦泥。剥啤。

合抱之木,生于毫末不脯;九層之臺府怯,起于壘土;千里之行防楷,始于足下牺丙。不積跬步,無以至千里;不積小流冲簿,無以成江河粟判。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市峦剔,隨后出現(xiàn)的幾起案子档礁,更是在濱河造成了極大的恐慌,老刑警劉巖吝沫,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件呻澜,死亡現(xiàn)場離奇詭異,居然都是意外死亡惨险,警方通過查閱死者的電腦和手機易迹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來平道,“玉大人睹欲,你說我怎么就攤上這事∫晃荩” “怎么了窘疮?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長冀墨。 經(jīng)常有香客問我闸衫,道長,這世上最難降的妖魔是什么诽嘉? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任蔚出,我火速辦了婚禮,結(jié)果婚禮上虫腋,老公的妹妹穿的比我還像新娘骄酗。我一直安慰自己,他們只是感情好悦冀,可當(dāng)我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布趋翻。 她就那樣靜靜地躺著,像睡著了一般盒蟆。 火紅的嫁衣襯著肌膚如雪踏烙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天历等,我揣著相機與錄音讨惩,去河邊找鬼。 笑死寒屯,一個胖子當(dāng)著我的面吹牛荐捻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼靴患,長吁一口氣:“原來是場噩夢啊……” “哼仍侥!你這毒婦竟也來了要出?” 一聲冷哼從身側(cè)響起鸳君,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎患蹂,沒想到半個月后或颊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡传于,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年囱挑,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沼溜。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡平挑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出系草,到底是詐尸還是另有隱情通熄,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布找都,位于F島的核電站唇辨,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏能耻。R本人自食惡果不足惜赏枚,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望晓猛。 院中可真熱鬧饿幅,春花似錦、人聲如沸戒职。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽帕涌。三九已至摄凡,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蚓曼,已是汗流浹背亲澡。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纫版,地道東北人床绪。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親癞己。 傳聞我的和親對象是個殘疾皇子膀斋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,486評論 2 348