什么是Fork/Join框架
Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架葵擎,采用類似于分治算法原在,就是把一個(gè)復(fù)雜的問(wèn)題分成兩個(gè)或更多的相同或相似的子問(wèn)題惠险,直到最后子問(wèn)題可以簡(jiǎn)單的直接求解喉刘,原問(wèn)題的解即子問(wèn)題的解的合并休吠。
在這個(gè)框架中值得注意的一個(gè)重要概念是在理想狀態(tài)下是沒有空閑的工作線程吼渡。 它們實(shí)現(xiàn)了一種工作竊取算法,閑置的工作線程可以從忙碌的工作線程拿工作執(zhí)行恩溅。
Fork/Join框架處理復(fù)雜的線程問(wèn)題隔箍,你只需向框架指出哪些部分工作可以分解并遞歸處理。偽代碼來(lái)自Doug Lea's的論文
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
Fork/Join框架的核心類
ForkJoinPool和ForkJoinTask是支持Fork/Join機(jī)制的核心類暴匠。
ForkJoinPool
ForkjoinPool是實(shí)現(xiàn)了ExecutorService和work-stealing(工作竊取)算法鞍恢。如下所示,新建ForkJoinPool實(shí)例每窖,指定并行等級(jí)(處理器的個(gè)數(shù))帮掉。
ForkJoinPool pool = new ForkJoinPool(numberOfProcessors);
Where numberOfProcessors = Runtime.getRunTime().availableProcessors();
如果使用無(wú)參構(gòu)造函數(shù),默認(rèn)創(chuàng)建pool的大小為上面所示的可用的處理器個(gè)數(shù)窒典。盡管你可以指定任意大小的pool蟆炊,但pool會(huì)動(dòng)態(tài)調(diào)整大小來(lái)嘗試獲得足夠的活動(dòng)線程。與ExecutorService另一個(gè)重要的不同瀑志,pool不需要在程序退出時(shí)顯式關(guān)閉涩搓,因?yàn)樗乃芯€程都處于守護(hù)進(jìn)程模式。
三種提交任務(wù)到ForkJoinPool的方法:
- execute():期望異步執(zhí)行劈猪,調(diào)用其fork方法在多個(gè)線程之間拆分工作昧甘。
- invoke():等待獲得結(jié)果。
- submit():完成時(shí)返回一個(gè)future對(duì)象用于檢查狀態(tài)以及運(yùn)行結(jié)果战得。
ForkJoinTask
ForkJoinTask是在ForkJoinPool創(chuàng)建工作的抽象類充边,RecursiveAction 和RecursiveTask是ForkJoinTask的直接子類,都要實(shí)現(xiàn)compute方法,兩者唯一的不同點(diǎn)是:RecursiveAction沒有返回任務(wù)的結(jié)果浇冰,而 RecursiveTask有返回任務(wù)的結(jié)果(可以自己指定類型的對(duì)象)贬媒。
ForkJoinTask類提供了幾個(gè)方法用于檢查任務(wù)運(yùn)行的狀態(tài). 無(wú)論以什么方式結(jié)束任務(wù),isDone() 方法返回true肘习;如果完成任務(wù)過(guò)程中沒有被取消或者發(fā)生異常际乘,CompletedNormally() 方法返回true;如果任務(wù)被取消漂佩, isCancelled() 方法返回true脖含;如果任務(wù)被取消或者遇到異常,isCompletedabnormally() 方法返回true投蝉。
異常處理代碼如下:
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable對(duì)象器赞,如果任務(wù)被取消了則返回CancellationException。如果任務(wù)沒有完成或者沒有拋出異常則返回null墓拜。
工作竊取算法
工作竊取(work-stealing)算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來(lái)執(zhí)行请契。假如我們需要做一個(gè)比較大的任務(wù)咳榜,我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng)爽锥,于是把這些子任務(wù)分別放到不同的隊(duì)列里涌韩,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來(lái)執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng)氯夷,比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)臣樱。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理腮考。干完活的線程與其等著雇毫,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來(lái)執(zhí)行踩蔚。而在這時(shí)它們會(huì)訪問(wèn)同一個(gè)隊(duì)列棚放,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列馅闽,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行飘蚯,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。
工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計(jì)算福也,并減少了線程間的競(jìng)爭(zhēng)局骤,其缺點(diǎn)是在某些情況下還是存在競(jìng)爭(zhēng),比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)暴凑。并且消耗了更多的系統(tǒng)資源峦甩,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。
Fork/Join框架和ExecutorService的區(qū)別
Fork/Join框架和ExecutorService最主要的區(qū)別是工作竊取算法搬设。與Executor框架不同穴店,當(dāng)有線程完成了自己的所有子任務(wù)撕捍,而其他正在執(zhí)行的線程(稱為工作線程)還有子任務(wù)等待處理,就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來(lái)執(zhí)行泣洞。通過(guò)這種方式忧风,線程可以充分利用其運(yùn)行時(shí)間,從而提高應(yīng)用程序的性能球凰。
Fork/Join框架實(shí)踐例子
在這個(gè)例子中狮腿,我們使用ForkJoinPool和ForkJoinTask提供的異步方法來(lái)管理任務(wù)。我們將實(shí)現(xiàn)遍歷文件夾查找指定擴(kuò)展名的文件呕诉,F(xiàn)orkJoinTask實(shí)現(xiàn)處理一個(gè)文件夾內(nèi)的查找缘厢,如果存在子文件夾,為每一個(gè)文件夾fork一個(gè)新異步任務(wù)到ForkJoinPool中去甩挫,每個(gè)子任務(wù)會(huì)查找自己文件夾的指定擴(kuò)展名的文件贴硫。一旦任務(wù)已經(jīng)處理了所有的指定文件夾的內(nèi)容,利用ForkJoinPool的join()方法等待完成所有任務(wù)伊者。join方法是等待執(zhí)行完成并返回compute()方法的計(jì)算結(jié)果英遭。任務(wù)組的所有任務(wù),都將自己的結(jié)果返回添加到結(jié)果列表中亦渗。
詳細(xì)代碼如下:
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
public class FolderProcessor extends RecursiveTask<List<String>> {
private final String path;
private final String extension;
public FolderProcessor(String path,String extension) {
this.extension = extension;
this.path = path;
}
@Override
protected List<String> compute() {
List<String> list = new ArrayList<String>();
List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
File file = new File(path);
File[] content= file.listFiles();
if(content != null){
for (File aContent : content) {
if (aContent.isDirectory()) {
FolderProcessor task =
new FolderProcessor(aContent.getAbsolutePath(), extension);
task.fork();
tasks.add(task);
} else {
if (checkFile(aContent.getName())) {
list.add(aContent.getAbsolutePath());
}
}
}
}
if (tasks.size() > 50)
{
System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
}
addResultsFromTasks(list, tasks);
return list;
}
private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
for (FolderProcessor item : tasks)
{
list.addAll(item.join());
}
}
private boolean checkFile(String name) {
return name.endsWith(extension);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
FolderProcessor system = new FolderProcessor("/System", "log");
FolderProcessor library = new FolderProcessor("/Library", "log");
FolderProcessor users = new FolderProcessor("/Users", "log");
pool.execute(system);
pool.execute(library);
pool.execute(users);
do
{
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
System.out.printf("******************************************\n");
try
{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e)
{
e.printStackTrace();
}
} while ((!system.isDone()) || (!library.isDone()) || (!users.isDone()));
pool.shutdown();
List<String> results;
results = system.join();
System.out.printf("System: %d files found.\n", results.size());
results = library.join();
System.out.printf("Library: %d files found.\n", results.size());
results = users.join();
System.out.printf("Users: %d files found.\n", results.size());
}
}
結(jié)果輸出類似如下:
******************************************
Main: Parallelism: 8
Main: Active Threads: 60
Main: Task Count: 62370
Main: Steal Count: 81261
******************************************
******************************************
Main: Parallelism: 8
Main: Active Threads: 0
Main: Task Count: 19295
Main: Steal Count: 160629
******************************************
JDK中的使用實(shí)現(xiàn)
Java SE中有一些通用的功能挖诸,它們已經(jīng)使用fork/join框架來(lái)實(shí)現(xiàn)。
1.在Java 8的java.util.Arrays中的parallelSort方法采用了fork/join框架法精,在多處理器系統(tǒng)上多律,并行排序大量數(shù)據(jù)比順序排序更快
2.在Stream.parallel()中使用并行,更多請(qǐng)參考parallel stream operation in java 8搂蜓。
總結(jié):
設(shè)計(jì)優(yōu)秀的多線程算法是非常困難的狼荞,fork/join框架并不適用于所有情況,但是在它的適用范圍之內(nèi),能夠輕松的利用多個(gè)CPU提供的計(jì)算資源來(lái)協(xié)作完成一個(gè)復(fù)雜的計(jì)算任務(wù)洛勉。最終還是看你的問(wèn)題是否符合框架特性粘秆,若不符合,你可以使用基于java.util.concurrent包基礎(chǔ)工具方法實(shí)現(xiàn)自己的解決方案收毫。
參考