線程池原理解析

1. 什么是線程池闲延?

通俗來講是就是裝有線程的池子整袁,和我們使用到的各種連接池的概念類似挽拂,那么線程池解決了什么問題呢,來看下官方的闡述說明:

  Thread pools address two different problems: they usually provide improved performance when     
  executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, 
  and they provide a means of bounding and managing the resources, including threads, consumed 
  when executing a collection of tasks.  Each {@code ThreadPoolExecutor} also maintains some basic 
  statistics, such as the number of completed tasks.

  翻譯如下人灼,線程池主要解決了兩個問題:
  1. 通過減少調(diào)用開銷围段,提高大量異步任務(wù)執(zhí)行的性能。
  2. 可以管理和限制線程數(shù)量投放、任務(wù)數(shù)量等資源奈泪。

首先線程池通過長期持有一組線程,避免了線程頻繁的創(chuàng)建和銷毀帶來的額外開銷灸芳。并且提供了豐富的工具方法來管理和限制線程池狀態(tài)段磨、資源等。

2. 初識線程池

2.1 簡單的上手例子

public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2
                , 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
                
        threadPool.submit(() -> {
              // 執(zhí)行的任務(wù)內(nèi)容僅作為實例使用耗绿,沒有任何實際意義。
            System.out.println("task has been executing");
        });

}
程序輸出:task has been executing

第一步我們通過使用ThreadPoolExecutor構(gòu)造方法定義一個擁有兩個線程的線程池砾隅,并且成功了執(zhí)行我們提交的任務(wù)误阻。

2.2 剖析構(gòu)造方法參數(shù)的含義

ThreadPoolExecutor的構(gòu)造方法有以下四個:

image.png

既然是剖析,當然是關(guān)注最下面那個參數(shù)最多的構(gòu)造方法啦~晴埂,先來看看每個參數(shù)的意義以及它們作用究反。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    ... 省略中間無關(guān)代碼
       
}

corePoolSize

線程池里持有的核心線程的數(shù)量,即最小線程數(shù)量儒洛。額精耐,除非設(shè)定了allowCoreThreadTimeOut為true,那么在指定時間內(nèi)如果核心線程一直處于空閑狀態(tài)琅锻,則會被終止卦停。

maximumPoolSize

線程池最大的線程數(shù)量。如果新的任務(wù)被添加恼蓬,存放任務(wù)的隊列已經(jīng)滿了惊完,并且當前的線程數(shù)量小于maximumPoolSize數(shù)量,線程池將會主動的創(chuàng)建一個新的線程处硬。

keepAliveTime TimeUnit

兩個組合使用的參數(shù)小槐,用來控制線程池里線程最大的空閑時間上限。如果超出指定時間空閑的線程將會被終止荷辕,以釋放資源凿跳。

ThreadFactory

用來統(tǒng)一創(chuàng)建線程的工廠類件豌,可以通過接口ThreadFactory自定義實現(xiàn),看下默認使用的線程工廠類內(nèi)容控嗜。

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

通過newThread方法可以大至得知一下信息:指定了線程名茧彤,設(shè)定了優(yōu)先級、設(shè)定為非后臺線程躬审,如果想修改成自定義值巫员,可以參考使用com.google.common.util.concurrent.ThreadFactoryBuilder類。

BlockingQueue

存放待執(zhí)行任務(wù)隊列啄踊,特性如下:

  1. 任務(wù)先進先出
  2. 阻塞毡鉴,具體體現(xiàn)在隊列為空時阻塞出隊操作;隊列滿的時候阻塞入隊操作博助。

下面的表格列舉了BlockingQueue中不同方法在操作不能立刻完成時的行為险污。

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

ThreadPoolExecutor在添加任務(wù)的時候使用的是offer()方法,如果隊列滿了則會返回一個false富岳。取任務(wù)的時候使用到了take()蛔糯、poll()、poll(time, unit)窖式,具體使用的地方請看下面執(zhí)行流程的分析蚁飒。

