并發(fā)編程之定時(shí)任務(wù)&定時(shí)線程池原理解析

點(diǎn)贊再看骡苞,養(yǎng)成習(xí)慣,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章却紧。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章澈驼。

前言

線程池的具體實(shí)現(xiàn)有兩種畏纲,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時(shí)線程池建邓,上一篇已經(jīng)分析過ThreadPoolExecutor原理與使用了盈厘,本篇我們來重點(diǎn)分析下ScheduledThreadPoolExecutor的原理與使用。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 與 ThreadPoolExecutor 線程池的概念有些區(qū)別官边,它是一個(gè)支持任務(wù)周期性調(diào)度的線程池沸手。

ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor外遇,同時(shí)通過實(shí)現(xiàn) ScheduledExecutorSerivce 來擴(kuò)展基礎(chǔ)線程池的功能,使其擁有了調(diào)度能力契吉。其整個(gè)調(diào)度的核心在于內(nèi)部類 DelayedWorkQueue 臀规,一個(gè)有序的延時(shí)隊(duì)列。

定時(shí)線程池類的類結(jié)構(gòu)圖如下:


ScheduledThreadPoolExecutor 的出現(xiàn)栅隐,很好的彌補(bǔ)了傳統(tǒng) Timer 的不足,具體對(duì)比看下表:

Timer ScheduledThreadPoolExecutor
線程 單線程 多線程
多任務(wù) 任務(wù)之間相互影響 任務(wù)之間不影響
調(diào)度時(shí)間 絕對(duì)時(shí)間 相對(duì)時(shí)間
異常 單任務(wù)異常玩徊,后續(xù)任務(wù)受影響 無影響

工作原理

它用來處理延時(shí)任務(wù)或定時(shí)任務(wù)



它接收SchduledFutureTask類型的任務(wù)租悄,是線程池調(diào)度任務(wù)的最小單位,有三種提交任務(wù)的方式:

  1. schedule恩袱,特定時(shí)間延時(shí)后執(zhí)行一次任務(wù)
  2. scheduledAtFixedRate泣棋,固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間無關(guān),周期是固定的)
  3. scheduledWithFixedDelay畔塔,固定延時(shí)執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間有關(guān),延時(shí)從上一次任務(wù)完成后開始)

它采用 DelayedWorkQueue 存儲(chǔ)等待的任務(wù)

  1. DelayedWorkQueue 內(nèi)部封裝了一個(gè) PriorityQueue ,它會(huì)根據(jù) time 的先后時(shí)間排序般哼,若 time 相同則根據(jù) sequenceNumber 排序燥爷;
  2. DelayedWorkQueue 也是一個(gè)無界隊(duì)列;

因?yàn)榍懊嬷v阻塞隊(duì)列實(shí)現(xiàn)的時(shí)候谅辣,已經(jīng)對(duì)DelayedWorkQueue進(jìn)行了說明修赞,更多內(nèi)容請(qǐng)查看《阻塞隊(duì)列 — DelayedWorkQueue源碼分析》

工作線程的執(zhí)行過程:

  • 工作線程會(huì)從DelayedWorkerQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
  • 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間桑阶,再次放回DelayedWorkerQueue柏副。

take方法是什么時(shí)候調(diào)用的呢?
在ThreadPoolExecutor中蚣录,getTask方法割择,工作線程會(huì)循環(huán)地從workQueue中取任務(wù)。但定時(shí)任務(wù)卻不同萎河,因?yàn)槿绻坏ゞetTask方法取出了任務(wù)就開始執(zhí)行了荔泳,而這時(shí)可能還沒有到執(zhí)行的時(shí)間,所以在take方法中公壤,要保證只有在到指定的執(zhí)行時(shí)間的時(shí)候任務(wù)才可以被取走换可。

PS:對(duì)于以上原理的理解,可以通過下面的源碼分析加深印象厦幅。

源碼分析

構(gòu)造方法

ScheduledThreadPoolExecutor有四個(gè)構(gòu)造形式:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue(), threadFactory, handler);
}

當(dāng)然我們也可以使用工具類Executors的newScheduledThreadPool的方法沾鳄,快速創(chuàng)建。注意這里使用的DelayedWorkQueue确憨。

