Java線程池帶圖源碼解析

線程池作為Java中一個重要的知識點几晤,看了很多文章,在此以Java自帶的線程池為例植阴,記錄分析一下蟹瘾。本文參考了Java并發(fā)編程:線程池的使用Java線程池---addWorker方法解析掠手、線程池憾朴、ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)ThreadPoolExecutor線程池解析與BlockingQueue的三種實現(xiàn)。本文基于JDK1.8實現(xiàn)用例和源碼分析喷鸽,并主要著重流程伊脓。

一、使用線程池

要知道一個東西的原理魁衙,首先要知道如何使用它。所以先上一個使用線程池的示例株搔。

1剖淀、任務(wù)類

要使用Java自帶的線程池,首先需要一個任務(wù)類纤房,這個任務(wù)類需要實現(xiàn)Runnable接口纵隔,并重寫run方法(需要多線程執(zhí)行的任務(wù)邏輯)。

package org.my.threadPoolDemo;
/**
 * 任務(wù)類炮姨,實現(xiàn)Runnable接口 重寫run方法
 */
public class MyTask implements Runnable{
    
    private int taskNum;
    
    public MyTask(int taskNum) {
        super();
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        System.out.println("正在執(zhí)行task"+taskNum);
        try {
            Thread.currentThread().sleep(4000);//sleep 4秒模擬執(zhí)行代碼過程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task"+taskNum+"執(zhí)行完畢");
    }
}

2捌刮、創(chuàng)建線程池并執(zhí)行多個任務(wù)

有了任務(wù)類,接下來創(chuàng)建線程池绅作,并執(zhí)行多個任務(wù)俄认。我們使用ThreadPoolExecutor來創(chuàng)建線程池夜焦。

package org.my.threadPoolDemo;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUseDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("線程池中線程數(shù)量:"+executor.getPoolSize()+",線程池中等待執(zhí)行的任務(wù)數(shù)量:"+executor.getQueue().size()+",已執(zhí)行完的任務(wù)數(shù)量:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }

}

邏輯很簡單,創(chuàng)建了一個線程池管理器卸伞,然后使用它執(zhí)行了15個任務(wù)。這里需要解釋一下構(gòu)造ThreadPoolExecutor的時候傳入的參數(shù)的意義:

  • 5(corePoolSize)是指核心池大小弃酌。即創(chuàng)建的線程數(shù)量妓湘。如果線程池中線程數(shù)等于這個數(shù)量榜贴,那么下一個來的任務(wù)就會被放在任務(wù)隊列中(稍后詳細圖解)唬党。
  • 10(maximumPoolSize)是指線程池能創(chuàng)建的最大線程數(shù)量驶拱。如果上一步的任務(wù)隊列已滿蓝纲,那么線程池將繼續(xù)創(chuàng)建線程,直到線程數(shù)=10箭养,下一個來的任務(wù)會被拒絕執(zhí)行闯冷。
  • 200(keepAliveTime)是指表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認情況下坎弯,只有當線程池中的線程數(shù)大于corePoolSize時抠忘,keepAliveTime才會起作用拧咳,直到線程池中的線程數(shù)不大于corePoolSize,即當線程池中的線程數(shù)大于corePoolSize時灶体,如果一個線程空閑的時間達到keepAliveTime,則會終止蝎抽,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法樟结,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用瓢宦,直到線程池中的線程數(shù)為0层坠。
  • TimeUnit.MILLISECONDS是keepAliveTime的時間單位。
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
  • ArrayBlockingQueue是傳入的阻塞隊列,用來存放任務(wù)的隊列workQueue谦趣。即上文中提到的任務(wù)隊列摘悴。除了ArrayBlockingQueue蹂喻,還有LinkedBlockingQueue孵运、SynchronousQueue可以選擇。

ThreadPoolExecutor擁有四個構(gòu)造器蔓彩,除了上方傳入的參數(shù)以外从媚,還有另外的構(gòu)造器可以傳入:

  • threadFactory:線程工廠宫纬,主要用來創(chuàng)建線程
  • handler:表示拒絕執(zhí)行任務(wù)時的策略,有四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)憔四,但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù)伊群,然后重新嘗試執(zhí)行任務(wù)(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù) 

執(zhí)行上面的程序洛口,結(jié)果如下:

正在執(zhí)行task0
線程池中線程數(shù)量:1,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:2,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task1
線程池中線程數(shù)量:3,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task2
線程池中線程數(shù)量:4,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task3
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task4
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:1,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:2,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:3,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:4,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:6,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task10
線程池中線程數(shù)量:7,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task11
線程池中線程數(shù)量:8,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task12
正在執(zhí)行task13
線程池中線程數(shù)量:9,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:10,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task14
task1執(zhí)行完畢
task10執(zhí)行完畢
正在執(zhí)行task5
task11執(zhí)行完畢
task13執(zhí)行完畢
task4執(zhí)行完畢
task3執(zhí)行完畢
task2執(zhí)行完畢
task0執(zhí)行完畢
正在執(zhí)行task9
正在執(zhí)行task8
正在執(zhí)行task7
task14執(zhí)行完畢
task12執(zhí)行完畢
正在執(zhí)行task6
task5執(zhí)行完畢
task9執(zhí)行完畢
task7執(zhí)行完畢
task8執(zhí)行完畢
task6執(zhí)行完畢

可以看到,由于corePoolSize是5蹂空,所以當任務(wù)數(shù)量大于5的時候俯萌,接下來來的任務(wù)放入了任務(wù)隊列等待,但是由于任務(wù)隊列最大容量是5腌闯,maximumPoolSize=10绳瘟,所以在任務(wù)隊列滿了之后,線程池管理器又繼續(xù)創(chuàng)建了5個線程姿骏,最終線程池中線程數(shù)量達到了10糖声。這時候15個任務(wù)能夠由這些線程處理完,如果再增加任務(wù)分瘦,比如將for循環(huán)次數(shù)增加到20蘸泻,就出現(xiàn)了java.util.concurrent.RejectedExecutionException。

二嘲玫、原理分析

從上面使用線程池的例子來看悦施,最主要就是兩步,構(gòu)造ThreadPoolExecutor對象去团,然后每來一個任務(wù)抡诞,就調(diào)用ThreadPoolExecutor對象的execute方法。

1土陪、ThreadPoolExecutor結(jié)構(gòu)

ThreadPoolExecutor的主要結(jié)構(gòu)及繼承關(guān)系如下圖所示:

ThreadPoolExecutor結(jié)構(gòu)及繼承關(guān)系

主要成員變量:任務(wù)隊列——存放那些暫時無法執(zhí)行的任務(wù)昼汗;工作線程池——存放當前啟用的所有線程;線程工廠——創(chuàng)建線程鬼雀;還有一些用來調(diào)度線程與任務(wù)并保證線程安全的成員顷窒。

了解了ThreadPoolExecutor的主要結(jié)構(gòu),再簡單梳理一下“一個傳入線程池的任務(wù)能夠被最終正常執(zhí)行需要經(jīng)過的主要流程”源哩,方法名稱前面沒有“XXX.”這種標注的都是ThreadPoolExecutor的方法:

線程池主要流程

2鞋吉、ThreadPoolExecutor構(gòu)造器及重要常量

簡單了解下構(gòu)造器鸦做,ThreadPoolExecutor的四個構(gòu)造器的源碼如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

從源碼中可以看出,這四個構(gòu)造器都是調(diào)用最后一個構(gòu)造器谓着,只是根據(jù)開發(fā)者傳入的參數(shù)的不同而填充一些默認的參數(shù)泼诱。比如如果開發(fā)者沒有傳入線程工廠threadFactory參數(shù),那么構(gòu)造器就使用默認的Executors.defaultThreadFactor漆魔。

在這里還要理解ThreadPoolExecutor的幾個常量的含義和幾個簡單方法:

//Integer.SIZE是一個靜態(tài)常量坷檩,值為32,也就是說COUNT_BITS是29
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY是最大容量536870911改抡,因為1左移29位之后-1矢炼,導致最高三位為0,而其余29位全部為1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//ctl用于表示線程池狀態(tài)和有效線程數(shù)量阿纤,最高三位表示線程池的狀態(tài)句灌,其余低位表示有效線程數(shù)量
//初始化之后ctl等于RUNNING的值,即默認狀態(tài)是運行狀態(tài)欠拾,線程數(shù)量為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//1110 0000 0000 0000 0000 0000 0000 0000最高三位為111
private static final int RUNNING    = -1 << COUNT_BITS;
//最高三位為000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000最高三位為001
private static final int STOP       =  1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000最高三位為010
private static final int TIDYING    =  2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000最高三位為011
private static final int TERMINATED =  3 << COUNT_BITS;
/**
* 獲取運行狀態(tài)胰锌,入?yún)閏tl。因為CAPACITY是最高三位為0藐窄,其余低位為1
* 所以當取反的時候资昧,就只有最高三位為1,再經(jīng)過與運算荆忍,只會取到ctl的最高三位
* 而這最高三位如上文所述格带,表示線程池的狀態(tài)
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/**
* 獲取工作線程數(shù)量,入?yún)閏tl刹枉。因為CAPACITY是最高三位為0叽唱,其余低位為1
* 所以當進行與運算的時候,只會取到低29位微宝,這29位正好表示有效線程數(shù)量
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任務(wù)隊列棺亭,用于存放待執(zhí)行任務(wù)的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
/** 判斷線程池是否是運行狀態(tài),傳入的參數(shù)是ctl的值
* 只有RUNNING的符號位是1蟋软,也就是只有RUNNING為負數(shù)
* 所以如果目前的ctl值<0镶摘,就是RUNNING狀態(tài)
*/
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//從任務(wù)隊列中移除任務(wù)
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

3、getTask()源代碼

通過前面的流程圖岳守,我們知道getTask()是由runWorker方法調(diào)用的钉稍,目的是取出一個任務(wù)。

private Runnable getTask() {
        //記錄循環(huán)體中上個循環(huán)在從阻塞隊列中取任務(wù)時是否超時
        boolean timedOut = false; 
        //無條件循環(huán)棺耍,主要是在線程池運行正常情況下
        //通過循環(huán)體內(nèi)部的阻塞隊列的阻塞時間,來控制當前線程的超時時間
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//獲取線程池狀態(tài)

            /*先獲取線程池的狀態(tài)种樱,如果狀態(tài)大于等于STOP蒙袍,也就是STOP俊卤、TIDYING、TERMINATED之一
             *這時候不管隊列中有沒有任務(wù)害幅,都不用去執(zhí)行了消恍;
             *如果線程池的狀態(tài)為SHUTDOWN且隊列中沒有任務(wù)了,也不用繼續(xù)執(zhí)行了
             *將工作線程數(shù)量-1以现,并且返回null
             **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //獲取工作線程數(shù)量
            int wc = workerCountOf(c);

            //是否啟用超時參數(shù)keepAliveTime
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果這個條件成立狠怨,如果工作線程數(shù)量-1成功,返回null邑遏,否則跳出循環(huán)
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //如果需要采用阻塞形式獲取佣赖,那么就poll設(shè)定阻塞時間,否則take無限期等待记盒。
            //這里通過阻塞時間憎蛤,間接控制了超時的值,如果取值超時纪吮,意味著這個線程在超時時間內(nèi)處于空閑狀態(tài)
            //那么下一個循環(huán)俩檬,將會return null并且線程數(shù)量-1
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

4、runWorker(Worker w)源代碼

通過上面流程圖碾盟,可以看出runWorker(Worker w)實際上已經(jīng)是在線程啟動之后執(zhí)行任務(wù)了棚辽,所以其主要邏輯就是獲取任務(wù),然后執(zhí)行任務(wù)的run方法冰肴。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//獲取傳入的Worker對象中的任務(wù)
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
                /* 
                 * 如果傳入的任務(wù)為null屈藐,就從任務(wù)隊列中獲取任務(wù),只要這兩者有一個不為null嚼沿,就進入循環(huán)體
                 */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果線程池狀態(tài)已被標為停止估盘,那么則不允許該線程繼續(xù)執(zhí)行任務(wù)!或者該線程已是中斷狀態(tài),
                //也不允許執(zhí)行任務(wù),還需要中斷該線程!
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//為子類預(yù)留的空方法(鉤子)
                    Throwable thrown = null;
                    try {
                        task.run();//終于真正執(zhí)行傳入的任務(wù)了
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//為子類預(yù)留的空方法(鉤子)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
                /*
                 * 從這里可以看出骡尽,當一個任務(wù)執(zhí)行完畢之后遣妥,循環(huán)并沒有退出,此時會再次執(zhí)行條件判斷
                 * 也就是說如果執(zhí)行完第一個任務(wù)之后攀细,任務(wù)隊列中還有任務(wù)箫踩,那么將會繼續(xù)在這個線程執(zhí)行。
                 * 這里也是很巧妙的地方谭贪,不需要額外開一個控制線程來看那些線程處于空閑狀態(tài)境钟,然后給他分配任務(wù)。
                 * 直接自己去任務(wù)隊列拿任務(wù)
                 */
            }
            completedAbruptly = false;
        } finally {
                /*
                 * 執(zhí)行到這里俭识,說明getTask返回了null慨削,要么是超時(任務(wù)隊列沒有任務(wù)了),要么是線程池狀態(tài)有問題了
                 * 當前線程將被回收了
                 */
            processWorkerExit(w, completedAbruptly);
        }
    }

