《實戰(zhàn)高并發(fā)程序設(shè)計》讀書筆記-線程池

線程池

為了能夠更好地控制多線程,JDK提供了一套Executor框架拨与,幫助開發(fā)人員有效地進行線程控制忆蚀,其本質(zhì)就是一個線程池根蟹。


image.png

以上成員均在java.util.concurrent包中蕾域,是JDK并發(fā)包的核心類拷肌。

Executor框架提供了各種類型的線程池,主要有以下工廠方法

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

以上工廠方法分別返回具有不同工作特性的線程池旨巷。這些線程池工廠方法的具體說明如下:

  • newFixedThreadPool()方法:該方法返回一個固定線程數(shù)量的線程池巨缘。該線程池中的線程數(shù)量始終不變。當有一個新的任務提交時契沫,線程池中若有空閑線程,則立即執(zhí)行昔汉。若沒有懈万,則新的任務會被暫存在一個任務隊列中,待有線程空閑時靶病,便處理在任務隊列中的任務会通。
  • newSingleThreadExecutor()方法:該方法返回一個只有一個線程的線程池。若多余一個任務被提交到該線程池娄周,任務會被保存在一個任務隊列中涕侈,待線程空閑,按先入先出的順序執(zhí)行隊列中的任務煤辨。
  • newCachedThreadPool()方法:該方法返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池裳涛。線程池的線程數(shù)量不確定,但若有空閑線程可以復用众辨,則會優(yōu)先使用可復用的線程端三。若所有線程均在工作,又有新的任務提交鹃彻,則會創(chuàng)建新的線程處理任務郊闯。所有線程在當前任務執(zhí)行完畢后,將返回線程池進行復用蛛株。
  • newSingleThreadScheduledExecutor()方法:該方法返回一個ScheduledExecutorService對象团赁,線程池大小為1。ScheduledExecutorService接口在ExecutorService接口之上擴展了在給定時間執(zhí)行某任務的功能谨履,如在某個固定的延時之后執(zhí)行欢摄,或者周期性執(zhí)行某個任務
  • newScheduledThreadPool()方法:該方法也返回一個ScheduledExecutorService對象笋粟,但該線程池可以指定線程數(shù)量剧浸。

固定大小的線程池

01 public class ThreadPoolDemo {
02     public static class MyTask implements Runnable {
03         @Override
04         public void run() {
05             System.out.println(System.currentTimeMillis() + ":Thread ID:"
06                     + Thread.currentThread().getId());
07             try {
08                 Thread.sleep(1000);
09             } catch (InterruptedException e) {
10                 e.printStackTrace();
11             }
12         }
13     }
14
15     public static void main(String[] args) {
16         MyTask task = new MyTask();
17         ExecutorService es = Executors.newFixedThreadPool(5);
18         for (int i = 0; i < 10; i++) {
19             es.submit(task);
20         }
21     }
22 }

輸出

1426510293450:Thread ID:8
1426510293450:Thread ID:9
1426510293450:Thread ID:12
1426510293450:Thread ID:10
1426510293450:Thread ID:11
1426510294450:Thread ID:12
1426510294450:Thread ID:11
1426510294450:Thread ID:8
1426510294450:Thread ID:10
1426510294450:Thread ID:9

計劃任務

newScheduledThreadPool()锹引。它返回一個ScheduledExecutorService對象,可以根據(jù)時間需要對線程進行調(diào)度唆香。它的一些主要方法如下:

public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                 long initialDelay,
                                                 long period,
                                                 TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                    long initialDelay,
                                                    long delay,
                                                    TimeUnit unit);

計劃任務即定時線程池嫌变,定時執(zhí)行任務,這個定時有兩個維度躬它,周期和延遲:

  • scheduleAtFixedRate
    • 創(chuàng)建一個周期性任務腾啥。任務開始于給定的初始延時。后續(xù)的任務按照給定的周期進行:后續(xù)第一個任務將會在initialDelay+period時執(zhí)行冯吓,后續(xù)第二個任務將在initialDelay+2period時進行倘待,依此類推。即按照已固定的頻率來執(zhí)行某項計劃(任務)*组贺。
  • scheduleWithFixedDelay
    • 創(chuàng)建并執(zhí)行一個周期性任務凸舵。任務開始于初始延時時間,后續(xù)任務將會按照給定的延時進行失尖,即上一個任務的結(jié)束時間到下一個任務的開始時間的時間差啊奄。這個周期執(zhí)行,是等上一次任務結(jié)束后掀潮,等待一個延遲時間執(zhí)行一次任務菇夸,即每個任務之間按照固定的延遲執(zhí)行任務
image.png

下面的例子使用scheduleAtFixedRate()方法調(diào)度一個任務仪吧。這個任務會執(zhí)行1秒鐘時間庄新,調(diào)度周期是2秒。也就是說每2秒鐘薯鼠,任務就會被執(zhí)行一次择诈。

