[第三期:JAVA并發(fā):線程池管理 ThreadPoolExecutor]

源碼不會(huì)騙你的=︱取3送埂!

一七问、背景

JAVA通過(guò)多線程的方式實(shí)現(xiàn)并發(fā)颈墅,為了方便線程池的管理蜡镶,JAVA采用線程池的方式對(duì)線線程的整個(gè)生命周期進(jìn)行管理。當(dāng)然恤筛,對(duì)簡(jiǎn)單的并發(fā)自已也可以對(duì)Thread進(jìn)行人工管理官还,但并不是此文的重點(diǎn),而且不建議方式毒坛。本文的重點(diǎn)是研究ThreadPoolExecutor管理線程池的策略望伦,讓大家對(duì)ThreadPoolExecutor的工作原理和過(guò)程有一個(gè)透徹的理解。

二煎殷、幾個(gè)關(guān)系

我們通常采用如下方法創(chuàng)建一個(gè)程池:

public class TestThread {
    @Test
    public void testCallable() throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        List<Future<Integer>> results = new ArrayList<>();
        int i = 10;
        while (i-- > 0) {
            results.add(exec.submit(new GetRand()));
        }
        for (Future<Integer> n : results) {
            System.out.println(n.get());
        }
    }
}

上便通過(guò)一個(gè)工場(chǎng)類Executors創(chuàng)建了一個(gè)工作類屯伞,工場(chǎng)類返回一個(gè)ExecutorService 對(duì)象。Executors可以返回多種類型的線程池豪直,原諒我簡(jiǎn)單啰嗦一下這幾種線程池

newCachedThreadPool() 緩存型池子劣摇,先查看池中有沒(méi)有以前建立的線程,如果有顶伞,就 reuse.如果沒(méi)有饵撑,就建一個(gè)新的線程加入池中;緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務(wù)因此在一些面向連接的daemon型SERVER中用得不多剑梳。但對(duì)于生存期短的異步任務(wù),它是Executor的首選滑潘。能reuse的線程垢乙,必須是timeout IDLE內(nèi)的池中線程,缺省 timeout是60s,超過(guò)這個(gè)IDLE時(shí)長(zhǎng)语卤,線程實(shí)例將被終止及移出池追逮。注意,放入CachedThreadPool的線程不必?fù)?dān)心其結(jié)束粹舵,超過(guò)TIMEOUT不活動(dòng)钮孵,其會(huì)自動(dòng)被終止。
newFixedThreadPool(int) newFixedThreadPool與cacheThreadPool差不多眼滤,也是能reuse就用巴席,但不能隨時(shí)建新的線程;-其獨(dú)特之處:任意時(shí)間點(diǎn),最多只能有固定數(shù)目的活動(dòng)線程存在诅需,此時(shí)如果有新的線程要建立漾唉,只能放在另外的隊(duì)列中等待,直到當(dāng)前的線程中某個(gè)線程終止直接被移出池子;-和cacheThreadPool不同堰塌,F(xiàn)ixedThreadPool沒(méi)有IDLE機(jī)制(可能也有赵刑,但既然文檔沒(méi)提,肯定非常長(zhǎng)场刑,類似依賴上層的TCP或UDP IDLE機(jī)制之類的)般此,所以FixedThreadPool多數(shù)針對(duì)一些很穩(wěn)定很固定的正規(guī)并發(fā)線程,多用于服務(wù)器;-從方法的源代碼看牵现,cache池和fixed 池調(diào)用的是同一個(gè)底層 池铐懊,只不過(guò)參數(shù)不同:fixed池線程數(shù)固定,并且是0秒IDLE(無(wú)IDLE) cache池線程數(shù)支持0-Integer.MAX_VALUE(顯然完全沒(méi)考慮主機(jī)的資源承受能力)瞎疼,60秒IDLE
newScheduledThreadPool(int) 這個(gè)池子里的線程可以按schedule依次delay執(zhí)行居扒,或周期執(zhí)行
SingleThreadExecutor() -單例線程,任意時(shí)間池中只能有一個(gè)線程;-用的是和cache池和fixed池相同的底層池丑慎,但線程數(shù)目是1-1,0秒IDLE(無(wú)IDLE)

以上幾種線程池都都反回了ExecutorService對(duì)象喜喂,也就是實(shí)際是靠ExecutorService來(lái)管理線程的整個(gè)生命周期。進(jìn)一步地竿裂,我們知道ExecutorService是一個(gè)接口玉吁,沒(méi)有具體實(shí)現(xiàn),最后的具體實(shí)現(xiàn)應(yīng)該由ThreadPoolExecutor實(shí)現(xiàn)的(當(dāng)然不包括周期線程池)腻异。別問(wèn)為什么进副,請(qǐng)自覺(jué)補(bǔ)充接口化編程;我們來(lái)看一下幾個(gè)類的關(guān)系,這里有兩條路線:

