文章來(lái)自:http://www.54tianzhisheng.cn/2017/07/29/ThreadPool/
Wiki 上是這樣解釋的:Thread Pool
作用:利用線程池可以大大減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷!
下面主要講下線程池中最重要的一個(gè)類 ThreadPoolExecutor 店读。
ThreadPoolExecutor 構(gòu)造器:
有四個(gè)構(gòu)造器的,挑了參數(shù)最長(zhǎng)的一個(gè)進(jìn)行講解。
七個(gè)參數(shù):
corePoolSize:核心池的大小,在創(chuàng)建了線程池后斋否,默認(rèn)情況下堆巧,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù)旋奢,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)雳锋,當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后黄绩,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù)玷过;
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止爽丹;
unit:參數(shù)keepAliveTime的時(shí)間單位(DAYS、HOURS辛蚊、MINUTES粤蝎、SECONDS 等);
workQueue:阻塞隊(duì)列袋马,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)初澎;
ArrayBlockingQueue (有界隊(duì)列)
LinkedBlockingQueue (無(wú)界隊(duì)列)
SynchronousQueue
threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程
handler:拒絕處理任務(wù)的策略
AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常虑凛。(默認(rèn)這種)
DiscardPolicy:也是丟棄任務(wù)碑宴,但是不拋出異常
DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
重要方法:
execute():通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù)桑谍,交由線程池去執(zhí)行延柠;
shutdown():關(guān)閉線程池;
execute() 方法:
注:JDK 1.7 和 1.8 這個(gè)方法有點(diǎn)區(qū)別锣披,下面代碼是 1.8 中的贞间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
publicvoidexecute(Runnable command){
if(command ==null)
thrownewNullPointerException();
intc = ctl.get();
//1、如果當(dāng)前的線程數(shù)小于核心線程池的大小雹仿,根據(jù)現(xiàn)有的線程作為第一個(gè) Worker 運(yùn)行的線程增热,新建一個(gè) Worker,addWorker 自動(dòng)的檢查當(dāng)前線程池的狀態(tài)和 Worker 的數(shù)量胧辽,防止線程池在不能添加線程的狀態(tài)下添加線程
if(workerCountOf(c) < corePoolSize) {
if(addWorker(command,true))
return;
c = ctl.get();
}
//2峻仇、如果線程入隊(duì)成功,然后還是要進(jìn)行 double-check 的邑商,因?yàn)榫€程在入隊(duì)之后狀態(tài)是可能會(huì)發(fā)生變化的
if(isRunning(c) && workQueue.offer(command)) {
intrecheck = ctl.get();
// recheck 防止線程池狀態(tài)的突變础浮,如果突變帆调,那么將 reject 線程奠骄,防止 workQueue 中增加新線程
if(! isRunning(recheck) && remove(command))
reject(command);
elseif(workerCountOf(recheck) ==0)//上下兩個(gè)操作都有 addWorker 的操作豆同,但是如果在workQueue.offer 的時(shí)候 Worker 變?yōu)?0,那么將沒(méi)有 Worker 執(zhí)行新的 task含鳞,所以增加一個(gè) Worker.
addWorker(null,false);
}
//3影锈、如果 task 不能入隊(duì)(隊(duì)列滿了),這時(shí)候嘗試增加一個(gè)新線程蝉绷,如果增加失敗那么當(dāng)前的線程池狀態(tài)變化了或者線程池已經(jīng)滿了然后拒絕task
elseif(!addWorker(command,false))
reject(command);
}
其中調(diào)用了 addWorker() 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
privatebooleanaddWorker(Runnable firstTask,booleancore){// firstTask: 新增一個(gè)線程并執(zhí)行這個(gè)任務(wù)鸭廷,可空,增加的線程從隊(duì)列獲取任務(wù)熔吗;core:是否使用 corePoolSize 作為上限辆床,否則使用 maxmunPoolSize
retry:
for(;;) {
intc = ctl.get();
intrs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* rs!=Shutdown || fistTask!=null || workQueue.isEmpty
* 如果當(dāng)前的線程池的狀態(tài) > SHUTDOWN 那么拒絕 Worker 的 add 如果 =SHUTDOWN
* 那么此時(shí)不能新加入不為 null 的 Task桅狠,如果在 workQueue 為 empty 的時(shí)候不能加入任何類型的 Worker讼载,
* 如果不為 empty 可以加入 task 為 null 的 Worker, 增加消費(fèi)的 Worker
*/
if(rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask ==null&&! workQueue.isEmpty()))
returnfalse;
for(;;) {
intwc = workerCountOf(c);
//如果當(dāng)前的數(shù)量超過(guò)了 CAPACITY,或者超過(guò)了 corePoolSize 和 maximumPoolSize(試 core 而定)
if(wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
returnfalse;
//CAS 嘗試增加線程數(shù)中跌,如果失敗咨堤,證明有競(jìng)爭(zhēng),那么重新到 retry漩符。
if(compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作;
breakretry;
c = ctl.get();// Re-read ctl
//判斷當(dāng)前線程池的運(yùn)行狀態(tài),狀態(tài)發(fā)生改變一喘,重試 retry;
if(runStateOf(c) != rs)
continueretry;
// else CAS failed due to workerCount change; retry inner loop
}
}
booleanworkerStarted =false;
booleanworkerAdded =false;
Worker w =null;
try{
w =newWorker(firstTask);// Worker 為內(nèi)部類,封裝了線程和任務(wù)嗜暴,通過(guò) ThreadFactory 創(chuàng)建線程凸克,可能失敗拋異常或者返回 null
finalThread t = w.thread;
if(t !=null) {
finalReentrantLock mainLock =this.mainLock;
mainLock.lock();
try{
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
intrs = runStateOf(ctl.get());
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask ==null)) {
if(t.isAlive())// precheck that t is startable
// SHUTDOWN 以后的狀態(tài)和 SHUTDOWN 狀態(tài)下 firstTask 為 null闷沥,不可新增線程
thrownewIllegalThreadStateException();
workers.add(w);
ints = workers.size();
if(s > largestPoolSize)
largestPoolSize = s;//記錄最大線程數(shù)
workerAdded =true;
}
}finally{
mainLock.unlock();
}
if(workerAdded) {
t.start();
workerStarted =true;
}
}
}finally{
if(! workerStarted)
addWorkerFailed(w);//失敗回退,從 wokers 移除 w, 線程數(shù)減一萎战,嘗試結(jié)束線程池(調(diào)用tryTerminate 方法)
}
returnworkerStarted;
}
示意圖:
執(zhí)行流程:
1、當(dāng)有任務(wù)進(jìn)入時(shí)狐赡,線程池創(chuàng)建線程去執(zhí)行任務(wù)撞鹉,直到核心線程數(shù)滿為止
2、核心線程數(shù)量滿了之后颖侄,任務(wù)就會(huì)進(jìn)入一個(gè)緩沖的任務(wù)隊(duì)列中
當(dāng)任務(wù)隊(duì)列為無(wú)界隊(duì)列時(shí)鸟雏,任務(wù)就會(huì)一直放入緩沖的任務(wù)隊(duì)列中,不會(huì)和最大線程數(shù)量進(jìn)行比較
當(dāng)任務(wù)隊(duì)列為有界隊(duì)列時(shí)览祖,任務(wù)先放入緩沖的任務(wù)隊(duì)列中孝鹊,當(dāng)任務(wù)隊(duì)列滿了之后,才會(huì)將任務(wù)放入線程池展蒂,此時(shí)會(huì)與線程池中最大的線程數(shù)量進(jìn)行比較又活,如果超出了苔咪,則默認(rèn)會(huì)拋出異常。然后線程池才會(huì)執(zhí)行任務(wù)柳骄,當(dāng)任務(wù)執(zhí)行完团赏,又會(huì)將緩沖隊(duì)列中的任務(wù)放入線程池中,然后重復(fù)此操作耐薯。
shutdown() 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
publicvoidshutdown(){
finalReentrantLock mainLock =this.mainLock;
mainLock.lock();
try{
//判斷是否可以操作目標(biāo)線程
checkShutdownAccess();
//設(shè)置線程池狀態(tài)為 SHUTDOWN, 此處之后舔清,線程池中不會(huì)增加新 Task
advanceRunState(SHUTDOWN);
//中斷所有的空閑線程
interruptIdleWorkers();
onShutdown();// hook for ScheduledThreadPoolExecutor
}finally{
mainLock.unlock();
}
//轉(zhuǎn)到 Terminate
tryTerminate();
}
參考資料:深入理解java線程池—ThreadPoolExecutor
創(chuàng)建一個(gè)定長(zhǎng)線程池曲初,可控制線程最大并發(fā)數(shù)体谒,超出的線程會(huì)在隊(duì)列中等待。
創(chuàng)建一個(gè)單線程化的線程池抒痒,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行颁褂。
創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要痢虹,可靈活回收空閑線程被去,若無(wú)可回收,則新建線程奖唯。
創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行丰捷。
四種線程池其實(shí)內(nèi)部方法都是調(diào)用的 ThreadPoolExecutor 類坯墨,只不過(guò)利用了其不同的構(gòu)造器方法而已(傳入自己需要傳入的參數(shù)),那么利用這個(gè)特性病往,我們自己也是可以實(shí)現(xiàn)自己定義的線程池的捣染。
1、創(chuàng)建任務(wù)類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
packagecom.zhisheng.thread.threadpool.demo;
/**
* Created by 10412 on 2017/7/24.
* 任務(wù)
*/
publicclassMyTaskimplementsRunnable
{
privateinttaskId;//任務(wù) id
privateString taskName;//任務(wù)名字
publicintgetTaskId(){
returntaskId;
}
publicvoidsetTaskId(inttaskId){
this.taskId = taskId;
}
publicStringgetTaskName(){
returntaskName;
}
publicvoidsetTaskName(String taskName){
this.taskName = taskName;
}
publicMyTask(inttaskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
@Override
publicvoidrun(){
System.out.println("當(dāng)前正在執(zhí)行 ******? 線程Id-->"+ taskId +",任務(wù)名稱-->"+ taskName);
try{
Thread.currentThread().sleep(5*1000);
}catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程Id-->"+ taskId +",任務(wù)名稱-->"+ taskName +"? -----------? 執(zhí)行完畢停巷!");
}
}
2耍攘、自定義拒絕策略,實(shí)現(xiàn) RejectedExecutionHandler 接口畔勤,重寫(xiě) rejectedExecution 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
packagecom.zhisheng.thread.threadpool.demo;
importjava.util.concurrent.RejectedExecutionHandler;
importjava.util.concurrent.ThreadPoolExecutor;
/**
* Created by 10412 on 2017/7/24.
* 自定義拒絕策略蕾各,實(shí)現(xiàn) RejectedExecutionHandler 接口
*/
publicclassRejectedThreadPoolHandlerimplementsRejectedExecutionHandler
{
publicRejectedThreadPoolHandler(){
}
@Override
publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor executor){
System.out.println("WARNING 自定義拒絕策略: Task "+ r.toString() +" rejected from "+ executor.toString());
}
}
3、創(chuàng)建線程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
packagecom.zhisheng.thread.threadpool.demo;
importjava.util.concurrent.ArrayBlockingQueue;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
/**
* Created by 10412 on 2017/7/24.
*/
publicclassThreadPool
{
publicstaticvoidmain(String[] args){
//核心線程數(shù)量為 2庆揪,最大線程數(shù)量 4式曲,空閑線程存活的時(shí)間 60s,有界隊(duì)列長(zhǎng)度為 3,
//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
//核心線程數(shù)量為 2,最大線程數(shù)量 4吝羞,空閑線程存活的時(shí)間 60s兰伤, 無(wú)界隊(duì)列,
//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
//核心線程數(shù)量為 2,最大線程數(shù)量 4钧排,空閑線程存活的時(shí)間 60s敦腔,有界隊(duì)列長(zhǎng)度為 3, 使用自定義拒絕策略
ThreadPoolExecutor pool =newThreadPoolExecutor(2,4,60, TimeUnit.SECONDS,
newArrayBlockingQueue(3),newRejectedThreadPoolHandler());
for(inti =1; i <=10; i++) {
//創(chuàng)建 10 個(gè)任務(wù)
MyTask task =newMyTask(i,"任務(wù)"+ i);
//運(yùn)行
pool.execute(task);
System.out.println("活躍的線程數(shù):"+pool.getActiveCount() +",核心線程數(shù):"+ pool.getCorePoolSize() +",線程池大小:"+ pool.getPoolSize() +",隊(duì)列的大小"+ pool.getQueue().size());
}
//關(guān)閉線程池
pool.shutdown();
}
}
這里運(yùn)行結(jié)果就不截圖了卖氨,我在本地測(cè)試了代碼是沒(méi)問(wèn)題的会烙,感興趣的建議還是自己跑一下,然后分析下結(jié)果是不是和前面分析的一樣筒捺,如有問(wèn)題,請(qǐng)?jiān)谖?a target="_blank" rel="nofollow">博客下面評(píng)論纸厉!
本文一開(kāi)始講了線程池的介紹和好處系吭,然后分析了線程池中最核心的 ThreadPoolExecutor 類中構(gòu)造器的七個(gè)參數(shù)的作用、類中兩個(gè)重要的方法颗品,然后在對(duì)比研究了下 JDK 中自帶的四種線程池的用法和內(nèi)部代碼細(xì)節(jié)肯尺,最后寫(xiě)了一個(gè)自定義的線程池。