定時任務(wù)線程池ScheduledThreadPool源碼

基本用法

class Main {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1);
        poolExecutor.scheduleAtFixedRate(new task(), 5, 10, TimeUnit.SECONDS);
    }

    public static class task implements Runnable {
        @Override
        public void run() {
            try {
                 Thread.sleep(5000);
             } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

帶著問題看源碼

實戰(zhàn)案例

案發(fā)現(xiàn)場

之前寫過的一段原生Java定期執(zhí)行任務(wù)的代碼 設(shè)想效果是每天早上八點跑一次任務(wù)

寫上面那段代碼時碰到兩個問題

一迹缀、ScheduledThreadPool好像還挺復雜 比起直接用while(true) sleep 用 ScheduledThreadPool這種方式有什么性能優(yōu)勢嗎

二贮勃、因為數(shù)據(jù)量很大可能耗時很久 當設(shè)置每天8點執(zhí)行一次 如果到第二天8點 上一次的任務(wù)還沒執(zhí)行完成 會發(fā)生什么

image.png

類結(jié)構(gòu)

image.png

實現(xiàn)了ScheduledExecutorService接口并繼承了ThreadPoolExecutor

同樣是出自熟悉的狗哥之手

先拖到最后看下本次目標多少 也不多 加上注釋以及最后一行留白一共1284行

image.png

提交任務(wù)

提交任務(wù)的三種方式

  • pool.schedule 提交一次性任務(wù)
  • pool.scheduleWithFixedDelay 提交周期任務(wù)(固定時間間隔)
  • pool.scheduleAtFixedRate 提交周期任務(wù)(固定執(zhí)行時間)
image.png

提交任務(wù)方式一

image.png

提交任務(wù)方式二

與上一個方法基本一致 兩處不同

1)多了一步outerTask賦值

2)多傳了一個參數(shù) delay 執(zhí)行延遲時間

image.png

提交任務(wù)方式三

與方式二代碼幾乎一模一樣 (找不同 重要伏筆

一個想法:這里是不是可以用個方法重載減少代碼重復呢

image.png

接下來看三種提交方式都有的核心方法:delayedExcute( ) 將任務(wù)加入父類隊列

image.png
image.png
image.png

任務(wù)最終被加到了父類ThreadPoolExecutor中的workQueue工作隊列中

image.png

提交階段總結(jié):提交任務(wù)方法完成了兩件事:

1)把用戶傳入的runnable任務(wù)封裝成一個ScheduledFutureTask內(nèi)部類對象

2)將該ScheduledFutureTask添加到父類ThreadPoolExecutorworkQueue工作隊列中

執(zhí)行任務(wù)

用戶提交的任務(wù)被封裝成ScheduledFutureTask 該內(nèi)部類中有一個run方法用于執(zhí)行任務(wù)

內(nèi)部類ScheduledFutureTask源碼

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    //任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號
    private final long sequenceNumber;

    //該任務(wù)的執(zhí)行時間
    private long time;

    //該任務(wù)的循環(huán)周期(延遲時間)
    private final long period;

    //重新入隊的實際任務(wù) 實際指向當前對象本身
    RunnableScheduledFuture<V> outerTask = this;

    //當前任務(wù)在延遲隊列中的索引
        //能夠更加方便的取消當前任務(wù)
    int heapIndex;

    //構(gòu)造方法1
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    //構(gòu)造方法2
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    //構(gòu)造方法3
    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
        //該任務(wù)還有多久執(zhí)行
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }
        //比較兩個任務(wù)誰先執(zhí)行
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
                        //如果下次執(zhí)行任務(wù)的時間相同,則會比較任務(wù)的sequenceNumber值,
                        //sequenceNumber值小的任務(wù)會排在前面叠洗。
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    //是否周期性任務(wù)
    public boolean isPeriodic() {
        return period != 0;
    }

    // 設(shè)置下一次的執(zhí)行時間
    // time 為上一次任務(wù)的開始執(zhí)行時間
    // p > 0 說明是提交方式二 固定頻率執(zhí)行 time += p 在上一次開始時間的基礎(chǔ)上計算下一次執(zhí)行時間
    // p < 0 說明是提交方式三 固定延遲執(zhí)行 轉(zhuǎn)回正數(shù)之后調(diào)用一次triggerTime 上次任務(wù)執(zhí)行完畢now() + 執(zhí)行周期p 而不是time+p
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    }

    //overflowFree方法 防止溢出 任務(wù)運行足夠久這個long是有可能溢出
    long triggerTime(long delay) {
        // 上次任務(wù)執(zhí)行完畢now() + delay
        return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }

    public void run() {
        boolean periodic = isPeriodic();
        //檢查當前狀態(tài)是否能運行 不能運行則取消任務(wù)
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
            //如果能運行 則再判斷是否周期性任務(wù)
        else if (!periodic)
            //不是周期性任務(wù)則直接調(diào)用run方法執(zhí)行一次就行了
            ScheduledFutureTask.super.run();
            //如果是需要周期性執(zhí)行的任務(wù) 則調(diào)用runAndReset方法
            //runAndReset返回true代表執(zhí)行成功了 然后計算并設(shè)置出下一次的執(zhí)行時間
        else if (ScheduledFutureTask.super.runAndReset()) {
            // 設(shè)置下一次執(zhí)行時間
            setNextRunTime();
            // 將該任務(wù)重新放入隊列
            reExecutePeriodic(outerTask);
        }
    }
}

