[懷舊并發(fā)09]分析Java延遲與周期任務(wù)的實(shí)現(xiàn)原理

Java并發(fā)編程源碼分析系列:

延遲或周期執(zhí)行任務(wù)可以使用Timer或者ScheduledThreadPoolExecutor,前者可以拋棄,后者是今天的主角娶桦。

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,對(duì)應(yīng)執(zhí)行任務(wù)變成ScheduledFutureTask袖肥。本文會(huì)在前三篇分析線程池原理的基礎(chǔ)上,分析ScheduledThreadPoolExecutor的實(shí)現(xiàn)原理振劳,最后介紹下為什么不用Timer了椎组。

ScheduledThreadPoolExecutor的創(chuàng)建

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
ScheduledExecutorService singleScheduledThreadPool = Executors.newSingleThreadScheduledExecutor();

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor的創(chuàng)建可以使用Executors,也可以自己傳參構(gòu)建历恐。上面的構(gòu)造函數(shù)是參數(shù)最全的版本寸癌,可以設(shè)置線程目標(biāo)數(shù)量、線程工廠和飽和策略弱贼。至于等待隊(duì)列蒸苇,使用內(nèi)部類DelayedWorkQueue,看后文分析吮旅。

ScheduledFutureTask

ScheduledFutureTask的構(gòu)造函數(shù)沒(méi)什么特別溪烤,保存了三個(gè)參數(shù)。

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • time:任務(wù)執(zhí)行時(shí)間;
  • period:任務(wù)周期執(zhí)行間隔氛什;
  • sequenceNumber:自增的任務(wù)序號(hào)莺葫。

Callable默認(rèn)period=0,表示任務(wù)不是周期執(zhí)行枪眉,因?yàn)橹挥蠷unnable可以周期執(zhí)行。想想也是再层,Callable目的是獲得執(zhí)行結(jié)果贸铜,沒(méi)有必要重復(fù)調(diào)用。

圖1

ScheduledFutureTask繼承了我們熟悉的FutureTask聂受,這個(gè)不用多說(shuō)蒿秦。圖1是它實(shí)現(xiàn)的接口,比較陌生的是Delayed蛋济,而Delayed又繼承了Comparable棍鳖。

public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

這兩個(gè)接口的存在很容易理解,ScheduledFutureTask在等待隊(duì)列里調(diào)度不再按照FIFO碗旅,而是按照?qǐng)?zhí)行時(shí)間渡处,誰(shuí)即將執(zhí)行,誰(shuí)就排在前面祟辟。在這里也可以看到sequenceNumber的作用医瘫,當(dāng)執(zhí)行時(shí)間相同時(shí),按照序號(hào)排序旧困。

添加延遲任務(wù)

對(duì)ScheduledThreadPoolExecutor使用通用的execute或者submit提交任務(wù)醇份,最終調(diào)用schedule方法,默認(rèn)馬上執(zhí)行吼具。如果需要延遲執(zhí)行僚纷,需要直接使用schedule,傳遞時(shí)間參數(shù)拗盒。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

Runnable和Callable包裝成ScheduledFutureTask實(shí)例怖竭,保存了延遲信息,然后執(zhí)行delayedExecute锣咒。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}

如果線程池已經(jīng)關(guān)閉侵状,直接調(diào)用飽和策略,否則將任務(wù)加入等待隊(duì)列毅整。加入之后趣兄,需要再判斷線程池的狀態(tài),和當(dāng)前任務(wù)是否能運(yùn)行悼嫉。如果不能繼續(xù)執(zhí)行艇潭,將任務(wù)移出隊(duì)列并取消任務(wù)。

canRunInCurrentRunState處理任務(wù)加入等待隊(duì)列后,又未執(zhí)行就發(fā)生線程池關(guān)閉的情況蹋凝,它通過(guò)預(yù)設(shè)的兩個(gè)變量判斷任務(wù)到底能不能執(zhí)行鲁纠。

  • 延遲任務(wù)用executeExistingDelayedTasksAfterShutdown
  • 周期任務(wù)用continueExistingPeriodicTasksAfterShutdown
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

最后調(diào)用到ensurePrestart,使用addWorkder增加工作線程鳍寂,這在ThreadPoolExecutor解釋過(guò)了

添加周期任務(wù)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                             long initialDelay,long period,TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (period <= 0)
       throw new IllegalArgumentException();
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   delayedExecute(t);
   return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                long delay,TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (delay <= 0)
       throw new IllegalArgumentException();
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
                                     unit.toNanos(-delay));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   delayedExecute(t);
   return t;
}

執(zhí)行周期任務(wù)有上面兩個(gè)方法改含,具體作用方法名寫(xiě)得很清楚:

  • scheduleAtFixedRate:按固定的頻率執(zhí)行,不受執(zhí)行時(shí)長(zhǎng)影響迄汛,到點(diǎn)就執(zhí)行捍壤;
  • scheduleWithFixedDelay:任務(wù)執(zhí)行完后,按固定的延后時(shí)間再執(zhí)行鞍爱。

兩個(gè)方法幾乎一樣鹃觉,不同的是構(gòu)建ScheduledFutureTask時(shí),period一個(gè)傳正數(shù)睹逃,另一個(gè)傳負(fù)數(shù)盗扇。不用懷疑,區(qū)分兩種情況就是用正負(fù)沉填。

等待隊(duì)列