ScheduledThreadPoolExecutor沒有提供帶有最大線程數(shù)的構(gòu)造函數(shù)的译荞,默認(rèn)是Integer.MAX_VALUE瓤的,說明其可以無限制的開啟任意線程執(zhí)行任務(wù),在大量任務(wù)系統(tǒng)吞歼,應(yīng)注意這一點(diǎn)圈膏,避免內(nèi)存溢出。

核心方法

核心方法主要介紹ScheduledThreadPoolExecutor的調(diào)度方法篙骡,其他方法與 ThreadPoolExecutor 一致稽坤。調(diào)度方法均由 ScheduledExecutorService 接口定義:

public interface ScheduledExecutorService extends ExecutorService {
    // 特定時(shí)間延時(shí)后執(zhí)行一次Runnable
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    // 特定時(shí)間延時(shí)后執(zhí)行一次Callable
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    // 固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間無關(guān),周期是固定的)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
     // 固定延時(shí)執(zhí)行任務(wù)(與任務(wù)執(zhí)行時(shí)間有關(guān)糯俗,延時(shí)從上一次任務(wù)完成后開始)
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

我們?cè)賮砜匆幌陆涌诘膶?shí)現(xiàn)尿褪,具體是怎么來實(shí)現(xiàn)線程池任務(wù)的提交。因?yàn)樽罱K都回調(diào)用 delayedExecute 提交任務(wù)得湘。所以杖玲,我們這里只分析schedule方法,該方法是指任務(wù)在指定延遲時(shí)間到達(dá)后觸發(fā)淘正,只會(huì)執(zhí)行一次摆马。源代碼如下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    //參數(shù)校驗(yàn)
    if (command == null || unit == null)
        throw new NullPointerException();
    //這里是一個(gè)嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask
    //然后在調(diào)用decorateTask進(jìn)行包裝鸿吆,該方法是留給用戶去擴(kuò)展的囤采,默認(rèn)是個(gè)空方法
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
   //包裝好任務(wù)以后,就進(jìn)行提交了
    delayedExecute(t);
    return t;
}

delayedExecute 任務(wù)提交方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果線程池已經(jīng)關(guān)閉惩淳,則使用拒絕策略把提交任務(wù)拒絕掉
    if (isShutdown())
        reject(task);
    else {
        //與ThreadPoolExecutor不同斑唬,這里直接把任務(wù)加入延遲隊(duì)列
        super.getQueue().add(task);//使用用的DelayedWorkQueue
        //如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //這里是增加一個(gè)worker線程黎泣,避免提交的任務(wù)沒有worker去執(zhí)行
            //原因就是該類沒有像ThreadPoolExecutor一樣恕刘,woker滿了才放入隊(duì)列
            ensurePrestart();
    }
}

我們可以看到提交到線程池的任務(wù)都包裝成了 ScheduledFutureTask,繼續(xù)往下我們?cè)賮硌芯肯隆?br> 造方

ScheduledFutureTask

從ScheduledFutureTask類的定義可以看出抒倚,ScheduledFutureTask類是ScheduledThreadPoolExecutor類的私有內(nèi)部類褐着,繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口托呕。也就是說含蓉,ScheduledFutureTask具有FutureTask類的所有功能,并實(shí)現(xiàn)了RunnableScheduledFuture接口的所有方法项郊。ScheduledFutureTask類的定義如下所示:

private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>

ScheduledFutureTask類繼承圖如下:


成員變量

SchduledFutureTask接收的參數(shù)(成員變量):

// 任務(wù)開始的時(shí)間
private long time;
// 任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號(hào)
private final long sequenceNumber;
// 任務(wù)執(zhí)行的時(shí)間間隔
private final long period;
//ScheduledFutureTask對(duì)象馅扣,實(shí)際指向當(dāng)前對(duì)象本身
RunnableScheduledFuture<V> outerTask = this;
//當(dāng)前任務(wù)在延遲隊(duì)列中的索引,能夠更加方便的取消當(dāng)前任務(wù)
int heapIndex;

