Java線程池實(shí)現(xiàn)原理和源碼分析

Java線程池實(shí)現(xiàn)原理和源碼分析

前言

本文章是從2019年11月下旬開始打開寫的善绎,一直拖到2020年的年尾才開始寫屯蹦,直到2021年年初才寫完搁胆。

時(shí)間太快也太慢~夺溢!

依稀記得2019年10月份的時(shí)候某東從創(chuàng)業(yè)公司離職打算面試找工作阔逼,他問我線程池你會(huì)么兆衅?然后給我他發(fā)了一篇我2017年寫的筆記《Java并發(fā)編程之線程池必用知識點(diǎn)》,他說就這么點(diǎn)嗜浮?我當(dāng)時(shí)想線程池也差不多就這么多吧~羡亩!

2019年11月9號我和某東一起從大望路做815公交去燕郊。當(dāng)時(shí)只是因?yàn)槲艺趯W(xué)習(xí)一部分多線程相關(guān)的知識點(diǎn)危融,剛好公交車上沒啥事情我倆就嘮了嘮畏铆。當(dāng)時(shí)他問了我一些線程池的問題,我覺得在平時(shí)的工作線程池知道該怎么用就行頂多優(yōu)化一下核心線程數(shù)量吉殃。主要討論的還是多線程并發(fā)和鎖的相關(guān)問題辞居。

年底工作一般比較忙也就很少進(jìn)行自我學(xué)習(xí)了,隔了一周想起了某東問我的問題“線程池中線程是怎么產(chǎn)生的蛋勺,任務(wù)是怎么等待執(zhí)行瓦灶?”。

自己也不是很清楚這塊的邏輯抱完,臨時(shí)把這個(gè)TODO項(xiàng)紀(jì)錄了下來贼陶,想以后有時(shí)間了研究一下。結(jié)果這個(gè)以后跨越了2019年和2020年直接來到了2021年。

原諒我的啰里啰嗦碉怔,關(guān)鍵這個(gè)篇文章時(shí)間跨度太長了烘贴,給我的印象太深了。不得不說道說道撮胧,下面開始進(jìn)去正題~桨踪!

JDK1.8的源碼來分析Java線程池的核心設(shè)計(jì)與實(shí)現(xiàn)。

本文參考了Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐這篇文章中的部分內(nèi)容趴樱。

Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐這篇文章寫的非常好馒闷,除過本文內(nèi)容之外這篇文章還講述了的關(guān)于線程池的背景線程池在業(yè)務(wù)中的實(shí)踐動(dòng)態(tài)化線程池等叁征,所以想了解線程池關(guān)于這些類容的可以閱讀Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐這篇文章纳账。

如果讀者為做服務(wù)端開發(fā)的同學(xué)那么強(qiáng)烈建議閱讀Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐

外觀

外觀主要是我們平常使用線程池的時(shí)候所看到的一些點(diǎn)捺疼。

  • 繼承關(guān)系疏虫;
  • 構(gòu)造函數(shù);
  • 構(gòu)造函數(shù)中的參數(shù)啤呼;
  • 構(gòu)造函數(shù)中的阻塞隊(duì)列卧秘;
  • 線程池的創(chuàng)建;
  • 構(gòu)造函數(shù)中的拒絕策略官扣;

線程池繼承關(guān)系

ThreadPoolExecutor-uml.png

ThreadPoolExecutor實(shí)現(xiàn)的頂層接口是Executor翅敌,在接口Executor中用戶無需關(guān)注如何創(chuàng)建線程,如何調(diào)度線程來執(zhí)行任務(wù)惕蹄,用戶只需提供Runnable對象蚯涮,將任務(wù)的運(yùn)行邏輯提交到執(zhí)行器Executor中,由Executor框架完成線程的調(diào)配和任務(wù)的執(zhí)行部分卖陵。

ExecutorService接口增加了一些能力:

  1. 擴(kuò)充執(zhí)行任務(wù)的能力遭顶,補(bǔ)充可以為一個(gè)或一批異步任務(wù)生成Future的方法;
  2. 提供了管控線程池的方法泪蔫,比如停止線程池的運(yùn)行棒旗。

AbstractExecutorService則是上層的抽象類,將執(zhí)行任務(wù)的流程串聯(lián)了起來撩荣,保證下層的實(shí)現(xiàn)只需關(guān)注一個(gè)執(zhí)行任務(wù)的方法即可铣揉。

最下層的實(shí)現(xiàn)類ThreadPoolExecutor實(shí)現(xiàn)最復(fù)雜的運(yùn)行部分:

  1. 可以自動(dòng)創(chuàng)建、管理和復(fù)用指定數(shù)量的一組線程餐曹,適用方只需提交任務(wù)即可

  2. 線程安全老速,ThreadPoolExecutor內(nèi)部有狀態(tài)、核心線程數(shù)凸主、非核心線程等屬性橘券,廣泛使用了CASAQS鎖機(jī)制避免并發(fā)帶來的沖突問題

  3. 提供了核心線程、緩沖阻塞隊(duì)列、非核心線程旁舰、拋棄策略的概念锋华,可以根據(jù)實(shí)際應(yīng)用場景進(jìn)行組合使用

  4. 提供了beforeExecuteafterExecute()可以支持對線程池的功能進(jìn)行擴(kuò)展

