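Introduction
Starting with JDK 1.7, Java provides the Fork/Join framework for running tasks in parallel. The idea is to split a large task into several smaller tasks and then combine the results of the small tasks into the result of the large one.
This idea is very similar to MapReduce (input --> split --> map --> reduce --> output).
There are two main steps:
- First, split the task;
- Second, merge the results.
The model is roughly as follows: every thread in the pool has its own work queue. (Note: this differs from ThreadPoolExecutor, where all threads share a single work queue and all take tasks from it.) When a thread has finished every task in its own queue, it steals a task from another thread's work queue and runs it, so that resources are used to the full.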
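The two steps above (split, then merge) can be sketched with a small RecursiveTask. This is only an illustration under stated assumptions: the class name MaxTaskSketch, the helper parallelMax and the threshold of 4 are made up for this sketch, and the input array is assumed to be non-empty.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: finds the largest element by splitting the range in half.
class MaxTaskSketch extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // illustrative cut-off, not a tuned value
    private final int[] data;
    private final int start; // inclusive
    private final int end;   // exclusive

    MaxTaskSketch(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: solve directly (assumes the range is non-empty).
            int max = data[start];
            for (int i = start + 1; i < end; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        // Step 1: split the task.
        int middle = (start + end) >>> 1;
        MaxTaskSketch left = new MaxTaskSketch(data, start, middle);
        MaxTaskSketch right = new MaxTaskSketch(data, middle, end);
        left.fork();                    // schedule the left half asynchronously
        int rightMax = right.compute(); // work on the right half in this thread
        // Step 2: merge the results.
        return Math.max(left.join(), rightMax);
    }

    // Convenience wrapper that creates a pool, runs the task and shuts the pool down.
    static int parallelMax(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new MaxTaskSketch(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = {7, 3, 42, 8, 15, 23, 4, 16, 1, 38};
        System.out.println(parallelMax(data)); // prints 42
    }
}
```

Forking one half and computing the other in the current thread keeps the current worker busy instead of having it wait for both halves.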
Work-stealing
In the work-stealing algorithm, a thread steals tasks from other queues and runs them. The flow of work-stealing is shown in the following diagram:
So why use a work-stealing algorithm?
Suppose we have a fairly large task to do. We can split it into several independent subtasks. To reduce contention between threads, we put these subtasks into different queues and create a dedicated thread for each queue, so that threads and queues map one to one; for example, thread A handles the tasks in queue A. Some threads, however, finish the tasks in their own queue while other threads' queues still hold pending tasks. Rather than wait, an idle thread helps out by stealing a task from another thread's queue. At that point two threads access the same queue, so to reduce contention between the thief and its victim a double-ended queue (deque) is usually used: the victim thread always takes tasks from the head of the deque, while the stealing thread always takes them from the tail.
The advantage of work-stealing is that it makes full use of the threads for parallel computation and reduces contention between them. Its drawbacks are that contention still occurs in some cases, for example when a deque holds only one task, and that it consumes more system resources, since it creates multiple threads and multiple deques.
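The head/tail convention described above can be made concrete with a toy, single-threaded simulation. It is only a sketch of the idea, not how ForkJoinPool is implemented: the class WorkStealingSketch and its methods are hypothetical, tasks are plain strings, and the owner of queue B never runs, so worker A ends up taking everything.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy, single-threaded simulation of the head/tail convention (not the JDK implementation).
class WorkStealingSketch {
    // Owner side: take the next task from the head of its own deque.
    static String takeOwn(Deque<String> own) {
        return own.pollFirst();
    }

    // Thief side: take a task from the tail of another worker's deque.
    static String steal(Deque<String> victim) {
        return victim.pollLast();
    }

    // Runs worker A until both deques are empty and returns the tasks in execution order.
    static List<String> runWorkerA(Deque<String> queueA, Deque<String> queueB) {
        List<String> log = new ArrayList<>();
        while (true) {
            String task = takeOwn(queueA);
            if (task == null) {
                task = steal(queueB); // own deque is empty: help out with B's work
            }
            if (task == null) {
                return log;           // nothing left in either deque
            }
            log.add(task);
        }
    }

    public static void main(String[] args) {
        Deque<String> queueA = new ArrayDeque<>(Arrays.asList("a1", "a2"));
        Deque<String> queueB = new ArrayDeque<>(Arrays.asList("b1", "b2", "b3"));
        System.out.println(runWorkerA(queueA, queueB)); // prints [a1, a2, b3, b2, b1]
    }
}
```

Because the owner works at one end and the thief at the other, the two only meet when a single task is left, which is the contention case mentioned above.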
API overview
ForkJoinPool
An ExecutorService for running ForkJoinTasks.
A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.
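In other words, what mainly sets ForkJoinPool apart from other ExecutorService implementations is work-stealing: every thread in the pool tries to find and run tasks that were submitted to the pool or created by other running tasks. This is very efficient when most tasks spawn subtasks, and also when many small tasks are submitted to the pool from outside. In addition, with asyncMode set to true in the constructor, a ForkJoinPool may also be suitable for event-style tasks that are never joined.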
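As a small illustration of the asyncMode flag mentioned in the Javadoc, the sketch below builds one pool with the default settings and one with asyncMode enabled through the four-argument constructor. The class name AsyncModePoolSketch, the helper newAsyncPool and the parallelism of 2 are assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class showing how asyncMode is chosen at construction time.
class AsyncModePoolSketch {
    // Builds a pool in async mode using the four-argument constructor.
    static ForkJoinPool newAsyncPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,   // no custom UncaughtExceptionHandler
                true);  // asyncMode
    }

    public static void main(String[] args) {
        ForkJoinPool defaultPool = new ForkJoinPool();
        ForkJoinPool asyncPool = newAsyncPool(2);
        System.out.println(defaultPool.getAsyncMode()); // prints false
        System.out.println(asyncPool.getAsyncMode());   // prints true
        System.out.println(asyncPool.getParallelism()); // prints 2
        defaultPool.shutdown();
        asyncPool.shutdown();
    }
}
```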
ForkJoinTask
A ForkJoinTask represents a task that runs in a ForkJoinPool.
Main methods:
- fork() arranges for this task to run asynchronously in the pool the current task is running in. Put simply, it spawns a subtask.
- join() returns the result of the computation once the task is done.
- invoke() starts the task and, if necessary, waits for the computation to complete.
Subclasses:
- RecursiveAction: a recursive ForkJoinTask that produces no result (no return value)
- RecursiveTask: a recursive ForkJoinTask that produces a result (has a return value)
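The three methods listed above can be seen side by side in a small RecursiveAction. This is a sketch under stated assumptions: the class name DoubleInPlaceSketch, the helper doubled and the threshold of 3 are invented for the example, which doubles every element of an array in place.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical RecursiveAction (no return value) that doubles array elements in place.
class DoubleInPlaceSketch extends RecursiveAction {
    private static final int THRESHOLD = 3; // illustrative cut-off
    private final long[] values;
    private final int lo; // inclusive
    private final int hi; // exclusive

    DoubleInPlaceSketch(long[] values, int lo, int hi) {
        this.values = values;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) {
                values[i] *= 2;
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        DoubleInPlaceSketch left = new DoubleInPlaceSketch(values, lo, mid);
        DoubleInPlaceSketch right = new DoubleInPlaceSketch(values, mid, hi);
        left.fork();    // fork(): schedule the left half asynchronously
        right.invoke(); // invoke(): run the right half now and wait for it
        left.join();    // join(): wait for the forked half (a RecursiveAction has no result)
    }

    // Returns a doubled copy of the input, leaving the input untouched.
    static long[] doubled(long[] input) {
        long[] copy = input.clone();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleInPlaceSketch(copy, 0, copy.length));
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        long[] result = doubled(new long[]{1, 2, 3, 4, 5, 6, 7, 8});
        System.out.println(Arrays.toString(result)); // prints [2, 4, 6, 8, 10, 12, 14, 16]
    }
}
```

A RecursiveTask version would look the same except that compute() returns a value and join() hands that value back.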
ForkJoinWorkerThread
A thread managed by a ForkJoinPool, which executes ForkJoinTasks.
A ForkJoinWorkerThread represents a thread in a ForkJoinPool that executes tasks.
Class diagram
Code analysis
Next, let's take a brief look at the key code to deepen our understanding of Fork/Join.
ForkJoinPool
WorkQueue is a nested class of ForkJoinPool. It wraps the work queue of a thread in the pool and supports task stealing.
What does task stealing mean? Imagine you and a friend are eating fruit: you finish your share while your friend still has some left, so you quietly take some of theirs. In pool terms, suppose two WorkQueues, A and B, are executing tasks. When A has finished its tasks and B has not, A takes some tasks from the tail of B's ForkJoinTask array and runs them, which improves overall throughput.
submit()
From the source we can see that:
- Both methods submit a task, but submit returns a ForkJoinTask while execute does not.
- Once a task has been submitted to the pool, it is added to the task queue of the current submitter.
As mentioned earlier, every thread has a WorkQueue, and a WorkQueue holds both the thread that runs the tasks (ForkJoinWorkerThread owner) and the tasks that thread has to process (ForkJoinTask<?>[] array). The newly submitted task is added to array.
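The difference between submit and execute noted above can be sketched as follows. The class SubmitVsExecuteSketch, the trivial SquareTask and the two helper methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical example contrasting ForkJoinPool.submit() and ForkJoinPool.execute().
class SubmitVsExecuteSketch {
    // Trivial task used only for this illustration.
    static class SquareTask extends RecursiveTask<Integer> {
        private final int n;

        SquareTask(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    static int viaSubmit(ForkJoinPool pool, int n) {
        // submit() returns the task, so the caller can wait on the returned handle.
        ForkJoinTask<Integer> handle = pool.submit(new SquareTask(n));
        return handle.join();
    }

    static int viaExecute(ForkJoinPool pool, int n) {
        // execute() returns void, so the caller keeps its own reference to the task.
        SquareTask task = new SquareTask(n);
        pool.execute(task);
        return task.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(viaSubmit(pool, 6));  // prints 36
        System.out.println(viaExecute(pool, 7)); // prints 49
        pool.shutdown();
    }
}
```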
ForkJoinWorkerThread
The code shows clearly that ForkJoinWorkerThread holds references to a ForkJoinPool and a ForkJoinPool.WorkQueue, which indicate which pool the thread belongs to and which work queue is its own.
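The link between a worker thread and its pool can be observed from the outside through the public getPool() method of ForkJoinWorkerThread. The sketch below is an illustration only: the class WorkerThreadSketch and its helper are hypothetical, and CompletableFuture.supplyAsync is used here simply as a convenient way to run a probe on one of the pool's threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

// Hypothetical probe: checks which kind of thread runs work handed to a ForkJoinPool.
class WorkerThreadSketch {
    // Runs a probe inside the given pool and reports whether the executing thread
    // is a ForkJoinWorkerThread that belongs to that pool.
    static boolean runsOnWorkerOf(ForkJoinPool pool) {
        return CompletableFuture.supplyAsync(() -> {
            Thread current = Thread.currentThread();
            return current instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) current).getPool() == pool;
        }, pool).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        System.out.println(runsOnWorkerOf(pool)); // prints true
        pool.shutdown();
    }
}
```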
ForkJoinTask
fork()
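As the code shows, if fork() is called while running on a ForkJoinWorkerThread, the task is pushed directly onto that thread's work queue; otherwise the task is submitted to the common pool (ForkJoinPool.commonPool()).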
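The two branches of fork() can be observed with a probe task that reports which pool ran it. This is a sketch under stated assumptions: ForkRoutingSketch and poolThatRunsAFork are hypothetical names, and the probe relies on the static ForkJoinTask.getPool(), which returns null when the current thread is not a pool worker. When fork() is called from a non-worker thread, the joining thread may run the task itself, so the second result is either the common pool or null, but never the custom pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical probe showing where a forked task ends up.
class ForkRoutingSketch {
    // Forks a probe task from the calling thread and returns the pool that ran it
    // (null if the probe ran on a thread that is not a ForkJoinPool worker).
    static ForkJoinPool poolThatRunsAFork() {
        Callable<ForkJoinPool> probe = ForkJoinTask::getPool;
        return ForkJoinTask.adapt(probe).fork().join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);

        // Called from a worker of 'pool': the forked task stays inside that pool.
        ForkJoinPool fromWorker = CompletableFuture
                .supplyAsync(ForkRoutingSketch::poolThatRunsAFork, pool)
                .join();
        System.out.println(fromWorker == pool); // prints true

        // Called from main, which is not a worker: the forked task never reaches 'pool'.
        ForkJoinPool fromMain = poolThatRunsAFork();
        System.out.println(fromMain == pool); // prints false

        pool.shutdown();
    }
}
```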
join() and invoke()
As the code shows, both of them wait for the computation to complete.
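That both calls wait can be checked with a task that deliberately takes a little while. The sketch below is illustrative only: JoinInvokeSketch, SlowAnswer and the 50 ms sleep are assumptions made for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: isDone() is already true when invoke() or join() returns.
class JoinInvokeSketch {
    // Slow task used only for this illustration: sleeps briefly, then returns 42.
    static class SlowAnswer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 42;
        }
    }

    // invoke(): starts the task right away and returns once it is complete.
    static boolean doneAfterInvoke() {
        SlowAnswer task = new SlowAnswer();
        int result = task.invoke();
        return task.isDone() && result == 42;
    }

    // join(): blocks until a task that was started elsewhere is complete.
    static boolean doneAfterJoin(ForkJoinPool pool) {
        SlowAnswer task = new SlowAnswer();
        pool.execute(task);
        int result = task.join();
        return task.isDone() && result == 42;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(doneAfterInvoke());   // prints true
        System.out.println(doneAfterJoin(pool)); // prints true
        pool.shutdown();
    }
}
```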
Visualizing the process
Two borrowed diagrams follow:
Usage examples
Sending messages in batches
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinPoolDemo {

    // Static nested class, so it can be created without a ForkJoinPoolDemo instance.
    static class SendMsgTask extends RecursiveAction {

        // Ranges of at most THRESHOLD messages are sent directly.
        private static final int THRESHOLD = 10;

        private final int start; // inclusive
        private final int end;   // exclusive
        private final List<String> list;

        public SendMsgTask(int start, int end, List<String> list) {
            this.start = start;
            this.end = end;
            this.list = list;
        }

        @Override
        protected void compute() {
            if ((end - start) <= THRESHOLD) {
                // Small enough: "send" the messages in the current thread.
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
                }
            } else {
                // Too large: split in half and run both halves in parallel.
                int middle = (start + end) / 2;
                invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
            }
        }
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 123; i++) {
            list.add(String.valueOf(i + 1));
        }

        ForkJoinPool pool = new ForkJoinPool();
        // submit() returns the task; join() blocks until every message has been sent.
        pool.submit(new SendMsgTask(0, list.size(), list)).join();
        pool.shutdown();
    }
}
Summation
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTaskDemo {

    private static class SumTask extends RecursiveTask<Integer> {

        private static final int THRESHOLD = 20;

        private final int[] arr;
        private final int start; // inclusive
        private final int end;   // exclusive

        public SumTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        /**
         * Subtotal of arr[start..end), computed sequentially.
         */
        private Integer subtotal() {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
            return sum;
        }

        @Override
        protected Integer compute() {
            if ((end - start) <= THRESHOLD) {
                return subtotal();
            } else {
                int middle = (start + end) / 2;
                SumTask left = new SumTask(arr, start, middle);
                SumTask right = new SumTask(arr, middle, end);
                // Fork one half and compute the other half in the current thread.
                left.fork();
                return right.compute() + left.join();
            }
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] arr = new int[100];
        for (int i = 0; i < 100; i++) {
            arr[i] = i + 1;
        }

        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> result = pool.submit(new SumTask(arr, 0, arr.length));
        // get() waits for the submitted task. Calling result.invoke() here instead
        // could run the task a second time on the main thread.
        System.out.println("Final result: " + result.get());
        pool.shutdown();
    }
}
Sample output from one run. Each range is printed twice because this run called result.invoke() on a task that had already been submitted: the pool's own workers (ForkJoinPool-1-worker-*) computed it once, and the main thread computed it again, with its fork() calls landing in the common pool (ForkJoinPool.commonPool-worker-*):
ForkJoinPool.commonPool-worker-2: ∑(50~62)=678
ForkJoinPool.commonPool-worker-2: ∑(62~75)=897
ForkJoinPool.commonPool-worker-2: ∑(75~87)=978
ForkJoinPool.commonPool-worker-2: ∑(87~100)=1222
ForkJoinPool-1-worker-1: ∑(0~12)=78
ForkJoinPool-1-worker-1: ∑(12~25)=247
ForkJoinPool-1-worker-1: ∑(25~37)=378
ForkJoinPool-1-worker-1: ∑(37~50)=572
ForkJoinPool-1-worker-2: ∑(75~87)=978
ForkJoinPool-1-worker-3: ∑(50~62)=678
ForkJoinPool-1-worker-5: ∑(62~75)=897
ForkJoinPool.commonPool-worker-7: ∑(0~12)=78
ForkJoinPool.commonPool-worker-3: ∑(37~50)=572
ForkJoinPool-1-worker-4: ∑(87~100)=1222
ForkJoinPool.commonPool-worker-2: ∑(25~37)=378
ForkJoinPool.commonPool-worker-5: ∑(12~25)=247
Final result: 5050
Two examples from the API documentation
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class RecursiveActionDemo {

    private static class SortTask extends RecursiveAction {

        // Ranges shorter than THRESHOLD are sorted sequentially.
        static final int THRESHOLD = 100;

        final long[] array;
        final int lo, hi; // sorts array[lo..hi)

        public SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        public SortTask(long[] array) {
            this(array, 0, array.length);
        }

        public void sortSequentially(int lo, int hi) {
            Arrays.sort(array, lo, hi);
        }

        // Merges the sorted halves [lo, mid) and [mid, hi) in place, buffering the left half.
        public void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
                array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
            }
        }

        @Override
        protected void compute() {
            if (hi - lo < THRESHOLD) {
                sortSequentially(lo, hi);
            } else {
                int mid = (lo + hi) >>> 1;
                invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
                merge(lo, mid, hi);
            }
        }
    }

    public static void main(String[] args) {
        long[] array = new long[120];
        for (int i = 0; i < array.length; i++) {
            array[i] = (long) (Math.random() * 1000);
        }
        System.out.println(Arrays.toString(array));

        ForkJoinPool pool = new ForkJoinPool();
        // invoke() blocks until the sort is complete.
        pool.invoke(new SortTask(array));
        System.out.println(Arrays.toString(array));
        pool.shutdown();
    }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskDemo {

    private static class Fibonacci extends RecursiveTask<Integer> {

        final int n;

        public Fibonacci(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n;
            } else {
                // fib(n) = fib(n - 1) + fib(n - 2)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                Fibonacci f2 = new Fibonacci(n - 2);
                // Compute f2 in the current thread, then wait for the forked f1.
                return f2.compute() + f1.join();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        Future<Integer> future = pool.submit(new Fibonacci(10));
        System.out.println(future.get()); // prints 55
        pool.shutdown();
    }
}