細(xì)說(shuō)線程池---高級(jí)篇

線程源碼分析

上一篇中已經(jīng)講了線程池的原理讯柔。這一次來(lái)說(shuō)說(shuō)源碼執(zhí)行過(guò)程孵延。建議先看看細(xì)說(shuō)線程池---入門(mén)篇 細(xì)說(shuō)線程池---中級(jí)篇

依然使用newFixedThreadPool()方法創(chuàng)建線程池女淑。

看源碼從execute(Runnable runable)開(kāi)始扯再。

  public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       int c = ctl.get();
       ///1.當(dāng)前池中線程比核心數(shù)少焊唬,新建一個(gè)線程執(zhí)行任務(wù)
       //workerCountOf計(jì)算出線程個(gè)數(shù)
       if (workerCountOf(c) < corePoolSize) {
           //線程池中創(chuàng)建一個(gè)線程worker
           if (addWorker(command, true))
               return;
           c = ctl.get();
      }
       ////2.核心池已滿,但任務(wù)隊(duì)列未滿,添加到隊(duì)列中
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           //任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程缎谷,
           // 因?yàn)橐汛嬖诘木€程可能被銷毀了
           if (! isRunning(recheck) && remove(command))
               ////如果線程池處于非運(yùn)行狀態(tài),
               // 并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中
               //移除成功灶似,則拒絕該任務(wù)
               reject(command);
           ////如果之前的線程已被銷毀完列林,新建一個(gè)線程
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
      }
       ////3.核心池已滿瑞你,隊(duì)列已滿,試著創(chuàng)建一個(gè)新線程
       else if (!addWorker(command, false))
           ////如果創(chuàng)建新線程失敗了希痴,說(shuō)明線程池被關(guān)閉或者線程池完全滿了者甲, 拒絕任務(wù)
           reject(command);
  } 

ctl 的作用

在線程池中,ctl 貫穿在線程池的整個(gè)生命周期中

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

它是一個(gè)原子類砌创,主要作用是用來(lái)保存線程數(shù)量和線程池的狀態(tài)虏缸。我們來(lái)分析一下這段代碼,其實(shí)比較有意思嫩实,他用到了位運(yùn)算一個(gè) int 數(shù)值是 32 個(gè) bit 位刽辙,這里采用高 3 位來(lái)保存運(yùn)行狀態(tài),低 29 位來(lái)保存線程數(shù)量舶赔。我們來(lái)分析默認(rèn)情況下扫倡,也就是 ctlOf(RUNNING)運(yùn)行狀態(tài),調(diào)用了 ctlOf(int rs,int wc)方法竟纳;其中

private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 RUNNING =-1 << COUNT_BITS 撵溃;-1 左移 29 位,

-1 的二進(jìn)制是 32 個(gè) 1(1111 1111 11111111 1111 1111 1111 1111)锥累;</pre>

-1 的二進(jìn)制計(jì)算方法原碼是 1000…001 . 高位 1 表示符號(hào)位缘挑。然后對(duì)原碼取反,高位不變得到 1111…110然后對(duì)反碼進(jìn)行+1 桶略,也就是補(bǔ)碼操作语淘, 最后得到 1111…1111

那么-1 <<左移 29 位, 也就是 【111】 表示际歼;rs | wc 惶翻。二進(jìn)制的 111 | 000 。

得到的結(jié)果仍然是 111鹅心。

那么同理可得其他的狀態(tài)的 bit 位表示

//32-3
private static final int COUNT_BITS = Integer.SIZE - 3;
//將 1 的二進(jìn)制向右位移 29 位,再減 1 表示最大線程容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 運(yùn)行狀態(tài)保存在 int 值的高 3 位 ( 所有數(shù)值左移 29 位 )
// 接收新任務(wù),并執(zhí)行隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
// 不接收新任務(wù),但是執(zhí)行隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接收新任務(wù),不執(zhí)行隊(duì)列中的任務(wù),中斷正在執(zhí)行中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
// 所有的任務(wù)都已結(jié)束,線程數(shù)量為 0,處于該狀態(tài)的線程池即將調(diào)用 terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法執(zhí)行完成
private static final int TERMINATED = 3 << COUNT_BITS;</pre>

線程池狀態(tài)變化圖

image

addWorker

再回到上面源碼中吕粗,當(dāng)線程池中線程數(shù)小于核心線程數(shù)的時(shí)候:會(huì)調(diào)用 addWorker,顧名思義旭愧,其實(shí)就是要?jiǎng)?chuàng)建一個(gè)工作線程颅筋。我們來(lái)看看源碼的實(shí)現(xiàn)源碼比較長(zhǎng),看起來(lái)比較唬人输枯,其實(shí)就做了兩件事议泵。

  1. 才用循環(huán) CAS操作來(lái)將線程數(shù)加 1

  2. 新建一個(gè)線程并啟用

