1.落筆緣由
由于之前希望對Java異步操作進行一次梳理惜颇,碰巧看到了Fork/Join,之前并沒有了解過皆刺,所以借這次機會來了解一下它的用途。
2.Fork/Join作用
Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務的框架凌摄, 是一個把大任務分割成若干個小任務羡蛾,最終匯總每個小任務結果后得到大任務結果的框架。Fork/Join框架是ExecutorService接口的一種具體實現锨亏,目的是為了幫助你更好地利用多處理器帶來的好處痴怨。它是為那些能夠被遞歸地拆解成子任務的工作類型量身設計的。其目的在于能夠使用所有可用的運算能力來提升你的應用的性能器予。
和ExecutorService接口的其他實現一樣(其實我正是在整理ExecutorService的SingleThreadExecutor等子類的時候浪藻,看到了WorkStealingPool,才進而接觸到Fork/Join框架)乾翔,Fork/Join會將任務分發(fā)給線程池中的工作線程爱葵。Fork/Join使用工作竊取(work-stealing)算法。那么什么是工作竊取方法反浓?簡單來說就是某個線程從其他隊列里竊取任務來執(zhí)行萌丈。我們知道Fork/Join的作用就是將一個大任務分為若干小任務,最后將這些小任務的執(zhí)行結果整合起來得到大任務的結果雷则。而Fork/Join把這些子任務分別放到不同的隊列里辆雾,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務。那么這里就會出現一種情況月劈,有些線程的任務隊列里的任務已經完成度迂,但其他線程的隊列還有任務沒完成,這樣就造成已完成任務線程閑置猜揪,這也太浪費了吧惭墓,所以為了提高效率,完成自己的任務而處于空閑的工作線程能夠從其他仍然處于忙碌(busy)狀態(tài)的工作線程處竊取等待執(zhí)行的任務而姐。為了減少竊取任務線程和被竊取任務線程之間的競爭诅妹,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執(zhí)行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行吭狡。
Fork/Join框架的核心是ForkJoinPool
類:
//ForkJoinPool繼承抽象類AbstractExecutorService
ForkJoinPool extends AbstractExecutorService
//而AbstractExecutorService實現的是ExecutorService接口
AbstractExecutorService implements ExecutorService
//創(chuàng)建ForkJoinPool對象:
//使用Runtime.availableProcessors()獲取的數值作為并行級別;使用默認的default thread factory丈莺;UncaughtExceptionHandler為空划煮;非異步LIFO模式。
public ForkJoinPool()
//使用指定數值(parallelism)作為并行級別缔俄;使用默認的default thread factory弛秋;UncaughtExceptionHandler為空;非異步LIFO模式俐载。
public ForkJoinPool(int parallelism)
//通過Executors創(chuàng)建ForkJoinPool對象
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();
ForkJoinPool實現了工作偷取算法蟹略,通過執(zhí)行ForkJoinTask任務來實現。
3.執(zhí)行過程
Fork/Join的執(zhí)行過程主要有兩步:
第一步分割任務遏佣。通過ForkJoinTask對象挖炬,調用fork()方法把大任務分割成子任務。
第二步執(zhí)行任務并合并結果状婶。分割的子任務分別放在雙端隊列里意敛,然后幾個啟動線程分別從雙端隊列里獲取任務執(zhí)行。子任務執(zhí)行完的結果都統(tǒng)一放在一個隊列里膛虫,啟動一個線程從隊列里拿數據草姻,然后合并這些數據,而子任務執(zhí)行的結果是通過ForkJoinTask的join()方法獲取的稍刀。
下面說一下ForkJoinTask撩独,ForkJoinTask有點像Thread的Runable,都是用來定義要執(zhí)行的任務的。
ForkJoinTask有兩個子類:
RecursiveAction:用于沒有返回結果的任務账月。
RecursiveTask :用于有返回結果的任務综膀。
具體可以看一下oracle的Fork and Join: Java Can Excel at Painless Parallel Programming Too!
這篇文章,里面有介紹使用Fork/Join框架的例子(是關于計算文檔中的單詞出現次數)捶障,下面這個類就是這個例子中的僧须,他繼承了RecursiveTask ,返回一個Long型的結果项炼。
public class FolderSearchTask extends RecursiveTask<Long> {
private final Folder folder;
private final String searchedWord;
FolderSearchTask(Folder folder, String searchedWord) {
super();
this.folder = folder;
this.searchedWord = searchedWord;
}
@Override
protected Long compute() {
long count = 0L;
List<RecursiveTask<Long>> forks = new LinkedList<>();
for (Folder subFolder : folder.getSubFolders()) {
FolderSearchTask task = new FolderSearchTask(subFolder, searchedWord);
forks.add(task);
task.fork();
}
for (Document document : folder.getDocuments()) {
DocumentSearchTask task = new DocumentSearchTask(document, searchedWord);
forks.add(task);
task.fork();
}
for (RecursiveTask<Long> task : forks) {
count = count + task.join();
}
return count;
}
}
4.Fork/Join的基本用法
在Oracle的文檔里有關于Fork/Join的用法担平。基本思維就是給出是否需要將當前任務分成小任務的條件锭部。
if (當前這個任務工作量足夠小)
直接完成這個任務
else
將這個任務分解成兩個部分
分別觸發(fā)(invoke)這兩個子任務的執(zhí)行暂论,并等待結果
這個操作是發(fā)生在ForkJoinTask里的compute()方法里,不管是繼承RecursiveAction還是RecursiveTask 拌禾,都要重寫compute()取胎。
5.例子解析
1)定義任務
下面是一個關于計算斐波那契數列的例子,由于我嗎要計算指定長度斐波那契數列的和,所以我們的任務是需要有返回值的,所以繼承RecursiveTask闻蛀,定義返回值是Long型匪傍。而在compute()方法里,就是當斐波那契數列數字的個數小于10就直接返回這10個值的和觉痛,而大于10的時候役衡,就將這個任務分成兩個小任務。
public class FibonacciTask extends RecursiveTask<Long>
{
private static final long serialVersionUID = 1L;
private List<Long> mList = null;
private int size = 0;
public FibonacciTask(List<Long> list, int size)
{
mList = list;
this.size = size;
System.out.println("num:"+(++Num.num));
}
@Override
protected Long compute()
{
if (mList!=null)
{
if (mList.size()<10)
{
return cal(mList);
}else
{
List<List<Long>> lists = averageAssign(mList,2);
FibonacciTask fibonacciTask1 = new FibonacciTask(lists.get(0), 5);
FibonacciTask fibonacciTask2 = new FibonacciTask(lists.get(1), 5);
fibonacciTask1.fork();
fibonacciTask2.fork();
System.out.println("list0:"+lists.get(0).size()+" list1:"+lists.get(1).size());
return fibonacciTask1.join()+fibonacciTask2.join();
}
}
return null;
}
private long cal(List<Long> list)
{
long total = 0;
for (int i = 0; i < list.size(); i++)
{
total = list.get(i)+total;
}
return total;
}
/**
* 將一個list均分成n個list,主要通過偏移量來實現的
* @param source
* @return
*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){
List<List<T>> result=new ArrayList<List<T>>();
int remaider=source.size()%n; //(先計算出余數)
int number=source.size()/n; //然后是商
int offset=0;//偏移量
for(int i=0;i<n;i++){
List<T> value=null;
if(remaider>0){
value=source.subList(i*number+offset, (i+1)*number+offset+1);
remaider--;
offset++;
}else{
value=source.subList(i*number+offset, (i+1)*number+offset);
}
result.add(value);
}
return result;
}
}
2)執(zhí)行任務
在執(zhí)行任務之前薪棒,我們先生成通過下面的createFibonacci方法生成一個斐波那契數列手蝎。然后通過ForkJoinPool 對象調用submit方法執(zhí)行FibonacciTask 任務。
/**
* @author LGY
* @time 2017-4-8
* @action
*/
public class TestFibonacci
{
public static void main(String[] args)
{
long startTime;
long stopTime;
long singleThreadTimes;
FibonacciTask fibonacciTask = new FibonacciTask(createFibonacci(0, 1, 100), 5);
ForkJoinPool forkJoinPool = new ForkJoinPool();
startTime = System.currentTimeMillis();
Future<Long> future = forkJoinPool.submit(fibonacciTask);
stopTime = System.currentTimeMillis();
singleThreadTimes = (stopTime - startTime);
System.out.println( " fork / join search took "
+ singleThreadTimes + "ms");
try
{
System.out.println("result:"+future.get());
} catch (Exception e)
{
e.printStackTrace();
}
}
/**
* @author LGY
* @action 生成指定長度斐波那契數列
* @time 2017-4-9
* @param first 第一個值
* @param secend 第一個值
* @param size 斐波那契數列長度
* @return
*/
private static List<Long> createFibonacci(long first , long secend , int size)
{
List<Long> list = new ArrayList<Long>();
long total = 0;
long startTime;
long stopTime;
long singleThreadTimes;
if (size == 1)
{
list.add(first);
}else if (size == 2)
{
list.add(first);
list.add(secend);
}else if (size>2) {
list.add(first);
list.add(secend);
for (int i = 0; i < size-2; i++)
{
list.add(list.get(i)+list.get(i+1));
}
}
System.out.print("[");
startTime = System.currentTimeMillis();
for (int i = 0; i < list.size(); i++)
{
System.out.print(list.get(i)+" ");
total = list.get(i)+total;
}
System.out.print("]"+"result:"+total);
stopTime = System.currentTimeMillis();
singleThreadTimes = (stopTime - startTime);
System.out.println();
System.out.println( " single thread search took "
+ singleThreadTimes + "ms");
return list;
}
}
6.總結
關于Fork/Join的Fork and Join: Java Can Excel at Painless Parallel Programming Too!一文的例子源碼可以到http://www.oracle.com/technetwork/articles/java/forkjoinsources-430155.zip下載俐芯,也可以在本文文章的源碼地址處獲取棵介。
在這個源碼的WordCounter類中,需要傳三個參數給main方法吧史,右擊WordCounter類邮辽,點擊Run As-->Run Configurations,選擇頁卡Arguments
輸入三個參數,第一個是文檔地址扣蜻,第二個是要計算出現了多少次的單詞逆巍,最后一個是要重復執(zhí)行任務的次數
7.參考文章
http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Fork and Join: Java Can Excel at Painless Parallel Programming Too!譯文資料