Java核心(二)深入理解線程池ThreadPool

Threadpool
Threadpool

本文你將獲得以下信息:

  • 線程池源碼解讀
  • 線程池執(zhí)行流程分析
  • 帶返回值的線程池實(shí)現(xiàn)
  • 延遲線程池實(shí)現(xiàn)

為了方便讀者理解烫幕,本文會(huì)由淺入深,先從線程池的使用開(kāi)始再延伸到源碼解讀和源碼分析等高級(jí)內(nèi)容,讀者可根據(jù)自己的情況自主選擇閱讀順序和需要了解的章節(jié)。

一、線程池優(yōu)點(diǎn)

線程池能夠更加充分的利用CPU票顾、內(nèi)存、網(wǎng)絡(luò)帆调、IO等系統(tǒng)資源奠骄,線程池的主要作用如下:

  • 利用線程池可以復(fù)用線程,控制最大并發(fā)數(shù)番刊;
  • 實(shí)現(xiàn)任務(wù)緩存策略和拒絕機(jī)制含鳞;
  • 實(shí)現(xiàn)延遲執(zhí)行

阿里巴巴Java開(kāi)發(fā)手冊(cè)強(qiáng)制規(guī)定:線程資源必須通過(guò)線程池提供,如下圖:

線程池規(guī)定
線程池規(guī)定

二芹务、線程池使用

本節(jié)會(huì)介紹7種線程池的創(chuàng)建與使用蝉绷,線程池的狀態(tài)介紹鸭廷,ThreadPoolExecutor參數(shù)介紹等。

2.1 線程池創(chuàng)建

線程池可以使用Executors和ThreadPoolExecutor熔吗,其中使用Executors有六種創(chuàng)建線程池的方法辆床,如下圖:

線程池創(chuàng)建
線程池創(chuàng)建
// 使用Executors方式創(chuàng)建
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
ExecutorService workStealingPool = Executors.newWorkStealingPool();
// 原始創(chuàng)建方式
ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

2.1.1 線程池解讀

  1. newSingleThreadExecutor(),它的特點(diǎn)在于工作線程數(shù)目被限制為 1桅狠,操作一個(gè)無(wú)界的工作隊(duì)列讼载,所以它保證了所有任務(wù)的都是被順序執(zhí)行,最多會(huì)有一個(gè)任務(wù)處于活動(dòng)狀態(tài)中跌,并且不允許使用者改動(dòng)線程池實(shí)例咨堤,因此可以避免其改變線程數(shù)目。
  2. newCachedThreadPool()漩符,它是一種用來(lái)處理大量短時(shí)間工作任務(wù)的線程池一喘,具有幾個(gè)鮮明特點(diǎn):它會(huì)試圖緩存線程并重用,當(dāng)無(wú)緩存線程可用時(shí)嗜暴,就會(huì)創(chuàng)建新的工作線程凸克;如果線程閑置的時(shí)間超過(guò) 60 秒,則被終止并移出緩存灼伤;長(zhǎng)時(shí)間閑置時(shí)触徐,這種線程池咪鲜,不會(huì)消耗什么資源狐赡。其內(nèi)部使用 SynchronousQueue 作為工作隊(duì)列。
  3. newFixedThreadPool(int nThreads)疟丙,重用指定數(shù)目(nThreads)的線程颖侄,其背后使用的是無(wú)界的工作隊(duì)列,任何時(shí)候最多有 nThreads 個(gè)工作線程是活動(dòng)的享郊。這意味著览祖,如果任務(wù)數(shù)量超過(guò)了活動(dòng)隊(duì)列數(shù)目,將在工作隊(duì)列中等待空閑線程出現(xiàn)炊琉;如果有工作線程退出展蒂,將會(huì)有新的工作線程被創(chuàng)建,以補(bǔ)足指定的數(shù)目 nThreads苔咪。
  4. newSingleThreadScheduledExecutor() 創(chuàng)建單線程池锰悼,返回 ScheduledExecutorService,可以進(jìn)行定時(shí)或周期性的工作調(diào)度团赏。
  5. newScheduledThreadPool(int corePoolSize)和newSingleThreadScheduledExecutor()類似箕般,創(chuàng)建的是個(gè) ScheduledExecutorService,可以進(jìn)行定時(shí)或周期性的工作調(diào)度舔清,區(qū)別在于單一工作線程還是多個(gè)工作線程丝里。
  6. newWorkStealingPool(int parallelism)曲初,這是一個(gè)經(jīng)常被人忽略的線程池,Java 8 才加入這個(gè)創(chuàng)建方法杯聚,其內(nèi)部會(huì)構(gòu)建<a >ForkJoinPool</a>臼婆,利用<a >Work-Stealing</a>算法,并行地處理任務(wù)幌绍,不保證處理順序目锭。
  7. ThreadPoolExecutor是最原始的線程池創(chuàng)建,上面1-3創(chuàng)建方式都是對(duì)ThreadPoolExecutor的封裝纷捞。