private boolean addWorker(Runnable firstTask, boolean core) {
       //goto 語(yǔ)句,避免死循環(huán)
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);
           //如果線程處于非運(yùn)行狀態(tài),并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
          // workQueue 為空桃熄,直接返回 false (表示不可添加 work 狀態(tài))
         // 1\. 線程池已經(jīng) shutdown 后先口,還要添加新的任務(wù),拒絕
          // 2\. (第二個(gè)判斷) SHUTDOWN 狀態(tài)不接受新任務(wù),但仍然會(huì)執(zhí)行已經(jīng)加入任務(wù)隊(duì)列的任
          // 務(wù)池充,所以當(dāng)進(jìn)入 SHUTDOWN 狀態(tài)桩引,而傳進(jìn)來(lái)的任務(wù)為空,并且任務(wù)隊(duì)列不為空的時(shí)候收夸,是允許添加
          // 新線程的 , 如果把這個(gè)條件取反,就表示不允許添加 worker
           if (rs >= SHUTDOWN &&
                   ! (rs == SHUTDOWN &&
                           firstTask == null &&
                           ! workQueue.isEmpty()))
               return false;
           for (;;) { //自旋
               int wc = workerCountOf(c);//獲得 Worker 工作線程數(shù)
               //如果工作線程數(shù)大于默認(rèn)容量大小或者大于核心線程數(shù)大小血崭,
               // 則直接返回 false 表示不能再添加 worker卧惜。
               if (wc >= CAPACITY ||
                       wc >= (core ? corePoolSize : maximumPoolSize))
                   return false;
               //通過(guò) cas 來(lái)增加工作線程數(shù),
               if (compareAndIncrementWorkerCount(c))
               //如果 cas 失敗夹纫,則直接重試
               break retry;
               // 再次獲取 ctl 的值
               c = ctl.get();
               //這里如果不想等咽瓷,說(shuō)明線程的狀態(tài)發(fā)生了變化,繼續(xù)重試
               if (runStateOf(c) != rs)
               continue retry;
          }
      }
       //上面這段代碼主要是對(duì) worker 數(shù)量做原子+1 操作,
       // 下面的邏輯才是正式構(gòu)建一個(gè) worker
       boolean workerStarted = false; //工作線程是否啟動(dòng)的標(biāo)識(shí)
       boolean workerAdded = false; //工作線程是否已經(jīng)添加成功的標(biāo)識(shí)
       Worker w = null;
       try {
           //構(gòu)建一個(gè) Worker舰讹,這個(gè) worker 是什么呢茅姜?
           //我們 可以看到構(gòu)造方法里面?zhèn)魅肓艘粋€(gè) Runnable 對(duì)象
           w = new Worker(firstTask);
           final Thread t = w.thread; //從 worker 對(duì)象中取出線程
           if (t != null) {
               final ReentrantLock mainLock = this.mainLock;
               mainLock.lock(); //這里有個(gè)重入鎖,避免并發(fā)問(wèn)題
               try {
                   int rs = runStateOf(ctl.get());
                  //只有當(dāng)前線程池是正在運(yùn)行狀態(tài)月匣,[或是 SHUTDOWN
                   // 且 firstTask 為空]钻洒,才能添加到 workers 集合中
                   if (rs < SHUTDOWN ||
                          (rs == SHUTDOWN && firstTask == null)) {
                   //任務(wù)剛封裝到 work 里面,還沒(méi) start,你封裝的線程就是 alive锄开,
                   // 幾個(gè)意思素标?肯定是要拋異常出去的

                       if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w); //將新創(chuàng)建的 Worker 添加到 workers 集合中
                       int s = workers.size();
                     //如果集合中的工作線程數(shù)大于最大線程數(shù),
                     // 這個(gè)最大線程數(shù)表示線程池曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù)
                       if (s > largestPoolSize)
                           largestPoolSize = s; //更新線程池出現(xiàn)過(guò)的最大線程數(shù)
                       workerAdded = true;//表示工作線程創(chuàng)建成功了
                  }
              } finally {
                   mainLock.unlock(); //釋放鎖
              }
               if (workerAdded) {//如果 worker 添加成功
                   t.start();//啟動(dòng)線程
                   workerStarted = true;
              }
          }
      } finally {
           if (! workerStarted)
               //如果添加失敗萍悴,就需要做一件事头遭,就是遞減實(shí)際工作線
               //程數(shù)(還記得我們最開(kāi)始的時(shí)候增加了工作線程數(shù)嗎)
               addWorkerFailed(w);

      }
       //返回結(jié)果
       return workerStarted;
  }

