深入理解Java線程池

深入理解Java線程池

線程池初探

所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術(shù))束析,然后需要線程的時候不是創(chuàng)建一個線程帘皿,而是從線程池里面獲取一個可用的線程,然后執(zhí)行我們的任務(wù)畸陡。線程池的關(guān)鍵在于它為我們管理了多個線程鹰溜,我們不需要關(guān)心如何創(chuàng)建線程,我們只需要關(guān)系我們的核心業(yè)務(wù)丁恭,然后需要線程來執(zhí)行任務(wù)的時候從線程池中獲取線程曹动。任務(wù)執(zhí)行完之后線程不會被銷毀,而是會被重新放到池子里面牲览,等待機會去執(zhí)行任務(wù)墓陈。

我們?yōu)槭裁葱枰€程池呢?首先一點是線程池為我們提高了一種簡易的多線程編程方案,我們不需要投入太多的精力去管理多個線程贡必,線程池會自動幫我們管理好兔港,它知道什么時候該做什么事情,我們只要在需要的時候去獲取就可以了仔拟。其次衫樊,我們使用線程池很大程度上歸咎于創(chuàng)建和銷毀線程的代價是非常昂貴的,甚至我們創(chuàng)建和銷毀線程的資源要比我們實際執(zhí)行的任務(wù)所花費的時間還要長利花,這顯然是不科學也是不合理的科侈,而且如果沒有一個合理的管理者,可能會出現(xiàn)創(chuàng)建了過多的線程的情況炒事,也就是在JVM中存活的線程過多臀栈,而存活著的線程也是需要銷毀資源的,另外一點挠乳,過多的線程可能會造成線程過度切換的尷尬境地权薯。

對線程池有了一個初步的認識之后,我們來看看如何使用線程池睡扬。

        // 創(chuàng)建一個線程池
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        // 提交任務(wù)
        executorService.submit(() -> System.out.println("run"));
        Future<String> stringFuture = executorService.submit(() -> "run");

        // 創(chuàng)建一個調(diào)度線程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // 提交一個周期性執(zhí)行的任務(wù)
        scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("schedule"),0,1, TimeUnit.SECONDS);

        // shutdown
        executorService.shutdown();
        scheduledExecutorService.shutdown();

可以發(fā)現(xiàn)使用線程池非常簡單崭闲,只需要極少的代碼就可以創(chuàng)建出我們需要的線程池,然后將我們的任務(wù)提交到線程池中去威蕉。我們只需要在結(jié)束之時記得關(guān)閉線程池就可以了刁俭。本文的重點并非在于如何使用線程池,而是試圖剖析線程池的實現(xiàn)韧涨,比如一個調(diào)度線程池是怎么實現(xiàn)的牍戚?是靠什么實現(xiàn)的?為什么能這樣實現(xiàn)等等問題虑粥。

Java線程池實現(xiàn)架構(gòu)

Java中與線程池相關(guān)的類都在java.util.concurrent包下如孝,如下展示了一些:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • Executors

通過上面一節(jié)中的使用示例,可以發(fā)現(xiàn)Executors類是一個創(chuàng)建線程池的有用的類娩贷,事實上第晰,Executors類的角色也就是創(chuàng)建線程池,它是一個工廠類彬祖,可以產(chǎn)生不同類型的線程池茁瘦。而Executor是線程池的鼻祖類,它有兩個子類是ExecutorServiceScheduledExecutorService储笑,而ThreadPoolExecutorScheduledThreadPoolExecutor則是真正的線程池甜熔,我們的任務(wù)將被這兩個類交由其所管理者的線程池運行,可以發(fā)現(xiàn)突倍,ScheduledThreadPoolExecutor是一個萬千寵愛于一身的類腔稀,下面我們可以看看它的類關(guān)系圖:

ScheduledThreadPoolExecutor類圖

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor盆昙,ThreadPoolExecutor實現(xiàn)了一般的線程池,沒有調(diào)度功能焊虏,而ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor的實現(xiàn)淡喜,然后增加了調(diào)度功能沉颂。

最為原始的Executor只有一個方法execute鸟整,它接受一個Runnable類型的參數(shù),意思是使用線程池來執(zhí)行這個Runnable宜猜,可以發(fā)現(xiàn)Executor不提供有返回值的任務(wù)涂圆。ExecutorService繼承了Executor们镜,并且極大的增強了Executor的功能币叹,不僅支持有返回值的任務(wù)執(zhí)行润歉,而且還有很多十分有用的方法來為你提供服務(wù),下面展示了ExecutorService提供的方法:

@ExecutorService提供的方法|center

ScheduledExecutorService繼承了ExecutorService颈抚,并且增加了特有的調(diào)度(schedule)功能踩衩。關(guān)于Executor、ExecutorService和ScheduledExecutorService的關(guān)系贩汉,可以見下圖:

Executor驱富、ExecutorService和ScheduledExecutorService的關(guān)系|center

總結(jié)一下,經(jīng)過我們的調(diào)研匹舞,可以發(fā)現(xiàn)其實對于我們編寫多線程代碼來說褐鸥,最為核心的是Executors類,根據(jù)我們是需要ExecutorService類型的線程池還是ScheduledExecutorService類型的線程池調(diào)用相應(yīng)的工廠方法就可以了赐稽,而ExecutorService的實現(xiàn)表現(xiàn)在ThreadPoolExecutor上叫榕,ScheduledExecutorService的實現(xiàn)則表現(xiàn)在ScheduledThreadPoolExecutor上,下文將分別剖析這兩者姊舵,嘗試弄清楚線程池的原理晰绎。

ThreadPoolExecutor解析

上文中描述了Java中線程池相關(guān)的架構(gòu),了解了這些內(nèi)容其實我們就可以使用java的線程池為我們工作了括丁,使用其提供的線程池我們可以很方便的寫出高質(zhì)量的多線程代碼荞下,本節(jié)將分析ThreadPoolExecutor的實現(xiàn),來探索線程池的運行原理史飞。下面的圖片展示了ThreadPoolExecutor的類圖:

@ThreadPoolExecutor的類圖|center

private final BlockingQueue<Runnable> workQueue;  // 任務(wù)隊列尖昏,我們的任務(wù)會添加到該隊列里面,線程將從該隊列獲取任務(wù)來執(zhí)行
 
private final HashSet<Worker> workers = new HashSet<Worker>();//所有工作線程的集合构资,來消費workQueue里面的任務(wù)
  
private volatile ThreadFactory threadFactory;//線程工廠
      
private volatile RejectedExecutionHandler handler;//拒絕策略会宪,默認會拋出異常,還要其他幾種拒絕策略如下:
1蚯窥、CallerRunsPolicy:在調(diào)用者線程里面運行該任務(wù)
2掸鹅、DiscardPolicy:丟棄任務(wù)
3塞帐、DiscardOldestPolicy:丟棄workQueue的頭部任務(wù)
private volatile int corePoolSize;//最下保活work數(shù)量
 
private volatile int maximumPoolSize;//work上限

我們嘗試執(zhí)行submit方法巍沙,下面是執(zhí)行的關(guān)鍵路徑葵姥,總結(jié)起來就是:如果Worker數(shù)量還沒達到上限則繼續(xù)創(chuàng)建,否則提交任務(wù)到workQueue句携,然后讓worker來調(diào)度運行任務(wù)榔幸。
<a name="anchor"></a>

step 1: <ExecutorService>
Future<?> submit(Runnable task); 
 
step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
 
step 3:<Executor>
void execute(Runnable command);

step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我們的任務(wù)到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作為邊界
        reject(command); //還不行?拒絕提交的任務(wù)
}
step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core)
 
 
step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包裝任務(wù)
final Thread t = w.thread; //獲取線程(包含任務(wù))
workers.add(w);   // 任務(wù)被放到works中
t.start(); //執(zhí)行任務(wù)

上面的流程是高度概括的矮嫉,實際情況遠比這復(fù)雜得多削咆,但是我們關(guān)心的是怎么打通整個流程,所以這樣分析問題是沒有太大的問題的蠢笋。觀察上面的流程拨齐,我們發(fā)現(xiàn)其實關(guān)鍵的地方在于Worker,如果弄明白它是如何工作的昨寞,那么我們也就大概明白了線程池是怎么工作的了瞻惋。下面分析一下Worker類。

@Worker類圖|center

上面的圖片展示了Worker的類關(guān)系圖援岩,關(guān)鍵在于他實現(xiàn)了Runnable接口歼狼,所以問題的關(guān)鍵就在于run方法上。在這之前享怀,我們來看一下Worker類里面的關(guān)鍵成員:

/** Thread this worker is running in.  Null if factory fails. */
final Thread thread;
/** Initial task to run.  Possibly null. */
Runnable firstTask; // 我們提交的任務(wù)羽峰,可能被立刻執(zhí)行,也可能被放到隊列里面

