【死磕Java并發(fā)】—–J.U.C之線程池:ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor解析

我們知道Timer與TimerTask雖然可以實現(xiàn)線程的周期和延遲調(diào)度热押,但是Timer與TimerTask存在一些缺陷斤贰,所以對于這種定期枯芬、周期執(zhí)行任務(wù)的調(diào)度策略,我們一般都是推薦ScheduledThreadPoolExecutor來實現(xiàn)。下面就深入分析ScheduledThreadPoolExecutor是如何來實現(xiàn)線程的周期、延遲調(diào)度的。

ScheduledThreadPoolExecutor旭寿,繼承ThreadPoolExecutor且實現(xiàn)了ScheduledExecutorService接口,它就相當(dāng)于提供了“延遲”和“周期執(zhí)行”功能的ThreadPoolExecutor崇败。在JDK API中是這樣定義它的:ThreadPoolExecutor盅称,它可另行安排在給定的延遲后運行命令肩祥,或者定期執(zhí)行命令。需要多個輔助線程時缩膝,或者要求 ThreadPoolExecutor 具有額外的靈活性或功能時混狠,此類要優(yōu)于 Timer。 一旦啟用已延遲的任務(wù)就執(zhí)行它疾层,但是有關(guān)何時啟用将饺,啟用后何時執(zhí)行則沒有任何實時保證。按照提交的先進(jìn)先出 (FIFO) 順序來啟用那些被安排在同一執(zhí)行時間的任務(wù)痛黎。 它提供了四個構(gòu)造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); 
} 

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); 
} 

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); 
} 

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); 
}

當(dāng)然我們一般都不會直接通過其構(gòu)造函數(shù)來生成一個ScheduledThreadPoolExecutor對象(例如new ScheduledThreadPoolExecutor(10)之類的)予弧,而是通過Executors類(例如Executors.newScheduledThreadPool(int);)

在ScheduledThreadPoolExecutor的構(gòu)造函數(shù)中,我們發(fā)現(xiàn)它都是利用ThreadLocalExecutor來構(gòu)造的湖饱,唯一變動的地方就在于它所使用的阻塞隊列變成了DelayedWorkQueue掖蛤,而不是ThreadLocalhExecutor的LinkedBlockingQueue(通過Executors產(chǎn)生ThreadLocalhExecutor對象)。DelayedWorkQueue為ScheduledThreadPoolExecutor中的內(nèi)部類井厌,它其實和阻塞隊列DelayQueue有點兒類似蚓庭。DelayQueue是可以提供延遲的阻塞隊列,它只有在延遲期滿時才能從中提取元素仅仆,其列頭是延遲期滿后保存時間最長的Delayed元素器赞。如果延遲都還沒有期滿,則隊列沒有頭部墓拜,并且 poll 將返回 null港柜。有關(guān)于DelayQueue的更多介紹請參考這篇博客【死磕Java并發(fā)】-----J.U.C之阻塞隊列:DelayQueue。所以DelayedWorkQueue中的任務(wù)必然是按照延遲時間從短到長來進(jìn)行排序的撮弧。下面我們再深入分析DelayedWorkQueue潘懊,這里留一個引子。 ScheduledThreadPoolExecutor提供了如下四個方法贿衍,也就是四個調(diào)度器:

  1. schedule(Callable callable, long delay, TimeUnit unit) :創(chuàng)建并執(zhí)行在給定延遲后啟用的 ScheduledFuture。
  2. schedule(Runnable command, long delay, TimeUnit unit) :創(chuàng)建并執(zhí)行在給定延遲后啟用的一次性操作救恨。
  3. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作贸辈,后續(xù)操作具有給定的周期;也就是將在 initialDelay 后開始執(zhí)行肠槽,然后在 initialDelay+period 后執(zhí)行擎淤,接著在 initialDelay + 2 * period 后執(zhí)行,依此類推秸仙。
  4. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創(chuàng)建并執(zhí)行一個在給定初始延遲后首次啟用的定期操作嘴拢,隨后,在每一次執(zhí)行終止和下一次執(zhí)行開始之間都存在給定的延遲寂纪。

