在這篇文章中,將覆蓋如下內(nèi)容:
- 什么是Fork/Join框架
- 工作竊取算法
- Fork/Join框架的設(shè)計
- RecursiveAction 抽象類
- RecursiveAction 實戰(zhàn)--同步方式
- RecursiveTask 抽象類
- RecursiveTask 實戰(zhàn)-- 同步方式
- RecursiveTask實戰(zhàn)--異步方式
- Fork/Join框架的異常處理
- ForkJoinTask 抽象類
- ForkJoinPool
- 最佳實踐
1.什么是Fork/Join 框架
Fork/Join 框架是java7提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù)部服,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架抗斤。
Fork Join 的運(yùn)行流程圖如下:
2.工作竊取
工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行丈咐。
那么為什么需要使用工作竊取算法呢瑞眼?
假如我們需要做一個比較大的任務(wù),我們可以把這個任務(wù)分割為若干互不依賴的子任務(wù)棵逊,為了減少線程間的競爭伤疙,于是把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù)辆影,線程和隊列一一對應(yīng)锄码,比如A線程負(fù)責(zé)處理A隊列里的任務(wù)稍刀。
但是有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理危号。干完活的線程與其等著扭仁,不如去幫其他線程干活歹啼,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行岩馍。而在這時它們會訪問同一個隊列粱檀,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列迫像,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行拭抬,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行。
3.Fork/Join框架的設(shè)計
如何設(shè)計Fork/Join框架侵蒙?
步驟1 分割任務(wù)。
步驟2 執(zhí)行任務(wù)并合并結(jié)果傅蹂。分割的子任務(wù)分別放在雙端隊列里纷闺,然后幾個啟動線程分別從雙端隊列里獲取任務(wù)執(zhí)行算凿。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊列里,啟動一個線程從隊列里拿數(shù)據(jù)犁功,然后合并這些數(shù)據(jù)氓轰。
Fork/Join 使用兩個類來完成以上兩件事情。
① ForkJoinTask: 我們需要使用ForkJoin框架浸卦,必須首先創(chuàng)建一個ForkJoin任務(wù)署鸡。它提供在任務(wù)中執(zhí)行fork()和join()操作的機(jī)制。通常情況下限嫌,我們不需要直接繼承ForkJoinTask類靴庆,只需要繼承它的子類,F(xiàn)ork/Join框架提供了以下兩個子類怒医。
- RecursiveAction: 用于沒有返回結(jié)果的任務(wù)
- RecursiveTask: 用于有返回結(jié)果的任務(wù)
② ForkJoinPool: ForkJoinTask需要通過ForkJoinPool執(zhí)行炉抒。
任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護(hù)的雙端隊列中,進(jìn)入隊列的頭部稚叹。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時焰薄,它會隨機(jī)從其他工作線程的隊列的尾部獲取一個任務(wù)。
4.RecursiveAction抽象類——沒有返回值的任務(wù)
源碼淺讀
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* 由這個任務(wù)執(zhí)行的主要計算
*/
protected abstract void compute();
/**
* 總是返回null
*/
public final Void getRawResult() { return null; }
/**
* 搞不懂為什么要這么寫扒袖?既然參數(shù)不用塞茅,為什么還要傳入
*/
protected final void setRawResult(Void mustBeNull) { }
/**
* 執(zhí)行計算
*/
protected final boolean exec() {
compute();
return true;
}
}
繼承自ForkJoinTask類。
compute()是抽象方法季率。
5. RecursiveAction實戰(zhàn)--同步方式
目標(biāo):實現(xiàn)一個任務(wù)來修改產(chǎn)品列表的價格
步驟如下:
1.創(chuàng)建類Product野瘦,將用來存儲產(chǎn)品的名稱和價格。
public class Product {
private String name;
private double price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
2.創(chuàng)建ProductListGenerator類蚀同,用來產(chǎn)生隨機(jī)產(chǎn)品的數(shù)列
public class ProductListGenerator {
public List<Product> generate(int size){
List<Product> ret=new ArrayList<>();
for(int i=0;i<size;i++){
Product product = new Product();
product.setName("Product"+i);
product.setPrice(10);
ret.add(product);
}
return ret;
}
}
3.創(chuàng)建Task類缅刽,指定它繼承RecursiveAction類。
public class Task extends RecursiveAction {
private static final long serialVersionUID = 1L;
private List<Product> products;
private int first;
private int last;
private double increment;
public Task(List<Product> products, int first, int last, double increment) {
this.products = products;
this.first = first;
this.last = last;
this.increment = increment;
}
@Override
protected void compute() {
if (last - first < 10) {
updatePrices();
} else {
int middle = (last + first) / 2;
System.out.printf("Task: Pending tasks: %s\n", getQueuedTaskCount());
Task t1 = new Task(products, first, middle + 1, increment);
Task t2 = new Task(products, middle + 1, last, increment);
//它調(diào)用invokeAll()方法蠢络,執(zhí)行每個任務(wù)所創(chuàng)建的子任務(wù)衰猛。這是一個同步調(diào)用,這個任務(wù)在繼續(xù)(可能完成)它的執(zhí)行之前刹孔,必須等待子任務(wù)的結(jié)束啡省。
// 當(dāng)任務(wù)正在等待它的子任務(wù)(結(jié)束)時,正在執(zhí)行它的工作線程執(zhí)行其他正在等待的任務(wù)髓霞。
// 在這種行為下卦睹,F(xiàn)ork/Join框架比Runnable和Callable對象本身提供一種更高效的任務(wù)管理。
invokeAll(t1, t2);
}
}
private void updatePrices() {
for (int i = first; i < last; i++) {
Product product = products.get(i);
product.setPrice(product.getPrice() * (1 + increment));
}
}
}
4.通過創(chuàng)建Main類方库,并實現(xiàn)main()方法结序。
public class TaskMain {
public static void main(String[] args) {
ProductListGenerator generator = new ProductListGenerator();
List<Product> products = generator.generate(100000);
Task task = new Task(products, 0, products.size(), 0.2);
System.out.println("task start");
//創(chuàng)建pool使用默認(rèn)的無參構(gòu)造函數(shù),線程池使用的線程數(shù)由Runtime.getRuntime().availableProcessors()返回值決定
ForkJoinPool pool = new ForkJoinPool();
//在池中使用execute()方法執(zhí)行這個任務(wù)纵潦。一個異步調(diào)用徐鹤,而主線程繼續(xù)它的執(zhí)行垃环。
pool.execute(task);
//顯示每隔5毫秒池中的變化信息
do {
System.out.printf("Main: Thread Count: %d\n", pool.getActiveThreadCount());
System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount());
System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
System.out.println("------------------------");
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
pool.shutdown();
//使用isCompletedNormally()方法檢查假設(shè)任務(wù)完成時沒有出錯,在這種情況下返敬,寫入一條信息到控制臺遂庄。
if (task.isCompletedNormally()) {
System.out.printf("Main: The process has completed normally.\n");
}
for (int i = 0; i < products.size(); i++) {
Product product = products.get(i);
if (product.getPrice() != 12) {
System.out.printf("Product %s: %f\n", product.getName(), product.getPrice());
}
}
System.out.println("Main: End of the program.\n");
}
}
6. RecursiveTask抽象類——有返回值的任務(wù)
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* 計算的結(jié)果
*/
V result;
/**
* 由這個任務(wù)執(zhí)行的主要計算
* @return 返回計算的結(jié)果
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* 執(zhí)行計算
*/
protected final boolean exec() {
result = compute();
return true;
}
}
繼承自ForkJoinTask類。
compute()是抽象方法劲赠。
7.RecursiveTask 實戰(zhàn)--同步方式
目標(biāo):開發(fā)一個在文檔中查找單詞的應(yīng)用程序
步驟如下:
1.創(chuàng)建一個DocumentMock類涛目,它將產(chǎn)生用來模擬文檔的字符串的二維數(shù)組。
public class DocumentMock {
private String words[] = {"the", "hello", "goodbye", "packt", "java", "thread", "pool", "random", "class", "main"};
public String[][] generateDocument(int numLines, int numWords, String word) {
int counter = 0;
String document[][] = new String[numLines][numWords];
Random random = new Random();
for (int i = 0; i < numLines; i++) {
for (int j = 0; j < numWords; j++) {
int index = random.nextInt(words.length);
document[i][j] = words[index];
if (words[index].equals(word)) {
counter++;
}
}
}
System.out.println("DocumentMock: The word appers " + counter + " times in the document");
return document;
}
}
2.創(chuàng)建一個DocumentTask類凛澎,指定它繼承RecursiveTask類霹肝,并參數(shù)化為Integer類型。該類將實現(xiàn)統(tǒng)計單詞在一組行中出現(xiàn)的次數(shù)的任務(wù)预厌。
public class DocumentTask extends RecursiveTask<Integer> {
private String document[][];
private int start, end;
private String word;
public DocumentTask(String document[][], int start, int end, String word) {
this.document = document;
this.start = start;
this.end = end;
this.word = word;
}
@Override
protected Integer compute() {
int result = 0;
if (end - start < 10) {
result = processLines(document, start, end, word);
} else {
int mid = (start + end) / 2;
DocumentTask task1 = new DocumentTask(document, start, mid, word);
DocumentTask task2 = new DocumentTask(document, mid, end, word);
invokeAll(task1, task2);
try {
result = task1.get() + task2.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
private int processLines(String[][] document, int start, int end, String word) {
List<LineTask> tasks = new ArrayList<LineTask>();
for (int i = start; i < end; i++) {
LineTask task = new LineTask(document[i], 0, document[i].length, word);
tasks.add(task);
}
invokeAll(tasks);
int result = 0;
for (int i = 0; i < tasks.size(); i++) {
LineTask task = tasks.get(i);
try {
result = result + task.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
}
3..創(chuàng)建LineTask類阿迈,指定它繼承RecursiveTask類,并參數(shù)化為Integer類型轧叽。這個類將實現(xiàn)統(tǒng)計單詞在一行中出現(xiàn)的次數(shù)的任務(wù)苗沧。
public class LineTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
private String line[];
private int start;
private int end;
private String word;
public LineTask(String line[], int start, int end, String word) {
this.line = line;
this.start = start;
this.end = end;
this.word = word;
}
@Override
protected Integer compute() {
int result = 0;
if (end - start < 100) {
result = count(line, start, end, word);
} else {
int mid = (start + end) / 2;
LineTask task1 = new LineTask(line, start, mid, word);
LineTask task2 = new LineTask(line, mid, end, word);
invokeAll(task1, task2);
try {
result = task1.get() + task2.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
private int count(String[] line, int start, int end, String word) {
int counter = 0;
for (int i = start; i < end; i++) {
if (line[i].equals(word)) {
counter++;
}
}
//為了顯示demo的執(zhí)行,令任務(wù)睡眠10毫秒炭晒。
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return counter;
}
}
4.實現(xiàn)示例的主類待逞,通過創(chuàng)建Main類,并實現(xiàn)main()方法网严。
public class TaskMain {
public static void main(String[] args) {
DocumentMock documentMock = new DocumentMock();
int numLines = 100;
int lineWordsNum = 1000;
String word = "the";
String[][] document = documentMock.generateDocument(numLines, lineWordsNum, word);
DocumentTask task = new DocumentTask(document, 0, numLines, word);
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(task);
do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n", forkJoinPool.getParallelism());
System.out.printf("Main: Active Threads: %d\n", forkJoinPool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n", forkJoinPool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n", forkJoinPool.getStealCount());
System.out.printf("******************************************\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
forkJoinPool.shutdown();
try {
System.out.printf("Main: The word appears %d in the document", task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
8.RecursiveTask實戰(zhàn)--異步方式
目標(biāo):使用ForkJoinPool和ForkJoinTask類提供的異步方法來管理任務(wù)识樱。實現(xiàn)一個程序,在一個文件夾及其子文件夾內(nèi)查找確定擴(kuò)展名的文件震束。
public class FolderProcessor extends RecursiveTask<List<String>> {
private static final long serialVersionUID = 1L;
private String path;
private String extension;
public FolderProcessor(String path, String extension) {
this.path = path;
this.extension = extension;
}
@Override
protected List<String> compute() {
//用來保存存儲在文件夾中的文件怜庸。
List<String> list = new ArrayList<>();
//聲明一個FolderProcessor任務(wù)的數(shù)列,用來保存將要處理存儲在文件夾內(nèi)的子文件夾的子任務(wù)
List<FolderProcessor> tasks = new ArrayList<>();
File file = new File(path);
File content[] = file.listFiles();
if (content != null) {
//對于文件夾里的每個元素垢村,如果是子文件夾割疾,則創(chuàng)建一個新的FolderProcessor對象,并使用fork()方法異步地執(zhí)行它嘉栓。
for (int i = 0; i < content.length; i++) {
if (content[i].isDirectory()) {
FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
task.fork();
tasks.add(task);
} else {
//否則宏榕,使用checkFile()方法比較這個文件的擴(kuò)展名和你想要查找的擴(kuò)展名
// 如果它們相等,在前面聲明的字符串?dāng)?shù)列中存儲這個文件的全路徑侵佃。
if (checkFile(content[i].getName())) {
list.add(content[i].getAbsolutePath());
}
}
}
}
if (tasks.size() > 50) {
System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
}
addResultsFromTasks(list, tasks);
return list;
}
private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
//對于保存在tasks數(shù)列中的每個任務(wù)麻昼,調(diào)用join()方法,這將等待任務(wù)執(zhí)行的完成馋辈,并且返回任務(wù)的結(jié)果
for (FolderProcessor item : tasks) {
list.addAll(item.join());
}
}
private boolean checkFile(String name) {
return name.endsWith(extension);
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
FolderProcessor folderProcessor1 = new FolderProcessor("/Users/wuzhenyu/IdeaProjects/ares2/ares-web/src/main/web/h5/src", "css");
FolderProcessor folderProcessor2 = new FolderProcessor("/Users/wuzhenyu/IdeaProjects/ares2/ares-web/src/main/web/h5/node_modules/_acorn-dynamic-import@2.0.2@acorn-dynamic-import", "css");
forkJoinPool.execute(folderProcessor1);
forkJoinPool.execute(folderProcessor2);
do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n", forkJoinPool.getParallelism());
System.out.printf("Main: Active Threads: %d\n", forkJoinPool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n", forkJoinPool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n", forkJoinPool.getStealCount());
System.out.printf("***************************************** *\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while ((!folderProcessor1.isDone()) || (!folderProcessor2.isDone()));
forkJoinPool.shutdown();
List<String> results;
results = folderProcessor1.join();
System.out.printf("Documents: %d files found.\n", results.size());
results = folderProcessor2.join();
System.out.printf("Documents: %d files found.\n", results.size());
}
}
9.Fork/Join框架的異常處理
在Java中有兩種異常:
- 已檢查異常(Checked exceptions):這些異常必須在一個方法的throws從句中指定或在內(nèi)部捕捉它們抚芦。比如:IOException或ClassNotFoundException。
- 未檢查異常(Unchecked exceptions):這些異常不必指定或捕捉。比如:NumberFormatException燕垃。
在ForkJoinTask類的compute()方法中枢劝,你不能拋出任何已檢查異常,因為在這個方法的實現(xiàn)中卜壕,它沒有包含任何拋出(異常)聲明。你必須包含必要的代碼來處理異常烙常。但是轴捎,你可以拋出一個未檢查異常。
ForkJoinTask和ForkJoinPool類的行為與你可能的期望不同蚕脏。程序不會結(jié)束執(zhí)行侦副,并且你將不會在控制臺看到任何關(guān)于異常的信息。它只是被吞沒驼鞭,好像它沒拋出(異常)秦驯。
實戰(zhàn)目標(biāo):判斷一個任務(wù)是否拋出異常
public class ExceptionTask extends RecursiveTask<Integer> {
private int array[];
private int start;
private int end;
public ExceptionTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
System.out.printf("Task: Start from %d to %d\n", start, end);
if (end - start < 10) {
if (start < 3 && end > 3) {
throw new RuntimeException("This task throws an Exception:Task from " + start + " to " + end);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
int mid = (start + end) / 2;
ExceptionTask exceptionTask1 = new ExceptionTask(array, start, mid);
ExceptionTask exceptionTask2 = new ExceptionTask(array, mid, end);
invokeAll(exceptionTask1, exceptionTask2);
}
System.out.printf("Task: End form %d to %d\n", start, end);
return 0;
}
public static void main(String[] args) {
int array[] = new int[100];
ExceptionTask task = new ExceptionTask(array, 0, 100);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(task);
pool.shutdown();
try {
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
//使用isCompletedAbnormally()方法,檢查這個任務(wù)或它的子任務(wù)是否已經(jīng)拋出異常挣棕。
if (task.isCompletedAbnormally()) {
System.out.printf("Main: An exception has ocurred\n");
System.out.printf("Main: %s\n", task.getException());
}
System.out.printf("Main: Result: %d", task.join());
}
}
10.ForkJoinTask 抽象類
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
ForkJoinTask是運(yùn)行于ForkJoinPool的任務(wù)的抽象基類译隘。在犧牲一些使用上的限制的情況下,大量任務(wù)和子任務(wù)可以被 ForkJoinPool 的少量實際線程托管洛心。
主任務(wù)被明確提交到ForkJoinPool時開始執(zhí)行固耘。一旦開始,它通常會開始啟動其他子任務(wù)词身。正如該類的名稱所示厅目,許多使用ForkJoinTask的程序只使用方法fork()和join()或派生方法(如invokeAll)。然而這個類還提供了許多其他方法法严,用來支持更高級的用法损敷。
ForkJoinTask的任務(wù)有四種狀態(tài):已完成(NORMAL),被取消(CANCELLED),信號(SIGNAL)和出現(xiàn)異常(EXCEPTIONAL)
常用方法介紹
方法1: fork()
public final ForkJoinTask<V> fork()
安排異步執(zhí)行此任務(wù)
returns:
this
源碼淺讀:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
如果當(dāng)前線程是ForkJoinWorkerThread的實例,就把任務(wù)加入workQueue中深啤。否則把任務(wù)提交到commonPool中拗馒。
方法2: get()
public final V get() throws InterruptedException,ExecutionException
等待必要的計算完成,然后獲取其結(jié)果墓塌。
該方法會拋出三類異常:
- 當(dāng)計算被取消時瘟忱,拋出CancellationException
- 如果計算拋出異常,拋出ExecutionException
- 如果當(dāng)前線程不是ForkJoinPool的成員苫幢,并且在等待時被中斷访诱,
則拋出InterruptedException
returns:
返回計算結(jié)果
源碼淺讀:
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
doJoin()和externalInterruptibleAwaitDone()方法都會返回當(dāng)前任務(wù)的狀態(tài), 且都會造成阻塞, 直到完成或中斷韩肝。其中externalInterruptibleAwaitDone()方法會在線程中斷時触菜,拋出InterruptedException。
根據(jù)當(dāng)前線程是否是ForkJoinWorkerThread的實例哀峻,來決定調(diào)用哪個方法涡相。
如果任務(wù)被取消(CANCELLED),則拋出CancellationException
如果任務(wù)出現(xiàn)異常哲泊,則拋出對應(yīng)的異常。
否則返回結(jié)果催蝗。
方法3:join()
public final V join()
該方法會阻塞當(dāng)前線程并等待獲取結(jié)果切威。
returns:
返回計算結(jié)果
源碼淺讀:
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
如果doJoin()方法返回的狀態(tài)不是 NORMAL(已完成),就返回拋出與給定狀態(tài)關(guān)聯(lián)的異常(如果有的話)丙号。
否則 返回結(jié)果先朦。
方法4:invoke()
public final V invoke()
開始執(zhí)行此任務(wù),如果有必要則等待它完成犬缨,返回結(jié)果或拋出一個(非檢查) RuntimeException 或 Error(如果底層運(yùn)算出現(xiàn)這樣的操作)
returns:
返回計算結(jié)果
源碼淺讀:
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
如果doInvoke()方法返回的狀態(tài)不是 NORMAL(已完成)喳魏,就返回拋出與給定狀態(tài)關(guān)聯(lián)的異常(如果有的話)。
否則 返回結(jié)果怀薛。
我們可以看到刺彩,invoke()方法內(nèi)部調(diào)用了doInvoke(),join方法內(nèi)部調(diào)用了doJoin()枝恋。那么這兩者有什么區(qū)別创倔?請看源碼:
doJoin源碼:
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
doJoin方法首先會判斷任務(wù)是否完成。如果完成了鼓择,就返回status三幻。如果還沒有完成,則會判斷當(dāng)前線程是不是ForkJoinWorkerThread的實例呐能。
如果當(dāng)前線程是ForkJoinWorkerThread的實例念搬,就進(jìn)行下面的判斷。
(w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0? s : wt.pool.awaitJoin(w, this, 0L) :
意思是該任務(wù)位于workQueue的頂部(tryUnpush會返回布爾值摆出,判斷任務(wù)是否處于Top)朗徊,就調(diào)用doExec()方法執(zhí)行任務(wù),如果任務(wù)已經(jīng)完成偎漫,就返回status爷恳。否則就調(diào)用wt.pool.awaitJoin(w, this, 0L)等待任務(wù)完成。
如果當(dāng)前線程不是ForkJoinWorkerThread的實例象踊,就調(diào)用externalAwaitDone(),等待完成温亲。
doInvoke源碼:
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
從源碼中可知,doInvoke會立即調(diào)用doExec()方法來執(zhí)行任務(wù)杯矩。如果任務(wù)完成了栈虚,就返回status。
否則根據(jù)當(dāng)前線程是否是ForkJoinWorkerThread的實例史隆,來決定調(diào)用哪個方法等待任務(wù)完成魂务。
結(jié)論: invoke()方法會立即執(zhí)行任務(wù),而join()方法需要當(dāng)前任務(wù)處于workQueue頂部才會執(zhí)行任務(wù)(沒有完成的情況下)。
方法5: invokeAll()
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)
public static void invokeAll(ForkJoinTask<?>... tasks)
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks)
執(zhí)行所給定的任務(wù)粘姜。當(dāng)所有任務(wù)都結(jié)束了或碰到非受查異常(此時異常將會被重新拋出)就會返回鬓照。如果不止一個任務(wù)遇到異常,那么這個方法將會拋出其中一個異常孤紧〔蝰桑可以使用getException()方法得到每個任務(wù)的狀態(tài),或者使用相關(guān)方法檢查任務(wù)是否是被取消坛芽,或異常完成留储,或正常完成,或未處理咙轩。
如果任意任務(wù)為null,拋出NullPointerException阴颖。
源碼淺讀:
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
t2調(diào)用fork()異步執(zhí)行任務(wù)活喊。t1順序執(zhí)行任務(wù)。如果有任務(wù)非正常完成量愧,就會拋出與任務(wù)狀態(tài)相對應(yīng)的異常钾菊。
方法6:cancel()
public boolean cancel(boolean mayInterruptIfRunning)
嘗試取消此任務(wù)。當(dāng)此方法返回成功后偎肃,除非調(diào)用reinitialize()進(jìn)行干預(yù)煞烫,否則隨后調(diào)用isCancelled()、isDone()和cancel方法都將返回true累颂,調(diào)用join()和其相關(guān)方法將導(dǎo)致CancellationException結(jié)果滞详。
此方法旨在由其他任務(wù)調(diào)用。要終止當(dāng)前任務(wù)紊馏,只需從其計算方法中返回或拋出未經(jīng)檢查的異常料饥,或者調(diào)用completeExceptionally(Throwable)
returns:
如果此任務(wù)當(dāng)前已取消,返回true
源碼淺讀:
public boolean cancel(boolean mayInterruptIfRunning) {
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
mayInterruptIfRunning這個參數(shù)對這個方法的結(jié)果沒有影響朱监。
通過setCompletion方法設(shè)置任務(wù)的狀態(tài)岸啡。
方法7: isDone()
public final boolean isDone()
returns:
如果任務(wù)已經(jīng)完成,返回true
源碼淺讀:
public final boolean isDone() {
return status < 0;
}
結(jié)束可能是由于正常終止赫编,異逞舱海或取消,在所有這些情況下,此方法將返回true擂送。
方法8: isCancelled()
public final boolean isCancelled()
returns:
如果該任務(wù)在正常完成之前被取消悦荒,則返回true
源碼淺讀:
public final boolean isCancelled() {
return (status & DONE_MASK) == CANCELLED;
}
任務(wù)狀態(tài)為CANCELLED就返回true
方法9: isCompletedAbnormally()
public final boolean isCompletedAbnormally()
returns:
如果此任務(wù)拋出異常或被取消团甲,返回true逾冬。
public final boolean isCompletedAbnormally() {
return status < NORMAL;
}
方法10:isCompletedNormally()
public final boolean isCompletedNormally()
returns:
如果此任務(wù)已經(jīng)完成且沒有拋異常和被取消,返回true。
public final boolean isCompletedNormally() {
return (status & DONE_MASK) == NORMAL;
}
方法11:getException()
public final Throwable getException()
returns:
返回異常
源碼淺讀:
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
如果任務(wù)沒有完成或沒有拋出異常,則返回null身腻。
如果任務(wù)被取消了則返回CancellationException产还。
如果任務(wù)出現(xiàn)了異常,就返回該異常嘀趟。
11.ForkJoinPool
Java7 提供了ForkJoinPool來支持將一個任務(wù)拆分成多個“小任務(wù)”并行計算脐区,再把多個“小任務(wù)”的結(jié)果合并成總的計算結(jié)果。
ForkJoinPool是ExecutorService的實現(xiàn)類她按,因此是一種特殊的線程池牛隅。ForkJoinPool提供了如下兩個常用的構(gòu)造器。
- ForkJoinPool(int parallelism): 創(chuàng)建一個包含parallelism個并行線程的ForkJoinPool酌泰。
- ForkJoinPool():以Runtime.availableProcessors()方法的返回值作為parallellism參數(shù)來構(gòu)建ForkJoinPool媒佣。
Java8為ForkJoinPool增加了通用池功能。ForkJoinPool類通過如下兩個靜態(tài)方法提供通用池功能陵刹。
ForkJoinPool commonPool(): 該方法返回一個通用池默伍,通用池的運(yùn)行狀態(tài)不會受shutdown()或shutdownNow()方法的影響。當(dāng)然衰琐,如果程序直接執(zhí)行System.exit(0);來終止虛擬機(jī)也糊,通用池以及通用池中正在執(zhí)行的任務(wù)都會被自動終止。
int getCommonPoolParallelism():該方法返回通用池的并行級別羡宙。
常用方法介紹
方法1. execute()
public void execute(ForkJoinTask<?> task)
異步執(zhí)行任務(wù)
源碼如下:
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
另一個版本 public void execute(Runnable task)
執(zhí)行Runnable對象狸剃。
源碼如下:
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
RunnableExecuteAction是ForkJoinTask的靜態(tài)內(nèi)部類,繼承ForkJoinTask類狗热。
這里使用ForkJoinTask.RunnableExecuteAction方法钞馁,將Runnable包裝成了ForkJoinTask。
方法2.invoke()
public <T> T invoke(ForkJoinTask<T> task)
同步執(zhí)行任務(wù)斗搞。直到任務(wù)完成返回結(jié)果指攒。如果執(zhí)行過程中遇到未受查異常或Error, 將重新拋出僻焚,作為這次調(diào)用的結(jié)果允悦。
源碼如下:
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
invoke與execute的區(qū)別是多了task.join()
。task.join()會阻塞線程,還會拋出異常虑啤。
方法3: awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
等待任務(wù)結(jié)束隙弛。
部分源碼:
if (this == common) {
awaitQuiescence(timeout, unit);
return false;
}
這段代碼的意思是,如果this是通用池狞山,就執(zhí)行awaitQuiescence()方法全闷,并返回false。因此萍启,F(xiàn)orkJoinPool.commonPool().awaitTermination 等同于
ForkJoinPool.commonPool(). awaitQuiescence总珠。
這個方法會在線程池shutdown后任務(wù)完成屏鳍,或者出現(xiàn)超時,或者當(dāng)前線程中斷時返回局服。
方法4: commonPool()
public static ForkJoinPool commonPool()
返回通用池池實例钓瞭。這個池是靜態(tài)構(gòu)建的;其運(yùn)行狀態(tài)不受嘗試shutdown()或shutdownNow()的影響。使用commonPool通骋迹可以幫助應(yīng)用程序中多種需要進(jìn)行歸并計算的任務(wù)共享計算資源山涡,從而使后者發(fā)揮最大作用(ForkJoinPools中的工作線程在閑置時會被緩慢回收,并在隨后需要使用時被恢復(fù))
方法5: submit()
returns:
返回ForkJoinTask對象
版本1 :public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
提交一個ForkJoinTask 用于執(zhí)行唆迁。
該版本與public void execute(ForkJoinTask<?> task)區(qū)別是:多了一個ForkJoinTask類型的返回值鸭丛。
版本2: public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task) {
ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
externalPush(job);
return job;
}
提交一個有返回值的任務(wù)用于執(zhí)行,并返回 Future唐责。該 Future 的 get 方法在任務(wù)成功完成時將會返回該任務(wù)的結(jié)果
AdaptedCallable作為ForkJoinTask的靜態(tài)內(nèi)部類鳞溉,目的是適配Callable類型的task,使之能夠在ForkJoinPool中運(yùn)行。
AdaptedCallable繼承了ForkJoinTask類和RunnableFuture接口(實現(xiàn)run方法)鼠哥。
版本3:public <T> ForkJoinTask<T> submit(Runnable task, T result)
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
externalPush(job);
return job;
}
提交一個 Runnable 任務(wù)用于執(zhí)行穿挨,并返回一個表示該任務(wù)的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結(jié)果肴盏。
AdaptedRunnable作為ForkJoinTask的靜態(tài)內(nèi)部類,目的是適配Runnable類型的task,使之能夠在ForkJoinPool中運(yùn)行帽衙。
AdaptedRunnable繼承了ForkJoinTask類和RunnableFuture接口(實現(xiàn)run方法)菜皂。
版本4: public ForkJoinTask<?> submit(Runnable task)
public ForkJoinTask<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.AdaptedRunnableAction(task);
externalPush(job);
return job;
}
這個版本task沒有返回值。
提交一個 Runnable 任務(wù)用于執(zhí)行厉萝,并返回一個表示該任務(wù)的 Future恍飘。該 Future 的 get 方法在成功完成時將會返回給定的結(jié)果。
AdaptedRunnableAction作為ForkJoinTask的靜態(tài)內(nèi)部類谴垫,目的是適配Runnable類型的task,使之能夠在ForkJoinPool中運(yùn)行章母。
AdaptedRunnableAction繼承了ForkJoinTask類和RunnableFuture接口(實現(xiàn)run方法)。
方法6: invokeAll()
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
執(zhí)行給定的任務(wù)列表翩剪,返回持有任務(wù)狀態(tài)和結(jié)果的 Future 列表乳怎。 Future.isDone()對于返回列表的每個元素都是true。注意:已完成的任務(wù)前弯,可能正常終止蚪缀,也可能通過拋出異常終止。如果在進(jìn)行此操作時修改了給定集合恕出,則此方法的結(jié)果未定義询枚。
方法7: getActiveThreadCount()
public int getActiveThreadCount()
返回當(dāng)前正在竊取任務(wù)或執(zhí)行任務(wù)的線程數(shù)的估計值,此方法可能高估活動線程的數(shù)量浙巫。
方法8: getStealCount()
public long getStealCount()
當(dāng)前ForkJoinPool線程池內(nèi)部各個work queue間發(fā)生的“工作竊取”操作的總次數(shù)金蜀,是個估計值刷后。
方法9:getParallelism()
public int getParallelism()
返回這個線程池的并行級別。
方法10:getPoolSize()
public int getPoolSize()
返回已啟動但尚未終止的工作線程數(shù)
方法11: isTerminated()
public boolean isTerminated()
如果所有任務(wù)在shutdown后都完成渊抄,則返回true尝胆。線程沒有shutdown時,這個方法返回false抒线。
方法12: isTerminating()
public boolean isTerminating()
如果此線程池正在終止任務(wù)但尚未全部終止班巩,返回true。
12.最佳實踐
在實際應(yīng)用時嘶炭,使用多個ForkJoinPool是沒有什么意義的抱慌。正是出于這個原因,一般來說來它實例化一次眨猎,然后把實例保存在靜態(tài)字段抑进,使之成為單例,這樣就可以在軟件中任何部分方便地重用了睡陪。
對一個任務(wù)調(diào)用join方法會阻塞調(diào)用方寺渗,直到該任務(wù)做出結(jié)果。因此兰迫,有必要在兩個子任務(wù)的計算都開始之后再調(diào)用它信殊。否則,你得到的版本會比原始的順序算法更慢更復(fù)雜汁果,因為每個子任務(wù)都必須等待另一個子任務(wù)完成才能啟動涡拘。
不應(yīng)該在RecursiveTask內(nèi)部使用ForkJoinPool的invoke方法。相反据德,你應(yīng)該始終直接調(diào)用compute或fork方法鳄乏,只有順序代碼才應(yīng)該用invoke來啟動并行計算。
get方法可以得到當(dāng)前結(jié)果棘利,不過一般不太使用橱野。因為它可能拋出已檢查異常,而在compute方法中不允許拋出這些異常善玫。
對子任務(wù)調(diào)用fork方法可以把它排進(jìn)ForkJoinPool水援。同時對左邊和右邊的子任務(wù)調(diào)用它似乎很自然,但這樣做的效率要比直接對其中一個調(diào)用compute低蝌焚。這樣做你可以為其中一個子任務(wù)重用同一線程裹唆,從而避免在線程池中多分配一個任務(wù)造成的開銷。
調(diào)用使用分支/合并框架的并行計算可能有點棘手只洒。特別是你平常都在你喜歡的IDE里面看棧跟蹤(stack trace)來找問題许帐,但放在分支-合并計算上就不行了,因為調(diào)用compute的線程并不是概念上的調(diào)用方毕谴,后者是調(diào)用fork的那個成畦。
你不應(yīng)理所當(dāng)然地認(rèn)為在多核處理器上使用分支/合并框架就比順序計算快距芬。一個任務(wù)可以分解成多個獨立的子任務(wù),才能讓性能在并行化時有所提升循帐。所有這些子任務(wù)的運(yùn)行時間都應(yīng)該比分出新任務(wù)所花的時間長框仔;一個慣用方法是把輸入/輸出放在一個子任務(wù)里,計算放在另一個里拄养,這樣計算就可以和輸入/輸出同時進(jìn)行离斩。此外,在比較同一算法的順序和并行版本的性能時還有別的因素要考慮瘪匿。就像任何其他Java代碼一樣跛梗,分支/合并框架需要“預(yù)熱”或者說要執(zhí)行幾遍才會被JIT編譯器優(yōu)化。這就是為什么在測量性能之前跑幾遍程序很重要棋弥,我們的測試框架就是這么做的核偿。同時還要知道,編譯器內(nèi)置的優(yōu)化可能會為順序版本帶來一些優(yōu)勢(例如執(zhí)行死碼分析——刪去從未被使用的計算)