本平臺的文章更新會有延遲艇炎,大家可以關注微信公眾號-顧林海窝革,包括年底前會更新kotlin由淺入深系列教程,目前計劃在微信公眾號進行首發(fā)宋梧,如果大家想獲取最新教程,請關注微信公眾號狰挡,謝謝!
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor捂龄,而ThreadPoolExecutor是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務加叁,ScheduledThreadPoolExecutor是一個實現(xiàn)定時任務的類倦沧,可以在給定的延遲后運行命令,或者定期執(zhí)行命令它匕。
ScheduledThreadPoolExecutor定義了四個構造函數(shù)展融,這四個構造函數(shù)如下:
/**
* @param corePoolSize 核心線程池的大小
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
/**
* @param corePoolSize 核心線程池的大小
* @param threadFactory 用于設置創(chuàng)建線程的工廠
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
/**
* @param corePoolSize 核心線程池的大小
* @param handler 飽和策略
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), handler);
}
/**
* @param corePoolSize 核心線程池的大小
* @param threadFactory 用于設置創(chuàng)建線程的工廠
* @param handler 飽和策略
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
通過源碼可以發(fā)現(xiàn),ScheduledThreadPoolExecutor的構造器都是調用父類的構造器也就是ThreadPoolExecutor的構造器豫柬,以此來創(chuàng)建一個線程池粹污。
ThreadPoolExecutor的構造器如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
創(chuàng)建一個線程池時需要輸入幾個參數(shù)跛璧,如下:
corePoolSize(線程池的基本大斜』怠):當提交一個任務到線程池時侵浸,線程池會創(chuàng)建一個線程來執(zhí)行任務,即使其它空閑的基本線程能夠執(zhí)行新任務也會創(chuàng)建線程创夜,等到需要執(zhí)行的任務數(shù)大于線程池基本大小時就不再創(chuàng)建杭跪,會把到達的任務放到緩存隊列當中仙逻。如果調用了線程池的prestartAllCoreThreads()方法驰吓,線程池會提前創(chuàng)建并啟動所有基本線程,或調用線程池的prestartCoreThread()方法系奉,線程池會提前創(chuàng)建一個線程檬贰。
maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了缺亮,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù)翁涤,則線程池會再創(chuàng)建新的線程執(zhí)行任務。值得注意的是,如果使用了無界的任務隊列這個參數(shù)就沒什么效果葵礼。
KeepAliveTime(線程活動保持時間):線程池的工作線程空閑后号阿,保持存貨的時間。如果任務很多鸳粉,并且每個任務執(zhí)行的時間比較短扔涧,可以調大時間,提供線程的利用率届谈。
unit(線程活動保持時間的單位):可選的單位有天(DAYS)枯夜、小時(HOURS)、分鐘(MINUTES)艰山、毫秒(MILLISECONDS)湖雹、微妙(MICROSECONDS)、千分之一毫秒和納秒(NANOSECONDS曙搬、千分之一微妙)摔吏。
workQueue(任務隊列):用于保持等待執(zhí)行的任務的阻塞隊列,可以選擇以下幾個阻塞隊列纵装。
(1)ArrayBlockingQueue:是一個基于數(shù)組結構的有界阻塞隊列舔腾,此隊列按FIFO(先進先出)原則對元素進行排序。
(2)LinkedBlockingQueue:一個基于鏈表結構的阻塞隊列搂擦,此隊列按FIFO排序元素稳诚,吞吐量通常要高于ArrayBlockingQueue。
(3)SynchronousQueue:一個不存儲元素的阻塞隊列瀑踢。每個插入操作必須等到另一個線程調用移除操作扳还,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue橱夭。
(4)PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列氨距。ThreadFactory:用于設置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設置更有意義的名字棘劣。
RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了俏让,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務茬暇。這個策略默認情況下是AbortPolicy首昔,表示無法處理新任務時拋出異常。在JDK1.5中Java線程池框架提供了4種策略(也可通過實現(xiàn)RejectedExecutionHandler接口自定義策略)糙俗。
(1)AbortPolicy:直接拋出異常勒奇。
(2)CallerRunsPolicy:只用調用者所在線程來運行任務。
(3)DiscardOldestPolicy:丟棄隊列里最近的一個任務巧骚,并執(zhí)行當前任務赊颠。
(4)DiscardPolicy:處理格二,丟棄掉。
在ScheduledThreadPoolExecutor構造器中使用了工作隊列java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue竣蹦,DelayedWorkQueue是一個無界的BlockingQueue顶猜,
用于放置實現(xiàn)了Delayed接口的對象,其中的對象只能在其到期才能從隊列中取走痘括。
由于ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor驶兜,因此它也實現(xiàn)了ThreadPoolExecutor的方法,如下:
public void execute(Runnable command) {
...
}
public Future<?> submit(Runnable task) {
...
}
public <T> Future<T> submit(Runnable task, T result) {
...
}
public <T> Future<T> submit(Callable<T> task) {
...
}
同時它也有自己的定時執(zhí)行任務的方法:
/**
* 延遲delay時間后開始執(zhí)行task远寸,無法獲取task的執(zhí)行結果抄淑。
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
...
}
/**
* 延遲delay時間后開始執(zhí)行callable,它接收的是一個Callable實例驰后,
* 此方法會返回一個ScheduleFuture對象肆资,通過ScheduleFuture我們
* 可以取消一個未執(zhí)行的task,也可以獲得這個task的執(zhí)行結果灶芝。
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
...
}
/**
* 延遲initialDelay時間后開始執(zhí)行command郑原,并且按照period時間周期性
* 重復調用,當任務執(zhí)行時間大于間隔時間時夜涕,之后的任務都會延遲犯犁,此時與
* Timer中的schedule方法類似。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
...
}
/**
*延遲initialDelay時間后開始執(zhí)行command女器,并且按照period時間周期性重復
*調用酸役,這里的間隔時間delay是等上一個任務完全執(zhí)行完畢才開始計算。
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
...
}
ScheduledThreadPoolExecutor把待調度的任務放到一個DelayedWorkQueue 驾胆,并且DelayedWorkQueue 是一個無界隊列涣澡,ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什么意義。整個ScheduledThreadPoolExecutor的運行可以分為兩大部分丧诺。
1入桂、當調用ScheduledThreadPoolExecutor的上面4個方法時,會向ScheduledThreadPoolExecutor的DelayedWorkQueue 添加一個實現(xiàn)了RunnableScheduleFuture接口的ScheduledFutureTask驳阎,如下ScheduledThreadPoolExecutor其中的一個schedule方法抗愁。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);//向ScheduledThreadPoolExecutor的DelayedWorkQueue添加一個實現(xiàn)了RunnableScheduleFuture接口的ScheduledFutureTask
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
2、線程池中的線程從DelayedWorkQueue 中獲取ScheduledFutureTask呵晚,然后執(zhí)行蜘腌。
ScheduledFutureTask是ScheduledThreadPoolExecutor的內部類并繼承自FutureTask,包含3個成員變量劣纲。
//ong型成員變量sequenceNumber逢捺,表示這個任務被添加到
//ScheduledThreadPoolExecutor中的序號谁鳍。
private final long sequenceNumber;
//long型成員變量time癞季,表示這個任務將要被執(zhí)行的具體時間劫瞳。
private volatile long time;
//long型成員變量period,表示任務執(zhí)行的間隔周期绷柒。
private final long period;
ScheduledFutureTask內部實現(xiàn)了compareTo()方法志于,用于對task的排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
排序時,time小的排在前面废睦,如果兩個ScheduledFutureTask的time相同伺绽,就比較sequenceNumber,sequenceNumber小的排在前面嗜湃。
DelayedWorkQueue 內部使用了二叉堆算法奈应,DelayedWorkQueue 中的元素第一個元素永遠是 延遲時間最小的那個元素。當執(zhí)行 schedule 方法是购披。如果不是重復的任務杖挣,那任務從 DelayedWorkQueue 取出之后執(zhí)行完了就結束了。如果是重復的任務刚陡,那在執(zhí)行結束前會重置執(zhí)行時間并將自己重新加入到 DelayedWorkQueue 中
總結來說惩妇,ScheduledThreadPoolExecutor是一個實現(xiàn)ScheduledExecutorService的可以調度任務的執(zhí)行框架;DelayedWorkQueue是一個數(shù)組實現(xiàn)的阻塞隊列筐乳,根據(jù)任務所提供的時間參數(shù)來調整位置歌殃,實際上就是個小根堆(優(yōu)先隊列);ScheduledFutureTask包含任務單元蝙云,存有時間氓皱、周期、外部任務勃刨、堆下標等調度過程中必須用到的參數(shù)匀泊,被工作線程執(zhí)行。ScheduledThreadPoolExecutor與Timer都是用作定時任務朵你,它們直接的差異是Timer使用的是絕對時間各聘,系統(tǒng)時間的改變會對Timer產生一定的影響;而ScheduledThreadPoolExecutor使用的是相對時間抡医,不會導致這個問題躲因。Timer使用的是單線程來處理任務,長時間運行的任務會導致其他任務的延遲處理忌傻;而ScheduledThreadPoolExecutor可以自定義線程數(shù)量大脉。并且Timer沒有對運行時異常進行處理,一旦某個任務觸發(fā)運行時異常水孩,會導致整個Timer崩潰镰矿;而ScheduledThreadPoolExecutor對運行時異常做了捕獲(通過afterExecute()回調方法中進行處理),所以更安全俘种。