深入Java線程池講解

@[toc]

一沃琅、什么是線程池


線程池就是創(chuàng)建若干個(gè)可執(zhí)行的線程放入一個(gè)池(容器)中,有任務(wù)需要處理時(shí)蜘欲,會(huì)提交到線程池中的任務(wù)隊(duì)列益眉,處理完之后線程并不會(huì)被銷毀,而是仍然在線程池中等待下一個(gè)任務(wù)姥份。

Java中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架郭脂,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過(guò)程中澈歉,合理地使用線程池能夠帶來(lái)以下下好處

  1. <font color = "orange">降低資源消耗 </font>展鸡,通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
  2. <font color = "orange">提高響應(yīng)速度</font> 埃难,當(dāng)任務(wù)到達(dá)時(shí)莹弊,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
  3. <font color = "orange">提高線程的可管理性</font>涡尘,線程是稀缺資源忍弛,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源考抄,還會(huì)降低系統(tǒng)的穩(wěn)定性细疚,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控

二川梅、Executor 框架


我們知道線程池就是線程的集合惠昔,下提供了集中管理、線程重用挑势、降低資源消耗镇防、提高響應(yīng)速度等 從 JDK 1.5之后。為了把工作單元與執(zhí)行機(jī)制分開潮饱,<font color = "orange">Executor</font> 框架誕生了来氧,他是一個(gè)用于統(tǒng)一創(chuàng)建與運(yùn)行的接口。<font color = "orange">Executor</font> 框架實(shí)現(xiàn)的就是線程池的功能香拉。<font color = "orange">Executor</font> 框架不僅包括了線程池的管理啦扬,還提供了線程工廠、隊(duì)列以及拒絕策略等凫碌,<font color = "orange">Executor</font> 框架讓并發(fā)編程變得更加簡(jiǎn)單扑毡。


2.1 Executor 框架組成




1. 任務(wù) (Runnable / Callable)

執(zhí)行任務(wù)需要實(shí)現(xiàn) Runnable 接口或者 Callable 接口

2. 任務(wù)等執(zhí)行 (Executor)

包括任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的 ExecutorService 接口盛险。Executor框架有兩個(gè)關(guān)鍵類實(shí)現(xiàn)了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor


    ExecutorService executorService = Executors.newFixedThreadPool(5);
    

我們實(shí)現(xiàn)線程池通過(guò) Executors 實(shí)現(xiàn) ExecutorService 接口瞄摊,ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 這兩個(gè)關(guān)鍵類實(shí)現(xiàn)了 ExecutorService 接口

ThreadPoolExecutor 類描述 :


    //AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口
    public class ThreadPoolExecutor extends AbstractExecutorService 

    public abstract class AbstractExecutorService implements ExecutorService 
    

ScheduledThreadPoolExecutor 類描述:


    //繼承ThreadPoolExecutor
    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService
            

3. 異步計(jì)算的結(jié)果 (Future)

包括Future和實(shí)現(xiàn)Future接口的FutureTask類勋又。


2.2 Executor 結(jié)構(gòu)


在這里插入圖片描述
  • <font color = "orange">Executor</font> :是一個(gè)接口,他是Executor框架的基礎(chǔ)换帜,它將任務(wù)的提交與任務(wù)的執(zhí)行分離楔壤。
  • <font color = "orange">ThreadPoolExecutor</font> :是線程池的核心實(shí)現(xiàn)類,用來(lái)執(zhí)行被提交的任務(wù)惯驼。
  • <font color = "orange">ScheduledThreadPoolExcecutor</font> 是一個(gè)實(shí)現(xiàn)類蹲嚣,可以在給定等延遲后運(yùn)行命令,或者定期執(zhí)行命令祟牲,ScheduledThreadPoolExcecutoe比 Timer 更靈活隙畜,功能更強(qiáng)大
  • <font color = "orange">Future</font> Future接口和它的實(shí)現(xiàn)FutureTask類,代表異步計(jì)算的結(jié)果说贝。
  • <font color = "orange">ExecutorService</font> :是一個(gè)線程池的實(shí)現(xiàn)

