Java并發(fā)編程-線(xiàn)程池

1.簡(jiǎn)介

如果并發(fā)的線(xiàn)程數(shù)量很多,并且每個(gè)線(xiàn)程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了笋除,這樣頻繁創(chuàng)建線(xiàn)程就會(huì)大大降低系統(tǒng)的效率听隐,因?yàn)轭l繁創(chuàng)建線(xiàn)程和銷(xiāo)毀線(xiàn)程需要時(shí)間织咧。那么有沒(méi)有一種辦法使得線(xiàn)程可以復(fù)用幸乒,就是執(zhí)行完一個(gè)任務(wù)懦底,并不被銷(xiāo)毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)罕扎,在Java中可以通過(guò)線(xiàn)程池來(lái)達(dá)到這樣的效果基茵。

2.Java中的ThreadPoolExecutor類(lèi)

線(xiàn)程池相關(guān)類(lèi):ThreadPoolExecutor、AbstractExecutorService壳影、ExecutorService、Executor弥臼、Executors(線(xiàn)程池工廠類(lèi))
幾個(gè)類(lèi)的關(guān)系如下:
(1)ThreadPoolExecutor extends AbstractExecutorService
(2)AbstractExecutorService implements ExecutorService
(3)public interface ExecutorService extends Executor
(4)Executors(線(xiàn)程池工廠類(lèi))宴咧,Exectors工廠類(lèi)提供了線(xiàn)程池的初始化接口,可以初始化四種不同的線(xiàn)程池。
Executor是一個(gè)接口径缅,它是Executor框架的基礎(chǔ)掺栅,它將任務(wù)的提交和任務(wù)的執(zhí)行分離開(kāi)來(lái)烙肺。Android中的線(xiàn)程池來(lái)源于Java中的Executor、Executor是一個(gè)接口氧卧,真正的線(xiàn)程池實(shí)現(xiàn)是在ThreadPoolExecutor桃笙,ThreadPoolExecutor提供一系列參數(shù)來(lái)配置線(xiàn)程池,通過(guò)不同的參數(shù)可以創(chuàng)建不同的線(xiàn)程池沙绝,線(xiàn)程池主要分為4類(lèi)搏明,這四類(lèi)線(xiàn)程池可以通過(guò)Executors所提供的工廠方法來(lái)的得到。線(xiàn)程池都是直接或者間接通過(guò)配置ThreadPoolExecutor來(lái)實(shí)現(xiàn)的闪檬。從線(xiàn)程池的上層API來(lái)看星著,再多種的線(xiàn)程池,無(wú)非是參數(shù)的不同粗悯,讓它們呈現(xiàn)出了不同的特性虚循。那么這些特性到底依賴(lài)什么樣的原理實(shí)現(xiàn),就更值得去深究样傍。ThreadPoolExecutor實(shí)現(xiàn)了線(xiàn)程池所需的最小功能集横缔,已能hold住很多場(chǎng)景。
java.uitl.concurrent.ThreadPoolExecutor類(lèi)是線(xiàn)程池中最核心的一個(gè)類(lèi)衫哥,因此如果要透徹地了解Java中的線(xiàn)程池茎刚,必須先了解這個(gè)類(lèi)。線(xiàn)程池都是直接或者間接通過(guò)配置ThreadPoolExecutor來(lái)實(shí)現(xiàn)的炕檩,并且ThreadPoolExecutor有四個(gè)構(gòu)造函數(shù)斗蒋,通過(guò)觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn),發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作笛质。

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

ThreadPoolExecutor共七個(gè)參數(shù):核心線(xiàn)程數(shù)泉沾,最大線(xiàn)程數(shù),keepAliveTime妇押,以及keepAliveTime時(shí)間單位跷究,阻塞隊(duì)列、線(xiàn)程工廠敲霍、拒絕策略俊马。
我們先看下四種不同的線(xiàn)程池相關(guān)參數(shù):
FixThreadPool

    //適用于負(fù)載比較重的服務(wù)器。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