01 public class ScheduledExecutorServiceDemo {
02     public static void main(String[] args) {
03         ScheduledExecutorService ses=Executors.newScheduledThreadPool(10);
04         //如果前面的任務沒有完成,則調(diào)度也不會啟動
05         ses.scheduleAtFixedRate(new Runnable() {
06             @Override
07             public void run() {
08                 try {
09                     Thread.sleep(1000);
10                     System.out.println(System.currentTimeMillis()/1000);
11                 } catch (InterruptedException e) {
12                     e.printStackTrace();
13                 }
14             }
15         }, 0, 2, TimeUnit.SECONDS);
16     }
17 }

執(zhí)行上述代碼出皇,一種輸出的可能如下:

1426515345
1426515347
1426515349
1426515351

上述輸出的單位是秒吭从。可以看到恶迈,時間間隔是2秒涩金。

如果任務的執(zhí)行時間超過調(diào)度時間,會發(fā)生什么情況呢暇仲?

將第9行的代碼改為

Thread.sleep(8000);

再執(zhí)行上述代碼步做,你就會發(fā)現(xiàn)任務的執(zhí)行周期不再是2秒,而是變成了8秒奈附。如下所示全度,是一種可能的結(jié)果。

1426516323
1426516331
1426516339
1426516347
1426516355

也就是說斥滤,周期如果太短将鸵,那么任務就會在上一個任務結(jié)束后勉盅,立即被調(diào)用。

另外一個值得注意的問題是顶掉,調(diào)度程序?qū)嶋H上并不保證任務會無限期的持續(xù)調(diào)用草娜。如果任務本身拋出了異常,那么后續(xù)的所有執(zhí)行都會被中斷痒筒,因此宰闰,如果你想讓你的任務持續(xù)穩(wěn)定的執(zhí)行,那么做好異常處理就非常重要簿透,否則移袍,你很有可能觀察到你的調(diào)度器無疾而終。

注意:如果任務遇到異常老充,那么后續(xù)的所有子任務都會停止調(diào)度葡盗,因此,必須保證異常被及時處理啡浊,為周期性任務的穩(wěn)定調(diào)度提供條件觅够。

核心線程池的內(nèi)部實現(xiàn)

ThreadPoolExecutor最重要的構(gòu)造函數(shù)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:指定了線程池中的線程數(shù)量。

  • maximumPoolSize:指定了線程池中的最大線程數(shù)量虫啥。

  • keepAliveTime:當線程池線程數(shù)量超過corePoolSize時蔚约,多余的空閑線程的存活時間奄妨。即涂籽,超過

    corePoolSize的空閑線程,在多長時間內(nèi)砸抛,會被銷毀评雌。

  • unit:keepAliveTime的單位。

  • workQueue:任務隊列直焙,被提交但尚未被執(zhí)行的任務景东。

  • threadFactory:線程工廠,用于創(chuàng)建線程奔誓,一般用默認的即可斤吐。

  • handler:拒絕策略。當任務太多來不及處理厨喂,如何拒絕任務和措。

以上參數(shù)中,大部分都很簡單蜕煌,只有workQueue和handler需要進行詳細說明派阱。

workQueue

參數(shù)workQueue指被提交但未執(zhí)行的任務隊列,它是一個BlockingQueue接口的對象斜纪,僅用于存放Runnable對象贫母。根據(jù)隊列功能分類文兑,在ThreadPoolExecutor的構(gòu)造函數(shù)中可使用以下幾種BlockingQueue。

  • 直接提交的隊列:該功能由SynchronousQueue對象提供腺劣。SynchronousQueue是一個特殊的BlockingQueue绿贞。SynchronousQueue沒有容量,每一個插入操作都要等待一個相應的刪除操作誓酒,反之樟蠕,每一個刪除操作都要等待對應的插入操作。如果使用SynchronousQueue靠柑,提交的任務不會被真實的保存寨辩,而總是將新任務提交給線程執(zhí)行,如果沒有空閑的進程歼冰,則嘗試創(chuàng)建新的進程靡狞,如果進程數(shù)量已經(jīng)達到最大值,則執(zhí)行拒絕策略隔嫡。因此甸怕,使用SynchronousQueue隊列,通常要設(shè)置很大的maximumPoolSize值腮恩,否則很容易執(zhí)行拒絕策略梢杭。即SynchronousQueue沒有容量,提交的任務直接交給線程執(zhí)行秸滴,不保存任務武契,如果進程數(shù)量已經(jīng)達到最大值,則執(zhí)行拒絕策略荡含。

  • 有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現(xiàn)咒唆。ArrayBlockingQueue的構(gòu)造函數(shù)必須帶一個容量參數(shù),表示該隊列的最大容量释液,如下所示全释。

