Java多進(jìn)程----ThreadPoolExecutor實(shí)現(xiàn)機(jī)制解析

一闯第、相關(guān)概念

ThreadPoolExecutor是jdk內(nèi)置線程池的一個(gè)實(shí)現(xiàn)兔院,基本上大部分情況都會(huì)使用這個(gè)線程池完成并發(fā)操作零聚,下面是涉及到ThreadPoolExecutor的相關(guān)概念。

  1. ThreadPoolExecutor: 這是線程池實(shí)現(xiàn)類煎谍,會(huì)動(dòng)態(tài)創(chuàng)建多個(gè)線程攘蔽,并發(fā)執(zhí)行提交的多個(gè)任務(wù);

  2. Worker: 是個(gè)Runnable實(shí)現(xiàn)類來(lái)的呐粘,內(nèi)部會(huì)創(chuàng)建一個(gè)線程满俗,一直循環(huán)不斷執(zhí)行任務(wù),所以可以認(rèn)為一個(gè)Worker就是一個(gè)工作線程;

  3. corePoolSize: 當(dāng)池中總線程數(shù)<corePoolSize時(shí)作岖,提交一個(gè)任務(wù)創(chuàng)建一個(gè)新線程唆垃,而不管已經(jīng)存在的線程是不是閑著,通常情況下痘儡,一旦線程數(shù)達(dá)到了corePoolSize辕万,那么池中總線程數(shù)是不會(huì)跌破到corePoolSize以下的。(除非 allowCoreThreadTimeOut=true并且keepAliveTime>0)沉删;

  4. maximumPoolSize: 當(dāng)池中線程數(shù)達(dá)到了corePoolSize渐尿,這時(shí)候新提交的任務(wù)就會(huì)放入等待隊(duì)列中,一般情況下矾瑰,這些任務(wù)會(huì)被前面創(chuàng)建的 corePoolSize個(gè)線程執(zhí)行涡戳。當(dāng)任務(wù)提交速度過(guò)快,隊(duì)列滿了脯倚,這時(shí)候渔彰,如果當(dāng)前總線程數(shù)<maximumPoolSize,那么線程池會(huì)創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行新提交的任務(wù)推正,否則根據(jù)策略放棄任務(wù)恍涂;

  5. keepAliveTime:存活時(shí)間,分兩種情況: (1)allowCoreThreadTimeOut=true植榕,所有線程再沧,一旦創(chuàng)建后,在keepAliveTime時(shí)間內(nèi)尊残,如果沒(méi)有任務(wù)可以執(zhí)行炒瘸,則該線程會(huì)退出并銷(xiāo)毀,這樣的好處是系統(tǒng)不忙時(shí)可以回收線程資源寝衫;(2)allowCoreThreadTimeOut=false顷扩,如果總線程數(shù)<=corePoolSize,那么這些線程是不會(huì)退出的慰毅,他們會(huì)一直不斷的等待任務(wù)并執(zhí)行隘截,哪怕當(dāng)前沒(méi)有任務(wù),但如果線程數(shù)>corePoolSize,而且一旦一個(gè)線程閑的時(shí)間超過(guò) keepAliveTime則會(huì)退出婶芭,但一旦降低到corePoolSize东臀,則不會(huì)再退出了。

  6. allowCoreThreadTimeOut: 用于決定是否在系統(tǒng)閑時(shí)可以逐步回收所有的線程犀农,如果為allowCoreThreadTimeOut=true,必須結(jié)合keepAliveTime一起使用惰赋,用于決定當(dāng)線程數(shù)<corePoolSize時(shí),是否要回收這些線程呵哨。

  7. workQueue:這是一個(gè)阻塞隊(duì)列赁濒,當(dāng)線程數(shù)>=corePoolSize,這時(shí)候提交的任務(wù)將會(huì)放入阻塞隊(duì)列中仇穗,如果阻塞隊(duì)列是無(wú)界的流部,那么總的線程數(shù)是不可能>corePoolSize的,即maximumPoolSize屬性就是無(wú)用的纹坐;如果阻塞隊(duì)列是有界的枝冀,而且未滿,則任務(wù)入隊(duì)耘子,否則根據(jù)maximumPoolSize的值判斷是要新建線程執(zhí)行新任務(wù)或者是根據(jù)策略丟棄任務(wù)果漾。