Worker說(shuō)明

我們發(fā)現(xiàn) addWorker 方法只是構(gòu)造了一個(gè) Worker,并且把 firstTask 封裝到 worker 中癣诱,它是做什么的呢计维?我們來(lái)看看

  1. 每個(gè) worker,都是一條線程,同時(shí)里面包含了一個(gè) firstTask,即初始化時(shí)要被首先執(zhí)行的任務(wù).

  2. 最終執(zhí)行任務(wù)的,是 runWorker()方法Worker 類繼承了 AQS,并實(shí)現(xiàn)了 Runnable 接口撕予,注意其中的 firstTask 和 thread 屬性:firstTask 用它來(lái)保存?zhèn)魅氲娜蝿?wù)鲫惶;thread 是在調(diào)用構(gòu)造方法時(shí)通過(guò) ThreadFactory 來(lái)創(chuàng)建的線程,是用來(lái)處理任務(wù)的線程嗅蔬。

在調(diào)用構(gòu)造方法時(shí)剑按,需要傳入任務(wù),這里通過(guò) getThreadFactory().newThread(this);來(lái)新建一個(gè)線程澜术,newThread 方法傳入的參數(shù)是 this艺蝴,因?yàn)?Worker 本身繼承了 Runnable 接口,也就是一個(gè)線程鸟废,所以一個(gè) Worker 對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用 Worker 類中的 run 方法猜敢。Worker 繼承了 AQS,使用 AQS 來(lái)實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用 ReentrantLock 來(lái)實(shí)現(xiàn)呢缩擂?可以看到 tryAcquire 方法鼠冕,它是不允許重入的,而 ReentrantLock 是允許重入的:lock 方法一旦獲取了獨(dú)占鎖胯盯,表示當(dāng)前線程正在執(zhí)行任務(wù)中懈费;那么它會(huì)有以下幾個(gè)作用

  1. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程博脑;

  2. 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài)憎乙,也就是空閑的狀態(tài),說(shuō)明它沒(méi)有在處理任務(wù)叉趣,這時(shí)可以對(duì)該線程進(jìn)行中斷泞边;

  3. 線程池在執(zhí)行 shutdown 方法或 tryTerminate 方法時(shí)會(huì)調(diào)用 interruptIdleWorkers 方法來(lái)中斷空閑的線程,interruptIdleWorkers 方法會(huì)使用 tryLock 方法來(lái)判斷線程池中的線程是否是空閑狀態(tài)

  4. 之所以設(shè)置為不可重入疗杉,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像 setCorePoolSize 這樣的線程池控制方法時(shí)重新獲取鎖阵谚,這樣會(huì)中斷正在運(yùn)行的線程

 //繼承了AQS還實(shí)現(xiàn)了Runnable
private final class Worker
           extends AbstractQueuedSynchronizer
           implements Runnable
  {
       /**
        * This class will never be serialized, but we provide a
        * serialVersionUID to suppress a javac warning.
        */
       private static final long serialVersionUID = 6138294804551838833L;

       //注意了,這才是真正執(zhí)行task的線程烟具,從構(gòu)造函數(shù)可知是由
        //ThreadFactury 創(chuàng)建的
       final Thread thread;
       //這就是需要執(zhí)行的 task
       Runnable firstTask;
       /** 每個(gè)線程完成任務(wù)的計(jì)數(shù)器*/
       volatile long completedTasks;

       /**
        * Creates with given first task and thread from ThreadFactory.
        * @param firstTask the first task (null if none)
        */
       Worker(Runnable firstTask) {
           //// 初始狀態(tài) -1, 防止在調(diào)用 runWorker() 梢什,
           // 也就是真正執(zhí)行task前中斷thread 。
           setState(-1);  
           this.firstTask = firstTask;
           this.thread = getThreadFactory().newThread(this);
      }

       /** Delegates main run loop to outer runWorker */
       public void run() {
           runWorker(this);
      }

       // Lock methods
       //
       // The value 0 represents the unlocked state.
       // The value 1 represents the locked state.
       protected boolean isHeldExclusively() {
           return getState() != 0;
      }

       protected boolean tryAcquire(int unused) {
           if (compareAndSetState(0, 1)) {
               setExclusiveOwnerThread(Thread.currentThread());
               return true;
          }
           return false;
      }

       protected boolean tryRelease(int unused) {
           setExclusiveOwnerThread(null);
           setState(0);
           return true;
      }

       public void lock()       { acquire(1); }
       public boolean tryLock() { return tryAcquire(1); }
       public void unlock()     { release(1); }
       public boolean isLocked() { return isHeldExclusively(); }

       void interruptIfStarted() {
           Thread t;
           if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
               try {
                   t.interrupt();
              } catch (SecurityException ignore) {
              }
          }
      }
  } 