image.png

核心方法:FutureTask.runAndReset()

// 執(zhí)行任務(wù)并忽略任務(wù)返回值愧膀,執(zhí)行完畢后將該任務(wù)重置為初始狀態(tài),
// 如果計算遇到異潮肷迹或被取消毅往,則無法重置回初始狀態(tài)
// 該方法用于需要重復執(zhí)行的場景
// 如果成功運行并重置,則返回true
protected boolean runAndReset() {
    //如果狀態(tài)為NEW則接著會通過unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中 如果保存失敗派近,則直接返回攀唯;
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        //當成員變量callable不為null且狀態(tài)為NEW則可以執(zhí)行
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                //任務(wù)正常執(zhí)行完畢
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        //執(zhí)行完畢之后將runner執(zhí)行線程置為null
        runner = null;
        s = state;
        //當執(zhí)行狀態(tài)為中斷 處理中斷
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

是否是周期性任務(wù)的判斷依據(jù)是看period成員變量是否為0 只有提交方式一沒傳這個參數(shù) 默認值0

image.png

計算下一次的執(zhí)行時間時就用到了period的正負符號了

image.png

到這里開頭的第二個問題似乎能解決了 由于我使用的是方式二scheduleAtFixedRate

p > 0 走的是 time += p (上一次的開始時間+delay )假如開始時間是前一天早上8點

那么算出的下一次time確實是第二天8點

但是實際上第二天9點上一個任務(wù)才執(zhí)行完畢

所以第二天8點 上一個任務(wù)還沒完成

此時會重新啟動一個任務(wù) 兩個任務(wù)一起跑榕栏?

還是說只有前一次執(zhí)行完畢才會處理下一次任務(wù) 然后拿到time發(fā)現(xiàn)已經(jīng)錯過了第二天8點 然后略過第二次任務(wù) 等第三天8點再執(zhí)行噪窘?

想要回答這個問題 關(guān)鍵還得看看從延遲隊列中取出任務(wù)并執(zhí)行的那段邏輯是怎么寫的

image.png

等不及了 直接劇透 實際測試一把

image.png

測試結(jié)果顯示 問題二場景 最后結(jié)果既不是錯過第二天8點直接第三天8點執(zhí)行 也不是第二天8點兩個任務(wù)并行執(zhí)行 而是的等到第一個任務(wù)9點執(zhí)行完畢之后立即執(zhí)行第二次任務(wù)

執(zhí)行階段總結(jié):當定時任務(wù)屬于周期性任務(wù) 那么執(zhí)行任務(wù)的run方法會在任務(wù)執(zhí)行完畢后 計算出下一次的執(zhí)行時間 并將該任務(wù)重新放回DelayWorkQueue 延遲隊列

任務(wù)調(diào)度

上面我們看了任務(wù)的run方法 那么是誰去調(diào)用這個run方法 又是什么時候去調(diào)用呢

延遲阻塞隊列DelayedWorkQueue中放的元素是ScheduledFutureTask,提交的任務(wù)被包裝成ScheduledFutureTask放進工作隊列询张,Woker工作線程消費工作隊列中的任務(wù),即調(diào)用ScheduledFutureTask.run()戒幔,ScheduledFutureTask又調(diào)用任務(wù)的run()吠谢,這點和ThreadPoolExecutor差不多,而ScheduledThreadPoolExecutor是如何實現(xiàn)按時間調(diào)度的呢诗茎?

猜測可能是有一個專門負責調(diào)度的額外線程在監(jiān)聽隊列中存儲的任務(wù)的執(zhí)行時間是否到了 到了的話就取出任務(wù)調(diào)用run方法(后面發(fā)現(xiàn)猜錯了 狗哥使用了一種更加優(yōu)秀的設(shè)計)

DelayedWorkQueue源碼

image.png

DelayedWorkQueue實現(xiàn)自BlockingQueue 是高度定制化的阻塞隊列+優(yōu)先隊列 其核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊列工坊,隊列滿時會自動擴容,所以offer操作永遠不會阻塞敢订,maximumPoolSize也就用不上了王污,所以線程池中永遠會保持至多有corePoolSize個工作線程正在運行。

類結(jié)構(gòu)

//位于ScheduledThreadPoolExecutor中的一個內(nèi)部類
//一種定制化的延遲隊列+優(yōu)先隊列 只支持放入RunnableScheduledFutures
static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
    //DelayedWorkQueue 基于基于堆的數(shù)據(jù)結(jié)構(gòu)
    //類似于 DelayQueue 和 PriorityQueue 中的數(shù)據(jù)結(jié)構(gòu)枢析,
    //除了每個 ScheduledFutureTask 還將其索引記錄到堆數(shù)組中玉掸。
    //這消除了在取消時查找任務(wù)的需要,大大加快了刪除速度(從 O(n)下降到 O(log n))醒叁,
    //并減少了垃圾保留司浪,否則會因等待元素上升到頂部而發(fā)生清除前。
    //但是因為隊列也可能包含 RunnableScheduledFutures 不是 ScheduledFutureTasks把沼,
    //我們不能保證有這樣的索引可用啊易,在這種情況下我們回退到線性搜索。
    //(我們希望大多數(shù)任務(wù)不會被修飾饮睬,并且速度更快的情況會更常見租谈。)
    //所有堆操作都必須記錄索引更改——主要是在 siftUp 和 siftDown 中。
    //刪除后捆愁,任務(wù)的heapIndex 設(shè)置為 -1割去。請注意,ScheduledFutureTasks
    //最多可以在隊列中出現(xiàn)一次(對于其他類型的任務(wù)或工作隊列不需要如此)昼丑,
    //因此由 heapIndex 唯一標識呻逆。

    // 初始容量
    private static final int INITIAL_CAPACITY = 16;
    // 隊列
    private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    // 可重入鎖 用于隊列元素操作時加鎖
    private final ReentrantLock lock = new ReentrantLock();
    //隊列容量
    private int size = 0;
        //leader線程
    private Thread leader = null;
        //線程狀態(tài)控制器
    private final Condition available = lock.newCondition();

      private void siftUp(int k, RunnableScheduledFuture<?> key) {...}
        private void siftDown(int k, RunnableScheduledFuture<?> key) {...}
        public boolean offer(Runnable x) {...}
        public RunnableScheduledFuture<?> take() throws InterruptedException {...}
        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit){...}
        ...
}

