1 線程池介紹
1.1 線程池概念
Sun
在Java5
中挽铁,對Java
線程的類庫做了大量的擴展,其中線程池就是Java5
的新特征之一虑乖,除了線程池之外穷遂,還有很多多線程相關的內(nèi)容,為多線程的編程帶來了極大便利千诬。為了編寫高效穩(wěn)定可靠的多線程程序耍目,線程部分的新增內(nèi)容顯得尤為重要。
有關Java5
線程新特征的內(nèi)容全部在java.util.concurrent
下面徐绑,里面包含數(shù)目眾多的接口和類邪驮,熟悉這部分API特征是一項艱難的學習過程
線程池的基本思想還是一種對象池的思想,開辟一塊內(nèi)存空間傲茄,里面存放了眾多(未死亡)的線程毅访,池中線程執(zhí)行調(diào)度由池管理器來處理。當有線程任務時盘榨,從池中取一個喻粹,執(zhí)行完成后線程對象歸池,這樣可以避免反復創(chuàng)建線程對象所帶來的性能開銷较曼,節(jié)省了系統(tǒng)的資源磷斧。
1.2 線程池好處
合理利用線程池能夠帶來三個好處
第一:降低資源消耗
。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗
第二:提高響應速度
捷犹。當任務到達時弛饭,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行
第三:提高線程的可管理性
。線程是稀缺資源萍歉,如果無限制的創(chuàng)建侣颂,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性枪孩,使用線程池可以進行統(tǒng)一的分配憔晒,調(diào)優(yōu)和監(jiān)控。但是要做到合理的利用線程池蔑舞,必須對其原理了如指掌拒担。
2 線程池的使用
2.1 線程池的創(chuàng)建
2.1.1 通過ThreadPoolExecutor創(chuàng)建
我們可以通過ThreadPoolExecutor
來創(chuàng)建一個線程池
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, timeUnit,
runnableTaskQueue, threadFactory,handler);
創(chuàng)建一個線程池需要輸入幾個參數(shù):
-
corePoolSize
(線程池的基本大小):當提交一個任務到線程池時攻询,線程池會創(chuàng)建一個線程來執(zhí)行任務从撼,即使其他空閑的基本線程能夠執(zhí)行新任務也會創(chuàng)建線程,等到需要執(zhí)行的任務數(shù)大于線程池基本大小時就不再創(chuàng)建钧栖。如果調(diào)用了線程池的prestartAllCoreThreads
方法低零,線程池會提前創(chuàng)建并啟動所有基本線程婆翔。 -
runnableTaskQueue
(任務隊列):用于保存等待執(zhí)行的任務的阻塞隊列√蜕簦可以選擇以下幾個阻塞隊列:-
ArrayBlockingQueue
:是一個基于數(shù)組結構的有界阻塞隊列
啃奴,此隊列按FIFO
(先進先出)原則對元素進行排序,必須指定隊列大小雄妥。 -
LinkedBlockingQueue
:一個基于鏈表結構的有界/(無界:未指定容量則Integer.MAX_VALUE)阻塞隊列
最蕾,此隊列按FIFO
(先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue
老厌。靜態(tài)工廠方法Executors.newFixedThreadPool()
和newSingleThreadExecutor
使用了這個隊列揖膜,注意
:如果不指定隊列大小,就默認為Integer.MAX_VALUE
梅桩。 -
SynchronousQueue
:一個無容量的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作拜隧,否則插入操作一直處于阻塞狀態(tài)宿百,吞吐量通常要高于LinkedBlockingQueue
,靜態(tài)工廠方法Executors.newCachedThreadPool
使用了這個隊列洪添。 -
PriorityBlockingQueue
:一個具有優(yōu)先級的無限阻塞隊列
垦页。存放在PriorityBlockingQueue
中的元素必須實現(xiàn)Comparable
接口,這樣才能通過實現(xiàn)compareTo()
方法進行排序干奢。優(yōu)先級最高的元素將始終排在隊列的頭部痊焊;PriorityBlockingQueue
不會保證優(yōu)先級一樣的元素的排序,也不保證當前隊列中除了優(yōu)先級最高的元素以外的元素忿峻,隨時處于正確排序的位置薄啥。 -
DelayedWorkQueue
:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列,靜態(tài)工廠方法Executors.newScheduledThreadPool
使用逛尚。
基于二叉堆實現(xiàn)垄惧,同時具備:無界隊列、阻塞隊列绰寞、優(yōu)先隊列的特征到逊。DelayQueue
延遲隊列中存放的對象,必須是實現(xiàn)Delayed
接口的類對象滤钱。通過執(zhí)行時延從隊列中提取任務觉壶,時間沒到任務取不出來。 -
LinkedBlockingDeque
: 雙端隊列件缸,有界/(無界:未指定容量則Integer.MAX_VALUE)阻塞隊列
铜靶,基于鏈表實現(xiàn),既可以從尾部插入/取出元素停团,還可以從頭部插入元素/取出元素旷坦。 -
LinkedTransferQueue
: 由鏈表結構組成的無界阻塞隊列掏熬。這個隊列比較特別的時,采用一種預占模式秒梅,意思就是消費者線程取元素時旗芬,如果隊列不為空,則直接取走數(shù)據(jù)捆蜀,若隊列為空疮丛,那就生成一個節(jié)點(節(jié)點元素為null
)入隊,然后消費者線程被等待在這個節(jié)點上辆它,后面生產(chǎn)者線程入隊時發(fā)現(xiàn)有一個元素為null
的節(jié)點誊薄,生產(chǎn)者線程就不入隊了,直接就將元素填充到該節(jié)點锰茉,并喚醒該節(jié)點等待的線程呢蔫,被喚醒的消費者線程取走元素。
-
-
maximumPoolSize
(線程池最大大徐):線程池允許創(chuàng)建的最大線程數(shù)片吊。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù)协屡,則線程池會再創(chuàng)建新的線程執(zhí)行任務俏脊。值得注意的是如果使用了無界的任務隊列這個參數(shù)就沒什么效果。 -
ThreadFactory
:用于設置創(chuàng)建線程的工廠肤晓,可以通過線程工廠給每個創(chuàng)建出來的線程設置更有意義的名字爷贫,Debug和定位問題時非常又幫助。 -
RejectedExecutionHandler
(飽和策略):當隊列和線程池都滿了补憾,說明線程池處于飽和狀態(tài)漫萄,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy
余蟹,表示無法處理新任務時拋出異常卷胯。以下是JDK1.5提供的四種策略:-
AbortPolicy
(默認): 丟棄任務并拋出RejectedExecutionException
異常。 -
CallerRunsPolicy
:只用調(diào)用者所在線程來運行任務威酒。 -
DiscardOldestPolicy
:丟棄隊列里最近的一個任務窑睁,并執(zhí)行當前任務。將當前處于等待隊列列頭
的等待任務強行取出葵孤,然后再試圖將當前被拒絕的任務提交到線程池執(zhí)行担钮。 -
DiscardPolicy
:直接丟棄任務,不拋出任何異常尤仍。 - 自定義策略:當然也可以根據(jù)應用場景需要來實現(xiàn)
RejectedExecutionHandler
接口自定義策略箫津。如記錄日志或持久化不能處理的任務。
-
-
keepAliveTime
(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間苏遥。所以如果任務很多饼拍,并且每個任務執(zhí)行的時間比較短,可以調(diào)大這個時間田炭,提高線程的利用率师抄。 -
TimeUnit
(線程活動保持時間的單位):可選的單位有天(DAYS
),小時(HOURS
)教硫,分鐘(MINUTES
)叨吮,毫秒(MILLISECONDS
),微秒(MICROSECONDS
, 千分之一毫秒)和毫微秒(NANOSECONDS
, 千分之一微秒)
自定義連接池稍微麻煩些瞬矩,不過通過創(chuàng)建的ThreadPoolExecutor線程池對象茶鉴,可以獲取到當前線程池的尺寸、正在執(zhí)行任務的線程數(shù)景用、工作隊列等等
具體示例:
ThreadPoolExecutor thread
= new ThreadPoolExecutor(1, 5, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNamePrefix("Pool-" + "線程名字").setDaemon(false).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
2.1.2 通過Executors方式創(chuàng)建
Java
通過Executors
(jdk1.5并發(fā)包)提供四種線程池涵叮,分別為:
-
newCachedThreadPool
:創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要伞插,可靈活回收空閑線程围肥,若無可回收,則新建線程
ExecutorService pool = Executors.newCachedThreadPool();
-
newFixedThreadPool
:創(chuàng)建一個定長線程池蜂怎,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待
ExecutorService pool = Executors.newFixedThreadPool(2);
-
newScheduledThreadPool
:創(chuàng)建一個定長線程池置尔,支持定時及周期性任務執(zhí)行
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
將線程放入池中進行執(zhí)行
pool.execute(new MyThread());
使用延遲執(zhí)行風格的方法
pool.schedule(new MyThread(), 10, TimeUnit.MILLISECONDS);
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行杠步。。榜轿。");
}
}
-
newSingleThreadExecutor
:創(chuàng)建一個單線程化的線程池幽歼,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行
ExecutorService pool = Executors.newSingleThreadExecutor();
2.1.3 線程池初始化和容量調(diào)整
默認情況下谬盐,創(chuàng)建線程池之后甸私,線程池中是沒有線程的,需要提交任務之后才會創(chuàng)建線程飞傀。
在實際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程皇型,可以通過以下兩個方法辦到:
-
prestartCoreThread()
:boolean prestartCoreThread()
,初始化一個核心線程 -
prestartAllCoreThreads()
:int prestartAllCoreThreads()
砸烦,初始化所有核心線程弃鸦,并返回初始化的線程數(shù)
線程池容量調(diào)整
ThreadPoolExecutor
提供了動態(tài)調(diào)整線程池容量大小的方法:
-
setCorePoolSize
:設置核心池大小 -
setMaximumPoolSize
:設置線程池最大能創(chuàng)建的線程數(shù)目大小
當上述參數(shù)從小變大時,ThreadPoolExecutor
進行線程賦值幢痘,還可能立即創(chuàng)建新的線程來執(zhí)行任務唬格。
2.2 線程池提交的返回值
2.2.1 無返回值
2.2.1.1 execute提交
可以使用execute
提交的任務,但是execute
方法沒有返回值,所以無法判斷任務是否被線程池執(zhí)行成功购岗。通過以下代碼可知execute
方法輸入的任務是一個Runnable
類的實例
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
2.2.1.2 實現(xiàn)Runnable接口
無返回值的任務必須Runnable
接口重寫run
方法汰聋,方法的異常只能在內(nèi)部消化,不能繼續(xù)上拋
2.2.2 有返回值
2.2.2.1 submit提交
我們也可以使用submit
方法來提交任務喊积,它會返回一個future
,那么我們可以通過這個future
來判斷任務是否執(zhí)行成功烹困,通過future
的get
方法來獲取返回值, get方法會阻塞住直到任務完成
注服,而使用get(long timeout, TimeUnit unit)
方法則會阻塞一段時間后立即返回韭邓,這時有可能任務沒有執(zhí)行完。
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執(zhí)行任務異常
} finally {
// 關閉線程池
executor.shutdown();
}
2.2.2.2 實現(xiàn)Callable接口
返回值的任務必須實現(xiàn)Callable
接口重寫call
方法且方法允許拋出異常
要想有返回值溶弟,那么submit
的組合:
-
submit(Callable<T> task)
能獲取到它的返回值女淑,通過future.get()
獲取(阻塞直到任務執(zhí)行完)辜御。一般使用FutureTask+Callable
配合使用 -
submit(Runnable task, T result)
能通過傳入的載體result
間接獲得線程的返回值鸭你。 -
submit(Runnable task)
則是沒有返回值的,就算獲取它的返回值也是null
2.2.3 使用例子
public void testFuture() throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
NewTask newTask = new NewTask();
Future<Integer> result = executor.submit(task);
Future<String> ends = executor.submit(newTask);
executor.shutdown();
System.out.println("主線程開始運行");
System.out.println("主線程做一些復雜任務");
Thread.sleep(10000);
System.out.println("主線程需要子線程的計算結果");
try {
System.out.println("主線程得到子線程的結果:"+result.get());
System.out.println("主線程需要第二個子線程的數(shù)據(jù):"+ends.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有均完畢");
}
class Task implements Callable<Integer>{
public Integer call() throws Exception {
System.out.println("子線程計算開始");
Thread.sleep(3000);
int sum = 0;
for (int i=0;i<100000;i++){
sum += i ;
}
System.out.println("子線程已經(jīng)計算完畢");
return sum;
}
}
class NewTask implements Callable<String>{
public String call() throws Exception {
System.out.println("第二個子線程已經(jīng)運行完畢");
return "success";
}
}
2.2.4 異常拋出
execute
:提交的任務在Worker#run
中被捕獲后直接拋出擒权。
submit
:提交的任務在FutureTask#run
中被捕獲后存儲到了屬性outcome
袱巨,get
時才會拋出異常。
為什么submit
提交的任務要get時才會獲取結果碳抄?
Callable
提交的任務愉老,我們只需要關心其結果即可,不用關心其過程剖效,主線程只需要在處理完自己的任務后獲取一個或一批任務的狀態(tài)和結果嫉入,讓開發(fā)者決定何時檢查任務是否成功完成,以及如何處理異常璧尸。
而Runnable
我們一般不要獲取返回值咒林,只需要提交任務即可,如果不直接拋出異常就會導致任務失敗了但是沒有感知爷光,尤其是在任務中catch
遺漏的一些沒有預料到異常且未設置UncaughtExceptionHandler
的時候垫竞。
雖然Runnable
任務雖然沒有返回值,但也可以submit, 然后通過Future.get()
了解到其成功或失敗的狀態(tài)蛀序。
2.2.5 invokeAll和invokeAny
invokeAll
和invokeAny
區(qū)別:
-
invokeAll
觸發(fā)執(zhí)行任務列表欢瞪,返回的結果順序也與任務在任務列表中的順序一致。所有線程執(zhí)行完任務后才返回結果徐裸。如果設置了超時時間引有,未超時完成則正常返回結果,如果超時未完成則報異常倦逐。 -
invokeAny
將第一個得到的結果作為返回值譬正,然后立刻終止所有的線程宫补。如果設置了超時時間,未超時完成則正常返回結果曾我,如果超時未完成則報超時異常
submit
粉怕、execute
、invokeAll
和invokeAny
區(qū)別:
-
invallkeAll
和invokeAny
會直接造成主線程阻塞(需要設置超時時間)抒巢。等待所有任務執(zhí)行完成后返回結果贫贝,主線程繼續(xù)執(zhí)行。 -
submit
不會造成主線程阻塞蛉谜,在后面執(zhí)行get
方法的時候阻塞稚晚。超時時間在get
里面設置。 -
execute
會新開啟線程直接執(zhí)行任務型诚,不會阻塞主線程客燕,但無返回結果
2.3 線程池的關閉
我們可以通過調(diào)用線程池的shutdown
或shutdownNow
方法來關閉線程池,但是它們的實現(xiàn)原理不同:
shutdown
的原理是只是將線程池的狀態(tài)設置成SHUTDOWN
狀態(tài)狰贯,然后 中斷所有沒有正在執(zhí)行任務的線程
也搓。
shutdownNow
的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt
方法來中斷線程涵紊,所以無法響應中斷的任務可能永遠無法終止傍妒。shutdownNow
會首先將線程池的狀態(tài)設置成STOP
,然后嘗試停止所有的正在執(zhí)行或暫停任務的線程摸柄,并返回等待執(zhí)行任務的列表颤练。
只要調(diào)用了這兩個關閉方法的其中一個,isShutdown
方法就會返回true
驱负。當所有的任務都已關閉后,才表示線程池關閉成功昔案,這時調(diào)用isTerminaed
方法會返回true
。至于我們應該調(diào)用哪一種方法來關閉線程池电媳,應該由提交到線程池的任務特性決定,通常調(diào)用shutdown
來關閉線程池庆亡,如果任務不一定要執(zhí)行完匾乓,則可以調(diào)用shutdownNow
。
2.4 線程池狀態(tài)分析
線程池和線程的狀態(tài)是不一樣的哈又谋,線程池有這幾個狀態(tài):RUNNING
,SHUTDOWN
,STOP
,TIDYING
,TERMINATED
//線程池狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
線程池各個狀態(tài)切換狀態(tài)圖如下:
-
RUNNING
當創(chuàng)建線程池后拼缝,初始時,線程池處于RUNNING
狀態(tài)
該狀態(tài)的線程池會接收新任務彰亥,并處理阻塞隊列中的任務;
調(diào)用線程池的shutdown()
方法咧七,可以切換到SHUTDOWN
狀態(tài);
調(diào)用線程池的shutdownNow()
方法,可以切換到STOP
狀態(tài); -
SHUTDOWN
該狀態(tài)的線程池不會接收新任務任斋,但會處理阻塞隊列中的任務继阻;
隊列為空,并且線程池中執(zhí)行的任務也為空,進入TIDYING
狀態(tài); -
STOP
該狀態(tài)的線程不會接收新任務,也不會處理阻塞隊列中的任務瘟檩,而且會中斷正在運行的任務抹缕;
線程池中執(zhí)行的任務為空,進入TIDYING
狀態(tài); -
TIDYING
該狀態(tài)表明所有的任務已經(jīng)運行終止,記錄的任務數(shù)量為0墨辛。
terminated()
執(zhí)行完畢卓研,進入TERMINATED
狀態(tài) -
TERMINATED
該狀態(tài)表示線程池徹底終止
3 線程池的分析
3.1 流程分析
Java線程池主要工作流程:
當提交一個新任務到線程池時,線程池的處理流程如下:
- 首先線程池判斷
基本線程池
是否已滿睹簇?沒滿奏赘,創(chuàng)建一個工作線程來執(zhí)行任務。滿了太惠,則進入下個流程磨淌。 - 其次線程池判斷
工作隊列
是否已滿?沒滿垛叨,則將新提交的任務存儲在工作隊列里伦糯。滿了,則進入下個流程嗽元。 - 最后線程池判斷
整個線程池
是否已滿敛纲?沒滿,則創(chuàng)建一個新的工作線程來執(zhí)行任務剂癌,滿了淤翔,則交給飽和策略來處理這個任務。
另外:當線程池中超過corePoolSize
線程佩谷,空閑時間達到keepAliveTime
時旁壮,關閉空閑線程
線程池內(nèi)線程數(shù)量小于等于coreSize
的部分稱為核心池,核心池是線程池的常駐部分谐檀,內(nèi)部的線程一般不會被銷毀抡谐,我們提交的任務也應該絕大部分都由核心池內(nèi)的線程來執(zhí)行,但是當設置allowCoreThreadTimeOut(true)
時桐猬,線程池中corePoolSize
線程空閑時間達到keepAliveTime
也將關閉
3.2 源碼分析
上面的流程分析讓我們很直觀的了解的線程池的工作原理麦撵,讓我們再通過源代碼來看看是如何實現(xiàn)的。線程池執(zhí)行任務的方法如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果線程數(shù)小于基本線程數(shù)溃肪,則創(chuàng)建線程并執(zhí)行當前任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗免胃,則將當前任務放到工作隊列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果線程池不處于運行中或任務無法放入隊列惫撰,并且當前線程數(shù)量小于最大允許的線程數(shù)量羔沙,則創(chuàng)建一個線程執(zhí)行任務。
else if (!addIfUnderMaximumPoolSize(command))
//拋出RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}
工作線程
線程池創(chuàng)建線程時厨钻,會將線程封裝成工作線程Worker
扼雏,Worker
在執(zhí)行完任務后坚嗜,還會無限循環(huán)獲取工作隊列里的任務來執(zhí)行。我們可以從Worker
的run
方法里看到這點:
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
3.3 解讀線程池
我們知道呢蛤,向線程池提交任務是用ThreadPoolExecutor的execute()
方法惶傻,但在其內(nèi)部,線程任務的處理其實是相當復雜的其障,涉及到ThreadPoolExecutor银室、Worker、Thread
三個類的6個方法:
3.3.1 execute()
在ThreadPoolExecutor
類中励翼,任務提交方法的入口是execute(Runnable command)
方法(submit()方法也是調(diào)用了execute())蜈敢,該方法其實只在嘗試做一件事:經(jīng)過各種校驗之后,調(diào)用 addWorker(Runnable command,boolean core)
方法為線程池創(chuàng)建一個線程并執(zhí)行任務汽抚,與之相對應抓狭,execute()的結果有兩個:
參數(shù)說明:
-
Runnable command
:待執(zhí)行的任務
執(zhí)行流程:
- 通過
ctl.get()
得到線程池的當前線程數(shù),如果線程數(shù)小于corePoolSize
造烁,則調(diào)用addWorker(commond,true)
方法創(chuàng)建新的線程執(zhí)行任務否过,否則執(zhí)行步驟2; - 步驟1失敗惭蟋,說明已經(jīng)無法再創(chuàng)建新線程苗桂,那么考慮將任務放入阻塞隊列,等待執(zhí)行完任務的線程來處理告组∶何埃基于此,判斷線程池是否處于
Running
狀態(tài)(只有Running
狀態(tài)的線程池可以接受新任務)木缝,如果任務添加到任務隊列成功則進入步驟3便锨,失敗則進入步驟4; - 來到這一步需要說明任務已經(jīng)加入任務隊列我碟,這時要二次校驗線程池的狀態(tài)放案,會有以下情形:
- 線程池不再是
Running
狀態(tài)了,需要將任務從任務隊列中移除矫俺,如果移除成功則拒絕本次任務 - 線程池是
Running
狀態(tài)吱殉,則判斷線程池工作線程是否為0,是則調(diào)用addWorker(commond,true)
添加一個沒有初始任務的線程(這個線程將去獲取已經(jīng)加入任務隊列的本次任務并執(zhí)行)恳守,否則進入步驟4; - 線程池不是
Running
狀態(tài)贩虾,但從任務隊列移除任務失敶吆妗(可能已被某線程獲取缎罢?)伊群,進入步驟4考杉;
- 線程池不再是
- 將線程池擴容至
maximumPoolSize
并調(diào)用addWorker(commond,false)
方法創(chuàng)建新的線程執(zhí)行任務,失敗則拒絕本次任務舰始。
流程圖:
3.3.2 addWorker()
addWorker(Runnable firstTask, boolean core)
方法崇棠,顧名思義,向線程池添加一個帶有任務的工作線程丸卷。
參數(shù)說明:
-
Runnable firstTask
:新創(chuàng)建的線程應該首先運行的任務(如果沒有枕稀,則為空)。 -
boolean core
:該參數(shù)決定了線程池容量的約束條件谜嫉,即當前線程數(shù)量以何值為極限值萎坷。參數(shù)為true
則使用corePollSize
作為約束值,否則使用maximumPoolSize
沐兰。
執(zhí)行流程:
- 外層循環(huán)判斷線程池的狀態(tài)是否可以新增工作線程哆档。這層校驗基于下面兩個原則:
- 線程池為
Running
狀態(tài)時,既可以接受新任務也可以處理任務 - 線程池為關閉狀態(tài)時只能新增空任務的工作線程(
worker
)處理任務隊列(workQueue
)中的任務不能接受新任務
- 線程池為
- 內(nèi)層循環(huán)向線程池添加工作線程并返回是否添加成功的結果住闯。
- 首先校驗線程數(shù)是否已經(jīng)超限制瓜浸,是則返回
false
,否則進入下一步 - 通過
CAS
使工作線程數(shù)+1
比原,成功則進入步驟3插佛,失敗則再次校驗線程池是否是運行狀態(tài),是則繼續(xù)內(nèi)層循環(huán)春寿,不是則返回外層循環(huán)
- 首先校驗線程數(shù)是否已經(jīng)超限制瓜浸,是則返回
- 核心線程數(shù)量+1成功的后續(xù)操作:添加到工作線程集合朗涩,并啟動工作線程
- 首先獲取鎖之后,再次校驗線程池狀態(tài)(具體校驗規(guī)則見代碼注解)绑改,通過則進入下一步谢床,未通過則添加線程失敗
- 線程池狀態(tài)校驗通過后,再檢查線程是否已經(jīng)啟動厘线,是則拋出異常识腿,否則嘗試將線程加入線程池
檢查線程是否啟動成功,成功則返回true,失敗則進入addWorkerFailed
方法
流程圖:
3.3.3 Worker類
Worker
類是內(nèi)部類,既實現(xiàn)了Runnable
芒填,又繼承了AbstractQueuedSynchronizer
(以下簡稱AQS
)郊酒,所以其既是一個可執(zhí)行的任務,又可以達到鎖的效果醉顽。
Worker
類主要維護正在運行任務的線程的中斷控制狀態(tài),以及其他次要的記錄。這個類適時地繼承了AbstractQueuedSynchronizer
類蹬昌,以簡化獲取和釋放鎖(該鎖作用于每個任務執(zhí)行代碼)的過程。這樣可以防止去中斷正在運行中的任務皂贩,只會中斷在等待從任務隊列中獲取任務的線程明刷。
我們實現(xiàn)了一個簡單的不可重入互斥鎖檬洞,而不是使用可重入鎖狸膏,因為我們不希望工作任務在調(diào)用setCorePoolSize
之類的池控制方法時能夠重新獲取鎖。另外添怔,為了在線程真正開始運行任務之前禁止中斷湾戳,我們將鎖狀態(tài)初始化為負值,并在啟動時清除它(在runWorker中)广料。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
// 通過構造函數(shù)初始化砾脑,
Worker(Runnable firstTask) {
//設置AQS的同步狀態(tài)
// state:鎖狀態(tài),-1為初始值艾杏,0為unlock狀態(tài)韧衣,1為lock狀態(tài)
setState(-1); // inhibit interrupts until runWorker 在調(diào)用runWorker前,禁止中斷
this.firstTask = firstTask;
// 線程工廠創(chuàng)建一個線程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this); //runWorker()是ThreadPoolExecutor的方法
}
// Lock methods
// The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態(tài)
// The value 1 represents the locked state. 1代表“鎖定”狀態(tài)
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 嘗試獲取鎖的方法
* 重寫AQS的tryAcquire()购桑,AQS本來就是讓子類來實現(xiàn)的
*/
protected boolean tryAcquire(int unused) {
// 判斷原值為0畅铭,且重置為1,所以state為-1時勃蜘,鎖無法獲取硕噩。
// 每次都是0->1,保證了鎖的不可重入性
if (compareAndSetState(0, 1)) {
// 設置exclusiveOwnerThread=當前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 嘗試釋放鎖
* 不是state-1缭贡,而是置為0
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 中斷(如果運行)
* shutdownNow時會循環(huán)對worker線程執(zhí)行
* 且不需要獲取worker鎖炉擅,即使在worker運行時也可以中斷
*/
void interruptIfStarted() {
Thread t;
//如果state>=0、t!=null阳惹、且t沒有被中斷
//new Worker()時state==-1谍失,說明不能中斷
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
3.3.4 runWorker()
可以說,runWorker(Worker w)
是線程池中真正處理任務的方法莹汤,前面的execute()
和 addWorker()
都是在為該方法做準備和鋪墊快鱼。
參數(shù)說明:
-
Worker w
:封裝的Worker
,攜帶了工作線程的諸多要素,包括Runnable
(待處理任務)攒巍、lock
(鎖)、completedTasks
(記錄線程池已完成任務數(shù))
執(zhí)行流程:
- 判斷當前任務或者從任務隊列中獲取的任務是否不為空荒勇,都為空則進入步驟2柒莉,否則進入步驟3
- 任務為空,則將
completedAbruptly
置為false
(即線程不是突然終止)沽翔,并執(zhí)行processWorkerExit(w,completedAbruptly)
方法進入線程退出程序 - 任務不為空兢孝,則進入循環(huán),并加鎖
- 判斷是否為線程添加中斷標識仅偎,以下兩個條件滿足其一則添加中斷標識:
- 線程池狀態(tài)>=
STOP
,即STOP或TERMINATED
- 一開始判斷線程池狀態(tài)<
STOP
跨蟹,接下來檢查發(fā)現(xiàn)Thread.interrupted()為true
,即線程已經(jīng)被中斷橘沥,再次檢查線程池狀態(tài)是否>=STOP
(以消除該瞬間shutdown方法生效窗轩,使線程池處于STOP或TERMINATED)
- 線程池狀態(tài)>=
- 執(zhí)行前置方法
beforeExecute(wt, task)
(該方法為空方法,由子類實現(xiàn))后執(zhí)行task.run()
方法執(zhí)行任務(執(zhí)行不成功拋出相應異常) - 執(zhí)行后置方法
afterExecute(task, thrown)
(該方法為空方法座咆,由子類實現(xiàn))后將線程池已完成的任務數(shù)+1痢艺,并釋放鎖。 - 再次進行循環(huán)條件判斷介陶。
流程圖:
3.3.5 getTask()
由函數(shù)調(diào)用關系圖可知堤舒,在ThreadPoolExecutor
類的實現(xiàn)中,Runnable getTask()
方法是為void runWorker(Worker w)
方法服務的哺呜,它的作用就是在任務隊列(workQueue)中獲取 task(Runnable)
參數(shù)說明:無參數(shù)
執(zhí)行流程:
- 將
timedOut
(上次獲取任務是否超時)置為false
(首次執(zhí)行方法舌缤,無上次,自然為false)某残,進入一個無限循環(huán) - 如果線程池為
Shutdown
狀態(tài)且任務隊列為空(線程池shutdown
狀態(tài)可以處理任務隊列中的任務国撵,不再接受新任務,這個是重點)或者線程池為STOP或TERMINATED
狀態(tài)驾锰,則意味著線程池不必再獲取任務了卸留,當前工作線程數(shù)量-1并返回null,否則進入步驟3 - 如果線程池數(shù)量超限制或者時間超限且(任務隊列為空或當前線程數(shù)>1)椭豫,則進入步驟4耻瑟,否則進入步驟5。
- 移除工作線程赏酥,成功則返回null喳整,不成功則進入下輪循環(huán)。
- 嘗試用
poll() 或者 take()
(具體用哪個取決于timed的值)獲取任務裸扶,如果任務不為空框都,則返回該任務。如果為空呵晨,則將timeOut
置為true
進入下一輪循環(huán)魏保。如果獲取任務過程發(fā)生異常熬尺,則將 timeOut置為 false 后進入下一輪循環(huán)。
流程圖:
3.3.6 processWorkerExit()
processWorkerExit(Worker w, boolean completedAbruptly)
執(zhí)行線程退出的方法
參數(shù)說明:
-
Worker w
:要結束的工作線程谓罗。 -
boolean completedAbruptly
:是否突然完成(異常導致)粱哼,如果工作線程因為用戶異常死亡,則completedAbruptly
參數(shù)為true
檩咱。
執(zhí)行流程:
- 如果
completedAbruptly 為 true
揭措,即工作線程因為異常突然死亡,則執(zhí)行工作線程-1操作刻蚯。 - 主線程獲取鎖后绊含,線程池已經(jīng)完成的任務數(shù)追加 w(當前工作線程) 完成的任務數(shù),并從
worker
的set集合中移除當前worker炊汹。 - 根據(jù)線程池狀態(tài)進行判斷是否執(zhí)行
tryTerminate()
結束線程池躬充。 - 是否需要增加工作線程,如果線程池還沒有完全終止讨便,仍需要保持一定數(shù)量的線程麻裳。
- 如果當前線程是突然終止的,調(diào)用
addWorker()
創(chuàng)建工作線程 - 當前線程不是突然終止器钟,但當前工作線程數(shù)量小于線程池需要維護的線程數(shù)量津坑,則創(chuàng)建工作線程。需要維護的線程數(shù)量為
corePoolSize
(取決于成員變量allowCoreThreadTimeOut
是否為false
)或1傲霸。
- 如果當前線程是突然終止的,調(diào)用
3.4 線程池的線程回收--回收的是非核心線程嗎
一個線程池疆瑰,如下:
ExecutorService executorService =
ThreadPoolExecutor(
2,
3,
60_000,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(2),
ZjtThreadFactory()
)
即核心池的大小是2,最大線程數(shù)是3昙啄,等待隊列的大小是2穆役,非核心線程存活的時間是60秒。
假設現(xiàn)在向線程池中提交5個任務梳凛,每個任務耗時1s耿币,那么按照線程池的原理,肯定是先創(chuàng)建2個線程(線程1和線程2)分別執(zhí)行任務1和任務2韧拒,然后任務3和任務4放到等待隊列中淹接,然后再創(chuàng)建一個線程執(zhí)行任務5,等1s任務結束后叛溢,線程1和線程2去等待隊列中去取任務3和任務4執(zhí)行塑悼。再然后就是任務執(zhí)行完了,線程1和線程2和線程3都在等待任務楷掉,等待60秒之后會回收線程3厢蒜。
注意本例中是回收線程3,并不是因為線程3是非核心線程,而是線程1和線程2執(zhí)行完任務1和任務2后又去隊列中取任務3和任務4執(zhí)行了斑鸦,線程3最先等待
愕贡,所以會先回收線程3。
4 合理的配置線程池
4.1 線程池分析
要想合理的配置線程池巷屿,就必須首先分析任務特性颂鸿,可以從以下幾個角度來進行分析:
-
任務的性質(zhì)
:CPU
密集型任務,IO
密集型任務和混合型任務攒庵。 -
任務的優(yōu)先級
:高,中和低败晴。 -
任務的執(zhí)行時間
:長浓冒,中和短。 -
任務的依賴性
:是否依賴其他系統(tǒng)資源尖坤,如數(shù)據(jù)庫連接稳懒。
任務性質(zhì)不同的任務可以用不同規(guī)模的線程池分開處理。CPU
密集型任務配置盡可能少的線程數(shù)量慢味,如配置Ncpu+1
個線程的線程池场梆。IO
密集型任務則由于需要等待IO
操作,線程并不是一直在執(zhí)行任務纯路,則配置盡可能多的線程或油,如2*Ncpu
〕刍#混合型的任務顶岸,如果可以拆分,則將其拆分成一個CPU
密集型任務和一個IO
密集型任務叫编,只要這兩個任務執(zhí)行的時間相差不是太大辖佣,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務執(zhí)行時間相差太大搓逾,則沒必要進行分解卷谈。我們可以通過Runtime.getRuntime().availableProcessors()
方法獲得當前設備的CPU
個數(shù)。
優(yōu)先級不同的任務可以使用優(yōu)先級隊列PriorityBlockingQueue
來處理霞篡。它可以讓優(yōu)先級高的任務先得到執(zhí)行世蔗,需要注意的是如果一直有優(yōu)先級高的任務提交到隊列里,那么優(yōu)先級低的任務可能永遠不能執(zhí)行朗兵。
執(zhí)行時間不同的任務可以交給不同規(guī)模的線程池來處理凸郑,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務先執(zhí)行矛市。
依賴數(shù)據(jù)庫連接池的任務芙沥,因為線程提交SQL
后需要等待數(shù)據(jù)庫返回結果,如果等待的時間越長CPU
空閑時間就越長,那么線程數(shù)應該設置越大而昨,這樣才能更好的利用CPU
救氯。
4.2 有界隊列
建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預警能力歌憨,可以根據(jù)需要設大一點着憨,比如幾千。有一次使用的后臺任務線程池的隊列和線程池全滿了务嫡,不斷的拋出拋棄任務的異常甲抖,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導致執(zhí)行SQL變得非常緩慢心铃,因為后臺任務線程池里的任務全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的准谚,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里去扣。如果當時我們設置成無界隊列柱衔,線程池的隊列就會越來越多,有可能會撐滿內(nèi)存愉棱,導致整個系統(tǒng)不可用唆铐,而不只是后臺任務出現(xiàn)問題
5 線程池的監(jiān)控
5.1 通過線程池提供參數(shù)監(jiān)控
線程池里有一些屬性在監(jiān)控線程池的時候可以使用:
-
taskCount
:線程池需要執(zhí)行的任務數(shù)量。 -
completedTaskCount
:線程池在運行過程中已完成的任務數(shù)量奔滑。小于或等于taskCount
-
largestPoolSize
:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量艾岂。通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小朋其,則表示線程池曾經(jīng)滿了澳盐。 -
getPoolSize
:線程池的線程數(shù)量。如果線程池不銷毀的話令宿,池里的線程不會自動銷毀叼耙,所以這個大小只增不減。 -
getActiveCount
:獲取活動的線程數(shù)粒没。
5.2 通過擴展線程池進行監(jiān)控
通過繼承線程池并重寫線程池的beforeExecute
筛婉,afterExecute
和terminated
方法,我們可以在任務執(zhí)行前
癞松,執(zhí)行后
和線程池關閉前
干一些事情爽撒。如監(jiān)控任務的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等响蓉。這幾個方法在線程池里是空方法硕勿。如:
protected void beforeExecute(Thread t, Runnable r) { }
附:Spring線程池ThreadPoolTaskExecutor的使用
6 線程組
6.1 線程組定義
可以把線程歸屬到某一個線程組中,線程組中可以有線程對象
枫甲,也可以有線程組
源武,組中還可以有線程扼褪,這樣的組織結構有點類似于樹的形式,如圖所示.
線程組的作用是:可以批量管理線程或線程組對象粱栖,有效地對線程或線程組對象進行組織
相關構造方法:
-
ThreadGroup(String name)
:構造一個新線程組 -
ThreadGroup(ThreadGroup parent,String name)
:構造一個新線程組
注意一下第二個话浇,假如要使用多級關聯(lián)一般就是用第二個構造函數(shù)。第一個參數(shù)表示新線程組的父線程組闹究,第二個參數(shù)表示新線程組的名稱幔崖,有了父線程組和新線程組的名稱,自然可以構造出一個新的線程組來了渣淤。
注意
:線程必須啟動后才能歸到指定線程組中
6.2 使用操作
ThreadGroup
其實比ExecutorService
更好
用java做抓取的時候免不了要用到多線程的了赏寇,因為要同時抓取多個網(wǎng)站或一條線程抓取一個網(wǎng)站的話實在太慢,而且有時一條線程抓取同一個網(wǎng)站的話也比較浪費CPU資源价认。要用到多線程的等方面嗅定,也就免不了對線程的控制或用到線程池
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class JavaThreadPool {
public static void main(String[] args) {
// 創(chuàng)建一個可重用固定線程數(shù)的線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 創(chuàng)建實現(xiàn)了Runnable接口對象,Thread對象當然也實現(xiàn)了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
// 將線程放入池中進行執(zhí)行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
// 關閉線程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行刻伊。。椒功。");
}
}
后來發(fā)現(xiàn)ExecutorService的功能沒有想像中的那么好捶箱,而且最多只是提供一個線程的容器而然,所以后來改用了java.lang.ThreadGroup动漾,ThreadGroup有很多優(yōu)勢丁屎,最重要的一點就是它可以對線程進行遍歷,知道那些線程已經(jīng)運行完畢旱眯,還有那些線程在運行晨川。關于ThreadGroup的使用代碼如下:
class MyThread extends Thread {
boolean stopped;
MyThread(ThreadGroup tg, String name) {
super(tg, name);
stopped = false;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " starting.");
try {
for (int i = 1; i < 1000; i++) {
System.out.print(".");
Thread.sleep(250);
synchronized (this) {
if (stopped)break;
}
}
} catch (Exception exc) {
System.out.println(Thread.currentThread().getName() + " interrupted.");
}
System.out.println(Thread.currentThread().getName() + " exiting.");
}
synchronized void myStop() {
stopped = true;
}
}
public class Main {
public static void main(String args[]) throws Exception {
ThreadGroup tg = new ThreadGroup("My Group");
MyThread thrd = new MyThread(tg, "MyThread #1");
MyThread thrd2 = new MyThread(tg, "MyThread #2");
MyThread thrd3 = new MyThread(tg, "MyThread #3");
thrd.start();
thrd2.start();
thrd3.start();
Thread.sleep(1000);
System.out.println(tg.activeCount() + " threads in thread group.");
Thread thrds[] = new Thread[tg.activeCount()];
tg.enumerate(thrds);
for (Thread t : thrds)
System.out.println(t.getName());
thrd.myStop();
Thread.sleep(1000);
System.out.println(tg.activeCount() + " threads in tg.");
tg.interrupt();
}
}
由以上的代碼可以看出:ThreadGroup比ExecutorService多以下幾個優(yōu)勢
-
ThreadGroup
可以遍歷線程,知道那些線程已經(jīng)運行完畢删豺,那些還在運行 - 可以通過
ThreadGroup.activeCount
知道有多少線程從而可以控制插入的線程數(shù)
6.3 和線程池區(qū)別
線程組和線程池區(qū)別:
- 線程組:
線程組存在的意義共虑,首要原因是安全
java
默認創(chuàng)建的線程都是屬于系統(tǒng)線程組
,而同一個線程組的線程是可以相互修改對方的數(shù)據(jù)的呀页。
但如果在不同的線程組中妈拌,那么就不能跨線程組
修改數(shù)據(jù),可以從一定程度上保證數(shù)據(jù)安全蓬蝶。 - 線程池:
線程池存在的意義尘分,首要作用是效率
線程的創(chuàng)建和結束都需要耗費一定的系統(tǒng)時間(特別是創(chuàng)建),不停創(chuàng)建和刪除線程會浪費大量的時間丸氛。所以培愁,在創(chuàng)建出一條線程并使其在執(zhí)行完任務后不結束,而是使其進入休眠狀態(tài)缓窜,在需要用時再喚醒定续,那么 就可以節(jié)省一定的時間谍咆。
如果這樣的線程比較多,那么就可以使用線程池來進行管理香罐。保證效率卧波。 - 線程組和線程池共有的特點:
都是管理一定數(shù)量的線程
都可以對線程進行控制---包括休眠,喚醒庇茫,結束港粱,創(chuàng)建,中斷(暫停)--但并不一定包含全部這些操作旦签。