合理利用線程池能夠帶來三個好處。第一:降低資源消耗筝野。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗晌姚。第二:提高響應速度粤剧。當任務到達時,任務可以不需要的等到線程創(chuàng)建就能立即執(zhí)行挥唠。第三:提高線程的可管理性抵恋。線程是稀缺資源,如果無限制的創(chuàng)建宝磨,不僅會消耗系統(tǒng)資源弧关,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配唤锉,調(diào)優(yōu)和監(jiān)控世囊。但是要做到合理的利用線程池,必須對其原理了如指掌窿祥。
一. Java中的ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類株憾。因此如果要透徹地了解Java中的線程池,必須先了解這個類晒衩。下面我們來看一下ThreadPoolExecutor類的具體實現(xiàn)源碼嗤瞎。
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從構(gòu)造函數(shù)中,我們可以看出听系,創(chuàng)建一個線程池需要輸入幾個參數(shù):
- corePoolSize: 核心池的大小贝奇,這個參數(shù)跟后面講述的線程池的實現(xiàn)原理有非常大的關系。在創(chuàng)建了線程池后靠胜,默認情況下掉瞳,線程池中并沒有任何線程,而是等待有任務到來才創(chuàng)建線程去執(zhí)行任務髓帽,除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法菠赚,從這2個方法的名字就可以看出,是預創(chuàng)建線程的意思郑藏,即在沒有任務到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認情況下瘩欺,在創(chuàng)建了線程池后必盖,線程池中的線程數(shù)為0,當有任務來之后俱饿,就會創(chuàng)建一個線程去執(zhí)行任務歌粥,當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務放到緩存隊列當中拍埠;
- maximumPoolSize: 線程池允許創(chuàng)建的最大線程數(shù)失驶。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù)枣购,則線程池會再創(chuàng)建新的線程執(zhí)行任務嬉探。值得注意的是如果使用了無界的任務隊列這個參數(shù)就沒什么效果擦耀。
- keepAliveTime: 表示線程沒有任務執(zhí)行時最多保持多久時間會終止。默認情況下涩堤,只有當線程池中的線程數(shù)大于corePoolSize時眷蜓,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize胎围,即當線程池中的線程數(shù)大于corePoolSize時吁系,如果一個線程空閑的時間達到keepAliveTime,則會終止白魂,直到線程池中的線程數(shù)不超過corePoolSize汽纤。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時福荸,keepAliveTime參數(shù)也會起作用冒版,直到線程池中的線程數(shù)為0;
-
TimeUnit: 參數(shù)keepAliveTime的時間單位逞姿,有7種取值:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
* **workQueue**: 用于保存等待執(zhí)行的任務的阻塞隊列辞嗡。可以選擇以下幾個阻塞隊列:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue
1. ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列滞造,此隊列按 FIFO(先進先出)原則對元素進行排序续室。
2. LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進先出) 排序元素谒养,吞吐量通常要高于ArrayBlockingQueue挺狰。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3. SynchronousQueue:一個不存儲元素的阻塞隊列买窟。每個插入操作必須等到另一個線程調(diào)用移除操作丰泊,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue始绍,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列瞳购。
4. PriorityBlockingQueue:一個具有優(yōu)先級得無限阻塞隊列。
* **ThreadFactory**:線程工廠亏推,主要用來創(chuàng)建線程
* **RejectedExecutionHandler**:當隊列和線程池都滿了学赛,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務吞杭。這個策略默認情況下是AbortPolicy盏浇,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略芽狗。
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常绢掰。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務滴劲,然后重新嘗試執(zhí)行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務
具體參數(shù)的配置與線程池的關系將在下一節(jié)講述攻晒。
通過ThreadPoolExecutor的代碼我們看到,ThreadPoolExecutor繼承了AbstractExecutorService哑芹,AbstractExecutorService是一個抽象類炎辨,它實現(xiàn)了ExecutorService接口。ExecutorService又是繼承了Executor接口
Excutor 整體結(jié)構(gòu)如下:
![](http://upload-images.jianshu.io/upload_images/327713-11bfcfd3cf6369d0.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
Executor是一個頂層接口聪姿,在它里面只聲明了一個方法execute(Runnable)碴萧,返回值為void,參數(shù)為Runnable類型末购,從字面意思可以理解破喻,就是用來執(zhí)行傳進去的任務的;
然后ExecutorService接口繼承了Executor接口盟榴,并聲明了一些方法:submit曹质、invokeAll、invokeAny以及shutDown等擎场;
抽象類AbstractExecutorService實現(xiàn)了ExecutorService接口羽德,基本實現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService迅办。在ThreadPoolExecutor類中有幾個非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法實際上是Executor中聲明的方法宅静,在ThreadPoolExecutor進行了具體的實現(xiàn),這個方法是ThreadPoolExecutor的核心方法站欺,通過這個方法可以向線程池提交一個任務姨夹,交由線程池去執(zhí)行。
submit()方法是在ExecutorService中聲明的方法矾策,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn)磷账,在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務的贾虽,但是它和execute()方法不同逃糟,它能夠返回任務執(zhí)行的結(jié)果,去看submit()方法的實現(xiàn)榄鉴,會發(fā)現(xiàn)它實際上還是調(diào)用的execute()方法履磨,只不過它利用了Future來獲取任務執(zhí)行結(jié)果。
shutdown()和shutdownNow()是用來關閉線程池的庆尘。
ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎上提供了支持定時調(diào)度的功能。線程任務可以在一定延時時間后才被觸發(fā)執(zhí)行巷送。
##二. 深入剖析線程池實現(xiàn)原理
###1. 線程池狀態(tài)
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
當創(chuàng)建線程池后驶忌,初始時,線程池處于RUNNING狀態(tài);
如果調(diào)用了shutdown()方法付魔,則線程池處于SHUTDOWN狀態(tài)聊品,此時線程池不能夠接受新的任務,它會等待所有任務執(zhí)行完畢几苍;
如果調(diào)用了shutdownNow()方法翻屈,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務妻坝,并且會去嘗試終止正在執(zhí)行的任務伸眶;
當線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀刽宪,任務緩存隊列已經(jīng)清空或執(zhí)行結(jié)束后厘贼,線程池被設置為TERMINATED狀態(tài)。
###2. ThreadPoolExecutor 原理
####2.1 ThreadPoolExecutor的幾個重要屬性
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列圣拄,用來存放等待執(zhí)行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態(tài)鎖嘴秸,對線程池狀態(tài)(比如線程池大小
//、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long keepAliveTime; //線程存貨時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設置存活時間
private volatile int corePoolSize; //核心池的大斜幼弧(即線程池中的線程數(shù)目大于這個參數(shù)時岳掐,提交的任務會被放進任務緩存隊列)
private volatile int maximumPoolSize; //線程池最大能容忍的線程數(shù)
private volatile int poolSize; //線程池中當前的線程數(shù)
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //線程工廠,用來創(chuàng)建線程
private int largestPoolSize; //用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)
private long completedTaskCount; //用來記錄已經(jīng)執(zhí)行完畢的任務個數(shù)
####2.2 ThreadPoolExecutor的內(nèi)部工作原理
線程池的主要工作流程如下圖:
![](http://upload-images.jianshu.io/upload_images/327713-9d0e1d992971ccef.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
1. 如果當前池大小poolSize小于corePoolSize饭耳,則創(chuàng)建新線程執(zhí)行任務串述;
2. 如果當前池大小poolSize大于corePoolSize,且等待隊列未滿哥攘,則進入等待隊列剖煌;
3. 如果當前池大小poolSize大于corePoolSize,且等待隊列已滿,且小于maxmumPoolSize逝淹,則創(chuàng)建新線程執(zhí)行任務耕姊;
4. 如果當前池大小poolSize大于corePoolSize,且等待隊列已滿,且大于maxmumPoolSize栅葡,則調(diào)用拒絕策略來處理該任務
5. 線程池里的每個線程執(zhí)行完任務后不會立即退出茉兰,而是會去檢查下等待隊列中是否有任務等待執(zhí)行,如果在keepAliveTime里等不到新任務欣簇,那么線程就退出了规脸。
####下面看看代碼實現(xiàn):
線程池最重要的方法是由 Executor 接口定義的 execute 方法 , 是任務提交的入口。
我們看看 ThreadPoolExecutor.execute(Runnable cmd) 的實現(xiàn):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
當提交一個新的 Runnable 任務:
分支1 : 如果當前池大小小于 corePoolSize, 執(zhí)行 addIfUnderCorePoolSize(command) , 如果線程池處于運行狀態(tài)且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 會做如下事情熊咽,將 Runnable 任務封裝成 Worker 任務 , 創(chuàng)建新的 Thread 莫鸭,執(zhí)行 Worker 任務。如果不滿足條件横殴,則返回 false 被因。
代碼如下:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
分支2 : 如果大于 corePoolSize 或 1 失敗失敗,則:
* 如果等待隊列未滿,把 Runnable 任務加入到 workQueue 等待隊列workQueue .offer(command)
* 如果等待隊列已經(jīng)滿了梨与,調(diào)用 addIfUnderMaximumPoolSize(command) 堕花,和 addIfUnderCorePoolSize 基本類似,只不過判斷條件是 poolSize < maximumPoolSize 粥鞋。如果大于 maximumPoolSize 缘挽,則把 Runnable 任務交由 RejectedExecutionHandler 來處理。
####2.3 線程池的初始化
默認情況下呻粹,創(chuàng)建線程池之后壕曼,線程池中是沒有線程的,需要提交任務之后才會創(chuàng)建線程尚猿。
在實際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程窝稿,可以通過以下兩個方法辦到:
? prestartCoreThread():初始化一個核心線程;
? prestartAllCoreThreads():初始化所有核心線程
####2.4 任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列凿掂,即workQueue伴榔,它用來存放等待執(zhí)行的任務。
workQueue的類型為BlockingQueue<Runnable>庄萎,通匙偕伲可以取下面三種類型:
1)ArrayBlockingQueue:基于數(shù)組的先進先出隊列,此隊列創(chuàng)建時必須指定大锌诽巍援奢;
2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創(chuàng)建時沒有指定此隊列大小忍捡,則默認為Integer.MAX_VALUE集漾;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務砸脊,而是將直接新建一個線程來執(zhí)行新來的任務具篇。
####2.5 任務拒絕策略
當線程池的任務緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略凌埂,通常有以下四種策略:
- ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常驱显。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常瞳抓。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務埃疫,然后重新嘗試執(zhí)行任務(重復此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務
####2.6 線程池的關閉
ThreadPoolExecutor提供了兩個方法,用于線程池的關閉孩哑,分別是shutdown()和shutdownNow()栓霜,其中:
? shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執(zhí)行完后才終止横蜒,但再也不會接受新的任務
? shutdownNow():立即終止線程池叙淌,并嘗試打斷正在執(zhí)行的任務秤掌,并且清空任務緩存隊列愁铺,返回尚未執(zhí)行的任務
####2.7 線程池容量的動態(tài)調(diào)整
ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()鹰霍,
? setCorePoolSize:設置核心池大小
? setMaximumPoolSize:設置線程池最大能創(chuàng)建的線程數(shù)目大小
當上述參數(shù)從小變大時,ThreadPoolExecutor進行線程賦值茵乱,還可能立即創(chuàng)建新的線程來執(zhí)行任務茂洒。
###3. ScheduledThreadPoolExecutor
ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的集成。增加了定時觸發(fā)線程任務的功能瓶竭。需要注意:從內(nèi)部實現(xiàn)看督勺, ScheduleThreadPoolExecutor 使用的是 corePoolSize 線程和一個無界隊列的固定大小的池,所以調(diào)整 maximumPoolSize 沒有效果斤贰。無界隊列是一個內(nèi)部自定義的 DelayedWorkQueue 智哀。
ScheduleThreadPoolExecutor 線程池接收定時任務的方法是 schedule ,看看內(nèi)部實現(xiàn):
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
以上代碼會初始化一個 RunnableScheduledFuture 類型的任務 t, 并交給 delayedExecute 方法荧恍。 delayedExecute(t) 方法實現(xiàn)如下:
private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}
if (getPoolSize() < getCorePoolSize())
prestartCoreThread();
super.getQueue().add(command);
}
如果當前線程池大小 poolSize 小于 CorePoolSize 瓷叫,則創(chuàng)建一個新的線程,注意這里創(chuàng)建的線程是空的送巡,不會把任務直接交給線程來做摹菠,而是把線程任務放到隊列里。因為任務是要定時觸發(fā)的骗爆,所以不能直接交給線程去執(zhí)行次氨。
**那如何做到定時觸發(fā)呢?**
關鍵在于DelayedWorkQueue,它代理了 DelayQueue 摘投≈蠊眩可以認為 DelayQueue 是這樣一個隊列(具體可以去看下源碼,不詳細分析):
1. 隊列里的元素按照任務的 delay 時間長短升序排序犀呼, delay 時間短的在隊頭幸撕, delay 時間長的在隊尾。
2. 從 DelayQueue 里 FIFO 的獲取一個元素的時候圆凰,不會直接返回 head 杈帐。可能會阻塞专钉,等到 head 節(jié)點到達 delay 時間后才能被獲取挑童。可以看下 DelayQueue 的 take 方法實現(xiàn):
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);//等待delay時間
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
博客地址:[Java線程池跃须,知道這些就夠了](http://www.wjd1024.com/2017/07/03/java%E7%BA%BF%E7%A8%8B%E6%B1%A0/>)
參考資料:
<http://www.cnblogs.com/dolphin0520/p/3932921.html>
<http://developer.51cto.com/art/201203/321885.htm>
<http://ifeve.com/java-threadpool/>