ThreadPoolExecutor

4、 ThreadPoolExecutor 線程池

  1. 降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷(xiāo)毀造成的消耗。
  2. 提高響應(yīng)速度绘盟。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行悯仙。
  3. 提高線程的可管理性龄毡。線程是稀缺資源,如果無(wú)限制地創(chuàng)建雁比,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性撤嫩,使用線程池可以進(jìn)行統(tǒng)一分配偎捎、調(diào)優(yōu)和監(jiān)控。但是序攘,要做到合理利用線程池茴她,必須對(duì)其實(shí)現(xiàn)原理了如指掌。

4.1程奠、創(chuàng)建ThreadPoolExecutor丈牢、參數(shù)、提交任務(wù)

// 1
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
}

// 2
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {}

// 3                              
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {}                        
  1. 參數(shù)解釋?zhuān)?
    1. corePoolSize(線程池的基本大小):當(dāng)提交一個(gè)任務(wù)到線程池時(shí)瞄沙,線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù)己沛,即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程慌核,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法申尼,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程垮卓。
    2. maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了师幕,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù)粟按,則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是霹粥,如果使用了無(wú)界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒(méi)什么效果灭将。
    3. keepAliveTime (線程活動(dòng)保持時(shí)間):線程池的工作線程空閑后,保持存活的時(shí)間后控。所以庙曙,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短忆蚀,可以調(diào)大時(shí)間矾利,提高線程的利用率。
    4. unit (線程活動(dòng)保持時(shí)間的單位):可選的單位有天(DAYS)馋袜、小時(shí)(HOURS)男旗、分鐘 (MINUTES)、毫秒(MILLISECONDS)欣鳖、微秒(MICROSECONDS察皇,千分之一毫秒)和納秒。
    5. workQueue runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列泽台∈踩伲可以選擇以下幾個(gè)阻塞隊(duì)列。
      1. ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列怀酷,此隊(duì)列按FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序稻爬。
      2. LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO排序元素蜕依,吞吐量通常要高于ArrayBlockingQueue桅锄。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列。
      3. SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列样眠。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用 移除操作友瘤,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue檐束,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列辫秧。
      4. PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列。
    6. handler (飽和策略):當(dāng)隊(duì)列和線程池都滿了被丧,說(shuō)明線程池處于飽和狀態(tài)盟戏,那么必須采取一種策略處理提交的新任務(wù)绪妹。這個(gè)策略默認(rèn)情況下是AbortPolicy,表示無(wú)法處理新任務(wù)時(shí)拋出異常抓半。在JDK1.5中Java線程池框架提供了以下4種策略喂急。
      1. AbortPolicy:直接拋出異常。
      2. CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)笛求。
      3. DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù)廊移,并執(zhí)行當(dāng)前任務(wù)。
      4. DiscardPolicy:不處理探入,丟棄掉狡孔。
    7. threadFactory 用于設(shè)置創(chuàng)建線程的工廠,可以通過(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字蜂嗽。
  2. 提交任務(wù)
    1. execute() 方法用于提交不需要返回值的任務(wù)苗膝,所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。
    2. submit() 方法用于提交需要返回值的任務(wù)植旧。線程池會(huì)返回一個(gè)future類(lèi)型的對(duì)象辱揭,通過(guò)這個(gè)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過(guò)future的get()方法來(lái)獲取返回值病附,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成问窃,而使用get(long timeout,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線 程一段時(shí)間后立即返回完沪,這時(shí)候有可能任務(wù)沒(méi)有執(zhí)行完域庇。

摘抄自《Java并發(fā)編程的藝術(shù)》

4.2、線程池的生命周期及其他狀態(tài)

了解線程池的生命周期對(duì)理解線程池的實(shí)現(xiàn)原理至關(guān)重要覆积。線程池的生命周期通過(guò)AtomicInteger類(lèi)型的變量ctl來(lái)維護(hù)听皿。

// ctl變量維護(hù)線程池的生命周期并提供一些額外方法
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  1. 生命周期:

    1. RUNNING:接受新任務(wù)并處理隊(duì)列中已有的任務(wù)。
    2. SHUTDOWN:不接受新任務(wù)宽档,但會(huì)處理隊(duì)列中已有任務(wù)尉姨。
    3. STOP:不接受新任務(wù)、不處理隊(duì)列中已有任務(wù)吗冤,且中斷正在運(yùn)行的任務(wù)又厉。
    4. TIDYING:線程池中所有的線程都已終止,并將workerCount清零欣孤。
    5. TERMINATED:線程池完全停止馋没。
  2. 生命周期轉(zhuǎn)換:

    1. RUNNING -> SHUTDOWN昔逗,調(diào)用shutdown()方法降传。
    2. (RUNNING or SHUTDOWN) -> STOP,調(diào)用shutdownNow()方法勾怒。
    3. SHUTDOWN -> TIDYING婆排,線程池和任務(wù)隊(duì)列同時(shí)為空声旺。
    4. STOP -> TIDYING,線程池為空段只。
    5. TIDYING -> TERMINATED腮猖,調(diào)用terminated()鉤子方法。

了解了線程池的狀態(tài)赞枕,以及狀態(tài)之間的轉(zhuǎn)換之后澈缺,下面我們著手開(kāi)始分析線程池的源碼。

4.3炕婶、以execute方式提交任務(wù)

execute()方法用于提交不需要返回值的任務(wù)姐赡,所以無(wú)法判斷任務(wù)是否被線程池執(zhí)行成功。通過(guò)分析execute()方法柠掂,我們將了解任務(wù)提交之后项滑,線程池是如何創(chuàng)建線程、如何將線程池的線程數(shù)逐步擴(kuò)大到maximumPoolSize涯贞、以及如何處理隊(duì)列中的任務(wù)枪狂。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // ① 線程池中的線程數(shù)小于corePoolSize,創(chuàng)建線程宋渔。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // ② 線程池中的線程數(shù)大于corePoolSize州疾。
    // 第二步的邏輯稍有些復(fù)雜,分為三個(gè)步驟:
    // 2.1傻谁、若阻塞隊(duì)列未滿孝治,將任務(wù)加入阻塞隊(duì)列,而非創(chuàng)建線程
    if (isRunning(c) && workQueue.offer(command)) {
        // 因?yàn)樽枞?duì)列和線程池使用的不是同一把鎖审磁,任務(wù)入隊(duì)之后谈飒,
        // 線程池的狀態(tài)可能會(huì)發(fā)生變化,從而導(dǎo)致任務(wù)無(wú)法正常執(zhí)行态蒂,所以這里要做一次二次校驗(yàn)
        int recheck = ctl.get();
        // 2.2杭措、若線程池非運(yùn)行狀態(tài),則將任務(wù)移出阻塞隊(duì)列
        //(注意:!isRunning(recheck) = (STOP 或 TIDYING 或 TERMINATED))
        // 處于這三種狀態(tài)之一的線程池即不接受新任務(wù)钾恢、也不處理隊(duì)列中的任務(wù)
        if (!isRunning(recheck) && remove(command))
            // 根據(jù)RejectedExecutionHandler策略回絕該任務(wù)
            reject(command);
        // 2.3 若線程池中的worker數(shù)為0
        else if (workerCountOf(recheck) == 0)
            // 創(chuàng)建一個(gè)任務(wù)為null的worker手素,初看之下較為晦澀難懂。
            // 分析:雖然worker數(shù)為0瘩蚪,但是阻塞隊(duì)列不一定為空泉懦,
            // 而阻塞隊(duì)列中任務(wù)的執(zhí)行又依賴worker來(lái)完成,所以這里加入一個(gè)任務(wù)為空的worker
            // 以便繼續(xù)執(zhí)行阻塞隊(duì)列中的任務(wù)
            addWorker(null, false);
    }
    // ③ 執(zhí)行到此處疹瘦,線程池中的線程數(shù)大于corePoolSize崩哩、且隊(duì)列已滿。
    // 3.1、若線程池中的線程數(shù)小于maximumPoolSize邓嘹,繼續(xù)創(chuàng)建線程酣栈;
    // 3.2、否則根據(jù)飽和策略汹押,回絕該任務(wù)矿筝。
    else if (!addWorker(command, false))
        reject(command);
}

以上就是通過(guò)execute()方法提交任務(wù)之后,線程池對(duì)任務(wù)的接受處理過(guò)程棚贾。主要分為三種情況:

  1. 線程池中線程數(shù)窖维,小于corePoolSize,創(chuàng)建線程妙痹。
  2. 線程池中線程數(shù)陈辱,大于corePoolSize,阻塞隊(duì)列未滿细诸,將任務(wù)加入阻塞隊(duì)列沛贪。
    1. 二次判斷線程池狀態(tài),若非運(yùn)行狀態(tài)震贵,回絕利赋;若運(yùn)行狀態(tài),再判斷worker數(shù)猩系。
    2. worker數(shù)小于0媚送,創(chuàng)建任務(wù)為空的worker,以便執(zhí)行阻塞隊(duì)列中的任務(wù)寇甸。
  3. 線程池中線程數(shù)塘偎,大于corePoolSize,小于maximumPoolSize拿霉,阻塞隊(duì)列已滿吟秩,創(chuàng)建線程。
4.3.1绽淘、addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // ① 快速檢查線程池涵防、阻塞隊(duì)列的狀態(tài),若不滿足添加條件沪铭,快速返回失敗
        // 第一個(gè)條件:rs >= SHUTDOWN 根據(jù)線程池生命周期的定義壮池,若狀態(tài)大于等于SHUTDOWN不接受新任務(wù)
        // 第二個(gè)條件:rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() 三者有一個(gè)為不成立
        // 綜合第一個(gè)條件:
        // 1.1、若rs == SHUTDOWN不成立杀怠,rs為STOP椰憋、TIDYING、TERMINATED其中之一赔退,不接受新任務(wù)橙依、不處理隊(duì)列任務(wù),返回false;
        // 1.2票编、若firstTask == null不成立,rs為SHUTDOWN卵渴,不接受新任務(wù)慧域,返回false;
        // 1.3浪读、若!workQueue.isEmpty()不成立昔榴,rs為SHUTDOWN、且firstTask == null碘橘、且隊(duì)列為空
        //      第三個(gè)條件較為晦澀互订,結(jié)合前文分析,此條件是為了保證線程池中依然有worker可以處理阻塞隊(duì)列的任務(wù)痘拆,
        //      若阻塞隊(duì)列為空仰禽,則阻塞隊(duì)列無(wú)任務(wù)需要處理,返回false纺蛆。
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        // ② 自旋吐葵,嘗試將worker數(shù)加一
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // ③ 將任務(wù)包裝成Worker(此步非常重要,下文單獨(dú)分析)
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // ④ 為防止線程池在獲取鎖之前關(guān)閉桥氏,這里要進(jìn)行二次檢查温峭。
                // rs < SHUTDOWN 線程池的狀態(tài)為運(yùn)行中,可接受新任務(wù)字支、可以處理隊(duì)列中的任務(wù)凤藏。
                // rs == SHUTDOWN && firstTask == null ?堕伪?揖庄? 此處尚不清晰
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將任務(wù)添加至worker集合
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // ④ 開(kāi)啟線程(這一步最終會(huì)執(zhí)行到runWorker()方法,下文單獨(dú)分析其調(diào)用過(guò)程)
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // ⑤ 線程啟動(dòng)失敗欠雌,回滾
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法是線程池中較為核心的方法之一:

  1. 狀態(tài)檢查抠艾,快速返回失敗。如果不了解線程池的狀態(tài)桨昙、線程池運(yùn)行上下文检号,則較為晦澀難懂。
  2. 將任務(wù)包裝成Worker對(duì)象蛙酪,以便委托worker執(zhí)行任務(wù)齐苛。
  3. 開(kāi)啟線程。開(kāi)啟線程是指開(kāi)啟worker(worker實(shí)現(xiàn)了Runnable接口)線程桂塞,并通過(guò)run方法調(diào)用runWorker方法來(lái)執(zhí)行提交的任務(wù)凹蜂。
  4. worker添加失敗,回滾。
4.3.2玛痊、 將任務(wù)包裝成Worker對(duì)象

線程池將任務(wù)包裝成Worker對(duì)象汰瘫,并在其構(gòu)造方法中通過(guò)this將任務(wù)指向其本身。由于Worker類(lèi)繼承了Runnable接口擂煞,線程啟動(dòng)后混弥,即可通過(guò)run()方法調(diào)用runWorker()方法來(lái)完成任務(wù)的執(zhí)行。

// Worker類(lèi)部分源碼
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    // 注意:前文addWorker方法中啟動(dòng)的線程即該線程对省,而該線程即worker本身
    final Thread thread;
    // 任務(wù)
    Runnable firstTask;
    
    Worker(Runnable firstTask) {
        setState(-1); 
        this.firstTask = firstTask;
        // 注意這里傳入的參數(shù)為this蝗拿,即Worker本身
        this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
        runWorker(this);
    }
}       
4.3.3、runWorker
// 委托run()方法來(lái)執(zhí)行runWorker()方法
public void run() {
    runWorker(this);
}

