1.線程池(java.util.concurrent)
為什么要有線程池 豫领?
我們知道抡柿。使用線程來處理任務(wù),可以達到一定程度的并行計算的效果等恐,在一些比較耗時的操作時候不用一直等待洲劣,比如以下i/o操作。那么每次需要的時候就創(chuàng)建一個線程來處理這種任務(wù)就好了鼠锈,為什么要引入線程池這個概念呢闪檬?
主要存在三方面的原因:
- 線程生命周期的開銷非常高。 創(chuàng)建線程是需要時間的购笆,并且需要JVM和底層操作系統(tǒng)提供一些輔助的支持粗悯,無限創(chuàng)建線程,必定在創(chuàng)建線程的時候消耗很多資源同欠。
- 資源消耗样傍。 活躍的線程必定要占據(jù)一定的內(nèi)存,線程越多铺遂,使用的內(nèi)存越大衫哥。當可運行的線程多于可用的處理器數(shù)量的時候,線程就會閑置襟锐。大量的閑置線程就會占據(jù)大量內(nèi)存撤逢,給垃圾回收帶來很多的壓力。而且這些線程在資源CPU競爭的時候也將產(chǎn)生更大的開銷。
- 穩(wěn)定性蚊荣。 之前的JVM的OOM中有提到過初狰,過多的線程還會可能出現(xiàn)OOM異常。因為線程數(shù)量受制于JVM的參數(shù)配置互例,Thread構(gòu)造方法中的請求棧大小奢入,以及底層操作系統(tǒng)對線程的閑置,一旦超出就會出現(xiàn)OOM的異常
所以媳叨,使用線程池腥光,用它來管理線程,可以有效的減少因為線程創(chuàng)建和線程數(shù)量過多導(dǎo)致的問題
1.1 Executor框架
1.1.1 框架基礎(chǔ)
先來看看住基本的框架結(jié)構(gòu)圖:
1. 主要元素:
- 頂層是一個Executor接口糊秆,主要常用的實現(xiàn)類是ThreadPoolExecutor和ScheduledThreadPoolExecutor
- BlockingQueue接口及其實現(xiàn)
- Future接口以及實現(xiàn)
- Executors 創(chuàng)建線程池的關(guān)鍵類
2. 框架執(zhí)行原理
關(guān)于執(zhí)行原理武福,說到這個問題,不得不說jdk源碼的作者寫代碼真是習慣好扩然,跟進源碼艘儒,查看Executor接口聋伦,在類上面夫偶,很大段的解釋和說明,還有示例代碼來說明觉增。相比周圍的我們寫的代碼兵拢,簡簡單單的幾行注釋,甚至有的完全寫出來就是沒有注釋逾礁,試問這樣代碼怎么看说铃。很多時候我覺得寫代碼好不好,代碼風格和格式很重要嘹履。
回答我們剛才的話題腻扇,一起來看看Executor接口上面的注釋吧
2.1 Excutor接口
我們?nèi)タ丛创a就發(fā)現(xiàn),Executor
接口只有個方核心方法execute
,接收的參數(shù)是Runnable
砾嫉。Runnable在jdk里面幼苛,我們都稱之為Task也就是要執(zhí)行的任務(wù),使用Executor可是避免我們反復(fù)的使用new Thread(new(RunnableTask())).start()
焕刮。當有很多任務(wù)需要執(zhí)行的時候舶沿,可以如下的方式:
// 異步執(zhí)行任務(wù)
Executor executor = anExecutor; // 此處偽代碼,實現(xiàn)時候就是使用Executors創(chuàng)建一個子類
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
上面的代碼配并,會使得多個任務(wù)異步的執(zhí)行括荡。在executor源碼注釋上有寫明,這個接口也可以不要求任務(wù)是異步執(zhí)行的溉旋,一個簡單例子就是直接執(zhí)行提交的任務(wù)的run方法
// 直接同步執(zhí)行
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
但是更典型的方式使用一個線程來執(zhí)行任務(wù)而不是使用run方法畸冲,例如:
// 每個任務(wù)一個線程異步去執(zhí)行
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
而在Executor框架中,Executor
的實現(xiàn)類都是解決的批量任務(wù)的執(zhí)行順序和時間的問題。下面的例子是一個順序執(zhí)行的Executor的一個實現(xiàn)邑闲。
// 多任務(wù)順序執(zhí)行
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
上面這個例子基本能簡單表現(xiàn)出執(zhí)行任務(wù)的思路岩喷,值得注意的一點就是,這個jdk注釋中的例子在executor中引入了一個任務(wù)隊列监憎,再把隊列中的任務(wù)取出順序執(zhí)行纱意。在JDK提供的Executor的實現(xiàn)類中,使用workQueue來存儲需要執(zhí)行的任務(wù)鲸阔,使用一個Worker集合works來執(zhí)行任務(wù)(不同于上例中的順序執(zhí)行偷霉,且上例中工作線程相當于只有一個)。執(zhí)行Worker啟動后執(zhí)行完自己的runnable后還會從workQueue中繼續(xù)獲取任務(wù)執(zhí)行褐筛,直到任務(wù)隊列為空类少。
2.2 ExecutorService 接口
ExecutorService接口繼承自Executor 接口,主要增加了線程生命周幾管理的幾個方法以及Future 來跟蹤任務(wù)一個或多個異步任務(wù)的處理情況渔扎。
其中
- shutDown() 關(guān)閉executor硫狞,已經(jīng)提交的任務(wù)會被執(zhí)行,新的任務(wù)不會再接受
- shutDownNow() 立即關(guān)閉executor,停止執(zhí)行晃痴,并返回一個等待執(zhí)行的任務(wù)列表
- isShutDown() executor是否終止
- isTerminated() 所有任務(wù)執(zhí)行完成残吩,只有在調(diào)用了shutDown或者shutDownNow之后,才會返回true
- submit() 幾種提交任務(wù)的方式
2.3 Executors
提供各種方法創(chuàng)建線程池倘核,從大的方向看泣侮,線程主要分為兩類,一種就是不同的異步執(zhí)行的紧唱,一種就是實現(xiàn)了ScheduledExecutorService 接口的線程活尊,兩類線程的區(qū)別在于在于ScheduledExecutorService是那種有計劃執(zhí)行的任務(wù),比如說定時任務(wù)或者延時執(zhí)行的任務(wù)漏益。
具體使用查看Executors.newXXX() 相關(guān)文檔
1.1.2. ThreadPoolExecutor & ScheduledThreadPoolExecutor
ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 都是executorService的實現(xiàn)類蛹锰,他們關(guān)系從之前類圖已經(jīng)可以清楚地看出來〈掳蹋基本使用差不多铜犬,卻別就在于定位或者延時功能。所以本文只分析ThreadPoolExecutor的源碼峦睡,來看看線程池的工作大致流程翎苫。
1.1.2.1 ThreadPoolExecutor源碼分析
在分析源碼前,我根據(jù)個人的理解榨了,先簡單說明線程池工作的流程煎谍,在進入代碼查看。
之前在看JDK的Executor接口的文檔的時候龙屉,在源碼上面的標準注釋里面的例子(也是生成的javadoc里面的注釋)的最后一個呐粘,有提到過一個概念满俗,任務(wù)隊列。前文還簡單說了下具體實現(xiàn)類和那個例子的不同∽麽現(xiàn)在來具體看看唆垃,在說之前,先明白幾個概念痘儡。
- 工作隊列
BlockingQueue<Runnable> workQueue
辕万。存放所有的runnable任務(wù)。 - 工作線程集合
HashSet<Worker> workers
沉删。線程池中所有的工作線程集合
Runnable都清楚是什么渐尿,Woker呢,先看看worker類可能更能方便理解線程池的工作過程
// Woker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
.....
很明顯就是有個線程矾瑰,一個任務(wù)砖茸,和任務(wù)完成數(shù)量,核心方法是runWorker
// runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker做的事情很明確殴穴,如果Worker創(chuàng)建的時候帶了任務(wù)凉夯,則執(zhí)行這個任務(wù)的run()方法,如果沒有就去執(zhí)行g(shù)etTask()在workQueue中獲得一個任務(wù)來執(zhí)行采幌,直到?jīng)]任務(wù)可執(zhí)行為止劲够。
在回頭看execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
這段執(zhí)行邏輯:
- 查看當前Worker(工作線程)數(shù)量有沒有達到coreSize,沒有就創(chuàng)建一個工作線程
- 如果線程池沒有關(guān)閉,并且添加到隊列成功植榕,再次執(zhí)行下檢測再沧,或者拒絕,或者由于工作線程沒有重新添加工作線程尊残。這個分支需要注意的是,可能這個分支走完只添加了任務(wù)淤堵,沒有添加線程寝衫。也就是重復(fù)利用線程。利用已有的工作線程自己去隊列中消費任務(wù)拐邪。例外注意runWorker里面使用的getTask() 實際是個阻塞的慰毅,一直循環(huán)在取隊列中的任務(wù),取不到一直循環(huán)扎阶,這個線程就會一直在汹胃。runWorker也是個死循環(huán)一直執(zhí)行task.run。所以線程中的線程其實一直在運行的东臀。但是getActiveCount 是去HashSet<Worker> workers 里面的上鎖(在執(zhí)行run的線程着饥,而不是在getTask的)的線程數(shù)量。
- 添加任務(wù)失敗的時候惰赋,直接拒絕
這里另外說一下宰掉,呵哨。
// addWorker 部分代碼
...
w = new Worker(firstTask);
final Thread t = w.thread;
...
if (workerAdded) {
t.start();
workerStarted = true;
}
...
addWorker最后會啟動worker的私有屬性thread的線程,開始執(zhí)行runWorker,同事把worker添加到HashSet<Worker>中
由于worker的構(gòu)造函數(shù)中this.thread = getThreadFactory().newThread(this);
所以woker的thread啟動的時候轨奄,執(zhí)行的就是Wroker的run孟害,即threadPoolExecutor的runWorker方法。整個執(zhí)行鏈如下:
ThreadPoolExecutor.execute()-->addWorkder(可能添加成功或者失敗挪拟,失敗是涉及到拒絕處理問題)-->Workder.thread.start()-->Worker.run-->threadPoolExecutor.runWorker-->循環(huán)執(zhí)行g(shù)etTask挨务、task.run
以上就是線程基本的執(zhí)行流程了,觀察ThreadPoolExecutor的完整參數(shù)的構(gòu)造方法發(fā)現(xiàn):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
其中ThreadFactory 是用來創(chuàng)建Worker的thread用的玉组,管理所有的線程耘子。
RejectedExecutionHandler handler是在addWorker的時候如果添加失敗,執(zhí)行的飽和策略球切。JUC(java.util.concurrent)包中有提供幾種實現(xiàn)谷誓。也可以根據(jù)需要自己實現(xiàn)自己的飽和策略。
1.1.2.2 Exexutors.newXXX的參數(shù)意義和是使用時候注意的問題
newFixedThreadPool
創(chuàng)建一個固定長度的線程池吨凑,每次提交任務(wù)就會創(chuàng)建線程捍歪,知道達到最大線程數(shù)。如果線程發(fā)生Exception死掉鸵钝,會新補充線程進來糙臼。默認工作隊列最大長度是Integer.MXA_VALUE。認為是一個無界的隊列newCachedThreadPool
創(chuàng)建一個可緩存的線程池恩商,如果線程池的當前規(guī)模超出了處理需求变逃,就回收空閑線程,如果需求增加就添加新的線程怠堪。線程值規(guī)模不受限制揽乱,所以在使用的時候,操作不當可能創(chuàng)建很多線程導(dǎo)致OOM粟矿。
使用的隊列是SynchronousQueue.newScheduledThreadPool
創(chuàng)建固定長度線程池凰棉,而且以延遲或定時的方式來執(zhí)行任務(wù)newSingleThreadExecutor、newSingleThreadScheduledExecutor
創(chuàng)建一個單線程的Executor陌粹,如果單個線程出現(xiàn)Exeception死掉撒犀,就是創(chuàng)建一個線程來替代。他可以確保任務(wù)隊列中的任務(wù)是順序執(zhí)行的掏秩。
1.2. 線程池任務(wù)管理 Queue & Deque
ThreadPoolExecutor提供了三中隊列方式:無界隊列或舞、有界對列、同步移交蒙幻。隊列的選擇與其他的參數(shù)有關(guān)映凳,例如:線程池的大小。
無界杆煞、有界隊列魏宽。使用無界隊列當線程池中的線程都處于忙碌狀態(tài)的時候腐泻,工作隊列就會無限制的增長。一種更加穩(wěn)妥的方式使用有界隊列队询,例如:ArrayBlockingQueue派桩,有界LinkedBlockingQueue,PriorityBlockingQueue蚌斩。有界隊列有助于避免資源耗盡情況的發(fā)生铆惑,但是就需要考慮隊列填滿時候的飽和策略問題。
同步移交送膳。對于非常大或者無界的線程池员魏,可以使用SynchronousQueue來避免任務(wù)排隊,以及直接將任務(wù)從生產(chǎn)者直接移交給工作線程叠聋,移交的時候必須要求有線程等待接受撕阎,如果沒有切線程池線程數(shù)小于最大線程,就創(chuàng)建線程接受碌补,否則就拒絕虏束。
執(zhí)行順序 。ArrayBlockingQueue 和 PriorityBlockingQueue是FIFO類型隊列厦章,如果想進一步的控制任務(wù)執(zhí)行的順序镇匀,可以使用PriorityBlockingQueue來進行管理,任務(wù)優(yōu)先級是通過自然順序或者Comparator接口來定義的袜啃。
注意:只有當任務(wù)相互獨立是汗侵,為線程池或者工作隊列設(shè)置界限才是合理的,如果任務(wù)之間存在依賴群发,那么有界的線程池或者隊列就可能導(dǎo)致“饑餓”死鎖問題
1.3 線程池飽和策略 RejectedExecutionHandler
當有界隊列被填滿的時候晰韵,飽和策略就開始發(fā)揮作用了。ThreadPoolExecutor的飽和策略可以通過調(diào)用setRejectedExecutionHandler來修改也物。JDK提供了四種默認的飽和策略宫屠。
AbortPolicy 默認策略,拋出一個未經(jīng)檢測的RejectedExecutionException,調(diào)用者捕獲這個異常滑蚯,根據(jù)自己的需求編寫自己的代碼。
DiscardPolicy 拋棄策略抵栈, 當新的任務(wù)無法添加到隊列的時候告材,默默的拋棄該任務(wù)
DiscardOldestPolicy 拋棄最早策略,次策略會拋棄寫一個要執(zhí)行的任務(wù)古劲,然后嘗試提交任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
因此如果是個優(yōu)先隊列斥赋,則拋棄優(yōu)先級最高的策略,所有不建議這個策略和優(yōu)先隊列一起使用
CallerRunsPolicy 調(diào)用者直接執(zhí)行run策略产艾,這種直接在調(diào)用者的線程執(zhí)行任務(wù)的run方法疤剑。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
因為直接下調(diào)用者里面執(zhí)行的任務(wù)滑绒,所有會是一個同步的效果,就會帶來響應(yīng)的延時隘膘。
以上四種是JDK提供的策略疑故,我們還可以根據(jù)自己的需要,自己實現(xiàn)RejectedExecutionHandler弯菊,實現(xiàn)我們自己的飽和策略纵势。
1.4 線程池如何重復(fù)利用線程的 ?
1.4.1 ThreadFactory
線程工廠是創(chuàng)建線程的地方管钳,實際就是創(chuàng)建工作線程钦铁。
// DefaultthreadFactory
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
1.4.2 線程池如何重復(fù)利用線程?
通過前面對線程池的理解才漆,線程池的實現(xiàn)思路基本有一定的了解牛曹,那么線程池究竟如何重復(fù)利用線程的呢?
其實這里的“重復(fù)” 并沒有放開重新獲取醇滥,而是工作線程一直運行黎比。當運行的線程數(shù)量沒有達到coreSize的時候,不管任務(wù)多少腺办,新來任務(wù)會重新創(chuàng)建工作線程焰手。工作線程中執(zhí)行的是死循環(huán)一直獲取任務(wù)來執(zhí)行。通過使用工作線程來執(zhí)行任務(wù)的run方法達到避免創(chuàng)建線程的目的怀喉。前面源碼分析部分书妻,查看execute、addWorker躬拢、runWorker躲履、getTask 四個方法就很明了。
- execute: 添加工作線程聊闯,或者只添加任務(wù)工猜、或者拒絕任務(wù)
- addWorker: 實際上的創(chuàng)建工作線程,并start
- runWorker: 工作線程的run方法里面執(zhí)行的代碼菱蔬,循環(huán)取隊列的中的任務(wù)進行執(zhí)行篷帅。
- getTask: 一直去任務(wù),隊列為空就一直循環(huán)直到取到值或者線程池關(guān)閉拴泌。
所以線程池的工作線程一點啟動魏身,是一直在運行的。沒有任務(wù)可執(zhí)行的時候蚪腐,也是在執(zhí)行箭昵,只不過這個時候是阻塞在了getTask方法中。所以千萬不要理解成線程池做完任務(wù)就把線程放回去回季,要用的時候在拿出來家制。