ThreadPoolExecutor
ThreadPoolExecutor是Executor執(zhí)行框架最重要的一個實現(xiàn)類仁烹,提供了線程池管理和任務(wù)管理是兩個最基本的能力。這篇通過分析ThreadPoolExecutor的源碼來看看如何設(shè)計和實現(xiàn)一個基于生產(chǎn)者消費者模型的執(zhí)行器注盈。
線程池有多重要
線程是一個程序員一定會涉及到的一個概念晃危,但是線程的創(chuàng)建和切換都是代價比較大的。所以老客,我們有沒有一個好的方案能做到線程的復(fù)用呢僚饭?這就涉及到一個概念——線程池。合理的使用線程池能夠帶來3個很明顯的好處:
1.降低資源消耗:通過重用已經(jīng)創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的消耗
2.提高響應(yīng)速度:任務(wù)到達時不需要等待線程創(chuàng)建就可以立即執(zhí)行胧砰。
3.提高線程的可管理性:線程池可以統(tǒng)一管理材蛛、分配荒揣、調(diào)優(yōu)和監(jiān)控捷沸。
ThreadPoolExecutor 設(shè)計原則
線程池就好像生活中的工廠,對于一個工廠最重要的是“訂單”和創(chuàng)建商品的“員工”击罪。對應(yīng)到線程池中就是訂單=任務(wù),員工=worker,
作為工廠必然滿足生產(chǎn)者消費者模型贪薪,工廠接到訂單分解為一個個任務(wù)會放入一個流水線上媳禁,而員工則會從流水線中拿到訂單做處理。處理完畢后重新在流水線上獲取任務(wù)画切。對應(yīng)線程池來說外部線程產(chǎn)生一個任務(wù)調(diào)用線程池處理竣稽,線程池首先會將任務(wù)放入工作隊列,線程池中worker線程不斷從工作隊列取出任務(wù)去處理(有條件無限循環(huán))霍弹,執(zhí)行完畢繼續(xù)從工作隊列獲取毫别,直到線程池關(guān)閉或worker被銷毀。
生產(chǎn)者是任務(wù)的提交者典格,是外部調(diào)用ThreadPoolExecutor的線程
工作隊列是一個阻塞隊列的接口岛宦,具體的實現(xiàn)類可以有很多種。
消費者是封裝了線程的Worker類的集合
創(chuàng)建線程池耍缴,需要準備哪些 砾肺?
線程池和工廠一樣首先需要創(chuàng)建出來。而創(chuàng)建工廠需要明確如下準備工作:
工廠生成需要的員工數(shù)量私恬,這里存在2個值债沮,一個是工廠正常運作需要的最少員工數(shù)量我們在線程池中稱為核心work數(shù)量corePoolSize炼吴,一個是工廠最多能夠招聘員工是多少本鸣,我們在線程池中稱為maximumPoolSize,
為了節(jié)省成本有些員工如果長期無法從工作隊列獲取任務(wù)(員工之間存在競爭關(guān)系)硅蹦。那么需要淘汰一定數(shù)量的員工荣德,我們在線程中規(guī)定超過keepAliveTime時間空閑的線程被淘汰。
需要購買一條流水線童芹,我們在線程池中被稱工作隊列workQueue
需要招聘一名HR來雇傭員工涮瞻,我們在線程中稱為threadFactory
需要招聘售前團隊,這里主要是負責(zé)工廠超負苛工作無力承擔(dān)新的訂單時拒絕策略假褪,我們在線程中稱為線程池的拒絕策略handler
我們線程池的工廠在創(chuàng)建之初為了節(jié)約成本并沒有招聘員工署咽,而是等到有訂單任務(wù)時在運作起來。
源碼實現(xiàn)
/**
* 創(chuàng)建一個線程池生音,
* @param corePoolSize 線程池中核心wor線程數(shù)量宁否。
* @param maximumPoolSize 線程池中允許的最大worker數(shù)量
* @param keepAliveTime worker線程(非核心線程)空閑的時間,大于此時間是被銷毀
* @param unit keepAliveTime的單位缀遍。TimeUnit
* @param workQueue 用來保存等待執(zhí)行的任務(wù)的阻塞隊列
* @param threadFactory 創(chuàng)建work工廠
* @param handler 線程池的拒絕策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
任務(wù)來了慕匠,如何運作 ?
前面說到一個線程需要運作起來需要任務(wù)。當一個訂單被指派到工廠時是如何運作的呢域醇?
1 當員工人數(shù) < corePoolSize台谊,每接到一個任務(wù)就會去雇傭一個新員工來完成這個任務(wù).對于線程池來說就創(chuàng)建一個work標記為core work線程執(zhí)行接收的任務(wù)
2 如果 (員工人數(shù)> corePoolSize) 且(員工人數(shù) < maximumPoolSize),將任務(wù)放入流水線蓉媳,不在雇傭員工。對于線程來說線程會將任務(wù)放入工作隊列锅铅。不在創(chuàng)建新的worker.
3 如果情況2中流水線容量滿了酪呻,說明當前任務(wù)已經(jīng)超負荷。需要雇傭新員工來處理新的任務(wù)盐须。對于線程池來說就創(chuàng)建一個worker來執(zhí)行新任務(wù)
4 如果雇傭的員工已達到上線maximumPoolSize号杠,且流水線容量也滿,則新任務(wù)只好讓售前拒絕丰歌。對于線程池來說就是交給RejectedExecutionHandler處理姨蟋。
[圖片上傳失敗...(image-5cf12c-1562081688936)]
作為一個工廠,也要有生命周期
對于一個工廠伴隨著創(chuàng)建到倒閉都需要經(jīng)歷一個生命周期立帖。對于一個工廠來說有如下幾個狀態(tài)
RUNNING:表示工廠正常運行眼溶,對應(yīng)到線程正常運行。
SHUTDOWN:表示工廠正常申請倒閉晓勇,這時工廠不接收新的訂單任務(wù)堂飞,對原先接收的訂單任務(wù)(包括還在工作隊列未執(zhí)行,和正在執(zhí)行的)绑咱;對應(yīng)到線程就是不再接收任務(wù)绰筛,但仍然會處理已接收的任務(wù)
STOP:表示工廠異常倒閉,這時工廠不僅不再接收新的訂單任務(wù)描融,還會清理流水線中任務(wù)铝噩,對正在處理的任務(wù)進行終止;對應(yīng)到線程池就是停止接收新的任務(wù)窿克,清空工作隊列中的任務(wù)骏庸。同時對work線程提交中斷interrupt
TIDYING: 表示工廠已經(jīng)沒有需要執(zhí)行的訂單任務(wù),等待執(zhí)行最后的清理動作年叮;對應(yīng)到線程池表示不在存在任務(wù)具被,等待執(zhí)行terminate函數(shù)(模板方法)
TERMINATED:工廠倒閉
源碼實現(xiàn)
線程池用一個整數(shù)記錄了線程狀態(tài)和work線程數(shù)量
前3位記錄線程池狀態(tài),后29位記錄運行work數(shù)量
/**
* 用于記錄線程池池的 狀態(tài)和work線程數(shù)量
* 前3位記錄線程池狀態(tài)
* 后29位記錄運行work數(shù)量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
對于線程池不同狀態(tài)對應(yīng)二進制狀態(tài)
RUNNING -- 對應(yīng)的高3位值是111只损。
SHUTDOWN -- 對應(yīng)的高3位值是000一姿。
STOP -- 對應(yīng)的高3位值是001。
TIDYING -- 對應(yīng)的高3位值是010跃惫。
TERMINATED -- 對應(yīng)的高3位值是011叮叹。
具體實現(xiàn)和相關(guān)方法
/**
* 用于記錄線程池的 狀態(tài)和work線程數(shù)量
* 前3位記錄線程池狀態(tài)
* 后29位記錄運行work數(shù)量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** Java 中Integer 類型長度為32位,線程池用一個int類型的前3位表示線程池的狀態(tài)**/
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 用來計算出當前線程池狀態(tài)中間變量辈挂,同時也表示work最大數(shù)量
* 00011111 11111111 11111111 11111111
**/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/** -----------------線程池狀態(tài)----------------- **/
/**
* 線程池RUNNING狀態(tài),當前狀態(tài)下線程池可以接收新的任務(wù)衬横,對新接收的任務(wù)進行處理,
* 工廠正常運行
*
* -1 二進制 11111111111111111111111111111111 左移動 29位 前三位 111
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 線程池SHUTDOWN狀態(tài),當前狀態(tài)下線程池不在接收新任務(wù)终蒂,對之前接收的任務(wù)(其中包括還在隊列等待和正在執(zhí)行的任務(wù))
* 工廠不在接收新的訂單,工廠運行出現(xiàn)了問題
*
* 0 二進制 00000000000000000000000000000000 左移動 29位 前三位 000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 線程池STOP狀態(tài),當前狀態(tài)下線程池不在接收新任務(wù)蜂林,對之前接收的任務(wù)存在隊列沒有處理的不在處理遥诉,正在執(zhí)行做中斷
* 工廠不在接收新的訂單,工廠要倒閉了
*
* 1 二進制 00000000000000000000000000000001 左移動 29位 前三位 001
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 線程池TIDYING狀態(tài),當前沒有待執(zhí)行的任務(wù),等待執(zhí)行函數(shù)terminated()
* 工廠走倒閉程序噪叙,需要做最后清理工作
*
* 2 二進制 00000000000000000000000000000010 左移動 29位 前三位 010
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 執(zhí)行函數(shù)terminated()
* 工廠關(guān)閉
* 3 二進制 00000000000000000000000000000011 左移動 29位 前三位 011
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/** 計算獲取當前線程池狀態(tài) **/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/** 計算獲取當前運行work數(shù)量**/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 即根據(jù)線程池的狀態(tài)和worker數(shù)量合并成整形 ctl
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/** 判斷當前線程池是否小于s,c表示當前線程池狀態(tài) **/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/** 判斷當前線程池是否大于等于s,c表示當前線程池狀態(tài) **/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/** 判斷當前線程池是否正在正常運行 RUNNING狀態(tài)**/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS增加線程池中work數(shù)量(后29位可以直接整數(shù)運算)
* 成功返回true,失敗返回false
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS減少線程池中work數(shù)量(后29位可以直接整數(shù)運算)
* 成功返回true,失敗返回false
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS減少線程池中work數(shù)量(后29位可以直接整數(shù)運算),失敗則繼續(xù)循環(huán)直到成功
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
流水線 原來是阻塞隊列
前面總是說到工廠是流水線用來放任務(wù)矮锈,那流水線具體有哪些功能呢?流水線在線程池中是一個BlockingQueue睁蕾,一個可以阻塞的隊列苞笨。說具體就是當流水線滿了再放任務(wù)會阻塞,當流水線不存在任務(wù)子眶,work線程去獲取任務(wù)時同樣會阻塞
private final BlockingQueue<Runnable> workQueue;
BlockingQueue API
public interface BlockingQueue<E> extends Queue<E> {
//將元素插入隊列尾部瀑凝,方法在添加失敗(比如隊列已滿)時會報 一些運行時錯誤.
boolean add(E e);
//將元素插入隊列尾部臭杰,方法在添加失斣吝洹(比如隊列已滿)時,不會拋出異常渴杆,只會返回false
boolean offer(E e);
//插入元素e至隊尾, 如果隊列已滿, 則阻塞調(diào)用線程直到隊列有空閑空間.
void put(E e) throws InterruptedException;
//插入元素e至隊列, 如果隊列已滿, 則限時阻塞調(diào)用線程寥枝,直到隊列有空閑空間或超時.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//將隊首的元素刪除,隊列為空則拋出異常
boolean remove(Object o);
//將隊首的元素刪除磁奖,隊列為空則返回null(繼承方法囊拜,方便統(tǒng)一寫在這)
E poll();
//從隊首刪除元素,如果隊列為空, 則阻塞調(diào)用線程直到隊列中有元素.
E take() throws InterruptedException;
//從隊首刪除元素比搭,如果隊列為空, 則限時阻塞調(diào)用線程冠跷,直到隊列中有元素或超時.
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//返回剩余可用容量,沒有容量限制返回Integer.MAX_VALUE
int remainingCapacity();
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
和Queue接口比較會發(fā)現(xiàn)BlockingQueue接口擴展了4個和阻塞相關(guān)的核心方法:put(e)敢辩、take()蔽莱;offer(e, time, unit)弟疆、poll(time, unit)戚长。總結(jié)如下
拋出異常:滿隊列時怠苔,執(zhí)行入隊會拋出異常同廉;空隊列時執(zhí)行出隊會拋出異常 。
返回特殊值:入隊操作會返回布爾值柑司;出隊操作成功返回操作值迫肖,失敗返回空值。
一直阻塞:滿隊列時攒驰,執(zhí)行入隊會進入條件等待隊列蟆湖,線程阻塞;空隊列時玻粪,執(zhí)行出隊會進入條件等待隊列隅津,線程阻塞 诬垂。
超時退出:滿隊列時,執(zhí)行入隊會阻塞生產(chǎn)者線程一段時間伦仍,如果超過一定的時間结窘,生產(chǎn)者線程就會退出。
線程池中工作隊列的策略
無界隊列
可以使用LinkedBlockingQueue(基于鏈表的有界隊列充蓝,F(xiàn)IFO)隧枫,理論上是該隊列可以對無限多的任務(wù)排隊
將導(dǎo)致在所有corePoolSize線程都工作的情況下將新任務(wù)加入到隊列中。這樣谓苟,創(chuàng)建的線程就不會超過corePoolSize官脓,也因此,maximumPoolSize的值也就無效了
有界隊列
可以使用ArrayBlockingQueue(基于數(shù)組結(jié)構(gòu)的有界隊列涝焙,F(xiàn)IFO)确买,并指定隊列的最大長度
使用有界隊列可以防止資源耗盡,但也會造成超過隊列大小和maximumPoolSize后纱皆,提交的任務(wù)被拒絕的問題湾趾,比較難調(diào)整和控制
不排隊,直接提交
將任務(wù)直接交給線程處理而不保持它們派草,可使用SynchronousQueue
如果不存在可用于立即運行任務(wù)的線程(即線程池中的線程都在工作)搀缠,則試圖把任務(wù)加入緩沖隊列將會失敗,因此會構(gòu)造一個新的線程來處理新添加的任務(wù)近迁,并將其加入到線程池中(corePoolSize-->maximumPoolSize擴容)
Executors.newCachedThreadPool()采用的便是這種策略
如何雇傭一個員工
線程池中一個work的本質(zhì)是一個Runnable(線程)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
而ThreadFactory創(chuàng)建一個work線程艺普,本質(zhì)就是創(chuàng)建一個Thead,并將work這個Runnable設(shè)置到其屬性種,啟動這個Thead
public class DefaultThreadFactory implements ThreadFactory{
...省略代碼
@Override
public Thread newThread(Runnable r) {
/** 這里r就是work **/
Thread t = new Thread(r, prefix + nextId.incrementAndGet());
try {
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
}
為什么要實現(xiàn)AQS同步狀態(tài)鉴竭,作為一個工廠員工同一個時間只能完成一個任務(wù)歧譬。因而需要在其開始工作和結(jié)束工作獲取同步狀態(tài)(加鎖),釋放同步狀態(tài)(解鎖)搏存。
員工又是如何工作的瑰步?
我們知道work是一個Runnable(線程),那么執(zhí)行線程最重要的是run方法。我們來看下work的run方法璧眠,run存在一個有條件的無限循環(huán)缩焦,work會不段獲取任務(wù)執(zhí)行。
/** 工作線程執(zhí)行责静,調(diào)用外部TheadPoolExecutor.runWorker方法 */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
/** 獲取當前線程 **/
Thread wt = Thread.currentThread();
/** 獲取執(zhí)行任務(wù)**/
Runnable task = w.firstTask;
/** 將任務(wù)從work清理 **/
w.firstTask = null;
...省略代碼
try {
/**
* 如果當前work中存在任務(wù)則執(zhí)行袁滥,不存在則從WorkQueue獲取任務(wù)
* getTask()!=null 時work永遠不停止
**/
while (task != null || (task = getTask()) != null) {
/** 獲取work獨占同步狀態(tài)(表示任務(wù)) **/
w.lock();
...省略代碼
/** 處理任務(wù) **/
task.run();
...省略代碼
/**釋放work獨占同步狀態(tài) **/
w.unlock();
...省略代碼
}
}
社會的殘酷灾螃,淘汰機制
社會是殘酷的题翻,當工廠員工多而任務(wù)少,作為老板當然想淘汰一些員工腰鬼,怎么淘汰嵌赠,如果淘汰靴拱?這就需要實現(xiàn)一個淘汰機制。即一個work長期無法獲取任務(wù)時猾普。而何時開啟淘汰機制有2種情況袜炕。
何時開啟淘汰機制
int c = ctl.get();
int rs = runStateOf(c);
/** 是否允許回收核心work線程 **/
private volatile boolean allowCoreThreadTimeOut;
/** 判斷是否需要開啟work淘汰機制 **/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
當allowCoreThreadTimeOut=true時,淘汰機制隨時開啟
當allowCoreThreadTimeOut=false時初家,wc>corePoolSize 時淘汰機制才開啟偎窘。
如何實現(xiàn)淘汰
我們知道線程是不可控的。work之所以能夠無限運行是因為那個有條件的無限循環(huán)溜在,如果我們退出那個循環(huán)那么work線程自然銷毀陌知,也就說work被淘汰了。那么那個條件是什么掖肋?對就是getTask()返回null,
那么如何實現(xiàn)超時呢?你應(yīng)該知道BlockingQueue有一個獲取任務(wù)超時的方法
poll仆葡,如果長時間沒有獲取任務(wù)則返回null
/**
* 從WorkQueue獲取任務(wù)
* 同時用來判斷work何時退出銷毀
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
/** 無限循環(huán),
* 當work超過指定時間沒有獲取時志笼,設(shè)置timedOut = true進行二次遍歷時銷毀當前work **/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...省略代碼
/** 獲取work數(shù)量 **/
int wc = workerCountOf(c);
/** 判斷是否需要開啟work淘汰機制 **/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 以下幾種情況直接銷毀當前work
* 超時沒有獲取任務(wù)timedOut=tue,for循環(huán)遍歷第二次時
* 當前任務(wù)超過maximumPoolSize
* **/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 如果開啟work淘汰機制超時獲取任務(wù)沿盅,調(diào)用poll阻塞獲取任務(wù),存在超時纫溃,如果超時沒有獲取到任務(wù)
* 設(shè)置timedOut = true 進入第二次循環(huán)銷毀
*
* 如果沒開啟work淘汰機制超時獲取任務(wù)腰涧,調(diào)用take阻塞獲取任務(wù)
* 【這里的阻塞都能被中斷響應(yīng)!紊浩!】
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
售前到底在做啥
private volatile RejectedExecutionHandler handler;
(1)AbortPolicy:直接拋出異常窖铡,默認策略;
(2)CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)坊谁;
(3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)费彼,并執(zhí)行當前任務(wù);
(4)DiscardPolicy:直接丟棄任務(wù)口芍;
生意不好做箍铲,工廠如何倒閉
我們知道線程不不比員工,你說讓結(jié)束手頭工作就能結(jié)束的阶界。當你調(diào)用shutdown()時虹钮,需要保證每個員工都退出的。說不定很可能他還在傻傻在任務(wù)隊列那等待呢膘融。
既然要退出還是要退出那個有條件無限循環(huán)。退出還是要找到那個getTask()返回null,現(xiàn)在我們來看看之前那些省略的代碼祭玉。當線程池狀態(tài)不為運行狀態(tài)時會返回null.
/**
* 從WorkQueue獲取任務(wù)
* 同時用來判斷work何時退出銷毀
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
/** 無限循環(huán)氧映,
* 當work超過指定時間沒有獲取時,設(shè)置timedOut = true進行二次遍歷時銷毀當前work **/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** 線程池中狀態(tài) >= STOP 或者 線程池狀態(tài) == SHUTDOWN且阻塞隊列為空脱货,則停止worker - 1岛都,return null **/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
...省略代碼
try {
/**
* 如果開啟work淘汰機制超時獲取任務(wù)律姨,調(diào)用poll阻塞獲取任務(wù),存在超時臼疫,如果超時沒有獲取到任務(wù)
* 設(shè)置timedOut = true 進入第二次循環(huán)銷毀
*
* 如果沒開啟work淘汰機制超時獲取任務(wù)择份,調(diào)用take阻塞獲取任務(wù)
* 【這里的阻塞都能被中斷響應(yīng)!烫堤!】
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
那如果work線程在工作隊列那阻塞呢荣赶?這時就要中斷該線程了。注意了workQueue.poll是可以響應(yīng)中斷的哦鸽斟!
線程擴展
子類實現(xiàn)擴展
/**
* 模板方法給子類實現(xiàn)拔创,執(zhí)行任務(wù)前的操作
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* 模板方法給子類實現(xiàn),執(zhí)行任務(wù)后的操作
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* 模板方法給子類實現(xiàn)富蓄,線程池狀態(tài)從TIDYING到TERMINATED需要做的清理動作
*/
protected void terminated() { }
工作隊列擴展
可以修改offer方法邏輯在特定時候返回false剩燥,從而達到如果 (員工人數(shù)> corePoolSize) 且(員工人數(shù) < maximumPoolSize)依舊可以雇傭新員工(創(chuàng)建work)
Executors靜態(tài)工廠創(chuàng)建幾種常用線程池
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
創(chuàng)建一個指定工作線程數(shù)的線程池,其中參數(shù) corePoolSize 和 maximumPoolSize 相等立倍,阻塞隊列基于LinkedBlockingQueue
它是一個典型且優(yōu)秀的線程池灭红,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時所耗的開銷的優(yōu)點。但是在線程池空閑時口注,即線程池中沒有可運行任務(wù)時比伏,它也不會釋放工作線程,還會占用一定的系統(tǒng)資源
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
創(chuàng)建一個可緩存工作線程的線程池疆导,默認存活時間60秒赁项,線程池的線程數(shù)可達到Integer.MAX_VALUE,即2147483647澈段,內(nèi)部使用SynchronousQueue作為阻塞隊列悠菜;
在沒有任務(wù)執(zhí)行時,當線程的空閑時間超過keepAliveTime败富,則工作線程將會終止悔醋,當提交新任務(wù)時,如果沒有空閑線程兽叮,則創(chuàng)建新線程執(zhí)行任務(wù)芬骄,會導(dǎo)致一定的系統(tǒng)開銷
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
初始化的線程池可以在指定的時間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)