夢(mèng)想在沒有實(shí)現(xiàn)之前扯罐,不必對(duì)他人講。
先從全局看問題總是沒錯(cuò)的,線程池的繼承體系:
Executors 是一個(gè)用來生產(chǎn)線程池的靜態(tài)工廠,可以通過該類生產(chǎn)ExecutorService、ScheduledExecutorService等對(duì)象唠摹。
在 Executors 這個(gè)類里面爆捞,定義了這么幾種常用的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
這幾種線程池都構(gòu)造了ThreadPoolExecutor類,只是參數(shù)不同勾拉,所以看一下這個(gè)ThreadPoolExecutor類煮甥。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor
參數(shù)描述如下:
-
corePoolSize
線程池核心線程數(shù)。當(dāng)提交一個(gè)任務(wù)時(shí)藕赞,線程池會(huì)新創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)成肘,直到線程數(shù)達(dá)到corePoolSize;之后繼續(xù)提交的任務(wù)會(huì)被保存到阻塞隊(duì)列中斧蜕。 -
maximumPoolSize
線程池最大線程數(shù)双霍。這個(gè)參數(shù)只有在隊(duì)列有界的情況下才有效。當(dāng)前阻塞隊(duì)列滿了的情況下批销,繼續(xù)提交任務(wù)時(shí)洒闸,則會(huì)繼續(xù)創(chuàng)建新的線程執(zhí)行任務(wù),直到線程數(shù)達(dá)到maximumPoolSize均芽。之后再提交任務(wù)丘逸,會(huì)執(zhí)行拒絕策略。 -
keepAliveTime
空閑隊(duì)列存活時(shí)間掀宋。大于corePoolSize的空閑線程在該時(shí)間之后會(huì)被銷毀 -
unit
keepAliveTime 的單位 -
workQueue
阻塞隊(duì)列深纲,一般有如下幾種阻塞隊(duì)列
- ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列
- inkedBlockingQuene:基于隊(duì)列的無界阻塞隊(duì)列
- SynchronousQuene:不實(shí)際存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作劲妙,反之亦然湃鹊。如果使用該隊(duì)列,提交的任務(wù)不會(huì)保存镣奋,而總是將新任務(wù)提交給線程執(zhí)行涛舍,如果沒有空閑線程,則嘗試創(chuàng)建新的線程唆途,如果線程已達(dá)最大值富雅,則執(zhí)行拒絕策略掸驱。
- priorityBlockingQuene:具有優(yōu)先級(jí)的無界阻塞隊(duì)列
-
threadFactory
線程工廠 -
handler
拒絕策略,當(dāng)隊(duì)列已滿没佑,且沒有空閑線程時(shí)毕贼,會(huì)執(zhí)行一種拒絕策略,JDK一共有四種拒絕策略
- AbortPolicy:直接拋出異常
- CallerRunsPolicy :在調(diào)用者線程中運(yùn)行任務(wù)
- DiscardOldestPolicy: 丟棄最早的一個(gè)請(qǐng)求蛤奢,再次提交該任務(wù)
- DiscardPolicy: 直接丟棄鬼癣,不做任何處理
結(jié)合之前的代碼可以看到,當(dāng)corePoolSize
等于maximumPoolSize
時(shí)啤贩,構(gòu)造的就是newFixedThreadPool
待秃,這兩個(gè)都為1 時(shí),構(gòu)造的是newSingleThreadExecutor痹屹。
newCachedThreadPool線程池在沒有任務(wù)執(zhí)行時(shí)章郁,數(shù)量為0,其數(shù)量會(huì)動(dòng)態(tài)變化志衍,最大值為
Integer.MAX_VALUE`
ScheduledThreadPoolExecutor
繼承了ThreadPoolExecutor
,構(gòu)造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor
增加了一些定時(shí)任務(wù)的功能暖庄,這里使用到了DelayedWorkQueue,這個(gè)隊(duì)列也很有意思楼肪,模擬了二叉查找樹的性質(zhì)培廓,用來存放有序的計(jì)劃任務(wù)。
主要方法如下:
//在指定的時(shí)間后春叫,對(duì)任務(wù)進(jìn)行一次調(diào)度
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//對(duì)任務(wù)進(jìn)行周期性調(diào)度肩钠,以開始時(shí)間計(jì)算,周期性調(diào)度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//對(duì)任務(wù)進(jìn)行周期性調(diào)度暂殖,以結(jié)束時(shí)間計(jì)算蔬将,經(jīng)過延遲后,才進(jìn)行下一次
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
那么在線程池中的線程是如何調(diào)度的央星,線程池的原理是什么呢霞怀?
先看一下線程池的狀態(tài)表示:
//這個(gè)原子類非常強(qiáng)大,其中的高3為表示線程池狀態(tài)莉给,后29位表示線程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//高3位為111毙石,表示線程池能接受新任務(wù),并且可以運(yùn)行隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//高3位000颓遏,表示線程池不再接受新任務(wù)徐矩,但可以處理隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高3為001,表示線程池不再接受新任務(wù)叁幢,不再執(zhí)行隊(duì)列中的任務(wù)滤灯,而且要中斷正在處理的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
//高3位010,表示線程池位為空,準(zhǔn)備關(guān)閉
private static final int TIDYING = 2 << COUNT_BITS;
//高3位011鳞骤,表示線程池已關(guān)閉
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//將高3位窒百,低29位保存在一個(gè)int里
private static int ctlOf(int rs, int wc) { return rs | wc; }
接下來分析線程池的調(diào)度代碼,當(dāng)我們用線程池執(zhí)行一個(gè)任務(wù)的時(shí)候豫尽,會(huì)執(zhí)行以下方法篙梢。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取ctl值,上面的分析知道美旧,這個(gè)值包含了高3位的線程池狀態(tài)和低29位的線程池?cái)?shù)量
int c = ctl.get();
//拿到線程數(shù)量和核心線程數(shù)比較
if (workerCountOf(c) < corePoolSize) {
// 如果當(dāng)前線程數(shù)量< 核心線程數(shù)渤滞,則執(zhí)行addWorker 方法,這個(gè)方法會(huì)新建線程并執(zhí)行任務(wù)
if (addWorker(command, true))
return;
//如果執(zhí)行失敗榴嗅,再拿一次ctl的值
c = ctl.get();
}
// 當(dāng)線程數(shù)大于核心線程妄呕,或上邊任務(wù)添加失敗時(shí)
// 在線程池可用的時(shí)候,會(huì)將任務(wù)添加到阻塞隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次確認(rèn)線程池狀態(tài)嗽测,若線程池停止了绪励,將任務(wù)刪除,并執(zhí)行拒絕策略
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//如果線程數(shù)量為0论咏,則放入一個(gè)空任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果隊(duì)列無法放入,則再新建線程執(zhí)行任務(wù)颁井,如果失敗厅贪,執(zhí)行 拒接策略
// 這里就是從core 到 max 的擴(kuò)展
else if (!addWorker(command, false))
reject(command);
}
下面看一下addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 如果線程池不在運(yùn)行狀態(tài),則不再處理提交的任務(wù)雅宾,直接返回 , 但可以繼續(xù)執(zhí)行隊(duì)列中已有的任務(wù)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//這里的死循環(huán)是為了CAS 線程數(shù)量养涮,直到成功之后跳出外層循環(huán)
for (;;) {
// 獲取線程數(shù)
int wc = workerCountOf(c);
//判斷線程數(shù)是否已達(dá)最大值,超過容量直接返回
if (wc >= CAPACITY ||
//判斷是核心線程還是最大線程
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加線程數(shù)眉抬,跳出外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 檢查線程池狀態(tài)贯吓,如果與開始不同,則從外層循環(huán)重新開始
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 用傳進(jìn)來的任務(wù)構(gòu)造一個(gè)worker 蜀变,該類繼承了AQS悄谐,實(shí)現(xiàn)了Runnable
w = new Worker(firstTask);
// 獲取worker中創(chuàng)建的線程
final Thread t = w.thread;
if (t != null) {
//加鎖 ,HashSet線程不安全
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// 檢測(cè)線程池狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//確認(rèn)創(chuàng)建的線程還沒開始運(yùn)行
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將線程加入集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功之后,啟動(dòng)worker線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
//返回值標(biāo)識(shí)線程是否啟動(dòng)
return workerStarted;
}
看一下線程是怎么啟動(dòng)的:
// worker類
Worker(Runnable firstTask) {
//在運(yùn)行之前不允許中斷
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
線程啟動(dòng)執(zhí)行的是runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//由于在worker構(gòu)造方法中抑制了中斷库北,這里解除抑制
w.unlock(); // allow interrupts
//默認(rèn)為true爬舰,說明發(fā)生了異常
boolean completedAbruptly = true;
try {
//先執(zhí)行傳進(jìn)來的任務(wù),之后從隊(duì)列獲取任務(wù)執(zhí)行
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//在任務(wù)執(zhí)行之前寒瓦,可以做一些事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任務(wù)的真正的執(zhí)行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任務(wù)執(zhí)行完情屹,可以做些事情,注意:這里可以拿到任務(wù)運(yùn)行時(shí)的異常
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 如果一切正常杂腰,置為false , 清理時(shí)會(huì)做判斷
completedAbruptly = false;
} finally {
//清理工作垃你,同時(shí) 任務(wù)如果有異常,會(huì)通過這個(gè)方法擦屁股
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
// 兩種情況:
// 1.RUNING狀態(tài)
// 2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
// 執(zhí)行到這里說明線程已超核心線程數(shù)并且超時(shí)惜颇,這時(shí)返回null回收線程
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//如果核心線程允許超時(shí)皆刺,或者線程數(shù)已達(dá)到核心線程數(shù),則執(zhí)行poll
//poll方法在規(guī)定時(shí)間內(nèi)沒返回會(huì)返回null官还,在下一輪循環(huán)的時(shí)候芹橡,會(huì)返回null,線程會(huì)被銷毀
// 否則,執(zhí)行take方法望伦,該方法會(huì)阻塞直到隊(duì)列中有任務(wù)林说,所以當(dāng)線程數(shù)在核心線程數(shù)以下的線程不會(huì)被銷毀
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
最后看一下runWorker中的清理工作:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果非正常結(jié)束,將線程數(shù)減一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//從線程池中移出異常和超時(shí)的線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試關(guān)閉線程池
tryTerminate();
int c = ctl.get();
//線程池狀態(tài)在RUNNING或SHUTDOWN時(shí)
if (runStateLessThan(c, STOP)) {
// 線程正常結(jié)束
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果線程為0 但是隊(duì)列中還有任務(wù)要執(zhí)行
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//線程數(shù)量滿足條件屯伞,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//新建空的任務(wù)腿箩,假如隊(duì)列中有任務(wù)的話,這里保證能執(zhí)行
//如果線程是因?yàn)楫惓M顺龅牧右。@里進(jìn)行補(bǔ)充
addWorker(null, false);
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 線程池正在運(yùn)行時(shí)
// 線程池是SHUTDOWN狀態(tài)珠移,但是隊(duì)列還有任務(wù)時(shí)
// 線程池已經(jīng)準(zhǔn)備停止時(shí) 直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//下面的代碼說明線程池真的需要關(guān)閉了
//如果線程數(shù)量不為0,說明需要將線程中斷末融,這里只中斷一個(gè)線程就可以(為啥呢钧惧?)
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//執(zhí)行關(guān)閉操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 使用 CAS 設(shè)置狀態(tài)位
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
到這里,線程池的基本原理基本能明白一二吧...