解析

  • sequenceNumber:任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號(hào),可以根據(jù)這個(gè)序列號(hào)確定唯一的一個(gè)任務(wù)着降,如果在定時(shí)任務(wù)中差油,如果一個(gè)任務(wù)是周期性執(zhí)行的,但是它們的sequenceNumber的值相同,則被視為是同一個(gè)任務(wù)蓄喇。
  • time:下一次執(zhí)行任務(wù)的時(shí)間发侵。
  • period:任務(wù)的執(zhí)行周期。
  • outerTask:ScheduledFutureTask對(duì)象妆偏,實(shí)際指向當(dāng)前對(duì)象本身刃鳄。此對(duì)象的引用會(huì)被傳入到周期性執(zhí)行任務(wù)的ScheduledThreadPoolExecutor類的reExecutePeriodic方法中。
  • heapIndex:當(dāng)前任務(wù)在延遲隊(duì)列中的索引钱骂,這個(gè)索引能夠更加方便的取消當(dāng)前任務(wù)叔锐。

構(gòu)造方法

ScheduledFutureTask類繼承了FutureTask類,并實(shí)現(xiàn)了RunnableScheduledFuture接口见秽。在ScheduledFutureTask類中提供了如下構(gòu)造方法掌腰。

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

FutureTask的構(gòu)造方法如下:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

通過源碼可以看到,在ScheduledFutureTask類的構(gòu)造方法中张吉,首先會(huì)調(diào)用FutureTask類的構(gòu)造方法為FutureTask類的callable和state成員變量賦值,接下來為ScheduledFutureTask類的time催植、period和sequenceNumber成員變量賦值肮蛹。理解起來比較簡(jiǎn)單。

getDelay方法

我們先來看getDelay方法的源碼创南,如下所示:

//獲取下次執(zhí)行任務(wù)的時(shí)間距離當(dāng)前時(shí)間的納秒數(shù)
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

getDelay方法比較簡(jiǎn)單伦忠,主要用來獲取下次執(zhí)行任務(wù)的時(shí)間距離當(dāng)前系統(tǒng)時(shí)間的納秒數(shù)。

compareTo方法

ScheduledFutureTask類在類的結(jié)構(gòu)上實(shí)現(xiàn)了Comparable接口稿辙,compareTo方法主要是對(duì)Comparable接口定義的compareTo方法的實(shí)現(xiàn)昆码。源碼如下所示:

public int compareTo(Delayed other) {
    if (other == this) 
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

這段代碼看上去好像是對(duì)各種數(shù)值類型數(shù)據(jù)的比較,本質(zhì)上是對(duì)延遲隊(duì)列中的任務(wù)進(jìn)行排序邻储。排序規(guī)則為:

  • 首先比較延遲隊(duì)列中每個(gè)任務(wù)下次執(zhí)行的時(shí)間赋咽,下次執(zhí)行時(shí)間距離當(dāng)前時(shí)間短的任務(wù)會(huì)排在前面;
  • 如果下次執(zhí)行任務(wù)的時(shí)間相同吨娜,則會(huì)比較任務(wù)的sequenceNumber值脓匿,sequenceNumber值小的任務(wù)會(huì)排在前面。

isPeriodic方法

isPeriodic方法的源代碼如下所示:

//判斷是否是周期性任務(wù)
public boolean isPeriodic() {
    return period != 0;
}

這個(gè)方法主要是用來判斷當(dāng)前任務(wù)是否是周期性任務(wù)宦赠。這里只要判斷運(yùn)行任務(wù)的執(zhí)行周期不等于0就能確定為周期性任務(wù)了陪毡。因?yàn)闊o論period的值是大于0還是小于0,當(dāng)前任務(wù)都是周期性任務(wù)勾扭。

setNextRunTime方法

setNextRunTime方法的作用主要是設(shè)置當(dāng)前任務(wù)下次執(zhí)行的時(shí)間毡琉,源碼如下所示:

private void setNextRunTime() {
    long p = period;
    //固定頻率,上次執(zhí)行任務(wù)的時(shí)間加上任務(wù)的執(zhí)行周期
    if (p > 0)
        time += p;
    //相對(duì)固定的延遲執(zhí)行妙色,當(dāng)前系統(tǒng)時(shí)間加上任務(wù)的執(zhí)行周期
    else
        time = triggerTime(-p);
}