Executor 定義了一個(gè)execute接口影斑,ExecutorService繼承了Executor给赞,并定義了管理線程生命周期的接口;

  • (1)AbstractExecutorService 實(shí)現(xiàn)了ExecutorService;ThreadPoolExecutor繼承了AbstractExecutorService;這條線是我們關(guān)注的重點(diǎn).
  • (2)ScheduleExecutorService 繼承了ExecutorService矫户,并增加周期調(diào)度的接口片迅;ScheduledThreadPoolExecutor 實(shí)現(xiàn)了ScheduleExecutorService,用來(lái)管理周期線程池皆辽;(本文不介紹)

繼承關(guān)系如下圖所示:

image2017-1-13 15-44-19.png

我們?cè)賮?lái)看一下Executors工場(chǎng)類產(chǎn)生的線程池的方式如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

其最終的實(shí)現(xiàn)方式都是通過(guò)ThreadPoolExecutor進(jìn)行實(shí)現(xiàn)的柑蛇;通過(guò)上面的介紹,我們知道線程生命周期的管理驱闷,在本質(zhì)上是由ThreadPoolExecutor來(lái)實(shí)現(xiàn)的耻台,因此只需要透徹理解ThreadPoolExecutor的實(shí)現(xiàn)原理即可了解其如何管理線程。

三空另、ThreadPoolExecutor

3.1 構(gòu)造參數(shù)

先來(lái)看一下ThreadPoolExecutor的構(gòu)造函數(shù):

ublic ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
//默認(rèn)ThreadFactory,默認(rèn)handler
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  
// 默認(rèn)handler
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
  
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

可以看到ThreadPoolExecutor有四個(gè)構(gòu)造函數(shù)盆耽,只有參數(shù)有所不同,其它三個(gè)構(gòu)造函數(shù)的具體實(shí)現(xiàn)都是由第一個(gè)構(gòu)造函數(shù)來(lái)完成的扼菠。那我們只需要來(lái)研究一下第一個(gè)構(gòu)造函數(shù)就可了U髯帧!=吭ァ!
先來(lái)看一下每個(gè)參數(shù)的含義(先了解一下大概意思):

先看一下接口注釋

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */


看懂了畅厢,就沒(méi)必要看下面的中文解釋了哈:
corePoolSize:核心線程池大蟹肓 ;啥意思呢框杜?就是線程池應(yīng)該維護(hù)的最小線程的數(shù)量浦楣, 線程池?cái)?shù)量小于該值,則來(lái)一個(gè)新線程時(shí)咪辱,就會(huì)創(chuàng)建一個(gè)新線程振劳,無(wú)論線程池中有無(wú)線程空閑.
maximumPoolSize: 最大線程池大小油狂;它表示線程池中最大創(chuàng)建線程池的數(shù)量.
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止历恐。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)专筷,keepAliveTime才會(huì)起作用弱贼,直到線程池中的線程數(shù)不大于. corePoolSize,
即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)磷蛹,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime吮旅,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize味咳。但是如果調(diào)用了. allowCoreThreadTimeOut(boolean)方法庇勃,
在線程池中的線程數(shù)不大于corePoolSize時(shí)檬嘀,keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0责嚷;
unit:和keepAliveTime相當(dāng)于同一個(gè)參數(shù)鸳兽,有以下幾個(gè)單位:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時(shí)
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒


BlockingQueue<Runnable> workQueue:阻塞的任務(wù)隊(duì)例;用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)再层,這個(gè)參數(shù)的選擇也很重要贸铜,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響,一般來(lái)說(shuō)聂受,這里的阻塞隊(duì)列有以下幾種選擇:

ArrayBlockingQueue;//內(nèi)部維護(hù)一個(gè)數(shù)組蒿秦,F(xiàn)IFO策略
LinkedBlockingQueue;//隊(duì)列使用FIFO策略,內(nèi)部維護(hù)了一個(gè)單向鏈表蛋济,默認(rèn)最大容量是Integer.MAX_VALUE棍鳖,動(dòng)態(tài)生成節(jié)點(diǎn)
所以,線程池最多只有corePoolSize個(gè)thread被創(chuàng)建碗旅,其他都會(huì)在queue中被阻塞
適用場(chǎng)景渡处,確保每個(gè)請(qǐng)求都能被執(zhí)行,不被拒絕
SynchronousQueue;//相當(dāng)于隊(duì)列長(zhǎng)度為0祟辟,因此只要達(dá)到 maximumPoolSize就會(huì)拒絕新提交的任務(wù)