第一席吴、二個方法差不多赌结,都是一次性操作,只不過參數(shù)一個是Callable孝冒,一個是Runnable柬姚。稍微分析下第三(scheduleAtFixedRate)、四個(scheduleWithFixedDelay)方法庄涡,加入initialDelay = 5量承,period/delay = 3,unit為秒穴店。如果每個線程都是都運行非常良好不存在延遲的問題撕捍,那么這兩個方法線程運行周期是5、8泣洞、11卦洽、14、17.......斜棚,但是如果存在延遲呢阀蒂?比如第三個線程用了5秒鐘,那么這兩個方法的處理策略是怎樣的弟蚀?第三個方法(scheduleAtFixedRate)是周期固定蚤霞,也就說它是不會受到這個延遲的影響的,每個線程的調(diào)度周期在初始化時就已經(jīng)絕對了义钉,是什么時候調(diào)度就是什么時候調(diào)度昧绣,它不會因為上一個線程的調(diào)度失效延遲而受到影響。但是第四個方法(scheduleWithFixedDelay)捶闸,則不一樣夜畴,它是每個線程的調(diào)度間隔固定,也就是說第一個線程與第二線程之間間隔delay删壮,第二個與第三個間隔delay贪绘,以此類推。如果第二線程推遲了那么后面所有的線程調(diào)度都會推遲央碟,例如税灌,上面第二線程推遲了2秒,那么第三個就不再是11秒執(zhí)行了亿虽,而是13秒執(zhí)行菱涤。 查看著四個方法的源碼,會發(fā)現(xiàn)其實他們的處理邏輯都差不多洛勉,所以我們就挑scheduleWithFixedDelay方法來分析粘秆,如下:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    if (command == null || unit == null) throw new NullPointerException(); 
    if (delay <= 0) throw new IllegalArgumentException(); 
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); 
    RunnableScheduledFuture<Void> t = decorateTask(command, sft); 
    sft.outerTask = t; 
    delayedExecute(t); 
    return t;
 }

scheduleWithFixedDelay方法處理的邏輯如下:

  1. 校驗,如果參數(shù)不合法則拋出異常
  2. 構(gòu)造一個task收毫,該task為ScheduledFutureTask
  3. 調(diào)用delayedExecute()方法做后續(xù)相關(guān)處理

這段代碼涉及兩個類ScheduledFutureTask和RunnableScheduledFuture攻走,其中RunnableScheduledFuture不用多說殷勘,他繼承RunnableFuture和ScheduledFuture兩個接口,除了具備RunnableFuture和ScheduledFuture兩類特性外陋气,它還定義了一個方法isPeriodic() 劳吠,該方法用于判斷執(zhí)行的任務(wù)是否為定期任務(wù),如果是則返回true巩趁。而ScheduledFutureTask作為ScheduledThreadPoolExecutor的內(nèi)部類痒玩,它扮演著極其重要的作用,因為它的作用則是負(fù)責(zé)ScheduledThreadPoolExecutor中任務(wù)的調(diào)度议慰。 ScheduledFutureTask內(nèi)部繼承FutureTask蠢古,實現(xiàn)RunnableScheduledFuture接口,它內(nèi)部定義了三個比較重要的變量

/** 任務(wù)被添加到ScheduledThreadPoolExecutor中的序號 */ 
private final long sequenceNumber; 

/** 任務(wù)要執(zhí)行的具體時間 */ 
private long time; 

/**  任務(wù)的間隔周期 */ 
private final long period;