這里再一次證明了使用isPeriodic方法判斷當(dāng)前任務(wù)是否為周期性任務(wù)時(shí)桅滋,只要判斷period的值是否不等于0就可以了。

  • 因?yàn)槿绻?dāng)前任務(wù)時(shí)固定頻率執(zhí)行的周期性任務(wù)身辨,會(huì)將周期period當(dāng)作正數(shù)來處理虱歪;
  • 如果是相對(duì)固定的延遲執(zhí)行當(dāng)前任務(wù)蜂绎,則會(huì)將周期period當(dāng)作負(fù)數(shù)來處理。

這里我們看到在setNextRunTime方法中笋鄙,調(diào)用了ScheduledThreadPoolExecutor類的triggerTime方法师枣。接下來我們看下triggerTime方法的源碼。

ScheduledThreadPoolExecutor類的triggerTime方法

triggerTime方法用于獲取延遲隊(duì)列中的任務(wù)下一次執(zhí)行的具體時(shí)間萧落。源碼如下所示践美。

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

這兩個(gè)triggerTime方法的代碼比較簡(jiǎn)單,就是獲取下一次執(zhí)行任務(wù)的具體時(shí)間找岖。有一點(diǎn)需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半陨倡,如果小于Long.MAX_VALUE值的一半,則直接返回delay许布,否則需要處理溢出的情況兴革。

我們看到在triggerTime方法中處理防止溢出的邏輯使用了ScheduledThreadPoolExecutor類的overflowFree方法,接下來蜜唾,我們就看看ScheduledThreadPoolExecutor類的overflowFree方法的實(shí)現(xiàn)杂曲。

ScheduledThreadPoolExecutor類的overflowFree方法

overflowFree方法的源代碼如下所示:

private long overflowFree(long delay) {
    //獲取隊(duì)列中的節(jié)點(diǎn)
    Delayed head = (Delayed) super.getQueue().peek();
    //獲取的節(jié)點(diǎn)不為空,則進(jìn)行后續(xù)處理
    if (head != null) {
        //從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間
        long headDelay = head.getDelay(NANOSECONDS);
        //如果從隊(duì)列中獲取的延遲時(shí)間小于0袁余,并且傳遞的delay
        //值減去從隊(duì)列節(jié)點(diǎn)中獲取延遲時(shí)間小于0
        if (headDelay < 0 && (delay - headDelay < 0))
            //將delay的值設(shè)置為L(zhǎng)ong.MAX_VALUE + headDelay
            delay = Long.MAX_VALUE + headDelay;
    }
    //返回延遲時(shí)間
    return delay;
}

通過對(duì)overflowFree方法的源碼分析擎勘,可以看出overflowFree方法本質(zhì)上就是為了限制隊(duì)列中的所有節(jié)點(diǎn)的延遲時(shí)間在Long.MAX_VALUE值之內(nèi),防止在compareTo方法中溢出颖榜。

cancel方法

cancel方法的作用主要是取消當(dāng)前任務(wù)的執(zhí)行棚饵,源碼如下所示:

public boolean cancel(boolean mayInterruptIfRunning) {
    //取消任務(wù),返回任務(wù)是否取消的標(biāo)識(shí)
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    //如果任務(wù)已經(jīng)取消
    //并且需要將任務(wù)從延遲隊(duì)列中刪除
    //并且任務(wù)在延遲隊(duì)列中的索引大于或者等于0
    if (cancelled && removeOnCancel && heapIndex >= 0)
        //將當(dāng)前任務(wù)從延遲隊(duì)列中刪除
        remove(this);
    //返回是否成功取消任務(wù)的標(biāo)識(shí)
    return cancelled;
}

這段代碼理解起來相對(duì)比較簡(jiǎn)單掩完,首先調(diào)用取消任務(wù)的方法噪漾,并返回任務(wù)是否已經(jīng)取消的標(biāo)識(shí)。如果任務(wù)已經(jīng)取消且蓬,并且需要移除任務(wù)怪与,同時(shí),任務(wù)在延遲隊(duì)列中的索引大于或者等于0缅疟,則將當(dāng)前任務(wù)從延遲隊(duì)列中移除分别。最后返回任務(wù)是否成功取消的標(biāo)識(shí)。