threadFactory:線程工廠医瘫,用來(lái)創(chuàng)建線程.
handler:當(dāng)阻塞隊(duì)列和線程池都滿了后,拒絕任務(wù)的策略旧困,有以下幾種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常醇份。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常吼具。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)僚纷,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

3.2 執(zhí)行過(guò)程

我們以execute為例來(lái)看一個(gè)任務(wù)的執(zhí)行過(guò)程(如下流程圖):

image2017-1-13 18-7-45.png

我們用源碼解釋一下具體的實(shí)現(xiàn)過(guò)程:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

再來(lái)翻譯一下:
任務(wù)會(huì)通過(guò)創(chuàng)建一個(gè)新線程或者用線程池中的空閑線程來(lái)執(zhí)行;
如果該任務(wù)由于executor被關(guān)閉或者隊(duì)列已滿的原因被拒絕執(zhí)行拗盒,剛會(huì)交給RejectedExecutionHandler來(lái)處理.
前面是函數(shù)注釋哈怖竭,再看函數(shù)內(nèi)的注釋:
任務(wù)執(zhí)行要經(jīng)過(guò)以下三個(gè)步驟(三個(gè)if):

  • 1.如果當(dāng)前線程池中的線程數(shù)量小于corePoolSize,則通過(guò)addWorker新創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),addWorker會(huì)檢查workerCount和runState陡蝇,如果創(chuàng)建線程失敗則返回false(addWorker稍后解釋)
  • 2.如果runState = RUNNING痊臭,且成功加入隊(duì)列當(dāng)中,還需要進(jìn)行雙因素驗(yàn)證登夫,如果線程池關(guān)閉趣兄,則移除隊(duì)列中的線程,并reject;如果當(dāng)前線程池?zé)o線程悼嫉,則新創(chuàng)建一個(gè)線程艇潭;
  • 3.如果無(wú)法將任務(wù)加入隊(duì)例,則嘗試新建線程;如果失敗蹋凝,則reject
    解釋一下線程池的runState(來(lái)自參考文獻(xiàn)的拷貝).
      在ThreadPoolExecutor中定義了一個(gè)volatile變量鲁纠,另外定義了幾個(gè)static final變量表示線程池的各個(gè)狀態(tài):
volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示當(dāng)前線程池的狀態(tài),它是一個(gè)volatile變量用來(lái)保證線程之間的可見(jiàn)性鳍寂;
  下面的幾個(gè)static final變量表示runState可能的幾個(gè)取值改含。
  當(dāng)創(chuàng)建線程池后,初始時(shí)迄汛,線程池處于RUNNING狀態(tài)捍壤;
  如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài)鞍爱,此時(shí)線程池不能夠接受新的任務(wù)鹃觉,它會(huì)等待所有任務(wù)執(zhí)行完畢;
  如果調(diào)用了shutdownNow()方法睹逃,則線程池處于STOP狀態(tài)盗扇,此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù)沉填;
  當(dāng)線程池處于SHUTDOWN或STOP狀態(tài)疗隶,并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后翼闹,線程池被設(shè)置為TERMINATED狀態(tài)斑鼻。
從上述過(guò)程中知道有三個(gè)關(guān)鍵操作:addWorker(添加任務(wù)),workQueue.offer(將任務(wù)添加至隊(duì)列)猎荠,reject(拒絕任務(wù))坚弱;下面我們來(lái)看看這三個(gè)方法的具體執(zhí)行過(guò)程:

addWorker(添加任務(wù))

同樣先讀源碼,execute代碼中法牲,每個(gè)條件都執(zhí)行了addworker,但參數(shù)都有不同琼掠,大家注意拒垃。
addWorker源碼:

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

下面按照不同條件來(lái)說(shuō)明addWorker都做了什么,直接在源碼中注釋了哈:
current_thread_num < corePoolSize