構(gòu)造函數(shù)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:線程池的核心線程數(shù),一般情況下不管有沒有任務(wù)都會(huì)一直在線程池中一直存活箭窜,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)設(shè)置為true時(shí)毯焕,閑置的核心線程會(huì)存在超時(shí)機(jī)制,如果在指定時(shí)間沒有新任務(wù)來時(shí)磺樱,核心線程也會(huì)被終止纳猫,而這個(gè)時(shí)間間隔由第3個(gè)屬性 keepAliveTime指定。
  • maximumPoolSize:線程池所能容納的最大線程數(shù)竹捉,當(dāng)活動(dòng)的線程數(shù)達(dá)到這個(gè)值后芜辕,后續(xù)的新任務(wù)將會(huì)被阻塞。
  • keepAliveTime:控制線程閑置時(shí)的超時(shí)時(shí)長块差,超過則終止該線程侵续。一般情況下用于非核心線程,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)設(shè)置為true時(shí)憨闰,也作用于核心線程状蜗。
  • unit:用于指定keepAliveTime參數(shù)的時(shí)間單位,TimeUnit是個(gè)enum枚舉類型鹉动,常用的有:TimeUnit.HOURS(小時(shí))轧坎、TimeUnit.MINUTES(分鐘)TimeUnit.SECONDS(秒)TimeUnit.MILLISECONDS(毫秒)等泽示。
  • workQueue:線程池的任務(wù)隊(duì)列眶根,通過線程池的execute(Runnable command)方法會(huì)將任務(wù)Runnable存儲在隊(duì)列中。
  • threadFactory:線程工廠边琉,它是一個(gè)接口,用來為線程池創(chuàng)建新線程的记劝。
  • handler:拒絕策略变姨,所謂拒絕策略,是指將任務(wù)添加到線程池中時(shí)厌丑,線程池拒絕該任務(wù)所采取的相應(yīng)策略定欧。

成員變量

/**
 * 任務(wù)阻塞隊(duì)列 
 */
private final BlockingQueue<Runnable> workQueue; 
/**
 * 非公平的互斥鎖(可重入鎖)
 */
private final ReentrantLock mainLock = new ReentrantLock();
/**
 * 線程集合一個(gè)Worker對應(yīng)一個(gè)線程,沒有核心線程的說話怒竿,只有核心線程數(shù)
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
 * 配合mainLock通過Condition能夠更加精細(xì)的控制多線程的休眠與喚醒
 */
private final Condition termination = mainLock.newCondition();
/**
 * 線程池中線程數(shù)量曾經(jīng)達(dá)到過的最大值砍鸠。
 */
private int largestPoolSize;  
/**
 * 已完成任務(wù)數(shù)量
 */
private long completedTaskCount;
/**
 * ThreadFactory對象,用于創(chuàng)建線程耕驰。
 */
private volatile ThreadFactory threadFactory;  
/**
 * 拒絕策略的處理句柄
 * 現(xiàn)在默認(rèn)提供了CallerRunsPolicy爷辱、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
 */
private volatile RejectedExecutionHandler handler;
/**
 * 線程池維護(hù)線程(超過核心線程數(shù))所允許的空閑時(shí)間
 */
private volatile long keepAliveTime;
/**
 * 允許線程池中的核心線程超時(shí)進(jìn)行銷毀
 */
private volatile boolean allowCoreThreadTimeOut;  
/**
 * 線程池維護(hù)線程的最小數(shù)量饭弓,哪怕是空閑的  
 */
private volatile int corePoolSize;
/**
 * 線程池維護(hù)的最大線程數(shù)量双饥,線程數(shù)超過這個(gè)數(shù)量之后新提交的任務(wù)就需要進(jìn)入阻塞隊(duì)列
 */
private volatile int maximumPoolSize;

創(chuàng)建線程池

Executors提供獲取幾種常用的線程池的方法:

  • 緩存程線程池

newCachedThreadPool是一個(gè)可根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時(shí)將重用它們弟断。對于執(zhí)行很多短期異步任務(wù)的程序而言咏花,這些線程池通常可提高程序性能阀趴。調(diào)用 execute() 將重用以前構(gòu)造的線程(如果線程可用)昏翰。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個(gè)新線程并添加到池中刘急。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程棚菊。因此,長時(shí)間保持空閑的線程池不會(huì)使用任何資源排霉。注意窍株,可以使用 ThreadPoolExecutor 構(gòu)造方法創(chuàng)建具有類似屬性但細(xì)節(jié)不同(例如超時(shí)參數(shù))的線程池。

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
}
  • 單線程線程池

newSingleThreadExecutor 創(chuàng)建是一個(gè)單線程池攻柠,也就是該線程池只有一個(gè)線程在工作球订,所有的任務(wù)是串行執(zhí)行的,如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束瑰钮,那么會(huì)有一個(gè)新的線程來替代它冒滩,此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}
  • 固定大小線程池

newFixedThreadPool 創(chuàng)建固定大小的線程池浪谴,每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程开睡,直到線程達(dá)到線程池的最大大小,線程池的大小一旦達(dá)到最大值就會(huì)保持不變苟耻,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束篇恒,那么線程池會(huì)補(bǔ)充一個(gè)新線程。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>(),
                  threadFactory);
}
  • 單線程線程池

newScheduledThreadPool 創(chuàng)建一個(gè)大小無限的線程池凶杖,此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求胁艰。

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                   ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory);
}

我們可以看出來上面的方法一共使用了DelayedWorkQueueLinkedBlockingQueueSynchronousQueue智蝠。這個(gè)就是線程核心之一的阻塞隊(duì)列腾么。

任務(wù)阻塞隊(duì)列

BlockingQueue.png

它一般分為直接提交隊(duì)列、有界任務(wù)隊(duì)列杈湾、無界任務(wù)隊(duì)列解虱、優(yōu)先任務(wù)隊(duì)列;

SynchronousQueue

1漆撞、直接提交隊(duì)列:設(shè)置為SynchronousQueue隊(duì)列殴泰,SynchronousQueue是一個(gè)特殊的BlockingQueue寂纪,它沒有容量倡蝙,每執(zhí)行一個(gè)插入操作就會(huì)阻塞,需要再執(zhí)行一個(gè)刪除操作才會(huì)被喚醒,反之每一個(gè)刪除操作也都要等待對應(yīng)的插入操作凯傲。