線(xiàn)程數(shù):固定數(shù)量的線(xiàn)程肩杈,核心線(xiàn)程和最大線(xiàn)程數(shù)一樣
阻塞隊(duì)列:LinkedBlockingQueue基于鏈表結(jié)構(gòu)的阻塞隊(duì)列柴我,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene扩然;

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

線(xiàn)程工廠ThreadFactory:默認(rèn)的線(xiàn)程工廠艘儒;
拒絕策略:ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
CachedThreadPool:大小無(wú)界的線(xiàn)程池,適用于執(zhí)行很多短期異步任務(wù)的小程序,或者負(fù)載較輕的服務(wù)器界睁。阻塞隊(duì)列:SynchronousQueue一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列觉增,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)翻斟,吞吐量通常要高于LinkedBlockingQuene逾礁;線(xiàn)程工廠和拒絕策略與FixThreadPool相同。
SingleThreadExecutor:適用于需要保證任務(wù)順序執(zhí)行的各個(gè)任務(wù)访惜。核心線(xiàn)程和最大線(xiàn)程都是1嘹履,阻塞隊(duì)列LinkedBlockingQueue:LinkedBlockingQueue基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,阻塞隊(duì)列大小為Integer.MAX_VALUE。線(xiàn)程工廠和拒絕策略也和FixThreadPool一樣疾牲。
ScheduledThreadPoolExecutor:適用于需要多個(gè)后臺(tái)線(xiàn)程執(zhí)行周期任務(wù)植捎。阻塞隊(duì)列:DelayedWorkQueue,默認(rèn)大小是16阳柔。線(xiàn)程工廠和拒絕和FixThreadPool一樣焰枢。
構(gòu)造函數(shù)中各參數(shù)含義:
corePoolSize:線(xiàn)程池中的核心線(xiàn)程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí)舌剂,線(xiàn)程池創(chuàng)建一個(gè)新線(xiàn)程執(zhí)行任務(wù)济锄,直到當(dāng)前線(xiàn)程數(shù)等于corePoolSize;如果當(dāng)前線(xiàn)程數(shù)為corePoolSize霍转,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中荐绝,等待被執(zhí)行;如果執(zhí)行了線(xiàn)程池的prestartAllCoreThreads()方法避消,線(xiàn)程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線(xiàn)程低滩。
maximumPoolSize:線(xiàn)程池中允許的最大線(xiàn)程數(shù)。如果當(dāng)前阻塞隊(duì)列滿(mǎn)了岩喷,且繼續(xù)提交任務(wù)恕沫,則創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù),前提是當(dāng)前線(xiàn)程數(shù)小于maximumPoolSize纱意;
keepAliveTime:線(xiàn)程空閑時(shí)的存活時(shí)間婶溯,即當(dāng)線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間偷霉;默認(rèn)情況下迄委,該參數(shù)只在線(xiàn)程數(shù)大于corePoolSize時(shí)才有用;
unit:keepAliveTime的單位类少;
阻塞隊(duì)列workQueue
四種線(xiàn)程池用到了三種阻塞隊(duì)列:LinkedBlockingQueue叙身、SynchronousQueue、DelayedWorkQueue硫狞。阻塞隊(duì)列用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)信轿。四種線(xiàn)程池用到的線(xiàn)程工廠和拒絕策略都一樣赞警,都是默認(rèn)的。
JDK提供的阻塞隊(duì)列有:
(1)ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列虏两,按FIFO排序任務(wù);基于數(shù)組的先進(jìn)先出隊(duì)列世剖,此隊(duì)列創(chuàng)建時(shí)必須指定大卸ò铡;
(2)LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列旁瘫,按FIFO排序任務(wù)祖凫,吞吐量通常要高于ArrayBlockingQuene;基于鏈表的先進(jìn)先出隊(duì)列酬凳,如果創(chuàng)建時(shí)沒(méi)有指定此隊(duì)列大小惠况,則默認(rèn)為Integer.MAX_VALUE;
(3)SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列宁仔,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作稠屠,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene翎苫;這個(gè)隊(duì)列比較特殊权埠,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線(xiàn)程來(lái)執(zhí)行新來(lái)的任務(wù)煎谍。
(4)PriorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列攘蔽;線(xiàn)程池的排隊(duì)策略與BlockingQueue有關(guān)。 其中ArrayBlockQueue和LinkedBlockingQueue都可以指定初始容量呐粘。
線(xiàn)程工廠threadFactory:可以使用Executors中默認(rèn)線(xiàn)程工廠满俗,也可以通過(guò)自定義的線(xiàn)程工廠可以給每個(gè)新建的線(xiàn)程設(shè)置一個(gè)具有識(shí)別度的線(xiàn)程名。
線(xiàn)程池拒絕策略handler:線(xiàn)程池的飽和策略作岖,當(dāng)阻塞隊(duì)列滿(mǎn)了唆垃,且沒(méi)有空閑的工作線(xiàn)程,如果繼續(xù)提交任務(wù)鳍咱,必須采取一種策略處理該任務(wù)降盹,線(xiàn)程池提供了4種策略:
(1)AbortPolicy:直接拋出異常,默認(rèn)策略谤辜;
(2)CallerRunsPolicy:用調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行任務(wù)蓄坏;
(3)DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)丑念;
(4)DiscardPolicy:直接丟棄任務(wù)涡戳;
當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略脯倚,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)渔彰。
綜合以上嵌屎,線(xiàn)程池阻塞隊(duì)列、線(xiàn)程工廠恍涂、線(xiàn)程拒絕策略都可以自定義宝惰,根據(jù)自己的需求進(jìn)行自定義。線(xiàn)程池各個(gè)參數(shù)都可以進(jìn)行自定義再沧。ThreadFactory和拒絕策略都很容易進(jìn)行自定義尼夺。
如何存儲(chǔ)線(xiàn)程池狀態(tài)以及線(xiàn)程數(shù)量(5種狀態(tài)和線(xiàn)程個(gè)數(shù))?

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 值為29    Integer.SIZE=32
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 高三位全為0炒瘸,低29位全為1淤堵,因此線(xiàn)程數(shù)量的表示范圍為 0 ~ 2^29
    //1左移29位然后再減1,這樣后29全為1顷扩,所以線(xiàn)程池中線(xiàn)程最大個(gè)數(shù)為2的29次數(shù)拐邪。
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl  獲取線(xiàn)程池狀態(tài),參數(shù)C為ctl最新?tīng)顟B(tài)值。 ~CAPACITY 高三位為111隘截,低29為全為0.這樣就可以得到線(xiàn)程池狀態(tài)扎阶。 
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取工作線(xiàn)程數(shù)量, 參數(shù)C為ctl最新?tīng)顟B(tài)值  CAPACITY 高三位為000,低29全為1技俐,按位與就可以得到當(dāng)前線(xiàn)程個(gè)數(shù)乘陪。 
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

