ThreadPoolExecutor詳解

???????ThreadPoolExecutor顧名思義江耀,是一個(gè)線程池管理工具類萨咳,該類主要提供了任務(wù)管理截粗,線程的調(diào)度和相關(guān)的hook方法來控制線程池的狀態(tài)垮刹。

1.方法說明

任務(wù)管理主要方法如下:

public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public <T> Future<T> submit(Runnable task, T result);
public Future<?> submit(Runnable task);
public void shutdown();
public List<Runnable> shutdownNow();

???????上述方法中达吞,execute()和submit()方法在有空閑線程存在的情況下會(huì)立即調(diào)用該線程執(zhí)行任務(wù),區(qū)別在于execute()方法是忽略任務(wù)執(zhí)行結(jié)果的荒典,而submit()方法則可以獲取結(jié)果酪劫。除此之外,ThreadPoolExecutor還提供了shutdown()和shutdownNow()方法用于關(guān)閉線程池寺董,區(qū)別在于shutdown()方法在調(diào)用之后會(huì)將任務(wù)隊(duì)列中的任務(wù)都執(zhí)行完畢之后再關(guān)閉線程池覆糟,而shutdownNow()方法則會(huì)直接關(guān)閉線程池,并且將任務(wù)隊(duì)列中的任務(wù)導(dǎo)出到一個(gè)列表中返回遮咖。

???????除上述用于執(zhí)行任務(wù)的方法外滩字,ThreadPoolExecutor還提供了如下幾個(gè)hook(鉤子)方法:

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();

???????在ThreadPoolExecutor中這幾個(gè)方法默認(rèn)都是空方法,beforeExecute()會(huì)在每次任務(wù)執(zhí)行之前調(diào)用,afterExecute()會(huì)在每次任務(wù)結(jié)束之后調(diào)用麦箍,terminated()方法則會(huì)在線程池被終止時(shí)調(diào)用酗电。使用這幾個(gè)方法的方式就是聲明一個(gè)子類繼承ThreadPoolExecutor,并且在子類中重寫需要定制的鉤子方法内列,最后在創(chuàng)建線程池時(shí)使用該子類實(shí)例即可。

2.任務(wù)調(diào)度

a.相關(guān)參數(shù)

???????對(duì)于ThreadPoolExecutor的實(shí)例化背率,其主要有如下幾個(gè)重要的參數(shù):

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit, BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, RejectedExecutionHandler handler);
  • corePoolSize: 線程池核心線程的數(shù)量话瞧;
  • maximumPoolSize: 線程池可創(chuàng)建的最大線程數(shù)量;
  • keepAliveTime: 當(dāng)線程數(shù)量超過了corePoolSize指定的線程數(shù)寝姿,并且空閑線程空閑的時(shí)間達(dá)到當(dāng)前參數(shù)指定的時(shí)間時(shí)該線程就會(huì)被銷毀交排,如果調(diào)用過allowCoreThreadTimeOut(boolean value)方法允許核心線程過期,那么該策略針對(duì)核心線程也是生效的饵筑;
  • unit: 指定了keepAliveTime的單位埃篓,可以為毫秒,秒根资,分架专,小時(shí)等;
  • workQueue: 存儲(chǔ)未執(zhí)行的任務(wù)的隊(duì)列玄帕;
  • threadFactory: 創(chuàng)建線程的工廠部脚,如果未指定則使用默認(rèn)的線程工廠;
  • handler: 指定了當(dāng)任務(wù)隊(duì)列已滿裤纹,并且沒有可用線程執(zhí)行任務(wù)時(shí)對(duì)新添加的任務(wù)的處理策略委刘;
b.調(diào)度策略