常用的具體實現(xiàn)類分別是:

1. ArrayBlockingQueue 有界隊列,可以通過限定任務(wù)堆積數(shù)量
2. LinkedBlockingQueue 無界隊列萝喘,任務(wù)可以無限堆積淮逻,對執(zhí)行時間不敏感的業(yè)務(wù)可以使用這個隊列。
3. SynchronousQueue 阻塞隊列阁簸,不能緩沖任務(wù)爬早。新任務(wù)進來的時候會一直阻塞到有空閑的線程去處理。

RejectedExecutionHandler

用來處理不能被線程池執(zhí)行任務(wù)启妹。首先看下接口定義:

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

注釋里說明了什么情況下任務(wù)不能被執(zhí)行:1.沒有多余的線程或者任務(wù)隊列滿了 2.線程池被關(guān)閉了

看下其子類實現(xiàn)的具體的拒絕策略有哪些:

  1. CallerRunsPolicy直接使用調(diào)用submit或者execute的當前線程來執(zhí)行任務(wù)
public static void main(String[] args) throws InterruptedException {

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1
            , 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    for (int i = 0; i < 3; i++) {
        threadPool.submit(() -> {
            System.out.println(Thread.currentThread().getName());
        });
    }

    Thread.sleep(200000000);
}

輸出的結(jié)果:
main
pool-1-thread-1
pool-1-thread-1

這里定義了固定線程大小的線程池并且隊列中只存放一個任務(wù)筛严,所以for循環(huán)添加的第三個任務(wù)會被執(zhí)行CallerRunsPolicy策略,進而會被當前調(diào)用submit()方法的線程執(zhí)行饶米,也就是主線程(main)桨啃。

  1. AbortPolicy核心代碼如下:
/**
 * Always throws RejectedExecutionException.
 *
 * @param r the runnable task requested to be executed
 * @param e the executor attempting to execute this task
 * @throws RejectedExecutionException always
 */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

比較直白,直接拋出非檢查異常檬输。

  1. DiscardPolicy直接忽略任務(wù)优幸,什么也不干。
  2. DiscardOldestPolicy核心代碼如下:
/**
 * Obtains and ignores the next task that the executor
 * would otherwise execute, if one is immediately available,
 * and then retries execution of task r, unless the executor
 * is shut down, in which case task r is instead discarded.
 *
 * @param r the runnable task requested to be executed
 * @param e the executor attempting to execute this task
 */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

因為緩存任務(wù)是使用的隊列(先進先出)褪猛,所以在執(zhí)行e.getQueue().poll();的時候把存放時間最久的任務(wù)出隊舍棄网杆。下面看下驗證示例:

public class Demo {

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1
                , 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());


        for (int i = 0; i < 5; i++) {
            threadPool.submit(new NamedRunnable(String.valueOf(i)));
        }

        Thread.sleep(10000);
    }
}

class NamedRunnable implements Runnable {

    String name;

    public NamedRunnable(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {

        }
        System.out.print(name + ", ");
    }
}
程序輸出:
0,  4,  中間1、2、3任務(wù)被丟棄了

3.核心概念

3.1 線程池狀態(tài)

線程池的線程數(shù)量碳却、線程池狀態(tài)都是通過一個原子類AtomicInteger來控制队秩,其中低28位用來表示線程的數(shù)量(5億左右),高3位用來表示線程的狀態(tài)(允許有8中狀態(tài)昼浦,實際使用了5種)馍资。

代碼如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

重點關(guān)注下高3位線程池狀態(tài)的表示:
1110 0000 0000 0000 0000 0000 0000 0000 RUNNING
0000 0000 0000 0000 0000 0000 0000 0000 SHUTDOWN
0010 0000 0000 0000 0000 0000 0000 0000 STOP
0100 0000 0000 0000 0000 0000 0000 0000 TIDYING
0110 0000 0000 0000 0000 0000 0000 0000 TERMINATED

