線程池的基本結(jié)構(gòu)
就以ThreadPoolExecutor為例。當(dāng)我們把一個(gè)Runnable交給線程池去執(zhí)行的時(shí)候,這個(gè)線程池處理的流程是這樣的:
- 先判斷線程池中的核心線程們是否空閑,如果空閑嘿架,就把這個(gè)新的任務(wù)指派給某一個(gè)空閑線程去執(zhí)行挪钓。如果沒有空閑河质,并且當(dāng)前線程池中的核心線程數(shù)還小于 corePoolSize押蚤,那就再創(chuàng)建一個(gè)核心線程。
- 如果線程池的線程數(shù)已經(jīng)達(dá)到核心線程數(shù)羹应,并且這些線程都繁忙揽碘,就把這個(gè)新來的任務(wù)放到等待隊(duì)列中去。如果等待隊(duì)列又滿了园匹,那么
- 查看一下當(dāng)前線程數(shù)是否到達(dá)maximumPoolSize雳刺,如果還未到達(dá),就繼續(xù)創(chuàng)建線程裸违。如果已經(jīng)到達(dá)了掖桦,就交給RejectedExecutionHandler來決定怎么處理這個(gè)任務(wù)。
看一下代碼:
public void execute(Runnable command){
if(command == null){
throw new NullPointerException
}
int c = ctl.get();
if(workerCountOf(c) < corePoolSize){
if(addWorker(command,true))
return;
c = ctl.get();
}
if(isRunning(c) && workQueue.offer(command)){
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)){
reject(command);
}else if(workerCountOf(recheck) == 0){
addWorker(null,false);
}
}else if (! addWorker(command,false)){
reject(command);
}
}
工作線程和等待隊(duì)列
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這段代碼里有幾個(gè)地方要注意的供汛,第一枪汪,我們可以使用beforeExecute和afterExecute這兩個(gè)方法去監(jiān)控任務(wù)的執(zhí)行情況,這些方法在ThreadPoolExecutor里都是空方法怔昨,我們可以重寫這些方法來實(shí)現(xiàn)線程池的監(jiān)控雀久。第二,就是線程的邏輯是不斷地執(zhí)行一個(gè)循環(huán)趁舀,去調(diào)用 getTask 方法來獲得任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
RejectedExecutionHandler
當(dāng)隊(duì)列和線程池都滿了的時(shí)候赖捌,再有新的任務(wù)到達(dá),就必須要有一種辦法來處理新來的任務(wù)矮烹。Java線程池中提供了以下四種策略:
- AbortPolicy: 直接拋異常
- CallerRunsPolicy:讓調(diào)用者幫著跑這個(gè)任務(wù)
- DiscardOldestPolicy:丟棄隊(duì)列里最老的那個(gè)任務(wù)越庇,執(zhí)行當(dāng)前任務(wù)
- DiscardPolicy:不處理,直接扔掉