通過(guò)下面演示,熟悉上面相關(guān)概念:


ThreadPoolExecutor演示狀態(tài)圖
ThreadPoolExecutor演示狀態(tài)圖

二谷誓、ThreadPoolExecutor狀態(tài)

ThreadPoolExecutor的五個(gè)狀態(tài)

線程池有5個(gè)狀態(tài)绒障,分別是:

  • RUNNING:可以接受新的任務(wù),也可以處理阻塞隊(duì)列里的任務(wù)
  • SHUTDOWN:不接受新的任務(wù)捍歪,但是可以處理阻塞隊(duì)列里的任務(wù)
  • STOP:不接受新的任務(wù)户辱,不處理阻塞隊(duì)列里的任務(wù),中斷正在處理的任務(wù)
  • TIDYING:過(guò)渡狀態(tài)糙臼,也就是說(shuō)所有的任務(wù)都執(zhí)行完了庐镐,當(dāng)前線程池已經(jīng)沒(méi)有有效的線程,這個(gè)時(shí)候線程池的狀態(tài)將會(huì)TIDYING变逃,并且將要調(diào)用terminated方法
  • TERMINATED:終止?fàn)顟B(tài)必逆。terminated方法調(diào)用完成以后的狀態(tài)
狀態(tài)的轉(zhuǎn)換

狀態(tài)之間可以進(jìn)行轉(zhuǎn)換:

  • RUNNING -> SHUTDOWN:手動(dòng)調(diào)用shutdown方法,或者ThreadPoolExecutor要被GC回收的時(shí)候調(diào)用finalize方法揽乱,finalize方法內(nèi)部也會(huì)調(diào)用shutdown方法

  • (RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow方法

  • SHUTDOWN -> TIDYING:當(dāng)隊(duì)列和線程池都為空的時(shí)候

  • STOP -> TIDYING:當(dāng)線程池為空的時(shí)候

  • TIDYING -> TERMINATED:terminated方法調(diào)用完成之后

狀態(tài)的表示

在ThreadPoolExecutor名眉,利用AtomicInteger(一個(gè)提供原子操作的Integer的類)保存狀態(tài)和線程數(shù),整型中32位的前3位用來(lái)表示線程池狀態(tài)凰棉,后3位表示線程池中有效的線程數(shù)损拢。

// 前3位表示狀態(tài),所有線程數(shù)占29位
private static final int COUNT_BITS = Integer.SIZE - 3;

線程池容量大小為 1 << 29 - 1 = 00011111111111111111111111111111(二進(jìn)制)渊啰,代碼如下
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

RUNNING狀態(tài) -1 << 29 = 11111111111111111111111111111111 << 29 = 11100000000000000000000000000000(前3位為111):
private static final int RUNNING = -1 << COUNT_BITS;

SHUTDOWN狀態(tài) 0 << 29 = 00000000000000000000000000000000 << 29 = 00000000000000000000000000000000(前3位為000):
private static final int SHUTDOWN = 0 << COUNT_BITS;

STOP狀態(tài) 1 << 29 = 00000000000000000000000000000001 << 29 = 00100000000000000000000000000000(前3位為001):
private static final int STOP = 1 << COUNT_BITS;

TIDYING狀態(tài) 2 << 29 = 00000000000000000000000000000010 << 29 = 01000000000000000000000000000000(前3位為010):
private static final int TIDYING = 2 << COUNT_BITS;

TERMINATED狀態(tài) 3 << 29 = 00000000000000000000000000000011 << 29 = 01100000000000000000000000000000(前3位為011):
private static final int TERMINATED = 3 << COUNT_BITS;

清楚狀態(tài)位之后探橱,下面是獲得狀態(tài)和線程數(shù)的內(nèi)部方法:


    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 得到線程數(shù)申屹,也就是后29位的數(shù)字绘证。 直接跟CAPACITY做一個(gè)與操作即可隧膏,CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 與操作的話前面3位肯定為0嚷那,相當(dāng)于直接取后29位的值
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    // 得到狀態(tài)胞枕,CAPACITY的非操作得到的二進(jìn)制位11100000000000000000000000000000,然后做在一個(gè)與操作魏宽,相當(dāng)于直接取前3位的的值
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    // 或操作腐泻。相當(dāng)于更新數(shù)量和狀態(tài)兩個(gè)操作
    private static int ctlOf(int rs, int wc) { return rs | wc; }

三、源碼分析

根據(jù)javadoc中關(guān)于ThreadPoolExecutor類的描述可知队询。ThreadPoolExecutor的實(shí)現(xiàn)主要依靠?jī)蓚€(gè)數(shù)據(jù)結(jié)構(gòu):

  1. 線程池
  2. 任務(wù)隊(duì)列

