熟悉Java多線程編程的同學(xué)都知道嘀粱,當(dāng)我們線程創(chuàng)建過(guò)多時(shí)宋彼,容易引發(fā)內(nèi)存溢出录别,因此我們就有必要使用線程池的技術(shù)了。
最近看了一些相關(guān)文章古话,并親自研究了一下源碼空民,發(fā)現(xiàn)有些文章還是有些問(wèn)題的设捐,所以我也總結(jié)了一下梗夸,在此奉獻(xiàn)給大家异希。
1 線程池的優(yōu)勢(shì)
總體來(lái)說(shuō),線程池有如下的優(yōu)勢(shì):
(1)降低資源消耗绒瘦。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
(2)提高響應(yīng)速度扣癣。當(dāng)任務(wù)到達(dá)時(shí)惰帽,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
(3)提高線程的可管理性父虑。線程是稀缺資源该酗,如果無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源士嚎,還會(huì)降低系統(tǒng)的穩(wěn)定性呜魄,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控莱衩。
2 線程池的使用
線程池的真正實(shí)現(xiàn)類是ThreadPoolExecutor
爵嗅,其構(gòu)造方法有如下4種:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看到,其需要如下幾個(gè)參數(shù):
-
corePoolSize(必需):核心線程數(shù)笨蚁。默認(rèn)情況下睹晒,核心線程會(huì)一直存活,但是當(dāng)將
allowCoreThreadTimeout
設(shè)置為true時(shí)括细,核心線程也會(huì)超時(shí)回收伪很。 - maximumPoolSize(必需):線程池所能容納的最大線程數(shù)。當(dāng)活躍線程數(shù)達(dá)到該數(shù)值后奋单,后續(xù)的新任務(wù)將會(huì)阻塞锉试。
-
keepAliveTime(必需):線程閑置超時(shí)時(shí)長(zhǎng)。如果超過(guò)該時(shí)長(zhǎng)览濒,非核心線程就會(huì)被回收呆盖。如果將
allowCoreThreadTimeout
設(shè)置為true時(shí),核心線程也會(huì)超時(shí)回收匾七。 -
unit(必需):指定keepAliveTime參數(shù)的時(shí)間單位絮短。常用的有:
TimeUnit.MILLISECONDS
(毫秒)、TimeUnit.SECONDS
(秒)昨忆、TimeUnit.MINUTES
(分)丁频。 - workQueue(必需):任務(wù)隊(duì)列。通過(guò)線程池的execute()方法提交的Runnable對(duì)象將存儲(chǔ)在該參數(shù)中。其采用阻塞隊(duì)列實(shí)現(xiàn)席里。
- threadFactory(可選):線程工廠叔磷。用于指定為線程池創(chuàng)建新線程的方式。
- handler(可選):拒絕策略奖磁。當(dāng)達(dá)到最大線程數(shù)時(shí)需要執(zhí)行的飽和策略改基。
線程池的使用流程如下:
// 創(chuàng)建線程池
Executor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE,
TimeUnit.SECONDS,
sPoolWorkQueue,
sThreadFactory);
// 向線程池提交任務(wù)
threadPool.execute(new Runnable() {
@Override
public void run() {
... // 線程執(zhí)行的任務(wù)
}
});
// 關(guān)閉線程池
threadPool.shutdown(); // 設(shè)置線程池的狀態(tài)為SHUTDOWN,然后中斷所有沒有正在執(zhí)行任務(wù)的線程
threadPool.shutdownNow(); // 設(shè)置線程池的狀態(tài)為 STOP咖为,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程秕狰,并返回等待執(zhí)行任務(wù)的列表
3 線程池的工作原理
下面來(lái)描述一下線程池工作的原理,同時(shí)對(duì)上面的參數(shù)有一個(gè)更深的了解躁染。其工作原理流程圖如下:
通過(guò)上圖鸣哀,相信大家已經(jīng)對(duì)所有參數(shù)有個(gè)了解了。其實(shí)還有一點(diǎn)吞彤,在線程池中并沒有區(qū)分線程是否是核心線程的我衬。下面我們?cè)賹?duì)任務(wù)隊(duì)列、線程工廠和拒絕策略做更多的說(shuō)明饰恕。
4 線程池的參數(shù)
4.1 任務(wù)隊(duì)列(workQueue)
任務(wù)隊(duì)列是基于阻塞隊(duì)列實(shí)現(xiàn)的挠羔,即采用生產(chǎn)者消費(fèi)者模式,在Java中需要實(shí)現(xiàn)BlockingQueue
接口埋嵌。但Java已經(jīng)為我們提供了7種阻塞隊(duì)列的實(shí)現(xiàn):
- ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列(數(shù)組結(jié)構(gòu)可配合指針實(shí)現(xiàn)一個(gè)環(huán)形隊(duì)列)破加。
-
LinkedBlockingQueue: 一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,在未指明容量時(shí)雹嗦,容量默認(rèn)為
Integer.MAX_VALUE
拌喉。 -
PriorityBlockingQueue: 一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列,對(duì)元素沒有要求俐银,可以實(shí)現(xiàn)
Comparable
接口也可以提供Comparator來(lái)對(duì)隊(duì)列中的元素進(jìn)行比較尿背。跟時(shí)間沒有任何關(guān)系,僅僅是按照優(yōu)先級(jí)取任務(wù)捶惜。 -
DelayQueue:類似于
PriorityBlockingQueue
田藐,是二叉堆實(shí)現(xiàn)的無(wú)界優(yōu)先級(jí)阻塞隊(duì)列。要求元素都實(shí)現(xiàn)Delayed
接口吱七,通過(guò)執(zhí)行時(shí)延從隊(duì)列中提取任務(wù)汽久,時(shí)間沒到任務(wù)取不出來(lái)。 - SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列踊餐,消費(fèi)者線程調(diào)用take()方法的時(shí)候就會(huì)發(fā)生阻塞景醇,直到有一個(gè)生產(chǎn)者線程生產(chǎn)了一個(gè)元素,消費(fèi)者線程就可以拿到這個(gè)元素并返回吝岭;生產(chǎn)者線程調(diào)用put()方法的時(shí)候也會(huì)發(fā)生阻塞三痰,直到有一個(gè)消費(fèi)者線程消費(fèi)了一個(gè)元素吧寺,生產(chǎn)者才會(huì)返回。
- LinkedBlockingDeque: 使用雙向隊(duì)列實(shí)現(xiàn)的有界雙端阻塞隊(duì)列散劫。雙端意味著可以像普通隊(duì)列一樣FIFO(先進(jìn)先出)稚机,也可以像棧一樣FILO(先進(jìn)后出)。
-
LinkedTransferQueue: 它是
ConcurrentLinkedQueue
获搏、LinkedBlockingQueue
和SynchronousQueue
的結(jié)合體赖条,但是把它用在ThreadPoolExecutor中,和LinkedBlockingQueue
行為一致常熙,但是是無(wú)界的阻塞隊(duì)列纬乍。
注意有界隊(duì)列和無(wú)界隊(duì)列的區(qū)別:如果使用有界隊(duì)列,當(dāng)隊(duì)列飽和時(shí)并超過(guò)最大線程數(shù)時(shí)就會(huì)執(zhí)行拒絕策略裸卫;而如果使用無(wú)界隊(duì)列蕾额,因?yàn)槿蝿?wù)隊(duì)列永遠(yuǎn)都可以添加任務(wù),所以設(shè)置maximumPoolSize
沒有任何意義彼城。
4.2 線程工廠(threadFactory)
線程工廠指定創(chuàng)建線程的方式,需要實(shí)現(xiàn)ThreadFactory
接口退个,并實(shí)現(xiàn)newThread(Runnable r)
方法募壕。該參數(shù)可以不用指定,Executors框架已經(jīng)為我們實(shí)現(xiàn)了一個(gè)默認(rèn)的線程工廠:
/**
* The default thread factory.
*/
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
4.3 拒絕策略(handler)
當(dāng)線程池的線程數(shù)達(dá)到最大線程數(shù)時(shí)语盈,需要執(zhí)行拒絕策略舱馅。拒絕策略需要實(shí)現(xiàn)RejectedExecutionHandler
接口,并實(shí)現(xiàn)rejectedExecution(Runnable r, ThreadPoolExecutor executor)
方法刀荒。不過(guò)Executors框架已經(jīng)為我們實(shí)現(xiàn)了4種拒絕策略:
-
AbortPolicy(默認(rèn)):丟棄任務(wù)并拋出
RejectedExecutionException
異常代嗤。 - CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。
- DiscardPolicy:丟棄任務(wù)缠借,但是不拋出異常干毅。可以配合這種模式進(jìn)行自定義的處理方式泼返。
- DiscardOldestPolicy:丟棄隊(duì)列最早的未處理任務(wù)硝逢,然后重新嘗試執(zhí)行任務(wù)。
5 功能線程池
嫌上面使用線程池的方法太麻煩绅喉?其實(shí)Executors已經(jīng)為我們封裝好了4種常見的功能線程池渠鸽,如下:
- 定長(zhǎng)線程池(FixedThreadPool)
- 定時(shí)線程池(ScheduledThreadPool )
- 可緩存線程池(CachedThreadPool)
- 單線程化線程池(SingleThreadExecutor)
5.1 定長(zhǎng)線程池(FixedThreadPool)
創(chuàng)建方法的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 特點(diǎn):只有核心線程,線程數(shù)量固定柴罐,執(zhí)行完立即回收徽缚,任務(wù)隊(duì)列為鏈表結(jié)構(gòu)的有界隊(duì)列。
- 應(yīng)用場(chǎng)景:控制線程最大并發(fā)數(shù)革屠。
使用示例:
// 1. 創(chuàng)建定長(zhǎng)線程池對(duì)象 & 設(shè)置線程池線程數(shù)量固定為3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. 創(chuàng)建好Runnable類線程對(duì)象 & 需執(zhí)行的任務(wù)
Runnable task =new Runnable(){
public void run() {
System.out.println("執(zhí)行任務(wù)啦");
}
};
// 3. 向線程池提交任務(wù)
fixedThreadPool.execute(task);
5.2 定時(shí)線程池(ScheduledThreadPool )
創(chuàng)建方法的源碼:
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
- 特點(diǎn):核心線程數(shù)量固定凿试,非核心線程數(shù)量無(wú)限排宰,執(zhí)行完閑置10ms后回收,任務(wù)隊(duì)列為延時(shí)阻塞隊(duì)列红省。
- 應(yīng)用場(chǎng)景:執(zhí)行定時(shí)或周期性的任務(wù)额各。
使用示例:
// 1. 創(chuàng)建 定時(shí)線程池對(duì)象 & 設(shè)置線程池線程數(shù)量固定為5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 創(chuàng)建好Runnable類線程對(duì)象 & 需執(zhí)行的任務(wù)
Runnable task =new Runnable(){
public void run() {
System.out.println("執(zhí)行任務(wù)啦");
}
};
// 3. 向線程池提交任務(wù)
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲1s后執(zhí)行任務(wù)
scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延遲10ms后、每隔1000ms執(zhí)行任務(wù)
5.3 可緩存線程池(CachedThreadPool)
創(chuàng)建方法的源碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- 特點(diǎn):無(wú)核心線程吧恃,非核心線程數(shù)量無(wú)限虾啦,執(zhí)行完閑置60s后回收,任務(wù)隊(duì)列為不存儲(chǔ)元素的阻塞隊(duì)列痕寓。
- 應(yīng)用場(chǎng)景:執(zhí)行大量傲醉、耗時(shí)少的任務(wù)。
使用示例:
// 1. 創(chuàng)建可緩存線程池對(duì)象
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 創(chuàng)建好Runnable類線程對(duì)象 & 需執(zhí)行的任務(wù)
Runnable task =new Runnable(){
public void run() {
System.out.println("執(zhí)行任務(wù)啦");
}
};
// 3. 向線程池提交任務(wù)
cachedThreadPool.execute(task);
5.4 單線程化線程池(SingleThreadExecutor)
創(chuàng)建方法的源碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- 特點(diǎn):只有1個(gè)核心線程呻率,無(wú)非核心線程硬毕,執(zhí)行完立即回收,任務(wù)隊(duì)列為鏈表結(jié)構(gòu)的有界隊(duì)列礼仗。
- 應(yīng)用場(chǎng)景:不適合并發(fā)但可能引起IO阻塞性及影響UI線程響應(yīng)的操作吐咳,如數(shù)據(jù)庫(kù)操作、文件操作等元践。
使用示例:
// 1. 創(chuàng)建單線程化線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 創(chuàng)建好Runnable類線程對(duì)象 & 需執(zhí)行的任務(wù)
Runnable task =new Runnable(){
public void run() {
System.out.println("執(zhí)行任務(wù)啦");
}
};
// 3. 向線程池提交任務(wù)
singleThreadExecutor.execute(task);
5.5 對(duì)比
6 總結(jié)
Executors的4個(gè)功能線程池雖然方便韭脊,但現(xiàn)在已經(jīng)不建議使用了,而是建議直接通過(guò)使用ThreadPoolExecutor的方式单旁,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則沪羔,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
其實(shí)Executors的4個(gè)功能線程有如下弊端:
-
FixedThreadPool
和SingleThreadExecutor
:主要問(wèn)題是堆積的請(qǐng)求處理隊(duì)列均采用LinkedBlockingQueue
象浑,可能會(huì)耗費(fèi)非常大的內(nèi)存蔫饰,甚至OOM。 -
CachedThreadPool
和ScheduledThreadPool
:主要問(wèn)題是線程數(shù)最大數(shù)是Integer.MAX_VALUE
愉豺,可能會(huì)創(chuàng)建數(shù)量非常多的線程篓吁,甚至OOM。