1.Java中的ThreadPoolExecutor類
1)Java中的ThreadPoolExecutor類是線程池最核心的類:
ThreadPoolExecutor有四個(gè)構(gòu)造方法:
2) 下面解釋下一下構(gòu)造器中各個(gè)參數(shù)的含義:
corePoolSize:核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系隐绵。在創(chuàng)建了線程池后洋幻,默認(rèn)情況下,線程池中并沒(méi)有任何線程缕允,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù)旁趟,除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法艰匙,從這2個(gè)方法的名字就可以看出限煞,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程员凝。默認(rèn)情況下署驻,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0绊序,當(dāng)有任務(wù)來(lái)之后硕舆,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后骤公,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中抚官;
maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù)阶捆,它表示在線程池中最多能創(chuàng)建多少個(gè)線程凌节;
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下洒试,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)倍奢,keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize垒棋,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)卒煞,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止叼架,直到線程池中的線程數(shù)不超過(guò)corePoolSize畔裕。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法捶闸,在線程池中的線程數(shù)不大于corePoolSize時(shí)勾效,keepAliveTime參數(shù)也會(huì)起作用躏精,直到線程池中的線程數(shù)為0障癌;
unit:參數(shù)keepAliveTime的時(shí)間單位,有7種取值旨袒,在TimeUnit類中有7種靜態(tài)屬性:
workQueue:一個(gè)阻塞隊(duì)列撒璧,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)喳张,這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響岂丘,一般來(lái)說(shuō)陵究,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous元潘。線程池的排隊(duì)策略與BlockingQueue有關(guān)畔乙。
threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程翩概;
handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:
??ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常返咱。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)钥庇,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)咖摹,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
ThreadPoolExecutor繼承了AbstractExecutorService评姨,我們來(lái)看一下AbstractExecutorService的實(shí)現(xiàn):
? ??
我們接著看ExecutorService接口的實(shí)現(xiàn):
ExecutorService提供了終止線程池的管理方法,并且還提供了一些返回一個(gè)Future對(duì)象的方法萤晴,通過(guò)Future對(duì)象吐句,我們就可以跟蹤到異步任務(wù)的進(jìn)程了。
一個(gè)ExecutorService是可以被關(guān)閉的店读,如果ExecutorService被關(guān)閉了嗦枢,它將會(huì)拒絕接收新的任務(wù)。有兩個(gè)不同的方法可以關(guān)閉ExecutorService
shutdown()? 允許先前提交的任務(wù)在終止之前執(zhí)行屯断。
shutdownNow()? 會(huì)阻止開啟新的任務(wù)并且嘗試停止當(dāng)前正在執(zhí)行的任務(wù)文虏。
當(dāng)一個(gè)線程池被終止時(shí),沒(méi)有正在執(zhí)行的任務(wù)殖演,也沒(méi)有等待執(zhí)行的任務(wù)氧秘,也沒(méi)有新的任務(wù)可以提交,一個(gè)沒(méi)有被使用的池程池應(yīng)該關(guān)閉以允許回收它的資源趴久。
submit()方法擴(kuò)展了Executor#execute(Runnable)方法丸相,創(chuàng)建被返回一個(gè)Future對(duì)象,這個(gè)對(duì)象可以用于取消任務(wù)的執(zhí)行或者等待任務(wù)完成并取出返回值彼棍,至于如何取消任務(wù)灭忠,或者取值,大家可以參考一些對(duì)Future接口的使用案例滥酥,這里就不擴(kuò)展了更舞。
ThreadPoolExecutor線程池
根據(jù)treadpollexecutor構(gòu)造方法種的中的參數(shù)進(jìn)行 進(jìn)行創(chuàng)建
intmaximumPoolSize,
longkeepAliveTime,?
TimeUnit unit,
?BlockingQueue workQueue,?
ThreadFactory threadFactory,?
RejectedExecutionHandler handler
這些參數(shù)進(jìn)行創(chuàng)建其中keepAliveTime的時(shí)間單位由unit參數(shù)控制,必須>0,然后maximumPoolSize>corePoolSize>0坎吻,任務(wù)隊(duì)列缆蝉,線程工廠,拒絕策略均不能為null。如果在使用了其它的構(gòu)造函數(shù)刊头,可以會(huì)使用默認(rèn)的的線程工廠和默認(rèn)的拒絕策略
其中根據(jù)阿里巴巴java開發(fā)手冊(cè)規(guī)范不允許使用executor創(chuàng)建 因?yàn)闀?huì)使系統(tǒng)產(chǎn)生oom危險(xiǎn)黍瞧。具體原因 請(qǐng)自行查找。
execute()方法中:
? ? ? * 1:如果當(dāng)前運(yùn)行的線程數(shù)小于 corePoolSize原杂,則馬上嘗試使用command對(duì)象創(chuàng)建一個(gè)新線程印颤。
? ? ? ? * 調(diào)用addWorker()方法進(jìn)行原子性檢查runState和workerCount,然后通過(guò)返回false來(lái)防止在不應(yīng)該
? ? ? ? * 添加線程時(shí)添加了線程產(chǎn)生的錯(cuò)誤警告。
? ? ? ? *
? ? ? ? * 2:如果一個(gè)任務(wù)能成功添加到任務(wù)隊(duì)列穿肄,在我們添加一個(gè)新的線程時(shí)仍然需要進(jìn)行雙重檢查
? ? ? ? * (因?yàn)樽?上一次檢查后年局,可能線程池中的其它線程全部都被回收了) 或者在進(jìn)入此方法后,
? ? ? ? * 線程池已經(jīng) shutdown了咸产。所以我們必須重新檢查狀態(tài)矢否,如果有必要,就在線程池shutdown時(shí)采取
? ? ? ? * 回滾入隊(duì)操作移除任務(wù)脑溢,如果線程池的工作線程數(shù)為0僵朗,就啟動(dòng)新的線程。
? ? ? ? *
? ? ? ? * 3:如果任務(wù)不能入隊(duì)屑彻,那么需要嘗試添加一個(gè)新的線程验庙,但如果這個(gè)操作失敗了,那么我們知道線程
? ? ? ? * 池可能已經(jīng)shutdown了或者已經(jīng)飽和了社牲,從而拒絕任務(wù).
? ??????intc = ctl.get();if(workerCountOf(c) < corePoolSize) {//如果工作線程數(shù)<核心線程數(shù)
????????if(addWorker(command,true))//添加一個(gè)工作線程來(lái)運(yùn)行任務(wù)粪薛,如果成功了,則直接返回//if 還有要有大括號(hào)比較好膳沽,至少讓閱讀的人看起來(lái)更清楚汗菜,這里我要小小地批評(píng)一下小Leareturn;//如果添加線程失敗了,就再次獲取線程池控制狀態(tài)
c = ctl.get(); }
? ? 然后判斷線程狀態(tài)是否在運(yùn)行狀態(tài)挑社,如果在在運(yùn)行狀態(tài) 直接添加到任務(wù)隊(duì)列中陨界。
int recheck = ctl.get();
? ? 再次獲取線程池狀態(tài)
if (! isRunning(recheck) && remove(command)) reject(command);
? ? 如果線程狀態(tài)已經(jīng)不是運(yùn)行狀態(tài)了? 在對(duì)列中移除任務(wù) 執(zhí)行拒絕策略
else if (workerCountOf(recheck) == 0) addWorker(null, false);
檢查工作線程數(shù) 如果等于0 添加隊(duì)列中
else if (!addWorker(command, false)) reject(command);
再次嘗試獲取添加工作線程中 如果失敗直接執(zhí)行拒絕策略
addWorker() 分析:
int c = ctl.get();
int rs = runStateOf(c);
獲取線程控制狀態(tài)
獲取線程池運(yùn)行狀態(tài)
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
判斷? 如果運(yùn)行狀態(tài)是否在運(yùn)行狀態(tài) 和 任務(wù)是否為空? 緩存隊(duì)列為空直接返回 false
int wc = workerCountOf(c);
獲取工作線程數(shù)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
根據(jù)線程數(shù) 比較否大于 核心線程數(shù) 和最大線程池?cái)?shù)
if (compareAndIncrementWorkerCount(c)) break retry;
線程控制狀態(tài) 原子操作 +1 卻退出循環(huán)
c = ctl.get(); if (runStateOf(c) != rs) continue retry;
再次獲取線程控制狀態(tài)? 如果線程的運(yùn)行狀態(tài)不相等 直接退出循環(huán)
Worker w = null;try { w = new Worker(firstTask);final Thread t = w.thread;
構(gòu)建worker對(duì)象,獲取worker對(duì)應(yīng)得線程
if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();
如果線程不是空痛阻,獲取線程池得鎖
int rs = runStateOf(ctl.get());//拿著鎖重新檢查池程池的狀
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果線程已經(jīng)運(yùn)行了或者還沒(méi)有死掉菌瘪,拋出一個(gè)IllegalThreadStateException異常
if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException();//把worker加入到工作線程Set里面
workers.add(w); int s = workers.size(); if (s > largestPoolSize)//如果工作線程池的大小大于
largestPoolSize largestPoolSize = s;//讓largestPoolSize記錄工作線程池的最大的大小
workerAdded = true;//工作線程被添加的標(biāo)記置為true
}
} finally {
mainLock.unlock();//釋放鎖 }
if (workerAdded) {//如果工作線程已經(jīng)被添加到工作線程池了
t.start();//開始執(zhí)行任務(wù)
workerStarted = true;//把工作線程開始的標(biāo)記置為true
}}
inally { if (! workerStarted)//如果沒(méi)有添加,那么移除任務(wù)阱当,并減少工作線程的數(shù)量(-1)
addWorkerFailed(w); }
return workerStarted;
真正執(zhí)行任務(wù)的runWorker()方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread();//獲取當(dāng)前線程(和worker綁定的線程)
Runnable task = w.firstTask;//用task保存在worker中的任務(wù)
w.firstTask = null;//把worker中的任務(wù)置為null
w.unlock(); //釋放鎖
boolean completedAbruptly = true; try { //這個(gè)while循環(huán)俏扩,保證了如果任務(wù)隊(duì)列中還有任務(wù)就繼續(xù)拿出來(lái)執(zhí)行,注意這里的短路情況
while (task != null || (task = getTask()) != null) {//如果任務(wù)不為空弊添,或者任務(wù)隊(duì)列中還有任務(wù) w.lock();//獲取鎖 // If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||//如果線程池的狀態(tài)>STOP录淡,直接中斷 (Thread.interrupted() &&//調(diào)用者線程被中斷
runStateAtLeast(ctl.get(), STOP))) &&//再次檢查線程池的狀態(tài)如果>STOP
!wt.isInterrupted())//當(dāng)前線程還沒(méi)有被中斷 wt.interrupt();//中斷當(dāng)前線程
try { beforeExecute(wt, task);//在任務(wù)執(zhí)行前的鉤子方法
Throwable thrown = null;//用于記錄運(yùn)行任務(wù)時(shí)可能出現(xiàn)的異常 try { //開始正式運(yùn)行任務(wù)
task.run(); }
catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); }
finally { //任務(wù)執(zhí)行完成后的鉤子方法 afterExecute(task, thrown); } }
finally { task = null;//把任務(wù)置為null w.completedTasks++;//把任務(wù)完成的數(shù)量+1
w.unlock();//釋放鎖 } }
completedAbruptly = false; }
finally { //當(dāng)所有任務(wù)完成之后的一個(gè)鉤子方法
processWorkerExit(w, completedAbruptly);
} }