閱讀任何源碼开伏,我們都應該帶著幾個問題去閱讀栅迄,從源碼中找出這些問題的答案,這樣才能徹底搞明白某個知識點蚕键。
下面我們就帶著這樣幾個問題鲫趁,一起看一下ThreadPoolExecutor的源碼
- 為什么要用線程池
- 為什么不推薦使用juc直接創(chuàng)建的線程池
- 線程池的幾個核心參數(shù)
- 線程池是什么時候創(chuàng)建線程的轴术?
- 線程池是如何重復利用線程的?
- 任務提交的順序和執(zhí)行的順序是一樣的嗎洗贰?
1寄猩、為什么要使用線程池
這個其實可以寫一個簡單的程序去跑一下嫉晶,比如使用線程池去跑1000個task和開1000個線程去跑這1000個task,線程池的效率會高出很多倍田篇,原因是線程池能夠重復利用線程,沒有創(chuàng)建和銷毀線程的開銷箍铭。
其實池化的技術在很多地方都會用到比如數(shù)據(jù)庫的連接池泊柬,字符串常量池,netty的對象池等等
2诈火、為什么不推薦使用juc直接創(chuàng)建線程池的方式
我們找兩個Exectors創(chuàng)建線程池的源碼
a兽赁、newCachedTreadPool的源碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到這里的最大線程數(shù)是0xffff個,這個最大線程數(shù)在大并發(fā)提交任務的情況下會創(chuàng)建大量線程冷守,會導致CPU100%
b刀崖、newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
//看一下LinkedBlockingQueue的實現(xiàn)
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
會初始化一個容量為0xffff的隊列,由于這個隊列太大拍摇,如果我們提交的任務數(shù)很多并且自定義的線程里的對象又很大的話亮钦,就很容易發(fā)生oom的問題。
從這個工具類創(chuàng)建線程的參數(shù)我們可以看到底層調(diào)用的都是ThreadPoolExecutor的構造方法充活,所以我們建議根據(jù)具體業(yè)務的規(guī)模設置合適的線程池參數(shù)蜂莉。
new ThreadPoolExecutor()
3、線程池的幾個核心參數(shù)
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
corePoolSize:最大線程數(shù)
maximumPoolSize:最大線程數(shù)
keepAiveTime:線程存活時間
TimeUnit:存活時間的參數(shù)
BlockingQueue:線程池的任務隊列
task投遞到線程池中的整個過程如下
看一下具體的代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//這里判斷線程數(shù)量是否小于corePoolSize混卵,如過小于corePoolSize則直接創(chuàng)建worker線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//到這個if說明worker線程數(shù)量大于了corePoolSize了映穗,這里直接添加到任務隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//到這個if說明任務加入隊列失敗,隊列滿了幕随,則再創(chuàng)建線程worker線程蚁滋,如果創(chuàng)建失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
看到這里從宏觀上我們就能看到整個task投遞到線程池的一個過程,其實這里最主要的方法是addWorker,其實addworker里才是線程池的精華辕录,里面有如何創(chuàng)建線程及start的邏輯澄阳,如何回收過期的線程,如何重復利用創(chuàng)建的線程去運行task的邏輯
4踏拜、線程池是什么時候創(chuàng)建線程的
線程池的線程不是線程池創(chuàng)建的時候創(chuàng)建的碎赢,線程池的線程是在調(diào)用addWorker方法,并且addWorker執(zhí)行成功才會創(chuàng)建
下面我們一起分析一下addworker代碼速梗,這個方法很長肮塞,其實要看明白這個方法我們要先看一下Worker這個類,可以看到這個類實際上實現(xiàn)了Runnable接口姻锁,其實線程池中運行的runnable任務都會被包裝成Worker對象
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//當前worker會綁定一個線程
final Thread thread;
//當前worker處理的第一個任務
Runnable firstTask;
volatile long completedTasks;
//創(chuàng)建worker時就會生成 一個線程及賦值firstTask枕赵,
//注意線程池的線程就是在這里被創(chuàng)建的
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//注意看這里,這個線程創(chuàng)建的時候傳的是this位隶,也就意味著一會this.thread.start時拷窜,
//執(zhí)行的是worker對象的run方法
//這個大家想一下,回味一下
this.thread = getThreadFactory().newThread(this);
}
}
下面在具體分析一下addWorker是如何start線程及重復利用線程運行任務的
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//獲取worker數(shù)量
int c = ctl.get();
//獲取線程池狀態(tài)
int rs = runStateOf(c);
// 如果線程池狀態(tài)為SHUTDOWN就不接收任務了直接returnfalse
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//這里會判斷一下worker數(shù)量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//如果代碼運行到了這里涧黄,就表示線程池狀態(tài)正常篮昧,任務達到了創(chuàng)建worker線程的條件
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//這里會創(chuàng)建一個worker對象,
//結合上面的代碼笋妥,worker對象中會包含一個線程和一個firstTask
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//下面這一部分是用來判斷需不需要將worker線程進行緩存懊昨,給其他的任務使用
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//注意看這個workers對象,這是一個hashset春宣,用來緩存創(chuàng)建好的worker對象
//注意在強調(diào)一下這個worker對象包含一個Thread引用及Runnable引用
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//緩存好worker線程后酵颁,這里會執(zhí)行線程的start邏輯
//注意線程池的任務就是在這里開啟start使任務真正進入runnable狀態(tài)的
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
下面我們看一下start的具體邏輯:
剛剛我們已經(jīng)看到了,thread是worker對象的一個屬性月帝,實際上創(chuàng)建線程時躏惋,傳的參數(shù)就是worker對象本身,所以線程start執(zhí)行的邏輯就是worker對象的run方法
//worker的run方法很簡單嚷辅,下面我們看一下runWorker方法
public void run() {
runWorker(this);
}
看完這個我們就基本上看完了線程池的核心邏輯了簿姨,但是還有一些細節(jié)沒有仔細看,后面我會提到潦蝇,留給大家思考
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取一下fisrtTask
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//這個task什么時候不為空款熬?
//注意這里會有一個隱含的問題,我們提交的任務存放的順序是 核心worker->隊列->最大線程worker
//這里的getTask是從隊列里獲取的任務
//這里task的執(zhí)行順序就變成了攘乒,核心worker->最大線程worker->隊列
//不知道大家有沒有get到我的點贤牛,可以做一個實驗就是為task編一個號,比如安1-10的順序提交任務则酝,最終執(zhí)行的結果卻是 1殉簸,2闰集,3,4般卑,8武鲁,9,10蝠检,5沐鼠,6,7這種順序叹谁,這里就是出現(xiàn)這種提交順序和執(zhí)行順序不一樣的原理
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//這里會最終調(diào)用task的run方法饲梭,到此線程池的創(chuàng)建到運行任務就看結束了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//這里會銷毀線程,在隊列為空的時候焰檩,會銷毀線程憔涉,讓線程數(shù)量停留在corePoolsize范圍內(nèi),什么時候會執(zhí)行到這析苫,當task為空的時候兜叨,什么時候task為空,看getTask的邏輯衩侥,getTask會判斷隊列是否為空及活躍線程的時間來返回task的值国旷,這里就不細講了
processWorkerExit(w, completedAbruptly);
}
}
5、線程池是如何重復利用線程的
看完上面的分析顿乒,其實就能回答這個問題了议街,當任務提交時會創(chuàng)建worker對象,這個對象里會有綁定一個線程璧榄,同時會將worker對象放入workers Set中,創(chuàng)建成功后吧雹,會立馬調(diào)用worker.thread.start方法啟動線程骨杂,這個線程的run方法進行了包裝,首先判斷fisrtTask是否為空如果不為空則直接運行雄卷,否則會while循環(huán)拿緩存隊列中的任務搓蚪,知道緩存隊列為空,或者空閑線程超過了keepalive時間就會銷毀線程丁鹉,以保證線程維持在corePoolSize的大小
6妒潭、任務提交的順序和執(zhí)行的順序是一樣的嗎?
不一樣揣钦,提交順序是 核心worker線程->隊列->非核心worker線程雳灾,
但是執(zhí)行的順序卻是核心worker任務->非核心worker任務->隊列任務
如果能理解這個,線程池就真的理解的差不多了
7冯凹、其他
上面幾個問題有些是我在看源碼時的困惑點谎亩,有些是我看完源碼之后的一些想法,除了這些問題外,線程池還有很多精妙的地方比如匈庭,
a夫凸、線程池的狀態(tài)和核心線程數(shù)其實是用一個4個字節(jié)int表示的,為什么要這么表示
b阱持、線程池中使用到的設計模式有哪些
c夭拌、線程池本身就是并發(fā)場景下提交任務的,那它自己的安全性是如何保證的衷咽,execute方法是如何保證安全性的