五 (重要)ThreadPoolExecutor 使用示例
我們上面講解了 Executor框架以及 ThreadPoolExecutor 類卫枝,下面讓我們實(shí)戰(zhàn)一下,來通過寫一個(gè) ThreadPoolExecutor 的小 Demo 來回顧上面的內(nèi)容位岔。
5.1 示例代碼:Runnable+ThreadPoolExecutor
首先創(chuàng)建一個(gè) Runnable 接口的實(shí)現(xiàn)類(當(dāng)然也可以是 Callable 接口,我們上面也說了兩者的區(qū)別堡牡。)
MyRunnable.java
import java.util.Date;
/**
* 這是一個(gè)簡單的Runnable類抒抬,需要大約5秒鐘來執(zhí)行其任務(wù)。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {
private String command;
public MyRunnable(String s) {
this.command = s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return this.command;
}
}
編寫測試程序晤柄,我們這里以阿里巴巴推薦的使用 ThreadPoolExecutor 構(gòu)造函數(shù)自定義參數(shù)的方式來創(chuàng)建線程池擦剑。
ThreadPoolExecutorDemo.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿里巴巴推薦的創(chuàng)建線程池的方式
//通過ThreadPoolExecutor構(gòu)造函數(shù)自定義參數(shù)創(chuàng)建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
//創(chuàng)建WorkerThread對象(WorkerThread類實(shí)現(xiàn)了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//執(zhí)行Runnable
executor.execute(worker);
}
//終止線程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}
可以看到我們上面的代碼指定了:
corePoolSize: 核心線程數(shù)為 5。
maximumPoolSize :最大線程數(shù) 10
keepAliveTime : 等待時(shí)間為 1L。
unit: 等待時(shí)間的單位為 TimeUnit.SECONDS惠勒。
workQueue:任務(wù)隊(duì)列為 ArrayBlockingQueue赚抡,并且容量為 100;
handler:飽和策略為 CallerRunsPolicy。
Output:
pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019
5.2 線程池原理分析
承接 5.1 節(jié)纠屋,我們通過代碼輸出結(jié)果可以看出:線程池每次會(huì)同時(shí)執(zhí)行 5 個(gè)任務(wù)涂臣,這 5 個(gè)任務(wù)執(zhí)行完之后,剩余的 5 個(gè)任務(wù)才會(huì)被執(zhí)行售担。?大家可以先通過上面講解的內(nèi)容赁遗,分析一下到底是咋回事?(自己獨(dú)立思考一會(huì))
現(xiàn)在族铆,我們就分析上面的輸出內(nèi)容來簡單分析一下線程池原理岩四。
**為了搞懂線程池的原理,我們需要首先分析一下 execute方法哥攘。**在 5.1 節(jié)中的 Demo 中我們使用 executor.execute(worker)來提交一個(gè)任務(wù)到線程池中去炫乓,這個(gè)方法非常重要,下面我們來看看它的源碼:
// 存放線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
// 如果任務(wù)為null献丑,則拋出異常末捣。
if (command == null)
throw new NullPointerException();
// ctl 中保存的線程池當(dāng)前的一些狀態(tài)信息
int c = ctl.get();
// 下面會(huì)涉及到 3 步 操作
// 1.首先判斷當(dāng)前線程池中之行的任務(wù)數(shù)量是否小于 corePoolSize
// 如果小于的話,通過addWorker(command, true)新建一個(gè)線程创橄,并將任務(wù)(command)添加到該線程中箩做;然后,啟動(dòng)該線程從而執(zhí)行任務(wù)妥畏。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果當(dāng)前之行的任務(wù)數(shù)量大于等于 corePoolSize 的時(shí)候就會(huì)走到這里
// 通過 isRunning 方法判斷線程池狀態(tài)邦邦,線程池處于 RUNNING 狀態(tài)才會(huì)被并且隊(duì)列可以加入任務(wù),該任務(wù)才會(huì)被加入進(jìn)去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次獲取線程池狀態(tài)醉蚁,如果線程池狀態(tài)不是 RUNNING 狀態(tài)就需要從任務(wù)隊(duì)列中移除任務(wù)燃辖,并嘗試判斷線程是否全部執(zhí)行完畢。同時(shí)執(zhí)行拒絕策略网棍。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果當(dāng)前線程池為空就新創(chuàng)建一個(gè)線程并執(zhí)行黔龟。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通過addWorker(command, false)新建一個(gè)線程,并將任務(wù)(command)添加到該線程中滥玷;然后氏身,啟動(dòng)該線程從而執(zhí)行任務(wù)。
//如果addWorker(command, false)執(zhí)行失敗惑畴,則通過reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容蛋欣。
else if (!addWorker(command, false))
reject(command);
}
通過下圖可以更好的對上面這 3 步做一個(gè)展示
,現(xiàn)在如贷,讓我們在回到 5.1 節(jié)我們寫的 Demo陷虎, 現(xiàn)在應(yīng)該是不是很容易就可以搞懂它的原理了呢到踏?
沒搞懂的話,也沒關(guān)系尚猿,可以看看我的分析:
我們在代碼中模擬了 10 個(gè)任務(wù)夭禽,我們配置的核心線程數(shù)為 5 、等待隊(duì)列容量為 100 谊路,所以每次只可能存在 5 個(gè)任務(wù)同時(shí)執(zhí)行,剩下的 5 個(gè)任務(wù)會(huì)被放到等待隊(duì)列中去菩彬。當(dāng)前的 5 個(gè)任務(wù)之行完成后缠劝,才會(huì)之行剩下的 5 個(gè)任務(wù)。
5.3 幾個(gè)常見的對比
5.3.1 Runnable vs Callable
Runnable自 Java 1.0 以來一直存在骗灶,但Callable僅在 Java 1.5 中引入,目的就是為了來處理Runnable不支持的用例惨恭。Runnable 接口不會(huì)返回結(jié)果或拋出檢查異常,但是**Callable 接口**可以耙旦。所以脱羡,如果任務(wù)不需要返回結(jié)果或拋出異常推薦使用?Runnable 接口,這樣代碼看起來會(huì)更加簡潔免都。
工具類 Executors 可以實(shí)現(xiàn) Runnable 對象和 Callable 對象之間的相互轉(zhuǎn)換锉罐。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))绕娘。
Runnable.java
@FunctionalInterface
public interface Runnable {
/**
* 被線程執(zhí)行脓规,沒有返回值也無法拋出異常
*/
public abstract void run();
}
Callable.java
@FunctionalInterface
public interface Callable<V> {
/**
* 計(jì)算結(jié)果,或在無法這樣做時(shí)拋出異常险领。
* @return 計(jì)算得出的結(jié)果
* @throws 如果無法計(jì)算結(jié)果侨舆,則拋出異常
*/
V call() throws Exception;
}
5.3.2 execute() vs submit()
execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功與否绢陌;
submit()方法用于提交需要返回值的任務(wù)挨下。線程池會(huì)返回一個(gè) Future 類型的對象,通過這個(gè) Future 對象可以判斷任務(wù)是否執(zhí)行成功脐湾,并且可以通過 Future 的 get()方法來獲取返回值臭笆,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,而使用 get(long timeout秤掌,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線程一段時(shí)間后立即返回耗啦,這時(shí)候有可能任務(wù)沒有執(zhí)行完。
我們以**AbstractExecutorService**接口中的一個(gè) submit 方法為例子來看看源代碼:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
上面方法調(diào)用的 newTaskFor 方法返回了一個(gè) FutureTask 對象机杜。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
我們再來看看execute()方法:
public void execute(Runnable command) {
...
}
5.3.3 shutdown()VSshutdownNow()
shutdown()?:關(guān)閉線程池帜讲,線程池的狀態(tài)變?yōu)?SHUTDOWN。線程池不再接受新任務(wù)了椒拗,但是隊(duì)列里的任務(wù)得執(zhí)行完畢似将。
shutdownNow()?:關(guān)閉線程池获黔,線程的狀態(tài)變?yōu)?STOP。線程池會(huì)終止當(dāng)前正在運(yùn)行的任務(wù)在验,并停止處理排隊(duì)的任務(wù)并返回正在等待執(zhí)行的 List玷氏。
5.3.2 isTerminated() VS isShutdown()
isShutDown?當(dāng)調(diào)用 shutdown() 方法后返回為 true。
isTerminated?當(dāng)調(diào)用 shutdown() 方法后腋舌,并且所有提交的任務(wù)完成后返回為 true
5.4 加餐:Callable+ThreadPoolExecutor示例代碼
MyCallable.java
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
//返回執(zhí)行當(dāng)前 Callable 的線程名字
return Thread.currentThread().getName();
}
}
CallableDemo.java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CallableDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//使用阿里巴巴推薦的創(chuàng)建線程池的方式
//通過ThreadPoolExecutor構(gòu)造函數(shù)自定義參數(shù)創(chuàng)建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<String>> futureList = new ArrayList<>();
Callable<String> callable = new MyCallable();
for (int i = 0; i < 10; i++) {
//提交任務(wù)到線程池
Future<String> future = executor.submit(callable);
//將返回值 future 添加到 list盏触,我們可以通過 future 獲得 執(zhí)行 Callable 得到的返回值
futureList.add(future);
}
for (Future<String> fut : futureList) {
try {
System.out.println(new Date() + "::" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//關(guān)閉線程池
executor.shutdown();
}
}
Output:
Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
六 ScheduledThreadPoolExecutor 詳解
ScheduledThreadPoolExecutor 主要用來在給定的延遲后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)块饺。這個(gè)在實(shí)際項(xiàng)目中基本不會(huì)被用到赞辩,所以對這部分大家只需要簡單了解一下它的思想。關(guān)于如何在Spring Boot 中 實(shí)現(xiàn)定時(shí)任務(wù)授艰,可以查看這篇文章《5分鐘搞懂如何在Spring Boot中Schedule Tasks》辨嗽。
6.1 簡介
ScheduledThreadPoolExecutor 使用的任務(wù)隊(duì)列 DelayQueue 封裝了一個(gè) PriorityQueue,PriorityQueue 會(huì)對隊(duì)列中的任務(wù)進(jìn)行排序淮腾,執(zhí)行所需時(shí)間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time 變量小的先執(zhí)行)糟需,如果執(zhí)行所需時(shí)間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。
ScheduledThreadPoolExecutor 和 Timer 的比較:
Timer 對系統(tǒng)時(shí)鐘的變化敏感谷朝,ScheduledThreadPoolExecutor不是洲押;
Timer 只有一個(gè)執(zhí)行線程,因此長時(shí)間運(yùn)行的任務(wù)可以延遲其他任務(wù)圆凰。 ScheduledThreadPoolExecutor 可以配置任意數(shù)量的線程诅诱。 此外,如果你想(通過提供 ThreadFactory)送朱,你可以完全控制創(chuàng)建的線程;
在TimerTask 中拋出的運(yùn)行時(shí)異常會(huì)殺死一個(gè)線程娘荡,從而導(dǎo)致 Timer 死機(jī):-( ...即計(jì)劃任務(wù)將不再運(yùn)行。ScheduledThreadExecutor 不僅捕獲運(yùn)行時(shí)異常驶沼,還允許您在需要時(shí)處理它們(通過重寫 afterExecute 方法ThreadPoolExecutor)炮沐。拋出異常的任務(wù)將被取消,但其他任務(wù)將繼續(xù)運(yùn)行回怜。
綜上大年,在 JDK1.5 之后,你沒有理由再使用 Timer 進(jìn)行任務(wù)調(diào)度了玉雾。
備注:?Quartz 是一個(gè)由 java 編寫的任務(wù)調(diào)度庫翔试,由 OpenSymphony 組織開源出來。在實(shí)際項(xiàng)目開發(fā)中使用 Quartz 的還是居多复旬,比較推薦使用 Quartz垦缅。因?yàn)?Quartz 理論上能夠同時(shí)對上萬個(gè)任務(wù)進(jìn)行調(diào)度,擁有豐富的功能特性驹碍,包括任務(wù)調(diào)度、任務(wù)持久化、可集群化润脸、插件等等。
6.2 運(yùn)行機(jī)制
ScheduledThreadPoolExecutor 的執(zhí)行主要分為兩大部分:
當(dāng)調(diào)用 ScheduledThreadPoolExecutor 的?scheduleAtFixedRate()?方法或者**scheduleWirhFixedDelay()** 方法時(shí)嚼酝,會(huì)向 ScheduledThreadPoolExecutor 的?DelayQueue?添加一個(gè)實(shí)現(xiàn)了?RunnableScheduledFuture?接口的?ScheduledFutureTask?。
線程池中的線程從 DelayQueue 中獲取 ScheduledFutureTask竟坛,然后執(zhí)行任務(wù)闽巩。
ScheduledThreadPoolExecutor 為了實(shí)現(xiàn)周期性的執(zhí)行任務(wù),對 ThreadPoolExecutor 做了如下修改:
使用?DelayQueue?作為任務(wù)隊(duì)列担汤;
獲取任務(wù)的方不同
執(zhí)行周期任務(wù)后涎跨,增加了額外的處理
6.3 ScheduledThreadPoolExecutor 執(zhí)行周期任務(wù)的步驟
線程 1 從 DelayQueue 中獲取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任務(wù)是指 ScheduledFutureTask 的 time 大于等于當(dāng)前系統(tǒng)的時(shí)間漫试;
線程 1 執(zhí)行這個(gè) ScheduledFutureTask;
線程 1 修改 ScheduledFutureTask 的 time 變量為下次將要被執(zhí)行的時(shí)間碘赖;
線程 1 把這個(gè)修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())驾荣。
大家記得關(guān)注主頁(上篇)