???????當(dāng)初始化一個(gè)線程池之后,池中是沒有任何用戶執(zhí)行任務(wù)的活躍線程的鹰椒,當(dāng)新的任務(wù)到來時(shí)锡移,根據(jù)配置的參數(shù)其主要的執(zhí)行任務(wù)如下:

  • 若線程池中線程數(shù)小于corePoolSize指定的線程數(shù)時(shí),每來一個(gè)任務(wù)漆际,都會(huì)創(chuàng)建一個(gè)新的線程執(zhí)行該任務(wù)淆珊,無論線程池中是否已有空閑的線程;
  • 若當(dāng)前執(zhí)行的任務(wù)達(dá)到了corePoolSize指定的線程數(shù)時(shí)奸汇,也即所有的核心線程都在執(zhí)行任務(wù)時(shí)套蒂,此時(shí)來的新任務(wù)會(huì)保存在workQueue指定的任務(wù)隊(duì)列中;
  • 當(dāng)所有的核心線程都在執(zhí)行任務(wù)茫蛹,并且任務(wù)隊(duì)列中存滿了任務(wù)操刀,此時(shí)若新來了任務(wù),那么線程池將會(huì)創(chuàng)建新線程執(zhí)行任務(wù)婴洼;
  • 若所有的線程(maximumPoolSize指定的線程數(shù))都在執(zhí)行任務(wù)骨坑,并且任務(wù)隊(duì)列也存滿了任務(wù)時(shí),對(duì)于新添加的任務(wù),其都會(huì)使用handler所指定的方式對(duì)其進(jìn)行處理欢唾。
c.調(diào)度策略注意點(diǎn)
  • 在第二步中且警,當(dāng)前核心線程都在執(zhí)行任務(wù),并且任務(wù)隊(duì)列已滿時(shí)礁遣,會(huì)創(chuàng)建新的線程執(zhí)行任務(wù)斑芜,這里需要注意的是,創(chuàng)建新線程的時(shí)候當(dāng)前總共需要執(zhí)行的任務(wù)數(shù)是(corePoolSize + workQueueSize)祟霍,并不是只有corePoolSize個(gè)任務(wù)杏头;
  • 在第三步中,這里workQueue主要有三種類型:ArrayBlockingQueue沸呐、LinkedBlockingQueue醇王、SynchronousQueue,第一個(gè)是有界阻塞隊(duì)列崭添,第二個(gè)是無界阻塞隊(duì)列寓娩,當(dāng)然也可以為其指定界限大小,第三個(gè)是同步隊(duì)列呼渣,對(duì)于ArrayBlockingQueue棘伴,其是需要指定隊(duì)列大小的,當(dāng)隊(duì)列存滿了任務(wù)線程池就會(huì)創(chuàng)建新的線程執(zhí)行任務(wù)屁置,對(duì)于LinkedBlockingQueue排嫌,如果其指定界限,那么和ArrayBlockingQueue區(qū)別不大缰犁,如果其不指定界限淳地,那么其理論上是可以存儲(chǔ)無限量的任務(wù)的,實(shí)際上能夠存儲(chǔ)Integer.MAX_VALUE個(gè)任務(wù)(還是相當(dāng)于可以存儲(chǔ)無限量的任務(wù))帅容,此時(shí)由于LinkedBlockingQueue是永遠(yuǎn)無法存滿任務(wù)的颇象,因而maxPoolSize的設(shè)定將沒有意義,一般其會(huì)設(shè)定為和corePoolSize相同的值并徘,對(duì)于SynchronousQueue遣钳,其內(nèi)部是沒有任何結(jié)構(gòu)存儲(chǔ)任務(wù)的,當(dāng)一個(gè)任務(wù)添加到該隊(duì)列時(shí)麦乞,當(dāng)前線程和后續(xù)添加任務(wù)的線程都會(huì)被阻塞蕴茴,直至有一個(gè)線程從該隊(duì)列中取出任務(wù),當(dāng)前線程才會(huì)被釋放姐直,因而如果線程池使用了該隊(duì)列倦淀,那么一般corePoolSize都會(huì)設(shè)計(jì)得比較小,maxPoolSize會(huì)設(shè)計(jì)得比較大声畏,因?yàn)樵撽?duì)列比較適合大量并且執(zhí)行時(shí)間較短的任務(wù)的執(zhí)行撞叽;
  • 在第四步中姻成,DiscardPolicy和DiscardOldestPolicy一般不會(huì)配合SynchronousQueue使用,因?yàn)楫?dāng)同步隊(duì)列阻塞了任務(wù)時(shí)愿棋,該任務(wù)都會(huì)被拋棄科展;對(duì)于AbortPolicy,因?yàn)槿绻?duì)列已滿糠雨,那么其會(huì)拋出異常才睹,因而使用時(shí)需要小心;對(duì)于CallerRunsPolicy甘邀,由于當(dāng)有新的任務(wù)到達(dá)時(shí)會(huì)使用調(diào)用線程執(zhí)行當(dāng)前任務(wù)琅攘,因而使用時(shí)需要考慮其對(duì)服務(wù)器響應(yīng)的影響,并且還需要注意的是鹃答,相對(duì)于其他幾個(gè)策略,該策略不會(huì)拋棄任務(wù)到達(dá)的任務(wù)突硝,因?yàn)槿绻竭_(dá)的任務(wù)使隊(duì)列滿了而只能使用調(diào)用線程執(zhí)行任務(wù)時(shí)测摔,說明線程池設(shè)計(jì)得不夠合理,如果任其發(fā)展解恰,那么所有的調(diào)用線程都可能會(huì)被需要執(zhí)行的任務(wù)所阻塞锋八,導(dǎo)致服務(wù)器出現(xiàn)問題。

