Java線程池原理分析ScheduledThreadPoolExecutor篇

前言

在上一篇線程池的文章《Java線程池原理分析ThreadPoolExecutor篇》中從ThreadPoolExecutor源碼分析了其運(yùn)行機(jī)制。限于篇幅第岖,留下了ScheduledThreadPoolExecutor未做分析键袱,因此本文繼續(xù)從源代碼出發(fā)分析ScheduledThreadPoolExecutor的內(nèi)部原理。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor同ThreadPoolExecutor一樣也可以從 Executors線程池工廠創(chuàng)建比藻,所不同的是它具有定時(shí)執(zhí)行,以周期或間隔循環(huán)執(zhí)行任務(wù)等功能。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

}

public interface ScheduledExecutorService extends ExecutorService {
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor烛缔,因此它具有ThreadPoolExecutor的所有能力。
通過(guò)super方法的參數(shù)可知拿愧,核心線程的數(shù)量即傳入的參數(shù)杠河,而線程池的線程數(shù)為Integer.MAX_VALUE,幾乎為無(wú)上限柳洋。
這里采用了DelayedWorkQueue任務(wù)隊(duì)列陪白,也是定時(shí)任務(wù)的核心,留在后面分析膳灶。

ScheduledThreadPoolExecutor實(shí)現(xiàn)了ScheduledExecutorService 中的接口:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

延時(shí)執(zhí)行Callable任務(wù)的功能咱士。

 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

延時(shí)執(zhí)行Runnable任務(wù)的功能。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

可以延時(shí)循環(huán)執(zhí)行周期性任務(wù)轧钓。

假設(shè)任務(wù)執(zhí)行時(shí)間固定2s,period為1s序厉,因?yàn)槿蝿?wù)的執(zhí)行時(shí)間大于規(guī)定的period,所以任務(wù)會(huì)每隔2s(任務(wù)執(zhí)行時(shí)間)開(kāi)始執(zhí)行一次毕箍。如果任務(wù)執(zhí)行時(shí)間固定為0.5s,period為1s弛房,因?yàn)槿蝿?wù)執(zhí)行時(shí)間小于period,所以任務(wù)會(huì)每隔1s(period)開(kāi)始執(zhí)行一次而柑。實(shí)際任務(wù)的執(zhí)行時(shí)間即可能是大于period的文捶,也可能小于period,scheduleAtFixedRate的好處就是每次任務(wù)的開(kāi)始時(shí)間間隔必然大于等于period媒咳。

假設(shè)一項(xiàng)業(yè)務(wù)需求每天凌晨3點(diǎn)將數(shù)據(jù)庫(kù)備份粹排,然而數(shù)據(jù)庫(kù)備份的時(shí)間小于24H,最適合用scheduleAtFixedRate方法實(shí)現(xiàn)涩澡。

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

可以延時(shí)以相同間隔時(shí)間循環(huán)執(zhí)行任務(wù)顽耳。

假設(shè)任務(wù)執(zhí)行的時(shí)間固定為2s,delay為1s妙同,那么任務(wù)會(huì)每隔3s(任務(wù)時(shí)間+delay)開(kāi)始執(zhí)行一次射富。

如果業(yè)務(wù)需求本次任務(wù)的結(jié)束時(shí)間與下一個(gè)任務(wù)的開(kāi)始時(shí)間固定,使用scheduleWithFixedDelay可以方便地實(shí)現(xiàn)業(yè)務(wù)粥帚。

ScheduledFuture

四個(gè)執(zhí)行任務(wù)的方法都返回了ScheduledFuture對(duì)象胰耗,它與Future有什么區(qū)別?

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
    public int compareTo(T o);
}

可以看到ScheduledFuture也繼承了Future芒涡,并且繼承了Delayed柴灯,增加了getDelay方法卖漫,而Delayed繼承自Comparable,所以具有compareTo方法。

四種執(zhí)行定時(shí)任務(wù)的方法

schedule(Runnable command,long delay, TimeUnit unit)

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

這個(gè)方法中出現(xiàn)了幾個(gè)陌生的類弛槐,首先是ScheduledFutureTask:

