@[toc]
一沃琅、什么是線程池
線程池就是創(chuàng)建若干個(gè)可執(zhí)行的線程放入一個(gè)池(容器)中,有任務(wù)需要處理時(shí)蜘欲,會(huì)提交到線程池中的任務(wù)隊(duì)列益眉,處理完之后線程并不會(huì)被銷毀,而是仍然在線程池中等待下一個(gè)任務(wù)姥份。
Java中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架郭脂,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過(guò)程中澈歉,合理地使用線程池能夠帶來(lái)以下下好處
- <font color = "orange">降低資源消耗 </font>展鸡,通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- <font color = "orange">提高響應(yīng)速度</font> 埃难,當(dāng)任務(wù)到達(dá)時(shí)莹弊,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- <font color = "orange">提高線程的可管理性</font>涡尘,線程是稀缺資源忍弛,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源考抄,還會(huì)降低系統(tǒng)的穩(wěn)定性细疚,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控
二川梅、Executor 框架
我們知道線程池就是線程的集合惠昔,下提供了集中管理、線程重用挑势、降低資源消耗镇防、提高響應(yīng)速度等 從 JDK 1.5之后。為了把工作單元與執(zhí)行機(jī)制分開潮饱,<font color = "orange">Executor</font> 框架誕生了来氧,他是一個(gè)用于統(tǒng)一創(chuàng)建與運(yùn)行的接口。<font color = "orange">Executor</font> 框架實(shí)現(xiàn)的就是線程池的功能香拉。<font color = "orange">Executor</font> 框架不僅包括了線程池的管理啦扬,還提供了線程工廠、隊(duì)列以及拒絕策略等凫碌,<font color = "orange">Executor</font> 框架讓并發(fā)編程變得更加簡(jiǎn)單扑毡。
2.1 Executor 框架組成
1. 任務(wù) (Runnable / Callable)
執(zhí)行任務(wù)需要實(shí)現(xiàn) Runnable 接口或者 Callable 接口
2. 任務(wù)等執(zhí)行 (Executor)
包括任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的 ExecutorService 接口盛险。Executor框架有兩個(gè)關(guān)鍵類實(shí)現(xiàn)了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor
ExecutorService executorService = Executors.newFixedThreadPool(5);
我們實(shí)現(xiàn)線程池通過(guò) Executors 實(shí)現(xiàn) ExecutorService 接口瞄摊,ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 這兩個(gè)關(guān)鍵類實(shí)現(xiàn)了 ExecutorService 接口
ThreadPoolExecutor 類描述 :
//AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService
ScheduledThreadPoolExecutor 類描述:
//繼承ThreadPoolExecutor
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
3. 異步計(jì)算的結(jié)果 (Future)
包括Future和實(shí)現(xiàn)Future接口的FutureTask類勋又。
2.2 Executor 結(jié)構(gòu)
- <font color = "orange">Executor</font> :是一個(gè)接口,他是Executor框架的基礎(chǔ)换帜,它將任務(wù)的提交與任務(wù)的執(zhí)行分離楔壤。
- <font color = "orange">ThreadPoolExecutor</font> :是線程池的核心實(shí)現(xiàn)類,用來(lái)執(zhí)行被提交的任務(wù)惯驼。
- <font color = "orange">ScheduledThreadPoolExcecutor</font> 是一個(gè)實(shí)現(xiàn)類蹲嚣,可以在給定等延遲后運(yùn)行命令,或者定期執(zhí)行命令祟牲,ScheduledThreadPoolExcecutoe比 Timer 更靈活隙畜,功能更強(qiáng)大
- <font color = "orange">Future</font> Future接口和它的實(shí)現(xiàn)FutureTask類,代表異步計(jì)算的結(jié)果说贝。
- <font color = "orange">ExecutorService</font> :是一個(gè)線程池的實(shí)現(xiàn)
2.2 Executor 使用
- 祝線程首先要?jiǎng)?chuàng)建實(shí)現(xiàn) Runnable 或者 Callable 接口的任務(wù)對(duì)象
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("當(dāng)前線程 :" + Thread.currentThread().getName());
}
}).start();
}
- 把創(chuàng)建完成實(shí)現(xiàn) Runnable 或者 Callable 接口對(duì)象直接交給 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.execute(Runnable command))</font>或者也可以把 Runnable 對(duì)象或Callable 對(duì)象提交給 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.execute(Runnable command))</font>或 ExecutorService 執(zhí)行 <font color = "orange">ExecutorService.submit(Callable <T> task))</font>
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getName());
}
});
Future<?> submit = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getName());
}
});
}
- 如果 執(zhí)行 ExecutorService.submit(…)议惰,將會(huì)返回一個(gè) Future<?> 對(duì)象
- 祝線程可以執(zhí)行 Future.get() 方法來(lái)等待任務(wù)執(zhí)行完成 也可以執(zhí)行 FutureTask.cancel(boolean mayInterruptIfRunning)來(lái)取消此任務(wù)的執(zhí)行。
三狂丝、ThreadPoolExecutor 解析
3.1 構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)
int maximumPoolSize, //最大線程數(shù)
long keepAliveTime, //線程數(shù)大于核心時(shí)换淆,多余線程存活時(shí)間
TimeUnit unit, //時(shí)間單位
BlockingQueue<Runnable> workQueue, //工作隊(duì)列
ThreadFactory threadFactory, //線程工廠
RejectedExecutionHandler handler //拒絕策略 哗总,線程過(guò)多的處理
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
參數(shù)詳解 :
- <font color = "orange">corePoolSize </font> :核心池的大小几颜。 當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)讯屈,當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后蛋哭,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中
- <font color = "orange">maximumPoolSize </font> :當(dāng)隊(duì)列中存放的任務(wù)達(dá)到隊(duì)列容量的時(shí)候,當(dāng)前可以同時(shí)運(yùn)行的線程數(shù)量變?yōu)樽畲缶€程數(shù)
- <font color = "orange">keepAliveTime</font> :當(dāng)線程池中的線程數(shù)量大于 <font color = "orange">corePoolSize </font> 的時(shí)候涮母,如果這時(shí)候沒(méi)有新的任務(wù)提交谆趾,核心線程外的線程不會(huì)立刻銷毀,而是等待 時(shí)間超過(guò)來(lái) <font color = "orange">keepAliveTime</font> 才會(huì)被銷毀
- <font color = "orange">unit</font> :<font color = "orange">keepAliveTime</font> 參數(shù)的時(shí)間單位叛本。
- <font color = "orange">workQueue </font> : 工作隊(duì)列 , 當(dāng)新任務(wù)來(lái)的時(shí)候會(huì)先判斷當(dāng)前運(yùn)行的線程數(shù)量是否達(dá)到核心心線程數(shù)沪蓬,如果達(dá)到的話,新任務(wù)就會(huì)被存放在隊(duì)列中
- <font color = "orange">threadFactory </font> : 線程工廠来候,用來(lái)創(chuàng)建線程
- <font color = "orange">handler </font> :拒絕策略跷叉,提交的任務(wù)過(guò)多而不能及時(shí)處理時(shí),我們可以定制策略來(lái)處理任務(wù)
為什么要這么設(shè)計(jì)呢 有了最大線程數(shù)营搅,為什么要設(shè)計(jì)核心池大小呢?
- 如果當(dāng)前線程池中的線程數(shù) < <font color = "orange">corePoolSize </font> 則每來(lái)一個(gè)任務(wù)云挟,就會(huì)超級(jí)愛(ài)你一個(gè)線程去執(zhí)行這個(gè)任務(wù)
- 如果當(dāng)前線程池中的線程數(shù) >= <font color = "orange">corePoolSize </font> , 則每來(lái)一個(gè)任務(wù),會(huì)將其添加到工作隊(duì)列中转质,若添加成功园欣,則等待 核心線程空閑將其取出執(zhí)行,若添加失敗 (一般隊(duì)列已滿)則在總數(shù) 不大于 <font color = "orange">maximumPoolSize </font> 的前提下休蟹,創(chuàng)建新的線程
- 如果當(dāng)前線程池中的線程數(shù)達(dá)到 <font color = "orange">maximumPoolSize </font> 沸枯,則會(huì)才用拒絕策略進(jìn)行處理
- 補(bǔ)充 : 如果當(dāng)前線程池的數(shù)量大于 <font color = "orange">corePoolSize </font> 時(shí)日矫,如果某個(gè)線程空閑時(shí)間超過(guò)<font color = "orange">keepAliveTime</font> ,線程將被銷毀辉饱,直至線程池中的線程數(shù)目不大于 <font color = "orange">corePoolSize </font>
ThreadPoolExecutor 拒絕策略
- <font color = "orange">ThreadPoolExecutor.AbortPolicy</font> :拋出 RejectedExecutionException來(lái)拒絕新任務(wù)的處理
- <font color = "orange">ThreadPoolExecutor.CallerRunsPolicy</font> :調(diào)用執(zhí)行自己的線程運(yùn)行任務(wù)搬男,也就是直接在調(diào)用 execute 方法的線程運(yùn)行(run)被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉彭沼,則會(huì)丟棄該任務(wù)缔逛。因此這種策略會(huì)降低對(duì)于新任務(wù)的提交速度,影響程序的整體性能姓惑。
- <font color = "orange">ThreadPoolExecutor.DiscardPolicy</font> :不處理新任務(wù)褐奴,直接丟棄掉。
- <font color = "orange">ThreadPoolExecutor.DiscardOldestPolicy</font> :此策略將丟棄最早的未處理的任務(wù)請(qǐng)求
3.2 自定義 ThreadPoolExecutor
在 《阿里巴巴 JAVA 開發(fā)手冊(cè)》明確指出線程資源利用線程池于毙,線程池不允許使用 Executors 去創(chuàng)建
<font color = 'red'>【強(qiáng)制】</font> 線程資源必須通過(guò)線程池提供敦冬,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
說(shuō)明:線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時(shí)間以及系統(tǒng)資源的開銷唯沮,解決資源不足的問(wèn)題脖旱。
如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者“過(guò)度切換”的問(wèn)題介蛉。
<font color = 'red'>【強(qiáng)制】</font> 線程池不允許使用 Executors 去創(chuàng)建萌庆,而是通過(guò) ThreadPoolExecutor 的方式,這
樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則币旧,規(guī)避資源耗盡的風(fēng)險(xiǎn)践险。
說(shuō)明:Executors 返回的線程池對(duì)象的弊端如下:
- =1) FixedThreadPool 和 SingleThreadPool:
允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求吹菱,從而導(dǎo)致 OOM巍虫。- 2) CachedThreadPool:
允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程鳍刷,從而導(dǎo)致 OOM占遥。
public class test {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 20;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 300; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("當(dāng)前線程:"+ Thread.currentThread().getName());
System.out.println("當(dāng)前狀態(tài):"+ Thread.currentThread().getState());
}
});
}
//終止線程池
threadPoolExecutor.shutdown();
}
}
因?yàn)槲覀兪褂玫? <font color = "orange">ThreadPoolExecutor.AbortPolicy</font> 的拒絕任務(wù),所以被拋出異常
OOM 案例:
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(100000);
System.out.println("開始執(zhí)行");
for (int i = 0; i < 100000000; i++) {
executorService.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
System.out.println("等待一小時(shí)開始");
try {
TimeUnit.HOURS.sleep(1);
}catch (Exception e){
log.info(payload);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1,TimeUnit.HOURS);
}
3.3 源碼分析
線程池狀態(tài):利用低29位表示線程池中線程數(shù)输瓜,通過(guò)高3位表示線程池的運(yùn)行狀態(tài):
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- <font color = "orange">RUNNING</font>:運(yùn)行狀態(tài)瓦胎,接受新的任務(wù)并且處理隊(duì)列中的任務(wù)。-
- <font color = "orange">SHUTDOWN</font>:關(guān)閉狀態(tài)(調(diào)用了 shutdown 方法)前痘。不接受新任務(wù)凛捏,,但是要處理隊(duì)列
中的任務(wù)。 - <font color = "orange">STOP</font>:停止?fàn)顟B(tài)(調(diào)用了 shutdownNow 方法)芹缔。不接受新任務(wù)坯癣,也不處理隊(duì)列中的
任務(wù),并且要中斷正在處理的任務(wù)最欠。 - <font color = "orange">TIDYING</font>:所有的任務(wù)都已終止了示罗,workerCount 為 0惩猫,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)terminated() 方法進(jìn)入 TERMINATED 狀態(tài)。
- <font color = "orange">TERMINATED</font>:終止?fàn)顟B(tài)蚜点,terminated() 方法調(diào)用結(jié)束后的狀態(tài)轧房。
//記錄線程池中線程狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//來(lái)獲取當(dāng)前線程數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
//工作隊(duì)列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
//如果任務(wù)為null,則拋出異常
if (command == null)
throw new NullPointerException();
// 取的是記錄線程狀態(tài)
int c = ctl.get();
//判斷當(dāng)前線程池中之行的任務(wù)數(shù)量是否小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
//小于的話绍绘,通過(guò)addWorker(command, true)新建一個(gè)線程奶镶,并將任務(wù)(command)添加到該線程中
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果當(dāng)前之行的任務(wù)數(shù)量大于等于 corePoolSize 的時(shí)候就會(huì)走到這里
if (isRunning(c) && workQueue.offer(command)) {
// 再次獲取線程池狀態(tài),如果線程池狀態(tài)不是 RUNNING 狀態(tài)就需要從任務(wù)隊(duì)列中移除任務(wù)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果當(dāng)前線程池為空就新創(chuàng)建一個(gè)線程并執(zhí)行陪拘。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 通過(guò)addWorker(command, false)新建一個(gè)線程
// 如果addWorker(command, false)執(zhí)行失敗厂镇,則通過(guò)reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容。
else if (!addWorker(command, false))
reject(command);
}
addWorker() 方法
// Lock鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 跟蹤線程池的最大大小
private int largestPoolSize;
// 工作線程集合
private final HashSet<Worker> workers = new HashSet<>();
//獲取線程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判斷線程池的狀態(tài)是否為 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新線程池?cái)?shù)量
retry:
for (;;) {
//獲取線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// 檢查queue是否為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取線程池中線程的數(shù)量
int wc = workerCountOf(c);
// core參數(shù)為true的話表明隊(duì)列也滿了左刽,線程池大小變?yōu)?maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// //原子操作將workcount的數(shù)量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果線程的狀態(tài)改變了就再次執(zhí)行上述操作
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// 標(biāo)記工作線程是否啟動(dòng)成功
boolean workerStarted = false;
// 標(biāo)記工作線程是否創(chuàng)建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN 如果線程池狀態(tài)依然為RUNNING,并且線程的狀態(tài)是存活的話捺信,就會(huì)將工作線程添加到工作線程集合中
// 或者 rs == SHUTDOWN 傳入的firstTask == null 添加新的worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
// 更新當(dāng)前工作線程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// 如果成功添加工作線程,則調(diào)用Worker內(nèi)部的線程實(shí)例t的Thread#start()方法啟動(dòng)真實(shí)的線程實(shí)例
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 失敗移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker() 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 首先會(huì)通過(guò)run方法執(zhí)行firstTask欠痴,執(zhí)行完畢后會(huì)將task置為null
// 那么task迄靠!=null的判斷條件肯定不通過(guò),它就會(huì)嘗試通過(guò)getTask()喇辽,從任務(wù)隊(duì)列中獲取任務(wù)掌挚。
while (task != null || (task = getTask()) != null) {
// 每一次任務(wù)的執(zhí)行都必須獲取鎖來(lái)保證下方臨界區(qū)代碼的線程安全
w.lock();
//如果狀態(tài)值大于等于STOP(狀態(tài)值是有序的,即STOP茵臭、TIDYING疫诽、TERMINATED)且當(dāng)前線程還沒(méi)有被中斷舅世,則主動(dòng)中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行任務(wù)前處理操作旦委,默認(rèn)是一個(gè)空實(shí)現(xiàn);在子類中可以通過(guò)重寫來(lái)改變?nèi)蝿?wù)執(zhí)行前的處理行為
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任務(wù)后處理
afterExecute(task, thrown);
}
} finally {
//將task 變?yōu)閚ull,已處理完成
task = null;
// ++操作雏亚,已完成任務(wù)數(shù)
w.completedTasks++;
//解鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 銷毀當(dāng)前的worker對(duì)象缨硝,并完成一些諸如完成任務(wù)數(shù)量統(tǒng)計(jì)之類的輔助性工作
// 在線程池當(dāng)前狀態(tài)小于STOP的情況下會(huì)創(chuàng)建一個(gè)新的worker來(lái)替換被銷毀的worker
processWorkerExit(w, completedAbruptly);
}
}
getTask() 方法
private Runnable getTask() {
// 通過(guò)timeOut變量表示線程是否空閑時(shí)間超時(shí)了
boolean timedOut = false; // Did the last poll() time out?
// 死循環(huán)
for (;;) {
// 獲取線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// 如果 線程池狀態(tài)>=STOP 則直接減少一個(gè)worker計(jì)數(shù)并返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 獲取線程池中的worker計(jì)數(shù)
int wc = workerCountOf(c);
// 判斷當(dāng)前線程是否會(huì)被超時(shí)銷毀
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果當(dāng)前線程數(shù)大于最大線程數(shù) 或者超時(shí) 或者阻塞隊(duì)列為空 減少worker計(jì)數(shù)并返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//從阻塞隊(duì)列中取出一個(gè)任務(wù)(如果隊(duì)列為空會(huì)進(jìn)入阻塞等待狀態(tài))
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果線程不等于null,直接返回
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
四、常用線程池
4.1 FixedThreadPool
- <font color = "orange">FixedThreadPool</font> : 定長(zhǎng)線程池
public class Test {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 1; i < 5; i++) {
executorService.execute(()->{
System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
});
}
executorService.shutdown();
}
}
當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
從上面源代碼可以看出新創(chuàng)建的 <font color = "orange">FixedThreadPool</font> 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 nThreads, 固定長(zhǎng)度
說(shuō)明:
- 如果當(dāng)前運(yùn)行的線程數(shù)小于 corePoolSize罢低, 如果再來(lái)新任務(wù)的話查辩,就創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)
- 當(dāng)前運(yùn)行的線程數(shù)等于 corePoolSize 后, 如果再來(lái)新任務(wù)的話网持,會(huì)將任務(wù)加入 LinkedBlockingQueue
- 線程池中的線程執(zhí)行完 手頭的任務(wù)后宜岛,會(huì)在循環(huán)中反復(fù)從 LinkedBlockingQueue 中獲取任務(wù)來(lái)執(zhí)行
<font color ='red'>弊端 :
<font color = "orange">FixedThreadPool</font> 線程池使用的是 <font color = "orange">LinkedBlockingQueue</font> 無(wú)界隊(duì)列 ,(隊(duì)列的容量為 Intger.MAX_VALUE),在線程任務(wù)多的情況下會(huì)導(dǎo)致 OOM
4.2 SingleThreadExecutor
- <font color = "orange">SingleThreadExecutor</font> : 一個(gè)單線程化的線程池
public class Test {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 1; i < 5; i++) {
executorService.execute(()->{
System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
});
}
executorService.shutdown();
}
}
當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
從上面源代碼可以看出新創(chuàng)建的 <font color = "orange">SingleThreadExecutor</font> 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為 1, 固定長(zhǎng)度,其他參數(shù)和 <font color = "orange">FixedThreadPool</font> 相同功舀。
說(shuō)明:
- 如果當(dāng)前運(yùn)行的線程數(shù)少于 corePoolSize萍倡,則創(chuàng)建一個(gè)新的線程執(zhí)行任務(wù)
- 當(dāng)前線程池中有一個(gè)運(yùn)行的線程后,將任務(wù)加入 LinkedBlockingQueue
- 線程執(zhí)行完當(dāng)前的任務(wù)后辟汰,會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue 中獲取任務(wù)來(lái)執(zhí)行
<font color ='red'>弊端 :
<font color = "orange">SingleThreadExecutor</font> 線程池使用的是 <font color = "orange">LinkedBlockingQueue</font> 無(wú)界隊(duì)列 ,(隊(duì)列的容量為 Intger.MAX_VALUE)列敲,在線程任務(wù)多的情況下會(huì)導(dǎo)致 OOM
4.3 CachedThreadPool
- <font color = "orange">CachedThreadPool</font> : 緩存線程池
public class Test {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 1; i < 5; i++) {
executorService.execute(()->{
System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
});
}
executorService.shutdown();
}
}
當(dāng)前線程pool-1-thread-1
當(dāng)前線程pool-1-thread-3
當(dāng)前線程pool-1-thread-2
當(dāng)前線程pool-1-thread-1
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 的 corePoolSize 被設(shè)置為空(0)阱佛,maximumPoolSize被設(shè)置為 Integer.MAX.VALUE,即它是無(wú)界的戴而,這也就意味著如果主線程提交任務(wù)的速度高于 maximumPool 中線程處理任務(wù)的速度時(shí)凑术,CachedThreadPool 會(huì)不斷創(chuàng)建新的線程。極端情況下所意,這樣會(huì)導(dǎo)致耗盡 cpu 和內(nèi)存資源淮逊。
說(shuō)明:
- 首先 SynchronousQueue 是一個(gè)生產(chǎn)消費(fèi)模式等阻塞任務(wù)隊(duì)列,只要有任務(wù)就需要有線程執(zhí)行扶踊,線程池中等線程可以重復(fù)使用
<font color ='red'>弊端 :
<font color = "orange">CachedThreadPool</font> 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE 壮莹,可能會(huì)創(chuàng)建大量線程,從而導(dǎo)致 OOM姻檀。
4.3 ScheduledThreadPoolExecutor
- <font color = "orange">ScheduledThreadPoolExecutor</font> : 延遲線程池
public class Test {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
for (int i = 1; i < 5; i++) {
scheduledExecutorService.schedule(()->{
System.out.println("當(dāng)前線程"+ Thread.currentThread().getName());
System.out.println("時(shí)間"+LocalDateTime.now());
},3,TimeUnit.SECONDS);
}
scheduledExecutorService.shutdown();
}
}
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.125
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
當(dāng)前線程pool-1-thread-1
時(shí)間2021-04-14T00:11:12.126
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
<font color = "orange">ScheduledThreadPoolExecutor</font> 主要用來(lái)在給定的延遲后運(yùn)行任務(wù)命满,或者定期執(zhí)行任務(wù)
<font color = "orange">ScheduledThreadPoolExecutor</font> 使用的任務(wù)隊(duì)列 DelayQueue 封裝了一個(gè) PriorityQueue,PriorityQueue 會(huì)對(duì)隊(duì)列中的任務(wù)進(jìn)行排序绣版,執(zhí)行所需時(shí)間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time 變量小的先執(zhí)行)胶台,如果執(zhí)行所需時(shí)間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。
說(shuō)明:
- 延遲定時(shí)任務(wù)線程池杂抽,有點(diǎn)像我們的定時(shí)任務(wù)诈唬。同樣,它也是一個(gè)無(wú)限大小的線程池 缩麸,Integer.MAX_VALUE铸磅。它提供的調(diào)用方法比較多,包括:scheduleAtFixedRate杭朱、scheduleWithFixedDelay阅仔,可以按需選擇延遲執(zhí)行方式。
個(gè)人博客地址:http://blog.yanxiaolong.cn ? |<font color = "orange"> 『縱有疾風(fēng)起弧械,人生不言棄』</font>