3.源碼講解

a.主要屬性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 00011111 11111111 11111111 11111111

private static final int RUNNING    = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS; // 00100000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS; // 01100000 00000000 00000000 00000000

???????由于ThreadPoolExecutor需要管理多種狀態(tài)护盈,并且還要記錄當(dāng)前執(zhí)行任務(wù)的線程的數(shù)量挟纱,如果使用多個(gè)變量,并發(fā)更新時(shí)管理將會(huì)非常復(fù)雜腐宋,這里ThreadPoolExecutor則主要使用一個(gè)AtomicInteger類型的變量ctl存儲(chǔ)所有主要的信息紊服。ctl是一個(gè)32位的整形數(shù)字,初始值為0胸竞,其最高的三位用于存儲(chǔ)當(dāng)前線程池的狀態(tài)信息欺嗤,主要有RUNNING,SHUTDOWN卫枝,STOP煎饼,TIDING和TERMINATED,分別表示運(yùn)行狀態(tài)校赤,關(guān)閉狀態(tài)吆玖,終止?fàn)顟B(tài),整理狀態(tài)和結(jié)束狀態(tài)马篮。這幾種狀態(tài)對(duì)應(yīng)的具體數(shù)值信息如上述代碼所示沾乘,這里需要注意的一點(diǎn)是,在ThreadPoolExecutor中浑测,這幾種狀態(tài)在數(shù)值上是從小到大依次增大的意鲸,并且狀態(tài)流轉(zhuǎn)也是依次往下的,這就為其判斷狀態(tài)信息提供了比較便利的方式,如當(dāng)需要判斷線程池狀態(tài)是否處于SHUTDOWN狀態(tài)時(shí)怎顾,只需要判斷其代表狀態(tài)位部分的值是否等于SHUTDOWN即可读慎。在ctl中,除了最高三位用于表示狀態(tài)外槐雾,其余位所代表的數(shù)值則指定了當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)夭委。如下是操作ctl屬性的相關(guān)方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
  return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}

private static boolean isRunning(int c) {
  return c < SHUTDOWN;
}

private boolean compareAndIncrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect - 1);
}
  • runStateOf(int c): 用于獲取當(dāng)前線程池的狀態(tài),c為當(dāng)前線程池工作時(shí)的ctl屬性值募强;
  • workerCountOf(int c): 用于獲取當(dāng)前線程池正在工作的線程數(shù)量株灸,c為當(dāng)前線程池工作時(shí)的ctl屬性值;
  • ctlOf(int rs, int wc): 這里rs表示當(dāng)前線程的工作狀態(tài)擎值,wc則表示正在工作的線程數(shù)慌烧,該方法用于將這兩個(gè)參數(shù)組裝為一個(gè)ctl屬性值;
  • runStateLessThan(int c, int s): 判斷當(dāng)前線程池狀態(tài)是否未達(dá)到指定狀態(tài)鸠儿,如前所述屹蚊,狀態(tài)流轉(zhuǎn)在數(shù)值上是依次增大的,因而這里只需要判斷其大小即可进每;
  • runStateAtLeast(int c, int s): 用于判斷當(dāng)前線程池狀態(tài)是否至少處于某種狀態(tài)汹粤;
  • isRunning(int c): 用于判斷當(dāng)前線程池是否處于正常運(yùn)行狀態(tài);
  • compareAndIncrementWorkerCount(int expect): 增加當(dāng)前線程池的工作線程數(shù)量值田晚;
  • compareAndDecrementWorkerCount(int expect): 減少當(dāng)前線程池的工作線程數(shù)量值嘱兼。
b.主要方法