// runWorker()方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    // ① 標(biāo)記任務(wù)執(zhí)行過(guò)程中蒿涎,是否拋出異常
    boolean completedAbruptly = true;
    try {
        // ②
        // 2.1哀托、任務(wù)不為空,直接執(zhí)行
        // 2.2劳秋、任務(wù)為空仓手,通過(guò)getTask()方法從阻塞隊(duì)列中獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                    && !wt.isInterrupted())
                wt.interrupt();
            try {
                // 模板方法,任務(wù)開(kāi)始執(zhí)行之前玻淑,可在該方法中做一些額外處理
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // ③ 執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 模板方法俗或,任務(wù)結(jié)束執(zhí)行之后,可在該方法中做一些額外處理
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // ④ 處理worker退出
        processWorkerExit(w, completedAbruptly);
    }
}
  1. 定義變量completedAbruptly岁忘,以標(biāo)記任務(wù)執(zhí)行過(guò)程中辛慰,是否拋出異常。
  2. 獲取任務(wù)
    1. 任務(wù)不為空干像,直接執(zhí)行帅腌。
    2. 否則,通過(guò)getTask()方法從隊(duì)列中獲取任務(wù)麻汰。若當(dāng)前任務(wù)隊(duì)列中無(wú)任務(wù)速客,則阻塞。直至獲取到任務(wù)或返回null五鲫。
  3. 執(zhí)行任務(wù)溺职。
  4. 調(diào)用processWorkerExit()方法,處理worker退出位喂。
