java.util.concurrent.ThreadPoolExecutor線程池源碼(jdk1.7)

ThreadPoolExecutor類結(jié)構(gòu)


線程池執(zhí)行流程


  1. 線程池判斷核心線程是否都處于運行狀態(tài)候址,如果不是慢叨,就創(chuàng)建一個新線程來執(zhí)行任務(wù)。如果是丽惭,執(zhí)行2击奶。
  2. 判斷線程池中的工作隊列是否已經(jīng)滿,如果未滿责掏,那么直接添加到工作隊列柜砾,如果滿了就執(zhí)行3。
  3. 線程池判斷池中的線程是否處于工作狀態(tài)换衬。如果沒有就創(chuàng)建一個新工作線程執(zhí)行任務(wù)痰驱。如果已經(jīng)滿了,則執(zhí)行飽和策略的任務(wù)瞳浦。

ThreadPoolExecutor運行機制

    1. 工作線程數(shù) < 核心線程數(shù)
      1.1 new Thread創(chuàng)建新線程作為當前工作線程担映,將當前工作線程Worker添加到workers集合
      1.2 啟動當前線程(調(diào)用runWorker方法)
      1.2.1 獲取當前線程,當前線程已執(zhí)行完畢并釋放叫潦,那么從阻塞隊列獲取任務(wù)(調(diào)用getTask)
      1.2.1.1 如果等待隊列中存在了等待的任務(wù)蝇完,那么取出等待隊列中第一個任務(wù)返回。
      1.2.1.2 如果超過核心線程數(shù)的線程,那么timed = true短蜕,同時隊列中已經(jīng)沒有任務(wù)了氢架,那么等待keepalivetime秒就釋放。
      1.2.1.3 如果不超過(等于)核心線程數(shù)的線程朋魔,那么timed=false岖研,同時隊列中已經(jīng)沒有任務(wù)了,那么執(zhí)行take方法阻塞警检。
      1.2.1.3.1 如果此時進來一個新任務(wù)缎玫,那么發(fā)現(xiàn)阻塞線程數(shù)(工作線程數(shù))不小于核心線程數(shù),那么直接執(zhí)行offer插入等待隊列解滓,等待隊列有值了赃磨,阻塞的線程們又開始搶任務(wù)干活了。
      1.2.2 執(zhí)行我們自己的業(yè)務(wù)邏輯(調(diào)用run方法)
      1.2.3 移除workers中的當前線程洼裤,回收線程(調(diào)用processWorkerExit方法)
      1.3 移除workers中的當前線程邻辉,回收線程(調(diào)用addWorkerFailed方法)
    1. 工作線程數(shù) > 核心線程數(shù)
      將當前線程加入到等待隊列BlockQueue。
    1. 等待隊列滿了
      直接創(chuàng)建新線程作為工作線程執(zhí)行邏輯腮鞍。
    1. 工作線程數(shù) > 最大線程數(shù)
      拋出拒絕策略異常

ThreadPoolExecutor源碼分析


ThreadPoolExecutor線程池源碼流程圖.png