核心成員變量:Condition available

關(guān)鍵字synchronize可以與wait()和nitify()方法相結(jié)合實現(xiàn)實現(xiàn)等待/通知模式 類ReentrantLock也可以實現(xiàn)同樣的功能,但需要借助condition對象 condition類是在JDK5中出現(xiàn)的技術(shù)菩帝,使用他有更好的靈活性咖城,比如可以實現(xiàn)多路通知功能,也就是在一個Lock對象里可以創(chuàng)建多個condition實例呼奢,線程對象可以注冊在指定的condition中從而選擇性的進行線程通知宜雀,在調(diào)度線程上更加靈活。

  • 調(diào)用Condition的await()和signal()方法握础,都必須在lock保護之內(nèi)辐董,就是說必須在lock.lock()和lock.unlock之間才可以使用
  • Conditon中的await()對應(yīng)Object的wait();
  • Condition中的signal()對應(yīng)Object的notify()禀综;

核心成員變量:Thread leader

指定等待隊列頭部任務(wù)的線程郎哭。 Leader-Follower 模式的變體他匪。用于減少不必要的定時等待。當一個線程成為領(lǐng)導者時夸研,它只等待下一個任務(wù)的延遲時間是否到達,而其他線程則無限期地等待依鸥。領(lǐng)導線程必須在從 take() 或 poll(...) 返回之前向某個其他線程發(fā)出信號亥至,除非某個其他線程在此期間成為領(lǐng)導。 每當隊列的頭部被具有較早到期時間的任務(wù)替換時贱迟,leader 字段將通過重置為空而無效姐扮,并且一些等待線程(但不一定是當前的leader)被發(fā)出信號。因此衣吠,等待線程必須準備好在等待期間獲得和失去領(lǐng)導權(quán)