使用SynchronousQueue隊(duì)列妇斤,提交的任務(wù)不會(huì)被保存响鹃,總是會(huì)馬上提交執(zhí)行瞧掺。如果用于執(zhí)行任務(wù)的線程數(shù)量小于maximumPoolSize,則嘗試創(chuàng)建新的進(jìn)程健霹,如果達(dá)到maximumPoolSize設(shè)置的最大值旺上,則根據(jù)你設(shè)置的handler執(zhí)行拒絕策略。因此這種方式你提交的任務(wù)不會(huì)被緩存起來糖埋,而是會(huì)被馬上執(zhí)行宣吱,在這種情況下,你需要對你程序的并發(fā)量有個(gè)準(zhǔn)確的評估瞳别,才能設(shè)置合適的maximumPoolSize數(shù)量征候,否則很容易就會(huì)執(zhí)行拒絕策略;

ArrayBlockingQueue

2祟敛、有界的任務(wù)隊(duì)列:有界的任務(wù)隊(duì)列可以使用ArrayBlockingQueue實(shí)現(xiàn)疤坝,如下所示:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任務(wù)隊(duì)列,若有新的任務(wù)需要執(zhí)行時(shí)馆铁,線程池會(huì)創(chuàng)建新的線程跑揉,直到創(chuàng)建的線程數(shù)量達(dá)到corePoolSize時(shí),則會(huì)將新的任務(wù)加入到等待隊(duì)列中埠巨。若等待隊(duì)列已滿历谍,即超過ArrayBlockingQueue初始化的容量,則繼續(xù)創(chuàng)建線程辣垒,直到線程數(shù)量達(dá)到maximumPoolSize設(shè)置的最大線程數(shù)量望侈,若大于maximumPoolSize,則執(zhí)行拒絕策略勋桶。在這種情況下脱衙,線程數(shù)量的上限與有界任務(wù)隊(duì)列的狀態(tài)有直接關(guān)系,如果有界隊(duì)列初始容量較大或者沒有達(dá)到超負(fù)荷的狀態(tài)哥遮,線程數(shù)將一直維持在corePoolSize以下,反之當(dāng)任務(wù)隊(duì)列已滿時(shí)陵究,則會(huì)以maximumPoolSize為最大線程數(shù)上限眠饮。

LinkedBlockingQueue

3、無界的任務(wù)隊(duì)列:無界任務(wù)隊(duì)列可以使用LinkedBlockingQueue實(shí)現(xiàn)铜邮,如下所示:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用無界任務(wù)隊(duì)列仪召,線程池的任務(wù)隊(duì)列可以無限制的添加新的任務(wù)寨蹋,而線程池創(chuàng)建的最大線程數(shù)量就是你corePoolSize設(shè)置的數(shù)量,也就是說在這種情況下maximumPoolSize這個(gè)參數(shù)是無效的扔茅,哪怕你的任務(wù)隊(duì)列中緩存了很多未執(zhí)行的任務(wù)已旧,當(dāng)線程池的線程數(shù)達(dá)到corePoolSize后,就不會(huì)再增加了召娜;若后續(xù)有新的任務(wù)加入运褪,則直接進(jìn)入隊(duì)列等待,當(dāng)使用這種任務(wù)隊(duì)列模式時(shí)玖瘸,一定要注意你任務(wù)提交與處理之間的協(xié)調(diào)與控制秸讹,不然會(huì)出現(xiàn)隊(duì)列中的任務(wù)由于無法及時(shí)處理導(dǎo)致一直增長,直到最后資源耗盡的問題雅倒。

PriorityBlockingQueue

4璃诀、優(yōu)先任務(wù)隊(duì)列:優(yōu)先任務(wù)隊(duì)列通過PriorityBlockingQueue實(shí)現(xiàn):

任務(wù)會(huì)按優(yōu)先級重新排列執(zhí)行,且線程池的線程數(shù)一直為corePoolSize蔑匣,也就是只有一個(gè)劣欢。

PriorityBlockingQueue其實(shí)是一個(gè)特殊的無界隊(duì)列,它其中無論添加了多少個(gè)任務(wù)裁良,線程池創(chuàng)建的線程數(shù)也不會(huì)超過corePoolSize的數(shù)量凿将,只不過其他隊(duì)列一般是按照先進(jìn)先出的規(guī)則處理任務(wù),而PriorityBlockingQueue隊(duì)列可以自定義規(guī)則根據(jù)任務(wù)的優(yōu)先級順序先后執(zhí)行趴久。

其實(shí)LinkedBlockingQueue也是可以設(shè)置界限的丸相,它默認(rèn)的界限是Integer.MAX_VALUE。同時(shí)也支持也支持構(gòu)造的時(shí)候設(shè)置隊(duì)列大小彼棍。

拒絕策略

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

當(dāng)Executor已經(jīng)關(guān)閉灭忠,即執(zhí)行了executorService.shutdown()方法后,或者Executor將有限邊界用于最大線程和工作隊(duì)列容量座硕,且已經(jīng)飽和時(shí)弛作。使用方法execute()提交的新任務(wù)將被拒絕.
在以上述情況下,execute方法將調(diào)用其RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)方法华匾。

AbortPolicy 默認(rèn)的拒絕策略

也稱為終止策略映琳,遭到拒絕將拋出運(yùn)行時(shí)RejectedExecutionException。業(yè)務(wù)方能通過捕獲異常及時(shí)得到對本次任務(wù)提交的結(jié)果反饋蜘拉。

public static class AbortPolicy implements RejectedExecutionHandler {
  public AbortPolicy() { }
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
  }
}

CallerRunsPolicy

擁有自主反饋控制萨西,讓提交者執(zhí)行提交任務(wù),能夠減緩新任務(wù)的提交速度旭旭。這種情況是需要讓所有的任務(wù)都執(zhí)行完畢谎脯。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

DiscardPolicy

拒絕任務(wù)的處理程序,靜默丟棄任務(wù)持寄。使用此策略源梭,我們可能無法感知系統(tǒng)的異常狀態(tài)娱俺。慎用~!

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

