從“設(shè)計(jì)思想”到“源碼解讀”,一步一步深入解析Java線程池淋硝!

01 初識線程池

我們知道雹熬,線程的創(chuàng)建和銷毀都需要映射到操作系統(tǒng),因此其代價是比較高昂的谣膳。出于避免頻繁創(chuàng)建竿报、銷毀線程以及方便線程管理的需要,線程池應(yīng)運(yùn)而生继谚。

1.1 線程池優(yōu)勢

  • 降低資源消耗:線程池通常會維護(hù)一些線程(數(shù)量為 corePoolSize)烈菌,這些線程被重復(fù)使用來執(zhí)行不同的任務(wù),任務(wù)完成后不會銷毀花履。在待處理任務(wù)量很大的時候芽世,通過對線程資源的復(fù)用,避免了線程的頻繁創(chuàng)建與銷毀诡壁,從而降低了系統(tǒng)資源消耗济瓢。
  • 提高響應(yīng)速度:由于線程池維護(hù)了一批 alive 狀態(tài)的線程,當(dāng)任務(wù)到達(dá)時欢峰,不需要再創(chuàng)建線程,而是直接由這些線程去執(zhí)行任務(wù),從而減少了任務(wù)的等待時間纽帖。
  • 提高線程的可管理性:使用線程池可以對線程進(jìn)行統(tǒng)一的分配宠漩,調(diào)優(yōu)和監(jiān)控。

1.2 線程池設(shè)計(jì)思路

有句話叫做藝術(shù)來源于生活懊直,編程語言也是如此扒吁,很多設(shè)計(jì)思想能映射到日常生活中,比如面向?qū)ο笏枷胧夷摇⒎庋b雕崩、繼承,等等融撞。今天我們要說的線程池盼铁,它同樣可以在現(xiàn)實(shí)世界找到對應(yīng)的實(shí)體——工廠。

先假想一個工廠的生產(chǎn)流程:

image.png

工廠中有固定的一批工人尝偎,稱為正式工人饶火,工廠接收的訂單由這些工人去完成。當(dāng)訂單增加致扯,正式工人已經(jīng)忙不過來了肤寝,工廠會將生產(chǎn)原料暫時堆積在倉庫中,等有空閑的工人時再處理(因?yàn)楣と丝臻e了也不會主動處理倉庫中的生產(chǎn)任務(wù)抖僵,所以需要調(diào)度員實(shí)時調(diào)度)鲤看。倉庫堆積滿了后,訂單還在增加怎么辦耍群?工廠只能臨時擴(kuò)招一批工人來應(yīng)對生產(chǎn)高峰义桂,而這批工人高峰結(jié)束后是要清退的,所以稱為臨時工世吨。當(dāng)時臨時工也已招滿后(受限于工位限制澡刹,臨時工數(shù)量有上限),后面的訂單只能忍痛拒絕了耘婚。

我們做如下一番映射:

  • 工廠——線程池
  • 訂單——任務(wù)(Runnable)
  • 正式工人——核心線程
  • 臨時工——普通線程
  • 倉庫——任務(wù)隊(duì)列
  • 調(diào)度員——getTask()

getTask()是一個方法罢浇,將任務(wù)隊(duì)列中的任務(wù)調(diào)度給空閑線程,在解讀線程池有詳細(xì)介紹

映射后沐祷,形成線程池流程圖如下嚷闭,兩者是不是有異曲同工之妙?

image.png

這樣赖临,線程池的工作原理或者說流程就很好理解了胞锰,提煉成一個簡圖:

image.png

02 深入線程池

那么接下來,問題來了兢榨,線程池是具體如何實(shí)現(xiàn)這套工作機(jī)制的呢嗅榕?從Java線程池Executor框架體系可以看出:線程池的真正實(shí)現(xiàn)類是ThreadPoolExecutor顺饮,因此我們接下來重點(diǎn)研究這個類。

image.png

2.1 構(gòu)造方法

研究一個類凌那,先從它的構(gòu)造方法開始兼雄。ThreadPoolExecutor提供了4個有參構(gòu)造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

解釋一下構(gòu)造方法中涉及到的參數(shù):

  • corePoolSize(必需):核心線程數(shù)。即池中一直保持存活的線程數(shù)帽蝶,即使這些線程處于空閑赦肋。但是將allowCoreThreadTimeOut參數(shù)設(shè)置為true后,核心線程處于空閑一段時間以上励稳,也會被回收佃乘。
  • maximumPoolSize(必需):池中允許的最大線程數(shù)。當(dāng)核心線程全部繁忙且任務(wù)隊(duì)列打滿之后驹尼,線程池會臨時追加線程趣避,直到總線程數(shù)達(dá)到maximumPoolSize這個上限。
  • keepAliveTime(必需):線程空閑超時時間扶欣。當(dāng)非核心線程處于空閑狀態(tài)的時間超過這個時間后鹅巍,該線程將被回收。將allowCoreThreadTimeOut參數(shù)設(shè)置為true后料祠,核心線程也會被回收骆捧。
  • unit(必需):keepAliveTime參數(shù)的時間單位。有:TimeUnit.DAYS(天)髓绽、TimeUnit.HOURS(小時)敛苇、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒)顺呕、TimeUnit.MILLISECONDS(毫秒)枫攀、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(納秒)
  • workQueue(必需):任務(wù)隊(duì)列株茶,采用阻塞隊(duì)列實(shí)現(xiàn)来涨。當(dāng)核心線程全部繁忙時,后續(xù)由execute方法提交的Runnable將存放在任務(wù)隊(duì)列中启盛,等待被線程處理蹦掐。
  • threadFactory(可選):線程工廠。指定線程池創(chuàng)建線程的方式僵闯。
  • handler(可選):拒絕策略卧抗。當(dāng)線程池中線程數(shù)達(dá)到maximumPoolSize且workQueue打滿時,后續(xù)提交的任務(wù)將被拒絕鳖粟,handler可以指定用什么方式拒絕任務(wù)社裆。

