合理使用線程池能夠帶來3個好處。
第一矿筝,降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗棚贾。
第二窖维,提高響應速度。當任務到達時妙痹,任務可以不需要等到線程創(chuàng)建就立即執(zhí)行铸史。
第三,提高線程的可管理性怯伊。線程是稀缺資源琳轿,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源耿芹,還會降低系統(tǒng)的穩(wěn)定性崭篡,使用線程池可以統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控吧秕。
以下是本文的目錄大綱:
- ThreadPoolExecutor類
- 線程池實現(xiàn)原理和源碼分析
- 使用示例
- 合理配置線程池大小
一琉闪、ThreadPoolExecutor類
java.util.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,該類提供4個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
......
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
從上面代碼可知砸彬,ThreadPoolExecutor繼承了AbstractExecutorService颠毙,而且,前三個構造器均調(diào)用第四個構造器進行初始化工作砂碉。
下面解釋下構造器中各個參數(shù)的含義:
** corePoolSize **:核心池的大小吟秩,該參數(shù)與之后的線程池實現(xiàn)原理有很大的關系。在創(chuàng)建了線程池后绽淘,默認情況下涵防,線程池中并沒有任何線程,而是等待有任務到來才創(chuàng)建線程去執(zhí)行任務沪铭,除非調(diào)用prestartAllCoreThreads()和prestartCoreThread()方法壮池,從方法名字可以看出,是預創(chuàng)建線程的意思杀怠,即在沒有任務到來之前椰憋,就創(chuàng)建corePoolSize個線程或1個線程。默認情況下赔退,在創(chuàng)建了線程池后橙依,線程池中的線程數(shù)為0证舟,當有任務來之后,就會創(chuàng)建一個線程去執(zhí)行任務窗骑,當線程池中的線程數(shù)目達到corePoolSize后女责,就會把到達的任務放到緩存隊列當中;
** maximumPoolSize **:線程池中的最大線程數(shù)创译,表示線程池中最多能創(chuàng)建多少個線程抵知。
** keepAliveTime **:表示線程沒有任務執(zhí)行時最多存活多久。默認情況下软族,只有當線程池中的線程數(shù)大于corePoolSize時刷喜,keepAliveTime才會起作用,知道線程池中的線程不大于corePoolSize立砸,即當線程池中的線程數(shù)大于corePoolSize時掖疮,如果一個線程空閑的時間達到keepAliveTime,則會終止颗祝,直到線程池中的線程數(shù)不超過corePoolSize浊闪。但是如果調(diào)用了allowCoreThreadTimeOut(boolean value)方法,在線程池中的線程數(shù)不大于corePoolSize時吐葵,keepAliveTime參數(shù)也會起作用规揪,直到線程池中的線程數(shù)為0;
** unit **:參數(shù)keepAliveTime的時間單位温峭,有7種取值猛铅,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; // 天
TimeUnit.HOURS; // 時
TimeUnit.MINUTES; // 分
TimeUnit.SECONDS; // 秒
TimeUnit.MILLISECONDS; // 毫秒
TimeUnit.MICROSECONDS; // 微妙
TimeUnit.NANOSECONDS; // 納秒
- ** workQueue **: 一個阻塞隊列,用來存儲等待執(zhí)行的任務凤藏。該參數(shù)也很重要奸忽,會對線程池的運行過程產(chǎn)生巨大影響,一般而言揖庄,有一下幾種選擇:
ArrayBlockingQueue:是一個基于數(shù)組結構的有界阻塞隊列栗菜,此隊列按 FIFO(先進先出)原則對元素進行排序;
LinkedBlockingQueue:一個基于鏈表結構的阻塞隊列蹄梢,此隊列按FIFO (先進先出) 排序元素疙筹,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列禁炒;
SynchronousQueue:一個不存儲元素的阻塞隊列而咆。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)幕袱,吞吐量通常要高于LinkedBlockingQueue暴备,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列;
PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列们豌;
** threadFactory **:線程工廠涯捻,主要用于創(chuàng)建線程浅妆;
** handler **:飽和策略,即當隊列和線程池都滿了障癌,說明線程池處于飽和狀態(tài)凌外,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy混弥,表示無法處理新任務時拋出異常趴乡。以下是JDK1.5提供的四種策略:
ThreadPoolExecutor.AbortPolicy:
丟棄任務并拋出RejectedExecutionException異常对省;
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務蝗拿,但是不拋出異常;
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)蒿涎;
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務;
從上面給出的ThreadPoolExecutor類的代碼可以知道哀托,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現(xiàn):
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)劳秋;
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
AbstractExecutorService是一個抽象類仓手,它實現(xiàn)了ExecutorService接口。
我們接著看ExecutorService接口的實現(xiàn):
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又繼承了Executor接口玻淑,實現(xiàn)如下:
public interface Executor {
void execute(Runnable command);
}
到這里嗽冒,大家應該明白了ThreadPoolExecutor、AbstractExecutorService补履、ExecutorService和Executor幾個之間的關系了添坊。
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable)箫锤,返回值為void贬蛙,參數(shù)為Runnable類型,從字面意思可以理解谚攒,就是用來執(zhí)行傳進去的任務的阳准;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit馏臭、invokeAll野蝇、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現(xiàn)了ExecutorService接口括儒,基本實現(xiàn)了ExecutorService中聲明的所有方法绕沈;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法實際上是Executor中聲明的方法塑崖,在ThreadPoolExecutor進行了具體的實現(xiàn)七冲,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務规婆,交由線程池去執(zhí)行澜躺。
submit()方法是在ExecutorService中聲明的方法蝉稳,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn),在ThreadPoolExecutor中并沒有對其進行重寫掘鄙,這個方法也是用來向線程池提交任務的耘戚,但是它和execute()方法不同,它能夠返回任務執(zhí)行的結果操漠,去看submit()方法的實現(xiàn)收津,會發(fā)現(xiàn)它實際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務執(zhí)行結果浊伙。
shutdown()和shutdownNow()是用來關閉線程池的撞秋。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 嚣鄙、getActiveCount()吻贿、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API哑子。
二舅列、線程池實現(xiàn)原理
下面將深入解析一下線程池的實現(xiàn)原理,分為以下幾個方面:
線程池狀態(tài)
任務的執(zhí)行
線程池中的線程初始化
任務緩存隊列及排隊策略
任務拒絕策略
線程池的關閉
線程池容量的動態(tài)調(diào)整
1. 線程池狀態(tài)
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
當創(chuàng)建線程池后卧蜓,初始時帐要,線程池處于RUNNING狀態(tài);
如果調(diào)用了shutdown()方法弥奸,則線程池處于SHUTDOWN狀態(tài)榨惠,此時線程池不能夠接受新的任務,它會等待所有任務執(zhí)行完畢其爵;
如果調(diào)用了shutdownNow()方法冒冬,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務摩渺,并且會去嘗試終止正在執(zhí)行的任務简烤;
當線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀摇幻,任務緩存隊列已經(jīng)清空或執(zhí)行結束后横侦,線程池被設置為TERMINATED狀態(tài)。
2. 任務的執(zhí)行
在了解將任務提交給線程池到任務執(zhí)行完畢整個過程之前绰姻,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; // 任務緩存隊列枉侧,用來存放等待執(zhí)行的任務
private final ReentrantLock mainLock = new ReentrantLock(); // 線程池的主要狀態(tài)鎖,對線程池的狀態(tài)(比如線程池的大小狂芋,狀態(tài)等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); // 用來存放工作集
private volatile long keepAliveTime; //線程存活時間
private volatile boolean allowCoreThreadTimeOut; // 是否允許為核心線程設置存活時間
private volatile int corePoolSize; // 核心池的大姓ツ佟(即線程池中的線程數(shù)目大于這個參數(shù)時,提交的任務會被放進任務緩存隊列)
private volatile int maximumPoolSize; // 線程池最大能容忍的線程數(shù)
private volatile int poolSize; // 線程池中當前的線程數(shù)
private volatile RejectedExecutionHandler handler; // 任務拒絕策略
private volatile ThreadFactory threadFactory; // 線程工廠帜矾,用來創(chuàng)建線程
private int largestPoolSize; // 用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)
private long completedTaskCount; //用來記錄已經(jīng)執(zhí)行完畢的任務個數(shù)
每個變量的作用都已經(jīng)標明出來了翼虫,這里要重點解釋一下corePoolSize屑柔、maximumPoolSize、largestPoolSize三個變量珍剑。
corePoolSize在很多地方被翻譯成核心池大小掸宛,其實我的理解這個就是線程池的大小。舉個簡單的例子:
假如有一個工廠招拙,工廠里面有10個工人唧瘾,每個工人同時只能做一件任務。
因此只要當10個工人中有工人是空閑的别凤,來了任務就分配給空閑的工人做饰序;
當10個工人都有任務在做時,如果還來了任務闻妓,就把任務進行排隊等待菌羽;
如果說新任務數(shù)目增長的速度遠遠大于工人做任務的速度掠械,那么此時工廠主管可能會想補救措施由缆,比如重新招4個臨時工人進來;
然后就將任務也分配給這4個臨時工人做猾蒂;
如果說著14個工人做任務的速度還是不夠均唉,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閑時肚菠,而新任務增長的速度又比較緩慢舔箭,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人蚊逢,畢竟請額外的工人是要花錢的层扶。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)烙荷。
也就是說corePoolSize就是線程池大小镜会,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施终抽。
不過為了方便理解戳表,在本文后面還是將corePoolSize翻譯成核心池大小。
largestPoolSize只是一個用來起記錄作用的變量昼伴,用來記錄線程池中曾經(jīng)有過的最大線程數(shù)目匾旭,跟線程池的容量沒有任何關系。不過圃郊,在分析問題時价涝,可以知道線程池是否滿過。
下面我們進入正題持舆,看一下任務從提交到最終執(zhí)行完畢經(jīng)歷了哪些過程色瘩。
在ThreadPoolExecutor類中鞭光,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務泞遗,但是實際上submit方法里面最終調(diào)用的還是execute()方法惰许,所以我們只需要研究execute()方法的實現(xiàn)原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
(未完待續(xù))