run方法

run方法可以說是ScheduledFutureTask類的核心方法存淫,是對(duì)Runnable接口的實(shí)現(xiàn)耘斩,源碼如下所示:

public void run() {
    //當(dāng)前任務(wù)是否是周期性任務(wù)
    boolean periodic = isPeriodic();
    //線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù)
    if (!canRunInCurrentRunState(periodic))
        //取消任務(wù)的執(zhí)行
        cancel(false);
    //如果不是周期性任務(wù)
    else if (!periodic)
        //則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù)
        ScheduledFutureTask.super.run();
    //如果是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù)
    //如果任務(wù)執(zhí)行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設(shè)置下次執(zhí)行任務(wù)的時(shí)間
        setNextRunTime();
        //重復(fù)執(zhí)行任務(wù)
        reExecutePeriodic(outerTask);
    }
}

整理一下方法的邏輯:

  1. 首先判斷當(dāng)前任務(wù)是否是周期性任務(wù)桅咆。如果線程池當(dāng)前運(yùn)行狀態(tài)下不能執(zhí)行周期性任務(wù)括授,則取消任務(wù)的執(zhí)行,否則執(zhí)行步驟2;
  2. 如果當(dāng)前任務(wù)不是周期性任務(wù)荚虚,則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù)薛夜,會(huì)設(shè)置執(zhí)行結(jié)果,然后直接返回版述,否則執(zhí)行步驟3梯澜;
  3. 如果當(dāng)前任務(wù)是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù)渴析,不會(huì)設(shè)置執(zhí)行結(jié)果晚伙,然后直接返回,否則執(zhí)行步驟4俭茧;
  4. 如果任務(wù)執(zhí)行成功咆疗,則設(shè)置下次執(zhí)行任務(wù)的時(shí)間,同時(shí)母债,將任務(wù)設(shè)置為重復(fù)執(zhí)行午磁。

這里,調(diào)用了FutureTask類的run方法和runAndReset方法毡们,并且調(diào)用了ScheduledThreadPoolExecutor類的reExecutePeriodic方法迅皇。接下來,我們分別看下這些方法的實(shí)現(xiàn)漏隐。

FutureTask類的run方法

FutureTask類的run方法源碼如下所示:

public void run() {
    //狀態(tài)如果不是NEW,說明任務(wù)或者已經(jīng)執(zhí)行過奴迅,或者已經(jīng)被取消青责,直接返回
    //狀態(tài)如果是NEW,則嘗試把當(dāng)前執(zhí)行線程保存在runner字段中
    //如果賦值失敗則直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //執(zhí)行任務(wù)
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //任務(wù)異常
                setException(ex);
            }
            if (ran)
                //任務(wù)正常執(zhí)行完畢
                set(result);
        }
    } finally {

        runner = null;
        int s = state;
        //如果任務(wù)被中斷取具,執(zhí)行中斷處理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

代碼的整體邏輯為:

  • 判斷當(dāng)前任務(wù)的state是否等于NEW脖隶,如果不為NEW則說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消暇检,直接返回产阱;
  • 如果狀態(tài)為NEW則接著會(huì)通過unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中,如果保存失敗块仆,則直接返回构蹬;
  • 執(zhí)行任務(wù);如果任務(wù)執(zhí)行發(fā)生異常悔据,則調(diào)用setException()方法保存異常信息庄敛。

FutureTask類的runAndReset方法

方法的源碼如下所示:

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

FutureTask類的runAndReset方法與run方法的邏輯基本相同,只是runAndReset方法會(huì)重置當(dāng)前任務(wù)的執(zhí)行狀態(tài)科汗。

ScheduledThreadPoolExecutor類的reExecutePeriodic方法

reExecutePeriodic重復(fù)執(zhí)行任務(wù)方法藻烤,源代碼如下所示:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    //線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù)
    if (canRunInCurrentRunState(true)) {
        //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊(duì)列
        super.getQueue().add(task);//使用用的DelayedWorkQueue
        //線程池當(dāng)前狀態(tài)下不能執(zhí)行任務(wù),并且成功移除任務(wù)
        if (!canRunInCurrentRunState(true) && remove(task))
            //取消任務(wù)
            task.cancel(false);
        else
            //這里是增加一個(gè)worker線程怖亭,避免提交的任務(wù)沒有worker去執(zhí)行
            //原因就是該類沒有像ThreadPoolExecutor一樣涎显,woker滿了才放入隊(duì)列        
            ensurePrestart();
    }
}

