線程池-一文弄懂Java里面的線程池ThreadPoolExecutor

前言

工作中難免會(huì)使用線程池溪猿。對(duì)線程池的使用要格外的小心,說(shuō)不定某天就出現(xiàn)了難搞的生產(chǎn)問(wèn)題(OOM)讲弄。每次在使用的時(shí)候依痊,我都會(huì)網(wǎng)上找找資料,今天我就自己全部整理了一篇瓶摆,不足或錯(cuò)誤之處性宏,希望大家看完后多多補(bǔ)充,提提意見(jiàn)书斜。



1、為什么要使用多線程

我們使用多線程的本質(zhì)是為了提升程序的性能荐吉。程序的性能我們可以用2個(gè)指標(biāo)來(lái)度量:
延遲:發(fā)出請(qǐng)求到收到響應(yīng)這個(gè)過(guò)程的時(shí)間;延遲越短穿撮,意味著程序執(zhí)行得越快瞧哟,性能也就越好。
吞吐量:在單位時(shí)間內(nèi)能處理請(qǐng)求的數(shù)量咧党;吞吐量越大陨亡,意味著程序能處理的請(qǐng)求越多,性能也就越好蛙埂。
同等條件下遮糖,延遲越短,吞吐量越大屡江。但是由于它們隸屬不同的維度(一個(gè)是時(shí)間維度赛不,一個(gè)是空間維度),并不能互相轉(zhuǎn)換文黎。
我們所謂提升性能殿较,從度量的角度,主要是降低延遲抓艳,提高吞吐量。

要想“降低延遲儡首,提高吞吐量”偏友,對(duì)應(yīng)的方法呢,基本上有兩個(gè)方向氛濒,一個(gè)方向是優(yōu)化算法鹅髓,另一個(gè)方向是將硬件的性能發(fā)揮到極致。前者屬于算法范疇骗奖,后者則是和并發(fā)編程息息相關(guān)了醒串。那計(jì)算機(jī)主要有哪些硬件呢?主要是兩類(lèi):一個(gè)是 I/O仰挣,一個(gè)是 CPU缠沈。簡(jiǎn)言之,在并發(fā)編程領(lǐng)域香椎,提升性能本質(zhì)上就是提升硬件的利用率禽篱,再具體點(diǎn)來(lái)說(shuō)躺率,就是提升 I/O 的利用率和 CPU 的利用率万矾。

在單核時(shí)代,多線程主要就是用來(lái)平衡 CPU 和 I/O 設(shè)備的后添。如果程序只有 CPU 計(jì)算薪丁,而沒(méi)有 I/O 操作的話馅精,多線程不但不會(huì)提升性能洲敢,還會(huì)使性能變得更差茄蚯,原因是增加了線程切換的成本。但是在多核時(shí)代壮不,這種純計(jì)算型的程序也可以利用多線程來(lái)提升性能皱碘。為什么呢?因?yàn)槔枚嗪丝梢越档晚憫?yīng)時(shí)間家凯。



2如失、創(chuàng)建線程幾種方式

繼承Thread類(lèi):Thread是類(lèi),有單繼承的局限性掂之。
實(shí)現(xiàn)Runnable接口:任務(wù)和線程分開(kāi)脆丁,不能返回執(zhí)行結(jié)果。
實(shí)現(xiàn)Callable接口:利用FutureTask執(zhí)行任務(wù)跟压,能取到執(zhí)行結(jié)果歼培。

package com.top.test.threads;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class ThreadPoolTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Thread a = new A();
        Thread b = new Thread(new B());
        FutureTask<String> futureTask = new FutureTask<>(new C());
        Thread c = new Thread(futureTask);

        a.start();
        b.start();
        c.start();
        System.out.println(futureTask.get());
    }
}

class A extends Thread {
    @Override
    public void run() {
        System.out.println("繼承Thread的線程任務(wù)");
    }
}

class B implements Runnable {
    @Override
    public void run() {
        System.out.println("實(shí)現(xiàn)Runnable的線程任務(wù)");
    }
}

