線程池之ScheduledThreadPoolExecutor調(diào)度原理

ScheduledThreadPoolExecutor 的調(diào)度原理主要基于兩個(gè)內(nèi)部類乖坠,ScheduledFutureTask 和 DelayedWorkQueue:

  1. ScheduledFutureTask 是對(duì)任務(wù)的一層封裝,將我們提交的 Runnable 或 Callable 封裝成具有時(shí)間周期的任務(wù)雁芙;
  2. DelayedWorkQueue 實(shí)現(xiàn)了對(duì) ScheduledFutureTask 的延遲出隊(duì)管理;

ScheduledFutureTask

ScheduledFutureTask類圖

ScheduledFutureTask有以下幾種構(gòu)造方法:

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

super 中調(diào)用 FutureTask 的構(gòu)造方法骨坑,可以參考 FutureTask實(shí)現(xiàn)原理洪乍。ScheduledFutureTask 主要配置參數(shù)如下:

名稱 含義
time 任務(wù)能夠執(zhí)行的時(shí)間點(diǎn)(單位:nanoTime )
period 正值表示固定時(shí)間周期執(zhí)行。
負(fù)值表示固定延遲周期執(zhí)行匾乓。
0表示非重復(fù)任務(wù)。
sequenceNumber FIFO調(diào)度序列值(用 AtomicLong 實(shí)現(xiàn))

注意:period 大于 0 或 小于 0 時(shí)又谋,都是周期性執(zhí)行的拼缝,只是執(zhí)行時(shí)間規(guī)律不一樣。

ScheduledFutureTask 的主要調(diào)度輔助方法如下:

// 任務(wù)的延遲執(zhí)行時(shí)間
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}
//實(shí)現(xiàn)任務(wù)的排序彰亥,執(zhí)行時(shí)間越小越靠前咧七,相同則按照隊(duì)列FIFO順序
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber) // 時(shí)間一樣時(shí),按照FIFO的順序
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

// 是否是周期性任務(wù)
public boolean isPeriodic() {
    return period != 0;
}
// 設(shè)置下一次運(yùn)行時(shí)間
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p; // 按固定時(shí)間周期剩愧,下次執(zhí)行時(shí)間為上次執(zhí)行時(shí)間 + 周期時(shí)間
    else
        time = triggerTime(-p); // 按固定延時(shí)周期猪叙,下次執(zhí)行時(shí)間為當(dāng)前時(shí)間 + 延時(shí)時(shí)間
}

核心 run 方法

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic)) // 判斷是否可以運(yùn)行任務(wù)
        cancel(false);  // 取消任務(wù),移除隊(duì)列
    else if (!periodic) // 非周期性任務(wù) 直接調(diào)用父類 FutureTask 的 run 方法
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {  // 周期性任務(wù)仁卷,調(diào)用父類 runAndReset 方法穴翩,返回是否執(zhí)行成功
        // 執(zhí)行成功后繼續(xù)設(shè)置下一次運(yùn)行時(shí)間
        setNextRunTime(); 
        // 重新執(zhí)行周期性任務(wù)(可能因?yàn)榫€程池運(yùn)行狀態(tài)的改變而被拒絕)
        reExecutePeriodic(outerTask);
    }
}

對(duì)于周期性任務(wù),在 run 方法中執(zhí)行成功后會(huì)繼續(xù)設(shè)置下一次執(zhí)行時(shí)間锦积,并把任務(wù)加入延時(shí)隊(duì)列芒帕。但需注意,如果任務(wù)執(zhí)行失敗丰介,將不會(huì)再被周期性調(diào)用背蟆。所以在可能執(zhí)行失敗的周期性任務(wù)中鉴分,必須做好異常處理。

DelayedWorkQueue

DelayedWorkQueue 是一個(gè)延時(shí)有序隊(duì)列带膀,內(nèi)部采用 數(shù)組 維護(hù)隊(duì)列元素志珍,采用 堆排序 的思想維護(hù)隊(duì)列順序,并在隊(duì)列元素(ScheduledFutureTask)建立索引垛叨,支持快速刪除伦糯。

注意:DelayedWorkQueue 的整個(gè)隊(duì)列不是完全有序的,只保證元素有序出隊(duì)嗽元。

DelayedWorkQueue類圖

下面詳細(xì)講解 DelayedWorkQueue 的實(shí)現(xiàn):

核心入隊(duì)方法:

public boolean add(Runnable e) {
      return offer(e);
}

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow(); // 隊(duì)列擴(kuò)容 類似 ArrayList 擴(kuò)容
        size = i + 1;
        if (i == 0) { // 隊(duì)列為空敛纲,直接加入
            queue[0] = e;
            setIndex(e, 0); // 設(shè)置元素在隊(duì)列的索引,即告訴元素自己在隊(duì)列的第幾位
        } else {
            siftUp(i, e); // 放入適當(dāng)?shù)奈恢?        }
        if (queue[0] == e) {
            leader = null; // 等待隊(duì)列頭的線程
            available.signal(); // 通知
        }
    } finally {
        lock.unlock();
    }
    return true;
}