/*********** execute **************/
public void execute(Runnable command) {
    ... ...
    if (workerCountOf(c) < corePoolSize) { // 當(dāng)前線程數(shù) < corePoolSize
        if (addWorker(command, true))
            return;
        c = ctl.get();
    ... ...
}
   
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) { // 參數(shù) firstTask != null, core = true
    ...  // 驗(yàn)證是否滿足可新增線程的條件瓷蛙,曰:滿足^_^
  
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 通過(guò)ThreadFactory創(chuàng)建一個(gè)線程悼瓮,并且線程用于執(zhí)行firstTask
        final Thread t = w.thread;
        if (t != null) {
            ... ...
            try {
                ... ...
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    ... ...
                    workers.add(w);
                    workerAdded = true;
                }
            } finally {
                ... ...
            }
            if (workerAdded) { // 上面檢查是否確實(shí)添加線程成功,曰:成功
                t.start(); // 線程啟動(dòng)艰猬,調(diào)用worker.run
                workerStarted = true;
            }
        }
    } finally {
        ... ...
    }
    ... ...
}
   
/*************** worker.run ***************/
public void run() { runWorker(this);}
   
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // firstTask不為null
    w.firstTask = null;
    ... ...
    try {
        while (task != null || (task = getTask()) != null) { // 一進(jìn)來(lái)就滿足横堡,就執(zhí)行當(dāng)前這個(gè)task
            w.lock();
            beforeExecute(wt, task);
            task.run();
            afterExecute(task, thrown);
            ... ...
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

maximumPoolSize > current_thread_num >= corePoolSize


/*********** execute **************/
public void execute(Runnable command) {
    ... ...
    if (isRunning(c) && workQueue.offer(command)) { // 放入阻塞隊(duì)列 - workQueue.offer(command)
        int recheck = ctl.get();
        // 成功加入阻塞隊(duì)列后,仍需要進(jìn)行double-check冠桃,以防 線程終止了或者線程池在進(jìn)入這個(gè)方法的時(shí)候已經(jīng)shutdown了
        if (! isRunning(recheck) && remove(command)) // 如果double check失敗命贴,remove用來(lái)回滾 workQueue.offer 的操作,執(zhí)行 workQueue.remove(task)
            reject(command); // 拒絕當(dāng)前的任務(wù)
        else if (workerCountOf(recheck) == 0) // 如果當(dāng)前沒(méi)有線程就創(chuàng)建一個(gè)
            addWorker(null, false);  // 注意參數(shù)是 (null, false)
    } else if (!addWorker(command, false)) // 如果不能放入阻塞隊(duì)列,那么久創(chuàng)建一個(gè)thread執(zhí)行當(dāng)前任務(wù)
        reject(command);
    ... ...
}
   
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) { // 參數(shù) firstTask = null, core = false
    ... ...
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            ... ...
            try {
                ... ...
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    ... ...
                    workers.add(w);
                    workerAdded = true;
                }
            } finally {
                ... ...
            }
            if (workerAdded) {
                t.start();  // 線程啟動(dòng)胸蛛,調(diào)用worker.run
                workerStarted = true;
            }
        }
    } finally {
        ... ...
    }
    return workerStarted;
}
   
/*************** worker.run ***************/
public void run() { runWorker(this);}
   
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // firstTask == null
    w.firstTask = null;
    ... ...
    try {
        while (task != null || (task = getTask()) != null) { // 1. 第一次進(jìn)入污茵,task=null,執(zhí)行g(shù)etTask葬项;2. 獲取到非null的task之后泞当,執(zhí)行task
            ... ...
            beforeExecute(wt, task);
            task.run();
            afterExecute(task, thrown);
            ... ...
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
   
/********* getTask ************/
private Runnable getTask() {
    boolean timedOut = false;
  
    for (;;) {
        ... ...
        int wc = workerCountOf(c);
  
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否設(shè)置了超時(shí)時(shí)間或者線程數(shù)已經(jīng)達(dá)到corePoolSize
        ... ...
  
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 如果設(shè)置了超時(shí),通過(guò) workQueue.poll 取出還有效的任務(wù)
                workQueue.take(); // 如果沒(méi)有設(shè)置超時(shí)民珍,通過(guò) workQueue.take取出任務(wù)
            ... ...
        } catch (InterruptedException retry) {
            ... ...
        }
    }
}

這一個(gè)過(guò)程最難理解襟士,即maximumPoolSize > current_thread_num >= corePoolSize時(shí),會(huì)往隊(duì)列中尾部添加一個(gè)任務(wù)嚷量,并從頭部中取出一個(gè)任務(wù)來(lái)運(yùn)行.
current_thread_num >= maximumPoolSize

/*********** execute **************/
public void execute(Runnable command) {
    ... ...
    else if (!addWorker(command, false)) // addWorker = false陋桂,執(zhí)行reject
        reject(command);
    ... ...
}
   
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) {
    for (;;) {
        ... ...
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) // 超過(guò)maximumPoolSize之后,返回false; 上面 addWorker=false
                return false;
        }
    }
    ... ...
}

