1. 線程池的概念
1.1 基本概念
由于線程的生命周期中包括創(chuàng)建击敌、就緒介返、運(yùn)行、阻塞、銷(xiāo)毀階段圣蝎,當(dāng)我們待處理的任務(wù)數(shù)目較小時(shí)刃宵,我們可以自己創(chuàng)建幾個(gè)線程來(lái)處理相應(yīng)的任務(wù),但當(dāng)有大量的任務(wù)時(shí)徘公,由于創(chuàng)建牲证、銷(xiāo)毀線程需要很大的開(kāi)銷(xiāo),運(yùn)用線程池這些問(wèn)題就大大的緩解了关面。
1.2 使用線程池的好處
1.2.1 使用new Thread()創(chuàng)建線程的弊端
每次通過(guò)new Thread()創(chuàng)建對(duì)象性能不佳坦袍。
線程缺乏統(tǒng)一管理,可能無(wú)限制新建線程等太,相互之間競(jìng)爭(zhēng)捂齐,及可能占用過(guò)多系統(tǒng)資源導(dǎo)致死機(jī)或oom。
缺乏更多功能缩抡,如定時(shí)執(zhí)行奠宜、定期執(zhí)行、線程中斷瞻想。
1.2.2 使用Java線程池的好處
重用存在的線程压真,減少對(duì)象創(chuàng)建、消亡的開(kāi)銷(xiāo)蘑险,提升性能滴肿。
可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率佃迄,同時(shí)避免過(guò)多資源競(jìng)爭(zhēng)泼差,避免堵塞。
提供定時(shí)執(zhí)行和屎、定期執(zhí)行拴驮、單線程、并發(fā)數(shù)控制等功能柴信。
2. Java 中有哪幾種線程池
2.1 CachedThreadPool
優(yōu)點(diǎn):
工作線程的創(chuàng)建數(shù)量幾乎沒(méi)有限制(其實(shí)也有限制的,數(shù)目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。
如果長(zhǎng)時(shí)間沒(méi)有往線程池中提交任務(wù)宽气,即如果工作線程空閑了指定的時(shí)間(默認(rèn)為1分鐘)随常,則該工作線程將自動(dòng)終止。終止后萄涯,如果你又提交了新的任務(wù)绪氛,則線程池重新創(chuàng)建一個(gè)工作線程。
缺點(diǎn):
在使用CachedThreadPool時(shí)涝影,一定要注意控制任務(wù)的數(shù)量枣察,否則,由于大量線程同時(shí)運(yùn)行,很有會(huì)造成系統(tǒng)癱瘓序目。
2.2 FixedThreadPool
創(chuàng)建一個(gè)指定工作線程數(shù)量的線程池臂痕。每當(dāng)提交一個(gè)任務(wù)就創(chuàng)建一個(gè)工作線程,如果工作線程數(shù)量達(dá)到線程池初始的最大數(shù)猿涨,則將提交的任務(wù)存入到池隊(duì)列中握童。定長(zhǎng)線程池的大小最好根據(jù)系統(tǒng)資源進(jìn)行設(shè)置如Runtime.getRuntime().availableProcessors()
優(yōu)點(diǎn):
FixedThreadPool是一個(gè)典型且優(yōu)秀的線程池,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時(shí)所耗的開(kāi)銷(xiāo)的優(yōu)點(diǎn)叛赚。
缺點(diǎn):
但是澡绩,在線程池空閑時(shí),即線程池中沒(méi)有可運(yùn)行任務(wù)時(shí)俺附,它不會(huì)釋放工作線程肥卡,還會(huì)占用一定的系統(tǒng)資源。
2.3 SingleThreadExecutor
創(chuàng)建一個(gè)單線程化的Executor事镣,即只創(chuàng)建唯一的工作者線程來(lái)執(zhí)行任務(wù)召调,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行蛮浑。如果這個(gè)線程異常結(jié)束唠叛,會(huì)有另一個(gè)取代它,保證順序執(zhí)行沮稚。單工作線程最大的特點(diǎn)是可保證順序地執(zhí)行各個(gè)任務(wù)艺沼,并且在任意給定的時(shí)間不會(huì)有多個(gè)線程是活動(dòng)的。
2.4 ScheduleThreadPool
創(chuàng)建一個(gè)定長(zhǎng)的線程池蕴掏,而且支持定時(shí)的以及周期性的任務(wù)執(zhí)行障般,支持定時(shí)及周期性任務(wù)執(zhí)行。
3. 如何實(shí)現(xiàn)自定義的線程池
3.1 線程池具體實(shí)現(xiàn)解析
當(dāng)我們使用 線程池的時(shí)候盛杰,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法挽荡,其實(shí)我們深入到這些方法里面,就可以看到它們的是實(shí)現(xiàn)方式是這樣的即供。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到定拟,線程池的具體實(shí)現(xiàn)是調(diào)用 ThreadPoolExecutor 的構(gòu)造方法實(shí)現(xiàn)的,那么下面來(lái)具體看看 ThreadPoolExecutor 構(gòu)造方法的參數(shù)逗嫡。
3.2 ThreadPoolExecutor 參數(shù)詳解
先來(lái)看看 ThreadPoolExecutor 的構(gòu)造方法:
通過(guò)查看 ThreadPoolExecutor 源碼可以看到青自,該類(lèi)有四個(gè)構(gòu)造方法,頭三個(gè)構(gòu)造方法驱证,其實(shí)都是調(diào)用的第四個(gè)構(gòu)造方法延窜,所以我們就解釋一下第四個(gè)構(gòu)造方法的參數(shù)含義。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心線程池的大小抹锄,在線程池被創(chuàng)建之后逆瑞,其實(shí)里面是沒(méi)有線程的荠藤。(當(dāng)然,調(diào)用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法會(huì)預(yù)創(chuàng)建線程获高,而不用等著任務(wù)的到來(lái))哈肖。當(dāng)有任務(wù)進(jìn)來(lái)的時(shí)候,才會(huì)創(chuàng)建線程谋减。當(dāng)線程池中的線程數(shù)量達(dá)到corePoolSize之后牡彻,就把任務(wù)放到 緩存隊(duì)列當(dāng)中。(就是 workQueue )出爹。
maximumPoolSize:最大線程數(shù)量是多少庄吼。它標(biāo)志著這個(gè)線程池的最大線程數(shù)量。如果沒(méi)有最大數(shù)量严就,當(dāng)創(chuàng)建的線程數(shù)量達(dá)到了 某個(gè)極限值总寻,到最后內(nèi)存肯定就爆掉了。
keepAliveTime:當(dāng)線程沒(méi)有任務(wù)時(shí)梢为,最多保持的時(shí)間渐行,超過(guò)這個(gè)時(shí)間就被終止了。默認(rèn)情況下铸董,只有 線程池中線程數(shù)量 大于 corePoolSize 時(shí)祟印,keepAliveTime 值才會(huì)起作用。也就說(shuō)說(shuō)粟害,只有在線程池線程數(shù)量超出 corePoolSize 了蕴忆。我們才會(huì)把超時(shí)的空閑線程給停止掉。否則就保持線程池中有 corePoolSize 個(gè)線程就可以了悲幅。默認(rèn)值是60秒套鹅。
Unit:參數(shù)keepAliveTime的時(shí)間單位,就是 TimeUnit類(lèi)當(dāng)中的幾個(gè)屬性汰具。
-
workQueue:用來(lái)存儲(chǔ)待執(zhí)行任務(wù)的隊(duì)列卓鹿,不同的線程池它的隊(duì)列實(shí)現(xiàn)方式不同(因?yàn)檫@關(guān)系到排隊(duì)策略的問(wèn)題)比如有以下幾種:
ArrayBlockingQueue:基于數(shù)組的隊(duì)列,創(chuàng)建時(shí)需要指定大小留荔。
LinkedBlockingQueue:基于鏈表的隊(duì)列吟孙,如果沒(méi)有指定大小,則默認(rèn)值是 Integer.MAX_VALUE存谎。(newFixedThreadPool和newSingleThreadExecutor使用的就是這種隊(duì)列)拔疚,吞吐量通常要高于ArrayBlockingQuene。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列既荚,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)栋艳,吞吐量通常要高于LinkedBlockingQuene(newCachedThreadPool使用的就是這種隊(duì)列)恰聘。
threadFactory:線程工廠,用來(lái)創(chuàng)建線程。通過(guò)自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識(shí)別度的線程名晴叨。
-
Handler:拒絕執(zhí)行任務(wù)時(shí)的策略凿宾,一般來(lái)講有以下四種策略:
ThreadPoolExecutor.AbortPolicy 丟棄任務(wù),并拋出 RejectedExecutionException 異常兼蕊。
ThreadPoolExecutor.CallerRunsPolicy:該任務(wù)被線程池拒絕初厚,由調(diào)用 execute 方法的線程執(zhí)行該任務(wù)。
ThreadPoolExecutor.DiscardOldestPolicy : 拋棄隊(duì)列最前面的任務(wù)孙技,然后重新嘗試執(zhí)行任務(wù)产禾。
ThreadPoolExecutor.DiscardPolicy,丟棄任務(wù)牵啦,不過(guò)也不拋出異常亚情。
3.3 線程池的處理流程
下圖是提交任務(wù)給線程池之后, 線程池的處理流程圖
如果當(dāng)前線程池線程數(shù)目小于 corePoolSize(核心池還沒(méi)滿(mǎn)呢)哈雏,那么就創(chuàng)建一個(gè)新線程去處理任務(wù)楞件。
如果核心池已經(jīng)滿(mǎn)了裳瘪,來(lái)了一個(gè)新的任務(wù)后皆怕,會(huì)嘗試將其添加到任務(wù)隊(duì)列中虱黄,如果成功,則等待空閑線程將其從隊(duì)列中取出并且執(zhí)行,如果隊(duì)列已經(jīng)滿(mǎn)了,則繼續(xù)下一步搪桂。
此時(shí)裸燎,如果線程池線程數(shù)量 小于 maximumPoolSize蕴纳,則創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)稻薇,否則案狠,那就說(shuō)明線程池到了最大飽和能力了,沒(méi)辦法再處理了名段,此時(shí)就按照拒絕策略來(lái)處理阱扬。(就是構(gòu)造函數(shù)當(dāng)中的 Handler 對(duì)象)泣懊。
如果線程池的線程數(shù)量大于 corePoolSize伸辟,則當(dāng)某個(gè)線程的空閑時(shí)間超過(guò)了 keepAliveTime,那么這個(gè)線程就要被銷(xiāo)毀了馍刮,直到線程池中線程數(shù)量不大于 corePoolSize 為止信夫。
4. 線程池的實(shí)現(xiàn)原理
4.1 線程池的處理流程
一個(gè)線程從被提交(submit)到執(zhí)行共經(jīng)歷以下流程:
判斷核心線程池中的線程是否都在執(zhí)行任務(wù),如果不是則新建一個(gè)工作線程執(zhí)行任務(wù)卡啰,如果都在執(zhí)行任務(wù)則進(jìn)入到第二個(gè)流程
判斷工作隊(duì)列是否已滿(mǎn)静稻,如果工作隊(duì)列沒(méi)有滿(mǎn),則新提交的任務(wù)存儲(chǔ)在工作隊(duì)列中匈辱,如果滿(mǎn)了則進(jìn)入到第三個(gè)流程
判斷線程池內(nèi)部的線程是否都處于工作狀態(tài)振湾,如果不是則新建一個(gè)工作線程執(zhí)行任務(wù),如果都在執(zhí)行任務(wù)亡脸,則交給任務(wù)飽和度策略來(lái)處理這個(gè)任務(wù)押搪。
線程池在執(zhí)行excute方法時(shí),主要有以下四種情況:
如果當(dāng)前線程數(shù)少于 coolPoolSize 浅碾,那么新建一個(gè)工作線程執(zhí)行任務(wù)大州。(需要獲得全局鎖)
如果當(dāng)前線程數(shù)等于或大于 coolPoolSize , 那么將創(chuàng)建的線程放入到 BlockQueue 中垂谢。
如果如果無(wú)法將任務(wù)放入到 BlockQueue(已滿(mǎn))厦画,那么新建一個(gè)線程執(zhí)行任務(wù)。(需要獲得全局鎖)
如果新建的線程數(shù)大于了當(dāng)前線程池的 maxiumPoolSize 滥朱, 那么任務(wù)會(huì)被拒絕根暑,交給 RejectedExecutionHandler.rejectedExecution() 方法處理。
4.2 ThreadPoolExecutor 源碼分析
4.2.1 定義的幾個(gè)變量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
在分析源碼前有必要理解一個(gè)變量 ctl 徙邻。這是 Java 大神們?yōu)榱税压ぷ骶€程數(shù)量和線程池狀態(tài)放在一個(gè)int類(lèi)型變量里存儲(chǔ)而設(shè)置的一個(gè)原子類(lèi)型的變量排嫌。 在 ctl 中,低位的 29 位表示工作線程的數(shù)量鹃栽,高位用來(lái)表示 RUNNING躏率、SHUTDOWN、STOP 等狀態(tài)民鼓。 因此一個(gè)線程池的數(shù)量也就變成了 (2^29)-1 薇芝,大約 500 million ,而不是 (2^31)-1 丰嘉, 2billion 夯到。上面定義的三個(gè)方法只是為了計(jì)算得到線程池的狀態(tài)和工作線程的數(shù)量。
4.2.2 Execute 方法
public void execute(Runnable command) {
//如果提交了空的任務(wù) 拋出異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();//獲取當(dāng)前線程池的狀態(tài)
//檢查當(dāng)前工作線程數(shù)量是否小于核心線程數(shù)量
if (workerCountOf(c) < corePoolSize) {
//通過(guò)addWorker方法提交任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();//如果提交失敗 需要二次檢查狀態(tài)
}
//向工作線程提交任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查狀態(tài)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//擴(kuò)容失敗 則拒絕任務(wù)
else if (!addWorker(command, false))
reject(command);
}
從源碼中可以看到提交任務(wù)的這一過(guò)程基本與第二個(gè)圖的四個(gè)流程是一致的饮亏,需要檢查的是當(dāng)前工作線程的數(shù)量與核心線程數(shù)量的關(guān)系耍贾,來(lái)決定提交任務(wù)的方式或者是拒絕任務(wù)提交阅爽。而具體任務(wù)的提交工作是在addWorker方法中。在這里面看到了recheck這樣的變量荐开,這是在執(zhí)行了一些動(dòng)作失敗后再次檢查線程池的狀態(tài)付翁,因?yàn)樵谶@期間可能有線程池關(guān)閉獲得線程池飽和等狀態(tài)的改變。
4.2.3 addWorker 方法
這個(gè)方法是任務(wù)提交的一個(gè)核心方法晃听。在里面完成了狀態(tài)檢查百侧、新建任務(wù)、執(zhí)行任務(wù)等一系列動(dòng)作能扒。它有兩個(gè)參數(shù)佣渴,第一個(gè)參數(shù)是提交的任務(wù),第二個(gè)參數(shù)是一個(gè)標(biāo)識(shí)符初斑,標(biāo)識(shí)在檢查工作線程數(shù)量的時(shí)候是應(yīng)該與 corePoolSize 對(duì)比還是應(yīng)該 maximumPoolSize 對(duì)比辛润。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//死循環(huán)更新?tīng)顟B(tài)
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//獲取運(yùn)行狀態(tài)
//檢查線程池是否處于關(guān)閉狀態(tài)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取當(dāng)前工作線程數(shù)量
int wc = workerCountOf(c);
//如果已經(jīng)超過(guò)corePoolSize獲取maximumPoolSize 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加一個(gè)工作線程
if (compareAndIncrementWorkerCount(c))
break retry;
//再次獲取狀態(tài)
c = ctl.get(); // Re-read ctl
//如果狀態(tài)更新失敗 則循環(huán)更新
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//初始化一個(gè)工作線程
final Thread t = w.thread;
if (t != null) {
//獲得鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // SHUTDOWN以后的狀態(tài)和SHUTDOWN狀態(tài)下firstTask為null,不可新增線程
throw new IllegalThreadStateException();
workers.add(w); //添加工作這到hashset中保存
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //記錄最大線程數(shù)
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//工作線程啟動(dòng) 執(zhí)行第一個(gè)任務(wù) 就是新提交的任務(wù)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這個(gè)方法可以分為兩個(gè)階段來(lái)看见秤,第一個(gè)階段是判斷是否有必要新增一個(gè)工作線程砂竖,如果有則利用CAS更新工作線程的數(shù)量;第二部分是將提交的任務(wù)封裝成一個(gè)工作線程Worker然后加入到線程池的容器中秦叛,開(kāi)始執(zhí)行新提交的任務(wù)晦溪。這個(gè)Worker在執(zhí)行完任務(wù)后,還會(huì)循環(huán)地獲取工作隊(duì)列里的任務(wù)來(lái)執(zhí)行挣跋。下面來(lái)看一下Worker的構(gòu)造方法就能更好地理解上面的代碼了
4.2.4 runWorker 方法
在addWorker方法快要結(jié)束的地方三圆,調(diào)用了t.start()方法,我們知道它實(shí)際執(zhí)行的就是Worker對(duì)象的run()方法避咆,而worker的run()方法是這樣定義的:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
它實(shí)際上是將自己委托給線程池的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//不斷地從blockingQueue獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行beforeExecute方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//調(diào)用Runable的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行aferExecute方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這個(gè)方法呢也比較好理解舟肉,它在不斷執(zhí)行我們提交的任務(wù)的run方法。而這個(gè)任務(wù)可能是我們新提交的查库,也有可能是從等待隊(duì)列中獲取的路媚。這樣就實(shí)現(xiàn)了線程池的完整邏輯。
4.2.5 shutdown樊销,shutdownNow 方法
shutdown 方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); //這個(gè)方法校驗(yàn)線程訪問(wèn)許可整慎,不是很理解,后面有時(shí)間再單獨(dú)解析围苫;
advanceRunState(SHUTDOWN); //轉(zhuǎn)換線程池狀態(tài)為SHUTDOWN
interruptIdleWorkers(); //中斷所有空閑的線程
onShutdown(); // 空實(shí)現(xiàn)方法裤园,是做shutdown清理操作的
} finally {
mainLock.unlock();
}
tryTerminate(); //嘗試結(jié)束線程池(設(shè)置狀態(tài)為T(mén)ERMINATED)
}
shutdownNow 方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//同上
advanceRunState(STOP);//轉(zhuǎn)換線程池狀態(tài)到STOP
interruptWorkers();//中斷所有線程
tasks = drainQueue();//獲取到任務(wù)隊(duì)列所有任務(wù),并清空隊(duì)列
} finally {
mainLock.unlock();
}
tryTerminate();//同上
return tasks;
}
由上可知剂府,兩個(gè)關(guān)閉方法的區(qū)別:
shutdown 設(shè)置狀態(tài)為 SHUTDOWN拧揽,而 shutdownNow 設(shè)置狀態(tài)為 STOP
shutdown 值中斷空閑的線程,已提交的任務(wù)可以繼續(xù)被執(zhí)行,而 shutdownNow 中斷所有線程
shutdown 無(wú)返回值淤袜,shutdownNow 返回任務(wù)隊(duì)列中還未執(zhí)行的任務(wù)
雖然有 shutdown 和 shutdownNow 方法痒谴,但是還是不能滿(mǎn)足一個(gè)需求:就是需要知道等待所有任務(wù)已完成線程池結(jié)束,這里 ThreadPoolExecutor 提供了 awaitTermination 方法滿(mǎn)足這個(gè)需求:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
這個(gè)方法兩個(gè)入?yún)ⅲO(shè)置等待超時(shí)時(shí)間铡羡。
如果狀態(tài)已經(jīng)是 TERMINATED 返回 true 积蔚,表示已關(guān)閉。
否則一直等到 termination 的 signalAll 至超時(shí)或者當(dāng)前線程中斷蓖墅。超時(shí)后都線程池都沒(méi)有關(guān)閉库倘,返回 false。
5. 代碼練習(xí)
5.1 當(dāng)線程池滿(mǎn)后處理剩余任務(wù)的 handler
ThreadPoolExecutor.CallerRunsPolicy:該任務(wù)被線程池拒絕论矾,由調(diào)用 execute 方法的線程執(zhí)行該任務(wù)。
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 5, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 15; i++) {
exec.execute(new Task());
}
exec.shutdown();
}
}
class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " >>> start.");
// mock run for a while
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
運(yùn)行結(jié)果:可以看到有兩個(gè)線程交給了 main 方法處理杆勇。
pool-1-thread-1 >>> start.
pool-1-thread-2 >>> start.
pool-1-thread-3 >>> start.
main >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-4 >>> start.
main >>> start.
pool-1-thread-4 >>> start.
pool-1-thread-3 >>> start.
pool-1-thread-2 >>> start.
pool-1-thread-1 >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-4 >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-1 >>> start.
Process finished with exit code 0
5.2 自定義線程池和線程工廠
5.2.1 自定義線程池贪壳,實(shí)現(xiàn)計(jì)時(shí)和統(tǒng)計(jì)功能
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("MyThreadPoolExecutor");
private final AtomicLong numTasks = new AtomicLong(1);
private final AtomicLong totalTime = new AtomicLong();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
/**
* 任務(wù)執(zhí)行前
*/
protected void beforeExecute(Thread t,Runnable r){
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s",t,r));
startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));
}
/**
* 任務(wù)執(zhí)行后
* @param r 任務(wù)
* @param t 執(zhí)行任務(wù)的線程
*/
protected void afterExecutor(Runnable r,Throwable t){
try {
Long endTime = (long) (System.nanoTime() / Math.pow(10,9));
Long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated () {
try {
log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
5.2.2 自定義線程工廠,自定義線程名稱(chēng)
class MyThreadFactory implements ThreadFactory {
private String name;
private int counter;
public MyThreadFactory(String name) {
this.name = name;
this.counter = 1;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("[Task ID + " + (counter++) + ": " + name + "]");
return t;
}
}
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 5, 2,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new MyThreadFactory("Test Thread"), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 15; i++) {
exec.execute(new Task());
}
exec.shutdown();
}
}