大家好巢块,我是小黑掉蔬,一個(gè)在互聯(lián)網(wǎng)茍且偷生的農(nóng)民工。
本期帶來線程池的第二期內(nèi)容浓瞪,如果對(duì)線程池的基本概念還不是很清楚懈玻,可以先看我上一篇文章。
本期內(nèi)容會(huì)從以下幾個(gè)方面解析線程池的具體實(shí)現(xiàn):
- 線程池狀態(tài)
- 線程池初始化
- 如何執(zhí)行任務(wù)
- 鉤子方法
- 等待隊(duì)列和排隊(duì)策略
- 自定義拒絕策略
- 線程池關(guān)閉
- 動(dòng)態(tài)調(diào)整容量
- 合理配置容量
線程池狀態(tài)
ThreadPoolExecutor中定義了如下幾種線程池狀態(tài):
- RUNNING :運(yùn)行狀態(tài)乾颁,該線程池可以接受新任務(wù)和處理排隊(duì)任務(wù)
- SHUTDOWN:關(guān)閉狀態(tài)涂乌,不接受新任務(wù),但處理排隊(duì)任務(wù)
- STOP:停止?fàn)顟B(tài)英岭,不接受新任務(wù)湾盒、不處理排隊(duì)任務(wù)和中斷進(jìn)行中任務(wù)
- TIDYING:整理狀態(tài),所有的線程任務(wù)已終止诅妹,workerCount為零罚勾,轉(zhuǎn)換到整理狀態(tài)的線程將運(yùn)行terminated()鉤子方法
- TERMINATED:終止?fàn)顟B(tài),terminated()執(zhí)行完畢吭狡,線程池將會(huì)被設(shè)置為TERNINATED狀態(tài)尖殃。
我們?cè)谏厦娲a中看到了對(duì)于線程池狀態(tài)的定義,但是并沒有發(fā)現(xiàn)有定義一個(gè)int類型的變量表示當(dāng)前線程池的狀態(tài)划煮,那是怎么做的呢送丰?
我們看到在最上面有定義一個(gè)AtomicInteger ctl
這樣一個(gè)原子類型的Integer,這個(gè)ctl不光可以表示線程池的運(yùn)行狀態(tài)弛秋,同時(shí)能夠表示線程池的有效線程數(shù)workerCount
器躏。那么是怎么做到的呢?我們都知道Integer類型的內(nèi)存大小是4個(gè)字節(jié)蟹略,對(duì)應(yīng)32個(gè)bit登失,ctl將32位的高三位用來表示線程池的運(yùn)行狀態(tài),低29位表示有效線程數(shù)科乎。
這里可以想一下為什么是用高三位表示runState壁畸,不是兩位,也不是4位呢?
因?yàn)榫€程池的狀態(tài)定義了5種捏萍,而二進(jìn)制要能夠表示5種值最少要用3位太抓。比如1位只能表示0和1,兩位能表示00/01/10/11四種令杈,那3位能表示的值則是2^3
也就是8種走敌,所以要標(biāo)識(shí)5種狀態(tài)則最少需要3位。
因?yàn)檫@一點(diǎn)逗噩,也限制了一個(gè)線程池理論上能設(shè)置的最大線程數(shù)是2^29-1個(gè)掉丽。
那如果想要從這一個(gè)字段里取出runState或者workCount的值應(yīng)該怎么做的?
可以看到是通過位運(yùn)算來實(shí)現(xiàn)的异雁。這里先給大家插播一下位運(yùn)算的邏輯捶障。
- 按位與&:兩數(shù)同為1則為1,其他情況為0纲刀,如0101& 0100 的結(jié)果是0100
- 按位或|:兩數(shù)只要有1個(gè)是1项炼,則為1,如 0101|0100的結(jié)果是0101
- 按位取反~:0變1,1變0示绊,如0101按位取反則等于1010
要獲取高位的runState則使用ctl的值c和容量(也就是2^29-1
)取反做與運(yùn)算锭部;
取低位的workerCount則不用取反。
通過一個(gè)字段來表示兩個(gè)概念面褐,并且使用Atomic可以保證操作的原子性拌禾,不得不說Doug Lea,YYDS!!!
線程池初始化
通過ThreadPoolExecutor的構(gòu)造方法我們來看一下線程池在創(chuàng)建的時(shí)候都做了些什么展哭。
可以發(fā)現(xiàn)湃窍,在構(gòu)造方法中只是對(duì)7個(gè)參數(shù)進(jìn)行賦值,并沒有去做線程的創(chuàng)建摄杂,所以在默認(rèn)情況下坝咐,線程池創(chuàng)建后是沒有線程的循榆,需要在任務(wù)提交時(shí)才會(huì)創(chuàng)建線程析恢。
如果需要在線程池創(chuàng)建之后立即創(chuàng)建線程,ThreadPoolExecutor提供了兩個(gè)方法可以實(shí)現(xiàn):
如何執(zhí)行任務(wù)
通過ThreadPoolExecutor執(zhí)行任務(wù)時(shí)可以通過調(diào)用execute()
方法和submit()
方法來完成秧饮。
在這之前我們需要先知道ThreadPoolExecutor中一些比較重要的變量映挂,可點(diǎn)開下圖查看。
接下來我們來看execute()
是如何執(zhí)行任務(wù)的盗尸。
以上execute()
方法的執(zhí)行步驟可以總結(jié)為3步:
- 如果有效線程數(shù)workerCount小于核心線程數(shù)柑船,則嘗試增加一個(gè)線程執(zhí)行當(dāng)前任務(wù),如果成功泼各,則會(huì)在新線程中執(zhí)行任務(wù)鞍时,如果失敗,則執(zhí)行下一步;
- 如果線程池狀態(tài)是running逆巍,則嘗試加入到等待隊(duì)列及塘,如果入隊(duì)成功,則需要重新檢查線程池狀態(tài)是否是running锐极,如果已經(jīng)不是running則要將任務(wù)從隊(duì)列中remove并按照拒絕策略處理笙僚;如果重新檢查線程池狀態(tài)是running,則要判斷workCount是不是等于0灵再,如果等于0則需要?jiǎng)?chuàng)建一個(gè)新的Worker用于執(zhí)行剛?cè)腙?duì)的任務(wù)肋层;
- 如果在第二步入隊(duì)失敗,會(huì)再次嘗試增加一個(gè)Worker執(zhí)行該任務(wù)翎迁,如果這里還是不行栋猖,則表示線程池確實(shí)shutdown或者等待隊(duì)列滿了,就執(zhí)行拒絕策略汪榔。
有了這個(gè)整體之后掂铐,我們?cè)龠M(jìn)一步看看addWorker()
方法,在這之前需要先了解Worker類的結(jié)構(gòu)揍异。
可以發(fā)現(xiàn)Worker類繼承了AQS全陨,并且實(shí)現(xiàn)Runnable接口,也就是說Worker對(duì)象可以交給一個(gè)Thread創(chuàng)建線程后執(zhí)行衷掷。從Worker的構(gòu)造方法里我們也能看出辱姨,thread是通過線程工廠創(chuàng)建一個(gè)線程,將this作為參數(shù)傳遞的戚嗅。
然后我們?cè)賮砜碼ddWorker()方法:
簡單總結(jié)一下addWorker()
方法分以下4步:
- 如果線程池狀態(tài)并且工作隊(duì)列為空雨涛,則直接返回false,如果工作線程數(shù)workerCount小于核心線程數(shù)或者最大線程數(shù)(這里取決于傳入的參數(shù))懦胞,則對(duì)workerCount自旋加1替久;
- 先加鎖,然后將Worker加入到workerset中躏尉,解鎖蚯根;
- 如果worker加入workerset成功,將線程啟動(dòng)胀糜;
- 如果線程啟動(dòng)失敗颅拦,則將worke移除,workerCount原子減1
既然Worker類實(shí)現(xiàn)了Runnable方法教藻,那對(duì)應(yīng)run()
方法中的邏輯就必須要看一下了距帅。
在Worker的run方法中直接調(diào)用外部類ThreadPoolExecutor的runWorker(Worker)
方法。
對(duì)runWorker()
方法總結(jié)為以下幾個(gè)步驟:
- 從傳入worker對(duì)象的初始任務(wù)開始執(zhí)行括堤,如果初始任務(wù)為空則會(huì)調(diào)用
getTask()
方法獲取任務(wù)碌秸,如果返回空著該Worker線程則會(huì)退出绍移;如果因?yàn)橥獠咳蝿?wù)代碼導(dǎo)致的異常拋出,則也會(huì)終止循環(huán)讥电,但是不會(huì)將Worker線程退出登夫; - 在運(yùn)行任務(wù)之前都會(huì)獲取鎖防止其他任務(wù)執(zhí)行時(shí)發(fā)生其他的池中斷,并確保在池沒有停止的情況下保證該線程不會(huì)設(shè)置其他中斷允趟;
- 每個(gè)運(yùn)行任務(wù)都會(huì)調(diào)用
beforeExecute()
方法恼策,這個(gè)方法可能會(huì)拋出異常,這種情況下會(huì)導(dǎo)致任務(wù)不處理潮剪,并且線程會(huì)終止涣楷; - 如果
beforeExecute()
正常執(zhí)行,則會(huì)運(yùn)行任務(wù)執(zhí)行run()
抗碰,在運(yùn)行任務(wù)出拋出的異常和Error等會(huì)收集在thrown變量上狮斗,傳給afterExecute()
; -
run()
執(zhí)行完之后會(huì)執(zhí)行afterExecute()
如果該方法拋出異常同樣會(huì)讓線程終止弧蝇。
那么getTask()
是去哪里獲取任務(wù)呢碳褒?當(dāng)然是從等待隊(duì)列中獲取。
getTask()
的執(zhí)行可以總結(jié)如下:
- 會(huì)根據(jù)當(dāng)前線程池設(shè)置的核心線程數(shù)看疗,最大線程數(shù)沙峻,超時(shí)時(shí)間等,從任務(wù)隊(duì)列獲取任務(wù)两芳,可能會(huì)超時(shí)等待摔寨,也可能會(huì)阻塞知道任務(wù)到達(dá);
- 如果線程池STOP怖辆,或者有超過最大線程數(shù)的工作線程是复,或者線程是SHUTDOWN并且隊(duì)列為空,或者在超時(shí)等待任務(wù)時(shí)超時(shí)這4種情況下會(huì)返回空竖螃。
鉤子方法
在線程池執(zhí)行任務(wù)的runWorker(Worker)
方法中我們發(fā)現(xiàn)淑廊,會(huì)在任務(wù)執(zhí)行前和執(zhí)行后有兩個(gè)方法。
從方法簽名上可以看到這兩個(gè)方法都是protected
的特咆,當(dāng)時(shí)在ThreadPoolExecutor中都沒有具體實(shí)現(xiàn)季惩。所以這兩個(gè)方法主要用于在自定義線程池時(shí)覆蓋,可以在任務(wù)執(zhí)行前和執(zhí)行后做一些事情坚弱。比如初始化threadLocals蜀备,收集統(tǒng)計(jì)信息关摇,打印日志等荒叶。需要注意的是這兩個(gè)方法中如果拋出異常都會(huì)使線程終止。
另外输虱,ThreadPoolExecutor中的terminated()
也可以被覆蓋些楣,可以用于在線程完全終止后執(zhí)行一些特殊處理。
等待隊(duì)列和排隊(duì)策略
在上面的內(nèi)容中很多次的提到了等待隊(duì)列,也就是ThreadPoolExecutor中的workQueue愁茁,用來存放等待執(zhí)行的任務(wù)蚕钦。
workQueue的類型定義為BlockingQueue<Runnable>
通過可以使用以下的三種類型。
有界隊(duì)列
有界隊(duì)列顧名思義就是有邊界的隊(duì)列鹅很,需要指定隊(duì)列的大小嘶居,主要有ArrayBlockingQueue和PriorityBlockingQueue。PriorityBlockingQueue的優(yōu)點(diǎn)是等待隊(duì)列中的任務(wù)可以按照任務(wù)的優(yōu)先級(jí)處理促煮。
有界隊(duì)列的大小設(shè)置需要和線程池大小相互配合邮屁,線程池較小隊(duì)列較大時(shí),可以減少內(nèi)存消耗菠齿,降低線程切換次數(shù)和CPU的使用率佑吝,但是可能會(huì)限制系統(tǒng)的吞吐量,所以要結(jié)合實(shí)際場景考慮如何設(shè)置绳匀。
無界隊(duì)列
隊(duì)列的大小沒有限制芋忿,常用的有LinkedBlockingQueue,使用該隊(duì)列是要謹(jǐn)慎疾棵,當(dāng)任務(wù)比較耗時(shí)時(shí)戈钢,可能會(huì)導(dǎo)致大量任務(wù)堆積在隊(duì)列中導(dǎo)致內(nèi)存溢出。使用Executors.newFixedThreadPool創(chuàng)建的線程池就是使用的LinkedBlockingQueue是尔。
同步移交隊(duì)列
如果不希望隊(duì)列等待逆趣,而是直接交給工作線程執(zhí)行,則可以使用同步移交隊(duì)列SynchronousQueue嗜历,該隊(duì)列實(shí)際不會(huì)存放元素宣渗,要放入時(shí)必須有另一個(gè)現(xiàn)在在等待接收元素才能成功,在這之前會(huì)一直阻塞梨州。
自定義拒絕策略
上一期我們有講到過線程池的4種拒絕策略痕囱。
- AbortPolicy:拒絕處理,拋出異常
- CallerRunsPolicy:由創(chuàng)建該線程的線程(main)執(zhí)行
- DiscardPolicy: 丟棄暴匠,不拋出異常
- DiscardOldestPolicy:和最早創(chuàng)建的線程進(jìn)行競爭鞍恢,不拋出異常
當(dāng)然我們也可以自定義拒絕策略,比如我們?cè)诰芙^策略中做一些日志記錄等自定義的需求每窖。
線程池關(guān)閉
ThreadPoolExecutor中有兩個(gè)方法可以讓線程池關(guān)閉帮掉,如下:
- shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止窒典,但再也不會(huì)接受新的任務(wù)
- shutdownNow():立即終止線程池蟆炊,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列瀑志,返回尚未執(zhí)行的任務(wù)
動(dòng)態(tài)調(diào)整容量
ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()
和setMaximumPoolSize()
涩搓。
- setCorePoolSize:設(shè)置核心池大小污秆,如果設(shè)置的值小于核心線程數(shù),則多余線程會(huì)在下一次空閑時(shí)終止昧甘,如果設(shè)置的值較大良拼,并且等待隊(duì)列中有任務(wù),則會(huì)立即創(chuàng)建線程執(zhí)行等待隊(duì)列中的任務(wù)充边。
- setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小庸推,如果新值小于當(dāng)前的最大線程數(shù),則多余的線程會(huì)在下次空閑時(shí)終止浇冰。
配置線程池大小
那么最后我們來說一下應(yīng)該如何來配置線程池的大小呢予弧?或許大多數(shù)程序員都聽過這樣一種說法:
- CPU 密集型應(yīng)用,線程池大小設(shè)置為 CPU核數(shù) + 1
- IO 密集型應(yīng)用湖饱,線程池大小設(shè)置為 2*CPU核數(shù)
到底對(duì)不對(duì)呢掖蛤?
我認(rèn)為是不對(duì)的,因?yàn)樵趯?shí)際場景中井厌,一臺(tái)服務(wù)器可能都不止一個(gè)應(yīng)用蚓庭,而這兩個(gè)公式都只和CPU核數(shù)相關(guān),所以肯定是不正確的仅仆,只有在一臺(tái)服務(wù)器只部署一個(gè)應(yīng)用時(shí)才能勉強(qiáng)說的通器赞;還有一個(gè)原因就是在一個(gè)應(yīng)用中可能不僅僅是CPU密集型或者IO密集型,可能二者都有墓拜,那又該如何選擇呢港柜?以及一個(gè)應(yīng)用中可能會(huì)按照功能劃分多個(gè)線程池,所以最終結(jié)論我覺得這兩種說法不對(duì)咳榜。
那么我們到底應(yīng)該如何設(shè)置線程池的大小呢夏醉?有沒有什么可以實(shí)踐的方法,這里需要給大家介紹一個(gè)理論知識(shí)涌韩。
利特爾法則 (Little's Law)
一個(gè)系統(tǒng)請(qǐng)求數(shù)等于請(qǐng)求的到達(dá)率與平均每個(gè)單獨(dú)請(qǐng)求花費(fèi)的時(shí)間之乘積
別看這個(gè)名字感覺很高大上畔柔,其實(shí)概念很簡單。
結(jié)合到我們的場景中臣樱,我們?cè)O(shè)定單位時(shí)間為1秒鐘來計(jì)算靶擦,λ=每秒收到的請(qǐng)求數(shù),W=每個(gè)任務(wù)執(zhí)行的時(shí)間雇毫,L=λW=每秒平均在系統(tǒng)中運(yùn)行的線程玄捕。
假設(shè)我們的應(yīng)用是單核的,則可以直接將L設(shè)置為線程池大小棚放,但是真實(shí)情況并不是枚粘,那么多個(gè)CPU對(duì)我們這個(gè)公式中的哪部分?jǐn)?shù)據(jù)會(huì)有影響呢?
主要是對(duì)于W的值有影響席吴,需要知道一個(gè)請(qǐng)求中的線程IO時(shí)間和線程CPU時(shí)間赌结。帶入公式后則是:
λ=((IO時(shí)間+CPU時(shí)間)/CPU時(shí)間)*CPU個(gè)數(shù)
那么需要獲得IO時(shí)間和CPU時(shí)間捞蛋,則需要通過在代碼中進(jìn)行埋點(diǎn)才能準(zhǔn)確獲得孝冒,比如通過AOP切面編程在請(qǐng)求前后獲取時(shí)間得到結(jié)果柬姚。
當(dāng)然,僅僅依靠這個(gè)公式還是不夠的庄涡,還需要通過壓力測試進(jìn)行調(diào)整和檢驗(yàn)量承,才能更準(zhǔn)確的配置。
以上就是本期的全部內(nèi)容穴店,我們下期見撕捍,關(guān)注我的公眾號(hào)【小黑說Java】,更多干貨內(nèi)容。