一鸽斟、Fork-Join
java下多線程的開發(fā)可以是我們自己啟用多線程,線程池讼载,還可以使用forkjoin陶冷,forkjoin 可以讓我們不去了解諸如 Thread,Runnable 等相關的知識晌梨,只要遵循forkjoin 的開發(fā)模式桥嗤,就可以寫出很好的多線程并發(fā)程序, forkjoin 在處理分而治之這一類問題時非常的有用仔蝌。
1. 什么是 Fork-Join
Fork/Join框架是Java7提供的一個用于并行執(zhí)行任務的分治編程框架泛领,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架敛惊,這種開發(fā)方法也叫分治編程师逸。分治編程可以極大地利用CPU資源,提高任務執(zhí)行的效率豆混,也是目前與多線程有關的前沿技術篓像。
2. 分治編程會遇到什么問題
分治的原理上面說了,就是切割大任務成小任務來完成皿伺。咦员辩,看起來好像也不難實現(xiàn)啊鸵鸥!為什么專門弄一個新的框架呢奠滑?我們先看一下,在不使用 Fork-Join 框架時妒穴,使用普通的線程池是怎么實現(xiàn)的宋税。我們往一個線程池提交了一個大任務,規(guī)定好任務切割的閥值讼油。由池中線程(假設是線程A)執(zhí)行大任務杰赛,發(fā)現(xiàn)大任務的大小大于閥值,于是切割成兩個子任務矮台,并調(diào)用 submit() 提交到線程池乏屯,得到返回的子任務的 Future根时。線程A就調(diào)用 返回的 Future 的 get() 方法阻塞等待子任務的執(zhí)行結果。池中的其他線程(除線程A外辰晕,線程A被阻塞)執(zhí)行兩個子任務蛤迎,然后判斷子任務的大小有沒有超過閥值,如果超過含友,則按照步驟2繼續(xù)切割替裆,否則,才計算并返回結果窘问。嘿辆童,好像一切都很美好。真的嗎南缓?別忘了, 每一個切割任務的線程(如線程A)都被阻塞了荧呐,直到其子任務完成汉形,才能繼續(xù)往下運行 。如果任務太大了倍阐,需要切割多次概疆,那么就會有多個線程被阻塞,性能將會急速下降峰搪。更糟糕的是岔冀,如果你的線程池的線程數(shù)量是有上限的,極可能會造成池中所有線程被阻塞概耻,線程池無法執(zhí)行任務使套。
@ Example1 普通線程池實現(xiàn)分治時阻塞的問題
來看一個例子,體會一下吧鞠柄!下面的例子是將 1+2+...+10 的任務 分割成相加的個數(shù)不能超過3(即兩端的差不能大于2)的多個子任務侦高。
//普通線程池下實現(xiàn)的分治效果測試
public class CommonThreadPoolTest {
//固定大小的線程池,池中線程數(shù)量為3
static ExecutorService fixPoolExcutor = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException, ExecutionException {
//計算 1+2+...+10 的結果
CountTaskCallable task = new CountTaskCallable(1,10);
//提交主人翁
Future<Integer> future = fixPoolExcutor.submit(task);
System.out.println("計算的結果:"+future.get());
}
}
class CountTaskCallable implements Callable<Integer> {
//設置閥值為2
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTaskCallable(int start, int end) {
super();
this.start = start;
this.end = end;
}
@Override
public Integer call() throws Exception {
int sum = 0;
//判斷任務的大小是否超過閥值
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
System.out.println("切割的任務:"+start+"加到"+end+" 執(zhí)行此任務的線程是 "+Thread.currentThread().getName());
int middle = (start + end) / 2;
CountTaskCallable leftTaskCallable = new CountTaskCallable(start, middle);
CountTaskCallable rightTaskCallable = new CountTaskCallable(middle + 1, end);
// 將子任務提交到線程池中
Future<Integer> leftFuture = CommonThreadPoolTest.fixPoolExcutor.submit(leftTaskCallable);
Future<Integer> rightFuture = CommonThreadPoolTest.fixPoolExcutor.submit(rightTaskCallable);
//阻塞等待子任務的執(zhí)行結果
int leftResult = leftFuture.get();
int rightResult = rightFuture.get();
// 合并子任務的執(zhí)行結果
sum = leftResult + rightResult;
}
return sum;
}
}
運行結果
切割的任務:1加到10 執(zhí)行此任務的線程是 pool-1-thread-1
切割的任務:1加到5 執(zhí)行此任務的線程是 pool-1-thread-2
切割的任務:6加到10 執(zhí)行此任務的線程是 pool-1-thread-3
池的線程只有三個厌杜,當任務分割了三次后奉呛,池中的線程也就都被阻塞了,無法再執(zhí)行任何任務夯尽,一直卡著動不了瞧壮。
3、Fork-Join 原理
4匙握、工作竊取
針對上面的問題咆槽,F(xiàn)ork-Join 框架使用了 “工作竊取(work-stealing)”算法圈纺。工作竊嚷拊巍(work-stealing)算法是指某個線程從其他隊列里竊取任務來執(zhí)行济欢。看一下《Java 并發(fā)編程的藝術》對工作竊取算法的解釋:
使用工作竊取算法有什么優(yōu)勢呢小渊?假如我們需要做一個比較大的任務法褥,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭酬屉,于是把這些子任務分別放到不同的隊列里半等,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務,線程和隊列一一對應呐萨,比如A線程負責處理A隊列里的任務杀饵。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理谬擦。干完活的線程與其等著切距,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執(zhí)行惨远。而在這時它們會訪問同一個隊列谜悟,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列北秽,被竊取任務線程永遠從雙端隊列的頭部拿任務執(zhí)行葡幸,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行。
Fork-Join 框架使用工作竊取算法對分治編程實現(xiàn)的描述:
下面是 ForkJoin 框架對分治編程實現(xiàn)的過程的描述贺氓,增加對工作竊取算法的理解蔚叨。在下面的內(nèi)容提供了一個分治的例子,可結合這部分描述一起看辙培。
(1)Fork-Join 框架的線程池ForkJoinPool 的任務分為“外部任務” 和 “內(nèi)部任務”蔑水。
(2)“外部任務”是放在 ForkJoinPool 的全局隊列里;
(3)ForkJoinPool 池中的每個線程都維護著一個內(nèi)部隊列扬蕊,用于存放“內(nèi)部任務”肤粱。
(4)線程切割任務得到的子任務就會作為“內(nèi)部任務”放到內(nèi)部隊列中。
(5)當此線程要想要拿到子任務的計算結果時厨相,先判斷子任務沒有完成领曼,如果沒有完成,則再判斷子任務有沒有被其他線程“竊取”蛮穿,一旦子任務被竊取了則去執(zhí)行本線程“內(nèi)部隊列”的其他任務庶骄,或者掃描其他的任務隊列,竊取任務践磅,如果子任務沒有被竊取单刁,則由本線程來完成。
(6)最后,當線程完成了其“內(nèi)部任務”羔飞,處于空閑的狀態(tài)時肺樟,就會去掃描其他的任務隊列,竊取任務逻淌,盡可能地
總之么伯,F(xiàn)orkJoin線程在等待一個任務的完成時,要么自己來完成這個任務卡儒,或者在其他線程竊取了這個任務的情況下田柔,去執(zhí)行其他任務,是不會阻塞等待骨望,從而避免浪費資源硬爆,除非是所有任務隊列都為空。
工作竊取算法的優(yōu)點:
(1)線程是不會因為等待某個子任務的完成或者沒有內(nèi)部任務要執(zhí)行而被阻塞等待擎鸠、掛起缀磕,而是會掃描所有的隊列,竊取任務劣光,直到所有隊列都為空時袜蚕,才會被掛起。 就如上面所說的赎线。
(2)Fork-Join 框架在多CPU的環(huán)境下廷没,能提供很好的并行性能糊饱。在使用普通線程池的情況下垂寥,當CPU不再是性能瓶頸時,能并行地運行多個線程另锋,然而卻因為要互斥訪問一個任務隊列而導致性能提高不上去滞项。而 Fork-Join 框架為每個線程為維護著一個內(nèi)部任務隊列,以及一個全局的任務隊列夭坪,而且任務隊列都是雙向隊列文判,可從首尾兩端來獲取任務,極大地減少了競爭的可能性室梅,提高并行的性能戏仓。
5、Fork/Join 實戰(zhàn)
1)Fork/Join 使用的標準范式
我們要使用 ForkJoin 框架亡鼠,必須首先創(chuàng)建一個 ForkJoin 任務赏殃。它提供在任務中執(zhí)行 fork 和 join 的操作機制,通常我們不直接繼承 ForkjoinTask 類间涵,只需要直接繼承其子類仁热。
(1)RecursiveAction,用于沒有返回結果的任務
(2) RecursiveTask勾哩,用于有返回值的任務
task 要通過 ForkJoinPool 來執(zhí)行抗蠢,使用 submit 或 invoke 提交举哟,兩者的區(qū)別是:invoke 是同步執(zhí)行,調(diào)用之后需要等待任務完成迅矛,才能執(zhí)行后面的代碼妨猩;submit 是異步執(zhí)行。
join()和 get 方法當任務完成的時候返回計算結果诬乞。
在我們自己實現(xiàn)的 compute 方法里册赛,首先需要判斷任務是否足夠小,如果足夠小就直接執(zhí)行任務震嫉。如果不足夠小森瘪,就必須分割成兩個子任務,每個子任務在調(diào)用 invokeAll 方法時票堵,又會進入 compute 方法扼睬,看看當前子任務是否需要繼續(xù)分割成孫任務,如果不需要繼續(xù)分割悴势,則執(zhí)行當前子任務并返回結果窗宇。使用 join方法會等待子任務執(zhí)行完并得到其結果。
2)特纤、Fork/Join 的同步用法和異步用法
同步示例
/**
* forkjoin實現(xiàn)的歸并排序
*/
public class FkSort {
private static class SumTask extends RecursiveTask<int[]>{
private final static int THRESHOLD = 2;
private int[] src;
public SumTask(int[] src) {
this.src = src;
}
@Override
protected int[] compute() {
if(src.length<=THRESHOLD){
return InsertionSort.sort(src);
}else{
//fromIndex....mid.....toIndex
int mid = src.length / 2;
SumTask leftTask = new SumTask(Arrays.copyOfRange(src, 0, mid));
SumTask rightTask = new SumTask(Arrays.copyOfRange(src, mid, src.length));
invokeAll(leftTask,rightTask);
int[] leftResult = leftTask.join();
int[] rightResult = rightTask.join();
return MergeSort.merge(leftResult,rightResult);
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] src = MakeArray.makeArray();
SumTask innerFind = new SumTask(src);
long start = System.currentTimeMillis();
/*同步提交*/
int[] invoke = pool.invoke(innerFind);
// for(int number:invoke){
// System.out.println(number);
// }
System.out.println(" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
}
異步示例
/**
*類說明:遍歷指定目錄(含子目錄)找尋指定類型文件
*/
public class FindDirsFiles extends RecursiveAction {
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files!=null){
for (File file : files) {
if (file.isDirectory()) {
// 對每個子目錄都新建一個子任務军俊。
subTasks.add(new FindDirsFiles(file));
} else {
// 遇到文件,檢查捧存。
if (file.getAbsolutePath().endsWith("txt")){
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在當前的 ForkJoinPool 上調(diào)度所有的子任務粪躬。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}
public static void main(String [] args){
try {
// 用一個 ForkJoinPool 實例調(diào)度總任務
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));
/*異步提交*/
pool.execute(task);
/*主線程做自己的業(yè)務工作*/
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<100;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork="
+otherWork);
//task.join();//阻塞方法
System.out.println("Task end");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
二、CountDownLatch
閉鎖昔穴,CountDownLatch 這個類能夠使一個線程等待其他線程完成各自的工作后再執(zhí)行镰官。例如,應用程序的主線程希望在負責啟動框架服務的線程已經(jīng)啟動所有的框架服務之后再執(zhí)行吗货。
CountDownLatch 是通過一個計數(shù)器來實現(xiàn)的泳唠,計數(shù)器的初始值為初始任務的數(shù)量。每當完成了一個任務后宙搬,計數(shù)器的值就會減 1(CountDownLatch.countDown()方法)笨腥。當計數(shù)器值到達 0 時,它表示所有的已經(jīng)完成了任務勇垛,然后在閉鎖上等待 CountDownLatch.await()方法的線程就可以恢復執(zhí)行任務脖母。
應用場景:
實現(xiàn)最大的并行性:有時我們想同時啟動多個線程,實現(xiàn)最大程度的并行性窥摄。例如镶奉,我們想測試一個單例類。如果我們創(chuàng)建一個初始計數(shù)為 1 的CountDownLatch,并讓所有線程都在這個鎖上等待哨苛,那么我們可以很輕松地完成測試鸽凶。我們只需調(diào)用 一次 countDown()方法就可以讓所有的等待線程同時恢復執(zhí)行。
開始執(zhí)行前等待 n 個線程完成各自任務:例如應用程序啟動類要確保在處理用戶請求前建峭,所有 N 個外部系統(tǒng)已經(jīng)啟動和運行了玻侥,例如處理 excel 中多個表單。
參見代碼包 cn.enjoyedu.ch2.tools 下
三亿蒸、CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)凑兰。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞边锁,直到最后一個線程到達屏障時姑食,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行茅坛。CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties)音半,其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達了屏障贡蓖,然后當前線程被阻塞曹鸠。
CyclicBarrier 還提供一個更高級的構造函數(shù) CyclicBarrie(r int parties,RunnablebarrierAction)斥铺,用于在線程到達屏障時彻桃,優(yōu)先執(zhí)行 barrierAction,方便處理更復雜的業(yè)務場景晾蜘。
CyclicBarrier 可以用于多線程計算數(shù)據(jù)邻眷,最后合并計算結果的場景。
參見代碼包 cn.enjoyedu.ch2.tools 下
四笙纤、CountDownLatch 和 CyclicBarrier 辨析
CountDownLatch 的計數(shù)器只能使用一次耗溜,而 CyclicBarrier 的計數(shù)器可以反復使用组力。
CountDownLatch.await 一般阻塞工作線程省容,所有的進行預備工作的線程執(zhí)行countDown,而 CyclicBarrier 通過工作線程調(diào)用 await 從而自行阻塞燎字,直到所有工作線程達到指定屏障腥椒,再大家一起往下走。
在控制多個線程同時運行上候衍,CountDownLatch 可以不限線程數(shù)量笼蛛,而CyclicBarrier 是固定線程數(shù)。同時蛉鹿,CyclicBarrier 還可以提供一個 barrierAction滨砍,合并多線程計算結果。
五、Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量惋戏,它通過協(xié)調(diào)各個線程领追,以保證合理的使用公共資源。應用場景 Semaphore 可以用于做流量控制响逢,特別是公用資源有限的應用場景绒窑,比如數(shù)據(jù)庫連接。假如有一個需求舔亭,要讀取幾萬個文件的數(shù)據(jù)些膨,因為都是 IO 密集型任務,我們可以啟動幾十個線程并發(fā)地讀取钦铺,但是如果讀到內(nèi)存后订雾,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有 10 個矛洞,這時我們必須控制只有 10 個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù)葬燎,否則會報錯無法獲取數(shù)據(jù)庫連接。這個時候缚甩,就可以使用 Semaphore 來做流量控制谱净。。Semaphore 的構造方法 Semaphore(int permits)接受一個整型的數(shù)字擅威,表示可用的許可證數(shù)量壕探。Semaphore 的用法也很簡單,首先線程使用 Semaphore的 acquire()方法獲取一個許可證郊丛,使用完之后調(diào)用 release()方法歸還許可證李请。還可以用 tryAcquire()方法嘗試獲取許可證。Semaphore 還提供一些其他方法厉熟,具體如下导盅。
?intavailablePermits():返回此信號量中當前可用的許可證數(shù)。
?intgetQueueLength():返回正在等待獲取許可證的線程數(shù)揍瑟。
?booleanhasQueuedThreads():是否有線程正在等待獲取許可證白翻。
?void reducePermit(s int reduction):減少 reduction 個許可證,是個 protected方法绢片。
?Collection getQueuedThreads():返回所有等待獲取許可證的線程集合滤馍,是個 protected 方法。
1底循、用 Semaphore 實現(xiàn)數(shù)據(jù)庫連接池
參見代碼,包 cn.enjoyedu.ch2.tools.semaphore 下
2巢株、Semaphore 注意事項
參見代碼類 cn.enjoyedu.ch2.tools.semaphore. DBPoolNoUseless 下
六、Exchange
Exchanger(交換者)是一個用于線程間協(xié)作的工具類熙涤。Exchanger 用于進行線程間的數(shù)據(jù)交換阁苞。它提供一個同步點困檩,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)那槽。這兩個線程通過 exchange 方法交換數(shù)據(jù)窗看,如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也執(zhí)行 exchange 方法倦炒,當兩個線程都到達同步點時显沈,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方逢唤。
參見代碼包 cn.enjoyedu.ch2.tools 下
七拉讯、Callable、Future 和 FutureTask
Runnable 是一個接口鳖藕,在它里面只聲明了一個 run()方法魔慷,由于 run()方法返回值為 void 類型,所以在執(zhí)行完任務之后無法返回任何結果著恩。
Callable 位于 java.util.concurrent 包下院尔,它也是一個接口,在它里面也只聲明了一個方法喉誊,只不過這個方法叫做 call()邀摆,這是一個泛型接口,call()函數(shù)返回的類型就是傳遞進來的 V 類型伍茄。
Future 就是對于具體的 Runnable 或者 Callable 任務的執(zhí)行結果進行取消栋盹、查詢是否完成、獲取結果敷矫。必要時可以通過 get 方法獲取執(zhí)行結果例获,該方法會阻塞直到任務返回結果。
因為 Future 只是一個接口曹仗,所以是無法直接用來創(chuàng)建對象使用的榨汤,因此就有了下面的 FutureTask。
FutureTask 類實現(xiàn)了 RunnableFuture 接口怎茫,RunnableFuture 繼承了 Runnable接口和 Future 接口收壕,而 FutureTask 實現(xiàn)了 RunnableFuture 接口。所以它既可以作為 Runnable 被線程執(zhí)行遭居,又可以作為 Future 得到 Callable 的返回值啼器。
因此我們通過一個線程運行 Callable旬渠,但是 Thread 不支持構造方法中傳遞Callable 的實例俱萍,所以我們需要通過 FutureTask 把一個 Callable 包裝成 Runnable,然后再通過這個 FutureTask 拿到Callable 運行后的返回值告丢。
要 new 一個 FutureTask 的實例枪蘑,有兩種方法
參見代碼包 cn.enjoyedu.ch2.tools 下