Java 線程池詳解

1. 線程池的概念

1.1 基本概念

由于線程的生命周期中包括創(chuàng)建击敌、就緒介返、運(yùn)行、阻塞、銷(xiāo)毀階段圣蝎,當(dāng)我們待處理的任務(wù)數(shù)目較小時(shí)刃宵,我們可以自己創(chuàng)建幾個(gè)線程來(lái)處理相應(yīng)的任務(wù),但當(dāng)有大量的任務(wù)時(shí)徘公,由于創(chuàng)建牲证、銷(xiāo)毀線程需要很大的開(kāi)銷(xiāo),運(yùn)用線程池這些問(wèn)題就大大的緩解了关面。

1.2 使用線程池的好處

1.2.1 使用new Thread()創(chuàng)建線程的弊端

  1. 每次通過(guò)new Thread()創(chuàng)建對(duì)象性能不佳坦袍。

  2. 線程缺乏統(tǒng)一管理,可能無(wú)限制新建線程等太,相互之間競(jìng)爭(zhēng)捂齐,及可能占用過(guò)多系統(tǒng)資源導(dǎo)致死機(jī)或oom。

  3. 缺乏更多功能缩抡,如定時(shí)執(zhí)行奠宜、定期執(zhí)行、線程中斷瞻想。

1.2.2 使用Java線程池的好處

  1. 重用存在的線程压真,減少對(duì)象創(chuàng)建、消亡的開(kāi)銷(xiāo)蘑险,提升性能滴肿。

  2. 可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率佃迄,同時(shí)避免過(guò)多資源競(jìng)爭(zhēng)泼差,避免堵塞。

  3. 提供定時(shí)執(zhí)行和屎、定期執(zhí)行拴驮、單線程、并發(fā)數(shù)控制等功能柴信。

2. Java 中有哪幾種線程池

2.1 CachedThreadPool

優(yōu)點(diǎn)