線程池狀態(tài)相互轉(zhuǎn)換關(guān)系如下圖:


線程池狀態(tài)轉(zhuǎn)換

3.2 Woker

Worker繼承自AQS實現(xiàn)了簡單的非重入鎖的功能,并實現(xiàn)了Runnable接口关噪,實現(xiàn)了run()方法鸟蟹,調(diào)用了runWorker()方法(ps:runWorker這個方法后續(xù)會分析到)。每個worker對象都保存了當前工作線程使兔,和需要執(zhí)行的任務(wù)建钥。代碼如下:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

3.3 提交任務(wù)過程

任務(wù)提交的方法有:submit()execute()兩種方法,submit()代碼如下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

這里submit()其實還是調(diào)用execute()虐沥,execute()代碼如下:

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // 當前線程數(shù)量如果小于核心線程數(shù)量熊经,創(chuàng)建新的線程去執(zhí)行任務(wù),而不是將任務(wù)加入隊列欲险。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果線程池是運行狀態(tài)镐依,并且入隊成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次檢查線程池狀態(tài),如果不符合條件將任務(wù)刪除
        if (! isRunning(recheck) && remove(command))
            // rejectHandler處理任務(wù)天试。
            reject(command);
        // 如果線程數(shù)量為0槐壳,創(chuàng)建新的線程去執(zhí)行任務(wù)。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 創(chuàng)建線程執(zhí)行當前任務(wù)
    else if (!addWorker(command, false))
        // 失敗則執(zhí)行rejectHandler
        reject(command);
}

