1.ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類羹与,因此如果要透徹地了解Java中的線程池挨下,必須先了解這個類涛癌。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的代碼可以得知弊添,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個構造器,事實上箩兽,通過觀察每個構造器的源碼具體實現,發(fā)現前面三個構造器都是調用的第四個構造器進行的初始化工作章喉。
下面解釋一下構造器中各個參數的含義:
corePoolSize
核心池的大小汗贫,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創(chuàng)建了線程池后秸脱,默認情況下落包,線程池中并沒有任何線程,而是等待有任務到來才創(chuàng)建線程去執(zhí)行任務摊唇,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法咐蝇,從這2個方法的名字就可以看出,是預創(chuàng)建線程的意思巷查,即在沒有任務到來之前就創(chuàng)建corePoolSize個線程或者一個線程嘹害。默認情況下,在創(chuàng)建了線程池后吮便,線程池中的線程數為0笔呀,當有任務來之后,就會創(chuàng)建一個線程去執(zhí)行任務髓需,當線程池中的線程數目達到corePoolSize后许师,就會把到達的任務放到緩存隊列當中;
maximumPoolSize
線程池最大線程數僚匆,這個參數也是一個非常重要的參數微渠,它表示在線程池中最多能創(chuàng)建多少個線程;
keepAliveTime
表示線程沒有任務執(zhí)行時最多保持多久時間會終止咧擂。默認情況下逞盆,只有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用松申,直到線程池中的線程數不大于corePoolSize云芦,即當線程池中的線程數大于corePoolSize時俯逾,如果一個線程空閑的時間達到keepAliveTime,則會終止舅逸,直到線程池中的線程數不超過corePoolSize桌肴。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大于corePoolSize時琉历,keepAliveTime參數也會起作用坠七,直到線程池中的線程數為0;
unit
參數keepAliveTime的時間單位旗笔,有7種取值彪置,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue
一個阻塞隊列,用來存儲等待執(zhí)行的任務蝇恶,這個參數的選擇也很重要悉稠,會對線程池的運行過程產生重大影響,一般來說艘包,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用較少的猛,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關.
threadFactory
線程工廠想虎,主要用來創(chuàng)建線程卦尊;
handler
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務舌厨,但是不拋出異常岂却。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
2.ThreadPoolExecutor的父類
AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
AbstractExecutorService是一個抽象類裙椭,它實現了ExecutorService接口躏哩。
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是繼承了Executor接口
public interface Executor {
void execute(Runnable command);
}
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable)揉燃,返回值為void扫尺,參數為Runnable類型,從字面意思可以理解炊汤,就是用來執(zhí)行傳進去的任務的正驻;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit抢腐、invokeAll姑曙、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService接口迈倍,基本實現了ExecutorService中聲明的所有方法伤靠;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute() //向線程池提交一個任務啼染,交由線程池去執(zhí)行
submit()
shutdown() //不會立即終止線程池宴合,而是要等所有任務緩存隊列中的任務都執(zhí)行完后才終止焕梅,但再也不會接受新的任務
shutdownNow() //立即終止線程池,并嘗試打斷正在執(zhí)行的任務形纺,并且清空任務緩存隊列,返回尚未執(zhí)行的任務
execute()方法實際上是Executor中聲明的方法徒欣,在ThreadPoolExecutor進行了具體的實現逐样,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務打肝,交由線程池去執(zhí)行脂新。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現粗梭,在ThreadPoolExecutor中并沒有對其進行重寫争便,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同断医,它能夠返回任務執(zhí)行的結果滞乙,去看submit()方法的實現,會發(fā)現它實際上還是調用的execute()方法鉴嗤,只不過它利用了Future來獲取任務執(zhí)行結果(Future相關內容將在下一篇講述)斩启。
shutdown()和shutdownNow()是用來關閉線程池的.
3.Executors
在java doc中,并不提倡直接使用ThreadPoolExecutor醉锅,而是使用Executors類中提供的幾個靜態(tài)方法來創(chuàng)建線程池:
CachedThreadPool
創(chuàng)建一個可緩存線程池兔簇,如果線程池長度超過處理需要,可靈活回收空閑線程硬耍,若無可回收垄琐,則新建線程。
這種類型的線程池特點是:
工作線程的創(chuàng)建數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程经柴。
如果長時間沒有往線程池中提交任務狸窘,即如果工作線程空閑了指定的時間(默認為1分鐘),則該工作線程將自動終止坯认。終止后朦前,如果你又提交了新的任務,則線程池重新創(chuàng)建一個工作線程鹃操。
在使用CachedThreadPool時韭寸,一定要注意控制任務的數量,否則荆隘,由于大量線程同時運行恩伺,很有會造成系統(tǒng)癱瘓。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newCachedThreadPool將corePoolSize設置為0椰拒,將maximumPoolSize設置為Integer.MAX_VALUE晶渠,使用的SynchronousQueue凰荚,也就是說來了任務就創(chuàng)建線程運行,當線程空閑超過60秒褒脯,就銷毀線程便瑟。
newFixedThreadPool
創(chuàng)建一個指定工作線程數量的線程池。每當提交一個任務就創(chuàng)建一個工作線程番川,如果工作線程數量達到線程池初始的最大數到涂,則將提交的任務存入到池隊列中。
FixedThreadPool是一個典型且優(yōu)秀的線程池颁督,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時所耗的開銷的優(yōu)點践啄。但是,在線程池空閑時沉御,即線程池中沒有可運行任務時屿讽,它不會釋放工作線程,還會占用一定的系統(tǒng)資源吠裆。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的伐谈,它使用的LinkedBlockingQueue;
newSingleThreadExecutor
創(chuàng)建一個單線程化的Executor试疙,即只創(chuàng)建唯一的工作者線程來執(zhí)行任務衩婚,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行效斑。如果這個線程異常結束非春,會有另一個取代它,保證順序執(zhí)行缓屠。單工作線程最大的特點是可保證順序地執(zhí)行各個任務奇昙,并且在任意給定的時間不會有多個線程是活動的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1敌完,使用的是LinkedBlockingQueue
newScheduleThreadPool
創(chuàng)建一個定長的線程池储耐,而且支持定時的以及周期性的任務執(zhí)行,支持定時及周期性任務執(zhí)行滨溉。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
3.線程池工作原理
1.如果當前線程池中的線程數目小于corePoolSize什湘,則每來一個任務,就會創(chuàng)建一個線程去執(zhí)行這個任務晦攒;
2.如果當前線程池中的線程數目>=corePoolSize闽撤,則每來一個任務,會嘗試將其添加到任務緩存隊列當中脯颜,若添加成功哟旗,則該任務會等待空閑線程將其取出去執(zhí)行;若添加失敗(一般來說是任務緩存隊列已滿)闸餐,則會嘗試創(chuàng)建新的線程去執(zhí)行這個任務饱亮;
3.如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理舍沙;
4.如果線程池中的線程數量大于 corePoolSize時近上,如果某線程空閑時間超過keepAliveTime,線程將被終止拂铡,直至線程池中的線程數目不大于corePoolSize壹无;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime和媳,線程也會被終止格遭。