private class ScheduledFutureTask<V>  extends FutureTask<V> implements RunnableScheduledFuture<V> {
            ...
            ScheduledFutureTask(Runnable r, V result, long ns) {
              super(r, result);
              this.time = ns;
              this.period = 0;
              this.sequenceNumber = sequencer.getAndIncrement();
           }
}

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    boolean isPeriodic();
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

這個(gè)類是ScheduledThreadPoolExecutor的內(nèi)部類懊亡,繼承自FutureTask實(shí)現(xiàn)了RunnableScheduledFuture接口。RunnableScheduledFuture有些復(fù)雜乎串,繼承自RunnableFuture和ScheduledFuture接口店枣。可見(jiàn)ScheduledThreadPoolExecutor身兼多職叹誉。這個(gè)類既可以作為Runnable被線程執(zhí)行鸯两,又可以作為FutureTask用于獲取Callable任務(wù)call方法返回的結(jié)果。

在FutureTask的構(gòu)造方法中傳入Runnable對(duì)象會(huì)將其轉(zhuǎn)換為返回值為null的Callable對(duì)象长豁。

/**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @param <V> the type of the task's result
     * @return a task that can execute the runnable
     * @since 1.6
     */
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }

從decorateTask的字面意義判斷它將具體的RunnableScheduledFuture實(shí)現(xiàn)類向上轉(zhuǎn)型為RunnableScheduledFuture接口钧唐。從它的方法描述和實(shí)現(xiàn)看出它只是簡(jiǎn)單的將ScheduledFutureTask向上轉(zhuǎn)型為RunnableScheduledFuture接口,由protected 修飾符可知設(shè)計(jì)者希望子類擴(kuò)展這個(gè)方法的實(shí)現(xiàn)匠襟。

之所以向上轉(zhuǎn)型為RunnableScheduledFuture接口钝侠,設(shè)計(jì)者也是希望將具體與接口分離。

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            //情況一
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                //情況二
                task.cancel(false);
            else
                //情況三
                ensurePrestart();
        }
    }

    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }

    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

delayedExecute方法負(fù)責(zé)執(zhí)行延時(shí)任務(wù)酸舍。
情況一 : 先判斷線程池是否關(guān)閉帅韧,若關(guān)閉則拒絕任務(wù)。
情況二:線程池未關(guān)閉啃勉,將任務(wù)添加到父類的任務(wù)隊(duì)列忽舟,即DelayedWorkQueue中。下面再次判斷線程池是否關(guān)閉淮阐,并且判斷canRunInCurrentRunState方法的返回值是否為false叮阅。因?yàn)閭魅隦unnable參數(shù),task.isPeriodic()為false泣特,所以isRunningOrShutdown返回true浩姥。所以這里不會(huì)執(zhí)行到。
情況三:任務(wù)成功添加到任務(wù)隊(duì)列群扶,執(zhí)行ensurePrestart方法及刻。

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

addWorker已經(jīng)在ThreadPoolExecutor篇分析過(guò),該方法負(fù)責(zé)同步將線程池?cái)?shù)量+1竞阐,并且創(chuàng)建Worker對(duì)象添加到HashSet中,最后開(kāi)啟Worker對(duì)象中的線程暑劝。因?yàn)镽unnableScheduledFuture對(duì)象已經(jīng)被添加到任務(wù)隊(duì)列骆莹,Worker中的線程通過(guò)getTask方法自然會(huì)取到DelayedWorkQueue中的RunnableScheduledFuture任務(wù)并執(zhí)行它的run方法。

這里需要注意的是addWorker方法只在核心線程數(shù)未達(dá)上限或者沒(méi)有線程的情況下執(zhí)行,并不像ThreadPoolExecutor那樣可以同時(shí)存在多個(gè)非核心線程担猛,ScheduledThreadPoolExecutor最多只支持一個(gè)非核心線程幕垦,除非它終止了不會(huì)再創(chuàng)建新的非核心線程丢氢。

schedule(Callable<V> callable, long delay, TimeUnit unit)

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

     ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