public ArrayBlockingQueue(int capacity)
    • 當使用有界的任務隊列時,若有新的任務需要執(zhí)行误债,如果線程池的實際線程數(shù)小于corePoolSize浸船,則會優(yōu)先創(chuàng)建 新的線程,若大于corePoolSize寝蹈,則會將新任務加入等待隊列李命。若等待隊列已滿,無法加入躺盛,則在總線程數(shù)不大于maximumPoolSize的前提下项戴,創(chuàng)建新的進程執(zhí)行任務。若大于maximumPoolSize槽惫,則執(zhí)行拒絕策略周叮”绯牛可見,有界隊列僅當在任務隊列裝滿時仿耽,才可能將線程數(shù)提升到corePoolSize以上合冀,換言之,除非系統(tǒng)非常繁忙项贺,否則確保核心線程數(shù)維持在在corePoolSize君躺。
  • 無界的任務隊列:無界任務隊列可以通過LinkedBlockingQueue類實現(xiàn)。與有界隊列相比开缎,除非系統(tǒng)資源耗盡棕叫,否則無界的任務隊列不存在任務入隊失敗的情況。當有新的任務到來奕删,系統(tǒng)的線程數(shù)小于corePoolSize時俺泣,線程池會生成新的線程執(zhí)行任務,但當系統(tǒng)的線程數(shù)達到corePoolSize后完残,就不會繼續(xù)增加伏钠。若后續(xù)仍有新的任務加入,而又沒有空閑的線程資源谨设,則任務直接進入隊列等待熟掂。若任務創(chuàng)建和處理的速度差異很大,無界隊列會保持快速增長扎拣,直到耗盡系統(tǒng)內(nèi)存赴肚。即如果使用無界隊列,正在工作的線程數(shù)最大只能是corePoolSize鹏秋,超過則直接進入隊列等待尊蚁,無界隊列可以無限增長亡笑,直到系統(tǒng)內(nèi)存耗盡侣夷。

  • 優(yōu)先任務隊列:優(yōu)先任務隊列是帶有執(zhí)行優(yōu)先級的隊列。它通過PriorityBlockingQueue實現(xiàn)仑乌,可以控制任務的執(zhí)行先后順序百拓。它是一個特殊的無界隊列。無論是有界隊列ArrayBlockingQueue晰甚,還是未指定大小的無界隊列LinkedBlockingQueue都是按照先進先出算法處理任務的衙传。而PriorityBlockingQueue則可以根據(jù)任務自身的優(yōu)先級順序先后執(zhí)行,在確保系統(tǒng)性能的同時厕九,也能有很好的質(zhì)量保證(總是確保高優(yōu)先級的任務先執(zhí)行)蓖捶。

notice:

newFixedThreadPool()和newCachedThreadPool()的線程池數(shù)量

newFixedThreadPool()

回顧newFixedThreadPool()方法的實現(xiàn)。它返回了一個corePoolSize和maximumPoolSize大小一樣的扁远,并且使用了LinkedBlockingQueue任務隊列的線程池俊鱼。因為對于固定大小的線程池而言刻像,不存在線程數(shù)量的動態(tài)變化贮乳,因此corePoolSize和maximumPoolSize可以相等符隙。同時惰爬,它使用無界隊列存放無法立即執(zhí)行的任務喊积,當任務提交非常頻繁的時候梢夯,該隊列可能迅速膨脹呛谜,從而耗盡系統(tǒng)資源市怎。
newSingleThreadExecutor()返回的單線程線程池糠排,是newFixedThreadPool()方法的一種退化犀填,只是簡單的將線程池線程數(shù)量設(shè)置為1蠢壹。

newCachedThreadPool()

newCachedThreadPool()方法返回corePoolSize為0,maximumPoolSize無窮大的線程池九巡,這意味著在沒有任務時知残,該線程池內(nèi)無線程,而當任務被提交時比庄,該線程池會使用空閑的線程執(zhí)行任務求妹,若無空閑線程,則將任務加入SynchronousQueue隊列佳窑,而SynchronousQueue隊列是一種直接提交的隊列制恍,它總會迫使線程池增加新的線程執(zhí)行任務。當任務執(zhí)行完畢后神凑,由于corePoolSize為0净神,因此空閑線程又會在指定時間內(nèi)(60秒)被回收。
對于newCachedThreadPool()溉委,如果同時有大量任務被提交鹃唯,而任務的執(zhí)行又不那么快時,那么系統(tǒng)便會開啟等量的線程處理瓣喊,這樣做法可能會很快耗盡系統(tǒng)的資源坡慌。即newCachedThreadPool()是根據(jù)實際情況調(diào)整線程數(shù)量的線程池,最大可以創(chuàng)建無限多的線程藻三。

ThreadPoolExecutor線程池的核心調(diào)度代碼