總結(jié): 其中newSingleThreadExecutor痢虹、newCachedThreadPool、newFixedThreadPool是對(duì)ThreadPoolExecutor的封裝實(shí)現(xiàn)主儡,newSingleThreadScheduledExecutor奖唯、newScheduledThreadPool則為ThreadPoolExecutor子類ScheduledThreadPoolExecutor的封裝,用于執(zhí)行延遲任務(wù)糜值,newWorkStealingPool則為Java 8新加的方法丰捷。

2.1.2 單線程池的意義

從以上代碼可以看出newSingleThreadExecutor和newSingleThreadScheduledExecutor創(chuàng)建的都是單線程池,那么單線程池的意義是什么呢寂汇?

雖然是單線程池病往,但提供了工作隊(duì)列,生命周期管理骄瓣,工作線程維護(hù)等功能停巷。

2.2 ThreadPoolExecutor解讀

ThreadPoolExecutor作為線程池的核心方法,我們來(lái)看一下ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)榕栏,以及封裝類是怎么調(diào)用ThreadPoolExecutor的畔勤。

先從構(gòu)造函數(shù)說(shuō)起,構(gòu)造函數(shù)源碼如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

參數(shù)說(shuō)明:

  • corePoolSize:所謂的核心線程數(shù)扒磁,可以大致理解為長(zhǎng)期駐留的線程數(shù)目(除非設(shè)置了 allowCoreThreadTimeOut)庆揪。對(duì)于不同的線程池,這個(gè)值可能會(huì)有很大區(qū)別妨托,比如 newFixedThreadPool 會(huì)將其設(shè)置為 nThreads缸榛,而對(duì)于 newCachedThreadPool 則是為 0。
  • maximumPoolSize:顧名思義兰伤,就是線程不夠時(shí)能夠創(chuàng)建的最大線程數(shù)内颗。同樣進(jìn)行對(duì)比,對(duì)于 newFixedThreadPool医清,當(dāng)然就是 nThreads起暮,因?yàn)槠湟笫枪潭ù笮。?newCachedThreadPool 則是 Integer.MAX_VALUE。
  • keepAliveTime:空閑線程的备号常活時(shí)間筒捺,如果線程的空閑時(shí)間超過(guò)這個(gè)值,那么將會(huì)被關(guān)閉纸厉。注意此值生效條件必須滿足:空閑時(shí)間超過(guò)這個(gè)值系吭,并且線程池中的線程數(shù)少于等于核心線程數(shù)corePoolSize。當(dāng)然核心線程默認(rèn)是不會(huì)關(guān)閉的颗品,除非設(shè)置了allowCoreThreadTimeOut(true)那么核心線程也可以被回收肯尺。
  • TimeUnit:時(shí)間單位。
  • BlockingQueue:任務(wù)丟列躯枢,用于存儲(chǔ)線程池的待執(zhí)行任務(wù)的则吟。
  • threadFactory:用于生成線程,一般我們可以用默認(rèn)的就可以了锄蹂。
  • handler:當(dāng)線程池已經(jīng)滿了氓仲,但是又有新的任務(wù)提交的時(shí)候,該采取什么策略由這個(gè)來(lái)指定得糜。有幾種方式可供選擇敬扛,像拋出異常、直接拒絕然后返回等朝抖,也可以自己實(shí)現(xiàn)相應(yīng)的接口實(shí)現(xiàn)自己的邏輯啥箭。

來(lái)看一下線程池封裝類對(duì)于ThreadPoolExecutor的調(diào)用:

newSingleThreadExecutor對(duì)ThreadPoolExecutor的封裝源碼如下:

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool對(duì)ThreadPoolExecutor的封裝源碼如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newFixedThreadPool對(duì)ThreadPoolExecutor的封裝源碼如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