丟棄隊(duì)列中最前面的任務(wù)废麻,然后重新提交被拒絕的任務(wù)荠卷。是否要使用此策略需要看業(yè)務(wù)是否需要新老的替換,慎用~烛愧!

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

內(nèi)核

前面講了線程池的外觀油宜,接下來講述它的內(nèi)核屑彻。

線程池在內(nèi)部實(shí)際上構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型验庙,將線程任務(wù)兩者解耦,并不直接關(guān)聯(lián)社牲,從而良好的緩沖任務(wù)粪薛,復(fù)用線程。

線程池的運(yùn)行主要分成兩部分:任務(wù)管理搏恤、線程管理违寿。

任務(wù)管理部分充當(dāng)生產(chǎn)者的角色,當(dāng)任務(wù)提交后熟空,線程池會(huì)判斷該任務(wù)后續(xù)的流轉(zhuǎn):

  1. 直接申請線程執(zhí)行該任務(wù)藤巢;
  2. 緩沖到隊(duì)列中等待線程執(zhí)行;
  3. 拒絕該任務(wù)息罗。

線程管理部分是消費(fèi)者掂咒,它們被統(tǒng)一維護(hù)在線程池內(nèi),根據(jù)任務(wù)請求進(jìn)行線程的分配迈喉,當(dāng)線程執(zhí)行完任務(wù)后則會(huì)繼續(xù)獲取新的任務(wù)去執(zhí)行绍刮,最終當(dāng)線程獲取不到任務(wù)的時(shí)候,線程就會(huì)被回收挨摸。

接下來孩革,我們會(huì)按照以下三個(gè)部分去詳細(xì)講解線程池運(yùn)行機(jī)制:

  1. 線程池如何維護(hù)自身狀態(tài)。
  2. 線程池如何管理任務(wù)得运。
  3. 線程池如何管理線程膝蜈。

線程池的生命周期

線程池運(yùn)行的狀態(tài),并不是用戶顯式設(shè)置的熔掺,而是伴隨著線程池的運(yùn)行饱搏,由內(nèi)部來維護(hù)。

線程池內(nèi)部使用一個(gè)變量維護(hù)兩個(gè)值:運(yùn)行狀態(tài)(runState)和線程數(shù)量 (workerCount)置逻。

在具體實(shí)現(xiàn)中推沸,線程池將運(yùn)行狀態(tài)(runState)、線程數(shù)量 (workerCount)兩個(gè)關(guān)鍵參數(shù)的維護(hù)放在了一起:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl這個(gè)AtomicInteger類型诽偷,是對線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段.

它同時(shí)包含兩部分的信息:線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount)坤学,高3位保存runState,低29位保存workerCount报慕,兩個(gè)變量之間互不干擾深浮。

用一個(gè)變量去存儲兩個(gè)值,可避免在做相關(guān)決策時(shí)眠冈,出現(xiàn)不一致的情況飞苇,不必為了維護(hù)兩者的一致,而占用鎖資源蜗顽。通過閱讀線程池源代碼也可以發(fā)現(xiàn)布卡,經(jīng)常出現(xiàn)要同時(shí)判斷線程池運(yùn)行狀態(tài)和線程數(shù)量的情況。線程池也提供了若干方法去供用戶獲得線程池當(dāng)前的運(yùn)行狀態(tài)雇盖、線程個(gè)數(shù)忿等。這里都使用的是位運(yùn)算的方式,相比于基本運(yùn)算崔挖,速度也會(huì)快很多(PS:這種用法在許多源代碼中都可以看到)贸街。

關(guān)于內(nèi)部封裝的獲取生命周期狀態(tài)、獲取線程池線程數(shù)量的計(jì)算方法如以下代碼所示:

private static final int COUNT_BITS = Integer.SIZE - 3;//32-3
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//低29位都為1狸相,高位都為0

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//111
private static final int SHUTDOWN   =  0 << COUNT_BITS;//000
private static final int STOP       =  1 << COUNT_BITS;//001
private static final int TIDYING    =  2 << COUNT_BITS;//010
private static final int TERMINATED =  3 << COUNT_BITS;//011

// Packing and unpacking ctl
//計(jì)算當(dāng)前運(yùn)行狀態(tài)薛匪,取高三位
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//計(jì)算當(dāng)前線程數(shù)量,取低29位
private static int workerCountOf(int c)  { return c & CAPACITY; }
//通過狀態(tài)和線程數(shù)生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor的運(yùn)行狀態(tài)有5種脓鹃,分別為:

運(yùn)行狀態(tài) 狀態(tài)描述
RUNNING 能接受新提交的任務(wù)逸尖,并且也能處理阻塞隊(duì)列中的任務(wù)
SHUTDOWN 不能接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中的任務(wù)
STOP 不能接受新任務(wù)瘸右,也不能處理隊(duì)列中的任務(wù)同時(shí)會(huì)中斷正在處理的任務(wù)線程
TIDYING 所有的任務(wù)都已經(jīng)終止娇跟,workCount(有效線程數(shù))為0
TERMINATED 在terminated方法執(zhí)行完之后進(jìn)入該狀態(tài)
線程池聲明周期.jpg

任務(wù)調(diào)度機(jī)制

任務(wù)調(diào)度是線程池的主要入口,當(dāng)用戶提交了一個(gè)任務(wù)尊浓,接下來這個(gè)任務(wù)將如何執(zhí)行都是由這個(gè)階段決定的逞频。了解這部分就相當(dāng)于了解了線程池的核心運(yùn)行機(jī)制。