???????對(duì)于線程池的execute()和submit()方法,其實(shí)在底層submit()方法會(huì)將傳入的任務(wù)封裝為一個(gè)FutureTask對(duì)象贤徒,由于FutureTask對(duì)象是實(shí)現(xiàn)了Runnable接口的芹壕,因而其也可以當(dāng)做一個(gè)任務(wù)執(zhí)行,這里就是將封裝后的FutureTask對(duì)象傳遞給execute()方法執(zhí)行的接奈。我們這里則主要講解execute()方法的實(shí)現(xiàn)方式哪雕,如下是execute()方法的代碼:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();

  int c = ctl.get();    // 獲取當(dāng)前線程池狀態(tài)
  if (workerCountOf(c) < corePoolSize) {
    // 當(dāng)工作線程數(shù)小于核心線程數(shù)時(shí),則調(diào)用addWorker()方法創(chuàng)建線程并執(zhí)行任務(wù)
    if (addWorker(command, true))
      return;
    c = ctl.get();  // 若添加失敗鲫趁,則更新當(dāng)前線程池狀態(tài)
  }
  
  // 執(zhí)行到此處斯嚎,則說明線程池中的工作線程要么大于等于核心線程數(shù),要么當(dāng)前線程池已經(jīng)被命令關(guān)閉了(addWorker方法添加失敗的原因)挨厚,因而這里判斷線程池是否為RUNNING狀態(tài)堡僻,是則將任務(wù)添加到任務(wù)隊(duì)列中
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 添加隊(duì)列成功后雙重驗(yàn)證,確保線程池處于正確狀態(tài)
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);   // 若線程池中沒有線程疫剃,則創(chuàng)建一個(gè)新線程執(zhí)行添加的任務(wù)
  } else if (!addWorker(command, false))
    reject(command);    // 線程池至少處于SHUTDOWN狀態(tài)钉疫,拒絕當(dāng)前任務(wù)的執(zhí)行
}

???????在execute()方法中,其首先判斷線程池工作線程數(shù)是否小于核心線程數(shù)巢价,是則創(chuàng)建核心線程執(zhí)行任務(wù)牲阁,添加失敗或者工作線程數(shù)大于等于核心線程數(shù)時(shí)固阁,則將任務(wù)添加到任務(wù)隊(duì)列中,添加成功后會(huì)進(jìn)行雙重驗(yàn)證確保當(dāng)前線程池處于正確的狀態(tài)城菊,并且確保當(dāng)前有可用的線程執(zhí)行新添加的任務(wù)备燃。由此可見對(duì)于execute()方法的實(shí)現(xiàn),其比較核心的方法是addWorker()方法凌唬,如下是addWorker()方法的實(shí)現(xiàn)方式:

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c); // 獲取當(dāng)前運(yùn)行狀態(tài)

    // 判斷當(dāng)前線程池是否至少為SHUTDOWN狀態(tài)并齐,并且firstTask和任務(wù)隊(duì)列中沒有任務(wù),是則直接返回
    if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
      return false;

    for (;;) {
      int wc = workerCountOf(c);
      // 判斷是否工作線程數(shù)大于可記錄的最大線程數(shù)客税,或者工作線程超過了指定的核心線程或者最大線程數(shù)
      if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 走到這一步說明當(dāng)前線程池處于RUNNING狀態(tài)况褪,或者任務(wù)隊(duì)列存在任務(wù),并且工作線程數(shù)不超過
      // 指定的線程數(shù)量更耻,那么就增加工作線程數(shù)量测垛,成功則繼續(xù)往下執(zhí)行,失敗則重復(fù)上述添加步驟
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get();
      if (runStateOf(c) != rs)
        continue retry;
    }
  }

  // 記錄工作線程數(shù)的變量已經(jīng)更新秧均,接下來創(chuàng)建線程執(zhí)行任務(wù)
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    w = new Worker(firstTask);  // 創(chuàng)建一個(gè)工作者對(duì)象
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        int rs = runStateOf(ctl.get());

        // 重新檢查線程池狀態(tài)食侮,或者是判斷當(dāng)前是SHUTDOWN狀態(tài),而firstTask為空熬北,這說明任務(wù)隊(duì)列此時(shí)不為空
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive())
            throw new IllegalThreadStateException();
          workers.add(w);   // 將創(chuàng)建的工作者添加到工作者集合中
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;    // 更新已使用的最大線程數(shù)
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();  // 工作者對(duì)象成功創(chuàng)建之后疙描,調(diào)用該工作者執(zhí)行任務(wù)
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

