線程池源碼解析 Jdk 1.8

基礎(chǔ)

學(xué)習(xí)一個(gè)類昼钻,我們應(yīng)該先從其字段開始。首先看看ThreadPoolExecutor對(duì)應(yīng)的屬性有哪些仅财。

private volatile int corePoolSize; // 核心線程數(shù)盏求,線程池在阻塞獲取任務(wù)時(shí)可以保持永久存活的線程的最大值亿眠。當(dāng)線程池內(nèi)的線程超過(guò)此值的線程會(huì)通過(guò)poll(keepAliveTime)獲取任務(wù)
private volatile int maximumPoolSize; // 線程池中允許的最大的線程數(shù)纳像,這里使用volatile修飾竟趾,保證多線程下的可見性
private volatile long keepAliveTime; // Woker從workQueue獲取任務(wù)的最大等待時(shí)間岔帽,超過(guò)這個(gè)時(shí)間后,worker會(huì)被回收掉(run方法執(zhí)行完畢鞋邑,線程不可復(fù)生)
private final BlockingQueue<Runnable> workQueue; // 提交的任務(wù)的排隊(duì)隊(duì)列枚碗,這是一個(gè)接口铸本,通過(guò)不同的策略實(shí)現(xiàn)不同的線程池機(jī)制
private int largestPoolSize; // 線程池中最大的pool size箱玷,只會(huì)增加不會(huì)減少,其是一個(gè)統(tǒng)計(jì)信息
private final HashSet<Worker> workers = new HashSet<Worker>(); // 內(nèi)部運(yùn)行的Worker存放的地方波丰,通過(guò)mainLock保證線程安全
private final ReentrantLock mainLock = new ReentrantLock(); //內(nèi)部的一個(gè)獨(dú)占鎖掰烟,主要保證線程池的一些統(tǒng)計(jì)信息(最大的線程數(shù)纫骑、完成的任務(wù)數(shù))和worker添加到集合的安全性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程安全類型,最高位為符號(hào)位发框,次高3位為狀態(tài)值梅惯,低28位為當(dāng)前的線程數(shù)
private volatile boolean allowCoreThreadTimeOut; // 是否允許核心線程從阻塞隊(duì)列獲取任務(wù)時(shí)銷毀个唧。默認(rèn)為false
private volatile ThreadFactory threadFactory; // 內(nèi)部為worker提供任務(wù)執(zhí)行的線程的生成工廠设预。我們通過(guò)自定義的工廠來(lái)使得業(yè)務(wù)日志更為清晰或者執(zhí)行不同的業(yè)務(wù)邏輯
private volatile RejectedExecutionHandler handler; // 拒絕策略鳖枕,默認(rèn)拒絕策略為拋出異常宾符。線程池的拒絕策略是策略模式在JDK中的一個(gè)應(yīng)用點(diǎn)灭翔「蜗洌可以自定義拒絕策略煌张,在生產(chǎn)者的速度遠(yuǎn)遠(yuǎn)大于消費(fèi)者時(shí)將超出的任務(wù)持久化到外部存儲(chǔ)。

其中corePoolSize链嘀、maximumPoolSize怀泊、keepAliveTime等變量使用volatile修飾,是因?yàn)榫€程池提供了public的set方法讓我們可以對(duì)其進(jìn)行修改刷允,這里需要使用volatile來(lái)使得修改對(duì)多線程可見树灶。
其他屬性的修改在mainLock的控制下進(jìn)行天通。

線程池狀態(tài)

了解線程池必須了解其狀態(tài)機(jī)制熄驼。線程池內(nèi)部使用AtomicInteger類型的clt屬性來(lái)進(jìn)行狀態(tài)控制瓜贾。其中次高三位分別表示running祭芦、shutdown、stop胃夏、tidying仰禀、teminated這5種狀態(tài)

線程池狀態(tài)圖.png

常用的方法

  1. 任務(wù)提交