與schedule(Runnable command,long delay,TimeUnit unit)相比除了可以通過(guò)ScheduledFutureTask的get方法得到返回值外沒(méi)有區(qū)別。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

與上述兩個(gè)方法的區(qū)別在于ScheduledFutureTask的構(gòu)造函數(shù)多了參數(shù)period,即任務(wù)執(zhí)行的最小周期:

        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

與scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)的區(qū)別是參數(shù)delay傳入到ScheduledFutureTask的構(gòu)造方法中是以負(fù)數(shù)的形式先改。

小結(jié)

四種延時(shí)啟動(dòng)任務(wù)的方法除了構(gòu)造ScheduledFutureTask的參數(shù)不同外疚察,運(yùn)行機(jī)制是相同的。先將任務(wù)添加到DelayedWorkQueue 中仇奶,然后創(chuàng)建Worker對(duì)象貌嫡,啟動(dòng)內(nèi)部線程輪詢DelayedWorkQueue 中的任務(wù)。

那么DelayedWorkQueue的add方法是如何實(shí)現(xiàn)的该溯,線程輪詢DelayedWorkQueue 調(diào)用的poll和take方法又如何實(shí)現(xiàn)岛抄?

回顧getTask方法獲取任務(wù)時(shí)的代碼片段:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

如果我們?cè)O(shè)置ScheduledThreadPoolExecutor的核心線程數(shù)量為0,則執(zhí)行poll方法狈茉。而對(duì)于核心線程則執(zhí)行take方法夫椭。

下面分析DelayedWorkQueue 的具體實(shí)現(xiàn)。

DelayedWorkQueue

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        private static final int INITIAL_CAPACITY = 16;
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
}

首先DelayedWorkQueue 是ScheduledThreadPoolExecutor的靜態(tài)內(nèi)部類氯庆。它的內(nèi)部有一個(gè)RunnableScheduledFuture數(shù)組蹭秋,且初始容量為16.這里提前說(shuō)明下,queue 數(shù)組儲(chǔ)存的其實(shí)是二叉樹(shù)結(jié)構(gòu)的索引堤撵,這個(gè)二叉樹(shù)其實(shí)就是最小堆仁讨。

add方法

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

        private void grow() {
            int oldCapacity = queue.length;
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
            queue = Arrays.copyOf(queue, newCapacity);
        }

        private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

在執(zhí)行add方法時(shí)內(nèi)部執(zhí)行的是offer方法,添加RunnableScheduledFuture任務(wù)到隊(duì)列時(shí)先通過(guò)內(nèi)部的ReentrantLock加鎖粒督,因此在多線程調(diào)用schedule(Runnable command,long delay, TimeUnit unit)添加任務(wù)時(shí)也能保證同步陪竿。

接下來(lái)先判斷隊(duì)列是否已滿,若已滿就先通過(guò)grow方法擴(kuò)容屠橄。擴(kuò)容算法是將現(xiàn)有容量*1.5族跛,然后將舊的數(shù)組復(fù)制到新的數(shù)組。(左移一位等于除以2)锐墙。

然后判斷插入的是否為第一個(gè)任務(wù)礁哄,如果是就將RunnableScheduledFuture向下轉(zhuǎn)型為ScheduledFutureTask,并將其heapIndex 屬性設(shè)置為0.

如果不是第一個(gè)任務(wù)溪北,則執(zhí)行siftUp方法桐绒。該方法先找到父親RunnableScheduledFuture對(duì)象節(jié)點(diǎn),將要插入的RunnableScheduledFuture節(jié)點(diǎn)與之compareTo比較之拨,若父親RunnableScheduledFuture對(duì)象的啟動(dòng)時(shí)間小于當(dāng)前要插入的節(jié)點(diǎn)的啟動(dòng)時(shí)間茉继,則將節(jié)點(diǎn)插入到末尾。反之會(huì)對(duì)二叉樹(shù)以啟動(dòng)時(shí)間升序重新排序RunnableScheduledFuture接口的實(shí)現(xiàn)其實(shí)是ScheduledFutureTask類:

    new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit);

    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    final long now() {
        return System.nanoTime();
    }

