寫在前面的話
并發(fā)編程里面贴谎,線程池這個(gè)一直就想寫一篇文章來總結(jié)下,但是直到并發(fā)編程系列的第12篇才寫的原因是線程池里面用到了AQS同步隊(duì)列和阻塞隊(duì)列等一些知識季稳,所以為了鋪墊赴精,就先把前面的知識點(diǎn)寫完了,到現(xiàn)在绞幌,終于可以總結(jié)一下線程池的實(shí)現(xiàn)原理了蕾哟。
什么是線程池
在Java中,創(chuàng)建一個(gè)線程可以通過繼承Thread或者實(shí)現(xiàn)Runnable接口來實(shí)現(xiàn),但是,如果每個(gè)請求都創(chuàng)建一個(gè)新線程谭确,那么創(chuàng)建和銷毀線程花費(fèi)的時(shí)間和消耗的系統(tǒng)資源都相當(dāng)大帘营,甚至可能要比在處理實(shí)際的用戶請求的時(shí)間和資源要多的多。
為了解決這個(gè)問題,就有了線程池的概念逐哈,線程池的核心邏輯是提前創(chuàng)建好若干個(gè)線程放在一個(gè)容器中芬迄。如果有任務(wù)需要處理,則將任務(wù)直接分配給線程池中的線程來執(zhí)行就行昂秃,任務(wù)處理完以后這個(gè)線程不會被銷毀禀梳,而是等待后續(xù)分配任務(wù)。同時(shí)通過線程池來重復(fù)管理線程還可以避免創(chuàng)建大量線程增加開銷肠骆。
創(chuàng)建線程池
為了方便使用算途,Java中的Executors類里面提供了幾個(gè)線程池的工廠方法,可以直接利用提供的方法創(chuàng)建不同類型的線程池:
- newFixedThreadPool:創(chuàng)建一個(gè)固定線程數(shù)的線程池
- newSingleThreadExecutor:創(chuàng)建只有1個(gè)線程的線程池
- newCachedThreadPool:返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程個(gè)數(shù)的線程池蚀腿,不限制最大線程 數(shù)量嘴瓤,若用空閑的線程則執(zhí)行任務(wù),若無任務(wù)則不創(chuàng)建線程莉钙。并且每一個(gè)空閑線程會在60秒 后自動回收廓脆。
- newScheduledThreadPool: 創(chuàng)建一個(gè)可以指定線程的數(shù)量的線程池,但是這個(gè)線程池還帶有 延遲和周期性執(zhí)行任務(wù)的功能磁玉,類似定時(shí)器停忿。
FixedThreadPool
創(chuàng)建一個(gè)固定數(shù)量N個(gè)線程在一個(gè)共享的無邊界隊(duì)列上操作的線程池。在任何時(shí)候蚊伞,最多N個(gè)線程被激活處理任務(wù)席赂。如果所有線程都在活動狀態(tài)時(shí)又有新的任務(wù)被提交,那么新提交的任務(wù)會加入隊(duì)列等待直到有線程可用為止厚柳。
如果有任何線程在shutdown前因?yàn)槭《唤K止,那么當(dāng)有新的任務(wù)需要執(zhí)行時(shí)會產(chǎn)生一個(gè)新的線程沐兵,新的線程將會一直存在線程池中别垮,直到被顯式的shutdown。
示例
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
//FixedThreadPool - 固定線程數(shù)
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i=0;i<10;i++){
fixedThreadPool.execute(()-> {
System.out.println("線程名:" + Thread.currentThread().getName());
});
}
fixedThreadPool.shutdown();
}
}
輸出結(jié)果為:
可以看到扎谎,最多只有3個(gè)線程在循環(huán)執(zhí)行任務(wù)(運(yùn)行結(jié)果是不一定的碳想,但是最多只會有3個(gè)線程)。
FixedThreadPool調(diào)用了如下方法構(gòu)造線程池:
SingleThreadExecutor
只有一個(gè)工作線程的執(zhí)行器毁靶。如果這個(gè)線程在正常關(guān)閉前因?yàn)閳?zhí)行失敗而被關(guān)閉胧奔,那么就會重新創(chuàng)建一個(gè)新的線程加入執(zhí)行器。
這種執(zhí)行器可以保證所有的任務(wù)按順序執(zhí)行预吆,并且在任何給定的時(shí)間內(nèi)龙填,確保活動的任務(wù)只有1個(gè)。
示例
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i=0;i<9;i++){
singleThreadExecutor.execute(()-> {
System.out.println("線程名:" + Thread.currentThread().getName());
});
}
}
}
singleThreadExecutor.shutdown();
運(yùn)行結(jié)果只有1個(gè)線程:
SingleThreadExecutor調(diào)用了如下方法構(gòu)造線程池:
CachedThreadPool
一個(gè)在需要處理任務(wù)時(shí)才會創(chuàng)建線程的線程池岩遗,如果一個(gè)線程處理完任務(wù)了還沒有被回收扇商,那么線程可以被重復(fù)使用。
當(dāng)我們調(diào)用execute方法時(shí)宿礁,如果之前創(chuàng)建的線程有空閑可用的案铺,則會復(fù)用之前創(chuàng)建好的線程,否則就會創(chuàng)建新的線程加入到線程池中梆靖。
創(chuàng)建好的線程如果在60s內(nèi)沒被使用控汉,那么線程就會被終止并移出緩存。因此返吻,這種線程池可以保持長時(shí)間空閑狀態(tài)而不會消耗任何資源姑子。
示例
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i=0;i<9;i++){
cachedThreadPool.execute(()-> {
System.out.println("線程名:" + Thread.currentThread().getName());
});
}
cachedThreadPool.shutdown();
}
輸出結(jié)果可以看到,創(chuàng)建了9個(gè)不同的線程:
接下來我們對上面的示例改造一下思喊,在執(zhí)行execute之前休眠一段時(shí)間:
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i=0;i<9;i++){
try {
Thread.sleep(i * 10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(()-> {
System.out.println("線程名:" + Thread.currentThread().getName());
});
}
cachedThreadPool.shutdown();
}
這時(shí)候輸出的結(jié)果就只有1個(gè)線程了壁酬,因?yàn)橛胁糠志€程可以被復(fù)用:
注意:這兩個(gè)示例的結(jié)果都不是固定的,第一種有可能也不會創(chuàng)建9個(gè)線程恨课,第二種也有可能不止創(chuàng)建1個(gè)線程舆乔,具體要看線程的執(zhí)行情況。
CachedThreadPool調(diào)用了如下方法構(gòu)造線程池
ScheduledThreadPool
創(chuàng)建一個(gè)線程池剂公,它可以在調(diào)度命令給定的延遲后運(yùn)行或定期執(zhí)行希俩。這個(gè)相比較于其他的線程池,其自定義了一個(gè)子類ScheduledExecutorService繼承了ExecutorService纲辽。
示例
package com.zwx.concurrent.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
for (int i=0;i<9;i++){
scheduledThreadPool.execute(()->{
System.out.println("線程名:" + Thread.currentThread().getName());
});
}
scheduledThreadPool.shutdown();
}
}
輸出結(jié)果(執(zhí)行結(jié)果具有隨機(jī)性颜武,最多只有3個(gè)線程執(zhí)行):
ScheduledThreadPool最終調(diào)用了如下方法構(gòu)造線程池
線程池原理
根據(jù)上面的截圖可以看到,列舉的4中常用的線程池在構(gòu)造時(shí)拖吼,最終調(diào)用的方法都是ThreadPoolExecutor類的構(gòu)造方法鳞上,所以要分析原理,我們就去看看ThreadPoolExecutor吧吊档!
構(gòu)造線程池7大參數(shù)
下面就是ThreadPoolExecutor類中最完整的一個(gè)構(gòu)造方法:
這個(gè)就是是構(gòu)造線程池的核心方法篙议,總共有7個(gè)參數(shù):
- corePoolSize:核心線程數(shù)量。一直保留在池中的線程怠硼,核心線程即使空閑狀態(tài)也不會被回收鬼贱,除非設(shè)置了allowCoreThreadTimeOut屬性
- maximumPoolSize:最大線程數(shù)量。線程池中允許的最大線程數(shù)香璃,大于等于核心線程數(shù)
- keepAliveTime:活躍時(shí)間这难。當(dāng)最大線程數(shù)比核心線程數(shù)更大時(shí),超出核心的線程數(shù)的其他線程如果空間時(shí)間超過keepAliveTime會被回收
- TimeUnit:活躍時(shí)間的單位
- BlockingQueue:阻塞隊(duì)列葡秒。用于存儲尚等待被執(zhí)行的任務(wù)姻乓。
- ThreadFactory:創(chuàng)建線程的工廠類
- RejectedExecutionHandler:拒絕策略嵌溢。當(dāng)達(dá)到了線程邊界和隊(duì)列容量時(shí)提交的任務(wù)被阻塞時(shí)執(zhí)行的策略。
線程池執(zhí)行流程
execute(Runnable) 方法的主流程非常清晰:
根據(jù)上面源碼糖权,可以得出線程池執(zhí)行流程圖如下:
源碼分析
首先看看ThreadPoolExecutor類中的ctl堵腹,是一個(gè)32位的int類型,其中將高3位用來表示線程數(shù)量星澳,低29位用來表示疚顷,其中的計(jì)算方式都是采用二進(jìn)制來計(jì)算。
其中各種狀態(tài)的轉(zhuǎn)換關(guān)系如下圖:
其中狀態(tài)的大小關(guān)系為:
RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
addWork方法
private boolean addWorker(Runnable firstTask, boolean core) {
//第一段邏輯:線程數(shù)+1
retry:
for (;;) {
int c = ctl.get();//獲取線程池容量
int rs = runStateOf(c);//獲取狀態(tài)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&//即:SHUTDOWN禁偎,STOP腿堤,TIDYING,TERMINATED
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))//即:rs==RUNNING,firstTask!=null,queue==null
return false;//如果已經(jīng)關(guān)閉如暖,不接受任務(wù)笆檀;如果正在運(yùn)行,且queue為null盒至,也返回false
for (;;) {
int wc = workerCountOf(c);//獲取當(dāng)前的工作線程數(shù)
//如果工作線程數(shù)大于等于容量或者大于等于核心線程數(shù)(最大線程數(shù))酗洒,那么就不能再添加worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//cas增加線程數(shù),失敗則再次自旋嘗試
break retry;
c = ctl.get(); // Re-read ctl //再次獲取工作線程數(shù)
if (runStateOf(c) != rs)//不相等說明線程池的狀態(tài)發(fā)生了變化枷遂,繼續(xù)自旋嘗試
continue retry;
}
}
//第二段邏輯:將線程構(gòu)造成Worker對象樱衷,并添加到線程池
boolean workerStarted = false;//工作線程是否啟動成功
boolean workerAdded = false;//工作線程是否添加成功
Worker w = null;
try {
w = new Worker(firstTask);//構(gòu)建一個(gè)worker
final Thread t = w.thread;//去除worker中的線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//獲取重入鎖
mainLock.lock();//上鎖
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());//獲得鎖之后,再次檢查狀態(tài)
//只有當(dāng)前線程池是正在運(yùn)行狀態(tài)酒唉,[或是 SHUTDOWN 且 firstTask 為空]矩桂,才能添加到 workers 集合中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//將新創(chuàng)建的 Worker 添加到 workers 集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//更新線程池中線程的數(shù)量
workerAdded = true;//添加線程(worker)成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//這里就會去執(zhí)行Worker中的run()方法
workerStarted = true;//啟動成功
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//如果啟動線程失敗,需要回滾
}
return workerStarted;
}
這個(gè)方法主要就是做兩件事:
- 一痪伦、將線程數(shù)+1
- 二侄榴、將線程構(gòu)造成Worker對象,加入到線程池中网沾,并調(diào)用start()方法啟動線程
Worker對象
上面這個(gè)方法繼承了AbstractQueuedSynchronizer癞蚕,前面我們講述AQS同步隊(duì)列的時(shí)候知道,AQS就是一個(gè)同步器辉哥,那么既然有線程的同步器桦山,這里為什么不直接使用,反而要繼承之后重寫呢证薇?
這是因?yàn)锳QS同步器內(nèi)是支持鎖重入的度苔,但是線程池這里的設(shè)計(jì)思想是并不希望支持重入匆篓,所以才會重寫一個(gè)AQS來避免重入浑度。
Worker中state初始化狀態(tài)設(shè)置為-1,原因是在初始化Worker對象的時(shí)候鸦概,在線程真正執(zhí)行runWorker()方法之前箩张,不能被中斷甩骏。而一旦線程構(gòu)造完畢并開始執(zhí)行任務(wù)的時(shí)候,是允許被中斷的先慷,所以在線程進(jìn)入runWorker()之后的第一件事就是將state設(shè)置為0(無鎖狀態(tài))饮笛,也就是允許被中斷。
我們再看看Worker的構(gòu)造器:
addWork方法執(zhí)行到這句:w = new Worker(firstTask);//構(gòu)建一個(gè)worker 的時(shí)候论熙,就會調(diào)用構(gòu)造器創(chuàng)建一個(gè)Worker對象福青,state=-1,并且將當(dāng)前任務(wù)作為firstTask脓诡,后面再運(yùn)行的時(shí)候會優(yōu)先執(zhí)行firstTask无午。
上面addWorker方法在worker構(gòu)造成功之后,就會調(diào)用worker.start方法祝谚,這時(shí)候就會去執(zhí)行Worker中的run()方法宪迟,這也是一種委派的方式
run()方法中調(diào)用了runWorker(this)方法,這個(gè)方法就是真正執(zhí)行任務(wù)的方法:
runWorker(this)方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
/**
* 表示當(dāng)前worker線程允許中斷交惯,因?yàn)閚ew Worker默認(rèn)的 state=-1,此處是調(diào)用
* Worker類的 tryRelease()方法次泽,state置為 0,
* 而 interruptIfStarted()中只有 state>=0 才允許調(diào)用中斷
*/
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
/**
* 加鎖席爽,這里加鎖不僅僅是為了防止并發(fā)意荤,更是為了當(dāng)調(diào)用shutDown()方法的時(shí)候線程不被中斷,
* 因?yàn)閟hutDown()的時(shí)候在中斷線程之前會調(diào)用tryLock方法嘗試獲取鎖拳昌,獲取鎖成功才會中斷
*/
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/**
* 如果是以下兩種情況袭异,需要中斷線程
* 1.如果state>=STOP,且線程中斷標(biāo)記為false
* 2.如果state<STOP,獲取中斷標(biāo)記并復(fù)位,如果線程被中斷炬藤,那么御铃,再次判斷state是否STOP
* 如果是的話,且線程中斷標(biāo)記為false
*/
if ((runStateAtLeast(ctl.get(), STOP) ||//狀態(tài)>=STOP
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//中斷線程
try {
beforeExecute(wt, task);//空方法沈矿,我們可以重寫它上真,在執(zhí)行任務(wù)前做點(diǎn)事情,常用于線程池運(yùn)行的監(jiān)控和統(tǒng)計(jì)
Throwable thrown = null;
try {
task.run();//正式調(diào)用run()執(zhí)行任務(wù)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//執(zhí)行任務(wù)之后調(diào)用,也是個(gè)空方法羹膳,我們可以重寫它睡互,在執(zhí)行任務(wù)后做點(diǎn)事情,常用于線程池運(yùn)行的監(jiān)控和統(tǒng)計(jì)
}
} finally {
task = null;//將任務(wù)設(shè)置為空,那么下次循環(huán)就會通過getTask()方法從workerQueue中取任務(wù)了
w.completedTasks++;//任務(wù)完成數(shù)+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
//核心線程會阻塞在getTask()方法中等待線程陵像,除非設(shè)置了允許核心線程被銷毀就珠,
// 否則正常的情況下只有非核心線程才會執(zhí)行這里
processWorkerExit(w, completedAbruptly);//銷毀線程
}
}
主要執(zhí)行步驟為:
- 1、首先釋放鎖醒颖,因?yàn)檫M(jìn)入這個(gè)方法之后線程允許被中斷
- 2妻怎、首先看看傳入的firstTask是否為空,不為空則優(yōu)先執(zhí)行
- 3泞歉、如果firstTask為空(執(zhí)行完了)逼侦,則嘗試從getTask()中獲取任務(wù)匿辩,getTask()就是從隊(duì)列l(wèi)里面獲取任務(wù)
- 4、如果獲取到任務(wù)則開始執(zhí)行榛丢,執(zhí)行的時(shí)候需要重新上鎖铲球,因?yàn)閳?zhí)行任務(wù)期間也不允許中斷
- 5、任務(wù)運(yùn)行前后分別有一個(gè)空方法晰赞,我們可以在有需要的時(shí)候重寫這兩個(gè)方法稼病,實(shí)現(xiàn)付線程池的監(jiān)控
- 6、如果獲取不到任務(wù)掖鱼,則會執(zhí)行processWorkerExit方法銷毀線程
getTask()方法
private Runnable getTask() {
//上一次獲取任務(wù)是否超時(shí)溯饵,第一次進(jìn)來默認(rèn)false,第一次自旋后如果超時(shí)就會設(shè)置為true锨用,則第二次自旋就會返回null
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* 1\. 線程池狀態(tài)為shutdown,那么就必須要等到workQueue為空才行丰刊,因?yàn)閟hutdown()狀態(tài)是需要執(zhí)行隊(duì)列中剩余任務(wù)的
* 2.線程池狀態(tài)為stop,那么就不需要關(guān)注workQueue中是否有任務(wù)
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//線程池中的線程數(shù)-1
return null;//返回null的話,那么runWorker方法中就會跳出循環(huán)增拥,執(zhí)行finally中的processWorkerExit方法銷毀線程
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//1.allowCoreThreadTimeOut-默認(rèn)false啄巧,表示核心線程數(shù)不會超時(shí)
//2.如果總線程數(shù)大于核心線程數(shù),那就說明需要有線程被銷毀
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 1\. 線程數(shù)量超過maximumPoolSize可能是線程池在運(yùn)行時(shí)被調(diào)用了setMaximumPoolSize()
* 被改變了大小掌栅,否則已經(jīng) addWorker()成功的話是不會超過maximumPoolSize秩仆。
* 2.timed && timedOut 如果為 true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制猾封,并且上次從阻塞隊(duì)列中
* 獲取任務(wù)發(fā)生了超時(shí).其實(shí)就是體現(xiàn)了空閑線程的存活時(shí)間
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://等待指定時(shí)間后返回
workQueue.take();//拿不到任務(wù)會一直阻塞(如核心線程)
if (r != null)
return r;//如果拿到任務(wù)了澄耍,返回給worker進(jìn)行處理
timedOut = true;//走到這里就說明到了超期時(shí)間還沒拿到任務(wù),設(shè)置為true晌缘,第二次自旋就可以直接返回null
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這個(gè)方法主要步驟為:
- 1齐莲、首先判斷狀態(tài)是不是對的,如果是SHUTDOWN之類不符合要求的狀態(tài)磷箕,那就直接返回null选酗,并把線程數(shù)-1,而返回null之后前面的方法就會跳出while循環(huán)岳枷,執(zhí)行銷毀線程流程芒填。
- 2、判斷下是不是有設(shè)置超時(shí)時(shí)間或者最大線程數(shù)超過了核心線程數(shù)
- 3空繁、根據(jù)上面的判斷決定是執(zhí)行帶有超時(shí)時(shí)間的poll方法還是take方法從隊(duì)列中獲取元素殿衰。
情況一:如果是執(zhí)行帶超時(shí)時(shí)間的poll方法,那么時(shí)間到了如果還沒取到元素盛泡,那么就返回空闷祥,這種情況說明當(dāng)前系統(tǒng)并不繁忙,所以返回null之后線程就會被銷毀饭于;
情況二:如果是執(zhí)行take方法蜀踏,根據(jù)第2點(diǎn)的判斷知道,除非我們?nèi)藶樵O(shè)置了核心線程可以被回收掰吕,否則核心線程就是會執(zhí)行take方法果覆,如果獲取不到任務(wù)就會一直阻塞等待獲取到任務(wù)為止。
processWorkerExit方法
這是銷毀線程的方法殖熟,上面的getTask()方法返回空局待,就會執(zhí)行線程銷毀方法,因?yàn)間etTask()當(dāng)中已經(jīng)把線程數(shù)-1了菱属,所以這里可以直接執(zhí)行線程銷毀工作钳榨。
直接調(diào)用的是workers集合的remove()方法,后面還有就是嘗試中止和一些異常異常情況的補(bǔ)償操作纽门。
拒絕策略
JDK默認(rèn)提供的拒絕策略有如下幾種:
- AbortPolicy:直接拋出異常薛耻,默認(rèn)策略
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)
- DiscardPolicy:直接丟棄任務(wù)
我們也可以自定義自己的拒絕策略赏陵,只要實(shí)現(xiàn)RejectedExecutionHandler接口饼齿,重寫其中的唯一一個(gè)方法rejectedExecution就可以了。
常見的面試問題
線程池這一塊面試非常喜歡問蝙搔,我們來舉幾個(gè)常見的問題:
問題一
Q:為什么不建議直接使用Executors來構(gòu)建線程池缕溉?
A:用Executors 使得我們不用關(guān)心線程池的參數(shù)含義,這樣可能會導(dǎo)致問題吃型,比如我們用newFixdThreadPool或者newSingleThreadPool.允許的隊(duì)列長度為Integer.MAX_VALUE证鸥,如果使用不當(dāng)會導(dǎo)致大量請求堆積到隊(duì)列中導(dǎo)致OOM的風(fēng)險(xiǎn)而newCachedThreadPool,允許創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE勤晚,也可能會導(dǎo)致大量 線程的創(chuàng)建出現(xiàn)CPU使用過高或者OOM的問題枉层。而如果我們通過ThreadPoolExecutor來構(gòu)造線程池的話,我們勢必要了解線程池構(gòu)造中每個(gè) 參數(shù)的具體含義赐写,會更加謹(jǐn)慎返干。
問題二
Q:如何合理配置線程池的大小血淌?
A:要想合理地配置線程池矩欠,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù)悠夯、IO密集型任務(wù)和混合型任務(wù)癌淮。
- 任務(wù)的優(yōu)先級:高、中和低沦补。
- 任務(wù)的執(zhí)行時(shí)間:長乳蓄、中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源夕膀,如數(shù)據(jù)庫連接虚倒。
CPU密集型:
CPU密集型的特點(diǎn)是響應(yīng)時(shí)間很快美侦,cpu一直在運(yùn)行,這種任務(wù)cpu 的利用率很高魂奥,那么線程數(shù)的配置應(yīng)該根據(jù)CPU核心數(shù)來決定菠剩,CPU核心數(shù)=最大同時(shí)執(zhí)行線程數(shù),假如CPU核心數(shù)為4耻煤,那么服務(wù)器最多能同時(shí)執(zhí)行4個(gè)線程具壮。過多的線程會導(dǎo)致上 下文切換反而使得效率降低。那線程池的最大線程數(shù)可以配置為cpu核心數(shù)+1哈蝇。
IO密集型:
主要是進(jìn)行IO操作棺妓,執(zhí)行IO操作的時(shí)間較長,這是cpu會處于空閑狀態(tài)炮赦, 導(dǎo)致cpu的利用率不高怜跑,這種情況下可以增加線程池的大小》涂保可以結(jié)合線程的等待時(shí)長來做判斷妆艘,等待時(shí)間越高,那么線程數(shù)也相對越多看幼。一般可以配置cpu核心數(shù)的2倍批旺。
一個(gè)公式:線程池設(shè)定最佳線程數(shù)目 = ((線程池設(shè)定的線程等待時(shí)間+線程 CPU 時(shí)間)/ 線程CPU時(shí)間 )* CPU數(shù)目
附:獲取CPU個(gè)數(shù)方法:Runtime.getRuntime().availableProcessors()
問題三
Q:線程池中的核心線程什么時(shí)候會初始化?
A:默認(rèn)情況下诵姜,創(chuàng)建線程池之后汽煮,線程池中是沒有線程的,需要提交任務(wù)之后才會創(chuàng)建線程棚唆。 在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程暇赤,可以通過如下兩個(gè)方法:
- prestartCoreThread():初始化一個(gè)核心線程。
- prestartAllCoreThreads():初始化所有核心線程
問題四
Q:線程池被關(guān)閉時(shí)宵凌,如果還有任務(wù)在執(zhí)行鞋囊,怎么辦?
A:線程池的關(guān)閉有兩個(gè)方法:
- shutdown()
不會立即終止線程池瞎惫,要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止溜腐,但是不會接受新的任務(wù) - shutdownNow()
立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù)瓜喇,并且清空任務(wù)緩存隊(duì)列挺益,返回尚未執(zhí)行的任務(wù)任務(wù)
問題五
Q:線程池容量是否可以動態(tài)調(diào)整?
A:可以通過兩個(gè)方法動態(tài)調(diào)整線程池的大小乘寒。
- setCorePoolSize():設(shè)置最大核心線程數(shù)
- setMaximumPoolSize():設(shè)置最大工作線程數(shù)
總結(jié)
本文從線程池的常見的四種用法使用示例開始入手望众,最終發(fā)現(xiàn)都調(diào)用了同一個(gè)類去構(gòu)造線程池(ThreadPoolExecutor),所以我們就從ThreadPoolExecutor構(gòu)造器開始分析了構(gòu)建一個(gè)線程池的7大參數(shù)稚机,并從execute()方法開始逐步分析了線程池的使用原理弓坞,當(dāng)然,其實(shí)線程池還有一個(gè)方法submit()也可以作為入口代赁,這個(gè)會放在下篇并發(fā)系列講述Future/Callable的時(shí)候再去分析甘耿。