J.U.C之線程池02:大話線程池ThreadPoolExecutor

ThreadPoolExecutor

ThreadPoolExecutor是Executor執(zhí)行框架最重要的一個實現(xiàn)類仁烹,提供了線程池管理和任務(wù)管理是兩個最基本的能力。這篇通過分析ThreadPoolExecutor的源碼來看看如何設(shè)計和實現(xiàn)一個基于生產(chǎn)者消費者模型的執(zhí)行器注盈。

線程池有多重要

線程是一個程序員一定會涉及到的一個概念晃危,但是線程的創(chuàng)建和切換都是代價比較大的。所以老客,我們有沒有一個好的方案能做到線程的復(fù)用呢僚饭?這就涉及到一個概念——線程池。合理的使用線程池能夠帶來3個很明顯的好處:

1.降低資源消耗:通過重用已經(jīng)創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的消耗

2.提高響應(yīng)速度:任務(wù)到達時不需要等待線程創(chuàng)建就可以立即執(zhí)行胧砰。

3.提高線程的可管理性:線程池可以統(tǒng)一管理材蛛、分配荒揣、調(diào)優(yōu)和監(jiān)控捷沸。

image

ThreadPoolExecutor 設(shè)計原則

線程池就好像生活中的工廠,對于一個工廠最重要的是“訂單”和創(chuàng)建商品的“員工”击罪。對應(yīng)到線程池中就是訂單=任務(wù)員工=worker,

作為工廠必然滿足生產(chǎn)者消費者模型贪薪,工廠接到訂單分解為一個個任務(wù)會放入一個流水線上媳禁,而員工則會從流水線中拿到訂單做處理。處理完畢后重新在流水線上獲取任務(wù)画切。對應(yīng)線程池來說外部線程產(chǎn)生一個任務(wù)調(diào)用線程池處理竣稽,線程池首先會將任務(wù)放入工作隊列,線程池中worker線程不斷從工作隊列取出任務(wù)去處理(有條件無限循環(huán))霍弹,執(zhí)行完畢繼續(xù)從工作隊列獲取毫别,直到線程池關(guān)閉或worker被銷毀。

  1. 生產(chǎn)者是任務(wù)的提交者典格,是外部調(diào)用ThreadPoolExecutor的線程

  2. 工作隊列是一個阻塞隊列的接口岛宦,具體的實現(xiàn)類可以有很多種。

  3. 消費者是封裝了線程的Worker類的集合

image

創(chuàng)建線程池耍缴,需要準備哪些 砾肺?

線程池和工廠一樣首先需要創(chuàng)建出來。而創(chuàng)建工廠需要明確如下準備工作:

  • 工廠生成需要的員工數(shù)量私恬,這里存在2個值债沮,一個是工廠正常運作需要的最少員工數(shù)量我們在線程池中稱為核心work數(shù)量corePoolSize炼吴,一個是工廠最多能夠招聘員工是多少本鸣,我們在線程池中稱為maximumPoolSize,

  • 為了節(jié)省成本有些員工如果長期無法從工作隊列獲取任務(wù)(員工之間存在競爭關(guān)系)硅蹦。那么需要淘汰一定數(shù)量的員工荣德,我們在線程中規(guī)定超過keepAliveTime時間空閑的線程被淘汰。

  • 需要購買一條流水線童芹,我們在線程池中被稱工作隊列workQueue

  • 需要招聘一名HR來雇傭員工涮瞻,我們在線程中稱為threadFactory

  • 需要招聘售前團隊,這里主要是負責(zé)工廠超負苛工作無力承擔(dān)新的訂單時拒絕策略假褪,我們在線程中稱為線程池的拒絕策略handler

我們線程池的工廠在創(chuàng)建之初為了節(jié)約成本并沒有招聘員工署咽,而是等到有訂單任務(wù)時在運作起來。