線程池的等待隊(duì)列使用了內(nèi)部類DelayedWorkQueue疗隶,和普通線程池等待隊(duì)列最大的不同是它的任務(wù)是按照目標(biāo)執(zhí)行時(shí)間進(jìn)行排序。

入隊(duì)的offer被重寫(xiě)了拜轨,add和put方法也是調(diào)用offer抽减,具體BlockingQueue的實(shí)現(xiàn)邏輯不在這里討論,重點(diǎn)是看offer里的siftUp方法橄碾。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

siftUp根據(jù)任務(wù)的compareTo卵沉,將任務(wù)移動(dòng)到隊(duì)列中指定的位置,就是這樣法牲。

對(duì)應(yīng)地史汗,出隊(duì)take方法,根據(jù)任務(wù)的delay時(shí)間拒垃,小于等于0時(shí)將任務(wù)出隊(duì)停撞,否則等待。

任務(wù)執(zhí)行

當(dāng)線程池從等待隊(duì)列取出一個(gè)任務(wù)時(shí)悼瓮,會(huì)執(zhí)行它的run方法戈毒。

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

方法有三個(gè)分支,第一個(gè)if判斷任務(wù)在當(dāng)前線程池狀態(tài)下是否能執(zhí)行横堡,canRunInCurrentRunState已經(jīng)講解過(guò)埋市。第二個(gè)if是判斷是否周期任務(wù),不是的話直接執(zhí)行命贴,不需要多余的操作道宅。重點(diǎn)來(lái)看第三個(gè)if食听,也就是周期執(zhí)行任務(wù)。

  1. runAndReset:任務(wù)執(zhí)行完重置為初始狀態(tài)污茵,等待下一次執(zhí)行樱报;
  2. setNextRunTime:計(jì)算下次執(zhí)行時(shí)間;
  3. reExecutePeriodic:再調(diào)度任務(wù)泞当。
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

計(jì)算下次執(zhí)行時(shí)間迹蛤,period根據(jù)正負(fù)有不同的計(jì)算邏輯,負(fù)的時(shí)間也會(huì)先改正襟士,很明顯對(duì)應(yīng)上文的scheduleAtFixedRate和scheduleWithFixedDelay兩個(gè)方法笤受。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

將任務(wù)重新加入等待隊(duì)列,中間幾個(gè)方法都解釋過(guò)了敌蜂。

Timer的缺陷

自從知道ScheduledThreadPoolExecutor,再?zèng)]有使用Timer津肛,因?yàn)樗袔讉€(gè)缺陷:

  • 多任務(wù)在單線程里執(zhí)行章喉,一個(gè)任務(wù)結(jié)束,另一個(gè)任務(wù)才能開(kāi)始身坐,時(shí)間間隔不準(zhǔn)秸脱;
  • 出現(xiàn)異常會(huì)導(dǎo)致全部任務(wù)停止;
  • 絕對(duì)時(shí)間部蛇,受系統(tǒng)時(shí)間影響摊唇。
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);

Timer的代碼很簡(jiǎn)單,主要數(shù)據(jù)結(jié)構(gòu)是一個(gè)任務(wù)隊(duì)列和一個(gè)執(zhí)行線程涯鲁。新增的任務(wù)會(huì)加入任務(wù)隊(duì)列巷查,到達(dá)時(shí)間后,由執(zhí)行線程執(zhí)行抹腿。只有一個(gè)線程岛请,很容易理解上面講的缺陷。

ScheduledThreadPoolExecutor每個(gè)任務(wù)都有對(duì)應(yīng)的執(zhí)行線程警绩,時(shí)間使用相對(duì)時(shí)間計(jì)算崇败,也就沒(méi)有上面的缺陷,所以沒(méi)有理由使用Timer了肩祥。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末后室,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子混狠,更是在濱河造成了極大的恐慌岸霹,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件檀蹋,死亡現(xiàn)場(chǎng)離奇詭異松申,居然都是意外死亡云芦,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)贸桶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)舅逸,“玉大人,你說(shuō)我怎么就攤上這事皇筛×鹄” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵水醋,是天一觀的道長(zhǎng)旗笔。 經(jīng)常有香客問(wèn)我,道長(zhǎng)拄踪,這世上最難降的妖魔是什么蝇恶? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮惶桐,結(jié)果婚禮上撮弧,老公的妹妹穿的比我還像新娘。我一直安慰自己姚糊,他們只是感情好贿衍,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著救恨,像睡著了一般贸辈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上肠槽,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天泽西,我揣著相機(jī)與錄音吐葱,去河邊找鬼岩瘦。 笑死掸哑,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的筋栋。 我是一名探鬼主播炊汤,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼弊攘!你這毒婦竟也來(lái)了抢腐?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤襟交,失蹤者是張志新(化名)和其女友劉穎迈倍,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體捣域,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡啼染,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年宴合,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迹鹅。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡卦洽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出斜棚,到底是詐尸還是另有隱情阀蒂,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布弟蚀,位于F島的核電站蚤霞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏义钉。R本人自食惡果不足惜昧绣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望捶闸。 院中可真熱鬧滞乙,春花似錦、人聲如沸鉴嗤。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)醉锅。三九已至,卻和暖如春发绢,著一層夾襖步出監(jiān)牢的瞬間硬耍,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工边酒, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留经柴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓墩朦,卻偏偏與公主長(zhǎng)得像坯认,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子氓涣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容