java線程池源碼走讀(一)

  • execute源碼
public void execute(Runnable command) {
// 任務(wù)的具體實(shí)現(xiàn)邏輯類不能為空       
 if (command == null)
            throw new NullPointerException();
        /*
        * 1 如果當(dāng)前線程池中的線程數(shù)小于核心線程數(shù)corePoolSize垢啼,則創(chuàng)建一個(gè)新的線程它浅,不過該線程是封裝在Worker對(duì)象中
        * 2 addWorker方法中的第一個(gè)參數(shù)是該線程的第一個(gè)任務(wù),而第二個(gè)參數(shù)就是代表是否創(chuàng)建的是核心線程
        * 3 如果當(dāng)前線程池中的線程數(shù)已經(jīng)滿足了核心線程數(shù)corePoolSize罩引,那么就會(huì)通過workQueue.offer()方法將任務(wù)添加到阻塞隊(duì)列中等待執(zhí)行
        * 4 如果線程數(shù)已經(jīng)達(dá)到了corePoolSize且阻塞隊(duì)列中無法插入該任務(wù)(比如已滿)嵌器,并且沒有超過最大線程數(shù)maximumPoolSize,那么線程池就會(huì)再增加一個(gè)非核心線程來執(zhí)行該任務(wù)
        * 5 如果確實(shí)已經(jīng)達(dá)到了最大線程數(shù)层宫,那么就拒絕這個(gè)任務(wù)
        * */
        int c = ctl.get();
        // 檢查當(dāng)前線程數(shù)是否達(dá)到了核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            // 未達(dá)到核心線程數(shù)杨伙,則創(chuàng)建新核心線程
            // 并將傳入的任務(wù)作為該線程的第一個(gè)任務(wù)
            if (addWorker(command, true))
                // 添加核心線程成功則直接返回捶闸,如果沒有添加成功儡司,就該執(zhí)行上面所說的3
                return;
            // 因?yàn)榍懊嬲{(diào)用了耗時(shí)操作addWorker方法
            // 所以線程池狀態(tài)有可能發(fā)生了改變,重新獲取狀態(tài)等信息
            c = ctl.get();
        }

        // 走到這里說明當(dāng)前線程池中的線程數(shù)已經(jīng)滿足了核心線程數(shù)corePoolSize
        // 如果線程池當(dāng)前狀態(tài)是運(yùn)行中就調(diào)用workQueue.offer方法將任務(wù)放入阻塞隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) {
            // 因?yàn)檫@個(gè)放入操作比較耗時(shí)历极,所以在放入成功之后毁菱,又做了一些列的校驗(yàn)操作
            int recheck = ctl.get();
            // 如果當(dāng)前狀態(tài)變成了非運(yùn)行中(因?yàn)榫€程池的狀態(tài)只能從小到大進(jìn)行狀態(tài)遷移米死,如果不是RUNNING那么肯定是至少處于SHUTDOWN狀態(tài),從該狀態(tài)開始就不再接收新任務(wù)了)贮庞,則將剛才放入阻塞隊(duì)列的任務(wù)拿出峦筒,拿出成功后,直接拒絕這個(gè)任務(wù)
            // 疑問1
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果線程池中沒有線程了窗慎,那就創(chuàng)建一個(gè)非核心線程
            // 這里沒有看明白物喷,一個(gè)線程也沒有了卤材,為什么創(chuàng)建的不是核心線程?峦失?扇丛??
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果如果放入阻塞隊(duì)列失敵杞(如隊(duì)列已滿)晕拆,則添加一個(gè)非核心線程
        // 如果不是RUNNING 這里為什么還能新建線程呢?而疑問1的地方不是運(yùn)行態(tài)為什么要取出來拒絕材蹬,而不是跟這里一樣建一個(gè)非核心線程進(jìn)行處理?吝镣?堤器?
        // 目測只能是addWorker邏輯中有非運(yùn)行態(tài)的判斷肯定也會(huì)創(chuàng)建不成功(具體可以查看addWorker源碼注釋)
        else if (!addWorker(command, false))
            // 如果添加線程失敗(如已經(jīng)達(dá)到了最大線程數(shù))末贾,則拒絕任務(wù)
            reject(command);
    }
  • addWorker源碼