ScheduledExecutorService對(duì)ThreadPoolExecutor的封裝源碼如下:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

newSingleThreadScheduledExecutor使用的是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,如下圖所示:

Threadpool
Threadpool

newScheduledThreadPool對(duì)ThreadPoolExecutor的封裝源碼如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

newScheduledThreadPool使用的也是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor治宣。

2.3 線程池狀態(tài)

查看ThreadPoolExecutor源碼可知線程的狀態(tài)如下:

Threadpool
Threadpool

線程狀態(tài)解讀(以下內(nèi)容來(lái)源于:https://javadoop.com/post/java-thread-pool):

  • RUNNING:這個(gè)沒(méi)什么好說(shuō)的急侥,這是最正常的狀態(tài):接受新的任務(wù),處理等待隊(duì)列中的任務(wù)炼七;
  • SHUTDOWN:不接受新的任務(wù)提交缆巧,但是會(huì)繼續(xù)處理等待隊(duì)列中的任務(wù);
  • STOP:不接受新的任務(wù)提交豌拙,不再處理等待隊(duì)列中的任務(wù),中斷正在執(zhí)行任務(wù)的線程题暖;
  • TIDYING:所有的任務(wù)都銷毀了按傅,workCount 為 0。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時(shí)胧卤,會(huì)執(zhí)行鉤子方法 terminated()唯绍;
  • TERMINATED:terminated() 方法結(jié)束后,線程池的狀態(tài)就會(huì)變成這個(gè)枝誊;

RUNNING 定義為 -1况芒,SHUTDOWN 定義為 0,其他的都比 0 大叶撒,所以等于 0 的時(shí)候不能提交任務(wù)绝骚,大于 0 的話耐版,連正在執(zhí)行的任務(wù)也需要中斷。

看了這幾種狀態(tài)的介紹压汪,讀者大體也可以猜到十之八九的狀態(tài)轉(zhuǎn)換了粪牲,各個(gè)狀態(tài)的轉(zhuǎn)換過(guò)程有以下幾種:

  • RUNNING -> SHUTDOWN:當(dāng)調(diào)用了 shutdown() 后,會(huì)發(fā)生這個(gè)狀態(tài)轉(zhuǎn)換止剖,這也是最重要的腺阳;
  • (RUNNING or SHUTDOWN) -> STOP:當(dāng)調(diào)用 shutdownNow() 后,會(huì)發(fā)生這個(gè)狀態(tài)轉(zhuǎn)換穿香,這下要清楚 shutDown() 和 shutDownNow() 的區(qū)別了亭引;
  • SHUTDOWN -> TIDYING:當(dāng)任務(wù)隊(duì)列和線程池都清空后,會(huì)由 SHUTDOWN 轉(zhuǎn)換為 TIDYING皮获;
  • STOP -> TIDYING:當(dāng)任務(wù)隊(duì)列清空后痛侍,發(fā)生這個(gè)轉(zhuǎn)換;
  • TIDYING -> TERMINATED:這個(gè)前面說(shuō)了魔市,當(dāng) terminated() 方法結(jié)束后主届;

2.4 線程池執(zhí)行

說(shuō)了那么多下來(lái)一起來(lái)看線程池的是怎么執(zhí)行任務(wù)的,線程池任務(wù)提交有兩個(gè)方法:

  • execute
  • submit

其中execute只能接受Runnable類型的任務(wù)待德,使用如下:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
});

submit可以接受Runnable或Callable類型的任務(wù)君丁,使用如下:

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
});

2.4.1 帶返回值的線程池實(shí)現(xiàn)

使用submit傳遞Callable類可以獲取執(zhí)行任務(wù)的返回值,Callable是JDK 1.5 添加的特性用于補(bǔ)充Runnable無(wú)返回的情況将宪。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> result = executorService.submit(new Callable<Long>() {
    @Override
    public Long call() throws Exception {
        return new Date().getTime();
    }
});
try {
    System.out.println("運(yùn)行結(jié)果:" + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

2.4.2 延遲線程池實(shí)現(xiàn)

在線程池中newSingleThreadScheduledExecutor和newScheduledThreadPool返回的是ScheduledExecutorService绘闷,用于執(zhí)行延遲線程池的,代碼如下:

// 延遲線程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("time:" + new Date().getTime());
    }
}, 10, TimeUnit.SECONDS);