關(guān)鍵屬性:

    // 該變量ctl作為線程池中的重要屬性值骇,屬性一分為2;
    // 高位(前三位)用來保存各個線程的狀態(tài),低位(后三位)保存有效線程的數(shù)量移国。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 線程數(shù)量占用的位數(shù)
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大線程容量為2的29次冪減1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 接受新任務(wù)并處理排隊任務(wù)
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 不接受新任務(wù)吱瘩,而是處理隊列任務(wù)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 不接受新任務(wù),不處理排隊任務(wù)和中斷進程中的任務(wù)
    private static final int STOP       =  1 << COUNT_BITS;
    // 終止所有的任務(wù)迹缀,有效的線程數(shù)量設(shè)置為0使碾,TIDYING狀態(tài)的線程執(zhí)行回調(diào)方法terminated()
    private static final int TIDYING    =  2 << COUNT_BITS;
    // terminated()方法已經(jīng)執(zhí)行完畢
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 持有工作線程隊列
    private final BlockingQueue<Runnable> workQueue;

    // 工作線程所在的集合上的鎖
    private final ReentrantLock mainLock = new ReentrantLock();

    // 包含所有工作線程的集合,僅在獲取到鎖時才可以訪問
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 使用等待隊列
    private final Condition termination = mainLock.newCondition();

    // 最大池的大小
    private int largestPoolSize;

    // 完成任務(wù)時的計數(shù)值祝懂,只能在終止工作線程時更新票摇,在獲取到鎖時才可以訪問
    private long completedTaskCount;

    // 創(chuàng)建新線程的工廠
    private volatile ThreadFactory threadFactory;

    // 關(guān)閉執(zhí)行的線程池時調(diào)用的Handler
    private volatile RejectedExecutionHandler handler;

    // 等待工作的線程的超時時間,單位納秒
    private volatile long keepAliveTime;

    // 是否允許核心線程等待砚蓬,
    // 默認false矢门,就是空閑也可以生存。
    // true的話灰蛙,在keepAliveTime時間內(nèi)生存祟剔。
    private volatile boolean allowCoreThreadTimeOut;

    // 核心運行的線程數(shù)
    private volatile int corePoolSize;

    // 最大線程數(shù),超出這個數(shù)量將執(zhí)行丟棄策略
    private volatile int maximumPoolSize;

    // 默認拒絕執(zhí)行策略Handler
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

關(guān)鍵方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }

說明:作用就是獲取線程池的狀態(tài)摩梧;過程:~CAPACITY :按位取反的值為11100000000000000000000000000000物延。c & ~CAPACITY : 執(zhí)行按位與操作,那么結(jié)果的低29位肯定為0障本。

private static int workerCountOf(int c)  { return c & CAPACITY; }

說明:獲取線程池工作線程的數(shù)量教届;過程:CAPACITY : 00011111111111111111111111111111响鹃。&操作將參數(shù)的高3位設(shè)置0。

 private static int ctlOf(int rs, int wc) { return rs | wc; }

說明:將runState和workerCount存到同一個int中案训。使用或運算买置,將兩個值合并。

    // 使用cas方式將線程的數(shù)量加1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    // 使用cas方式將線程的數(shù)量減1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    // 減少ctl的計數(shù)值
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

詳解execute方法


  • 說明(不關(guān)心線程池的狀態(tài)和能否添加到等待隊列):
    ① 如果當前工作線程數(shù)小于核心線程數(shù)强霎,委派給addWorker方法(將封裝線程的Worker添加到Worker集合中忿项,然后啟動線程)。
    ② 如果當前工作線程數(shù)大于核心線程數(shù)城舞,將工作線程command添加到等待隊列轩触。

addWorker方法,將封裝線程的Worker添加到Worker集合中家夺,然后啟動線程脱柱。


// 添加任務(wù)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 死循環(huán)遍歷
    for (;;) {
        // 獲取當前的線程池屬性值
        int c = ctl.get();
        // 獲取線程池的狀態(tài)
        int rs = runStateOf(c);
        // 如果線程池不可以接受任務(wù)
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&
               firstTask == null && ! workQueue.isEmpty()))
            return false;
        // 死循環(huán)遍歷
        for (;;) {
            // 獲取工作線程的數(shù)量
            int wc = workerCountOf(c);
            // 如果工作線程的數(shù)量超出線程池最大線程容量
            // 或者工作線程的數(shù)量超出核心線程池數(shù)量或者最大線程池的數(shù)量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 采用cas方法更新線程狀態(tài)值,如果成功拉馋,那么worker數(shù)量加1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 重新讀取線程池屬性值
            c = ctl.get();  // Re-read ctl
            // 如果線程池的狀態(tài)不等于之前的線程狀態(tài)榨为,重新執(zhí)行retry代碼塊
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 獲取線程池的鎖
        final ReentrantLock mainLock = this.mainLock;
        // 將firstTask封裝成worker
        w = new Worker(firstTask);
        // 獲取worker的工作線程
        final Thread t = w.thread;
        // 如果工作線程不為空
        if (t != null) {
            // 給線程池上鎖
            mainLock.lock();
            try {
                // 獲取當前的線程池的屬性值
                int c = ctl.get();
                // 獲取當前線程池的狀態(tài)
                int rs = runStateOf(c);
                // 如果線程池的狀態(tài)可以接受任務(wù)正常工作
                // 或者如果線程池不接受任務(wù)并且當前任務(wù)為空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 判斷工作線程是否可以啟動
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將當前的工作線程添加到workers集合中
                    workers.add(w);
                    // 獲取Worker工作線程的數(shù)量
                    int s = workers.size();
                    // 如果Worker工作線程的數(shù)量超出最大池長度
                    if (s > largestPoolSize)
                        // 更新最大池長度
                        largestPoolSize = s;
                    // 設(shè)置工作線程添加成功
                    workerAdded = true;
                }
            } finally {
                // 線程池釋放鎖
                mainLock.unlock();
            }
            // 如果工作線程添加成功
            if (workerAdded) {
                // 啟動工作線程
                t.start();
                // 設(shè)置工作線程啟動成功
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作線程啟動失敗煌茴!
        if (! workerStarted)
            // 回滾Worker線程的創(chuàng)建
            addWorkerFailed(w);
    }
    // 工作線程啟動成功標識
    return workerStarted;
}