2.2 Executor 使用

  1. 祝線程首先要?jiǎng)?chuàng)建實(shí)現(xiàn) Runnable 或者 Callable 接口的任務(wù)對(duì)象
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("當(dāng)前線程 :" + Thread.currentThread().getName());
            }
        }).start();
    }
  1. 把創(chuàng)建完成實(shí)現(xiàn) Runnable 或者 Callable 接口對(duì)象直接交給 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.execute(Runnable command))</font>或者也可以把 Runnable 對(duì)象或Callable 對(duì)象提交給 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.execute(Runnable command))</font>或 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.submit(Callable <T> task))</font>
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName());
            }
        });
        Future<?> submit = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName());
            }
        });
  
    }
  1. 如果 執(zhí)行 ExecutorService.submit(…)议惰,將會(huì)返回一個(gè) Future<?> 對(duì)象
  2. 祝線程可以執(zhí)行 Future.get() 方法來(lái)等待任務(wù)執(zhí)行完成 也可以執(zhí)行 FutureTask.cancel(boolean mayInterruptIfRunning)來(lái)取消此任務(wù)的執(zhí)行。

三狂丝、ThreadPoolExecutor 解析


3.1 構(gòu)造方法


    public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)
                              int maximumPoolSize, //最大線程數(shù)
                              long keepAliveTime, //線程數(shù)大于核心時(shí)换淆,多余線程存活時(shí)間
                              TimeUnit unit, //時(shí)間單位
                              BlockingQueue<Runnable> workQueue, //工作隊(duì)列
                              ThreadFactory threadFactory, //線程工廠
                              RejectedExecutionHandler handler //拒絕策略 哗总,線程過(guò)多的處理
                              ) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

參數(shù)詳解 :

  • <font color = "orange">corePoolSize </font> :核心池的大小几颜。 當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)讯屈,當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后蛋哭,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中
  • <font color = "orange">maximumPoolSize </font> :當(dāng)隊(duì)列中存放的任務(wù)達(dá)到隊(duì)列容量的時(shí)候,當(dāng)前可以同時(shí)運(yùn)行的線程數(shù)量變?yōu)樽畲缶€程數(shù)
  • <font color = "orange">keepAliveTime</font> :當(dāng)線程池中的線程數(shù)量大于 <font color = "orange">corePoolSize </font> 的時(shí)候涮母,如果這時(shí)候沒(méi)有新的任務(wù)提交谆趾,核心線程外的線程不會(huì)立刻銷毀,而是等待 時(shí)間超過(guò)來(lái) <font color = "orange">keepAliveTime</font> 才會(huì)被銷毀
  • <font color = "orange">unit</font> :<font color = "orange">keepAliveTime</font> 參數(shù)的時(shí)間單位叛本。
  • <font color = "orange">workQueue </font> : 工作隊(duì)列 , 當(dāng)新任務(wù)來(lái)的時(shí)候會(huì)先判斷當(dāng)前運(yùn)行的線程數(shù)量是否達(dá)到核心心線程數(shù)沪蓬,如果達(dá)到的話,新任務(wù)就會(huì)被存放在隊(duì)列中
  • <font color = "orange">threadFactory </font> : 線程工廠来候,用來(lái)創(chuàng)建線程
  • <font color = "orange">handler </font> :拒絕策略跷叉,提交的任務(wù)過(guò)多而不能及時(shí)處理時(shí),我們可以定制策略來(lái)處理任務(wù)
在這里插入圖片描述

為什么要這么設(shè)計(jì)呢 有了最大線程數(shù)营搅,為什么要設(shè)計(jì)核心池大小呢?

  1. 如果當(dāng)前線程池中的線程數(shù) < <font color = "orange">corePoolSize </font> 則每來(lái)一個(gè)任務(wù)云挟,就會(huì)超級(jí)愛(ài)你一個(gè)線程去執(zhí)行這個(gè)任務(wù)
  2. 如果當(dāng)前線程池中的線程數(shù) >= <font color = "orange">corePoolSize </font> , 則每來(lái)一個(gè)任務(wù),會(huì)將其添加到工作隊(duì)列中转质,若添加成功园欣,則等待 核心線程空閑將其取出執(zhí)行,若添加失敗 (一般隊(duì)列已滿)則在總數(shù) 不大于 <font color = "orange">maximumPoolSize </font> 的前提下休蟹,創(chuàng)建新的線程
  3. 如果當(dāng)前線程池中的線程數(shù)達(dá)到 <font color = "orange">maximumPoolSize </font> 沸枯,則會(huì)才用拒絕策略進(jìn)行處理
  4. 補(bǔ)充 : 如果當(dāng)前線程池的數(shù)量大于 <font color = "orange">corePoolSize </font> 時(shí)日矫,如果某個(gè)線程空閑時(shí)間超過(guò)<font color = "orange">keepAliveTime</font> ,線程將被銷毀辉饱,直至線程池中的線程數(shù)目不大于 <font color = "orange">corePoolSize </font>