???????在addWorker()方法中诚隙,其首先檢查當(dāng)前線程池是否處于RUNNING狀態(tài)讶隐,或者處于SHUTDOWN狀態(tài),但是任務(wù)隊(duì)列中還存在有任務(wù)久又,那么其就會(huì)創(chuàng)建一個(gè)新的Worker對(duì)象巫延,并且將其添加到工作者對(duì)象集合中,然后調(diào)用工作者對(duì)象所維護(hù)的線程執(zhí)行任務(wù)地消,如下是工作者對(duì)象的實(shí)現(xiàn)代碼:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  private static final long serialVersionUID = 6138294804551838833L;
  final Thread thread;  // 當(dāng)前工作者中執(zhí)行任務(wù)的線程
  Runnable firstTask;   // 第一個(gè)需要執(zhí)行的任務(wù)
  volatile long completedTasks; // 當(dāng)前工作者完成的任務(wù)數(shù)

  Worker(Runnable firstTask) {
    // 默認(rèn)設(shè)置為-1炉峰,那么如果不調(diào)用當(dāng)前工作者的run()方法,那么其狀態(tài)是不會(huì)改變的脉执,
    // 其他的線程也無法使用當(dāng)前工作者執(zhí)行任務(wù)疼阔,在run()方法調(diào)用的runWorker()方法中會(huì)
    // 調(diào)用unlock()方法使當(dāng)前工作者處于正常狀態(tài)
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);   // 使用線程工廠創(chuàng)建線程
  }

  public void run() {
    runWorker(this);    // 使用當(dāng)前工作者執(zhí)行任務(wù)
  }

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  // 如果當(dāng)前線程已經(jīng)在執(zhí)行任務(wù),那么將其標(biāo)記為打斷狀態(tài)半夷,待其任務(wù)執(zhí)行完畢則終止任務(wù)的執(zhí)行
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

