-- ForkJoinPool --
? ForkJoinPool與ThreadPool的不同是:ForkJoinPool中的任務是有同步關(guān)系的包吝,某個任務必須在其它的某個/某些任務完成后才能進行进苍,甚至需要使用其它任務的計算結(jié)果涣达。典型的例子是并發(fā)的快速排序以及并發(fā)的查找最大值掩宜。
? 提交給ForkJoinPool的任務是ForkJoinTask犹赖。提交的時候芭碍,F(xiàn)orkJoinTask和ForkJoinPool之間記錄雙向指針佳恬。ForkJoinTask中可以創(chuàng)建新的ForkJoinTask捏境,并通過fork()提交給所在的ForkJoinPool。所有的ForkJoinTask都可以在提交后使用join()方法獲取計算結(jié)果毁葱。join()方法是線程阻塞的垫言。
? ForkJoinPool的實現(xiàn)極為復雜,其設(shè)計目標是盡量使得所有線程都在工作倾剿,并且減少線程之間的競爭筷频。由于核心線程數(shù)大于等于CPU核數(shù),這就意味著CPU資源被充分利用前痘。大致的實現(xiàn)方法是:
* 一個數(shù)組(workQueues)凛捏,長度大于等于CPU核數(shù),最大為64芹缔,數(shù)組成員是任務隊列坯癣。偶數(shù)位置放外部提交的任務,奇數(shù)位置放fork()出來的子任務最欠。偶數(shù)位置的任務隊列不配線程示罗,它們是shared的蓬网;奇數(shù)位置的任務隊列配一個專屬的worker線程。
* 任務隊列是一個Deque鹉勒。線程取自己隊列里的任務來執(zhí)行時帆锋,用的是棧的方式,后進先出禽额;線程執(zhí)行別人的隊列里的任務時(叫work stealing)锯厢,用的是隊的方式,先進先出脯倒。
* 每個worker線程每一輪的工作是:
- 從workQueues的一個隨機位置開始往后找实辑,找到第一個可以從隊列中取出來的任務(通常是偷別人一個),執(zhí)行這個取出來的任務藻丢。(外部提交的任務和fork()出來的子任務用奇偶位置間隔就是為了隨機找任務的時候不會落入一大片同類任務的區(qū)域剪撬。)
- 執(zhí)行完這第一個(通常是偷來的)任務之后,回到自己隊列悠反,不斷pop()任務出來執(zhí)行残黑,直到隊列為空。
* 一個外部任務提交到哪個位置取決于調(diào)用提交任務的線程斋否。每個線程維護一個數(shù)梨水,只要這個數(shù)不變,每次提交任務都到同一個任務隊列茵臭;如果那個隊列太忙了疫诽,就會提交失敗,這時候就把數(shù)變一下旦委,重新提交奇徒,直到成功,將來的任務也都會提交到這個新地方缨硝。
* 已有任務fork()出來的子任務放到哪里摩钙,取決于它被哪個worker線程執(zhí)行:它總被放到執(zhí)行這個任務的線程所擁有的任務隊列中。這意味著:
- 外部提交的任務(偶數(shù)位置上的)追葡,總是會用偷的方式執(zhí)行的腺律。它fork()出來的任務放在哪里取決于是被哪個線程偷的奕短。
- 子任務(奇數(shù)位置上的)宜肉,如果被自己的線程執(zhí)行了,fork()出來的子任務還在自己的任務隊列里翎碑。如果被別的線程偷去執(zhí)行了谬返,fork()出來的任務就到哪個偷竊線程的隊列里去了。
* 新的任務隊列及其對應的worker線程的產(chǎn)生是被動的方式:每當有新任務來的時候日杈,都會調(diào)用signalWork()方法遣铝。這個方法檢查當前的線程是否夠用佑刷,如果不夠用,就釋放一個閑置線程(idle worker)酿炸;如果沒有閑置線程瘫絮,就創(chuàng)建一個新線程。
* 新線程創(chuàng)建后填硕,配給它一個新建的任務隊列麦萤,然后在workQueues里找一個合適的位置(奇數(shù)位置)把任務隊列放進去。這個位置不是隨機的扁眯,而是算出來的:從上一次新建隊列的位置向后移動一個固定的偏移量壮莹。
* join()的執(zhí)行(注意它是阻塞的):如果調(diào)用時該任務已經(jīng)完成,則返回結(jié)果姻檀;如果沒有命满,則試圖在當前線程立即完成它;如果做不到绣版,必須等待胶台,就盡量讓當前線程在等待期間干點別的,充分利用它所在CPU核的計算資源杂抽。具體方法是:
- 如果發(fā)起調(diào)用的ForkJoinTask和目標任務不在同一個隊列概作,則試圖把目標任務從隊列中unpush()出來(只有在隊的最上面才能被unpush())執(zhí)行。如果不成功默怨,就block讯榕,等待目標任務完成。
- 如果發(fā)起調(diào)用的ForkJoinTask和目標任務在同一個隊列匙睹,試著把目標任務從隊列里remove出來執(zhí)行愚屁。如果不成功或者隊列為空,說明目標被偷了痕檬,這時就去幫偷取者執(zhí)行一個任務霎槐。如果執(zhí)行完回來目標任務還沒有完成,就block梦谜,block之前讓線程池釋放一個空閑線程或者創(chuàng)建一個線程丘跌,作為補償。這個補償線程由于自己的任務隊列為空唁桩,它的工作就是偷別人的任務來干闭树,也就是說它“補償”了自己block期間損失的算力。
? ForkJoinPool的static{}塊中構(gòu)造了一個對象荒澡,可以用ForkJoinPool.commonPool()取得报辱。通常用這個ForkJoinPool就可以了,不用自己new()一個出來用单山,也不太需要再繼承ForkJoinPool做更復雜的實現(xiàn)了碍现。
? ForkJoinPool的主要功能是并行實現(xiàn)分治算法幅疼。分治算法的本質(zhì)是把任務不斷切小,再把結(jié)果逐層歸納昼接。這就意味著并行可以使得每一層的計算同時執(zhí)行爽篷,但是層的深度是不能改變的,也就是說慢睡,算法的時間復雜度取決于層數(shù)狼忱。以歸并排序為例,在單核上一睁,復雜度是O(NlogN)钻弄,其中N是每一輪的計算量,logN是輪數(shù)者吁。并行計算是每一輪都可以以O(shè)(1)的速度完成窘俺,只要并發(fā)度大于N。但是輪數(shù)logN是不可能被縮減的复凳,因為輪次之間有依賴關(guān)系瘤泪,上一輪的結(jié)果歸納需要下一輪的計算結(jié)果作為輸入,也就是說輪次之間必須是線性完成的育八。這在ForkJoinPool中表現(xiàn)為:任務開始后对途,某個線程把fork()后的兩個子任務以入棧的方式壓入自己的隊列,之后再取出棧頂計算髓棋,又會fork()出兩個子任務壓棧实檀,一直下去。這個過程中一旦有別的線程來偷任務按声,就會有某個子任務以及它的子任務們被轉(zhuǎn)移到另一個任務隊列中完成膳犹。如果有p個線程,p大于等于CPU核數(shù)签则,由于互相偷须床,最終就會有p個任務隊列在并發(fā)執(zhí)行。但是join()的時候渐裂,發(fā)起join()的線程無論是自己努力去完成必須完成的子任務還是block住等其它的線程完成所有的子任務豺旬,所需要的時間都是一樣長的,就是logL次子任務計算柒凉,L是它往下的層數(shù)族阅。
? 從這個例子可以看出,F(xiàn)orkJoinPool的設(shè)計思想是每個線程維護自己的任務隊列扛拨,以減少資源競爭耘分,大家可以獨立完成自己的任務举塔;再通過偷的方式將熱點地區(qū)的計算任務散布到其它線程绑警,實現(xiàn)負載均衡求泰。最終這個算法實現(xiàn)了:忙的時候大家各干各的,互不干擾(因為每一輪每個worker要把自己的任務隊列執(zhí)行完计盒,才能進入下一輪偷別人一個)渴频,并且總是把已經(jīng)開始執(zhí)行的外部任務執(zhí)行完成后再去接新的外部任務來做(外部任務沒有配線程,因此忙的時候所有的CPU都在執(zhí)行被fork()出來的子任務)北启;閑的時候一有活兒大家都撲上去搶著干卜朗,新提交的外部任務能夠能迅速占據(jù)所有CPU。