放到一起再看一下:

image.png

2.2 任務(wù)隊(duì)列

使用ThreadPoolExecutor需要指定一個實(shí)現(xiàn)了BlockingQueue接口的任務(wù)等待隊(duì)列。在ThreadPoolExecutor線程池的API文檔中向图,一共推薦了三種等待隊(duì)列泳秀,它們是:SynchronousQueue标沪、LinkedBlockingQueue和ArrayBlockingQueue;

  1. SynchronousQueue:同步隊(duì)列嗜傅。這是一個內(nèi)部沒有任何容量的阻塞隊(duì)列谨娜,任何一次插入操作的元素都要等待相對的刪除/讀取操作,否則進(jìn)行插入操作的線程就要一直等待磺陡,反之亦然。
  2. LinkedBlockingQueue:無界隊(duì)列(嚴(yán)格來說并非無界漠畜,上限是Integer.MAX_VALUE)币他,基于鏈表結(jié)構(gòu)。使用無界隊(duì)列后憔狞,當(dāng)核心線程都繁忙時蝴悉,后續(xù)任務(wù)可以無限加入隊(duì)列,因此線程池中線程數(shù)不會超過核心線程數(shù)瘾敢。這種隊(duì)列可以提高線程池吞吐量拍冠,但代價是犧牲內(nèi)存空間,甚至?xí)?dǎo)致內(nèi)存溢出簇抵。另外庆杜,使用它時可以指定容量,這樣它也就是一種有界隊(duì)列了碟摆。
  3. ArrayBlockingQueue:有界隊(duì)列晃财,基于數(shù)組實(shí)現(xiàn)。在線程池初始化時典蜕,指定隊(duì)列的容量断盛,后續(xù)無法再調(diào)整。這種有界隊(duì)列有利于防止資源耗盡愉舔,但可能更難調(diào)整和控制钢猛。

另外,Java還提供了另外4種隊(duì)列:

  1. PriorityBlockingQueue:支持優(yōu)先級排序的無界阻塞隊(duì)列轩缤。存放在PriorityBlockingQueue中的元素必須實(shí)現(xiàn)Comparable接口命迈,這樣才能通過實(shí)現(xiàn)compareTo()方法進(jìn)行排序。優(yōu)先級最高的元素將始終排在隊(duì)列的頭部典奉;PriorityBlockingQueue不會保證優(yōu)先級一樣的元素的排序躺翻,也不保證當(dāng)前隊(duì)列中除了優(yōu)先級最高的元素以外的元素,隨時處于正確排序的位置卫玖。
  2. DelayQueue:延遲隊(duì)列公你。基于二叉堆實(shí)現(xiàn)假瞬,同時具備:無界隊(duì)列陕靠、阻塞隊(duì)列迂尝、優(yōu)先隊(duì)列的特征。DelayQueue延遲隊(duì)列中存放的對象剪芥,必須是實(shí)現(xiàn)Delayed接口的類對象垄开。通過執(zhí)行時延從隊(duì)列中提取任務(wù),時間沒到任務(wù)取不出來税肪。更多內(nèi)容請見DelayQueue溉躲。
  3. LinkedBlockingDeque:雙端隊(duì)列∫嫘郑基于鏈表實(shí)現(xiàn)锻梳,既可以從尾部插入/取出元素,還可以從頭部插入元素/取出元素净捅。
  4. LinkedTransferQueue:由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列疑枯。這個隊(duì)列比較特別的時,采用一種預(yù)占模式蛔六,意思就是消費(fèi)者線程取元素時荆永,如果隊(duì)列不為空,則直接取走數(shù)據(jù)国章,若隊(duì)列為空具钥,那就生成一個節(jié)點(diǎn)(節(jié)點(diǎn)元素為null)入隊(duì),然后消費(fèi)者線程被等待在這個節(jié)點(diǎn)上液兽,后面生產(chǎn)者線程入隊(duì)時發(fā)現(xiàn)有一個元素為null的節(jié)點(diǎn)氓拼,生產(chǎn)者線程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn)抵碟,并喚醒該節(jié)點(diǎn)等待的線程桃漾,被喚醒的消費(fèi)者線程取走元素。

2.3 拒絕策略

線程池有一個重要的機(jī)制:拒絕策略拟逮。當(dāng)線程池workQueue已滿且無法再創(chuàng)建新線程池時撬统,就要拒絕后續(xù)任務(wù)了。拒絕策略需要實(shí)現(xiàn)RejectedExecutionHandler接口敦迄,不過Executors框架已經(jīng)為我們實(shí)現(xiàn)了4種拒絕策略:

  1. AbortPolicy(默認(rèn)):丟棄任務(wù)并拋出RejectedExecutionException異常恋追。
  2. CallerRunsPolicy:直接運(yùn)行這個任務(wù)的run方法,但并非是由線程池的線程處理罚屋,而是交由任務(wù)的調(diào)用線程處理苦囱。
  3. DiscardPolicy:直接丟棄任務(wù),不拋出任何異常脾猛。
  4. DiscardOldestPolicy:將當(dāng)前處于等待隊(duì)列列頭的等待任務(wù)強(qiáng)行取出撕彤,然后再試圖將當(dāng)前被拒絕的任務(wù)提交到線程池執(zhí)行。

線程工廠指定創(chuàng)建線程的方式,這個參數(shù)不是必選項(xiàng)羹铅,Executors類已經(jīng)為我們非常貼心地提供了一個默認(rèn)的線程工廠:

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

2.4 線程池狀態(tài)

