寫在前面
在JDK中委可,提供了這樣一種功能:它能夠?qū)碗s的邏輯拆分成一個個簡單的邏輯來并行執(zhí)行,待每個并行執(zhí)行的邏輯執(zhí)行完成后腊嗡,再將各個結果進行匯總着倾,得出最終的結果數(shù)據(jù)。有點像Hadoop中的MapReduce燕少。
ForkJoin是由JDK1.7之后提供的多線程并發(fā)處理框架卡者。ForkJoin框架的基本思想是分而治之。什么是分而治之客们?分而治之就是將一個復雜的計算崇决,按照設定的閾值分解成多個計算材诽,然后將各個計算結果進行匯總。相應的恒傻,F(xiàn)orkJoin將復雜的計算當做一個任務岳守,而分解的多個計算則是當做一個個子任務來并行執(zhí)行。
注:文章已同步收錄到:https://github.com/sunshinelyz/technology-binghe 和 https://gitee.com/binghe001/technology-binghe 碌冶。如果文件對你有點幫助,別忘記給個Star哦涝缝!如果小伙伴們有任何疑問扑庞,都可以加我微信【sun_shine_lyz】進行交流哦!
Java并發(fā)編程的發(fā)展
對于Java語言來說拒逮,生來就支持多線程并發(fā)編程罐氨,在并發(fā)編程領域也是在不斷發(fā)展的。Java在其發(fā)展過程中對并發(fā)編程的支持越來越完善也正好印證了這一點滩援。
- Java 1 支持thread栅隐,synchronized。
- Java 5 引入了 thread pools玩徊, blocking queues, concurrent collections租悄,locks, condition queues。
- Java 7 加入了fork-join庫恩袱。
- Java 8 加入了 parallel streams泣棋。
并發(fā)與并行
并發(fā)和并行在本質(zhì)上還是有所區(qū)別的。
并發(fā)
并發(fā)指的是在同一時刻畔塔,只有一個線程能夠獲取到CPU執(zhí)行任務潭辈,而多個線程被快速的輪換執(zhí)行,這就使得在宏觀上具有多個線程同時執(zhí)行的效果澈吨,并發(fā)不是真正的同時執(zhí)行把敢,并發(fā)可以使用下圖表示。
并行
并行指的是無論何時谅辣,多個線程都是在多個CPU核心上同時執(zhí)行的修赞,是真正的同時執(zhí)行。
分治法
基本思想
把一個規(guī)模大的問題劃分為規(guī)模較小的子問題桑阶,然后分而治之榔组,最后合并子問題的解得到原問題的解。
步驟
①分割原問題联逻;
②求解子問題搓扯;
③合并子問題的解為原問題的解。
我們可以使用如下偽代碼來表示這個步驟包归。
if(任務很邢峭啤){
直接計算得到結果
}else{
分拆成N個子任務
調(diào)用子任務的fork()進行計算
調(diào)用子任務的join()合并計算結果
}
在分治法中,子問題一般是相互獨立的,因此换可,經(jīng)常通過遞歸調(diào)用算法來求解子問題椎椰。
典型應用
二分搜索
大整數(shù)乘法
Strassen矩陣乘法
棋盤覆蓋
合并排序
快速排序
線性時間選擇
漢諾塔
ForkJoin并行處理框架
ForkJoin框架概述
Java 1.7 引入了一種新的并發(fā)框架—— Fork/Join Framework,主要用于實現(xiàn)“分而治之”的算法沾鳄,特別是分治之后遞歸調(diào)用的函數(shù)慨飘。
ForkJoin框架的本質(zhì)是一個用于并行執(zhí)行任務的框架, 能夠把一個大任務分割成若干個小任務译荞,最終匯總每個小任務結果后得到大任務的計算結果瓤的。在Java中,F(xiàn)orkJoin框架與ThreadPool共存吞歼,并不是要替換ThreadPool
其實圈膏,在Java 8中引入的并行流計算,內(nèi)部就是采用的ForkJoinPool來實現(xiàn)的篙骡。例如稽坤,下面使用并行流實現(xiàn)打印數(shù)組元組的程序。
public class SumArray {
public static void main(String[] args){
List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
numberList.parallelStream().forEach(System.out::println);
}
}
這段代碼的背后就使用到了ForkJoinPool糯俗。
說到這里尿褪,可能有讀者會問:可以使用線程池的ThreadPoolExecutor來實現(xiàn)啊得湘?為什么要使用ForkJoinPool懊6唷?ForkJoinPool是個什么鬼昂龉簟天揖?! 接下來跪帝,我們就來回答這個問題今膊。
ForkJoin框架原理
ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實現(xiàn)了Executor和ExecutorService接口伞剑。它使用了一個無限隊列來保存需要執(zhí)行的任務斑唬,而線程的數(shù)量則是通過構造函數(shù)傳入,如果沒有向構造函數(shù)中傳入指定的線程數(shù)量黎泣,那么當前計算機可用的CPU數(shù)量會被設置為線程數(shù)量作為默認值恕刘。
ForkJoinPool主要使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序算法抒倚。這里的要點在于褐着,F(xiàn)orkJoinPool能夠使用相對較少的線程來處理大量的任務。比如要對1000萬個數(shù)據(jù)進行排序托呕,那么會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數(shù)據(jù)的合并任務含蓉。以此類推频敛,對于500萬的數(shù)據(jù)也會做出同樣的分割處理,到最后會設置一個閾值來規(guī)定當數(shù)據(jù)規(guī)模到多少時馅扣,停止這樣的分割處理斟赚。比如,當元素的數(shù)量小于10時差油,會停止分割拗军,轉而使用插入排序?qū)λ鼈冞M行排序。那么到最后蓄喇,所有的任務加起來會有大概200萬+個发侵。問題的關鍵在于,對于一個任務而言公罕,只有當它所有的子任務完成之后,它才能夠被執(zhí)行耀销。
所以當使用ThreadPoolExecutor時楼眷,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法向任務隊列中再添加一個任務并在等待該任務完成之后再繼續(xù)執(zhí)行熊尉。而使用ForkJoinPool就能夠解決這個問題罐柳,它就能夠讓其中的線程創(chuàng)建新的任務,并掛起當前的任務狰住,此時線程就能夠從隊列中選擇子任務執(zhí)行张吉。
那么使用ThreadPoolExecutor或者ForkJoinPool,性能上會有什么差異呢催植?
首先肮蛹,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過200萬個任務创南。但是伦忠,使用ThreadPoolExecutor時,是不可能完成的稿辙,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務昆码,需要完成200萬個具有父子關系的任務時,也需要200萬個線程邻储,很顯然這是不可行的赋咽,也是很不合理的!吨娜!
工作竊取算法
假如我們需要做一個比較大的任務脓匿,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭宦赠,于是把這些子任務分別放到不同的隊列里亦镶,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務日月,線程和隊列一一對應,比如A線程負責處理A隊列里的任務缤骨。但是有的線程會先把自己隊列里的任務干完爱咬,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著绊起,不如去幫其他線程干活精拟,于是它就去其他線程的隊列里竊取一個任務來執(zhí)行。而在這時它們會訪問同一個隊列虱歪,所以為了減少竊取任務線程和被竊取任務線程之間的競爭蜂绎,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執(zhí)行笋鄙,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行师枣。
工作竊取算法的優(yōu)點:
充分利用線程進行并行計算,并減少了線程間的競爭萧落。
工作竊取算法的缺點:
在某些情況下還是存在競爭践美,比如雙端隊列里只有一個任務時。并且該算法會消耗更多的系統(tǒng)資源找岖,比如創(chuàng)建多個線程和多個雙端隊列陨倡。
Fork/Join框架局限性:
對于Fork/Join框架而言,當一個任務正在等待它使用Join操作創(chuàng)建的子任務結束時许布,執(zhí)行這個任務的工作線程查找其他未被執(zhí)行的任務兴革,并開始執(zhí)行這些未被執(zhí)行的任務,通過這種方式蜜唾,線程充分利用它們的運行時間來提高應用程序的性能杂曲。為了實現(xiàn)這個目標,F(xiàn)ork/Join框架執(zhí)行的任務有一些局限性袁余。
(1)任務只能使用Fork和Join操作來進行同步機制解阅,如果使用了其他同步機制,則在同步操作時泌霍,工作線程就不能執(zhí)行其他任務了货抄。比如,在Fork/Join框架中朱转,使任務進行了睡眠蟹地,那么,在睡眠期間內(nèi)藤为,正在執(zhí)行這個任務的工作線程將不會執(zhí)行其他任務了怪与。
(2)在Fork/Join框架中,所拆分的任務不應該去執(zhí)行IO操作缅疟,比如:讀寫數(shù)據(jù)文件分别。
(3)任務不能拋出檢查異常遍愿,必須通過必要的代碼來出來這些異常。
ForkJoin框架的實現(xiàn)
ForkJoin框架中一些重要的類如下所示耘斩。
ForkJoinPool 框架中涉及的主要類如下所示沼填。
1.ForkJoinPool類
實現(xiàn)了ForkJoin框架中的線程池,由類圖可以看出括授,F(xiàn)orkJoinPool類實現(xiàn)了線程池的Executor接口坞笙。
我們也可以從下圖中看出ForkJoinPool的類圖關系。
其中荚虚,可以使用Executors.newWorkStealPool()方法創(chuàng)建ForkJoinPool薛夜。
ForkJoinPool中提供了如下提交任務的方法。
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
2.ForkJoinWorkerThread類
實現(xiàn)ForkJoin框架中的線程版述。
3.ForkJoinTask<V>類
ForkJoinTask封裝了數(shù)據(jù)及其相應的計算梯澜,并且支持細粒度的數(shù)據(jù)并行。ForkJoinTask比線程要輕量渴析,F(xiàn)orkJoinPool中少量工作線程能夠運行大量的ForkJoinTask晚伙。
ForkJoinTask類中主要包括兩個方法fork()和join(),分別實現(xiàn)任務的分拆與合并檬某。
fork()方法類似于Thread.start()撬腾,但是它并不立即執(zhí)行任務螟蝙,而是將任務放入工作隊列中恢恼。跟Thread.join()方法不同,F(xiàn)orkJoinTask的join()方法并不簡單的阻塞線程胰默,而是利用工作線程運行其他任務场斑,當一個工作線程中調(diào)用join(),它將處理其他任務牵署,直到注意到目標子任務已經(jīng)完成漏隐。
我們可以使用下圖來表示這個過程。
ForkJoinTask有3個子類:
RecursiveAction:無返回值的任務奴迅。
RecursiveTask:有返回值的任務青责。
CountedCompleter:完成任務后將觸發(fā)其他任務。
4.RecursiveTask<V> 類
有返回結果的ForkJoinTask實現(xiàn)Callable取具。
5.RecursiveAction類
無返回結果的ForkJoinTask實現(xiàn)Runnable脖隶。
6.CountedCompleter<T> 類
在任務完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)。
ForkJoin示例程序
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任務足夠小就計算任務
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務大于閾值暇检,就分裂成兩個子任務計算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 執(zhí)行子任務
leftTask.fork();
rightTask.fork();
// 等待任務執(zhí)行結束合并其結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任務
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個計算任務产阱,計算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//執(zhí)行一個任務
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}