public void execute(Runnable command) {
        // NPE檢查答恶,線程池不允許提交NULL任務(wù)
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get(); // 獲取當(dāng)前的clt悬嗓,AtomicInteger類型保證線程安全
        if (workerCountOf(c) < corePoolSize) { //如果當(dāng)前運(yùn)行的線程數(shù)小于核心線程數(shù)
            if (addWorker(command, true)) //如果添加核心線程數(shù)成功則方法返回
                return;
            c = ctl.get();//執(zhí)行到這里必定是添加核心線程失敗烫扼,重新讀取最新的clt
        }
        /**
         * 這里分析一下添加核心態(tài)worker失敗的幾種場(chǎng)景:
         * 1映企、線程池為shutdown以上的狀態(tài)
         * 2、當(dāng)前線程池中運(yùn)行的worker的數(shù)量超過(guò)其本身最大限制(2^29  -1 )
         * 3挤渐、當(dāng)前線程池中運(yùn)行的worker的數(shù)量超過(guò)corePoolSize
         */
        // 如果線程池處于running狀態(tài)浴麻,則將當(dāng)前提交的任務(wù)提交到內(nèi)部的阻塞隊(duì)列進(jìn)行排隊(duì)等待worker處理
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            /**
             * double check是否線程池仍在運(yùn)行中
             * 如果線程池不在running狀態(tài)則將剛才進(jìn)行排隊(duì)的任務(wù)移除软免,并拒絕此次提交的任務(wù)
             * 如果此時(shí)在線程池中運(yùn)行的worker數(shù)量減少到0(corePoolSize為0的線程池在并發(fā)的情況下會(huì)出現(xiàn)此場(chǎng)景)
             * 則添加一個(gè)不攜帶任何任務(wù)的非核心態(tài)的worker去處理剛才排隊(duì)成功的任務(wù)
             */
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//如果排隊(duì)失敻嘞簟(有界的阻塞隊(duì)列)則添加一個(gè)非核心態(tài)的worker
         //添加失旈环骸:當(dāng)前運(yùn)行的worker數(shù)量超過(guò)maximumPoolSize或者本身最大的限制噩斟;線程池狀態(tài)在shutdown以上
            reject(command);
    }
  1. 新增處理線程(worker)