5、addWorker(Runnable firstTask, boolean core)源代碼

runWorker是從Worker對象中獲取第一個任務(wù)缚态,然后從任務(wù)隊列中一直獲取任務(wù)執(zhí)行磁椒。流程圖中已經(jīng)說過,這個Worker對象是在addWorker方法中創(chuàng)建的玫芦,所以新線程創(chuàng)建浆熔、啟動的源頭是在addWorker方法中。而addWorker是被execute所調(diào)用桥帆,execute根據(jù)addWorker的返回值医增,進行條件判斷。

private boolean addWorker(Runnable firstTask, boolean core) {
        //上來就是retry老虫,后面continue retry;語句執(zhí)行之后都會從這里重新開始
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//獲取線程池運行狀態(tài)

            /*
             * 獲取當前線程池的狀態(tài)叶骨,如果是STOP,TIDYING,TERMINATED狀態(tài)的話张遭,則會返回false
             * 如果現(xiàn)在狀態(tài)是SHUTDOWN邓萨,但是firstTask不為空或者workQueue為空的話,那么直接返回false
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//獲取工作線程數(shù)量
                /*
                 * addWorker傳入的第二個Boolean參數(shù)用來判別當前線程數(shù)量是否大于核心池數(shù)量
                 * true菊卷,代表當前線程數(shù)量小于核心池數(shù)量
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作線程數(shù)量
                if (compareAndIncrementWorkerCount(c))
                        //如果成功缔恳,跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                //判斷線程池狀態(tài),改變了就重試一次
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
                /*
                 * 前面順利增加了工作線程數(shù)洁闰,那么這里就真正創(chuàng)建Worker
                 * 上面流程圖中說過歉甚,創(chuàng)建Worker時會創(chuàng)建新的線程.如果這里創(chuàng)建失敗
                 * finally中會將工作線程數(shù)-1
                 */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//將Worker放入工作線程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//啟動線程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

6、execute方法

execute方法其實最主要是根據(jù)線程池的策略來傳遞不同的參數(shù)給addWorker方法扑眉。
當調(diào)用 execute() 方法添加一個任務(wù)時纸泄,線程池會做如下判斷:

a. 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運行這個任務(wù)腰素;
b. 如果正在運行的線程數(shù)量大于或等于 corePoolSize聘裁,那么將這個任務(wù)放入隊列。
c. 如果這時候隊列滿了弓千,而且正在運行的線程數(shù)量小于 maximumPoolSize衡便,那么還是要創(chuàng)建線程運行這個任務(wù);
d. 如果隊列滿了洋访,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize镣陕,那么線程池會拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”姻政。

方法execute主要是在控制上面四條策略的實現(xiàn)呆抑。

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取到成員變量ctl的值(線程池狀態(tài))
        int c = ctl.get();
        //如果工作線程的數(shù)量<核心池的大小
        if (workerCountOf(c) < corePoolSize) {
            //調(diào)用addWorker(這里傳入的true代表工作線程數(shù)量<核心池大小)
            //如果成功汁展,方法結(jié)束鹊碍。
            if (addWorker(command, true))
                return;
            //否則厌殉,再重新獲取一次ctl的值
            //個人理解是防止前面這段代碼執(zhí)行的時候有其他線程改變了ctl的值。
            c = ctl.get();
        }
        //如果工作線程數(shù)量>=核心池的大小或者上一步調(diào)用addWorker返回false侈咕,繼續(xù)走到下面
        //如果線程池處于運行狀態(tài)年枕,并且成功將當前任務(wù)放入任務(wù)隊列
        if (isRunning(c) && workQueue.offer(command)) {
            //為了線程安全,重新獲取ctl的值
            int recheck = ctl.get();
            //如果線程池不處于運行狀態(tài)并且任務(wù)從任務(wù)隊列移除成功
            if (! isRunning(recheck) && remove(command))
                //調(diào)用reject拒絕執(zhí)行乎完,根據(jù)handler的實現(xiàn)類拋出異常或者其他操作
                reject(command);
            //否則品洛,如果工作線程數(shù)量==0树姨,調(diào)用addWorker并傳入null和false
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //執(zhí)行到這里代表當前線程已超越了核心線程且任務(wù)提交到任務(wù)隊列失敗。(可以注意這里的addWorker是false)
        //那么這里再次調(diào)用addWroker創(chuàng)建新線程(這時創(chuàng)建的線程是maximumPoolSize)桥状。
        //如果還是提交任務(wù)失敗則調(diào)用reject處理失敗任務(wù)
        else if (!addWorker(command, false))
            reject(command);
    }

三帽揪、線程池相關(guān)影響因素

1、阻塞隊列的選擇

本段引用自ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)