總體來說reExecutePeriodic方法的邏輯比較簡(jiǎn)單,需要注意的是:調(diào)用reExecutePeriodic方法的時(shí)候已經(jīng)執(zhí)行過一次任務(wù)兴猩,所以期吓,并不會(huì)觸發(fā)線程池的拒絕策略;傳入reExecutePeriodic方法的任務(wù)一定是周期性的任務(wù)峭跳。

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要自己實(shí)現(xiàn)阻塞的工作隊(duì)列膘婶,是因?yàn)?ScheduleThreadPoolExecutor 要求的工作隊(duì)列有些特殊。

DelayedWorkQueue是一個(gè)基于堆的數(shù)據(jù)結(jié)構(gòu)蛀醉,類似于DelayQueue和PriorityQueue悬襟。在執(zhí)行定時(shí)任務(wù)的時(shí)候,每個(gè)任務(wù)的執(zhí)行時(shí)間都不同拯刁,所以DelayedWorkQueue的工作就是按照?qǐng)?zhí)行時(shí)間的升序來排列脊岳,執(zhí)行時(shí)間距離當(dāng)前時(shí)間越近的任務(wù)在隊(duì)列的前面(注意:這里的順序并不是絕對(duì)的,堆中的排序只保證了子節(jié)點(diǎn)的下次執(zhí)行時(shí)間要比父節(jié)點(diǎn)的下次執(zhí)行時(shí)間要大垛玻,而葉子節(jié)點(diǎn)之間并不一定是順序的)割捅。

堆結(jié)構(gòu)如下圖:



可見,DelayedWorkQueue是一個(gè)基于最小堆結(jié)構(gòu)的隊(duì)列帚桩。堆結(jié)構(gòu)可以使用數(shù)組表示亿驾,可以轉(zhuǎn)換成如下的數(shù)組:



在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性:
假設(shè)“第一個(gè)元素” 在數(shù)組中的索引為 0 的話账嚎,則父結(jié)點(diǎn)和子結(jié)點(diǎn)的位置關(guān)系如下:
  • 索引為 的左孩子的索引是 (2 * i + 1)莫瞬;
  • 索引為 的右孩子的索引是 (2 * i + 2)
  • 索引為 的父結(jié)點(diǎn)的索引是 floor((i-1)/2)郭蕉;

為什么要使用DelayedWorkQueue呢疼邀?

  • 定時(shí)任務(wù)執(zhí)行時(shí)需要取出最近要執(zhí)行的任務(wù),所以任務(wù)在隊(duì)列中每次出隊(duì)時(shí)一定要是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的召锈,所以自然要使用優(yōu)先級(jí)隊(duì)列旁振。
  • DelayedWorkQueue是一個(gè)優(yōu)先級(jí)隊(duì)列,它可以保證每次出隊(duì)的任務(wù)都是當(dāng)前隊(duì)列中執(zhí)行時(shí)間最靠前的涨岁,由于它是基于堆結(jié)構(gòu)的隊(duì)列拐袜,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時(shí)的最壞時(shí)間復(fù)雜度是 O(logN)

因?yàn)榍懊嬷v阻塞隊(duì)列實(shí)現(xiàn)的時(shí)候梢薪,已經(jīng)對(duì)DelayedWorkQueue進(jìn)行了說明阻肿,更多內(nèi)容請(qǐng)查看《阻塞隊(duì)列 — DelayedWorkQueue源碼分析》