第三個(gè)參數(shù)triggerTime方法返回的就是任務(wù)延時(shí)的時(shí)間加上當(dāng)前時(shí)間蚀乔。

在ScheduledFutureTask內(nèi)部實(shí)現(xiàn)了compareTo方法:

public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

比較的兩個(gè)任務(wù)的啟動(dòng)時(shí)間烁竭。所以DelayedWorkQueue內(nèi)部的二叉樹(shù)是以啟動(dòng)時(shí)間早晚排序的。

poll方法

        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null) {
                        //情況一 空隊(duì)列
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //情況二 已到啟動(dòng)時(shí)間  
                            return finishPoll(first);
                        if (nanos <= 0)
                            //情況三 未到啟動(dòng)時(shí)間吉挣,但是線程等待超時(shí)
                            return null;
                        first = null; // don't retain ref while waiting
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

非核心線程會(huì)通過(guò)poll方法同步獲取任務(wù)隊(duì)列中的RunnableScheduledFuture派撕,如果隊(duì)列為空或者在timeout內(nèi)還等不到任務(wù)的啟動(dòng)時(shí)間婉弹,都將返回null。如果任務(wù)隊(duì)列不為空终吼,并且首個(gè)任務(wù)已到啟動(dòng)時(shí)間線程就能夠獲取RunnableScheduledFuture任務(wù)并執(zhí)行run方法镀赌。

take方法

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

與非核心線程執(zhí)行的poll方法相比,核心線程執(zhí)行的take方法并不會(huì)超時(shí)际跪,在獲取到首個(gè)將要啟動(dòng)的任務(wù)前商佛,核心線程會(huì)一直阻塞。

finishPoll方法

        private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

在成功獲取任務(wù)后垫卤,DelayedWorkQueue的finishPoll方法會(huì)將任務(wù)移除隊(duì)列威彰,并以啟動(dòng)時(shí)間升序重排二叉樹(shù)。

小結(jié)

DelayedWorkQueue內(nèi)部維持了一個(gè)以任務(wù)啟動(dòng)時(shí)間升序排序的二叉樹(shù)數(shù)組穴肘,啟動(dòng)時(shí)間最靠前的任務(wù)即數(shù)組的首個(gè)位置上的任務(wù)歇盼。核心線程通過(guò)take方法一直阻塞直到獲取首個(gè)要啟動(dòng)的任務(wù)。非核心線程通過(guò)poll方法會(huì)在timeout時(shí)間內(nèi)阻塞嘗試獲取首個(gè)要啟動(dòng)的任務(wù)评抚,如果超過(guò)timeout未得到任務(wù)不會(huì)繼續(xù)阻塞豹缀。

這里要特別說(shuō)明要啟動(dòng)的任務(wù)指的是RunnableScheduledFuture內(nèi)部的time減去當(dāng)前時(shí)間小于等于0,未滿足條件的任務(wù)不會(huì)被take或poll方法返回慨代,這也就保證了未到指定時(shí)間任務(wù)不會(huì)執(zhí)行邢笙。

執(zhí)行ScheduledFutureTask

前面已經(jīng)分析了schedule方法如何將RunnableScheduledFuture插入到DelayedWorkQueue,Worker內(nèi)的線程如何獲取定時(shí)任務(wù)。下面分析任務(wù)的執(zhí)行過(guò)程侍匙,即ScheduledFutureTask的run方法:

        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

如果執(zhí)行的是非周期型任務(wù)氮惯,調(diào)用ScheduledFutureTask.super.run()方法,即ScheduledFutureTask的父類FutureTask的run方法想暗。FutureTask的run方法已經(jīng)在ThreadPoolExecutor篇分析過(guò)妇汗,這里不再多說(shuō)。

如果執(zhí)行的是周期型任務(wù)说莫,則執(zhí)行ScheduledFutureTask.super.runAndReset():

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

這個(gè)方法同run方法比較的區(qū)別是call方法執(zhí)行后不設(shè)置結(jié)果杨箭,因?yàn)橹芷谛腿蝿?wù)會(huì)多次執(zhí)行,所以為了讓FutureTask支持這個(gè)特性除了發(fā)生異常不設(shè)置結(jié)果储狭。