ThreadPoolExecutor 拒絕策略

  • <font color = "orange">ThreadPoolExecutor.AbortPolicy</font> :拋出 RejectedExecutionException來(lái)拒絕新任務(wù)的處理
  • <font color = "orange">ThreadPoolExecutor.CallerRunsPolicy</font> :調(diào)用執(zhí)行自己的線程運(yùn)行任務(wù)搬男,也就是直接在調(diào)用 execute 方法的線程運(yùn)行(run)被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉彭沼,則會(huì)丟棄該任務(wù)缔逛。因此這種策略會(huì)降低對(duì)于新任務(wù)的提交速度,影響程序的整體性能姓惑。
  • <font color = "orange">ThreadPoolExecutor.DiscardPolicy</font> :不處理新任務(wù)褐奴,直接丟棄掉。
  • <font color = "orange">ThreadPoolExecutor.DiscardOldestPolicy</font> :此策略將丟棄最早的未處理的任務(wù)請(qǐng)求


3.2 自定義 ThreadPoolExecutor


在 《阿里巴巴 JAVA 開發(fā)手冊(cè)》明確指出線程資源利用線程池于毙,線程池不允許使用 Executors 去創(chuàng)建

<font color = 'red'>【強(qiáng)制】</font> 線程資源必須通過(guò)線程池提供敦冬,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
說(shuō)明:線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時(shí)間以及系統(tǒng)資源的開銷唯沮,解決資源不足的問(wèn)題脖旱。
如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者“過(guò)度切換”的問(wèn)題介蛉。

<font color = 'red'>【強(qiáng)制】</font> 線程池不允許使用 Executors 去創(chuàng)建萌庆,而是通過(guò) ThreadPoolExecutor 的方式,這
樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則币旧,規(guī)避資源耗盡的風(fēng)險(xiǎn)践险。

說(shuō)明:Executors 返回的線程池對(duì)象的弊端如下:

  • =1) FixedThreadPool 和 SingleThreadPool:
    允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求吹菱,從而導(dǎo)致 OOM巍虫。
  • 2) CachedThreadPool:
    允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程鳍刷,從而導(dǎo)致 OOM占遥。
public class test {


    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 20;
    private static final Long KEEP_ALIVE_TIME = 1L;


    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.AbortPolicy()
        );
        for (int i = 0; i < 300; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("當(dāng)前線程:"+ Thread.currentThread().getName());
                    System.out.println("當(dāng)前狀態(tài):"+ Thread.currentThread().getState());
                }
            });
        }
        //終止線程池
        threadPoolExecutor.shutdown();
    }



}

在這里插入圖片描述

因?yàn)槲覀兪褂玫? <font color = "orange">ThreadPoolExecutor.AbortPolicy</font> 的拒絕任務(wù),所以被拋出異常

OOM 案例:

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100000);
        System.out.println("開始執(zhí)行");
        for (int i = 0; i < 100000000; i++) {
            executorService.execute(() -> {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                System.out.println("等待一小時(shí)開始");
                try {
                    TimeUnit.HOURS.sleep(1);
                }catch (Exception e){
                    log.info(payload);
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1,TimeUnit.HOURS);
    }

在這里插入圖片描述


3.3 源碼分析


線程池狀態(tài):利用低29位表示線程池中線程數(shù)输瓜,通過(guò)高3位表示線程池的運(yùn)行狀態(tài):

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • <font color = "orange">RUNNING</font>:運(yùn)行狀態(tài)瓦胎,接受新的任務(wù)并且處理隊(duì)列中的任務(wù)。-
  • <font color = "orange">SHUTDOWN</font>:關(guān)閉狀態(tài)(調(diào)用了 shutdown 方法)前痘。不接受新任務(wù)凛捏,,但是要處理隊(duì)列
    中的任務(wù)。
  • <font color = "orange">STOP</font>:停止?fàn)顟B(tài)(調(diào)用了 shutdownNow 方法)芹缔。不接受新任務(wù)坯癣,也不處理隊(duì)列中的
    任務(wù),并且要中斷正在處理的任務(wù)最欠。
  • <font color = "orange">TIDYING</font>:所有的任務(wù)都已終止了示罗,workerCount 為 0惩猫,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)terminated() 方法進(jìn)入 TERMINATED 狀態(tài)。
  • <font color = "orange">TERMINATED</font>:終止?fàn)顟B(tài)蚜点,terminated() 方法調(diào)用結(jié)束后的狀態(tài)轧房。