// 改方法的主要作用是新建一個(gè)線程并啟用,創(chuàng)建的線程被包裝成了Worker對(duì)象
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 1 rs > SHUTDOWN 此時(shí)不再接受新的任務(wù)闸溃,直接返回添加失敗
            // 2 rs = SHUTDOWN:firtTask != null 也會(huì)創(chuàng)建線程失敗(這里也正好印證了我們?cè)趀xecute函數(shù)注解中留下的疑問)拱撵,此時(shí)不再接受任務(wù)辉川,但是仍然會(huì)執(zhí)行隊(duì)列中的任務(wù)
            // 3 線程池SHUTDOWN了不再接受新任務(wù),但是此時(shí)隊(duì)列不為空拴测,那么還得創(chuàng)建線程把任務(wù)給執(zhí)行完才行
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            /*
            * 走都這里乓旗,說明線程池狀態(tài)或者為RUNNING,
            * 或者是SHUTDOWN狀態(tài)集索,但是任務(wù)隊(duì)列中還有任務(wù)屿愚,這個(gè)時(shí)候也是需要新建線程(當(dāng)然這個(gè)線程究竟能不能新建成功,還需要后續(xù)一系列條件的限制)去執(zhí)行完任務(wù)
            * */
            for (;;) {
                int wc = workerCountOf(c);
                /*
                * 線程數(shù)不能超過容量限制务荆;如果是創(chuàng)建核心線程妆距,也不能超過核心線程數(shù),非核心線程也不能超過設(shè)置的最大值
                * */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 原子的把線程數(shù)+1函匕,這里只是把線程數(shù)+1娱据,但是還沒有真正的創(chuàng)建工作線程
                if (compareAndIncrementWorkerCount(c))
                    break retry; // 操作成功就退出重試循環(huán)
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) // 如果線程池的狀態(tài)發(fā)生變化重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 走到這里就是真正的開始創(chuàng)建工作線程了
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 利用Worker構(gòu)造方法中的線程池工廠創(chuàng)建線程,并把線程封裝成Worker對(duì)象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 對(duì)線程池的操作都要獲取主鎖盅惜,避免添加和啟動(dòng)線程時(shí)受到干擾
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    /*
                    * 如果線程池沒有關(guān)閉中剩,或者 已經(jīng)關(guān)閉且任務(wù)不為空
                    * */
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 檢查線程是否仍然存活其實(shí)就是檢查線程是否已啟動(dòng)(線程啟動(dòng)了就是存活了)
                        // 正常邏輯這里線程應(yīng)該不是啟動(dòng)狀態(tài),只有在調(diào)用了start之后才是啟動(dòng)狀態(tài)
                        // 所以如果這里是alive就報(bào)錯(cuò)
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w); //添加到workers線程數(shù)組中
                        int s = workers.size();
                        // 整個(gè)線程池運(yùn)行期間的最大并發(fā)線程數(shù)更新
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加到線程數(shù)組成功后酷窥,才啟動(dòng)新添加的線程
                if (workerAdded) {
                    // 啟動(dòng)線程的方法是調(diào)用Thread類的start()方法咽安,該方法會(huì)在內(nèi)部調(diào)用Runnable接口的run()方法,即會(huì)調(diào)用Worker對(duì)象中的run方法蓬推,
                    // 這是因?yàn)?w = new Worker(firstTask)時(shí) 其中的newThread時(shí)把this(即Worker本身傳進(jìn)去了)而worker實(shí)現(xiàn)了Runnable接口
                    // 所以執(zhí)行start就是執(zhí)行run就是執(zhí)行Worker對(duì)象的runWorker(this)方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 啟動(dòng)失敗就回退線程的創(chuàng)建妆棒,即釋放掉線程
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我們知道一個(gè)Thread在執(zhí)行完其中的run方法之后就會(huì)退出,線程的生命周期也就結(jié)束了,那線程池中的線程是如何做到復(fù)用的呢糕珊?我們來看以下Worker類中的runWorker(this)方法动分。該方法會(huì)在線程啟動(dòng)時(shí)被調(diào)用(具體調(diào)用邏輯請(qǐng)看addWorker的源碼注釋)

  • runWorker(this)源碼分析
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //在構(gòu)造Worker對(duì)象的時(shí)候,會(huì)把一個(gè)任務(wù)添加進(jìn)Worker對(duì)象中作為新增線程的第一個(gè)任務(wù)來執(zhí)行
        Runnable task = w.firstTask;
        //已經(jīng)將該任務(wù)拿出來進(jìn)行執(zhí)行红选,則需要將該worker對(duì)象 即線程池中的線程對(duì)象持有的任務(wù)清空
        w.firstTask = null;
        //將AQS鎖資源的狀態(tài)有-1變成0澜公,運(yùn)行該線程允許進(jìn)行中斷
        w.unlock(); // allow interrupts
        //用來判斷執(zhí)行任務(wù)的過程中,是否出現(xiàn)了異常喇肋,默認(rèn)是異常退出
        boolean completedAbruptly = true;
        try {
            // 這里先大概說一下getTask的大概邏輯坟乾,核心線程會(huì)一直阻塞等待任務(wù)隊(duì)列中有任務(wù)
            // 非核心線程會(huì)在等待keepAliveTime的超時(shí)時(shí)間之后,如果任務(wù)隊(duì)列中還沒有任務(wù)就會(huì)獲取到null
            // 然后退出whie循環(huán)蝶防,進(jìn)入到finally中 執(zhí)行processWorkerExit結(jié)束調(diào)線程
            while (task != null || (task = getTask()) != null) {
                //給該線程加鎖甚侣,一個(gè)線程只處理一個(gè)任務(wù)
                w.lock();
                // 線程池是否是STOP狀態(tài)
                // 如果是,則確保當(dāng)前線程是中斷狀態(tài)
                // 如果不是间学,則確保當(dāng)前線程不是 中斷狀態(tài)
                // 中斷對(duì)正在運(yùn)行的線程不起作用殷费,只對(duì)阻塞的線程起作用,這里具體怎么用低葫?详羡??嘿悬?实柠??
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //擴(kuò)展使用鹊漠,在執(zhí)行任務(wù)的run方法之前執(zhí)行主到;前置鉤子
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執(zhí)行run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //  后置鉤子
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 正常執(zhí)行完任務(wù)
            completedAbruptly = false;
        } finally {
            //所有的任務(wù)都處理完后,或者執(zhí)行任務(wù)的過程中出現(xiàn)了異常
            processWorkerExit(w, completedAbruptly);
        }
    }
  • getTask()源碼 核心線程池可以復(fù)用的原理就在這個(gè)函數(shù)里面
private Runnable getTask() {

       // 標(biāo)記是否獲取任務(wù)超時(shí)
       boolean timedOut = false; // Did the last poll() time out?

       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);

           // Check if queue empty only if necessary.
           /*
           * 如果線程池的當(dāng)前狀態(tài)為>=關(guān)閉的狀態(tài) 并且 (已停止或者任務(wù)隊(duì)列中為空)
           * */
           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
               decrementWorkerCount(); //Worker總數(shù)減1躯概,返回null 外層的runWorker就會(huì)退出循環(huán)登钥,進(jìn)入銷毀線程的邏輯
               return null;
           }

           int wc = workerCountOf(c);

           // Are workers subject to culling?
           // allowCoreThreadTimeOut:是否允許core Thread超時(shí),默認(rèn)false
           // workerCount是否大于核心核心線程池

           // 該標(biāo)志的含義是:是否需要等待keepAliveTime時(shí)間再去隊(duì)列中獲取
           // 也就是說如果隊(duì)列中沒有任務(wù)娶靡,可以等待keepAliveTime時(shí)長牧牢,如果還沒有任務(wù)的話,那么則返回null
           boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

           // 如果設(shè)置了allowCoreThreadTimeOut姿锭,那么大于coreThread(核心線程)就會(huì)有會(huì)有超時(shí)時(shí)間(即keepAliveTime)的限制塔鳍,
           // 即如果當(dāng)前的WorkerCount大于corePoolSize的話,那么超過的這些線程都會(huì)有超時(shí)的限制
           //1如果線程數(shù)超過最大限制呻此,直接線程數(shù)-1轮纫,然后返回null(該線程就會(huì)銷毀)
           //2如果啟用了非核心線程超時(shí)設(shè)置,那么當(dāng)獲取任務(wù)超時(shí)并且線程池中有多于1個(gè)線程焚鲜,就返回null并回收此線程
           //3如果啟用了非核心線程超時(shí)設(shè)置掌唾,那么當(dāng)獲取任務(wù)超時(shí) 并且任務(wù)隊(duì)列為空放前,就減1個(gè)線程(如果只有一個(gè)線程就會(huì)減失敗)
           // 仍熱會(huì)繼續(xù)該循環(huán)糯彬,也就是說線程池中始終會(huì)至少有一個(gè)線程
           if ((wc > maximumPoolSize || (timed && timedOut))
               && (wc > 1 || workQueue.isEmpty())) {
               if (compareAndDecrementWorkerCount(c))
                   return null;
               continue;
           }

           try {
               
               // 如果啟用了非核心線程超時(shí)設(shè)置凭语,就使用poll獲取任務(wù),如果隊(duì)列中沒任務(wù)撩扒,該方法會(huì)獲取超時(shí)似扔,就會(huì)在下次循環(huán)中返回null
               // 核心線程會(huì)使用take方法獲取任務(wù),take方法在任務(wù)隊(duì)列中沒有任務(wù)時(shí)搓谆,會(huì)阻塞炒辉。
               Runnable r = timed ?
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   workQueue.take();
               if (r != null)
                   //判斷任務(wù)不為空返回任務(wù)
                   return r;
               //獲取一段時(shí)間沒有獲取到,獲取超時(shí)
               timedOut = true;
           } catch (InterruptedException retry) {
               timedOut = false;
           }
       }
   }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末泉手,一起剝皮案震驚了整個(gè)濱河市辆脸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌螃诅,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件状囱,死亡現(xiàn)場離奇詭異术裸,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)亭枷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門袭艺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人叨粘,你說我怎么就攤上這事猾编。” “怎么了升敲?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵答倡,是天一觀的道長。 經(jīng)常有香客問我驴党,道長瘪撇,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任港庄,我火速辦了婚禮倔既,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鹏氧。我一直安慰自己渤涌,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布把还。 她就那樣靜靜地躺著实蓬,像睡著了一般茸俭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瞳秽,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天瓣履,我揣著相機(jī)與錄音,去河邊找鬼练俐。 笑死袖迎,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的腺晾。 我是一名探鬼主播燕锥,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼悯蝉!你這毒婦竟也來了归形?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤鼻由,失蹤者是張志新(化名)和其女友劉穎暇榴,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蕉世,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蔼紧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了狠轻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片奸例。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖向楼,靈堂內(nèi)的尸體忽然破棺而出查吊,到底是詐尸還是另有隱情,我是刑警寧澤湖蜕,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布逻卖,位于F島的核電站,受9級(jí)特大地震影響重荠,放射性物質(zhì)發(fā)生泄漏箭阶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一戈鲁、第九天 我趴在偏房一處隱蔽的房頂上張望仇参。 院中可真熱鬧,春花似錦婆殿、人聲如沸诈乒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽怕磨。三九已至喂饥,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肠鲫,已是汗流浹背员帮。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留导饲,地道東北人捞高。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像渣锦,于是被迫代替她去往敵國和親硝岗。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容