什么是Fork/Join框架
Fork/Join框架是一組允許程序員利用多核處理器支持的并行執(zhí)行的API催什。它使用了“分而治之”策略:把非常大的問(wèn)題分成更小的部分,反過(guò)來(lái)茄蚯,小部分又可以進(jìn)一步分成更小的部分形耗,遞歸地直到一個(gè)部分可以直接解決。這被叫做“fork”岁钓。
然后所有部件在多個(gè)處理核心上并行執(zhí)行。每個(gè)部分的結(jié)果被“join”在一起以產(chǎn)生最終結(jié)果微王。因此屡限,框架的名稱是“Fork/Join”。
下面的為代碼展示了分治策略如何與Fork/Join框架一起工作:
if (problemSize < threshold)
solve problem directly
else {
break problem into subproblems
recursively solve each problem
combine the results
}
Fork/Join框架在JDk7中被加入炕倘,并在JDK8中進(jìn)行了改進(jìn)钧大。它用了Java語(yǔ)言中的幾個(gè)新特性,包括并行的Stream API和排序激才。
Fork/Join框架簡(jiǎn)化了并行程序的原因有:
- 它簡(jiǎn)化了線程的創(chuàng)建拓型,在框架中線程是自動(dòng)被創(chuàng)建和管理额嘿。
- 它自動(dòng)使用多個(gè)處理器瘸恼,因此程序可以擴(kuò)展到使用可用處理器。
由于支持真正的并行執(zhí)行册养,F(xiàn)ork/Join框架可以顯著減少計(jì)算時(shí)間东帅,并提高解決圖像處理、視頻處理球拦、大數(shù)據(jù)處理等非常大問(wèn)題的性能靠闭。
關(guān)于Fork/Join框架的一個(gè)有趣的地方是:它使用工作竊取算法來(lái)平衡線程之間的負(fù)載:如果一個(gè)工作線程沒(méi)有事情要做,它可以從其他仍然忙碌的線程竊取任務(wù)坎炼。
理解Fork/Join框架API
Fork/Join框架在java.util.concurrent
包下被實(shí)現(xiàn)愧膀。它的核心有4個(gè)類:
-
ForkJoinTask<V>: 這是一個(gè)抽象任務(wù)類,并且運(yùn)行在
ForkJoinPool
中谣光。 -
ForkJoinPool:這是一個(gè)線程池管理并運(yùn)行眾多
ForkJoinTask
任務(wù)檩淋。 -
RecursiveAction:
ForkJoinTask
的子類,這個(gè)類沒(méi)有返回值萄金。 -
RecursiveTask<V>:
ForkJoinTask
的子類蟀悦,有返回值。
基本上氧敢,我們解決問(wèn)題的代碼是在RecursiveAction
或者RecursiveTask
中進(jìn)行的日戈,然后將任務(wù)提交由ForkJoinPool`執(zhí)行,F(xiàn)orkJoinPool處理從線程管理到多核處理器的利用等各種事務(wù)孙乖。
我們先來(lái)理解一下這些類中的關(guān)鍵方法浙炼。
ForkJoinTask<V>
這是一個(gè)運(yùn)行在ForkJoinPool
中的抽象的任務(wù)類份氧。類型V
指定了任務(wù)的返回結(jié)果。ForkJoinTask是一個(gè)類似線程的實(shí)體弯屈,它表示任務(wù)的輕量級(jí)抽象半火,而不是實(shí)際的執(zhí)行線程。該機(jī)制允許由ForkJoinPool中的少量實(shí)際線程管理大量任務(wù)季俩。其關(guān)鍵方法是:
- final ForkJoinTask<V> fork()
- final V join()
- final V invoke()
fork()
方法提交并執(zhí)行異步任務(wù)钮糖,該方法返回ForkJoinTask
并且調(diào)用線程繼續(xù)運(yùn)行。
join()
方法等待任務(wù)直到返回結(jié)果酌住。
invoke()
方法是組合了fork()
和join()
店归,它開(kāi)始一個(gè)任務(wù)并等待結(jié)束返回結(jié)果。
此外酪我,ForkJoinTask
中還提供了用于一次調(diào)用多個(gè)任務(wù)的兩個(gè)靜態(tài)方法
- static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2) :執(zhí)行兩個(gè)任務(wù)
- static void invokeAll(ForkJoinTask<?>… taskList):執(zhí)行任務(wù)集合
RecursiveAction
這是一個(gè)遞歸的ForkJoinTask
子類消痛,不返回結(jié)果。Recursive
意思是任務(wù)可以通過(guò)分治策略分成自己的子任務(wù)(在下面的下一節(jié)中都哭,您將看到如何劃分代碼示例)秩伞。
我們必須重寫(xiě)compute()
方法,并將計(jì)算代碼寫(xiě)在其中:
protected abstract void compute();
RecursiveTask<V>
和RecursiveAction
一樣,但是RecursiveTask
有返回結(jié)果欺矫,結(jié)果類型由V
指定纱新。我們?nèi)匀恍枰貙?xiě)compute()
方法:
protected abstract V compute();
ForkJoinPool
這是Fork/Join框架的核心類。它負(fù)責(zé)線程的管理和ForkJoinTask
的執(zhí)行穆趴,為了執(zhí)行ForkJoinTask
脸爱,首先需要獲取到ForkJoinPool
的實(shí)例。
有兩種構(gòu)造器方式可以獲取ForkJoinPool
的實(shí)例未妹,第一種使用構(gòu)造器創(chuàng)建:
- ForkJoinPool(): 使用默認(rèn)的構(gòu)造器創(chuàng)建實(shí)例簿废,該構(gòu)造器創(chuàng)建出的池與系統(tǒng)中可用的處理器數(shù)量相等。
- ForkJoinPool(int parallelism):該構(gòu)造器指定處理器數(shù)量络它,創(chuàng)建具有自定義并行度級(jí)別的池族檬,該級(jí)別的并行度必須大于0,且不超過(guò)可用處理器的實(shí)際數(shù)量化戳。
并行性的級(jí)別決定了可以并發(fā)執(zhí)行的線程的數(shù)量单料。換句話說(shuō),它決定了可以同時(shí)執(zhí)行的任務(wù)的數(shù)量——但不能超過(guò)處理器的數(shù)量迂烁。
但是看尼,這并不限制池可以管理的任務(wù)的數(shù)量。ForkJoinPool可以管理比其并行級(jí)別多得多的任務(wù)盟步。
獲取ForkJoinPool實(shí)例的第二種方法是使用以下ForkJoinPool的靜態(tài)方法獲取公共池實(shí)例:
public static ForkJoinPool commonPool();
這種方式創(chuàng)建的池不受shutdown()
或者shutdownNow()
方法的影響藏斩,但是他會(huì)在System.exit()
時(shí)會(huì)自動(dòng)中止。任何依賴異步任務(wù)處理的程序在主體程序中止前都應(yīng)該調(diào)用awaitQuiescence()
方法却盘。該方式是靜態(tài)的狰域,可以自動(dòng)被使用媳拴。
在ForkJoinPool中執(zhí)行ForkJoinTasks
在創(chuàng)建好ForkJoinPool實(shí)例之后,可以使用下面的方法執(zhí)行任務(wù):
- <T>T invoke(ForkJoinTask<T> task):執(zhí)行指定任務(wù)并返回結(jié)果,該方法是異步的兆览,調(diào)用的線程會(huì)一直等待直到該方法返回結(jié)果屈溉,對(duì)于RecursiveAction任務(wù)來(lái)說(shuō),參數(shù)類型是Void.
- void execute(ForkJoinTask<?> task):異步執(zhí)行指定的任務(wù)抬探,調(diào)用的線程一直等待知道任務(wù)完成才會(huì)繼續(xù)執(zhí)行子巾。
另外,也可以通過(guò)ForkJoinTask自己擁有的方法fork()
和invoke()
執(zhí)行任務(wù)小压。在這種情況下线梗,如果任務(wù)還沒(méi)在ForkJoinPool中運(yùn)行,那么commonPool()
將會(huì)自動(dòng)被使用怠益。
值得注意的一點(diǎn)是:ForkJoinPool使用的是守護(hù)線程仪搔,當(dāng)所有的用戶線程被終止是它也會(huì)被終止,這意味著可以不必顯示的關(guān)閉ForkPoolJoin(雖然這樣也可以)蜻牢。如果是common pool的情況下烤咧,調(diào)用shutdown
沒(méi)有任何效果,應(yīng)為這個(gè)池總是可用的抢呆。
好了煮嫌,現(xiàn)在來(lái)看看一些例子。
案例
使用RecursiveAction
這里例子中镀娶,看一下如果使用Fork/Join框架去執(zhí)行一個(gè)沒(méi)有返回值的任務(wù)立膛。
假設(shè)要對(duì)一個(gè)很大的數(shù)字?jǐn)?shù)組進(jìn)行變換,為了簡(jiǎn)單簡(jiǎn)單起見(jiàn)梯码,轉(zhuǎn)換只需要將數(shù)組中的每個(gè)元素乘以指定的數(shù)字。下面的代碼用于轉(zhuǎn)換任務(wù):
import java.util.concurrent.*;
public class ArrayTransform extends RecursiveAction {
int[] array;
int number;
int threshold = 100_000;
int start;
int end;
public ArrayTransform(int[] array, int number, int start, int end) {
this.array = array;
this.number = number;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
invokeAll(subTask1, subTask2);
}
}
protected void computeDirectly() {
for (int i = start; i < end; i++) {
array[i] = array[i] * number;
}
}
}
可以看到好啰,這是一個(gè)RecursiveAction的子類轩娶,我們重寫(xiě)了compute()
方法。
數(shù)組和數(shù)字從它的構(gòu)造函數(shù)傳遞框往。參數(shù)start和end指定要處理的數(shù)組中的元素的范圍鳄抒。如果數(shù)組的大小大于閾值,這有助于將數(shù)組拆分為子數(shù)組椰弊,否則直接對(duì)整個(gè)數(shù)組執(zhí)行計(jì)算许溅。
觀察else中的代碼片段:
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
invokeAll(subTask1, subTask2);
}
}
這里,將數(shù)組分成兩個(gè)部分秉版,并分別創(chuàng)建他們的子任務(wù)贤重,反過(guò)來(lái),子任務(wù)也可以遞歸的進(jìn)一步劃分為更小的子任務(wù)清焕,直到其大小小于直接調(diào)用computeDirectly();
方法的的閾值并蝗。
然后祭犯,在main函數(shù)中創(chuàng)建ForkJoinPool執(zhí)行任務(wù):
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
或者使用common pool執(zhí)行任務(wù):
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();
這里是全部的測(cè)試程序:
import java.util.*;
import java.util.concurrent.*;
public class ForkJoinRecursiveActionTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
int number = 9;
System.out.println("數(shù)組中的初始元素: ");
print();
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
System.out.println("并行計(jì)算之后的元素:");
print();
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
static void print() {
for (int i = 0; i < 10; i++) {
System.out.print(array[i] + ", ");
}
System.out.println();
}
}
如您所見(jiàn),使用隨機(jī)生成的1,000萬(wàn)個(gè)元素?cái)?shù)組進(jìn)行測(cè)試滚停。由于數(shù)組太大,我們?cè)谟?jì)算前后只打印前10個(gè)元素,看效果如何:
數(shù)組中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行計(jì)算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,
使用RecursiveTask
這個(gè)例子中橡卤,展示了如何使用帶有返回值的任務(wù)驻右,下面的任務(wù)計(jì)算在一個(gè)大數(shù)組中出現(xiàn)偶數(shù)的次數(shù):
import java.util.concurrent.*;
public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold = 100_000;
int start;
int end;
public ArrayCounter(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end);
invokeAll(subTask1, subTask2);
return subTask1.join() + subTask2.join();
}
}
protected Integer computeDirectly() {
Integer count = 0;
for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}
return count;
}
}
如你所見(jiàn),這個(gè)類是RecursiveTask的子類并且重寫(xiě)了compute()
方法起惕,并且返回了一個(gè)整型的結(jié)果檩禾。
這里還使用了join()
方法去合并子任務(wù)的結(jié)果:
return subTask1.join() + subTask2.join();
測(cè)試程序就和RecursiveAction的一樣:
import java.util.*;
import java.util.concurrent.*;
public class ForkJoinRecursiveTaskTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
Integer evenNumberCount = pool.invoke(mainTask);
System.out.println("偶數(shù)的個(gè)數(shù): " + evenNumberCount);
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
}
運(yùn)行程序就會(huì)看到如下的結(jié)果:
偶數(shù)的個(gè)數(shù): 5000045
并行性試驗(yàn)
這個(gè)例子展示并行性的級(jí)別如何影響計(jì)算時(shí)間:
ArrayCounter
類讓閾值可以通過(guò)構(gòu)造器傳入:
import java.util.concurrent.*;
public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold;
int start;
int end;
public ArrayCounter(int[] array, int start, int end, int threshold) {
this.array = array;
this.start = start;
this.end = end;
this.threshold = threshold;
}
protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);
invokeAll(subTask1, subTask2);
return subTask1.join() + subTask2.join();
}
}
protected Integer computeDirectly() {
Integer count = 0;
for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}
return count;
}
}
測(cè)試程序?qū)⒉⑿卸燃?jí)別和閾值作為參數(shù)傳遞:
import java.util.*;
import java.util.concurrent.*;
public class ParallelismTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
int threshold = Integer.parseInt(args[0]);
int parallelism = Integer.parseInt(args[1]);
long startTime = System.currentTimeMillis();
ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
ForkJoinPool pool = new ForkJoinPool(parallelism);
Integer evenNumberCount = pool.invoke(mainTask);
long endTime = System.currentTimeMillis();
System.out.println("偶數(shù)的個(gè)數(shù): " + evenNumberCount);
long time = (endTime - startTime);
System.out.println("執(zhí)行時(shí)間: " + time + " ms");
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
}
該程序允許您使用不同的并行度和閾值輕松測(cè)試性能。注意疤祭,它在最后打印執(zhí)行時(shí)間盼产。嘗試用不同的參數(shù)多次運(yùn)行這個(gè)程序,并觀察執(zhí)行時(shí)間勺馆。
結(jié)論
- Fork/Join框架的設(shè)計(jì)簡(jiǎn)化了java語(yǔ)言的并行程序
-
ForkJoinPool
是Fork/Join框架的核心戏售,它允許多個(gè)ForkJoinTask
請(qǐng)求由少量實(shí)際線程執(zhí)行,每個(gè)線程運(yùn)行在單獨(dú)的處理核心上 - 既可以通過(guò)構(gòu)造器也可以通過(guò)靜態(tài)方法common pool去獲取ForkJoinPool的實(shí)例
- ForkJoinTask是一個(gè)抽象類草穆,它表示的任務(wù)比普通線程更輕灌灾。通過(guò)覆蓋其compute()方法實(shí)現(xiàn)計(jì)算邏輯
- RecursiveAction是一個(gè)沒(méi)有返回值的ForkJoinTask
- RecursiveTask是一個(gè)有返回值的ForkJoinTask
- ForkJoinPool與其它池的不同之處在于,它使用了工作竊取算法悲柱,該算法允許一個(gè)線程完成了可以做的事情锋喜,從仍然繁忙的其他線程竊取任務(wù)
- ForkJoinPool中的線程是守護(hù)線程,不必顯式地關(guān)閉池
- 執(zhí)行一個(gè)ForkJoinTask既可以通過(guò)調(diào)用它自己的
invoke()
或fork()
方法豌鸡,也可以提交任務(wù)給ForkJoinPool并調(diào)用它的invoke()
或者execute()
方法 - 直接使用ForkJoinTask自身的方法執(zhí)行任務(wù)嘿般,如果它還沒(méi)運(yùn)行在
ForkJoinPool
中那么將運(yùn)行在common pool中 - 在
ForkJoinTask
中使用join()
方法,可以合并子任務(wù)的結(jié)果 -
invoke()
方法會(huì)等待子任務(wù)完成涯冠,但是execute()
方法不會(huì)