總結(jié)

  1. 與Timer執(zhí)行定時(shí)任務(wù)比較,相比Timer沮尿,ScheduledThreadPoolExecutor有說明優(yōu)點(diǎn)丛塌?(文章前面分析過)
  2. ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor较解,所以它也是一個(gè)線程池,也有 coorPoolSize 和 workQueue赴邻,但是 ScheduledThreadPoolExecutor特殊的地方在于印衔,自己實(shí)現(xiàn)了優(yōu)先工作隊(duì)列 DelayedWorkQueue
  3. ScheduledThreadPoolExecutor 實(shí)現(xiàn)了 ScheduledExecutorService姥敛,所以就有了任務(wù)調(diào)度的方法奸焙,如 schedulescheduleAtFixedRate 彤敛、 scheduleWithFixedDelay 与帆,同時(shí)注意他們之間的區(qū)別;
  4. 內(nèi)部類 ScheduledFutureTask 繼承者FutureTask墨榄,實(shí)現(xiàn)了任務(wù)的異步執(zhí)行并且可以獲取返回結(jié)果玄糟。同時(shí)實(shí)現(xiàn)了Delayed接口,可以通過getDelay方法獲取將要執(zhí)行的時(shí)間間隔袄秩;
  5. 周期任務(wù)的執(zhí)行其實(shí)是調(diào)用了FutureTask的 runAndReset 方法阵翎,每次執(zhí)行完不設(shè)置結(jié)果和狀態(tài)。
  6. DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu)之剧,它是一個(gè)基于最小堆結(jié)構(gòu)的優(yōu)先隊(duì)列郭卫,并且每次出隊(duì)時(shí)能夠保證取出的任務(wù)是當(dāng)前隊(duì)列中下次執(zhí)行時(shí)間最小的任務(wù)。同時(shí)注意一下優(yōu)先隊(duì)列中堆的順序背稼,堆中的順序并不是絕對(duì)的贰军,但要保證子節(jié)點(diǎn)的值要比父節(jié)點(diǎn)的值要大,這樣就不會(huì)影響出隊(duì)的順序蟹肘。

總體來說词疼,ScheduedThreadPoolExecutor的重點(diǎn)是要理解下次執(zhí)行時(shí)間的計(jì)算,以及優(yōu)先隊(duì)列的出隊(duì)疆前、入隊(duì)和刪除的過程寒跳,這兩個(gè)是理解ScheduedThreadPoolExecutor的關(guān)鍵聘萨。

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新竹椒,可以公眾號(hào)搜一搜「 一角錢技術(shù) 」第一時(shí)間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄米辐,歡迎 Star胸完。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市翘贮,隨后出現(xiàn)的幾起案子赊窥,更是在濱河造成了極大的恐慌,老刑警劉巖狸页,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锨能,死亡現(xiàn)場(chǎng)離奇詭異扯再,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)址遇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門熄阻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人倔约,你說我怎么就攤上這事秃殉。” “怎么了浸剩?”我有些...
    開封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵钾军,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我绢要,道長(zhǎng)吏恭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任袖扛,我火速辦了婚禮砸泛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蛆封。我一直安慰自己唇礁,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開白布惨篱。 她就那樣靜靜地躺著盏筐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪砸讳。 梳的紋絲不亂的頭發(fā)上琢融,一...
    開封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音簿寂,去河邊找鬼漾抬。 笑死,一個(gè)胖子當(dāng)著我的面吹牛常遂,可吹牛的內(nèi)容都是我干的纳令。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼克胳,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼平绩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起漠另,我...
    開封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤捏雌,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后笆搓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體性湿,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡纬傲,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了肤频。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嘹锁。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖着裹,靈堂內(nèi)的尸體忽然破棺而出领猾,到底是詐尸還是另有隱情,我是刑警寧澤骇扇,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布摔竿,位于F島的核電站,受9級(jí)特大地震影響少孝,放射性物質(zhì)發(fā)生泄漏继低。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一稍走、第九天 我趴在偏房一處隱蔽的房頂上張望袁翁。 院中可真熱鬧,春花似錦婿脸、人聲如沸粱胜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽焙压。三九已至,卻和暖如春抑钟,著一層夾襖步出監(jiān)牢的瞬間涯曲,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來泰國打工在塔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留幻件,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓蛔溃,卻偏偏與公主長(zhǎng)得像绰沥,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子城榛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345