ctl用來(lái)控制線(xiàn)程池的狀態(tài),并用來(lái)表示線(xiàn)程池線(xiàn)程數(shù)量雕擂。ctl類(lèi)型為AtomicInteger啡邑,那用一個(gè)基礎(chǔ)如何表示以上五種狀態(tài)以及線(xiàn)程池工作線(xiàn)程數(shù)量呢?int型變量占用4字節(jié)井赌,共32位谤逼,因此采用位表示,可以解決上述問(wèn)題仇穗。5種狀態(tài)使用5種數(shù)值進(jìn)行表示流部,需要占用3位,余下的29位就可以用來(lái)表示線(xiàn)程數(shù)纹坐。因此枝冀,高三位表示進(jìn)程狀態(tài),低29位為線(xiàn)程數(shù)量耘子,ctl是原子操作類(lèi)果漾,進(jìn)行自增自減時(shí)都是線(xiàn)程安全的,原子操作類(lèi)是基于CAS實(shí)現(xiàn)的谷誓。采用了int分位表示線(xiàn)程池狀態(tài)和線(xiàn)程數(shù)量绒障,如何獲取線(xiàn)程狀態(tài)與數(shù)量見(jiàn)代碼注釋。
在Java讀寫(xiě)鎖中捍歪,讀寫(xiě)鎖狀態(tài)也是使用一個(gè)int來(lái)標(biāo)識(shí)讀寫(xiě)鎖不同狀態(tài)户辱,高16位中存儲(chǔ)讀鎖狀態(tài)鸵钝,低16位存儲(chǔ)寫(xiě)鎖狀態(tài)。
如果調(diào)用了shutdown()方法庐镐,則線(xiàn)程池處于SHUTDOWN狀態(tài)恩商,此時(shí)線(xiàn)程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢必逆;如果調(diào)用了shutdownNow()方法痕届,則線(xiàn)程池處于STOP狀態(tài),此時(shí)線(xiàn)程池不能接受新的任務(wù)末患,并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);當(dāng)線(xiàn)程池處于SHUTDOWN或STOP狀態(tài)锤窑,并且所有工作線(xiàn)程已經(jīng)銷(xiāo)毀璧针,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線(xiàn)程池被設(shè)置為T(mén)ERMINATED狀態(tài)渊啰。

