如果你要開啟線程執(zhí)行任務(wù),你會怎么做蒋情?
- 開啟一個線程,然后串行執(zhí)行所有任務(wù)
- 一個任務(wù)開啟一個線程耸携,任務(wù)不會再等待棵癣,但是線程的創(chuàng)建銷毀是需要時間的;線程會消耗系統(tǒng)資源夺衍,尤其是內(nèi)存狈谊;線程數(shù)的增加在一定范圍內(nèi)會提高吞吐率,但過多只會降低系統(tǒng)的執(zhí)行速度沟沙。
由于以上幾點原因有了Executor河劝,接下來看看Executor的好處。
先來看看幾個類:Executors, Executor, ThreadPoolExecutor, ExecutorService, Future, Callable, Runnable, FutureTask矛紫,先理清這些類關(guān)系丧裁。
public interface Executor {
void execute(Runnable command);
}
Executor 接口是任務(wù)執(zhí)行的主要抽象,將任務(wù)本身和執(zhí)行任務(wù)分離含衔,一個接收Runnable任務(wù)的execute方法煎娇。
public interface ExecutorService extends Executor
ExecutorService用來控制線程池的方法;如:shutdown:當(dāng)調(diào)用 shutDown 方法時贪染,線程池會停止接受新的任務(wù)缓呛,isShutdown, isTerminated...來判斷線程池運(yùn)行情況杭隙;他還提供了
<T> Future<T> submit(Callable<T> task);
該方法與Executor.execute一樣代表任務(wù)執(zhí)行哟绊,不過不同的是submit接收一個Callable而且有返回值,返回一個Future痰憎;
那么Callable是什么票髓?
public interface Callable<V> {
V call() throws Exception;
}
Callable是Runnable的替代,它與Runnable一樣是任務(wù)的抽象铣耘,跟Executor(任務(wù)執(zhí)行的抽象)概念區(qū)分開洽沟,它有返回值,而且能拋異常蜗细,能拋異常我們就可以利用這一點來在任務(wù)執(zhí)行過程中判斷任務(wù)執(zhí)行情況裆操,是被取消了怒详,或者是出錯拋異常了,我們還可以實現(xiàn)他的超時拋異常功能主動通知調(diào)用放任務(wù)執(zhí)行超時了踪区;
那么Future昆烁?
Future接口代表異步計算的結(jié)果,通過Future接口提供的方法可以查看異步計算是否執(zhí)行完成缎岗,或者等待執(zhí)行結(jié)果并獲取執(zhí)行結(jié)果静尼,同時還可以取消執(zhí)行。Future接口的定義如下:
public interface Future<V> {
(只有當(dāng)你非常清楚線程的中斷策略時传泊,才應(yīng)該為true)
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- cancel():cancel()方法用來取消異步任務(wù)的執(zhí)行鼠渺。如果異步任務(wù)已經(jīng)完成或者已經(jīng)被取消,或者由于某些原因不能取消或渤,則會返回false系冗。如果任務(wù)還沒有被執(zhí)行,則會返回true并且異步任務(wù)不會被執(zhí)行薪鹦。如果任務(wù)已經(jīng)開始執(zhí)行了但是還沒有執(zhí)行完成掌敬,若mayInterruptIfRunning為true,則會立即中斷執(zhí)行任務(wù)的線程并返回true池磁,若mayInterruptIfRunning為false奔害,則會返回true且不會中斷任務(wù)執(zhí)行線程。
- isCanceled():判斷任務(wù)是否被取消地熄,如果任務(wù)在結(jié)束(正常執(zhí)行結(jié)束或者執(zhí)行異常結(jié)束)前被取消則返回true华临,否則返回false。
- isDone():判斷任務(wù)是否已經(jīng)完成端考,如果完成則返回true雅潭,否則返回false。需要注意的是:任務(wù)執(zhí)行過程中發(fā)生異常却特、任務(wù)被取消也屬于任務(wù)已完成扶供,也會返回true。
- get():獲取任務(wù)執(zhí)行結(jié)果裂明,如果任務(wù)還沒完成則會阻塞等待直到任務(wù)執(zhí)行完成椿浓。如果任務(wù)被取消則會拋出CancellationException異常,如果任務(wù)執(zhí)行過程發(fā)生異常則會拋出ExecutionException異常闽晦,如果阻塞等待過程中被中斷則會拋出InterruptedException異常扳碍。
- get(long timeout,Timeunit unit):帶超時時間的get()版本,如果阻塞等待過程中超時則會拋出TimeoutException異常
submit返回了Future仙蛉,我們就可以利用Future來查看任務(wù)的狀態(tài)isCancelled(是否被取消)笋敞,isDone(是否完成),get阻塞直到任務(wù)完成返回任務(wù)執(zhí)行結(jié)果捅儒;
FutureTask
FutureTask是Future是唯一實現(xiàn)類液样,它同時實現(xiàn)了Runnable
Exectors工廠類提供了線程池的初始化接口
ThreadPoolExecutor為一些Executor提供了基本實現(xiàn)振亮,這些Executor是由Executors中的newCachedThreadPool, newFixedThreadPool和newScheduledThreadExecutor等工廠方法返回的巧还。ThreadPoolExecutor是一個靈活穩(wěn)定的線程池鞭莽,允許進(jìn)行各種定制。
Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
初始化一個指定線程數(shù)的線程池麸祷,其中corePoolSize == maximumPoolSize澎怒,使用LinkedBlockingQuene作為阻塞隊列,不過當(dāng)線程池沒有可執(zhí)行任務(wù)時阶牍,也不會釋放線程喷面。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 初始化一個可以緩存線程的線程池,默認(rèn)緩存60s走孽,線程池的線程數(shù)可達(dá)到Integer.MAX_VALUE惧辈,即2147483647,內(nèi)部使用SynchronousQueue作為阻塞隊列磕瓷;
- 和newFixedThreadPool創(chuàng)建的線程池不同盒齿,newCachedThreadPool在沒有任務(wù)執(zhí)行時,當(dāng)線程的空閑時間超過keepAliveTime困食,會自動釋放線程資源边翁,當(dāng)提交新任務(wù)時,如果沒有空閑線程硕盹,則創(chuàng)建新線程執(zhí)行任務(wù)符匾,會導(dǎo)致一定的系統(tǒng)開銷;
所以瘩例,使用該線程池時啊胶,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題垛贤。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
初始化的線程池中只有一個線程焰坪,如果該線程異常結(jié)束,會重新創(chuàng)建一個新的線程繼續(xù)執(zhí)行任務(wù)南吮,唯一的線程可以保證所提交任務(wù)的順序執(zhí)行琳彩,內(nèi)部使用LinkedBlockingQueue作為阻塞隊列。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
初始化的線程池可以在指定的時間內(nèi)周期性的執(zhí)行所提交的任務(wù)部凑,在實際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)露乏。
參數(shù)
以newFixedThreadPool為例來說說各個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
corePoolSize核心線程數(shù),每提交一個任務(wù)創(chuàng)建一個線程涂邀,直到線程數(shù)等于核心線程數(shù)瘟仿,此后提交的任務(wù)會被放在阻塞隊列中;如果執(zhí)行了線程池的prestartAllCoreThreads()方法比勉,線程池會提前創(chuàng)建并啟動所有核心線程劳较。
maximumPoolSize最大線程數(shù);如果阻塞隊列滿了驹止,會創(chuàng)建線程執(zhí)行新提交的任務(wù),前提是當(dāng)前線程數(shù)要小于maximumPoolSize
keepAliveTime線程空閑時的存活時間观蜗,即當(dāng)線程沒有任務(wù)執(zhí)行時臊恋,繼續(xù)存活的時間;默認(rèn)情況下墓捻,該參數(shù)只在線程數(shù)大于corePoolSize時才有用抖仅;
unitkeepAliveTime的單位
workQueue
用來保存等待被執(zhí)行的任務(wù)的阻塞隊列,且任務(wù)必須實現(xiàn)Runable接口砖第,在JDK中提供了如下阻塞隊列:
1撤卢、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù)梧兼;
2放吩、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù)羽杰,吞吐量通常要高于ArrayBlockingQuene渡紫;
3、SynchronousQuene:一個不存儲元素的阻塞隊列忽洛,每個插入操作必須等到另一個線程調(diào)用移除操作腻惠,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene欲虚;
4集灌、priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列;
threadFactory創(chuàng)建線程的工廠
handler線程池的飽和策略复哆,當(dāng)阻塞隊列滿了欣喧,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù)梯找,必須采取一種策略處理該任務(wù)唆阿,線程池提供了4種策略:
1、AbortPolicy:直接拋出異常锈锤,默認(rèn)策略驯鳖;
2、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)久免;
3浅辙、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)阎姥;
4记舆、DiscardPolicy:直接丟棄任務(wù);
參考
《Java并發(fā)編程實戰(zhàn)》
深入分析java線程池的實現(xiàn)原理
Executor, ExecutorService 和 Executors 間的不同