工作線程的創(chuàng)建數(shù)量幾乎沒(méi)有限制(其實(shí)也有限制的,數(shù)目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。

如果長(zhǎng)時(shí)間沒(méi)有往線程池中提交任務(wù)宽气,即如果工作線程空閑了指定的時(shí)間(默認(rèn)為1分鐘)随常,則該工作線程將自動(dòng)終止。終止后萄涯,如果你又提交了新的任務(wù)绪氛,則線程池重新創(chuàng)建一個(gè)工作線程。

缺點(diǎn)

在使用CachedThreadPool時(shí)涝影,一定要注意控制任務(wù)的數(shù)量枣察,否則,由于大量線程同時(shí)運(yùn)行,很有會(huì)造成系統(tǒng)癱瘓序目。

2.2 FixedThreadPool

創(chuàng)建一個(gè)指定工作線程數(shù)量的線程池臂痕。每當(dāng)提交一個(gè)任務(wù)就創(chuàng)建一個(gè)工作線程,如果工作線程數(shù)量達(dá)到線程池初始的最大數(shù)猿涨,則將提交的任務(wù)存入到池隊(duì)列中握童。定長(zhǎng)線程池的大小最好根據(jù)系統(tǒng)資源進(jìn)行設(shè)置如Runtime.getRuntime().availableProcessors()

優(yōu)點(diǎn)

FixedThreadPool是一個(gè)典型且優(yōu)秀的線程池,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時(shí)所耗的開(kāi)銷(xiāo)的優(yōu)點(diǎn)叛赚。

缺點(diǎn)

但是澡绩,在線程池空閑時(shí),即線程池中沒(méi)有可運(yùn)行任務(wù)時(shí)俺附,它不會(huì)釋放工作線程肥卡,還會(huì)占用一定的系統(tǒng)資源。

2.3 SingleThreadExecutor

創(chuàng)建一個(gè)單線程化的Executor事镣,即只創(chuàng)建唯一的工作者線程來(lái)執(zhí)行任務(wù)召调,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行蛮浑。如果這個(gè)線程異常結(jié)束唠叛,會(huì)有另一個(gè)取代它,保證順序執(zhí)行沮稚。單工作線程最大的特點(diǎn)是可保證順序地執(zhí)行各個(gè)任務(wù)艺沼,并且在任意給定的時(shí)間不會(huì)有多個(gè)線程是活動(dòng)的。

2.4 ScheduleThreadPool

創(chuàng)建一個(gè)定長(zhǎng)的線程池蕴掏,而且支持定時(shí)的以及周期性的任務(wù)執(zhí)行障般,支持定時(shí)及周期性任務(wù)執(zhí)行。

3. 如何實(shí)現(xiàn)自定義的線程池

3.1 線程池具體實(shí)現(xiàn)解析

當(dāng)我們使用 線程池的時(shí)候盛杰,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法挽荡,其實(shí)我們深入到這些方法里面,就可以看到它們的是實(shí)現(xiàn)方式是這樣的即供。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到定拟,線程池的具體實(shí)現(xiàn)是調(diào)用 ThreadPoolExecutor 的構(gòu)造方法實(shí)現(xiàn)的,那么下面來(lái)具體看看 ThreadPoolExecutor 構(gòu)造方法的參數(shù)逗嫡。

3.2 ThreadPoolExecutor 參數(shù)詳解

先來(lái)看看 ThreadPoolExecutor 的構(gòu)造方法:

通過(guò)查看 ThreadPoolExecutor 源碼可以看到青自,該類(lèi)有四個(gè)構(gòu)造方法,頭三個(gè)構(gòu)造方法驱证,其實(shí)都是調(diào)用的第四個(gè)構(gòu)造方法延窜,所以我們就解釋一下第四個(gè)構(gòu)造方法的參數(shù)含義。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize:核心線程池的大小抹锄,在線程池被創(chuàng)建之后逆瑞,其實(shí)里面是沒(méi)有線程的荠藤。(當(dāng)然,調(diào)用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法會(huì)預(yù)創(chuàng)建線程获高,而不用等著任務(wù)的到來(lái))哈肖。當(dāng)有任務(wù)進(jìn)來(lái)的時(shí)候,才會(huì)創(chuàng)建線程谋减。當(dāng)線程池中的線程數(shù)量達(dá)到corePoolSize之后牡彻,就把任務(wù)放到 緩存隊(duì)列當(dāng)中。(就是 workQueue )出爹。

  2. maximumPoolSize:最大線程數(shù)量是多少庄吼。它標(biāo)志著這個(gè)線程池的最大線程數(shù)量。如果沒(méi)有最大數(shù)量严就,當(dāng)創(chuàng)建的線程數(shù)量達(dá)到了 某個(gè)極限值总寻,到最后內(nèi)存肯定就爆掉了。

  3. keepAliveTime:當(dāng)線程沒(méi)有任務(wù)時(shí)梢为,最多保持的時(shí)間渐行,超過(guò)這個(gè)時(shí)間就被終止了。默認(rèn)情況下铸董,只有 線程池中線程數(shù)量 大于 corePoolSize 時(shí)祟印,keepAliveTime 值才會(huì)起作用。也就說(shuō)說(shuō)粟害,只有在線程池線程數(shù)量超出 corePoolSize 了蕴忆。我們才會(huì)把超時(shí)的空閑線程給停止掉。否則就保持線程池中有 corePoolSize 個(gè)線程就可以了悲幅。默認(rèn)值是60秒套鹅。

  4. Unit:參數(shù)keepAliveTime的時(shí)間單位,就是 TimeUnit類(lèi)當(dāng)中的幾個(gè)屬性汰具。

  5. workQueue:用來(lái)存儲(chǔ)待執(zhí)行任務(wù)的隊(duì)列卓鹿,不同的線程池它的隊(duì)列實(shí)現(xiàn)方式不同(因?yàn)檫@關(guān)系到排隊(duì)策略的問(wèn)題)比如有以下幾種:

    • ArrayBlockingQueue:基于數(shù)組的隊(duì)列,創(chuàng)建時(shí)需要指定大小留荔。

    • LinkedBlockingQueue:基于鏈表的隊(duì)列吟孙,如果沒(méi)有指定大小,則默認(rèn)值是 Integer.MAX_VALUE存谎。(newFixedThreadPool和newSingleThreadExecutor使用的就是這種隊(duì)列)拔疚,吞吐量通常要高于ArrayBlockingQuene。

    • SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列既荚,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)栋艳,吞吐量通常要高于LinkedBlockingQuene(newCachedThreadPool使用的就是這種隊(duì)列)恰聘。

  6. threadFactory:線程工廠,用來(lái)創(chuàng)建線程。通過(guò)自定義的線程工廠可以給每個(gè)新建的線程設(shè)置一個(gè)具有識(shí)別度的線程名晴叨。

  7. Handler:拒絕執(zhí)行任務(wù)時(shí)的策略凿宾,一般來(lái)講有以下四種策略:

    • ThreadPoolExecutor.AbortPolicy 丟棄任務(wù),并拋出 RejectedExecutionException 異常兼蕊。

    • ThreadPoolExecutor.CallerRunsPolicy:該任務(wù)被線程池拒絕初厚,由調(diào)用 execute 方法的線程執(zhí)行該任務(wù)。

    • ThreadPoolExecutor.DiscardOldestPolicy : 拋棄隊(duì)列最前面的任務(wù)孙技,然后重新嘗試執(zhí)行任務(wù)产禾。

    • ThreadPoolExecutor.DiscardPolicy,丟棄任務(wù)牵啦,不過(guò)也不拋出異常亚情。