3.任務(wù)提交與任務(wù)執(zhí)行

線(xiàn)程池框架提供了兩種方式提交任務(wù)探橱,根據(jù)不同的業(yè)務(wù)需求選擇不同的方式。通過(guò)Executor.execute()方法提交的任務(wù)绘证,必須實(shí)現(xiàn)Runnable接口隧膏,該方式提交的任務(wù)不能獲取返回值,因此無(wú)法判斷任務(wù)是否執(zhí)行成功嚷那。通過(guò)ExecutorService.submit()方法提交的任務(wù)胞枕,可以獲取任務(wù)執(zhí)行完的返回值。用于提交需要返回值的任務(wù)魏宽,線(xiàn)程池會(huì)返回一個(gè)future類(lèi)型的對(duì)象腐泻,通過(guò)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,F(xiàn)uture接口和實(shí)現(xiàn)Future接口的FutureTask類(lèi)队询,代表異步計(jì)算結(jié)果派桩。任務(wù)執(zhí)行:我們以Executor.execute()方法提交的任務(wù)為例,對(duì)任務(wù)執(zhí)行進(jìn)行介紹蚌斩,該部分是線(xiàn)程池的核心铆惑,代碼如下,相關(guān)內(nèi)容在代碼中已添加注釋送膳。

public void execute(Runnable command) {
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        //workerCountOf方法根據(jù)ctl的低29位员魏,得到線(xiàn)程池的當(dāng)前線(xiàn)程數(shù),如果線(xiàn)程數(shù)小于corePoolSize肠缨,則執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程(核心線(xiàn)程)執(zhí)行任務(wù)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果線(xiàn)程池處于RUNNING狀態(tài)逆趋,且把提交的任務(wù)成功放入阻塞隊(duì)列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次檢查線(xiàn)程池的狀態(tài),如果線(xiàn)程池沒(méi)有RUNNING晒奕,且成功從阻塞隊(duì)列中刪除任務(wù)闻书,則執(zhí)行reject方法處理任務(wù)名斟;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //當(dāng)前線(xiàn)程個(gè)數(shù)是0, 則執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程(非核心線(xiàn)程)執(zhí)行任務(wù)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false); 
        }
        //執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù)魄眉,如果addWoker執(zhí)行失敗砰盐,則執(zhí)行reject方法處理任務(wù)
        //如果任務(wù)隊(duì)列已滿(mǎn),并且核心線(xiàn)程都已經(jīng)啟動(dòng)坑律,則創(chuàng)建非核心線(xiàn)程去執(zhí)行任務(wù)岩梳,如果創(chuàng)建非核心線(xiàn)程失敗,則拒絕任務(wù)
        else if (!addWorker(command, false))
            reject(command);
    }
    //addWorker實(shí)現(xiàn)
    //從方法execute的實(shí)現(xiàn)可以看出:addWorker主要負(fù)責(zé)創(chuàng)建新的線(xiàn)程并執(zhí)行任務(wù)
   private boolean addWorker(Runnable firstTask, boolean core) {
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //線(xiàn)程池的工作線(xiàn)程通過(guò)Woker類(lèi)實(shí)現(xiàn)晃择,當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法冀值;
            //Worker實(shí)現(xiàn)了Runnable接口,可以將自身作為一個(gè)任務(wù)在工作線(xiàn)程中執(zhí)行宫屠;
            w = new Worker(firstTask);
            //w.thread在Worker構(gòu)造函數(shù)中使用ThreadFactory創(chuàng)建線(xiàn)程列疗,this.thread = getThreadFactory().newThread(this);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //執(zhí)行start方法啟動(dòng)線(xiàn)程thread時(shí),本質(zhì)是執(zhí)行了Worker的run方法啟動(dòng)線(xiàn)程浪蹂,然后調(diào)用runWorker()方法抵栈。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    //runWorker()方法
    //runWorker方法是線(xiàn)程池的核心
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //firstTask執(zhí)行完成之后,通過(guò)getTask方法從阻塞隊(duì)列中獲取等待的任務(wù)坤次,如果隊(duì)列中沒(méi)有任務(wù)古劲,getTask方法會(huì)被阻塞并掛起,不會(huì)占用cpu資源缰猴;
            //線(xiàn)程復(fù)用就是在線(xiàn)程執(zhí)行完之后产艾,不斷的從任務(wù)隊(duì)列中取新的任務(wù),直到線(xiàn)程池中的全部任務(wù)執(zhí)行完滑绒,以此來(lái)完成線(xiàn)程復(fù)用
            //然后當(dāng)線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí),超過(guò)存活時(shí)間之后胰舆,線(xiàn)程終止。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //在執(zhí)行任務(wù)的前后蹬挤,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute和afterExecute方法缚窿;
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //執(zhí)行通過(guò)Executor.execute()方法提交的任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //在執(zhí)行任務(wù)的前后,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute和afterExecute方法焰扳;
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 線(xiàn)程池已沒(méi)有任務(wù)了倦零,工作線(xiàn)程達(dá)到了可退出的狀態(tài),Worker退出
            processWorkerExit(w, completedAbruptly);
        }
    }

    //getTask()方法
    private Runnable getTask() {
        try {
            //workQueue.take:如果阻塞隊(duì)列為空吨悍,當(dāng)前線(xiàn)程會(huì)被掛起等待扫茅;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線(xiàn)程被喚醒育瓜,take方法返回任務(wù)葫隙,并執(zhí)行;
            //workQueue.poll:如果在keepAliveTime時(shí)間內(nèi)躏仇,阻塞隊(duì)列還是沒(méi)有任務(wù)恋脚,則返回null腺办;
            //所以,線(xiàn)程池中實(shí)現(xiàn)的線(xiàn)程可以一直執(zhí)行由用戶(hù)提交的任務(wù)糟描。
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

4.線(xiàn)程池原理

以上我們已經(jīng)對(duì)線(xiàn)程池中參數(shù)和執(zhí)行流程進(jìn)行了講解怀喉,下面通過(guò)示例來(lái)總結(jié)一下線(xiàn)程池工作原理。
假設(shè):corePoolSize=5船响,maxPoolSize=10躬拢,blockQueueSize=10,依次提交6個(gè)比較耗時(shí)的任務(wù)见间,線(xiàn)程池是如何執(zhí)行的聊闯?
答:依次提交6個(gè)耗時(shí)任務(wù),可以理解為正在執(zhí)行的線(xiàn)程都沒(méi)執(zhí)行完米诉,前面提交的5個(gè)任務(wù)馅袁,肯定有coresize=5的5個(gè)線(xiàn)程全部執(zhí)行,第6個(gè)任務(wù)是繼續(xù)創(chuàng)建線(xiàn)程呢荒辕,還是放在隊(duì)列里?第6個(gè)任務(wù)會(huì)放在隊(duì)列里够颠,等待前面5個(gè)任務(wù)有某一個(gè)執(zhí)行完之后勇皇,再執(zhí)行丑孩,maxsize線(xiàn)程只有阻塞隊(duì)列滿(mǎn)之后才會(huì)繼續(xù)創(chuàng)建線(xiàn)程,然后將任務(wù)隊(duì)列中的所有任務(wù)執(zhí)行完之后李皇,非核心線(xiàn)程的消息時(shí)間達(dá)到才會(huì)關(guān)閉線(xiàn)程,但是五個(gè)核心線(xiàn)程是不會(huì)被回收掉的宙枷。
核心點(diǎn):提交任務(wù)時(shí)掉房,如果當(dāng)前線(xiàn)程個(gè)數(shù)小于核心線(xiàn)程數(shù),那么提交新任務(wù)就會(huì)創(chuàng)建新的線(xiàn)程慰丛,如果阻塞隊(duì)列還沒(méi)有滿(mǎn)就不會(huì)創(chuàng)建非核心線(xiàn)程卓囚,非核心線(xiàn)程的創(chuàng)建條件是任務(wù)隊(duì)列必須滿(mǎn),并且有任務(wù)诅病。

5.線(xiàn)程池相關(guān)問(wèn)題

(1)非核心線(xiàn)程延遲死亡哪亿,如何實(shí)現(xiàn)?
通過(guò)阻塞隊(duì)列poll()贤笆,讓線(xiàn)程阻塞等待一段時(shí)間蝇棉,如果沒(méi)有取到任務(wù),則線(xiàn)程死亡芥永。

                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

(2)核心線(xiàn)程為什么不死篡殷?
線(xiàn)程池里的線(xiàn)程從阻塞隊(duì)列里拿任務(wù),如果存在非核心線(xiàn)程埋涧,假設(shè)阻塞隊(duì)列里沒(méi)有任務(wù)板辽,那么非核心線(xiàn)程也要在等到keepAliveTime時(shí)間后才會(huì)釋放奇瘦。如果當(dāng)前僅有核心線(xiàn)程存在,如果允許釋放核心線(xiàn)程的話(huà)戳气,也就和非核線(xiàn)程的處理方式一樣链患,反之,則通過(guò)take()一直阻塞直到拿到任務(wù)瓶您,這也就是線(xiàn)程池里的核心線(xiàn)程為什么不死的原因麻捻。
(3)如何釋放核心線(xiàn)程?
將allowCoreThreadTimeOut設(shè)置為true呀袱。
(4)核心線(xiàn)程和非核心線(xiàn)程區(qū)別贸毕?
并沒(méi)有發(fā)現(xiàn)有明顯的標(biāo)志來(lái)標(biāo)志核心線(xiàn)程與非核心線(xiàn)程,而是以線(xiàn)程數(shù)來(lái)表達(dá)線(xiàn)程身份夜赵。0 ~ corePoolSize 表示線(xiàn)程池里只有核心線(xiàn)程明棍,corePoolSize ~ maximumPoolSize 表示線(xiàn)程池里核心線(xiàn)程滿(mǎn),存在非核心線(xiàn)程寇僧。線(xiàn)程池實(shí)際并不區(qū)分核心線(xiàn)程與非核心線(xiàn)程摊腋,是根據(jù)當(dāng)前的總體并發(fā)狀態(tài)來(lái)決定怎樣處理線(xiàn)程任務(wù)。實(shí)際上并不存在核心和非核心線(xiàn)程嘁傀,大家都是線(xiàn)程兴蒸,超過(guò)核心線(xiàn)程需要銷(xiāo)毀線(xiàn)程時(shí),當(dāng)前再獲取任務(wù)的線(xiàn)程先銷(xiāo)毀就行了细办。
(5)非核心線(xiàn)程能成為核心線(xiàn)程嗎橙凳?
線(xiàn)程池不區(qū)分核心線(xiàn)程于非核心線(xiàn)程,只是根據(jù)當(dāng)前線(xiàn)程池容量狀態(tài)做不同的處理來(lái)進(jìn)行調(diào)整笑撞,因此看起來(lái)像是有核心線(xiàn)程于非核心線(xiàn)程岛啸,實(shí)際上是滿(mǎn)足線(xiàn)程池期望達(dá)到的并發(fā)狀態(tài)。
(6)Runnable在線(xiàn)程池里如何執(zhí)行?
線(xiàn)程執(zhí)行Worker茴肥,Worker不斷從阻塞隊(duì)列里獲取任務(wù)來(lái)執(zhí)行坚踩。如果任務(wù)加入線(xiàn)程池失敗,則在拒絕策略里瓤狐,還有處理機(jī)會(huì)堕虹。
(7)如何釋放線(xiàn)程?
在線(xiàn)程沒(méi)有拿到任務(wù)后芬首,退出線(xiàn)程赴捞,通過(guò)processWorkerExit()可以證實(shí)上述所言。釋放工作線(xiàn)程也并沒(méi)有區(qū)分核心與非核心郁稍,也是隨機(jī)進(jìn)行的赦政。所謂隨機(jī),就是在前面所說(shuō)的區(qū)間范圍內(nèi),根據(jù)釋放策略恢着,哪個(gè)線(xiàn)程先達(dá)到獲取不到任務(wù)的狀態(tài)桐愉,就釋放哪個(gè)線(xiàn)程。在線(xiàn)程復(fù)用進(jìn)行死循環(huán)執(zhí)行任務(wù)的時(shí)候掰派,如線(xiàn)程通過(guò)take或者poll拿不到任務(wù)从诲,即線(xiàn)程池已經(jīng)沒(méi)有任務(wù)了,在線(xiàn)程達(dá)到最大存活時(shí)間時(shí)就會(huì)銷(xiāo)毀線(xiàn)程靡羡。
(8)線(xiàn)程池如何線(xiàn)程復(fù)用系洛?
線(xiàn)程池復(fù)用是一個(gè)非常巧妙的設(shè)計(jì)方式,假如我們來(lái)設(shè)計(jì)線(xiàn)程池略步,可能會(huì)有一個(gè)任務(wù)分派線(xiàn)程描扯,當(dāng)發(fā)現(xiàn)有線(xiàn)程空閑時(shí),就從任務(wù)緩存隊(duì)列中取一個(gè)任務(wù)交給空閑線(xiàn)程執(zhí)行趟薄。但是在這里绽诚,并沒(méi)有采用這樣的方式,因?yàn)檫@樣會(huì)要額外地對(duì)任務(wù)分派線(xiàn)程進(jìn)行管理杭煎,無(wú)形地會(huì)增加難度和復(fù)雜度恩够,這里直接讓執(zhí)行完任務(wù)的線(xiàn)程去任務(wù)緩存隊(duì)列里面取任務(wù)來(lái)執(zhí)行。

//線(xiàn)程執(zhí)行完任務(wù)之后羡铲,就會(huì)主動(dòng)去任務(wù)緩存隊(duì)列中取任務(wù)去執(zhí)行蜂桶,直到任務(wù)執(zhí)行完畢
while (task != null || (task = getTask()) != null) {
    beforeExecute(wt, task);
    task.run();
    afterExecute(task, thrown);
}

線(xiàn)程執(zhí)行完任務(wù)之后,會(huì)反復(fù)的從阻塞隊(duì)列中獲取任務(wù)來(lái)執(zhí)行犀勒,以此來(lái)實(shí)現(xiàn)線(xiàn)程復(fù)用。當(dāng)非核心線(xiàn)程超多最大存活時(shí)間之后就會(huì)銷(xiāo)毀非核心線(xiàn)程妥曲。
(9)線(xiàn)程數(shù)如何做選擇?
cpu密集型任務(wù)核心線(xiàn)程數(shù)選擇小一點(diǎn)贾费,io密集型任務(wù)選擇核心線(xiàn)程數(shù)大一點(diǎn),以及大部分是線(xiàn)程等待狀態(tài)檐盟。

6.AsyncTask源碼中的線(xiàn)程池

AsyncTask是一種輕量級(jí)的異步任務(wù)類(lèi)褂萧,可以在線(xiàn)程池中執(zhí)行耗時(shí)任務(wù),然后把執(zhí)行的進(jìn)度和最終結(jié)果傳遞給主線(xiàn)程并更新UI葵萎;
AsyncTask詳細(xì)內(nèi)容請(qǐng)參考:AsyncTask源碼解析导犹,AsyncTask使用的線(xiàn)程池如下代碼所示:

public abstract class AsyncTask<Params, Progress, Result> {
    private static final String LOG_TAG = "AsyncTask";

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    // We want at least 2 threads and at most 4 threads in the core pool,
    // preferring to have 1 less than the CPU count to avoid saturating
    // the CPU with background work   核心線(xiàn)程個(gè)數(shù)是Math.min(CPU_COUNT - 1, 4)
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;

    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    
}   

通過(guò)以上代碼可以發(fā)現(xiàn)核心線(xiàn)程個(gè)數(shù)為Math.min(CPU_COUNT - 1, 4),最大線(xiàn)程數(shù)CPU_COUNT * 2 + 1羡忘,提交到AsyncTask的耗時(shí)任務(wù)最終在THREAD_POOL_EXECUTOR中執(zhí)行谎痢,并且所有提交到AsyncTask的耗時(shí)任務(wù)都是串行執(zhí)行的。只需要一個(gè)線(xiàn)程就可以完成AsyncTask功能卷雕,所以AsyncTask中的線(xiàn)程池中只會(huì)有一個(gè)線(xiàn)程节猿。
那么AsyncTask完成可以使用SingleThreadExecutor線(xiàn)程池,但是為什么AsyncTask不采用SingleThreadExecutor,猜測(cè)有以下兩個(gè)原因:
第一:AsyncTask之前的版本中滨嘱,提交到AsyncTask的任務(wù)都是并行執(zhí)行峰鄙,并不是像現(xiàn)在這樣串行執(zhí)行,所以不能使用SingleThreadExecutor線(xiàn)程池太雨。
第二:可以直接將多個(gè)任務(wù)直接拋到AsyncTask.ThreadPoolExecutor線(xiàn)程池中執(zhí)行吟榴,這樣可以繞過(guò)AsyncTask串行排隊(duì)策略,從而實(shí)現(xiàn)并發(fā)執(zhí)行囊扳。
如果大家有其他的一些看法吩翻,希望可以一起討論一下。

參考資料

深入分析java線(xiàn)程池的實(shí)現(xiàn)原理
關(guān)于Java面試宪拥,你應(yīng)該準(zhǔn)備這些知識(shí)點(diǎn)
Java并發(fā)編程:線(xiàn)程池的使用
你了解線(xiàn)程池嗎
《Java并發(fā)編程的藝術(shù)》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末仿野,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子她君,更是在濱河造成了極大的恐慌脚作,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缔刹,死亡現(xiàn)場(chǎng)離奇詭異球涛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)校镐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)亿扁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人鸟廓,你說(shuō)我怎么就攤上這事从祝。” “怎么了引谜?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵牍陌,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我员咽,道長(zhǎng)毒涧,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任贝室,我火速辦了婚禮契讲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘滑频。我一直安慰自己捡偏,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布峡迷。 她就那樣靜靜地躺著霹琼,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上枣申,一...
    開(kāi)封第一講書(shū)人閱讀 49,031評(píng)論 1 285
  • 那天售葡,我揣著相機(jī)與錄音,去河邊找鬼忠藤。 笑死挟伙,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的模孩。 我是一名探鬼主播尖阔,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼榨咐!你這毒婦竟也來(lái)了介却?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤块茁,失蹤者是張志新(化名)和其女友劉穎齿坷,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體数焊,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡永淌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了佩耳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遂蛀。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖干厚,靈堂內(nèi)的尸體忽然破棺而出李滴,到底是詐尸還是另有隱情,我是刑警寧澤蛮瞄,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布所坯,位于F島的核電站,受9級(jí)特大地震影響裕坊,放射性物質(zhì)發(fā)生泄漏包竹。R本人自食惡果不足惜燕酷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一籍凝、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧苗缩,春花似錦饵蒂、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至,卻和暖如春渊迁,著一層夾襖步出監(jiān)牢的瞬間慰照,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工琉朽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留毒租,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓箱叁,卻偏偏與公主長(zhǎng)得像墅垮,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子耕漱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容