源碼實現(xiàn)

   /**
     * 創(chuàng)建一個線程池生音,
     * @param corePoolSize 線程池中核心wor線程數(shù)量宁否。
     * @param maximumPoolSize 線程池中允許的最大worker數(shù)量
     * @param keepAliveTime worker線程(非核心線程)空閑的時間,大于此時間是被銷毀
     * @param unit keepAliveTime的單位缀遍。TimeUnit
     * @param workQueue 用來保存等待執(zhí)行的任務(wù)的阻塞隊列
     * @param threadFactory 創(chuàng)建work工廠
     * @param handler 線程池的拒絕策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

任務(wù)來了慕匠,如何運作 ?

image

前面說到一個線程需要運作起來需要任務(wù)。當一個訂單被指派到工廠時是如何運作的呢域醇?

1 當員工人數(shù) < corePoolSize台谊,每接到一個任務(wù)就會去雇傭一個新員工來完成這個任務(wù).對于線程池來說就創(chuàng)建一個work標記為core work線程執(zhí)行接收的任務(wù)

2 如果 (員工人數(shù)> corePoolSize) 且(員工人數(shù) < maximumPoolSize),將任務(wù)放入流水線蓉媳,不在雇傭員工。對于線程來說線程會將任務(wù)放入工作隊列锅铅。不在創(chuàng)建新的worker.

3 如果情況2中流水線容量滿了酪呻,說明當前任務(wù)已經(jīng)超負荷。需要雇傭新員工來處理新的任務(wù)盐须。對于線程池來說就創(chuàng)建一個worker來執(zhí)行新任務(wù)

4 如果雇傭的員工已達到上線maximumPoolSize号杠,且流水線容量也滿,則新任務(wù)只好讓售前拒絕丰歌。對于線程池來說就是交給RejectedExecutionHandler處理姨蟋。

[圖片上傳失敗...(image-5cf12c-1562081688936)]

作為一個工廠,也要有生命周期

對于一個工廠伴隨著創(chuàng)建到倒閉都需要經(jīng)歷一個生命周期立帖。對于一個工廠來說有如下幾個狀態(tài)

  • RUNNING:表示工廠正常運行眼溶,對應(yīng)到線程正常運行。

  • SHUTDOWN:表示工廠正常申請倒閉晓勇,這時工廠不接收新的訂單任務(wù)堂飞,對原先接收的訂單任務(wù)(包括還在工作隊列未執(zhí)行,和正在執(zhí)行的)绑咱;對應(yīng)到線程就是不再接收任務(wù)绰筛,但仍然會處理已接收的任務(wù)

  • STOP:表示工廠異常倒閉,這時工廠不僅不再接收新的訂單任務(wù)描融,還會清理流水線中任務(wù)铝噩,對正在處理的任務(wù)進行終止;對應(yīng)到線程池就是停止接收新的任務(wù)窿克,清空工作隊列中的任務(wù)骏庸。同時對work線程提交中斷interrupt

  • TIDYING: 表示工廠已經(jīng)沒有需要執(zhí)行的訂單任務(wù),等待執(zhí)行最后的清理動作年叮;對應(yīng)到線程池表示不在存在任務(wù)具被,等待執(zhí)行terminate函數(shù)(模板方法)

  • TERMINATED:工廠倒閉

image

源碼實現(xiàn)

線程池用一個整數(shù)記錄了線程狀態(tài)work線程數(shù)量
前3位記錄線程池狀態(tài),后29位記錄運行work數(shù)量

/**
     * 用于記錄線程池池的 狀態(tài)和work線程數(shù)量
     * 前3位記錄線程池狀態(tài)
     * 后29位記錄運行work數(shù)量
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

對于線程池不同狀態(tài)對應(yīng)二進制狀態(tài)

RUNNING            -- 對應(yīng)的高3位值是111只损。
SHUTDOWN       -- 對應(yīng)的高3位值是000一姿。
STOP                   -- 對應(yīng)的高3位值是001。
TIDYING              -- 對應(yīng)的高3位值是010跃惫。
TERMINATED     -- 對應(yīng)的高3位值是011叮叹。

具體實現(xiàn)和相關(guān)方法

/**
     * 用于記錄線程池的 狀態(tài)和work線程數(shù)量
     * 前3位記錄線程池狀態(tài)
     * 后29位記錄運行work數(shù)量
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    /** Java 中Integer 類型長度為32位,線程池用一個int類型的前3位表示線程池的狀態(tài)**/
    private static final int COUNT_BITS = Integer.SIZE - 3;

    /** 用來計算出當前線程池狀態(tài)中間變量辈挂,同時也表示work最大數(shù)量
     *  00011111 11111111 11111111 11111111
     **/
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    /** -----------------線程池狀態(tài)----------------- **/

    /**
     * 線程池RUNNING狀態(tài),當前狀態(tài)下線程池可以接收新的任務(wù)衬横,對新接收的任務(wù)進行處理,
     * 工廠正常運行
     *
     * -1 二進制 11111111111111111111111111111111 左移動 29位 前三位 111
     */
    private static final int RUNNING    = -1 << COUNT_BITS;

    /**
     * 線程池SHUTDOWN狀態(tài),當前狀態(tài)下線程池不在接收新任務(wù)终蒂,對之前接收的任務(wù)(其中包括還在隊列等待和正在執(zhí)行的任務(wù))
     *  工廠不在接收新的訂單,工廠運行出現(xiàn)了問題
     *
     *  0 二進制 00000000000000000000000000000000 左移動 29位 前三位 000
     */
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    /**
     * 線程池STOP狀態(tài),當前狀態(tài)下線程池不在接收新任務(wù)蜂林,對之前接收的任務(wù)存在隊列沒有處理的不在處理遥诉,正在執(zhí)行做中斷
     *  工廠不在接收新的訂單,工廠要倒閉了
     *
     *  1 二進制 00000000000000000000000000000001 左移動 29位 前三位 001
     */
    private static final int STOP       =  1 << COUNT_BITS;

    /**
     * 線程池TIDYING狀態(tài),當前沒有待執(zhí)行的任務(wù),等待執(zhí)行函數(shù)terminated()
     *  工廠走倒閉程序噪叙,需要做最后清理工作
     *
     *  2 二進制 00000000000000000000000000000010 左移動 29位 前三位 010
     */
    private static final int TIDYING    =  2 << COUNT_BITS;

    /**
     * 執(zhí)行函數(shù)terminated()
     *  工廠關(guān)閉
     *  3 二進制 00000000000000000000000000000011 左移動 29位 前三位 011
     */
    private static final int TERMINATED =  3 << COUNT_BITS;

    /** 計算獲取當前線程池狀態(tài) **/
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    /** 計算獲取當前運行work數(shù)量**/
    private static int workerCountOf(int c)  { return c & CAPACITY; }


    /**
     * 即根據(jù)線程池的狀態(tài)和worker數(shù)量合并成整形 ctl
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }


    /** 判斷當前線程池是否小于s,c表示當前線程池狀態(tài) **/
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    /** 判斷當前線程池是否大于等于s,c表示當前線程池狀態(tài) **/
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    /** 判斷當前線程池是否正在正常運行  RUNNING狀態(tài)**/
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 使用CAS增加線程池中work數(shù)量(后29位可以直接整數(shù)運算)
     * 成功返回true,失敗返回false
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * 使用CAS減少線程池中work數(shù)量(后29位可以直接整數(shù)運算)
     * 成功返回true,失敗返回false
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * 使用CAS減少線程池中work數(shù)量(后29位可以直接整數(shù)運算),失敗則繼續(xù)循環(huán)直到成功
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