01 public void execute(Runnable command) {
02     if (command == null)
03         throw new NullPointerException();
04     int c = ctl.get();
05     if (workerCountOf(c) < corePoolSize) {
06         if (addWorker(command, true))
07             return;
08         c = ctl.get();
09     }
10     if (isRunning(c) && workQueue.offer(command)) {
11         int recheck = ctl.get();
12         if (! isRunning(recheck) && remove(command))
13             reject(command);
14         else if (workerCountOf(recheck) == 0)
15             addWorker(null, false);
16     }
17     else if (!addWorker(command, false))
18         reject(command);
19 }

代碼第5行的workerCountOf()函數(shù)取得了當前線程池的線程總數(shù)洪橘。當線程總數(shù)小于corePoolSize核心線程數(shù)時,會將任務通過addWorker()方法直接調(diào)度執(zhí)行棵帽。否則熄求,則在第10行代碼處(workQueue.offer())進入等待隊列。如果進入等待隊列失敹焊拧(比如有界隊列到達了上限弟晚,或者使用了SynchronousQueue),則會執(zhí)行第17行,將任務直接提交給線程池卿城。如果當前線程數(shù)已經(jīng)達到maximumPoolSize淑履,則提交失敗,就執(zhí)行第18行的拒絕策略藻雪。

image.png

拒絕策略(handler)

ThreadPoolExecutor的最后一個參數(shù)指定了拒絕策略秘噪。也就是當任務數(shù)量超過系統(tǒng)實際承載能力時,使用拒絕策略勉耀。

JDK內(nèi)置的拒絕策略如下指煎。

  • AbortPolicy策略:該策略會直接拋出異常,阻止系統(tǒng)正常工作便斥。
  • CallerRunsPolicy策略:只要線程池未關(guān)閉至壤,該策略直接在調(diào)用者線程中,運行當前被丟棄的任務枢纠。顯然這樣做不會真的丟棄任務像街,但是,任務提交線程的性能極有可能會急劇下降晋渺。即該拒絕策略丟棄的線程放到調(diào)用者的線程中執(zhí)行镰绎,也就是誰調(diào)用線程池,該任務就交給誰執(zhí)行木西。
  • DiscardOledestPolicy策略:該策略將丟棄最老的一個請求畴栖,也就是即將被執(zhí)行的一個任務,并嘗試再次提交當前任務八千。
  • DiscardPolicy策略:該策略默默地丟棄無法處理的任務吗讶,不予任何處理。如果允許任務丟失恋捆,我覺得這可能是最好的一種方案了吧照皆!

notice: 執(zhí)行拒絕策略的前提都是隊列已滿,隊列將請求交給線程池沸停。

以上內(nèi)置的策略均實現(xiàn)了RejectedExecutionHandler接口膜毁,若以上策略仍無法滿足實際應用需要,完全可以自己擴展RejectedExecutionHandler接口星立。RejectedExecutionHandler的定義如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

其中r為請求執(zhí)行的任務爽茴,executor為當前的線程池葬凳。
下面的代碼簡單地演示了自定義線程池和拒絕策略的使用:

01 public class RejectThreadPoolDemo {
02     public static class MyTask implements Runnable {
03         @Override
04         public void run() {
05             System.out.println(System.currentTimeMillis() + ":Thread ID:"
06                     + Thread.currentThread().getId());
07             try {
08                 Thread.sleep(100);
09             } catch (InterruptedException e) {
10                 e.printStackTrace();
11             }
12         }
13     }
14
15     public static void main(String[] args) throws InterruptedException  {
16         MyTask task = new MyTask();
17         ExecutorService es = new ThreadPoolExecutor(5, 5,
18                 0L, TimeUnit.MILLISECONDS,
19                 new LinkedBlockingQueue<Runnable>(10),
20                 Executors.defaultThreadFactory(),
21                 new RejectedExecutionHandler(){
22                     @Override
23                     public void rejectedExecution(Runnable r,
24                             ThreadPoolExecutor executor) {
25                         System.out.println(r.toString()+" is discard");
26                     }
27         });
28         for (int i = 0; i < Integer.MAX_VALUE; i++) {
29             es.submit(task);
30            Thread.sleep(10);
31         }
32     }
33 }

? 上述代碼的第17~27行自定義了一個線程池绰垂。該線程池有5個常駐線程,并且最大線程數(shù)量也是5個火焰。這和固定大小的線程池是一樣的劲装。但是它卻擁有一個只有10個容量的等待隊列。因為使用無界隊列很可能并不是最佳解決方案,如果任務量極大占业,很有可能會把內(nèi)存撐爆绒怨。給出一個合理的隊列大小,也是合乎常理的選擇谦疾。同時南蹂,這里自定義了拒絕策略,我們不拋出異常念恍,因為萬一在任務提交端沒有進行異常處理六剥,則有可能使得整個系統(tǒng)都崩潰,這極有可能不是我們希望遇到的峰伙。但作為必要的信息記錄疗疟,我們將任務丟棄的信息進行打印,當然瞳氓,這只比內(nèi)置的DiscardPolicy策略高級那么一點點策彤。