任務(wù)隊(duì)列使用的數(shù)據(jù)結(jié)構(gòu)比較容易想到派桩,可以采用實(shí)現(xiàn)了java.util.concurrent.BlockingQueue接口的類。
線程池該怎么實(shí)現(xiàn)才能讓線程池里的任務(wù)持續(xù)執(zhí)行一個(gè)接一個(gè)的任務(wù)呢蚌斩?

ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService {

    ...

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    ...    

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    ...

}

如代碼中的注釋所說(shuō)铆惑,workers就是存放工作線程的線程池,就是一個(gè)簡(jiǎn)單的HashSet送膳。那么员魏,關(guān)鍵信息一定是藏在這個(gè)Worker類里了。

Worker類
private final class Worker    
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // 使用ThreadFactory構(gòu)造Thread叠聋,這個(gè)構(gòu)造的Thread內(nèi)部的Runnable就是本身撕阎,也就是Worker。所以得到Worker的thread并start的時(shí)候碌补,會(huì)執(zhí)行Worker的run方法虏束,也就是執(zhí)行ThreadPoolExecutor的runWorker方法
        setState(-1);  //把狀態(tài)位設(shè)置成-1,這樣任何線程都不能得到Worker的鎖厦章,除非調(diào)用了unlock方法镇匀。這個(gè)unlock方法會(huì)在runWorker方法中一開(kāi)始就調(diào)用,這是為了確保Worker構(gòu)造出來(lái)之后闷袒,沒(méi)有任何線程能夠得到它的鎖坑律,除非調(diào)用了runWorker之后,其他線程才能獲得Worker的鎖
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker是ThreadPoolExecutor的內(nèi)部類囊骤,成員變量thread就是實(shí)際執(zhí)行任務(wù)的線程晃择。這個(gè)thread不直接執(zhí)行用戶提交的任務(wù),它執(zhí)行的任務(wù)就是它所在的Worker對(duì)象也物。 Worker對(duì)象的run()方法調(diào)用了ThreadPoolExecutor.runWorker(Worker w)方法宫屠。

ThreadPoolExecutor.runWorker(Worker w)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 得到當(dāng)前線程
    Runnable task = w.firstTask; // 得到Worker中的任務(wù)task,也就是用戶傳入的task
    w.firstTask = null; // 將Worker中的任務(wù)置空
    w.unlock(); // allow interrupts滑蚯。 
    boolean completedAbruptly = true;
    try {
        // 如果worker中的任務(wù)不為空浪蹂,繼續(xù)知否抵栈,否則使用getTask獲得任務(wù)。一直死循環(huán)坤次,除非得到的任務(wù)為空才退出
        while (task != null || (task = getTask()) != null) {
            w.lock();  // 如果拿到了任務(wù)古劲,給自己上鎖,表示當(dāng)前Worker已經(jīng)要開(kāi)始執(zhí)行任務(wù)了缰猴,已經(jīng)不是閑置Worker(閑置Worker的解釋請(qǐng)看下面的線程池關(guān)閉)
            // 在執(zhí)行任務(wù)之前先做一些處理产艾。 1. 如果線程池已經(jīng)處于STOP狀態(tài)并且當(dāng)前線程沒(méi)有被中斷,中斷線程 2. 如果線程池還處于RUNNING或SHUTDOWN狀態(tài)滑绒,并且當(dāng)前線程已經(jīng)被中斷了闷堡,重新檢查一下線程池狀態(tài),如果處于STOP狀態(tài)并且沒(méi)有被中斷疑故,那么中斷線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 任務(wù)執(zhí)行前需要做什么杠览,ThreadPoolExecutor是個(gè)空實(shí)現(xiàn)
                Throwable thrown = null;
                try {
                    task.run(); // 真正的開(kāi)始執(zhí)行任務(wù),調(diào)用的是run方法纵势,而不是start方法踱阿。這里run的時(shí)候可能會(huì)被中斷,比如線程池調(diào)用了shutdownNow方法
                } catch (RuntimeException x) { // 任務(wù)執(zhí)行發(fā)生的異常全部拋出吨悍,不在runWorker中處理
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 任務(wù)執(zhí)行結(jié)束需要做什么扫茅,ThreadPoolExecutor是個(gè)空實(shí)現(xiàn)
                }
            } finally {
                task = null;
                w.completedTasks++; // 記錄執(zhí)行任務(wù)的個(gè)數(shù)
                w.unlock(); // 執(zhí)行完任務(wù)之后,解鎖育瓜,Worker變成閑置Worker
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 回收Worker方法
    }
}

