目錄
類圖結(jié)構(gòu)
如圖所示担平,Executors是個(gè)工具類,用來(lái)提供不同特性的線程池。ThreadPoolExecutor中的ctl是一個(gè)原子變量啡氢,用來(lái)記錄線程池狀態(tài)和線程池中的線程個(gè)數(shù),類似于ReentrantReadWriteLock中使用一個(gè)變量來(lái)保存兩種信息梦抢。
以下為與ctl相關(guān)的變量與函數(shù):
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 假設(shè)Integer為32位(不同平臺(tái)下可能不同)住练,則前3位用來(lái)表示線程運(yùn)行狀態(tài),
// 后29位用來(lái)表示線程個(gè)數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 取高3位的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 低29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通過指定的rs(Running State)和wc(Workers Count)生成新的ctl狀態(tài)值
private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池的狀態(tài)含義如下:
- RUNNING:接受新任務(wù)并處理阻塞隊(duì)列里的任務(wù)送粱。
- SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊(duì)列里面的任務(wù)。
- STOP:拒絕新任務(wù)并且拋棄阻塞隊(duì)列里的任務(wù)掂之,同時(shí)會(huì)中斷正在處理的任務(wù)抗俄。
- TIDYING:所有任務(wù)都執(zhí)行完后當(dāng)前線程池活動(dòng)線程數(shù)為0,將要調(diào)用terminated方法(相當(dāng)于一個(gè)過渡狀態(tài))世舰。
- TERMINATED: 終止?fàn)顟B(tài)动雹,terminated方法調(diào)用完成后的狀態(tài)。
線程池參數(shù)如下:
- corePoolSize:核心線程池跟压,通常情況下最多添加corePoolSize個(gè)Worker胰蝠,當(dāng)任務(wù)過多時(shí)(阻塞隊(duì)列滿了),會(huì)繼續(xù)添加Worker直到Worker數(shù)達(dá)到maximumPoolSize
- workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列震蒋。
- maximumPoolSize:線程池最大線程數(shù)量(能添加的Worker的最大數(shù)量)
- ThreadFactory:創(chuàng)建線程的工廠
- RejectedExecutionHandler:飽和策略茸塞,當(dāng)隊(duì)列滿并且線程個(gè)數(shù)達(dá)到maximumPoolSize后采取的策略。
- keepAliveTime: 存活時(shí)間查剖。如果當(dāng)前線程池中的線程數(shù)量比核心線程數(shù)量多钾虐,并且是閑置狀態(tài),則為這些閑置的線程能存活的最大時(shí)間梗搅。
mainLock是獨(dú)占鎖禾唁,用來(lái)控制新增Worker線程操作的原子性。termination是該鎖對(duì)應(yīng)的條件隊(duì)列无切。
Worker繼承AQS并實(shí)現(xiàn)了Runnable接口,是具體承載任務(wù)的而對(duì)象丐枉。Worker繼承了AQS哆键,自己實(shí)現(xiàn)了簡(jiǎn)單不可重入獨(dú)占鎖,其中state=0表示鎖未被獲取瘦锹,state=1表示鎖已經(jīng)被獲取籍嘹,state=-1是常見Worker的默認(rèn)狀態(tài)闪盔,是為了避免該線程在運(yùn)行runWorker方法前被中斷。
以下是對(duì)Executors中創(chuàng)建線程池的方法的介紹辱士。
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
創(chuàng)建一個(gè)核心線程數(shù)和做大線程數(shù)都是nThreads的線程池泪掀,并且阻塞隊(duì)列長(zhǎng)度為Integer.MAX_VALUE。keepAliveTime=0說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收颂碘。
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都是1的線程池异赫。
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
創(chuàng)建一個(gè)按需創(chuàng)建線程的線程池,初始線程個(gè)數(shù)為0头岔,最多線程個(gè)數(shù)為Integer.MAX_VALUE塔拳。KeepAliveTime=60說(shuō)明只要當(dāng)前線程在60s內(nèi)空閑就會(huì)被回收。這個(gè)類型的特殊之處在于峡竣,加入同步隊(duì)列的任務(wù)會(huì)被馬上執(zhí)行靠抑,同步隊(duì)列里面最多只有一個(gè)任務(wù)。
源碼分析
ThreadPoolExecutor的實(shí)現(xiàn)實(shí)際是一個(gè)生產(chǎn)-消費(fèi)模型适掰,當(dāng)用戶添加任務(wù)到線程池時(shí)相當(dāng)于生產(chǎn)者生產(chǎn)元素颂碧,workers中的線程直接執(zhí)行任務(wù)或者從任務(wù)隊(duì)列里面獲取任務(wù)(當(dāng)沒有空閑的Worker時(shí),任務(wù)會(huì)被暫存于任務(wù)隊(duì)列中)時(shí)相當(dāng)于消費(fèi)者消費(fèi)元素类浪。
void execute(Runnable command)
// 執(zhí)行任務(wù)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取線程池狀態(tài)
int c = ctl.get();
// 如果Worker個(gè)數(shù)小于核心線程數(shù)則新增一個(gè)Worker
if (workerCountOf(c) < corePoolSize) {
// 添加Worker载城,第二個(gè)參數(shù)為true表示新增Worker為核心線程
if (addWorker(command, true))
return;
// 重新獲取ctl,多線程下ctl變化比較頻繁戚宦,要確保所獲取的狀態(tài)是最新的
c = ctl.get();
}
// 線程池關(guān)閉后沒有接受任務(wù)的必要
// 如果線程池還在運(yùn)行个曙,嘗試將任務(wù)加入工作隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 可能任務(wù)入隊(duì)后線程池又關(guān)閉了,則直接移除該任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);
// 在該任務(wù)成功入隊(duì)前受楼,可能所有Worker都因?yàn)閗eepAliveTime到達(dá)而被回收垦搬,
// 這時(shí)需要重新創(chuàng)建一個(gè)Worker來(lái)處理任務(wù)隊(duì)列里面的任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任務(wù)隊(duì)列滿了,則嘗試增加一個(gè)非核心線程來(lái)處理任務(wù)艳汽,
// 失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
// 添加一個(gè)Worker
private boolean addWorker(Runnable firstTask, boolean core) {
// 此循環(huán)用于增加Worker個(gè)數(shù)
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 當(dāng)線程池狀態(tài)為SHUTDOWN猴贰、STOP、TIDYING或TERMINATED時(shí)將不再增加Worker來(lái)處理任務(wù)河狐,
// 但要排除線程池狀態(tài)剛轉(zhuǎn)為SHUTDOWN且
// ((設(shè)置了Worker過期時(shí)間且所有Worker均被回收)或(未設(shè)置Worker過期時(shí)間且Worker個(gè)數(shù)小于corePoolSize))
// 但任務(wù)隊(duì)列還有任務(wù)的情況米绕。
// 因?yàn)橛蒘HUTDOWN狀態(tài)的定義可知線程池會(huì)拒絕新任務(wù)但會(huì)處理任務(wù)隊(duì)列里面剩余任務(wù)。
// firstTask==null表示此次調(diào)用addWorker方法并不是要直接給新創(chuàng)建的Worker分配一個(gè)任務(wù)馋艺,
// 而是要讓它從任務(wù)隊(duì)列中取嘗試獲取一個(gè)任務(wù)栅干。
// 在所有Worker都被回收且任務(wù)隊(duì)列非空的情況下,
// 自然要新增Worker來(lái)處理任務(wù)隊(duì)列中剩余的任務(wù)捐祠;
// 在未設(shè)置Worker過期時(shí)間且Worker數(shù)小于corePoolSize的情況下碱鳞,
// 仍需要添加一個(gè)Worker來(lái)提高處理剩余任務(wù)的效率。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// Worker數(shù)量檢測(cè)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 成功增加了Worker個(gè)數(shù)踱蛀,直接跳出外層for循環(huán)執(zhí)行實(shí)際添加Worker的代碼
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 狀態(tài)改變則跳出內(nèi)層循環(huán)窿给,再次執(zhí)行外循環(huán)進(jìn)行新的狀態(tài)判斷
// 否則繼續(xù)在內(nèi)層循環(huán)自旋直到CAS操作成功
if (runStateOf(c) != rs)
continue retry;
}
}
// 執(zhí)行到此處說(shuō)明已通過CAS操作成功增減了Worker個(gè)數(shù)
// 以下代碼用于實(shí)際增加Worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加獨(dú)占鎖是為了實(shí)現(xiàn)workers同步贵白,因?yàn)榭赡芏鄠€(gè)線程調(diào)用了線程池的execute方法
mainLock.lock();
try {
// 重新獲取線程池狀態(tài),因?yàn)橛锌赡茉讷@取鎖之前執(zhí)行了shutdown操作
int rs = runStateOf(ctl.get());
// 如果線程池還在運(yùn)行或(線程池處于SHUTDOWN狀態(tài)并且firstTast為null)崩泡,執(zhí)行添加Worker操作
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 將新創(chuàng)建的Worker添加到workers隊(duì)列
workers.add(w);
int s = workers.size();
// 更新線程池工作線程最大數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 添加成功則啟動(dòng)工作線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker的執(zhí)行
任務(wù)提交到線程池后由Worker來(lái)執(zhí)行禁荒。
Worker(Runnable firstTask) {
// 調(diào)用runWorker前禁止中斷
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 將state置為0,允許中斷
boolean completedAbruptly = true;
try {
// 執(zhí)行傳入的任務(wù)或任務(wù)隊(duì)列中的任務(wù)
// getTask用于從任務(wù)隊(duì)列中獲取任務(wù)角撞,可能會(huì)被阻塞
while (task != null || (task = getTask()) != null) {
w.lock();
...
try {
// 空方法呛伴,用于子類繼承重寫
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空方法,用于子類繼承重寫
afterExecute(task, thrown);
}
} finally {
task = null;
// 添加任務(wù)完成數(shù)量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Worker被回收前執(zhí)行清理工作
processWorkerExit(w, completedAbruptly);
}
}
在構(gòu)造函數(shù)中設(shè)置Worker的狀態(tài)為-1是為了避免當(dāng)前Worker在調(diào)用runWorker方法前被中斷(當(dāng)其他線程調(diào)用了shutdownNow方法靴寂,如果Worker狀態(tài)>=0則會(huì)中斷該線程)磷蜀。
runWorker中調(diào)用unlock方法時(shí)將state置為0,使Worker線程可被中斷百炬。
processWorkerExit方法如下褐隆。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果runWorker方法非正常退出,則將workerCount遞減
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 記錄任務(wù)完成個(gè)數(shù)
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試設(shè)置線程池狀態(tài)為TERMINATED剖踊,如果當(dāng)前是SHUTDOWN狀態(tài)并且任務(wù)隊(duì)列為空
// 或當(dāng)前是STOP狀態(tài)庶弃,當(dāng)前線程池里沒有活動(dòng)線程
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 在設(shè)置了Worker過期時(shí)間的情況下,如果任務(wù)隊(duì)列為空德澈,不必新增Worker歇攻,
// 如果不為空,當(dāng)存在Worker時(shí)不必新增Worker梆造。
// 在沒有設(shè)置過期時(shí)間的情況下缴守,僅當(dāng)線程個(gè)數(shù)小于核心線程數(shù)時(shí)增加Worker。
// 由此可知镇辉,在不主動(dòng)關(guān)閉線程池的情況下屡穗,
// 將會(huì)一直有Worker存在來(lái)接受任務(wù)。
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 將不執(zhí)行addWorker操作
}
addWorker(null, false);
}
}
void shutdown()
調(diào)用shutdown后忽肛,線程池將不再接受新任務(wù)村砂,但任務(wù)隊(duì)列中的任務(wù)還是要執(zhí)行的。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查是否有關(guān)閉線程池的權(quán)限
checkShutdownAccess();
// 設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN屹逛,如果已經(jīng)是SHUTDOWN則直接返回
advanceRunState(SHUTDOWN);
// 中斷空閑的Worker
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試將狀態(tài)轉(zhuǎn)為TERMINATED
tryTerminate();
}
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
/**
* 檢查是否設(shè)置了安全管理器础废,是則看當(dāng)前調(diào)用shutdown命令的線程是否具有關(guān)閉線程的權(quán)限,
* 如果有還要看調(diào)用線程是否有中斷工作線程的權(quán)限罕模,
* 如果沒有權(quán)限則拋出異常
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
// ez
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// 設(shè)置所有空閑線程的中斷標(biāo)志
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 只中斷那些還沒被中斷的
// 獲取w的鎖成功說(shuō)明w在執(zhí)行runWorker方法調(diào)用getTask時(shí)被阻塞评腺,
// 也就是說(shuō)w是空閑的,那就中斷它
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果只中斷一個(gè)則退出循環(huán)
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 判斷是否滿足可終止條件
// 線程池處于RUNNING狀態(tài)
// 或處于TIDYING狀態(tài)(說(shuō)明有其他線程調(diào)用了tryTerminate方法且即將成功終止線程池)
// 或線程池正處于SHUTDOWN狀態(tài)且任務(wù)隊(duì)列不為空時(shí)不可終止
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 還有Worker的話淑掌,中斷一個(gè)空閑Worker后返回
// 正在執(zhí)行任務(wù)的Worker會(huì)在執(zhí)行完任務(wù)后調(diào)用tryTerminate方法
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 設(shè)置線程池狀態(tài)為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 空方法歇僧,由子類繼承重寫,進(jìn)行線程池關(guān)閉時(shí)的清理工作
terminated();
} finally {
// 此處無(wú)需使用CAS锋拖,因?yàn)榧词笴AS失敗也說(shuō)明線程池終止了
ctl.set(ctlOf(TERMINATED, 0));
// 激活因調(diào)用條件變量termination的await系列方法而被阻塞的所有線程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
List<Runnable> shutdownNow()
調(diào)用shutdownNow后诈悍,線程池將不會(huì)再接受新任務(wù),并且會(huì)丟棄任務(wù)隊(duì)列里面的任務(wù)且中斷正在執(zhí)行的任務(wù)兽埃,然后立刻返回任務(wù)隊(duì)列里面的任務(wù)列表侥钳。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 不是interruptIdleWorkers()
// 中斷所有在運(yùn)行的Worker
interruptWorkers();
// 將任務(wù)隊(duì)列中的任務(wù)移動(dòng)到tasks中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 中斷所有在運(yùn)行的Worker
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
boolean awaitTermination(long timeout, TimeUnit unit)
當(dāng)線程調(diào)用awaitTermination后,當(dāng)前線程會(huì)被阻塞柄错,直到線程池狀態(tài)變成TERMINATIED或等待超時(shí)才返回舷夺。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果線程池已經(jīng)終止,則直接返回
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 等待相應(yīng)時(shí)間售貌,線程池成功關(guān)閉后會(huì)調(diào)用termination.signalAll()將當(dāng)前線程激活
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
更多
相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記