3.3 線程池的處理流程

下圖是提交任務(wù)給線程池之后, 線程池的處理流程圖

線程處理流程圖
  1. 如果當(dāng)前線程池線程數(shù)目小于 corePoolSize(核心池還沒(méi)滿(mǎn)呢)哈雏,那么就創(chuàng)建一個(gè)新線程去處理任務(wù)楞件。

  2. 如果核心池已經(jīng)滿(mǎn)了裳瘪,來(lái)了一個(gè)新的任務(wù)后皆怕,會(huì)嘗試將其添加到任務(wù)隊(duì)列中虱黄,如果成功,則等待空閑線程將其從隊(duì)列中取出并且執(zhí)行,如果隊(duì)列已經(jīng)滿(mǎn)了,則繼續(xù)下一步搪桂。

  3. 此時(shí)裸燎,如果線程池線程數(shù)量 小于 maximumPoolSize蕴纳,則創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)稻薇,否則案狠,那就說(shuō)明線程池到了最大飽和能力了,沒(méi)辦法再處理了名段,此時(shí)就按照拒絕策略來(lái)處理阱扬。(就是構(gòu)造函數(shù)當(dāng)中的 Handler 對(duì)象)泣懊。

  4. 如果線程池的線程數(shù)量大于 corePoolSize伸辟,則當(dāng)某個(gè)線程的空閑時(shí)間超過(guò)了 keepAliveTime,那么這個(gè)線程就要被銷(xiāo)毀了馍刮,直到線程池中線程數(shù)量不大于 corePoolSize 為止信夫。

4. 線程池的實(shí)現(xiàn)原理

4.1 線程池的處理流程