線程池有5種狀態(tài):

volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

runState表示當(dāng)前線程池的狀態(tài)蚀狰,它是一個 volatile 變量用來保證線程之間的可見性。

下面的幾個static final變量表示runState可能的幾個取值职员,有以下幾個狀態(tài):

  • RUNNING:當(dāng)創(chuàng)建線程池后麻蹋,初始時,線程池處于RUNNING狀態(tài)焊切;
  • SHUTDOWN:如果調(diào)用了shutdown()方法扮授,則線程池處于SHUTDOWN狀態(tài),此時線程池不能夠接受新的任務(wù)专肪,它會等待所有任務(wù)執(zhí)行完畢糙箍;
  • STOP:如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài)牵祟,此時線程池不能接受新的任務(wù),并且會去嘗試終止正在執(zhí)行的任務(wù)抖格;
  • TERMINATED:當(dāng)線程池處于SHUTDOWN或STOP狀態(tài)诺苹,并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后雹拄,線程池被設(shè)置為TERMINATED狀態(tài)收奔。

2.5 初始化&容量調(diào)整&關(guān)閉

1、線程初始化

默認(rèn)情況下滓玖,創(chuàng)建線程池之后坪哄,線程池中是沒有線程的,需要提交任務(wù)之后才會創(chuàng)建線程势篡。

在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程翩肌,可以通過以下兩個方法辦到:

  • prestartCoreThread():boolean prestartCoreThread(),初始化一個核心線程
  • prestartAllCoreThreads():int prestartAllCoreThreads()禁悠,初始化所有核心線程念祭,并返回初始化的線程數(shù)
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進(jìn)去的參數(shù)是null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null
        ++n;
    return n;
}

2、線程池關(guān)閉

ThreadPoolExecutor提供了兩個方法碍侦,用于線程池的關(guān)閉:

  • shutdown():不會立即終止線程池粱坤,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)
  • shutdownNow():立即終止線程池瓷产,并嘗試打斷正在執(zhí)行的任務(wù)站玄,并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)

3濒旦、線程池容量調(diào)整

ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:

  • setCorePoolSize:設(shè)置核心池大小
  • setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小

當(dāng)上述參數(shù)從小變大時株旷,ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)尔邓。

03 使用線程池

3.1 ThreadPoolExecutor

通過構(gòu)造方法使用ThreadPoolExecutor是線程池最直接的使用方式灾常,下面看一個實(shí)例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyTest {
    public static void main(String[] args) {
        // 創(chuàng)建線程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        // 向線程池提交任務(wù)
        for (int i = 0; i < threadPool.getCorePoolSize(); i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    for (int x = 0; x < 2; x++) {
                        System.out.println(Thread.currentThread().getName() + ":" + x);
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        // 關(guān)閉線程池
        threadPool.shutdown(); // 設(shè)置線程池的狀態(tài)為SHUTDOWN霎冯,然后中斷所有沒有正在執(zhí)行任務(wù)的線程
        // threadPool.shutdownNow(); // 設(shè)置線程池的狀態(tài)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程钞瀑,并返回等待執(zhí)行任務(wù)的列表沈撞,該方法要慎用,容易造成不可控的后果
    }
}

運(yùn)行結(jié)果:

pool-1-thread-2:0
pool-1-thread-1:0
pool-1-thread-3:0
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:1

3.2 Executors封裝線程池

另外雕什,Executors封裝好了4種常見的功能線程池(還是那么地貼心):

1缠俺、FixedThreadPool

固定容量線程池。其特點(diǎn)是最大線程數(shù)就是核心線程數(shù)贷岸,意味著線程池只能創(chuàng)建核心線程壹士,keepAliveTime為0,即線程執(zhí)行完任務(wù)立即回收偿警。任務(wù)隊(duì)列未指定容量躏救,代表使用默認(rèn)值Integer.MAX_VALUE。適用于需要控制并發(fā)線程的場景螟蒸。

// 使用默認(rèn)線程工廠
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 需要自定義線程工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

使用示例:

// 1\. 創(chuàng)建線程池對象盒使,設(shè)置核心線程和最大線程數(shù)為5
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 2\. 創(chuàng)建Runnable(任務(wù))
Runnable task =new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->運(yùn)行");
  }
};
// 3\. 向線程池提交任務(wù)
fixedThreadPool.execute(task);

2、 SingleThreadExecutor

單線程線程池七嫌。特點(diǎn)是線程池中只有一個線程(核心線程)少办,線程執(zhí)行完任務(wù)立即回收,使用有界阻塞隊(duì)列(容量未指定诵原,使用默認(rèn)值Integer.MAX_VALUE)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
// 為節(jié)省篇幅英妓,省略了自定義線程工廠方式的源碼

使用示例:

// 1\. 創(chuàng)建單線程線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2\. 創(chuàng)建Runnable(任務(wù))
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->運(yùn)行");
  }
};
// 3\. 向線程池提交任務(wù)
singleThreadExecutor.execute(task);

3、 ScheduledThreadPool

定時線程池绍赛。指定核心線程數(shù)量蔓纠,普通線程數(shù)量無限,線程執(zhí)行完任務(wù)立即回收吗蚌,任務(wù)隊(duì)列為延時阻塞隊(duì)列贺纲。這是一個比較特別的線程池,適用于執(zhí)行定時或周期性的任務(wù)褪测。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// 繼承了 ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    // 構(gòu)造函數(shù)猴誊,省略了自定義線程工廠的構(gòu)造函數(shù)
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
    }

    // 延時執(zhí)行任務(wù)
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        ...
    }
    // 定時執(zhí)行任務(wù)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
}

使用示例:

// 1\. 創(chuàng)建定時線程池
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2\. 創(chuàng)建Runnable(任務(wù))
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->運(yùn)行");
  }
};
// 3\. 向線程池提交任務(wù)
scheduledThreadPool.schedule(task, 2, TimeUnit.SECONDS); // 延遲2s后執(zhí)行任務(wù)
scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);// 延遲50ms后、每隔2000ms執(zhí)行任務(wù)

4侮措、CachedThreadPool

緩存線程池懈叹。沒有核心線程,普通線程數(shù)量為Integer.MAX_VALUE(可以理解為無限)分扎,線程閑置60s后回收澄成,任務(wù)隊(duì)列使用SynchronousQueue這種無容量的同步隊(duì)列。適用于任務(wù)量大但耗時低的場景。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

使用示例:

// 1\. 創(chuàng)建緩存線程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2\. 創(chuàng)建Runnable(任務(wù))
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->運(yùn)行");
  }
};
// 3\. 向線程池提交任務(wù)
cachedThreadPool.execute(task);

04 解讀線程池

OK墨状,相信前面內(nèi)容閱讀起來還算輕松愉悅吧卫漫,那么從這里開始就進(jìn)入深水區(qū)了,如果后面內(nèi)容能吃透肾砂,那么線程池知識就真的被你掌握了列赎。

我們知道,向線程池提交任務(wù)是用ThreadPoolExecutor的execute()方法镐确,但在其內(nèi)部包吝,線程任務(wù)的處理其實(shí)是相當(dāng)復(fù)雜的,涉及到ThreadPoolExecutor源葫、Worker诗越、Thread三個類的6個方法:

image.png

4.1 execute()

在ThreadPoolExecutor類中,任務(wù)提交方法的入口是execute(Runnable command)方法(submit()方法也是調(diào)用了execute())息堂,該方法其實(shí)只在嘗試做一件事:經(jīng)過各種校驗(yàn)之后嚷狞,調(diào)用 addWorker(Runnable command,boolean core)方法為線程池創(chuàng)建一個線程并執(zhí)行任務(wù),與之相對應(yīng)荣堰,execute() 的結(jié)果有兩個:

參數(shù)說明:

Runnable command:待執(zhí)行的任務(wù)

執(zhí)行流程:

  • 1床未、通過 ctl.get() 得到線程池的當(dāng)前線程數(shù),如果線程數(shù)小于corePoolSize持隧,則調(diào)用 addWorker(commond,true)方法創(chuàng)建新的線程執(zhí)行任務(wù),否則執(zhí)行步驟2逃片;
  • 2屡拨、步驟1失敗,說明已經(jīng)無法再創(chuàng)建新線程褥实,那么考慮將任務(wù)放入阻塞隊(duì)列呀狼,等待執(zhí)行完任務(wù)的線程來處理∷鹄耄基于此哥艇,判斷線程池是否處于Running狀態(tài)(只有Running狀態(tài)的線程池可以接受新任務(wù)),如果任務(wù)添加到任務(wù)隊(duì)列成功則進(jìn)入步驟3僻澎,失敗則進(jìn)入步驟4貌踏;
  • 3、來到這一步需要說明任務(wù)已經(jīng)加入任務(wù)隊(duì)列窟勃,這時要二次校驗(yàn)線程池的狀態(tài)祖乳,會有以下情形:
  1. 線程池不再是Running狀態(tài)了,需要將任務(wù)從任務(wù)隊(duì)列中移除秉氧,如果移除成功則拒絕本次任務(wù)
  2. 線程池是Running狀態(tài)眷昆,則判斷線程池工作線程是否為0,是則調(diào)用 addWorker(commond,true)添加一個沒有初始任務(wù)的線程(這個線程將去獲取已經(jīng)加入任務(wù)隊(duì)列的本次任務(wù)并執(zhí)行),否則進(jìn)入步驟4亚斋;
  3. 線程池不是Running狀態(tài)作媚,但從任務(wù)隊(duì)列移除任務(wù)失敗(可能已被某線程獲人Э纸泡?),進(jìn)入步驟4厚掷;
  • 4唆阿、將線程池擴(kuò)容至maximumPoolSize并調(diào)用 addWorker(commond,false)方法創(chuàng)建新的線程執(zhí)行任務(wù),失敗則拒絕本次任務(wù)伊约。

流程圖:

image.png

源碼詳讀:

/**
 * 在將來的某個時候執(zhí)行給定的任務(wù)嘁信。任務(wù)可以在新線程中執(zhí)行,也可以在現(xiàn)有的池線程中執(zhí)行抡爹。
 * 如果由于此執(zhí)行器已關(guān)閉或已達(dá)到其容量而無法提交任務(wù)以供執(zhí)行掩驱,則由當(dāng)前的{@code RejectedExecutionHandler}處理該任務(wù)。
 * 
 * @param command the task to execute  待執(zhí)行的任務(wù)命令
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 
     * 1\. 如果運(yùn)行的線程少于corePoolSize冬竟,將嘗試以給定的命令作為第一個任務(wù)啟動新線程欧穴。
     *
     * 2\. 如果一個任務(wù)可以成功排隊(duì),那么我們?nèi)匀恍枰屑?xì)檢查兩點(diǎn)泵殴,其一涮帘,我們是否應(yīng)該添加一個線程
     * (因?yàn)樽詮纳洗螜z查至今,一些存在的線程已經(jīng)死亡)笑诅,其二调缨,線程池狀態(tài)此時已改變成非運(yùn)行態(tài)。因此吆你,我們重新檢查狀態(tài)弦叶,如果檢查不通過,則移除已經(jīng)入列的任務(wù)妇多,如果檢查通過且線程池線程數(shù)為0伤哺,則啟動新線程。
     * 
     * 3\. 如果無法將任務(wù)加入任務(wù)隊(duì)列者祖,則將線程池擴(kuò)容到極限容量并嘗試創(chuàng)建一個新線程立莉,如果失敗則拒絕任務(wù)。
     */
    int c = ctl.get();

    // 步驟1:判斷線程池當(dāng)前線程數(shù)是否小于線程池大小
    if (workerCountOf(c) < corePoolSize) {
        // 增加一個工作線程并添加任務(wù)七问,成功則返回桃序,否則進(jìn)行步驟2
        // true代表使用coreSize作為邊界約束,否則使用maximumPoolSize
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 步驟2:不滿足workerCountOf(c) < corePoolSize或addWorker失敗烂瘫,進(jìn)入步驟2
    // 校驗(yàn)線程池是否是Running狀態(tài)且任務(wù)是否成功放入workQueue(阻塞隊(duì)列)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次校驗(yàn)媒熊,如果線程池非Running且從任務(wù)隊(duì)列中移除任務(wù)成功奇适,則拒絕該任務(wù)
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池工作線程數(shù)量為0,則新建一個空任務(wù)的線程
        else if (workerCountOf(recheck) == 0)
            // 如果線程池不是Running狀態(tài)芦鳍,是加入不進(jìn)去的
            addWorker(null, false);
    }
    // 步驟3:如果線程池不是Running狀態(tài)或任務(wù)入列失敗嚷往,嘗試擴(kuò)容maxPoolSize后再次addWorker,失敗則拒絕任務(wù)
    else if (!addWorker(command, false))
        reject(command);
}