這里的邏輯還是比較清晰的喜每,下面看下addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取線程池狀態(tài)
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * rs > SHUTDOWN || firstTask != null || workQueue.isEmpty()
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 線程數(shù)量
            int wc = workerCountOf(c);
            /**
             * 線程不能大于最大數(shù)量限制宏粤。如果core為true則限定線程數(shù)量為corePoolSize
             * 大小,否則限定為maximumPoolSize大小
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 線程池線程數(shù)量加1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果線程池狀態(tài)變更則繼續(xù)loop
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    // 添加的線程啟動是否成功
    boolean workerStarted = false;
    // 線程是否成功添加
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                /**
                 * 線程池為RUNNING狀態(tài)時灼卢,或者調(diào)用了shutdown()方法之后空閑狀態(tài)的
                 * 線程被清理了,則會主動的添加一個新的工作線程來執(zhí)行之前的任務(wù)来农。
                 * 如果線程池為SHUTDOWN狀態(tài)時鞋真,新的任務(wù)不能被添加進來,以前的任務(wù)可
                 * 以繼續(xù)被執(zhí)行沃于。
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 檢測Worker中的線程是否已經(jīng)被啟動涩咖。
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 記錄線程池曾經(jīng)最大的線程數(shù)量。
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動worker實例中的線程繁莹。
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果添加失敗檩互,則執(zhí)行清理的工作:線程池線程數(shù)量減1、移除剛剛添加的Worker
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

到這里任務(wù)就已經(jīng)提交過程就分析完畢了咨演≌⒆颍總結(jié)下:

    1.當線程池的線程數(shù)量小于核心線程數(shù)量時,直接創(chuàng)建線程數(shù)量執(zhí)行
    2.當線程池的線程數(shù)量大于等于核顯線程數(shù)來時,先將任務(wù)入隊等待工作中的線程依次處理饵较。
    3.當線程入隊失敗時說明隊列已經(jīng)滿了拍嵌,就再次創(chuàng)建新的線程去處理(這里線程數(shù)量受maximumPoolSize影響),如果超過了限制循诉,則會將任務(wù)丟給rejectHandler處理横辆。

3.4 工作線程啟動過程

分析完了提交任務(wù)的過程,下面關(guān)注下啟動線程過程茄猫。下面代碼是Worker被添加之后開始工作的入口狈蚤,看下相關(guān)代碼:

// 省略無關(guān)代碼
if (workerAdded) {
    // 啟動worker實例中的線程。
    t.start();
    workerStarted = true;
}
// 省略無關(guān)代碼

在上面3.2Worker中有提到其實現(xiàn)了Runnable接口中的run()方法

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

那么worker實例中的線程啟動的時候是如何調(diào)用該runWorker(this)方法的呢划纽?這個要從創(chuàng)建Worker實例說起~脆侮!

Worker創(chuàng)建代碼如下:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
其中threadFactory的默認實現(xiàn)是Executors.DefaultThreadFactory類,代碼如下:

class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

重點關(guān)注newThread(Runnable r)的方法阿浓,在getThreadFactory().newThread(this)這里偷偷
的傳入了this參數(shù)他嚷,而Worker實現(xiàn)了Runnable接口的run()方法,所以當線程啟動的時候會執(zhí)行
runWorker(this)這個方法芭毙。額筋蓖,有那么一點點繞~,大家明白就好退敦。

開始真正的啟動過程的分析:

final void runWorker(Worker w) {
    // 因為調(diào)用當前方法是剛剛啟動的新的工作線程粘咖,這里自然就是獲取到的當前工作線程。
    Thread wt = Thread.currentThread();
    // 獲取任務(wù)侈百。
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 更新worker狀態(tài)瓮下,允許響應(yīng)中斷。
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
         // 循環(huán)獲取隊列中的任務(wù)钝域,getTask()下面分析
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 這一大段就說了一件事情讽坏,線程池被shutdown()的時候,將當前線程置為
            // 中斷狀態(tài)
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                   // 執(zhí)行鉤子函數(shù)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執(zhí)行鉤子函數(shù)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 記錄當前工作線程已經(jīng)完成的任務(wù)數(shù)量
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 異常的時候執(zhí)行清理工作例证。
        processWorkerExit(w, completedAbruptly);
    }
}

至此啟動過程就完成了路呜,總結(jié)下:

1. 通過while循環(huán)不斷的獲取任務(wù),如果獲取不到任務(wù)就進入finally執(zhí)行processWorkerExit(w,false)
2. 如果能獲取到就執(zhí)行任務(wù)织咧,中間如果報錯就拋異常胀葱,進入finally執(zhí)行processWorkerExit(w,true)

注意1和2執(zhí)行processWorkerExit()方法時傳入的參數(shù)是不同,具體的區(qū)別后面會分析到笙蒙。

3.5 獲取任務(wù)的過程

上面介紹了完了worker的啟動過程抵屿,但是啟動之后從任務(wù)隊列中獲取task過程是什么樣的呢?

private Runnable getTask() {
    // 上次獲取任務(wù)是否超時
    boolean timedOut = false; // Did the last poll() time out?
    // 死循環(huán)
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 線程池狀態(tài)>=STOP()獲取任務(wù)隊列為空的時候捅位,返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /** 
         * 是否需要剔除工作線程轧葛。
         * 如果設(shè)定了allowCoreThreadTimeOut為true則超時的核心線程也會被清理搂抒,
         * 如果設(shè)置為false則只會清理超過corePoolSize數(shù)量之外的線程。
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        /**
         * (如果線程數(shù)量大于maximumPoolSize的限制 || 上次獲取任務(wù)超時而且需要剔除多余的線程)
         * && (線程數(shù)量大于1 或者 隊列為空)朝群,滿足以上條件則返回null
         * /
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果需要剔除線程燕耿,則使用poll()限定時間內(nèi)沒有獲取到任務(wù),則返回空值
            // 如果不需要剔除線程姜胖,則使用take()無限制等待誉帅,知道有任務(wù)返回
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果任務(wù)為空,則說明超時了右莱。
            timedOut = true;
        } catch (InterruptedException retry) {
            // 線程如果被打斷則認為沒有超時蚜锨,繼續(xù)下次循環(huán)
            timedOut = false;
        }
    }
}

上面各種判斷看著著實有點頭大,其實總結(jié)下就做了兩個方面的工作:返回可執(zhí)行的任務(wù)慢蜓,或者返回null亚再。返回null的意義在于退出上面3.4 runWorker()的循環(huán),進而執(zhí)行processWorkerExit(w, false)晨抡,進行線程清理的工作氛悬。

那么在執(zhí)行getTask()有哪些情況返回null呢?

1. 線程池狀態(tài)>=STOP或者隊列為空的時候耘柱,線程池能處于STOP狀態(tài)(上面有線程池狀態(tài)轉(zhuǎn)換的圖例)是因為調(diào)用了shutdownNow()的方法如捅,這時候不管任務(wù)隊列有沒有任務(wù)線程都會被清理。
2. 線程池數(shù)量超過maximumPoolSize的限制 && (線程數(shù)量大于1 或者 任務(wù)隊列為空)
在設(shè)定allowCoreThreadTimeOut為true的情況下:
3. 清理上次獲取任務(wù)超時的線程(核心線程也會被清理)
在設(shè)定allowCoreThreadTimeOut為false的情況下:
4. 清理上次獲取任務(wù)超時的線程(只會清理大于核心線程數(shù)量之外的線程)

3.6 線程池線程數(shù)量的控制

上接3.5獲取任務(wù)的過程调煎,這里看下清理線程的相關(guān)代碼:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是因為異常
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 將當前worker完成的任務(wù)數(shù)量納入總的數(shù)量
        completedTaskCount += w.completedTasks;
        // 移除worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    // 嘗試將線程池置為TERMINATED狀態(tài)
    tryTerminate();

    // 假如線程池狀態(tài)處于RUNNING镜遣、SHUTDOWN狀態(tài)則至少保證有一個線程繼續(xù)處理已有的任務(wù),
    // 直至任務(wù)隊列為空
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

4. 寫在最后

希望博客的內(nèi)容能給廣大的Java道友提供一些的幫助和提升士袄。由于筆者水平有限悲关,如果內(nèi)容有誤,希望大家批評指出娄柳。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末寓辱,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子赤拒,更是在濱河造成了極大的恐慌秫筏,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件需了,死亡現(xiàn)場離奇詭異,居然都是意外死亡般甲,警方通過查閱死者的電腦和手機肋乍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來敷存,“玉大人墓造,你說我怎么就攤上這事堪伍。” “怎么了觅闽?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵帝雇,是天一觀的道長。 經(jīng)常有香客問我蛉拙,道長尸闸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任孕锄,我火速辦了婚禮吮廉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘畸肆。我一直安慰自己宦芦,他們只是感情好,可當我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布轴脐。 她就那樣靜靜地躺著调卑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪大咱。 梳的紋絲不亂的頭發(fā)上恬涧,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天,我揣著相機與錄音徽级,去河邊找鬼气破。 笑死,一個胖子當著我的面吹牛餐抢,可吹牛的內(nèi)容都是我干的现使。 我是一名探鬼主播,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼旷痕,長吁一口氣:“原來是場噩夢啊……” “哼碳锈!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起欺抗,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤售碳,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后绞呈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贸人,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年佃声,在試婚紗的時候發(fā)現(xiàn)自己被綠了艺智。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡圾亏,死狀恐怖十拣,靈堂內(nèi)的尸體忽然破棺而出封拧,到底是詐尸還是另有隱情,我是刑警寧澤夭问,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布泽西,位于F島的核電站,受9級特大地震影響缰趋,放射性物質(zhì)發(fā)生泄漏捧杉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一埠胖、第九天 我趴在偏房一處隱蔽的房頂上張望糠溜。 院中可真熱鬧,春花似錦直撤、人聲如沸非竿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽红柱。三九已至,卻和暖如春蓖乘,著一層夾襖步出監(jiān)牢的瞬間锤悄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工嘉抒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留零聚,地道東北人。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓些侍,卻偏偏與公主長得像隶症,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子岗宣,可洞房花燭夜當晚...
    茶點故事閱讀 43,728評論 2 351