4.3.4浪耘、 getTask()

以阻塞或超時(shí)(allowCoreThreadTimeOut為true)的方式從隊(duì)列中獲取任務(wù),返回任務(wù)或null塑崖。比較特殊的是七冲,在該方法有可能導(dǎo)致worker退出:

  1. 線程池中線程數(shù)大于maximumPoolSize(調(diào)用setMaximumPoolSize()導(dǎo)致)。
  2. 線程池狀態(tài)為STOP规婆。
  3. 線程池SHUTDOWN且任務(wù)隊(duì)列為空澜躺。
  4. 獲取任務(wù)超時(shí)蝉稳。
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    // 自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // Check if queue empty only if necessary.
        // 快速判斷線程池、任務(wù)隊(duì)列狀態(tài)
        // 1掘鄙、rs >= STOP 線程池不接受新任務(wù)耘戚、不處理隊(duì)列任務(wù),返回null
        // 2操漠、rs = SHUTDOWN && workQueue.isEmpty() 隊(duì)列中無(wú)待處理任務(wù)收津,返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        
        // 當(dāng)allowCoreThreadTimeOut為true或線程池中線程數(shù)大于corePoolSize時(shí),可銷(xiāo)毀線程
        // 1颅夺、allowCoreThreadTimeOut:false(默認(rèn)):即使核心線程處于空閑狀態(tài)也不會(huì)被銷(xiāo)毀;
        // 2蛹稍、allowCoreThreadTimeOut:true:        核心任務(wù)超過(guò)keepAliveTime還未獲取到任務(wù)吧黄,則銷(xiāo)毀。
        // 當(dāng)然是否銷(xiāo)毀線程唆姐,還需根據(jù)后續(xù)獲取任務(wù)的狀況判斷
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 若滿足銷(xiāo)毀條件拗慨,返回null
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 獲取任務(wù)
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            // 如獲取到任務(wù),則返回奉芦;否則將timedOut標(biāo)記為true赵抢,作為下一輪自旋銷(xiāo)毀線程的判斷條件
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