4.2 addWorker()

addWorker(Runnable firstTask, boolean core) 方法柠衅,顧名思義皮仁,向線程池添加一個帶有任務(wù)的工作線程。

參數(shù)說明:

  1. Runnable firstTask:新創(chuàng)建的線程應(yīng)該首先運(yùn)行的任務(wù)(如果沒有菲宴,則為空)贷祈。
  2. boolean core:該參數(shù)決定了線程池容量的約束條件,即當(dāng)前線程數(shù)量以何值為極限值喝峦。參數(shù)為 true 則使用corePollSize 作為約束值势誊,否則使用maximumPoolSize。

執(zhí)行流程:

1谣蠢、外層循環(huán)判斷線程池的狀態(tài)是否可以新增工作線程粟耻。這層校驗(yàn)基于下面兩個原則:

  • 線程池為Running狀態(tài)時,既可以接受新任務(wù)也可以處理任務(wù)
  • 線程池為關(guān)閉狀態(tài)時只能新增空任務(wù)的工作線程(worker)處理任務(wù)隊(duì)列(workQueue)中的任務(wù)不能接受新任務(wù)

2眉踱、內(nèi)層循環(huán)向線程池添加工作線程并返回是否添加成功的結(jié)果挤忙。

  • 首先校驗(yàn)線程數(shù)是否已經(jīng)超限制,是則返回false谈喳,否則進(jìn)入下一步
  • 通過CAS使工作線程數(shù)+1册烈,成功則進(jìn)入步驟3,失敗則再次校驗(yàn)線程池是否是運(yùn)行狀態(tài)婿禽,是則繼續(xù)內(nèi)層循環(huán)赏僧,不是則返回外層循環(huán)

3、核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合谈宛,并啟動工作線程

  • 首先獲取鎖之后次哈,再次校驗(yàn)線程池狀態(tài)(具體校驗(yàn)規(guī)則見代碼注解)胎署,通過則進(jìn)入下一步吆录,未通過則添加線程失敗
  • 線程池狀態(tài)校驗(yàn)通過后,再檢查線程是否已經(jīng)啟動琼牧,是則拋出異常恢筝,否則嘗試將線程加入線程池
  • 檢查線程是否啟動成功,成功則返回true巨坊,失敗則進(jìn)入 addWorkerFailed 方法

流程圖:

image.png

源碼詳讀:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 外層循環(huán):判斷線程池狀態(tài)
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 
         * 1.線程池為非Running狀態(tài)(Running狀態(tài)則既可以新增核心線程也可以接受任務(wù))
         * 2.線程為shutdown狀態(tài)且firstTask為空且隊(duì)列不為空
         * 3.滿足條件1且條件2不滿足撬槽,則返回false
         * 4.條件2解讀:線程池為shutdown狀態(tài)時且任務(wù)隊(duì)列不為空時,可以新增空任務(wù)的線程來處理隊(duì)列中的任務(wù)
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 內(nèi)層循環(huán):線程池添加核心線程并返回是否添加成功的結(jié)果
        for (;;) {
            int wc = workerCountOf(c);
            // 校驗(yàn)線程池已有線程數(shù)量是否超限:
            // 1.線程池最大上限CAPACITY 
            // 2.corePoolSize或maximumPoolSize(取決于入?yún)ore)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            // 通過CAS操作使工作線程數(shù)+1趾撵,跳出外層循環(huán)
            if (compareAndIncrementWorkerCount(c)) 
                break retry;
            // 線程+1失敗侄柔,重讀ctl
            c = ctl.get();   // Re-read ctl
            // 如果此時線程池狀態(tài)不再是running共啃,則重新進(jìn)行外層循環(huán)
            if (runStateOf(c) != rs)
                continue retry;
            // 其他 CAS 失敗是因?yàn)楣ぷ骶€程數(shù)量改變了,繼續(xù)內(nèi)層循環(huán)嘗試CAS對線程數(shù)+1
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * 核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合暂题,并啟動工作線程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 下面代碼需要加鎖:線程池主鎖
            mainLock.lock(); 
            try {
                // 持鎖期間重新檢查移剪,線程工廠創(chuàng)建線程失敗或獲取鎖之前關(guān)閉的情況發(fā)生時,退出
                int c = ctl.get();
                int rs = runStateOf(c);

                // 再次檢驗(yàn)線程池是否是running狀態(tài)或線程池shutdown但線程任務(wù)為空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 線程已經(jīng)啟動薪者,則拋出非法線程狀態(tài)異常
                    // 為什么會存在這種狀態(tài)呢纵苛?未解決
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); //加入線程池
                    int s = workers.size();
                    // 如果當(dāng)前工作線程數(shù)超過線程池曾經(jīng)出現(xiàn)過的最大線程數(shù),刷新后者值
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();  // 釋放鎖
            }
            if (workerAdded) { // 工作線程添加成功言津,啟動該線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //線程啟動失敗攻人,則進(jìn)入addWorkerFailed
        if (! workerStarted) 
            addWorkerFailed(w);
    }
    return workerStarted;
}

