java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個(gè)類轰绵,因此如果要透徹地了解Java中的線程池,必須先了解這個(gè)類。
先附上一段使用代碼:
public class Test {
public static void main(String[] args) {
//創(chuàng)建線程池executor
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);、乐严、
executor.execute(myTask);
System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+
executor.getQueue().size()+"衣摩,已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在執(zhí)行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"執(zhí)行完畢");
}
}
下面我們來看一下ThreadPoolExecutor類的具體實(shí)現(xiàn)源碼麦备。
變量
先來看一下這個(gè)類中定義的重要變量,如下:
private final BlockingQueue<Runnable> workQueue; // 阻塞隊(duì)列
private final ReentrantLock mainLock = new ReentrantLock(); // 互斥鎖
private final HashSet<Worker> workers = new HashSet<Worker>();// 線程集合.一個(gè)Worker對應(yīng)一個(gè)線程
private final Condition termination = mainLock.newCondition();// 終止條件
private int largestPoolSize; // 線程池中線程數(shù)量曾經(jīng)達(dá)到過的最大值昭娩。
private long completedTaskCount; // 已完成任務(wù)數(shù)量
private volatile ThreadFactory threadFactory; // ThreadFactory對象凛篙,用于創(chuàng)建線程。
private volatile RejectedExecutionHandler handler;// 拒絕策略的處理句柄
private volatile long keepAliveTime; // 線程池維護(hù)線程所允許的空閑時(shí)間
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize; // 線程池維護(hù)線程的最小數(shù)量栏渺,哪怕是空閑的
private volatile int maximumPoolSize; // 線程池維護(hù)的最大線程數(shù)量
1> corePoolSize與maximumPoolSize 由于ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize設(shè)置的邊界自動調(diào)整池大小呛梆,當(dāng)新任務(wù)在方法 execute(java.lang.Runnable) 中提交時(shí):
- 如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求磕诊,即使其他輔助線程是空閑的填物;
- 如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池是大小固定的霎终,如果運(yùn)行的線程與corePoolSize相同滞磺,當(dāng)有新請求過來時(shí),若workQueue未滿莱褒,則將請求放入workQueue中击困,等待有空閑的線程去從workQueue中取任務(wù)并處理
- 如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程才創(chuàng)建新的線程去處理請求广凸;
- 如果運(yùn)行的線程多于corePoolSize 并且等于maximumPoolSize阅茶,若隊(duì)列已經(jīng)滿了,則通過handler所指定的策略來處理新請求谅海;
- 如果將 maximumPoolSize 設(shè)置為基本的無界值(如 Integer.MAX_VALUE)脸哀,則允許池適應(yīng)任意數(shù)量的并發(fā)任務(wù)
也就是說,處理任務(wù)的優(yōu)先級為:
- 核心線程corePoolSize > 任務(wù)隊(duì)列workQueue > 最大線程maximumPoolSize扭吁,如果三者都滿了撞蜂,使用handler處理被拒絕的任務(wù)。
- 當(dāng)池中的線程數(shù)大于corePoolSize的時(shí)候侥袜,多余的線程會等待keepAliveTime長的時(shí)間蝌诡,如果無請求可處理就自行銷毀。
2> workQueue 線程池所使用的緩沖隊(duì)列系馆,該緩沖隊(duì)列的長度決定了能夠緩沖的最大數(shù)量送漠,緩沖隊(duì)列有三種通用策略:
- 直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue由蘑,它將任務(wù)直接提交給線程而不保持它們闽寡。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程尼酿,則試圖把任務(wù)加入隊(duì)列將失敗爷狈,因此會構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時(shí)出現(xiàn)鎖裳擎。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)涎永。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長的可能性;
- 無界隊(duì)列鹿响。使用無界隊(duì)列(例如羡微,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣惶我,創(chuàng)建的線程就不會超過 corePoolSize妈倔。(因此,maximumPoolSize 的值也就無效了绸贡。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù)盯蝴,即任務(wù)執(zhí)行互不影響時(shí),適合于使用無界隊(duì)列听怕;例如捧挺,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請求尿瞭,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí)闽烙,此策略允許無界線程具有增長的可能性;
- 有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時(shí)声搁,有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡鸣峭,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率酥艳、操作系統(tǒng)資源和上下文切換開銷摊溶,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如充石,如果它們是 I/O 邊界)莫换,則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小骤铃,CPU 使用率較高拉岁,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量.
3>ThreadFactory 使用 ThreadFactory 創(chuàng)建新線程惰爬。如果沒有另外說明喊暖,則在同一個(gè) ThreadGroup 中一律使用 Executors.defaultThreadFactory() 創(chuàng)建線程,并且這些線程具有相同的 NORM_PRIORITY 優(yōu)先級和非守護(hù)進(jìn)程狀態(tài)撕瞧。通過提供不同的 ThreadFactory陵叽,可以改變線程的名稱狞尔、線程組、優(yōu)先級巩掺、守護(hù)進(jìn)程狀態(tài)等等偏序。如果從 newThread 返回 null 時(shí) ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行胖替,但不能執(zhí)行任何任務(wù)研儒。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
而構(gòu)造方法中的threadFactory對象,是通過 Executors.defaultThreadFactory()返回的独令。Executors.java中的defaultThreadFactory()源碼如下:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
在DefaultThreadFactory類中實(shí)現(xiàn)了ThreadFactory接口并對其中定義的方法進(jìn)行了實(shí)現(xiàn)端朵,如下:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
// 為線程池創(chuàng)建新的任務(wù)執(zhí)行線程
public Thread newThread(Runnable r) {
// 線程對應(yīng)的任務(wù)是Runnable對象r
Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);
// 設(shè)為非守護(hù)線程
if (t.isDaemon())
t.setDaemon(false);
// 將優(yōu)先級設(shè)為Thread.NORM_PRIORITY
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
構(gòu)造方法
在ThreadPoolExecutor類中提供了四個(gè)構(gòu)造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
//5個(gè)參數(shù),缺少ThreadFactory threadFactory,RejectedExecutionHandler handler
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
//6個(gè)參數(shù)燃箭,缺少RejectedExecutionHandler handler
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
//6個(gè)參數(shù)冲呢,缺少ThreadFactory threadFactory
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
//7個(gè)參數(shù),主要供前面3個(gè)方法調(diào)用
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的代碼可以得知遍膜,ThreadPoolExecutor繼承了AbstractExecutorService類碗硬,并提供了四個(gè)構(gòu)造器,事實(shí)上瓢颅,通過觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn)恩尾,發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作。
下面解釋下一下構(gòu)造器中各個(gè)參數(shù)的含義:
corePoolSize:核心池的大小挽懦,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系翰意。在創(chuàng)建了線程池后,默認(rèn)情況下信柿,線程池中并沒有任何線程冀偶,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法渔嚷,從這2個(gè)方法的名字就可以看出进鸠,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程形病。默認(rèn)情況下客年,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0漠吻,當(dāng)有任務(wù)來之后量瓜,就會創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后途乃,就會把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中绍傲;
maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù)耍共,它表示在線程池中最多能創(chuàng)建多少個(gè)線程烫饼;
keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會終止猎塞。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)枫弟,keepAliveTime才會起作用邢享,直到線程池中的線程數(shù)不大于corePoolSize鹏往,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)淡诗,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會終止伊履,直到線程池中的線程數(shù)不超過corePoolSize韩容。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí)唐瀑,keepAliveTime參數(shù)也會起作用群凶,直到線程池中的線程數(shù)為0;
unit:參數(shù)keepAliveTime的時(shí)間單位哄辣,有7種取值请梢,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue:一個(gè)阻塞隊(duì)列,用來存儲等待執(zhí)行的任務(wù)力穗,這個(gè)參數(shù)的選擇也很重要毅弧,會對線程池的運(yùn)行過程產(chǎn)生重大影響,一般來說当窗,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
//ArrayBlockingQueue和PriorityBlockingQueue使用較少够坐,一般使用LinkedBlockingQueue和Synchronous。
//線程池的排隊(duì)策略與BlockingQueue有關(guān)崖面。
<small>(BlockingQueue:阻塞隊(duì)列(BlockingQueue)是java.util.concurrent下的主要用來控制線程同步的工具元咙。如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進(jìn)入等待狀態(tài),直到BlockingQueue進(jìn)了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往里存東西的操作也會被阻斷進(jìn)入等待狀態(tài),直到BlockingQueue里有空間才會被喚醒繼續(xù)操作巫员。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景庶香,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程简识。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器赶掖,而消費(fèi)者也只從容器里拿元素。具體的實(shí)現(xiàn)類有LinkedBlockingQueue,ArrayBlockingQueued等财异。一般其內(nèi)部的都是通過Lock和Condition(顯示鎖(Lock)及Condition的學(xué)習(xí)與使用)來實(shí)現(xiàn)阻塞和喚醒倘零。)</small>
threadFactory:線程工廠,主要用來創(chuàng)建線程(上面有介紹)戳寸;
handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略呈驶,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)疫鹊,但是不拋出異常袖瞻。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)司致,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService聋迎,我們來看一下AbstractExecutorService的實(shí)現(xiàn):
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
AbstractExecutorService是一個(gè)抽象類脂矫,它實(shí)現(xiàn)了ExecutorService接口。
我們接著看ExecutorService接口的實(shí)現(xiàn):
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是繼承了Executor接口霉晕,我們看一下Executor接口的實(shí)現(xiàn):
public interface Executor {
void execute(Runnable command);
}
到這里庭再,大家應(yīng)該明白了ThreadPoolExecutor、AbstractExecutorService牺堰、ExecutorService和Executor幾個(gè)之間的關(guān)系了拄轻。
Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable)伟葫,返回值為void恨搓,參數(shù)為Runnable類型,從字面意思可以理解筏养,就是用來執(zhí)行傳進(jìn)去的任務(wù)的斧抱;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit渐溶、invokeAll辉浦、invokeAny以及shutDown等;
(<small>ExecutorService任務(wù)周期管理接口:Executor的實(shí)現(xiàn)通常都會創(chuàng)建線程來執(zhí)行任務(wù)掌猛,但是使用異步方式來執(zhí)行任務(wù)時(shí)盏浙,由于之前提交任務(wù)的狀態(tài)不是立即可見的,所以如果要關(guān)閉應(yīng)用程序時(shí)荔茬,就需要將受影響的任務(wù)狀態(tài)反饋給應(yīng)用程序废膘。
為了解決執(zhí)行服務(wù)的生命周期問題,Executor擴(kuò)展了EecutorService接口慕蔚,添加了一些用于生命周期管理的方法丐黄。如上:</small>)
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法孔飒;
然后ThreadPoolExecutor繼承了類AbstractExecutorService灌闺。
在ThreadPoolExecutor類中有幾個(gè)非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn)坏瞄,這個(gè)方法是ThreadPoolExecutor的核心方法桂对,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行鸠匀。
submit()方法是在ExecutorService中聲明的方法蕉斜,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒有對其進(jìn)行重寫,這個(gè)方法也是用來向線程池提交任務(wù)的宅此,但是它和execute()方法不同机错,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn)父腕,會發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法弱匪,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果。
shutdown()和shutdownNow()是用來關(guān)閉線程池的璧亮。
還有很多其他的方法:
比如:getQueue() 萧诫、getPoolSize() 、getActiveCount()杜顺、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法财搁,有興趣的朋友可以自行查閱API蘸炸。
參考來自http://blog.csdn.net/he90227/article/details/52576452
參考來自http://blog.csdn.net/mazhimazh/article/details/19243889