//記錄線程池中線程狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//來(lái)獲取當(dāng)前線程數(shù)量
private static int workerCountOf(int c)  { return c & CAPACITY; }

//工作隊(duì)列
private final BlockingQueue<Runnable> workQueue;



public void execute(Runnable command) {
        //如果任務(wù)為null,則拋出異常
        if (command == null)
            throw new NullPointerException();
        // 取的是記錄線程狀態(tài)
        int c = ctl.get();
        //判斷當(dāng)前線程池中之行的任務(wù)數(shù)量是否小于 corePoolSize
        if (workerCountOf(c) < corePoolSize) {
        //小于的話绍绘,通過(guò)addWorker(command, true)新建一個(gè)線程奶镶,并將任務(wù)(command)添加到該線程中
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果當(dāng)前之行的任務(wù)數(shù)量大于等于 corePoolSize 的時(shí)候就會(huì)走到這里
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次獲取線程池狀態(tài),如果線程池狀態(tài)不是 RUNNING 狀態(tài)就需要從任務(wù)隊(duì)列中移除任務(wù)
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果當(dāng)前線程池為空就新創(chuàng)建一個(gè)線程并執(zhí)行陪拘。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 通過(guò)addWorker(command, false)新建一個(gè)線程
        // 如果addWorker(command, false)執(zhí)行失敗厂镇,則通過(guò)reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容。
        else if (!addWorker(command, false))
            reject(command);
    }
在這里插入圖片描述

addWorker() 方法



// Lock鎖
private final ReentrantLock mainLock = new ReentrantLock();

// 跟蹤線程池的最大大小
private int largestPoolSize;

// 工作線程集合
private final HashSet<Worker> workers = new HashSet<>();

//獲取線程池狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
//判斷線程池的狀態(tài)是否為 Running
private static boolean isRunning(int c) {
      return c < SHUTDOWN;
}

private boolean addWorker(Runnable firstTask, boolean core) {
        // CAS更新線程池?cái)?shù)量
        retry:
        for (;;) {
            //獲取線程池狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 檢查queue是否為空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //獲取線程池中線程的數(shù)量
                int wc = workerCountOf(c);
                // core參數(shù)為true的話表明隊(duì)列也滿了左刽,線程池大小變?yōu)?maximumPoolSize 
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // //原子操作將workcount的數(shù)量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果線程的狀態(tài)改變了就再次執(zhí)行上述操作
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        // 標(biāo)記工作線程是否啟動(dòng)成功
        boolean workerStarted = false;
        // 標(biāo)記工作線程是否創(chuàng)建成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 加鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // rs < SHUTDOWN 如果線程池狀態(tài)依然為RUNNING,并且線程的狀態(tài)是存活的話捺信,就會(huì)將工作線程添加到工作線程集合中
                    // 或者 rs == SHUTDOWN 傳入的firstTask == null 添加新的worker
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        // 更新當(dāng)前工作線程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                // 如果成功添加工作線程,則調(diào)用Worker內(nèi)部的線程實(shí)例t的Thread#start()方法啟動(dòng)真實(shí)的線程實(shí)例
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 失敗移除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

runWorker() 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 首先會(huì)通過(guò)run方法執(zhí)行firstTask欠痴,執(zhí)行完畢后會(huì)將task置為null
            // 那么task迄靠!=null的判斷條件肯定不通過(guò),它就會(huì)嘗試通過(guò)getTask()喇辽,從任務(wù)隊(duì)列中獲取任務(wù)掌挚。
            while (task != null || (task = getTask()) != null) {
                // 每一次任務(wù)的執(zhí)行都必須獲取鎖來(lái)保證下方臨界區(qū)代碼的線程安全
                w.lock();
                //如果狀態(tài)值大于等于STOP(狀態(tài)值是有序的,即STOP茵臭、TIDYING疫诽、TERMINATED)且當(dāng)前線程還沒(méi)有被中斷舅世,則主動(dòng)中斷線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //執(zhí)行任務(wù)前處理操作旦委,默認(rèn)是一個(gè)空實(shí)現(xiàn);在子類中可以通過(guò)重寫來(lái)改變?nèi)蝿?wù)執(zhí)行前的處理行為
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任務(wù)后處理
                        afterExecute(task, thrown);
                    }
                } finally {
                    //將task 變?yōu)閚ull,已處理完成
                    task = null;
                    // ++操作雏亚,已完成任務(wù)數(shù)
                    w.completedTasks++;
                    //解鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 銷毀當(dāng)前的worker對(duì)象缨硝,并完成一些諸如完成任務(wù)數(shù)量統(tǒng)計(jì)之類的輔助性工作
            // 在線程池當(dāng)前狀態(tài)小于STOP的情況下會(huì)創(chuàng)建一個(gè)新的worker來(lái)替換被銷毀的worker
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask() 方法

private Runnable getTask() {
        // 通過(guò)timeOut變量表示線程是否空閑時(shí)間超時(shí)了
        boolean timedOut = false; // Did the last poll() time out?

        // 死循環(huán)
        for (;;) {
            // 獲取線程池狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果 線程池狀態(tài)>=STOP 則直接減少一個(gè)worker計(jì)數(shù)并返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //  獲取線程池中的worker計(jì)數(shù)
            int wc = workerCountOf(c);

            // 判斷當(dāng)前線程是否會(huì)被超時(shí)銷毀
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 如果當(dāng)前線程數(shù)大于最大線程數(shù) 或者超時(shí) 或者阻塞隊(duì)列為空 減少worker計(jì)數(shù)并返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //從阻塞隊(duì)列中取出一個(gè)任務(wù)(如果隊(duì)列為空會(huì)進(jìn)入阻塞等待狀態(tài))
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                 // 如果線程不等于null,直接返回
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


四、常用線程池


4.1 FixedThreadPool

  • <font color = "orange">FixedThreadPool</font> : 定長(zhǎng)線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

從上面源代碼可以看出新創(chuàng)建的 <font color = "orange">FixedThreadPool</font> 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 nThreads, 固定長(zhǎng)度

在這里插入圖片描述

說(shuō)明:

  1. 如果當(dāng)前運(yùn)行的線程數(shù)小于 corePoolSize罢低, 如果再來(lái)新任務(wù)的話查辩,就創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)
  2. 當(dāng)前運(yùn)行的線程數(shù)等于 corePoolSize 后, 如果再來(lái)新任務(wù)的話网持,會(huì)將任務(wù)加入 LinkedBlockingQueue
  3. 線程池中的線程執(zhí)行完 手頭的任務(wù)后宜岛,會(huì)在循環(huán)中反復(fù)從 LinkedBlockingQueue 中獲取任務(wù)來(lái)執(zhí)行

<font color ='red'>弊端 :

<font color = "orange">FixedThreadPool</font> 線程池使用的是 <font color = "orange">LinkedBlockingQueue</font> 無(wú)界隊(duì)列 ,(隊(duì)列的容量為 Intger.MAX_VALUE),在線程任務(wù)多的情況下會(huì)導(dǎo)致 OOM


4.2 SingleThreadExecutor


  • <font color = "orange">SingleThreadExecutor</font> : 一個(gè)單線程化的線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
    }

從上面源代碼可以看出新創(chuàng)建的 <font color = "orange">SingleThreadExecutor</font> 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 1, 固定長(zhǎng)度,其他參數(shù)和 <font color = "orange">FixedThreadPool</font> 相同功舀。

在這里插入圖片描述

說(shuō)明:

  1. 如果當(dāng)前運(yùn)行的線程數(shù)少于 corePoolSize萍倡,則創(chuàng)建一個(gè)新的線程執(zhí)行任務(wù)
  2. 當(dāng)前線程池中有一個(gè)運(yùn)行的線程后,將任務(wù)加入 LinkedBlockingQueue
  3. 線程執(zhí)行完當(dāng)前的任務(wù)后辟汰,會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue 中獲取任務(wù)來(lái)執(zhí)行

<font color ='red'>弊端 :

<font color = "orange">SingleThreadExecutor</font> 線程池使用的是 <font color = "orange">LinkedBlockingQueue</font> 無(wú)界隊(duì)列 ,(隊(duì)列的容量為 Intger.MAX_VALUE)列敲,在線程任務(wù)多的情況下會(huì)導(dǎo)致 OOM


4.3 CachedThreadPool

  • <font color = "orange">CachedThreadPool</font> : 緩存線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i < 5; i++) {
            executorService.execute(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
            });
        }
        executorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 的 corePoolSize 被設(shè)置為空(0)阱佛,maximumPoolSize被設(shè)置為 Integer.MAX.VALUE,即它是無(wú)界的戴而,這也就意味著如果主線程提交任務(wù)的速度高于 maximumPool 中線程處理任務(wù)的速度時(shí)凑术,CachedThreadPool 會(huì)不斷創(chuàng)建新的線程。極端情況下所意,這樣會(huì)導(dǎo)致耗盡 cpu 和內(nèi)存資源淮逊。

在這里插入圖片描述

說(shuō)明:

  1. 首先 SynchronousQueue 是一個(gè)生產(chǎn)消費(fèi)模式等阻塞任務(wù)隊(duì)列,只要有任務(wù)就需要有線程執(zhí)行扶踊,線程池中等線程可以重復(fù)使用

<font color ='red'>弊端 :

<font color = "orange">CachedThreadPool</font> 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE 壮莹,可能會(huì)創(chuàng)建大量線程,從而導(dǎo)致 OOM姻檀。


4.3 ScheduledThreadPoolExecutor

  • <font color = "orange">ScheduledThreadPoolExecutor</font> : 延遲線程池
public class Test {


    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        for (int i = 1; i < 5; i++) {
            scheduledExecutorService.schedule(()->{
                System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
                System.out.println("時(shí)間"+LocalDateTime.now());
            },3,TimeUnit.SECONDS);
        }
        scheduledExecutorService.shutdown();

    }
}