4.3 Worker類

Worker類是內(nèi)部類,既實(shí)現(xiàn)了Runnable悬槽,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS)怀吻,所以其既是一個可執(zhí)行的任務(wù),又可以達(dá)到鎖的效果陷谱。

Worker類主要維護(hù)正在運(yùn)行任務(wù)的線程的中斷控制狀態(tài)烙博,以及其他次要的記錄。這個類適時地繼承了AbstractQueuedSynchronizer類烟逊,以簡化獲取和釋放鎖(該鎖作用于每個任務(wù)執(zhí)行代碼)的過程渣窜。這樣可以防止去中斷正在運(yùn)行中的任務(wù),只會中斷在等待從任務(wù)隊(duì)列中獲取任務(wù)的線程宪躯。

我們實(shí)現(xiàn)了一個簡單的不可重入互斥鎖乔宿,而不是使用可重入鎖,因?yàn)槲覀儾幌Mぷ魅蝿?wù)在調(diào)用setCorePoolSize之類的池控制方法時能夠重新獲取鎖访雪。另外详瑞,為了在線程真正開始運(yùn)行任務(wù)之前禁止中斷,我們將鎖狀態(tài)初始化為負(fù)值臣缀,并在啟動時清除它(在runWorker中)坝橡。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread; 

    /** Initial task to run.  Possibly null. */
    Runnable firstTask;

    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    // 通過構(gòu)造函數(shù)初始化,
    Worker(Runnable firstTask) {
        //設(shè)置AQS的同步狀態(tài)
        // state:鎖狀態(tài)精置,-1為初始值计寇,0為unlock狀態(tài),1為lock狀態(tài)
        setState(-1); // inhibit interrupts until runWorker  在調(diào)用runWorker前脂倦,禁止中斷

        this.firstTask = firstTask;
        // 線程工廠創(chuàng)建一個線程
        this.thread = getThreadFactory().newThread(this); 
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this); //runWorker()是ThreadPoolExecutor的方法
    }

    // Lock methods
    // The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態(tài)
    // The value 1 represents the locked state. 1代表“鎖定”狀態(tài)

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    /**
     * 嘗試獲取鎖的方法
     * 重寫AQS的tryAcquire()番宁,AQS本來就是讓子類來實(shí)現(xiàn)的
     */
    protected boolean tryAcquire(int unused) {
        // 判斷原值為0,且重置為1赖阻,所以state為-1時蝶押,鎖無法獲取。
        // 每次都是0->1火欧,保證了鎖的不可重入性
        if (compareAndSetState(0, 1)) {
            // 設(shè)置exclusiveOwnerThread=當(dāng)前線程
            setExclusiveOwnerThread(Thread.currentThread()); 
            return true;
        }
        return false;
    }

    /**
     * 嘗試釋放鎖
     * 不是state-1棋电,而是置為0
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null); 
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    /**
     * 中斷(如果運(yùn)行)
     * shutdownNow時會循環(huán)對worker線程執(zhí)行
     * 且不需要獲取worker鎖茎截,即使在worker運(yùn)行時也可以中斷
     */
    void interruptIfStarted() {
        Thread t;
        //如果state>=0、t!=null赶盔、且t沒有被中斷
        //new Worker()時state==-1稼虎,說明不能中斷
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

4.4 runWorker()

可以說,runWorker(Worker w) 是線程池中真正處理任務(wù)的方法招刨,前面的execute() 和 addWorker() 都是在為該方法做準(zhǔn)備和鋪墊霎俩。

參數(shù)說明:Worker w:封裝的Worker,攜帶了工作線程的諸多要素沉眶,包括Runnable(待處理任務(wù))打却、lock(鎖)、completedTasks(記錄線程池已完成任務(wù)數(shù))

執(zhí)行流程:

1谎倔、判斷當(dāng)前任務(wù)或者從任務(wù)隊(duì)列中獲取的任務(wù)是否不為空柳击,都為空則進(jìn)入步驟2,否則進(jìn)入步驟3

2片习、任務(wù)為空捌肴,則將completedAbruptly置為false(即線程不是突然終止),并執(zhí)行processWorkerExit(w,completedAbruptly)方法進(jìn)入線程退出程序

3藕咏、任務(wù)不為空状知,則進(jìn)入循環(huán),并加鎖

4孽查、判斷是否為線程添加中斷標(biāo)識饥悴,以下兩個條件滿足其一則添加中斷標(biāo)識:

  • 線程池狀態(tài)>=STOP,即STOP或TERMINATED
  • 一開始判斷線程池狀態(tài)<STOP,接下來檢查發(fā)現(xiàn)Thread.interrupted()為true盲再,即線程已經(jīng)被中斷西设,再次檢查線程池狀態(tài)是否>=STOP(以消除該瞬間shutdown方法生效,使線程池處于STOP或TERMINATED)

5答朋、執(zhí)行前置方法 beforeExecute(wt, task)(該方法為空方法贷揽,由子類實(shí)現(xiàn))后執(zhí)行task.run() 方法執(zhí)行任務(wù)(執(zhí)行不成功拋出相應(yīng)異常)

6、執(zhí)行后置方法 afterExecute(task, thrown)(該方法為空方法梦碗,由子類實(shí)現(xiàn))后將線程池已完成的任務(wù)數(shù)+1禽绪,并釋放鎖。

7叉弦、再次進(jìn)行循環(huán)條件判斷丐一。

流程圖:

image.png

源碼詳讀:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // allow interrupts
    // new Worker()是state==-1藻糖,此處是調(diào)用Worker類的tryRelease()方法淹冰,將state置為0,而interruptIfStarted()中只有state>=0才允許調(diào)用中斷
    w.unlock(); 

    // 線程退出的原因巨柒,true是任務(wù)導(dǎo)致樱拴,false是線程正常退出
    boolean completedAbruptly = true; 
    try {
        // 當(dāng)前任務(wù)和從任務(wù)隊(duì)列中獲取的任務(wù)都為空柠衍,方停止循環(huán)
        while (task != null || (task = getTask()) != null) {
            //上鎖可以防止在shutdown()時終止正在運(yùn)行的worker,而不是應(yīng)對并發(fā)
            w.lock(); 

            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 判斷1:確保只有在線程處于stop狀態(tài)且wt未中斷時晶乔,wt才會被設(shè)置中斷標(biāo)識
             * 條件1:線程池狀態(tài)>=STOP,即STOP或TERMINATED
             * 條件2:一開始判斷線程池狀態(tài)<STOP珍坊,接下來檢查發(fā)現(xiàn)Thread.interrupted()為true,即線程已經(jīng)被中斷正罢,再次檢查線程池狀態(tài)是否>=STOP(以消除該瞬間shutdown方法生效阵漏,使線程池處于STOP或TERMINATED),
             * 條件1與條件2任意滿意一個翻具,且wt不是中斷狀態(tài)履怯,則中斷wt,否則進(jìn)入下一步
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); //當(dāng)前線程調(diào)用interrupt()中斷

            try {
                //執(zhí)行前(空方法裆泳,由子類重寫實(shí)現(xiàn))
                beforeExecute(wt, task);

                Throwable thrown = null;
                try {
                    task.run();
                } 
                catch (RuntimeException x) {
                    thrown = x; throw x;
                } 
                catch (Error x) {
                    thrown = x; throw x;
                } 
                catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } 
                finally {
                    //執(zhí)行后(空方法叹洲,由子類重寫實(shí)現(xiàn))
                    afterExecute(task, thrown); 
                }
            } 
            finally {
                task = null; 
                w.completedTasks++; //完成任務(wù)數(shù)+1
                w.unlock(); //釋放鎖
            }
        }
        // 
        completedAbruptly = false;
    } 
    finally {
        //處理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

5、getTask()

由函數(shù)調(diào)用關(guān)系圖可知工禾,在ThreadPoolExecutor類的實(shí)現(xiàn)中运提,Runnable getTask() 方法是為void runWorker(Worker w)方法服務(wù)的,它的作用就是在任務(wù)隊(duì)列(workQueue)中獲取 task(Runnable)闻葵。

參數(shù)說明:無參數(shù)

執(zhí)行流程:

  • 將timedOut(上次獲取任務(wù)是否超時)置為false(首次執(zhí)行方法民泵,無上次,自然為false)槽畔,進(jìn)入一個無限循環(huán)
  • 如果線程池為Shutdown狀態(tài)且任務(wù)隊(duì)列為空(線程池shutdown狀態(tài)可以處理任務(wù)隊(duì)列中的任務(wù)洪灯,不再接受新任務(wù),這個是重點(diǎn))或者線程池為STOP或TERMINATED狀態(tài)竟痰,則意味著線程池不必再獲取任務(wù)了签钩,當(dāng)前工作線程數(shù)量-1并返回null,否則進(jìn)入步驟3
  • 如果線程池數(shù)量超限制或者時間超限且(任務(wù)隊(duì)列為空或當(dāng)前線程數(shù)>1)坏快,則進(jìn)入步驟4铅檩,否則進(jìn)入步驟5。
  • 移除工作線程莽鸿,成功則返回null昧旨,不成功則進(jìn)入下輪循環(huán)。
  • 嘗試用poll() 或者 take()(具體用哪個取決于timed的值)獲取任務(wù)祥得,如果任務(wù)不為空兔沃,則返回該任務(wù)。如果為空级及,則將timeOut 置為 true進(jìn)入下一輪循環(huán)乒疏。如果獲取任務(wù)過程發(fā)生異常,則將 timeOut置為 false 后進(jìn)入下一輪循環(huán)饮焦。

流程圖

image.png

源碼詳讀:

private Runnable getTask() {
    // 最新一次poll是否超時
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 條件1:線程池狀態(tài)SHUTDOWN怕吴、STOP窍侧、TERMINATED狀態(tài)
         * 條件2:線程池STOP、TERMINATED狀態(tài)或workQueue為空
         * 條件1與條件2同時為true转绷,則workerCount-1伟件,并且返回null
         * 注:條件2是考慮到SHUTDOWN狀態(tài)的線程池不會接受任務(wù),但仍會處理任務(wù)
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /**
         * 下列兩個條件滿足任意一個议经,則給當(dāng)前正在嘗試獲取任務(wù)的工作線程設(shè)置阻塞時間限制(超時會被銷毀斧账?不太確定這點(diǎn)),否則線程可以一直保持活躍狀態(tài)
         * 1.allowCoreThreadTimeOut:當(dāng)前線程是否以keepAliveTime為超時時限等待任務(wù)
         * 2.當(dāng)前線程數(shù)量已經(jīng)超越了核心線程數(shù)
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 兩個條件全部為true煞肾,則通過CAS使工作線程數(shù)-1其骄,即剔除工作線程
        // 條件1:工作線程數(shù)大于maximumPoolSize,或(工作線程阻塞時間受限且上次在任務(wù)隊(duì)列拉取任務(wù)超時)
        // 條件2:wc > 1或任務(wù)隊(duì)列為空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 移除工作線程扯旷,成功則返回null拯爽,不成功則進(jìn)入下輪循環(huán)
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 執(zhí)行到這里,說明已經(jīng)經(jīng)過前面重重校驗(yàn)钧忽,開始真正獲取task了
        try {
            // 如果工作線程阻塞時間受限毯炮,則使用poll(),否則使用take()
            // poll()設(shè)定阻塞時間,而take()無時間限制耸黑,直到拿到結(jié)果為止
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // r不為空桃煎,則返回該Runnable
            if (r != null)
                return r;
            // 沒能獲取到Runable,則將最近獲取任務(wù)是否超時設(shè)置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 響應(yīng)中斷大刊,進(jìn)入下一次循環(huán)前將最近獲取任務(wù)超時狀態(tài)置為false
            timedOut = false;
        }
    }
}

4.5 processWorkerExit()

processWorkerExit(Worker w, boolean completedAbruptly)執(zhí)行線程退出的方法

參數(shù)說明:

  • Worker w:要結(jié)束的工作線程为迈。
  • boolean completedAbruptly: 是否突然完成(異常導(dǎo)致),如果工作線程因?yàn)橛脩舢惓K劳鋈本瑒tcompletedAbruptly參數(shù)為 true葫辐。

執(zhí)行流程:

1、如果 completedAbruptly 為 true伴郁,即工作線程因?yàn)楫惓M蝗凰劳龉⒄剑瑒t執(zhí)行工作線程-1操作。

2焊傅、主線程獲取鎖后剂陡,線程池已經(jīng)完成的任務(wù)數(shù)追加 w(當(dāng)前工作線程) 完成的任務(wù)數(shù),并從worker的set集合中移除當(dāng)前worker狐胎。

3鸭栖、根據(jù)線程池狀態(tài)進(jìn)行判斷是否執(zhí)行tryTerminate()結(jié)束線程池。

4握巢、是否需要增加工作線程晕鹊,如果線程池還沒有完全終止,仍需要保持一定數(shù)量的線程。

  • 如果當(dāng)前線程是突然終止的捏题,調(diào)用addWorker()創(chuàng)建工作線程
  • 當(dāng)前線程不是突然終止,但當(dāng)前工作線程數(shù)量小于線程池需要維護(hù)的線程數(shù)量肉渴,則創(chuàng)建工作線程公荧。需要維護(hù)的線程數(shù)量為corePoolSize(取決于成員變量 allowCoreThreadTimeOut是否為 false)或1。

源碼詳讀:

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1.工作線程-1操作
     * 1)如果completedAbruptly 為true同规,說明工作線程發(fā)生異常循狰,那么將正在工作的線程數(shù)量-1
     * 2)如果completedAbruptly 為false,說明工作線程無任務(wù)可以執(zhí)行券勺,由getTask()執(zhí)行worker-1操作
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2.從線程set集合中移除工作線程绪钥,該過程需要加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 將該worker已完成的任務(wù)數(shù)追加到線程池已完成的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        // HashSet<Worker>中移除該worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 3.根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
    tryTerminate();

    /**
     * 4.是否需要增加工作線程
     * 線程池狀態(tài)是running 或 shutdown
     * 如果當(dāng)前線程是突然終止的,addWorker()
     * 如果當(dāng)前線程不是突然終止的关炼,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量程腹,addWorker()
     * 故如果調(diào)用線程池shutdown(),直到workQueue為空前儒拂,線程池都會維持corePoolSize個線程寸潦,然后再逐漸銷毀這corePoolSize個線程
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

好啦,以上就是Java線程池的全部內(nèi)容啦社痛,堅(jiān)持讀完的伙伴兒們你們收獲如何见转?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蒜哀,隨后出現(xiàn)的幾起案子斩箫,更是在濱河造成了極大的恐慌,老刑警劉巖撵儿,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乘客,死亡現(xiàn)場離奇詭異,居然都是意外死亡淀歇,警方通過查閱死者的電腦和手機(jī)寨典,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來房匆,“玉大人耸成,你說我怎么就攤上這事≡『瑁” “怎么了井氢?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長岳链。 經(jīng)常有香客問我花竞,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任约急,我火速辦了婚禮零远,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘厌蔽。我一直安慰自己牵辣,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布奴饮。 她就那樣靜靜地躺著纬向,像睡著了一般。 火紅的嫁衣襯著肌膚如雪戴卜。 梳的紋絲不亂的頭發(fā)上逾条,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機(jī)與錄音投剥,去河邊找鬼师脂。 笑死,一個胖子當(dāng)著我的面吹牛江锨,可吹牛的內(nèi)容都是我干的危彩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼泳桦,長吁一口氣:“原來是場噩夢啊……” “哼汤徽!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起灸撰,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤谒府,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后浮毯,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體完疫,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年债蓝,在試婚紗的時候發(fā)現(xiàn)自己被綠了壳鹤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡饰迹,死狀恐怖芳誓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情啊鸭,我是刑警寧澤锹淌,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站赠制,受9級特大地震影響赂摆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一烟号、第九天 我趴在偏房一處隱蔽的房頂上張望绊谭。 院中可真熱鬧,春花似錦汪拥、人聲如沸达传。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽趟大。三九已至鹤树,卻和暖如春铣焊,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背罕伯。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工曲伊, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人追他。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓坟募,卻偏偏與公主長得像,于是被迫代替她去往敵國和親邑狸。 傳聞我的和親對象是個殘疾皇子懈糯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容