小結(jié):代碼執(zhí)行到這里,任務(wù)正常執(zhí)行声功、通過(guò)getTask()方法獲取任務(wù)并執(zhí)行兩種方式都已分析過(guò)烦却。接著分析runWorker方法,還剩下最后一步processWorkerExit()

4.3.5先巴、 processWorkerExit

processWorkerExit執(zhí)行條件:

  1. 任務(wù)在執(zhí)行過(guò)程中拋出異常
  2. getTask()方法返回null
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 移除worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    // 若線程池的狀態(tài)以及任務(wù)隊(duì)列的狀態(tài)符合線程池終止的條件其爵,則終止線程池
    tryTerminate();
    
    // 嘗試終止線程池后,線程池的狀態(tài)可能發(fā)生了變化
    // 接下來(lái)還要對(duì)線程池狀態(tài)為RUNNING伸蚯、SHUTDOWN做一些判斷
    // 防止不合理的清除核心線程摩渺、防止清除全部線程而任務(wù)隊(duì)列依然有待執(zhí)行任務(wù)的情況
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        // worker在執(zhí)行任務(wù)時(shí)未出異常
        if (!completedAbruptly) {
            // 定義變量min,記錄線程池中可以保留的最小線程數(shù)
            // 若allowCoreThreadTimeOut為true剂邮,則線程池保留最小線程數(shù)可以為0摇幻;否則為corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 核心線程池保留最小線程數(shù)為0,則要考慮任務(wù)隊(duì)列是否為空
            // 若任務(wù)隊(duì)列不為空挥萌,則至少要保留一個(gè)線程绰姻,以便繼續(xù)執(zhí)行任務(wù)隊(duì)列中的任務(wù)
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 若線程池中線程數(shù)大于等于min,則直接返回
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // worker在執(zhí)行任務(wù)時(shí)出現(xiàn)異常引瀑,則向線程池中加入一個(gè)任務(wù)為空的線程
        addWorker(null, false);
    }
}
// 嘗試終止線程池
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 檢查線程池的狀態(tài)龙宏,以下三種情況,不能終止線程池
        // 1伤疙、線程池狀態(tài)為RUNNING
        // 2寓调、線程池的狀態(tài)為T(mén)IDYING或TERMINATED
        // 3、線程池的狀態(tài)為SHUTDOWN朦肘,但是任務(wù)隊(duì)列中有未處理的任務(wù)
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 檢查線程池中線程數(shù)是否為0灯抛,并中斷線程池中空閑的線程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將線程池的狀態(tài)設(shè)置為T(mén)IDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 模板方法,可以在該方法中做一些額外的事情
                    terminated();
                } finally {
                    // 將線程池的狀態(tài)設(shè)置為T(mén)ERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

processWorkerExit()方法在嘗試終止線程池之后嫉鲸,對(duì)線程池、任務(wù)隊(duì)列的狀態(tài)做了一些判斷。再結(jié)合completedAbruptly次慢、allowCoreThreadTimeOut屬性以確定是否真的銷(xiāo)毀線程、是否維護(hù)一個(gè)執(zhí)行任務(wù)隊(duì)列的線程翔曲。從代碼中可以看到迫像,即使是核心線程,也可能被銷(xiāo)毀瞳遍。該段代碼雖短闻妓,涉及的知識(shí)點(diǎn)很多,若分析有不正確的地方掠械,還希望大家可以指正由缆。

分析到此,關(guān)于線程池的創(chuàng)建猾蒂、運(yùn)行均唉、銷(xiāo)毀等核心代碼,都以分析完畢肚菠。下面再分析一些其他較為重要的代碼舔箭。

4.4、關(guān)閉線程池

關(guān)閉線程池有以下兩種方式:

  1. shutdownNow()
    1. 將線程池狀態(tài)更改為STOP蚊逢。
    2. 中斷所有已經(jīng)啟動(dòng)的線程限嫌。
    3. 返回等待執(zhí)行的任務(wù)列表。
  2. shutdown()
    1. 將線程池的狀態(tài)更改為SHUTDOWN
    2. 中斷所有空閑線程

shutdownNow()和shutdown()雖然同為關(guān)閉線程池时捌,但區(qū)別還是很大的怒医。在關(guān)閉線程池時(shí),如需要執(zhí)行完等待任務(wù)可使用shutdown()奢讨,若不考慮稚叹,可以使用shutdownNow()拿诸。兩者大體思路均是通過(guò)循環(huán)并中斷線程來(lái)實(shí)現(xiàn),所以任務(wù)要盡量可以響應(yīng)中斷亩码,否則有可能導(dǎo)致,雖然線程池關(guān)閉了飒泻,但是線程依然在運(yùn)行的情況鞭光。

4.4.1惰许、shutdown()
// 關(guān)閉線程池
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 將線程池的狀態(tài)更改為SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中斷所有空閑線程
        interruptIdleWorkers();
        // ScheduledThreadPoolExecutor 鉤子方法
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試徹底關(guān)閉線程池
    tryTerminate();
}
// 中斷空閑線程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循環(huán)所有worker
        for (Worker w : workers) {
            Thread t = w.thread;
            // 若線程未被中斷過(guò)史辙,且線程空閑
            // “空閑” 是指:線程未獲取到鎖。所以這里嘗試去獲取鎖聊倔,一旦能獲取成功,則表明該線程確實(shí)是空閑線程耙蔑。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
4.4.2、shutdownNow()
// 關(guān)閉線程池
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 將線程池的狀態(tài)更改為STOP
        advanceRunState(STOP);
        // 中斷所有worker
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
// 中斷所有已啟動(dòng)線程
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

4.5徐鹤、以submit方式提交任務(wù)

前文已經(jīng)分析了以execute()方式提交任務(wù)的過(guò)程邀层。下面分析一下另一種方式,以submit()方式提交任務(wù)寥院。與execute()方法不同涛目,submit()具有返回值。

// 方式一
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

// 方式二
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

// 方式三
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

submit()有三種提交任務(wù)的方式霹肝,可再歸納為兩種估蹄,一種提交Runnable任務(wù);一種提交Callable任務(wù)沫换。submit()較execute()方法只是將任務(wù)封裝成了FutureTask對(duì)象臭蚁,再調(diào)用execute()方法執(zhí)行任務(wù)而已。相對(duì)比較簡(jiǎn)單讯赏,不做過(guò)多的分析垮兑。

接下來(lái)介紹一下ThreadPoolExecutor的擴(kuò)展ScheduledThreadPoolExecutor。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末漱挎,一起剝皮案震驚了整個(gè)濱河市系枪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌磕谅,老刑警劉巖私爷,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雾棺,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡当犯,警方通過(guò)查閱死者的電腦和手機(jī)垢村,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)嚎卫,“玉大人嘉栓,你說(shuō)我怎么就攤上這事⊥刂睿” “怎么了侵佃?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)奠支。 經(jīng)常有香客問(wèn)我馋辈,道長(zhǎng),這世上最難降的妖魔是什么倍谜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任迈螟,我火速辦了婚禮,結(jié)果婚禮上尔崔,老公的妹妹穿的比我還像新娘答毫。我一直安慰自己,他們只是感情好季春,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布载弄。 她就那樣靜靜地躺著宇攻,像睡著了一般逞刷。 火紅的嫁衣襯著肌膚如雪亲桥。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音番枚,去河邊找鬼。 笑死拗馒,一個(gè)胖子當(dāng)著我的面吹牛诱桂,可吹牛的內(nèi)容都是我干的呈昔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼郭宝!你這毒婦竟也來(lái)了粘室?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤丙号,失蹤者是張志新(化名)和其女友劉穎缰冤,沒(méi)想到半個(gè)月后喳魏,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體刺彩,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嗡害,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年霸妹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了知押。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡畏线,死狀恐怖寝殴,靈堂內(nèi)的尸體忽然破棺而出明垢,到底是詐尸還是另有隱情袖外,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布泌射,位于F島的核電站熔酷,受9級(jí)特大地震影響拒秘,放射性物質(zhì)發(fā)生泄漏躺酒。R本人自食惡果不足惜羹应,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一园匹、第九天 我趴在偏房一處隱蔽的房頂上張望劫灶。 院中可真熱鬧本昏,春花似錦、人聲如沸紊馏。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至悦荒,卻和暖如春嘹吨,著一層夾襖步出監(jiān)牢的瞬間蟀拷,已是汗流浹背问芬。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工强戴, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留骑歹,地道東北人匕累。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓欢嘿,卻偏偏與公主長(zhǎng)得像炼蹦,于是被迫代替她去往敵國(guó)和親掐隐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子虑省,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容