addWorkerFailed方法

addWorker方法中净赴,如果添加 Worker 并且啟動(dòng)線程失敗绳矩,則會(huì)做失敗后的處理。這個(gè)方法主要做兩件事

  1. 如果 worker 已經(jīng)構(gòu)造好了玖翅,則從 workers 集合中移除這個(gè) worker

  2. 原子遞減核心線程數(shù)(因?yàn)樵?addWorker 方法中先做了原子增加)

  3. 嘗試結(jié)束線程池

  private void addWorkerFailed(java.util.concurrent.ThreadPoolExecutor.Worker w) {
       final ReentrantLock mainLock = this.mainLock;
       //上鎖
       mainLock.lock();
       try {
           if (w != null)
               workers.remove(w);
           decrementWorkerCount();
           tryTerminate();
      } finally {
           //釋放鎖
           mainLock.unlock();
      }
  }</pre>

runWorker 方法

前面已經(jīng)了解了 ThreadPoolExecutor的核心方法 addWorker翼馆,主要作用是增加工作線程,而 Worker 簡(jiǎn)單理解其實(shí)就是一個(gè)線程金度,里面重新了 run 方法应媚,這塊是線程池中執(zhí)行任務(wù)的真正處理邏輯,也就是 runWorker方法猜极,這個(gè)方法主要做幾件事:

  1. 如果 task 不為空,則開(kāi)始執(zhí)行 task

  2. 如果 task 為空,則通過(guò) getTask()再去取任務(wù),并賦值給 task,如果取到的 Runnable 不為空,則執(zhí)行該任務(wù)

  3. 執(zhí)行完畢后,通過(guò) while 循環(huán)繼續(xù) getTask()取任務(wù)

  4. 如果 getTask()取到的任務(wù)依然是空,那么整個(gè)runWorker()方法執(zhí)行完畢

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末中姜,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子跟伏,更是在濱河造成了極大的恐慌丢胚,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,640評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件受扳,死亡現(xiàn)場(chǎng)離奇詭異携龟,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)勘高,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)峡蟋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)坟桅,“玉大人,你說(shuō)我怎么就攤上這事蕊蝗〗雠遥” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,011評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵蓬戚,是天一觀的道長(zhǎng)夸楣。 經(jīng)常有香客問(wèn)我,道長(zhǎng)子漩,這世上最難降的妖魔是什么裕偿? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,755評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮痛单,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘劲腿。我一直安慰自己旭绒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,774評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布焦人。 她就那樣靜靜地躺著挥吵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪花椭。 梳的紋絲不亂的頭發(fā)上忽匈,一...
    開(kāi)封第一講書(shū)人閱讀 51,610評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音矿辽,去河邊找鬼丹允。 笑死,一個(gè)胖子當(dāng)著我的面吹牛袋倔,可吹牛的內(nèi)容都是我干的雕蔽。 我是一名探鬼主播,決...
    沈念sama閱讀 40,352評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼宾娜,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼批狐!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起前塔,我...
    開(kāi)封第一講書(shū)人閱讀 39,257評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤嚣艇,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后华弓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體食零,經(jīng)...
    沈念sama閱讀 45,717評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,894評(píng)論 3 336
  • 正文 我和宋清朗相戀三年该抒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了慌洪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片顶燕。...
    茶點(diǎn)故事閱讀 40,021評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖冈爹,靈堂內(nèi)的尸體忽然破棺而出涌攻,到底是詐尸還是另有隱情,我是刑警寧澤频伤,帶...
    沈念sama閱讀 35,735評(píng)論 5 346
  • 正文 年R本政府宣布恳谎,位于F島的核電站,受9級(jí)特大地震影響憋肖,放射性物質(zhì)發(fā)生泄漏因痛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,354評(píng)論 3 330
  • 文/蒙蒙 一岸更、第九天 我趴在偏房一處隱蔽的房頂上張望鸵膏。 院中可真熱鬧,春花似錦怎炊、人聲如沸谭企。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,936評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)债查。三九已至,卻和暖如春瓜挽,著一層夾襖步出監(jiān)牢的瞬間盹廷,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,054評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工久橙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留俄占,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,224評(píng)論 3 371
  • 正文 我出身青樓剥汤,卻偏偏與公主長(zhǎng)得像颠放,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子吭敢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,974評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容