核心方法:offer添加元素

ScheduledThreadPoolExecutor提交任務(wù)時調(diào)用的是DelayedWorkQueue.add茶敏,而addput等一些對外提供的添加元素的方法都調(diào)用了offer缚俏,其基本流程如下:

  • 其作為生產(chǎn)者的入口惊搏,首先獲取鎖。
  • 判斷隊列是否要滿了(size >= queue.length)忧换,滿了就擴容grow()恬惯。
  • 隊列未滿,size+1亚茬。
  • 判斷添加的元素是否是第一個酪耳,是則不需要堆化。
  • 添加的元素不是第一個刹缝,則需要堆化siftUp碗暗。
  • 如果堆頂元素剛好是此時被添加的元素,則喚醒take線程消費梢夯。
  • 最終釋放鎖言疗。
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            //擴容
            grow();
        size = i + 1;
        if (i == 0) {
            //如果是入的是第一個元素,不需要堆化
            queue[0] = e;
            setIndex(e, 0);
        } else {
            //堆化
            siftUp(i, e);
        }
        if (queue[0] == e) {
            //如果堆頂元素剛好是入隊列的元素厨疙,則喚醒take
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

image.png

核心方法:siftUp向上堆化

新添加的元素先會加到堆底洲守,然后一步步和上面的父親節(jié)點比較,若小于父親節(jié)點則和父親節(jié)點互換位置沾凄,循環(huán)比較直至大于父親節(jié)點才結(jié)束循環(huán)梗醇。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        //找到父親節(jié)點
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            // 添加的元素 大于父親節(jié)點,則結(jié)束循環(huán)
            break;
        //添加的元素小于父親節(jié)點撒蟀,則位置互換
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

image.png

核心方法:take消費元素

Worker工作線程啟動后就會調(diào)用take方法循環(huán)消費工作隊列中的元素

take基本流程如下:

  • 首先獲取可中斷鎖叙谨,判斷堆頂元素是否是空,空的則阻塞等待available.await()保屯。
  • 堆頂元素不為空手负,則獲取其延遲執(zhí)行時間delay涤垫,delay <= 0說明到了執(zhí)行時間,出隊列finishPoll竟终。
  • delay > 0還沒到執(zhí)行時間蝠猬,判斷leader線程是否為空,不為空則說明有其他take線程也在等待统捶,當前take將無限期阻塞等待榆芦。
  • leader線程為空,當前take線程設(shè)置為leader喘鸟,并阻塞等待delay時長匆绣。
  • 當前l(fā)eader線程等待delay時長自動喚醒護著被其他take線程喚醒,則最終將leader設(shè)置為null什黑。
  • 再循環(huán)一次判斷delay <= 0出隊列崎淳。
  • 跳出循環(huán)后判斷l(xiāng)eader為空并且堆頂元素不為空,則喚醒其他take線程愕把,最后是否鎖拣凹。
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0]; //取出堆頂元素
            if (first == null)
                //堆為空,等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //到了執(zhí)行時間礼华,出隊列
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                //還沒到執(zhí)行時間
                if (leader != null)
                    //此時若有其他take線程在等待咐鹤,當前take將無限期等待
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