? 由于在這個案例中,MyTask執(zhí)行需要花費100毫秒匣摘,因此店诗,必然會導致大量的任務被直接丟棄。執(zhí)行上述代碼音榜,可能的部分輸出如下:

1426597264669:Thread ID:11
1426597264679:Thread ID:12
java.util.concurrent.FutureTask@a57993 is discard
java.util.concurrent.FutureTask@1b84c92 is discard

可以看到必搞,在執(zhí)行幾個任務后,拒絕策略就開始生效了囊咏。在實際應用中恕洲,我們可以將更詳細的信息記錄到日志中,來分析系統(tǒng)的負載和任務丟失的情況梅割。

自定義線程創(chuàng)建:ThreadFactory

線程池的主要作用是為了線程復用霜第,也就是避免了線程的頻繁創(chuàng)建。但是户辞,最開始的那些線程從何而來呢泌类?答案就是ThreadFactory。
ThreadFactory是一個接口底燎,它只有一個方法刃榨,用來創(chuàng)建線程:

Thread newThread(Runnable r);

? 當線程池需要新建線程時,就會調(diào)用這個方法双仍。
? 自定義線程池可以幫助我們做不少事枢希。比如:

  • 可以跟蹤線程池究竟在何時創(chuàng)建了多少線程

  • 可以自定義線程的名稱、組以及優(yōu)先級等信息

  • 可以任性地將所有的線程設(shè)置為守護線程朱沃。

總之苞轿,使用自定義線程池可以讓我們更加自由地設(shè)置池子中所有線程的狀態(tài)茅诱。下面的案例使用自定義的ThreadFactory,一方面記錄了線程的創(chuàng)建搬卒,另一方面將所有的線程都設(shè)置為守護線程瑟俭,這樣,當主線程退出后契邀,將會強制銷毀線程池(這個銷毀線程池的思路真是石破天驚)摆寄。

01 public static void main(String[] args) throws InterruptedException {
02     MyTask task = new MyTask();
03     ExecutorService es = new ThreadPoolExecutor(5, 5,
04             0L, TimeUnit.MILLISECONDS,
05             new SynchronousQueue<Runnable>(),
06             new ThreadFactory(){
07                 @Override
08                 public Thread newThread(Runnable r) {
09                     Thread t= new Thread(r);
10                     t.setDaemon(true);
11                     System.out.println("create "+t);
12                     return t;
13                 }
14             }
15            );
16     for (int i = 0; i < 5; i++) {
17         es.submit(task);
18     }
19     Thread.sleep(2000);
20 }

擴展線程池

ThreadPoolExecutor也是一個可以擴展的線程池。它提供了beforeExecute()坯门、afterExecute()和terminated()三個接口對線程池進行控制椭迎。
以beforeExecute()、afterExecute()為例田盈,在ThreadPoolExecutor.Worker. runTask()方法內(nèi)部提供了這樣的實現(xiàn):

boolean ran = false;
beforeExecute(thread, task);                            //運行前
try {
    task.run();                                         //運行任務
    ran = true;
    afterExecute(task, null);                           //運行結(jié)束后
    ++completedTasks;
} catch (RuntimeException ex) {
    if (!ran)
        afterExecute(task, ex);                         //運行結(jié)束
    throw ex;
}

ThreadPoolExecutor.Worker是ThreadPoolExecutor的內(nèi)部類畜号,它是一個實現(xiàn)了Runnable接口的類。ThreadPoolExecutor線程池中的工作線程也正是Worker實例允瞧。Worker.runTask()方法會被線程池以多線程模式異步調(diào)用简软,即Worker.runTask()會同時被多個線程訪問。因此其beforeExecute()述暂、afterExecute()接口也將同時多線程訪問痹升。
在默認的ThreadPoolExecutor實現(xiàn)中,提供了空的beforeExecute()和afterExecute()實現(xiàn)畦韭。在實際應用中疼蛾,可以對其進行擴展來實現(xiàn)對線程池運行狀態(tài)的跟蹤,輸出一些有用的調(diào)試信息艺配,以幫助系統(tǒng)故障診斷察郁,這對于多線程程序錯誤排查是很有幫助的。下面演示了對線程池的擴展转唉,在這個擴展中皮钠,我們將記錄每一個任務的執(zhí)行日志。(這種擴展太棒了)

線程池擴展起始時間赠法,完成時間麦轰,退出,和為線程賦名字范式