流水線 原來是阻塞隊列

前面總是說到工廠是流水線用來放任務(wù)矮锈,那流水線具體有哪些功能呢?流水線在線程池中是一個BlockingQueue睁蕾,一個可以阻塞的隊列苞笨。說具體就是當流水線滿了再放任務(wù)會阻塞,當流水線不存在任務(wù)子眶,work線程去獲取任務(wù)時同樣會阻塞

private final BlockingQueue<Runnable> workQueue;

BlockingQueue API

public interface BlockingQueue<E> extends Queue<E> {
  
    //將元素插入隊列尾部瀑凝,方法在添加失敗(比如隊列已滿)時會報 一些運行時錯誤.
    boolean add(E e);

    //將元素插入隊列尾部臭杰,方法在添加失斣吝洹(比如隊列已滿)時,不會拋出異常渴杆,只會返回false
    boolean offer(E e);

    //插入元素e至隊尾, 如果隊列已滿, 則阻塞調(diào)用線程直到隊列有空閑空間.
    void put(E e) throws InterruptedException;

    //插入元素e至隊列, 如果隊列已滿, 則限時阻塞調(diào)用線程寥枝,直到隊列有空閑空間或超時.
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //將隊首的元素刪除,隊列為空則拋出異常
    boolean remove(Object o);
    
    //將隊首的元素刪除磁奖,隊列為空則返回null(繼承方法囊拜,方便統(tǒng)一寫在這)
    E poll();

    //從隊首刪除元素,如果隊列為空, 則阻塞調(diào)用線程直到隊列中有元素.
    E take() throws InterruptedException;