完整示例下載地址: https://github.com/vipstone/java-core-example

三较坛、線程池源碼解讀

閱讀線程池的源碼有一個(gè)小技巧印蔗,可以按照線程池執(zhí)行的順序進(jìn)行串連關(guān)聯(lián)閱讀,這樣更容易理解線程池的實(shí)現(xiàn)丑勤。

源碼閱讀流程解讀

我們先從線程池的任務(wù)提交方法execute()開(kāi)始閱讀华嘹,從execute()我們會(huì)發(fā)現(xiàn)線程池執(zhí)行的核心方法是addWorker(),在addWorker()中我們發(fā)現(xiàn)啟動(dòng)線程調(diào)用了start()方法法竞,調(diào)用start()方法之后會(huì)執(zhí)行Worker類的run()方法耙厚,run里面調(diào)用runWorker(),運(yùn)行程序的關(guān)鍵在于getTask()方法岔霸,getTask()方法之后就是此線程的關(guān)閉薛躬,整個(gè)線程池的工作流程也就完成了,下來(lái)一起來(lái)看吧(如果本段文章沒(méi)看懂的話也可以看完源碼之后呆细,回過(guò)頭來(lái)再看一遍)型宝。

3.1 execute() 源碼解讀

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();

    // 如果當(dāng)前線程數(shù)少于核心線程數(shù),那么直接添加一個(gè) worker 來(lái)執(zhí)行任務(wù),
    // 創(chuàng)建一個(gè)新的線程趴酣,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 添加任務(wù)成功梨树,那么就結(jié)束了。提交任務(wù)嘛价卤,線程池已經(jīng)接受了這個(gè)任務(wù)劝萤,這個(gè)方法也就可以返回了
        // 至于執(zhí)行的結(jié)果,到時(shí)候會(huì)包裝到 FutureTask 中慎璧。
        // 返回 false 代表線程池不允許提交任務(wù)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到這里說(shuō)明床嫌,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了

    // 如果線程池處于 RUNNING 狀態(tài)胸私,把這個(gè)任務(wù)添加到任務(wù)隊(duì)列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 這里面說(shuō)的是厌处,如果任務(wù)進(jìn)入了 workQueue,我們是否需要開(kāi)啟新的線程
         * 因?yàn)榫€程數(shù)在 [0, corePoolSize) 是無(wú)條件開(kāi)啟新的線程
         * 如果線程數(shù)已經(jīng)大于等于 corePoolSize岁疼,那么將任務(wù)添加到隊(duì)列中阔涉,然后進(jìn)到這里
         */
        int recheck = ctl.get();
        // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊(duì)的這個(gè)任務(wù)捷绒,并且執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池還是 RUNNING 的瑰排,并且線程數(shù)為 0,那么開(kāi)啟新的線程
        // 到這里暖侨,我們知道了椭住,這塊代碼的真正意圖是:擔(dān)心任務(wù)提交到隊(duì)列中了,但是線程都關(guān)閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 隊(duì)列滿了字逗,那么進(jìn)入到這個(gè)分支
    // 以 maximumPoolSize 為界創(chuàng)建新的 worker京郑,
    // 如果失敗,說(shuō)明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize葫掉,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

3.2 addWorker() 源碼解讀