一個(gè)線程從被提交(submit)到執(zhí)行共經(jīng)歷以下流程:

  1. 判斷核心線程池中的線程是否都在執(zhí)行任務(wù),如果不是則新建一個(gè)工作線程執(zhí)行任務(wù)卡啰,如果都在執(zhí)行任務(wù)則進(jìn)入到第二個(gè)流程

  2. 判斷工作隊(duì)列是否已滿(mǎn)静稻,如果工作隊(duì)列沒(méi)有滿(mǎn),則新提交的任務(wù)存儲(chǔ)在工作隊(duì)列中匈辱,如果滿(mǎn)了則進(jìn)入到第三個(gè)流程

  3. 判斷線程池內(nèi)部的線程是否都處于工作狀態(tài)振湾,如果不是則新建一個(gè)工作線程執(zhí)行任務(wù),如果都在執(zhí)行任務(wù)亡脸,則交給任務(wù)飽和度策略來(lái)處理這個(gè)任務(wù)押搪。

線程池在執(zhí)行excute方法時(shí),主要有以下四種情況:

  1. 如果當(dāng)前線程數(shù)少于 coolPoolSize 浅碾,那么新建一個(gè)工作線程執(zhí)行任務(wù)大州。(需要獲得全局鎖)

  2. 如果當(dāng)前線程數(shù)等于或大于 coolPoolSize , 那么將創(chuàng)建的線程放入到 BlockQueue 中垂谢。

  3. 如果如果無(wú)法將任務(wù)放入到 BlockQueue(已滿(mǎn))厦画,那么新建一個(gè)線程執(zhí)行任務(wù)。(需要獲得全局鎖)

  4. 如果新建的線程數(shù)大于了當(dāng)前線程池的 maxiumPoolSize 滥朱, 那么任務(wù)會(huì)被拒絕根暑,交給 RejectedExecutionHandler.rejectedExecution() 方法處理。

4.2 ThreadPoolExecutor 源碼分析

4.2.1 定義的幾個(gè)變量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  
private static final int COUNT_BITS = Integer.SIZE - 3;  
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  

// runState is stored in the high-order bits  
private static final int RUNNING    = -1 << COUNT_BITS;  
private static final int SHUTDOWN   =  0 << COUNT_BITS;  
private static final int STOP       =  1 << COUNT_BITS;  
private static final int TIDYING    =  2 << COUNT_BITS;  
private static final int TERMINATED =  3 << COUNT_BITS;  

// Packing and unpacking ctl  
private static int runStateOf(int c)     { return c & ~CAPACITY; }  
private static int workerCountOf(int c)  { return c & CAPACITY; }  
private static int ctlOf(int rs, int wc) { return rs | wc; } 

在分析源碼前有必要理解一個(gè)變量 ctl 徙邻。這是 Java 大神們?yōu)榱税压ぷ骶€程數(shù)量和線程池狀態(tài)放在一個(gè)int類(lèi)型變量里存儲(chǔ)而設(shè)置的一個(gè)原子類(lèi)型的變量排嫌。 在 ctl 中,低位的 29 位表示工作線程的數(shù)量鹃栽,高位用來(lái)表示 RUNNING躏率、SHUTDOWN、STOP 等狀態(tài)民鼓。 因此一個(gè)線程池的數(shù)量也就變成了 (2^29)-1 薇芝,大約 500 million ,而不是 (2^31)-1 丰嘉, 2billion 夯到。上面定義的三個(gè)方法只是為了計(jì)算得到線程池的狀態(tài)和工作線程的數(shù)量。

4.2.2 Execute 方法

public void execute(Runnable command) {  
 //如果提交了空的任務(wù) 拋出異常  
       if (command == null)  
           throw new NullPointerException();  
 int c = ctl.get();//獲取當(dāng)前線程池的狀態(tài)  
 //檢查當(dāng)前工作線程數(shù)量是否小于核心線程數(shù)量  
       if (workerCountOf(c) < corePoolSize) {  
    //通過(guò)addWorker方法提交任務(wù)  
           if (addWorker(command, true))  
               return;  
           c = ctl.get();//如果提交失敗 需要二次檢查狀態(tài)  
       }  
//向工作線程提交任務(wù)   
       if (isRunning(c) && workQueue.offer(command)) {  
           // 再次檢查狀態(tài)  
    int recheck = ctl.get();  
      
           if (! isRunning(recheck) && remove(command))  
               reject(command);  
           else if (workerCountOf(recheck) == 0)  
               addWorker(null, false);  
       }  
//擴(kuò)容失敗 則拒絕任務(wù)  
       else if (!addWorker(command, false))  
           reject(command);  
    
} 