01 public class ExtThreadPool {
02     public static class MyTask implements Runnable {
03         public String name;
04
05         public MyTask(String name) {
06             this.name = name;
07         }
08
09         @Override
10         public void run() {
11             System.out.println("正在執(zhí)行" + ":Thread ID:" + Thread. currentThread().getId()
12                     + ",Task Name=" + name);
13             try {
14                 Thread.sleep(100);
15             } catch (InterruptedException e) {
16                 e.printStackTrace();
17             }
18         }
19     }
20
21     public static void main(String[] args) throws InterruptedException {
22
23       ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
24                 new LinkedBlockingQueue<Runnable>()) {
25             @Override
26             protected void beforeExecute(Thread t, Runnable r) {
27                 System.out.println("準備執(zhí)行:" + ((MyTask) r).name);
28             }
29
30             @Override
31             protected void afterExecute(Runnable r, Throwable t) {
32                 System.out.println("執(zhí)行完成:" + ((MyTask) r).name);
33             }
34
35             @Override
36             protected void terminated() {
37                 System.out.println("線程池退出");
38             }
39
40         };
41         for (int i = 0; i < 5; i++) {
42             MyTask task = new MyTask("TASK-GEYM-" + i);
43             es.execute(task);
44             Thread.sleep(10);
45         }
46         es.shutdown();
47     }
48 }

輸出

準備執(zhí)行:TASK-GEYM-0
正在執(zhí)行:Thread ID:8,Task Name=TASK-GEYM-0
準備執(zhí)行:TASK-GEYM-1
正在執(zhí)行:Thread ID:9,Task Name=TASK-GEYM-1
準備執(zhí)行:TASK-GEYM-2
正在執(zhí)行:Thread ID:10,Task Name=TASK-GEYM-2
準備執(zhí)行:TASK-GEYM-3
正在執(zhí)行:Thread ID:11,Task Name=TASK-GEYM-3
準備執(zhí)行:TASK-GEYM-4
正在執(zhí)行:Thread ID:12,Task Name=TASK-GEYM-4
執(zhí)行完成:TASK-GEYM-0
執(zhí)行完成:TASK-GEYM-1
執(zhí)行完成:TASK-GEYM-2
執(zhí)行完成:TASK-GEYM-3
執(zhí)行完成:TASK-GEYM-4
線程池退出

上述代碼在第23~40行砖织,擴展了原有的線程池款侵,實現(xiàn)了beforeExecute()、afterExecute()和terminiated()三個方法侧纯。這三個方法分別用于記錄一個任務的開始新锈、結(jié)束和整個線程池的退出。在第42~43行茂蚓,向線程池提交5個任務壕鹉,為了有更清晰的日志剃幌,我們?yōu)槊總€任務都取了一個不同的名字聋涨。

在提交完成后晾浴,調(diào)用shutdown()方法關(guān)閉線程池。這是一個比較安全的方法牍白,如果當前正有線程在執(zhí)行脊凰,shutdown()方法并不會立即暴力地終止所有任務,它會等待所有任務執(zhí)行完成后茂腥,再關(guān)閉線程池狸涌,但它并不會等待所有線程執(zhí)行完成后再返回,因此最岗,可以簡單地理解成shutdown()只是發(fā)送了一個關(guān)閉信號而已帕胆。但在shutdown()方法執(zhí)行后,這個線程池就不能再接受其他新的任務了般渡。

優(yōu)化線程池線程數(shù)量

在《Java Concurrency in Practice》一書中給出了一個估算線程池大小的經(jīng)驗公式:

image.png

為保持處理器達到期望的使用率懒豹,最優(yōu)的池的大小等于:

image.png

在Java中,可以通過:

Runtime.getRuntime().availableProcessors()

取得可用的CPU數(shù)量驯用。通過這么為線程池賦予線程數(shù)更加優(yōu)雅脸秽。

在線程池中尋找堆棧

處理不好線程的異常,線程池會吞異常信息

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b){
        this.a=a;
        this.b=b;
    }
    @Override
    public void run() {
        double re=a/b;
        System.out.println(re);
    }
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadPoolExecutor pools=new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            0L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());

    for(int i=0;i<5;i++){
        pools.submit(new DivTask(100,i));
    }
}

上述代碼將DivTask提交到線程池蝴乔,從這個for循環(huán)來看记餐,我們應該會得到5個結(jié)果,分別是100除以給定的i后的商薇正。但如果你真的運行程序片酝,你得到的全部結(jié)果是:

33.0
50.0
100.0
25.0

? 只有4個輸出。也就說是程序漏算了一組數(shù)據(jù)挖腰!但更不幸的是钠怯,程序沒有任何日志,沒有任何錯誤提示曙聂,就好像一切都正常一樣晦炊。在這個簡單的案例中,只要你稍有經(jīng)驗宁脊,你就能發(fā)現(xiàn)断国,作為除數(shù)的i取到了0,這個缺失的值很可能是由于除以0導致的榆苞。但在稍復雜的業(yè)務場景中稳衬,這種錯誤足可以讓你幾天萎靡不振。
? 因此坐漏,使用線程池雖然是件好事薄疚,但是還是得處處留意這些“坑”碧信。線程池很有可能會“吃”掉程序拋出的異常,完全不打印堆棧信息街夭,導致我們對程序的錯誤一無所知砰碴。