首先栋齿,所有任務(wù)的調(diào)度都是由execute方法完成的苗胀,這部分完成的工作是:檢查現(xiàn)在線程池的運(yùn)行狀態(tài)運(yùn)行線程數(shù)瓦堵、運(yùn)行策略基协,決定接下來執(zhí)行的流程,是直接申請線程執(zhí)行菇用,或是緩沖到隊(duì)列中執(zhí)行澜驮,亦或是直接拒絕該任務(wù)。其執(zhí)行過程如下:

  1. 首先檢測線程池運(yùn)行狀態(tài)惋鸥,如果不是RUNNING杂穷,則直接拒絕悍缠,線程池要保證在RUNNING的狀態(tài)下執(zhí)行任務(wù)。
  2. 如果workerCount < corePoolSize耐量,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)飞蚓。
  3. 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿廊蜒,則將任務(wù)添加到該阻塞隊(duì)列中趴拧。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿山叮,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù)著榴。
  5. 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常屁倔。
任務(wù)調(diào)度流程圖.png

接下來進(jìn)入源代碼分析時(shí)間~脑又!

提交任務(wù)

//AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
//ThreadPoolExecutor.java
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();//獲取ctl
  //檢查當(dāng)前核心線程數(shù),是否小于核心線程數(shù)的大小限制
  if (workerCountOf(c) < corePoolSize) {
    //沒有達(dá)到核心線程數(shù)的大小限制锐借,那么添家核心線程執(zhí)行該任務(wù)
    if (addWorker(command, true))
      return;
    //如果添加失敗挂谍,刷新ctl值
    c = ctl.get();
  }
  //再次檢查線程池的運(yùn)行狀態(tài),將任務(wù)添加到等待隊(duì)列中
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();//刷新ctl值
    //如果當(dāng)前線程池的裝不是運(yùn)行狀態(tài)瞎饲,那么移除剛才添加的任務(wù)
    if (! isRunning(recheck) && remove(command))
      reject(command);//移除成功后口叙,使用拒絕策略處理該任務(wù);
    else if (workerCountOf(recheck) == 0)//當(dāng)前工作線程數(shù)為0
      //線程池正在運(yùn)行嗅战,或者移除任務(wù)失敗妄田。
      //添加一個(gè)非核心線程,并不指定該線程的運(yùn)行任務(wù)驮捍。
      //等線程創(chuàng)建完成之后疟呐,會(huì)從等待隊(duì)列中獲取任務(wù)執(zhí)行。
      addWorker(null, false);
  } 
  //邏輯到這里說明線程池已經(jīng)不是RUNNING狀態(tài)东且,或者等待隊(duì)列已滿启具,需要?jiǎng)?chuàng)建一個(gè)新的非核心線程執(zhí)行該任務(wù);
  //如果創(chuàng)建失敗珊泳,那么非核心線程已滿鲁冯,使用拒絕策略處理該任務(wù);
  else if (!addWorker(command, false))
    reject(command);
}

添加工作線程和執(zhí)行任務(wù)

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;//初始化的任務(wù)色查,可以為null
    this.thread = getThreadFactory().newThread(this);//Worker持有的線程
  }
  /**部分代碼省略*/
    public void run() {
    runWorker(this);
    }
}

添加工作線程和執(zhí)行任務(wù):總體就是創(chuàng)建Worker薯演,并且為它找到匹配的Runnable

addwork.png

添加工作線程

增加線程是通過線程池中的addWorker方法秧了,該方法的功能就是增加一個(gè)線程跨扮,該方法不考慮線程池是在哪個(gè)階段增加的該線程,這個(gè)分配線程的策略是在上個(gè)步驟完成的,該步驟僅僅完成增加線程衡创,并使它運(yùn)行帝嗡,最后返回是否成功這個(gè)結(jié)果。

addWorker方法有兩個(gè)參數(shù):firstTask璃氢、core丈探。

firstTask參數(shù)用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),該參數(shù)可以為空拔莱;

core參數(shù)為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSizefalse表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize隘竭。

addwork2.png
private boolean addWorker(Runnable firstTask, boolean core) {
    retry://break和continue的跳出標(biāo)簽
    for (;;) {
        int c = ctl.get();//獲取ctl的值
        int rs = runStateOf(c);//獲取當(dāng)前線程池的狀態(tài)塘秦;
        /**
         * 1、如果當(dāng)前的線程池狀態(tài)不是RUNNING
         * 2动看、當(dāng)前線程池是RUNNING而且沒有添加新任務(wù)尊剔,而且等待隊(duì)列不為空。這種情況下是需要?jiǎng)?chuàng)建執(zhí)行線程的菱皆。
         * 所以滿足1须误,但不滿足2就創(chuàng)建執(zhí)行線程失敗,返回false仇轻。
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;
        /**進(jìn)入內(nèi)層循環(huán) */
        for (;;) {
            int wc = workerCountOf(c);//獲取當(dāng)前執(zhí)行線程的數(shù)量
            /**
             * 1、工作線程數(shù)量大于或等于計(jì)數(shù)器的最大閾值,那么創(chuàng)建執(zhí)行線程失敗裤唠,返回false花椭。
             * 2、如果當(dāng)前創(chuàng)建的核心線程疲陕,那么工作線程數(shù)大于corePoolSize的話方淤,創(chuàng)建執(zhí)行線程失敗,返回false蹄殃。
             * 3携茂、如果當(dāng)前創(chuàng)建的是非核心線程,那么工作線程數(shù)大于maximumPoolSize的話诅岩,創(chuàng)建執(zhí)行線程失敗讳苦,返回false。
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //用CAS操作讓線程數(shù)加1吩谦,如果成功跳出整個(gè)循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)//線程狀態(tài)前后不一樣医吊,重新執(zhí)行外循環(huán)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
            //如果CAS操作由于工作線程數(shù)的增加失敗,那么重新進(jìn)行內(nèi)循環(huán)
        }
    }
    /**就現(xiàn)在逮京,線程數(shù)已經(jīng)增加了卿堂。但是真正的線程對象還沒有創(chuàng)建出來。*/
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//加鎖
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                /**
                 * 再次檢查線程池的運(yùn)行狀態(tài)
                 * 1、如果是RUNNING狀態(tài)草描,那么可以創(chuàng)建览绿;
                 * 2、如果是SHUTDOWN狀態(tài)穗慕,但沒有執(zhí)行線程饿敲,可以創(chuàng)建(創(chuàng)建后執(zhí)行等待隊(duì)列中的任務(wù))
                 */
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //檢測該線程是否已經(jīng)開啟
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);//將改工作線程添加到集合中
                    int s = workers.size();
                    if (s > largestPoolSize)//更新線程池的運(yùn)行時(shí)的最大線程數(shù)
                        largestPoolSize = s;
                    workerAdded = true;//標(biāo)識工作線程添加成功
                }
            } finally {//釋放鎖
                mainLock.unlock();
            }
            if (workerAdded) {//如果工作線程添加成功,那么開啟線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果工作線程添加失敗逛绵,那么進(jìn)行失敗處理
        //將已經(jīng)增加的線程數(shù)減少怀各,將添加到集合的工作線程刪除
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

執(zhí)行任務(wù)

添加工作線程部分我們看到了,添加成功之后會(huì)開啟線程執(zhí)行任務(wù)术浪。

runwork.png
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //解鎖瓢对,允許中斷
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //如果當(dāng)前的工作線程已經(jīng)有執(zhí)行任務(wù),或者可以從等待隊(duì)列中獲取到執(zhí)行任務(wù)
        //getTask獲取任務(wù)時(shí)候會(huì)進(jìn)行阻塞
        while (task != null || (task = getTask()) != null) {
            w.lock();//開始執(zhí)行胰苏,上鎖
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //判斷線程是否需要中斷
            //如果線程池狀態(tài)是否為STOP\TIDYING\TERMINATED,同時(shí)當(dāng)前線程沒有被中斷那么將當(dāng)前線程進(jìn)行中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;//異常處理
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;//工作線程的當(dāng)前任務(wù)置空
                w.completedTasks++;//當(dāng)前工作線程執(zhí)行完成的線程數(shù)+1
                w.unlock();//執(zhí)行完成解鎖
            }
        }
        completedAbruptly = false;//完成了所有任務(wù)硕蛹,正常退出
    } finally {//執(zhí)行工作線程的退出操作
        processWorkerExit(w, completedAbruptly);
    }
}