// 第一個(gè)參數(shù)是準(zhǔn)備提交給這個(gè)線程執(zhí)行的任務(wù)些举,之前說(shuō)了,可以為 null
// 第二個(gè)參數(shù)為 true 代表使用核心線程數(shù) corePoolSize 作為創(chuàng)建線程的界線俭厚,也就說(shuō)創(chuàng)建這個(gè)線程的時(shí)候户魏,
//         如果線程池中的線程總數(shù)已經(jīng)達(dá)到 corePoolSize,那么不能響應(yīng)這次創(chuàng)建線程的請(qǐng)求
//         如果是 false套腹,代表使用最大線程數(shù) maximumPoolSize 作為界線
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 這個(gè)非常不好理解
        // 如果線程池已關(guān)閉绪抛,并滿足以下條件之一,那么不創(chuàng)建新的 worker:
        // 1. 線程池狀態(tài)大于 SHUTDOWN电禀,其實(shí)也就是 STOP, TIDYING, 或 TERMINATED
        // 2. firstTask != null
        // 3. workQueue.isEmpty()
        // 簡(jiǎn)單分析下:
        // 還是狀態(tài)控制的問(wèn)題,當(dāng)線程池處于 SHUTDOWN 的時(shí)候笤休,不允許提交任務(wù)尖飞,但是已有的任務(wù)繼續(xù)執(zhí)行
        // 當(dāng)狀態(tài)大于 SHUTDOWN 時(shí),不允許提交任務(wù),且中斷正在執(zhí)行的任務(wù)
        // 多說(shuō)一句:如果線程池處于 SHUTDOWN政基,但是 firstTask 為 null贞铣,且 workQueue 非空,那么是允許創(chuàng)建 worker 的
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 如果成功沮明,那么就是所有創(chuàng)建線程前的條件校驗(yàn)都滿足了辕坝,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù)了
            // 這里失敗的話,說(shuō)明有其他線程也在嘗試往線程池中創(chuàng)建線程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 由于有并發(fā)荐健,重新再讀取一下 ctl
            c = ctl.get();
            // 正常如果是 CAS 失敗的話酱畅,進(jìn)到下一個(gè)里層的for循環(huán)就可以了
            // 可是如果是因?yàn)槠渌€程的操作,導(dǎo)致線程池的狀態(tài)發(fā)生了變更江场,如有其他線程關(guān)閉了這個(gè)線程池
            // 那么需要回到外層的for循環(huán)
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /* 
     * 到這里纺酸,我們認(rèn)為在當(dāng)前這個(gè)時(shí)刻,可以開(kāi)始創(chuàng)建線程來(lái)執(zhí)行任務(wù)了址否,
     * 因?yàn)樵撔r?yàn)的都校驗(yàn)了餐蔬,至于以后會(huì)發(fā)生什么,那是以后的事佑附,至少當(dāng)前是滿足條件的
     */

    // worker 是否已經(jīng)啟動(dòng)
    boolean workerStarted = false;
    // 是否已將這個(gè) worker 添加到 workers 這個(gè) HashSet 中
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 把 firstTask 傳給 worker 的構(gòu)造方法
        w = new Worker(firstTask);
        // 取 worker 中的線程對(duì)象樊诺,之前說(shuō)了,Worker的構(gòu)造方法會(huì)調(diào)用 ThreadFactory 來(lái)創(chuàng)建一個(gè)新的線程
        final Thread t = w.thread;
        if (t != null) {
            // 這個(gè)是整個(gè)類的全局鎖音同,持有這個(gè)鎖才能讓下面的操作“順理成章”词爬,
            // 因?yàn)殛P(guān)閉一個(gè)線程池需要這個(gè)鎖,至少我持有鎖的期間瘟斜,線程池不會(huì)被關(guān)閉
            mainLock.lock();
            try {

                int c = ctl.get();
                int rs = runStateOf(c);

                // 小于 SHUTTDOWN 那就是 RUNNING缸夹,這個(gè)自不必說(shuō),是最正常的情況
                // 如果等于 SHUTDOWN螺句,前面說(shuō)了虽惭,不接受新的任務(wù),但是會(huì)繼續(xù)執(zhí)行等待隊(duì)列中的任務(wù)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker 里面的 thread 可不能是已經(jīng)啟動(dòng)的
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 加到 workers 這個(gè) HashSet 中
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于記錄 workers 中的個(gè)數(shù)的最大值
                    // 因?yàn)?workers 是不斷增加減少的蛇尚,通過(guò)這個(gè)值可以知道線程池的大小曾經(jīng)達(dá)到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 添加成功的話芽唇,啟動(dòng)這個(gè)線程
            if (workerAdded) {
                // 啟動(dòng)線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果線程沒(méi)有啟動(dòng),需要做一些清理工作取劫,如前面 workCount 加了 1匆笤,將其減掉
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回線程是否啟動(dòng)成功
    return workerStarted;
}

在這段代碼可以看出,調(diào)用了t.start();

3.3 runWorker() 源碼解讀

根據(jù)上面代碼可知谱邪,調(diào)用了Worker的t.start()之后炮捧,緊接著會(huì)調(diào)用Worker的run()方法,run()源碼如下:

public void run() {
    runWorker(this);
}

runWorker()源碼如下:

//  worker 線程啟動(dòng)后調(diào)用,while 循環(huán)(即自旋!)不斷從等待隊(duì)列獲取任務(wù)并執(zhí)行
//  worker 初始化時(shí)惦银,可指定 firstTask咆课,那么第一個(gè)任務(wù)也就可以不需要從隊(duì)列中獲取
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 該線程的第一個(gè)任務(wù)(若有)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允許中斷
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 循環(huán)調(diào)用 getTask 獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();          
            // 若線程池狀態(tài)大于等于 STOP末誓,那么意味著該線程也要中斷
              /**
               * 若線程池STOP,請(qǐng)確保線程 已被中斷
               * 如果沒(méi)有书蚪,請(qǐng)確保線程未被中斷
               * 這需要在第二種情況下進(jìn)行重新檢查喇澡,以便在關(guān)中斷時(shí)處理shutdownNow競(jìng)爭(zhēng)
               */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 這是一個(gè)鉤子方法,留給需要的子類實(shí)現(xiàn)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 到這里終于可以執(zhí)行任務(wù)了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 這里不允許拋出 Throwable殊校,所以轉(zhuǎn)換為 Error
                    thrown = x; throw new Error(x);
                } finally {
                    // 也是一個(gè)鉤子方法晴玖,將 task 和異常作為參數(shù),留給需要的子類實(shí)現(xiàn)
                    afterExecute(task, thrown);
                }
            } finally {
                // 置空 task为流,準(zhǔn)備 getTask 下一個(gè)任務(wù)
                task = null;
                // 累加完成的任務(wù)數(shù)
                w.completedTasks++;
                // 釋放掉 worker 的獨(dú)占鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 到這里呕屎,需要執(zhí)行線程關(guān)閉
        // 1. 說(shuō)明 getTask 返回 null,也就是說(shuō)艺谆,這個(gè) worker 的使命結(jié)束了榨惰,執(zhí)行關(guān)閉
        // 2. 任務(wù)執(zhí)行過(guò)程中發(fā)生了異常
        //    第一種情況,已經(jīng)在代碼處理了將 workCount 減 1静汤,這個(gè)在 getTask 方法分析中說(shuō)
        //    第二種情況琅催,workCount 沒(méi)有進(jìn)行處理,所以需要在 processWorkerExit 中處理
        processWorkerExit(w, completedAbruptly);
    }
}

