ForkJoin 學(xué)習(xí)使用筆記
Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架勺馆, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù)戏售,最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架
背景
在日常的業(yè)務(wù)需求中,經(jīng)常出現(xiàn)的批量查詢草穆,批量寫入等接口的提供灌灾,一般來(lái)說(shuō),最簡(jiǎn)單最low的方式就是寫一個(gè)for循環(huán)來(lái)一次執(zhí)行悲柱,但是當(dāng)業(yè)務(wù)方對(duì)接口的性能要求較高時(shí)锋喜,就比較尷尬了
通常可以想到的方式是采用并發(fā)操作豌鸡,首先想到可以實(shí)現(xiàn)的方式就是利用線程池來(lái)做
通常實(shí)現(xiàn)方式如下
// 1. 創(chuàng)建線程池
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"),
new ThreadPoolExecutor.CallerRunsPolicy());
// 2. 創(chuàng)建執(zhí)行任務(wù)
List<Future<Object>> futureList = new ArrayList<>();
for(Object arg : list) {
futureList.add(executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
// xxx
}
}));
}
// 3. 結(jié)果獲取
for(Future f: futureList) {
Object obj = f.get();
}
用上面的這種方式并沒(méi)有什么問(wèn)題嘿般,我們接下來(lái)考慮的是如何使用ForkJoin框架來(lái)實(shí)現(xiàn)類似的功能
ForkJoin 基本知識(shí)
Fork: 將大任務(wù)拆分成若干個(gè)可以并發(fā)執(zhí)行的小任務(wù)
Join: 合并所有小任務(wù)的執(zhí)行結(jié)果
任務(wù)分割
ForkJoinTask
: 基本任務(wù),使用forkjoin框架必須創(chuàng)建的對(duì)象直颅,提供fork,join操作博个,常用的兩個(gè)子類
-
RecursiveAction
: 無(wú)結(jié)果返回的任務(wù) -
RecursiveTask
: 有返回結(jié)果的任務(wù)
說(shuō)明:
-
fork
: 讓task異步執(zhí)行 -
join
: 讓task同步執(zhí)行,可以獲取返回值 - ForkJoinTask 在不顯示使用ForkJoinPool.execute/invoke/submit()方法進(jìn)行執(zhí)行的情況下功偿,也可以使用自己的fork/invoke方法進(jìn)行執(zhí)行
結(jié)果合并
ForkJoinPool
執(zhí)行 ForkJoinTask
盆佣,
- 任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部械荷。
- 當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒(méi)有任務(wù)時(shí)共耍,它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)
三中提交方式:
-
execute
異步,無(wú)返回結(jié)果 -
submit
異步吨瞎,有返回結(jié)果 (返回Future<T>
) -
invoke
同步痹兜,有返回結(jié)果 (會(huì)阻塞)
使用說(shuō)明
結(jié)合兩個(gè)場(chǎng)景,給出使用姿勢(shì)
1. 累加
實(shí)現(xiàn)從 start - end 的累加求和
首先是定義一個(gè)CountTask
來(lái)實(shí)現(xiàn)求和
首先是確定任務(wù)分割的閥值颤诀,當(dāng) end-start
的差值大于閥值時(shí)字旭,將任務(wù)一分為二
public class CountTask extends RecursiveTask<Integer> {
private int start;
private int end;
private static final int THRED_HOLD = 30;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRED_HOLD;
if (canCompute) { // 不需要拆分
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end);
} else {
int mid = (end + start) / 2;
CountTask left = new CountTask(start, mid);
CountTask right = new CountTask(mid + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
}
調(diào)用case
@Test
public void testFork() throws ExecutionException, InterruptedException {
int start = 0;
int end = 200;
CountTask task = new CountTask(start, end);
ForkJoinPool pool = ForkJoinPool.commonPool();
Future<Integer> ans = pool.submit(task);
int sum = ans.get();
System.out.println(sum);
}
輸出結(jié)果:
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200
20100
2. 排序
int 數(shù)組進(jìn)行排序
同樣先定義一個(gè)SortTask, 主要是為了演示ForkJoin的使用姿勢(shì),具體的排序和合并的邏輯比較簡(jiǎn)陋的實(shí)現(xiàn)了一下(這塊不是重點(diǎn))
public class SortTask extends RecursiveTask<List<Integer>> {
private List<Integer> list;
private final static int THRESHOLD = 5;
public SortTask(List<Integer> list) {
this.list = list;
}
@Override
protected List<Integer> compute() {
if (list.size() < THRESHOLD) {
Collections.sort(list);
System.out.println("thread: " + Thread.currentThread() + " sort: " + list);
return list;
}
int mid = list.size() >> 1;
SortTask l = new SortTask(list.subList(0, mid));
SortTask r = new SortTask(list.subList(mid, list.size()));
l.fork();
r.fork();
List<Integer> left = l.join();
List<Integer> right = r.join();
return merge(left, right);
}
private List<Integer> merge(List<Integer> left, List<Integer> right) {
List<Integer> result = new ArrayList<>(left.size() + right.size());
int rightIndex = 0;
for (int i = 0; i < left.size(); i++) {
if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) {
result.add(left.get(i));
} else {
result.add(right.get(rightIndex++));
i -= 1;
}
}
if (rightIndex < right.size()) {
result.addAll(right.subList(rightIndex, right.size()));
}
return result;
}
}
測(cè)試case和上面基本一樣崖叫,我們改用 invoke 替換上面的 submit
@Test
public void testMerge() throws ExecutionException, InterruptedException {
List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23);
SortTask sortTask = new SortTask(list);
ForkJoinPool pool = ForkJoinPool.commonPool();
List<Integer> ans = pool.invoke(sortTask);
System.out.println(ans);
}
輸出結(jié)果
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123]
[6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]