程序的大致邏輯就是在firstTask或getTask()返回方法不為空的情況下執(zhí)行task.run()葫隙。這里的getTask()方法就是從用戶任務(wù)隊(duì)列workQueue獲取任務(wù)的那個(gè)方法。

我們看一下getTask方法是如何獲得任務(wù)的:

ThreadPoolExecutor.getTask()
// 如果發(fā)生了以下四件事中的任意一件躏仇,那么Worker需要被回收:
// 1. Worker個(gè)數(shù)比線程池最大大小要大
// 2. 線程池處于STOP狀態(tài)
// 3. 線程池處于SHUTDOWN狀態(tài)并且阻塞隊(duì)列為空
// 4. 使用超時(shí)時(shí)間從阻塞隊(duì)列里拿數(shù)據(jù)恋脚,并且超時(shí)之后沒(méi)有拿到數(shù)據(jù)(allowCoreThreadTimeOut || workerCount > corePoolSize)
private Runnable getTask() {
    boolean timedOut = false; // 如果使用超時(shí)時(shí)間并且也沒(méi)有拿到任務(wù)的標(biāo)識(shí)

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果線程池是SHUTDOWN狀態(tài)并且阻塞隊(duì)列為空的話,worker數(shù)量減一焰手,直接返回null(SHUTDOWN狀態(tài)還會(huì)處理阻塞隊(duì)列任務(wù)糟描,但是阻塞隊(duì)列為空的話就結(jié)束了),如果線程池是STOP狀態(tài)的話书妻,worker數(shù)量建議船响,直接返回null(STOP狀態(tài)不處理阻塞隊(duì)列任務(wù))[方法一開(kāi)始注釋的2,3兩點(diǎn)躲履,返回null见间,開(kāi)始Worker回收]
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // 標(biāo)記從隊(duì)列中取任務(wù)時(shí)是否設(shè)置超時(shí)時(shí)間,如果為true說(shuō)明這個(gè)worker可能需要回收工猜,為false的話這個(gè)worker會(huì)一直存在米诉,并且阻塞當(dāng)前線程等待阻塞隊(duì)列中有數(shù)據(jù)

        for (;;) {
            int wc = workerCountOf(c); // 得到當(dāng)前線程池Worker個(gè)數(shù)
            // allowCoreThreadTimeOut屬性默認(rèn)為false,表示線程池中的核心線程在閑置狀態(tài)下還保留在池中篷帅;如果是true表示核心線程使用keepAliveTime這個(gè)參數(shù)來(lái)作為超時(shí)時(shí)間
            // 如果worker數(shù)量比基本大小要大的話史侣,timed就為true拴泌,需要進(jìn)行回收worker
            timed = allowCoreThreadTimeOut || wc > corePoolSize; 

            if (wc <= maximumPoolSize && ! (timedOut && timed)) // 方法一開(kāi)始注釋的1,4兩點(diǎn)惊橱,會(huì)進(jìn)行下一步worker數(shù)量減一
                break;
            if (compareAndDecrementWorkerCount(c)) // worker數(shù)量減一蚪腐,返回null,之后會(huì)進(jìn)行Worker回收工作
                return null;
            c = ctl.get();  // 重新檢查線程池狀態(tài)
            if (runStateOf(c) != rs) // 線程池狀態(tài)改變的話重新開(kāi)始外部循環(huán)李皇,否則繼續(xù)內(nèi)部循環(huán)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            // 如果需要設(shè)置超時(shí)時(shí)間削茁,使用poll方法宙枷,否則使用take方法一直阻塞等待阻塞隊(duì)列新進(jìn)數(shù)據(jù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; // 閑置Worker被中斷
        }
    }
}

