關(guān)鍵詞:divide and conquer algorithm际起,work-stealing,WorkQueue
ForkJoinPool 是什么?
ForkJoinPool 是 JDK 7 中妓局,@author Doug Lea 加入的一個線程池類萌焰。Fork/Join 框架的核心原理就是分治算法(Divide-and-Conquer)和工作竊取算法(work-stealing algorithm)哺眯。
Fork分解任務(wù)成獨立的子任務(wù),用多線程去執(zhí)行這些子任務(wù)扒俯,Join合并子任務(wù)的結(jié)果奶卓。這樣就能使用多線程的方式來執(zhí)行一個任務(wù)一疯。
JDK7引入的Fork/Join有三個核心類:
- ForkJoinPool,執(zhí)行任務(wù)的線程池
- ForkJoinWorkerThread夺姑,執(zhí)行任務(wù)的工作線程
- ForkJoinTask窿撬,一個用于ForkJoinPool的任務(wù)抽象類迎瞧。
ForkJoinPool是框架的核心,不同于其他線程池,它的構(gòu)建不需要提供核心線程數(shù)刹碾,最大線程數(shù),阻塞隊列等旬薯,還增加了未捕獲異常處理器般又,而該處理器會交給工作線程,由該線程處理丐黄,這樣的好處在于當(dāng)一個線程的工作隊列上的某個任務(wù)出現(xiàn)異常時斋配,不至于結(jié)束掉線程,而是讓它繼續(xù)運行隊列上的其他任務(wù)灌闺。它會依托于并行度(或默認(rèn)根據(jù)核數(shù)計算)來決定最大線程數(shù)艰争,它內(nèi)部維護(hù)了WorkQueue數(shù)組ws取代了阻塞隊列,ws中下標(biāo)為奇數(shù)的為工作線程的所屬隊列桂对,偶數(shù)的為共享隊列甩卓,雖然名稱有所區(qū)分,但重要的區(qū)別只有一點:共享隊列不存在工作線程蕉斜。
ForkJoinPool 的狀態(tài)控制變量有:
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
// Instance fields
volatile long ctl; // main pool control
volatile int runState; // lockable status
final int config; // parallelism, mode
int indexSeed; // to generate worker index
volatile WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile AtomicLong stealCounter; // also used as sync monitor
ForkJoinPool維護(hù)了一個ctl控制信號猛频,前16位表示活躍worker數(shù),33至48位表示worker總數(shù)蛛勉,后32位可以粗略理解用于表示worker等待隊列的棧頂鹿寻。ForkJoinPool利用這個ctl,WorkQueue的scanState和stackPred以及ws的索引算法維護(hù)了一個類似隊列(或者叫棧更貼切一些)的數(shù)據(jù)結(jié)構(gòu)诽凌。每當(dāng)有一個線程偷不到任務(wù)毡熏,就會存放此前的ctl后置標(biāo)記位到pred,并將自己的索引交給ctl作為棧頂侣诵。相應(yīng)的喚醒操作則由棧頂起痢法。相應(yīng)的方法在進(jìn)行嘗試添加worker時,會綜合當(dāng)前是否有阻塞等待任務(wù)的線程杜顺。
當(dāng)所有線程都不能竊取到新的任務(wù)财搁,進(jìn)入等待隊列時,稱之為“靜寂態(tài)”躬络。
ForkJoinPool對全局全狀的修改需要加鎖進(jìn)行尖奔,這些操作如修改ctl(改變棧頂,增刪活躍數(shù)或總數(shù)等),處理ws中的元素提茁,擴容ws淹禾,關(guān)閉線程池,初始化(包含ws的初始化)茴扁,注冊線程入池等铃岔。而這個鎖就是runState,它除了當(dāng)鎖峭火,也間接表示了運行狀態(tài)毁习,相應(yīng)的線程池的SHUTDOWN,STOP,TERMINATED等狀態(tài)均與其相應(yīng)的位有關(guān)。
線程池的并行度保存在config字段的后16位卖丸,config的第17位決定了是FIFO還是LIFO纺且。而這個并行度也通過間接地取反并計入到ctl的前32位,線程池中判斷是否當(dāng)前有活躍的線程坯苹,或者是否已進(jìn)入寂靜態(tài),都是用保存在config的并行度和保存在ctl前32位的活躍數(shù)與并行度的運算結(jié)果進(jìn)行相加摇天,判斷是否會溢出(正數(shù))來決定的粹湃。
ForkJoinPool還提供了補償機制,用于在線程將要阻塞在執(zhí)行過程中前釋放掉一個正在空閑的工作線程或創(chuàng)建一個新的工作線程泉坐,從而保證了并行度为鳄。
因為ForkJoinTask比較復(fù)雜,抽象方法比較多腕让,日常使用時一般不會繼承ForkJoinTask來實現(xiàn)自定義的任務(wù)孤钦,而是繼承ForkJoinTask的兩個子類:
- RecursiveTask:子任務(wù)帶返回結(jié)果時使用
- RecursiveAction:子任務(wù)不帶返回結(jié)果時使用
關(guān)于Fork/Join框架的原理,可參考:Doug Lea的文章:A Java Fork/Join Framework http://gee.cs.oswego.edu/dl/papers/fj.pdf
框架模型
ForkJoinPool 不是為了替代 ExecutorService纯丸,而是它的補充偏形,在某些應(yīng)用場景下性能比 ExecutorService 更好。
ForkJoinPool 主要用于實現(xiàn)“分而治之”的算法觉鼻,特別是分治之后遞歸調(diào)用的函數(shù)俊扭,例如 quick sort 等。
ForkJoinPool 最適合的是計算密集型的任務(wù)坠陈,如果存在 I/O萨惑,線程間同步,sleep() 等會造成線程長時間阻塞的情況時仇矾,最好配合使用 ManagedBlocker庸蔼。
ForkJoinPool 分治算法思想
分治(divide and conquer),也就是把一個復(fù)雜的問題分解成相似的子問題贮匕,然后子問題再分子問題姐仅,直到問題分的很簡單不必再劃分了。然后層層返回子問題的結(jié)果,最終合并返回問題結(jié)果萍嬉。
分治在算法上有很多應(yīng)用乌昔,類似大數(shù)據(jù)的MapReduce,歸并算法壤追、快速排序算法等磕道。JUC中的Fork/Join的并行計算框架類似于單機版的 MapReduce。
Fork/Join 行冰,從字面上我們就可以理解溺蕉,分治的過程分為兩個階段,第一個階段分解任務(wù)(fork)悼做,把任務(wù)分解為一個個小任務(wù)直至小任務(wù)可以簡單的計算返回結(jié)果疯特。
第二階段合并結(jié)果(join),把每個小任務(wù)的結(jié)果合并返回得到最終結(jié)果肛走。而Fork就是分解任務(wù)漓雅,Join就是合并結(jié)果。
Fork/Join框架主要包含兩部分:ForkJoinPool朽色、ForkJoinTask邻吞。
ForkJoinPool就是治理分治任務(wù)的線程池。它和在之前的文章提到ThreadPoolExecutor線程池葫男,共同點都是消費者-生產(chǎn)者模式的實現(xiàn)抱冷,但是有一些不同。
ThreadPoolExecutor的線程池是只有一個任務(wù)隊列的梢褐,而ForkJoinPool有多個任務(wù)隊列旺遮。通過ForkJoinPool的invoke或submit或execute提交任務(wù)的時候會根據(jù)一定規(guī)則分配給不同的任務(wù)隊列,并且任務(wù)隊列的雙端隊列盈咳。
ForkJoinPool 的每個工作線程都維護(hù)著一個工作隊列(WorkQueue)耿眉,這是一個雙端隊列(Deque),里面存放的對象是任務(wù)(ForkJoinTask)鱼响。 其中ForkJoinTask代表一個可以并行跷敬、合并的任務(wù)。ForkJoinTask是一個抽象類热押,它還有兩個抽象子類:RecusiveAction和RecusiveTask西傀。其中RecusiveTask代表有返回值的任務(wù),而RecusiveAction代表沒有返回值的任務(wù)桶癣。
ForkJoinPool 與 ExecutorService
ForkJoinPool是ExecutorService的實現(xiàn)類拥褂,因此是一種特殊的線程池。
使用方法:創(chuàng)建了ForkJoinPool實例之后牙寞,就可以調(diào)用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法來執(zhí)行指定任務(wù)了饺鹃。
那么莫秆,使用ThreadPoolExecutor或者ForkJoinPool,會有什么性能的差異呢悔详?
使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù)镊屎,比如使用4個線程來完成超過200萬個任務(wù)。但是茄螃,使用ThreadPoolExecutor時缝驳,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務(wù)归苍,需要完成200萬個具有父子關(guān)系的任務(wù)時用狱,也需要200萬個線程,顯然這是不可行的拼弃。
這正是工作竊取模式的優(yōu)點夏伊。
ForkJoinPool is an advanced version of ThreadPoolExecutor with concepts like work-stealing which enable faster and efficient solving of divide and conquer algorithms.
我們常用的數(shù)組工具類 Arrays 在JDK 8之后新增的并行排序方法(parallelSort)就運用了 ForkJoinPool 的特性,還有 ConcurrentHashMap 在JDK 8之后添加的函數(shù)式方法(如forEach等)也有運用吻氧。在整個JUC框架中溺忧,F(xiàn)orkJoinPool 相對其他類會復(fù)雜很多。
創(chuàng)建 ForkJoinPool 對象
ForkJoinPool作者Doug Lea 在ForkJoinPool主類的注釋說明中盯孙,有這樣一句話:
A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool.
Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).
以上描述大致的中文解釋是:ForkJoinPools類有一個靜態(tài)方法commonPool()鲁森,這個靜態(tài)方法所獲得的ForkJoinPools實例是由整個應(yīng)用進(jìn)程共享的,并且它適合絕大多數(shù)的應(yīng)用系統(tǒng)場景镀梭。使用commonPool通车渡可以幫助應(yīng)用程序中多種需要進(jìn)行歸并計算的任務(wù)共享計算資源踱启,從而使后者發(fā)揮最大作用(ForkJoinPools中的工作線程在閑置時會被緩慢回收报账,并在隨后需要使用時被恢復(fù)),而這種獲取ForkJoinPools實例的方式埠偿,才是Doug Lea推薦的使用方式透罢。代碼如下:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
我們可以看到:
/**
* Returns the common pool instance. This pool is statically
* constructed; its run state is unaffected by attempts to {@link
* #shutdown} or {@link #shutdownNow}. However this pool and any
* ongoing processing are automatically terminated upon program
* {@link System#exit}. Any program that relies on asynchronous
* task processing to complete before program termination should
* invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
* before exit.
*
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
其中,common 是:
/**
* Common (static) pool. Non-null for public use unless a static
* construction exception, but internal usages null-check on use
* to paranoically [偏執(zhí)地] avoid potential initialization circularities
* as well as to simplify generated code.
*/
static final ForkJoinPool common;
而這個 common 成員變量是在 ForkJoinPool 源代碼冠蒋, 用一段靜態(tài)代碼初始化的:
static {
// initialize field offsets for CAS etc
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
RUNSTATE = U.objectFieldOffset
(k.getDeclaredField("runState"));
STEALCOUNTER = U.objectFieldOffset
(k.getDeclaredField("stealCounter"));
Class<?> tk = Thread.class;
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
Class<?> wk = WorkQueue.class;
QTOP = U.objectFieldOffset
(wk.getDeclaredField("top"));
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
QSCANSTATE = U.objectFieldOffset
(wk.getDeclaredField("scanState"));
QPARKER = U.objectFieldOffset
(wk.getDeclaredField("parker"));
QCURRENTSTEAL = U.objectFieldOffset
(wk.getDeclaredField("currentSteal"));
QCURRENTJOIN = U.objectFieldOffset
(wk.getDeclaredField("currentJoin"));
Class<?> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
/**
* Creates and returns the common pool, respecting user settings
* specified via system properties.
*/
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores 處理器核數(shù)
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
工作線程和工作隊列
工作線程
ForkJoinWorkerThread是運行在ForkJoinPool中的線程羽圃,它內(nèi)部會維護(hù)一個存放ForkJoinTask的WorkQueue隊列,而WorkQueue是ForkJoinPool的內(nèi)部類抖剿。
一個WorkQueue工作隊列所屬的歸并計算工作線程ForkJoinWorkerThread朽寞,用來執(zhí)行 ForkJoinTask 。 注意斩郎,工作隊列也可能不屬于任何工作線程脑融。
/**
* A thread managed by a {@link ForkJoinPool}, which executes
* {@link ForkJoinTask}s.
* This class is subclassable solely for the sake of adding
* functionality -- there are no overridable methods dealing with
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
* {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
}
工作隊列(WorkQueue)
為啥要雙端隊列呢?因為ForkJoinPool有一個機制缩宜,當(dāng)某個工作線程對應(yīng)消費的任務(wù)隊列空閑的時候它會去別的忙的任務(wù)隊列的尾部分擔(dān)(stealing)任務(wù)過來執(zhí)行(好伙伴啊)肘迎。然后那個忙的任務(wù)隊列還是頭部出任務(wù)給它對應(yīng)的工作線程消費甥温。這樣雙端就井然有序,不會有任務(wù)爭搶的情況妓布∫鲵荆看,這就是樸素的大師級的設(shè)計思想啊匣沼。
以下代碼片段示例了WorkQueue類中定義的一些重要屬性:
static final class WorkQueue {
......
// 隊列狀態(tài)
volatile int qlock; // 1: locked, < 0: terminate; else 0
// 下一個出隊元素的索引位(主要是為線程竊取準(zhǔn)備的索引位置)
volatile int base; // index of next slot for poll
// 下一個入隊元素準(zhǔn)備的索引位
int top; // index of next slot for push
// 隊列中使用數(shù)組存儲元素
ForkJoinTask<?>[] array; // the elements (initially unallocated)
// 隊列所屬的ForkJoinPool(可能為空)
// 注意狰挡,一個ForkJoinPool中會有多個執(zhí)行線程,還會有比執(zhí)行線程更多的(或一樣多的)隊列
final ForkJoinPool pool; // the containing pool (may be null)
// 這個隊列所屬的歸并計算工作線程肛著。注意圆兵,工作隊列也可能不屬于任何工作線程
final ForkJoinWorkerThread owner; // owning thread or null if shared
// 記錄當(dāng)前正在進(jìn)行join等待的其它任務(wù)
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
// 當(dāng)前正在偷取的任務(wù)
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
......
}
入隊
當(dāng)工作隊列的 owner ,也就是ForkJoinWorkerThread 需要向雙端隊列中放入一個新的待執(zhí)行子任務(wù)時枢贿,會調(diào)用WorkQueue中的push方法殉农。
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
// base: 下一個出隊元素的索引位(主要是為線程竊取準(zhǔn)備的索引位置)
// top: 下一個入隊元素準(zhǔn)備的索引位
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
// 在指定的對象 ForkJoinTask<?>[] a 中,指定的內(nèi)存偏移量 ((m & s) << ASHIFT) + ABASE 的位置局荚,賦予一個新的元素
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
// 將workQueue對象本身中的top標(biāo)示的位置 + 1
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
這里使用Unsafe.putOrderedObject方法超凳,這個方法在對低延遲代碼是很有用的,它能夠?qū)崿F(xiàn)非堵塞的寫入耀态,這些寫入不會被Java的JIT重新排序指令(instruction reordering)轮傍,
這樣它使用快速的存儲-存儲(store-store) barrier,
而不是較慢的存儲-加載(store-load) barrier, (用在volatile的寫操作上)
這種性能提升是有代價的,雖然便宜首装,也就是寫后結(jié)果并不會被其他線程看到创夜,甚至是自己的線程,通常是幾納秒后被其他線程看到仙逻,這個時間比較短驰吓,所以代價可以忍受。
類似Unsafe.putOrderedObject還有unsafe.putOrderedLong等方法系奉,unsafe.putOrderedLong比使用 volatile long要快3倍左右檬贰。
sun.misc.Unsafe操作類直接基于操作系統(tǒng)控制層在硬件層面上進(jìn)行原子操作,它是ForkJoinPool高效性能的一大保證缺亮,類似的編程思路還體現(xiàn)在java.util.concurrent包中相當(dāng)規(guī)模的類功能實現(xiàn)中翁涤。實際上sun.misc.Unsafe操作類在Java中有著舉足輕重的地位,例如基于這個類實現(xiàn)的Java樂觀鎖機制萌踱。
出隊
當(dāng)ForkJoinWorkerThread需要從雙端隊列中取出下一個待執(zhí)行子任務(wù)葵礼,就會根據(jù)設(shè)定的asyncMode調(diào)用雙端隊列的不同方法,代碼概要如下所示:
/**
* If the current thread is operating in a ForkJoinPool,
* unschedules and returns, without executing, the next task
* queued by the current thread but not yet executed, if one is
* available, or if not available, a task that was forked by some
* other thread, if available. Availability may be transient, so a
* {@code null} result does not necessarily imply quiescence of
* the pool this task is operating in. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollTask() {
Thread t; ForkJoinWorkerThread wt;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
null;
}
/**
* Gets and removes a local or stolen task for the given worker.
*
* @return a task, if available
*/
final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
for (ForkJoinTask<?> t;;) {
WorkQueue q; int b;
if ((t = w.nextLocalTask()) != null)
return t;
if ((q = findNonEmptyStealQueue()) == null)
return null;
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
return t;
}
}
/**
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
return (config & FIFO_QUEUE) == 0 ? pop() : poll();
}
// Specialized scanning
/**
* Returns a (probably) non-empty steal queue, if one is found
* during a scan, else null. This method must be retried by
* caller if, by the time it tries to use the queue, it is empty.
*/
private WorkQueue findNonEmptyStealQueue() {
WorkQueue[] ws; int m; // one-shot version of scan loop
int r = ThreadLocalRandom.nextSecondarySeed();
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; int b;
if ((q = ws[k]) != null) {
if ((b = q.base) - q.top < 0)
return q;
checkSum += b;
}
if ((k = (k + 1) & m) == origin) {
if (oldSum == (oldSum = checkSum))
break;
checkSum = 0;
}
}
}
return null;
}
工作竊取算法(work-stealing)
ForkJoinPool 的核心特性是它使用了work-stealing(工作竊炔⑼摇)算法:線程池內(nèi)的所有工作線程都嘗試找到并執(zhí)行已經(jīng)提交的任務(wù)鸳粉,或者是被其他活動任務(wù)創(chuàng)建的子任務(wù)(如果不存在就阻塞等待)。
這種特性使得 ForkJoinPool 在運行多個可以產(chǎn)生子任務(wù)的任務(wù)能真,或者是提交的許多小任務(wù)時效率更高赁严。尤其是構(gòu)建異步模型的 ForkJoinPool 時扰柠,對不需要合并(join)的事件類型任務(wù)也非常適用。
在 ForkJoinPool 中疼约,線程池中每個工作線程(ForkJoinWorkerThread)都對應(yīng)一個任務(wù)隊列(WorkQueue)卤档,工作線程優(yōu)先處理來自自身隊列的任務(wù)(LIFO或FIFO順序,參數(shù) mode 決定)程剥,然后以FIFO的順序隨機竊取其他隊列中的任務(wù)劝枣。
ForkJoinPool 中的任務(wù)分為兩種:一種是本地提交的任務(wù)(Submission task,如 execute织鲸、submit 提交的任務(wù))舔腾;另外一種是 fork 出的子任務(wù)(Worker task)。兩種任務(wù)都會存放在 WorkQueue 數(shù)組中搂擦,但是這兩種任務(wù)并不會混合在同一個隊列里稳诚,F(xiàn)orkJoinPool 內(nèi)部使用了一種隨機哈希算法(有點類似 ConcurrentHashMap 的桶隨機算法)將工作隊列與對應(yīng)的工作線程關(guān)聯(lián)起來,Submission 任務(wù)存放在 WorkQueue 數(shù)組的偶數(shù)索引位置瀑踢,Worker 任務(wù)存放在奇數(shù)索引位扳还。實質(zhì)上,Submission 與 Worker 一樣橱夭,只不過他它們被限制只能執(zhí)行它們提交的本地任務(wù)氨距,在后面的源碼解析中,我們統(tǒng)一稱之為“Worker”棘劣。
任務(wù)的分布情況如下圖:
兩種策略:
Helping-幫助運行:如果偷取還未開始俏让,為這些 joiner 安排一些它可以執(zhí)行的其它任務(wù)。
Compensating-補償運行:如果沒有足夠的活動線程茬暇,tryCompensate()可能創(chuàng)建或重新激活一個備用的線程來為被阻塞的 joiner 補償運行首昔。
第三種形式(在方法 tryRemoveAndExec 中實現(xiàn))相當(dāng)于幫助一個假想的補償線程來運行任務(wù):如果補償線程竊取并執(zhí)行的是被join的任務(wù),那么 join 線程不需要補償線程就可以直接執(zhí)行它(盡管犧牲了更大的運行時堆棧,但這種權(quán)衡通常是值得的)而钞。設(shè)想一下這種互相幫助的場景:補償線程幫助 join 線程執(zhí)行任務(wù)沙廉,反過來 join 線程也會幫助補償線程執(zhí)行任務(wù)拘荡。
helpStealer(補償執(zhí)行)使用了一種“線性幫助(linear helping)”的算法臼节。每個工作線程都記錄了最近一個從其他工作隊列(或 submission 隊列)偷取過來的任務(wù)("currentSteal"引用),同樣也記錄了當(dāng)前被 join 的任務(wù)(currentJoin 引用)珊皿。helpStealer 方法使用這些標(biāo)記去嘗試找到偷取者并幫助它執(zhí)行任務(wù)网缝,(也就是說,從偷取任務(wù)中拿到任務(wù)并執(zhí)行蟋定,“偷取者偷我的任務(wù)執(zhí)行粉臊,我去偷偷取者的任務(wù)執(zhí)行”),這樣就可以加速任務(wù)的執(zhí)行驶兜。這種算法在 ForkJoinPool 中的大概實現(xiàn)方式如下:
從 worker 到 steal 之間我們只保存依賴關(guān)系扼仲,而不是記錄每個 steal 任務(wù)远寸。有時可能需要對 workQueues 進(jìn)行線性掃描來定位偷取者,但是一般不需要屠凶,因為偷取者在偷取任務(wù)時會把他的索引存放在在 hint 引用里驰后。一個 worker 可能進(jìn)行了多個偷取操作,但只記錄了其中一個偷取者的索引(通常是最近的那個)矗愧,為了節(jié)省開銷灶芝,hint 在需要時才會記錄。
它是相對“淺層的”唉韭,忽略了嵌套和可能發(fā)生的循環(huán)相互偷取夜涕。
"currentJoin"引用只有在 join 的時候被更新,這意味著我們在執(zhí)行生命周期比較長的任務(wù)時會丟失鏈接属愤,導(dǎo)致GC停轉(zhuǎn)(在這種情況下利用阻塞通常是一個好的方案)女器。
我們使用 checksum 限制查找任務(wù)的次數(shù),然后掛起工作線程住诸,必要時使用其他工作線程替換它晓避。
注意:CountedCompleter 的幫助動作不需要追蹤"currentJoin":helpComplete 方法會獲取并執(zhí)行在同一個父節(jié)點下等待的所有任務(wù)。不過只壳,這仍然需要對 completer 的鏈表進(jìn)行遍歷俏拱,所以使用 CountedCompleters 相對于直接定位"currentJoin"效率要低。
補償執(zhí)行的目的不在于在任何給定的時間內(nèi)保持未阻塞線程的目標(biāo)并行數(shù)吼句。這個類之前的版本為所有阻塞的join任務(wù)都提供即時補償锅必,然而,在實踐中惕艳,絕大多數(shù)阻塞都是由GC和其他JVM或OS活動產(chǎn)生的瞬時的附帶影響搞隐,這種情況下使用補償線程替換工作線程會使情況變得更糟。現(xiàn)在远搪,通過檢查 WorkQueue.scanState 的狀態(tài)確認(rèn)所有活動線程都正在運行劣纲,然后使用補償操作消除多余的活躍線程數(shù)。補償操作通常情況下是被忽略運行的(容忍少數(shù)線程)谁鳍,因為它帶來的利益很少:當(dāng)一個有著空等待隊列的工作線程在 join 時阻塞癞季,它仍然會有足夠的線程來保證活性,所以不需要進(jìn)行補償倘潜。補償機制可能是有界限的绷柒。commonPool的界限(commonMaxSpares)使 JVM 在資源耗盡之前能更好的處理程序錯誤和資源濫用。用戶可能通過自定義工廠來限制線程的構(gòu)造涮因,所以界限的作用在這種 pool 中是不精確的废睦。當(dāng)線程撤銷(deregister)時,工作線程的總數(shù)會隨之減少养泡,不用等到他們退出并且資源被 JVM 和 OS 回收時才減少工作線程數(shù)嗜湃,所以活躍線程在此瞬間可能會超過界限奈应。
簡單應(yīng)用
這樣分治思想用遞歸實現(xiàn)的經(jīng)典案例就是斐波那契數(shù)列了。
斐波那契數(shù)列:1购披、1钥组、2、3今瀑、5程梦、8、13橘荠、21屿附、34、……
公式 :F(1)=1哥童,F(xiàn)(2)=1, F(n)=F(n-1)+F(n-2)(n>=3挺份,n∈N*)
源代碼:
package i
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.RecursiveTask
/**
* @author: Jack
* 2020-05-03 16:08
* 斐波那契數(shù)列 計算第20個斐波那契數(shù)列
* 以遞歸的方法定義:F(1)=1,F(xiàn)(2)=1, F(n)=F(n-1)+F(n-2)(n>=2贮懈,n∈N*)
*/
class FibonacciComputation(var n: Int) : RecursiveTask<Long>() {
override fun compute(): Long {
return when {
n <= 1 -> 1L
n == 2 -> 1L
else -> {
val f1 = FibonacciComputation(n - 1)
val f2 = FibonacciComputation(n - 2)
ForkJoinTask.invokeAll(f1, f2)
Thread.sleep(10)
f1.join() + f2.join()
}
}
}
}
fun main() {
val n = 10
run {
val pool = ForkJoinPool.commonPool()
val s = System.currentTimeMillis()
val fn = pool.invoke(FibonacciComputation(n))
val t = System.currentTimeMillis()
println(pool)
println("fn=$fn")
println("Time=${t - s}ms")
}
run {
val s = System.currentTimeMillis()
val fn = fib(n)
val t = System.currentTimeMillis()
println("fn=$fn")
println("Time=${t - s}ms")
}
}
fun fib(n: Int): Long {
return when {
n <= 1 -> 1L
n == 2 -> 1L
else -> {
val f1 = fib(n - 1)
val f2 = fib(n - 2)
// 為了模擬計算密集型任務(wù),我們在這里sleep 10ms
Thread.sleep(10)
f2 + f1
}
}
}
運行測試的結(jié)果:
java.util.concurrent.ForkJoinPool@4b1210ee[Running, parallelism = 11, size = 11, active = 0, running = 0, steals = 24, tasks = 0, submissions = 0]
fn=55
Time=116ms
fn=55
Time=603ms
可以發(fā)現(xiàn)在有CPU密集計算任務(wù)的場景的時候匀泊,并行計算框架FJ表現(xiàn)的性能非常棒。
機器環(huán)境:
硬件概覽:
型號名稱: MacBook Pro
型號標(biāo)識符: MacBookPro15,1
處理器名稱: Intel Core i7
處理器速度: 2.6 GHz
處理器數(shù)目: 1
核總數(shù): 6
L2 緩存(每個核): 256 KB
L3 緩存: 12 MB
超線程技術(shù): 已啟用
內(nèi)存: 16 GB
Boot ROM 版本: 220.260.171.0.0 (iBridge: 16.16.5200.0.0,0)
序列號(系統(tǒng)): C02Z43JXLVCF
硬件 UUID: FAAEE2DB-8F7C-54B1-A0B7-F286C27EA35F
另外朵你,我們再舉一個計算大量元素數(shù)組元素的例子:
package i
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.RecursiveTask
import java.util.stream.LongStream
/**
* @author: Jack
* 2020-05-03 16:08
* 斐波那契數(shù)列 計算第20個斐波那契數(shù)列
* 以遞歸的方法定義:F(1)=1各聘,F(xiàn)(2)=1, F(n)=F(n-1)+F(n-2)(n>=2,n∈N*)
*/
class Calculator(var numbers: LongArray, var start: Int, var end: Int) : RecursiveTask<Long>() {
override fun compute(): Long {
// 當(dāng)需要計算的數(shù)字個數(shù)小于6時抡医,直接采用for loop方式計算結(jié)果
if (end - start < 6) {
var sum = 0L
for (i in start..end) {
sum += numbers[i]
Thread.sleep(10)
}
return sum
}
// 把任務(wù)一分為二躲因,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而
val middle = (start + end) / 2
val left = Calculator(numbers, start, middle)
val right = Calculator(numbers, middle + 1, end)
ForkJoinTask.invokeAll(left, right)
// 為了模擬計算密集型任務(wù),我們在這里sleep 10ms
return left.join() + right.join()
}
}
fun sum(numbers: LongArray): Long {
var sum = 0L
for (i in numbers) {
sum += i
// 為了模擬計算密集型任務(wù),我們在這里sleep 10ms
Thread.sleep(10)
}
return sum
}
fun main() {
val numbers = LongStream.rangeClosed(1, 100).toArray()
run {
val calculator = Calculator(numbers, 0, numbers.size - 1)
val pool = ForkJoinPool.commonPool()
val s = System.currentTimeMillis()
val fn = pool.invoke(calculator)
val t = System.currentTimeMillis()
println(pool)
println("fn=$fn")
println("Time=${t - s}ms")
}
run {
val s = System.currentTimeMillis()
val fn = sum(numbers)
val t = System.currentTimeMillis()
println("fn=$fn")
println("Time=${t - s}ms")
}
}
運行結(jié)果:
java.util.concurrent.ForkJoinPool@3941a79c[Running, parallelism = 11, size = 11, active = 0, running = 0, steals = 17, tasks = 0, submissions = 0]
fn=5050
Time=137ms
fn=5050
Time=1169ms
參考鏈接
http://www.reibang.com/p/32a15ef2f1bf
http://www.reibang.com/p/9ca3a95cb61a
https://blog.csdn.net/tyrroo/article/details/81483608
http://www.reibang.com/p/32a15ef2f1bf
http://gee.cs.oswego.edu/dl/papers/fj.pdf
https://segmentfault.com/a/1190000019635250
Kotlin開發(fā)者社區(qū)
專注分享 Java、 Kotlin忌傻、Spring/Spring Boot大脉、MySQL、redis水孩、neo4j镰矿、NoSQL、Android俘种、JavaScript秤标、React、Node安疗、函數(shù)式編程抛杨、編程思想够委、"高可用荐类,高性能,高實時"大型分布式系統(tǒng)架構(gòu)設(shè)計主題茁帽。
High availability, high performance, high real-time large-scale distributed system architecture design玉罐。
分布式框架:Zookeeper屈嗤、分布式中間件框架等
分布式存儲:GridFS、FastDFS吊输、TFS饶号、MemCache、redis等
分布式數(shù)據(jù)庫:Cobar季蚂、tddl茫船、Amoeba、Mycat
云計算扭屁、大數(shù)據(jù)算谈、AI算法
虛擬化、云原生技術(shù)
分布式計算框架:MapReduce料滥、Hadoop然眼、Storm、Flink等
分布式通信機制:Dubbo葵腹、RPC調(diào)用高每、共享遠(yuǎn)程數(shù)據(jù)、消息隊列等
消息隊列MQ:Kafka践宴、MetaQ鲸匿,RocketMQ
怎樣打造高可用系統(tǒng):基于硬件、軟件中間件阻肩、系統(tǒng)架構(gòu)等一些典型方案的實現(xiàn):HAProxy晒骇、基于Corosync+Pacemaker的高可用集群套件中間件系統(tǒng)
Mycat架構(gòu)分布式演進(jìn)
大數(shù)據(jù)Join背后的難題:數(shù)據(jù)、網(wǎng)絡(luò)磺浙、內(nèi)存和計算能力的矛盾和調(diào)和
Java分布式系統(tǒng)中的高性能難題:AIO洪囤,NIO,Netty還是自己開發(fā)框架撕氧?
高性能事件派發(fā)機制:線程池模型瘤缩、Disruptor模型等等。伦泥。剥啤。
合抱之木,生于毫末不脯;九層之臺府怯,起于壘土;千里之行防楷,始于足下牺丙。不積跬步,無以至千里;不積小流冲簿,無以成江河粟判。