Java高并發(fā)系列——線程池
JAVA線程池
線程池實現(xiàn)原理
類似于一個工廠的運作充尉。
當(dāng)向線程池提交一個任務(wù)之后飘言,線程池的處理流程如下:
- 判斷是否達到核心線程數(shù),若未達到驼侠,則直接創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù)姿鸿,否則進入下個流程
- 線程池中的工作隊列是否已滿,若未滿倒源,則將任務(wù)丟入工作隊列中先存著等待處理般妙,否則進入下個流程
- 是否達到最大線程數(shù),若未達到相速,則創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù)碟渺,否則交給線程池中的飽和策略進行處理。
java中的線程池
jdk中提供了線程池的具體實現(xiàn)突诬,實現(xiàn)類是:java.util.concurrent.ThreadPoolExecutor
苫拍,主要構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心線程大小,當(dāng)提交一個任務(wù)到線程池時旺隙,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù)绒极,即使有其他空閑線程可以處理任務(wù)也會創(chuàng)新線程,等到工作的線程數(shù)大于核心線程數(shù)時就不會在創(chuàng)建了蔬捷。如果調(diào)用了線程池的prestartAllCoreThreads
方法垄提,線程池會提前把核心線程都創(chuàng)造好,并啟動周拐。(prestartCoreThread:啟動一個核心線程或 prestartAllCoreThreads:啟動全部核心線程 )
maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)铡俐。如果隊列滿了,并且以創(chuàng)建的線程數(shù)小于最大線程數(shù)妥粟,則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)审丘。如果我們使用了無界隊列(或者大小是Integer.MAX_VALUE,可能還沒達到就OOM了)勾给,那么所有的任務(wù)會加入隊列滩报,這個參數(shù)就沒有什么效果了锅知。
keepAliveTime:線程池的工作線程空閑后,保持存活的時間脓钾。如果沒有任務(wù)處理了售睹,有些線程會空閑,空閑的時間超過了這個值可训,會被回收掉昌妹。如果任務(wù)很多,并且每個任務(wù)的執(zhí)行時間比較短沉噩,避免線程重復(fù)創(chuàng)建和回收捺宗,可以調(diào)大這個時間,提高線程的利用率
unit:keepAliveTIme的時間單位川蒙,可以選擇的單位有天蚜厉、小時、分鐘畜眨、毫秒昼牛、微妙、千分之一毫秒和納秒康聂。類型是一個枚舉java.util.concurrent.TimeUnit
贰健,這個枚舉也經(jīng)常使用,有興趣的可以看一下其源碼
workQueue:工作隊列恬汁,用于緩存待處理任務(wù)的阻塞隊列伶椿,常見的有4種(ArrayBlockingQueue 、LinkedBlockingQueue 氓侧、SynchronousQueue 脊另、PriorityBlockingQueue )
threadFactory:線程池中創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字
handler:飽和策略约巷,當(dāng)線程池?zé)o法處理新來的任務(wù)了偎痛,那么需要提供一種策略處理提交的新任務(wù),默認有4種策略(AbortPolicy 独郎、CallerRunsPolicy 踩麦、DiscardOldestPolicy、DiscardPolicy )
調(diào)用線程池的execute方法處理任務(wù)氓癌,執(zhí)行execute方法的過程:
- 判斷線程池中運行的線程數(shù)是否小于corepoolsize谓谦,是:則創(chuàng)建新的線程來處理任務(wù),否:執(zhí)行下一步
- 試圖將任務(wù)添加到workQueue指定的隊列中顽铸,如果無法添加到隊列茁计,進入下一步
- 判斷線程池中運行的線程數(shù)是否小于
maximumPoolSize
,是:則新增線程處理當(dāng)前傳入的任務(wù)谓松,否:將任務(wù)傳遞給handler
對象rejectedExecution
方法處理
線程池的使用步驟:
- 調(diào)用構(gòu)造方法創(chuàng)建線程池
- 調(diào)用線程池的方法處理任務(wù)
- 關(guān)閉線程池
線程池中常見5種工作隊列
任務(wù)太多的時候星压,工作隊列用于暫時緩存待處理的任務(wù),jdk中常見的4種阻塞隊列:
ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列鬼譬,此隊列按照先進先出原則對元素進行排序
LinkedBlockingQueue:是一個基于鏈表結(jié)構(gòu)的阻塞隊列娜膘,此隊列按照先進先出排序元素,吞吐量通常要高于ArrayBlockingQueue优质。靜態(tài)工廠方法Executors.newFixedThreadPool
使用了這個隊列竣贪。
SynchronousQueue :一個不存儲元素的阻塞隊列,每個插入操作必須等到另外一個線程調(diào)用移除操作巩螃,否則插入操作一直處理阻塞狀態(tài)演怎,吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool
使用這個隊列避乏。
PriorityBlockingQueue:優(yōu)先級隊列爷耀,進入隊列的元素按照優(yōu)先級會進行排序。
SynchronousQueue隊列的線程池
使用Executors.newCachedThreadPool()
創(chuàng)建線程池拍皮,看一下的源碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool()
使用了SynchronousQueue
同步隊列歹叮,這種隊列比較特殊,放入元素必須要有另外一個線程去獲取這個元素铆帽,否則放入元素會失敗或者一直阻塞在那里直到有線程取走咆耿,示例中任務(wù)處理休眠了指定的時間,導(dǎo)致已創(chuàng)建的工作線程都忙于處理任務(wù)爹橱,所以新來任務(wù)之后萨螺,將任務(wù)丟入同步隊列會失敗,丟入隊列失敗之后愧驱,會嘗試新建線程處理任務(wù)慰技。使用上面的方式創(chuàng)建線程池需要注意,如果需要處理的任務(wù)比較耗時冯键,會導(dǎo)致新來的任務(wù)都會創(chuàng)建新的線程進行處理惹盼,可能會導(dǎo)致創(chuàng)建非常多的線程,最終耗盡系統(tǒng)資源惫确,觸發(fā)OOM手报。
//SynchronousQueue隊列默認是false,采用先進后出的棧處理,也可以是公平隊列先進先出改化。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
PriorityBlockingQueue優(yōu)先級隊列的線程池
輸出中掩蛤,除了第一個任務(wù),其他任務(wù)按照優(yōu)先級高低按順序處理陈肛。原因在于:創(chuàng)建線程池的時候使用了優(yōu)先級隊列揍鸟,進入隊列中的任務(wù)會進行排序,任務(wù)的先后順序由Task中的i變量決定句旱。向PriorityBlockingQueue
加入元素的時候阳藻,內(nèi)部會調(diào)用代碼中Task的compareTo
方法決定元素的先后順序晰奖。
示例:
public class ThreadPoolExecutorPriorityTest {
/**
* 優(yōu)先級隊列執(zhí)行的任務(wù)要實現(xiàn)Comparable比較
*/
static class Task implements Runnable, Comparable<Task> {
private int i;
private String name;
public Task(int i, String name) {
this.i = i;
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "處理" + this.name);
}
@Override
public int compareTo(Task o) {
return Integer.compare(o.i, this.i);
}
}
//自定義線程工廠,優(yōu)先級隊列
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60,
TimeUnit.SECONDS, new PriorityBlockingQueue<>(), new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 1; i <= 10; i++) {
int j = i;
String taskName = "task" + j;
executor.execute(new Task(j,taskName));
}
for (int i = 90; i <= 100; i++) {
int j = i;
String taskName = "task" + j;
executor.execute(new Task(j,taskName));
}
executor.shutdown();
}
}
輸出:
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task1
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task100
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task99
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task98
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task97
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task96
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task95
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task94
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task93
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task92
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task91
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task90
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task10
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task9
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task8
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task7
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task6
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task5
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task4
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task3
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task2
自定義創(chuàng)建線程的工廠
給線程池中線程起一個有意義的名字,在系統(tǒng)出現(xiàn)問題的時候腥泥,通過線程堆棧信息可以更容易發(fā)現(xiàn)系統(tǒng)中問題所在匾南。通過jstack查看線程的堆棧信息,也可以看到我們自定義的名稱 蛔外。
自定義創(chuàng)建工廠需要實現(xiàn)java.util.concurrent.ThreadFactory
接口中的Thread newThread(Runnable r)
方法蛆楞,參數(shù)為傳入的任務(wù),需要返回一個工作線程夹厌。
示例:
public class ThreadPoolExecutorTest {
//默認線程創(chuàng)建
/* private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(15), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());*/
//自定義線程工廠1
/* private static final AtomicInteger nextId = new AtomicInteger(1);
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(12), (r -> {
Thread t = new Thread(r);
t.setName("示范線程" + nextId.getAndIncrement());
return t;
}), new ThreadPoolExecutor.AbortPolicy());*/
//自定義線程工廠2 豹爹,推薦
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(15), new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
//提前啟動所有核心線程
executor.prestartAllCoreThreads();
//提前啟動一個核心線程
executor.prestartCoreThread();
for (int i = 1; i <= 20; i++) {
int j = i;
String taskName = "task" + j;
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "線程執(zhí)行" + taskName + "完畢!");
});
}
executor.shutdown();
}
}
輸出:
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢矛纹!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task2完畢臂聋!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task3完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task4完畢崖技!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task5完畢逻住!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task6完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢迎献!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task8完畢瞎访!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task9完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task10完畢吁恍!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-6線程執(zhí)行task17完畢扒秸!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task11完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-7線程執(zhí)行task20完畢冀瓦!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task12完畢伴奥!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task13完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task14完畢翼闽!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task15完畢拾徙!
四種常見飽和策略
當(dāng)線程池中隊列已滿,并且線程池已達到最大線程數(shù)感局,線程池會將任務(wù)傳遞給飽和策略進行處理尼啡。這些策略都實現(xiàn)了RejectedExecutionHandler
接口。接口中有個方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
參數(shù)說明:
r:需要執(zhí)行的任務(wù)
executor:當(dāng)前線程池對象
JDK中提供了4種常見的飽和策略:
AbortPolicy:直接拋出異常询微。
CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運行任務(wù)崖瞭,即誰丟來的任務(wù),由他自己去處理撑毛。
DiscardOldestPolicy:丟棄隊列中最老的一個任務(wù)书聚,即丟棄隊列頭部的一個任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)。
DiscardPolicy:不處理雌续,直接丟棄掉斩个,方法內(nèi)部為空。
解釋:
//自定義線程工廠
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.CallerRunsPolicy());
AbortPolicy:直接拋出異常西雀。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
輸出:到飽和策略時拋出異常記錄萨驶,丟棄掉任務(wù)11個歉摧。
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.self.current.ThreadPoolExecutorTest$$Lambda$1/1915503092@50134894 rejected from java.util.concurrent.ThreadPoolExecutor@2957fcb0[Running, pool size = 5, active threads = 4, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.self.current.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:47)
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢艇肴!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task2完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task6完畢叁温!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task3完畢再悼!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task8完畢膝但!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task9完畢冲九!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task4完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task5完畢跟束!
CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運行任務(wù)莺奸,即隨丟來的任務(wù),由他自己去處理冀宴。如main方法調(diào)用的線程池灭贷,則如果走到飽和策略處理時,由main方法處理這個任務(wù)略贮。不會丟棄任何一個任務(wù)甚疟,但執(zhí)行會變得很慢。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
輸出:
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢逃延!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task2完畢览妖!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task6完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task3完畢揽祥!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task8完畢讽膏!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task9完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task4完畢拄丰!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task10完畢府树!
main線程執(zhí)行task11完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task5完畢愈案!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢挺尾!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task12完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task13完畢站绪!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task14完畢遭铺!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task15完畢!
main線程執(zhí)行task17完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task16完畢魂挂!
DiscardOldestPolicy:丟棄隊列中最老的一個任務(wù)甫题,即丟棄隊列頭部的一個任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)涂召。這時候線程池會在執(zhí)行到飽和策略時丟棄掉頭部最老的認為坠非,沒有任何記錄,任務(wù)就丟掉了果正。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
輸出:20個任務(wù)被無聲無息地丟掉了11個
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task6完畢炎码!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task8完畢秋泳!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task9完畢潦闲!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task16完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task17完畢迫皱!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task18完畢歉闰!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task19完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task20完畢卓起!
DiscardPolicy:不處理和敬,直接丟棄掉,方法內(nèi)部為空戏阅。沒處理Runnable r就表示丟棄了昼弟。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
輸出:20個任務(wù)被無聲無息地丟掉了10個
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task2完畢饲握!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task3完畢私杜!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task7完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task8完畢救欧!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task9完畢衰粹!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task10完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task4完畢笆怠!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task5完畢铝耻!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task6完畢!
自定義飽和策略
需要實現(xiàn)RejectedExecutionHandler
接口蹬刷。任務(wù)無法處理的時候瓢捉,我們想記錄一下日志,我們需要自定義一個飽和策略办成。記錄了任務(wù)的日志泡态,對于無法處理多任務(wù),我們最好能夠記錄一下迂卢,讓開發(fā)人員能夠知道某弦。 任務(wù)進入了飽和策略桐汤,說明線程池的配置可能不是太合理,或者機器的性能有限靶壮,需要做一些優(yōu)化調(diào)整怔毛。
實例:
public class ThreadPoolExecutorRejectHandlerTest {
static class Task implements Runnable {
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "處理" + this.name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" +
"name='" + name + '\'' +
'}';
}
}
//自定義包含策略:可以直接用函數(shù)式方法定義,也可以實現(xiàn)RejectedExecutionHandler自定義
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創(chuàng)建組"), (r,executor)->{
//自定義飽和策略
//記錄一下無法處理的任務(wù)
System.out.println("無法處理的任務(wù):" + r.toString());
});
public static void main(String[] args) {
//提前啟動所有核心線程
executor.prestartAllCoreThreads();
//提前啟動一個核心線程
executor.prestartCoreThread();
for (int i = 1; i <= 20; i++) {
int j = i;
String taskName = "task" + j;
executor.execute(new Task(taskName));
}
executor.shutdown();
}
}
輸出:
無法處理的任務(wù):Task{name='task10'}
無法處理的任務(wù):Task{name='task11'}
無法處理的任務(wù):Task{name='task12'}
無法處理的任務(wù):Task{name='task13'}
無法處理的任務(wù):Task{name='task14'}
無法處理的任務(wù):Task{name='task15'}
無法處理的任務(wù):Task{name='task16'}
無法處理的任務(wù):Task{name='task17'}
無法處理的任務(wù):Task{name='task18'}
無法處理的任務(wù):Task{name='task19'}
無法處理的任務(wù):Task{name='task20'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task1
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2處理task6
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3處理task7
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4處理task8
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5處理task9
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2處理task2
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task3
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4處理task5
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3處理task4
線程池中的2個關(guān)閉方法
線程池提供了2個關(guān)閉方法:shutdown
和shutdownNow
腾降,當(dāng)調(diào)用者兩個方法之后拣度,線程池會遍歷內(nèi)部的工作線程,然后調(diào)用每個工作線程的interrrupt方法給線程發(fā)送中斷信號螃壤,內(nèi)部如果無法響應(yīng)中斷信號的可能永遠無法終止抗果,所以如果內(nèi)部有無線循環(huán)的,最好在循環(huán)內(nèi)部檢測一下線程的中斷信號映穗,合理的退出窖张。調(diào)用者兩個方法中任意一個,線程池的isShutdown
方法(是否執(zhí)行了關(guān)閉線程池命令)就會返回true蚁滋,當(dāng)所有的任務(wù)線程都關(guān)閉之后,才表示線程池關(guān)閉成功赘淮,這時調(diào)用isTerminaed
方法(是否關(guān)閉成功)會返回true辕录。
調(diào)用shutdown
方法之后,線程池將不再接受新任務(wù)梢卸,內(nèi)部會將所有已提交的任務(wù)處理完畢走诞,處理完畢之后,工作線程自動退出蛤高。
而調(diào)用shutdownNow
方法后蚣旱,線程池會將還未處理的(在隊里等待處理的任務(wù))任務(wù)移除,將正在處理中的處理完畢之后戴陡,工作線程自動退出塞绿。
至于調(diào)用哪個方法來關(guān)閉線程,應(yīng)該由提交到線程池的任務(wù)特性決定恤批,多數(shù)情況下調(diào)用shutdown
方法來關(guān)閉線程池异吻,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow
方法喜庞。
擴展線程池
ThreadPoolExecutor
內(nèi)部提供了幾個方法beforeExecute
诀浪、afterExecute
、terminated
延都,可以由開發(fā)人員自己去重寫實現(xiàn)這些方法雷猪。
看一下線程池內(nèi)部的源碼:
try {
beforeExecute(wt, task);//任務(wù)執(zhí)行之前調(diào)用的方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);//任務(wù)執(zhí)行完畢之后調(diào)用的方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
beforeExecute:任務(wù)執(zhí)行之前調(diào)用的方法,有2個參數(shù)晰房,第1個參數(shù)是執(zhí)行任務(wù)的線程求摇,第2個參數(shù)是任務(wù)
protected void beforeExecute(Thread t, Runnable r) { }
afterExecute:任務(wù)執(zhí)行完成之后調(diào)用的方法酵颁,2個參數(shù),第1個參數(shù)表示任務(wù)月帝,第2個參數(shù)表示任務(wù)執(zhí)行時的異常信息躏惋,如果無異常,第二個參數(shù)為null
protected void afterExecute(Runnable r, Throwable t) { }
terminated:線程池最終關(guān)閉之后調(diào)用的方法嚷辅。所有的工作線程都退出了簿姨,最終線程池會退出,退出時調(diào)用該方法
實例:
public class ThreadPoolExecutorExtensionTest {
static class Task implements Runnable {
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "處理" + this.name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" +
"name='" + name + '\'' +
'}';
}
}
//擴展線程池,可以繼承也可以直接重寫
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(15),
new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(t.getName() + ",開始執(zhí)行任務(wù):" + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(Thread.currentThread().getName() + ",任務(wù):" + r.toString() + ",執(zhí)行完畢!");
}
@Override
protected void terminated() {
System.out.println(Thread.currentThread().getName() + ",關(guān)閉線程池!");
}
};
public static void main(String[] args) {
for (int i = 1; i <= 3; i++) {
int j = i;
String taskName = "task" + j;
executor.execute(new Task(taskName));
}
executor.shutdown();
}
}
輸出:
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1,開始執(zhí)行任務(wù):Task{name='task1'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task1
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2,開始執(zhí)行任務(wù):Task{name='task2'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2處理task2
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3,開始執(zhí)行任務(wù):Task{name='task3'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3處理task3
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1,任務(wù):Task{name='task1'}耘纱,執(zhí)行完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2,任務(wù):Task{name='task2'}贷岸,執(zhí)行完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3,任務(wù):Task{name='task3'},執(zhí)行完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3糊探,關(guān)閉線程池!
合理地配置線程池
要想合理的配置線程池,需要先分析任務(wù)的特性,可以沖一下四個角度分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù)暇务、IO密集型任務(wù)和混合型任務(wù)
- 任務(wù)的優(yōu)先級:高、中怔软、低
- 任務(wù)的執(zhí)行時間:長垦细、中、短
- 任務(wù)的依賴性:是否依賴其他的系統(tǒng)資源挡逼,如數(shù)據(jù)庫連接括改。
性質(zhì)不同任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)該盡可能小的線程家坎,如配置cpu數(shù)量+1個線程的線程池嘱能。由于IO密集型任務(wù)并不是一直在執(zhí)行任務(wù),不能讓cpu閑著虱疏,則應(yīng)配置盡可能多的線程惹骂,如:cup數(shù)量2。混合型的任務(wù)订框,如果可以拆分析苫,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù)*,只要這2個任務(wù)執(zhí)行的時間相差不是太大穿扳,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量衩侥。可以通過Runtime.getRuntime().availableProcessors()
方法獲取cpu數(shù)量矛物。優(yōu)先級不同任務(wù)可以對線程池采用優(yōu)先級隊列來處理茫死,讓優(yōu)先級高的先執(zhí)行。
使用隊列的時候建議使用有界隊列履羞,有界隊列增加了系統(tǒng)的穩(wěn)定性峦萎,如果采用無界隊列屡久,任務(wù)太多的時候可能導(dǎo)致系統(tǒng)OOM,直接讓系統(tǒng)宕機爱榔。
線程池中線程數(shù)量的配置
線程池中總線程大小對系統(tǒng)的性能有一定的影響被环,我們的目標(biāo)是希望系統(tǒng)能夠發(fā)揮最好的性能,過多或者過小的線程數(shù)量無法有效的使用機器的性能详幽。在Java Concurrency in Practice書中給出了估算線程池大小的公式:
Ncpu = CUP的數(shù)量
Ucpu = 目標(biāo)CPU的使用率筛欢,0<=Ucpu<=1
W/C = 等待時間與計算時間的比例
為保存處理器達到期望的使用率,最優(yōu)的線程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)
線程池數(shù)量 = CUP的數(shù)量 * 目標(biāo)CPU的使用率 * 等待時間與計算時間的比例
使用建議
在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供唇聘,不允許在應(yīng)用中自行顯示的創(chuàng)建線程版姑,這樣一方面是線程的創(chuàng)建更加規(guī)范,可以合理控制開辟線程的數(shù)量迟郎;另一方面線程的細節(jié)管理交給線程池處理剥险,優(yōu)化了資源的開銷。而線程池不允許使用Executors去創(chuàng)建宪肖,而要通過ThreadPoolExecutor方式表制,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()匈庭、newCachedThreadPool()等創(chuàng)建線程池的方法夫凸,但都有其局限性,不夠靈活阱持;另外由于前面幾種方法內(nèi)部也是通過ThreadPoolExecutor方式實現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則魔熏,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池衷咽,避免資源耗盡的風(fēng)險。
線程池不允許使用Executors去創(chuàng)建蒜绽,而是通過ThreadPoolExecutor的方式镶骗,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險躲雅。
說明:Executors返回的線程池對象的弊端如下:
1) FixedThreadPool和SingleThreadPool:
允許的請求隊列:LinkedBlockingQueue長度為Integer.MAX_VALUE鼎姊,可能會堆積大量的請求,從而導(dǎo)致OOM相赁。
2) CachedThreadPool:
允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE相寇,可能會創(chuàng)建大量的線程,從而導(dǎo)致OOM钮科。
疑問:
Q:LinkedBlockingQueue吞吐量通常要高于ArrayBlockingQueue唤衫,為什么?
LinkedBlockingQueue底層是鏈表绵脯,增刪效率比較高佳励,而ArrayBlockingQueue底層是數(shù)組休里,增刪效率比較低。
最主要的是ArrayBlockingQueue數(shù)據(jù)的插入與取出共用同一個鎖赃承,因此ArrayBlockingQueue并不能實現(xiàn)生產(chǎn)妙黍、消費同時進行;
LinkedBlockingQueue中用于阻塞生產(chǎn)者瞧剖、消費者的鎖是兩個(鎖分離:注意這里是JDK8之前的設(shè)計拭嫁,JDK8之后是用一個鎖實現(xiàn)),因此生產(chǎn)與消費是可以同時進行的筒繁。
JUC中的Executor框架
Excecutor框架主要包含3部分的內(nèi)容:
- 任務(wù)相關(guān)的:包含被執(zhí)行的任務(wù)要實現(xiàn)的接口:Runnable接口或Callable接口
- 任務(wù)的執(zhí)行相關(guān)的:包含任務(wù)執(zhí)行機制的核心接口Executor噩凹,以及繼承自
Executor
的ExecutorService
接口。Executor框架中有兩個關(guān)鍵的類實現(xiàn)了ExecutorService接口(ThreadPoolExecutor
和ScheduleThreadPoolExecutor
) - 異步計算結(jié)果相關(guān)的:包含接口Future和實現(xiàn)Future接口的FutureTask類
Executors框架包括:
- Executor
- ExecutorService
- ThreadPoolExecutor
- Executors
- Future
- Callable
- FutureTask
- CompletableFuture
- CompletionService
- ExecutorCompletionService
Executor接口
Executor接口中定義了方法execute(Runable able)接口毡咏,該方法接受一個Runable實例驮宴,他來執(zhí)行一個任務(wù),任務(wù)即實現(xiàn)一個Runable接口的類呕缭。
ExecutorService接口
ExecutorService繼承于Executor接口堵泽,他提供了更為豐富的線程實現(xiàn)方法,比如ExecutorService提供關(guān)閉自己的方法恢总,以及為跟蹤一個或多個異步任務(wù)執(zhí)行狀況而生成Future的方法迎罗。
ExecutorService有三種狀態(tài):運行、關(guān)閉片仿、終止纹安。創(chuàng)建后便進入運行狀態(tài),當(dāng)調(diào)用了shutdown()方法時砂豌,便進入了關(guān)閉狀態(tài)厢岂,此時意味著ExecutorService不再接受新的任務(wù),但是他還是會執(zhí)行已經(jīng)提交的任務(wù)阳距,當(dāng)所有已經(jīng)提交了的任務(wù)執(zhí)行完后塔粒,便達到終止?fàn)顟B(tài)。如果不調(diào)用shutdown方法筐摘,ExecutorService方法會一直運行下去卒茬,系統(tǒng)一般不會主動關(guān)閉。
ThreadPoolExecutor類
線程池類咖熟,實現(xiàn)了ExecutorService
接口中所有方法圃酵,參考線程池的使用。
ScheduleThreadPoolExecutor定時器
ScheduleThreadPoolExecutor繼承自ThreadPoolExecutor
(實現(xiàn)了線程池的核心功能),實現(xiàn)了ScheduledExecutorService
(實現(xiàn)了定時器調(diào)度功能)球恤,他主要用來延遲執(zhí)行任務(wù)辜昵,或者定時執(zhí)行任務(wù)。功能和Timer類似咽斧,但是ScheduleThreadPoolExecutor更強大堪置、更靈活一些躬存。Timer后臺是單個線程,而ScheduleThreadPoolExecutor可以在創(chuàng)建的時候指定多個線程舀锨。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
}
schedule:延遲執(zhí)行任務(wù)1次
使用ScheduleThreadPoolExecutor的schedule方法
岭洲,看一下這個方法的聲明:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
3個參數(shù):
command:需要執(zhí)行的任務(wù)
delay:需要延遲的時間
unit:參數(shù)2的時間單位,是個枚舉坎匿,可以是天盾剩、小時、分鐘替蔬、秒告私、毫秒、納秒等
實例:
//只延遲調(diào)度一次
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3,
new DemoThreadFactory("延遲調(diào)度線程池"));
scheduledThreadPool.schedule(()->{
System.out.println(System.currentTimeMillis()+"開始執(zhí)行調(diào)度承桥!");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()+"執(zhí)行調(diào)度結(jié)束驻粟!");
},3,TimeUnit.SECONDS);
}
輸出:
1598509985652開始執(zhí)行調(diào)度!
1598509990653執(zhí)行調(diào)度結(jié)束凶异!
scheduleAtFixedRate:固定的頻率執(zhí)行任務(wù)
使用ScheduleThreadPoolExecutor的scheduleAtFixedRate
方法蜀撑,該方法設(shè)置了執(zhí)行周期,下一次執(zhí)行時間相當(dāng)于是上一次的執(zhí)行時間加上period剩彬,任務(wù)每次執(zhí)行完畢之后才會計算下次的執(zhí)行時間酷麦。
看一下這個方法的聲明:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
4個參數(shù):
command:表示要執(zhí)行的任務(wù)
initialDelay:表示延遲多久執(zhí)行第一次
period:連續(xù)執(zhí)行之間的時間間隔
unit:參數(shù)2和參數(shù)3的時間單位,是個枚舉喉恋,可以是天沃饶、小時、分鐘轻黑、秒绍坝、毫秒、納秒等
假設(shè)系統(tǒng)調(diào)用scheduleAtFixedRate的時間是T1苔悦,那么執(zhí)行時間如下:
第1次:T1+initialDelay
第2次:T1+initialDelay+period(這時候如果第一次執(zhí)行完后時間大于固定頻率的時間,就會被馬上調(diào)度起來)
第3次:T1+initialDelay+2*period
第n次:T1+initialDelay+(n-1)*period
實例:
//scheduleAtFixedRate()表示每次方法的執(zhí)行周期是多久關(guān)注的是執(zhí)行周期椎咧,如果已經(jīng)到了執(zhí)行周期玖详,就會立即開啟調(diào)度任務(wù),時間間隔是調(diào)度任務(wù)開始時間加周期
public static void main2(String[] args) throws ExecutionException, InterruptedException {
//任務(wù)執(zhí)行計數(shù)器
AtomicInteger count = new AtomicInteger(1);
ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(3,
new DemoThreadFactory("延遲調(diào)度線程池"),new ThreadPoolExecutor.AbortPolicy());
ScheduledFuture<?> schedule = scheduledThreadPool.scheduleAtFixedRate(() -> {
int currCount = count.getAndIncrement();
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
}, 1,3, TimeUnit.SECONDS);
}
輸出:
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:17 CST 2020 第1次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:22 CST 2020 第1次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:22 CST 2020 第2次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:27 CST 2020 第2次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:36:27 CST 2020 第3次開始執(zhí)行
任務(wù)當(dāng)前執(zhí)行完畢之后會計算下次執(zhí)行時間勤讽,下次執(zhí)行時間為上次執(zhí)行的開始時間+period蟋座,這個時間小于第一次結(jié)束的時間了,說明小于系統(tǒng)當(dāng)前時間了脚牍,會立即執(zhí)行向臀。
scheduleWithFixedDelay:固定的間隔執(zhí)行任務(wù)
使用ScheduleThreadPoolExecutor的scheduleWithFixedDelay
方法,該方法設(shè)置了執(zhí)行周期诸狭,與scheduleAtFixedRate方法不同的是券膀,下一次執(zhí)行時間是上一次任務(wù)執(zhí)行完的系統(tǒng)時間加上period君纫,因而具體執(zhí)行時間不是固定的,但周期是固定的芹彬,是采用相對固定的延遲來執(zhí)行任務(wù)蓄髓。看一下這個方法的聲明:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
4個參數(shù):
command:表示要執(zhí)行的任務(wù)
initialDelay:表示延遲多久執(zhí)行第一次
period:表示下次執(zhí)行時間和上次執(zhí)行結(jié)束時間之間的間隔時間
unit:參數(shù)2和參數(shù)3的時間單位,是個枚舉舒帮,可以是天会喝、小時、分鐘玩郊、秒肢执、毫秒、納秒等
假設(shè)系統(tǒng)調(diào)用scheduleAtFixedRate的時間是T1译红,那么執(zhí)行時間如下:
第1次:T1+initialDelay预茄,執(zhí)行結(jié)束時間:E1(執(zhí)行結(jié)束時間是不固定的)
第2次:E1+period,執(zhí)行結(jié)束時間:E2
第3次:E2+period临庇,執(zhí)行結(jié)束時間:E3
第4次:E3+period反璃,執(zhí)行結(jié)束時間:E4
第n次:上次執(zhí)行結(jié)束時間+period
實例:
//scheduleWithFixedDelay()表示每次方法執(zhí)行完后延遲多久執(zhí)行,關(guān)注的是延遲時間假夺,時間間隔是調(diào)度任務(wù)結(jié)束時間加延遲時間
public static void main(String[] args) throws ExecutionException, InterruptedException {
//任務(wù)執(zhí)行計數(shù)器
AtomicInteger count = new AtomicInteger(1);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
new DemoThreadFactory("延遲調(diào)度線程池"));
ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
int currCount = count.getAndIncrement();
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
}, 1,3, TimeUnit.SECONDS);
}
輸出:
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:16 CST 2020 第1次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:22 CST 2020 第1次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:25 CST 2020 第2次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:30 CST 2020 第2次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:39:33 CST 2020 第3次開始執(zhí)行
延遲1秒之后執(zhí)行第1次淮蜈,后面每次的執(zhí)行時間和上次執(zhí)行結(jié)束時間間隔3秒。
定時任務(wù)有異骋丫恚——沒有對異常處理則定時任務(wù)會結(jié)束
先說補充點知識:schedule梧田、scheduleAtFixedRate、scheduleWithFixedDelay這幾個方法有個返回值ScheduledFuture侧蘸,通過ScheduledFuture
可以對執(zhí)行的任務(wù)做一些操作裁眯,如判斷任務(wù)是否被取消、是否執(zhí)行完成讳癌。
再回到上面代碼穿稳,任務(wù)中有個10/0的操作,會觸發(fā)異常晌坤,發(fā)生異常之后沒有任何現(xiàn)象逢艘,被ScheduledExecutorService內(nèi)部給吞掉了,然后這個任務(wù)再也不會執(zhí)行了骤菠,scheduledFuture.isDone()
輸出true它改,表示這個任務(wù)已經(jīng)結(jié)束了,再也不會被執(zhí)行了商乎。所以如果程序有異常央拖,開發(fā)者自己注意try-catch處理一下,不然跑著跑著發(fā)現(xiàn)任務(wù)怎么不跑了,也沒有異常輸出鲜戒。
實例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//任務(wù)執(zhí)行計數(shù)器
AtomicInteger count = new AtomicInteger(1);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
new DemoThreadFactory("延遲調(diào)度線程池"));
ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
int currCount = count.getAndIncrement();
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
/* try {
System.out.println(10/0);
} catch (Exception e) {
e.printStackTrace();
}*/
System.out.println(10/0);
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
}, 1,3, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(3);
System.out.println(schedule.isCancelled());
System.out.println(schedule.isDone());
}
輸出:
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:45:09 CST 2020 第1次開始執(zhí)行
false
true
取消定時任務(wù)的執(zhí)行——調(diào)用ScheduledFuture
的cancel
方法
可能任務(wù)執(zhí)行一會专控,想取消執(zhí)行,可以調(diào)用ScheduledFuture
的cancel
方法袍啡,參數(shù)表示是否給任務(wù)發(fā)送中斷信號踩官。
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//任務(wù)執(zhí)行計數(shù)器
AtomicInteger count = new AtomicInteger(1);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
new DemoThreadFactory("延遲調(diào)度線程池"));
ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
int currCount = count.getAndIncrement();
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
}, 1,3, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(5);
schedule.cancel(false);
TimeUnit.SECONDS.sleep(1);
System.out.println("任務(wù)是否被取消:"+schedule.isCancelled());
System.out.println("任務(wù)是否已完成:"+schedule.isDone());
}
}
輸出:
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:53:12 CST 2020 第1次開始執(zhí)行
任務(wù)是否被取消:true
任務(wù)是否已完成:true
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:53:17 CST 2020 第1次結(jié)束執(zhí)行
Executors類——線程池工具類
Executors類,提供了一系列工廠方法用于創(chuàng)建線程池境输,返回的線程池都實現(xiàn)了ExecutorService接口蔗牡。常用的方法有:
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作嗅剖,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)辩越。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它信粮。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行黔攒。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù),任務(wù)如果比較多强缘,單線程如果處理不過來督惰,會導(dǎo)致隊列堆滿,引發(fā)OOM旅掂。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
創(chuàng)建固定大小的線程池赏胚。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小商虐。線程池的大小一旦達到最大值就會保持不變觉阅,在提交新任務(wù),任務(wù)將會進入等待隊列中等待秘车。如果某個線程因為執(zhí)行異常而結(jié)束典勇,那么線程池會補充一個新線程。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù)叮趴,任務(wù)如果比較多割笙,如果處理不過來,會導(dǎo)致隊列堆滿眯亦,引發(fā)OOM咳蔚。
newCachedThreadPool
public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程搔驼,
那么就會回收部分空閑(60秒處于等待任務(wù)到來)的線程,當(dāng)任務(wù)數(shù)增加時侈询,此線程池又可以智能的添加新線程來處理任務(wù)舌涨。此線程池的最大值是Integer的最大值(2^31-1)。內(nèi)部使用了SynchronousQueue同步隊列來緩存任務(wù),此隊列的特性是放入任務(wù)時必須要有對應(yīng)的線程獲取任務(wù)囊嘉,任務(wù)才可以放入成功温技。如果處理的任務(wù)比較耗時,任務(wù)來的速度也比較快扭粱,會創(chuàng)建太多的線程引發(fā)OOM舵鳞。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求琢蛤。
在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供蜓堕,不允許在應(yīng)用中自行顯示的創(chuàng)建線程,這樣一方面是線程的創(chuàng)建更加規(guī)范博其,可以合理控制開辟線程的數(shù)量套才;另一方面線程的細節(jié)管理交給線程池處理,優(yōu)化了資源的開銷慕淡。而線程池不允許使用Executors去創(chuàng)建背伴,而要通過ThreadPoolExecutor方式,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()峰髓、newSingleThreadExecutor()傻寂、newCachedThreadPool()等創(chuàng)建線程池的方法,但都有其局限性携兵,不夠靈活疾掰;另外由于前面幾種方法內(nèi)部也是通過ThreadPoolExecutor方式實現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則眉孩,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池个绍,避免資源耗盡的風(fēng)險。
Future浪汪、Callable接口
Future巴柿、Callable接口需要結(jié)合ExecutorService來使用,需要有線程池的支持死遭。
Future
接口定義了操作異步異步任務(wù)執(zhí)行一些方法广恢,如獲取異步任務(wù)的執(zhí)行結(jié)果、取消任務(wù)的執(zhí)行呀潭、判斷任務(wù)是否被取消钉迷、判斷任務(wù)執(zhí)行是否完畢等。
Callable
接口中定義了需要有返回的任務(wù)需要實現(xiàn)的方法钠署】反希——相當(dāng)于有返回值的Runnable
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
比如主線程讓一個子線程去執(zhí)行任務(wù),子線程可能比較耗時谐鼎,啟動子線程開始執(zhí)行任務(wù)后舰蟆,主線程就去做其他事情了,過了一會才去獲取子任務(wù)的執(zhí)行結(jié)果。
Future其他方法介紹一下
cancel:取消在執(zhí)行的任務(wù)身害,參數(shù)表示是否對執(zhí)行的任務(wù)發(fā)送中斷信號味悄,方法聲明如下:
boolean cancel(boolean mayInterruptIfRunning);
isCancelled:用來判斷任務(wù)是否被取消
isDone:判斷任務(wù)是否執(zhí)行完畢。
調(diào)用線程池的submit
方法執(zhí)行任務(wù)塌鸯,submit參數(shù)為Callable
接口:表示需要執(zhí)行的任務(wù)有返回值侍瑟,submit方法返回一個Future
對象,F(xiàn)uture相當(dāng)于一個憑證丙猬,可以在任意時間拿著這個憑證去獲取對應(yīng)任務(wù)的執(zhí)行結(jié)果(調(diào)用其get
方法)涨颜,代碼中調(diào)用了result.get()
方法之后,此方法會阻塞當(dāng)前線程直到任務(wù)執(zhí)行結(jié)束淮悼。
實例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
String taskName = "task";
Future<String> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println(Thread.currentThread().getName() + "線程執(zhí)行" + taskName + "完畢咐低!");
return "finished";
});
TimeUnit.SECONDS.sleep(1);
//取消正在執(zhí)行的任務(wù),mayInterruptIfRunning:是否發(fā)送中斷信息
future.cancel(false);
System.out.println(future.isCancelled());
System.out.println(future.isDone());
//System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + future.get());
try {
//超時獲取異步任務(wù)執(zhí)行結(jié)果
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + future.get(10,TimeUnit.SECONDS));
} catch (TimeoutException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
輸出:
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at com.self.current.FutureTest.main(FutureTest.java:46)
true
true
FutureTask類
FutureTask除了實現(xiàn)Future接口袜腥,還實現(xiàn)了Runnable接口见擦,因此FutureTask可以交給Executor執(zhí)行,也可以交給線程執(zhí)行執(zhí)行(Thread有個Runnable的構(gòu)造方法)羹令,FutureTask表示帶返回值結(jié)果的任務(wù)鲤屡。線程池的submit方法返回的Future實際類型正是FutureTask對象 。
疑問:
Q:線程池執(zhí)行submit()方法是如何調(diào)用Callable任務(wù)的福侈?
A:Callable通過線程池執(zhí)行的過程酒来,封裝為Runnable。線程池執(zhí)行submit()方法會把Callable包裝成FutrueTask對象肪凛,此對象實現(xiàn)了Runnable接口堰汉,當(dāng)調(diào)用FutrueTask的run方法時,會把其屬性中的Callable拿出來執(zhí)行call()方法伟墙。示例代碼如下:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public void run() {
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
}
}
Q:多線程并行處理定時任務(wù)時翘鸭,Timer運行多個TimeTask時,只要其中之一沒有捕獲拋出的異常戳葵,其它任務(wù)便會自動終止運行就乓,使用ScheduledExecutorService則沒有這個問題。是因為ScheduledExecutorService是多線程么拱烁?
A:是因為Timer只有一個線程在運行生蚁,while(true)循環(huán)不斷地從隊列中獲取任務(wù)執(zhí)行,而當(dāng)線程被被殺死或者中斷時戏自,就相當(dāng)于關(guān)閉了Timer.
Q: ScheduleThreadPoolExecutor定時器并不關(guān)心線程數(shù)多少邦投,他不是并發(fā)的執(zhí)行多任務(wù),只關(guān)心調(diào)度一個定時任務(wù)擅笔,線程數(shù)的多少只是影響多個任務(wù)再調(diào)度時需要多個線程尼摹,這樣理解對么见芹?
A:我認為這樣理解是對的,而這樣也可以解釋上面Timer運行多個TimeTask時蠢涝,只要其中之一沒有捕獲拋出的異常,其它任務(wù)便會自動終止運行的原因阅懦,是因為Timer只有一個線程在運行和二,while(true)循環(huán)不斷地從隊列中獲取任務(wù)執(zhí)行,而當(dāng)線程被被殺死或者中斷時耳胎,就相當(dāng)于關(guān)閉了Timer.下面是多個任務(wù)調(diào)度時會創(chuàng)建多個線程去執(zhí)行惯吕。
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:22:22 CST 2020 第1次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:22:22 CST 2020 第2次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-3:Thu Aug 27 14:22:22 CST 2020 第3次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:22:27 CST 2020 第1次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:22:27 CST 2020 第2次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-3:Thu Aug 27 14:22:27 CST 2020 第3次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:22:30 CST 2020 第4次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-4:Thu Aug 27 14:22:30 CST 2020 第5次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:22:30 CST 2020 第6次開始執(zhí)行
CompletionService接口——獲取線程池中已經(jīng)完成的任務(wù)
CompletionService相當(dāng)于一個執(zhí)行任務(wù)的服務(wù),通過submit丟任務(wù)給這個服務(wù)怕午,服務(wù)內(nèi)部去執(zhí)行任務(wù)废登,可以通過服務(wù)提供的一些方法獲取服務(wù)中已經(jīng)完成的任務(wù)。
接口內(nèi)的幾個方法:
Future<V> submit(Callable<V> task);
用于向服務(wù)中提交有返回結(jié)果的任務(wù)郁惜,并返回Future對象
Future<V> submit(Runnable task, V result);
用戶向服務(wù)中提交有返回值的任務(wù)去執(zhí)行堡距,并返回Future對象。Runnable會被包裝成有返回值的Callable兆蕉,返回值為傳入的result羽戒。
Future<V> take() throws InterruptedException;
從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù),如果獲取不到虎韵,會一致阻塞到有返回值為止易稠。此方法會響應(yīng)線程中斷。
Future<V> poll();
從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù)包蓝,如果內(nèi)部沒有已經(jīng)完成的任務(wù)驶社,則返回空,此方法會立即響應(yīng)测萎。
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
嘗試在指定的時間內(nèi)從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù)亡电,等待的時間超時還是沒有獲取到已完成的任務(wù),則返回空绳泉。此方法會響應(yīng)線程中斷
通過submit向內(nèi)部提交任意多個任務(wù)逊抡,通過take方法可以獲取已經(jīng)執(zhí)行完成的任務(wù),如果獲取不到將等待零酪。
ExecutorCompletionService
ExecutorCompletionService類是CompletionService接口的具體實現(xiàn)冒嫡。
說一下其內(nèi)部原理,ExecutorCompletionService創(chuàng)建的時候會傳入一個線程池四苇,調(diào)用submit方法傳入需要執(zhí)行的任務(wù)经窖,任務(wù)由內(nèi)部的線程池來處理;ExecutorCompletionService內(nèi)部有個阻塞隊列迄埃,任意一個任務(wù)完成之后卵渴,會將任務(wù)的執(zhí)行結(jié)果(Future類型)放入阻塞隊列中瓣赂,然后其他線程可以調(diào)用它take、poll方法從這個阻塞隊列中獲取一個已經(jīng)完成的任務(wù)片拍,獲取任務(wù)返回結(jié)果的順序和任務(wù)執(zhí)行完成的先后順序一致煌集,所以最先完成的任務(wù)會先返回。
看一下構(gòu)造方法:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
構(gòu)造方法需要傳入一個Executor對象捌省,這個對象表示任務(wù)執(zhí)行器苫纤,所有傳入的任務(wù)會被這個執(zhí)行器執(zhí)行。
completionQueue
是用來存儲任務(wù)結(jié)果的阻塞隊列纲缓,默認用采用的是LinkedBlockingQueue
卷拘,也支持開發(fā)自己設(shè)置。通過submit傳入需要執(zhí)行的任務(wù)祝高,任務(wù)執(zhí)行完成之后栗弟,會放入completionQueue
中。
任務(wù)完成入隊操作原理:
還是通過線程池execute()方法執(zhí)行一個FutureTask包裝的Callable任務(wù)工闺,F(xiàn)utureTask里的run方法會調(diào)用Callable任務(wù)call()方法執(zhí)行具體的行為乍赫,并在執(zhí)行結(jié)算后執(zhí)行set(result);設(shè)置返回值操作,而設(shè)置返回值操作中的finishCompletion()方法會調(diào)用鉤子方法done(),ExecutorCompletionService里定義的QueueingFuture繼承了FutureTask斤寂,重寫了鉤子方法耿焊,把完成的方法入隊保存起來了。
場景:買新房了遍搞,然后在網(wǎng)上下單買冰箱罗侯、洗衣機,電器商家不同溪猿,所以送貨耗時不一樣钩杰,然后等他們送貨,快遞只愿送到樓下诊县,然后我們自己將其搬到樓上的家中讲弄。 這時候我們需要根據(jù)異步先完成的快遞,拿個先到對其獲取做處理——搬上樓依痊。
示例:
public class ExecutorCompletionServiceTest {
static class GoodsModel {
//商品名稱
String name;
//購物開始時間
long startime;
//送到的時間
long endtime;
public GoodsModel(String name, long startime, long endtime) {
this.name = name;
this.startime = startime;
this.endtime = endtime;
}
@Override
public String toString() {
return name + "避除,下單時間[" + this.startime + "," + endtime + "],耗時:" + (this.endtime - this.startime);
}
}
/**
* 將商品搬上樓
*
* @param goodsModel
* @throws InterruptedException
*/
static void moveUp(GoodsModel goodsModel) throws InterruptedException {
//休眠5秒胸嘁,模擬搬上樓耗時
TimeUnit.SECONDS.sleep(5);
System.out.println("將商品搬上樓瓶摆,商品信息:" + goodsModel);
}
/**
* 模擬下單
*
* @param name 商品名稱
* @param costTime 耗時
* @return
*/
static Callable<GoodsModel> buyGoods(String name, long costTime) {
return () -> {
long startTime = System.currentTimeMillis();
System.out.println(startTime + "購買" + name + "下單!");
//模擬送貨耗時
try {
TimeUnit.SECONDS.sleep(costTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(endTime + name + "送到了!");
return new GoodsModel(name, startTime, endTime);
};
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
long st = System.currentTimeMillis();
System.out.println(st + "開始購物!");
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());
ExecutorCompletionService<GoodsModel> completionService = new ExecutorCompletionService<>(executor);
//異步下單購買
completionService.submit(buyGoods("電視機", 3));
completionService.submit(buyGoods("洗碗機", 5));
executor.shutdown();
for (int i = 0; i < 2; i++) {
//可以獲取到最先到的商品
GoodsModel goodsModel = completionService.take().get();
//將最先到的商品送上樓
moveUp(goodsModel);
}
long et = System.currentTimeMillis();
System.out.println(et + "貨物已送到家里咯,哈哈哈性宏!");
System.out.println("總耗時:" + (et - st));
}
}
1598583792616開始購物!
1598583792707購買電視機下單!
1598583792708購買洗碗機下單!
1598583795708電視機送到了!
1598583797709洗碗機送到了!
將商品搬上樓群井,商品信息:電視機,下單時間[1598583792707,1598583795708]毫胜,耗時:3001
將商品搬上樓书斜,商品信息:洗碗機诬辈,下單時間[1598583792708,1598583797709],耗時:5001
1598583805710貨物已送到家里咯荐吉,哈哈哈焙糟!
總耗時:13094
異步執(zhí)行一批任務(wù),有一個完成立即返回样屠,其他取消——線程池invokeAny ()方法
如果是要返回所有的任務(wù)結(jié)果酬荞,則調(diào)用 invokeAll(Collection<? extends Callable<T>> tasks)方法,invokeAny ()和invokeAll()都有超時調(diào)用方法瞧哟。如果超時時間到了,調(diào)用結(jié)束后還沒有全部完成枪向,會對所有工作線程發(fā)送中斷信號中斷操作勤揩。
方法聲明如下:
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
示例:
public static void main(String[] args) throws InterruptedException, ExecutionException {
long st = System.currentTimeMillis();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());
List<Callable<Integer>> list = new ArrayList<>();
int taskCount = 5;
for (int i = taskCount; i > 0; i--) {
int j = i * 2;
String taskName = "任務(wù)"+i;
list.add(() -> {
TimeUnit.SECONDS.sleep(j);
System.out.println(taskName+"執(zhí)行完畢!");
return j;
});
}
//Integer integer = invokeAny(executor, list);
//ExecutorService提供異步執(zhí)行一批任務(wù),有一個完成立即返回秘蛔,其他取消
Integer integer = executor.invokeAny(list);
System.out.println("耗時:" + (System.currentTimeMillis() - st) + ",執(zhí)行結(jié)果:" + integer);
executor.shutdown();
}
private static <T> T invokeAny(ThreadPoolExecutor executor, List<Callable<T>> list) throws InterruptedException, ExecutionException {
ExecutorCompletionService<T> completionService = new ExecutorCompletionService(executor);
List<Future<T>> futureList = new ArrayList<>();
for (Callable<T> s : list) {
futureList.add(completionService.submit(s));
}
int n = list.size();
try {
for (int i = 0; i < n; ++i) {
T r = completionService.take().get();
if (r != null) {
return r;
}
}
} finally {
for (Future<T> future : futureList) {
future.cancel(true);
}
}
return null;
}
}
輸出:
任務(wù)1執(zhí)行完畢!
耗時:2053,執(zhí)行結(jié)果:2
CompletableFuture——當(dāng)異步任務(wù)完成或者發(fā)生異常時陨亡,自動調(diào)用回調(diào)對象的回調(diào)方法,主線程無需等待獲取結(jié)果,異步是以守護線程執(zhí)行的深员,如果是用線程池作為執(zhí)行器則不是守護線程
使用Future
獲得異步執(zhí)行結(jié)果時负蠕,要么調(diào)用阻塞方法get()
,要么輪詢看isDone()
是否為true
倦畅,這兩種方法都不是很好遮糖,因為主線程也會被迫等待。
從Java 8開始引入了CompletableFuture
叠赐,它針對Future
做了改進欲账,可以傳入回調(diào)對象,當(dāng)異步任務(wù)完成或者發(fā)生異常時芭概,自動調(diào)用回調(diào)對象的回調(diào)方法赛不。
我們以獲取股票價格為例,看看如何使用CompletableFuture
:
CompletableFuture
的優(yōu)點是:
- 異步任務(wù)結(jié)束時罢洲,會自動回調(diào)某個對象的方法踢故;
- 異步任務(wù)出錯時,會自動回調(diào)某個對象的方法惹苗;
- 主線程設(shè)置好回調(diào)后殿较,不再關(guān)心異步任務(wù)的執(zhí)行。
如果只是實現(xiàn)了異步回調(diào)機制鸽粉,我們還看不出CompletableFuture
相比Future
的優(yōu)勢斜脂。CompletableFuture
更強大的功能是,多個CompletableFuture
可以串行執(zhí)行触机,多個CompletableFuture
還可以并行執(zhí)行帚戳。
除了anyOf()
可以實現(xiàn)“任意個CompletableFuture
只要一個成功”玷或,allOf()
可以實現(xiàn)“所有CompletableFuture
都必須成功”,這些組合操作可以實現(xiàn)非常復(fù)雜的異步流程控制片任。
最后我們注意CompletableFuture
的命名規(guī)則:
-
xxx()
:表示該方法將繼續(xù)在已有的線程中執(zhí)行偏友; -
xxxAsync()
:表示將異步在線程池中執(zhí)行。
示例:
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
// 創(chuàng)建異步執(zhí)行任務(wù):
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompletableFutureTest::fetchPrice);
// 如果執(zhí)行成功:
cf.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 如果執(zhí)行異常:
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
// 主線程不要立刻結(jié)束对供,否則CompletableFuture默認使用的線程池會立刻關(guān)閉:
Thread.sleep(200);
}
static Double fetchPrice() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}
}
定義兩個CompletableFuture
位他,第一個CompletableFuture
根據(jù)證券名稱查詢證券代碼,第二個CompletableFuture
根據(jù)證券代碼查詢證券價格产场,這兩個CompletableFuture
實現(xiàn)串行操作如下:
public class CompletableFutureSerialTest {
public static void main(String[] args) throws InterruptedException {
//先獲取股票代碼
CompletableFuture<String> tesla = CompletableFuture.supplyAsync(() -> {
return CompletableFutureSerialTest.queryCode("tesla");
});
//再獲取股票代碼對應(yīng)的股價
CompletableFuture<Double> priceFuture = tesla.thenApplyAsync((code) -> {
return CompletableFutureSerialTest.fetchPrice(code);
});
//打印結(jié)果
priceFuture.thenAccept((price)->{
System.out.println("price: " + price);
});
// 主線程不要立刻結(jié)束鹅髓,否則CompletableFuture默認使用的線程池會立刻關(guān)閉:
Thread.sleep(2000);
}
static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
輸出:
price: 23.116752498711122
示例:同時從新浪和網(wǎng)易查詢證券代碼,只要任意一個返回結(jié)果京景,就進行下一步查詢價格窿冯,查詢價格也同時從新浪和網(wǎng)易查詢,只要任意一個返回結(jié)果确徙,就完成操作醒串。
public class CompletableFutureParallelTest {
public static void main(String[] args) throws InterruptedException {
// 兩個CompletableFuture執(zhí)行異步查詢:
CompletableFuture<String> teslaSina = CompletableFuture.supplyAsync(() -> {
return CompletableFutureParallelTest.queryCode("tesla","https://finance.sina.com.cn/code/");
});
CompletableFuture<String> tesla163 = CompletableFuture.supplyAsync(() -> {
return CompletableFutureParallelTest.queryCode("tesla","https://money.163.com/code/");
});
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> stockFuture = CompletableFuture.anyOf(tesla163, teslaSina);
//再獲取股票代碼對應(yīng)的股價
// 兩個CompletableFuture執(zhí)行異步查詢:
CompletableFuture<Double> priceSina = stockFuture.thenApplyAsync((code) -> {
return CompletableFutureParallelTest.fetchPrice(String.valueOf(code),"https://money.163.com/code/");
});
CompletableFuture<Double> price163 = stockFuture.thenApplyAsync((code) -> {
return CompletableFutureParallelTest.fetchPrice(String.valueOf(code),"https://money.163.com/code/");
});
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> priceFuture = CompletableFuture.anyOf(priceSina, price163);
//打印結(jié)果
priceFuture.thenAccept((price)->{
System.out.println("price: " + price);
});
// 主線程不要立刻結(jié)束,否則CompletableFuture默認使用的線程池會立刻關(guān)閉:
Thread.sleep(2000);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
query code from https://finance.sina.com.cn/code/...
query code from https://money.163.com/code/...
query price from https://money.163.com/code/...
query price from https://money.163.com/code/...
price: 17.34369661842006