Java中的線程池
Java中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務的程序都可以使用線程池.在開發(fā)過程中,合理使用線程池能夠帶來3個好處.
- 降低資源消耗.通過重復利用以創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗.
- 提高響應速度:任務可以不用等到線程創(chuàng)建就能執(zhí)行任務.
- 提高線程的可管理性:線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)穩(wěn)定性,使用線程池可以進行統(tǒng)一分配,調(diào)優(yōu)和監(jiān)控.
線程池的實現(xiàn)原理
從上圖中可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:
- 線程池判斷核心線程池里的線程是否都在執(zhí)行任務.如果不是,則創(chuàng)建新的一個工作線程來執(zhí)行任務.如果核心線程池都在執(zhí)行任務帆吻,則進入下個流程。
- 線程池判斷工作隊列是否已經(jīng)滿。如果工作隊列沒有滿挡鞍,則將新提交的任務存儲在這個工作隊列里腔呜。如果工作隊列滿了害晦,則進入下個流程设拟。
- 線程池判斷線程池的線程是否都處于工作狀態(tài)蛮瞄。如果沒有置济,則創(chuàng)建一個新的工作線程來執(zhí)行任務解恰。如果已經(jīng)滿了,則交給飽和策略來處理這個任務浙于。
ThreadPoolExecutor執(zhí)行execute方法分下面4種情況护盈。
- 如果當前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(注意羞酗,執(zhí)行這一步驟需要獲取全局鎖)腐宋。
- 如果運行的線程等于或多于corePoolSize并且小于maximumPoolSize,則將任務加入BlockingQueue。
- 如果無法將任務加入BlockingQueue(隊列已滿)胸竞,則創(chuàng)建新的線程來處理任務(注意欺嗤,執(zhí)行這一步驟需要獲取全局鎖)。
4)如果創(chuàng)建新線程將使當前運行的線程超出maximumPoolSize卫枝,任務將被拒絕煎饼,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設計思路校赤,是為了在執(zhí)行execute()方法時吆玖,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后(當前運行的線程數(shù)大于等于corePoolSize)痒谴,幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2衰伯,而步驟2不需要獲取全局鎖。
源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果線程數(shù)小于基本線程數(shù)积蔚,則創(chuàng)建線程并執(zhí)行當前任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗意鲸,則將當前任務放到工作隊列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//超過最大線程池大小
if (! isRunning(recheck) && remove(command))
reject(command);
//如果線程池不處于運行中或任務無法放入隊列尽爆,并且當前線程數(shù)量小于最大允許的線程數(shù)量
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
工作線程:線程池創(chuàng)建線程時怎顾,會將線程封裝成工作線程Worker,Worker在執(zhí)行完任務后漱贱,還會循環(huán)獲取工作隊列里的任務來執(zhí)行槐雾。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//從阻塞隊列中"取出"任務
while (task != null || (task = getTask()) != null) {
//占用任務
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行任務
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
線程池中的線程執(zhí)行任務分兩種情況,如下幅狮。
- 在execute()方法中創(chuàng)建一個線程時募强,會讓這個線程執(zhí)行當前任務。
- 這個線程執(zhí)行完上圖中1的任務后崇摄,會反復從BlockingQueue獲取任務來執(zhí)行擎值。
線程池的使用
ThreadPoolExecutor的各種構(gòu)造方法
參數(shù)解釋
- corePoolSize(線程池的基本大小--核心線程池大小):當提交一個任務到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務,即使其他空閑的基本線程能夠執(zhí)行新任務也會創(chuàng)建線程逐抑,等到需要執(zhí)行的任務數(shù)大于線程池基本大小時就不再創(chuàng)建鸠儿。如果調(diào)用了線程池的
prestartAllCoreThreads()
方法線程池會提前創(chuàng)建并啟動所有基本線程。 - runnableTaskQueue(任務隊列):用于保存等待執(zhí)行的任務的阻塞隊列厕氨〗浚可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列命斧,此隊列按FIFO(先進先出)原則對元素進行排序田晚。
- LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO排序元素国葬,吞吐量通常要高于ArrayBlockingQueue肉瓦。靜態(tài)工廠方法
Executors.newFixedThreadPool(
)使用了這個隊列遭京。 - SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作泞莉,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue船殉,靜態(tài)工廠方法
Executors.newCachedThreadPool()
使用了這個隊列鲫趁。 - PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列。
- maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)利虫。如果隊列滿了挨厚,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務糠惫。值得注意的是疫剃,如果使用了無界的任務隊列這個參數(shù)就沒什么效果。
- ThreadFactory:用于設置創(chuàng)建線程的工廠硼讽,可以通過線程工廠給每個創(chuàng)建出來的線程設置更有意義的名字巢价。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設置有意義的名字,代碼如下固阁。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了壤躲,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務备燃。這個策略默認情況下是AbortPolicy碉克,表示無法處理新任務時拋出異常。
- AbortPolicy:直接拋出異常并齐。
- CallerRunsPolicy:只用調(diào)用者所在線程來運行任務漏麦。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執(zhí)行當前任務况褪。
- DiscardPolicy:不處理撕贞,丟棄掉
也可以根據(jù)應用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任務窝剖。
- keepAliveTime(線程活動保持時間):線程池的工作線程空閑后麻掸,保持存活的時間。所以赐纱,如果任務很多脊奋,并且每個任務執(zhí)行的時間比較短,可以調(diào)大時間疙描,提高線程的利用率诚隙。
- TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)起胰、分鐘(MINUTES)久又、毫秒(MILLISECONDS)巫延、微秒(MICROSECONDS,千分之一毫秒)和納秒
(NANOSECONDS地消,千分之一微秒)炉峰。
向線程池提交任務
可以使用兩個方法向線程池提交任務,分別為execute()和submit()方法。
- execute()方法用于提交不需要返回值的任務脉执,所以無法判斷任務是否被線程池執(zhí)行成功疼阔。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
-
submit()方法用于提交需要返回值的任務。
線程池會返回一個Future類型的對象半夷,通過這個Future對象可以判斷任務是否執(zhí)行成功婆廊,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成巫橄,而使用get(long timeout淘邻,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執(zhí)行完湘换。
關(guān)閉線程池
可以通過調(diào)用線程池的shutdown()
或shutdownNow()
方法來關(guān)閉線程池宾舅。它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程枚尼,所以無法響應中斷的任務可能永遠無法終止贴浙。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設置成STOP署恍,然后嘗試停止所有的正在執(zhí)行或暫停任務的線程崎溃,并返回等待執(zhí)行任務的列表,而shutdown只是將線程池的狀態(tài)設置成SHUTDOWN狀態(tài)盯质,然后中斷所有沒有正在執(zhí)行任務的線程袁串。
只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true呼巷。當所有的任務都已關(guān)閉后囱修,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true王悍。至于應該調(diào)用哪一種方法來關(guān)閉線程池破镰,應該由提交到線程池的任務特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池压储,如果任務不一定要執(zhí)行完鲜漩,則可以調(diào)用shutdownNow方法。
合理地配置線程池
要想合理地配置線程池集惋,就必須首先分析任務特性孕似,可以從以下幾個角度來分析。
- 任務的性質(zhì):CPU密集型任務刮刑、IO密集型任務和混合型任務喉祭。
- 任務的優(yōu)先級:高养渴、中和低。
- 任務的執(zhí)行時間:長泛烙、中和短理卑。
- 任務的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接胶惰。
性質(zhì)不同的任務可以用不同規(guī)模的線程池分開處理傻工。
- CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池孵滞。
- 于IO密集型任務線程并不是一直在執(zhí)行任務,則應配置盡可能多的線程鸯匹,如2*Ncpu坊饶。混合型的任務殴蓬,如果可以拆分匿级,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執(zhí)行的時間相差不是太大染厅,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量痘绎。如果這兩個任務執(zhí)行時間相差太大,則沒必要進行分解肖粮。
可以通過
Runtime.getRuntime().availableProcessors()
方法獲得當前設備的CPU個數(shù)孤页。
優(yōu)先級不同的任務可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務先執(zhí)行涩馆。
注意:如果一直有優(yōu)先級高的任務提交到隊列里行施,那么優(yōu)先級低的任務可能永遠不能執(zhí)行。
執(zhí)行時間不同的任務可以交給不同規(guī)模的線程池來處理魂那,或者可以使用優(yōu)先級隊列蛾号,讓執(zhí)行時間短的任務先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務涯雅,因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果鲜结,等待的時間越長,則CPU空閑時間就越長活逆,那么線程數(shù)應該設置得越大精刷,這樣才能更好地利用CPU。
建議使用有界隊列划乖。有界隊列能增加系統(tǒng)的穩(wěn)定性和預警能力贬养,可以根據(jù)需要設大一點,比如幾千琴庵。
線程池的監(jiān)控
如果在系統(tǒng)中大量使用線程池误算,則有必要對線程池進行監(jiān)控仰美,方便在出現(xiàn)問題時,可以根據(jù)線程池的使用狀況快速定位問題儿礼】г樱可以通過線程池提供的參數(shù)進行監(jiān)控,在監(jiān)控線程池的時候可以使用以下屬性蚊夫。
- taskCount:線程池需要執(zhí)行的任務數(shù)量诉字。
- completedTaskCount:線程池在運行過程中已完成的任務數(shù)量,小于或等于taskCount知纷。
- ·largestPoolSize:線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量壤圃。通過這個數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過。如該數(shù)值等于線程池的最大大小琅轧,則表示線程池曾經(jīng)滿過伍绳。
- getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話乍桂,線程池里的線程不會自動銷毀冲杀,所以這個大小只增不減。
- getActiveCount:獲取活動的線程數(shù)睹酌。
通過擴展線程池進行監(jiān)控权谁。可以通過繼承線程池來自定義線程池憋沿,重寫線程池的
beforeExecute
旺芽、afterExecute
和terminated
方法,也可以在任務執(zhí)行前卤妒、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來進行監(jiān)控甥绿。例如,監(jiān)控任務的平均執(zhí)行時間则披、最大執(zhí)行時間和最小執(zhí)行時間等共缕。這幾個方法在線程池里是空方法。
protected void beforeExecute(Thread t, Runnable r) { }
參考書籍:《Java并發(fā)編程的藝術(shù)》