Java source code is full of detailed comments; read them carefully and you have most of the picture of how a class works. Our analysis of ThreadPoolExecutor therefore starts from its class-level Javadoc.
A rough translation of that comment follows:
An ExecutorService (the top-level interface implemented by ThreadPoolExecutor) executes each submitted task using one of the threads in its pool; normally we create an ExecutorService through the Executors factory methods.
Thread pools address two different problems:
- Improved performance: when executing large numbers of asynchronous tasks, they reduce per-task invocation overhead and provide a means of bounding and managing the resources (including threads) consumed, so the performance gain is significant;
- Statistics: each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. For common scenarios, however, several thread pools come preconfigured, and programmers are urged to use the more convenient Executors factory methods directly:
- Executors.newCachedThreadPool (unbounded thread pool with automatic thread reclamation);
- Executors.newFixedThreadPool (fixed-size thread pool);
- Executors.newSingleThreadExecutor (single background thread).
Note: ScheduledExecutorService is not mentioned here; it will be analyzed in a later article.
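These three factory methods can be sketched as follows; the printed values reflect their documented default configurations (the cast to ThreadPoolExecutor works for the first two because they return one directly, while newSingleThreadExecutor returns a delegating wrapper that cannot be cast):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Main {
    public static void main(String[] args) {
        // Fixed-size pool: corePoolSize == maximumPoolSize == 4
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        System.out.println(fixed.getCorePoolSize());     // 4
        System.out.println(fixed.getMaximumPoolSize());  // 4

        // Cached pool: core size 0, max Integer.MAX_VALUE, 60s keep-alive
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        System.out.println(cached.getCorePoolSize());    // 0
        System.out.println(cached.getMaximumPoolSize()); // 2147483647

        // newSingleThreadExecutor returns a delegating wrapper, so no cast here
        ExecutorService single = Executors.newSingleThreadExecutor();

        fixed.shutdown();
        cached.shutdown();
        single.shutdown();
    }
}
```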
When configuring a thread pool by hand, use the following guide:
一巢墅、Core and maximum pool sizes 核心和最大線程池?cái)?shù)量
參數(shù) | 翻譯 |
---|---|
corePoolSize | 核心線程池?cái)?shù)量 |
maximumPoolSize | 最大線程池?cái)?shù)量 |
線程池執(zhí)行器將會根據(jù)corePoolSize和maximumPoolSize自動地調(diào)整線程池大小。
When a new task is submitted via execute(Runnable) and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If more than corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only if the queue is full. By setting corePoolSize and maximumPoolSize to the same value, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only at construction, but they can also be changed dynamically with setCorePoolSize and setMaximumPoolSize.
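A minimal sketch of constructing a pool and then adjusting both bounds dynamically (the sizes 2/4 and the queue capacity of 10 are arbitrary illustrative choices):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                  // corePoolSize, maximumPoolSize
                60L, TimeUnit.SECONDS, // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(10));

        System.out.println(pool.getCorePoolSize());    // 2

        // Both bounds may also be changed dynamically after construction
        pool.setCorePoolSize(3);
        pool.setMaximumPoolSize(8);
        System.out.println(pool.getCorePoolSize());    // 3
        System.out.println(pool.getMaximumPoolSize()); // 8

        pool.shutdown();
    }
}
```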
This passage describes the pool's task-handling flow in detail; the flow is summarized in the diagram 線程任務(wù)處理流程圖.png.
2. prestartCoreThread (prestarting core threads)
By default, even core threads are created and started only when new tasks arrive, but this can be overridden dynamically with the prestartCoreThread() and prestartAllCoreThreads() methods.
You will probably want to prestart threads if you construct the pool with a non-empty queue.
Method | Effect |
---|---|
prestartCoreThread() | starts one idle core thread that waits for tasks to arrive |
prestartAllCoreThreads() | starts all corePoolSize core threads, idle and waiting for tasks |
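The effect of both methods can be observed through getPoolSize(); the pool configuration below is illustrative:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println(pool.getPoolSize());        // 0: nothing started yet
        System.out.println(pool.prestartCoreThread()); // true: one core thread started
        System.out.println(pool.getPoolSize());        // 1
        pool.prestartAllCoreThreads();                 // starts the remaining core threads
        System.out.println(pool.getPoolSize());        // 3
        pool.shutdown();
    }
}
```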
三已卷、ThreadFactory 線程工廠
新線程使用ThreadFactory創(chuàng)建梧田。 如果未另行指定,則使用Executors.defaultThreadFactory默認(rèn)工廠侧蘸,使其全部位于同一個ThreadGroup中裁眯,并且具有相同的NORM_PRIORITY優(yōu)先級和非守護(hù)進(jìn)程狀態(tài)。
通過提供不同的ThreadFactory讳癌,您可以更改線程的名稱穿稳,線程組,優(yōu)先級晌坤,守護(hù)進(jìn)程狀態(tài)等逢艘。如果ThreadCactory在通過從newThread返回null詢問時未能創(chuàng)建線程旦袋,則執(zhí)行程序?qū)⒗^續(xù),但可能無法執(zhí)行任何任務(wù)它改。
線程應(yīng)該有modifyThread權(quán)限疤孕。 如果工作線程或使用該池的其他線程不具備此權(quán)限,則服務(wù)可能會降級:配置更改可能無法及時生效央拖,并且關(guān)閉池可能會保持可終止但尚未完成的狀態(tài)胰柑。
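A sketch of a custom ThreadFactory; the name prefix "worker-" and the priority/daemon settings are arbitrary illustrative choices:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Factory that sets a readable name, a priority, and daemon status
        ThreadFactory named = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "worker-" + seq.getAndIncrement());
                t.setDaemon(false);
                t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), named);

        // The task runs on a thread created by our factory
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```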
4. Keep-alive times
If the pool currently has more than corePoolSize threads, excess threads are terminated once they have been idle for longer than keepAliveTime (see getKeepAliveTime(TimeUnit)). This provides a way to reduce resource consumption when the pool is not being actively used.
If the pool becomes more active later, new threads will be constructed. The parameter can also be adjusted dynamically with setKeepAliveTime(long, TimeUnit).
To prevent idle threads from ever terminating before shutdown, use:
setKeepAliveTime(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
By default, the keep-alive policy applies only when there are more than corePoolSize threads, but as long as keepAliveTime is non-zero, the method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well.
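The keep-alive settings above can be exercised like this (the 30s/60s values are illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS)); // 30
        pool.setKeepAliveTime(60L, TimeUnit.SECONDS);                // dynamic change
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS)); // 60

        // Apply the same timeout to core threads (requires a non-zero keepAliveTime)
        pool.allowCoreThreadTimeOut(true);
        System.out.println(pool.allowsCoreThreadTimeOut());          // true
        pool.shutdown();
    }
}
```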
五趁啸、Queuing 隊(duì)列
BlockingQueu用于存放提交的任務(wù)强缘,隊(duì)列的實(shí)際容量與線程池大小相關(guān)聯(lián)。
- If fewer than corePoolSize threads are running, the executor always prefers to add a new thread rather than queue the task.
- If corePoolSize or more threads are running, the executor always prefers to queue the task rather than add a new thread.
- If the task cannot be queued, a new thread is created unless that would exceed maximumPoolSize; if it would, the task is rejected.
This flow is shown in the diagram 線程任務(wù)處理流程圖.png.
There are three general queuing strategies:
Direct handoffs
A good default choice for a work queue is a SynchronousQueue, which hands tasks off to threads without otherwise holding them. Here, an attempt to queue a task will fail if no thread is immediately available to run it, so a new thread is constructed.
This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of newly submitted tasks. Note, however, that when tasks continue to arrive on average faster than they can be processed, the number of threads can grow without bound.

Unbounded queues
Using an unbounded queue (for example, a LinkedBlockingQueue without a predefined capacity) causes new tasks to wait in the queue while all corePoolSize threads are busy, so the value of maximumPoolSize has no effect. This may be appropriate when each task is completely independent of the others, so tasks cannot affect each other's execution; in a web server, for example, this style of queuing can smooth out transient bursts of requests. Note, however, that when tasks continue to arrive on average faster than they can be processed, the queue can grow without bound.

Bounded queues
A bounded queue (for example, an ArrayBlockingQueue) used with a finite maximumPoolSize helps prevent resource exhaustion, but is harder to tune and control. Queue size and maximumPoolSize must be traded off against each other:
- Using large queues and small maximumPoolSizes minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example, because they are I/O bound), the system may be able to schedule time for more threads than you would otherwise allow.
- Using small queues generally requires larger maximumPoolSizes, which keeps CPUs busier but may incur unacceptable scheduling overhead, which also decreases throughput.
The point is that the bounded-queue size and maximumPoolSize must be balanced against each other: the goal is to reduce resource consumption while keeping throughput up.
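The three strategies translate into constructor arguments roughly as follows; the pool sizes and the bound of 100 are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // 1. Direct handoff: the queue holds nothing, so pair it with a large max
        ThreadPoolExecutor handoff = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

        // 2. Unbounded queue: tasks wait while all core threads are busy,
        //    and maximumPoolSize (8 here) never takes effect
        ThreadPoolExecutor unbounded = new ThreadPoolExecutor(
                4, 8, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // 3. Bounded queue: queue capacity traded off against pool size
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(
                4, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

        System.out.println(handoff.getQueue().remainingCapacity()); // 0
        System.out.println(bounded.getQueue().remainingCapacity()); // 100
        handoff.shutdown();
        unbounded.shutdown();
        bounded.shutdown();
    }
}
```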
六坏快、Rejected tasks 拒絕任務(wù)
拒絕任務(wù)有兩種情況:1. 線程池已經(jīng)被關(guān)閉覆享;2. 任務(wù)隊(duì)列已滿且maximumPoolSizes已滿;
無論哪種情況,都會調(diào)用RejectedExecutionHandler的rejectedExecution方法昔逗。預(yù)定義了四種處理策略:
- AbortPolicy: the default policy; throws a runtime RejectedExecutionException;
- CallerRunsPolicy: the thread that invoked execute runs the task itself, providing a simple feedback-control mechanism that slows the rate at which new tasks are submitted;
- DiscardPolicy: the newly submitted task is simply dropped;
- DiscardOldestPolicy: if the executor is not shut down, the task at the head of the work queue is dropped and execution is retried (which can fail again, causing this to be repeated).
We can also define our own RejectedExecutionHandler to suit special capacity and queuing-policy scenarios; doing so requires some care.
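A sketch of a custom handler, assuming Java 8+ for the lambda (counting rejections is an illustrative choice, not a recommended policy): a pool with one thread and a one-slot queue rejects the third task while the first one is blocked.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    public static void main(String[] args) {
        AtomicInteger rejected = new AtomicInteger();
        // Custom handler that merely counts rejections instead of throwing
        RejectedExecutionHandler counting = (r, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), counting);

        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(() -> { try { gate.await(); } catch (InterruptedException e) {} });
        pool.execute(() -> {}); // fills the single queue slot
        pool.execute(() -> {}); // queue full, max threads reached: rejected
        System.out.println(rejected.get()); // 1
        gate.countDown();
        pool.shutdown();
    }
}
```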
7. Hook methods
ThreadPoolExecutor provides hook methods that run before and after each task. Override beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) to manipulate the execution environment; for example, to reinitialize ThreadLocals, gather statistics, or add log entries. In addition, terminated() is invoked once the executor has fully terminated; override it to perform any special processing.
Note: if a hook or callback method throws an exception, the internal worker thread may in turn fail and abruptly terminate.
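A sketch of the hooks: a hypothetical TimingPool subclass times each task via beforeExecute/afterExecute and logs once from terminated():

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    // Hypothetical subclass that times every task via the hooks
    static class TimingPool extends ThreadPoolExecutor {
        private final ThreadLocal<Long> start = new ThreadLocal<>();
        final AtomicLong totalNanos = new AtomicLong();

        TimingPool(int size) {
            super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }
        @Override protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            start.set(System.nanoTime());       // runs in the worker thread
        }
        @Override protected void afterExecute(Runnable r, Throwable t) {
            totalNanos.addAndGet(System.nanoTime() - start.get());
            super.afterExecute(r, t);
        }
        @Override protected void terminated() {
            System.out.println("pool terminated"); // called exactly once
            super.terminated();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimingPool pool = new TimingPool(2);
        pool.execute(() -> {});
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(pool.getCompletedTaskCount()); // 1
    }
}
```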
8. Queue maintenance
The getQueue() method gives access to the work queue, generally for monitoring and debugging; using it for any other purpose is strongly discouraged. When large numbers of queued tasks become cancelled, the remove(Runnable) and purge() methods can help reclaim storage.
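A small illustration of getQueue(), remove(Runnable), and purge(); the blocking first task exists only to keep the second one sitting in the queue:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the only worker so the next task has to wait in the queue
        pool.execute(() -> { try { gate.await(); } catch (InterruptedException e) {} });

        Runnable queued = () -> {};
        pool.execute(queued);
        System.out.println(pool.getQueue().size()); // 1 (monitoring only)
        System.out.println(pool.remove(queued));    // true: removed before running
        pool.purge();                               // drops cancelled Futures (none here)
        System.out.println(pool.getQueue().size()); // 0
        gate.countDown();
        pool.shutdown();
    }
}
```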
9. Finalization
A pool that is no longer referenced anywhere in the program and has no remaining threads is shut down automatically. If you want to ensure that an unreferenced pool is reclaimed even when users forget to call shutdown(), you must arrange for unused threads to eventually die: set an appropriate keep-alive time combined with allowCoreThreadTimeOut(boolean), and/or use a core size lower bound of zero.
In general, though, it is recommended to call shutdown() explicitly once a pool is no longer needed.
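The recommended explicit shutdown can be sketched as follows (the 5-second wait is an arbitrary choice):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task done"));

        pool.shutdown();                    // stop accepting tasks, drain the queue
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();             // last resort: interrupt running tasks
        }
        System.out.println(pool.isTerminated()); // true
    }
}
```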
To sum up: having read through the ThreadPoolExecutor Javadoc, we now have a fairly complete picture of the class; its implementation will be covered in detail in later chapters.
The multithreading series (continuously updated):
How thread start works
The thread interruption mechanism
Ways to implement multithreading
How FutureTask works
Thread pools: ThreadPoolExecutor overview
Thread pools: using ThreadPoolExecutor
Thread pools: ThreadPoolExecutor state control
Thread pools: how ThreadPoolExecutor executes tasks
Thread pools: ScheduledThreadPoolExecutor overview
Gracefully shutting down a thread pool
The original English Javadoc follows:
/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
* <p>Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
* scenarios. Otherwise, use the following guide when manually
* configuring and tuning this class:
*
* <dl>
*
* <dt>Core and maximum pool sizes</dt>
*
* <dd>A {@code ThreadPoolExecutor} will automatically adjust the
* pool size (see {@link #getPoolSize})
* according to the bounds set by
* corePoolSize (see {@link #getCorePoolSize}) and
* maximumPoolSize (see {@link #getMaximumPoolSize}).
*
* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}. </dd>
*
* <dt>On-demand construction</dt>
*
* <dd>By default, even core threads are initially created and
* started only when new tasks arrive, but this can be overridden
* dynamically using method {@link #prestartCoreThread} or {@link
* #prestartAllCoreThreads}. You probably want to prestart threads if
* you construct the pool with a non-empty queue. </dd>
*
* <dt>Creating new threads</dt>
*
* <dd>New threads are created using a {@link ThreadFactory}. If not
* otherwise specified, a {@link Executors#defaultThreadFactory} is
* used, that creates threads to all be in the same {@link
* ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
* non-daemon status. By supplying a different ThreadFactory, you can
* alter the thread's name, thread group, priority, daemon status,
* etc. If a {@code ThreadFactory} fails to create a thread when asked
* by returning null from {@code newThread}, the executor will
* continue, but might not be able to execute any tasks. Threads
* should possess the "modifyThread" {@code RuntimePermission}. If
* worker threads or other threads using the pool do not possess this
* permission, service may be degraded: configuration changes may not
* take effect in a timely manner, and a shutdown pool may remain in a
* state in which termination is possible but not completed.</dd>
*
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. This parameter can also be
* changed dynamically using method {@link #setKeepAliveTime(long,
* TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
* TimeUnit#NANOSECONDS} effectively disables idle threads from ever
* terminating prior to shut down. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads. But
* method {@link #allowCoreThreadTimeOut(boolean)} can be used to
* apply this time-out policy to core threads as well, so long as the
* keepAliveTime value is non-zero. </dd>
*
* <dt>Queuing</dt>
*
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
* submitted tasks. The use of this queue interacts with pool sizing:
*
* <ul>
*
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>
*
* </ul>
*
* There are three general strategies for queuing:
* <ol>
*
* <li> <em> Direct handoffs.</em> A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. Here, an attempt to queue a task
* will fail if no threads are immediately available to run it, so a
* new thread will be constructed. This policy avoids lockups when
* handling sets of requests that might have internal dependencies.
* Direct handoffs generally require unbounded maximumPoolSizes to
* avoid rejection of new submitted tasks. This in turn admits the
* possibility of unbounded thread growth when commands continue to
* arrive on average faster than they can be processed. </li>
*
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
* capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
* each task is completely independent of others, so tasks cannot
* affect each others execution; for example, in a web page server.
* While this style of queuing can be useful in smoothing out
* transient bursts of requests, it admits the possibility of
* unbounded work queue growth when commands continue to arrive on
* average faster than they can be processed. </li>
*
* <li><em>Bounded queues.</em> A bounded queue (for example, an
* {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
* used with finite maximumPoolSizes, but can be more difficult to
* tune and control. Queue sizes and maximum pool sizes may be traded
* off for each other: Using large queues and small pools minimizes
* CPU usage, OS resources, and context-switching overhead, but can
* lead to artificially low throughput. If tasks frequently block (for
* example if they are I/O bound), a system may be able to schedule
* time for more threads than you otherwise allow. Use of small queues
* generally requires larger pool sizes, which keeps CPUs busier but
* may encounter unacceptable scheduling overhead, which also
* decreases throughput. </li>
*
* </ol>
*
* </dd>
*
* <dt>Rejected tasks</dt>
*
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
* <em>rejected</em> when the Executor has been shut down, and also when
* the Executor uses finite bounds for both maximum threads and work queue
* capacity, and is saturated. In either case, the {@code execute} method
* invokes the {@link
* RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
* method of its {@link RejectedExecutionHandler}. Four predefined handler
* policies are provided:
*
* <ol>
*
* <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
* handler throws a runtime {@link RejectedExecutionException} upon
* rejection. </li>
*
* <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
* that invokes {@code execute} itself runs the task. This provides a
* simple feedback control mechanism that will slow down the rate that
* new tasks are submitted. </li>
*
* <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
* cannot be executed is simply dropped. </li>
*
* <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
* executor is not shut down, the task at the head of the work queue
* is dropped, and then execution is retried (which can fail again,
* causing this to be repeated.) </li>
*
* </ol>
*
* It is possible to define and use other kinds of {@link
* RejectedExecutionHandler} classes. Doing so requires some care
* especially when policies are designed to work only under particular
* capacity or queuing policies. </dd>
*
* <dt>Hook methods</dt>
*
* <dd>This class provides {@code protected} overridable
* {@link #beforeExecute(Thread, Runnable)} and
* {@link #afterExecute(Runnable, Throwable)} methods that are called
* before and after execution of each task. These can be used to
* manipulate the execution environment; for example, reinitializing
* ThreadLocals, gathering statistics, or adding log entries.
* Additionally, method {@link #terminated} can be overridden to perform
* any special processing that needs to be done once the Executor has
* fully terminated.
*
* <p>If hook or callback methods throw exceptions, internal worker
* threads may in turn fail and abruptly terminate.</dd>
*
* <dt>Queue maintenance</dt>
*
* <dd>Method {@link #getQueue()} allows access to the work queue
* for purposes of monitoring and debugging. Use of this method for
* any other purpose is strongly discouraged. Two supplied methods,
* {@link #remove(Runnable)} and {@link #purge} are available to
* assist in storage reclamation when large numbers of queued tasks
* become cancelled.</dd>
*
* <dt>Finalization</dt>
*
* <dd>A pool that is no longer referenced in a program <em>AND</em>
* has no remaining threads will be {@code shutdown} automatically. If
* you would like to ensure that unreferenced pools are reclaimed even
* if users forget to call {@link #shutdown}, then you must arrange
* that unused threads eventually die, by setting appropriate
* keep-alive times, using a lower bound of zero core threads and/or
* setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
*
* </dl>
*
* @since 1.5
* @author Doug Lea
*/