從源碼中可以看到提交任務(wù)的這一過(guò)程基本與第二個(gè)圖的四個(gè)流程是一致的饮亏,需要檢查的是當(dāng)前工作線程的數(shù)量與核心線程數(shù)量的關(guān)系耍贾,來(lái)決定提交任務(wù)的方式或者是拒絕任務(wù)提交阅爽。而具體任務(wù)的提交工作是在addWorker方法中。在這里面看到了recheck這樣的變量荐开,這是在執(zhí)行了一些動(dòng)作失敗后再次檢查線程池的狀態(tài)付翁,因?yàn)樵谶@期間可能有線程池關(guān)閉獲得線程池飽和等狀態(tài)的改變。

4.2.3 addWorker 方法

這個(gè)方法是任務(wù)提交的一個(gè)核心方法晃听。在里面完成了狀態(tài)檢查百侧、新建任務(wù)、執(zhí)行任務(wù)等一系列動(dòng)作能扒。它有兩個(gè)參數(shù)佣渴,第一個(gè)參數(shù)是提交的任務(wù),第二個(gè)參數(shù)是一個(gè)標(biāo)識(shí)符初斑,標(biāo)識(shí)在檢查工作線程數(shù)量的時(shí)候是應(yīng)該與 corePoolSize 對(duì)比還是應(yīng)該 maximumPoolSize 對(duì)比辛润。

private boolean addWorker(Runnable firstTask, boolean core) {  
       retry:  
    //死循環(huán)更新?tīng)顟B(tài)  
       for (;;) {  
           int c = ctl.get();  
           int rs = runStateOf(c);//獲取運(yùn)行狀態(tài)  
  
    //檢查線程池是否處于關(guān)閉狀態(tài)  
           if (rs >= SHUTDOWN &&  
               ! (rs == SHUTDOWN &&  
                  firstTask == null &&  
                  ! workQueue.isEmpty()))  
               return false;  
  
           for (;;) {  
      //獲取當(dāng)前工作線程數(shù)量  
               int wc = workerCountOf(c);  
    //如果已經(jīng)超過(guò)corePoolSize獲取maximumPoolSize 返回false  
               if (wc >= CAPACITY ||  
                   wc >= (core ? corePoolSize : maximumPoolSize))  
                   return false;  
    //CAS增加一個(gè)工作線程  
               if (compareAndIncrementWorkerCount(c))  
                break retry;  
    //再次獲取狀態(tài)  
               c = ctl.get();  // Re-read ctl  
    //如果狀態(tài)更新失敗 則循環(huán)更新  
               if (runStateOf(c) != rs)  
                   continue retry;  
               // else CAS failed due to workerCount change; retry inner loop  
           }  
       }  
  
       boolean workerStarted = false;  
       boolean workerAdded = false;  
       Worker w = null;  
       try {  
           w = new Worker(firstTask);//初始化一個(gè)工作線程  
           final Thread t = w.thread;  
           if (t != null) {  
     //獲得鎖  
               final ReentrantLock mainLock = this.mainLock;  
               mainLock.lock();  
               try {  
                   // Recheck while holding lock.  
                   // Back out on ThreadFactory failure or if  
                   // shut down before lock acquired.  
                   int rs = runStateOf(ctl.get());  
  
                   if (rs < SHUTDOWN ||  
                       (rs == SHUTDOWN && firstTask == null)) {  
                       if (t.isAlive()) // SHUTDOWN以后的狀態(tài)和SHUTDOWN狀態(tài)下firstTask為null,不可新增線程 
                           throw new IllegalThreadStateException();  
                       workers.add(w);  //添加工作這到hashset中保存  
                       int s = workers.size();  
                       if (s > largestPoolSize)  
                           largestPoolSize = s;  //記錄最大線程數(shù)
                       workerAdded = true;  
                   }  
               } finally {  
                   mainLock.unlock();  
               }  
               if (workerAdded) {  
    //工作線程啟動(dòng) 執(zhí)行第一個(gè)任務(wù) 就是新提交的任務(wù)  
                   t.start();  
                   workerStarted = true;  
               }  
           }  
       } finally {  
           if (! workerStarted)  
               addWorkerFailed(w);  
       }  
       return workerStarted;  
} 