3.4 getTask() 源碼解讀

runWorker里面的有g(shù)etTask()虫给,來(lái)看下具體的實(shí)現(xiàn):

// 此方法有三種可能
// 1. 阻塞直到獲取到任務(wù)返回藤抡。默認(rèn) corePoolSize 之內(nèi)的線程是不會(huì)被回收的,它們會(huì)一直等待任務(wù)
// 2. 超時(shí)退出抹估。keepAliveTime 起作用的時(shí)候缠黍,也就是如果這么多時(shí)間內(nèi)都沒(méi)有任務(wù),那么應(yīng)該執(zhí)行關(guān)閉
// 3. 如果發(fā)生了以下條件药蜻,須返回 null
//     池中有大于 maximumPoolSize 個(gè) workers 存在(通過(guò)調(diào)用 setMaximumPoolSize 進(jìn)行設(shè)置)
//     線程池處于 SHUTDOWN瓷式,而且 workQueue 是空的,前面說(shuō)了语泽,這種不再接受新的任務(wù)
//     線程池處于 STOP贸典,不僅不接受新的線程,連 workQueue 中的線程也不再執(zhí)行
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
   for (;;) {
            // 允許核心線程數(shù)內(nèi)的線程回收踱卵,或當(dāng)前線程數(shù)超過(guò)了核心線程數(shù)廊驼,那么有可能發(fā)生超時(shí)關(guān)閉
            // 這里 break,是為了不往下執(zhí)行后一個(gè) if (compareAndDecrementWorkerCount(c))
            // 兩個(gè) if 一起看:如果當(dāng)前線程數(shù) wc > maximumPoolSize惋砂,或者超時(shí)妒挎,都返回 null
            // 那這里的問(wèn)題來(lái)了,wc > maximumPoolSize 的情況西饵,為什么要返回 null酝掩?
            // 換句話說(shuō),返回 null 意味著關(guān)閉線程眷柔。
            // 那是因?yàn)橛锌赡荛_(kāi)發(fā)者調(diào)用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調(diào)小了
            // 如果此 worker 發(fā)生了中斷庸队,采取的方案是重試
            // 解釋下為什么會(huì)發(fā)生中斷积蜻,這個(gè)讀者要去看 setMaximumPoolSize 方法闯割,
            // 如果開(kāi)發(fā)者將 maximumPoolSize 調(diào)小了彻消,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量,
            // 那么意味著超出的部分線程要被關(guān)閉宙拉。重新進(jìn)入 for 循環(huán)宾尚,自然會(huì)有部分線程會(huì)返回 null
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // CAS 操作振定,減少工作線程數(shù)
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
            // 如果此 worker 發(fā)生了中斷闭专,采取的方案是重試
            // 解釋下為什么會(huì)發(fā)生中斷,這個(gè)讀者要去看 setMaximumPoolSize 方法说榆,
            // 如果開(kāi)發(fā)者將 maximumPoolSize 調(diào)小了锥忿,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量牛郑,
            // 那么意味著超出的部分線程要被關(guān)閉。重新進(jìn)入 for 循環(huán)敬鬓,自然會(huì)有部分線程會(huì)返回 null
                timedOut = false;
            }
        }
}