class C implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "實(shí)現(xiàn)Callable的線程任務(wù)";
    }
}

但是我們工作中一般不這樣來(lái)創(chuàng)建線程躲庄。原因:雖然在 Java 語(yǔ)言中創(chuàng)建線程看上去就像創(chuàng)建一個(gè)對(duì)象一樣簡(jiǎn)單,只需要 new Thread() 就可以了笋庄,但實(shí)際上創(chuàng)建線程遠(yuǎn)不是創(chuàng)建一個(gè)對(duì)象那么簡(jiǎn)單。創(chuàng)建對(duì)象菌仁,僅僅是在 JVM 的堆里分配一塊內(nèi)存而已哆键;而創(chuàng)建一個(gè)線程,卻需要調(diào)用操作系統(tǒng)內(nèi)核的 API闪盔,然后操作系統(tǒng)要為線程分配一系列的資源辱士,這個(gè)成本就很高了,所以線程是一個(gè)重量級(jí)的對(duì)象异赫,應(yīng)該避免頻繁創(chuàng)建和銷(xiāo)毀头岔。

那應(yīng)該怎樣創(chuàng)建線程呢?你應(yīng)該立刻想到了用線程池靠抑。利用線程池把資源池化适掰,使得線程資源能服用,可以避免頻繁地創(chuàng)建和銷(xiāo)毀载城。



3费就、生產(chǎn)者-消費(fèi)者模式

線程池的設(shè)計(jì)采用了生產(chǎn)者-消費(fèi)者模式。線程池的使用方是生產(chǎn)者垦搬,線程池本身是消費(fèi)者艳汽,阻塞隊(duì)列來(lái)存儲(chǔ)要處理的任務(wù)对雪。簡(jiǎn)單畫(huà)了個(gè)圖:



4、Java中的線程池

從jdk1.5版本開(kāi)始馋艺,在java.uitl.concurrent包下面定義定義了一些列與并發(fā)相關(guān)的類(lèi),其中線程池最核心的一個(gè)類(lèi)是ThreadPoolExecutor碱鳞。

查看ThreadPoolExecutor的源碼踱蛀,看下基本的繼承關(guān)系:

public class ThreadPoolExecutor extends AbstractExecutorService {
    …
}

public abstract class AbstractExecutorService implements ExecutorService {
    …
}
public interface ExecutorService extends Executor {
    …
}
public interface Executor {
    void execute(Runnable command);
}

我們可以看出率拒,Executor接口中定義了execute方法,execute是用來(lái)執(zhí)行我們提交的任務(wù)的猬膨。

但是類(lèi)ThreadPoolExecutor源碼注釋中,是推薦我們使用類(lèi)Executors的工程方法來(lái)創(chuàng)建線程池的:

* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios.

看下源碼勃痴,Executors提供四種線程池,分別為:

  • newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池劣领,如果線程池長(zhǎng)度超過(guò)處理需要污它,可靈活回收空閑線程,若無(wú)可回收德澈,則新建線程固惯。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  • newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù)镇辉,超出的線程會(huì)在隊(duì)列中等待贴捡。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行屹逛。
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  • newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù)罕模,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

對(duì)于上面創(chuàng)建四種線程池蒿讥,我這里就不一一寫(xiě)例子了抛腕,網(wǎng)上多的是例子。

既然JDK提供了這么好的工具類(lèi)侥钳,我們是不是就肯定選擇它呢柄错?并不是,在阿里開(kāi)發(fā)手冊(cè)中有這樣一條:

需要阿里開(kāi)發(fā)手冊(cè)的同學(xué)给猾,可在公眾號(hào)“Java尖子生”颂跨,回復(fù)“alibaba”領(lǐng)取。

看來(lái)池颈,最終都有可能導(dǎo)致OOM钓丰,而 OOM 會(huì)導(dǎo)致所有請(qǐng)求都無(wú)法處理,這是致命問(wèn)題琢歇。所以強(qiáng)烈建議使用有界隊(duì)列并執(zhí)行最大線程數(shù)梦鉴。



5、自定義線程池ThreadPoolExecutor