這個(gè)就是直接reject了

workQueue

workQueue決定如何添加隊(duì)列津肛,如何取隊(duì)列章喉,和并發(fā)關(guān)系并不大,而且是獨(dú)立的一塊身坐,所以這里就不再詳情介紹秸脱,給出三篇文檔:
SynchronousQueue.
LinkedBlockingQueue.
ArrayBlockingQueue.

reject(拒絕任務(wù))

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

按照四個(gè)不同的reject策略詳述,RejectedExecutionHandler部蛇,默認(rèn)使用 AbortPolicy.

class comment
DiscardPolicy 什么也不做摊唇,丟棄當(dāng)前的task
DiscardOldestPolicy executor如果被關(guān)閉,什么也不做涯鲁;如果沒(méi)被關(guān)閉巷查,丟棄隊(duì)列中最老的task【queue.poll】,重新執(zhí)行execute(task)【放入隊(duì)列】
CallerRunsPolicy 如果當(dāng)前executor沒(méi)有被關(guān)閉抹腿,那么使用當(dāng)前執(zhí)行execute方法的線程執(zhí)行task岛请;關(guān)閉了的話,就什么也不做
AbortPolicy 拋出一個(gè) RejectedExecutionException

你會(huì)根據(jù)自己的業(yè)務(wù)場(chǎng)景創(chuàng)建線程池了嗎警绩?

參考文獻(xiàn):

https://gold.xitu.io/entry/587601a7b123db4a2ed68485/view.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末崇败,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子肩祥,更是在濱河造成了極大的恐慌后室,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件混狠,死亡現(xiàn)場(chǎng)離奇詭異岸霹,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)将饺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門贡避,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)痛黎,“玉大人,你說(shuō)我怎么就攤上這事贸桶【艘荩” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵皇筛,是天一觀的道長(zhǎng)琉历。 經(jīng)常有香客問(wèn)我,道長(zhǎng)水醋,這世上最難降的妖魔是什么旗笔? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮拄踪,結(jié)果婚禮上蝇恶,老公的妹妹穿的比我還像新娘。我一直安慰自己惶桐,他們只是感情好撮弧,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著姚糊,像睡著了一般贿衍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上救恨,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天贸辈,我揣著相機(jī)與錄音,去河邊找鬼肠槽。 笑死擎淤,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的秸仙。 我是一名探鬼主播嘴拢,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼寂纪!你這毒婦竟也來(lái)了席吴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤弊攘,失蹤者是張志新(化名)和其女友劉穎抢腐,沒(méi)想到半個(gè)月后姑曙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體襟交,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年伤靠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了捣域。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片啼染。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖焕梅,靈堂內(nèi)的尸體忽然破棺而出迹鹅,到底是詐尸還是另有隱情,我是刑警寧澤贞言,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布斜棚,位于F島的核電站,受9級(jí)特大地震影響该窗,放射性物質(zhì)發(fā)生泄漏弟蚀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一酗失、第九天 我趴在偏房一處隱蔽的房頂上張望义钉。 院中可真熱鬧,春花似錦规肴、人聲如沸捶闸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)删壮。三九已至,卻和暖如春序调,著一層夾襖步出監(jiān)牢的瞬間醉锅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工发绢, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留硬耍,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓边酒,卻偏偏與公主長(zhǎng)得像经柴,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子墩朦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)自http://www.cnblogs.com/dolphin0520/p/3932921.html Java并...
    Allen_cyn閱讀 1,906評(píng)論 0 4
  • 線程池常見(jiàn)實(shí)現(xiàn) 線程池一般包含三個(gè)主要部分: 調(diào)度器: 決定由哪個(gè)線程來(lái)執(zhí)行任務(wù), 執(zhí)行任務(wù)所能夠的最大耗時(shí)等 線...
    永順閱讀 2,299評(píng)論 3 22
  • 合理利用線程池能夠帶來(lái)三個(gè)好處氓涣。第一:降低資源消耗牛哺。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。第二:提...
    Coder_老王閱讀 3,853評(píng)論 1 4
  • 總是自尋煩惱劳吠,自尋糾結(jié)引润。
    dark40閱讀 218評(píng)論 0 1
  • 最近讀了很多的英文畢業(yè)演講,感覺(jué)心中似乎也被激起了一圈圈漣漪痒玩,到底是什么樣的思緒卻又有點(diǎn)紛亂淳附,不是可以簡(jiǎn)單一言以...
    日出東方天剛曉閱讀 219評(píng)論 0 1