四淹朋、線程池執(zhí)行流程

線程池的執(zhí)行流程如下圖:

Threadpool
Threadpool

五、總結(jié)

本文總結(jié)以問(wèn)答的形式展示钉答,引自《深度解讀 java 線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn)》础芍,最下方附參考地址。

1数尿、線程池有哪些關(guān)鍵屬性仑性?

  • corePoolSize 到 maximumPoolSize 之間的線程會(huì)被回收,當(dāng)然 corePoolSize 的線程也可以通過(guò)設(shè)置而得到回收(allowCoreThreadTimeOut(true))右蹦。

  • workQueue 用于存放任務(wù)诊杆,添加任務(wù)的時(shí)候,如果當(dāng)前線程數(shù)超過(guò)了 corePoolSize何陆,那么往該隊(duì)列中插入任務(wù)晨汹,線程池中的線程會(huì)負(fù)責(zé)到隊(duì)列中拉取任務(wù)。

  • keepAliveTime 用于設(shè)置空閑時(shí)間甲献,如果線程數(shù)超出了 corePoolSize宰缤,并且有些線程的空閑時(shí)間超過(guò)了這個(gè)值,會(huì)執(zhí)行關(guān)閉這些線程的操作

  • rejectedExecutionHandler 用于處理當(dāng)線程池不能執(zhí)行此任務(wù)時(shí)的情況晃洒,默認(rèn)有拋出 RejectedExecutionException 異常慨灭、忽略任務(wù)、使用提交任務(wù)的線程來(lái)執(zhí)行此任務(wù)和將隊(duì)列中等待最久的任務(wù)刪除球及,然后提交此任務(wù)這四種策略氧骤,默認(rèn)為拋出異常。

2吃引、線程池中的線程創(chuàng)建時(shí)機(jī)筹陵?

  • 如果當(dāng)前線程數(shù)少于 corePoolSize刽锤,那么提交任務(wù)的時(shí)候創(chuàng)建一個(gè)新的線程,并由這個(gè)線程執(zhí)行這個(gè)任務(wù)朦佩;

  • 如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize并思,那么將提交的任務(wù)添加到隊(duì)列中,等待線程池中的線程去隊(duì)列中取任務(wù)语稠;

  • 如果隊(duì)列已滿宋彼,那么創(chuàng)建新的線程來(lái)執(zhí)行任務(wù),需要保證池中的線程數(shù)不會(huì)超過(guò) maximumPoolSize仙畦,如果此時(shí)線程數(shù)超過(guò)了 maximumPoolSize输涕,那么執(zhí)行拒絕策略。

3慨畸、任務(wù)執(zhí)行過(guò)程中發(fā)生異常怎么處理莱坎?

如果某個(gè)任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會(huì)被關(guān)閉寸士,而不是繼續(xù)接收其他任務(wù)檐什。然后會(huì)啟動(dòng)一個(gè)新的線程來(lái)代替它。