工作線程獲取任務(wù)

getTask.jpg
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();//獲取ctl的值
        int rs = runStateOf(c);//獲取線程池狀態(tài)

        // Check if queue empty only if necessary.
        /**
         * 1、rs為STOP\TIDYING\TERMINATED硕并,標(biāo)識無法繼續(xù)執(zhí)行任務(wù)
         * 2法焰、等待隊(duì)列中沒有任務(wù)可以被執(zhí)行
         * 工作線程數(shù)量減一
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);//獲取工作線程數(shù)量
        // Are workers subject to culling?
        //如果允許核心線程超時(shí),或者當(dāng)前工作線程數(shù)量大于核心線程數(shù)量倔毙。標(biāo)識需要進(jìn)行超時(shí)檢測
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        /**
         * 1埃仪、如果當(dāng)前工作線程數(shù)是否大于線程池可允許的最大工作線程數(shù)(maximumPoolSize可以動(dòng)態(tài)設(shè)置)
         * ,或者當(dāng)前需要進(jìn)行超時(shí)控制并且上次從等待隊(duì)列中獲取執(zhí)行任務(wù)發(fā)生了超時(shí)陕赃。
         * 2贵试、如果當(dāng)前不是唯一的線程,并且等待隊(duì)列中沒有需要執(zhí)行的任務(wù)凯正。
         * 這兩種情況下一起存在就表示毙玻,工作線程發(fā)生了超時(shí)需要回收,所以對線程數(shù)進(jìn)行-1廊散;
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))//線程數(shù)量減少成功桑滩,否則重新執(zhí)行本次循環(huán)
                return null;
            continue;
        }
        try {
            //如果設(shè)置有超時(shí),那么設(shè)定超時(shí)時(shí)間允睹。否則進(jìn)行無限的阻塞等待執(zhí)行任務(wù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;//獲取超時(shí),設(shè)置標(biāo)記
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

工作線程的退出

線程池中線程的銷毀依賴JVM自動(dòng)的回收运准,線程池做的工作是根據(jù)當(dāng)前線程池的狀態(tài)維護(hù)一定數(shù)量的線程引用,防止這部分線程被JVM回收缭受,當(dāng)線程池決定哪些線程需要回收時(shí)胁澳,只需要將其引用消除即可。Worker被創(chuàng)建出來后米者,就會(huì)不斷地進(jìn)行輪詢韭畸,然后獲取任務(wù)去執(zhí)行宇智,核心線程可以無限等待獲取任務(wù),非核心線程要限時(shí)獲取任務(wù)胰丁。當(dāng)Worker無法獲取到任務(wù)随橘,也就是獲取的任務(wù)為空時(shí),循環(huán)會(huì)結(jié)束锦庸,Worker會(huì)主動(dòng)消除自身在線程池內(nèi)的引用机蔗。

processWorkerExit.png
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //completedAbruptly為true,標(biāo)識該工作線程執(zhí)行出現(xiàn)了異常甘萧,將工作線程數(shù)減一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    //否則標(biāo)識該工作線程為正常結(jié)束萝嘁,這種情況下getTask方法中已經(jīng)對工作線程進(jìn)行了減一
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖
    try {
        completedTaskCount += w.completedTasks;//更新線程池的,線程執(zhí)行完成數(shù)量
        workers.remove(w);//工作線程容器移除該工作線程
    } finally {
        mainLock.unlock();//解鎖
    }
    //嘗試結(jié)束線程池
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {//如果當(dāng)前線程池的運(yùn)行狀態(tài)是RUNNING\SHUTDOWN
        if (!completedAbruptly) {//如果該工作線程為正常結(jié)束
            /**
             * 判斷當(dāng)前需要的最少的核心線程數(shù)(如果允許核心線程超時(shí)扬卷,那么最小的核心線程數(shù)為0牙言,否則為corePoolSize)
             */
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果允許核心線程超時(shí),而且等待隊(duì)列不為空邀泉,那么工作線程的最小值為1,否則為0钝鸽。
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //當(dāng)前工作線程數(shù)汇恤,是否滿足最先的核心線程數(shù)
            if (workerCountOf(c) >= min)
                //如果滿足那么直接return
                return; // replacement not needed
        }
        //如果是異常結(jié)束,或者當(dāng)前線程數(shù)不滿足最小的核心線程數(shù)拔恰,那么添加一個(gè)非核心線程
        //核心線程和非核心線程沒有什么不同因谎,只是在創(chuàng)建的時(shí)候判斷邏輯不同
        addWorker(null, false);
    }
}