thread是Worker的工作線程添瓷,上面的分析我們也發(fā)現(xiàn)了在addWorker中會獲取worker里面的thread然后start梅屉,也就是這個線程的執(zhí)行,而Worker實現(xiàn)了Runnable接口仰坦,所以在構(gòu)造thread的時候Worker將自己傳遞給了構(gòu)造函數(shù)履植,thread.start執(zhí)行的其實就是Worker的run方法。下面是run方法的內(nèi)容:

/** Delegates main run loop to outer runWorker  */
public void run() {
     runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
}

我們來分析一下runWorker這個方法悄晃,這就是整個線程池的核心玫霎。首先獲取到了我們剛提交的任務(wù)firstTask,然后會循環(huán)從workQueue里面獲取任務(wù)來執(zhí)行妈橄,獲取任務(wù)的方法如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
    }
}

其實核心也就一句:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

我們再回頭看一下execute庶近,其實我們上面只走了一條邏輯,在execute的時候眷蚓,我們的worker的數(shù)量還沒有到達我們設(shè)定的corePoolSize的時候鼻种,會走上面我們分析的邏輯,而如果達到了我們設(shè)定的閾值之后沙热,execute中會嘗試去提交任務(wù)叉钥,如果提交成功了就結(jié)束罢缸,否則會拒絕任務(wù)的提交。我們上面還提到一個成員:maximumPoolSize投队,其實線程池的最大的Worker數(shù)量應(yīng)該是maximumPoolSize枫疆,但是我們上面的分析是corePoolSize,這是因為我們的private boolean addWorker(Runnable firstTask, boolean core)的參數(shù)core的值來控制的敷鸦,core為true則使用corePoolSize來設(shè)定邊界息楔,否則使用maximumPoolSize來設(shè)定邊界。直觀的解釋一下扒披,當線程池里面的Worker數(shù)量還沒有到corePoolSize值依,那么新添加的任務(wù)會伴隨著產(chǎn)生一個新的worker,如果Worker的數(shù)量達到了corePoolSize碟案,那么就將任務(wù)存放在阻塞隊列中等待Worker來獲取執(zhí)行愿险,如果沒有辦法再向阻塞隊列放任務(wù)了,那么這個時候maximumPoolSize就變得有用了蟆淀,新的任務(wù)將會伴隨著產(chǎn)生一個新的Worker拯啦,如果線程池里面的Worker已經(jīng)達到了maximumPoolSize澡匪,那么接下來提交的任務(wù)只能被拒絕策略拒絕了熔任。可以參考下面的描述來理解:

 * When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle.  If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full.  By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}.

在此需要說明一點唁情,有一個重要的成員:keepAliveTime疑苔,當線程池里面的線程數(shù)量超過corePoolSize了,那么超出的線程將會在空閑keepAliveTime之后被terminated甸鸟〉敕眩可以參考下面的文檔:

* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).

ScheduledThreadPoolExecutor解析

ScheduledThreadPoolExecutor適用于延時執(zhí)行,或者周期性執(zhí)行的任務(wù)調(diào)度抢韭,ScheduledThreadPoolExecutor在實現(xiàn)上繼承了ThreadPoolExecutor薪贫,所以你依然可以將ScheduledThreadPoolExecutor當成ThreadPoolExecutor來使用,但是ScheduledThreadPoolExecutor的功能要強大得多刻恭,因為ScheduledThreadPoolExecutor可以根據(jù)設(shè)定的參數(shù)來周期性調(diào)度運行瞧省,下面的圖片展示了四個和周期性相關(guān)的方法:

@四個Scheduled方法|center

  • 如果你想延時一段時間然后運行一個Callable,那么使用的第一個方法
  • 如果你想延時一段時間之后運行一個Runnable鳍贾,那么使用第二個方法鞍匾;
  • 如果你想要延時一段時間,然后根據(jù)設(shè)定的參數(shù)周期執(zhí)行Runnable骑科,那么可以選擇第三個和第四個方法橡淑,第三個方法和第四個方法的區(qū)別在于:第三個方法嚴格按照規(guī)劃的時間路徑來執(zhí)行,比如周期為2咆爽,延時為0梁棠,那么執(zhí)行的序列為0置森,2,4符糊,6暇藏,8....,而第四個方法將基于上次執(zhí)行時間來規(guī)劃下次的執(zhí)行濒蒋,也就是在上次執(zhí)行完成之后再次執(zhí)行盐碱。比如上面的執(zhí)行序列0,2沪伙,4瓮顽,6,8...围橡,如果第2秒沒有被調(diào)度執(zhí)行暖混,而在第三秒的時候才被調(diào)度,那么下次執(zhí)行的時間不是4翁授,而是5拣播,以此類推。