直接提交:
工作隊列的默認選項是SynchronousQueue辅斟,它將任務(wù)直接提交給線程而不存儲它們转晰。在此,如果不存在可用于立即運行任務(wù)的線程士飒,則試圖把任務(wù)加入隊列將失敗查邢,因此會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時出現(xiàn)鎖酵幕。直接提交通常要求無界maximumPoolSize以避免拒絕新提交的任務(wù)扰藕。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性芳撒。

無界隊列:
使用無界隊列(例如邓深,不具有預(yù)定義容量的LinkedBlockingQueue將導致在所有 corePoolSize 線程都忙時新任務(wù)在隊列中等待。這樣笔刹,創(chuàng)建的線程就不會超過corePoolSize芥备。(因此,maximumPoolSize的值也就無效了舌菜。)當每個任務(wù)完全獨立于其他任務(wù)萌壳,即任務(wù)執(zhí)行互不影響時,適合于使用無界隊列酷师;例如讶凉,在 Web 頁服務(wù)器中。這種排隊可用于處理瞬態(tài)突發(fā)請求山孔,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時懂讯,此策略允許無界線程具有增長的可能性。

有界隊列:
當使用有限的maximumPoolSize時台颠,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡褐望,但是可能較難調(diào)整和控制勒庄。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷瘫里,但是可能導致人工降低吞吐量实蔽。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界)谨读,則系統(tǒng)可能為超過您許可的更多線程安排時間局装。使用小型隊列通常要求較大的池大小,CPU 使用率較高劳殖,但是可能遇到不可接受的調(diào)度開銷铐尚,這樣也會降低吞吐量。

2哆姻、常用的線程池

在使用ThreadPoolExecutor創(chuàng)建線程池的時候宣增,根據(jù)不同參數(shù),可以使用不同構(gòu)造器構(gòu)造不同特性的線程池矛缨。而實際情況中爹脾,我們一般使用Executors類提供的方法來創(chuàng)建線程池。Executors最終也是調(diào)用ThreadPoolExecutor的構(gòu)造器箕昭,但是已經(jīng)配置好了相關(guān)參數(shù)灵妨。

