引言
在上篇文章《深入理解并發(fā)之Java線(xiàn)程池奥吩、工作原理时捌、復(fù)用原理及源碼分析》中亏吝,曾詳細(xì)談到了Java的線(xiàn)程池框架店诗。在其中也說(shuō)到了JDK提供的四種原生線(xiàn)程池以及自定義線(xiàn)程池裹刮,而本文則再來(lái)詳細(xì)談?wù)凧DK1.7中新推出的線(xiàn)程池:ForkJoinPool。但ForkJoinPool的出現(xiàn)并不是為了替代ThreadPoolExecutor庞瘸,而是作為它的補(bǔ)充捧弃,因?yàn)樵谀承﹫?chǎng)景下,它的性能會(huì)比ThreadPoolExecutor更好擦囊。在之前的模式中违霞,往往一個(gè)任務(wù)會(huì)分配給一條線(xiàn)程執(zhí)行,如果有個(gè)任務(wù)耗時(shí)比較長(zhǎng)瞬场,并且在處理期間也沒(méi)有新的任務(wù)到來(lái)买鸽,那么則會(huì)出現(xiàn)一種情況:線(xiàn)程池中只有一條線(xiàn)程在處理這個(gè)大任務(wù),而其他線(xiàn)程卻空閑著贯被,這會(huì)導(dǎo)致CPU負(fù)載不均衡眼五,空閑的處理器無(wú)法幫助工作,從而無(wú)法最大程度上發(fā)揮多核機(jī)器的性能彤灶。而ForkJoinPool則可以完美解決這類(lèi)問(wèn)題看幼,但ForkJoinPool更適合的是處理一些耗時(shí)的大任務(wù),如果是普通任務(wù)幌陕,反而會(huì)因?yàn)檫^(guò)多的任務(wù)拆分和多條線(xiàn)程CPU的來(lái)回切換導(dǎo)致效率下降诵姜。
一、初窺ForkJoin框架的神秘面紗
ForkJoinPool是一個(gè)建立在分治思想上的產(chǎn)物搏熄,其采用任務(wù)“大拆小”的方式以及工作竊取算法實(shí)現(xiàn)并行處理任務(wù)棚唆。通俗來(lái)說(shuō)暇赤,F(xiàn)orkJoin框架的作用主要是為了實(shí)現(xiàn)將大型復(fù)雜任務(wù)進(jìn)行遞歸的分解,直到任務(wù)小到指定閾值時(shí)才開(kāi)始執(zhí)行瑟俭,從而遞歸的返回各個(gè)小任務(wù)的結(jié)果匯集成一個(gè)大任務(wù)的結(jié)果翎卓,依次類(lèi)推最終得出最初提交的那個(gè)大型復(fù)雜任務(wù)的結(jié)果,這和方法的遞歸調(diào)用思想是一樣的摆寄。當(dāng)然ForkJoinPool線(xiàn)程池為了提高任務(wù)的并行度和吞吐量做了非常多而且復(fù)雜的設(shè)計(jì)實(shí)現(xiàn)失暴,其中最著名的就是任務(wù)竊取機(jī)制。但ForkJoinPool更適合于處理一些大型任務(wù)微饥,因此逗扒,F(xiàn)orkJoinPool的適用范圍不大,僅限于某些密集且能被分解成多個(gè)子任務(wù)的任務(wù)欠橘,同時(shí)這些子任務(wù)運(yùn)行的結(jié)果可以合并成最終結(jié)果矩肩。ForkJoin框架主體由三部分組成:
- ①ForkJoinWorkerThread:任務(wù)的執(zhí)行者,具體的線(xiàn)程實(shí)體
- ②ForkJoinTask:需要執(zhí)行的任務(wù)實(shí)體
- ③ForkJoinPool:管理執(zhí)行者線(xiàn)程的管理池
后續(xù)源碼階段會(huì)詳細(xì)分析肃续!
OK~黍檩,接著先簡(jiǎn)單的來(lái)看看ForkJoin框架的使用,F(xiàn)orkJoinPool提交任務(wù)的方式也有三種始锚,分別為:
- execute():可提交Runnbale類(lèi)型的任務(wù)
- submit():可提交Callable類(lèi)型的任務(wù)
- invoke():可提交ForkJoinTask類(lèi)型的任務(wù)刽酱,但ForkJoinTask存在三個(gè)子類(lèi):
- ①RecursiveAction:無(wú)返回值型ForkJoinTask任務(wù)
- ②RecursiveTask:有返回值型ForkJoinTask任務(wù)
- ③CountedCompleter:任務(wù)執(zhí)行完成后可以觸發(fā)鉤子回調(diào)函數(shù)的任務(wù)
上個(gè)案例:
業(yè)務(wù)需求:需要根據(jù)ID值對(duì)某個(gè)范圍區(qū)間內(nèi)的每條數(shù)據(jù)進(jìn)行變更,變更后獲取最新數(shù)據(jù)更新緩存瞧捌。
運(yùn)行環(huán)境:四核機(jī)器
public class ForkJoinPoolDemo {
public static void main(String[] args) {
testFor();
testForkJoin();
}
// 測(cè)試for循環(huán)
private static void testFor(){
Instant startTime = Instant.now();
List<Integer> list = new ArrayList<Integer>();
for (int id = 1; id <= 1000*10000; id++) {
// ....... 模擬從數(shù)據(jù)庫(kù)根據(jù)id查詢(xún)數(shù)據(jù)
list.add(id);
}
Instant endTime = Instant.now();
System.out.println("For循環(huán)耗時(shí):"+
Duration.between(startTime,endTime).toMillis() + "ms");
}
// 測(cè)試ForkJoin框架
private static void testForkJoin(){
ForkJoinPool forkJoinPool = new ForkJoinPool();
Instant startTime = Instant.now();
List invoke = forkJoinPool.invoke(new IdByFindUpdate(1, 1000*10000));
Instant endTime = Instant.now();
System.out.println("ForkJoin耗時(shí):"+
Duration.between(startTime,endTime).toMillis() + "ms");
}
}
class IdByFindUpdate extends RecursiveTask<List> {
private Integer startID;
private Integer endID;
private static final Integer THURSHOLD = 10000; // 臨界值/閾值
public IdByFindUpdate(Integer startID, Integer endID) {
this.startID = startID;
this.endID = endID;
}
@Override
protected List<Integer> compute() {
int taskSize = endID - startID;
List<Integer> list = new ArrayList<Integer>();
// 如果任務(wù)小于或等于拆分的最小閾值棵里,那么則直接處理任務(wù)
if (taskSize <= THURSHOLD) {
for (int id = startID; id <= endID; id++) {
// ....... 模擬從數(shù)據(jù)庫(kù)根據(jù)id查詢(xún)數(shù)據(jù)
list.add(id);
}
return list;
}
// 任務(wù)fork拆分
IdByFindUpdate leftTask = new IdByFindUpdate(startID,
(startID + endID) / 2);
leftTask.fork();
IdByFindUpdate rightTask = new IdByFindUpdate(((startID
+ endID) / 2) + 1, endID);
rightTask.fork();
// 任務(wù)join合并
list.addAll(leftTask.join());
list.addAll(rightTask.join());
return list;
}
}
案例如上,在其中模擬了數(shù)據(jù)庫(kù)查詢(xún)1000W數(shù)據(jù)后姐呐,將數(shù)據(jù)添加到集合中的操作殿怜。其中定義了任務(wù)類(lèi):IdByFindUpdate
,因?yàn)樾枰祷亟Y(jié)果曙砂,所以IdByFindUpdate
類(lèi)實(shí)現(xiàn)了ForkJoinTask的子類(lèi)RecursiveTask
头谜,確保任務(wù)可以提交給ForkJoinPool
線(xiàn)程池執(zhí)行。任務(wù)的拆分閾值設(shè)定為1W鸠澈,當(dāng)任務(wù)的查詢(xún)數(shù)量小于閾值時(shí)乔夯,則直接執(zhí)行任務(wù)。反之款侵,拆分任務(wù)直至最小(達(dá)到閾值)為止才開(kāi)始執(zhí)行侧纯,執(zhí)行結(jié)束后合并結(jié)果并返回新锈。
同時(shí),我們?yōu)榱藢?duì)比區(qū)別眶熬,也使用了普通的for循環(huán)來(lái)對(duì)比測(cè)試妹笆,結(jié)果如下:
/* 運(yùn)行結(jié)果:
* For循環(huán)耗時(shí):3274ms
* ForkJoin耗時(shí):1270ms
*/
很明顯块请,F(xiàn)orkJoin的執(zhí)行速度比普通的for循環(huán)速度快上三倍左右,但是值得一提的是:如果任務(wù)的量級(jí)太小拳缠,F(xiàn)orkJoin的處理速度反而比不上普通的For循環(huán)墩新。這是因?yàn)镕orkJoin框架在拆分任務(wù)fork
階段于合并結(jié)果join
階段需要時(shí)間,并且開(kāi)啟多條線(xiàn)程處理任務(wù)窟坐,CPU切換也需要時(shí)間海渊,所以當(dāng)一個(gè)任務(wù)fork/join
階段以及CPU切換的時(shí)間開(kāi)銷(xiāo)大于原本任務(wù)的執(zhí)行時(shí)間時(shí),這種情況下則沒(méi)有必要使用ForkJoin框架哲鸳。
注意:ForkJoin的執(zhí)行時(shí)間跟機(jī)器硬件配置以及拆分臨界值/閾值的設(shè)定也有關(guān)系臣疑,拆分的閾值并不是越小越好,因?yàn)殚撝翟叫r(shí)徙菠,一個(gè)任務(wù)拆分的小任務(wù)也就會(huì)越多讯沈,而拆分、合并階段都是需要時(shí)間的婿奔,所以閾值需要根據(jù)機(jī)器的具體硬件設(shè)施和任務(wù)的類(lèi)型進(jìn)行合理的計(jì)算缺狠,這樣才能保證任務(wù)執(zhí)行時(shí)能夠到達(dá)最佳狀態(tài)。
ok萍摊,我也做了一個(gè)比較有意思的小測(cè)試挤茄,把單線(xiàn)程for循環(huán)的模式來(lái)處理上述任務(wù)以及ForkJoin框架處理上述任務(wù)分別分為了兩次來(lái)執(zhí)行,同時(shí)監(jiān)控了CPU的利用率狀況记餐,具體如下:
通過(guò)上圖可以非常清晰的看見(jiàn)驮樊,當(dāng)單線(xiàn)程for循環(huán)的模式處理任務(wù)時(shí),因?yàn)槭窃诙嗪藱C(jī)器上執(zhí)行片酝,所以對(duì)于CPU的利用率最高不到50%囚衔,而當(dāng)使用ForkJoin框架處理任務(wù)時(shí),幾次觸頂達(dá)到了100%的CPU利用率雕沿。所以我們可以得出一個(gè)結(jié)論:ForkJoin框架在處理任務(wù)時(shí)练湿,能夠在最大程度上發(fā)揮機(jī)器的性能。
二审轮、ForkJoin框架原理淺析及成員構(gòu)成
在如上肥哎,對(duì)于ForkJoin框架已經(jīng)建立的初步的認(rèn)知,接著慢慢繼續(xù)分析其內(nèi)部實(shí)現(xiàn)過(guò)程疾渣。
2.1篡诽、ForkJoin框架原理
在前面提到過(guò),F(xiàn)orkJoin框架是建立在分治思想上的產(chǎn)物榴捡,而向FoorkJoinPool中傳遞一個(gè)任務(wù)時(shí)杈女,任務(wù)的執(zhí)行流程大體如下:
從圖中可以很輕易的明白:提交的任務(wù)會(huì)被分割成一個(gè)個(gè)小的左/右任務(wù),當(dāng)分割到最小時(shí),會(huì)分別執(zhí)行每個(gè)小的任務(wù)达椰,執(zhí)行完成后翰蠢,會(huì)將每個(gè)左/右任務(wù)的結(jié)果進(jìn)行,從而合并出父級(jí)任務(wù)的結(jié)果啰劲,依次類(lèi)推梁沧,直至最終計(jì)算出整個(gè)任務(wù)的最終結(jié)果。
工作竊扔恪:在引言中曾提到過(guò)ForkJoin框架是基于分治和工作竊取思想實(shí)現(xiàn)的廷支,那么何謂工作竊取呢?先舉個(gè)例子帶大家簡(jiǎn)單理解一下這個(gè)思想猖辫,具體的實(shí)現(xiàn)會(huì)在后面的源碼分析中詳細(xì)談到酥泞。
例子:我是開(kāi)了個(gè)工廠當(dāng)老板,一條流水線(xiàn)上招聘八個(gè)年輕小伙做事啃憎,每個(gè)人安排了五十個(gè)任務(wù)芝囤,并且對(duì)他們說(shuō):“你們是一個(gè)團(tuán)隊(duì),必須等到每個(gè)人做完了自己的任務(wù)之后才能一起下班辛萍!”悯姊。但是在這八個(gè)小伙里面,有手比較靈巧做的比較快的贩毕,也有做的比較慢悯许、效率比較低的人。那么當(dāng)一段時(shí)間過(guò)后辉阶,代號(hào)③的小伙率先完成了分配給自己的任務(wù)后先壕,為了早些下班,會(huì)跑到一些做的比較慢的小伙哪兒去拿一些任務(wù)過(guò)來(lái)幫助完成進(jìn)度谆甜,這便是工作竊取思想垃僚。在ForkJoin框架同樣存在這樣的情況,某條線(xiàn)程已經(jīng)執(zhí)行完成了分配給自己的任務(wù)后规辱,有些線(xiàn)程卻還在執(zhí)行并且堆積著很多任務(wù)谆棺,那么這條已經(jīng)處理完自己任務(wù)的線(xiàn)程則會(huì)去“竊取”其他線(xiàn)程的任務(wù)執(zhí)行。
2.2罕袋、ForkJoin框架成員分析
在前面說(shuō)過(guò)改淑,F(xiàn)orkJoin框架是由三部分組成,分別為:執(zhí)行者線(xiàn)程浴讯、任務(wù)實(shí)體以及線(xiàn)程池朵夏。接下來(lái)我們依次分析這些成員。
2.2.1榆纽、ForkJoinWorkerThread:任務(wù)的執(zhí)行者
ForkJoinWorkerThread
繼承了Thread線(xiàn)程類(lèi)侍郭,作為T(mén)hread的子類(lèi)询吴,但是卻并沒(méi)有對(duì)線(xiàn)程的調(diào)度、執(zhí)行做改變亮元,只是僅僅增加了一些額外功能。ForkJoinWorkerThread
線(xiàn)程被創(chuàng)建出來(lái)后都交由ForkJoinPool
線(xiàn)程池管理唠摹,并且設(shè)置為了守護(hù)線(xiàn)程爆捞,而ForkJoinWorkerThread
線(xiàn)程創(chuàng)建出來(lái)之后都是被注冊(cè)到FrokJoinPool
線(xiàn)程池,由這些線(xiàn)程來(lái)執(zhí)行用戶(hù)提交的任務(wù)勾拉,所以ForkJoinWorkerThread
也被稱(chēng)為任務(wù)的執(zhí)行者煮甥。
ForkJoinPool
線(xiàn)程池與之前的線(xiàn)程池有一點(diǎn)區(qū)別在于:之前的線(xiàn)程池中,總共只有一個(gè)任務(wù)隊(duì)列藕赞,而ForkJoinPool
中成肘,每個(gè)ForkJoinWorkerThread
線(xiàn)程在創(chuàng)建時(shí),都會(huì)為它分配一個(gè)任務(wù)隊(duì)列斧蜕。同時(shí)為了實(shí)現(xiàn)工作竊取機(jī)制双霍,該隊(duì)列被設(shè)計(jì)為雙向隊(duì)列,線(xiàn)程執(zhí)行自身隊(duì)列中的任務(wù)時(shí)批销,采用LIFO的方式獲取任務(wù)洒闸,當(dāng)其他線(xiàn)程竊取任務(wù)時(shí),采用FIFO的方式獲取任務(wù)均芽。ForkJoinWorkerThread
線(xiàn)程的主要工作為執(zhí)行自身隊(duì)列中的任務(wù)丘逸,其次是竊取其他線(xiàn)程隊(duì)列中的任務(wù)執(zhí)行。源碼如下:
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; // 當(dāng)前線(xiàn)程所屬的線(xiàn)程池
final ForkJoinPool.WorkQueue workQueue; // 當(dāng)前線(xiàn)程的雙向任務(wù)隊(duì)列
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// 調(diào)用Thread父類(lèi)的構(gòu)造函數(shù)創(chuàng)建線(xiàn)程實(shí)體對(duì)象
// 在這里是先暫時(shí)使用aFJT作為線(xiàn)程名稱(chēng)掀宋,當(dāng)外部傳遞線(xiàn)程名稱(chēng)時(shí)會(huì)替換
super("aForkJoinWorkerThread");
// 當(dāng)前設(shè)置線(xiàn)程池
this.pool = pool;
// 向ForkJoinPool線(xiàn)程池中注冊(cè)當(dāng)前線(xiàn)程深纲,為當(dāng)前線(xiàn)程分配任務(wù)隊(duì)列
this.workQueue = pool.registerWorker(this);
}
// ForkJoinWorkerThread類(lèi) → run方法
public void run() {
// 如果隊(duì)列中有任務(wù)
if (workQueue.array == null) {
// 定義異常對(duì)象,方便后續(xù)記錄異常
Throwable exception = null;
try {
// 執(zhí)行前置鉤子函數(shù)(預(yù)留方法劲妙,內(nèi)部未實(shí)現(xiàn))
onStart();
// 執(zhí)行工作隊(duì)列中的任務(wù)
pool.runWorker(workQueue);
} catch (Throwable ex) {
// 記錄捕獲的異常信息
exception = ex;
} finally {
try {
// 對(duì)外寫(xiě)出捕獲的異常信息
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
// 調(diào)用 deregisterWorker 方法進(jìn)行清理
pool.deregisterWorker(this, exception);
}
}
}
}
// 省略其他代碼.....
}
很明顯的可以看到湃鹊,ForkJoinWorkerThread
的構(gòu)造函數(shù)中,在初始化時(shí)會(huì)將自身注冊(cè)進(jìn)線(xiàn)程池中是趴,然后由線(xiàn)程池給每個(gè)線(xiàn)程對(duì)象分配一個(gè)隊(duì)列涛舍。
2.2.2、ForkJoinTask:任務(wù)實(shí)體
ForkJoinTask
與FutrueTask一樣唆途,是Futrue接口的子類(lèi)富雅,ForkJoinTask
是一種可以將任務(wù)進(jìn)行遞歸分解執(zhí)行,從而提高執(zhí)行并行度的任務(wù)類(lèi)型肛搬,執(zhí)行結(jié)束后也可以支持結(jié)果返回没佑。但ForkJoinTask
僅是一個(gè)抽象類(lèi),子類(lèi)有三個(gè):
- ①RecursiveAction:無(wú)返回值型ForkJoinTask任務(wù)
- ②RecursiveTask:有返回值型ForkJoinTask任務(wù)
- ③CountedCompleter:任務(wù)執(zhí)行完成后可以觸發(fā)鉤子回調(diào)函數(shù)的任務(wù)
ForkJoinTask
的作用就是根據(jù)任務(wù)的分解實(shí)現(xiàn)温赔,將任務(wù)進(jìn)行拆分蛤奢,以及等待子任務(wù)的執(zhí)行結(jié)果合并成父任務(wù)的結(jié)果。ForkJoinTask
內(nèi)部存在一個(gè)整數(shù)類(lèi)型的成員status
,該成員高16位記錄任務(wù)的執(zhí)行狀態(tài)啤贩,如:如NORMAL待秃、CANCELLED或EXCEPTIONAL
,低16位預(yù)留用于記錄用戶(hù)自定義的任務(wù)標(biāo)簽痹屹。ForkJoinTask
源碼具體如下:
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// 表示任務(wù)的執(zhí)行狀態(tài)章郁,總共有如下幾種值
volatile int status;
// 獲取任務(wù)狀態(tài)的掩碼,后續(xù)用于位計(jì)算志衍,判斷任務(wù)是否正常執(zhí)行結(jié)束
static final int DONE_MASK = 0xf0000000;
// 表示任務(wù)正常執(zhí)行結(jié)束
static final int NORMAL = 0xf0000000;
// 表示任務(wù)被取消
static final int CANCELLED = 0xc0000000;
// 表示任務(wù)出現(xiàn)異常結(jié)束
static final int EXCEPTIONAL = 0x80000000;
// 表示當(dāng)前任務(wù)被別的任務(wù)依賴(lài)暖庄,在結(jié)束前會(huì)通知其他任務(wù)join結(jié)果
static final int SIGNAL = 0x00010000;
// 低16位掩碼,預(yù)留占位(short mask)
// setForkJoinTaskTag方法中應(yīng)用了該成員楼肪,但這個(gè)方法沒(méi)實(shí)現(xiàn)/應(yīng)用
static final int SMASK = 0x0000ffff;
// 異常哈希鏈表數(shù)組(異常哈希表培廓,類(lèi)似于hashmap1.8之前的實(shí)現(xiàn))
// 因?yàn)槿蝿?wù)拆分之后會(huì)很多,異常信息要么都沒(méi)有春叫,要么都會(huì)出現(xiàn)
// 所以不直接記錄在ForkJoinTask對(duì)象中肩钠,而是采用哈希表結(jié)構(gòu)存儲(chǔ)弱引用類(lèi)型的節(jié)點(diǎn)
// 注意這些都是 static 類(lèi)屬性,所有的ForkJoinTask共用的
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
// 在ForkJoinTask的node被GC回收之后象缀,相應(yīng)的異常節(jié)點(diǎn)對(duì)象的引用隊(duì)列
private static final ReferenceQueue<Object> exceptionTableRefQueue;
/**
* 固定容量的exceptionTable(代表數(shù)組長(zhǎng)度為32蔬将,下標(biāo)存儲(chǔ)鏈表頭節(jié)點(diǎn))
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;
// 內(nèi)部節(jié)點(diǎn)類(lèi):異常數(shù)組存儲(chǔ)的元素:
// 數(shù)組是固定長(zhǎng)度,這樣方便外部訪(fǎng)問(wèn)
// 但是為了保證內(nèi)存可用性央星,所以是弱引用類(lèi)型
// 因?yàn)椴荒艽_定任務(wù)的最后一個(gè)join何時(shí)完成霞怀,所以在下次GC發(fā)生時(shí)會(huì)被回收
// 在GC回收后,這些異常信息會(huì)被轉(zhuǎn)存到exceptionTableRefQueue隊(duì)列
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // 拋出異常的線(xiàn)程id
final int hashCode; // 在弱引用消失之前存儲(chǔ)hashCode
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
// //在ForkJoinTask被GC回收之后莉给,會(huì)將該節(jié)點(diǎn)加入隊(duì)列exceptionTableRefQueue
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
this.hashCode = System.identityHashCode(task);
}
}
/* 抽象方法:用于拓展 */
// 任務(wù)執(zhí)行完成后返回結(jié)果毙石,未完成返回null
public abstract V getRawResult();
// 強(qiáng)制性的給定返回結(jié)果
protected abstract void setRawResult(V value);
// 執(zhí)行任務(wù)
// 如果執(zhí)行過(guò)程拋出異常則記錄捕獲的異常并更改任務(wù)狀態(tài)為EXCEPTIONAL
// 如果執(zhí)行正常結(jié)束,設(shè)置任務(wù)狀態(tài)為NORMAL正常結(jié)束狀態(tài)
// 如果當(dāng)前是子任務(wù)颓遏,設(shè)置為SIGNAL狀態(tài)并通知其他需要join該任務(wù)的線(xiàn)程
protected abstract boolean exec();
/* 實(shí)現(xiàn)Future接口的方法 */
// 阻塞等待任務(wù)執(zhí)行結(jié)果
public final V get();
// 在給定時(shí)間內(nèi)等待返回結(jié)果徐矩,超出給定時(shí)間則中斷線(xiàn)程
public final V get(long timeout, TimeUnit unit);
// 阻塞非工作線(xiàn)程直至任務(wù)結(jié)束或者中斷(該過(guò)程可能會(huì)發(fā)生竊取動(dòng)作),返回任務(wù)的status值
private int externalInterruptibleAwaitDone();
// 嘗試取消任務(wù)叁幢,成功返回true滤灯,反之false
public boolean cancel(boolean mayInterruptIfRunning);
// 判斷任務(wù)是否已執(zhí)行結(jié)束
public final boolean isDone();
// 判斷任務(wù)是否被取消
public final boolean isCancelled();
/* 一些重要的方法 */
// 執(zhí)行任務(wù)的方法
final int doExec();
// 修改任務(wù)狀態(tài)的方法
private int setcompletion (int completion);
// 取消任務(wù)的方法
public boolean cancel(boolean mayInterruptIfRunning);
// 將新創(chuàng)建的子任務(wù)放入當(dāng)前線(xiàn)程的任務(wù)(工作)隊(duì)列
public final ForkJoinTask<V> fork();
// 將當(dāng)前線(xiàn)程阻塞,直到對(duì)應(yīng)的子任務(wù)完成運(yùn)行并返回執(zhí)行結(jié)果
public final V join();
// 獲取任務(wù)執(zhí)行狀態(tài)曼玩,如果還未結(jié)束鳞骤,當(dāng)前線(xiàn)程獲取任務(wù)幫助執(zhí)行
private int doJoin();
// 執(zhí)行任務(wù),正常結(jié)束則返回結(jié)果黍判,異常結(jié)束則報(bào)告異常
public final V invoke();
// 使用當(dāng)前線(xiàn)程執(zhí)行任務(wù)
private int doInvoke();
// 阻塞線(xiàn)程直至任務(wù)執(zhí)行結(jié)束豫尽,如果未執(zhí)行完成嫌拣,外部線(xiàn)程嘗試幫助執(zhí)行
private int externalAwaitDone();
// 同時(shí)執(zhí)行兩個(gè)任務(wù)卢鹦,第一個(gè)任務(wù)由當(dāng)前線(xiàn)程執(zhí)行,第二個(gè)交由工作線(xiàn)程執(zhí)行
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2);
// 執(zhí)行多個(gè)任務(wù)渔肩,入?yún)槿我鈧€(gè)任務(wù)對(duì)象,除開(kāi)第一個(gè)任務(wù)榴嗅,其他交由工作線(xiàn)程執(zhí)行
public static void invokeAll(ForkJoinTask<?>... tasks);
// 入?yún)镃ollection集合妄呕,可以支持返回結(jié)果
public static <T extends ForkJoinTask<?>>
Collection<T> invokeAll(Collection<T> tasks);
/* 異常相關(guān)的方法 */
// 記錄異常信息以及設(shè)置任務(wù)狀態(tài)
final int recordExceptionalCompletion(Throwable ex);
// 刪除異常結(jié)點(diǎn)并清理狀態(tài)
private void clearExceptionalCompletion();
// 刪除哈希表中過(guò)期的異常信息引用
private static void expungeStaleExceptions();
// 獲取任務(wù)異常判斷與當(dāng)前線(xiàn)程堆棧關(guān)系是否相關(guān),
// 不相關(guān)則構(gòu)建一個(gè)相同類(lèi)型的異常录肯,作為記錄
// 這樣做的原因是為了提供準(zhǔn)確的堆棧跟蹤
private Throwable getThrowableException();
}
ForkJoinTask
內(nèi)部成員主要由兩部分構(gòu)成趴腋,一個(gè)是表示任務(wù)狀態(tài)的int成員:status
,其他的成員則都是跟任務(wù)異常信息記錄相關(guān)的论咏。不過(guò)值得注意一提的是:ForkJoinTask
內(nèi)部有關(guān)異常信息記錄的成員都是static
關(guān)鍵字修飾的,也就代表著這些成員是所有ForkJoinTask
對(duì)象共享的颁井,ForkJoinTask
使用類(lèi)似與HashMap
的實(shí)現(xiàn)結(jié)構(gòu):固定長(zhǎng)度32的數(shù)組+單向鏈表實(shí)現(xiàn)了一個(gè)哈希表結(jié)構(gòu)厅贪,用于記錄所有ForkJoinTask
執(zhí)行過(guò)程中出現(xiàn)的異常,所有異常信息都會(huì)被封裝成ExceptionNode
節(jié)點(diǎn)加入哈希表中存儲(chǔ)雅宾,但是ExceptionNode
節(jié)點(diǎn)是一種弱引用的實(shí)現(xiàn)养涮,當(dāng)程序下次GC發(fā)生時(shí)會(huì)被GC機(jī)制回收,GC時(shí)這些已捕獲的異常則會(huì)被轉(zhuǎn)移到exceptionTableRefQueue
隊(duì)列中存儲(chǔ)眉抬。
而成員status
代表任務(wù)的執(zhí)行狀態(tài)贯吓,成員類(lèi)型為int,從最大程度上減少了內(nèi)存占用蜀变,為了保證原子性悄谐,該成員使用了volatile
修飾以及操作時(shí)都是CAS操作。而當(dāng)任務(wù)未結(jié)束時(shí)库北,status
都會(huì)大于0爬舰,任務(wù)執(zhí)行結(jié)束后,status
都會(huì)小于0寒瓦。在ForkJoinTask
也定義了如下幾種狀態(tài):
- ①DONE_MASK狀態(tài):屏蔽非完成位標(biāo)志情屹,與NORMAL值相同,主要后續(xù)用于位運(yùn)算判斷任務(wù)是否正常執(zhí)行結(jié)束
- 二進(jìn)制值:
1111 0000 0000 0000 0000 0000 0000 0000
- 二進(jìn)制值:
- ②NORMAL狀態(tài):表示任務(wù)正常執(zhí)行結(jié)束
- 二進(jìn)制值:
1111 0000 0000 0000 0000 0000 0000 0000
- 二進(jìn)制值:
- ③CANCELLED狀態(tài):表示任務(wù)被取消
- 二進(jìn)制值:
1100 0000 0000 0000 0000 0000 0000 0000
- 二進(jìn)制值:
- ④EXCEPTIONAL狀態(tài):表示任務(wù)執(zhí)行過(guò)程中出現(xiàn)異常杂腰,導(dǎo)致任務(wù)執(zhí)行終止結(jié)束
- 二進(jìn)制值:
1000 0000 0000 0000 0000 0000 0000 0000
- 二進(jìn)制值:
- ⑤SIGNAL狀態(tài):表示傳遞信號(hào)狀態(tài)垃你,代表著當(dāng)前任務(wù)存在依賴(lài)關(guān)系,執(zhí)行結(jié)束后需要通知其他任務(wù)join合并結(jié)果
- 二進(jìn)制值:
0000 0000 0000 0001 0000 0000 0000 0000
- 二進(jìn)制值:
- ⑥SMASK狀態(tài):低十六位的預(yù)留占位
- 二進(jìn)制值:
0000 0000 0000 0000 1111 1111 1111 1111
- 二進(jìn)制值:
- PS:②③④⑤為任務(wù)狀態(tài)喂很,其他的只是輔助標(biāo)識(shí)
而ForkJoinTask
中的所有方法也可以分為三大類(lèi):
- ①基于
status
狀態(tài)成員操作以及維護(hù)方法 - ②執(zhí)行任務(wù)以及等待完成方法
- ③附加對(duì)外報(bào)告結(jié)果的用戶(hù)級(jí)方法
重點(diǎn)來(lái)看一下fork()
以及join()
方法:
// ForkJoinTask類(lèi) → fork方法
public final ForkJoinTask<V> fork() {
Thread t;
// 判斷當(dāng)前執(zhí)行的線(xiàn)程是否為池中的工作線(xiàn)程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
// 如果是的則直接將任務(wù)壓入當(dāng)前線(xiàn)程的任務(wù)隊(duì)列
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是則壓入common池中的某個(gè)工作線(xiàn)程的任務(wù)隊(duì)列中
ForkJoinPool.common.externalPush(this);
// 返回當(dāng)前ForkJoinTask對(duì)象惜颇,方便遞歸拆分
return this;
}
// ForkJoinTask類(lèi) → join方法
public final V join() {
int s;
// 判斷任務(wù)執(zhí)行狀態(tài)如果是非正常結(jié)束狀態(tài)
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 拋出相關(guān)的異常堆棧信息
reportException(s);
// 正常執(zhí)行結(jié)束則返回執(zhí)行結(jié)果
return getRawResult();
}
// ForkJoinTask類(lèi) → doJoin方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// status<0則直接返回status值
return (s = status) < 0 ? s :
// 判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 是則取出線(xiàn)程任務(wù)隊(duì)列中的當(dāng)前task執(zhí)行,執(zhí)行完成返回status值
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 執(zhí)行未完成則調(diào)用awaitJoin方法等待執(zhí)行完成
wt.pool.awaitJoin(w, this, 0L) :
// 不是則調(diào)用externalAwaitDone()方法阻塞掛起當(dāng)前線(xiàn)程
externalAwaitDone();
}
-
fork方法邏輯:
- ①判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程類(lèi)型
- 是:將當(dāng)前任務(wù)壓入當(dāng)前線(xiàn)程的任務(wù)隊(duì)列中
- 不是:將當(dāng)前任務(wù)壓入common池中某個(gè)工作線(xiàn)程的任務(wù)隊(duì)列中
- ②返回當(dāng)前的ForkJoinTask任務(wù)對(duì)象恤筛,方便遞歸拆分
- ①判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程類(lèi)型
-
doJoin&join方法邏輯:
- ①判斷任務(wù)狀態(tài)status是否小于0:
- 小于:代表任務(wù)已經(jīng)結(jié)束官还,返回status值
- 不小于:判斷當(dāng)前線(xiàn)程是否為池中的工作線(xiàn)程:
- 是:取出線(xiàn)程任務(wù)隊(duì)列的當(dāng)前task執(zhí)行,判斷執(zhí)行是否結(jié)束:
- 結(jié)束:返回執(zhí)行結(jié)束的status值
- 未結(jié)束:調(diào)用awaitJoin方法等待執(zhí)行結(jié)束
- 不是:調(diào)用externalAwaitDone()方法阻塞掛起當(dāng)前線(xiàn)程
- 是:取出線(xiàn)程任務(wù)隊(duì)列的當(dāng)前task執(zhí)行,判斷執(zhí)行是否結(jié)束:
- ②判斷任務(wù)執(zhí)行狀態(tài)是否為非正常結(jié)束狀態(tài)毒坛,是則拋出異常堆棧信息
- 任務(wù)狀態(tài)為被取消望伦,拋出CancellationException異常
- 任務(wù)狀態(tài)為異常結(jié)束林说,拋出對(duì)應(yīng)的執(zhí)行異常信息
- ③如果status為正常結(jié)束狀態(tài),則直接返回執(zhí)行結(jié)果
- ①判斷任務(wù)狀態(tài)status是否小于0:
OK~屯伞,最后再看看ForkJoinTask的內(nèi)部類(lèi):
static final class ExceptionNode extends
WeakReference<ForkJoinTask<?>> {}
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {}
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
implements RunnableFuture<Void> {}
static final class RunnableExecuteAction extends ForkJoinTask<Void> {}
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {}
- ①ExceptionNode:用于記錄任務(wù)執(zhí)行過(guò)程中拋出的異常信息腿箩,是ForkJoinTask的弱引用
- ②AdaptedRunnableAction:用于封裝Runable類(lèi)型任務(wù)的適配器,抽象方法實(shí)現(xiàn):
- getRawResult()方法:直接返回null
- setRawResult()方法:空實(shí)現(xiàn)
- exec()方法:直接調(diào)用的run()方法
- ③AdaptedCallable:用于封裝Callable類(lèi)型任務(wù)的適配器劣摇,抽象方法實(shí)現(xiàn):
- getRawResult()方法:返回call方法的執(zhí)行結(jié)果
- setRawResult()方法:設(shè)置Callable執(zhí)行后的返回值
- exec()方法:調(diào)用的call()方法
- ④AdaptedRunnable:用于封裝Runable類(lèi)型任務(wù)的適配器珠移,可以通過(guò)構(gòu)造器設(shè)置返回集
- ⑤RunnableExecuteAction:同②類(lèi)似,區(qū)別在于它可以?huà)伋霎惓?/li>
2.2.3末融、ForkJoinPool:線(xiàn)程池
ForkJoinPool
也是實(shí)現(xiàn)了ExecutorService
的線(xiàn)程池钧惧,但ForkJoinPool
不同于其他類(lèi)型的線(xiàn)程池,因?yàn)槠鋬?nèi)部實(shí)現(xiàn)了工作竊取機(jī)制勾习,所有線(xiàn)程在執(zhí)行完自己的任務(wù)之后都會(huì)嘗試竊取其他線(xiàn)程的任務(wù)執(zhí)行浓瞪,只有當(dāng)竊取不到任務(wù)的情況下才會(huì)發(fā)生阻塞等待工作。ForkJoinPool
主要是為了執(zhí)行ForkJoinTask
而存在的巧婶,ForkJoinPool
是整個(gè)ForkJoin
框架的核心乾颁,負(fù)責(zé)整個(gè)框架的核心管理、檢查監(jiān)控與資源調(diào)度艺栈。
2.2.3.1英岭、ForkJoinPool構(gòu)造器
先來(lái)看看ForkJoinPool
的構(gòu)造函數(shù)源碼:
// 構(gòu)造器1:使用默認(rèn)的參數(shù)配置創(chuàng)建
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
// 構(gòu)造器2:可指定并行度
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
// 構(gòu)造器3:可指定并行度、線(xiàn)程工廠湿右、異常策略以及調(diào)度模式
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
// 私有全參構(gòu)造函數(shù):提供給內(nèi)部其他三個(gè)構(gòu)造器調(diào)用
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
ForkJoinPool
對(duì)外提供了三個(gè)構(gòu)造器诅妹,但是這三個(gè)構(gòu)造器都是基于內(nèi)部的私有構(gòu)造完成的,所以直接分析最后一個(gè)全參的私有構(gòu)造器诅需,該構(gòu)造器共有五個(gè)參數(shù):
- ①parallelism并行度:默認(rèn)為CPU核數(shù)漾唉,最小為1。相當(dāng)于工作線(xiàn)程數(shù)堰塌,但會(huì)有些不同
- ②factory線(xiàn)程工廠:用于創(chuàng)建ForkJoinWorkerThread線(xiàn)程
- ③handler異常捕獲策略:默認(rèn)為null赵刑,執(zhí)行任務(wù)出現(xiàn)異常從中被拋出時(shí),就會(huì)被handler捕獲
- ④mode調(diào)度模式:對(duì)應(yīng)前三個(gè)構(gòu)造中的asyncMode參數(shù)场刑,默認(rèn)為0般此,也就是false
- false:使用LIFO_QUEUE成員,mode=0牵现,使用先進(jìn)后出的模式調(diào)度工作
- true:使用FIFO_QUEUE成員铐懊,mode=1<<16,使用先進(jìn)先出的模式調(diào)度工作
- ⑤workerNamePrefix工作名稱(chēng)前綴:工作線(xiàn)程的名稱(chēng)前綴瞎疼,有默認(rèn)值科乎,不需要傳遞該參數(shù)
創(chuàng)建ForkJoinPool
線(xiàn)程池除開(kāi)通過(guò)構(gòu)造函數(shù)的方式之外,在JDK1.8中還提供了一個(gè)靜態(tài)方法:commonPool()
贼急,該方法可以通過(guò)指定系統(tǒng)參數(shù)的方式(System.setProperty(?,?))定義“并行度茅茂、線(xiàn)程工廠和異常處理策略”捏萍,但是該方法是一個(gè)靜態(tài)方法,也就代表著通過(guò)該方法創(chuàng)建出來(lái)的線(xiàn)程池對(duì)象只會(huì)有一個(gè)空闲,調(diào)用commonPool()
方法獲取到的ForkJoinPool
對(duì)象是整個(gè)程序通用的令杈。
2.2.3.2、ForkJoinPool內(nèi)部成員
前面我們了解了ForkJoinPool的構(gòu)造器碴倾,現(xiàn)在再簡(jiǎn)單看看它的成員構(gòu)成:
// 線(xiàn)程池的ctl控制變量(與上篇中分析的ctl性質(zhì)相同)
volatile long ctl;
// 線(xiàn)程池的運(yùn)行狀態(tài)逗噩,值為常量中對(duì)應(yīng)的值
volatile int runState;
// 將并行度和mode參數(shù)放到了一個(gè)int中,便于后續(xù)通過(guò)位操作計(jì)算
final int config;
// 隨機(jī)種子跌榔,與SEED_INCREMENT魔數(shù)配合使用
int indexSeed;
// 組成WorkQueue數(shù)組异雁,是線(xiàn)程池的核心數(shù)據(jù)結(jié)構(gòu)
volatile WorkQueue[] workQueues;
// 創(chuàng)建線(xiàn)程的線(xiàn)程工廠
final ForkJoinWorkerThreadFactory factory;
// 任務(wù)在執(zhí)行過(guò)程中出現(xiàn)拋出異常時(shí)的處理策略,類(lèi)似于之前線(xiàn)程池的拒絕策略
final UncaughtExceptionHandler ueh;
// 創(chuàng)建線(xiàn)程時(shí)僧须,線(xiàn)程名稱(chēng)的前綴
final String workerNamePrefix;
// 任務(wù)竊取的原子計(jì)數(shù)器
volatile AtomicLong stealCounter;
// 默認(rèn)創(chuàng)建工作線(xiàn)程的工廠類(lèi)
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
// 線(xiàn)程修改許可片迅,用于檢測(cè)代碼是否具備修改線(xiàn)程狀態(tài)的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 通用的ForkJoinPool線(xiàn)程池,用于commonPool()方法
static final ForkJoinPool common;
// 通用線(xiàn)程池的并行數(shù)
static final int commonParallelism;
// 通用線(xiàn)程池的最大線(xiàn)程數(shù)
private static int commonMaxSpares;
// 用于記錄已創(chuàng)建的ForkJoinPool線(xiàn)程池的個(gè)數(shù)
private static int poolNumberSequence;
// 當(dāng)線(xiàn)程執(zhí)行完成自己的任務(wù)且池中沒(méi)有活躍線(xiàn)程時(shí)皆辽,用于計(jì)算阻塞時(shí)間,默認(rèn)2s
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L;
// 平衡計(jì)數(shù)芥挣,通過(guò)IDLE_TIMEOUT會(huì)減去TIMEOUT_SLOP驱闷,
// 主要為了平衡系統(tǒng)定時(shí)器喚醒時(shí)帶來(lái)的延時(shí)時(shí)間,默認(rèn)20ms
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;
// 通用線(xiàn)程池默認(rèn)的最大線(xiàn)程數(shù) 256
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
* 自旋次數(shù):阻塞之前旋轉(zhuǎn)等待的次數(shù)空免,目前使用的是隨機(jī)旋轉(zhuǎn)
* 在awaitRunStateLock空另、awaitWork以及awaitRunstateLock方法中使用,
* 當(dāng)前設(shè)置為零蹋砚,以減少自旋帶來(lái)的CPU開(kāi)銷(xiāo)
* 如果大于零扼菠,則SPINS的值必須為2的冪,至少為 4
*/
private static final int SPINS = 0;
// 這個(gè)是產(chǎn)生隨機(jī)性的魔數(shù)坝咐,用于掃描的時(shí)候進(jìn)行計(jì)算(與ThreadLocal類(lèi)似)
private static final int SEED_INCREMENT = 0x9e3779b9;
// runState的狀態(tài):處于SHUTDOWN時(shí)值必須為負(fù)數(shù)循榆,其他狀態(tài)只要是2的次冪即可
// 鎖定狀態(tài):線(xiàn)程池被某條線(xiàn)程獲取了鎖
private static final int RSLOCK = 1;
// 信號(hào)狀態(tài):線(xiàn)程阻塞前需要設(shè)置RSIGNAL,告訴其他線(xiàn)程在釋放鎖時(shí)要叫醒我
private static final int RSIGNAL = 1 << 1;
// 啟動(dòng)狀態(tài):表示線(xiàn)程池正常墨坚,可以創(chuàng)建線(xiàn)程且接受任務(wù)處理
private static final int STARTED = 1 << 2;
// 停止?fàn)顟B(tài):線(xiàn)程池已停止秧饮,不能創(chuàng)建線(xiàn)程且不接受新任務(wù),同時(shí)會(huì)取消未處理的任務(wù)
private static final int STOP = 1 << 29;
// 死亡狀態(tài):表示線(xiàn)程池內(nèi)所有任務(wù)已取消泽篮,所有工作線(xiàn)程已銷(xiāo)毀
private static final int TERMINATED = 1 << 30;
// 關(guān)閉狀態(tài):嘗試關(guān)閉線(xiàn)程池盗尸,不再接受新的任務(wù),但依舊處理已接受的任務(wù)
private static final int SHUTDOWN = 1 << 31;
// CTL變量的一些掩碼
// 低32位的掩碼
private static final long SP_MASK = 0xffffffffL;
// 高32位的掩碼
private static final long UC_MASK = ~SP_MASK;
// 有效(活躍/存活)計(jì)數(shù):正在處理任務(wù)的活躍線(xiàn)程數(shù)
// 高16位的偏移量帽撑,用高16位記錄活躍線(xiàn)程數(shù)
private static final int AC_SHIFT = 48;
// 高16位:活躍線(xiàn)程的計(jì)數(shù)單元泼各,高16位+1
private static final long AC_UNIT = 0x0001L << AC_SHIFT;
// 高16位:活躍線(xiàn)程數(shù)的掩碼
private static final long AC_MASK = 0xffffL << AC_SHIFT;
// 總計(jì)數(shù):整個(gè)池中存在的所有線(xiàn)程數(shù)量
// 總線(xiàn)程數(shù)量的偏移量,使用高32位中的低16位記錄
private static final int TC_SHIFT = 32;
// 總線(xiàn)程數(shù)的計(jì)數(shù)單元
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
// 總線(xiàn)程數(shù)的掩碼
private static final long TC_MASK = 0xffffL << TC_SHIFT;
// 最大總線(xiàn)程數(shù)的掩碼亏拉,用于判斷線(xiàn)程數(shù)量是否已達(dá)上限
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);
// 低16位掩碼扣蜻,表示workQueue在數(shù)組中的最大索引值
static final int SMASK = 0xffff;
// 最大工作線(xiàn)程數(shù)
static final int MAX_CAP = 0x7fff;
// 偶數(shù)掩碼逆巍,第一個(gè)bit為0,任何值與它進(jìn)行 與 計(jì)算弱贼,結(jié)果都為偶數(shù)
static final int EVENMASK = 0xfffe;
// 用于計(jì)算偶數(shù)值下標(biāo)蒸苇,SQMASK值為126
// 0~126之間只存在64個(gè)偶數(shù),所以偶數(shù)位的槽數(shù)只有64個(gè)
static final int SQMASK = 0x007e;
// Masks and units for WorkQueue.scanState and ctl sp subfield
// 用于檢測(cè)工作線(xiàn)程是否在執(zhí)行任務(wù)的掩碼
static final int SCANNING = 1;
// 負(fù)數(shù)吮旅,用于workQueue.scanState溪烤,與scanState進(jìn)行位或可將scanState變成負(fù)數(shù),
// 表示工作線(xiàn)程掃描不到任務(wù)庇勃,進(jìn)入不活躍狀態(tài)檬嘀,將可能被阻塞
static final int INACTIVE = 1 << 31;
// 版本計(jì)數(shù),用于workQueue.scanState责嚷,解決ABA問(wèn)題
static final int SS_SEQ = 1 << 16;
// Mode bits for ForkJoinPool.config and WorkQueue.config
// 獲取隊(duì)列的工作調(diào)度模式
static final int MODE_MASK = 0xffff << 16;
// 表示先進(jìn)后出模式
static final int LIFO_QUEUE = 0;
// 表示先進(jìn)先出模式
static final int FIFO_QUEUE = 1 << 16;
// 表示共享模式
static final int SHARED_QUEUE = 1 << 31;
// Unsafe類(lèi)對(duì)象:用于直接操作內(nèi)存
private static final sun.misc.Unsafe U;
// ForkJoinTask[]數(shù)組的基準(zhǔn)偏移量
// 使用這個(gè)值+元素的大小可以直接定位一個(gè)內(nèi)存位置
private static final int ABASE;
// ForkJoinTask[]數(shù)組兩個(gè)元素之間的間距的冪 → log(間距) 底數(shù)為2
private static final int ASHIFT;
// ctl成員的內(nèi)存偏移量地址
private static final long CTL;
// runState成員的內(nèi)存偏移量地址
private static final long RUNSTATE;
// stealCounter成員的內(nèi)存偏移量地址
private static final long STEALCOUNTER;
// parkBlocker成員的內(nèi)存偏移量地址
private static final long PARKBLOCKER;
// WorkQueue隊(duì)列的top元素偏移量地址
private static final long QTOP;
// WorkQueue隊(duì)列的qlock偏移量地址
private static final long QLOCK;
// WorkQueue隊(duì)列的scanState偏移量地址
private static final long QSCANSTATE;
// WorkQueue隊(duì)列的parker偏移量地址
private static final long QPARKER;
// WorkQueue隊(duì)列的CurrentSteal偏移量地址
private static final long QCURRENTSTEAL;
// WorkQueue隊(duì)列的CurrentJoin偏移量地址
private static final long QCURRENTJOIN;
如上既是ForkJoinPool中的所有成員變量鸳兽,總的可以分為四類(lèi):
- ①
ForkJoinPool
線(xiàn)程池運(yùn)行過(guò)程中的成員結(jié)構(gòu) - ②通用線(xiàn)程池
common
以及所有線(xiàn)程池對(duì)象的默認(rèn)配置 - ③線(xiàn)程池運(yùn)行狀態(tài)以及
ctl
成員的位存儲(chǔ)記錄 - ④直接操作內(nèi)存的Unsafe相關(guān)成員及內(nèi)存偏移量
具體的解釋可以參考我源碼中的注釋?zhuān)覀兙吞糁攸c(diǎn)說(shuō),先來(lái)說(shuō)說(shuō)核心成員:ctl
罕拂,這個(gè)成員比較特殊揍异,我估計(jì)DougLea
在設(shè)計(jì)它的時(shí)候是抱著"一分錢(qián)掰成兩份用"的心思寫(xiě)的,它是一個(gè)long
類(lèi)型的成員爆班,占位8byte/64bit
衷掷,是不是有些疑惑?用int
類(lèi)型不是更省內(nèi)存嘛柿菩?如果你這樣想戚嗅,那你就錯(cuò)了。因?yàn)?code>ctl不是只存儲(chǔ)一個(gè)數(shù)據(jù)枢舶,而是同時(shí)記錄四個(gè)懦胞,64Bit
被拆分為四個(gè)16位的子字段,分別記錄:
- ①1-16bit/AC:記錄池中的活躍線(xiàn)程數(shù)
- ②17-32bit/TC:記錄池中總線(xiàn)程數(shù)
- ③33-48bit/SS:記錄WorkQueue狀態(tài)凉泄,第一位表示active還是inactive躏尉,其余15位表示版本號(hào)(避免ABA問(wèn)題)
- ④49-64bit/ID:記錄第一個(gè)WorkQueue在數(shù)組中的下標(biāo),和其他worker通過(guò)字段stackPred組成的一個(gè)Treiber堆棧
- 低32位可以直接獲取旧困,如SP=(int)ctl醇份,如果為負(fù)則代表存在空閑worker
ok~,了解了ctl
成員之后吼具,再來(lái)看看runState
成員僚纷,它和上篇分析的線(xiàn)程池中的runState
不同,它除開(kāi)用于表示線(xiàn)程池的運(yùn)行狀態(tài)之外拗盒,同時(shí)也作為鎖資源保護(hù)WorkQueues[]
數(shù)組的更新怖竭。
2.2.3.3、ForkJoinPool內(nèi)部類(lèi)WorkQueue工作隊(duì)列
WorkQueue是整個(gè)Fork/Join框架的橋接者陡蝇,每個(gè)執(zhí)行者ForkJoinWorkerThread對(duì)象中存在各自的工作隊(duì)列痊臭,F(xiàn)orkJoinTask被存儲(chǔ)在工作隊(duì)列中哮肚,而ForkJoinPool使用一個(gè)WorkQueue數(shù)組管理協(xié)調(diào)所有執(zhí)行者線(xiàn)程的隊(duì)列。接著再看看WorkQueue工作隊(duì)列的實(shí)現(xiàn):
// 使用@Contended防止偽共享
@sun.misc.Contended
static final class WorkQueue {
// 隊(duì)列可存放任務(wù)數(shù)的初始容量:8192
// 初始化時(shí)分配這么大容量的原因是為了減少哈希沖突導(dǎo)致的擴(kuò)容
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
// 隊(duì)列可存放任務(wù)數(shù)的最大容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 隊(duì)列的掃描狀態(tài)广匙。高十六位用于版本計(jì)數(shù)允趟,低16位用于記錄掃描狀態(tài)
// 偶數(shù)位的WorkQueue,該值為負(fù)數(shù)時(shí)表示不活躍
// 奇數(shù)位的WorkQueue鸦致,該值一般為workQueues數(shù)組中的下標(biāo)值潮剪,表示當(dāng)前線(xiàn)程在執(zhí)行
// 如果scanState為負(fù)數(shù),代表線(xiàn)程沒(méi)有找到任務(wù)執(zhí)行分唾,被掛起了抗碰,處于不活躍狀態(tài)
// 如果scanState是奇數(shù),代表線(xiàn)程在尋找任務(wù)過(guò)程中绽乔,如果變成了偶數(shù)弧蝇,代表線(xiàn)程在執(zhí)行任務(wù)
volatile int scanState;
// 前一個(gè)棧頂?shù)腸tl值
int stackPred;
// 竊取任務(wù)的數(shù)量統(tǒng)計(jì)
int nsteals;
// 用于記錄隨機(jī)選擇竊取任務(wù),被竊取任務(wù)workQueue在數(shù)組中的下標(biāo)值
int hint;
// 記錄當(dāng)前隊(duì)列在數(shù)組中的下標(biāo)和工作模式
// 高十六位記錄工作模式折砸,低十六位記錄數(shù)組下標(biāo)
int config;
// 鎖標(biāo)識(shí)(類(lèi)似于AQS是state鎖標(biāo)識(shí))
// 為1表示隊(duì)列被鎖定看疗,為0表示未鎖定
// 小于0代表當(dāng)前隊(duì)列注銷(xiāo)或線(xiàn)程池關(guān)閉(terminate狀態(tài)時(shí)為-1)
volatile int qlock;
// 下一個(gè)pool操作的索引值(棧底/隊(duì)列頭部)
volatile int base;
// 下一個(gè)push操作的索引值(棧頂/隊(duì)列尾部)
int top;
// 存放任務(wù)的數(shù)組,初始化時(shí)不會(huì)分配空間睦授,采用懶加載形式初始化空間
ForkJoinTask<?>[] array;
// 所屬線(xiàn)程池的引用指向
final ForkJoinPool pool;
// 當(dāng)前隊(duì)列所屬線(xiàn)程的引用鹃觉,如果為外部提交任務(wù)的共享隊(duì)列則為null
final ForkJoinWorkerThread owner;
// 線(xiàn)程在執(zhí)行過(guò)程中如果被掛起阻塞,該成員保存被掛起的線(xiàn)程睹逃,否則為空
volatile Thread parker;
// 等待join合并的任務(wù)
volatile ForkJoinTask<?> currentJoin;
// 竊取過(guò)來(lái)的任務(wù)
volatile ForkJoinTask<?> currentSteal;
// 構(gòu)造器
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// 開(kāi)始的時(shí)候都指向棧頂
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
/* 重點(diǎn)方法 */
// 添加方法:將一個(gè)任務(wù)添加進(jìn)隊(duì)列中
// 注意:索引不是通過(guò)數(shù)組下標(biāo)計(jì)算的,而是通過(guò)計(jì)算內(nèi)存偏移量定位
final void push(ForkJoinTask<?> task);
// 擴(kuò)容方法:隊(duì)列元素?cái)?shù)量達(dá)到容量時(shí)祷肯,擴(kuò)容兩倍并移動(dòng)元素到新數(shù)組
final ForkJoinTask<?>[] growArray();
// 獲取方法:從棧頂(LIFO)彈出一個(gè)任務(wù)
final ForkJoinTask<?> pop();
// 獲取方法:從棧底(FIFO)彈出一個(gè)任務(wù)
final ForkJoinTask<?> poll();
}
WorkQueue內(nèi)部采用一個(gè)數(shù)組存儲(chǔ)所有分配的任務(wù)沉填,線(xiàn)程執(zhí)行時(shí)會(huì)從該隊(duì)列中獲取任務(wù),如果數(shù)組為空佑笋,那么則會(huì)嘗試竊取其他線(xiàn)程的任務(wù)翼闹。
至此,結(jié)合前面談到的ForkJoinPool線(xiàn)程池的結(jié)構(gòu)一同理解蒋纬,在ForkJoinPool中存在一個(gè)由WorkQueue構(gòu)成的數(shù)組成員workQueues猎荠,而在每個(gè)WorkQueue中又存在一個(gè)ForkJoinTask構(gòu)成的數(shù)組成員array,所以Fork/Join框架中存儲(chǔ)任務(wù)的結(jié)構(gòu)如下:
- 重點(diǎn):
-
workQueues
數(shù)組的容量必須為2的整次冪蜀备。下標(biāo)為偶數(shù)的用于存儲(chǔ)外部提交的任務(wù)关摇,奇數(shù)位置存儲(chǔ)內(nèi)部fork出的子任務(wù) - 偶數(shù)位置的任務(wù)屬于共享任務(wù),由工作線(xiàn)程競(jìng)爭(zhēng)獲取碾阁,模式為FIFO
- 奇數(shù)位置的任務(wù)屬于某個(gè)工作線(xiàn)程输虱,一般是fork產(chǎn)生的子任務(wù)
- 工作線(xiàn)程在處理完自身任務(wù)時(shí)會(huì)竊取其他線(xiàn)程的任務(wù),竊取方式為FIFO
- 工作線(xiàn)程執(zhí)行自己隊(duì)列中任務(wù)的模式默認(rèn)為L(zhǎng)IFO(可以改成FIFO脂凶,不推薦)
-
關(guān)于ForkJoinWorkerThreadFactory線(xiàn)程工廠以及ManagedBlocker就不再闡述了宪睹,有興趣的可以自己去參考上篇文章研究一下愁茁。
三、ForkJoin框架任務(wù)提交原理
在前面給出的ForkJoin使用案例中亭病,我們使用invoke()
方法將自己定義的任務(wù)提交給了ForkJoinPool
線(xiàn)程池執(zhí)行鹅很。前面曾提到過(guò),提交任務(wù)的方式有三種:invoke()罪帖、execute()以及submit()
促煮,但它們?nèi)N方式的最終實(shí)現(xiàn)都大致相同,所以我們從invoke()
方法開(kāi)始胸蛛,以它作為入口分析ForkJoin框架任務(wù)提交任務(wù)的原理實(shí)現(xiàn)污茵。源碼如下:
// ForkJoinPool類(lèi) → invoke()方法
public <T> T invoke(ForkJoinTask<T> task) {
// 如果任務(wù)為空,拋出空指針異常
if (task == null)
throw new NullPointerException();
// 如果不為空則提交任務(wù)執(zhí)行
externalPush(task);
// 等待任務(wù)執(zhí)行結(jié)果返回
return task.join();
}
// ForkJoinPool類(lèi) → externalPush()方法
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
// 獲取線(xiàn)程的探針哈希值以及線(xiàn)程池運(yùn)行狀態(tài)
int r = ThreadLocalRandom.getProbe();
int rs = runState;
// 判斷線(xiàn)程池是否具備了任務(wù)提交的環(huán)境
// 如果工作隊(duì)列數(shù)組已經(jīng)初始化
// 并且數(shù)組以及數(shù)組中偶數(shù)位的工作隊(duì)列不為空
// 并且線(xiàn)程池狀態(tài)正常
// 并且獲取隊(duì)列鎖成功
// 滿(mǎn)足條件則開(kāi)始提交任務(wù)
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
// 判斷隊(duì)列中的任務(wù)數(shù)組是否初始化并且數(shù)組是否還有空位
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
// 通過(guò)計(jì)算內(nèi)存偏移量得到任務(wù)要被的存儲(chǔ)索引
int j = ((am & s) << ASHIFT) + ABASE;
// 通過(guò)Unsafe類(lèi)將任務(wù)寫(xiě)入到數(shù)組中
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
// 如果隊(duì)列任務(wù)很多
if (n <= 1)
// 喚醒或者新啟一條線(xiàn)程幫忙處理
signalWork(ws, q);
return;
}
// 釋放隊(duì)列鎖
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 提交執(zhí)行
externalSubmit(task);
}
// ForkJoinPool類(lèi) → externalSubmit()方法
private void externalSubmit(ForkJoinTask<?> task) {
int r;
// 如果當(dāng)前提交任務(wù)的線(xiàn)程的探針哈希值為0葬项,
// 則初始化當(dāng)前線(xiàn)程的探針哈希值
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
// 開(kāi)啟死循環(huán)直至成功提交任務(wù)為止
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
// 定義競(jìng)爭(zhēng)標(biāo)識(shí)
boolean move = false;
// 如果runState小于0代表為負(fù)數(shù)泞当,代表線(xiàn)程池已經(jīng)要關(guān)閉了
if ((rs = runState) < 0) {
// 嘗試關(guān)閉線(xiàn)程池
tryTerminate(false, false); // help terminate
// 線(xiàn)程池關(guān)閉后同時(shí)拋出異常
throw new RejectedExecutionException();
}
// 如果線(xiàn)程池還未初始化,先對(duì)線(xiàn)程池進(jìn)行初始化操作
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
// 獲取池鎖民珍,沒(méi)獲取鎖的線(xiàn)程則會(huì)自旋或者阻塞掛起
rs = lockRunState();
try {
// 再次檢測(cè)是否已初始化
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// 獲取并行數(shù)
int p = config & SMASK;
// 通過(guò)如下計(jì)算得到最接近2次冪的值
// 找到之后對(duì)該值 * 2倍
// 原理:將p中最高位的那個(gè)1以后的位都設(shè)置為1襟士,
// 最后加1得到最接近的二次冪的值
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
// 釋放鎖,并更改運(yùn)行狀態(tài)為STARTED
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
// r:隨機(jī)值,m:工作隊(duì)列的容量減1,SQMASK:偶數(shù)位最大的64個(gè)的掩碼
// r&m計(jì)算出了下標(biāo)嚷量,位與SQMASK之后會(huì)變成一個(gè)<=126的偶數(shù)
// 如果隨機(jī)出來(lái)的偶數(shù)位下標(biāo)位置隊(duì)列不為空
else if ((q = ws[k = r & m & SQMASK]) != null) {
// 先獲取隊(duì)列鎖
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
// 獲取隊(duì)列的任務(wù)數(shù)組
ForkJoinTask<?>[] a = q.array;
// 記錄隊(duì)列原本的棧頂/隊(duì)列尾部的數(shù)組下標(biāo)
int s = q.top;
// 提交標(biāo)識(shí)
boolean submitted = false;
try {
// 如果數(shù)組不為空并且數(shù)組中還有空位
// (a.length > s+1-q.base如果不成立則代表空位不足)
// 隊(duì)列元素?cái)?shù)量達(dá)到容量陋桂,沒(méi)有空位時(shí)調(diào)用growArray進(jìn)行擴(kuò)容
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
// 通過(guò)計(jì)算內(nèi)存偏移量得到棧頂/隊(duì)列尾部位置
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
// 將新的任務(wù)放在棧頂/隊(duì)列尾部位置
U.putOrderedObject(a, j, task);
// 更新棧頂/隊(duì)列尾部
U.putOrderedInt(q, QTOP, s + 1);
// 提交標(biāo)識(shí)改為true
submitted = true;
}
} finally {
// 釋放隊(duì)列鎖
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 如果任務(wù)已經(jīng)提交到了工作隊(duì)列
if (submitted) {
// 創(chuàng)建新的線(xiàn)程處理,如果線(xiàn)程數(shù)已滿(mǎn)蝶溶,喚醒線(xiàn)程處理
signalWork(ws, q);
return;
}
}
// 能執(zhí)行到這里則代表前面沒(méi)有獲取到鎖嗜历,該位置的隊(duì)列有其他線(xiàn)程在操作
// 將競(jìng)爭(zhēng)標(biāo)識(shí)改為true
move = true;
}
// 如果隨機(jī)出來(lái)的偶數(shù)下標(biāo)位置的隊(duì)列為空
// 那么則在該位置上新建工作隊(duì)列,然后將任務(wù)放進(jìn)去
else if (((rs = runState) & RSLOCK) == 0) {
// 新建一個(gè)工作隊(duì)列抖所,第二個(gè)參數(shù)是所屬線(xiàn)程
// 現(xiàn)在創(chuàng)建的第二個(gè)參數(shù)為null梨州,因?yàn)榕紨?shù)位的隊(duì)列是共享的
q = new WorkQueue(this, null);
// 隊(duì)列記錄一下前面的隨機(jī)值
q.hint = r;
// k是前面計(jì)算出的偶數(shù)位置索引,SHARED_QUEUE是共享隊(duì)列模式
// 使用高16位存儲(chǔ)隊(duì)列模式田轧,低16位存儲(chǔ)數(shù)組索引
q.config = k | SHARED_QUEUE;
// 掃描狀態(tài)為失活狀態(tài)(負(fù)數(shù)暴匠,因?yàn)楣蚕黻?duì)列
// 不屬于任何一個(gè)工作線(xiàn)程,它不需要標(biāo)記工作線(xiàn)程狀態(tài))
q.scanState = INACTIVE;
// 獲取池鎖
rs = lockRunState();
// 將新創(chuàng)建的工作隊(duì)列放入數(shù)組中
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q;
// 釋放池鎖
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true;
// 如果計(jì)算出的偶數(shù)位置有其他線(xiàn)程在操作傻粘,為了減少競(jìng)爭(zhēng)每窖,
// 獲取下一個(gè)隨機(jī)值,重新定位一個(gè)新的位置處理
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
源碼如上弦悉,可以很明顯的看到窒典,流程比較長(zhǎng),總結(jié)一下核心稽莉,如下:
- ①判斷任務(wù)是否為空崇败,為空拋出異常,不為空則開(kāi)始提交任務(wù)
- ②調(diào)用externalPush()方法嘗試快速提交任務(wù)
- 快速提交條件:探針哈希值已初始化、池中隊(duì)列數(shù)組已初始化后室、隨機(jī)的偶數(shù)位置隊(duì)列不為空缩膝、線(xiàn)程池已初始化并狀態(tài)正常、能成功獲取隊(duì)列鎖
- 如上條件全部成立則快速提交任務(wù)岸霹,提交成功直接返回疾层,結(jié)束執(zhí)行
- 如果不成立則調(diào)用externalSubmit()提交任務(wù),流程如下述步驟
- ③初始化線(xiàn)程的探針哈希值并開(kāi)啟死循環(huán)提交任務(wù)(這個(gè)循環(huán)會(huì)直至提交成功才終止)
- ④檢查線(xiàn)程池狀態(tài)是否正常贡避,如果狀態(tài)為關(guān)閉狀態(tài)則拒絕任務(wù)痛黎,拋出異常
- ⑤檢查線(xiàn)程池是否已啟動(dòng),如果還未啟動(dòng)則獲取池鎖刮吧,初始化線(xiàn)程池
- ⑥如果通過(guò)探針值+隊(duì)列數(shù)組容量減一的值+掩碼計(jì)算出的偶數(shù)位隊(duì)列不為空:
- 嘗試獲取隊(duì)列鎖:
- 成功:將任務(wù)添加到計(jì)算出的偶數(shù)位隊(duì)列的任務(wù)數(shù)組中湖饱,如過(guò)數(shù)組長(zhǎng)度不夠則先擴(kuò)容,任務(wù)添加成功后新建或喚醒一條線(xiàn)程杀捻,然后返回
- 失斁帷:代表有其他線(xiàn)程在操作這個(gè)偶數(shù)位置的隊(duì)列,將move標(biāo)識(shí)改為true
- 嘗試獲取隊(duì)列鎖:
- ⑦如果計(jì)算出的偶數(shù)位置隊(duì)列還未初始化致讥,那么則先嘗試獲取池鎖
- 成功:在該位置上創(chuàng)建一個(gè)共享隊(duì)列仅仆,最后再釋放池鎖
- 失敗:代表有其他線(xiàn)程也在操作池中的隊(duì)列數(shù)組垢袱,將move標(biāo)識(shí)改為true
- ⑧如果move競(jìng)爭(zhēng)標(biāo)識(shí)為true墓拜,代表本次操作存在線(xiàn)程競(jìng)爭(zhēng),為了減少競(jìng)爭(zhēng)请契,重新獲取一個(gè)新的探針哈希值咳榜,計(jì)算出一個(gè)新的偶數(shù)位進(jìn)行操作
- ⑨當(dāng)任務(wù)第一次執(zhí)行沒(méi)有添加成功時(shí),會(huì)繼續(xù)重復(fù)這些步驟爽锥,直至任務(wù)成功入列
- 第一次添加失敗的情況:
- 隊(duì)列存在其他線(xiàn)程操作沒(méi)有獲取到隊(duì)列鎖
- 計(jì)算出的偶數(shù)索引位的隊(duì)列為空贿衍,第一次執(zhí)行會(huì)先初始化隊(duì)列
- 第一次添加失敗的情況:
- ⑩任務(wù)成功提交到工作隊(duì)列后,join等待任務(wù)執(zhí)行完成救恨,返回結(jié)果合并
對(duì)于上述步驟中提及到的一些名詞解釋?zhuān)?br> 探針哈希值:Thread類(lèi)threadLocalRandomProbe成員的值,ThreadLocalRandom.getProbe()通過(guò)Unsafe計(jì)算當(dāng)前線(xiàn)程的內(nèi)存偏移量來(lái)獲取
池鎖:runState的RSLOCK狀態(tài)释树,如果要對(duì)池中的隊(duì)列數(shù)組進(jìn)行操作則要先獲取這個(gè)鎖
隊(duì)列鎖:隊(duì)列中的qlock成員肠槽,如果要對(duì)隊(duì)列的任務(wù)數(shù)組進(jìn)行操作則要先獲取這個(gè)鎖
偶數(shù)位隊(duì)列:前面提到過(guò),池中的隊(duì)列數(shù)組workQueues下標(biāo)為偶數(shù)的位置用來(lái)存儲(chǔ)用戶(hù)提交的任務(wù)奢啥,屬于共享隊(duì)列秸仙,不屬于任何一條線(xiàn)程,里面的任務(wù)需要線(xiàn)程競(jìng)爭(zhēng)獲取
OK~桩盲,至此任務(wù)提交的流程分析完畢寂纪,關(guān)于execute()、submit()
方法則不再分析,最終實(shí)現(xiàn)都是相同的捞蛋,只是入口不同孝冒。最后再來(lái)個(gè)圖理解一下:
四、ForkJoin框架任務(wù)工作原理
在前面的提交原理分析中可以得知拟杉,任務(wù)的執(zhí)行都是通過(guò)調(diào)用signalWork()
執(zhí)行的庄涡,而這個(gè)方法會(huì)新創(chuàng)建一條線(xiàn)程處理任務(wù),但當(dāng)線(xiàn)程數(shù)量已經(jīng)達(dá)到線(xiàn)程池的最大線(xiàn)程數(shù)時(shí)搬设,則會(huì)嘗試喚醒一條線(xiàn)程執(zhí)行穴店。下面則以signalWork()
做為入口來(lái)分析ForkJoin框架任務(wù)工作原理,先來(lái)看看signalWork()
源碼:
// ForkJoinPool類(lèi) → signalWork()方法
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// ctl初始化時(shí)為負(fù)數(shù)拿穴,如果ctl<0則代表有任務(wù)需要處理
while ((c = ctl) < 0L) {
// sp==0代表不存在空閑的線(xiàn)程
// 前面分析成員構(gòu)成的時(shí)候提到過(guò):
// 低32位可以直接獲取泣洞,如SP=(int)ctl,
// 如果sp為負(fù)則代表存在空閑worker
if ((sp = (int)c) == 0) {
// 如果池中線(xiàn)程數(shù)量還未達(dá)到最大線(xiàn)程數(shù)
if ((c & ADD_WORKER) != 0L)
// 創(chuàng)建一條新的線(xiàn)程來(lái)處理工作
tryAddWorker(c);
// 新建線(xiàn)程完成后退出循環(huán)并返回
break;
}
// 下面這三個(gè)判斷都是在檢測(cè)線(xiàn)程池狀態(tài)是否正常
// 因?yàn)閟ignalWork只能框架內(nèi)部調(diào)用默色,所以傳入的隊(duì)列不可能為空球凰,
// 除非是處于unstarted/terminated狀態(tài),代表線(xiàn)程池即將關(guān)閉该窗,
// 嘗試中斷未執(zhí)行任務(wù)弟蚀,直接清空了任務(wù),所以此時(shí)直接中斷執(zhí)行
if (ws == null)
break;
if (ws.length <= (i = sp & SMASK))
break;
if ((v = ws[i]) == null)
break;
// 下述代碼是獲取所有阻塞線(xiàn)程鏈中的top線(xiàn)程并喚醒它
// 但是在喚醒之前需要先把top線(xiàn)程的stackPerd標(biāo)識(shí)放在ctl中
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
// 利用CAS機(jī)制修改ctl值
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs;
if ((p = v.parker) != null)
// 喚醒線(xiàn)程
U.unpark(p);
// 喚醒后退出
break;
}
// 如果隊(duì)列為空或者沒(méi)有task酗失,退出執(zhí)行
if (q != null && q.base == q.top)
break;
}
}
// ForkJoinPool類(lèi) → tryAddWorker()方法
private void tryAddWorker(long c) {
// 定義新增標(biāo)識(shí)
boolean add = false;
do {
// 添加活躍線(xiàn)程數(shù)和總線(xiàn)程數(shù)
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
// 如果ctl值沒(méi)有被其他線(xiàn)程修改
if (ctl == c) {
int rs, stop;
// 獲取鎖并檢測(cè)線(xiàn)程池狀態(tài)是否正常
if ((stop = (rs = lockRunState()) & STOP) == 0)
// 只有當(dāng)線(xiàn)程池沒(méi)有停止才可以創(chuàng)建線(xiàn)程
add = U.compareAndSwapLong(this, CTL, c, nc);
// 釋放池鎖
unlockRunState(rs, rs & ~RSLOCK);
// 如果線(xiàn)程池狀態(tài)已經(jīng)stop义钉,那么則退出執(zhí)行
if (stop != 0)
break;
// 如果沒(méi)有stop
if (add) {
// 則新建線(xiàn)程
createWorker();
// 退出
break;
}
}
// ADD_WORKER的第48位是1,和ctl位與運(yùn)算是為了檢查總線(xiàn)程是否已滿(mǎn)
// (int)c == 0代表池中不存在空閑線(xiàn)程數(shù)
// 只有當(dāng)總線(xiàn)程數(shù)未滿(mǎn)時(shí)以及池中不存在空閑線(xiàn)程數(shù)才會(huì)創(chuàng)建線(xiàn)程
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
如上兩個(gè)方法的邏輯比較簡(jiǎn)單:
- ①如果有任務(wù)需要處理并且池中目前不存在空閑線(xiàn)程并且池中線(xiàn)程還未滿(mǎn)规肴,調(diào)用tryAddWorker()方法嘗試創(chuàng)建線(xiàn)程
- ②獲取池鎖更改ctl值并檢測(cè)線(xiàn)程池的狀態(tài)是否正常捶闸,正常則調(diào)用createWorker()創(chuàng)建線(xiàn)程
- ③ryAddWorker()是一個(gè)自旋方法,在池中線(xiàn)程數(shù)未滿(mǎn)且沒(méi)有出現(xiàn)空閑線(xiàn)程的情況下拖刃,會(huì)一直循環(huán)至成功創(chuàng)建線(xiàn)程或者池關(guān)閉
- ④如果池中存在空閑線(xiàn)程或者線(xiàn)程數(shù)已滿(mǎn)删壮,那么則會(huì)嘗試喚醒阻塞鏈上的第一條線(xiàn)程
4.1、工作線(xiàn)程創(chuàng)建及注冊(cè)原理
接著繼續(xù)看看createWorker()
方法:
// ForkJoinPool類(lèi) → createWorker()方法
private boolean createWorker() {
// 獲取池中的線(xiàn)程工廠
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 通過(guò)線(xiàn)程工廠的newThread方法創(chuàng)建一條新線(xiàn)程
if (fac != null && (wt = fac.newThread(this)) != null) {
// 創(chuàng)建成功后返回true
wt.start();
return true;
}
} catch (Throwable rex) {
// 如果出現(xiàn)異常則記錄異常信息
ex = rex;
}
// 然后注銷(xiāo)線(xiàn)程以及將之前tryAddWorker()方法中修改的ctl值改回去
deregisterWorker(wt, ex);
return false;
}
// DefaultForkJoinWorkerThreadFactory類(lèi) → newThread()方法
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// 直接創(chuàng)建了一條工作線(xiàn)程
return new ForkJoinWorkerThread(pool);
}
創(chuàng)建工作線(xiàn)程的源碼比較簡(jiǎn)單兑牡,首先會(huì)獲取池中采用的線(xiàn)程工廠央碟,然后通過(guò)線(xiàn)程工廠創(chuàng)建一條ForkJoinWorkerThread
工作線(xiàn)程。ok均函,再回到最開(kāi)始的ForkJoin框架成員構(gòu)成分析中的ForkJoinWorkerThread
構(gòu)造函數(shù):
// ForkJoinWorkerThread類(lèi) → 構(gòu)造函數(shù)
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// 調(diào)用Thread父類(lèi)的構(gòu)造函數(shù)創(chuàng)建線(xiàn)程實(shí)體對(duì)象
// 在這里是先暫時(shí)使用aFJT作為線(xiàn)程名稱(chēng)亿虽,當(dāng)外部傳遞線(xiàn)程名稱(chēng)時(shí)會(huì)替換
super("aForkJoinWorkerThread");
// 當(dāng)前設(shè)置線(xiàn)程池
this.pool = pool;
// 向ForkJoinPool線(xiàn)程池中注冊(cè)當(dāng)前線(xiàn)程,為當(dāng)前線(xiàn)程分配任務(wù)隊(duì)列
this.workQueue = pool.registerWorker(this);
}
// ForkJoinPool類(lèi) → registerWorker()方法
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
// 異常策略
UncaughtExceptionHandler handler;
// 設(shè)置為守護(hù)線(xiàn)程
wt.setDaemon(true);
// 為創(chuàng)建的線(xiàn)程設(shè)置異常匯報(bào)策略
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
// 為創(chuàng)建的線(xiàn)程分配任務(wù)隊(duì)列
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // 池中隊(duì)列數(shù)組的索引
int mode = config & MODE_MASK; // 獲取隊(duì)列模式
int rs = lockRunState(); // 獲取池鎖
try {
WorkQueue[] ws; int n; // skip if no array
// 如果池中隊(duì)列數(shù)組不為空并且已經(jīng)初始化
if ((ws = workQueues) != null && (n = ws.length) > 0) {
// 獲取用于計(jì)算數(shù)組下標(biāo)隨機(jī)的索引種子
int s = indexSeed += SEED_INCREMENT;
int m = n - 1; // 獲取隊(duì)列數(shù)組的最大索引值
// 計(jì)算出一個(gè)奇數(shù)位索引
// 與1位或苞也,就是將第1個(gè)bit位設(shè)為1洛勉,此時(shí)這個(gè)數(shù)必然是奇數(shù)
// 與m位與,為了保證得到的值是在m以?xún)?nèi)奇數(shù)下標(biāo)值
i = ((s << 1) | 1) & m;
// 如果計(jì)算出的位置不為空則代表已經(jīng)有隊(duì)列了如迟,
// 代表此時(shí)發(fā)生了碰撞沖突收毫,那么此時(shí)則需要換個(gè)位置
if (ws[i] != null) {
int probes = 0;
// 計(jì)算步長(zhǎng)攻走,步長(zhǎng)是一個(gè)不能為2以及2次冪值的偶數(shù)
// 為了保證計(jì)算出的值不為2的次冪值,會(huì)在最后進(jìn)行+2操作
// 后續(xù)會(huì)用原本的索引值+步長(zhǎng)得到一個(gè)新的奇數(shù)索引值
// 奇數(shù)+偶數(shù)=奇數(shù)此再,所以不需要擔(dān)心會(huì)成為偶數(shù)位索引
// 這里計(jì)算步長(zhǎng)是通過(guò)長(zhǎng)度n來(lái)計(jì)算的昔搂,
// 因?yàn)椴介L(zhǎng)大一些,避免沖突的概念就會(huì)小一些
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
// 如果新計(jì)算出的奇數(shù)位索引位置依舊不為空
while (ws[i = (i + step) & m] != null) {
// 從下標(biāo)0開(kāi)始遍歷整個(gè)數(shù)組
if (++probes >= n) {
// 如果所有奇數(shù)位值都不為空引润,代表數(shù)組滿(mǎn)了巩趁,
// 那么擴(kuò)容兩倍,擴(kuò)容后重新再遍歷一次新數(shù)組
// 直至找出為空的奇數(shù)位下標(biāo)
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
// 記錄這個(gè)隨機(jī)出來(lái)的索引種子
w.hint = s; // use as random seed
// 將前面計(jì)算得到的奇數(shù)位索引值以及工作模式記錄在config
w.config = i | mode;
// 掃描狀態(tài)隊(duì)列在數(shù)組中的下標(biāo)淳附,為正數(shù)表示正在掃描任務(wù)狀態(tài)
w.scanState = i; // publication fence
// 將前面創(chuàng)建的隊(duì)列放在隊(duì)列數(shù)組的i位置上
ws[i] = w;
}
} finally {
// 釋放池鎖
unlockRunState(rs, rs & ~RSLOCK);
}
// 在這里再設(shè)置線(xiàn)程的名稱(chēng)
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
工作線(xiàn)程的創(chuàng)建與注冊(cè)原理:
- ①將線(xiàn)程設(shè)置為守護(hù)線(xiàn)程议慰,同時(shí)為新線(xiàn)程創(chuàng)建工作隊(duì)列和設(shè)置異常處理策略
- ②嘗試獲取池鎖成功后,先獲取一個(gè)隨機(jī)生成的用于計(jì)算數(shù)組下標(biāo)的索引種子奴曙,然后通過(guò)種子和數(shù)組最大下標(biāo)計(jì)算出一個(gè)奇數(shù)索引值
- ③如果計(jì)算出的奇數(shù)位值不為空别凹,則通過(guò)偶數(shù)掩碼+數(shù)組最大下標(biāo)計(jì)算出一個(gè)偶數(shù)步長(zhǎng),然后通過(guò)這個(gè)步長(zhǎng)循環(huán)整個(gè)數(shù)組找一個(gè)空的位置洽糟,如果找完了整個(gè)數(shù)組還是沒(méi)有奇數(shù)空位炉菲,則對(duì)數(shù)組發(fā)生兩倍擴(kuò)容,然后再次依照步長(zhǎng)遍歷新數(shù)組找空位坤溃,直至找到奇數(shù)空位為止
- ④為隊(duì)列設(shè)置
hint拍霜、config、scanState
值并將隊(duì)列放到計(jì)算出的奇數(shù)位置上 - ⑤釋放池鎖并設(shè)置工作線(xiàn)程名字
工作線(xiàn)程注冊(cè)的原理實(shí)則不難理解薪介,難點(diǎn)在于計(jì)算奇數(shù)位索引有些玄妙祠饺,不理解的小伙伴可以看看下面這個(gè)案例:
// 模擬線(xiàn)程池中的隊(duì)列數(shù)組和EVENMASK偶數(shù)掩碼值
private static final int EVENMASK = 0xfffe;
private static Object[] ws = new Object[8];
public static void main(String[] args) {
// 先將所有位置填滿(mǎn)
for (int i = 0; i < ws.length; i++) {
ws[i] = new Object();
}
// 然后開(kāi)始查找
findOddNumberIdenx();
}
private static void findOddNumberIdenx() {
int n, m, i, probes;
m = (n = ws.length) - 1;
// 模擬第一次計(jì)算出的奇數(shù)位索引為3
i = 3;
probes = 0;
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
System.out.println("查找奇數(shù)位:" + i);
if (++probes >= n) {
System.out.println("擴(kuò)容兩倍....");
ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
System.out.println("最終確定索引值:" + i);
}
/*
運(yùn)行結(jié)果:
查找奇數(shù)位:1
查找奇數(shù)位:7
查找奇數(shù)位:5
查找奇數(shù)位:3
查找奇數(shù)位:1
查找奇數(shù)位:7
查找奇數(shù)位:5
查找奇數(shù)位:3
擴(kuò)容兩倍....
最終確定索引值:9
*/
4.2、工作線(xiàn)程注銷(xiāo)原理
// ForkJoinPool類(lèi) → deregisterWorker()方法
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
// 如果工作線(xiàn)程以及它的工作隊(duì)列不為空
if (wt != null && (w = wt.workQueue) != null) {
WorkQueue[] ws;
// 獲取隊(duì)列在池中數(shù)組的下標(biāo)
int idx = w.config & SMASK;
// 獲取池鎖
int rs = lockRunState();
// 移除隊(duì)列數(shù)組中idx位置的隊(duì)列
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;
// 釋放池鎖
unlockRunState(rs, rs & ~RSLOCK);
}
long c;
// 在CTL成員中減去一個(gè)線(xiàn)程數(shù)
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
if (w != null) {
// 標(biāo)識(shí)這個(gè)隊(duì)列已停止工作
w.qlock = -1;
// 將當(dāng)前工作隊(duì)列的偷取任務(wù)數(shù)加到ForkJoinPool#stealCounter中
w.transferStealCount(this);
// 取消隊(duì)列中剩余的任務(wù)
w.cancelAll();
}
for (;;) {
WorkQueue[] ws; int m, sp;
// 如果線(xiàn)程池是要關(guān)閉了汁政,那么直接退出
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0)
break;
// 如果線(xiàn)程池不是要關(guān)閉道偷,那么先通過(guò)ctl看看有沒(méi)有阻塞的線(xiàn)程
if ((sp = (int)(c = ctl)) != 0) {
// 如果有則喚醒它來(lái)代替被銷(xiāo)毀的線(xiàn)程工作
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
// 如果池中不存在阻塞掛起的線(xiàn)程,則先判斷池內(nèi)線(xiàn)程是否已滿(mǎn)
else if (ex != null && (c & ADD_WORKER) != 0L) {
// 如果沒(méi)滿(mǎn)則新建一條線(xiàn)程代替被銷(xiāo)毀的線(xiàn)程工作
tryAddWorker(c);
break;
}
// 如果池運(yùn)行正常记劈,不存在線(xiàn)程阻塞勺鸦,線(xiàn)程數(shù)已滿(mǎn)
else
// 那么直接退出
break;
}
if (ex == null)
// 清理異常哈希表中當(dāng)前線(xiàn)程的異常節(jié)點(diǎn)信息
ForkJoinTask.helpExpungeStaleExceptions();
else
// 拋出異常
ForkJoinTask.rethrow(ex);
}
線(xiàn)程注銷(xiāo)的邏輯相對(duì)比較簡(jiǎn)單,如下:
- ①獲取池鎖之后將工作線(xiàn)程的任務(wù)隊(duì)列從數(shù)組中移除目木,移除后釋放池鎖
- ②將偷竊的任務(wù)數(shù)加到stealCounter成員换途,然后取消自身隊(duì)列中的所有任務(wù)
- ③判斷當(dāng)前線(xiàn)程池的情況,判斷當(dāng)前銷(xiāo)毀線(xiàn)程是否是因?yàn)榫€(xiàn)程池要關(guān)閉了:
- 如果是:直接退出
- 如果不是:再判斷池中是否存在掛起阻塞的線(xiàn)程
- 存在:?jiǎn)拘炎枞€(xiàn)程來(lái)代替被銷(xiāo)毀的線(xiàn)程工作
- 不存在:判斷池中線(xiàn)程是否已滿(mǎn)
- 沒(méi)滿(mǎn):新建一條線(xiàn)程代替被銷(xiāo)毀的線(xiàn)程工作
- 滿(mǎn)了:直接退出
- ④清除異常哈希表中當(dāng)前線(xiàn)程的異常節(jié)點(diǎn)信息刽射,然后拋出異常
總的來(lái)說(shuō)军拟,在銷(xiāo)毀線(xiàn)程時(shí),會(huì)先注銷(xiāo)已注冊(cè)的工作隊(duì)列柄冲,注銷(xiāo)之后會(huì)根據(jù)情況選擇喚醒或新建一條線(xiàn)程來(lái)補(bǔ)償線(xiàn)程池。
至此整個(gè)篇幅就較長(zhǎng)了忠蝗,關(guān)于工作線(xiàn)程的執(zhí)行现横、任務(wù)結(jié)果的合并以及任務(wù)竊取的實(shí)現(xiàn)原理則在下篇中為大家分析:(十二)《徹悟并發(fā)之JUC分治思想產(chǎn)物-ForkJoin分支合并框架原理剖析下篇》