線程池作為Java中一個重要的知識點几晤,看了很多文章,在此以Java自帶的線程池為例植阴,記錄分析一下蟹瘾。本文參考了Java并發(fā)編程:線程池的使用、Java線程池---addWorker方法解析掠手、線程池憾朴、ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)和ThreadPoolExecutor線程池解析與BlockingQueue的三種實現(xiàn)。本文基于JDK1.8實現(xiàn)用例和源碼分析喷鸽,并主要著重流程伊脓。
一、使用線程池
要知道一個東西的原理魁衙,首先要知道如何使用它。所以先上一個使用線程池的示例株搔。
1剖淀、任務(wù)類
要使用Java自帶的線程池,首先需要一個任務(wù)類纤房,這個任務(wù)類需要實現(xiàn)Runnable接口纵隔,并重寫run方法(需要多線程執(zhí)行的任務(wù)邏輯)。
package org.my.threadPoolDemo;
/**
* 任務(wù)類炮姨,實現(xiàn)Runnable接口 重寫run方法
*/
public class MyTask implements Runnable{
private int taskNum;
public MyTask(int taskNum) {
super();
this.taskNum = taskNum;
}
@Override
public void run() {
System.out.println("正在執(zhí)行task"+taskNum);
try {
Thread.currentThread().sleep(4000);//sleep 4秒模擬執(zhí)行代碼過程
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task"+taskNum+"執(zhí)行完畢");
}
}
2捌刮、創(chuàng)建線程池并執(zhí)行多個任務(wù)
有了任務(wù)類,接下來創(chuàng)建線程池绅作,并執(zhí)行多個任務(wù)俄认。我們使用ThreadPoolExecutor來創(chuàng)建線程池夜焦。
package org.my.threadPoolDemo;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolUseDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
for (int i = 0; i < 15; i++) {
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("線程池中線程數(shù)量:"+executor.getPoolSize()+",線程池中等待執(zhí)行的任務(wù)數(shù)量:"+executor.getQueue().size()+",已執(zhí)行完的任務(wù)數(shù)量:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
邏輯很簡單,創(chuàng)建了一個線程池管理器卸伞,然后使用它執(zhí)行了15個任務(wù)。這里需要解釋一下構(gòu)造ThreadPoolExecutor的時候傳入的參數(shù)的意義:
- 5(corePoolSize)是指核心池大小弃酌。即創(chuàng)建的線程數(shù)量妓湘。如果線程池中線程數(shù)等于這個數(shù)量榜贴,那么下一個來的任務(wù)就會被放在任務(wù)隊列中(稍后詳細圖解)唬党。
- 10(maximumPoolSize)是指線程池能創(chuàng)建的最大線程數(shù)量驶拱。如果上一步的任務(wù)隊列已滿蓝纲,那么線程池將繼續(xù)創(chuàng)建線程,直到線程數(shù)=10箭养,下一個來的任務(wù)會被拒絕執(zhí)行闯冷。
- 200(keepAliveTime)是指表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認情況下坎弯,只有當線程池中的線程數(shù)大于corePoolSize時抠忘,keepAliveTime才會起作用拧咳,直到線程池中的線程數(shù)不大于corePoolSize,即當線程池中的線程數(shù)大于corePoolSize時灶体,如果一個線程空閑的時間達到keepAliveTime,則會終止蝎抽,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法樟结,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用瓢宦,直到線程池中的線程數(shù)為0层坠。
- TimeUnit.MILLISECONDS是keepAliveTime的時間單位。
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
- ArrayBlockingQueue是傳入的阻塞隊列,用來存放任務(wù)的隊列workQueue谦趣。即上文中提到的任務(wù)隊列摘悴。除了ArrayBlockingQueue蹂喻,還有LinkedBlockingQueue孵运、SynchronousQueue可以選擇。
ThreadPoolExecutor擁有四個構(gòu)造器蔓彩,除了上方傳入的參數(shù)以外从媚,還有另外的構(gòu)造器可以傳入:
- threadFactory:線程工廠宫纬,主要用來創(chuàng)建線程
- handler:表示拒絕執(zhí)行任務(wù)時的策略,有四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù)憔四,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù)伊群,然后重新嘗試執(zhí)行任務(wù)(重復此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
執(zhí)行上面的程序洛口,結(jié)果如下:
正在執(zhí)行task0
線程池中線程數(shù)量:1,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:2,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task1
線程池中線程數(shù)量:3,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task2
線程池中線程數(shù)量:4,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task3
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:0,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task4
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:1,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:2,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:3,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:4,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:5,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:6,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task10
線程池中線程數(shù)量:7,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task11
線程池中線程數(shù)量:8,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task12
正在執(zhí)行task13
線程池中線程數(shù)量:9,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
線程池中線程數(shù)量:10,線程池中等待執(zhí)行的任務(wù)數(shù)量:5,已執(zhí)行完的任務(wù)數(shù)量:0
正在執(zhí)行task14
task1執(zhí)行完畢
task10執(zhí)行完畢
正在執(zhí)行task5
task11執(zhí)行完畢
task13執(zhí)行完畢
task4執(zhí)行完畢
task3執(zhí)行完畢
task2執(zhí)行完畢
task0執(zhí)行完畢
正在執(zhí)行task9
正在執(zhí)行task8
正在執(zhí)行task7
task14執(zhí)行完畢
task12執(zhí)行完畢
正在執(zhí)行task6
task5執(zhí)行完畢
task9執(zhí)行完畢
task7執(zhí)行完畢
task8執(zhí)行完畢
task6執(zhí)行完畢
可以看到,由于corePoolSize是5蹂空,所以當任務(wù)數(shù)量大于5的時候俯萌,接下來來的任務(wù)放入了任務(wù)隊列等待,但是由于任務(wù)隊列最大容量是5腌闯,maximumPoolSize=10绳瘟,所以在任務(wù)隊列滿了之后,線程池管理器又繼續(xù)創(chuàng)建了5個線程姿骏,最終線程池中線程數(shù)量達到了10糖声。這時候15個任務(wù)能夠由這些線程處理完,如果再增加任務(wù)分瘦,比如將for循環(huán)次數(shù)增加到20蘸泻,就出現(xiàn)了java.util.concurrent.RejectedExecutionException。
二嘲玫、原理分析
從上面使用線程池的例子來看悦施,最主要就是兩步,構(gòu)造ThreadPoolExecutor對象去团,然后每來一個任務(wù)抡诞,就調(diào)用ThreadPoolExecutor對象的execute方法。
1土陪、ThreadPoolExecutor結(jié)構(gòu)
ThreadPoolExecutor的主要結(jié)構(gòu)及繼承關(guān)系如下圖所示:
主要成員變量:任務(wù)隊列——存放那些暫時無法執(zhí)行的任務(wù)昼汗;工作線程池——存放當前啟用的所有線程;線程工廠——創(chuàng)建線程鬼雀;還有一些用來調(diào)度線程與任務(wù)并保證線程安全的成員顷窒。
了解了ThreadPoolExecutor的主要結(jié)構(gòu),再簡單梳理一下“一個傳入線程池的任務(wù)能夠被最終正常執(zhí)行需要經(jīng)過的主要流程”源哩,方法名稱前面沒有“XXX.”這種標注的都是ThreadPoolExecutor的方法:
2鞋吉、ThreadPoolExecutor構(gòu)造器及重要常量
簡單了解下構(gòu)造器鸦做,ThreadPoolExecutor的四個構(gòu)造器的源碼如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
從源碼中可以看出,這四個構(gòu)造器都是調(diào)用最后一個構(gòu)造器谓着,只是根據(jù)開發(fā)者傳入的參數(shù)的不同而填充一些默認的參數(shù)泼诱。比如如果開發(fā)者沒有傳入線程工廠threadFactory參數(shù),那么構(gòu)造器就使用默認的Executors.defaultThreadFactor漆魔。
在這里還要理解ThreadPoolExecutor的幾個常量的含義和幾個簡單方法:
//Integer.SIZE是一個靜態(tài)常量坷檩,值為32,也就是說COUNT_BITS是29
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY是最大容量536870911改抡,因為1左移29位之后-1矢炼,導致最高三位為0,而其余29位全部為1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//ctl用于表示線程池狀態(tài)和有效線程數(shù)量阿纤,最高三位表示線程池的狀態(tài)句灌,其余低位表示有效線程數(shù)量
//初始化之后ctl等于RUNNING的值,即默認狀態(tài)是運行狀態(tài)欠拾,線程數(shù)量為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//1110 0000 0000 0000 0000 0000 0000 0000最高三位為111
private static final int RUNNING = -1 << COUNT_BITS;
//最高三位為000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000最高三位為001
private static final int STOP = 1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000最高三位為010
private static final int TIDYING = 2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000最高三位為011
private static final int TERMINATED = 3 << COUNT_BITS;
/**
* 獲取運行狀態(tài)胰锌,入?yún)閏tl。因為CAPACITY是最高三位為0藐窄,其余低位為1
* 所以當取反的時候资昧,就只有最高三位為1,再經(jīng)過與運算荆忍,只會取到ctl的最高三位
* 而這最高三位如上文所述格带,表示線程池的狀態(tài)
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
* 獲取工作線程數(shù)量,入?yún)閏tl刹枉。因為CAPACITY是最高三位為0叽唱,其余低位為1
* 所以當進行與運算的時候,只會取到低29位微宝,這29位正好表示有效線程數(shù)量
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任務(wù)隊列棺亭,用于存放待執(zhí)行任務(wù)的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
/** 判斷線程池是否是運行狀態(tài),傳入的參數(shù)是ctl的值
* 只有RUNNING的符號位是1蟋软,也就是只有RUNNING為負數(shù)
* 所以如果目前的ctl值<0镶摘,就是RUNNING狀態(tài)
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//從任務(wù)隊列中移除任務(wù)
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
3、getTask()源代碼
通過前面的流程圖岳守,我們知道getTask()是由runWorker方法調(diào)用的钉稍,目的是取出一個任務(wù)。
private Runnable getTask() {
//記錄循環(huán)體中上個循環(huán)在從阻塞隊列中取任務(wù)時是否超時
boolean timedOut = false;
//無條件循環(huán)棺耍,主要是在線程池運行正常情況下
//通過循環(huán)體內(nèi)部的阻塞隊列的阻塞時間,來控制當前線程的超時時間
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//獲取線程池狀態(tài)
/*先獲取線程池的狀態(tài)种樱,如果狀態(tài)大于等于STOP蒙袍,也就是STOP俊卤、TIDYING、TERMINATED之一
*這時候不管隊列中有沒有任務(wù)害幅,都不用去執(zhí)行了消恍;
*如果線程池的狀態(tài)為SHUTDOWN且隊列中沒有任務(wù)了,也不用繼續(xù)執(zhí)行了
*將工作線程數(shù)量-1以现,并且返回null
**/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取工作線程數(shù)量
int wc = workerCountOf(c);
//是否啟用超時參數(shù)keepAliveTime
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果這個條件成立狠怨,如果工作線程數(shù)量-1成功,返回null邑遏,否則跳出循環(huán)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//如果需要采用阻塞形式獲取佣赖,那么就poll設(shè)定阻塞時間,否則take無限期等待记盒。
//這里通過阻塞時間憎蛤,間接控制了超時的值,如果取值超時纪吮,意味著這個線程在超時時間內(nèi)處于空閑狀態(tài)
//那么下一個循環(huán)俩檬,將會return null并且線程數(shù)量-1
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4、runWorker(Worker w)源代碼
通過上面流程圖碾盟,可以看出runWorker(Worker w)實際上已經(jīng)是在線程啟動之后執(zhí)行任務(wù)了棚辽,所以其主要邏輯就是獲取任務(wù),然后執(zhí)行任務(wù)的run方法冰肴。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//獲取傳入的Worker對象中的任務(wù)
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/*
* 如果傳入的任務(wù)為null屈藐,就從任務(wù)隊列中獲取任務(wù),只要這兩者有一個不為null嚼沿,就進入循環(huán)體
*/
while (task != null || (task = getTask()) != null) {
w.lock();
//如果線程池狀態(tài)已被標為停止估盘,那么則不允許該線程繼續(xù)執(zhí)行任務(wù)!或者該線程已是中斷狀態(tài),
//也不允許執(zhí)行任務(wù),還需要中斷該線程!
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//為子類預(yù)留的空方法(鉤子)
Throwable thrown = null;
try {
task.run();//終于真正執(zhí)行傳入的任務(wù)了
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//為子類預(yù)留的空方法(鉤子)
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
/*
* 從這里可以看出骡尽,當一個任務(wù)執(zhí)行完畢之后遣妥,循環(huán)并沒有退出,此時會再次執(zhí)行條件判斷
* 也就是說如果執(zhí)行完第一個任務(wù)之后攀细,任務(wù)隊列中還有任務(wù)箫踩,那么將會繼續(xù)在這個線程執(zhí)行。
* 這里也是很巧妙的地方谭贪,不需要額外開一個控制線程來看那些線程處于空閑狀態(tài)境钟,然后給他分配任務(wù)。
* 直接自己去任務(wù)隊列拿任務(wù)
*/
}
completedAbruptly = false;
} finally {
/*
* 執(zhí)行到這里俭识,說明getTask返回了null慨削,要么是超時(任務(wù)隊列沒有任務(wù)了),要么是線程池狀態(tài)有問題了
* 當前線程將被回收了
*/
processWorkerExit(w, completedAbruptly);
}
}
5、addWorker(Runnable firstTask, boolean core)源代碼
runWorker是從Worker對象中獲取第一個任務(wù)缚态,然后從任務(wù)隊列中一直獲取任務(wù)執(zhí)行磁椒。流程圖中已經(jīng)說過,這個Worker對象是在addWorker方法中創(chuàng)建的玫芦,所以新線程創(chuàng)建浆熔、啟動的源頭是在addWorker方法中。而addWorker是被execute所調(diào)用桥帆,execute根據(jù)addWorker的返回值医增,進行條件判斷。
private boolean addWorker(Runnable firstTask, boolean core) {
//上來就是retry老虫,后面continue retry;語句執(zhí)行之后都會從這里重新開始
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//獲取線程池運行狀態(tài)
/*
* 獲取當前線程池的狀態(tài)叶骨,如果是STOP,TIDYING,TERMINATED狀態(tài)的話张遭,則會返回false
* 如果現(xiàn)在狀態(tài)是SHUTDOWN邓萨,但是firstTask不為空或者workQueue為空的話,那么直接返回false
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//獲取工作線程數(shù)量
/*
* addWorker傳入的第二個Boolean參數(shù)用來判別當前線程數(shù)量是否大于核心池數(shù)量
* true菊卷,代表當前線程數(shù)量小于核心池數(shù)量
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加工作線程數(shù)量
if (compareAndIncrementWorkerCount(c))
//如果成功缔恳,跳出retry
break retry;
c = ctl.get(); // Re-read ctl
//判斷線程池狀態(tài),改變了就重試一次
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/*
* 前面順利增加了工作線程數(shù)洁闰,那么這里就真正創(chuàng)建Worker
* 上面流程圖中說過歉甚,創(chuàng)建Worker時會創(chuàng)建新的線程.如果這里創(chuàng)建失敗
* finally中會將工作線程數(shù)-1
*/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//將Worker放入工作線程池
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//啟動線程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
6、execute方法
execute方法其實最主要是根據(jù)線程池的策略來傳遞不同的參數(shù)給addWorker方法扑眉。
當調(diào)用 execute() 方法添加一個任務(wù)時纸泄,線程池會做如下判斷:
a. 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運行這個任務(wù)腰素;
b. 如果正在運行的線程數(shù)量大于或等于 corePoolSize聘裁,那么將這個任務(wù)放入隊列。
c. 如果這時候隊列滿了弓千,而且正在運行的線程數(shù)量小于 maximumPoolSize衡便,那么還是要創(chuàng)建線程運行這個任務(wù);
d. 如果隊列滿了洋访,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize镣陕,那么線程池會拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”姻政。
方法execute主要是在控制上面四條策略的實現(xiàn)呆抑。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取到成員變量ctl的值(線程池狀態(tài))
int c = ctl.get();
//如果工作線程的數(shù)量<核心池的大小
if (workerCountOf(c) < corePoolSize) {
//調(diào)用addWorker(這里傳入的true代表工作線程數(shù)量<核心池大小)
//如果成功汁展,方法結(jié)束鹊碍。
if (addWorker(command, true))
return;
//否則厌殉,再重新獲取一次ctl的值
//個人理解是防止前面這段代碼執(zhí)行的時候有其他線程改變了ctl的值。
c = ctl.get();
}
//如果工作線程數(shù)量>=核心池的大小或者上一步調(diào)用addWorker返回false侈咕,繼續(xù)走到下面
//如果線程池處于運行狀態(tài)年枕,并且成功將當前任務(wù)放入任務(wù)隊列
if (isRunning(c) && workQueue.offer(command)) {
//為了線程安全,重新獲取ctl的值
int recheck = ctl.get();
//如果線程池不處于運行狀態(tài)并且任務(wù)從任務(wù)隊列移除成功
if (! isRunning(recheck) && remove(command))
//調(diào)用reject拒絕執(zhí)行乎完,根據(jù)handler的實現(xiàn)類拋出異常或者其他操作
reject(command);
//否則品洛,如果工作線程數(shù)量==0树姨,調(diào)用addWorker并傳入null和false
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//執(zhí)行到這里代表當前線程已超越了核心線程且任務(wù)提交到任務(wù)隊列失敗。(可以注意這里的addWorker是false)
//那么這里再次調(diào)用addWroker創(chuàng)建新線程(這時創(chuàng)建的線程是maximumPoolSize)桥状。
//如果還是提交任務(wù)失敗則調(diào)用reject處理失敗任務(wù)
else if (!addWorker(command, false))
reject(command);
}
三帽揪、線程池相關(guān)影響因素
1、阻塞隊列的選擇
本段引用自ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)
直接提交:
工作隊列的默認選項是SynchronousQueue辅斟,它將任務(wù)直接提交給線程而不存儲它們转晰。在此,如果不存在可用于立即運行任務(wù)的線程士飒,則試圖把任務(wù)加入隊列將失敗查邢,因此會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時出現(xiàn)鎖酵幕。直接提交通常要求無界maximumPoolSize以避免拒絕新提交的任務(wù)扰藕。當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時,此策略允許無界線程具有增長的可能性芳撒。
無界隊列:
使用無界隊列(例如邓深,不具有預(yù)定義容量的LinkedBlockingQueue將導致在所有 corePoolSize 線程都忙時新任務(wù)在隊列中等待。這樣笔刹,創(chuàng)建的線程就不會超過corePoolSize芥备。(因此,maximumPoolSize的值也就無效了舌菜。)當每個任務(wù)完全獨立于其他任務(wù)萌壳,即任務(wù)執(zhí)行互不影響時,適合于使用無界隊列酷师;例如讶凉,在 Web 頁服務(wù)器中。這種排隊可用于處理瞬態(tài)突發(fā)請求山孔,當命令以超過隊列所能處理的平均數(shù)連續(xù)到達時懂讯,此策略允許無界線程具有增長的可能性。
有界隊列:
當使用有限的maximumPoolSize時台颠,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡褐望,但是可能較難調(diào)整和控制勒庄。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷瘫里,但是可能導致人工降低吞吐量实蔽。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界)谨读,則系統(tǒng)可能為超過您許可的更多線程安排時間局装。使用小型隊列通常要求較大的池大小,CPU 使用率較高劳殖,但是可能遇到不可接受的調(diào)度開銷铐尚,這樣也會降低吞吐量。
2哆姻、常用的線程池
在使用ThreadPoolExecutor創(chuàng)建線程池的時候宣增,根據(jù)不同參數(shù),可以使用不同構(gòu)造器構(gòu)造不同特性的線程池矛缨。而實際情況中爹脾,我們一般使用Executors類提供的方法來創(chuàng)建線程池。Executors最終也是調(diào)用ThreadPoolExecutor的構(gòu)造器箕昭,但是已經(jīng)配置好了相關(guān)參數(shù)灵妨。
1)固定大小的線程池:Executors.newFixedThreadPool
corePoolSize和maximumPoolSize相同,超時時間為0盟广,隊列用的LinkedBlockingQueue無界的FIFO隊列闷串,這表示這個線程池始終只有corePoolSize個線程在運行。根據(jù)前面分析的筋量,如果執(zhí)行完任務(wù)烹吵,這個線程會繼續(xù)從任務(wù)隊列取任務(wù)執(zhí)行,當沒有任務(wù)的時候桨武,線程會立即關(guān)閉肋拔。
2)單任務(wù)線程池:Executors.newSingleThreadExecutor
池中只有一個線程工作,阻塞隊列無界呀酸,它能保證按照任務(wù)提交的順序來執(zhí)行任務(wù)凉蜂。
3)可變尺寸的線程池:Executors.newCachedThreadPool
SynchronousQueue隊列,一個不存儲元素的阻塞隊列性誉。每個插入操作必須等到另一個線程調(diào)用移除操作窿吩。所以,當我們提交第一個任務(wù)的時候错览,是加入不了隊列的纫雁,這就滿足了,一個線程池條件“當無法加入隊列的時候倾哺,且任務(wù)沒有達到maximumPoolSize時轧邪,我們將新開啟一個線程任務(wù)”刽脖。所以我們的maximumPoolSize是ctl低29位的最大值。超時時間是60s忌愚,當一個線程沒有任務(wù)執(zhí)行會暫時保存60s超時時間曲管,如果沒有的新的任務(wù)的話,會從線程池中remove掉硕糊。
4)scheduled線程池院水,Executors.newScheduledThreadPool
創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求
終于差不多可以畫上句號了简十,感覺有很多東西還是沒有細說衙耕,尤其是其中大量的重入鎖。有機會再補上勺远。綜合了很多網(wǎng)上資源,也自己畫了幾張圖时鸵,個人認為相對于很多介紹線程池的博客來說胶逢,更容易理解吧。如有錯誤饰潜,歡迎批評指正初坠。