下面來看一下這四個方法的一些細節(jié):

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}


public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
}


public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
}

通過上面的代碼我們可以發(fā)現(xiàn)收擦,前兩個方法是類似的贮配,后兩個方法也是類似的。前兩個方法屬于一次性調(diào)度塞赂,所以period都為0泪勒,區(qū)別在于參數(shù)不同,一個是Runnable宴猾,而一個是Callable圆存,可笑的是,最后都變?yōu)榱薈allable了仇哆,見下面的構(gòu)造函數(shù):

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

對于后兩個方法沦辙,區(qū)別僅僅在于period的,scheduleWithFixedDelay對參數(shù)進行了操作讹剔,將原來的時間變?yōu)樨摂?shù)了油讯,而后面在計算下次被調(diào)度的時間的時候會根據(jù)這個參數(shù)的正負值來分別處理,正數(shù)代表scheduleAtFixedRate辟拷,而負數(shù)代表了scheduleWithFixedDelay撞羽。

一個需要被我們注意的細節(jié)是,以上四個方法最后都會調(diào)用一個方法: delayedExecute(t)衫冻,下面看一下這個方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
}

大概的意思就是先判斷線程池是否被關(guān)閉了诀紊,如果被關(guān)閉了,則拒絕任務(wù)的提交隅俘,否則將任務(wù)加入到任務(wù)隊列中去等待被調(diào)度執(zhí)行邻奠。最后的ensurePrestart的意思是需要確保線程池已經(jīng)被啟動起來了笤喳。下面是這個方法:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

主要是增加了一個沒有任務(wù)的worker,有什么用呢碌宴?我們還記得Worker的邏輯嗎杀狡?addWorker方法的執(zhí)行,會觸發(fā)Worker的run方法的執(zhí)行贰镣,然后runWorker方法就會被執(zhí)行呜象,而runWorker方法是循環(huán)從workQueue中取任務(wù)執(zhí)行的,所以確保線程池被啟動起來是重要的碑隆,而只需要簡單的執(zhí)行addWorker便會觸發(fā)線程池的啟動流程恭陡。對于調(diào)度線程池來說,只要執(zhí)行了addWorker方法上煤,那么線程池就會一直在后臺周期性的調(diào)度執(zhí)行任務(wù)休玩。

到此,似乎我們還是沒有鬧明白ScheduledThreadPoolExecutor是如何實現(xiàn)周期性的劫狠,上面講到四個scheduled方法時拴疤,我們沒有提一個重要的類:ScheduledFutureTask,對独泞,所有神奇的事情將會發(fā)生在這個類中呐矾,下面來分析一下這個類。


@ScheduledFutureTask類圖|center

看上面的類圖阐肤,貌似這個類非常復(fù)雜凫佛,還好讲坎,我們發(fā)現(xiàn)他實現(xiàn)了Runnable接口孕惜,那么必然會有一個run方法,而這個run方法必然是整個類的核心晨炕,下面來看一下這個run方法的內(nèi)容:

       public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