從上面的源碼魄宏,我們也看到的四種線程池的最終方式也是調(diào)用的ThreadPoolExecutor的構(gòu)造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

ThreadPoolExecutor 的構(gòu)造函數(shù)非常復(fù)雜存筏,現(xiàn)在我們自己通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建線程池塔次,那么我們就有必要詳細(xì)了解下ThreadPoolExecutor的構(gòu)造函數(shù)的每一個(gè)參數(shù)。

5.1 corePoolSize
核心池(核心線程數(shù))的大小藕溅。在創(chuàng)建了線程池后,默認(rèn)情況下汁掠,線程池中并沒(méi)有任何線程集币,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法乞榨,從這2個(gè)方法的名字就可以看出当娱,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程跨细。默認(rèn)情況下,在創(chuàng)建了線程池后震叙,線程池中的線程數(shù)為0散休,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)匣砖,當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后昏滴,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列(阻塞隊(duì)列)當(dāng)中。
核心線程數(shù)的選擇很重要拂共,那創(chuàng)建多少個(gè)線程是合理的呢姻几?

對(duì)于 CPU 密集型的計(jì)算場(chǎng)景势告,理論上“線程的數(shù)量 =CPU 核數(shù)”就是最合適的咱台。不過(guò)在工程上俭驮,線程的數(shù)量一般會(huì)設(shè)置為“CPU 核數(shù) +1”,這樣的話遗遵,當(dāng)線程因?yàn)榕紶柕膬?nèi)存頁(yè)失效或其他原因?qū)е伦枞麜r(shí)逸嘀,這個(gè)額外的線程可以頂上,從而保證 CPU 的利用率崭倘。

對(duì)于 I/O 密集型計(jì)算場(chǎng)景,最佳的線程數(shù)是與程序中 CPU 計(jì)算和 I/O 操作的耗時(shí)比相關(guān)的登澜,我們可以總結(jié)出這樣一個(gè)公式【理論值】:
最佳線程數(shù) =CPU 核數(shù) * [ 1 +(I/O 耗時(shí) / CPU 耗時(shí))]

但實(shí)際【經(jīng)驗(yàn)值】告訴我們應(yīng)該為:2 * CPU 的核數(shù) + 1

I/O 耗時(shí)和 CPU 耗時(shí)的比值是一個(gè)關(guān)鍵參數(shù)飘庄,不幸的是這個(gè)參數(shù)是未知的,而且是動(dòng)態(tài)變化的谴仙,所以工程上碾盐,我們要估算這個(gè)參數(shù),然后做各種不同場(chǎng)景下的壓測(cè)來(lái)驗(yàn)證我們的估計(jì)掀虎。不過(guò)工程上付枫,原則還是將硬件的性能發(fā)揮到極致,所以壓測(cè)時(shí)二打,我們需要重點(diǎn)關(guān)注 CPU掂榔、I/O 設(shè)備的利用率和性能指標(biāo)(響應(yīng)時(shí)間症杏、吞吐量)之間的關(guān)系瑞信。
工作中都是按照邏輯核數(shù)來(lái)的,理論值和經(jīng)驗(yàn)值只是提供個(gè)指導(dǎo)走芋,實(shí)際上還是要靠壓測(cè)E琐辍8嚷亍状植!

5.2 maximumPoolSize
線程池最大線程數(shù),表示在線程池中最多能創(chuàng)建多少個(gè)線程振定。當(dāng)阻塞隊(duì)列裝滿時(shí)肉拓,繼續(xù)提交任務(wù),會(huì)創(chuàng)建救急(非核心)線程來(lái)處理卑惜。注意: 不是先前創(chuàng)建的線程是核心線程驻售,后面創(chuàng)建的線程是非核心線程,線程是沒(méi)有核心非核心的概念的欺栗。