這三個變量與任務(wù)的執(zhí)行有著非常密切的關(guān)系别凹,什么關(guān)系草讶?先看ScheduledFutureTask的幾個構(gòu)造函數(shù)和核心方法:

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result); 
    this.time = ns; 
    this.period = 0; 
    this.sequenceNumber = sequencer.getAndIncrement(); 
} 

ScheduledFutureTask(Runnable r, V result, long ns, long period) { 
    super(r, result); 
    this.time = ns; 
    this.period = period; 
    this.sequenceNumber = sequencer.getAndIncrement(); 
} 

ScheduledFutureTask(Callable<V> callable, long ns) { 
    super(callable); 
    this.time = ns; 
    this.period = 0; 
    this.sequenceNumber = sequencer.getAndIncrement(); 
} 

ScheduledFutureTask(Callable<V> callable, long ns) { 
    super(callable); 
    this.time = ns; 
    this.period = 0; 
    this.sequenceNumber = sequencer.getAndIncrement(); 
}

ScheduledFutureTask 提供了四個構(gòu)造方法,這些構(gòu)造方法與上面三個參數(shù)是不是一一對應(yīng)了炉菲?這些參數(shù)有什么用堕战,如何用,則要看ScheduledFutureTask在那些方法使用了該方法拍霜,在ScheduledFutureTask中有一個compareTo()方法:

public int compareTo(Delayed other) { 
    if (other == this) // compare zero if same object return 0; 
    if (other instanceof ScheduledFutureTask) { 
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 
        long diff = time - x.time; 
        if (diff < 0) return -1; 
        else if (diff > 0) return 1; 
        else if (sequenceNumber < x.sequenceNumber) return -1; 
        else return 1; 
    } 
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 
}

相信各位都知道該方法是干嘛用的嘱丢,提供一個排序算法,該算法規(guī)則是:首先按照time排序祠饺,time小的排在前面越驻,大的排在后面,如果time相同道偷,則使用sequenceNumber排序缀旁,小的排在前面,大的排在后面勺鸦。那么為什么在這個類里面提供compareTo()方法呢并巍?在前面就介紹過ScheduledThreadPoolExecutor在構(gòu)造方法中提供的是DelayedWorkQueue()隊列中,也就是說ScheduledThreadPoolExecutor是把任務(wù)添加到DelayedWorkQueue中的祝旷,而DelayedWorkQueue則是類似于DelayQueue履澳,內(nèi)部維護(hù)著一個以時間為先后順序的隊列,所以compareTo()方法使用與DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進(jìn)行排序的算法怀跛。 排序已經(jīng)解決了,那么ScheduledThreadPoolExecutor 是如何對task任務(wù)進(jìn)行調(diào)度和延遲的呢柄冲?任何線程的執(zhí)行吻谋,都是通過run()方法執(zhí)行,ScheduledThreadPoolExecutor 的run()方法如下:

public void run() { 
    boolean periodic = isPeriodic(); 
    if (!canRunInCurrentRunState(periodic)) 
        cancel(false); 
    else if (!periodic) ScheduledFutureTask.super.run(); 
    else if (ScheduledFutureTask.super.runAndReset()) { 
        setNextRunTime(); 
        reExecutePeriodic(outerTask); 
    } 
}
  1. 調(diào)用isPeriodic()獲取該線程是否為周期性任務(wù)標(biāo)志现横,然后調(diào)用canRunInCurrentRunState()方法判斷該線程是否可以執(zhí)行漓拾,如果不可以執(zhí)行則調(diào)用cancel()取消任務(wù)阁最。
  2. 如果當(dāng)線程已經(jīng)到達(dá)了執(zhí)行點,則調(diào)用run()方法執(zhí)行task骇两,該run()方法是在FutureTask中定義的速种。
  3. 否則調(diào)用runAndReset()方法運行并充值,調(diào)用setNextRunTime()方法計算任務(wù)下次的執(zhí)行時間低千,重新把任務(wù)添加到隊列中配阵,讓該任務(wù)可以重復(fù)執(zhí)行。