首先衫画,判斷是否是周期性的任務(wù),如果不是瓮栗,則直接執(zhí)行(一次性)削罩,否則執(zhí)行后,然后設(shè)置下次執(zhí)行的時間费奸,然后重新調(diào)度弥激,等待下次執(zhí)行。這里有一個方法需要注意愿阐,也就是setNextRunTime微服,上面我們提到scheduleAtFixedRate和scheduleWithFixedDelay在傳遞參數(shù)時不一樣,后者將delay值變?yōu)榱素摂?shù)缨历,所以下面的處理正好印證了前文所述以蕴。

        /**
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

下面來看一下reExecutePeriodic方法是如何做的糙麦,他的目標是將任務(wù)再次被調(diào)度執(zhí)行,下面的代碼展示了這個功能的實現(xiàn):

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

可以看到丛肮,這個方法就是將我們的任務(wù)再次放到了workQueue里面赡磅,那這個參數(shù)是什么?在上面的run方法中我們調(diào)用了reExecutePeriodic方法宝与,參數(shù)為outerTask焚廊,而這個變量是什么?看下面的代碼:

/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;

這個變量指向了自己习劫,而this的類型是什么节值?是ScheduledFutureTask,也就是可以被調(diào)度的task榜聂,這樣就實現(xiàn)了循環(huán)執(zhí)行任務(wù)了搞疗。

上面的分析已經(jīng)到了循環(huán)執(zhí)行,但是ScheduledThreadPoolExecutor的功能是周期性執(zhí)行须肆,所以我們接著分析ScheduledThreadPoolExecutor是如何根據(jù)我們的參數(shù)走走停停的匿乃。這個時候,是應(yīng)該看一下ScheduledThreadPoolExecutor的構(gòu)造函數(shù)了豌汇,我們來看一個最簡單的構(gòu)造函數(shù):

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

我們知道ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor幢炸,所以這里的super其實是ThreadPoolExecutor的構(gòu)造函數(shù),我們發(fā)現(xiàn)其中有一個參數(shù)DelayedWorkQueue拒贱,看名字貌似是一個延遲隊列的樣子宛徊,進一步跟蹤代碼,發(fā)現(xiàn)了下面的一行代碼(構(gòu)造函數(shù)中):
this.workQueue = workQueue;

所以在ScheduledThreadPoolExecutor中逻澳,workQueue是一個DelayedWorkQueue類型的隊列闸天,我們暫且認為DelayedWorkQueue是一種具備延遲功能的隊列吧,那么斜做,到此我們便可以想明白了苞氮,上面的分析我們明白了ScheduledThreadPoolExecutor是如何循環(huán)執(zhí)行任務(wù)的,而這里我們明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue來達到延遲的目標瓤逼,所以組合起來笼吟,就可以實現(xiàn)ScheduledThreadPoolExecutor周期性執(zhí)行的目標。下面我們來看一下DelayedWorkQueue是如何做到延遲的吧霸旗,上文中提到一個方法:getTask贷帮,這個方法的作用是從workQueue中取出任務(wù)來執(zhí)行,而在ScheduledThreadPoolExecutor里面诱告,getTask方法是從DelayedWorkQueue中取任務(wù)的撵枢,而取任務(wù)無非兩個方法:poll或者take,下面我們對DelayedWorkQueue的take方法來分析一下:

      public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

在for循環(huán)里面,首先從queue中獲取第一個任務(wù)诲侮,然后從任務(wù)中取出延遲時間镀虐,而后使用available變量來實現(xiàn)延遲效果。這里面需要幾個點需要探索一下:

  • 這個queue是什么東西沟绪?
  • 延遲時間的來龍去脈刮便?
  • available變量的來龍去脈?

對于第一個問題绽慈,看下面的代碼:
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

它是一個RunnableScheduledFuture類型的數(shù)組恨旱,下面是RunnableScheduledFuture類的類關(guān)系圖:


@RunnableScheduledFuture類關(guān)系|center

數(shù)組里面保存了我們的RunnableScheduledFuture,對queue的操作坝疼,主要來看一下增加元素和消費元素的操作搜贤。首先,假設(shè)使用add方法來增加RunnableScheduledFuture到queue钝凶,調(diào)用的鏈路如下:

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

解釋一下仪芒,add方法直接轉(zhuǎn)到了offer方法,該方法中耕陷,首先判斷數(shù)組的容量是否足夠掂名,如果不夠則grow,增長的策略如下:
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增長50%哟沫。增長完成后饺蔑,如果這是第一個元素,則放在坐標為0的位置嗜诀,否則猾警,使用siftUp操作,下面是該方法的內(nèi)容:

        private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

這個數(shù)組實現(xiàn)了堆這種數(shù)據(jù)結(jié)構(gòu)隆敢,使用對象比較將最需要被調(diào)度執(zhí)行的RunnableScheduledFuture放到數(shù)組的前面发皿,而這得力于compareTo方法,下面是RunnableScheduledFuture類的compareTo方法的實現(xiàn)筑公,主要是通過延遲時間來做比較雳窟。

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

上面是生產(chǎn)元素,下面來看一下消費數(shù)據(jù)匣屡。在上面我們提到的take方法中,使用了一個方法如下:

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

這個方法中調(diào)用了一個方法siftDown拇涤,這個方法如下:

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

對其的解釋就是:
Replaces first element with last and sifts it down. Call only when holding lock.

總結(jié)一下捣作,當我們向queue插入任務(wù)的時候,會發(fā)生siftUp方法的執(zhí)行鹅士,這個時候會把任務(wù)盡量往根部移動券躁,而當我們完成任務(wù)調(diào)度之后,會發(fā)生siftDown方法的執(zhí)行,與siftUp相反也拜,siftDown方法會將任務(wù)盡量移動到queue的末尾以舒。總之慢哈,大概的意思就是queue通過compareTo實現(xiàn)了類似于優(yōu)先級隊列的功能蔓钟。

下面我們來看一下第二個問題:延遲時間的來龍去脈。在上面的take方法里面卵贱,首先獲取了delay滥沫,然后再使用available來做延遲效果,那這個delay從哪里來的呢键俱?通過上面的類圖RunnableScheduledFuture的類圖我們知道兰绣,RunnableScheduledFuture類實現(xiàn)了Delayed接口,而Delayed接口里面的唯一方法是getDelay编振,我們到RunnableScheduledFuture里面看一下這個方法的具體實現(xiàn):

public long getDelay(TimeUnit unit) {
     return unit.convert(time - now(), NANOSECONDS);
 }

time是我們設(shè)定的下次執(zhí)行的時間缀辩,所以延遲就是(time - now()),沒毛沧傺搿雌澄!

第三個問題:available變量的來龍去脈,至于這個問題杯瞻,我們看下面的代碼:

/**
 * Condition signalled when a newer task becomes available at the
 * head of the queue or a new thread may need to become leader.
 */