特需

線程池的監(jiān)控

通過線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時(shí)候可以使用

  • getTaskCount:線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù)颜懊;
  • getCompletedTaskCount:線程池已完成的任務(wù)數(shù)量财岔,該值小于等于taskCount
  • getLargestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量河爹。通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過匠璧,也就是達(dá)到了maximumPoolSize
  • getPoolSize:線程池當(dāng)前的線程數(shù)量咸这;
  • getActiveCount:當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量夷恍。

動(dòng)態(tài)調(diào)整線程池的大小

JDK允許線程池使用方通過ThreadPoolExecutor的實(shí)例來動(dòng)態(tài)設(shè)置線程池的核心策略,以setCorePoolSize為方法例;

在運(yùn)行期線程池使用方調(diào)用此方法設(shè)置corePoolSize之后媳维,線程池會(huì)直接覆蓋原來的corePoolSize值酿雪,并且基于當(dāng)前值和原始值的比較結(jié)果采取不同的處理策略。

對于當(dāng)前值小于當(dāng)前工作線程數(shù)的情況侄刽,說明有多余的worker線程指黎,此時(shí)會(huì)向當(dāng)前idleworker線程發(fā)起中斷請求以實(shí)現(xiàn)回收,多余的worker在下次idel的時(shí)候也會(huì)被回收州丹;對于當(dāng)前值大于原始值且當(dāng)前隊(duì)列中有待執(zhí)行任務(wù)醋安,則線程池會(huì)創(chuàng)建新的worker線程來執(zhí)行隊(duì)列任務(wù)(PS:idel狀態(tài)為worker線程釋放鎖之后的狀態(tài),因?yàn)樗谶\(yùn)行期間都是上鎖的)。

setCorePoolSize.png
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    //計(jì)算增量
    int delta = corePoolSize - this.corePoolSize;
    //覆蓋原有的corePoolSize
    this.corePoolSize = corePoolSize;
    //如果當(dāng)前的工作線程數(shù)量大于線程池的最大可運(yùn)行核心線程數(shù)量茬故,那么進(jìn)行中斷工作線程處理
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {//如果增量大于0
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        //等待隊(duì)列非空盖灸,獲取等待任務(wù)和增量的最小值
        int k = Math.min(delta, workQueue.size());
        //循環(huán)創(chuàng)建核心工作線程執(zhí)行等待隊(duì)列中的任務(wù)
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖
    try {
        //遍歷工作線程的集合
        for (Worker w : workers) {
            Thread t = w.thread;
            //如果當(dāng)前線程沒有被中斷,而且能獲取到鎖磺芭,那么嘗試進(jìn)行中斷赁炎,最后釋放鎖
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            //是否僅僅中斷一個(gè)工作線程
            if (onlyOne)
                break;
        }
    } finally {//釋放鎖
        mainLock.unlock();
    }
}

優(yōu)雅的關(guān)閉線程池

從《線程池聲明周期》圖上還可以看到,當(dāng)我們執(zhí)行 ThreadPoolExecutor#shutdown 方法將會(huì)使線程池狀態(tài)從 RUNNING 轉(zhuǎn)變?yōu)?SHUTDOWN钾腺。而調(diào)用 ThreadPoolExecutor#shutdownNow 之后線程池狀態(tài)將會(huì)從 RUNNING 轉(zhuǎn)變?yōu)?STOP徙垫。

shutdown

停止接收新任務(wù),原來的任務(wù)繼續(xù)執(zhí)行

  1. 停止接收新的submit的任務(wù)放棒;
  2. 已經(jīng)提交的任務(wù)(包括正在跑的和隊(duì)列中等待的),會(huì)繼續(xù)執(zhí)行完成姻报;
  3. 等到第2步完成后,才真正停止间螟;
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();// 檢查權(quán)限
        advanceRunState(SHUTDOWN);// 設(shè)置線程池狀態(tài)
        interruptIdleWorkers();// 中斷空閑線程
        // 鉤子函數(shù)吴旋,主要用于清理一些資源
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdown 方法首先加鎖,其次先檢查系統(tǒng)安裝狀態(tài)厢破。接著就會(huì)將線程池狀態(tài)變?yōu)?SHUTDOWN荣瑟,在這之后線程池不再接受提交的新任務(wù)。此時(shí)如果還繼續(xù)往線程池提交任務(wù)摩泪,將會(huì)使用線程池拒絕策略響應(yīng)笆焰,默認(rèn)情況下將會(huì)使用 ThreadPoolExecutor.AbortPolicy,拋出 RejectedExecutionException 異常见坑。

interruptIdleWorkers 方法在動(dòng)態(tài)調(diào)整線程池大小部分有源碼講述嚷掠,它只會(huì)中斷空閑的線程,不會(huì)中斷正在執(zhí)行任務(wù)的的線程荞驴〔唤裕空閑的線程將會(huì)阻塞在線程池的阻塞隊(duì)列上。

shutdownNow