執(zhí)行完任務(wù)后通過(guò)setNextRunTime方法計(jì)算下一次啟動(dòng)時(shí)間:

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                //情況一
                time += p;
            else
                //情況二
                time = triggerTime(-p);
        }

        long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }

還記得ScheduledThreadPoolExecutor執(zhí)行定時(shí)任務(wù)的后兩種scheduleAtFixedRate和scheduleWithFixedDelay互婿。
scheduleAtFixedRate會(huì)執(zhí)行到情況一,下一次任務(wù)的啟動(dòng)時(shí)間最早為上一次任務(wù)的啟動(dòng)時(shí)間加period辽狈。
scheduleWithFixedDelay會(huì)執(zhí)行到情況二慈参,這里很巧妙的將period參數(shù)設(shè)置為負(fù)數(shù)到達(dá)這段代碼塊,在此又將負(fù)的period轉(zhuǎn)為正數(shù)刮萌。情況二將下一次任務(wù)的啟動(dòng)時(shí)間設(shè)置為當(dāng)前時(shí)間加period懂牧。

然后將任務(wù)再次添加到任務(wù)隊(duì)列:

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

ScheduledFuture的get方法

既然ScheduledFuture的實(shí)現(xiàn)是ScheduledFutureTask,而ScheduledFutureTask繼承自FutureTask尊勿,所以ScheduledFuture的get方法的實(shí)現(xiàn)就是FutureTask的get方法的實(shí)現(xiàn)僧凤,F(xiàn)utureTask的get方法的實(shí)現(xiàn)分析在ThreadPoolExecutor篇已經(jīng)寫(xiě)過(guò),這里不再敘述元扔。要注意的是ScheduledFuture的get方法對(duì)于非周期任務(wù)才是有效的躯保。

ScheduledThreadPoolExecutor總結(jié)

  • ScheduledThreadPoolExecutor是實(shí)現(xiàn)自ThreadPoolExecutor的線程池,構(gòu)造方法中傳入?yún)?shù)n澎语,則最多會(huì)有n個(gè)核心線程工作途事,空閑的核心線程不會(huì)被自動(dòng)終止,而是一直阻塞在DelayedWorkQueue的take方法嘗試獲取任務(wù)。構(gòu)造方法傳入的參數(shù)為0擅羞,ScheduledThreadPoolExecutor將以非核心線程工作尸变,并且最多只會(huì)創(chuàng)建一個(gè)非核心線程,參考上文中ensurePrestart方法的執(zhí)行過(guò)程减俏。而這個(gè)非核心線程以poll方法獲取定時(shí)任務(wù)之所以不會(huì)因?yàn)槌瑫r(shí)就被回收召烂,是因?yàn)槿蝿?wù)隊(duì)列并不為空,只有在任務(wù)隊(duì)列為空時(shí)才會(huì)將空閑線程回收娃承,詳見(jiàn)ThreadPoolExecutor篇的runWorker方法,之前我以為空閑的非核心線程超時(shí)就會(huì)被回收是不正確的,還要具備任務(wù)隊(duì)列為空這個(gè)條件奏夫。
  • ScheduledThreadPoolExecutor的定時(shí)執(zhí)行任務(wù)依賴于DelayedWorkQueue,其內(nèi)部用可擴(kuò)容的數(shù)組實(shí)現(xiàn)以啟動(dòng)時(shí)間升序的二叉樹(shù)历筝。
  • 工作線程嘗試獲取DelayedWorkQueue的任務(wù)只有在任務(wù)到達(dá)指定時(shí)間才會(huì)成功酗昼,否則非核心線程會(huì)超時(shí)返回null,核心線程一直阻塞梳猪。
  • 對(duì)于非周期型任務(wù)只會(huì)執(zhí)行一次并且可以通過(guò)ScheduledFuture的get方法阻塞得到結(jié)果麻削,其內(nèi)部實(shí)現(xiàn)依賴于FutureTask的get方法。
  • 周期型任務(wù)通過(guò)get方法無(wú)法獲取有效結(jié)果春弥,因?yàn)镕utureTask對(duì)于周期型任務(wù)執(zhí)行的是runAndReset方法呛哟,并不會(huì)設(shè)置結(jié)果。周期型任務(wù)執(zhí)行完畢后會(huì)重新計(jì)算下一次啟動(dòng)時(shí)間并且再次添加到DelayedWorkQueue中惕稻。