Worker類的執(zhí)行邏輯大致就是這樣了掉房。那么ThreadPoolExecutor是如何新建和啟動(dòng)這些Worker類的呢?

來(lái)看一下我們提交任務(wù)時(shí)使用的ThreadPoolExecutor.execute(Runnable command)方法慰丛。

ThreadPoolExecutor.execute(Runable command)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {   // 第一個(gè)步驟卓囚,滿足線程池中的線程大小比基本大小要小
        if (addWorker(command, true)) // addWorker方法第二個(gè)參數(shù)true表示使用基本大小
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { // 第二個(gè)步驟,線程池的線程大小比基本大小要大诅病,并且線程池還在RUNNING狀態(tài)哪亿,阻塞隊(duì)列也沒(méi)滿的情況,加到阻塞隊(duì)列里
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) // 雖然滿足了第二個(gè)步驟贤笆,但是這個(gè)時(shí)候可能突然線程池關(guān)閉了蝇棉,所以再做一層判斷
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 第三個(gè)步驟,直接使用線程池最大大小芥永。addWorker方法第二個(gè)參數(shù)false表示使用最大大小
        reject(command);
}

除去處理ThreadPoolExecutor對(duì)象狀態(tài)的代碼篡殷,最關(guān)鍵的兩段代碼就是workQueue.offer(command)和addWorker(command, true)。
workQueue.offer(command)是將任務(wù)加入隊(duì)列埋涧;新建和啟動(dòng)Worker對(duì)象的代碼就是在addWorker(command, true)里了板辽。

addWorker(Runnable firstTask, boolean core)
// 兩個(gè)參數(shù),firstTask表示需要跑的任務(wù)棘催。boolean類型的core參數(shù)為true的話表示使用線程池的基本大小劲弦,為false使用線程池最大大小
// 返回值是boolean類型,true表示新任務(wù)被接收了醇坝,并且執(zhí)行了邑跪。否則是false
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); // 線程池當(dāng)前狀態(tài)

        // 這個(gè)判斷轉(zhuǎn)換成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。 
        // 概括為3個(gè)條件:
        // 1. 線程池不在RUNNING狀態(tài)并且狀態(tài)是STOP呼猪、TIDYING或TERMINATED中的任意一種狀態(tài)

        // 2. 線程池不在RUNNING狀態(tài)画畅,線程池接受了新的任務(wù) 

        // 3. 線程池不在RUNNING狀態(tài),阻塞隊(duì)列為空郑叠。  滿足這3個(gè)條件中的任意一個(gè)的話夜赵,拒絕執(zhí)行任務(wù)

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c); // 線程池線程個(gè)數(shù)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) // 如果線程池線程數(shù)量超過(guò)線程池最大容量或者線程數(shù)量超過(guò)了基本大小(core參數(shù)為true,core參數(shù)為false的話判斷超過(guò)最大大小)
                return false; // 超過(guò)直接返回false
            if (compareAndIncrementWorkerCount(c)) // 沒(méi)有超過(guò)各種大小的話乡革,cas操作線程池線程數(shù)量+1寇僧,cas成功的話跳出循環(huán)
                break retry;
            c = ctl.get();  // 重新檢查狀態(tài)
            if (runStateOf(c) != rs) // 如果狀態(tài)改變了摊腋,重新循環(huán)操作
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 走到這一步說(shuō)明cas操作成功了,線程池線程數(shù)量+1
    boolean workerStarted = false; // 任務(wù)是否成功啟動(dòng)標(biāo)識(shí)
    boolean workerAdded = false; // 任務(wù)是否添加成功標(biāo)識(shí)
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock; // 得到線程池的可重入鎖
        w = new Worker(firstTask); // 基于任務(wù)firstTask構(gòu)造worker
        final Thread t = w.thread; // 使用Worker的屬性thread嘁傀,這個(gè)thread是使用ThreadFactory構(gòu)造出來(lái)的
        if (t != null) { // ThreadFactory構(gòu)造出的Thread有可能是null兴蒸,做個(gè)判斷
            mainLock.lock(); // 鎖住,防止并發(fā)
            try {
                // 在鎖住之后再重新檢測(cè)一下?tīng)顟B(tài)
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) { // 如果線程池在RUNNING狀態(tài)或者線程池在SHUTDOWN狀態(tài)并且任務(wù)是個(gè)null
                    if (t.isAlive()) // 判斷線程是否還活著细办,也就是說(shuō)線程已經(jīng)啟動(dòng)并且還沒(méi)死掉
                        throw new IllegalThreadStateException(); // 如果存在已經(jīng)啟動(dòng)并且還沒(méi)死的線程橙凳,拋出異常
                    workers.add(w); // worker添加到線程池的workers屬性中,是個(gè)HashSet
                    int s = workers.size(); // 得到目前線程池中的線程個(gè)數(shù)
                    if (s > largestPoolSize) // 如果線程池中的線程個(gè)數(shù)超過(guò)了線程池中的最大線程數(shù)時(shí)笑撞,更新一下這個(gè)最大線程數(shù)
                        largestPoolSize = s;
                    workerAdded = true; // 標(biāo)識(shí)一下任務(wù)已經(jīng)添加成功
                }
            } finally {
                mainLock.unlock(); // 解鎖
            }
            if (workerAdded) { // 如果任務(wù)添加成功岛啸,運(yùn)行任務(wù),改變一下任務(wù)成功啟動(dòng)標(biāo)識(shí)
                t.start(); // 啟動(dòng)線程茴肥,這里的t是Worker中的thread屬性坚踩,所以相當(dāng)于就是調(diào)用了Worker的run方法
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted) // 如果任務(wù)啟動(dòng)失敗,調(diào)用addWorkerFailed方法
            addWorkerFailed(w);
    }
    return workerStarted;
}

