ForkJoinPool
背景描述
過去我們在線程池解決問題時筷狼,通常維護了一個阻塞的任務隊列橘忱。每個工作線程在任務完成后,就會去任務隊列里面尋找任務袖订。這種方式在我們執(zhí)行數(shù)量較多且不互相依賴的任務時非常方便且高效慰毅。但是當我們需要執(zhí)行一個很大的任務時隘截,普通的線程池似乎就很難有什么幫助了。
在JDK7中新增了ForkJoinPool汹胃。ForkJoinPool采用分治+work-stealing的思想婶芭。可以讓我們很方便地將一個大任務拆散成小任務着饥,并行地執(zhí)行犀农,提高CPU的使用率。關于ForkJoinPool的精妙之處宰掉,我們將在后面的使用中慢慢說明呵哨。
如何使用
構造方法
Android官方文檔中給出了三個構造方法。我們注意到在構造方法中轨奄,我們可以設置ForkJoinPool的最大工作線程數(shù)孟害、工作線程工廠、拒絕任務的Handler和同步模式挪拟。
執(zhí)行任務
ForkJoinPool提供了兩套執(zhí)行任務的API挨务,它們的區(qū)別主要是返回的結果類型不同。invoke方法返回執(zhí)行的結果,而submit方法返回執(zhí)行的任務谎柄。
使用示例
需求
遍歷系統(tǒng)所有文件丁侄,得到系統(tǒng)中文件的總數(shù)。
思路
通過遞歸的方法朝巫。任務在遍歷中如果發(fā)現(xiàn)文件夾就創(chuàng)建新的任務讓線程池執(zhí)行鸿摇,將返回的文件數(shù)加起來,如果發(fā)現(xiàn)文件則將計數(shù)加一捍歪,最終將該文件夾下的文件數(shù)返回。
代碼實現(xiàn)
CountingTask countingTask = new CountingTask(Environment.getExternalStorageDirectory());
forkJoinPool.invoke(countingTask);
class CountingTask extends RecursiveTask<Integer> {
private File dir;
public CountingTask(File dir) {
this.dir = dir;
}
@Override
protected Integer compute() {
int count = 0;
File files[] = dir.listFiles();
if(files != null){
for (File f : files){
if(f.isDirectory()){
// 對每個子目錄都新建一個子任務鸵钝。
CountingTask countingTask = new CountingTask(f);
countingTask.fork();
count += countingTask.join();
}else {
Log.d("tag" , "current path = "+f.getAbsolutePath());
count++;
}
}
}
return count;
}
}
原理說明
所謂work-stealing模式糙臼,即每個工作線程都會有自己的任務隊列。當工作線程完成了自己所有的工作后恩商,就會去“偷”別的工作線程的任務变逃。
那么這樣的工作模式,有什么好處呢怠堪?
假如我們需要做一個比較大的任務揽乱,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭粟矿,于是把這些子任務分別放到不同的隊列里凰棉,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務,線程和隊列一一對應陌粹,比如A線程負責處理A隊列里的任務撒犀。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理掏秩。干完活的線程與其等著或舞,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執(zhí)行蒙幻。而在這時它們會訪問同一個隊列映凳,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列邮破,被竊取任務線程永遠從雙端隊列的頭部拿任務執(zhí)行诈豌,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行。
上面的需求抒和,如果我們用普通的線程池該如何完成队询?
如果我們使用newFixedThreadPool,當核心線程的路徑下都有子文件夾時构诚,它們會將路徑下的子文件夾拋給任務隊列蚌斩,最終變成所有的核心線程都在等待子文件夾的返回結果,從而造成死鎖。最終任務無法完成送膳。
如果我們使用newCachedThreadPool员魏,依然用上面的思路可以完成任務。但是每次子文件夾就會創(chuàng)建一個新的工作線程叠聋,這樣消耗過大撕阎。
因此,在這樣的情況下碌补,ForkJoinPool的work-stealing的方式就體現(xiàn)出了優(yōu)勢虏束。每個任務分配的子任務也由自己執(zhí)行,只有自己的任務執(zhí)行完成時厦章,才會去執(zhí)行別的工作線程的任務镇匀。
再來個例子
N項的Fibonacci數(shù)列求和,我們不再只能仰仗單個線程為我們執(zhí)行任務袜啃。
package com.example;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MyClass {
static int computeCount = 0;
static class Fibonacci extends RecursiveTask<Integer> {
int n;
Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
computeCount ++;
System.out.printf("Current thread is " + Thread.currentThread()
+ "\n n = " + n + "\n");
if (n <= 2)
return 1;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("wati temp answer is :" + n + "\n");
int answer = f1.join() + f2.join();
System.out.printf("temp answer is :" + answer + ", n is :" +n +"\n");
return answer;
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(2);
Fibonacci task = new Fibonacci(5);
int answer = 0;
answer = pool.invoke(task);
System.out.printf("Hello answer is :" + answer + " , compute count is :" + computeCount);
}
}
結語
實測下來汗侵,當情況足夠復雜時,ForkJoinPool的優(yōu)勢會愈加明顯群发。但是晰韵,就像快排一樣,最優(yōu)策略并不是一個思路走到死熟妓,當分治的區(qū)域較小時雪猪,可以將小區(qū)域改用插入排序進行排序。同理起愈,當我們遞歸到情況不再復雜時浪蹂,就可以轉而用別的線程池進行處理。
以上