什么是死鎖绊茧?如何避免
所謂死鎖:是指兩個或兩個以上的進程在執(zhí)行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用菱农,它們都將無法推進下去。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產生了死鎖柿估,這些永遠在互相等待的進程稱為死鎖進程循未。由于資源占用是互斥的,當某個進程提出申請資源后秫舌,使得有關進程在無外力協助下的妖,永遠分配不到必需的資源而無法繼續(xù)運行,這就產生了一種特殊現象死鎖足陨。
雖然進程在運行過程中嫂粟,可能發(fā)生死鎖,但死鎖的發(fā)生也必須具備一定的條件墨缘,死鎖的發(fā)生必須具備以下四個必要條件星虹。
1)互斥條件:指進程對所分配到的資源進行排它性使用,即在一段時間內某資源只由一個進程占用镊讼。如果此時還有其它進程請求資源宽涌,則請求者只能等待,直至占有資源的進程用畢釋放蝶棋。
2)請求和保持條件:指進程已經保持至少一個資源卸亮,但又提出了新的資源請求,而該資源已被其它進程占有嚼松,此時請求進程阻塞嫡良,但又對自己已獲得的其它資源保持不放。
3)不剝奪條件:指進程已獲得的資源献酗,在未使用完之前寝受,不能被剝奪,只能在使用完時由自己釋放罕偎。
4)環(huán)路等待條件:指在發(fā)生死鎖時很澄,必然存在一個進程——資源的環(huán)形鏈,即進程集合{P0颜及,P1甩苛,P2,···俏站,Pn}中的P0正在等待一個P1占用的資源讯蒲;P1正在等待P2占用的資源,……肄扎,Pn正在等待已被P0占用的資源墨林。
在系統(tǒng)中已經出現死鎖后,應該及時檢測到死鎖的發(fā)生犯祠,并采取適當的措施來解除死鎖旭等。目前處理死鎖的方法可歸結為以下四種:
1)預防死鎖。
這是一種較簡單和直觀的事先預防的方法衡载。方法是通過設置某些限制條件搔耕,去破壞產生死鎖的四個必要條件中的一個或者幾個,來預防發(fā)生死鎖痰娱。預防死鎖是一種較易實現的方法弃榨,已被廣泛使用。但是由于所施加的限制條件往往太嚴格猜揪,可能會導致系統(tǒng)資源利用率和系統(tǒng)吞吐量降低惭墓。
2)避免死鎖。
該方法同樣是屬于事先預防的策略而姐,但它并不須事先采取各種限制措施去破壞產生死鎖的的四個必要條件腊凶,而是在資源的動態(tài)分配過程中,用某種方法去防止系統(tǒng)進入不安全狀態(tài)拴念,從而避免發(fā)生死鎖钧萍。
3)檢測死鎖。
這種方法并不須事先采取任何限制性措施政鼠,也不必檢查系統(tǒng)是否已經進入不安全區(qū)风瘦,此方法允許系統(tǒng)在運行過程中發(fā)生死鎖。但可通過系統(tǒng)所設置的檢測機構公般,及時地檢測出死鎖的發(fā)生万搔,并精確地確定與死鎖有關的進程和資源胡桨,然后采取適當措施,從系統(tǒng)中將已發(fā)生的死鎖清除掉瞬雹。
4)解除死鎖昧谊。
這是與檢測死鎖相配套的一種措施。當檢測到系統(tǒng)中已發(fā)生死鎖時酗捌,須將進程從死鎖狀態(tài)中解脫出來呢诬。常用的實施方法是撤銷或掛起一些進程,以便回收一些資源胖缤,再將這些資源分配給已處于阻塞狀態(tài)的進程尚镰,使之轉為就緒狀態(tài),以繼續(xù)運行哪廓。死鎖的檢測和解除措施狗唉,有可能使系統(tǒng)獲得較好的資源利用率和吞吐量,但在實現上難度也最大撩独。
線程和進程的差別是什么敞曹?
進程是指在系統(tǒng)中正在運行的一個應用程序;程序一旦運行就是進程综膀,或者更專業(yè)化來說:進程是指程序執(zhí)行時的一個實例澳迫。線程是進程的一個實體。
進程——資源分配的最小單位剧劝,
線程——程序執(zhí)行的最小單位橄登。
Java里面的Threadlocal是怎樣實現的?
ThreadLocal 是線程本地數據存儲類讥此,通過ThreadLocal可以在特定的線程中存儲數據和變量, 并且這些數據之后只能由該線程訪問,其他線程是訪問不了的, 保證各個線程里數據和變量的獨立性; 即ThreadLocal使每個線程可以訪問自己內部的副本變量拢锹。
ThreadLocal類提供的幾個方法:????
public Tget() { }
get 方法是用來獲取 ThreadLocal 在當前線程中保存的變量副本
public voidset(T value) { }
set 用來設置當前線程中變量的副本
public voidremove() { }
remove 用來移除當前線程中變量的副本
protected TinitialValue() { }
initialValue 是一個protected方法,一般是用來在使用時進行重寫的萄喳,做初始化操作
ConcurrentHashMap的實現原理是卒稳?
ConcurrentHashMap是Java1.5中引用的一個線程安全的支持高并發(fā)的HashMap集合類。
1他巨、線程不安全的HashMap
因為多線程環(huán)境下充坑,使用Hashmap進行put操作會引起死循環(huán),導致CPU利用率接近100%染突,所以在并發(fā)情況下不能使用HashMap捻爷。
2、效率低下的HashTable
HashTable容器使用synchronized來保證線程安全份企,但在線程競爭激烈的情況下HashTable的效率非常低下也榄。
因為當一個線程訪問HashTable的同步方法時,其他線程訪問HashTable的同步方法時司志,可能會進入阻塞或輪詢狀態(tài)甜紫。
如線程1使用put進行添加元素降宅,線程2不但不能使用put方法添加元素,并且也不能使用get方法來獲取元素囚霸,所以競爭越激烈效率越低钉鸯。
3、鎖分段技術
HashTable容器在競爭激烈的并發(fā)環(huán)境下表現出效率低下的原因邮辽,是因為所有訪問HashTable的線程都必須競爭同一把鎖,
那假如容器里有多把鎖贸营,每一把鎖用于鎖容器其中一部分數據吨述,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭钞脂,從而可以有效的提高并發(fā)訪問效率揣云,這就是ConcurrentHashMap所使用的鎖分段技術。首先將數據分成一段一段的存儲冰啃,然后給每一段數據配一把鎖邓夕,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問阎毅。有些方法需要跨段焚刚,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段扇调,這需要按順序鎖定所有段矿咕,操作完畢后,又按順序釋放所有段的鎖狼钮。這里“按順序”是很重要的碳柱,否則極有可能出現死鎖,在ConcurrentHashMap內部熬芜,段數組是final的莲镣,并且其成員變量實際上也是final的,但是涎拉,僅僅是將數組聲明為final的并不保證數組成員也是final的瑞侮,這需要實現上的保證。這可以確保不會出現死鎖曼库,因為獲得鎖的順序是固定的区岗。
sleep和wait區(qū)別
對于sleep()方法,我們首先要知道該方法是屬于Thread類中的毁枯。而wait()方法慈缔,則是屬于Object類中的。
sleep()方法導致了程序暫停執(zhí)行指定的時間种玛,讓出cpu該其他線程藐鹤,但是他的監(jiān)控狀態(tài)依然保持著瓤檐,當指定的時間到了又會自動恢復運行狀態(tài)缩滨。在調用sleep()方法的過程中凳干,線程不會釋放對象鎖。而當調用wait()方法的時候断傲,線程會放棄對象鎖肄满,進入等待此對象的等待鎖定池谴古,只有針對此對象調用notify()方法后本線程才進入對象鎖定池準備,獲取對象鎖進入運行狀態(tài)稠歉。
notify和notifyAll區(qū)別
wait后notify方法只喚醒一個等待(對象的)線程并使該線程開始執(zhí)行掰担。所以如果有多個線程等待一個對象,這個方法只會喚醒其中一個線程怒炸,選擇哪個線程取決于操作系統(tǒng)對多線程管理的實現带饱。notifyAll會喚醒所有等待(對象的)線程,盡管哪一個線程將會第一個處理取決于操作系統(tǒng)的實現
ThreadLocal的作用與實現
ThreadLocal<Boolean> mBooleanThreadLocal = new ThreadLocal<>();
這些數據之后只能由該線程訪問,其他線程是訪問不了的, 保證各個線程里數據和變量的獨立性; 即ThreadLocal使每個線程可以訪問自己內部的副本變量
兩個線程如何串行執(zhí)行
為了控制線程執(zhí)行的順序阅羹,如ThreadA->ThreadB->ThreadC->ThreadA循環(huán)執(zhí)行三個線程勺疼,
我們需要確定喚醒、等待的順序捏鱼。這時我們可以同時使用 Obj.wait()执庐、Obj.notify()與synchronized(Obj)來實現這個目標。
通常情況下导梆,wait是線程在獲取對象鎖后耕肩,主動釋放對象鎖,同時本線程休眠问潭,直到有其它線程調用對象的notify()喚醒該線程猿诸,才能繼續(xù)獲取對象鎖,并繼續(xù)執(zhí)行狡忙。而notify()則是對等待對象鎖的線程的喚醒操作梳虽。但值得注意的是notify()調用后,并不是馬上就釋放對象鎖灾茁,而是在相應的synchronized(){}語句塊執(zhí)行結束窜觉。釋放對象鎖后,JVM會在執(zhí)行wait()等待對象鎖的線程中隨機選取一線程北专,賦予其對象鎖禀挫,喚醒線程,繼續(xù)執(zhí)行拓颓。
上下文切換是什么含義
每個任務運行前语婴,CPU 都需要知道任務從哪里加載、又從哪里開始運行,這就涉及到 CPU 寄存器 和 程序計數器(PC):
CPU 寄存器是 CPU 內置的容量小砰左、但速度極快的內存匿醒;程序計數器會存儲 CPU 正在執(zhí)行的指令位置,或者即將執(zhí)行的指令位置缠导。
這兩個是 CPU 運行任何任務前都必須依賴的環(huán)境廉羔,因此叫做 CPU 上下文。
上下文切換
將前一個 CPU 的上下文(也就是 CPU 寄存器和程序計數器里邊的內容)保存起來僻造;
然后加載新任務的上下文到寄存器和程序計數器憋他;
最后跳轉到程序計數器所指的新位置,運行新任務髓削。
被保存起來的上下文會存儲到系統(tǒng)內核中举瑰,等待任務重新調度執(zhí)行時再次加載進來。
CPU 的上下文切換分三種:進程上下文切換蔬螟、線程上下文切換、中斷上下文切換汽畴。
可以運行時kill掉一個線程嗎旧巾?
如果一個線程由于等待某些事件的發(fā)生而被阻塞,又該怎樣停止該線程呢忍些?這種情況經常會發(fā)生鲁猩,比如當一個線程由于需要等候鍵盤輸入而被阻塞,或者調用Thread.join()方法罢坝,或者Thread.sleep()方法廓握,在網絡中調用ServerSocket.accept()方法,或者調用了DatagramSocket.receive()方法時嘁酿,都有可能導致線程阻塞隙券,使線程處于處于不可運行狀態(tài)時,即使主程序中將該線程的共享變量設置為true闹司,但該線程此時根本無法檢查循環(huán)標志娱仔,當然也就無法立即中斷。這里我們給出的建議是游桩,不要使用stop()方法牲迫,而是使用Thread提供的interrupt()方法,因為該方法雖然不會中斷一個正在運行的線程借卧,但是它可以使一個被阻塞的線程拋出一個中斷異常盹憎,從而使線程提前結束阻塞狀態(tài),退出堵塞代碼铐刘。
public class TestPersonProxy extends Thread {
????volatile boolean stop = false;
????public void run() {
????????while (!stop) {
????????????System.out.println(getName() + " is running");
????????????try {
????????????????sleep(1000);
????????????} catch (InterruptedException e) {
????????????????System.out.println("week up from blcok...");
????????????????stop = true; // 在異常處理代碼中修改共享變量的狀態(tài)
????????????}
????????}
????????System.out.println(getName() + " is exiting...");
????}
}
class InterruptThreadDemo3 {
????public static void main(String[] args) throws InterruptedException {
????????TestPersonProxy m1 = new TestPersonProxy();
????????System.out.println("Starting thread...");
????????m1.start();
????????Thread.sleep(3000);
????????System.out.println("Interrupt thread...: " + m1.getName());
????????m1.stop = true; // 設置共享變量為true
????????m1.interrupt(); // 阻塞時退出阻塞狀態(tài)
????????Thread.sleep(3000); // 主線程休眠3秒以便觀察線程m1的中斷情況
????????System.out.println("Stopping application...");
????}
}
什么是條件鎖陪每、讀寫鎖、自旋鎖、可重入鎖奶稠?
自旋鎖可以使線程在沒有取得鎖的時候俯艰,不被掛起,而轉去執(zhí)行一個空循環(huán)锌订,(即所謂的自旋竹握,就是自己執(zhí)行空循環(huán)),若在若干個空循環(huán)后辆飘,線程如果可以獲得鎖啦辐,則繼續(xù)執(zhí)行。若線程依然不能獲得鎖蜈项,才會被掛起芹关。
使用自旋鎖后,線程被掛起的幾率相對減少紧卒,線程執(zhí)行的連貫性相對加強侥衬。因此,對于那些鎖競爭不是很激烈跑芳,鎖占用時間很短的并發(fā)線程轴总,具有一定的積極意義,但對于鎖競爭激烈博个,單線程鎖占用很長時間的并發(fā)程序怀樟,自旋鎖在自旋等待后,往往毅然無法獲得對應的鎖盆佣,不僅僅白白浪費了CPU時間往堡,最終還是免不了被掛起的操作 ,反而浪費了系統(tǒng)的資源共耍。
在JDK1.6中虑灰,Java虛擬機提供-XX:+UseSpinning參數來開啟自旋鎖,使用-XX:PreBlockSpin參數來設置自旋鎖等待的次數痹兜。
在JDK1.7開始瘩缆,自旋鎖的參數被取消,虛擬機不再支持由用戶配置自旋鎖佃蚜,自旋鎖總是會執(zhí)行庸娱,自旋鎖次數也由虛擬機自動調整。
可能引起的問題:
1.過多占據CPU時間:如果鎖的當前持有者長時間不釋放該鎖谐算,那么等待者將長時間的占據cpu時間片熟尉,導致CPU資源的浪費,因此可以設定一個時間洲脂,當鎖持有者超過這個時間不釋放鎖時斤儿,等待者會放棄CPU時間片阻塞剧包;
2.死鎖問題:試想一下,有一個線程連續(xù)兩次試圖獲得自旋鎖(比如在遞歸程序中)往果,第一次這個線程獲得了該鎖疆液,當第二次試圖加鎖的時候,檢測到鎖已被占用(其實是被自己占用)陕贮,那么這時堕油,線程會一直等待自己釋放該鎖,而不能繼續(xù)執(zhí)行肮之,這樣就引起了死鎖掉缺。因此遞歸程序使用自旋鎖應該遵循以下原則:遞歸程序決不能在持有自旋鎖時調用它自己,也決不能在遞歸調用時試圖獲得相同的自旋鎖戈擒。
2眶明、阻塞鎖
讓線程進入阻塞狀態(tài)進行等待,當獲得相應的信號(喚醒筐高,時間) 時搜囱,才可以進入線程的準備就緒狀態(tài),準備就緒狀態(tài)的所有線程柑土,通過競爭蜀肘,進入運行狀態(tài)。冰单。
JAVA中,能夠進入\退出灸促、阻塞狀態(tài)或包含阻塞鎖的方法有 诫欠,synchronized 關鍵字(其中的重量鎖),ReentrantLock浴栽,Object.wait()\notify()
3荒叼、可重入鎖
可重入鎖,也叫做遞歸鎖典鸡,指的是同一線程 外層函數獲得鎖之后 被廓,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響萝玷。
在JAVA環(huán)境下 ReentrantLock 和synchronized 都是 可重入鎖
線程池ThreadPoolExecutor的實現原理嫁乘?
線程池有多重要
線程是一個程序員一定會涉及到的一個概念,但是線程的創(chuàng)建和切換都是代價比較大的球碉。所以蜓斧,我們有沒有一個好的方案能做到線程的復用呢?這就涉及到一個概念——線程池睁冬。合理的使用線程池能夠帶來3個很明顯的好處:
1.降低資源消耗:通過重用已經創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的消耗
2.提高響應速度:任務到達時不需要等待線程創(chuàng)建就可以立即執(zhí)行挎春。
3.提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調優(yōu)和監(jiān)控直奋。
java多線程池的支持——ThreadPoolExecutor
java的線程池支持主要通過ThreadPoolExecutor來實現能庆,我們使用的ExecutorService的各種線程池策略都是基于ThreadPoolExecutor實現的,所以ThreadPoolExecutor十分重要脚线。要弄明白各種線程池策略搁胆,必須先弄明白ThreadPoolExecutor。
實現原理
首先看一個線程池的流程圖
step1.調用ThreadPoolExecutor的execute提交線程殉挽,首先檢查CorePool丰涉,如果CorePool內的線程小于CorePoolSize,新創(chuàng)建線程執(zhí)行任務斯碌。
step2.如果當前CorePool內的線程大于等于CorePoolSize一死,那么將線程加入到BlockingQueue。
step3.如果不能加入BlockingQueue傻唾,在小于MaxPoolSize的情況下創(chuàng)建線程執(zhí)行任務投慈。
step4.如果線程數大于等于MaxPoolSize,那么執(zhí)行拒絕策略
線程池的創(chuàng)建
線程池的創(chuàng)建可以通過ThreadPoolExecutor的構造方法實現:
public ThreadPoolExecutor(int corePoolSize,
??????????????????????????????int maximumPoolSize,
??????????????????????????????long keepAliveTime,
??????????????????????????????TimeUnit unit,
??????????????????????????????BlockingQueue<Runnable> workQueue,
??????????????????????????????ThreadFactory threadFactory,
??????????????????????????????RejectedExecutionHandler handler) {
????????if (corePoolSize < 0 ||
????????????maximumPoolSize <= 0 ||
????????????maximumPoolSize < corePoolSize ||
????????????keepAliveTime < 0)
????????????throw new IllegalArgumentException();
????????if (workQueue == null || threadFactory == null || handler == null)
????????????throw new NullPointerException();
????????this.corePoolSize = corePoolSize;
????????this.maximumPoolSize = maximumPoolSize;
????????this.workQueue = workQueue;
????????this.keepAliveTime = unit.toNanos(keepAliveTime);
????????this.threadFactory = threadFactory;
????????this.handler = handler;
????具體解釋一下上述參數:
corePoolSize 核心線程池大小
maximumPoolSize 線程池最大容量大小
keepAliveTime 線程池空閑時冠骄,線程存活的時間
TimeUnit 時間單位
ThreadFactory 線程工廠
BlockingQueue任務隊列
RejectedExecutionHandler 線程拒絕策略
線程的提交
ThreadPoolExecutor的構造方法如上所示伪煤,但是只是做一些參數的初始化,ThreadPoolExecutor被初始化好之后便可以提交線程任務凛辣,線程的提交方法主要是execute和submit抱既。這里主要說execute,submit會在后續(xù)的博文中分析
public void execute(Runnable command) {
????????if (command == null)
????????????throw new NullPointerException();
????????/*
?????????* Proceed in 3 steps:
?????????*
?????????* 1. If fewer than corePoolSize threads are running, try to
?????????* start a new thread with the given command as its first
?????????* task.??The call to addWorker atomically checks runState and
?????????* workerCount, and so prevents false alarms that would add
?????????* threads when it shouldn't, by returning false.
?????????* 如果當前的線程數小于核心線程池的大小扁誓,根據現有的線程作為第一個Worker運行的線程防泵,
?????????* 新建一個Worker,addWorker自動的檢查當前線程池的狀態(tài)和Worker的數量蝗敢,
?????????* 防止線程池在不能添加線程的狀態(tài)下添加線程
?????????*
?????????* 2. If a task can be successfully queued, then we still need
?????????* to double-check whether we should have added a thread
?????????* (because existing ones died since last checking) or that
?????????* the pool shut down since entry into this method. So we
?????????* recheck state and if necessary roll back the enqueuing if
?????????* stopped, or start a new thread if there are none.
?????????*??如果線程入隊成功捷泞,然后還是要進行double-check的,因為線程池在入隊之后狀態(tài)是可能會發(fā)生變化的
?????????*
?????????* 3. If we cannot queue task, then we try to add a new
?????????* thread.??If it fails, we know we are shut down or saturated
?????????* and so reject the task.
?????????*
?????????* 如果task不能入隊(隊列滿了)寿谴,這時候嘗試增加一個新線程锁右,如果增加失敗那么當前的線程池狀態(tài)變化了或者線程池已經滿了
?????????* 然后拒絕task
?????????*/
????????int c = ctl.get();
????????//當前的Worker的數量小于核心線程池大小時,新建一個Worker讶泰。
????????if (workerCountOf(c) < corePoolSize) {
????????????if (addWorker(command, true))
????????????????return;
????????????c = ctl.get();
????????}
????????if (isRunning(c) && workQueue.offer(command)) {
????????????int recheck = ctl.get();
????????????if (! isRunning(recheck) && remove(command))//recheck防止線程池狀態(tài)的突變咏瑟,如果突變,那么將reject線程痪署,防止workQueue中增加新線程
????????????????reject(command);
????????????else if (workerCountOf(recheck) == 0)//上下兩個操作都有addWorker的操作响蕴,但是如果在workQueue.offer的時候Worker變?yōu)?,
????????????????????????????????????????????????//那么將沒有Worker執(zhí)行新的task惠桃,所以增加一個Worker.
????????????????addWorker(null, false);
????????}
????????//如果workQueue滿了浦夷,那么這時候可能還沒到線程池的maxnum辖试,所以嘗試增加一個Worker
????????else if (!addWorker(command, false))
????????????reject(command);//如果Worker數量到達上限,那么就拒絕此線程
????}
這里需要明確幾個概念:
Worker和Task的區(qū)別劈狐,Worker是當前線程池中的線程罐孝,而task雖然是runnable,但是并沒有真正執(zhí)行肥缔,只是被Worker調用了run方法莲兢,后面會看到這部分的實現。
maximumPoolSize和corePoolSize的區(qū)別:這個概念很重要续膳,maximumPoolSize為線程池最大容量改艇,也就是說線程池最多能起多少Worker。corePoolSize是核心線程池的大小坟岔,當corePoolSize滿了時谒兄,同時workQueue full(ArrayBolckQueue是可能滿的) 那么此時允許新建Worker去處理workQueue中的Task,但是不能超過maximumPoolSize社付。超過corePoolSize之外的線程會在空閑超時后終止承疲。
核心方法:addWorker
Worker的增加和Task的獲取以及終止都是在此方法中實現的,也就是這一個方法里面包含了很多東西鸥咖。在addWorker方法中提到了Status的概念燕鸽,Status是線程池的核心概念,這里我們先看一段關于status的注釋:
/**
?????* 首先ctl是一個原子量啼辣,同時它里面包含了兩個field啊研,一個是workerCount,另一個是runState
?????* workerCount表示當前有效的線程數鸥拧,也就是Worker的數量
?????* runState表示當前線程池的狀態(tài)
?????* The main pool control state, ctl, is an atomic integer packing
?????* two conceptual fields
?????*???workerCount, indicating the effective number of threads
?????*???runState,????indicating whether running, shutting down etc
?????*
?????* 兩者是怎么結合的呢党远?首先workerCount是占據著一個atomic integer的后29位的,而狀態(tài)占據了前3位
?????* 所以住涉,workerCount上限是(2^29)-1麸锉。
?????* In order to pack them into one int, we limit workerCount to
?????* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
?????* billion) otherwise representable. If this is ever an issue in
?????* the future, the variable can be changed to be an AtomicLong,
?????* and the shift/mask constants below adjusted. But until the need
?????* arises, this code is a bit faster and simpler using an int.
?????*
?????* The workerCount is the number of workers that have been
?????* permitted to start and not permitted to stop.??The value may be
?????* transiently different from the actual number of live threads,
?????* for example when a ThreadFactory fails to create a thread when
?????* asked, and when exiting threads are still performing
?????* bookkeeping before terminating. The user-visible pool size is
?????* reported as the current size of the workers set.
?????*
?????* runState是整個線程池的運行生命周期钠绍,有如下取值:
?????*??1. RUNNING:可以新加線程舆声,同時可以處理queue中的線程。
?????*??2. SHUTDOWN:不增加新線程柳爽,但是處理queue中的線程媳握。
?????*??3.STOP 不增加新線程,同時不處理queue中的線程磷脯。
?????*??4.TIDYING 所有的線程都終止了(queue中)蛾找,同時workerCount為0,那么此時進入TIDYING
?????*??5.terminated()方法結束赵誓,變?yōu)門ERMINATED
?????* The runState provides the main lifecyle control, taking on values:
?????*
?????*???RUNNING:??Accept new tasks and process queued tasks
?????*???SHUTDOWN: Don't accept new tasks, but process queued tasks
?????*???STOP:?????Don't accept new tasks, don't process queued tasks,
?????*?????????????and interrupt in-progress tasks
?????*???TIDYING:??All tasks have terminated, workerCount is zero,
?????*?????????????the thread transitioning to state TIDYING
?????*?????????????will run the terminated() hook method
?????*???TERMINATED: terminated() has completed
?????*
?????* The numerical order among these values matters, to allow
?????* ordered comparisons. The runState monotonically increases over
?????* time, but need not hit each state. The transitions are:
?????* 狀態(tài)的轉化主要是:
?????* RUNNING -> SHUTDOWN(調用shutdown())
?????*????On invocation of shutdown(), perhaps implicitly in finalize()
?????* (RUNNING or SHUTDOWN) -> STOP(調用shutdownNow())
?????*????On invocation of shutdownNow()
?????* SHUTDOWN -> TIDYING(queue和pool均empty)
?????*????When both queue and pool are empty
?????* STOP -> TIDYING(pool empty打毛,此時queue已經為empty)
?????*????When pool is empty
?????* TIDYING -> TERMINATED(調用terminated())
?????*????When the terminated() hook method has completed
?????*
?????* Threads waiting in awaitTermination() will return when the
?????* state reaches TERMINATED.
?????*
?????* Detecting the transition from SHUTDOWN to TIDYING is less
?????* straightforward than you'd like because the queue may become
?????* empty after non-empty and vice versa during SHUTDOWN state, but
?????* we can only terminate if, after seeing that it is empty, we see
?????* that workerCount is 0 (which sometimes entails a recheck -- see
?????* below).
?????*/
下面是狀態(tài)的代碼:
//利用ctl來保證當前線程池的狀態(tài)和當前的線程的數量柿赊。ps:低29位為線程池容量,高3位為線程狀態(tài)幻枉。
????private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
????//設定偏移量
????private static final int COUNT_BITS = Integer.SIZE - 3;
????//確定最大的容量2^29-1
????private static final int CAPACITY???= (1 << COUNT_BITS) - 1;
????//幾個狀態(tài)碰声,用Integer的高三位表示
????// runState is stored in the high-order bits
????//111
????private static final int RUNNING????= -1 << COUNT_BITS;
????//000
????private static final int SHUTDOWN???=??0 << COUNT_BITS;
????//001
????private static final int STOP???????=??1 << COUNT_BITS;
????//010
????private static final int TIDYING????=??2 << COUNT_BITS;
????//011
????private static final int TERMINATED =??3 << COUNT_BITS;
????//獲取線程池狀態(tài),取前三位
????// Packing and unpacking ctl
????private static int runStateOf(int c)?????{ return c & ~CAPACITY; }
????//獲取當前正在工作的worker,主要是取后面29位
????private static int workerCountOf(int c)??{ return c & CAPACITY; }
????//獲取ctl
????private static int ctlOf(int rs, int wc) { return rs | wc; }
接下來貼上addWorker方法看看:
????/**
?????* Checks if a new worker can be added with respect to current
?????* pool state and the given bound (either core or maximum). If so,
?????* the worker count is adjusted accordingly, and, if possible, a
?????* new worker is created and started running firstTask as its
?????* first task. This method returns false if the pool is stopped or
?????* eligible to shut down. It also returns false if the thread
?????* factory fails to create a thread when asked, which requires a
?????* backout of workerCount, and a recheck for termination, in case
?????* the existence of this worker was holding up termination.
?????*
?????* @param firstTask the task the new thread should run first (or
?????* null if none). Workers are created with an initial first task
?????* (in method execute()) to bypass queuing when there are fewer
?????* than corePoolSize threads (in which case we always start one),
?????* or when the queue is full (in which case we must bypass queue).
?????* Initially idle threads are usually created via
?????* prestartCoreThread or to replace other dying workers.
?????*
?????* @param core if true use corePoolSize as bound, else
?????* maximumPoolSize. (A boolean indicator is used here rather than a
?????* value to ensure reads of fresh values after checking other pool
?????* state).
?????* @return true if successful
?????*/
????private boolean addWorker(Runnable firstTask, boolean core) {
????????retry:
????????for (;;) {
????????????int c = ctl.get();
????????????int rs = runStateOf(c);
????????????// Check if queue empty only if necessary.
????????????/**
?????????????* rs!=Shutdown || fistTask熬甫!=null || workCount.isEmpty
?????????????* 如果當前的線程池的狀態(tài)>SHUTDOWN 那么拒絕Worker的add 如果=SHUTDOWN
?????????????* 那么此時不能新加入不為null的Task胰挑,如果在WorkCount為empty的時候不能加入任何類型的Worker,
?????????????* 如果不為empty可以加入task為null的Worker,增加消費的Worker
?????????????*/
????????????if (rs >= SHUTDOWN &&
????????????????! (rs == SHUTDOWN &&
???????????????????firstTask == null &&
???????????????????! workQueue.isEmpty()))
????????????????return false;
????????????for (;;) {
????????????????int wc = workerCountOf(c);
????????????????if (wc >= CAPACITY ||
????????????????????wc >= (core ? corePoolSize : maximumPoolSize))
????????????????????return false;
????????????????if (compareAndIncrementWorkerCount(c))
????????????????????break retry;
????????????????c = ctl.get();??// Re-read ctl
????????????????if (runStateOf(c) != rs)
????????????????????continue retry;
????????????????// else CAS failed due to workerCount change; retry inner loop
????????????}
????????}
????????Worker w = new Worker(firstTask);
????????Thread t = w.thread;
????????final ReentrantLock mainLock = this.mainLock;
????????mainLock.lock();
????????try {
????????????// Recheck while holding lock.
????????????// Back out on ThreadFactory failure or if
????????????// shut down before lock acquired.
????????????int c = ctl.get();
????????????int rs = runStateOf(c);
????????????/**
?????????????* rs!=SHUTDOWN ||firstTask!=null
?????????????*
?????????????* 同樣檢測當rs>SHUTDOWN時直接拒絕減小Wc椿肩,同時Terminate瞻颂,如果為SHUTDOWN同時firstTask不為null的時候也要Terminate
?????????????*/
????????????if (t == null ||
????????????????(rs >= SHUTDOWN &&
?????????????????! (rs == SHUTDOWN &&
????????????????????firstTask == null))) {
????????????????decrementWorkerCount();
????????????????tryTerminate();
????????????????return false;
????????????}
????????????workers.add(w);
????????????int s = workers.size();
????????????if (s > largestPoolSize)
????????????????largestPoolSize = s;
????????} finally {
????????????mainLock.unlock();
????????}
????????t.start();
????????// It is possible (but unlikely) for a thread to have been
????????// added to workers, but not yet started, during transition to
????????// STOP, which could result in a rare missed interrupt,
????????// because Thread.interrupt is not guaranteed to have any effect
????????// on a non-yet-started Thread (see Thread#interrupt).
????????//Stop或線程Interrupt的時候要中止所有的運行的Worker
????????if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
????????????t.interrupt();
????????return true;
????}
addWorker中首先進行了一次線程池狀態(tài)的檢測
????????????int c = ctl.get();
????????????int rs = runStateOf(c);
????????????// Check if queue empty only if necessary.
????????????//判斷當前線程池的狀態(tài)是不是已經shutdown,如果shutdown了拒絕線程加入
????????????//(rs!=SHUTDOWN || first!=null || workQueue.isEmpty())
????????????//如果rs不為SHUTDOWN郑象,此時狀態(tài)是STOP贡这、TIDYING或TERMINATED,所以此時要拒絕請求
????????????//如果此時狀態(tài)為SHUTDOWN扣唱,而傳入一個不為null的線程藕坯,那么需要拒絕
????????????//如果狀態(tài)為SHUTDOWN,同時隊列中已經沒任務了噪沙,那么拒絕掉
????????????if (rs >= SHUTDOWN &&
????????????????! (rs == SHUTDOWN &&
???????????????????firstTask == null &&
???????????????????! workQueue.isEmpty()))
????????????????return false;
其實是比較難懂的炼彪,主要在線程池狀態(tài)判斷條件這里:
如果是runing,那么跳過if正歼。
如果rs>=SHUTDOWN,同時不等于SHUTDOWN辐马,即為SHUTDOWN以上的狀態(tài),那么不接受新線程局义。
如果rs>=SHUTDOWN喜爷,同時等于SHUTDOWN,同時first萄唇!=null檩帐,那么拒絕新線程,如果first==null另萤,那么可能是新增加線程消耗Queue中的線程湃密。但是同時還要檢測workQueue是否isEmpty(),如果為Empty四敞,那么隊列已空泛源,不需要增加消耗線程,如果隊列沒有空那么運行增加first=null的Worker忿危。
從這里是可以看出一些策略的
首先达箍,在rs>SHUTDOWN時,拒絕一切線程的增加铺厨,因為STOP是會終止所有的線程缎玫,同時移除Queue中所有的待執(zhí)行的線程的硬纤,所以也不需要增加first=null的Worker了
其次,在SHUTDOWN狀態(tài)時赃磨,是不能增加first咬摇!=null的Worker的,同時即使first=null煞躬,但是此時Queue為Empty也是不允許增加Worker的肛鹏,SHUTDOWN下增加的Worker主要用于消耗Queue中的任務。
SHUTDOWN狀態(tài)時恩沛,是不允許向workQueue中增加線程的在扰,isRunning(c) && workQueue.offer(command) 每次在offer之前都要做狀態(tài)檢測,也就是線程池狀態(tài)變?yōu)?gt;=SHUTDOWN時不允許新線程進入線程池了雷客。
????????????for (;;) {
????????????????int wc = workerCountOf(c);
????????????????//如果當前的數量超過了CAPACITY芒珠,或者超過了corePoolSize和maximumPoolSize(試core而定)
????????????????if (wc >= CAPACITY ||
????????????????????wc >= (core ? corePoolSize : maximumPoolSize))
????????????????????return false;
????????????????//CAS嘗試增加線程數,如果失敗搅裙,證明有競爭皱卓,那么重新到retry。
????????????????if (compareAndIncrementWorkerCount(c))
????????????????????break retry;
????????????????c = ctl.get();??// Re-read ctl
????????????????//判斷當前線程池的運行狀態(tài)
????????????????if (runStateOf(c) != rs)
????????????????????continue retry;
????????????????// else CAS failed due to workerCount change; retry inner loop
????????????}
這段代碼做了一個兼容部逮,主要是沒有到corePoolSize 或maximumPoolSize上限時娜汁,那么允許添加線程,CAS增加Worker的數量后兄朋,跳出循環(huán)掐禁。
接下來實例化Worker,實例化Worker其實是很關鍵的,后面會說颅和。
因為workers是HashSet線程不安全的傅事,那么此時需要加鎖,所以mainLock.lock(); 之后重新檢查線程池的狀態(tài)峡扩,如果狀態(tài)不正確蹭越,那么減小Worker的數量,為什么tryTerminate()目前不大清楚教届。如果狀態(tài)正常响鹃,那么添加Worker到workers。最后:
??if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
????????????t.interrupt();
注釋說的很清楚巍佑,為了能及時的中斷此Worker茴迁,因為線程存在未Start的情況寄悯,此時是不能響應中斷的萤衰,如果此時status變?yōu)镾TOP,則不能中斷線程猜旬。此處用作中斷線程之用。
接下來我們看Worker的方法:
/**
?????????* Creates with given first task and thread from ThreadFactory.
?????????* @param firstTask the first task (null if none)
?????????*/
????????Worker(Runnable firstTask) {
????????????this.firstTask = firstTask;
????????????this.thread = getThreadFactory().newThread(this);
????????}
這里可以看出Worker是對firstTask的包裝,并且Worker本身就是Runnable的稀并,看上去真心很流氓的感覺~~~
通過ThreadFactory為Worker自己構建一個線程沛硅。
因為Worker是Runnable類型的,所以是有run方法的,上面也看到了會調用t.start() 其實就是執(zhí)行了run方法:
????????/** Delegates main run loop to outer runWorker??*/
????????public void run() {
????????????runWorker(this);
????????}
調用了runWorker:
/**
?????* Main worker run loop.??Repeatedly gets tasks from queue and
?????* executes them, while coping with a number of issues:
?????* 1 Worker可能還是執(zhí)行一個初始化的task——firstTask猿推。
?????*????但是有時也不需要這個初始化的task(可以為null),只要pool在運行,就會
?????*???通過getTask從隊列中獲取Task,如果返回null褐捻,那么worker退出。
?????*???另一種就是external拋出異常導致worker退出椅邓。
?????* 1. We may start out with an initial task, in which case we
?????* don't need to get the first one. Otherwise, as long as pool is
?????* running, we get tasks from getTask. If it returns null then the
?????* worker exits due to changed pool state or configuration
?????* parameters.??Other exits result from exception throws in
?????* external code, in which case completedAbruptly holds, which
?????* usually leads processWorkerExit to replace this thread.
?????*
?????*
?????* 2 在運行任何task之前柠逞,都需要對worker加鎖來防止other pool中斷worker。
?????*???clearInterruptsForTaskRun保證除了線程池stop景馁,那么現場都沒有中斷標志
?????* 2. Before running any task, the lock is acquired to prevent
?????* other pool interrupts while the task is executing, and
?????* clearInterruptsForTaskRun called to ensure that unless pool is
?????* stopping, this thread does not have its interrupt set.
?????*
?????* 3. Each task run is preceded by a call to beforeExecute, which
?????* might throw an exception, in which case we cause thread to die
?????* (breaking loop with completedAbruptly true) without processing
?????* the task.
?????*
?????* 4. Assuming beforeExecute completes normally, we run the task,
?????* gathering any of its thrown exceptions to send to
?????* afterExecute. We separately handle RuntimeException, Error
?????* (both of which the specs guarantee that we trap) and arbitrary
?????* Throwables.??Because we cannot rethrow Throwables within
?????* Runnable.run, we wrap them within Errors on the way out (to the
?????* thread's UncaughtExceptionHandler).??Any thrown exception also
?????* conservatively causes thread to die.
?????*
?????* 5. After task.run completes, we call afterExecute, which may
?????* also throw an exception, which will also cause thread to
?????* die. According to JLS Sec 14.20, this exception is the one that
?????* will be in effect even if task.run throws.
?????*
?????* The net effect of the exception mechanics is that afterExecute
?????* and the thread's UncaughtExceptionHandler have as accurate
?????* information as we can provide about any problems encountered by
?????* user code.
?????*
?????* @param w the worker
?????*/
????final void runWorker(Worker w) {
????????Runnable task = w.firstTask;
????????w.firstTask = null;
????????//標識線程是不是異常終止的
????????boolean completedAbruptly = true;
????????try {
????????????//task不為null情況是初始化worker時板壮,如果task為null,則去隊列中取線程--->getTask()
????????????while (task != null || (task = getTask()) != null) {
????????????????w.lock();
????????????????//獲取woker的鎖合住,防止線程被其他線程中斷
????????????????clearInterruptsForTaskRun();//清楚所有中斷標記
????????????????try {
????????????????????beforeExecute(w.thread, task);//線程開始執(zhí)行之前執(zhí)行此方法绰精,可以實現Worker未執(zhí)行退出,本類中未實現
????????????????????Throwable thrown = null;
????????????????????try {
????????????????????????task.run();
????????????????????} catch (RuntimeException x) {
????????????????????????thrown = x; throw x;
????????????????????} catch (Error x) {
????????????????????????thrown = x; throw x;
????????????????????} catch (Throwable x) {
????????????????????????thrown = x; throw new Error(x);
????????????????????} finally {
????????????????????????afterExecute(task, thrown);//線程執(zhí)行后執(zhí)行透葛,可以實現標識Worker異常中斷的功能笨使,本類中未實現
????????????????????}
????????????????} finally {
????????????????????task = null;//運行過的task標null
????????????????????w.completedTasks++;
????????????????????w.unlock();
????????????????}
????????????}
????????????completedAbruptly = false;
????????} finally {
????????????//處理worker退出的邏輯
????????????processWorkerExit(w, completedAbruptly);
????????}
????}
????從上面代碼可以看出,execute的Task是被“包裝 ”了一層僚害,線程啟動時是內部調用了Task的run方法阱表。
接下來所有的核心集中在getTask()方法上:
/**
?????* Performs blocking or timed wait for a task, depending on
?????* current configuration settings, or returns null if this worker
?????* must exit because of any of:
?????* 1. There are more than maximumPoolSize workers (due to
?????*????a call to setMaximumPoolSize).
?????* 2. The pool is stopped.
?????* 3. The pool is shutdown and the queue is empty.
?????* 4. This worker timed out waiting for a task, and timed-out
?????*????workers are subject to termination (that is,
?????*????{@code allowCoreThreadTimeOut || workerCount > corePoolSize})
?????*????both before and after the timed wait.
?????*
?????* @return task, or null if the worker must exit, in which case
?????*?????????workerCount is decremented
?????*?????????
?????*?????????
?????*??隊列中獲取線程
?????*/
????private Runnable getTask() {
????????boolean timedOut = false; // Did the last poll() time out?
????????retry:
????????for (;;) {
????????????int c = ctl.get();
????????????int rs = runStateOf(c);
????????????// Check if queue empty only if necessary.
????????????//當前狀態(tài)為>stop時,不處理workQueue中的任務贡珊,同時減小worker的數量所以返回null最爬,如果為shutdown 同時workQueue已經empty了,同樣減小worker數量并返回null
????????????if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
????????????????decrementWorkerCount();
????????????????return null;
????????????}
????????????boolean timed;??????// Are workers subject to culling?
????????????for (;;) {
????????????????int wc = workerCountOf(c);
????????????????timed = allowCoreThreadTimeOut || wc > corePoolSize;
????????????????if (wc <= maximumPoolSize && ! (timedOut && timed))
????????????????????break;
????????????????if (compareAndDecrementWorkerCount(c))
????????????????????return null;
????????????????c = ctl.get();??// Re-read ctl
????????????????if (runStateOf(c) != rs)
????????????????????continue retry;
????????????????// else CAS failed due to workerCount change; retry inner loop
????????????}
????????????try {
????????????????Runnable r = timed ?
????????????????????workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
????????????????????workQueue.take();
????????????????if (r != null)
????????????????????return r;
????????????????timedOut = true;
????????????} catch (InterruptedException retry) {
????????????????timedOut = false;
????????????}
????????}
????}
????這段代碼十分關鍵门岔,首先看幾個局部變量:
boolean timedOut = false;
主要是判斷后面的poll是否要超時
boolean timed;
主要是標識著當前Worker超時是否要退出爱致。wc > corePoolSize時需要減小空閑的Worker數,那么timed為true寒随,但是wc <= corePoolSize時糠悯,不能減小核心線程數timed為false。
timedOut初始為false妻往,如果timed為true那么使用poll取線程互艾。如果正常返回,那么返回取到的task讯泣。如果超時纫普,證明worker空閑,同時worker超過了corePoolSize好渠,需要刪除昨稼。返回r=null节视。則 timedOut = true。此時循環(huán)到wc <= maximumPoolSize && ! (timedOut && timed)時假栓,減小worker數寻行,并返回null,導致worker退出匾荆。如果線程數<= corePoolSize拌蜘,那么此時調用 workQueue.take(),沒有線程獲取到時將一直阻塞牙丽,知道獲取到線程或者中斷拦坠,關于中斷后面Shutdown的時候會說。
至此線程執(zhí)行過程就分析完了~~~~
關于終止線程池
我個人認為剩岳,如果想了解明白線程池贞滨,那么就一定要理解好各個狀態(tài)之間的轉換,想理解轉換拍棕,線程池的終止機制是很好的一個途徑晓铆。對于關閉線程池主要有兩個方法shutdown()和shutdownNow():
首先從shutdown()方法開始:
????/**
?????* Initiates an orderly shutdown in which previously submitted
?????* tasks are executed, but no new tasks will be accepted.
?????* Invocation has no additional effect if already shut down.
?????*
?????* <p>This method does not wait for previously submitted tasks to
?????* complete execution.??Use {@link #awaitTermination awaitTermination}
?????* to do that.
?????*
?????* @throws SecurityException {@inheritDoc}
?????*/
????public void shutdown() {
????????final ReentrantLock mainLock = this.mainLock;
????????mainLock.lock();
????????try {
????????????//判斷是否可以操作目標線程
????????????checkShutdownAccess();
????????????//設置線程池狀態(tài)為SHUTDOWN,此處之后,線程池中不會增加新Task
????????????advanceRunState(SHUTDOWN);
????????????//中斷所有的空閑線程
????????????interruptIdleWorkers();
????????????onShutdown(); // hook for ScheduledThreadPoolExecutor
????????} finally {
????????????mainLock.unlock();
????????}
????????//轉到Terminate
????????tryTerminate();
????}????????
shutdown做了幾件事:
1. 檢查是否能操作目標線程
2. 將線程池狀態(tài)轉為SHUTDOWN
3. 中斷所有空閑線程
這里就引發(fā)了一個問題绰播,什么是空閑線程骄噪?
這需要接著看看interruptIdleWorkers是怎么回事。
private void interruptIdleWorkers(boolean onlyOne) {
????????final ReentrantLock mainLock = this.mainLock;
????????mainLock.lock();
????????//這里的意圖很簡單蠢箩,遍歷workers 對所有worker做中斷處理链蕊。
????????// w.tryLock()對Worker加鎖,這保證了正在運行執(zhí)行Task的Worker不會被中斷谬泌,那么能中斷哪些線程呢滔韵?
????????try {
????????????for (Worker w : workers) {
????????????????Thread t = w.thread;
????????????????if (!t.isInterrupted() && w.tryLock()) {
????????????????????try {
????????????????????????t.interrupt();
????????????????????} catch (SecurityException ignore) {
????????????????????} finally {
????????????????????????w.unlock();
????????????????????}
????????????????}
????????????????if (onlyOne)
????????????????????break;
????????????}
????????} finally {
????????????mainLock.unlock();
????????}
????}
????這里主要是為了中斷worker,但是中斷之前需要先獲取鎖掌实,這就意味著正在運行的Worker不能中斷陪蜻。但是上面的代碼有w.tryLock(),那么獲取不到鎖就不會中斷贱鼻,shutdown的Interrupt只是對所有的空閑Worker(正在從workQueue中取Task宴卖,此時Worker沒有加鎖)發(fā)送中斷信號。
????????????while (task != null || (task = getTask()) != null) {
????????????????w.lock();
????????????????//獲取woker的鎖邻悬,防止線程被其他線程中斷
????????????????clearInterruptsForTaskRun();//清楚所有中斷標記
????????????????try {
????????????????????beforeExecute(w.thread, task);//線程開始執(zhí)行之前執(zhí)行此方法症昏,可以實現Worker未執(zhí)行退出,本類中未實現
????????????????????Throwable thrown = null;
????????????????????try {
????????????????????????task.run();
????????????????????} catch (RuntimeException x) {
????????????????????????thrown = x; throw x;
????????????????????} catch (Error x) {
????????????????????????thrown = x; throw x;
????????????????????} catch (Throwable x) {
????????????????????????thrown = x; throw new Error(x);
????????????????????} finally {
????????????????????????afterExecute(task, thrown);//線程執(zhí)行后執(zhí)行父丰,可以實現標識Worker異常中斷的功能肝谭,本類中未實現
????????????????????}
????????????????} finally {
????????????????????task = null;//運行過的task標null
????????????????????w.completedTasks++;
????????????????????w.unlock();
????????????????}
????????????}
在runWorker中,每一個Worker getTask成功之后都要獲取Worker的鎖之后運行,也就是說運行中的Worker不會中斷分苇。因為核心線程一般在空閑的時候會一直阻塞在獲取Task上,也只有中斷才可能導致其退出屁桑。這些阻塞著的Worker就是空閑的線程(當然医寿,非核心線程,并且阻塞的也是空閑線程)蘑斧。在getTask方法中
????private Runnable getTask() {
????????boolean timedOut = false; // Did the last poll() time out?
????????retry:
????????for (;;) {
????????????int c = ctl.get();
????????????int rs = runStateOf(c);
????????????// Check if queue empty only if necessary.
????????????//當前狀態(tài)為>stop時靖秩,不處理workQueue中的任務,同時減小worker的數量所以返回null竖瘾,如果為shutdown 同時workQueue已經empty了沟突,同樣減小worker數量并返回null
????????????if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
????????????????decrementWorkerCount();
????????????????return null;
????????????}
????????????boolean timed;??????// Are workers subject to culling?
????????????for (;;) {
????????????????//allowCoreThreadTimeOu是判斷CoreThread是否會超時的,true為會超時捕传,false不會超時惠拭。默認為false
????????????????int wc = workerCountOf(c);
????????????????timed = allowCoreThreadTimeOut || wc > corePoolSize;
????????????????if (wc <= maximumPoolSize && ! (timedOut && timed))
????????????????????break;
????????????????if (compareAndDecrementWorkerCount(c))
????????????????????return null;
????????????????c = ctl.get();??// Re-read ctl
????????????????if (runStateOf(c) != rs)
????????????????????continue retry;
????????????????// else CAS failed due to workerCount change; retry inner loop
????????????}
????????????try {
????????????????Runnable r = timed ?
????????????????????workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
????????????????????workQueue.take();
????????????????if (r != null)
????????????????????return r;
????????????????timedOut = true;
????????????} catch (InterruptedException retry) {
????????????????timedOut = false;
????????????}
????????}
????}
會有兩階段的Worker:
剛進入getTask(),還沒進行狀態(tài)判斷庸论。
block在poll或者take上的Worker职辅。
當調用ShutDown方法時,首先設置了線程池的狀態(tài)為ShutDown聂示,此時1階段的worker進入到狀態(tài)判斷時會返回null域携,此時Worker退出。
因為getTask的時候是不加鎖的鱼喉,所以在shutdown時可以調用worker.Interrupt.此時會中斷退出秀鞭,Loop到狀態(tài)判斷時,同時workQueue為empty扛禽。那么拋出中斷異常锋边,導致重新Loop,在檢測線程池狀態(tài)時编曼,Worker退出宠默。如果workQueue不為null就不會退出,此處有些疑問灵巧,因為沒有看見中斷標志位清除的邏輯搀矫,那么這里就會不停的循環(huán)直到workQueue為Empty退出。
這里也能看出來SHUTDOWN只是清除一些空閑Worker刻肄,并且拒絕新Task加入瓤球,對于workQueue中的線程還是繼續(xù)處理的。
對于shutdown中獲取mainLock而addWorker中也做了mainLock的獲取敏弃,這么做主要是因為Works是HashSet類型的卦羡,是線程不安全的,我們也看到在addWorker后面也是對線程池狀態(tài)做了判斷,將Worker添加和中斷邏輯分離開绿饵。
接下來做了tryTerminate()操作欠肾,這操作是進行了后面狀態(tài)的轉換,在shutdownNow后面說拟赊。
接下來看看shutdownNow:
????/**
?????* Attempts to stop all actively executing tasks, halts the
?????* processing of waiting tasks, and returns a list of the tasks
?????* that were awaiting execution. These tasks are drained (removed)
?????* from the task queue upon return from this method.
?????*
?????* <p>This method does not wait for actively executing tasks to
?????* terminate.??Use {@link #awaitTermination awaitTermination} to
?????* do that.
?????*
?????* <p>There are no guarantees beyond best-effort attempts to stop
?????* processing actively executing tasks.??This implementation
?????* cancels tasks via {@link Thread#interrupt}, so any task that
?????* fails to respond to interrupts may never terminate.
?????*
?????* @throws SecurityException {@inheritDoc}
?????*/
????public List<Runnable> shutdownNow() {
????????List<Runnable> tasks;
????????final ReentrantLock mainLock = this.mainLock;
????????mainLock.lock();
????????try {
????????????checkShutdownAccess();
????????????advanceRunState(STOP);
????????????interruptWorkers();
????????????tasks = drainQueue();
????????} finally {
????????????mainLock.unlock();
????????}
????????tryTerminate();
????????return tasks;
????}
shutdownNow和shutdown代碼類似刺桃,但是實現卻很不相同。首先是設置線程池狀態(tài)為STOP吸祟,前面的代碼我們可以看到瑟慈,是對SHUTDOWN有一些額外的判斷邏輯,但是對于>=STOP,基本都是reject屋匕,STOP也是比SHUTDOWN更加嚴格的一種狀態(tài)葛碧。此時不會有新Worker加入,所有剛執(zhí)行完一個線程后去GetTask的Worker都會退出过吻。
之后調用interruptWorkers:
????/**
?????* Interrupts all threads, even if active. Ignores SecurityExceptions
?????* (in which case some threads may remain uninterrupted).
?????*/
????private void interruptWorkers() {
????????final ReentrantLock mainLock = this.mainLock;
????????mainLock.lock();
????????try {
????????????for (Worker w : workers) {
????????????????try {
????????????????????w.thread.interrupt();
????????????????} catch (SecurityException ignore) {
????????????????}
????????????}
????????} finally {
????????????mainLock.unlock();
????????}
????}