Fork/Join框架是Java 7提供的一個用于并行執(zhí)行任務(wù)的框架梦皮,是一個把大任務(wù)分割成若干 個小任務(wù)(Fork)嫌蚤,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果(Join)的框架护盈。
分而治之
一個規(guī)模為N的問題莺奸,當(dāng)N小于閥值時直接執(zhí)行奕巍,當(dāng)N大于閥值時將N分解成K個小規(guī)模子問題呵燕,子問題之間相互獨立窘面,并與原問題形式相同翠语,最后將所有子問題的解合并得到原問題的解,叫做分而治之财边。
工作竊取算法
工作竊燃±ā(work-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。一般任務(wù)被拆分后會分配到不同的隊列酣难,并為每個隊列建立單獨的線程來處理任務(wù)谍夭,線程和隊列一一對應(yīng)。但是有的線程會將自己的任務(wù)先處理完憨募,這個時候先處理完的線程就可以去幫助其他線程干活紧索,以此來提升整個任務(wù)的工作效率。為了減少竊取任務(wù)時候的沖突菜谣,通常會使用雙端隊列珠漂,竊取任務(wù)的線程永遠從隊列尾部拿任務(wù)晚缩,正常線程從隊列頭部拿任務(wù)。
- 工作竊取算法的優(yōu)點:充分利用線程進行并行計算媳危,減少了線程間的競爭荞彼。
- 工作竊取算法的缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時待笑。并且該算法會消耗了更多的系統(tǒng)資源鸣皂,比如創(chuàng)建多個線程和多個雙端隊列。
Fork/Join框架使用
使用的標(biāo)準(zhǔn)范式
- invokeAll()方法里面最終調(diào)用的還是 fork()方法暮蹂。
- ForkJoinPool新建的線程數(shù)默認等于CPU核數(shù)
Fork/Join使用兩個類來完成任務(wù)分割和執(zhí)行任務(wù)合并結(jié)果兩件事情寞缝。
- ForkJoinTask:我們要使用ForkJoin框架,必須首先創(chuàng)建一個ForkJoin任務(wù)仰泻。它提供在任務(wù)中執(zhí)行fork()和join()操作的機制荆陆。通常情況下,我們不需要直接繼承ForkJoinTask類我纪,只需要繼承它的子類慎宾,F(xiàn)ork/Join框架提供了以下兩個子類。
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)浅悉。
- RecursiveTask:用于有返回結(jié)果的任務(wù)趟据。
- ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執(zhí)行。
任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護的雙端隊列中术健,進入隊列的頭部杀赢。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時蕾久,它會隨機從其他工作線程的隊列的尾部獲取一個任務(wù)摧玫。
示例
計算1+2+3+4+...+n的值
package com.xiaolyuh;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 計算1+2+3+...+n的值
* 使用同步執(zhí)行的方式
*
* @author yuhao.wang3
* @since 2019/6/25 17:07
*/
public class ForkJoinCountTask extends RecursiveTask<Long> {
/**
* 閥值
*/
private int threshold = 10;
/**
* 任務(wù)的開始值
*/
private long start;
/**
* 任務(wù)的結(jié)束值
*/
private long end;
public ForkJoinCountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= threshold) {
long count = 0;
for (int i = 0; i <= end - start; i++) {
count = count + start + i;
}
return count;
} else {
// 如果任務(wù)大于閾值弱判,就分裂成三個子任務(wù)計算
long slip = (end - start) / 3;
ForkJoinCountTask oneTask = new ForkJoinCountTask(start, start + slip);
ForkJoinCountTask twoTask = new ForkJoinCountTask(start + slip + 1, start + slip * 2);
ForkJoinCountTask threeTask = new ForkJoinCountTask(start + slip * 2 + 1, end);
// 提交子任務(wù)到框架去執(zhí)行
invokeAll(oneTask, twoTask, threeTask);
// 等待子任務(wù)執(zhí)行完,得到其結(jié)果勘伺,并合并子任務(wù)
return oneTask.join() + twoTask.join() + threeTask.join();
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
// 生成一個計算任務(wù)跪腹,負責(zé)計算1+2+3+n
ForkJoinCountTask countTask = new ForkJoinCountTask(1, 1000000);
// 執(zhí)行一個任務(wù)(同步執(zhí)行,任務(wù)會阻塞在這里直到任務(wù)執(zhí)行完成)
pool.invoke(countTask);
// 異常檢查
if (countTask.isCompletedAbnormally()) {
Throwable throwable = countTask.getException();
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
}
// join方法是一個阻塞方法飞醉,會等待任務(wù)執(zhí)行完成
System.out.println("計算為:" + countTask.join() + ", 耗時:" + (System.currentTimeMillis() - start) + "毫秒");
}
}
搜索指定目錄下的指定文件
package com.xiaolyuh;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/**
* 搜索指定目錄下的指定文件
* 使用異步執(zhí)行的方式
*
* @author yuhao.wang3
* @since 2019/6/25 17:07
*/
public class ForkJoinSearchFileTask extends RecursiveAction {
/**
* 指定目錄
*/
private File file;
/**
* 文件后綴
*/
private String suffix;
public ForkJoinSearchFileTask(File file, String suffix) {
this.file = file;
this.suffix = suffix;
}
@Override
protected void compute() {
if (Objects.isNull(file)) {
return;
}
File[] files = file.listFiles();
List<ForkJoinSearchFileTask> fileTasks = new ArrayList<>();
if (Objects.nonNull(files)) {
for (File f : files) {
// 拆分任務(wù)
if (f.isDirectory()) {
fileTasks.add(new ForkJoinSearchFileTask(f, suffix));
} else {
if (f.getAbsolutePath().endsWith(suffix)) {
System.out.println("文件: " + f.getAbsolutePath());
}
}
}
// 提交并執(zhí)行任務(wù)
invokeAll(fileTasks);
for (ForkJoinSearchFileTask fileTask : fileTasks) {
// 等待任務(wù)執(zhí)行完成
fileTask.join();
}
}
}
public static void main(String[] args) throws Exception {
File file = new File("d:/");
ForkJoinPool pool = new ForkJoinPool();
// 生成一個計算任務(wù)冲茸,負責(zé)查找指定木目錄
ForkJoinSearchFileTask searchFileTask = new ForkJoinSearchFileTask(file, ".txt");
// 異步執(zhí)行一個任務(wù)
pool.execute(searchFileTask);
Thread.sleep(10);
// 做另外的事情
int count = 0;
for (int i = 0; i < 1000; i++) {
count += i;
}
System.out.println("計算任務(wù):" + count);
// join方法是一個阻塞方法,會等待任務(wù)執(zhí)行完成
searchFileTask.join();
}
}
Fork/Join框架的異常處理
ForkJoinTask在執(zhí)行的時候可能會拋出異常缅帘,但是我們沒辦法在主線程里直接捕獲異常轴术, 所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被 取消了钦无,并且可以通過ForkJoinTask的getException方法獲取異常逗栽。getException方法返回Throwable對象,如果任務(wù)被取消了則返回CancellationException失暂。如 果任務(wù)沒有完成或者沒有拋出異常則返回null彼宠。
// 異常檢查
if(countTask.isCompletedAbnormally()) {
Throwable throwable = countTask.getException();
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
}
參考
《java并發(fā)編程的藝術(shù)》
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
為監(jiān)控而生的多級緩存框架 layering-cache這是我開源的一個多級緩存框架的實現(xiàn)鳄虱,如果有興趣可以看一下