private boolean addWorker(Runnable firstTask, boolean core) {
        //自旋進(jìn)行線程狀態(tài)check
        retry:
        for (;;) {
            int c = ctl.get(); //讀取最新的clt剃允,其本身具有可見性
            int rs = runStateOf(c);
            // 檢查線程池狀態(tài)是否在shutdown以上
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
            /**
             * 自旋進(jìn)行worker數(shù)量自增
             * 如果當(dāng)前新增的是核心態(tài)的worker則與corePoolSize進(jìn)行比較
             * 如果當(dāng)期新增的是非核心態(tài)的worker則與maximumPoolSize進(jìn)行比較
             * 不滿足數(shù)量限制則直接添加失敗硅急,進(jìn)入后續(xù)的排隊(duì) or 拒絕流程
             */
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                /**
                 * 通過(guò)CAS進(jìn)行worker數(shù)量+1营袜。為什么不直接調(diào)用AtomicInteger提供的incrementAndGet() 方法荚板?
                 * 因?yàn)槲覀兪切枰獙orker數(shù)量+1,而后者并不能提供單純的+1功能跪另。將c-> c+1而不是變成c -> c + N
                 */
                if (compareAndIncrementWorkerCount(c))
                    break retry; //如果CAS成功則跳出自旋
                c = ctl.get();  // 重新讀clt免绿,代碼執(zhí)行到這里意味著clt的值必定被其他線程修改擦盾,本次讀會(huì)從主存讀取最新的值到工作內(nèi)存
                if (runStateOf(c) != rs)// 如果線程池狀態(tài)發(fā)生變化(只有running狀態(tài)才接受新任務(wù)),則跳到外層循環(huán)執(zhí)行拒絕
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 代碼執(zhí)行到此處徒仓,意味著worker的數(shù)量成功+1掉弛,則可以進(jìn)行worker的構(gòu)造過(guò)程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // new 一個(gè)worker殃饿,將本次提交的任務(wù)封裝到其內(nèi)部
            w = new Worker(firstTask);
            final Thread t = w.thread; // worker內(nèi)部真正用來(lái)執(zhí)行任務(wù)的線程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                /**
                 * 進(jìn)行線程池狀態(tài)檢查乎芳,thread狀態(tài)檢查秒咐,進(jìn)行運(yùn)行的最大線程數(shù)(largestPoolSize)統(tǒng)計(jì)
                 * 將worker添加到wokrers容器(HashSet)中
                 * 修改workerAdded為true
                 */
                try {
                   ...省略此處代碼
                } finally {
                    mainLock.unlock();
                }
                //在這里workerAdded為false:thread已經(jīng)調(diào)用該start方法携取;線程池狀態(tài)為shutdown以上
                if (workerAdded) {
                    // 啟動(dòng)worker內(nèi)部的線程帮孔,其會(huì)調(diào)用worker內(nèi)部的run方法
                    t.start();
                    // 添加成功
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  1. 執(zhí)行任務(wù) (worker的工作流程)

什么是worker文兢?

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable
      Worker(Runnable firstTask) {
         setState(-1);
         this.firstTask = firstTask; //外部提交的任務(wù)
         this.thread = getThreadFactory().newThread(this); // 真實(shí)執(zhí)行任務(wù)的線程
       }

從這里我們可以看出其實(shí)際是一個(gè)Runnable姆坚,并且是AQS的子類兼呵,那么我們可以簡(jiǎn)單的猜測(cè)到其能夠進(jìn)行并發(fā)的控制(lock击喂、unlock)

  final void runWorker(Worker w) {
        //在添加worker的流程中執(zhí)行thread.start()之后真實(shí)執(zhí)行的方法
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 獲取當(dāng)前worker攜帶的任務(wù)
        w.firstTask = null;
        /**
         * 直接unlock懂昂??沸柔?在unlock之前一定要lock嗎勉失?從這里我們可以看出不一定
         */
        w.unlock(); // 修改state為0乱凿,將占用鎖的線程設(shè)為null(第一次執(zhí)行之前沒(méi)有線程占用)
        boolean completedAbruptly = true;
        try {
            // 自旋徒蟆。先執(zhí)行自己攜帶的任務(wù)段审,然后從阻塞隊(duì)列中獲取一個(gè)任務(wù)直到無(wú)法獲取任務(wù)
            while (task != null || (task = getTask()) != null) {
                // 將state修改為1,設(shè)置占有鎖的線程為自己
                w.lock();
                /**
                 * check線程池的狀態(tài)抑淫,如果狀態(tài)為stop以上(stop以上不執(zhí)行任務(wù))始苇,則中斷當(dāng)前線程
                 * 如果當(dāng)前線程已被中斷(其他線程并發(fā)的調(diào)用線程池的shutdown()或shutdownNow()方法)催式,則check線程池狀態(tài)是否為stop以上
                 * 最后如果當(dāng)前線程未被中斷則中斷當(dāng)前線程(不可能荣月!筆者還未想到此種場(chǎng)景)
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);// 空方法哺窄,留給子類實(shí)現(xiàn)
                    Throwable thrown = null;
                    try {
                        task.run(); //執(zhí)行外部提交的任務(wù)堂氯,通過(guò)try-catch來(lái)保證異常不會(huì)影響線程池本身的功能
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);// 空方法,留給子類實(shí)現(xiàn)
                    }
                } finally {
                    task = null;
                    w.completedTasks++; //已完成任務(wù)數(shù)量統(tǒng)計(jì)
                    w.unlock();
                }
            }
            // 如果執(zhí)行到這里代表非核心線程在keepAliveTime內(nèi)無(wú)法獲取任務(wù)而退出
            completedAbruptly = false;
        } finally {
            /**
             * 從上面可以看出如果實(shí)際業(yè)務(wù)(外部提交的Runnable)出現(xiàn)異常會(huì)導(dǎo)致當(dāng)前worker終止
             * completedAbruptly 此時(shí)為true意味著worker是突然完成啤握,不是正常退出
             */
            processWorkerExit(w, completedAbruptly);// 執(zhí)行worker退出收尾工作
        }
    }
  1. 獲取任務(wù)
  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 自旋獲取任務(wù)(因?yàn)槭嵌嗑€程環(huán)境)
        for (;;) {
            int c = ctl.get();// 讀取最新的clt
            int rs = runStateOf(c);
            /**
             * 1鸟缕、線程池狀態(tài)為shutdown并且任務(wù)隊(duì)列為空
             * 2、線程池狀態(tài)為stop狀態(tài)以上
             * 這2種情況直接減少worker數(shù)量,并且返回null從而保證外部獲取任務(wù)的worker進(jìn)行正常退出
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            /**
             * 1懂从、允許核心線程退出
             * 2授段、當(dāng)前的線程數(shù)量超過(guò)核心線程數(shù)
             * 這時(shí)獲取任務(wù)的機(jī)制切換為poll(keepAliveTime)
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            /**
             * 1、線程數(shù)大于maximumPoolSize(什么時(shí)候會(huì)出現(xiàn)這種情況番甩? 當(dāng)maximumPoolSize初始設(shè)置為0或者其他線程通過(guò)set方法對(duì)其進(jìn)行修改)
             * 2侵贵、線程數(shù)未超過(guò)maximumPoolSize但是timed為true(允許核心線程退出或者線程數(shù)量超過(guò)核心線程)
             * 并且上次獲取任務(wù)超時(shí)(沒(méi)獲取到任務(wù),我們推測(cè)本次依舊會(huì)超時(shí))
             * 3、在滿足條件1或者條件2的情況下進(jìn)行check:運(yùn)行線程數(shù)大于1或者任務(wù)隊(duì)列沒(méi)有任務(wù)
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) // CAS進(jìn)行worker數(shù)量-1缘薛,成功則返回null進(jìn)行worker退出流程,失敗則繼續(xù)自旋
                    return null;
                continue;
            }
            try {
                // 如果允許超時(shí)退出宴胧,則調(diào)用poll(keepAliveTime)獲取任務(wù)漱抓,否則則通過(guò)tack()一直阻塞等待直到有任務(wù)提交到隊(duì)列
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 當(dāng)?shù)却^(guò)keepAliveTime時(shí)間未獲取到任務(wù)時(shí),標(biāo)記為true恕齐。在下次自旋時(shí)會(huì)進(jìn)入銷毀流程
            } catch (InterruptedException retry) {
                // 什么時(shí)候會(huì)拋出異常乞娄?當(dāng)調(diào)用shutdown或者shutdownNow方法觸發(fā)worker內(nèi)的Thread調(diào)用interrupt方法時(shí)會(huì)執(zhí)行到此處
                timedOut = false;
            }
        }
    }
  1. 關(guān)閉線程池
  public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 利用排它鎖進(jìn)行上鎖,保證只有一個(gè)線程執(zhí)行關(guān)閉流程
        mainLock.lock();
        try {
            // 安全檢查
            checkShutdownAccess();
            // 內(nèi)部通過(guò)自旋+CAS修改線程池狀態(tài)為shutdown
            advanceRunState(SHUTDOWN);
            // 遍歷所有的worker显歧,進(jìn)行線程中斷通知
            interruptIdleWorkers();
            // 鉤子函數(shù)
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 進(jìn)行最后的整理工作
        tryTerminate();
    }
  public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        ...和shutdown類似仪或,將狀態(tài)修改為stop并返回在任務(wù)隊(duì)列排隊(duì)的任務(wù) ...
        return tasks;
    }

總結(jié)

線程池能為我們減少線程創(chuàng)建的開銷,但是相應(yīng)參數(shù)的設(shè)置需要不斷測(cè)試從而到達(dá)一個(gè)相對(duì)最優(yōu)的配置

  1. 過(guò)大的線程數(shù)可能導(dǎo)致CPU切換過(guò)于頻繁從而導(dǎo)致效率降低
  2. 過(guò)小的線程數(shù)可能導(dǎo)致CPU利用率不高
  3. 有界隊(duì)列可以防止資源耗盡士骤,但是我們需要考慮在生產(chǎn)速度大于消費(fèi)速度時(shí)提交任務(wù)帶來(lái)的拒絕問(wèn)題
  4. 無(wú)界隊(duì)列在消費(fèi)速度小于生產(chǎn)隊(duì)列時(shí)可能導(dǎo)致頻繁的GC從而降低系統(tǒng)響應(yīng)速度

以上所述都是個(gè)人學(xué)習(xí)源碼之中的一點(diǎn)心得體會(huì)范删,如果不實(shí)之處,望大家諒解和指正

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末敦间,一起剝皮案震驚了整個(gè)濱河市瓶逃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌廓块,老刑警劉巖厢绝,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異带猴,居然都是意外死亡昔汉,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門拴清,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)靶病,“玉大人,你說(shuō)我怎么就攤上這事口予÷χ埽” “怎么了?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵沪停,是天一觀的道長(zhǎng)煤辨。 經(jīng)常有香客問(wèn)我裳涛,道長(zhǎng),這世上最難降的妖魔是什么众辨? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任端三,我火速辦了婚禮,結(jié)果婚禮上鹃彻,老公的妹妹穿的比我還像新娘郊闯。我一直安慰自己,他們只是感情好蛛株,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布团赁。 她就那樣靜靜地躺著,像睡著了一般泳挥。 火紅的嫁衣襯著肌膚如雪然痊。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天屉符,我揣著相機(jī)與錄音剧浸,去河邊找鬼。 笑死矗钟,一個(gè)胖子當(dāng)著我的面吹牛唆香,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播吨艇,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼躬它,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了东涡?” 一聲冷哼從身側(cè)響起冯吓,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎疮跑,沒(méi)想到半個(gè)月后组贺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡祖娘,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年失尖,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片渐苏。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡掀潮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出琼富,到底是詐尸還是另有隱情仪吧,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布鞠眉,位于F島的核電站邑商,受9級(jí)特大地震影響摄咆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜人断,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望朝蜘。 院中可真熱鬧恶迈,春花似錦、人聲如沸谱醇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)副渴。三九已至奈附,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間煮剧,已是汗流浹背斥滤。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留勉盅,地道東北人佑颇。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像草娜,于是被迫代替她去往敵國(guó)和親挑胸。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容