面試幾乎必問的問題就多線程的,一般都是以下面幾個(gè)開始逐個(gè)深入:
1.使用多線程嗎疙渣?
2.多線程有哪些優(yōu)勢(shì)和缺點(diǎn)酒甸?
3.線程池的了解過嗎?還有挖坑問題JDK有幾種線程城池虚婿,你們生產(chǎn)環(huán)境中用的哪個(gè)一個(gè)旋奢?
4.線程池都有哪些參數(shù)呀?跟著會(huì)問有哪些隊(duì)列類型然痊,拒絕策略都有哪些至朗?
5.線程池的線程復(fù)用原理是什么?
為了回答好這些問題剧浸,這幾天認(rèn)真的讀了一些線程池的源碼和《深入了解Java虛擬機(jī)》書锹引,來記錄以下自己的學(xué)習(xí);
首先線程分為用戶態(tài)線程和內(nèi)核態(tài)線程唆香,用戶線程串行執(zhí)行嫌变,是依托于進(jìn)程來調(diào)用cpu執(zhí)行指令,并且數(shù)據(jù)相互之間不可見躬它;而內(nèi)核線程是依附于內(nèi)核創(chuàng)建的腾啥,每個(gè)內(nèi)核進(jìn)程都可以調(diào)用cpu,jvm是內(nèi)核線程冯吓;但是Java中的sun虛擬機(jī)中的是采用應(yīng)用自己創(chuàng)建出來的線程是用戶線程倘待;用戶線程通過輕量級(jí)進(jìn)程來實(shí)現(xiàn)對(duì)內(nèi)核線程的調(diào)用的;如圖:
當(dāng)java thread1的時(shí)間片執(zhí)行完后切換到j(luò)ava thead 2時(shí),由于兩個(gè)線程之間數(shù)據(jù)不可見桑谍,需要把java thead 1中的寄存器/緩存/指令/中間數(shù)據(jù)刷新到主內(nèi)存中;線程的創(chuàng)建和銷毀是比重祸挪,Java的線程都會(huì)依賴于內(nèi)核線程锣披,創(chuàng)建線程都需要操作系統(tǒng)狀態(tài)切換耸峭;為了避免過度的資源消耗就誕生了線程池恋博;線程池就是一個(gè)線程的緩存,負(fù)責(zé)對(duì)線程統(tǒng)一分配調(diào)用和監(jiān)控玄组;
下面開始介紹線程池中類的關(guān)系圖整以;
從ThreadPoolExecutor類的代碼可以知道胧辽,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實(shí)現(xiàn),AbstractExecutorService是一個(gè)抽象類公黑,它實(shí)現(xiàn)了ExecutorService接口邑商。而ExecutorService又是繼承了Executor接口摄咆,我們看一下Executor接口的實(shí)現(xiàn):這下就弄明白了ThreadPoolExecutor、AbstractExecutorService人断、ExecutorService和Executor幾個(gè)之間的關(guān)系了吭从。
Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable)恶迈,返回值為void涩金,參數(shù)為Runnable類型,從字面意思可以理解暇仲,就是用來執(zhí)行傳進(jìn)去的任務(wù)的步做;然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit奈附、invokeAll全度、invokeAny以及shutDown等;抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口桅狠,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法讼载;然后ThreadPoolExecutor繼承了類AbstractExecutorService。在ThreadPoolExecutor類中有幾個(gè)非常重要的方法:
execute()方法實(shí)際上是Executor中聲明的方法中跌,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn)咨堤,這個(gè)方法是ThreadPoolExecutor的核心方法,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù)漩符,交由線程池去執(zhí)行一喘。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn)嗜暴,在ThreadPoolExecutor中并沒有對(duì)其進(jìn)行重寫凸克,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同闷沥,它能夠返回任務(wù)執(zhí)行的結(jié)果萎战,去看submit()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法舆逃,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)蚂维。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。
ThreadPoolExecutor類概述
````
//間接調(diào)用最后一個(gè)構(gòu)造函數(shù)路狮,采用默認(rèn)的拒絕策略AbortPolicy和默認(rèn)的線程工廠
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)?
//間接調(diào)用最后一個(gè)構(gòu)造函數(shù)虫啥,采用默認(rèn)的默認(rèn)的線程工廠
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, RejectedExecutionHandler)
//間接調(diào)用最后一個(gè)構(gòu)造函數(shù),采用默認(rèn)的拒絕策略AbortPolicy
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory)
//前面三個(gè)分別調(diào)用了最后一個(gè)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory, RejectedExecutionHandler)
//最后一個(gè)構(gòu)造函數(shù)的具體實(shí)現(xiàn)
public ThreadPoolExecutor(int corePoolSize,
? ? int maximumPoolSize,
? ? long keepAliveTime,
? ? TimeUnit unit,
? ? BlockingQueue workQueue,
? ? ThreadFactory threadFactory,
? ? RejectedExecutionHandler handler) {
//參數(shù)合法性檢驗(yàn)奄妨,核心線程數(shù)目涂籽、最大線程數(shù)目、線程空閑回收時(shí)間不得小于0砸抛,最大線程池不得小于核心線程數(shù)數(shù)目
? ? ? ? if (corePoolSize <0 ||maximumPoolSize <=0 ||maximumPoolSize < corePoolSize ||keepAliveTime <0)
throw new IllegalArgumentException();
? ? ? ? ? ? //參數(shù)合法性檢驗(yàn)评雌,阻塞隊(duì)列树枫,線程工廠,決絕策略不得為空
? ? ? ? if (workQueue ==null || threadFactory ==null || handler ==null)
throw new NullPointerException();
? ? ? ? this.corePoolSize = corePoolSize;//核心線程數(shù)大小
? ? ? ? this.maximumPoolSize = maximumPoolSize;//最大線程數(shù)目
? ? ? ? this.workQueue = workQueue;//阻塞隊(duì)列
? ? ? ? this.keepAliveTime = unit.toNanos(keepAliveTime);//空閑回收時(shí)間
? ? ? ? this.threadFactory = threadFactory;//線程工廠
? ? ? ? this.handler = handler;//拒絕策略
}
````
線程池主要幾個(gè)參數(shù)說明
corePoolSize:(volatile int)類型柳骄,表示線程池核心池的大小团赏。
maximumPoolSize:(volatile int)類型,表示線程池最多創(chuàng)建的線程數(shù)目耐薯。
注:Java線程池分兩部分舔清,一塊是核心池,一塊是臨時(shí)池曲初,當(dāng)核心池的線程滿了且阻塞隊(duì)列中任務(wù)滿了体谒,再有任務(wù)提交執(zhí)行時(shí),則創(chuàng)建"臨時(shí)線程",可創(chuàng)建的臨時(shí)線程的數(shù)目為maximumPoolSize-corePoolSize臼婆。當(dāng)線程總數(shù)等于maximumPoolSize且阻塞隊(duì)列滿了抒痒,再有任務(wù)提交時(shí),采取拒絕策略颁褂。
workQueue:阻塞隊(duì)列故响。常用的有ArrayBlockingQueue、LinkedBlockingQueue颁独、PriorityBlockingQueue彩届、DelayQueue。
keepAliveTime:線程空閑回收時(shí)間誓酒。
threadFactory:生成線程的工廠類樟蠕。默認(rèn)為:Executors.defaultThreadFactory();
handle:拒絕策略靠柑。默認(rèn)為defaultHandler = new AbortPolicy()寨辩;
注:常用拒絕策略有以下幾種:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)歼冰,但是不拋出異常靡狞。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
```
/*AtomicInteger類型隔嫡,用來標(biāo)識(shí)線程池的狀態(tài)甸怕,以及線程池里面線程的數(shù)量,
初始值為1110 0000 0000 0000 0000 0000 0000 0000 前三位是線程池的狀態(tài)畔勤,其中:
000 SHUTDOWN 不接受新任務(wù)但是處理阻塞隊(duì)列中的任務(wù)
010 TIDYING 所有任務(wù)都被終止蕾各,工作線程為0
001 STOP 不接受新任務(wù)也不處理阻塞隊(duì)列中的任務(wù)并且中斷所有線程池中正在運(yùn)行的任務(wù)
011 TERMINATED 不接受新任務(wù)也不處理阻塞隊(duì)列中的任務(wù)并且中斷所有線程池中正在運(yùn)行的任務(wù)
111 RUNNING 接受新的任務(wù)并處理阻塞隊(duì)列中的任務(wù)
注:關(guān)于阻塞隊(duì)列扒磁,后續(xù)會(huì)說庆揪,暫且理解為一個(gè)存放還未執(zhí)行線程的隊(duì)列就好。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
? ? private static final int COUNT_BITS = Integer.SIZE - 3;
? ? private static final int CAPACITY? = (1 << COUNT_BITS) - 1;
? ? // runState is stored in the high-order bits
? ? private static final int RUNNING? ? = -1 << COUNT_BITS;
? ? private static final int SHUTDOWN? =? 0 << COUNT_BITS;
? ? private static final int STOP? ? ? =? 1 << COUNT_BITS;
? ? private static final int TIDYING? ? =? 2 << COUNT_BITS;
? ? private static final int TERMINATED =? 3 << COUNT_BITS;
? ? //在某些情況下用來存儲(chǔ)任務(wù)妨托,并將任務(wù)提供給線程池中的工作線程
? ? private final BlockingQueue<Runnable> workQueue;
? ? //用來對(duì)pooSize缸榛、corePoolSize吝羞、maximumPoolSize、runState内颗、workers修改時(shí)候同步
? ? private final ReentrantLock mainLock = new ReentrantLock();
? ? //線程池中所有線程的集合钧排,訪問和修改需要mainLock的配合
? ? private final HashSet<Worker> workers = new HashSet<Worker>();
? ? //用來支持waitTemination
? ? private final Condition termination = mainLock.newCondition();
? ? //跟蹤線程池中線程的最大值,具體的猜測(cè)是為了矯正poolsize均澳,訪問和修改需要配合mainLock
? ? private int largestPoolSize;
? ? //已完成任務(wù)的數(shù)量恨溜,在任務(wù)處于Terminate狀態(tài)時(shí)才更新,訪問和修改需要mainLock的配合
? ? private long completedTaskCount;
? ? /*
? ? * 一下參數(shù)都是用戶控制的找前,全部被聲明為了Volatile類型的值糟袁,這樣能夠確保在多線程下,每個(gè)
? ? * 線程都能夠獲取到最新值躺盛。
? ? */
? ? //線程工廠项戴,用戶可以自定義,以便在想線程池創(chuàng)建線程時(shí)附加一些個(gè)人操作
? ? private volatile ThreadFactory threadFactory;
? ? //當(dāng)線程池處于shutdown或者處于飽和時(shí)執(zhí)行的拒絕策略
? ? private volatile RejectedExecutionHandler handler;
? ? //設(shè)置線程池中空閑線程等待多時(shí)毫秒被回收
? ? private volatile long keepAliveTime;
? ? //指定線程池中的空閑線程是否一段時(shí)間被回收槽惫,false一直存活
? ? private volatile boolean allowCoreThreadTimeOut;
? ? //核心線程池大小周叮,若allowCoreThreadTimeOut被設(shè)置,全部空閑超時(shí)被回收的情況下會(huì)為0
? ? private volatile int corePoolSize;
? ? //最大線程池大小界斜,不得超過CAPACITY?
? ? private volatile int maximumPoolSize;
```
從代碼中能看出是將線程狀態(tài)和線程數(shù)放在了一個(gè)int值里面仿耽,高三位代表線程狀態(tài),低29位表示線程數(shù)目锄蹂。
其中clt(AtomicInteger)可以理解為線程安全的整數(shù)氓仲。關(guān)于clt常用的幾個(gè)操作,即對(duì)線程狀態(tài)和線程數(shù)的操作:
runStateOf(int c) 是通過與的方式得糜,在clt字段中獲取到clt的前三位敬扛,也就是線程池的狀態(tài)標(biāo)識(shí)。
workerCountOf(int c)是通過與的方式朝抖,在clt字段中獲取到clt的后29位啥箭,也就是線程池中的線程數(shù)量。
ctlOf(int rs, int wc) 是通過或的方式治宣,將修改后的線程池狀態(tài)rs和線程池中線程數(shù)量打包成clt急侥。
isRunning(int c) SHUTDOWN的狀態(tài)是0左移29為得到的,比他大的均是線程池停止或銷毀狀態(tài)
核心方法
線程池初始化之后侮邀,可以通過調(diào)用submit和execute方法來執(zhí)行任務(wù)坏怪。其實(shí)submit內(nèi)部也是調(diào)用execute方法,源碼如下:
```
//sumit函數(shù)是在ThreadPoolExecute的父類AbstractExecutorService實(shí)現(xiàn)的绊茧,最終還是調(diào)用的子類的execute方法
public Future<?> submit(Runnable task) {
? if (task == null) throw new NullPointerException();
? ? RunnableFuture<Void> ftask = newTaskFor(task, null);
? execute(ftask);
? return ftask;
```
接下來看execute的源碼
```
public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();//AtomicInteger
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }
```
執(zhí)行流程為下圖铝宵,圖片來至https://blog.csdn.net/u011637069/article/details/79591330
AddWorker 源碼 通過源碼發(fā)現(xiàn)addWorker實(shí)現(xiàn)了
1)才用循環(huán)CAS操作來將線程數(shù)加1;2)新建一個(gè)線程并啟用。
```
private boolean addWorker(Runnable firstTask, boolean core) {
? ? ? ? //(1)循環(huán)CAS操作鹏秋,將線程池中的線程數(shù)+1.
? ? ? ? retry:
? ? ? ? for (;;) {
? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? int rs = runStateOf(c);
? ? ? ? ? ? // Check if queue empty only if necessary.
? ? ? ? ? ? if (rs >= SHUTDOWN &&
? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&
? ? ? ? ? ? ? ? ? firstTask == null &&
? ? ? ? ? ? ? ? ? ! workQueue.isEmpty()))
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? int wc = workerCountOf(c);
? ? ? ? ? ? ? ? //core true代表是往核心線程池中增加線程 false代表往最大線程池中增加線程
? ? ? ? ? ? ? ? //線程數(shù)超標(biāo)尊蚁,不能再添加了,直接返回
? ? ? ? ? ? ? ? if (wc >= CAPACITY ||
? ? ? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? //CAS修改clt的值+1侣夷,在線程池中為將要添加的線程流出空間横朋,成功退出cas循環(huán),失敗繼續(xù)
? ? ? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))
? ? ? ? ? ? ? ? ? ? break retry;
? ? ? ? ? ? ? ? c = ctl.get();? // Re-read ctl
? ? ? ? ? ? ? ? //如果線程池的狀態(tài)發(fā)生了變化回到retry外層循環(huán)
? ? ? ? ? ? ? ? if (runStateOf(c) != rs)
? ? ? ? ? ? ? ? ? ? continue retry;
? ? ? ? ? ? ? ? // else CAS failed due to workerCount change; retry inner loop
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? //(2)新建線程百拓,并加入到線程池workers中琴锭。
? ? ? ? boolean workerStarted = false;
? ? ? ? boolean workerAdded = false;
? ? ? ? Worker w = null;
? ? ? ? try {
? ? ? ? ? ? //對(duì)workers操作要通過加鎖來實(shí)現(xiàn)
? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? w = new Worker(firstTask);
? ? ? ? ? ? final Thread t = w.thread;
? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? //細(xì)化鎖的力度,防止臨界區(qū)過大衙传,浪費(fèi)時(shí)間
? ? ? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? // Recheck while holding lock.
? ? ? ? ? ? ? ? ? ? // Back out on ThreadFactory failure or if
? ? ? ? ? ? ? ? ? ? // shut down before lock acquired.
? ? ? ? ? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? ? ? ? ? int rs = runStateOf(c);
? ? ? ? ? ? ? ? ? ? //判斷線程池的狀態(tài)
? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||
? ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {
? ? ? ? ? ? ? ? ? ? ? ? //判斷添加的任務(wù)狀態(tài),如果已經(jīng)開始丟出異常
? ? ? ? ? ? ? ? ? ? ? ? if (t.isAlive()) // precheck that t is startable
? ? ? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();
? ? ? ? ? ? ? ? ? ? ? //將新建的線程加入到線程池中
? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ? ? int s = workers.size();
? ? ? ? ? ? ? ? ? ? ? ? //修正largestPoolSize的值
? ? ? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)
? ? ? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? //線程添加線程池成功祠够,則開啟新創(chuàng)建的線程
? ? ? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? ? ? t.start();//(3)
? ? ? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? //線程添加線程池失敗或者線程start失敗,則需要調(diào)用addWorkerFailed函數(shù)粪牲,
//如果添加成功則需要移除古瓤,并回復(fù)clt的值
? ? ? ? ? ? if (! workerStarted)
? ? ? ? ? ? ? ? addWorkerFailed(w);
? ? ? ? }
? ? ? ? return workerStarted;
? ? }
```
執(zhí)行流程如下:
再看一下worker內(nèi)部類
繼承自AQS,具有鎖的功能腺阳,實(shí)現(xiàn)了Runable接口落君,具有線程的功能
```
private final class Worker
? ? ? ? extends AbstractQueuedSynchronizer
? ? ? ? implements Runnable
? ? {
? ? ? ? /**
? ? ? ? * This class will never be serialized, but we provide a
? ? ? ? * serialVersionUID to suppress a javac warning.
? ? ? ? */
? ? ? ? private static final long serialVersionUID = 6138294804551838833L;
? ? ? ? //線程池中正真運(yùn)行的線程。通過我們指定的線程工廠創(chuàng)建而來
? ? ? ? final Thread thread;
? ? ? //線程包裝的任務(wù)亭引。thread 在run時(shí)主要調(diào)用了該任務(wù)的run方法
? ? ? ? Runnable firstTask;
? ? ? ? //記錄當(dāng)前線程完成的任務(wù)數(shù)
? ? ? ? volatile long completedTasks;
? ? ? ? /**
? ? ? ? * Creates with given first task and thread from ThreadFactory.
? ? ? ? * @param firstTask the first task (null if none)
? ? ? ? */
? ? ? ? Worker(Runnable firstTask) {
? ? ? ? ? ? setState(-1); // inhibit interrupts until runWorker
? ? ? ? ? ? this.firstTask = firstTask;
? ? ? ? ? ? //利用我們指定的線程工廠創(chuàng)建一個(gè)線程
? ? ? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? ? ? }
? ? ? ? /** Delegates main run loop to outer runWorker? */
? ? ? ? public void run() {
? ? ? ? ? ? runWorker(this);
? ? ? ? }
```
從源碼可以看出來绎速,Worker類的run方法實(shí)際上調(diào)用的還是ThreadPoolExecutor的runworker方法。下面將看一下ThreadPoolExecutor的runworker源代碼和注釋解析焙蚓。
```
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//線程池處于stop狀態(tài)或者當(dāng)前線程被中斷時(shí)纹冤,線程池狀態(tài)是stop狀態(tài)。
//但是當(dāng)前線程沒有中斷购公,則發(fā)出中斷請(qǐng)求
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
? runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
//開始執(zhí)行任務(wù)前的Hook萌京,類似回調(diào)函數(shù)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任務(wù)執(zhí)行后的Hook,類似回調(diào)函數(shù)
afterExecute(task, thrown);
}
} finally {
//執(zhí)行完畢后task重置宏浩,completedTasks計(jì)數(shù)器++知残,解鎖
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
? //線程空閑達(dá)到我們?cè)O(shè)定的值時(shí),Worker退出銷毀比庄。
processWorkerExit(w, completedAbruptly);
}
}
```
大概意思就是當(dāng)前任務(wù)不為null或者從隊(duì)列中取的任務(wù)不為null時(shí)求妹,worker線程就一直去執(zhí)行任務(wù)。當(dāng)無要執(zhí)行的任務(wù)時(shí)佳窑,嘗試回收線程制恍。
runWorker函數(shù)中最重要的是getTask(),他不斷的從阻塞隊(duì)列中取任務(wù)交給線程執(zhí)行神凑。下面分析一下:
```
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果線程池處于shutdown狀態(tài)净神,
//并且隊(duì)列為空,或者線程池處于stop或者terminate狀態(tài),
//在線程池?cái)?shù)量-1强挫,返回null,回收線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//標(biāo)識(shí)當(dāng)前線程在空閑時(shí)薛躬,是否應(yīng)該超時(shí)回收
boolean timed;? ?
for (;;) {
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut 為ture
//或者當(dāng)前線程數(shù)量大于核心線程池?cái)?shù)目俯渤,
//則需要超時(shí)回收
timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(1)
//如果線程數(shù)目小于最大線程數(shù)目,
//且不允許超時(shí)回收或者未超時(shí)型宝,
//則跳出循環(huán)八匠,繼續(xù)去阻塞隊(duì)列中取任務(wù)(2)
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
//如果上面if沒有成立,則當(dāng)前線程數(shù)-1趴酣,返回null梨树,回收該線程
if (compareAndDecrementWorkerCount(c))
return null;
//如果上面if沒有成立,則CAS修改ctl失敗岖寞,重讀抡四,cas循環(huán)重新嘗試修改
c = ctl.get();? // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
(2)
try {
//如果允許空閑回收,則調(diào)用阻塞隊(duì)列的poll仗谆,
//否則take指巡,一直等到隊(duì)列中有可取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//取到任務(wù),返回任務(wù)隶垮,
//否則超時(shí)timedOut = true;進(jìn)入下一個(gè)循環(huán),
//并且在(1)處會(huì)不成立藻雪,進(jìn)而進(jìn)入到cas修改ctl的程序中
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
```