4碉京、什么時(shí)候會(huì)執(zhí)行拒絕策略厢汹?

  • workers 的數(shù)量達(dá)到了 corePoolSize,任務(wù)入隊(duì)成功谐宙,以此同時(shí)線程池被關(guān)閉了烫葬,而且關(guān)閉線程池并沒(méi)有將這個(gè)任務(wù)出隊(duì),那么執(zhí)行拒絕策略凡蜻。這里說(shuō)的是非常邊界的問(wèn)題搭综,入隊(duì)和關(guān)閉線程池并發(fā)執(zhí)行,讀者仔細(xì)看看 execute 方法是怎么進(jìn)到第一個(gè) reject(command) 里面的划栓。
  • workers 的數(shù)量大于等于 corePoolSize兑巾,準(zhǔn)備入隊(duì),可是隊(duì)列滿了忠荞,任務(wù)入隊(duì)失敗蒋歌,那么準(zhǔn)備開(kāi)啟新的線程,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize委煤,那么執(zhí)行拒絕策略堂油。

六、參考資料

書(shū)籍:《碼出高效:Java開(kāi)發(fā)手冊(cè)》

Java核心技術(shù)36講:http://t.cn/EwUJvWA

深度解讀 java 線程池設(shè)計(jì)思想及源碼實(shí)現(xiàn):https://javadoop.com/post/java-thread-pool

Java線程池-ThreadPoolExecutor源碼解析(基于Java8):https://www.imooc.com/article/42990

課程推薦:

線程池創(chuàng)建
線程池創(chuàng)建
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末碧绞,一起剝皮案震驚了整個(gè)濱河市府框,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讥邻,老刑警劉巖迫靖,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件院峡,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡系宜,警方通過(guò)查閱死者的電腦和手機(jī)照激,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蜈首,“玉大人实抡,你說(shuō)我怎么就攤上這事』恫撸” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵赏淌,是天一觀的道長(zhǎng)踩寇。 經(jīng)常有香客問(wèn)我,道長(zhǎng)六水,這世上最難降的妖魔是什么俺孙? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮掷贾,結(jié)果婚禮上睛榄,老公的妹妹穿的比我還像新娘。我一直安慰自己想帅,他們只是感情好场靴,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著港准,像睡著了一般旨剥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上浅缸,一...
    開(kāi)封第一講書(shū)人閱讀 49,741評(píng)論 1 289
  • 那天轨帜,我揣著相機(jī)與錄音,去河邊找鬼衩椒。 笑死蚌父,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的毛萌。 我是一名探鬼主播苟弛,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼朝聋!你這毒婦竟也來(lái)了嗡午?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤冀痕,失蹤者是張志新(化名)和其女友劉穎荔睹,沒(méi)想到半個(gè)月后狸演,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡僻他,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年宵距,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吨拗。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡满哪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出劝篷,到底是詐尸還是另有隱情哨鸭,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布娇妓,位于F島的核電站像鸡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏哈恰。R本人自食惡果不足惜只估,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望着绷。 院中可真熱鬧蛔钙,春花似錦、人聲如沸荠医。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)子漩。三九已至豫喧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間幢泼,已是汗流浹背紧显。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留缕棵,地道東北人孵班。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像招驴,于是被迫代替她去往敵國(guó)和親篙程。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 前段時(shí)間遇到這樣一個(gè)問(wèn)題别厘,有人問(wèn)微信朋友圈的上傳圖片的功能怎么做才能讓用戶的等待時(shí)間較短虱饿,比如說(shuō)一下上傳9張圖片,...
    加油碼農(nóng)閱讀 1,190評(píng)論 0 2
  • 原文出處http://cmsblogs.com/ 『chenssy』 作為Executor框架中最核心的類,Thr...
    踩在浪花上看浪閱讀 1,222評(píng)論 0 4
  • 序言 近日后臺(tái)需要一些數(shù)據(jù),需要從網(wǎng)上爬取氮发,但是爬取的過(guò)程中渴肉,由于訪問(wèn)速度太頻繁,造成IP被封爽冕,最終通過(guò)線程池解決...
    HusterYP閱讀 838評(píng)論 0 3
  • Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合仇祭。 舉個(gè)容易理解的例子:有個(gè)線...
    石家志遠(yuǎn)閱讀 1,296評(píng)論 0 6
  • 一、池化技術(shù) 程序的運(yùn)行颈畸,其本質(zhì)上乌奇,是對(duì)系統(tǒng)資源(CPU、內(nèi)存眯娱、磁盤礁苗、網(wǎng)絡(luò)等等)的使用,如何高效的使用這些資源是編...
    CodingXu閱讀 141評(píng)論 0 0