1)固定大小的線程池:Executors.newFixedThreadPool

corePoolSize和maximumPoolSize相同,超時時間為0盟广,隊列用的LinkedBlockingQueue無界的FIFO隊列闷串,這表示這個線程池始終只有corePoolSize個線程在運行。根據(jù)前面分析的筋量,如果執(zhí)行完任務(wù)烹吵,這個線程會繼續(xù)從任務(wù)隊列取任務(wù)執(zhí)行,當沒有任務(wù)的時候桨武,線程會立即關(guān)閉肋拔。

2)單任務(wù)線程池:Executors.newSingleThreadExecutor

池中只有一個線程工作,阻塞隊列無界呀酸,它能保證按照任務(wù)提交的順序來執(zhí)行任務(wù)凉蜂。

3)可變尺寸的線程池:Executors.newCachedThreadPool

SynchronousQueue隊列,一個不存儲元素的阻塞隊列性誉。每個插入操作必須等到另一個線程調(diào)用移除操作窿吩。所以,當我們提交第一個任務(wù)的時候错览,是加入不了隊列的纫雁,這就滿足了,一個線程池條件“當無法加入隊列的時候倾哺,且任務(wù)沒有達到maximumPoolSize時轧邪,我們將新開啟一個線程任務(wù)”刽脖。所以我們的maximumPoolSize是ctl低29位的最大值。超時時間是60s忌愚,當一個線程沒有任務(wù)執(zhí)行會暫時保存60s超時時間曲管,如果沒有的新的任務(wù)的話,會從線程池中remove掉硕糊。

4)scheduled線程池院水,Executors.newScheduledThreadPool

創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求

終于差不多可以畫上句號了简十,感覺有很多東西還是沒有細說衙耕,尤其是其中大量的重入鎖。有機會再補上勺远。綜合了很多網(wǎng)上資源,也自己畫了幾張圖时鸵,個人認為相對于很多介紹線程池的博客來說胶逢,更容易理解吧。如有錯誤饰潜,歡迎批評指正初坠。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市彭雾,隨后出現(xiàn)的幾起案子碟刺,更是在濱河造成了極大的恐慌,老刑警劉巖薯酝,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件半沽,死亡現(xiàn)場離奇詭異,居然都是意外死亡吴菠,警方通過查閱死者的電腦和手機者填,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來做葵,“玉大人占哟,你說我怎么就攤上這事∧鹗福” “怎么了榨乎?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長瘫筐。 經(jīng)常有香客問我蜜暑,道長,這世上最難降的妖魔是什么严肪? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任史煎,我火速辦了婚禮谦屑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘篇梭。我一直安慰自己氢橙,他們只是感情好,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布恬偷。 她就那樣靜靜地躺著悍手,像睡著了一般。 火紅的嫁衣襯著肌膚如雪袍患。 梳的紋絲不亂的頭發(fā)上坦康,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音诡延,去河邊找鬼滞欠。 笑死,一個胖子當著我的面吹牛肆良,可吹牛的內(nèi)容都是我干的筛璧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼惹恃,長吁一口氣:“原來是場噩夢啊……” “哼夭谤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起巫糙,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤朗儒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后参淹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體醉锄,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年浙值,在試婚紗的時候發(fā)現(xiàn)自己被綠了榆鼠。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡亥鸠,死狀恐怖妆够,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情负蚊,我是刑警寧澤神妹,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站家妆,受9級特大地震影響鸵荠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜伤极,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一蛹找、第九天 我趴在偏房一處隱蔽的房頂上張望姨伤。 院中可真熱鬧,春花似錦庸疾、人聲如沸乍楚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽徒溪。三九已至,卻和暖如春金顿,著一層夾襖步出監(jiān)牢的瞬間臊泌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工揍拆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留渠概,地道東北人。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓嫂拴,卻偏偏與公主長得像高氮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子顷牌,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容