這個(gè)方法里執(zhí)行了三個(gè)我們關(guān)注的操作:

  • 新建Worker對(duì)象——w = new Worker(firstTask);
  • 將Worker對(duì)象加入workers集合——workers.add(w);
  • 啟動(dòng)Worker對(duì)象里的thread——t.start();
總結(jié)

簡(jiǎn)單概括一下ThreadPoolExecutor的運(yùn)行過(guò)程(不包括線程池大小控制瓤狐、線程池關(guān)閉等邏輯):

  1. ThreadPoolExecutor.execute(Runnable command)提交任務(wù)
  2. 如果Worker數(shù)量未達(dá)到上限瞬铸,新建一個(gè)Worker并將command作為Worker的firstTask
  3. 如果Worker數(shù)量已達(dá)到上限,則將command放入workQueue
  4. 每個(gè)啟動(dòng)了的worker先執(zhí)行firstTask础锐,然后繼續(xù)從workQueue獲取task來(lái)執(zhí)行

參考資料

  1. Java多線程-工具篇-BlockingQueue
  2. Java的多線程編程模型5--從AtomicInteger開(kāi)始
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嗓节,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子皆警,更是在濱河造成了極大的恐慌拦宣,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耀怜,死亡現(xiàn)場(chǎng)離奇詭異恢着,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)财破,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)掰派,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人左痢,你說(shuō)我怎么就攤上這事靡羡。” “怎么了俊性?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵略步,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我定页,道長(zhǎng)趟薄,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任典徊,我火速辦了婚禮杭煎,結(jié)果婚禮上恩够,老公的妹妹穿的比我還像新娘。我一直安慰自己羡铲,他們只是感情好蜂桶,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著也切,像睡著了一般扑媚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上雷恃,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天疆股,我揣著相機(jī)與錄音,去河邊找鬼褂萧。 笑死押桃,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的导犹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼羡忘,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼谎痢!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起卷雕,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤节猿,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后漫雕,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體滨嘱,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年浸间,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了太雨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡魁蒜,死狀恐怖囊扳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情兜看,我是刑警寧澤锥咸,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站细移,受9級(jí)特大地震影響屉佳,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜幌衣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望球涛。 院中可真熱鬧,春花似錦校镐、人聲如沸亿扁。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)从祝。三九已至,卻和暖如春引谜,著一層夾襖步出監(jiān)牢的瞬間牍陌,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工员咽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留毒涧,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓贝室,卻偏偏與公主長(zhǎng)得像契讲,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子滑频,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容