    //從隊首刪除元素比搭,如果隊列為空, 則限時阻塞調(diào)用線程冠跷,直到隊列中有元素或超時.
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //返回剩余可用容量,沒有容量限制返回Integer.MAX_VALUE
    int remainingCapacity();

    public boolean contains(Object o);
    
    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

和Queue接口比較會發(fā)現(xiàn)BlockingQueue接口擴展了4個和阻塞相關(guān)的核心方法:put(e)敢辩、take()蔽莱;offer(e, time, unit)弟疆、poll(time, unit)戚长。總結(jié)如下


image
  • 拋出異常:滿隊列時怠苔,執(zhí)行入隊會拋出異常同廉;空隊列時執(zhí)行出隊會拋出異常 。

  • 返回特殊值:入隊操作會返回布爾值柑司;出隊操作成功返回操作值迫肖,失敗返回空值。

  • 一直阻塞:滿隊列時攒驰,執(zhí)行入隊會進入條件等待隊列蟆湖,線程阻塞;空隊列時玻粪,執(zhí)行出隊會進入條件等待隊列隅津,線程阻塞 诬垂。

  • 超時退出:滿隊列時,執(zhí)行入隊會阻塞生產(chǎn)者線程一段時間伦仍,如果超過一定的時間结窘,生產(chǎn)者線程就會退出。

線程池中工作隊列的策略

無界隊列

可以使用LinkedBlockingQueue(基于鏈表的有界隊列充蓝,F(xiàn)IFO)隧枫,理論上是該隊列可以對無限多的任務(wù)排隊
將導(dǎo)致在所有corePoolSize線程都工作的情況下將新任務(wù)加入到隊列中。這樣谓苟,創(chuàng)建的線程就不會超過corePoolSize官脓,也因此,maximumPoolSize的值也就無效了

有界隊列

可以使用ArrayBlockingQueue(基于數(shù)組結(jié)構(gòu)的有界隊列涝焙,F(xiàn)IFO)确买,并指定隊列的最大長度
使用有界隊列可以防止資源耗盡,但也會造成超過隊列大小和maximumPoolSize后纱皆,提交的任務(wù)被拒絕的問題湾趾,比較難調(diào)整和控制

不排隊,直接提交

將任務(wù)直接交給線程處理而不保持它們派草,可使用SynchronousQueue
如果不存在可用于立即運行任務(wù)的線程(即線程池中的線程都在工作)搀缠,則試圖把任務(wù)加入緩沖隊列將會失敗,因此會構(gòu)造一個新的線程來處理新添加的任務(wù)近迁,并將其加入到線程池中(corePoolSize-->maximumPoolSize擴容)
Executors.newCachedThreadPool()采用的便是這種策略

如何雇傭一個員工

線程池中一個work的本質(zhì)是一個Runnable(線程)

private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable

而ThreadFactory創(chuàng)建一個work線程艺普,本質(zhì)就是創(chuàng)建一個Thead,并將work這個Runnable設(shè)置到其屬性種,啟動這個Thead

public class DefaultThreadFactory implements ThreadFactory{

...省略代碼

@Override
    public Thread newThread(Runnable r) {
        /** 這里r就是work **/
        Thread t = new Thread(r, prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon()) {
                if (!daemon) {
                    t.setDaemon(false);
                }
            } else {
                if (daemon) {
                    t.setDaemon(true);
                }
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
}

為什么要實現(xiàn)AQS同步狀態(tài)鉴竭,作為一個工廠員工同一個時間只能完成一個任務(wù)歧譬。因而需要在其開始工作和結(jié)束工作獲取同步狀態(tài)(加鎖),釋放同步狀態(tài)(解鎖)搏存。

員工又是如何工作的瑰步?

我們知道work是一個Runnable(線程),那么執(zhí)行線程最重要的是run方法。我們來看下work的run方法璧眠,run存在一個有條件的無限循環(huán)缩焦,work會不段獲取任務(wù)執(zhí)行

    /** 工作線程執(zhí)行责静,調(diào)用外部TheadPoolExecutor.runWorker方法  */
    public void run() {
            runWorker(this);
    }
        
        
    final void runWorker(Worker w) {
        /** 獲取當前線程 **/
        Thread wt = Thread.currentThread();
        /** 獲取執(zhí)行任務(wù)**/
        Runnable task = w.firstTask;
        /** 將任務(wù)從work清理 **/
        w.firstTask = null;
        ...省略代碼
        try {
            /**
             * 如果當前work中存在任務(wù)則執(zhí)行袁滥,不存在則從WorkQueue獲取任務(wù)
             * getTask()!=null 時work永遠不停止
             **/
            while (task != null || (task = getTask()) != null) {
                /** 獲取work獨占同步狀態(tài)(表示任務(wù)) **/
                w.lock();
                ...省略代碼
                /** 處理任務(wù) **/
                task.run();
                ...省略代碼
                 /**釋放work獨占同步狀態(tài) **/
                w.unlock();
                ...省略代碼
            }
    }

社會的殘酷灾螃,淘汰機制

社會是殘酷的题翻,當工廠員工多而任務(wù)少,作為老板當然想淘汰一些員工腰鬼,怎么淘汰嵌赠,如果淘汰靴拱?這就需要實現(xiàn)一個淘汰機制。即一個work長期無法獲取任務(wù)時猾普。而何時開啟淘汰機制有2種情況袜炕。

何時開啟淘汰機制

int c = ctl.get();
int rs = runStateOf(c);

/** 是否允許回收核心work線程  **/
private volatile boolean allowCoreThreadTimeOut;

/**  判斷是否需要開啟work淘汰機制 **/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  • 當allowCoreThreadTimeOut=true時,淘汰機制隨時開啟

  • 當allowCoreThreadTimeOut=false時初家,wc>corePoolSize 時淘汰機制才開啟偎窘。

如何實現(xiàn)淘汰

我們知道線程是不可控的。work之所以能夠無限運行是因為那個有條件的無限循環(huán)溜在,如果我們退出那個循環(huán)那么work線程自然銷毀陌知,也就說work被淘汰了。那么那個條件是什么掖肋?對就是getTask()返回null,

那么如何實現(xiàn)超時呢?你應(yīng)該知道BlockingQueue有一個獲取任務(wù)超時的方法
poll仆葡,如果長時間沒有獲取任務(wù)則返回null

/**
     * 從WorkQueue獲取任務(wù)
     * 同時用來判斷work何時退出銷毀
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /** 無限循環(huán),
         *  當work超過指定時間沒有獲取時志笼,設(shè)置timedOut = true進行二次遍歷時銷毀當前work **/
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            ...省略代碼

            /** 獲取work數(shù)量 **/
            int wc = workerCountOf(c);

            /**  判斷是否需要開啟work淘汰機制 **/
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /**
             * 以下幾種情況直接銷毀當前work
             * 超時沒有獲取任務(wù)timedOut=tue,for循環(huán)遍歷第二次時
             * 當前任務(wù)超過maximumPoolSize
             * **/
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                /**
                 * 如果開啟work淘汰機制超時獲取任務(wù)沿盅,調(diào)用poll阻塞獲取任務(wù),存在超時纫溃,如果超時沒有獲取到任務(wù)
                 * 設(shè)置timedOut = true 進入第二次循環(huán)銷毀
                 *
                 * 如果沒開啟work淘汰機制超時獲取任務(wù)腰涧,調(diào)用take阻塞獲取任務(wù)
                 * 【這里的阻塞都能被中斷響應(yīng)!紊浩!】
                 **/
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }          

售前到底在做啥

private volatile RejectedExecutionHandler handler; 

  • (1)AbortPolicy:直接拋出異常窖铡,默認策略;

  • (2)CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)坊谁;