這個(gè)方法可以分為兩個(gè)階段來(lái)看见秤,第一個(gè)階段是判斷是否有必要新增一個(gè)工作線程砂竖,如果有則利用CAS更新工作線程的數(shù)量;第二部分是將提交的任務(wù)封裝成一個(gè)工作線程Worker然后加入到線程池的容器中秦叛,開(kāi)始執(zhí)行新提交的任務(wù)晦溪。這個(gè)Worker在執(zhí)行完任務(wù)后,還會(huì)循環(huán)地獲取工作隊(duì)列里的任務(wù)來(lái)執(zhí)行挣跋。下面來(lái)看一下Worker的構(gòu)造方法就能更好地理解上面的代碼了

4.2.4 runWorker 方法

在addWorker方法快要結(jié)束的地方三圆,調(diào)用了t.start()方法,我們知道它實(shí)際執(zhí)行的就是Worker對(duì)象的run()方法避咆,而worker的run()方法是這樣定義的:

/** Delegates main run loop to outer runWorker  */  
       public void run() {  
           runWorker(this);  
       }

它實(shí)際上是將自己委托給線程池的runWorker方法

final void runWorker(Worker w) {  
  
      Thread wt = Thread.currentThread();  
      Runnable task = w.firstTask;  
      w.firstTask = null;  
      w.unlock(); // allow interrupts  
      boolean completedAbruptly = true;  
      try {  
   //不斷地從blockingQueue獲取任務(wù)  
          while (task != null || (task = getTask()) != null) {  
              w.lock();  
              // If pool is stopping, ensure thread is interrupted;  
              // if not, ensure thread is not interrupted.  This  
              // requires a recheck in second case to deal with  
              // shutdownNow race while clearing interrupt  
              if ((runStateAtLeast(ctl.get(), STOP) ||  
                   (Thread.interrupted() &&  
                    runStateAtLeast(ctl.get(), STOP))) &&  
                  !wt.isInterrupted())  
                  wt.interrupt();  
              try {  
    //執(zhí)行beforeExecute方法  
                  beforeExecute(wt, task);  
                  Throwable thrown = null;  
                  try {  
    //調(diào)用Runable的run方法  
                      task.run();  
                  } catch (RuntimeException x) {  
                      thrown = x; throw x;  
                  } catch (Error x) {  
                      thrown = x; throw x;  
                  } catch (Throwable x) {  
                      thrown = x; throw new Error(x);  
                  } finally {  
    // 執(zhí)行aferExecute方法  
                      afterExecute(task, thrown);  
                  }  
              } finally {  
                  task = null;  
                  w.completedTasks++;  
                  w.unlock();  
              }  
          }  
          completedAbruptly = false;  
      } finally {  
          processWorkerExit(w, completedAbruptly);  
      }  
}  

這個(gè)方法呢也比較好理解舟肉,它在不斷執(zhí)行我們提交的任務(wù)的run方法。而這個(gè)任務(wù)可能是我們新提交的查库,也有可能是從等待隊(duì)列中獲取的路媚。這樣就實(shí)現(xiàn)了線程池的完整邏輯。

4.2.5 shutdown樊销,shutdownNow 方法

shutdown 方法

