如需轉(zhuǎn)載請(qǐng)?jiān)u論或簡(jiǎn)信匣距,并注明出處面哥,未經(jīng)允許不得轉(zhuǎn)載
目錄
前言
相信很多同學(xué)都有一定的多線程開發(fā)經(jīng)驗(yàn),常見的實(shí)現(xiàn)多線程的方式有兩種毅待,一種是直接繼承Thread類尚卫,一種是通過實(shí)現(xiàn)Runnable接口。但這兩種方式都有一個(gè)共同的問題尸红,就是線程運(yùn)行完畢后都會(huì)被虛擬機(jī)銷毀吱涉,當(dāng)需要使用的時(shí)候又會(huì)創(chuàng)建新的線程。而Java中創(chuàng)建和銷毀一個(gè)線程是比較昂貴的操作外里,需要系統(tǒng)調(diào)用怎爵。如果線程數(shù)量多的話,頻繁的創(chuàng)建和銷毀線程會(huì)大大浪費(fèi)時(shí)間和效率盅蝗,同時(shí)會(huì)浪費(fèi)內(nèi)存鳖链。所以本文將通過對(duì)線程池的學(xué)習(xí),從而更加高效的使用多線程
在面向?qū)ο缶幊讨卸漳瑒?chuàng)建和銷毀對(duì)象是很費(fèi)時(shí)間的芙委,提高服務(wù)程序效率的一個(gè)手段就是盡可能減少創(chuàng)建和銷毀對(duì)象的次數(shù),特別是一些很耗資源的對(duì)象創(chuàng)建和銷毀贼穆。如何利用已有對(duì)象來服務(wù)就是一個(gè)需要解決的關(guān)鍵問題题山,其實(shí)這就是一些"池化資源"技術(shù)產(chǎn)生的原因故痊。比如大家所熟悉的數(shù)據(jù)庫(kù)連接池正是遵循這一思想而產(chǎn)生的
對(duì)Java對(duì)象池相關(guān)概念有興趣的也可以看:對(duì)象池技術(shù):如何正確創(chuàng)建對(duì)象
概念
線程池就是首先創(chuàng)建一些線程顶瞳,將它們的集合稱為線程池。使用線程池可以很好地提高性能,線程池在系統(tǒng)啟動(dòng)時(shí)即創(chuàng)建大量空閑的線程慨菱,程序?qū)⒁粋€(gè)任務(wù)傳給線程池焰络,線程池就會(huì)啟動(dòng)一條線程來執(zhí)行這個(gè)任務(wù),執(zhí)行結(jié)束以后符喝,該線程并不會(huì)死亡闪彼,而是再次返回線程池中成為空閑狀態(tài),等待執(zhí)行下一個(gè)任務(wù)
作用
線程池的作用簡(jiǎn)單來說就是通過線程復(fù)用协饲,減少線程的創(chuàng)建于銷毀的性能損耗
工作機(jī)制
- 任務(wù)不是直接提交給某個(gè)線程畏腕,而是提交給整個(gè)線程池,線程池在拿到任務(wù)后茉稠,就在內(nèi)部尋找是否有空閑的線程描馅,如果有,則將任務(wù)交給某個(gè)空閑的線程
- 一個(gè)線程同時(shí)只能執(zhí)行一個(gè)任務(wù)而线,但可以同時(shí)向一個(gè)線程池提交多個(gè)任務(wù)
UML類圖
源碼分析
Executor
Executor
是一個(gè)接口铭污,其只定義了一個(gè)execute()
方法:void execute(Runnable command);
Executor
只能提交Runnable
形式的任務(wù),不支持提交Callable
帶有返回值的任務(wù)
public interface Executor {
void execute(Runnable command);
}
ExecutorService
ExecutorService
也是一個(gè)接口膀篮,在Executor
的基礎(chǔ)上加入了線程池的生命周期管理嘹狞,我們可以通過shutdown()
或者shutdownNow()
方法來關(guān)閉我們的線程池。
ExecutorService
支持提交Callable
形式的任務(wù)誓竿,提交完Callable
任務(wù)后我們拿到一個(gè)Future
磅网,它代表一個(gè)異步任務(wù)執(zhí)行的結(jié)果
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
shutdown()
和shutdownNow()
區(qū)別
- 相同點(diǎn)
這兩個(gè)方法都是非阻塞的,調(diào)用后立即返回筷屡,不會(huì)等待線程池關(guān)閉完成知市。如果我們需要等待線程池處理完成再返回可以使用ExecutorService#awaitTermination
來完成
- 不同點(diǎn)
shutdown()
會(huì)等待線程池中已經(jīng)運(yùn)行的任務(wù)和阻塞隊(duì)列中等待執(zhí)行的任務(wù)執(zhí)行完成
shutdownNow()
會(huì)嘗試中斷線程池中已經(jīng)運(yùn)行的任務(wù),阻塞隊(duì)列中等待的任務(wù)不會(huì)再被執(zhí)行速蕊,阻塞隊(duì)列中等待執(zhí)行的任務(wù)會(huì)作為返回值返回
ThreadPoolExecutor
這個(gè)類是線程池最核心的類
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:核心線程數(shù)量,當(dāng)有新任務(wù)在
execute()
方法提交時(shí)娘赴,會(huì)執(zhí)行以下判斷:- 如果運(yùn)行的線程少于
corePoolSize
规哲,則創(chuàng)建新線程來處理任務(wù),即使線程池中的其他線程是空閑的 - 如果線程池中的線程數(shù)量大于等于
corePoolSize
且小于maximumPoolSize
诽表,則只有當(dāng)workQueue
滿時(shí)才創(chuàng)建新的線程去處理任務(wù) - 如果設(shè)置的
corePoolSize
和maximumPoolSize
相同唉锌,則創(chuàng)建的線程池的大小是固定的,這時(shí)如果有新任務(wù)提交竿奏,若workQueue
未滿袄简,則將請(qǐng)求放入workQueue
中,等待有空閑的線程去從workQueue
中取任務(wù)并處理 - 如果運(yùn)行的線程數(shù)量大于等于
maximumPoolSize
泛啸,這時(shí)如果workQueue
已經(jīng)滿了绿语,則通過handler
所指定的策略來處理任務(wù)
所以,任務(wù)提交時(shí),判斷的順序?yàn)?
corePoolSize
–>workQueue
–>maximumPoolSize
- 如果運(yùn)行的線程少于
maximumPoolSize:最大線程數(shù)量
-
workQueue:等待隊(duì)列吕粹,當(dāng)任務(wù)提交時(shí)种柑,如果線程池中的線程數(shù)量大于等于
corePoolSize
的時(shí)候,把該任務(wù)封裝成一個(gè)Worker
對(duì)象放入等待隊(duì)列匹耕。workQueue主要有三種:SynchronousQueue:
SynchronousQueue
沒有容量聚请,是無緩沖等待隊(duì)列,是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列稳其,會(huì)直接將任務(wù)交給消費(fèi)者驶赏,必須等隊(duì)列中的添加元素被消費(fèi)后才能繼續(xù)添加新的元素。使用SynchronousQueue阻塞隊(duì)列一般要求maximumPoolSizes
為無界既鞠,避免線程拒絕執(zhí)行操作LinkedBlockingQueue:
LinkedBlockingQueue
是一個(gè)無界(沒有大小限制)緩存等待隊(duì)列煤傍。當(dāng)前執(zhí)行的線程數(shù)量達(dá)到corePoolSize
的數(shù)量時(shí),剩余的元素會(huì)在阻塞隊(duì)列里等待损趋,在使用此阻塞隊(duì)列時(shí)maximumPoolSizes
就相當(dāng)于無效了-
ArrayBlockingQueue
ArrayBlockingQueue
是一個(gè)有界緩存等待隊(duì)列患久,可以指定緩存隊(duì)列的大小,當(dāng)線程數(shù)量大于corePoolSize
時(shí)浑槽,多余的任務(wù)會(huì)緩存在ArrayBlockingQueue
隊(duì)列中等待有空閑的線程時(shí)繼續(xù)執(zhí)行蒋失;當(dāng)ArrayBlockingQueue
滿時(shí),則又會(huì)開啟新的線程去執(zhí)行桐玻,直到線程數(shù)量達(dá)到maximumPoolSize
篙挽;當(dāng)線程數(shù)已經(jīng)達(dá)到最大的maximumPoolSize
時(shí),再有新的任務(wù)到達(dá)時(shí)會(huì)執(zhí)行拒絕執(zhí)行策略(RejectedExecutionException
)
keepAliveTime:線程池維護(hù)線程所允許的空閑時(shí)間镊靴。當(dāng)線程池中的線程數(shù)量大于
corePoolSize
的時(shí)候铣卡,如果這時(shí)沒有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀偏竟,而是會(huì)等待煮落,直到等待的時(shí)間超過了keepAliveTime
threadFactory:它是
ThreadFactory
類型的變量,用來創(chuàng)建新線程踊谋。默認(rèn)使用Executors.defaultThreadFactory()
來創(chuàng)建線程蝉仇。使用默認(rèn)的ThreadFactory
來創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程殖蚕,同時(shí)也設(shè)置了線程的名稱-
handler:它是
RejectedExecutionHandler
類型的變量轿衔,表示線程池的飽和策略。如果阻塞隊(duì)列滿了并且沒有空閑的線程睦疫,這時(shí)如果繼續(xù)提交任務(wù)害驹,就需要采取一種策略處理該任務(wù)。線程池提供了4種策略:-
AbortPolicy:直接拋出異常(
RejectedExecutionException
)蛤育,這是默認(rèn)策略 - CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù)宛官,并執(zhí)行當(dāng)前任務(wù)
- DiscardPolicy:直接丟棄任務(wù)
-
AbortPolicy:直接拋出異常(
如果線程池中的線程數(shù)量大于 corePoolSize
時(shí)葫松,如果某線程空閑時(shí)間超過keepAliveTime
,線程將被終止摘刑,直至線程池中的線程數(shù)目不大于corePoolSize
进宝;
如果允許為核心池中的線程設(shè)置存活時(shí)間,那么核心池中的線程空閑時(shí)間超過 keepAliveTime
枷恕,線程也會(huì)被終止
ThreadPoolExecutor#execute()方法執(zhí)行流程如下:
ScheduledThreadPoolExecutor
ThreadPoolExecutor
子類党晋,它在ThreadPoolExecutor
基礎(chǔ)上加入了任務(wù)定時(shí)執(zhí)行的功能
Executors
java提供Executors
類直接創(chuàng)建四種線程池
newCachedThreadPool
創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要徐块,可靈活回收空閑線程未玻,若無可回收,則新建線程胡控。線程池為無限大扳剿,當(dāng)執(zhí)行第二個(gè)任務(wù)時(shí)第一個(gè)任務(wù)已經(jīng)完成,會(huì)復(fù)用執(zhí)行第一個(gè)任務(wù)的線程昼激,而不用每次新建線程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool
創(chuàng)建一個(gè)定長(zhǎng)線程池庇绽,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待橙困。定長(zhǎng)線程池的大小最好根據(jù)系統(tǒng)資源進(jìn)行設(shè)置瞧掺。如Runtime.getRuntime().availableProcessors()
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool
創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newSingleThreadExecutor
創(chuàng)建一個(gè)單線程化的線程池凡傅,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù)辟狈,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
線程池的監(jiān)控
通過線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時(shí)候可以使用
- getTaskCount:線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù)
- getCompletedTaskCount:線程池已完成的任務(wù)數(shù)量夏跷,該值小于等于taskCount
-
getLargestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量哼转。通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過,也就是達(dá)到了
maximumPoolSize
- getPoolSize:線程池當(dāng)前的線程數(shù)量
- getActiveCount:當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量
通過這些方法槽华,可以對(duì)線程池進(jìn)行監(jiān)控壹蔓,在ThreadPoolExecutor
類中提供了幾個(gè)空方法,如beforeExecute()
方法猫态,afterExecute()
方法和terminated()
方法庶溶,可以擴(kuò)展這些方法在執(zhí)行前或執(zhí)行后增加一些新的操作,例如統(tǒng)計(jì)線程池的執(zhí)行任務(wù)的時(shí)間等懂鸵,可以繼承自ThreadPoolExecutor
來進(jìn)行擴(kuò)展
總結(jié)
ThreadPoolExecutor
的使用還是很有技巧的。使用無界workQueue
可能會(huì)耗盡系統(tǒng)資源行疏,使用有界workQueue
可能不能很好的滿足性能匆光,需要調(diào)節(jié)線程數(shù)和workQueue
大小,所以需要根據(jù)不同場(chǎng)景進(jìn)行調(diào)節(jié)