  • (3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)费彼,并執(zhí)行當前任務(wù);

  • (4)DiscardPolicy:直接丟棄任務(wù)口芍;

生意不好做箍铲,工廠如何倒閉

我們知道線程不不比員工,你說讓結(jié)束手頭工作就能結(jié)束的阶界。當你調(diào)用shutdown()時虹钮,需要保證每個員工都退出的。說不定很可能他還在傻傻在任務(wù)隊列那等待呢膘融。

既然要退出還是要退出那個有條件無限循環(huán)。退出還是要找到那個getTask()返回null,現(xiàn)在我們來看看之前那些省略的代碼祭玉。當線程池狀態(tài)不為運行狀態(tài)時會返回null.

 /**
     * 從WorkQueue獲取任務(wù)
     * 同時用來判斷work何時退出銷毀
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /** 無限循環(huán)氧映,
         *  當work超過指定時間沒有獲取時,設(shè)置timedOut = true進行二次遍歷時銷毀當前work **/
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /** 線程池中狀態(tài) >= STOP 或者 線程池狀態(tài) == SHUTDOWN且阻塞隊列為空脱货,則停止worker - 1岛都,return null **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
             ...省略代碼
            try {
                /**
                 * 如果開啟work淘汰機制超時獲取任務(wù)律姨,調(diào)用poll阻塞獲取任務(wù),存在超時臼疫,如果超時沒有獲取到任務(wù)
                 * 設(shè)置timedOut = true 進入第二次循環(huán)銷毀
                 *
                 * 如果沒開啟work淘汰機制超時獲取任務(wù)择份,調(diào)用take阻塞獲取任務(wù)
                 * 【這里的阻塞都能被中斷響應(yīng)!烫堤!】
                 **/
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

那如果work線程在工作隊列那阻塞呢荣赶?這時就要中斷該線程了。注意了workQueue.poll是可以響應(yīng)中斷的哦鸽斟!

線程擴展

子類實現(xiàn)擴展