在源代碼的分析過(guò)程中發(fā)現(xiàn)分析DelayedWorkQueue還需要有二叉樹(shù)的升序插入算法的知識(shí)竖共,一開(kāi)始也沒(méi)有認(rèn)出來(lái)這種數(shù)據(jù)結(jié)構(gòu),后來(lái)又看了別人的文章才了解俺祠。這里比較難理解公给,有興趣的同學(xué)可以參考《深度解析Java8 – ScheduledThreadPoolExecutor源碼解析》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蜘渣,一起剝皮案震驚了整個(gè)濱河市淌铐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蔫缸,老刑警劉巖腿准,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡吐葱,警方通過(guò)查閱死者的電腦和手機(jī)街望,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)弟跑,“玉大人灾前,你說(shuō)我怎么就攤上這事∶霞” “怎么了哎甲?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)饲嗽。 經(jīng)常有香客問(wèn)我炭玫,道長(zhǎng),這世上最難降的妖魔是什么貌虾? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任吞加,我火速辦了婚禮,結(jié)果婚禮上酝惧,老公的妹妹穿的比我還像新娘榴鼎。我一直安慰自己,他們只是感情好晚唇,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布巫财。 她就那樣靜靜地躺著,像睡著了一般哩陕。 火紅的嫁衣襯著肌膚如雪平项。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天悍及,我揣著相機(jī)與錄音闽瓢,去河邊找鬼。 笑死心赶,一個(gè)胖子當(dāng)著我的面吹牛扣讼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播缨叫,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼椭符,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了耻姥?” 一聲冷哼從身側(cè)響起销钝,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎琐簇,沒(méi)想到半個(gè)月后蒸健,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年似忧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了渣叛。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡橡娄,死狀恐怖诗箍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情挽唉,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布筷狼,位于F島的核電站瓶籽,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏埂材。R本人自食惡果不足惜塑顺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望俏险。 院中可真熱鬧严拒,春花似錦、人聲如沸竖独。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)莹痢。三九已至种蘸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間竞膳,已是汗流浹背航瞭。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留坦辟,地道東北人刊侯。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像锉走,于是被迫代替她去往敵國(guó)和親滨彻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 前言 使用線程池能夠提高線程的復(fù)用率挠日,避免不必要的創(chuàng)建線程疮绷,能夠節(jié)約內(nèi)存空間和CPU運(yùn)行時(shí)間。除此之外用線程池作為...
    Mars_M閱讀 2,846評(píng)論 0 11
  • 博客鏈接:http://www.ideabuffer.cn/2017/04/14/深入理解Java線程池:Sche...
    閃電是只貓閱讀 63,819評(píng)論 17 95
  • 前言:線程是稀缺資源嚣潜,如果被無(wú)限制的創(chuàng)建冬骚,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,合理的使用線程池對(duì)線程進(jìn)行統(tǒng)一...
    SDY_0656閱讀 716評(píng)論 0 1
  • 晨讀材料 今天的晨讀材料可以概括為兩句話只冻,明確目標(biāo)庇麦、過(guò)程管控笼裳。 明確目標(biāo)很重要巡社,不管是一個(gè)公司、一個(gè)團(tuán)隊(duì)或者一個(gè)個(gè)...
    做一個(gè)更好的普通人閱讀 280評(píng)論 0 0
  • 元宵佳節(jié)听系,本是冷門(mén)的廣場(chǎng)因?yàn)闊魰?huì)的到來(lái)舍悯,立刻變得熱鬧非凡起來(lái)航棱,五彩繽紛的燈在廣場(chǎng)中央,引游人駐目萌衬。
    起始之點(diǎn)閱讀 242評(píng)論 0 0