- 帶著問題解讀源碼,先看問題:
- JDK中有多種線程池,我們應(yīng)該使用哪一種線程池掉分?選擇的依據(jù)是什么俭缓?
- 線程池中的線程會一直存活著嗎?
- 線程池飽和之后酥郭,無法接收新任務(wù)华坦,線程池如何處理這部分任務(wù)(線程池拒絕策略)?
- 線程池是如何實現(xiàn)將任務(wù)提交與任務(wù)執(zhí)行分離的不从?
- 線程池中使用了“生產(chǎn)者-消費者”模式惜姐,那么它是如何實現(xiàn)的?
- 線程池中的各個參數(shù)分別代表什么消返?并且線程池中的線程數(shù)量如何確定载弄?
* 演示線程池基本操作
* @author Fooisart
* Created on 2018-12-11
public class ExecutorServiceDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable task = new Runnable() {
public void run() {
System.out.println("The real task is executing!");
- ExecutorService
- Executors
- Executors 看一下源碼注釋突颊,
Factory and utility methods for Executor
ExecutorService 不貼它源碼了堤尾。從他的名字可以看出,它是一個服務(wù)(Service)迁客,為線程池服務(wù)郭宝。它是一個接口,提供了一些操作線程池的接口方法掷漱。
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
return ftask;
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- threadFactory : 用來創(chuàng)建線程蒿叠,一般是用默認即可明垢。除非你想把你線程池中的線程編號,或者有規(guī)律地命名市咽。
- handler : 拒絕策略痊银。所謂的拒絕策略,即當我們自己所定義的線程池無法再添加任務(wù)時(什么時候無法添加一會說)施绎,那么會使用拒絕策略溯革。看一下源碼中對拒絕策略的說明:
1,In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime
RejectedExecutionException upon rejection.
2,In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute
itself runs the task. This provides a simple feedback control mechanism that will
slow down the rate that new tasks are submitted.
3,In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
4,In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down,
the task at the head of the work queue is dropped, and then execution is retried
(which can fail again, causing this to be repeated.)
- corePoolSize : 核心線程數(shù)园匹,即一直存活在線程池中。當有新任務(wù)提價時劫灶,開始創(chuàng)建裸违,直至創(chuàng)建的線程數(shù)達到corePoolSize值。
- workQueue : 工作隊列本昏,如果說線程數(shù)已經(jīng)達到了corePoolSize,此時供汛,又有新任務(wù)進入,咋辦涌穆?那么這些任務(wù)會進入到等待隊列中怔昨。
- maximumPoolSize : 最大線程數(shù)。前面提到了工作隊列宿稀,既然是隊列趁舀,就會有滿的那一天。滿了之后祝沸,咋辦矮烹?則會接著創(chuàng)建臨時線程(有點像快遞公司在大促期間找的臨時工),臨時處理一下這些任務(wù)罩锐。
- unit : 時間單位奉狈。
- keepAliveTime : 存活時間。既然是存活時間涩惑,那代表什么存活時間呢仁期?代表的是,上面提到的臨時線程存活時間竭恬,配合時間單位(unit)參數(shù)跛蛋,確定臨時線程干完活之后,還能活多久(快遞公司大促期間雇的臨時工痊硕,雇傭時間)问芬。
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 介紹:該方法是任務(wù)執(zhí)行寿桨,從注釋中看出任務(wù)會在某個時間點執(zhí)行此衅。所有這個方法,只是保證了任務(wù)
* 提交亭螟。提交了之后挡鞍,具體如何執(zhí)行,還得參照線程池中線程的實際情況预烙。是立即執(zhí)行還是排隊,
* 是用已有線程執(zhí)行扁掸,還是創(chuàng)建新線程處理它翘县,都是未知的最域。
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
* Proceed in 3 steps:
* 以下為線程池的核心三步,仔細看上文中7個參數(shù)的介紹锈麸,下面三步也就容易理解了
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 第一步镀脂,如果線程池中線程數(shù)量小于核心線程數(shù),則創(chuàng)建線程忘伞,然后調(diào)用執(zhí)行薄翅。
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 第二步,達到核心線程數(shù)之后氓奈,加入隊列翘魄。加入隊列成功后,等待被取出執(zhí)行舀奶。
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 第三步暑竟,如果隊列滿了(加入隊列失敗)育勺,繼續(xù)創(chuàng)建線程光羞,直至達到max
* 線程數(shù)。如果達到max線程數(shù)怀大,開始使用某一種拒絕策略,拒絕任務(wù)
* ctl這個參數(shù)使用了二進制方式存了線程池的兩個信息:
* ①當下線程池中線程數(shù)量呀闻;②線程池狀態(tài)化借,比如線程池關(guān)閉了等。
* workerCountOf捡多,獲取線程池線程數(shù)量;
* isRunning() 判斷線程池是否處于運行態(tài)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
c = ctl.get();
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
else if (!addWorker(command, false))
private boolean addWorker(Runnable firstTask, boolean core) {
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
* 其實這里的firstTask是我們最初傳進來要執(zhí)行的任務(wù),這里把它封裝成了一個Worker胞皱。
* 看Worker源碼可以知道邪意,其實它是Runnable接口的一個實現(xiàn)類九妈。在這里,Worker充當?shù)? * 是一個線程的角色雾鬼,它參數(shù)中有thread萌朱。那么線程的啟動肯定要調(diào)用start()方法。
* 在下方也確實找到了start()方法呆贿,啟動之后嚷兔,根據(jù)線程的基本知識,肯定是進入了線程
* 的run()方法做入。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
} finally {
if (workerAdded) {
workerStarted = true;
} finally {
if (! workerStarted)
return workerStarted;
public void run() {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
* firstTask是我們自己提交的任務(wù)
* 可以看到竟块,下午中task直接調(diào)用了run()方法壶运。根據(jù)線程的基礎(chǔ)知識,只用調(diào)用線程的start()方法浪秘,
* 才會啟動一個線程蒋情。而調(diào)用線程的run()方法,則會當做普通的方法執(zhí)行耸携,不會異步執(zhí)行棵癣。
* 其實在這里就解釋了任務(wù)提交與執(zhí)行分離。線程池幫我們創(chuàng)建了線程夺衍,然后使用它創(chuàng)建的線程串行地
* 調(diào)用了我們的run()方法狈谊,從而執(zhí)行了我們的任務(wù)。
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
} finally {
task = null;
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
