結(jié)合JDK(1.8)源碼分析線程池(ThreadPoolExecutor)實現(xiàn)原理
我們平時所提的線程池乖菱,大多指的是ThreadPoolExecutor静陈,而非ThreadPool,關(guān)于ThreadPool這里不做贅敘螟碎。集中探討一下ThreadPoolExecutor眉菱。
- 帶著問題解讀源碼,先看問題:
- JDK中有多種線程池,我們應(yīng)該使用哪一種線程池掉分?選擇的依據(jù)是什么俭缓?
- 線程池中的線程會一直存活著嗎?
- 線程池飽和之后酥郭,無法接收新任務(wù)华坦,線程池如何處理這部分任務(wù)(線程池拒絕策略)?
- 線程池是如何實現(xiàn)將任務(wù)提交與任務(wù)執(zhí)行分離的不从?
- 線程池中使用了“生產(chǎn)者-消費者”模式惜姐,那么它是如何實現(xiàn)的?
- 線程池中的各個參數(shù)分別代表什么消返?并且線程池中的線程數(shù)量如何確定载弄?
如果上述問題,很清楚的話撵颊,也請同學留步宇攻,下面是從源碼角度來分析上述問題。
為了便于后續(xù)描述倡勇,先貼出使用線程池的基本代碼:
/**
* 演示線程池基本操作
* @author Fooisart
* Created on 2018-12-11
*/
public class ExecutorServiceDemo {
public static void main(String[] args) {
//1逞刷,線程池創(chuàng)建
ExecutorService executor = Executors.newFixedThreadPool(2);
//2嘉涌,新建任務(wù)
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("The real task is executing!");
}
};
//3,提交任務(wù)到線程池
executor.submit(task);
//4夸浅,線程池關(guān)閉
executor.shutdown();
}
}
上述代碼仑最,演示了線程池的一個簡單使用方式。文首提到帆喇,線程池一般指的是ThreadPoolExecutor,而上述示例代碼中警医,并未出現(xiàn)ThreadPoolExecutor。那是不是表明坯钦,在Java中是不是還有其他類表示線程池呢预皇?其實不然,示例代碼中婉刀,有兩個類:
- ExecutorService
- Executors
分別解釋一下吟温,這哥倆與ThreadPoolExecutor的關(guān)系。
- Executors 看一下源碼注釋突颊,
Factory and utility methods for Executor
第一句話表明其真正意義鲁豪,表明自己是一個工具類,進一步說是一個工廠類律秃。這個工廠的產(chǎn)品是—ThreadPoolExecutor爬橡。組裝不同的參數(shù),生產(chǎn)出不同功能的線程池友绝。Executors源碼解讀.
-
ExecutorService 不貼它源碼了堤尾。從他的名字可以看出,它是一個服務(wù)(Service)迁客,為線程池服務(wù)郭宝。它是一個接口,提供了一些操作線程池的接口方法掷漱。
結(jié)合示例代碼粘室,分析ThreadPoolExecutor源碼。
第1步創(chuàng)建線程池卜范,已解釋衔统。第2部創(chuàng)建任務(wù),線程的基礎(chǔ)知識海雪〗蹙簦看第3步,提交任務(wù)到線程池奥裸,跟進(AbstractExecutorService):
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
一共四行代碼险掀,第二行是任務(wù)封裝(封裝成FutureTask);第三行湾宙,execute樟氢,把上一步封裝好的任務(wù)冈绊,在此執(zhí)行。(BTW埠啃,之前讀過1.7源碼死宣,這個方法是在ThreadPoolExecutor中的,1.8抽象出了一個AbstractExecutorService)碴开。繼續(xù)跟進毅该,進入ThreadPoolExecutor.execute()方法。此時潦牛,正式進入ThreadPoolExecutor,它是線程池的核心類鹃骂,先簡要介紹一下ThreadPoolExecutor。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor中有多個構(gòu)造函數(shù)罢绽,其中有一個最基礎(chǔ)的構(gòu)造函數(shù),貼出的是它的說明静盅。其中有7個參數(shù)良价,分別介紹,先從簡單地講起:
- threadFactory : 用來創(chuàng)建線程蒿叠,一般是用默認即可明垢。除非你想把你線程池中的線程編號,或者有規(guī)律地命名市咽。
- handler : 拒絕策略痊银。所謂的拒絕策略,即當我們自己所定義的線程池無法再添加任務(wù)時(什么時候無法添加一會說)施绎,那么會使用拒絕策略溯革。看一下源碼中對拒絕策略的說明:
1,In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime
RejectedExecutionException upon rejection.
簡介:此為默認拒絕策略谷醉,會拋異常
2,In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute
itself runs the task. This provides a simple feedback control mechanism that will
slow down the rate that new tasks are submitted.
簡介:把并行改串行致稀,顯然會降低線程池的執(zhí)行效率。與策略1相比俱尼,優(yōu)點是不
會拋異常抖单,已不會丟任務(wù),缺點也很明細遇八,太慢啦矛绘!
3,In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
簡介:這個很痛快,直接拋棄任務(wù)刃永,也沒有異常货矮。效率是兼顧了,正確性與完整
性丟失了揽碘,或者起碼得讓我知道次屠,任務(wù)被拋棄了
4,In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down,
the task at the head of the work queue is dropped, and then execution is retried
(which can fail again, causing this to be repeated.)
簡介:無
- corePoolSize : 核心線程數(shù)园匹,即一直存活在線程池中。當有新任務(wù)提價時劫灶,開始創(chuàng)建裸违,直至創(chuàng)建的線程數(shù)達到corePoolSize值。
- workQueue : 工作隊列本昏,如果說線程數(shù)已經(jīng)達到了corePoolSize,此時供汛,又有新任務(wù)進入,咋辦涌穆?那么這些任務(wù)會進入到等待隊列中怔昨。
- maximumPoolSize : 最大線程數(shù)。前面提到了工作隊列宿稀,既然是隊列趁舀,就會有滿的那一天。滿了之后祝沸,咋辦矮烹?則會接著創(chuàng)建臨時線程(有點像快遞公司在大促期間找的臨時工),臨時處理一下這些任務(wù)罩锐。
- unit : 時間單位奉狈。
- keepAliveTime : 存活時間。既然是存活時間涩惑,那代表什么存活時間呢仁期?代表的是,上面提到的臨時線程存活時間竭恬,配合時間單位(unit)參數(shù)跛蛋,確定臨時線程干完活之后,還能活多久(快遞公司大促期間雇的臨時工痊硕,雇傭時間)问芬。
介紹完線程池各個參數(shù)的意義,回到ThreadPoolExecutor.execute方法:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 介紹:該方法是任務(wù)執(zhí)行寿桨,從注釋中看出任務(wù)會在某個時間點執(zhí)行此衅。所有這個方法,只是保證了任務(wù)
* 提交亭螟。提交了之后挡鞍,具體如何執(zhí)行,還得參照線程池中線程的實際情況预烙。是立即執(zhí)行還是排隊,
* 是用已有線程執(zhí)行扁掸,還是創(chuàng)建新線程處理它翘县,都是未知的最域。
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 以下為線程池的核心三步,仔細看上文中7個參數(shù)的介紹锈麸,下面三步也就容易理解了
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 第一步镀脂,如果線程池中線程數(shù)量小于核心線程數(shù),則創(chuàng)建線程忘伞,然后調(diào)用執(zhí)行薄翅。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 第二步,達到核心線程數(shù)之后氓奈,加入隊列翘魄。加入隊列成功后,等待被取出執(zhí)行舀奶。
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 第三步暑竟,如果隊列滿了(加入隊列失敗)育勺,繼續(xù)創(chuàng)建線程光羞,直至達到max
* 線程數(shù)。如果達到max線程數(shù)怀大,開始使用某一種拒絕策略,拒絕任務(wù)
*/
/**
* ctl這個參數(shù)使用了二進制方式存了線程池的兩個信息:
* ①當下線程池中線程數(shù)量呀闻;②線程池狀態(tài)化借,比如線程池關(guān)閉了等。
*
* workerCountOf捡多,獲取線程池線程數(shù)量;
* isRunning() 判斷線程池是否處于運行態(tài)
*/
int c = ctl.get();
//第一步
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//第二步
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//第三步
else if (!addWorker(command, false))
reject(command);
}
源碼中已添加了一些我的個人理解蓖康,應(yīng)該很容易理解了±菔郑回想一下蒜焊,咱們使用線程池,主要是為了高效地執(zhí)行任務(wù)科贬。執(zhí)行任務(wù)泳梆,才是我們最關(guān)心的“裾疲可是至今优妙,仍未明顯地看到任務(wù)執(zhí)行,所以繼續(xù)跟進憎账。我已經(jīng)趟平了套硼,直接進入addWorder():
private boolean addWorker(Runnable firstTask, boolean core) {
//線程創(chuàng)建前的一些合理性校驗
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//任務(wù)執(zhí)行
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**
* 其實這里的firstTask是我們最初傳進來要執(zhí)行的任務(wù),這里把它封裝成了一個Worker胞皱。
* 看Worker源碼可以知道邪意,其實它是Runnable接口的一個實現(xiàn)類九妈。在這里,Worker充當?shù)? * 是一個線程的角色雾鬼,它參數(shù)中有thread萌朱。那么線程的啟動肯定要調(diào)用start()方法。
* 在下方也確實找到了start()方法呆贿,啟動之后嚷兔,根據(jù)線程的基本知識,肯定是進入了線程
* 的run()方法做入。
*/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
根據(jù)上文代碼注釋中冒晰,進入Worker.run()方法:
public void run() {
runWorker(this);
}
繼續(xù)跟進runWorker(),
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
/**
* firstTask是我們自己提交的任務(wù)
*
* 可以看到竟块,下午中task直接調(diào)用了run()方法壶运。根據(jù)線程的基礎(chǔ)知識,只用調(diào)用線程的start()方法浪秘,
* 才會啟動一個線程蒋情。而調(diào)用線程的run()方法,則會當做普通的方法執(zhí)行耸携,不會異步執(zhí)行棵癣。
* 其實在這里就解釋了任務(wù)提交與執(zhí)行分離。線程池幫我們創(chuàng)建了線程夺衍,然后使用它創(chuàng)建的線程串行地
* 調(diào)用了我們的run()方法狈谊,從而執(zhí)行了我們的任務(wù)。
*/
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask()此處是不斷地獲取隊列中的任務(wù)沟沙。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
至此河劝,線程池已基本分析完了。文章開頭介紹的那幾個問題矛紫,沒有一一解釋赎瞎,但如果是認真讀到這里的話,那幾個問題應(yīng)該已經(jīng)了然于胸了颊咬。
As always and let me know what you think, leave the comments below. Bye :)