入隊(duì)方法中最重要的是 siftUp 方法剂癌, sift 在英文單詞中是 的意思淤翔,這里可將 siftUp 理解為向前篩,找到合適的 堆排序點(diǎn) 加進(jìn)去佩谷。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1; // (k-1)/2
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

siftUp 主要思想是將新增的任務(wù)與前 (k-1)/2 的位置比較旁壮,如果任務(wù)執(zhí)行時(shí)間較近者替換位置 (k-1)/2。依次往前比較琳要,直到無(wú)替換發(fā)生寡具。每次新增元素調(diào)用 siftUp 僅能保證第一個(gè)元素是最小的秤茅。整個(gè)隊(duì)列不一定有序:

例將:5 10 9 3 依次入隊(duì),隊(duì)列變化如下
 [5]
 [5,10]
 [5,9,10]
 [3,5,10,9] 

如果對(duì)上述的入隊(duì)方式不了解稚补,可用下面的排序代碼進(jìn)行斷點(diǎn)調(diào)試:

// DelayedWorkQueue 的入隊(duì)、出隊(duì)排序模擬
public class SortArray {
    Integer[] queue = new Integer[16];

    int size = 0;

    public static void main(String[] args) {
        SortArray array = new SortArray();
        array.add(5);
        array.add(9);
        array.add(10);
        array.add(3);
        System.out.println(array.take());
        System.out.println(array.take());
        System.out.println(array.take());
        System.out.println(array.take());
    }

    boolean add(Integer e) {
        if (e == null)
            throw new NullPointerException();
        int i = size;
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
        } else {
            siftUp(i, e);
        }
        return true;
    }
    
    Integer take() {
        Integer i = queue[0];
        int s = --size;
        Integer k = queue[s];
        if (size != 0)
            siftDown(0, k);
        return i;
    }

    private void siftUp(int k, Integer key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Integer e = queue[parent];
            if (key.compareTo(e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
    
     private void siftDown(int k, Integer key) {
         int half = size >>> 1;
         while (k < half) {
             int child = (k << 1) + 1;
             Integer c = queue[child];
             int right = child + 1;
             if (right < size && c.compareTo(queue[right]) > 0)
                 c = queue[child = right];
             if (key.compareTo(c) <= 0)
                 break;
             queue[k] = c;
             k = child;
         }
         queue[k] = key;
     }
}

核心出隊(duì)方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 直接獲取隊(duì)首任務(wù)
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) // 空 則等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS); // 看任務(wù)是否可以執(zhí)行
                if (delay <= 0)
                    return finishPoll(first); // 可執(zhí)行框喳,則進(jìn)行出隊(duì)操作
                // 可不執(zhí)行课幕,還需等待,則往下走
                first = null; 
                // 看是否有正在等待的leader線程
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay); // 延時(shí)等待
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

代碼中的 available 是一個(gè)信號(hào)量五垮,會(huì)在隊(duì)列的頭部有新任務(wù)變?yōu)榭捎没蛘咝戮€程可能需要成為領(lǐng)導(dǎo)者時(shí)乍惊,發(fā)出信號(hào)。

private final Condition available = lock.newCondition();

take() 方法中重要的方法是 finishPoll(first) 思犁,主要進(jìn)行出隊(duì)時(shí)維護(hù)隊(duì)列順序:

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

siftDown 跟前面的 siftUp 很像遂蛀,它也只能保證出隊(duì)后下一個(gè)仍為最近的任務(wù)拧咳。并不會(huì)移動(dòng)和清理整個(gè)隊(duì)列。

還是用上面列出的 SortArray 這個(gè)類為例:

    public static void main(String[] args) {
        SortArray array = new SortArray();
        array.add(5);
        array.add(9);
        array.add(10);
        array.add(3);
        System.out.println(Arrays.toString(array.queue));
        System.out.println(array.take());
        System.out.println(array.take());
        System.out.println(array.take());
        System.out.println(array.take());
        System.out.println(Arrays.toString(array.queue));
        array.add(20);
        array.add(4);
        System.out.println(Arrays.toString(array.queue));
    }

我們先將5,9,10,3 依次入隊(duì)莉撇,然后全部出隊(duì),再入隊(duì) 20,4惶傻,我們看下最后的隊(duì)列里面的數(shù)據(jù)是什么樣子:

[3, 5, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
3
5
9
10
[10, 10, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
[4, 20, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]

看了這個(gè)結(jié)果你可能有點(diǎn)奇怪棍郎,已經(jīng)出隊(duì)了的元素居然還在隊(duì)列里面。這是一種 lazy 策略银室,DelayedWorkQueue 并不會(huì)真正直接清理掉隊(duì)列里出隊(duì)的元素涂佃,用 size 來(lái)控制隊(duì)列的邏輯大小励翼,并發(fā)物理實(shí)際大小,后來(lái)的元素會(huì)根據(jù)size來(lái)覆蓋原有的元素辜荠。

關(guān)于 DelayedWorkQueue 的出隊(duì)和入隊(duì)還有疑問(wèn)的汽抚,可以自己調(diào)試 SortArray 的代碼,看看不同的情況的不同處理結(jié)果伯病。DelayedWorkQueue 的 siftUp 殊橙、siftDown 這種排序策略非常高效,并非維護(hù)整個(gè)隊(duì)列實(shí)時(shí)有序狱从,只保證第一個(gè)出隊(duì)元素的正確性膨蛮。

元素刪除

上文有提到 ScheduledFutureTask 的索引,DelayedWorkQueue 運(yùn)用索引可以快速定位刪除元素:

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;

        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            siftDown(i, replacement); // 順序調(diào)整
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

// 使用索引獲取下標(biāo)
private int indexOf(Object x) {
    if (x != null) {
        if (x instanceof ScheduledFutureTask) {
            int i = ((ScheduledFutureTask) x).heapIndex; // 索引
            if (i >= 0 && i < size && queue[i] == x)
                return i;
        } else {
            for (int i = 0; i < size; i++)
                if (x.equals(queue[i]))
                    return i;
        }
    }
    return -1;
}

remove方法里面首先利用 indexOf 調(diào)用索引獲取下標(biāo)季研,然后使用 siftDown敞葛,siftUp 來(lái)調(diào)整隊(duì)列順序。這里索引的使用能夠極大提高元素定位的效率与涡,尤其是在隊(duì)列比較長(zhǎng)的時(shí)候惹谐。

最后思考一個(gè)問(wèn)題:為什么 DelayedWorkQueue 使用數(shù)組而不是鏈表結(jié)構(gòu)?

個(gè)人認(rèn)為驼卖,因?yàn)槭褂脭?shù)據(jù)結(jié)構(gòu)氨肌,利用下標(biāo)快速訪問(wèn),可以發(fā)揮基于 siftDown酌畜,siftUp 的高效排序算法怎囚,而鏈表的下標(biāo)訪問(wèn)效率低,因此選擇使用數(shù)組桥胞。

多線程系列目錄(不斷更新中):
線程啟動(dòng)原理
線程中斷機(jī)制
多線程實(shí)現(xiàn)方式
FutureTask實(shí)現(xiàn)原理
線程池之ThreadPoolExecutor概述
線程池之ThreadPoolExecutor使用
線程池之ThreadPoolExecutor狀態(tài)控制
線程池之ThreadPoolExecutor執(zhí)行原理
線程池之ScheduledThreadPoolExecutor概述
線程池之ScheduledThreadPoolExecutor調(diào)度原理
線程池的優(yōu)雅關(guān)閉實(shí)踐

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末恳守,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子贩虾,更是在濱河造成了極大的恐慌催烘,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缎罢,死亡現(xiàn)場(chǎng)離奇詭異伊群,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)策精,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門舰始,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人蛮寂,你說(shuō)我怎么就攤上這事蔽午。” “怎么了酬蹋?”我有些...
    開(kāi)封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵及老,是天一觀的道長(zhǎng)抽莱。 經(jīng)常有香客問(wèn)我,道長(zhǎng)骄恶,這世上最難降的妖魔是什么食铐? 我笑而不...
    開(kāi)封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮僧鲁,結(jié)果婚禮上虐呻,老公的妹妹穿的比我還像新娘。我一直安慰自己寞秃,他們只是感情好斟叼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著春寿,像睡著了一般朗涩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上绑改,一...
    開(kāi)封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天谢床,我揣著相機(jī)與錄音,去河邊找鬼厘线。 笑死识腿,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的造壮。 我是一名探鬼主播渡讼,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼费薄!你這毒婦竟也來(lái)了硝全?” 一聲冷哼從身側(cè)響起栖雾,我...
    開(kāi)封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤楞抡,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后析藕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體召廷,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年账胧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了竞慢。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡治泥,死狀恐怖筹煮,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情居夹,我是刑警寧澤败潦,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布本冲,位于F島的核電站,受9級(jí)特大地震影響劫扒,放射性物質(zhì)發(fā)生泄漏檬洞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一沟饥、第九天 我趴在偏房一處隱蔽的房頂上張望添怔。 院中可真熱鬧,春花似錦贤旷、人聲如沸广料。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)性昭。三九已至,卻和暖如春县遣,著一層夾襖步出監(jiān)牢的瞬間糜颠,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工萧求, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留其兴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓夸政,卻偏偏與公主長(zhǎng)得像元旬,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子守问,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355