停止接收新任務(wù)熊楼,原來的任務(wù)停止執(zhí)行

  1. shutdown()一樣粟焊,先停止接收新submit的任務(wù);
  2. 忽略隊(duì)列里等待的任務(wù)孙蒙;
  3. 嘗試將正在執(zhí)行的任務(wù)interrupt中斷项棠;
  4. 返回未執(zhí)行的任務(wù)列表;
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();// 檢查狀態(tài)
        advanceRunState(STOP);// 將線程池狀態(tài)變?yōu)?STOP
        interruptWorkers();// 中斷所有線程挎峦,包括工作線程以及空閑線程
        tasks = drainQueue();// 丟棄工作隊(duì)列中存量任務(wù)
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            //如果工作線程已經(jīng)開始香追,那么調(diào)用interrupt進(jìn)行中斷
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    //從此隊(duì)列中刪除所有可用的元素,并將它們添加到給定的集合中坦胶。
    q.drainTo(taskList);
    //如果隊(duì)列是DelayQueue或其他類型的隊(duì)列透典,而poll或drainTo可能無法刪除某些元素晴楔,則會(huì)將它們逐個(gè)刪除。
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

shutdownNow試圖終止線程的方法是通過調(diào)用 Thread.interrupt() 方法來實(shí)現(xiàn)的峭咒,這種方法的作用有限税弃,如果線程中沒有sleepwait凑队、Condition则果、定時(shí)鎖等應(yīng)用, interrupt() 方法是無法中斷當(dāng)前的線程的。所以漩氨,shutdownNow()并不代表線程池就一定立即就能退出西壮,它也可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。但是大多數(shù)時(shí)候是能立即退出的叫惊。

線程中斷機(jī)制: thread#interrupt 只是設(shè)置一個(gè)中斷標(biāo)志款青,不會(huì)立即中斷正常的線程。如果想讓中斷立即生效霍狰,必須在線程 內(nèi)調(diào)用 Thread.interrupted() 判斷線程的中斷狀態(tài)抡草。 對于阻塞的線程,調(diào)用中斷時(shí)蔗坯,線程將會(huì)立刻退出阻塞狀態(tài)并拋出 InterruptedException 異常康震。所以對于阻塞線程需要正確處理 InterruptedException 異常。

awaitTermination

線程池 shutdownshutdownNow 方法都不會(huì)主動(dòng)等待執(zhí)行任務(wù)的結(jié)束步悠,如果需要等到線程池任務(wù)執(zhí)行結(jié)束签杈,需要調(diào)用 awaitTermination 主動(dòng)等待任務(wù)調(diào)用結(jié)束瘫镇。

  • 等所有已提交的任務(wù)(包括正在跑的和隊(duì)列中等待的)執(zhí)行完鼎兽;
  • 等超時(shí)時(shí)間到了;
  • 線程被中斷铣除,拋出InterruptedException;

如果線程池任務(wù)執(zhí)行結(jié)束谚咬,awaitTermination 方法將會(huì)返回 true,否則當(dāng)?shù)却龝r(shí)間超過指定時(shí)間后將會(huì)返回 false尚粘。

// 關(guān)閉線程池的鉤子函數(shù)
private static void shutdown(ExecutorService executorService) {
    // 第一步:使新任務(wù)無法提交
    executorService.shutdown();
    try {
        // 第二步:等待未完成任務(wù)結(jié)束
        if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
             // 第三步:取消當(dāng)前執(zhí)行的任務(wù)
            executorService.shutdownNow();
            // 第四步:等待任務(wù)取消的響應(yīng)
            if(!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("Thread pool did not terminate");
            }
        }
    } catch(InterruptedException ie) {
        // 第五步:出現(xiàn)異常后择卦,重新取消當(dāng)前執(zhí)行的任務(wù)
        executorService.shutdownNow();
        Thread.currentThread().interrupt(); // 設(shè)置本線程中斷狀態(tài)
    }
}

其他

感覺內(nèi)容好多,寫不完啊~郎嫁!

說的線程池就不得不說多線程并發(fā)操作秉继,同步異步泽铛,CSA尚辑,AQS公平鎖和非公平鎖盔腔,可重入鎖和非可重入鎖等各種并發(fā)控制需要的知識點(diǎn)杠茬。

平常工作中使用比較少月褥,自己有沒有系統(tǒng)的知識體系結(jié)構(gòu)。導(dǎo)致好多學(xué)過之后忘掉瓢喉,然后又學(xué)習(xí)又忘記宁赤。

希望我以后有機(jī)會(huì)能逐步進(jìn)行學(xué)習(xí)和分享。

文章到這里就全部講述完啦栓票,若有其他需要交流的可以留言哦决左!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逗载,一起剝皮案震驚了整個(gè)濱河市哆窿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌厉斟,老刑警劉巖挚躯,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異擦秽,居然都是意外死亡码荔,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門感挥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缩搅,“玉大人,你說我怎么就攤上這事触幼∨鸢辏” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵置谦,是天一觀的道長堂鲤。 經(jīng)常有香客問我,道長媒峡,這世上最難降的妖魔是什么瘟栖? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮谅阿,結(jié)果婚禮上半哟,老公的妹妹穿的比我還像新娘。我一直安慰自己签餐,他們只是感情好寓涨,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著氯檐,像睡著了一般戒良。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上男摧,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天蔬墩,我揣著相機(jī)與錄音译打,去河邊找鬼。 笑死拇颅,一個(gè)胖子當(dāng)著我的面吹牛奏司,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播樟插,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼韵洋,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了黄锤?” 一聲冷哼從身側(cè)響起搪缨,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鸵熟,沒想到半個(gè)月后副编,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡流强,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年痹届,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片打月。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡队腐,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出奏篙,到底是詐尸還是另有隱情柴淘,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布秘通,位于F島的核電站为严,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏充易。R本人自食惡果不足惜梗脾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一荸型、第九天 我趴在偏房一處隱蔽的房頂上張望盹靴。 院中可真熱鬧,春花似錦瑞妇、人聲如沸稿静。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽改备。三九已至,卻和暖如春蔓倍,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工裁眯, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留伍掀,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓焰扳,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子滞诺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容