Java的一大優(yōu)勢是能完成多線程任務(wù)缎浇,對(duì)線程的封裝和調(diào)度非常好,那么它又是如何實(shí)現(xiàn)的呢蒋院?
jdk的包下和線程相關(guān)類的類圖买决。
從上圖可以看出Java線程池的實(shí)現(xiàn)類主要有兩個(gè)類ForkJoinPool和ThreadPoolExecutor。
ThreadPoolExecutor :標(biāo)準(zhǔn)線程池
ScheduledThreadPoolExecutor: 支持延遲任務(wù)的線程池
ForkJoinPool :類似于ThreadPoolExecutor 带猴,但是使用work-stealing模式昔汉,其會(huì)為線程池中每個(gè)線程創(chuàng)建一個(gè)隊(duì)列,從而用work-stealing(任務(wù)竊人┣濉)算法使得線程可以從其他線程隊(duì)列里竊取任務(wù)來執(zhí)行靶病。即如果自己的任務(wù)處理完成了,則可以去忙碌的工作線程那里竊取任務(wù)來執(zhí)行
使用Executors來創(chuàng)建線程池口予。
1娄周、創(chuàng)建單線程的線程池。
public static ExecutorServicenewSingleThreadExecutor(){
? ? return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>()));
}
2苹威、創(chuàng)建固定數(shù)量的線程池
public static ExecutorServicenewFixedThreadPool(int nThreads) {
? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
}
3昆咽、創(chuàng)建可緩沖的線程池
public static ExecutorServicenewCachedThreadPool(){
? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());
}
其中ThreadPoolExecutor的構(gòu)造函數(shù)中有幾個(gè)參數(shù)驾凶,現(xiàn)在介紹這些參數(shù)牙甫,是理解線程池工作原理的重要方式:
1、第一個(gè)參數(shù):int corePoolSIze调违,核心池大小窟哺,也就是線程池中會(huì)維持不被釋放的線程數(shù)量。我們可以看到FixedThreadPool中這個(gè)參數(shù)值就是設(shè)定的線程數(shù)量技肩,而SingleThreadExcutor中就是1且轨,newCachedThreadPool中就是0,不會(huì)維持虚婿,只會(huì)緩存60L旋奢。但需要注意的是,在線程池剛創(chuàng)建時(shí)然痊,里面并沒有建好的線程至朗,只有當(dāng)有任務(wù)來的時(shí)候才會(huì)創(chuàng)建(除非調(diào)用方法prestartAllCoreThreads()與prestartCoreThread()方法),在corePoolSize數(shù)量范圍的線程在完成任務(wù)后不會(huì)被回收剧浸。
2锹引、第二個(gè)參數(shù):int maximumPoolSize矗钟,線程池的最大線程數(shù),代表著線程池中能創(chuàng)建多少線程池嫌变。超出corePoolSize吨艇,小于maximumPoolSize的線程會(huì)在執(zhí)行任務(wù)結(jié)束后被釋放。此配置在CatchedThreadPool中有效腾啥。
3东涡、第三個(gè)參數(shù):long keepAliveTime,剛剛說到的會(huì)被釋放的線程緩存的時(shí)間倘待。我們可以看到软啼,正如我們所說的,在CachedThreadPool()構(gòu)造過程中延柠,會(huì)被設(shè)置緩存時(shí)間為60s(時(shí)間單位由第四個(gè)參數(shù)控制)祸挪。
4、第四個(gè)參數(shù):TimeUnit unit贞间,設(shè)置第三個(gè)參數(shù)keepAliveTime的時(shí)間單位贿条。
5、第五個(gè)參數(shù):存儲(chǔ)等待執(zhí)行任務(wù)的阻塞隊(duì)列增热,有多種選擇整以,分別介紹:
SynchronousQueue——直接提交策略,適用于CachedThreadPool峻仇。它將任務(wù)直接提交給線程而不保持它們公黑。如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗摄咆,因此會(huì)構(gòu)造一個(gè)新的線程凡蚜。此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。直接提交通常要求最大的 maximumPoolSize 以避免拒絕新提交的任務(wù)(正如CachedThreadPool這個(gè)參數(shù)的值為Integer.MAX_VALUE)吭从。當(dāng)任務(wù)以超過隊(duì)列所能處理的量朝蜘、連續(xù)到達(dá)時(shí),此策略允許線程具有增長的可能性涩金。吞吐量較高谱醇。
LinkedBlockingQueue——無界隊(duì)列,適用于FixedThreadPool與SingleThreadExcutor步做「笨剩基于鏈表的阻塞隊(duì)列,創(chuàng)建的線程數(shù)不會(huì)超過corePoolSizes(maximumPoolSize值與其一致)全度,當(dāng)線程正忙時(shí)煮剧,任務(wù)進(jìn)入隊(duì)列等待。按照FIFO原則對(duì)元素進(jìn)行排序,吞吐量高于ArrayBlockingQueue轿秧。
ArrayBlockingQueue——有界隊(duì)列中跌,有助于防止資源耗盡,但是可能較難調(diào)整和控制菇篡。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率漩符、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量驱还。如果任務(wù)頻繁阻塞(例如嗜暴,如果它們是 I/O邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間议蟆。使用小型隊(duì)列通常要求較大的池大小闷沥,CPU使用率較高,但是可能遇到不可接受的調(diào)度開銷咐容,這樣也會(huì)降低吞吐量舆逃。
ThreadPoolExcutor的構(gòu)造方式不僅有這一種,總共有四種戳粒,還可以在最后加入一個(gè)參數(shù)以控制線程池任務(wù)超額處理策略:
當(dāng)用來緩存待處理任務(wù)的隊(duì)列已滿時(shí)路狮,又加入了新的任務(wù),那么這時(shí)候就該考慮如何處理這個(gè)任務(wù)蔚约。
可以通過實(shí)現(xiàn)RejectedExceptionHandler接口奄妨,實(shí)現(xiàn)rejectedException(ThreadPoolExecutor e, Runnable r)方法自定義操作。但通常我們使用JDK提供了4種處理策略苹祟,在ThreadPoolExecutor構(gòu)造時(shí)以參數(shù)傳入:
ThreadPoolExcutor.AbortPolicy()——直接拋出異常砸抛,默認(rèn)操作
ThreadPoolExcutor.CallerRunsPolicy()——只用調(diào)用者所在線程來運(yùn)行任務(wù)
ThreadPoolExcutor.DiscardOldersPolicy()——丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)
ThreadPoolExcutor.DiscardPolicy()——不處理树枫,直接丟掉
接著直焙,我們看一下線程池中比較重要的execute方法,該方法用于向線程池中添加一個(gè)任務(wù)团赏。
核心模塊用紅框標(biāo)記了箕般。
第一個(gè)紅框:workerCountOf方法根據(jù)ctl的低29位耐薯,得到線程池的當(dāng)前線程數(shù)舔清,如果線程數(shù)小于corePoolSize,則執(zhí)行addWorker方法創(chuàng)建新的線程執(zhí)行任務(wù)曲初;
第二個(gè)紅框:判斷線程池是否在運(yùn)行体谒,如果在,任務(wù)隊(duì)列是否允許插入臼婆,插入成功再次驗(yàn)證線程池是否運(yùn)行抒痒,如果不在運(yùn)行,移除插入的任務(wù)颁褂,然后拋出拒絕策略故响。如果在運(yùn)行傀广,沒有線程了,就啟用一個(gè)線程彩届。
第三個(gè)紅框:如果添加非核心線程失敗伪冰,就直接拒絕了。
接下來樟蠕,我們看看如何添加一個(gè)工作線程的贮聂?
添加worker線程
從方法execute的實(shí)現(xiàn)可以看出:addWorker主要負(fù)責(zé)創(chuàng)建新的線程并執(zhí)行任務(wù),代碼如下(這里代碼有點(diǎn)長寨辩,沒關(guān)系吓懈,也是分塊的,總共有5個(gè)關(guān)鍵的代碼塊):
第一個(gè)紅框:做是否能夠添加工作線程條件過濾:
判斷線程池的狀態(tài)靡狞,如果線程池的狀態(tài)值大于或等SHUTDOWN耻警,則不處理提交的任務(wù),直接返回甸怕;
第二個(gè)紅框:做自旋钾虐,更新創(chuàng)建線程數(shù)量:
通過參數(shù)core判斷當(dāng)前需要?jiǎng)?chuàng)建的線程是否為核心線程,如果core為true夯尽,且當(dāng)前線程數(shù)小于corePoolSize袱箱,則跳出循環(huán),開始創(chuàng)建新的線程
有人或許會(huì)疑問 retry 是什么式曲?這個(gè)是java中的goto語法妨托。只能運(yùn)用在break和continue后面。
接著看后面的代碼:
第一個(gè)紅框:獲取線程池主鎖吝羞。
線程池的工作線程通過Woker類實(shí)現(xiàn)兰伤,通過ReentrantLock鎖保證線程安全。
第二個(gè)紅框:添加線程到workers中(線程池中)钧排。
第三個(gè)紅框:啟動(dòng)新建的線程敦腔。
接下來,我們看看workers是什么恨溜。
一個(gè)hashSet符衔。所以,線程池底層的存儲(chǔ)結(jié)構(gòu)其實(shí)就是一個(gè)HashSet糟袁。
worker線程處理隊(duì)列任務(wù)
第一個(gè)紅框:是否是第一次執(zhí)行任務(wù)判族,或者從隊(duì)列中可以獲取到任務(wù)。
第二個(gè)紅框:獲取到任務(wù)后项戴,執(zhí)行任務(wù)開始前操作鉤子形帮。
第三個(gè)紅框:執(zhí)行任務(wù)。
第四個(gè)紅框:執(zhí)行任務(wù)后鉤子。
這兩個(gè)鉤子(beforeExecute辩撑,afterExecute)允許我們自己繼承線程池界斜,做任務(wù)執(zhí)行前后處理。
到這里暫時(shí)結(jié)束