說明(代碼分為兩個大塊邏輯):

  • 第一大塊随闺,兩個死循環(huán):可以發(fā)現(xiàn)代碼結(jié)構(gòu)是兩個死循環(huán),外循環(huán)的作用主要是檢查當前線程池的狀態(tài)蔓腐,如果線程池不可以接受任務(wù)矩乐,那么直接退出。內(nèi)循環(huán)的作用主要是檢查工作線程的數(shù)量回论。
    ① 如果線程池不可以接受任務(wù)散罕,那么直接退出。
    ② 如果工作線程的數(shù)量大于最大線程容量或者工作線程的數(shù)量大于核心線程池數(shù)量(最大線程池的數(shù)量)透葛,那么直接退出笨使。
    ③ 使用cas更新線程屬性值ctl(將ctl值加1),因為ctl的低位用來存儲工作線程的數(shù)量僚害,所以就是cas方式更新工作線程加1,跳出循環(huán)繁调。如果線程池的狀態(tài)和之前的不一致萨蚕,說明cas方式失敗了,那么重新執(zhí)行retry蹄胰。
  • 第二大塊岳遥,將當前的工作線程Worker添加到集合,然后啟動線程裕寨。

addWorkerFailed方法:線程添加失敗策略浩蓉。


工作線程Worker類設(shè)計


前面源碼設(shè)計派继,主要還是使用當前工作線程Worker對象,設(shè)計在線程池中執(zhí)行的流程捻艳,然后前面addWorker方法里面驾窟,啟動線程,執(zhí)行Worker的run方法认轨,現(xiàn)在進入Worker類里面绅络,看看源碼怎么設(shè)計的?