5.3 不同場(chǎng)景下提交任務(wù),線程池的表現(xiàn)形式
【其中workerThread表示工作線程數(shù)】消请。
當(dāng)workerThread等于0時(shí)瘤旨,提交任務(wù),創(chuàng)建線程
當(dāng)workerThread小于corePoolSize時(shí)因宇,提交任務(wù),創(chuàng)建線程察滑,不管其他工作線程是不是閑置的。
當(dāng)workerThread大于等于corePoolSize 且 workerThread小于maxinumPoolsize時(shí)户盯,將任務(wù)添加到隊(duì)列中饲化,當(dāng)隊(duì)列滿了后,創(chuàng)建線程硫眨。當(dāng)一個(gè)線程完成任務(wù)時(shí)巢块,它會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行。
當(dāng)workerThread等于maxinumPoolsize時(shí)姥闭,提交任務(wù),既不能加入隊(duì)列,也不能創(chuàng)建新的線程越走,將RejectedExecutionHandler的rejectedExecution方法執(zhí)行拒絕策略。
下面是ThreadPoolExecutor 的execute()方法的源碼及注釋:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 */
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

5.4 keepAliveTime和unit
keepAliveTime表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下庭敦,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用伞广,直到線程池中的線程數(shù)不大于corePoolSize疼电,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime蔽豺,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize沧侥。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí)癣朗,keepAliveTime參數(shù)也會(huì)起作用旺罢,直到線程池中的線程數(shù)為0;當(dāng)一個(gè)線程無(wú)事可做正卧,超過(guò)一定的時(shí)間(keepAliveTime)時(shí)跪解,線程池會(huì)判斷,如果當(dāng)前運(yùn)行的線程數(shù)大于 corePoolSize惠遏,那么這個(gè)線程就被停掉节吮。所以線程池的所有任務(wù)完成后判耕,它最終會(huì)收縮到 corePoolSize 的大小。

unit:參數(shù)keepAliveTime的時(shí)間單位帚豪,有7種取值草丧,在TimeUnit類(lèi)中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒

5.5 workQueue
一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)烛亦,這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響煤禽,一般來(lái)說(shuō)岖赋,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
線程池的排隊(duì)策略與BlockingQueue有關(guān)。

關(guān)于隊(duì)列选脊、阻塞隊(duì)列,后面我會(huì)寫(xiě)一篇文章祈争。

5.6 threadFactory
通過(guò)這個(gè)參數(shù)你可以自定義如何創(chuàng)建線程角寸,例如你可以給線程指定一個(gè)有意義的名字。

給線程賦予一個(gè)有意義的名字很重要沮峡,我這里整理了幾種方法:
方法一:自己實(shí)現(xiàn)ThreadFactory并制定給線程池,在實(shí)現(xiàn)的ThreadFactory中設(shè)定計(jì)數(shù)和調(diào)用Thread.setName邢疙。
方法二:guava的ThreadFactoryBuilder.setNameFormat可以指定一個(gè)前綴望薄,使用%d表示序號(hào);例如:

ThreadFactory myThreadFactory =(new ThreadFactoryBuilder()).setNameFormat("my-pool-thread-%d").build();

ThreadFactory是JDK自帶的颁虐,下面我們?cè)敿?xì)看下如何用ThreadFactory來(lái)實(shí)現(xiàn)卧须。
ThreadFactory是一個(gè)只有一個(gè)newThread方法的接口:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

我們利用這個(gè)方法我們可以做什么呢?可以給線程命名笋籽,查看線程數(shù)椭员,指定是否守護(hù)線程,設(shè)置線程優(yōu)先級(jí)等等隘击。

下面是我寫(xiě)的完整例子,你可以拿這個(gè)例子做其他場(chǎng)景的測(cè)試(對(duì)shutdown等方法不太懂的竭贩,往下看):