public void shutdown() {  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try {  
            checkShutdownAccess(); //這個(gè)方法校驗(yàn)線程訪問(wèn)許可整慎,不是很理解,后面有時(shí)間再單獨(dú)解析围苫;  
            advanceRunState(SHUTDOWN); //轉(zhuǎn)換線程池狀態(tài)為SHUTDOWN  
            interruptIdleWorkers(); //中斷所有空閑的線程  
            onShutdown(); // 空實(shí)現(xiàn)方法裤园,是做shutdown清理操作的  
        } finally {  
            mainLock.unlock();  
        }  
        tryTerminate(); //嘗試結(jié)束線程池(設(shè)置狀態(tài)為T(mén)ERMINATED)  
} 

shutdownNow 方法

public List<Runnable> shutdownNow() {  
        List<Runnable> tasks;  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try {  
            checkShutdownAccess();//同上  
            advanceRunState(STOP);//轉(zhuǎn)換線程池狀態(tài)到STOP  
            interruptWorkers();//中斷所有線程  
            tasks = drainQueue();//獲取到任務(wù)隊(duì)列所有任務(wù),并清空隊(duì)列  
        } finally {  
            mainLock.unlock();  
        }  
        tryTerminate();//同上  
        return tasks;  
    } 

由上可知剂府,兩個(gè)關(guān)閉方法的區(qū)別:

  1. shutdown 設(shè)置狀態(tài)為 SHUTDOWN拧揽,而 shutdownNow 設(shè)置狀態(tài)為 STOP

  2. shutdown 值中斷空閑的線程,已提交的任務(wù)可以繼續(xù)被執(zhí)行,而 shutdownNow 中斷所有線程

  3. shutdown 無(wú)返回值淤袜,shutdownNow 返回任務(wù)隊(duì)列中還未執(zhí)行的任務(wù)

雖然有 shutdown 和 shutdownNow 方法痒谴,但是還是不能滿(mǎn)足一個(gè)需求:就是需要知道等待所有任務(wù)已完成線程池結(jié)束,這里 ThreadPoolExecutor 提供了 awaitTermination 方法滿(mǎn)足這個(gè)需求:

public boolean awaitTermination(long timeout, TimeUnit unit)  
    throws InterruptedException {  
    long nanos = unit.toNanos(timeout);  
    final ReentrantLock mainLock = this.mainLock;  
    mainLock.lock();  
    try {  
        for (;;) {  
            if (runStateAtLeast(ctl.get(), TERMINATED))  
                return true;  
            if (nanos <= 0)  
                return false;  
            nanos = termination.awaitNanos(nanos);  
        }  
    } finally {  
        mainLock.unlock();  
    }  
}  

這個(gè)方法兩個(gè)入?yún)ⅲO(shè)置等待超時(shí)時(shí)間铡羡。

如果狀態(tài)已經(jīng)是 TERMINATED 返回 true 积蔚,表示已關(guān)閉。

否則一直等到 termination 的 signalAll 至超時(shí)或者當(dāng)前線程中斷蓖墅。超時(shí)后都線程池都沒(méi)有關(guān)閉库倘,返回 false。

5. 代碼練習(xí)

5.1 當(dāng)線程池滿(mǎn)后處理剩余任務(wù)的 handler

ThreadPoolExecutor.CallerRunsPolicy:該任務(wù)被線程池拒絕论矾,由調(diào)用 execute 方法的線程執(zhí)行該任務(wù)。

public class ThreadPoolDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 5, 2,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 15; i++) {
            exec.execute(new Task());
        }
        exec.shutdown();
    }

}
class Task implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " >>> start.");
        // mock run for a while
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運(yùn)行結(jié)果:可以看到有兩個(gè)線程交給了 main 方法處理杆勇。