<font color=red>解決方案</font>

解決這個問題的一個最簡單的方式就是放棄submit(),改用execute()板丽。將上述的任務提交代碼改成:

pools.execute(new DivTask(100,i));

或者你使用下面的方法改造你的submit():

Future re=pools.submit(new DivTask(100,i));
re.get();

上面兩種方法都可以得到部分堆棧信息呈枉,如下所示:

Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
   at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor. java:617)
   at java.lang.Thread.run(Thread.java:745)
33.0
100.0
50.0
25.0

顯然只有部分堆棧信息是不夠的,從這兩個異常堆棧中我們只能知道異常是在哪里拋出的(這里是DivTask的第11行)埃碱。但是我們還希望得到另外一個更重要的信息猖辫,那就是這個任務到底是在哪里提交的?而任務的具體提交位置已經(jīng)被線程池完全淹沒了砚殿。順著堆棧啃憎,我們最多只能找到線程池中的調(diào)度流程,而這對于我們幾乎是沒有價值的似炎。

所以需要擴展ThreadPoolExecutor線程池辛萍,讓它在調(diào)度任務之前,先保存一下提交任務線程的堆棧信息名党。

01 public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
02     public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
03             long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
04         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
05     }
05     }
06
07     @Override
08     public void execute(Runnable task) {
09         super.execute(wrap(task, clientTrace(), Thread.currentThread()
10                 .getName()));
11     }
12
13     @Override
14     public Future<?> submit(Runnable task) {
15         return super.submit(wrap(task, clientTrace(), Thread.currentThread()
16                 .getName()));
17     }
18
19     private Exception clientTrace() {
20         return new Exception("Client stack trace");
21     }
22
23     private Runnable wrap(final Runnable task, final Exception clientStack,
24             String clientThreadName) {
25         return new Runnable() {
26             @Override
27             public void run() {
28                 try {
29                     task.run();
30                 } catch (Exception e) {
31                     clientStack.printStackTrace();
32                     throw e;
33                 }
34             }
35         };
36     }
37 }

在第23行代碼中叹阔,wrap()方法的第2個參數(shù)為一個異常,里面保存著提交任務的線程的堆棧信息传睹。該方法將我們傳入的Runnable任務進行一層包裝耳幢,使之能處理異常信息。當任務發(fā)生異常時欧啤,這個異常會被打印睛藻。

14 public static void main(String[] args) {
15     ThreadPoolExecutor pools=new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
16         0L, TimeUnit.SECONDS,
17         new SynchronousQueue<Runnable>());
18
19     /**
20      * 錯誤堆棧中可以看到是在哪里提交的任務
21      */
22     for(int i=0;i<5;i++){
23         pools.execute(new DivTask(100,i));
24     }
25 }

執(zhí)行上述代碼,就可以得到以下信息:

java.lang.Exception: Client stack trace
   at geym.conc.ch3.trace.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:28)
   at geym.conc.ch3.trace.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
   at geym.conc.ch3.trace.TraceMain.main(TraceMain.java:23) //這里顯示了線程的提交位置
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
   at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
   at geym.conc.ch3.trace.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:37)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
   33.0
100.0
25.0
50.0

熟悉的異常又回來了邢隧!現(xiàn)在店印,我們不僅可以得到異常發(fā)生的Runnable實現(xiàn)內(nèi)的信息,我們也知道了這個任務是在哪里提交的倒慧。如此豐富的信息按摘,我相信可以幫助我們瞬間定位問題!

Fork/Join框架

拆分匯總

使用Fork/Join進行數(shù)據(jù)處理時的總體結(jié)構(gòu)如圖

image.png

通過Fork/Join可以提高線程利用率纫谅,和避免數(shù)據(jù)競爭

提高利用率

線程A已經(jīng)把自己的任務都執(zhí)行完成了炫贤,而線程B還有一堆任務等著處理,此時付秕,線程A就會“幫助”線程B兰珍,從線程B的任務隊列中拿一個任務過來處理,盡可能地達到平衡询吴。

避免數(shù)據(jù)競爭

當線程試圖幫助別人時掠河,總是從任務隊列的底部開始拿數(shù)據(jù)亮元,而線程試圖執(zhí)行自己的任務時,則是從相反的頂部開始拿唠摹。

image.png

下面我們來看一下ForkJoinPool的一個重要的接口:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

可以向ForkJoinPool線程池提交一個ForkJoinTask任務爆捞。所謂ForkJoinTask任務就是支持fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類跃闹,RecursiveAction和RecursiveTask嵌削。它們分別表示沒有返回值的任務和可以攜帶返回值的任務毛好。

image.png

案例

