ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
20,
TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
});
按照上面參數(shù)順序講解
- 創(chuàng)建線程池的時候沒有線程, 當提交任務的時候會陸續(xù)創(chuàng)建線程, 當corePoolSize 滿的時候, 會將任務放到隊列中去, 隊列滿了, 那么會繼續(xù)創(chuàng)建 corePoolIsze 到maxPoolSize之間的線程 .
- 設置allowCoreThreadTimeout=true(默認false)時镣陕,核心線程會超時關閉
- prestartAllCoreThread 或者prestartAllCoreThread 初始化核心線程
- 當線程數(shù)量 = maxPoolSize , 且任務隊列已經(jīng)滿了, 線程池會拒絕任務拋出一場
- 我們的項目 這個參數(shù)是 Runtime.getRuntime().availableProcessors() * 4 +1 , 根據(jù)壓力測試獲得最好的最大核心數(shù)量
- 空閑的線程能夠保持空閑的時間, 超過這個時間, 這一部分線程將被回收.
- 核心線程到最大線程數(shù)量的差異, 如果兩個值相等, 那么這個參數(shù)毫無意義.
- 當核心線程數(shù)達到最大時,新任務會放在隊列中排隊等待執(zhí)行
- 常見的隊列
- ArrayBlockingQueue: 有界隊列愉择,基于數(shù)組結構严衬,按照隊列FIFO原則對元素排序;
- LinkedBlockingQueue:無界隊列抚笔,基于鏈表結構扶认,按照隊列FIFO原則對元素排序,Executors.newFixedThreadPool()使用了這個隊列
- SynchronousQueue 同步隊列, 該隊列不存儲元素殊橙,每個插入操作必須等待另一個線程調(diào)用移除操作辐宾,否則插入操作會一直被阻塞,Executors.newCachedThreadPool()使用了這個隊列膨蛮;
- PriorityBlockingQueue:優(yōu)先級隊列叠纹,具有優(yōu)先級的無限阻塞隊列。
6 .
- 自定義線程的名字,daemon,優(yōu)先級等
- 這里的拒絕可以采取你自定義的辦法, 比如使用second 線程池處理 r , 或者丟棄r, 或者打印出日志, 取決于你的業(yè)務.
- 線程池執(zhí)行拒絕的兩種情況
- 當線程數(shù)量達到maxPoolSize , 隊列已經(jīng)滿了, 會拒絕新任務
- 當線程池調(diào)用shutdown 的時候, 會等待線程池的任務執(zhí)行完畢, 再shutdown, 這是一種優(yōu)雅的方式. 調(diào)用shutdown和shutdown關閉之前, 這段時間會拒絕新任務. shutdownNow, 會立刻關閉, 并且停止執(zhí)行任務. 和shutdown 有很大區(qū)別.
- 幾種拒絕策略
- AbortPolicy 丟棄任務, 拋運行異常(如果不設置, 默認就是這一個策略)
- CallerRunsPolicy 執(zhí)行任務
- DiscardPolicy 忽視鸽疾,什么都不會發(fā)生
- DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執(zhí)行)的任務
線程執(zhí)行的一些流程
- 如果當前線程池中的線程數(shù)目 小于 < corePoolSize 吊洼。則對于每個新任務,都會創(chuàng)建一個線程去執(zhí)行這個任務(即使core線程中也有空閑線程, 也會新創(chuàng)建線程會執(zhí)行)制肮。
- 如果當前線程池中的線程數(shù)目 大于等于 >= corePoolSize 冒窍。
對于每個新任務,會將其添加到任務隊列中豺鼻。若添加成功综液,則任務由空閑的線程主動將其從隊列中移除后執(zhí)行。若添加失斎屐(任務隊列已滿)谬莹,則嘗試創(chuàng)建新的線程執(zhí)行。 - 若當前線程池中的線程數(shù)目到達 maximumPoolSize 桩了,則對于新任務采取拒絕策略附帽。
- 如果線程池中的數(shù)量大于 corePoolSize 時,如果某個線程空閑時間超過 keepAliveTime 井誉,線程會被終止蕉扮,直到線程池中的線程數(shù)目不超過 corePoolSize 。
- 如果為核心線程池中的線程設置存活時間颗圣,則當核心線程池中的線程空閑時間超過 keepAliveTime 喳钟,則該線程也會被終止
- 如果核心線程數(shù)已經(jīng)達到, 如果沒有隊列沒有滿的話, 是不會創(chuàng)建新的線程. 有時候取決你使用什么隊列. 比如使用 ArrayBlockingQueue(10), 當核心線程已經(jīng)創(chuàng)建完成, 只有當隊列滿了之后才會繼續(xù)創(chuàng)建新的線程. 如果你使用的是SynchronousQueue, 內(nèi)部只能一個的隊列,那么只有隊列直接創(chuàng)建core線程到maxpoolSize 之間的線程
線程池性能優(yōu)化建議
- 建議使用 prestartAllCoreThread 或者prestartAllCoreThread 初始化核心線程
- 考慮 allowCoreThreadTimeOut 允許核心線程能夠回收節(jié)約機器的使用.
- 拒絕策略可以將任務, 提交給 second 線程池處理.
- 自定義ThreadFactory ,標識線程, 方便排查線程的使用情況
關于線程異常是否處理的問題:
- execute : 如果不手動捕獲一場, 線程池只能重新創(chuàng)建新的異常來填補空白屁使,重新創(chuàng)建線程這是有代價的
- submit: 因為能夠調(diào)用 future.get(). 所以有異常也會捕獲, 不會造成線程終止.
// 證明execute 的異常
@Test
public void test1() throws InterruptedException {
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
log.warn("Exception in thread {}", t, e);
});
String prefix = "test";
ExecutorService threadPool = Executors.newFixedThreadPool(1, new ThreadUtil.ThreadFactoryImpl(prefix));
IntStream.rangeClosed(1, 10).forEach(i -> threadPool.execute(() -> {
if (i == 5) {
throw new RuntimeException("error");
}
log.info("I'm done : {}", i);
System.out.println(Thread.currentThread().getName() + " I'm done : " + i);
if (i < 5) {
Assert.assertEquals(prefix + "1", Thread.currentThread().getName());
} else {
Assert.assertEquals(prefix + "2", Thread.currentThread().getName());
}
}));
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
// 本來是通過test 1 線程執(zhí)行的, 后面出現(xiàn)異常 確是test2 執(zhí)行的, 說明x線程已經(jīng)終止2, 并且重新創(chuàng)建線程
}
// 線程名稱沒有變, 說明已經(jīng)幫你捕獲異常.
String prefix = "test";
ExecutorService threadPool = Executors.newFixedThreadPool(1, new ThreadUtil.ThreadFactoryImpl(prefix));
List<Future> futures = new ArrayList<>();
IntStream.rangeClosed(1, 10).forEach(i -> futures.add(threadPool.submit(() -> {
if (i == 5) {
throw new RuntimeException("error");
}
log.info("I'm done : {}", i);
// if (i < 5) Assert.assertEquals(prefix + "1", Thread.currentThread().getName());
// else Assert.assertEquals(prefix + "2", Thread.currentThread().getName());
})));
for (Future future : futures) {
try {
future.get();
} catch (ExecutionException e) {
log.warn("future ExecutionException", e);
}
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
面試問題
- 說說線程池的核心參數(shù)有哪些
- 說說你們的corePoolSize 的數(shù)量是如何設置,超時時間如何設置
- 你們使用的是什么隊列, 為什么使用這個隊列.
- 你們項目是如何優(yōu)化自己的線程池參數(shù)的.
- 當線程池還沒達到 corePoolSize 的時候, 線程池里面有空閑線程, 這個時候來了一個新的任務, 線程池是創(chuàng)建新的線程還是使用空閑線程 ?
路過點贊, 月入10w