image.png
image.png

核心方法:finishPoll出隊列

堆頂元素delay<=0,執(zhí)行時間到圣絮,出隊列就是一個向下堆化的過程siftDown

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

核心方法:siftDown向下堆化

由于堆頂元素出隊列后祈惶,就破壞了堆的結(jié)構(gòu),需要組織整理下扮匠,將堆尾元素移到堆頂捧请,然后向下堆化:

  • 從堆頂開始,父親節(jié)點與左右子節(jié)點中較小的孩子節(jié)點比較(左孩子不一定小于右孩子)棒搜。
  • 父親節(jié)點小于等于較小孩子節(jié)點疹蛉,則結(jié)束循環(huán),不需要交換位置力麸。
  • 若父親節(jié)點大于較小孩子節(jié)點可款,則交換位置。
  • 繼續(xù)向下循環(huán)判斷父親節(jié)點和孩子節(jié)點的關(guān)系克蚂,直到父親節(jié)點小于等于較小孩子節(jié)點才結(jié)束循環(huán)闺鲸。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    //k = 0, key = queue[size-1]
    //無符號右移,相當于size/2
    int half = size >>> 1;
    while (k < half) {
        //只需要比較一半
        //找到左孩子節(jié)點
        // child = 2k + 1
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        //右孩子節(jié)點
        int right = child + 1;
        //比較左右孩子大小
        if (right < size && c.compareTo(queue[right]) > 0)
            //c左孩子大于右孩子埃叭,則將有孩子賦值給左孩子
            c = queue[child = right];
        //比較key和孩子c
        if (key.compareTo(c) <= 0)
            //key小于等于c摸恍,則結(jié)束循環(huán)
            break;
        //key大于孩子c,則key與孩子交換位置
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

image.png

核心方法:compareTo比較器

延遲隊列是也是個優(yōu)先隊列 每次元素出隊和入隊也都需要經(jīng)過堆化調(diào)整順序

根據(jù)任務(wù)到期時間從小到大排序 通過調(diào)用compareTo方法完成大小判斷

public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

ScheduledThreadPool中的Leader-Follower模式

定時調(diào)度線程池與一般線程池的一個重要不同:提交的任務(wù)是延遲而非立即執(zhí)行的,因此worker線程調(diào)用隊列的take以取出執(zhí)行任務(wù)必定是要阻塞的立镶”诎溃考慮到延遲任務(wù)必然有一個先后順序 因此只需要一個線程使用timed waiting等待隊首任務(wù)(堆頂)即可

ScheduledThreadPool采用了Leader-Follower模式,等待第一個線程的任務(wù)也稱為leader媚媒,其調(diào)用available.awaitNanos待當前隊列頭部任務(wù)到達調(diào)度時間時喚醒嗜逻。其余線程作為follower只需調(diào)用await方法無限阻塞等待,直至被leader喚醒缭召,并重新完成搶鎖->嘗試執(zhí)行隊列首元素->搶leader->等待的循環(huán)变泄。

舉個例子來理解Leader-Follower模式,拿飯店員工來對照理解恼琼。

  1. 單Reactor多線程模式 飯店的員工一般都是分角色的,比如接待員屏富、服務(wù)員晴竞、廚師等等。
    1. 假如有一個叫做A的人狠半,固定他作為飯店接待員噩死,來客人了就分給客人一個座位號,然后交給其他服務(wù)員神年,比如B進行后續(xù)處理已维。
    2. B會根據(jù)座位號為客人引路,為客人點菜等等已日。
    3. 如果把A垛耳、B比作兩個線程,客人比作任務(wù)飘千,任務(wù)由A處理堂鲜,到交接給B處理,有一次線程上下文切換护奈。
  2. Leader-Follower模式 這次飯店不分角色了缔莲,每個人都是接待員和服務(wù)員,統(tǒng)稱為員工霉旗。
    1. 每次只能有一個員工在門口等待痴奏,比如A先在門口等待,其他員工在屋里歇著厌秒。
    2. 來客人了的話读拆,A會叫一個其他員工,比如B來門口接替自己简僧。
    3. 然后A開始為客人服務(wù)建椰,比如分配座位號,引路岛马,點菜等全流程服務(wù)棉姐。
    4. 拿線程來說的話屠列,就是接受任務(wù),處理任務(wù)都是由線程A負責伞矩,沒有線程上下文切換笛洛。
  3. DelayQueue的Leader-Follower模式 這次飯店也不分角色,都是員工乃坤,但是改變了經(jīng)營策略苛让,每個客人必須預(yù)約吃飯時間,預(yù)約采用APP預(yù)約湿诊。因為加入了延時狱杰,邏輯變得復雜了一些。
    1. 每次還是只能有一個員工在門口等待厅须,比如A先在門口等待仿畸,A看了眼預(yù)約登記表,發(fā)現(xiàn)離預(yù)約最早到店的時間還有30分鐘朗和,A就什么都不干了错沽,先休息30分鐘。
    2. 其他員工還是先在屋里歇著眶拉,但是因為采用APP預(yù)約千埃,客人約幾點都有可能,如果此時有客人約的是10分鐘后到店忆植,因為A要30分鐘后才能醒來干活放可,所以如果這位客人來了,門口就沒有人接待了唱逢。
    3. 對于這個問題吴侦,飯店的軟件系統(tǒng)在監(jiān)聽到最早到店時間變了的話,會再叫一個員工來門口等待坞古,此員工可能是新員工B备韧,也可能是叫醒了之前在門口休息的員工A。我們叫這位新員工X痪枫。
      1. 如果新員工X發(fā)現(xiàn)最早到店時間是現(xiàn)在织堂,或者客人已經(jīng)來了,就會叫一個員工C來門口接替自己奶陈,并立即開始為客人提供全流程服務(wù)易阳。
      2. 如果新員工X發(fā)現(xiàn)最早到店時間是10分鐘后,新員工X就像A之前一樣吃粒,什么都不干了潦俺,先休息10分鐘。
    4. 如果最早到店時間沒有變化,還是30分鐘后事示,軟件系統(tǒng)不會叫人早像,其他員工看到A在門口等待,自己可以安心的在屋里歇著肖爵,等著A叫人替換他卢鹦。
    5. 員工A在30分鐘后醒來,客人也到了劝堪,A會叫一個同事比如B接替自己冀自,而A為客人提供全流程服務(wù)。

問題解決

所以問題二到這里應(yīng)該能解釋了 DelayedWorkQueue延遲隊列只能有一個Leader線程 當Leader不為null 即有其他任務(wù)在等待執(zhí)行的時候 下一個take線程無法從堆頂獲取并啟動任務(wù) (這不廢話嗎 隊首的任務(wù)都等著呢 后面的任務(wù)肯定沒到時間懊肜病)所以其實不是這個原因

問題二之所以隔天9點才能開啟第二次任務(wù)是因為同一個任務(wù)執(zhí)行完畢之后才會把自己重新放回隊列 雖然算出來的執(zhí)行時間是8點 但是9點才被放回任務(wù)隊列 一放進去立馬被取出來了判斷一下delay時間 這個delay時間≤0 (小于0就屬于上一個任務(wù)超時了導致下一個任務(wù)延遲了)都統(tǒng)一算作時間到了 立即執(zhí)行

一個疑問:為什么不設(shè)計成支持多任務(wù)并列執(zhí)行呢 答:其實是支持多任務(wù)并列執(zhí)行的 只不過是不支持同一個任務(wù)同時并列執(zhí)行而已

同時問題一也可以解釋了 其實DelayWorkQueue是一個基于堆的優(yōu)先隊列 優(yōu)化了多個定時任務(wù)的執(zhí)行調(diào)度(避免無效監(jiān)聽等待和避免線程上下文切換)

但是如果你只有一個定時任務(wù) 直接while true sleep也是可以的

總結(jié)

  1. 任務(wù)執(zhí)行方法位于 ScheduledFutureTask的父類FutureTask.run() 該方法內(nèi)部捕獲異常且未打印異常信息熬粗,難以排查問題,同時周期性執(zhí)行任務(wù)會因為任務(wù)代碼拋異常而不再設(shè)置下次執(zhí)行時間和把自己放回延遲隊列的操作余境,即不會再周期性執(zhí)行 因此任務(wù)代碼一定要自行try-catch做異常處理而不要直接拋出
  2. ScheduledFutureTask通過一個變量就區(qū)分了延遲和周期性執(zhí)行荐糜,period=0延遲執(zhí)行,即只執(zhí)行一次葛超;period>0固定頻率周期執(zhí)行;period<0固定延遲時間周期執(zhí)行延塑,兩次任務(wù)開始執(zhí)行時間間隔受任務(wù)執(zhí)行耗時影響绣张。
  3. 如果周期性任務(wù)的執(zhí)行時長大于period,且看重執(zhí)行等間隔关带,建議使用scheduleWithFixedDelay()侥涵。
  4. 若周期性任務(wù)的執(zhí)行時長遠小于period,則可以使用scheduleAtFixedRate()宋雏。
  5. 如果周期性任務(wù)的執(zhí)行時長大于period 且使用的是scheduleAtFixedRate 則并不能實現(xiàn)固定頻率執(zhí)行
  6. 當線程池處于關(guān)閉狀態(tài)(shutdown)芜飘,周期性任務(wù)會被取消和阻止執(zhí)行,非周期性任務(wù)會順利執(zhí)行完成不會被阻止磨总。
  7. 同一個周期性定時任務(wù)出隊執(zhí)行完之后才會重新入隊
  8. DelayedWorkQueue通過全局可重入鎖來實現(xiàn)同步
  9. DelayedWorkQueue常用于定時任務(wù)
  10. DelayedWorkQueue內(nèi)部使用優(yōu)先級隊列來存儲
  11. DelayedWorkQueue添加元素滿了之后會自動擴容原來容量的1/2嗦明,即永遠不會阻塞,最大擴容可達Integer.MAX_VALUE蚪燕,所以線程池中至多有corePoolSize個工作線程正在運行娶牌。。
  12. DelayedWorkQueue 消費元素take馆纳,在堆頂元素為空和delay >0 時诗良,阻塞等待。
  13. DelayedWorkQueue 是一個生產(chǎn)永遠不會阻塞鲁驶,消費可以阻塞的生產(chǎn)者消費者模式鉴裹。
  14. DelayedWorkQueue 有一個leader線程的變量,是Leader-Follower模式的變種。當一個take線程變成leader線程時径荔,只需要等待下一次的延遲時間督禽,不是leader線程的其他take線程則需要等leader線程出隊列了才喚醒其他take線程 減少了不必要的定時等待。同時一個線程完成監(jiān)聽和任務(wù)執(zhí)行 也避免了調(diào)度線程和任務(wù)執(zhí)行線程的上下文切換成本

ScheduledThreadPoolExecutor與Timer的區(qū)別

JDK1.5開始提供ScheduledThreadPoolExecutor類猖凛,ScheduledThreadPoolExecutor類繼承ThreadPoolExecutor類重用線程池實現(xiàn)了任務(wù)的周期性調(diào)度功能赂蠢。在IDK1.5之前,實現(xiàn)任務(wù)的周期性調(diào)度主要使用的是Timer類和TimerTask類辨泳。

1.1線程角度

Timer是單線程模式虱岂,如果某個TimerTask任務(wù)的執(zhí)行時間比較久,會影響到其他任務(wù)的調(diào)度執(zhí)行菠红。ScheduledThreadPoolExecutor是多線程模式第岖,并且重用線程池,某個ScheduledFutureTask任務(wù)執(zhí)行的時間比較久试溯,不會影響到其他任務(wù)的調(diào)度執(zhí)行蔑滓。

1.2系統(tǒng)時間敏感度

Timer調(diào)度是基于操作系統(tǒng)的絕對時間的,對操作系統(tǒng)的時間敏感遇绞,一旦操作系統(tǒng)的時間改變键袱,則Timer的調(diào)度不再精確。ScheduledThreadPoolExecutor調(diào)度是基于相對時間的摹闽,不受操作系統(tǒng)時間改變的影響蹄咖。

1.3是否捕獲異常

Timer不會捕獲TimerTask拋出的異常,加上Timer又是單線程的付鹿。一旦某個調(diào)度任務(wù)出現(xiàn)異常則整個線程就會終止澜汤,其他需要調(diào)度的任務(wù)也不再執(zhí)行。ScheduledThreadPoolExecutor基于線程池來實現(xiàn)調(diào)度功能舵匾,某個任務(wù)拋出異常后俊抵,其他任務(wù)仍能正常執(zhí)行。

1.4任務(wù)是否具備優(yōu)先級

Timer中執(zhí)行的TimerTask任務(wù)整體上沒有優(yōu)先級的概念坐梯,只是按照系統(tǒng)的絕對時間來執(zhí)行任務(wù)徽诲。ScheduledThreadPoolExecutor中執(zhí)行的ScheduledFutureTask類實現(xiàn)了iavalang.Comparable接口和java.utilconcurrentDelayed接口,這也就說明了ScheduledFutureTask類中實現(xiàn)了兩個非常重要的方法吵血,一個是javalangComparable接口的compareTo方法馏段,一個是java.util.concurrentDelayed接口的getDelay方法。在ScheduledFutureTask類中compareTo方法方法實現(xiàn)了任務(wù)的比較践瓷,距離下次執(zhí)行的時間間隔短的任務(wù)會排在前面院喜,也就是說,距離下次執(zhí)行的時間間隔短的任務(wù)的優(yōu)先級比較高晕翠。而getDelay方法則能夠返回距離下次任務(wù)執(zhí)行的時間間隔喷舀。

1.5是否支持對任務(wù)排序

Timer不支持對任務(wù)的排序砍濒。ScheduledThreadPoolExecutor類中定義了一個靜態(tài)內(nèi)部類DelayedWorkQueue,DelayedWorkQueue類本質(zhì)上是一個有序隊列,為需要調(diào)度的每個任務(wù)按照距離下次執(zhí)行時間間隔的大小來排序

1.6能否獲取返回的結(jié)果

Timer中執(zhí)行的TimerTask類只是實現(xiàn)了iavaangRunnable接口硫麻,無法從TimerTask中獲取返回的結(jié)果爸邢。ScheduledThreadPoolExecutor中執(zhí)行的ScheduledFutureTask類繼承了FutureTask類,能夠通過遍Future來獲取返回的結(jié)果拿愧。通過以上對ScheduledThreadPoolExecutor類和Timer類的對比杠河,相信在JDK1.5之后,就沒有使用Timer來實現(xiàn)定時任務(wù)調(diào)度的必要了浇辜。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末券敌,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子柳洋,更是在濱河造成了極大的恐慌待诅,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件熊镣,死亡現(xiàn)場離奇詭異卑雁,居然都是意外死亡,警方通過查閱死者的電腦和手機绪囱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門测蹲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人鬼吵,你說我怎么就攤上這事弛房。” “怎么了而柑?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長荷逞。 經(jīng)常有香客問我媒咳,道長,這世上最難降的妖魔是什么种远? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任涩澡,我火速辦了婚禮,結(jié)果婚禮上坠敷,老公的妹妹穿的比我還像新娘妙同。我一直安慰自己,他們只是感情好膝迎,可當我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布粥帚。 她就那樣靜靜地躺著,像睡著了一般限次。 火紅的嫁衣襯著肌膚如雪芒涡。 梳的紋絲不亂的頭發(fā)上柴灯,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天,我揣著相機與錄音费尽,去河邊找鬼赠群。 笑死,一個胖子當著我的面吹牛旱幼,可吹牛的內(nèi)容都是我干的查描。 我是一名探鬼主播,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼柏卤,長吁一口氣:“原來是場噩夢啊……” “哼冬三!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起闷旧,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤长豁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后忙灼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體匠襟,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年该园,在試婚紗的時候發(fā)現(xiàn)自己被綠了酸舍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡里初,死狀恐怖啃勉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情双妨,我是刑警寧澤淮阐,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站刁品,受9級特大地震影響泣特,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜挑随,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一状您、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧兜挨,春花似錦膏孟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至噪舀,卻和暖如春幕垦,著一層夾襖步出監(jiān)牢的瞬間丢氢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工先改, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留疚察,地道東北人。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓仇奶,卻偏偏與公主長得像貌嫡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子该溯,可洞房花燭夜當晚...
    茶點故事閱讀 44,955評論 2 355