    /**
     * 模板方法給子類實現(xiàn)拔创,執(zhí)行任務(wù)前的操作
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * 模板方法給子類實現(xiàn),執(zhí)行任務(wù)后的操作
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * 模板方法給子類實現(xiàn)富蓄,線程池狀態(tài)從TIDYING到TERMINATED需要做的清理動作
     */
    protected void terminated() { }

工作隊列擴展

可以修改offer方法邏輯在特定時候返回false剩燥,從而達到如果 (員工人數(shù)> corePoolSize) 且(員工人數(shù) < maximumPoolSize)依舊可以雇傭新員工(創(chuàng)建work)

Executors靜態(tài)工廠創(chuàng)建幾種常用線程池

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
 
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

創(chuàng)建一個指定工作線程數(shù)的線程池,其中參數(shù) corePoolSize 和 maximumPoolSize 相等立倍,阻塞隊列基于LinkedBlockingQueue

它是一個典型且優(yōu)秀的線程池灭红,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時所耗的開銷的優(yōu)點。但是在線程池空閑時口注,即線程池中沒有可運行任務(wù)時比伏,它也不會釋放工作線程,還會占用一定的系統(tǒng)資源

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
 
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

創(chuàng)建一個可緩存工作線程的線程池疆导,默認存活時間60秒赁项,線程池的線程數(shù)可達到Integer.MAX_VALUE,即2147483647澈段,內(nèi)部使用SynchronousQueue作為阻塞隊列悠菜;

在沒有任務(wù)執(zhí)行時,當線程的空閑時間超過keepAliveTime败富,則工作線程將會終止悔醋,當提交新任務(wù)時,如果沒有空閑線程兽叮,則創(chuàng)建新線程執(zhí)行任務(wù)芬骄,會導(dǎo)致一定的系統(tǒng)開銷

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
 
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

初始化的線程池可以在指定的時間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鹦聪,一起剝皮案震驚了整個濱河市账阻,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌泽本,老刑警劉巖淘太,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡蒲牧,警方通過查閱死者的電腦和手機撇贺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來冰抢,“玉大人,你說我怎么就攤上這事翠订≡坦欤” “怎么了橙弱?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵棘脐,是天一觀的道長蛀缝。 經(jīng)常有香客問我屈梁,道長在讶,這世上最難降的妖魔是什么构哺? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任曙强,我火速辦了婚禮途茫,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘臀防。我一直安慰自己袱衷,他們只是感情好笑窜,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布嫌蚤。 她就那樣靜靜地躺著脱吱,像睡著了一般箱蝠。 火紅的嫁衣襯著肌膚如雪宦搬。 梳的紋絲不亂的頭發(fā)上间校,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天页慷,我揣著相機與錄音滓彰,去河邊找鬼欲逃。 笑死稳析,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的彰居。 我是一名探鬼主播诚纸,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼陈惰,長吁一口氣:“原來是場噩夢啊……” “哼畦徘!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起井辆,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤关筒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蒸播,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年亲铡,在試婚紗的時候發(fā)現(xiàn)自己被綠了才写。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡锭硼,死狀恐怖房资,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情檀头,我是刑警寧澤轰异,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站暑始,受9級特大地震影響搭独,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜廊镜,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一牙肝、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嗤朴,春花似錦配椭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至吱雏,卻和暖如春敦姻,著一層夾襖步出監(jiān)牢的瞬間瘾境,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工镰惦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留迷守,地道東北人。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓陨献,卻偏偏與公主長得像盒犹,于是被迫代替她去往敵國和親懂更。 傳聞我的和親對象是個殘疾皇子眨业,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容