pool-1-thread-1 >>> start.
pool-1-thread-2 >>> start.
pool-1-thread-3 >>> start.
main >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-4 >>> start.
main >>> start.
pool-1-thread-4 >>> start.
pool-1-thread-3 >>> start.
pool-1-thread-2 >>> start.
pool-1-thread-1 >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-4 >>> start.
pool-1-thread-5 >>> start.
pool-1-thread-1 >>> start.

Process finished with exit code 0

5.2 自定義線程池和線程工廠

5.2.1 自定義線程池贪壳,實(shí)現(xiàn)計(jì)時(shí)和統(tǒng)計(jì)功能

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("MyThreadPoolExecutor");
    private final AtomicLong numTasks = new AtomicLong(1);
    private final AtomicLong totalTime = new AtomicLong();

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

    }
    /**
     * 任務(wù)執(zhí)行前
     */
    protected void beforeExecute(Thread t,Runnable r){
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s",t,r));
        startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));
    }
    /**
     * 任務(wù)執(zhí)行后
     * @param r 任務(wù)
     * @param t 執(zhí)行任務(wù)的線程
     */
    protected void afterExecutor(Runnable r,Throwable t){
        try {
            Long endTime = (long) (System.nanoTime() / Math.pow(10,9));
            Long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected void terminated () {
        try {
            log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

5.2.2 自定義線程工廠,自定義線程名稱(chēng)

class MyThreadFactory implements ThreadFactory {

    private String name;
    private int counter;

    public MyThreadFactory(String name) {
        this.name = name;
        this.counter = 1;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("[Task ID + " + (counter++) + ": " + name + "]");
        return t;
    }
}

public class ThreadPoolDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 5, 2,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), new MyThreadFactory("Test Thread"), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 15; i++) {
            exec.execute(new Task());
        }
        exec.shutdown();
    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蚜退,一起剝皮案震驚了整個(gè)濱河市闰靴,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌钻注,老刑警劉巖蚂且,帶你破解...
    沈念sama閱讀 216,919評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異幅恋,居然都是意外死亡杏死,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)捆交,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)淑翼,“玉大人,你說(shuō)我怎么就攤上這事品追⌒ǎ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,316評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵肉瓦,是天一觀的道長(zhǎng)遭京。 經(jīng)常有香客問(wèn)我,道長(zhǎng)泞莉,這世上最難降的妖魔是什么哪雕? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,294評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮戒财,結(jié)果婚禮上热监,老公的妹妹穿的比我還像新娘。我一直安慰自己饮寞,他們只是感情好孝扛,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,318評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布列吼。 她就那樣靜靜地躺著,像睡著了一般苦始。 火紅的嫁衣襯著肌膚如雪寞钥。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,245評(píng)論 1 299
  • 那天陌选,我揣著相機(jī)與錄音理郑,去河邊找鬼。 笑死咨油,一個(gè)胖子當(dāng)著我的面吹牛您炉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播役电,決...
    沈念sama閱讀 40,120評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼赚爵,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了法瑟?” 一聲冷哼從身側(cè)響起冀膝,我...
    開(kāi)封第一講書(shū)人閱讀 38,964評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎霎挟,沒(méi)想到半個(gè)月后窝剖,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,376評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡酥夭,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,592評(píng)論 2 333
  • 正文 我和宋清朗相戀三年赐纱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片采郎。...
    茶點(diǎn)故事閱讀 39,764評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡千所,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蒜埋,到底是詐尸還是另有隱情淫痰,我是刑警寧澤,帶...
    沈念sama閱讀 35,460評(píng)論 5 344
  • 正文 年R本政府宣布整份,位于F島的核電站待错,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏烈评。R本人自食惡果不足惜火俄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,070評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望讲冠。 院中可真熱鬧瓜客,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,697評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至疯攒,卻和暖如春嗦随,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背敬尺。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,846評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工枚尼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人砂吞。 一個(gè)月前我還...
    沈念sama閱讀 47,819評(píng)論 2 370
  • 正文 我出身青樓署恍,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親呜舒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子锭汛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,665評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容