Executor框架簡介
Eexecutor作為靈活且強(qiáng)大的異步執(zhí)行框架遭贸,其支持多種不同類型的任務(wù)執(zhí)行策略,提供了一種標(biāo)準(zhǔn)的方法將任務(wù)的提交過程和執(zhí)行過程解耦開發(fā)网沾,基于生產(chǎn)者-消費者模式癞蚕,其提交任務(wù)的線程相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的線程相當(dāng)于消費者辉哥,并用Runnable來表示任務(wù)桦山,Executor的實現(xiàn)還提供了對生命周期的支持,以及統(tǒng)計信息收集醋旦,應(yīng)用程序管理機(jī)制和性能監(jiān)視等機(jī)制恒水。
Executor:一個接口,其定義了一個接收Runnable對象的方法executor(Runnable command))饲齐。
public interface Executor {
void execute(Runnable command);
}
ExecutorService:是一個比Executor使用更廣泛的子類接口寇窑,其提供了生命周期管理的方法,以及可跟蹤一個或多個異步任務(wù)執(zhí)行狀況返回Future的方法箩张。
public interface ExecutorService extends Executor {
/**
* 有序的關(guān)閉先前提交的執(zhí)行任務(wù)甩骏,若已經(jīng)關(guān)閉,則直接返回
*/
void shutdown();
/**
* 試圖停止已經(jīng)執(zhí)行的任務(wù)先慷,中止等待執(zhí)行的任務(wù)饮笛,并返回等待執(zhí)行的任務(wù)列表
*/
List<Runnable> shutdownNow();
/**
* 是否已經(jīng)停止
*/
boolean isShutdown();
/**
* 是否所有任務(wù)都已經(jīng)完成后關(guān)閉
* 注:除非shutdownNow()或者shutdown()先被調(diào)用,否則isTerminated()不可能返回true
*/
boolean isTerminated();
/**
* 阻塞论熙,一直到所有的task在shutdown之后都已經(jīng)執(zhí)行完畢 或者 發(fā)生超時 或者 當(dāng)前線程被中斷
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 提交一個有返回值的執(zhí)行任務(wù)福青,返回值是一個表示任務(wù)執(zhí)行結(jié)果的Future對象,這個Future對象的get方法將在任務(wù)執(zhí)行成功之后返回任務(wù)結(jié)果
* 如果你想立即阻塞等待任務(wù)脓诡,可以使用構(gòu)造器方式:result =exec.submit(aCallable).get();
* 注: Executors 類包括一組可以轉(zhuǎn)換其他常見的閉包類對象方法无午,比如:java.security.PrivilegedAction到Callable的形式,以便它們可以提交
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一個有返回值的執(zhí)行任務(wù)祝谚,返回值是一個表示任務(wù)執(zhí)行結(jié)果的Future對象宪迟,這個Future對象的get方法將在任務(wù)執(zhí)行成功之后返回任務(wù)結(jié)果且返回到result入?yún)⑸稀? */
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個有返回值的執(zhí)行任務(wù),返回值是一個表示任務(wù)執(zhí)行結(jié)果的Future對象交惯,這個Future對象的get方法將在任務(wù)執(zhí)行成功之后返回任務(wù)結(jié)果且返回null次泽。
*/
Future<?> submit(Runnable task);
/**
* 執(zhí)行入?yún)⒅薪o的任務(wù),當(dāng)所有任務(wù)執(zhí)行完畢之后席爽,返回持有這些任務(wù)狀態(tài)和結(jié)果的Future集合
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
/**
* 執(zhí)行入?yún)⒅薪o的任務(wù)意荤,當(dāng)所有任務(wù)執(zhí)行完畢之后或者超時時,返回持有這些任務(wù)狀態(tài)和結(jié)果的Future集合
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
/**
* 執(zhí)行入?yún)⒅薪o的任務(wù)只锻,返回成功執(zhí)行任務(wù)的結(jié)果集中的一個
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/**
* 執(zhí)行入?yún)⒅薪o的任務(wù)玖像,返回成功執(zhí)行任務(wù)的結(jié)果集中的一個,有執(zhí)行的超時時間
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ThreadPoolExecutor:其構(gòu)造函數(shù)的各個參數(shù)說明如下:
/**
* @corePoolSize:核心線程數(shù)齐饮,如果運行的線程少于corePoolSize捐寥,則創(chuàng)建新線程來執(zhí)行新任務(wù),即使線程池中的其他線程是空閑的
* @maximumPoolSize:最大線程數(shù)沈矿,可允許創(chuàng)建的線程數(shù)
* 注:corePoolSize和maximumPoolSize設(shè)置的邊界自動調(diào)整池大猩险妗:corePoolSize <運行的線程數(shù)< maximumPoolSize:僅當(dāng)隊列滿時才創(chuàng)建新線程
corePoolSize=運行的線程數(shù)= maximumPoolSize:創(chuàng)建固定大小的線程池
* @keepAliveTime:如果線程數(shù)多于corePoolSize,則這些多余的線程的空閑時間超過keepAliveTime時將被終止
* @unit:keepAliveTime參數(shù)的時間單位
* @workQueue:保存線程任務(wù)的阻塞隊列,與線程池的大小有關(guān)
* 當(dāng)運行的線程數(shù)少于corePoolSize時羹膳,在有新任務(wù)時直接創(chuàng)建新線程來執(zhí)行任務(wù)而無需再進(jìn)隊列
* 當(dāng)運行的線程數(shù)等于或多于corePoolSize睡互,在有新任務(wù)添加時則先加入隊列,不直接創(chuàng)建線程
* 當(dāng)workQueue滿時陵像,再有新任務(wù)時就創(chuàng)建新線程
* @threadFactory(非必須) :使用ThreadFactory創(chuàng)建新線程就珠,默認(rèn)使用defaultThreadFactory創(chuàng)建線程
* @handler(非必須):定義處理被拒絕任務(wù)的策略,默認(rèn)使用ThreadPoolExecutor.AbortPolicy,任務(wù)被拒絕時將拋出RejectExecutorException
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 屬性賦值醒颖。妻怎。。
}
/**
* ThreadPoolExecutor的核心構(gòu)造器的參數(shù)詳解:
* corePoolSize 核心線程池大小
* maximumPoolSize 最大線程池大小
* keepAliveTime 線程池中超過corePoolSize數(shù)目的空閑線程最大存活時間泞歉;可以allowCoreThreadTimeOut(true)使得核心線程有效時間
* TimeUnit keepAliveTime時間單位
* workQueue 阻塞任務(wù)隊列
* threadFactory 新建線程工廠
* RejectedExecutionHandler 當(dāng)提交任務(wù)數(shù)超過maxmumPoolSize+workQueue之和時逼侦,任務(wù)會交給RejectedExecutionHandler來處理
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
return thread;
}
});
Executors:提供了一系列靜態(tài)工廠方法用于創(chuàng)建各種線程池
/**
* 創(chuàng)建可重用且固定線程數(shù)的線程池匿辩,如果線程池中的所有線程都處于活動狀態(tài),此時再提交任務(wù)就在隊列中等待榛丢,直到有可用線程铲球;
* 如果線程池中的某個線程由于異常而結(jié)束時,線程池就會再補(bǔ)充一條新線程
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
// 使用一個基于FIFO排序的阻塞隊列晰赞,在所有corePoolSize線程都忙時新任務(wù)將在隊列中等待
new LinkedBlockingQueue<Runnable>());
}
/**
* 創(chuàng)建一個單線程的Executor稼病,如果該線程因為異常而結(jié)束就新建一條線程來繼續(xù)執(zhí)行后續(xù)的任務(wù)
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
// corePoolSize和maximumPoolSize都等于,表示固定線程池大小為1
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 創(chuàng)建可緩存的線程池掖鱼,如果線程池中的線程在60秒未被使用就將被移除然走,在執(zhí)行新的任務(wù)時,當(dāng)線程池中有之前創(chuàng)建的可用線程戏挡,就重用可用線程芍瑞,否則就新建一條線程
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
// 使用同步隊列,將任務(wù)直接提交給線程
new SynchronousQueue<Runnable>());
}
Executor的生命周期
ExecutorService提供了管理Eecutor生命周期的方法增拥,ExecutorService的生命周期包括了:運行啄巧、 關(guān)閉和終止三種狀態(tài)。
- ExecutorService在初始化創(chuàng)建時處于運行狀態(tài)掌栅。
- shutdown()方法等待提交的任務(wù)執(zhí)行完成并不再接受新任務(wù)秩仆,在完成全部提交的任務(wù)后關(guān)閉
- shutdownNow()方法將強(qiáng)制終止所有運行中的任務(wù)并不再允許提交新任務(wù)