Worker類:封裝了需要執(zhí)行的線程類
Worker實現(xiàn)了Runnable接口嘁字,封裝了線程的實現(xiàn)和執(zhí)行恩急。并且繼承了AQS隊列同步器。簡化了獲取鎖和釋放鎖的過程纪蜒,交給AQS去實現(xiàn)了衷恭。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
        
        private static final long serialVersionUID = 6138294804551838833L;

        // 運行中的線程,由ThreadFactory工廠創(chuàng)建
        final Thread thread;
        // 第一個執(zhí)行的任務(wù)纯续,可能為null
        Runnable firstTask;
        // 線程計數(shù)器匾荆,記住完成的任務(wù)
        volatile long completedTasks;

        // 使用線程工廠ThreadFactory為第一個任務(wù)firstTask創(chuàng)建線程
        Worker(Runnable firstTask) {
            // 在執(zhí)行runWorker之前,禁止中斷操作
            setState(-1); // inhibit interrupts until runWorker
            // 初始化第一個執(zhí)行的任務(wù)
            this.firstTask = firstTask;
            // 獲取線程工廠杆烁,為當前對象創(chuàng)建一個線程來初始化
            this.thread = getThreadFactory().newThread(this);
        }

        // 運行線程run方法委托給外部的runWorker來執(zhí)行
        public void run() {
            runWorker(this);
        }

        // 判斷當前線程是否獨占(使用state狀態(tài)值判斷牙丽,如果獲取鎖就加1,如果釋放鎖就減1)
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 當前線程嘗試獲取鎖
        protected boolean tryAcquire(int unused) {
            // 使用cas的方式來設(shè)置當前線程獲取鎖
            if (compareAndSetState(0, 1)) {
                // 設(shè)置當前線程為獨占線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        // 當前線程嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            // 將獨占線程設(shè)置為null
            setExclusiveOwnerThread(null);
            // 使用cas方式更新當前同步狀態(tài)值為0
            setState(0);
            return true;
        }
        
        // 獨占式獲取鎖
        public void lock()        { acquire(1); }
        // 嘗試以獨占式獲取鎖
        public boolean tryLock()  { return tryAcquire(1); }
        // 獨占式釋放鎖
        public void unlock()      { release(1); }
        // 判斷當前線程是否獨占著鎖
        public boolean isLocked() { return isHeldExclusively(); }
        // 線程啟動后中斷
        void interruptIfStarted() {
            Thread t;
            // 標識線程中斷
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

說明:

  • 里面大多實現(xiàn)的方法兔魂,獲取鎖烤芦,釋放鎖,判斷當前線程是否中斷都是AQS的cas操作同步狀態(tài)值state的方式析校。
    AbstractQueuedSynchronizer同步隊列源碼
  • Worker本身被設(shè)計為一個線程類构罗,需要初始化第一個執(zhí)行的線程以及線程Thread對象,線程執(zhí)行run方法的邏輯委派給外部的runWorker方法智玻。

runWorker方法遂唧,Worker的run線程執(zhí)行的方法;就是執(zhí)行線程任務(wù)吊奢。


getTask方法盖彭,從等待隊列獲取工作線程。


private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 設(shè)置核心線程超時或者當前工作線程數(shù)大于核心線程數(shù)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                // 超時keepAliveTime秒釋放當前工作線程
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 當前工作線程阻塞
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit方法页滚,線程池關(guān)閉召边,回收線程


private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            // 工作線程數(shù)量減1
            decrementWorkerCount();
        // 獲取重入鎖
        final ReentrantLock mainLock = this.mainLock;
        // 上鎖
        mainLock.lock();
        try {
            // 完成任務(wù)的數(shù)量增加
            completedTaskCount += w.completedTasks;
            // 工作線程集合移除工作線程
            workers.remove(w);
        } finally {
            // 釋放鎖
            mainLock.unlock();
        }
        
        // 終止線程池
        tryTerminate();
        
        // cas方式獲取ctl值
        int c = ctl.get();
        // 如果當前線程池接受新任務(wù)且處理排隊任務(wù)
        // 或者線程池雖然不接受新任務(wù)但是還處理排隊任務(wù)
        if (runStateLessThan(c, STOP)) {
            
            if (!completedAbruptly) {
                // min 線程池最小空閑數(shù)
                // allowCoreThreadTimeOut允許核心線程在keepAliveTime時間等待之后是否允許生存
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是隊列不為空要保證有1個線程來執(zhí)行隊列中的任務(wù)
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果線程池中工作線程不為空,就直接返回了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 線程池將不處理隊列中的任務(wù)并且等待隊列不為空裹驰,直接返回false
            addWorker(null, false);
        }
 }

shutdown方法隧熙,將線程池的狀態(tài)轉(zhuǎn)換為SHUTDOWN并且終止所有線程

// 將線程池的狀態(tài)轉(zhuǎn)換為SHUTDOWN并且終止所有線程
    public void shutdown() {
        // 獲取重入鎖
        final ReentrantLock mainLock = this.mainLock;
        // 上鎖
        mainLock.lock();
        try {
            // 檢查線程池關(guān)閉的權(quán)限
            checkShutdownAccess();
            // 將當前線程池的狀態(tài)轉(zhuǎn)換到目標狀態(tài)targetState
            advanceRunState(SHUTDOWN);
            // 中斷等待隊列中的空閑線程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            // 釋放鎖
            mainLock.unlock();
        }
        // 將線程池的狀態(tài)轉(zhuǎn)換為終止狀態(tài)
        tryTerminate();
    }

    // 將當前線程池的狀態(tài)轉(zhuǎn)換到目標狀態(tài)targetState
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

線程回收

問題:如果我們不設(shè)置線程池回收線程,那么線程池中的線程會怎么樣幻林。

public static void main(String[] args) {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(3, 
                5, 
                1, 
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());
        
        for (int i = 0 ; i < 5; i++) {
            tp.execute(new Runnable() {
                
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
    }
線程池不回收線程.png

說明:線程池中的線程執(zhí)行完任務(wù)后贞盯,當從等待隊列中獲取任務(wù)時音念,因為任務(wù)已經(jīng)執(zhí)行完了,沒有任務(wù)了躏敢,阻塞的工作隊列已經(jīng)空了闷愤,那么線程只能處于阻塞狀態(tài)了(這段答案在上面getTask方法的源碼)。

allowCoreThreadTimeOut方法父丰,設(shè)置線程超時回收(包括核心線程和非核心線程)

    // 設(shè)置線程超時回收(包括核心線程和非核心線程)
    public void allowCoreThreadTimeOut(boolean value) {
        // 如果keepAliveTime設(shè)置不大于0肝谭,那么報出非法參數(shù)異常
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        // value為true時向下執(zhí)行,allowCoreThreadTimeOut默認值為false
        if (value != allowCoreThreadTimeOut) {
            // allowCoreThreadTimeOut設(shè)置為true
            allowCoreThreadTimeOut = value;
            if (value)
                // 中斷等待任務(wù)的線程
                interruptIdleWorkers();
        }
    }
    
    // 中斷等待任務(wù)的線程
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    // 中斷等待任務(wù)的線程
    private void interruptIdleWorkers(boolean onlyOne) {
        // 獲取重入鎖
        final ReentrantLock mainLock = this.mainLock;
        // 上鎖
        mainLock.lock();
        try {
            // 遍歷線程集合
            for (Worker w : workers) {
                Thread t = w.thread;
                // 允許嘗試獲取鎖且非中斷
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 將線程中斷
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // 釋放鎖
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            // 釋放鎖
            mainLock.unlock();
        }
    }

說明:通過設(shè)置allowCoreThreadTimeOut為true蛾扇,線程池中的線程如果阻塞的時間超過了keepAliveTime攘烛,那么將被標記為中斷標志。
還可以通過前面提到的關(guān)閉線程池shutdown方法:檢查線程池關(guān)閉的權(quán)限镀首、將當前線程池狀態(tài)轉(zhuǎn)為為SHUTDOWN坟漱、中斷等待(阻塞)的線程。

源碼閱讀總結(jié)


1. 線程池采用ThreadFactory中默認的DefaultThreadFactory實現(xiàn)類來創(chuàng)建的更哄,使用工廠方法模式來設(shè)計的線程工廠創(chuàng)建線程芋齿。(而工廠方法模式對于我的理解:一個產(chǎn)物對應一個工廠)
2. 線程池的任務(wù)飽和策略?
AbortPolicy:直接拋出RejectedExecutionException異常成翩。
CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)觅捆。
DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù),并執(zhí)行當前任務(wù)麻敌。
DiscardPolicy:不處理栅炒,丟棄掉。
3. 線程池什么時候啟動术羔?
答案:在new ThreadPoolExecutor類時并沒有啟動線程池赢赊,只是設(shè)置了參數(shù),而線程池的啟動是在執(zhí)行execute方法(源碼中是addWorker方法)级历。
4.線程池執(zhí)行任務(wù)的邏輯(execute方法的執(zhí)行邏輯)释移?
4.1. 工作線程數(shù) < 核心線程數(shù)(調(diào)用addWorker方法)
4.1.1 new Thread創(chuàng)建新線程作為當前工作線程,將當前工作線程Worker添加到workers集合
4.1.2 啟動當前線程(調(diào)用runWorker方法)
4.1.2.1 獲取當前線程寥殖,當前線程已執(zhí)行完畢并釋放玩讳,那么從阻塞隊列獲取任務(wù)(調(diào)用getTask)
4.1.2.1.1 如果等待隊列中存在了等待的任務(wù),那么取出等待隊列中第一個任務(wù)返回扛禽。
4.1.2.1.2 如果超過核心線程數(shù)的線程锋边,那么timed = true,同時隊列中已經(jīng)沒有任務(wù)了编曼,那么等待keepalivetime秒就釋放。
4.1.2.1.3 如果不超過(等于)核心線程數(shù)的線程剩辟,那么timed=false掐场,同時隊列中已經(jīng)沒有任務(wù)了往扔,那么執(zhí)行take方法阻塞。
4.1.2.1.3.1 如果此時進來一個新任務(wù)熊户,那么發(fā)現(xiàn)阻塞線程數(shù)(工作線程數(shù))不小于核心線程數(shù)萍膛,那么直接執(zhí)行offer插入等待隊列,等待隊列有值了嚷堡,阻塞的線程們又開始搶任務(wù)干活了蝗罗。
4.1.2.2 執(zhí)行我們自己的業(yè)務(wù)邏輯(調(diào)用run方法)
4.1.2.3 移除workers中的當前線程,回收線程(調(diào)用processWorkerExit方法)
4.1.3 移除workers中的當前線程蝌戒,回收線程(調(diào)用addWorkerFailed方法)
4.2. 工作線程數(shù) > 核心線程數(shù)
4.2.1 將當前線程加入到等待隊列BlockQueue<Runnable>
4.3. 如果等待隊列滿了
4.3.1 直接創(chuàng)建新線程作為工作線程執(zhí)行邏輯串塑。
4.4. 如果當前工作線程數(shù) > 最大線程數(shù)
4.4.1 拋出拒絕策略異常

5. 線程釋放策略?
①第一種策略:使用allowCoreThreadTimeOut方法對核心線程和非核心線程進行釋放(調(diào)用poll方法北苟,超出了keepAliveTime桩匪,自動釋放)。
②第二種策略:使用shutdown方法關(guān)閉線程池來對核心線程和非核心線程進行釋放(主動為Workers集合中的線程設(shè)置中斷標識友鼻,自動釋放)傻昙。

6. 線程回收處理?
getTask中使用cas自旋彩扔,達到-1時妆档,退出方法,回收線程虫碉。

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    // 工作線程的數(shù)量減1
    decrementWorkerCount();
    return null;
}

private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贾惦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子蔗衡,更是在濱河造成了極大的恐慌纤虽,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绞惦,死亡現(xiàn)場離奇詭異逼纸,居然都是意外死亡,警方通過查閱死者的電腦和手機济蝉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門杰刽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人王滤,你說我怎么就攤上這事贺嫂。” “怎么了雁乡?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵第喳,是天一觀的道長。 經(jīng)常有香客問我踱稍,道長曲饱,這世上最難降的妖魔是什么悠抹? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮扩淀,結(jié)果婚禮上楔敌,老公的妹妹穿的比我還像新娘。我一直安慰自己驻谆,他們只是感情好卵凑,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著胜臊,像睡著了一般勺卢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上区端,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天值漫,我揣著相機與錄音,去河邊找鬼织盼。 笑死杨何,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的沥邻。 我是一名探鬼主播危虱,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼唐全!你這毒婦竟也來了埃跷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤邮利,失蹤者是張志新(化名)和其女友劉穎弥雹,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體延届,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡剪勿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了方庭。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片厕吉。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖械念,靈堂內(nèi)的尸體忽然破棺而出头朱,到底是詐尸還是另有隱情,我是刑警寧澤龄减,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布项钮,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏寄纵。R本人自食惡果不足惜鳖敷,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一脖苏、第九天 我趴在偏房一處隱蔽的房頂上張望程拭。 院中可真熱鬧,春花似錦棍潘、人聲如沸恃鞋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽恤浪。三九已至,卻和暖如春肴楷,著一層夾襖步出監(jiān)牢的瞬間水由,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工赛蔫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留砂客,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓呵恢,卻偏偏與公主長得像鞠值,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子渗钉,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容