當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.125
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

<font color = "orange">ScheduledThreadPoolExecutor</font> 主要用來(lái)在給定的延遲后運(yùn)行任務(wù)命满,或者定期執(zhí)行任務(wù)

<font color = "orange">ScheduledThreadPoolExecutor</font> 使用的任務(wù)隊(duì)列 DelayQueue 封裝了一個(gè) PriorityQueue,PriorityQueue 會(huì)對(duì)隊(duì)列中的任務(wù)進(jìn)行排序绣版,執(zhí)行所需時(shí)間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time 變量小的先執(zhí)行)胶台,如果執(zhí)行所需時(shí)間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。

在這里插入圖片描述

說(shuō)明:

  1. 延遲定時(shí)任務(wù)線程池杂抽,有點(diǎn)像我們的定時(shí)任務(wù)诈唬。同樣,它也是一個(gè)無(wú)限大小的線程池 缩麸,Integer.MAX_VALUE铸磅。它提供的調(diào)用方法比較多,包括:scheduleAtFixedRate杭朱、scheduleWithFixedDelay阅仔,可以按需選擇延遲執(zhí)行方式。




個(gè)人博客地址:http://blog.yanxiaolong.cn ? |<font color = "orange"> 『縱有疾風(fēng)起弧械,人生不言棄』</font>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末八酒,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子刃唐,更是在濱河造成了極大的恐慌羞迷,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件画饥,死亡現(xiàn)場(chǎng)離奇詭異衔瓮,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)抖甘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門热鞍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人,你說(shuō)我怎么就攤上這事碍现》郏” “怎么了?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵昼接,是天一觀的道長(zhǎng)爽篷。 經(jīng)常有香客問(wèn)我,道長(zhǎng)慢睡,這世上最難降的妖魔是什么逐工? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮漂辐,結(jié)果婚禮上泪喊,老公的妹妹穿的比我還像新娘。我一直安慰自己髓涯,他們只是感情好袒啼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著纬纪,像睡著了一般蚓再。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上包各,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天摘仅,我揣著相機(jī)與錄音,去河邊找鬼问畅。 笑死娃属,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的护姆。 我是一名探鬼主播矾端,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼签则!你這毒婦竟也來(lái)了须床?” 一聲冷哼從身側(cè)響起铐料,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤渐裂,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后钠惩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柒凉,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年篓跛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了膝捞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡愧沟,死狀恐怖蔬咬,靈堂內(nèi)的尸體忽然破棺而出鲤遥,到底是詐尸還是另有隱情,我是刑警寧澤林艘,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布盖奈,位于F島的核電站,受9級(jí)特大地震影響狐援,放射性物質(zhì)發(fā)生泄漏钢坦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一啥酱、第九天 我趴在偏房一處隱蔽的房頂上張望爹凹。 院中可真熱鬧,春花似錦镶殷、人聲如沸禾酱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)宇植。三九已至,卻和暖如春埋心,著一層夾襖步出監(jiān)牢的瞬間指郁,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工拷呆, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留闲坎,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓茬斧,卻偏偏與公主長(zhǎng)得像腰懂,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子项秉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容