前言
在前面一節(jié)JDK線程池(一):體系結構中已經分析了JDK的線程池核心接口的組成,通過那些接口的描述忧风,我們知道線程池它所提供的功能默色,而本文將圍繞JDK中的線程池是如何具體實現這些功能展開分析。
(PS:由于ThreadPoolExecutor的具體實現比較復雜狮腿,如果分析存在有誤的地方腿宰,請拍磚!)
本文將以如下內容作為大綱進行分析說明:
- ThreadPoolExcutor的核心組成
- 任務的提交的執(zhí)行流程分析
- RejectedExecutionHandler分析
- ThreadFactory分析
1.走進ThreadPoolExecutor的世界
首先讓我們看看ThreadPoolExecutor的繼承結構:
通過上圖我們我們可以看到ThreadPoolExecutor繼承了抽象類AbstractExecutorService呕诉,從而完成對ExecutorService接口的實現,而AbstractExecutorService只是提供了一些模板方法的實現吃度,具體的處理細節(jié)都還是落實到ThreadPoolExecutor中甩挫。
1.1 核心參數
由于線程池要應對不同的負載情況,ThreadPoolExecutor為了更好的適配不同的場景椿每,因此其提供了很多的可調節(jié)的參數伊者,讓用戶根據實際的負載情況進行調節(jié)。這些核心參數需要在創(chuàng)建ThreadPoolExecutor時通過構造方法來進行指定间护,ThreadPoolExecutor中提供了4個重載的構造方法亦渗,下面讓我們看看ThreadPoolExecutor中最復雜的一個的構造方法的實現(其余的構造方法底層都是調用下面的這個):
/*
(1).corePoolSize:設置一個線程池中的核心線程數
如果設置allowCoreThreadTimeOut為false的情況下:
即使當線程池中的線程處于空閑狀態(tài),這些線程也不會被線程池中移除兑牡。
如果設置了allowCoreThreadTimeOut為true,
那么當核心線程在空閑了一段時間后依舊沒有用于工作央碟,那么將會從線程池中移除。
注意:(allowCoreThreadTimeOut默認為false均函,通常情況下也無需做修改)
(2).maximumPoolSize:線程池中所允許創(chuàng)建最大線程數量
(3).keepAliveTime:當線程池中的線程數量大于核心線程數亿虽,
如果這些多出的線程在經過了keepAliveTime時間后,
依然處于空閑狀態(tài)苞也,那么這些多出的空閑線程將會被結束其生命周期洛勉。
(4).unit:keepAliveTime的時間單位
(5).workQueue:用于存放任務的阻塞隊列,當線程池中的核心線程都處在執(zhí)行任務時如迟,
提交的任務將被存儲在workQueue進行緩沖收毫。
該隊列只能存放通過execute方法提交的Runnable任務。
(6).threadFactory:線程池中用于創(chuàng)建線程的工廠
在這里使用線程工廠的目的也是為了解耦,將創(chuàng)建的實現細節(jié)通過工廠進行封裝殷勘,
而不是直接將創(chuàng)建的方式固化在ThreadPoolExecutor本身的代碼中此再。
(7)handler:當線程池中的線程數量達到最大并且阻塞隊列也已經滿了無法再添加任務時,
線程池所采取的處理策略玲销。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//對給定的核心參數進行合法性的校驗
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor根據上述給定的參數输拇,會根據實際的負載情況對一個線程池中的實際工作線程做出動態(tài)調正,我們可以通過getPoolSize()
方法來查看當前線程池中實際的線程數量贤斜。
/**
* 返回當前線程池中實際線程的數量
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*根據當前線程池的狀態(tài)進行判斷,如果線程池已經處
于terimated狀態(tài)時策吠,則返回0,否則就通過worker集合(底層實際是一個HaseSet)中
返回當前線程的數量瘩绒。
*/
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
上面的這些核心參數除了在ThreadPoolExecutor創(chuàng)建時指定猴抹,在后期運行的過程中也可以進行動態(tài)的修改,只是提供其對應的setXXX方法完成修改即可锁荔,從這點我們可以很好的感覺到ThreadPoolExecutor實現的靈活性蟀给。
1.2 線程池的狀態(tài)與工作線程數量
橫看成嶺側成峰,遠近高低各不同。
普通程序猿看整數坤溃,它可能就是占4個字節(jié)的數字(Java中)拍霜,而大神看整數卻能看出不同的風景。在ThreadPoolExecutor中使用了一種非常巧妙的方式對表示線程池的狀態(tài)和工作線程的數量薪介。
在ThreadPoolExecutor中,使用了一個AtomicInteger對將當前線程的工作狀態(tài)和工作線程數量(有效線程數)使用同一個整數進行包裝越驻。
為了將兩個數值包裝在同一個整數中汁政,它將32位的高3位表示線程的狀態(tài)值,而后29位來表示線程的數量缀旁。
這也意味著记劈,在ThreadPoolExecutor中最多可以存在線程數實際為2^29-1個,當然這個只是理論值并巍,實際的應用根本不可能有這么多線程數量目木。
設計思想與目的:
也許很多人會覺得這樣的設計有點奇怪,因為不就是表示2個信息嘛懊渡,我的線程數量用個int來表示刽射,而線程狀態(tài)用個byte來表示不就OK嘛,不就多浪費一個字節(jié)數量而已嘛剃执。其原因在于誓禁,線程的狀態(tài)和數量往往需要同時更新,然而線程池天生處在一個并發(fā)的環(huán)境下肾档,那么當對2個變量進行修改時摹恰,那么就勢必需要通過鎖來進行線程安全的處理,從而保證2個變量修改具備原子性怒见;但是這種做法對于性能的影響是非常嚴重的印颤,因此在ThreadPoolExecutor將兩個變量的分別包裝在一個變量中,最后的并發(fā)操作發(fā)生在AtomicInteger上尿扯,而AtomicInteger恰恰就是具有一個無鎖原子操作類,這樣既可以解決線程安全的問題殉摔,又可以規(guī)避避免所的使用,從而提供性能配阵。
下面是ThreadPoolExecutor中對狀態(tài)和線程數量的源碼馏颂,這里使用的是JDK1.7,在第375行。
/*
使用AtomicInteger來對實際的線程數量(workCount)
以及這個線程池的狀態(tài)(runState)棋傍,
該值默認為111...000(29個0)救拉,每增加一個線程,ctl值就會+1
使用后29位來保存線程數量瘫拣。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/*
使用int的bit數量減去3亿絮,即32-3=29
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/*
線程池中工作線程的最大數量為2^29-1
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/*
使用如下整型值來表示線程池的狀態(tài)
RUNNING:當處于RUNNING狀態(tài)時,
線程池可以接受新的任務并且會執(zhí)行任務隊列中的任務。
*/
private static final int RUNNING = -1 << COUNT_BITS;
/*
SHUTDOWN:不再接受新的任務派昧,但是會繼續(xù)處理隊列中還沒有處理完成的任務
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/*
STOP:不再接受新的任務黔姜,
并且不會繼續(xù)處理隊列中還沒有處理完成的任務,
同時還會去中斷當前正在執(zhí)行的任務蒂萎。
*/
private static final int STOP = 1 << COUNT_BITS;
/*
TIDYING:所有的任務都已經結束秆吵,并且workCount的數量為0。
*/
private static final int TIDYING = 2 << COUNT_BITS;
/*
當線程池的狀態(tài)變到TERMINATED狀態(tài)后五慈,ThreadPoolExecutor
提供了一個terminated()方法供用戶進行擴展實現纳寂。我們可以通過這個方法記錄線程池關閉等信息。
通過上面分析可以自己手動進行運算一下泻拦,會得到如下的結果:
(1).當線程池處于RUNNING時毙芜,ctl值小于0,
(2).而當線程池處于其他狀態(tài)時争拐,則ctl將大于等于0腋粥。
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/*
使用一個整數值進行兩個信息的包裝與拆解的過程
獲取線程的狀態(tài),取32位的前3位即可架曹。
即就ctl與11100....(29個0)進行按位與隘冲。
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/*
取線程的數量,即取后29位的值音瓷,即將ctl與
00011...(29個1)進行按位與運算对嚼。
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/*
將兩個狀態(tài)值組裝成一個整數
通過rs(狀態(tài)值)與wc(workCount)值進行或運算,它們各自獨立
不會產生相互的響應绳慎。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/*
對線程池狀態(tài)的判斷方法纵竖,此時無需對ctl進行拆分獲前3位進行比較。
因為SHUTDOWN的值為0杏愤,而只要ctl小于0靡砌,則說明線程池就處于運行狀態(tài)。
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
2.看我如幫你完成任務
之前我們可能將一個任務丟給線程池就不管了珊楼,它的底層執(zhí)行對于我們來說可能是一個黑匣子通殃;下面我們來深入的看看一個任務是如何被提交到線程池中,如何被線程池的線程所執(zhí)行厕宗,我們以execute(Runnable)方法作為例子來進行分析画舌。
查看ThreadPoolExecutor的executor方法:
public void execute(Runnable command) {
//判斷提交的任務是否為null
if (command == null)
throw new NullPointerException();
//獲取到ctl值
int c = ctl.get();
/*
通過ctl值進行拆解,獲取到具體的線程池中實際的線程數量,
判斷其是否小于用戶所執(zhí)行的corePoolSize已慢,如果小于則
直接創(chuàng)建一個線程進行執(zhí)行(通過addWorker(command,true)方法來處理)
否則就繼續(xù)執(zhí)行下面代碼
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/*
判斷線程池中的狀態(tài)是否為RUNNING,如果是
則將任務提交到任務隊列中佑惠,如果提交任務隊列成功齐疙,
則會對線程池的狀態(tài)進行一次重復檢查,再次檢查當前線程的狀態(tài)以及實際的線程數量贞奋。
提交失敗可能是由于任務隊列已經滿了,從而無法提交轿塔。
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
當提交任務隊列失敗,則會去判斷當前線程池線程是否已經到達最大值
如果未到達催训,則創(chuàng)建一個線程繼續(xù)執(zhí)行,否則則執(zhí)行拒絕處理。
*/
else if (!addWorker(command, false))
reject(command);
}
/*
根據給定的任務來創(chuàng)建線程,創(chuàng)建的過程中會根據實際的線程數量以及狀態(tài)來判斷是否去創(chuàng)建亚兄。
core:當core為true,判斷當前線程池中實際線程數是否大于corePoolSize审胚,如果大于匈勋,則不執(zhí)行。
當core為false膳叨,判斷當前線程池中實際線程數是否大于maximumPoolSize洽洁,如果大于,則不執(zhí)行菲嘴。
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取實際的線程數量
int wc = workerCountOf(c);
/*
判斷是否大于2^29-1,同時根據給定的core參數饿自,
來選擇到底是與corePoolSize還是maximumPoolSize進行比較
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//當比較成功,則對工作線程數+1龄坪,跳出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
完成對任務的真正執(zhí)行
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
/*
將任務封裝在一個Worker中昭雌,這里的Worker是
ThreadPoolExecutor中所定義的一個Runnable實現類。
*/
w = new Worker(firstTask);
/*
worker中同時封裝了線程對象,該線程對象是從線程工廠中所獲取
因此Worker的數量和線程池中線程的數量是一一對應的健田。
*/
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
//對狀態(tài)進行重復檢測,這里具體的細節(jié)我們暫且忽略烛卧,只關注任務什么時候被執(zhí)行成功。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
/*
通過一個HashSet專門存儲Worker妓局,本質上是存儲線程,
因為每一個Worker底層都維護著一個線程
*/
workers.add(w);
/*
獲取當前的Set集合的大小总放,用戶統(tǒng)計線程池中線程數量達到最高峰時的線程數,
使用largestPoolSize來存儲好爬。
*/
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//設置任務添加成功,只有該值為true時局雄,任務才會真正得到執(zhí)行
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//執(zhí)行任務
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
總結線程池任務提交的過程:
通過上面的代碼分析,我們大體上可以知道一個任務提交到線程池中會經歷如下的3步過程:
- 如果線程池中實際的線程數量小于corePoolSize,那么就啟動一個新的線程進行任務的處理抵拘。
- 如果線程池中實際的線程數量大于等于corePoolSize哎榴,則將任務放置到任務隊列中進行處理型豁。
- 如果由于任務隊列已經滿了,無法再存放新的任務尚蝌,則判斷線程池中實際的線程數量是否大于maximumPoolSize迎变,如果小于,則創(chuàng)建新的線程執(zhí)行飘言,否則將拒絕執(zhí)行任務衣形。
3.負載過高我該怎么辦?
上面我們已經提到了當線程池實際的數據量到達最大值時姿鸿,如果再次提交新的執(zhí)行谆吴,則會拒絕執(zhí)行。那么ThreadPoolExecutor是如何拒絕執(zhí)行的呢苛预?
ThreadPool中默認的拒絕策略使用的是中斷策略句狼,即當無法接哦受新的任務時热某,直接拋出RejectedExecutionException異常。
/**
* ThreadPool中默認定義的RejectedExecutionHandler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
AbortPolicy實現了java.util.concurrent.RejectedExecutionHandler接口筹吐。
public interface RejectedExecutionHandler {
//當ThreadPoolExecutor無法接受新的任務時秘遏,將會執(zhí)行該方法完成任務的拒絕執(zhí)行。
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/*
AbortPolicy的執(zhí)行拒絕策略非常簡單洋侨,當無法再次接受新的任務時铡俐,就拋出一個異常审丘。
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
除了AbortPolicy,ThreadPoolExecutor中還定義3個RejectedExecutionHandler的實現類锅知,它們分別是DiscardPolicy脓钾、DiscardOldestPolicy可训、CallerRunsPolicy捶枢,下面是這3個實現類所對應的源碼:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//DiscardPolicy的處理方式非常簡單烂叔,直接忽略提交的任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/*
DiscardOldestPolicy首先判斷線程池是否已經關閉蒜鸡,
如果未關閉牢裳,則將任務隊列中之前提交的任務移除,將
新提交的任務加入到隊列中
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/*
CallerRunsPolicy所采取的策略是不再啟動新的線程忘朝,
而是讓當前提交任務的線程直接自己去處理這個任務
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
在實際的應用中辜伟,我們還可以自己去擴展RejectedExecutionHandler脊另,根據自己的業(yè)務需求來進行相應的處理偎痛,將拒絕的信息通過日志記錄從而方便后期進行參數調優(yōu)独郎。
4.線程從何而來?
我們在分析ThreadPoolExecutor的參數時谓谦,提到了一個ThreadFactory的東東贪婉,它是一個用于創(chuàng)建線程的工廠,該接口的定義也非常的簡單才顿,里面只有一個方法。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
在ThreadPoolExecutor中郑气,默認所使用的ThreadFactory是Executors中所定義的DefaultThreadFactory,該方法的具體實現如下:
public Thread newThread(Runnable r) {
//設置線程組以及線程的名字
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//設置線程為前臺線程尾组,同時設置線程的優(yōu)先級
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
ThreadFactory在ThreadPool中主要用于Worker中,在執(zhí)行任務的時候呵萨,用戶所提交的任務會被包裝成一個Worker來進行執(zhí)行爷耀。而Worker內部維護著一個線程對象,這個線程對象就是從ThreadFactory中所得到的跑杭。
/*
Worker的構造方法咆耿,將用戶提交的任務進行封裝
*/
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
//通過獲取線程工廠來獲取到線程
this.thread = getThreadFactory().newThread(this);
}
至此,ThreadPoolExecutor中比較核心的內容就分析到這里窄做,如果發(fā)現在分析的過程中存在問題椭盏,請及時指正吻商!后面的內容將分析一下線程池工具類--Executors以及JDK1.7推出的ForkJoinPool。