01 public class CountTask extends RecursiveTask<Long>{
02     private static final int THRESHOLD = 10000;
03     private long start;
04     private long end;
05
06     public CountTask(long start,long end){
07         this.start=start;
08         this.end=end;
09     }
10
11     public Long compute(){
12         long sum=0;
13         boolean canCompute = (end-start)<THRESHOLD;
14         if(canCompute){
15             for(long i=start;i<=end;i++){
16                 sum +=i;
17             }
18         }else{
19             //分成100個小任務
20             long step=(start+end)/100;
21             ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
22             long pos=start;
23             for(int i=0;i<100;i++){
24                 long lastOne=pos+step;
25                 if(lastOne>end)lastOne=end;
26                 CountTask subTask=new CountTask(pos,lastOne);
27                 pos+=step+1;
28                 subTasks.add(subTask);
29                 subTask.fork();//這里進行了遞歸
30             }
31             for(CountTask  t:subTasks){
32                 sum+=t.join();
33             }
34         }
35         return sum;
36     }
37
38     public static void main(String[]args){
39         ForkJoinPool forkJoinPool = new ForkJoinPool();
40         CountTask task = new CountTask(0,200000L);
41         ForkJoinTask<Long> result = forkJoinPool.submit(task);
42         try{
43             long res = result.get();
44             System.out.println("sum="+res);
45         }catch(InterruptedException e){
46             e.printStackTrace();
47         }catch(ExecutionException e){
48             e.printStackTrace();
49         }
50     }
51 }

? 由于計算數(shù)列的和必然是需要函數(shù)返回值的望艺,因此選擇RecursiveTask作為任務的模型。上述代碼第39行肌访,建立ForkJoinPool線程池找默。在第40行,構(gòu)造一個計算1到200000求和的任務吼驶。在第41行將任務提交給線程池惩激,線程池會返回一個攜帶結(jié)果的任務,通過get()方法可以得到最終結(jié)果(第43行)蟹演。如果在執(zhí)行g(shù)et()方法時风钻,任務沒有結(jié)束,那么主線程就會在get()方法時等待酒请。

? 在使用ForkJoin時需要注意骡技,如果任務的劃分層次很深,一直得不到返回羞反,那么可能出現(xiàn)兩種情況:第一布朦,系統(tǒng)內(nèi)的線程數(shù)量越積越多,導致性能嚴重下降昼窗。第二是趴,函數(shù)的調(diào)用層次變得很深,最終導致棧溢出澄惊。不同版本的JDK內(nèi)部實現(xiàn)機制可能有差異唆途,從而導致其表現(xiàn)不同。

? 下面的StackOverflowError異常就是加深本例的調(diào)用層次掸驱,在JDK 8上得到的錯誤肛搬。

java.util.concurrent.ExecutionException: java.lang.StackOverflowError
    at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1000)
    at geym.conc.ch3.fork.CountTask.main(CountTask.java:51)
Caused by: java.lang.StackOverflowError

? 此外,F(xiàn)orkJoin線程池使用一個無鎖的棧來管理空閑線程亭敢。如果一個工作線程暫時取不到可用的任務滚婉,則可能會被掛起,掛起的線程將會被壓入由線程池維護的棧中帅刀。待將來有任務可用時让腹,再從棧中喚醒這些線程远剩。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市骇窍,隨后出現(xiàn)的幾起案子瓜晤,更是在濱河造成了極大的恐慌,老刑警劉巖腹纳,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痢掠,死亡現(xiàn)場離奇詭異,居然都是意外死亡嘲恍,警方通過查閱死者的電腦和手機足画,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來佃牛,“玉大人淹辞,你說我怎么就攤上這事》溃” “怎么了象缀?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長爷速。 經(jīng)常有香客問我央星,道長,這世上最難降的妖魔是什么惫东? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任莉给,我火速辦了婚禮,結(jié)果婚禮上凿蒜,老公的妹妹穿的比我還像新娘禁谦。我一直安慰自己,他們只是感情好废封,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布州泊。 她就那樣靜靜地躺著,像睡著了一般漂洋。 火紅的嫁衣襯著肌膚如雪遥皂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天刽漂,我揣著相機與錄音演训,去河邊找鬼。 笑死贝咙,一個胖子當著我的面吹牛样悟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼窟她,長吁一口氣:“原來是場噩夢啊……” “哼陈症!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起震糖,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤录肯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后吊说,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體论咏,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年颁井,在試婚紗的時候發(fā)現(xiàn)自己被綠了厅贪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡蚤蔓,死狀恐怖卦溢,靈堂內(nèi)的尸體忽然破棺而出糊余,到底是詐尸還是另有隱情秀又,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布贬芥,位于F島的核電站吐辙,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏蘸劈。R本人自食惡果不足惜昏苏,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望威沫。 院中可真熱鬧贤惯,春花似錦、人聲如沸棒掠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烟很。三九已至颈墅,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間雾袱,已是汗流浹背恤筛。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留芹橡,地道東北人毒坛。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親煎殷。 傳聞我的和親對象是個殘疾皇子屡谐,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容