線程池
為了能夠更好地控制多線程,JDK提供了一套Executor框架拨与,幫助開發(fā)人員有效地進行線程控制忆蚀,其本質(zhì)就是一個線程池根蟹。
以上成員均在java.util.concurrent包中蕾域,是JDK并發(fā)包的核心類拷肌。
Executor框架提供了各種類型的線程池,主要有以下工廠方法
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
以上工廠方法分別返回具有不同工作特性的線程池旨巷。這些線程池工廠方法的具體說明如下:
- newFixedThreadPool()方法:該方法返回一個固定線程數(shù)量的線程池巨缘。該線程池中的線程數(shù)量始終不變。當有一個新的任務提交時契沫,線程池中若有空閑線程,則立即執(zhí)行昔汉。若沒有懈万,則新的任務會被暫存在一個任務隊列中,待有線程空閑時靶病,便處理在任務隊列中的任務会通。
- newSingleThreadExecutor()方法:該方法返回一個只有一個線程的線程池。若多余一個任務被提交到該線程池娄周,任務會被保存在一個任務隊列中涕侈,待線程空閑,按先入先出的順序執(zhí)行隊列中的任務煤辨。
- newCachedThreadPool()方法:該方法返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池裳涛。線程池的線程數(shù)量不確定,但若有空閑線程可以復用众辨,則會優(yōu)先使用可復用的線程端三。若所有線程均在工作,又有新的任務提交鹃彻,則會創(chuàng)建新的線程處理任務郊闯。所有線程在當前任務執(zhí)行完畢后,將返回線程池進行復用蛛株。
- newSingleThreadScheduledExecutor()方法:該方法返回一個ScheduledExecutorService對象团赁,線程池大小為1。ScheduledExecutorService接口在ExecutorService接口之上擴展了在給定時間執(zhí)行某任務的功能谨履,如在某個固定的延時之后執(zhí)行欢摄,或者周期性執(zhí)行某個任務。
- newScheduledThreadPool()方法:該方法也返回一個ScheduledExecutorService對象笋粟,但該線程池可以指定線程數(shù)量剧浸。
固定大小的線程池
01 public class ThreadPoolDemo {
02 public static class MyTask implements Runnable {
03 @Override
04 public void run() {
05 System.out.println(System.currentTimeMillis() + ":Thread ID:"
06 + Thread.currentThread().getId());
07 try {
08 Thread.sleep(1000);
09 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }
12 }
13 }
14
15 public static void main(String[] args) {
16 MyTask task = new MyTask();
17 ExecutorService es = Executors.newFixedThreadPool(5);
18 for (int i = 0; i < 10; i++) {
19 es.submit(task);
20 }
21 }
22 }
輸出
1426510293450:Thread ID:8
1426510293450:Thread ID:9
1426510293450:Thread ID:12
1426510293450:Thread ID:10
1426510293450:Thread ID:11
1426510294450:Thread ID:12
1426510294450:Thread ID:11
1426510294450:Thread ID:8
1426510294450:Thread ID:10
1426510294450:Thread ID:9
計劃任務
newScheduledThreadPool()锹引。它返回一個ScheduledExecutorService對象,可以根據(jù)時間需要對線程進行調(diào)度唆香。它的一些主要方法如下:
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
計劃任務即定時線程池嫌变,定時執(zhí)行任務,這個定時有兩個維度躬它,周期和延遲:
- scheduleAtFixedRate
- 創(chuàng)建一個周期性任務腾啥。任務開始于給定的初始延時。后續(xù)的任務按照給定的周期進行:后續(xù)第一個任務將會在initialDelay+period時執(zhí)行冯吓,后續(xù)第二個任務將在initialDelay+2period時進行倘待,依此類推。即按照已固定的頻率來執(zhí)行某項計劃(任務)*组贺。
- scheduleWithFixedDelay
- 創(chuàng)建并執(zhí)行一個周期性任務凸舵。任務開始于初始延時時間,后續(xù)任務將會按照給定的延時進行失尖,即上一個任務的結(jié)束時間到下一個任務的開始時間的時間差啊奄。這個周期執(zhí)行,是等上一次任務結(jié)束后掀潮,等待一個延遲時間執(zhí)行一次任務菇夸,即每個任務之間按照固定的延遲執(zhí)行任務。
下面的例子使用scheduleAtFixedRate()方法調(diào)度一個任務仪吧。這個任務會執(zhí)行1秒鐘時間庄新,調(diào)度周期是2秒。也就是說每2秒鐘薯鼠,任務就會被執(zhí)行一次择诈。
01 public class ScheduledExecutorServiceDemo {
02 public static void main(String[] args) {
03 ScheduledExecutorService ses=Executors.newScheduledThreadPool(10);
04 //如果前面的任務沒有完成,則調(diào)度也不會啟動
05 ses.scheduleAtFixedRate(new Runnable() {
06 @Override
07 public void run() {
08 try {
09 Thread.sleep(1000);
10 System.out.println(System.currentTimeMillis()/1000);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 }, 0, 2, TimeUnit.SECONDS);
16 }
17 }
執(zhí)行上述代碼出皇,一種輸出的可能如下:
1426515345
1426515347
1426515349
1426515351
上述輸出的單位是秒吭从。可以看到恶迈,時間間隔是2秒涩金。
如果任務的執(zhí)行時間超過調(diào)度時間,會發(fā)生什么情況呢暇仲?
將第9行的代碼改為
Thread.sleep(8000);
再執(zhí)行上述代碼步做,你就會發(fā)現(xiàn)任務的執(zhí)行周期不再是2秒,而是變成了8秒奈附。如下所示全度,是一種可能的結(jié)果。
1426516323
1426516331
1426516339
1426516347
1426516355
也就是說斥滤,周期如果太短将鸵,那么任務就會在上一個任務結(jié)束后勉盅,立即被調(diào)用。
另外一個值得注意的問題是顶掉,調(diào)度程序?qū)嶋H上并不保證任務會無限期的持續(xù)調(diào)用草娜。如果任務本身拋出了異常,那么后續(xù)的所有執(zhí)行都會被中斷痒筒,因此宰闰,如果你想讓你的任務持續(xù)穩(wěn)定的執(zhí)行,那么做好異常處理就非常重要簿透,否則移袍,你很有可能觀察到你的調(diào)度器無疾而終。
注意:如果任務遇到異常老充,那么后續(xù)的所有子任務都會停止調(diào)度葡盗,因此,必須保證異常被及時處理啡浊,為周期性任務的穩(wěn)定調(diào)度提供條件觅够。
核心線程池的內(nèi)部實現(xiàn)
ThreadPoolExecutor最重要的構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:指定了線程池中的線程數(shù)量。
maximumPoolSize:指定了線程池中的最大線程數(shù)量虫啥。
-
keepAliveTime:當線程池線程數(shù)量超過corePoolSize時蔚约,多余的空閑線程的存活時間奄妨。即涂籽,超過
corePoolSize的空閑線程,在多長時間內(nèi)砸抛,會被銷毀评雌。
unit:keepAliveTime的單位。
workQueue:任務隊列直焙,被提交但尚未被執(zhí)行的任務景东。
threadFactory:線程工廠,用于創(chuàng)建線程奔誓,一般用默認的即可斤吐。
handler:拒絕策略。當任務太多來不及處理厨喂,如何拒絕任務和措。
以上參數(shù)中,大部分都很簡單蜕煌,只有workQueue和handler需要進行詳細說明派阱。
workQueue
參數(shù)workQueue指被提交但未執(zhí)行的任務隊列,它是一個BlockingQueue接口的對象斜纪,僅用于存放Runnable對象贫母。根據(jù)隊列功能分類文兑,在ThreadPoolExecutor的構(gòu)造函數(shù)中可使用以下幾種BlockingQueue。
直接提交的隊列:該功能由SynchronousQueue對象提供腺劣。SynchronousQueue是一個特殊的BlockingQueue绿贞。SynchronousQueue沒有容量,每一個插入操作都要等待一個相應的刪除操作誓酒,反之樟蠕,每一個刪除操作都要等待對應的插入操作。如果使用SynchronousQueue靠柑,提交的任務不會被真實的保存寨辩,而總是將新任務提交給線程執(zhí)行,如果沒有空閑的進程歼冰,則嘗試創(chuàng)建新的進程靡狞,如果進程數(shù)量已經(jīng)達到最大值,則執(zhí)行拒絕策略隔嫡。因此甸怕,使用SynchronousQueue隊列,通常要設(shè)置很大的maximumPoolSize值腮恩,否則很容易執(zhí)行拒絕策略梢杭。即SynchronousQueue沒有容量,提交的任務直接交給線程執(zhí)行秸滴,不保存任務武契,如果進程數(shù)量已經(jīng)達到最大值,則執(zhí)行拒絕策略荡含。
有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現(xiàn)咒唆。ArrayBlockingQueue的構(gòu)造函數(shù)必須帶一個容量參數(shù),表示該隊列的最大容量释液,如下所示全释。
public ArrayBlockingQueue(int capacity)
- 當使用有界的任務隊列時,若有新的任務需要執(zhí)行误债,如果線程池的實際線程數(shù)小于corePoolSize浸船,則會優(yōu)先創(chuàng)建 新的線程,若大于corePoolSize寝蹈,則會將新任務加入等待隊列李命。若等待隊列已滿,無法加入躺盛,則在總線程數(shù)不大于maximumPoolSize的前提下项戴,創(chuàng)建新的進程執(zhí)行任務。若大于maximumPoolSize槽惫,則執(zhí)行拒絕策略周叮”绯牛可見,有界隊列僅當在任務隊列裝滿時仿耽,才可能將線程數(shù)提升到corePoolSize以上合冀,換言之,除非系統(tǒng)非常繁忙项贺,否則確保核心線程數(shù)維持在在corePoolSize君躺。
無界的任務隊列:無界任務隊列可以通過LinkedBlockingQueue類實現(xiàn)。與有界隊列相比开缎,除非系統(tǒng)資源耗盡棕叫,否則無界的任務隊列不存在任務入隊失敗的情況。當有新的任務到來奕删,系統(tǒng)的線程數(shù)小于corePoolSize時俺泣,線程池會生成新的線程執(zhí)行任務,但當系統(tǒng)的線程數(shù)達到corePoolSize后完残,就不會繼續(xù)增加伏钠。若后續(xù)仍有新的任務加入,而又沒有空閑的線程資源谨设,則任務直接進入隊列等待熟掂。若任務創(chuàng)建和處理的速度差異很大,無界隊列會保持快速增長扎拣,直到耗盡系統(tǒng)內(nèi)存赴肚。即如果使用無界隊列,正在工作的線程數(shù)最大只能是corePoolSize鹏秋,超過則直接進入隊列等待尊蚁,無界隊列可以無限增長亡笑,直到系統(tǒng)內(nèi)存耗盡侣夷。
優(yōu)先任務隊列:優(yōu)先任務隊列是帶有執(zhí)行優(yōu)先級的隊列。它通過PriorityBlockingQueue實現(xiàn)仑乌,可以控制任務的執(zhí)行先后順序百拓。它是一個特殊的無界隊列。無論是有界隊列ArrayBlockingQueue晰甚,還是未指定大小的無界隊列LinkedBlockingQueue都是按照先進先出算法處理任務的衙传。而PriorityBlockingQueue則可以根據(jù)任務自身的優(yōu)先級順序先后執(zhí)行,在確保系統(tǒng)性能的同時厕九,也能有很好的質(zhì)量保證(總是確保高優(yōu)先級的任務先執(zhí)行)蓖捶。
notice:
newFixedThreadPool()和newCachedThreadPool()的線程池數(shù)量
newFixedThreadPool()
回顧newFixedThreadPool()方法的實現(xiàn)。它返回了一個corePoolSize和maximumPoolSize大小一樣的扁远,并且使用了LinkedBlockingQueue任務隊列的線程池俊鱼。因為對于固定大小的線程池而言刻像,不存在線程數(shù)量的動態(tài)變化贮乳,因此corePoolSize和maximumPoolSize可以相等符隙。同時惰爬,它使用無界隊列存放無法立即執(zhí)行的任務喊积,當任務提交非常頻繁的時候梢夯,該隊列可能迅速膨脹呛谜,從而耗盡系統(tǒng)資源市怎。
newSingleThreadExecutor()返回的單線程線程池糠排,是newFixedThreadPool()方法的一種退化犀填,只是簡單的將線程池線程數(shù)量設(shè)置為1蠢壹。
newCachedThreadPool()
newCachedThreadPool()方法返回corePoolSize為0,maximumPoolSize無窮大的線程池九巡,這意味著在沒有任務時知残,該線程池內(nèi)無線程,而當任務被提交時比庄,該線程池會使用空閑的線程執(zhí)行任務求妹,若無空閑線程,則將任務加入SynchronousQueue隊列佳窑,而SynchronousQueue隊列是一種直接提交的隊列制恍,它總會迫使線程池增加新的線程執(zhí)行任務。當任務執(zhí)行完畢后神凑,由于corePoolSize為0净神,因此空閑線程又會在指定時間內(nèi)(60秒)被回收。
對于newCachedThreadPool()溉委,如果同時有大量任務被提交鹃唯,而任務的執(zhí)行又不那么快時,那么系統(tǒng)便會開啟等量的線程處理瓣喊,這樣做法可能會很快耗盡系統(tǒng)的資源坡慌。即newCachedThreadPool()是根據(jù)實際情況調(diào)整線程數(shù)量的線程池,最大可以創(chuàng)建無限多的線程藻三。
ThreadPoolExecutor線程池的核心調(diào)度代碼
01 public void execute(Runnable command) {
02 if (command == null)
03 throw new NullPointerException();
04 int c = ctl.get();
05 if (workerCountOf(c) < corePoolSize) {
06 if (addWorker(command, true))
07 return;
08 c = ctl.get();
09 }
10 if (isRunning(c) && workQueue.offer(command)) {
11 int recheck = ctl.get();
12 if (! isRunning(recheck) && remove(command))
13 reject(command);
14 else if (workerCountOf(recheck) == 0)
15 addWorker(null, false);
16 }
17 else if (!addWorker(command, false))
18 reject(command);
19 }
代碼第5行的workerCountOf()函數(shù)取得了當前線程池的線程總數(shù)洪橘。當線程總數(shù)小于corePoolSize核心線程數(shù)時,會將任務通過addWorker()方法直接調(diào)度執(zhí)行棵帽。否則熄求,則在第10行代碼處(workQueue.offer())進入等待隊列。如果進入等待隊列失敹焊拧(比如有界隊列到達了上限弟晚,或者使用了SynchronousQueue),則會執(zhí)行第17行,將任務直接提交給線程池卿城。如果當前線程數(shù)已經(jīng)達到maximumPoolSize淑履,則提交失敗,就執(zhí)行第18行的拒絕策略藻雪。
拒絕策略(handler)
ThreadPoolExecutor的最后一個參數(shù)指定了拒絕策略秘噪。也就是當任務數(shù)量超過系統(tǒng)實際承載能力時,使用拒絕策略勉耀。
JDK內(nèi)置的拒絕策略如下指煎。
- AbortPolicy策略:該策略會直接拋出異常,阻止系統(tǒng)正常工作便斥。
- CallerRunsPolicy策略:只要線程池未關(guān)閉至壤,該策略直接在調(diào)用者線程中,運行當前被丟棄的任務枢纠。顯然這樣做不會真的丟棄任務像街,但是,任務提交線程的性能極有可能會急劇下降晋渺。即該拒絕策略丟棄的線程放到調(diào)用者的線程中執(zhí)行镰绎,也就是誰調(diào)用線程池,該任務就交給誰執(zhí)行木西。
- DiscardOledestPolicy策略:該策略將丟棄最老的一個請求畴栖,也就是即將被執(zhí)行的一個任務,并嘗試再次提交當前任務八千。
- DiscardPolicy策略:該策略默默地丟棄無法處理的任務吗讶,不予任何處理。如果允許任務丟失恋捆,我覺得這可能是最好的一種方案了吧照皆!
notice: 執(zhí)行拒絕策略的前提都是隊列已滿,隊列將請求交給線程池沸停。
以上內(nèi)置的策略均實現(xiàn)了RejectedExecutionHandler接口膜毁,若以上策略仍無法滿足實際應用需要,完全可以自己擴展RejectedExecutionHandler接口星立。RejectedExecutionHandler的定義如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
其中r為請求執(zhí)行的任務爽茴,executor為當前的線程池葬凳。
下面的代碼簡單地演示了自定義線程池和拒絕策略的使用:
01 public class RejectThreadPoolDemo {
02 public static class MyTask implements Runnable {
03 @Override
04 public void run() {
05 System.out.println(System.currentTimeMillis() + ":Thread ID:"
06 + Thread.currentThread().getId());
07 try {
08 Thread.sleep(100);
09 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }
12 }
13 }
14
15 public static void main(String[] args) throws InterruptedException {
16 MyTask task = new MyTask();
17 ExecutorService es = new ThreadPoolExecutor(5, 5,
18 0L, TimeUnit.MILLISECONDS,
19 new LinkedBlockingQueue<Runnable>(10),
20 Executors.defaultThreadFactory(),
21 new RejectedExecutionHandler(){
22 @Override
23 public void rejectedExecution(Runnable r,
24 ThreadPoolExecutor executor) {
25 System.out.println(r.toString()+" is discard");
26 }
27 });
28 for (int i = 0; i < Integer.MAX_VALUE; i++) {
29 es.submit(task);
30 Thread.sleep(10);
31 }
32 }
33 }
? 上述代碼的第17~27行自定義了一個線程池绰垂。該線程池有5個常駐線程,并且最大線程數(shù)量也是5個火焰。這和固定大小的線程池是一樣的劲装。但是它卻擁有一個只有10個容量的等待隊列。因為使用無界隊列很可能并不是最佳解決方案,如果任務量極大占业,很有可能會把內(nèi)存撐爆绒怨。給出一個合理的隊列大小,也是合乎常理的選擇谦疾。同時南蹂,這里自定義了拒絕策略,我們不拋出異常念恍,因為萬一在任務提交端沒有進行異常處理六剥,則有可能使得整個系統(tǒng)都崩潰,這極有可能不是我們希望遇到的峰伙。但作為必要的信息記錄疗疟,我們將任務丟棄的信息進行打印,當然瞳氓,這只比內(nèi)置的DiscardPolicy策略高級那么一點點策彤。
? 由于在這個案例中,MyTask執(zhí)行需要花費100毫秒匣摘,因此店诗,必然會導致大量的任務被直接丟棄。執(zhí)行上述代碼音榜,可能的部分輸出如下:
1426597264669:Thread ID:11
1426597264679:Thread ID:12
java.util.concurrent.FutureTask@a57993 is discard
java.util.concurrent.FutureTask@1b84c92 is discard
可以看到必搞,在執(zhí)行幾個任務后,拒絕策略就開始生效了囊咏。在實際應用中恕洲,我們可以將更詳細的信息記錄到日志中,來分析系統(tǒng)的負載和任務丟失的情況梅割。
自定義線程創(chuàng)建:ThreadFactory
線程池的主要作用是為了線程復用霜第,也就是避免了線程的頻繁創(chuàng)建。但是户辞,最開始的那些線程從何而來呢泌类?答案就是ThreadFactory。
ThreadFactory是一個接口底燎,它只有一個方法刃榨,用來創(chuàng)建線程:
Thread newThread(Runnable r);
? 當線程池需要新建線程時,就會調(diào)用這個方法双仍。
? 自定義線程池可以幫助我們做不少事枢希。比如:
可以跟蹤線程池究竟在何時創(chuàng)建了多少線程
可以自定義線程的名稱、組以及優(yōu)先級等信息
可以任性地將所有的線程設(shè)置為守護線程朱沃。
總之苞轿,使用自定義線程池可以讓我們更加自由地設(shè)置池子中所有線程的狀態(tài)茅诱。下面的案例使用自定義的ThreadFactory,一方面記錄了線程的創(chuàng)建搬卒,另一方面將所有的線程都設(shè)置為守護線程瑟俭,這樣,當主線程退出后契邀,將會強制銷毀線程池(這個銷毀線程池的思路真是石破天驚)摆寄。
01 public static void main(String[] args) throws InterruptedException {
02 MyTask task = new MyTask();
03 ExecutorService es = new ThreadPoolExecutor(5, 5,
04 0L, TimeUnit.MILLISECONDS,
05 new SynchronousQueue<Runnable>(),
06 new ThreadFactory(){
07 @Override
08 public Thread newThread(Runnable r) {
09 Thread t= new Thread(r);
10 t.setDaemon(true);
11 System.out.println("create "+t);
12 return t;
13 }
14 }
15 );
16 for (int i = 0; i < 5; i++) {
17 es.submit(task);
18 }
19 Thread.sleep(2000);
20 }
擴展線程池
ThreadPoolExecutor也是一個可以擴展的線程池。它提供了beforeExecute()坯门、afterExecute()和terminated()三個接口對線程池進行控制椭迎。
以beforeExecute()、afterExecute()為例田盈,在ThreadPoolExecutor.Worker. runTask()方法內(nèi)部提供了這樣的實現(xiàn):
boolean ran = false;
beforeExecute(thread, task); //運行前
try {
task.run(); //運行任務
ran = true;
afterExecute(task, null); //運行結(jié)束后
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex); //運行結(jié)束
throw ex;
}
ThreadPoolExecutor.Worker是ThreadPoolExecutor的內(nèi)部類畜号,它是一個實現(xiàn)了Runnable接口的類。ThreadPoolExecutor線程池中的工作線程也正是Worker實例允瞧。Worker.runTask()方法會被線程池以多線程模式異步調(diào)用简软,即Worker.runTask()會同時被多個線程訪問。因此其beforeExecute()述暂、afterExecute()接口也將同時多線程訪問痹升。
在默認的ThreadPoolExecutor實現(xiàn)中,提供了空的beforeExecute()和afterExecute()實現(xiàn)畦韭。在實際應用中疼蛾,可以對其進行擴展來實現(xiàn)對線程池運行狀態(tài)的跟蹤,輸出一些有用的調(diào)試信息艺配,以幫助系統(tǒng)故障診斷察郁,這對于多線程程序錯誤排查是很有幫助的。下面演示了對線程池的擴展转唉,在這個擴展中皮钠,我們將記錄每一個任務的執(zhí)行日志。(這種擴展太棒了)
線程池擴展起始時間赠法,完成時間麦轰,退出,和為線程賦名字范式
01 public class ExtThreadPool {
02 public static class MyTask implements Runnable {
03 public String name;
04
05 public MyTask(String name) {
06 this.name = name;
07 }
08
09 @Override
10 public void run() {
11 System.out.println("正在執(zhí)行" + ":Thread ID:" + Thread. currentThread().getId()
12 + ",Task Name=" + name);
13 try {
14 Thread.sleep(100);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 }
19 }
20
21 public static void main(String[] args) throws InterruptedException {
22
23 ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
24 new LinkedBlockingQueue<Runnable>()) {
25 @Override
26 protected void beforeExecute(Thread t, Runnable r) {
27 System.out.println("準備執(zhí)行:" + ((MyTask) r).name);
28 }
29
30 @Override
31 protected void afterExecute(Runnable r, Throwable t) {
32 System.out.println("執(zhí)行完成:" + ((MyTask) r).name);
33 }
34
35 @Override
36 protected void terminated() {
37 System.out.println("線程池退出");
38 }
39
40 };
41 for (int i = 0; i < 5; i++) {
42 MyTask task = new MyTask("TASK-GEYM-" + i);
43 es.execute(task);
44 Thread.sleep(10);
45 }
46 es.shutdown();
47 }
48 }
輸出
準備執(zhí)行:TASK-GEYM-0
正在執(zhí)行:Thread ID:8,Task Name=TASK-GEYM-0
準備執(zhí)行:TASK-GEYM-1
正在執(zhí)行:Thread ID:9,Task Name=TASK-GEYM-1
準備執(zhí)行:TASK-GEYM-2
正在執(zhí)行:Thread ID:10,Task Name=TASK-GEYM-2
準備執(zhí)行:TASK-GEYM-3
正在執(zhí)行:Thread ID:11,Task Name=TASK-GEYM-3
準備執(zhí)行:TASK-GEYM-4
正在執(zhí)行:Thread ID:12,Task Name=TASK-GEYM-4
執(zhí)行完成:TASK-GEYM-0
執(zhí)行完成:TASK-GEYM-1
執(zhí)行完成:TASK-GEYM-2
執(zhí)行完成:TASK-GEYM-3
執(zhí)行完成:TASK-GEYM-4
線程池退出
上述代碼在第23~40行砖织,擴展了原有的線程池款侵,實現(xiàn)了beforeExecute()、afterExecute()和terminiated()三個方法侧纯。這三個方法分別用于記錄一個任務的開始新锈、結(jié)束和整個線程池的退出。在第42~43行茂蚓,向線程池提交5個任務壕鹉,為了有更清晰的日志剃幌,我們?yōu)槊總€任務都取了一個不同的名字聋涨。
在提交完成后晾浴,調(diào)用shutdown()方法關(guān)閉線程池。這是一個比較安全的方法牍白,如果當前正有線程在執(zhí)行脊凰,shutdown()方法并不會立即暴力地終止所有任務,它會等待所有任務執(zhí)行完成后茂腥,再關(guān)閉線程池狸涌,但它并不會等待所有線程執(zhí)行完成后再返回,因此最岗,可以簡單地理解成shutdown()只是發(fā)送了一個關(guān)閉信號而已帕胆。但在shutdown()方法執(zhí)行后,這個線程池就不能再接受其他新的任務了般渡。
優(yōu)化線程池線程數(shù)量
在《Java Concurrency in Practice》一書中給出了一個估算線程池大小的經(jīng)驗公式:
為保持處理器達到期望的使用率懒豹,最優(yōu)的池的大小等于:
在Java中,可以通過:
Runtime.getRuntime().availableProcessors()
取得可用的CPU數(shù)量驯用。通過這么為線程池賦予線程數(shù)更加優(yōu)雅脸秽。
在線程池中尋找堆棧
處理不好線程的異常,線程池會吞異常信息
public class DivTask implements Runnable {
int a,b;
public DivTask(int a,int b){
this.a=a;
this.b=b;
}
@Override
public void run() {
double re=a/b;
System.out.println(re);
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor pools=new ThreadPoolExecutor(0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for(int i=0;i<5;i++){
pools.submit(new DivTask(100,i));
}
}
上述代碼將DivTask提交到線程池蝴乔,從這個for循環(huán)來看记餐,我們應該會得到5個結(jié)果,分別是100除以給定的i后的商薇正。但如果你真的運行程序片酝,你得到的全部結(jié)果是:
33.0
50.0
100.0
25.0
? 只有4個輸出。也就說是程序漏算了一組數(shù)據(jù)挖腰!但更不幸的是钠怯,程序沒有任何日志,沒有任何錯誤提示曙聂,就好像一切都正常一樣晦炊。在這個簡單的案例中,只要你稍有經(jīng)驗宁脊,你就能發(fā)現(xiàn)断国,作為除數(shù)的i取到了0,這個缺失的值很可能是由于除以0導致的榆苞。但在稍復雜的業(yè)務場景中稳衬,這種錯誤足可以讓你幾天萎靡不振。
? 因此坐漏,使用線程池雖然是件好事薄疚,但是還是得處處留意這些“坑”碧信。線程池很有可能會“吃”掉程序拋出的異常,完全不打印堆棧信息街夭,導致我們對程序的錯誤一無所知砰碴。
<font color=red>解決方案</font>
解決這個問題的一個最簡單的方式就是放棄submit(),改用execute()板丽。將上述的任務提交代碼改成:
pools.execute(new DivTask(100,i));
或者你使用下面的方法改造你的submit():
Future re=pools.submit(new DivTask(100,i));
re.get();
上面兩種方法都可以得到部分堆棧信息呈枉,如下所示:
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor. java:617)
at java.lang.Thread.run(Thread.java:745)
33.0
100.0
50.0
25.0
顯然只有部分堆棧信息是不夠的,從這兩個異常堆棧中我們只能知道異常是在哪里拋出的(這里是DivTask的第11行)埃碱。但是我們還希望得到另外一個更重要的信息猖辫,那就是這個任務到底是在哪里提交的?而任務的具體提交位置已經(jīng)被線程池完全淹沒了砚殿。順著堆棧啃憎,我們最多只能找到線程池中的調(diào)度流程,而這對于我們幾乎是沒有價值的似炎。
所以需要擴展ThreadPoolExecutor線程池辛萍,讓它在調(diào)度任務之前,先保存一下提交任務線程的堆棧信息名党。
01 public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
02 public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
03 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
04 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
05 }
05 }
06
07 @Override
08 public void execute(Runnable task) {
09 super.execute(wrap(task, clientTrace(), Thread.currentThread()
10 .getName()));
11 }
12
13 @Override
14 public Future<?> submit(Runnable task) {
15 return super.submit(wrap(task, clientTrace(), Thread.currentThread()
16 .getName()));
17 }
18
19 private Exception clientTrace() {
20 return new Exception("Client stack trace");
21 }
22
23 private Runnable wrap(final Runnable task, final Exception clientStack,
24 String clientThreadName) {
25 return new Runnable() {
26 @Override
27 public void run() {
28 try {
29 task.run();
30 } catch (Exception e) {
31 clientStack.printStackTrace();
32 throw e;
33 }
34 }
35 };
36 }
37 }
在第23行代碼中叹阔,wrap()方法的第2個參數(shù)為一個異常,里面保存著提交任務的線程的堆棧信息传睹。該方法將我們傳入的Runnable任務進行一層包裝耳幢,使之能處理異常信息。當任務發(fā)生異常時欧啤,這個異常會被打印睛藻。
14 public static void main(String[] args) {
15 ThreadPoolExecutor pools=new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
16 0L, TimeUnit.SECONDS,
17 new SynchronousQueue<Runnable>());
18
19 /**
20 * 錯誤堆棧中可以看到是在哪里提交的任務
21 */
22 for(int i=0;i<5;i++){
23 pools.execute(new DivTask(100,i));
24 }
25 }
執(zhí)行上述代碼,就可以得到以下信息:
java.lang.Exception: Client stack trace
at geym.conc.ch3.trace.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:28)
at geym.conc.ch3.trace.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
at geym.conc.ch3.trace.TraceMain.main(TraceMain.java:23) //這里顯示了線程的提交位置
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
at geym.conc.ch3.trace.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
33.0
100.0
25.0
50.0
熟悉的異常又回來了邢隧!現(xiàn)在店印,我們不僅可以得到異常發(fā)生的Runnable實現(xiàn)內(nèi)的信息,我們也知道了這個任務是在哪里提交的倒慧。如此豐富的信息按摘,我相信可以幫助我們瞬間定位問題!
Fork/Join框架
拆分匯總
使用Fork/Join進行數(shù)據(jù)處理時的總體結(jié)構(gòu)如圖
通過Fork/Join可以提高線程利用率纫谅,和避免數(shù)據(jù)競爭
提高利用率
線程A已經(jīng)把自己的任務都執(zhí)行完成了炫贤,而線程B還有一堆任務等著處理,此時付秕,線程A就會“幫助”線程B兰珍,從線程B的任務隊列中拿一個任務過來處理,盡可能地達到平衡询吴。
避免數(shù)據(jù)競爭
當線程試圖幫助別人時掠河,總是從任務隊列的底部開始拿數(shù)據(jù)亮元,而線程試圖執(zhí)行自己的任務時,則是從相反的頂部開始拿唠摹。
下面我們來看一下ForkJoinPool的一個重要的接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
可以向ForkJoinPool線程池提交一個ForkJoinTask任務爆捞。所謂ForkJoinTask任務就是支持fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類跃闹,RecursiveAction和RecursiveTask嵌削。它們分別表示沒有返回值的任務和可以攜帶返回值的任務毛好。
案例
01 public class CountTask extends RecursiveTask<Long>{
02 private static final int THRESHOLD = 10000;
03 private long start;
04 private long end;
05
06 public CountTask(long start,long end){
07 this.start=start;
08 this.end=end;
09 }
10
11 public Long compute(){
12 long sum=0;
13 boolean canCompute = (end-start)<THRESHOLD;
14 if(canCompute){
15 for(long i=start;i<=end;i++){
16 sum +=i;
17 }
18 }else{
19 //分成100個小任務
20 long step=(start+end)/100;
21 ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
22 long pos=start;
23 for(int i=0;i<100;i++){
24 long lastOne=pos+step;
25 if(lastOne>end)lastOne=end;
26 CountTask subTask=new CountTask(pos,lastOne);
27 pos+=step+1;
28 subTasks.add(subTask);
29 subTask.fork();//這里進行了遞歸
30 }
31 for(CountTask t:subTasks){
32 sum+=t.join();
33 }
34 }
35 return sum;
36 }
37
38 public static void main(String[]args){
39 ForkJoinPool forkJoinPool = new ForkJoinPool();
40 CountTask task = new CountTask(0,200000L);
41 ForkJoinTask<Long> result = forkJoinPool.submit(task);
42 try{
43 long res = result.get();
44 System.out.println("sum="+res);
45 }catch(InterruptedException e){
46 e.printStackTrace();
47 }catch(ExecutionException e){
48 e.printStackTrace();
49 }
50 }
51 }
? 由于計算數(shù)列的和必然是需要函數(shù)返回值的望艺,因此選擇RecursiveTask作為任務的模型。上述代碼第39行肌访,建立ForkJoinPool線程池找默。在第40行,構(gòu)造一個計算1到200000求和的任務吼驶。在第41行將任務提交給線程池惩激,線程池會返回一個攜帶結(jié)果的任務,通過get()方法可以得到最終結(jié)果(第43行)蟹演。如果在執(zhí)行g(shù)et()方法時风钻,任務沒有結(jié)束,那么主線程就會在get()方法時等待酒请。
? 在使用ForkJoin時需要注意骡技,如果任務的劃分層次很深,一直得不到返回羞反,那么可能出現(xiàn)兩種情況:第一布朦,系統(tǒng)內(nèi)的線程數(shù)量越積越多,導致性能嚴重下降昼窗。第二是趴,函數(shù)的調(diào)用層次變得很深,最終導致棧溢出澄惊。不同版本的JDK內(nèi)部實現(xiàn)機制可能有差異唆途,從而導致其表現(xiàn)不同。
? 下面的StackOverflowError異常就是加深本例的調(diào)用層次掸驱,在JDK 8上得到的錯誤肛搬。
java.util.concurrent.ExecutionException: java.lang.StackOverflowError
at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1000)
at geym.conc.ch3.fork.CountTask.main(CountTask.java:51)
Caused by: java.lang.StackOverflowError
? 此外,F(xiàn)orkJoin線程池使用一個無鎖的棧來管理空閑線程亭敢。如果一個工作線程暫時取不到可用的任務滚婉,則可能會被掛起,掛起的線程將會被壓入由線程池維護的棧中帅刀。待將來有任務可用時让腹,再從棧中喚醒這些線程远剩。