package com.top.test.threads;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class TestThreadFactory {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 100,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100),
                new CustomThreadFactory(false, 1), new ThreadPoolExecutor.AbortPolicy());
        System.out.println("開(kāi)始往線程池中提交任務(wù)");
        // 可以把10改成其他值來(lái)測(cè)試其他場(chǎng)景哦-_-
        for (int i = 0; i < 10; i++) {
            threadPool.submit(() -> {
                // 可以在此睡眠一下
                /*try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
                System.out.println("我是子線程:" + Thread.currentThread().getName());
            });
        }
        System.out.println("一共往線程池提交了10個(gè)任務(wù)");
        System.out.println("線程池是否已關(guān)閉:" + threadPool.isShutdown());
        // 發(fā)起關(guān)閉線程池
        threadPool.shutdown();
        System.out.println("線程池是否已關(guān)閉:" + threadPool.isShutdown());
        System.out.println("線程池是否已終止:" + threadPool.isTerminated());
        threadPool.awaitTermination(1, TimeUnit.SECONDS);
        if (threadPool.isTerminated()) {
            System.out.println("線程池是否已終止:" + threadPool.isTerminated());
        } else {
            System.out.println("線程池未終止(超時(shí))就退出");
        }
    }
}

class CustomThreadFactory implements ThreadFactory {
    // 原子類(lèi) CAS 保證線程安全
    private AtomicInteger threadCount = new AtomicInteger();

    private boolean isDaemon;
    private int priority;

    public CustomThreadFactory(boolean isDaemon, int priority) {
        this.isDaemon = isDaemon;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("My_Custom_Thread_" + threadCount.incrementAndGet());
        thread.setDaemon(isDaemon);
        thread.setPriority(priority);
        return thread;
    }
}

執(zhí)行結(jié)果:

Connected to the target VM, address: '127.0.0.1:53514', transport: 'socket'
開(kāi)始往線程池中提交任務(wù)
一共往線程池提交了10個(gè)任務(wù)
線程池是否已關(guān)閉:false
線程池是否已關(guān)閉:true
線程池是否已終止:false
我是子線程:My_Custom_Thread_1
我是子線程:My_Custom_Thread_1
我是子線程:My_Custom_Thread_1
我是子線程:My_Custom_Thread_2
我是子線程:My_Custom_Thread_3
我是子線程:My_Custom_Thread_4
我是子線程:My_Custom_Thread_5
我是子線程:My_Custom_Thread_6
我是子線程:My_Custom_Thread_7
我是子線程:My_Custom_Thread_8
線程池是否已終止:true
Disconnected from the target VM, address: '127.0.0.1:53514', transport: 'socket'
Process finished with exit code 0

5.6 Handler
通過(guò)這個(gè)參數(shù)你可以自定義任務(wù)的拒絕策略留量。如果線程池中所有的線程都在忙碌楼熄,并且工作隊(duì)列也滿了(前提是工作隊(duì)列是有界隊(duì)列),那么此時(shí)提交任務(wù)可岂,線程池就會(huì)拒絕接收。
至于拒絕的策略稚茅,你可以通過(guò) handler 這個(gè)參數(shù)來(lái)指定平斩。ThreadPoolExecutor 已經(jīng)提供了以下 4 種策略。

  • CallerRunsPolicy:提交任務(wù)的線程自己去執(zhí)行該任務(wù)欺税。
  • AbortPolicy:默認(rèn)的拒絕策略,會(huì) throws RejectedExecutionException晚凿。
  • DiscardPolicy:直接丟棄任務(wù)瘦馍,沒(méi)有任何異常拋出。
  • DiscardOldestPolicy:丟棄最老的任務(wù),其實(shí)就是把最早進(jìn)入工作隊(duì)列的任務(wù)丟棄扛吞,然后把新任務(wù)加入到工作隊(duì)列。

注意:使用有界隊(duì)列滥比,當(dāng)任務(wù)過(guò)多時(shí),線程池會(huì)觸發(fā)執(zhí)行拒絕策略濒持,線程池默認(rèn)的拒絕策略會(huì) throw RejectedExecutionException 這是個(gè)運(yùn)行時(shí)異常寺滚,對(duì)于運(yùn)行時(shí)異常編譯器并不強(qiáng)制 catch 它,所以開(kāi)發(fā)人員很容易忽略官套。因此默認(rèn)拒絕策略要慎重使用。如果線程池處理的任務(wù)非常重要奶赔,建議自定義自己的拒絕策略;并且在實(shí)際工作中另伍,自定義的拒絕策略往往和降級(jí)策略配合使用。下面我給出一個(gè)完整的示例代碼:

public static class CallerRunsEnhancedPolicy implements RejectedExecutionHandler {
    public CallerRunsEnhancedPolicy() {}

    /**
     * Executes task r in the caller's thread, unless the executor has been shut down, in which case the task is
     * discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 可以在此加一條日志
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

public static class AbortEnhancedPolicy implements RejectedExecutionHandler {
    public AbortEnhancedPolicy() {}

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always.
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 可以在此加一條日志
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

public static class RetryPolicy implements RejectedExecutionHandler {
    public RetryPolicy() {}

    /**
     * 提醒并進(jìn)入后補(bǔ)線程池排隊(duì)執(zhí)行
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 可以在此加一條日志
        if (!e.isShutdown()) {
            // 可以在此加一條日志
            try {
                // ThreadPool.getAlternateExecutor()返回的是一個(gè)線程池摆尝,這個(gè)線程池應(yīng)該是個(gè)全局的線程池玻靡,最好只初始化一次,保證其線程的安全性(可以用單例模式來(lái)初始化)
                ThreadPool.getAlternateExecutor().execute(r);
            } catch (Exception ex) {
                // 可以在此加一條日志
                throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()
                        + "; failure retry execute task in " + ThreadPool.getAlternateExecutor().toString());
            }
        }
    }
}

public static class WaitPolicy implements RejectedExecutionHandler {
    public WaitPolicy() {}

    /**
     * 提醒并等待囤捻,直到加入線程池隊(duì)列中
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 可以在此加一條日志
        if (!e.isShutdown()) {
            // 可以在此加一條日志
            try {
                e.getQueue().put(r);
                // 可以在此加一條日志
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                // 可以在此加一條日志
                throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()
                        + "; waiting put task to pool interrupted");
            } catch (Exception ex) {
                // 可以在此加一條日志
                throw new RejectedExecutionException(
                        "Task " + r.toString() + " rejected from " + e.toString() + "; failure put task to pool");
            }
        }
    }
}

下面是ThreadPool的定義:

public final class ThreadPool {
public static final int LOGICAL_CPU_CORE_COUNT = Runtime.getRuntime().availableProcessors();
public static final int MAX_POOLSIZE_THRESHOLD = 2000;

private static Object alternateExecutorLock = new Object();
private static volatile ListenableThreadPoolExecutor alternateExecutor = null;

public static ThreadPoolExecutor getAlternateExecutor() {
    synchronized (alternateExecutorLock) {
        if (alternateExecutor == null) {
            initAlternateExecutor();
        } else if (alternateExecutor.isShutdown()) {
            boolean term = false;
            while (!term) {
                try {
                    term = alternateExecutor.awaitTermination(1, TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
            initAlternateExecutor();
        }
        return alternateExecutor;
    }
}

private static void initAlternateExecutor() {
    int threadCount =
            getRecommendedThreadPoolSizeByTaskIntensiveType(TaskIntensiveType.CPU, MAX_POOLSIZE_THRESHOLD);
    int maxThreadCount = getMaxThreadPoolSizeByTaskIntensiveType(TaskIntensiveType.CPU, MAX_POOLSIZE_THRESHOLD);
    alternateExecutor = newThreadPool(threadCount, maxThreadCount, 60L, maxThreadCount * 3,
            new WaitPolicy(), (new ThreadFactoryBuilder()).setNameFormat("alternate-pool-thread-%d").build());
    alternateExecutor.allowCoreThreadTimeOut(true);
}

public static int getRecommendedThreadPoolSizeByTaskIntensiveType(TaskIntensiveType taskIntensive,
        int poolSizeThreshold) {
    int poolSize;
    switch (taskIntensive) {
        case IO:
            poolSize = 2 * LOGICAL_CPU_CORE_COUNT + 1; // NOSONAR
            break;
        case CPU:
        default:
            poolSize = LOGICAL_CPU_CORE_COUNT + 1;
            break;
    }
    if (poolSize > poolSizeThreshold) {
        return poolSizeThreshold;
    }
    return poolSize;
}

public static int getMaxThreadPoolSizeByTaskIntensiveType(TaskIntensiveType taskIntensive, int poolSizeThreshold) {
    int poolSize;
    switch (taskIntensive) {
        case IO:
            poolSize = 2 * LOGICAL_CPU_CORE_COUNT * 5 + 1; // NOSONAR
            break;
        case CPU:
        default:
            poolSize = LOGICAL_CPU_CORE_COUNT * 5 + 1;
            break;
    }
    if (poolSize > poolSizeThreshold) {
        return poolSizeThreshold;
    }
    return poolSize;
}

public enum TaskIntensiveType {
    IO(0), CPU(1);

    private int value;

    private TaskIntensiveType(int value) {
        this.value = value;
    }

    public static TaskIntensiveType valueOf(int value) {
        for (TaskIntensiveType oneVal : TaskIntensiveType.values()) {
            if (oneVal.value == value) {
                return oneVal;
            }
        }
        return null;
    }

    public int getValue() {
        return value;
    }

}



6视哑、ThreadPoolExecutor常見(jiàn)方法

6.1 execute()
方法實(shí)際上是Executor中聲明的方法誊涯,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法跪呈,通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù)取逾,交由線程池去執(zhí)行。

6.2 submit()
方法是在ExecutorService中聲明的方法误阻,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn)旭愧,在ThreadPoolExecutor中并沒(méi)有對(duì)其進(jìn)行重寫(xiě),這個(gè)方法也是用來(lái)向線程池提交任務(wù)的儒洛,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果琅锻,去看submit()方法的實(shí)現(xiàn)唐含,會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過(guò)它利用了Future來(lái)獲取任務(wù)執(zhí)行結(jié)果沫浆。submit可以提交Runnable和 Callable。

6.3 6.3 shutdown()
將線程池狀態(tài)置為SHUTDOWN淮捆。平滑的關(guān)閉ExecutorService本股,當(dāng)此方法被調(diào)用時(shí),ExecutorService停止接收新的任務(wù)并且等待已經(jīng)提交的任務(wù)(包含提交正在執(zhí)行和提交未執(zhí)行)執(zhí)行完成拄显。當(dāng)所有提交任務(wù)執(zhí)行完畢,線程池即被關(guān)閉棘街。所以手動(dòng)調(diào)用shotdown方法,可以不必?fù)?dān)心存在剩余任務(wù)沒(méi)有執(zhí)行的情況承边。

6.4 shutdownNow()
將線程池狀態(tài)置為STOP。跟shutdown()一樣博助,先停止接收外部提交的任務(wù),忽略隊(duì)列里等待的任務(wù)富岳,嘗試將正在跑的任務(wù)interrupt中斷,返回未執(zhí)行的任務(wù)列表蚁飒。
對(duì)于那些正在執(zhí)行的task脖镀,并不能保證他們就一定會(huì)直接停止執(zhí)行狼电,或許他們會(huì)暫停,或許會(huì)執(zhí)行直到完成强窖,但是ExecutorService會(huì)盡力關(guān)閉所有正在運(yùn)行的task。

6.5 awaitTermination(long timeout, TimeUnit unit)
awaitTermination方法接收timeout和TimeUnit兩個(gè)參數(shù)翅溺,用于設(shè)定超時(shí)時(shí)間及單位。當(dāng)?shù)却^(guò)設(shè)定時(shí)間時(shí)咙崎,會(huì)監(jiān)測(cè)ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true褪猛,否則返回false。一般情況下會(huì)和shutdown方法組合使用碳却。
第一個(gè)參數(shù)指定的是時(shí)間,第二個(gè)參數(shù)指定的是時(shí)間單位(當(dāng)前是秒)昼浦。返回值類(lèi)型為boolean型筒主。
如果等待的時(shí)間超過(guò)指定的時(shí)間,但是線程池中的線程運(yùn)行完畢物舒,那么awaitTermination()返回true。執(zhí)行分線程已結(jié)束火诸。
如果等待的時(shí)間超過(guò)指定的時(shí)間,但是線程池中的線程未運(yùn)行完畢置蜀,那么awaitTermination()返回false悉盆。不執(zhí)行分線程已結(jié)束。
如果等待時(shí)間沒(méi)有超過(guò)指定時(shí)間焕盟,等待!

可以用awaitTermination()方法來(lái)判斷線程池中是否有繼續(xù)運(yùn)行的線程灼卢。
下面是ThreadPoolExecutor的awaitTermination源碼:

/**
 * Wait condition to support awaitTermination
 */
private final Condition termination = mainLock.newCondition();

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
// 等待時(shí)長(zhǎng)
    long nanos = unit.toNanos(timeout);
//加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
    // condition條件等待
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

6.6 其它方法
isShutdown():線程池是否已關(guān)閉

isTerminated:線程池是否已終止


7来农、線程池是否需要關(guān)閉呢?

這里說(shuō)下兩種情況下的線程池:全局線程池沃于、局部線程池海诲。

全局線程池:其實(shí)我們工作中使用線程池都是全局的特幔,當(dāng)然我們也可能為不同業(yè)務(wù)建立不同的全局線程池。那全局線程池是否需要關(guān)閉呢敬辣?分兩種場(chǎng)景:(一)如果每次在程序的結(jié)尾都去關(guān)閉線程池零院,那每次有新的任務(wù)進(jìn)來(lái),都要重新建立一個(gè)線程池告抄,這樣難免也太消耗資源了,而且原本這就是一個(gè)全局的龄糊。(二)如果一個(gè)線程池在某段時(shí)間內(nèi)處理了大量的任務(wù),創(chuàng)建了大量的線程炫惩,這個(gè)時(shí)間段之后可能一下子也沒(méi)有新的任務(wù)進(jìn)來(lái)阿浓,那這個(gè)線程池需要關(guān)閉嘛 ?我個(gè)人理解也是沒(méi)必要的芭毙,我們通過(guò)參數(shù)keepAliveTime可以給線程設(shè)置在沒(méi)有任務(wù)處理時(shí)的生存時(shí)間,并且調(diào)用allowCoreThreadTimeOut(boolean)方法粘咖,這樣充分減少資源的浪費(fèi)。

局部線程池:這種線程池工作中也該盡量避免使用瓮下。如果每個(gè)請(qǐng)求進(jìn)來(lái)都創(chuàng)建一個(gè)新的線程池钝域,當(dāng)請(qǐng)求多的時(shí)候,線程池增加网梢,那難免會(huì)有OOM赂毯。如果你真的使用了拣宰,那請(qǐng)記住一定要調(diào)用shutdown()來(lái)關(guān)閉烦感。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市晌该,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌朝群,老刑警劉巖中符,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異右莱,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)慢蜓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)郭膛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人凄诞,你說(shuō)我怎么就攤上這事》” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵汛蝙,是天一觀的道長(zhǎng)朴肺。 經(jīng)常有香客問(wèn)我,道長(zhǎng)西土,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任需了,我火速辦了婚禮,結(jié)果婚禮上肋乍,老公的妹妹穿的比我還像新娘。我一直安慰自己墓造,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布帝雇。 她就那樣靜靜地躺著蛉拙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪刘离。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,198評(píng)論 1 299
  • 那天茧痕,我揣著相機(jī)與錄音,去河邊找鬼踪旷。 笑死,一個(gè)胖子當(dāng)著我的面吹牛令野,可吹牛的內(nèi)容都是我干的徽级。 我是一名探鬼主播,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼餐抢,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了碳锈?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤售碳,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后竿屹,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年力惯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片父晶。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡弄跌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出铛只,到底是詐尸還是另有隱情,我是刑警寧澤淳玩,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站谋竖,受9級(jí)特大地震影響承匣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜韧骗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望袍暴。 院中可真熱鬧,春花似錦容诬、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)纽什。三九已至,卻和暖如春芦缰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背让蕾。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留笋婿,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓缸濒,卻偏偏與公主長(zhǎng)得像粱腻,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子绍些,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354