一、池化技術(shù)
程序的運(yùn)行巷蚪,其本質(zhì)上病毡,是對系統(tǒng)資源(CPU、內(nèi)存屁柏、磁盤啦膜、網(wǎng)絡(luò)等等)的使用,如何高效的使用這些資源是編程優(yōu)化演進(jìn)的一個(gè)方向淌喻,池化技術(shù)就是非常重要的一項(xiàng)優(yōu)化手段僧家。
池化技術(shù)簡單點(diǎn)來說,就是提前保存大量的資源裸删,以備不時(shí)之需八拱。在機(jī)器資源有限的情況下,使用池化技術(shù)可以大大的提高資源的利用率,提升性能等乘粒。在編程領(lǐng)域豌注,比較典型的池化技術(shù)有:線程池、連接池灯萍、內(nèi)存池轧铁、對象池等。
下面代碼可以創(chuàng)建一個(gè)線程:
public class Application {
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程運(yùn)行......");
}
}).start();
}
}
實(shí)現(xiàn)Runnable接口就可以實(shí)現(xiàn)一個(gè)簡單的線程旦棉〕莘纾可以利用上多核CPU。當(dāng)一個(gè)任務(wù)結(jié)束绑洛,當(dāng)前線程就接收救斑。但很多時(shí)候,我們不止會執(zhí)行一個(gè)任務(wù)真屯。如果每次都是如此的創(chuàng)建線程脸候、執(zhí)行任務(wù),銷毀線程绑蔫,會造成很大的性能開銷携添。
那能否一個(gè)線程創(chuàng)建后母谎,執(zhí)行完一個(gè)任務(wù)后廊勃,又去執(zhí)行另一個(gè)任務(wù)皮迟,而不是銷毀兽掰?這就是線程池能解決的問題。這也就是池化技術(shù)的思想窖壕,通過預(yù)先創(chuàng)建好多個(gè)線程忧勿,放在池中,這樣可以在需要使用線程的時(shí)候直接獲取瞻讽,避免多次重復(fù)創(chuàng)建鸳吸、銷毀帶來的開銷。
二速勇、線程池的使用
1. 通過Executors創(chuàng)建:
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
ExecutorService executorService3 = Executors.newCachedThreadPool();
ExecutorService executorService4 = Executors.newScheduledThreadPool(3);
executorService1.execute(new Runnable() {
@Override
public void run() {
}
});
通過Executors工廠類中的靜態(tài)方法可以更簡單的進(jìn)行線程池的創(chuàng)建晌砾。在阿里巴巴Java開發(fā)手冊中,明確說明不允許使用Executors烦磁,因?yàn)镋xecutors的不正確使用會帶來若干問題养匈,如OOM等。
2. 通過ThreadPoolExecutor創(chuàng)建:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
}
});
我們在實(shí)踐中開發(fā)中都伪,應(yīng)盡量選擇此方式呕乎,并進(jìn)行完善的異常控制機(jī)制陨晶。
三猬仁、線程池的類設(shè)計(jì)
我們先看一下ThreadPoolExecutor類概要圖:
ExecutorService是真正的線程池接口。
Executor是線程池的頂級接口,只是一個(gè)執(zhí)行線程的工具湿刽,只提供一個(gè)execute(Runnable command)的方法的烁,真正的線程池接口是ExecutorService。
AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口诈闺,實(shí)現(xiàn)了其中大部分的方法(有未實(shí)現(xiàn)的方法撮躁,所以被聲明為Abstract)。
ThreadPoolExecutor买雾,繼承了AbstractExecutorService把曼,是ExecutorService的默認(rèn)實(shí)現(xiàn)。
另外漓穿,Executors也是ThreadPoolExecutor相關(guān)類嗤军,生產(chǎn)各種類型線程池,上文已作介紹晃危,不推薦使用叙赚。
ThreadPoolExecutor類
1. 構(gòu)造方法
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個(gè)類,因此如果要透徹地了解Java中的線程池僚饭,必須先了解這個(gè)類震叮。
在ThreadPoolExecutor類中提供了四個(gè)構(gòu)造方法:
/**
* 注釋省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 注釋省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 注釋省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 注釋省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
前面三個(gè)構(gòu)造方法缺省部分參數(shù),會使用缺省參數(shù)的默認(rèn)值作為相應(yīng)實(shí)參鳍鸵,調(diào)用最后一個(gè)構(gòu)造方法苇瓣。通過構(gòu)造方法即可創(chuàng)建線程池。
構(gòu)造方法中各個(gè)參數(shù)的含義:
corePoolSize:核心池的大小偿乖,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系击罪。在創(chuàng)建了線程池后,默認(rèn)情況下贪薪,線程池中并沒有任何線程媳禁,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法画切,從這2個(gè)方法的名字就可以看出竣稽,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程霍弹。默認(rèn)情況下毫别,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0庞萍,當(dāng)有任務(wù)來之后拧烦,就會創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后钝计,就會把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中恋博。
maximumPoolSize:線程池最大線程數(shù)齐佳,這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程债沮。
keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會終止炼吴。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)疫衩,keepAliveTime才會起作用硅蹦,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)闷煤,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime童芹,則會終止,直到線程池中的線程數(shù)不超過corePoolSize鲤拿。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法假褪,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會起作用近顷,直到線程池中的線程數(shù)為0生音。
unit:參數(shù)keepAliveTime的時(shí)間單位,有7種取值窒升,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; // 天
TimeUnit.HOURS; // 小時(shí)
TimeUnit.MINUTES; // 分鐘
TimeUnit.SECONDS; // 秒
TimeUnit.MILLISECONDS; // 毫秒
TimeUnit.MICROSECONDS; // 微妙
TimeUnit.NANOSECONDS; // 納秒-
workQueue:一個(gè)阻塞隊(duì)列缀遍,用來存儲等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要饱须,會對線程池的運(yùn)行過程產(chǎn)生重大影響域醇,一般來說,這里的阻塞隊(duì)列有以下幾種選擇:
- ArrayBlockingQueue和PriorityBlockingQueue使用較少冤寿,一般使用LinkedBlockingQueue和Synchronous歹苦。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
常用的workQueue類型: - SynchronousQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候督怜,會直接提交給線程處理,而不保留它狠角,如果所有線程都在工作怎么辦号杠?那就新建一個(gè)線程來處理這個(gè)任務(wù)!所以為了保證不出現(xiàn)「線程數(shù)達(dá)到了maximumPoolSize而不能新建線程」的錯(cuò)誤丰歌,使用這個(gè)類型隊(duì)列的時(shí)候姨蟋,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大立帖。
- LinkedBlockingQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候眼溶,如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程(核心線程)處理任務(wù)晓勇;如果當(dāng)前線程數(shù)等于核心線程數(shù)堂飞,則進(jìn)入隊(duì)列等待灌旧。由于這個(gè)隊(duì)列沒有最大值限制,即所有超過核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中绰筛,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效枢泰,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會超過corePoolSize。
- ArrayBlockingQueue:可以限定隊(duì)列的長度铝噩,接收到任務(wù)的時(shí)候衡蚂,如果沒有達(dá)到corePoolSize的值,則新建線程(核心線程)執(zhí)行任務(wù)骏庸,如果達(dá)到了毛甲,則入隊(duì)等候,如果隊(duì)列已滿具被,則新建線程(非核心線程)執(zhí)行任務(wù)玻募,又如果總線程數(shù)到了maximumPoolSize,并且隊(duì)列也滿了硬猫,則發(fā)生錯(cuò)誤补箍。
- DelayQueue:隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn)Delayed接口啸蜜。這個(gè)隊(duì)列接收到任務(wù)時(shí)坑雅,首先先入隊(duì),只有達(dá)到了指定的延時(shí)時(shí)間衬横,才會執(zhí)行任務(wù)裹粤。
- ArrayBlockingQueue和PriorityBlockingQueue使用較少冤寿,一般使用LinkedBlockingQueue和Synchronous歹苦。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
threadFactory:線程工廠,主要用來創(chuàng)建線程蜂林。如果沒指定的話遥诉,默認(rèn)會使用Executors.defaultThreadFactory(),一般來說噪叙,我們會在這里對線程設(shè)置名稱矮锈、異常處理器等。
handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略睁蕾,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常苞笨。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常子眶。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)瀑凝,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)。
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)臭杰。
2. 方法
- execute()
- submit()
- shutdown()
- shutdownNow()
- getQueue()
- getPoolSize()
- getActiveCount()
- getCompletedTaskCount()
execute()方法實(shí)際上是Executor中聲明的方法粤咪,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法渴杆,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù)寥枝,交由線程池去執(zhí)行宪塔。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn)脉顿,在ThreadPoolExecutor中并沒有對其進(jìn)行重寫蝌麸,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同艾疟,它能夠返回任務(wù)執(zhí)行的結(jié)果来吩,去看submit()方法的實(shí)現(xiàn),會發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法蔽莱,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)弟疆。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。
getQueue() 盗冷、getPoolSize() 怠苔、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法仪糖,還有在下文提到的runwork()柑司、addwork()、processworkerExit() 方法等等锅劝。
AbstractExecutorService類
下面我們看一下AbstractExecutorService的各方法:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {...};
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {...};
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) {...};
public <T> Future<T> submit(Callable<T> task) {...};
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {...};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {...};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {...};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {...};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {...};
}
AbstractExecutorService是一個(gè)抽象類攒驰,它實(shí)現(xiàn)了ExecutorService接口。
ExecutorService類
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService繼承了Executor接口故爵。
Executor接口
public interface Executor {
void execute(Runnable command);
}
上述類與接口之間的關(guān)系
Executor是一個(gè)頂層接口玻粪,在它里面只聲明了一個(gè)方法execute(Runnable),返回值為void诬垂,參數(shù)為Runnable類型劲室,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的结窘;
然后ExecutorService接口繼承了Executor接口很洋,并聲明了一些方法:submit、invokeAll隧枫、invokeAny以及shutDown等蹲缠;
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法悠垛;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
四娜谊、線程池的執(zhí)行過程
AtomicInteger ctl 相關(guān)代碼
/**
* 注釋省略.
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中ctl這個(gè) AtomicInteger 的功能很強(qiáng)大确买,其高3位用于維護(hù)線程池運(yùn)行狀態(tài),低29位維護(hù)線程池中線程數(shù)量
- RUNNING:-1 << COUNT_BITS纱皆,即高3位為1湾趾,低29位為0芭商,該狀態(tài)的線程池會接收新任務(wù),也會處理在阻塞隊(duì)列中等待處理的任務(wù)搀缠。
- SHUTDOWN:0 << COUNT_BITS铛楣,即高3位為0,低29位為0艺普,該狀態(tài)的線程池不會再接收新任務(wù)簸州,但還會處理已經(jīng)提交到阻塞隊(duì)列中等待處理的任務(wù)。
- STOP:1 << COUNT_BITS歧譬,即高3位為001岸浑,低29位為0,該狀態(tài)的線程池不會再接收新任務(wù)瑰步,不會處理在阻塞隊(duì)列中等待的任務(wù)矢洲,而且還會中斷正在運(yùn)行的任務(wù)。
- TIDYING:2 << COUNT_BITS缩焦,即高3位為010读虏,低29位為0,所有任務(wù)都被終止了袁滥,workerCount為0盖桥,為此狀態(tài)時(shí)還將調(diào)用terminated()方法。
- TERMINATED:3 << COUNT_BITS呻拌,即高3位為100葱轩,低29位為0,terminated()方法調(diào)用完成后變成此狀態(tài)藐握。
這些狀態(tài)均由int型表示靴拱,大小關(guān)系為 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,這個(gè)順序基本上也是遵循線程池從運(yùn)行到終止這個(gè)過程猾普。
runStateOf(int c) 方法:c & 高3位為1袜炕,低29位為0的~CAPACITY,用于獲取高3位保存的線程池狀態(tài)初家。
workerCountOf(int c) 方法:c & 高3位為0偎窘,低29位為1的CAPACITY,用于獲取低29位的線程數(shù)量溜在。
ctlOf(int rs, int wc) 方法:參數(shù)rs表示runState陌知,參數(shù)wc表示workerCount,即根據(jù)runState和workerCount打包合并成ctl掖肋。
核心方法源碼
execute(Runnable command)方法
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* (待執(zhí)行的command)
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
// 需要執(zhí)行的任務(wù)command為空仆葡,拋出空指針異常
if (command == null) // 1
throw new NullPointerException();
/*
* 執(zhí)行的流程實(shí)際上分為三步:
* 1. 如果運(yùn)行的線程小于 corePoolSize习劫,以用戶給定的 Runable 對象新開一個(gè)線程去執(zhí)行
* 并且執(zhí)行addWorker方法會以原子性操作去檢查 runState 和 workerCount滔迈,以防止當(dāng)返回false的
* 時(shí)候添加了不應(yīng)該添加的線程
* 2. 如果任務(wù)能夠成功添加到隊(duì)列當(dāng)中掸鹅,我們?nèi)孕枰獙μ砑拥木€程進(jìn)行雙重檢查肄扎,有可能添加的線程在前
* 一次檢查時(shí)已經(jīng)死亡,又或者在進(jìn)入該方法的時(shí)候線程池關(guān)閉了腰涧。所以我們需要復(fù)查狀態(tài)韧掩,并有有必
* 要的話需要在停止時(shí)回滾入列操作,或者在沒有線程的時(shí)候新開一個(gè)線程
* 3. 如果任務(wù)無法入列窖铡,那我們需要嘗試新增一個(gè)線程疗锐,如果新建線程失敗了,我們就知道線程可能關(guān)閉了
* 或者飽和了万伤,就需要拒絕這個(gè)任務(wù)
*
*/
// 獲取線程池的控制狀態(tài)
int c = ctl.get(); // 2
// 通過workCountOf方法算workerCount值窒悔,小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 添加任務(wù)到worker集合當(dāng)中
if (addWorker(command, true))
return; // 成功返回
c = ctl.get(); // 失敗的話再次獲取線程池的控制狀態(tài)
}
/*
* 判斷線程池是否正處于RUNNING狀態(tài)
* 是的話添加Runnable對象到workQueue隊(duì)列當(dāng)中
*/
if (isRunning(c) && workQueue.offer(command)) { // 3
int recheck = ctl.get(); // 再次獲取線程池的狀態(tài)
// 再次檢查狀態(tài)
// 線程池不處于RUNNING狀態(tài),將任務(wù)從workQueue隊(duì)列中移除
if (! isRunning(recheck) && remove(command))
// 拒絕任務(wù)
reject(command);
// workerCount等于0
else if (workerCountOf(recheck) == 0) // 4
// 添加worker
addWorker(null, false);
}
// 加入阻塞隊(duì)列失敗敌买,則嘗試以線程池最大線程數(shù)新開線程去執(zhí)行該任務(wù)
else if (!addWorker(command, false)) // 5
// 執(zhí)行失敗則拒絕任務(wù)
reject(command);
}
我們來說一下上面這個(gè)代碼的流程:
1. 首先判斷任務(wù)是否為空简珠,空則拋出空指針異常
2. 不為空則獲取線程池控制狀態(tài),判斷小于corePoolSize虹钮,添加到worker集合當(dāng)中執(zhí)行聋庵。若成功,則返回芙粱;失敗的話再接著獲取線程池控制狀態(tài)祭玉,因?yàn)橹挥袪顟B(tài)變了才會失敗,所以重新獲取
3. 判斷線程池是否處于運(yùn)行狀態(tài)春畔,是的話則添加command到阻塞隊(duì)列脱货,加入時(shí)也會再次獲取狀態(tài)并且檢測狀態(tài)是否不處于運(yùn)行狀態(tài),不處于的話則將command從阻塞隊(duì)列移除律姨,并且拒絕任務(wù)
4. 如果線程池里沒有了線程振峻,則創(chuàng)建新的線程去執(zhí)行獲取阻塞隊(duì)列的任務(wù)執(zhí)行
5. 如果以上都沒執(zhí)行成功,則需要開啟最大線程池里的線程來執(zhí)行任務(wù)择份,失敗的話就丟棄
addWorker(Runnable firstTask, boolean core)方法
private boolean addWorker(Runnable firstTask, boolean core) {
//外部循環(huán)標(biāo)記
retry:
//外層死循環(huán)
for (;;) {
//獲取線程池控制狀態(tài)
int c = ctl.get();
//獲取runState
int rs = runStateOf(c);
?
// Check if queue empty only if necessary.
/**
*1.如果線程池runState至少已經(jīng)是SHUTDOWN
*2\. 有一個(gè)是false則addWorker失敗扣孟,看false的情況
* - runState==SHUTDOWN,即狀態(tài)已經(jīng)大于SHUTDOWN了
* - firstTask為null荣赶,即傳進(jìn)來的任務(wù)為空凤价,結(jié)合上面就是runState是SHUTDOWN,但是
* firstTask不為空拔创,代表線程池已經(jīng)關(guān)閉了還在傳任務(wù)進(jìn)來
* - 隊(duì)列為空利诺,既然任務(wù)已經(jīng)為空,隊(duì)列為空剩燥,就不需要往線程池添加任務(wù)了
*/
if (rs >= SHUTDOWN && //runState大于等于SHUTDOWN,初始位RUNNING
! (rs == SHUTDOWN && //runState等于SHUTDOWN
firstTask == null && //firstTask為null
! workQueue.isEmpty())) //workQueue隊(duì)列不為空
return false;
?
//內(nèi)層死循環(huán)
for (;;) {
//獲取線程池的workerCount數(shù)量
int wc = workerCountOf(c);
//如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
//返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通過CAS操作立轧,使workerCount數(shù)量+1,成功則跳出循環(huán),回到retry標(biāo)記
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS操作失敗氛改,再次獲取線程池的控制狀態(tài)
c = ctl.get(); // Re-read ctl
//如果當(dāng)前runState不等于剛開始獲取的runState,則跳出內(nèi)層循環(huán)比伏,繼續(xù)外層循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//CAS由于更改workerCount而失敗胜卤,繼續(xù)內(nèi)層循環(huán)
}
}
?
//通過以上循環(huán),能執(zhí)行到這是workerCount成功+1了
//worker開始標(biāo)記
boolean workerStarted = false;
//worker添加標(biāo)記
boolean workerAdded = false;
//初始化worker為null
Worker w = null;
try {
//初始化一個(gè)當(dāng)前Runnable對象的worker對象
w = new Worker(firstTask);
//獲取該worker對應(yīng)的線程
final Thread t = w.thread;
//如果線程不為null
if (t != null) {
//初始線程池的鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//獲取鎖后再次檢查赁项,獲取線程池runState
int rs = runStateOf(ctl.get());
?
//當(dāng)runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask為null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//線程已存活
if (t.isAlive()) // precheck that t is startable
//線程未啟動就存活葛躏,拋出IllegalThreadStateException異常
throw new IllegalThreadStateException();
//將worker對象添加到workers集合當(dāng)中
workers.add(w);
//獲取workers集合的大小
int s = workers.size();
//如果大小超過largestPoolSize
if (s > largestPoolSize)
//重新設(shè)置largestPoolSize
largestPoolSize = s;
//標(biāo)記worker已經(jīng)被添加
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
//如果worker添加成功
if (workerAdded) {
//啟動線程
t.start();
//標(biāo)記worker已經(jīng)啟動
workerStarted = true;
}
}
} finally {
//如果worker沒有啟動成功
if (! workerStarted)
//workerCount-1的操作
addWorkerFailed(w);
}
//返回worker是否啟動的標(biāo)記
return workerStarted;
}
簡單說一下代碼流程:
1. 獲取線程池的控制狀態(tài),進(jìn)行判斷悠菜,不符合則返回false舰攒,符合則下一步
2. 死循環(huán),判斷workerCount是否大于上限悔醋,或者大于corePoolSize/maximumPoolSize摩窃,沒有的話則對workerCount+1操作
3. 如果不符合上述判斷或+1操作失敗,再次獲取線程池的控制狀態(tài)芬骄,獲取runState與剛開始獲取的runState相比猾愿,不一致則跳出內(nèi)層循環(huán)繼續(xù)外層循環(huán),否則繼續(xù)內(nèi)層循環(huán)
4. +1操作成功后账阻,使用重入鎖ReentrantLock來保證往workers當(dāng)中添加worker實(shí)例蒂秘,添加成功就啟動該實(shí)例
addWorker方法有4種傳參的方式(在execute方法中使用了前3種):
addWorker(command, true)
addWorker(command, false)
addWorker(null, false)
addWorker(null, true)
第一個(gè):線程數(shù)小于corePoolSize時(shí),放一個(gè)需要處理的task進(jìn)Workers Set淘太。如果Workers Set長度超過corePoolSize姻僧,就返回false。
第二個(gè):當(dāng)隊(duì)列被放滿時(shí)蒲牧,就嘗試將這個(gè)新來的task直接放入Workers Set撇贺,而此時(shí)Workers Set的長度限制是maximumPoolSize。如果線程池也滿了的話就返回false造成。
第三個(gè):放入一個(gè)空的task進(jìn)workers Set显熏,長度限制是maximumPoolSize。這樣一個(gè)task為空的worker在線程執(zhí)行的時(shí)候會去任務(wù)隊(duì)列里拿任務(wù)晒屎,這樣就相當(dāng)于創(chuàng)建了一個(gè)新的線程喘蟆,只是沒有馬上分配任務(wù)。
第四個(gè):這個(gè)方法就是放一個(gè)null的task進(jìn)Workers Set鼓鲁,而且是在小于corePoolSize時(shí)蕴轨,如果此時(shí)Set中的數(shù)量已經(jīng)達(dá)到corePoolSize那就返回false,什么也不干骇吭。實(shí)際使用中是在prestartAllCoreThreads()方法橙弱,這個(gè)方法用來為線程池預(yù)先啟動corePoolSize個(gè)worker等待從workQueue中獲取任務(wù)執(zhí)行。
addWorkerFailed(Worker w)
private void addWorkerFailed(Worker w) {
//重入鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//如果worker不為null
if (w != null)
//workers移除worker
workers.remove(w);
//通過CAS操作,workerCount-1
decrementWorkerCount();
tryTerminate();
} finally {
//釋放鎖
mainLock.unlock();
}
}
addWorker方法添加worker失敗棘脐,并且沒有成功啟動任務(wù)的時(shí)候斜筐,就會調(diào)用此方法,將任務(wù)從workers中移除蛀缝,并且workerCount做-1操作顷链。
tryTerminate()
final void tryTerminate() {
//死循環(huán)
for (;;) {
//獲取線程池控制狀態(tài)
int c = ctl.get();
/*
*線程池處于RUNNING狀態(tài)
*線程池狀態(tài)最小大于TIDYING
*線程池==SHUTDOWN并且workQUeue不為空
*直接return,不能終止
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果workerCount不為0
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
?
//獲取線程池的鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//通過CAS操作屈梁,設(shè)置線程池狀態(tài)為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//設(shè)置線程池的狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//發(fā)送釋放信號給在termination條件上等待的線程
termination.signalAll();
}
return;
}
} finally {
//釋放鎖
mainLock.unlock();
}
// else retry on failed CAS
}
}
當(dāng)對線程池執(zhí)行了非正常成功邏輯的操作時(shí)嗤练,都會需要執(zhí)行tryTerminate嘗試終止線程池。
runWorker(Worker w)
final void runWorker(Worker w) {
//獲取當(dāng)前線程
Thread wt = Thread.currentThread();
//獲取worker里的任務(wù)
Runnable task = w.firstTask;
//將worker實(shí)例的任務(wù)賦值為null
w.firstTask = null;
/*
*unlock方法會調(diào)用AQS的release方法
*release方法會調(diào)用具體實(shí)現(xiàn)類也就是Worker的tryRelease方法
*也就是將AQS狀態(tài)置為0在讶,允許中斷
*/
w.unlock(); // allow interrupts
//是否突然完成
boolean completedAbruptly = true;
try {
//worker實(shí)例的task不為空煞抬,或者通過getTask獲取的不為空
while (task != null || (task = getTask()) != null) {
//獲取鎖
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/*
*獲取線程池的控制狀態(tài),至少要大于STOP狀態(tài)
*如果狀態(tài)不對构哺,檢查當(dāng)前線程是否中斷并清除中斷狀態(tài)革答,并且再次檢查線程池狀態(tài)是否大于STOP
*如果上述滿足,檢查該對象是否處于中斷狀態(tài)遮婶,不清除中斷標(biāo)記
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中斷改對象
wt.interrupt();
try {
//執(zhí)行前的方法蝗碎,由子類具體實(shí)現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//執(zhí)行完后調(diào)用的方法,也是由子類具體實(shí)現(xiàn)
afterExecute(task, thrown);
}
} finally {//執(zhí)行完后
//task設(shè)置為null
task = null;
//已完成任務(wù)數(shù)+1
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
//處理并退出當(dāng)前worker
processWorkerExit(w, completedAbruptly);
}
}
該方法的作用就是去執(zhí)行任務(wù)旗扑。執(zhí)行步驟如下:
1. 首先在方法一進(jìn)來蹦骑,就執(zhí)行了w.unlock(),這是為了將AQS的狀態(tài)改為0臀防,因?yàn)橹挥術(shù)etState() >= 0的時(shí)候眠菇,線程才可以被中斷
2. 判斷firstTask是否為空,為空則通過getTask()獲取任務(wù)袱衷,不為空接著往下執(zhí)行
3. 判斷是否符合中斷狀態(tài)捎废,符合的話設(shè)置中斷標(biāo)記
4. 執(zhí)行beforeExecute(),task.run()致燥,afterExecute()方法
5. 任何一個(gè)出異常都會導(dǎo)致任務(wù)執(zhí)行的終止登疗;進(jìn)入processWorkerExit來退出任務(wù)
6. 正常執(zhí)行的話會接著回到步驟2
getTask()
private Runnable getTask() {
//標(biāo)志是否獲取任務(wù)超時(shí)
boolean timedOut = false; // Did the last poll() time out?
?
//死循環(huán)
for (;;) {
//獲取線程池的控制狀態(tài)
int c = ctl.get();
//獲取線程池的runState
int rs = runStateOf(c);
?
// Check if queue empty only if necessary.
/*
*判斷線程池的狀態(tài),出現(xiàn)以下兩種情況
*1嫌蚤、runState大于等于SHUTDOWN狀態(tài)
*2辐益、runState大于等于STOP或者阻塞隊(duì)列為空
*將會通過CAS操作,進(jìn)行workerCount-1并返回null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
?
//獲取線程池的workerCount
int wc = workerCountOf(c);
?
// Are workers subject to culling?
/*
*allowCoreThreadTimeOut:是否允許core Thread超時(shí)脱吱,默認(rèn)false
*workerCount是否大于核心核心線程池
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
?
/*
*1智政、wc大于maximumPoolSize或者已超時(shí)
*2、隊(duì)列不為空時(shí)保證至少有一個(gè)任務(wù)
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/*
*通過CAS操作箱蝠,workerCount-1
*能進(jìn)行-1操作续捂,證明wc大于maximumPoolSize或者已經(jīng)超時(shí)
*/
if (compareAndDecrementWorkerCount(c))
//-1操作成功垦垂,返回null
return null;
//-1操作失敗,繼續(xù)循環(huán)
continue;
}
?
try {
/*
*wc大于核心線程池
*執(zhí)行poll方法
*小于核心線程池
*執(zhí)行take方法
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//判斷任務(wù)不為空返回任務(wù)
if (r != null)
return r;
//獲取一段時(shí)間沒有獲取到牙瓢,獲取超時(shí)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在上面的runWorker方法當(dāng)中我們可以看出劫拗,當(dāng)firstTask為空的時(shí)候,會通過該方法來接著獲取任務(wù)去執(zhí)行一罩,執(zhí)行邏輯順序如下:
1. 獲取線程池控制狀態(tài)和runState杨幼,判斷線程池是否已經(jīng)關(guān)閉或者正在關(guān)閉,是的話則workerCount-1操作返回null
2. 獲取workerCount判斷是否大于核心線程池
3. 判斷workerCount是否大于最大線程池?cái)?shù)目或者已經(jīng)超時(shí)聂渊,是的話workerCount-1,-1成功則返回null四瘫,不成功則回到步驟1重新繼續(xù)
4. 判斷workerCount是否大于核心線程池汉嗽,大于則用poll方法從隊(duì)列獲取任務(wù),否則用take方法從隊(duì)列獲取任務(wù)
5. 判斷任務(wù)是否為空找蜜,不為空則返回獲取的任務(wù)饼暑,否則回到步驟1重新繼續(xù)
processWorkerExit(Worker w, boolean completedAbruptly)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/*
*completedAbruptly:在runWorker出現(xiàn),代表是否突然完成的意思
*也就是在執(zhí)行任務(wù)過程當(dāng)中出現(xiàn)異常洗做,就會突然完成弓叛,傳true
*
*如果是突然完成,需要通過CAS操作诚纸,workerCount-1
*不是突然完成撰筷,則不需要-1,因?yàn)間etTask方法當(dāng)中已經(jīng)-1
*
*下面的代碼注釋貌似與代碼意思相反了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
?
//生成重入鎖
final ReentrantLock mainLock = this.mainLock;
//獲取鎖
mainLock.lock();
try {
//線程池統(tǒng)計(jì)的完成任務(wù)數(shù)completedTaskCount加上worker當(dāng)中完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
//從HashSet<Worker>中移除
workers.remove(w);
} finally {
//釋放鎖
mainLock.unlock();
}
?
//因?yàn)樯鲜霾僮魇轻尫湃蝿?wù)或線程畦徘,所以會判斷線程池狀態(tài)毕籽,嘗試終止線程池
tryTerminate();
?
//獲取線程池的控制狀態(tài)
int c = ctl.get();
//判斷runState是否小魚STOP,即是RUNNING或者SHUTDOWN
//如果是RUNNING或者SHUTDOWN井辆,代表沒有成功終止線程池
if (runStateLessThan(c, STOP)) {
/*
*是否突然完成
*如若不是关筒,代表已經(jīng)沒有任務(wù)可獲取完成,因?yàn)間etTask當(dāng)中是while循環(huán)
*/
if (!completedAbruptly) {
/*
*allowCoreThreadTimeOut:是否允許core thread超時(shí)杯缺,默認(rèn)false
*min-默認(rèn)是corePoolSize
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//允許core thread超時(shí)并且隊(duì)列不為空
//min為0蒸播,即允許core thread超時(shí),這樣就不需要維護(hù)核心核心線程池了
//如果workQueue不為空萍肆,則至少保持一個(gè)線程存活
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果workerCount大于min袍榆,則表示滿足所需,可以直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果是突然完成匾鸥,添加一個(gè)空任務(wù)的worker線程--這里我也不太理解
addWorker(null, false);
}
}
明顯的蜡塌,在執(zhí)行任務(wù)當(dāng)中,會去獲取任務(wù)進(jìn)行執(zhí)行勿负,那既然是執(zhí)行任務(wù)馏艾,肯定就會有執(zhí)行完或者出現(xiàn)異常中斷執(zhí)行的時(shí)候劳曹,那這時(shí)候肯定也會有相對應(yīng)的操作。代碼邏輯如下:
- 首先判斷線程是否突然終止琅摩,如果是突然終止铁孵,通過CAS,workerCount-1
- 統(tǒng)計(jì)線程池完成任務(wù)數(shù)房资,并將worker從workers當(dāng)中移除
- 判斷線程池狀態(tài)蜕劝,嘗試終止線程池
- 線程池沒有成功終止
- 判斷是否突然完成任務(wù),不是則進(jìn)行下一步轰异,是則進(jìn)行第三步
- 如允許核心線程超時(shí)岖沛,隊(duì)列不為空,則至少保證一個(gè)線程存活
- 添加一個(gè)空任務(wù)的worker線程
Worker內(nèi)部類
我們在上面已經(jīng)算是挺詳細(xì)地講了線程池執(zhí)行任務(wù)execute的執(zhí)行流程和一些細(xì)節(jié)搭独,在上面頻繁地出現(xiàn)了一個(gè)字眼婴削,那就是worker實(shí)例,那么這個(gè)worker究竟是什么呢牙肝?
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
?
/** 工作線程唉俗,如果工廠失敗則為空. */
final Thread thread;
/** 初始化任務(wù),有可能為空 */
Runnable firstTask;
/** 已完成的任務(wù)計(jì)數(shù) */
volatile long completedTasks;
?
/**
* 創(chuàng)建并初始化第一個(gè)任務(wù)配椭,使用線程工廠來創(chuàng)建線程
* 初始化有3步
*1虫溜、設(shè)置AQS的同步狀態(tài)為-1,表示該對象需要被喚醒
*2股缸、初始化第一個(gè)任務(wù)
*3衡楞、調(diào)用ThreadFactory來使自身創(chuàng)建一個(gè)線程,并賦值給worker的成員變量thread
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
?
//重寫Runnable的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//調(diào)用ThreadPoolExecutor的runWorker方法
runWorker(this);
}
?
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//代表是否獨(dú)占鎖乓序,0-非獨(dú)占 1-獨(dú)占
protected boolean isHeldExclusively() {
return getState() != 0;
}
//重寫AQS的tryAcquire方法嘗試獲取鎖
protected boolean tryAcquire(int unused) {
//嘗試將AQS的同步狀態(tài)從0改為1
if (compareAndSetState(0, 1)) {
//如果改變成寺酪,則將當(dāng)前獨(dú)占模式的線程設(shè)置為當(dāng)前線程并返回true
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//否則返回false
return false;
}
?
//重寫AQS的tryRelease嘗試釋放鎖
protected boolean tryRelease(int unused) {
//設(shè)置當(dāng)前獨(dú)占模式的線程為null
setExclusiveOwnerThread(null);
//設(shè)置AQS同步狀態(tài)為0
setState(0);
//返回true
return true;
}
?
//獲取鎖
public void lock() { acquire(1); }
//嘗試獲取鎖
public boolean tryLock() { return tryAcquire(1); }
//釋放鎖
public void unlock() { release(1); }
//是否被獨(dú)占
public boolean isLocked() { return isHeldExclusively(); }
?
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
我們可以看到Worker內(nèi)部類繼承AQS同步器并且實(shí)現(xiàn)了Runnable接口,所以Worker很明顯就是一個(gè)可執(zhí)行任務(wù)并且又可以控制中斷替劈、起到鎖效果的類寄雀。