private final Condition available = lock.newCondition();

這是一個條件變量镐牺,take方法里面使用這個變量來做延遲效果。Condition可以在多個線程間做同步協(xié)調(diào)工作魁莉,更為具體細致的關(guān)于Condition的內(nèi)容睬涧,可以參考更多的資料來學習,本文對此知識點點到為止旗唁。

到此為止畦浓,我們梳理了ScheduledThreadPoolExecutor是如何實現(xiàn)周期性調(diào)度的,首先分析了它的循環(huán)性检疫,然后分析了它的延遲效果讶请。

本文到此也就結(jié)束了,對于線程池的學習現(xiàn)在才剛剛起步屎媳,需要更多更專業(yè)的知識類幫我理解更為底層的內(nèi)容夺溢,當然,為了更進一步理解線程池的實現(xiàn)細節(jié)烛谊,首先需要對線程間通信有足夠的把握风响,其次是要對各種數(shù)據(jù)結(jié)構(gòu)有清晰的認識,比如隊列丹禀、優(yōu)先級隊列状勤、堆等高級的數(shù)據(jù)結(jié)構(gòu)鞋怀,以及java語言對于這些數(shù)據(jù)結(jié)構(gòu)的實現(xiàn),更為重要的是要結(jié)合實際情況分析問題持搜,在工作和平時的學習中不斷總結(jié)密似,不斷迭代對于線程、線程池的認知葫盼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末残腌,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子剪返,更是在濱河造成了極大的恐慌废累,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脱盲,死亡現(xiàn)場離奇詭異邑滨,居然都是意外死亡,警方通過查閱死者的電腦和手機钱反,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門掖看,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人面哥,你說我怎么就攤上這事哎壳。” “怎么了尚卫?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵归榕,是天一觀的道長。 經(jīng)常有香客問我吱涉,道長刹泄,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任怎爵,我火速辦了婚禮特石,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鳖链。我一直安慰自己姆蘸,他們只是感情好,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布芙委。 她就那樣靜靜地躺著逞敷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪题山。 梳的紋絲不亂的頭發(fā)上兰粉,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天,我揣著相機與錄音顶瞳,去河邊找鬼。 笑死,一個胖子當著我的面吹牛慨菱,可吹牛的內(nèi)容都是我干的焰络。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼符喝,長吁一口氣:“原來是場噩夢啊……” “哼闪彼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起协饲,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤畏腕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后茉稠,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體描馅,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年而线,在試婚紗的時候發(fā)現(xiàn)自己被綠了铭污。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡膀篮,死狀恐怖嘹狞,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情誓竿,我是刑警寧澤磅网,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站筷屡,受9級特大地震影響涧偷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜速蕊,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一嫂丙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧规哲,春花似錦跟啤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至袄简,卻和暖如春腥放,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背绿语。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工秃症, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留候址,地道東北人。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓种柑,卻偏偏與公主長得像岗仑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子聚请,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內(nèi)容