???????在工作者對(duì)象中婆廊,其主要維護(hù)了一個(gè)工作者線程,用于執(zhí)行任務(wù)巫橄。該工作者對(duì)象繼承了AbstractQueuedSynchronizer淘邻,用于控制當(dāng)前工作者工作狀態(tài)的獲取,并且其也實(shí)現(xiàn)了Runnable接口湘换,將主要任務(wù)的執(zhí)行封裝到run()方法中宾舅。如下是runWorker()方法的具體實(shí)現(xiàn):

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock();   // 重置Worker對(duì)象的狀態(tài)
  boolean completedAbruptly = true;
  try {
    // 首先執(zhí)行工作者線程中的任務(wù)统阿,然后循環(huán)從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行
    while (task != null || (task = getTask()) != null) {
      w.lock();
      // 檢查當(dāng)前線程池的狀態(tài),如果線程池被終止或者線程池終止并且當(dāng)前線程已被打斷
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
        wt.interrupt();
      try {
        beforeExecute(wt, task);    // 調(diào)用鉤子方法進(jìn)行預(yù)處理
        Throwable thrown = null;
        try {
          task.run();   // 執(zhí)行任務(wù)
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          afterExecute(task, thrown);   // 調(diào)用鉤子方法進(jìn)行任務(wù)完成后的處理工作
        }
      } finally {
        task = null;    // 重置工作者的初始任務(wù)
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

???????可以看到筹我,在runWorker()方法中扶平,其首先會(huì)執(zhí)行工作者對(duì)象的初始化任務(wù),當(dāng)執(zhí)行完畢后會(huì)通過一個(gè)無限循環(huán)不斷在任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行崎溃。如下是getTask()方法的源碼:

private Runnable getTask() {
  boolean timedOut = false;

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 判斷當(dāng)前線程是否處于STOP狀態(tài)蜻直,或者處于SHUTDOWN狀態(tài),并且工作隊(duì)列是空的袁串,是則不返回任務(wù)
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    int wc = workerCountOf(c);
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;    // 是否允許空閑線程過期

    // 工作線程數(shù)大于最大允許線程數(shù)概而,或者線程在指定時(shí)間內(nèi)無法從工作隊(duì)列中獲取到新任務(wù),則銷毀當(dāng)前線程
    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 允許核心線程過期或者工作線程數(shù)大于corePoolSize時(shí)囱修,從任務(wù)隊(duì)列獲取任務(wù)時(shí)會(huì)指定等待時(shí)間赎瑰,
      // 否則會(huì)一直等待任務(wù)隊(duì)列中新的任務(wù)
      Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

???????可以看到,getTask方法首先會(huì)判斷當(dāng)前線程池狀態(tài)是否為STOP狀態(tài)破镰,或者是SHUTDOWN狀態(tài)餐曼,并且任務(wù)隊(duì)列是空的,是則不返回任務(wù)鲜漩,否則會(huì)根據(jù)相關(guān)參數(shù)從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行源譬。

???????以上execute()方法的主要實(shí)現(xiàn)步驟,在ThreadPoolExecutor中另一個(gè)至關(guān)重要的方法則是shutdown()方法孕似,以下是shutdown()方法的主要代碼:

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();  // 檢查對(duì)線程狀態(tài)的控制權(quán)限
    advanceRunState(SHUTDOWN);  // 更新當(dāng)前線程池狀態(tài)為SHUTDOWN
    interruptIdleWorkers(); // 打斷空閑的工作者
    onShutdown();   // 鉤子方法踩娘,但是沒有對(duì)外公開,因?yàn)樵摲椒ㄖ挥邪L問權(quán)限
  } finally {
    mainLock.unlock();
  }
  tryTerminate();   
}

???????在shutdown()方法中喉祭,其首先檢查當(dāng)前線程是否有修改線程狀態(tài)的權(quán)限养渴,然后將當(dāng)前線程池的狀態(tài)修改為SHUTDOWN,接著調(diào)用interruptIdleWorkers()方法中斷所有處于空閑狀態(tài)的線程泛烙,最后則是調(diào)用tryTerminate()方法嘗試將當(dāng)前線程池的狀態(tài)由SHUTDOWN修改為TERMINATED理卑,這里interruptIdleWorkers()方法最終會(huì)調(diào)用其重載方法interruptIdleWorkers(boolean)方法,該方法代碼如下:

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

???????可以看到蔽氨,該方法會(huì)遍歷所有的工作者對(duì)象藐唠,如果其處于空閑狀態(tài),則將其終止鹉究。對(duì)于處于工作狀態(tài)的線程宇立,由于在shutdown()方法中已經(jīng)將當(dāng)前線程池的狀態(tài)設(shè)置為SHUTDOWN,那么工作狀態(tài)的線程會(huì)將任務(wù)隊(duì)列中的任務(wù)都執(zhí)行完畢之后自動(dòng)銷毀坊饶。

???????本文主要講解了ThreadPoolExecutor的主要方法泄伪,線程池的調(diào)度方式,以及其核心功能的實(shí)現(xiàn)原理匿级,如本文有任何不當(dāng)之處蟋滴,敬請(qǐng)指正染厅,謝謝!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末津函,一起剝皮案震驚了整個(gè)濱河市肖粮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌尔苦,老刑警劉巖涩馆,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異允坚,居然都是意外死亡魂那,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門稠项,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涯雅,“玉大人,你說我怎么就攤上這事展运』钅妫” “怎么了?”我有些...
    開封第一講書人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵拗胜,是天一觀的道長(zhǎng)蔗候。 經(jīng)常有香客問我,道長(zhǎng)埂软,這世上最難降的妖魔是什么锈遥? 我笑而不...
    開封第一講書人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮仰美,結(jié)果婚禮上迷殿,老公的妹妹穿的比我還像新娘儿礼。我一直安慰自己咖杂,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開白布蚊夫。 她就那樣靜靜地躺著诉字,像睡著了一般。 火紅的嫁衣襯著肌膚如雪知纷。 梳的紋絲不亂的頭發(fā)上壤圃,一...
    開封第一講書人閱讀 51,578評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音琅轧,去河邊找鬼伍绳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛乍桂,可吹牛的內(nèi)容都是我干的冲杀。 我是一名探鬼主播效床,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼权谁!你這毒婦竟也來了剩檀?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤旺芽,失蹤者是張志新(化名)和其女友劉穎沪猴,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體采章,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡运嗜,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了悯舟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洗出。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖图谷,靈堂內(nèi)的尸體忽然破棺而出翩活,到底是詐尸還是另有隱情,我是刑警寧澤便贵,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布菠镇,位于F島的核電站,受9級(jí)特大地震影響承璃,放射性物質(zhì)發(fā)生泄漏利耍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一盔粹、第九天 我趴在偏房一處隱蔽的房頂上張望隘梨。 院中可真熱鬧,春花似錦舷嗡、人聲如沸轴猎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽捻脖。三九已至,卻和暖如春中鼠,著一層夾襖步出監(jiān)牢的瞬間可婶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來泰國打工援雇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留矛渴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓惫搏,卻偏偏與公主長(zhǎng)得像具温,于是被迫代替她去往敵國和親盗舰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容