isPeriodic() 該方法用于判斷指定的任務(wù)是否為定期任務(wù)示血。

public boolean isPeriodic() { 
    return period != 0; 
}

canRunInCurrentRunState()判斷任務(wù)是否可以取消棋傍,cancel()取消任務(wù),這兩個方法比較簡單难审,而run()執(zhí)行任務(wù)瘫拣,runAndReset()運行并重置狀態(tài),牽涉比較廣告喊,我們放在FutureTask后面介紹麸拄。所以重點介紹setNextRunTime()和reExecutePeriodic()這兩個涉及到延遲的方法。 setNextRunTime() setNextRunTime()方法用于重新計算任務(wù)的下次執(zhí)行時間黔姜。如下:

private void setNextRunTime() { 
    long p = period; 
    if (p > 0) time += p; 
    else time = triggerTime(-p); 
}

該方法定義很簡單拢切,p > 0 ,time += p ,否則調(diào)用triggerTime()方法重新計算time:

long triggerTime(long delay) { 
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 
}

reExecutePeriodic

void reExecutePeriodic(RunnableScheduledFuture<?> task) { 
    if (canRunInCurrentRunState(true)) { 
        super.getQueue().add(task); 
    if (!canRunInCurrentRunState(true) && remove(task)) 
        task.cancel(false); else ensurePrestart(); 
    } 
}

reExecutePeriodic重要的是調(diào)用super.getQueue().add(task);將任務(wù)task加入的隊列DelayedWorkQueue中地淀。ensurePrestart()在【死磕Java并發(fā)】-----J.U.C之線程池:ThreadPoolExecutor已經(jīng)做了詳細(xì)介紹失球。 到這里ScheduledFutureTask已經(jīng)介紹完了,ScheduledFutureTask在ScheduledThreadPoolExecutor扮演作用的重要性不言而喻帮毁。其實ScheduledThreadPoolExecutor的實現(xiàn)不是很復(fù)雜实苞,因為有FutureTask和ThreadPoolExecutor的支撐,其實現(xiàn)就顯得不是那么難了烈疚。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末黔牵,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子爷肝,更是在濱河造成了極大的恐慌猾浦,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件灯抛,死亡現(xiàn)場離奇詭異金赦,居然都是意外死亡,警方通過查閱死者的電腦和手機对嚼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門夹抗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人纵竖,你說我怎么就攤上這事漠烧⌒臃撸” “怎么了?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵已脓,是天一觀的道長珊楼。 經(jīng)常有香客問我,道長度液,這世上最難降的妖魔是什么厕宗? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮恨诱,結(jié)果婚禮上媳瞪,老公的妹妹穿的比我還像新娘。我一直安慰自己照宝,他們只是感情好蛇受,可當(dāng)我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著厕鹃,像睡著了一般兢仰。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上剂碴,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天把将,我揣著相機與錄音,去河邊找鬼忆矛。 笑死察蹲,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的催训。 我是一名探鬼主播洽议,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼漫拭!你這毒婦竟也來了亚兄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤采驻,失蹤者是張志新(化名)和其女友劉穎审胚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體礼旅,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡膳叨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了痘系。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懒鉴。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖碎浇,靈堂內(nèi)的尸體忽然破棺而出临谱,到底是詐尸還是另有隱情,我是刑警寧澤奴璃,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布悉默,位于F島的核電站,受9級特大地震影響苟穆,放射性物質(zhì)發(fā)生泄漏抄课。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一雳旅、第九天 我趴在偏房一處隱蔽的房頂上張望跟磨。 院中可真熱鬧,春花似錦攒盈、人聲如沸抵拘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽僵蛛。三九已至,卻和暖如春迎变,著一層夾襖步出監(jiān)牢的瞬間充尉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工衣形, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留驼侠,地道東北人。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓谆吴,卻偏偏與公主長得像倒源,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子纪铺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,851評論 2 361