Executors的“罪與罰”
在上一篇文章Java并發(fā) 之 線程池系列 (1) 讓多線程不再坑爹的線程池中务甥,我們介紹了使用JDK concurrent包下的工廠和工具類Executors
來(lái)創(chuàng)建線程池常用的幾種方法:
//創(chuàng)建固定線程數(shù)量的線程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
//創(chuàng)建一個(gè)線程池,該線程池會(huì)根據(jù)需要?jiǎng)?chuàng)建新的線程,但如果之前創(chuàng)建的線程可以使用储耐,會(huì)重用之前創(chuàng)建的線程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//創(chuàng)建一個(gè)只有一個(gè)線程的線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
誠(chéng)然础浮,這種創(chuàng)建線程池的方法非常簡(jiǎn)單和方便氧映。但仔細(xì)閱讀源碼柜蜈,卻把我嚇了一條: 這是要老子的命澳ピ琛约谈!
我們前面講過(guò)笔宿,如果有新的請(qǐng)求過(guò)來(lái)犁钟,在線程池中會(huì)創(chuàng)建新的線程處理這些任務(wù),一直創(chuàng)建到線程池的最大容量(Max Size)為止泼橘;超出線程池的最大容量的Tasks涝动,會(huì)被放入阻塞隊(duì)列(Blocking
Queue)進(jìn)行等待,知道有線程資源釋放出來(lái)為止炬灭;要知道的是醋粟,阻塞隊(duì)列也是有最大容量的,多余隊(duì)列最大容量的請(qǐng)求不光沒(méi)有獲得執(zhí)行的機(jī)會(huì)重归,連排隊(duì)的資格都沒(méi)有米愿!
那這些連排隊(duì)的資格都沒(méi)有的Tasks怎么處理呢?不要急鼻吮,后面在介紹ThreadPoolExecutor
的拒絕處理策略(Handler Policies for Rejected Task)的時(shí)候會(huì)詳細(xì)介紹育苟。
說(shuō)到這里你也許有寫疑惑了,上面這些東西椎木,我通常使用Executors
的時(shí)候沒(méi)有指定過(guò)啊违柏。是的,因?yàn)?code>Executors很“聰明”地幫我們做了這些事情香椎。
Executors的源碼
我們看下Executors
的newFixedThreadPool
和newSingleThreadExecutor
方法的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
其實(shí)它們底層還是通過(guò)ThreadPoolExecutor
來(lái)創(chuàng)建ExecutorService
的漱竖,這里對(duì)妻子的參數(shù)先不作介紹,下面會(huì)詳細(xì)講畜伐,這里只說(shuō)一下new LinkedBlockingQueue<Runnable>()
這個(gè)參數(shù)闲孤。
LinkedBlockingQueue
就是當(dāng)任務(wù)數(shù)大于線程池的線程數(shù)的時(shí)候的阻塞隊(duì)列,這里使用的是無(wú)參構(gòu)造烤礁,我們?cè)倏匆幌聵?gòu)造函數(shù):
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
我們看到阻塞隊(duì)列的默認(rèn)大小竟然是Integer.MAX_VALUE
讼积!
如果不做控制,拼命地往阻塞隊(duì)列里放Task脚仔,分分鐘“Out of Memory”扒谥凇!
還有更絕的鲤脏,newCachedThreadPool
方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最大線程數(shù)默認(rèn)也是Integer.MAX_VALUE
们颜,也就是說(shuō),如果之前的任務(wù)沒(méi)有執(zhí)行完就有新的任務(wù)進(jìn)來(lái)了猎醇,就會(huì)繼續(xù)創(chuàng)建新的線程窥突,指導(dǎo)創(chuàng)建到Integer.MAX_VALUE
為止。
讓你的JVM OutOfMemoryError
下面提供一個(gè)使用newCachedThreadPool
創(chuàng)建大量線程處理Tasks硫嘶,最終OutOfMemoryError
的例子阻问。
友情提醒:場(chǎng)面過(guò)于血腥,請(qǐng)勿在生產(chǎn)環(huán)境使用沦疾。
package net.ijiangtao.tech.concurrent.jsd.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample2 {
private static final ExecutorService executorService = Executors.newCachedThreadPool();
private static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000 * 600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void newCachedThreadPoolTesterBadly() {
System.out.println("begin............");
for (int i = 0; i <= Integer.MAX_VALUE; i++) {
executorService.execute(new Task());
}
System.out.println("end.");
}
public static void main(String[] args) {
newCachedThreadPoolTesterBadly();
}
}
當(dāng)main
方法啟動(dòng)以后称近,打開控制面板第队,看到CPU和內(nèi)存幾乎已經(jīng)全部耗盡:
[圖片上傳失敗...(image-240e8d-1554126159952)]
很快控制臺(tái)就拋出了java.lang.OutOfMemoryError
:
begin............
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)
阿里巴巴Java開發(fā)手冊(cè)
下面我們?cè)诳碕ava開發(fā)手冊(cè)這條規(guī)定,應(yīng)該就明白作者的良苦用心了吧刨秆。
【強(qiáng)制】線程池不允許使用Executors去創(chuàng)建凳谦,而是通過(guò)ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則衡未,規(guī)避資源耗盡的風(fēng)險(xiǎn)尸执。
說(shuō)明:
Executors返回的線程池對(duì)象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求缓醋,從而導(dǎo)致OOM如失。
2)CachedThreadPool和ScheduledThreadPool:允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程改衩,從而導(dǎo)致OOM岖常。
主角出場(chǎng)
解鈴還須系鈴人驯镊,其實(shí)避免這個(gè)OutOfMemoryError
風(fēng)險(xiǎn)的鑰匙就藏在Executors
的源碼里葫督,那就是自己直接使用ThreadPoolExecutor
。
ThreadPoolExecutor的構(gòu)造
構(gòu)造一個(gè)ThreadPoolExecutor
需要蠻多參數(shù)的板惑。下面是ThreadPoolExecutor
的構(gòu)造函數(shù)橄镜。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面就一一介紹一下這些參數(shù)的具體含義。
ThreadPoolExecutor構(gòu)造參數(shù)說(shuō)明
其實(shí)從源碼中的JavaDoc已經(jīng)可以很清晰地明白這些參數(shù)的含義了冯乘,下面照顧懶得看英文的同學(xué)洽胶,再解釋一下:
- corePoolSize
線程池核心線程數(shù)。
默認(rèn)情況下核心線程會(huì)一直存活裆馒,即使處于閑置狀態(tài)也不會(huì)受存keepAliveTime
限制姊氓,除非將allowCoreThreadTimeOut
設(shè)置為true
。
- maximumPoolSize
線程池所能容納的最大線程數(shù)喷好。超過(guò)maximumPoolSize
的線程將被阻塞翔横。
最大線程數(shù)maximumPoolSize
不能小于corePoolSize
- keepAliveTime
非核心線程的閑置超時(shí)時(shí)間。
超過(guò)這個(gè)時(shí)間非核心線程就會(huì)被回收梗搅。
- TimeUnit
keepAliveTime
的時(shí)間單位禾唁,如TimeUnit.SECONDS。
當(dāng)將allowCoreThreadTimeOut
為true時(shí)无切,對(duì)corePoolSize生效荡短。
- workQueue
線程池中的任務(wù)隊(duì)列。
沒(méi)有獲得線程資源的任務(wù)將會(huì)被放入workQueue
哆键,等待線程資源被釋放掘托。如果放入workQueue
的任務(wù)數(shù)大于workQueue
的容量,將由RejectedExecutionHandler
的拒絕策略進(jìn)行處理籍嘹。
常用的有三種隊(duì)列:
SynchronousQueue
,LinkedBlockingDeque
,ArrayBlockingQueue
烫映。
- threadFactory
提供創(chuàng)建新線程功能的線程工廠沼本。
ThreadFactory
是一個(gè)接口,只有一個(gè)newThread
方法:
Thread newThread(Runnable r);
- rejectedExecutionHandler
無(wú)法被線程池處理的任務(wù)的處理器锭沟。
一般是因?yàn)槿蝿?wù)數(shù)超出了workQueue
的容量抽兆。
當(dāng)一個(gè)任務(wù)被加入線程池時(shí)
總結(jié)一下,當(dāng)一個(gè)任務(wù)通過(guò)execute(Runnable)
方法添加到線程池時(shí):
如果此時(shí)線程池中線程的數(shù)量小于
corePoolSize
族淮,即使線程池中的線程都處于空閑狀態(tài)辫红,也要?jiǎng)?chuàng)建新的線程來(lái)處理被添加的任務(wù)。如果此時(shí)線程池中的數(shù)量等于
corePoolSize
祝辣,但是緩沖隊(duì)列workQueue
未滿贴妻,那么任務(wù)被放入緩沖隊(duì)列。如果此時(shí)線程池中的數(shù)量大于
corePoolSize
蝙斜,緩沖隊(duì)列workQueue
滿名惩,并且線程池中的數(shù)量小于maximumPoolSize
,建新的線程來(lái)處理被添加的任務(wù)孕荠。如果此時(shí)線程池中的數(shù)量大于
corePoolSize
娩鹉,緩沖隊(duì)列workQueue
滿,并且線程池中的數(shù)量等于maximumPoolSize
稚伍,那么通過(guò) handler所指定的拒絕策略來(lái)處理此任務(wù)弯予。
處理任務(wù)的優(yōu)先級(jí)為:核心線程數(shù)(corePoolSize) > 任務(wù)隊(duì)列容量(workQueue) > 最大線程數(shù)(maximumPoolSize);如果三者都滿了,使用rejectedExecutionHandler處理被拒絕的任務(wù)个曙。
ThreadPoolExecutor的使用
下面就通過(guò)一個(gè)簡(jiǎn)單的例子锈嫩,使用ThreadPoolExecutor
構(gòu)造的線程池執(zhí)行任務(wù)。
ThreadPoolExample3
package net.ijiangtao.tech.concurrent.jsd.threadpool;
import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author ijiangtao.net
*/
public class ThreadPoolExample3 {
private static final AtomicInteger threadNumber = new AtomicInteger(1);
private static class Task implements Runnable {
@Override
public void run() {
try {
Thread.currentThread().sleep(2000);
System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class MyThreadFactory implements ThreadFactory {
private final String namePrefix;
public MyThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
public Thread newThread(Runnable runnable) {
return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
}
}
private static final ExecutorService executorService = new ThreadPoolExecutor(
10,
20, 30, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(50),
new MyThreadFactory("MyThreadFromPool"),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// creates five tasks
Task r1 = new Task();
Task r2 = new Task();
Task r3 = new Task();
Task r4 = new Task();
Task r5 = new Task();
// submit方法有返回值
Future future = executorService.submit(r1);
System.out.println("r1 isDone ? " + future.isDone());
// execute方法沒(méi)有返回值
executorService.execute(r2);
executorService.execute(r3);
executorService.execute(r4);
executorService.execute(r5);
//關(guān)閉線程池
executorService.shutdown();
}
}
執(zhí)行結(jié)果
r1 isDone ? false
MyThreadFromPool-2-21:04:03.215
MyThreadFromPool-5-21:04:03.215
MyThreadFromPool-4-21:04:03.215
MyThreadFromPool-3-21:04:03.215
MyThreadFromPool-1-21:04:03.215
從結(jié)果看垦搬,從線程池取出了5個(gè)線程呼寸,并發(fā)執(zhí)行了5個(gè)任務(wù)。
總結(jié)
這一章我們介紹了一種更安全猴贰、更定制化的線程池構(gòu)建方式:ThreadPoolExecutor
对雪。相信你以后不敢輕易使用Executors
來(lái)構(gòu)造線程池了。
后面我們會(huì)介紹線程池的更多實(shí)現(xiàn)方式(例如使用Google核心庫(kù)Guava)糟趾,以及關(guān)于線程池的更多知識(shí)和實(shí)戰(zhàn)慌植。
Links
作者資源
相關(guān)資源
[Concurrent-ThreadPool-java-thread-pool-executor-example](https://howtodoinjava.com/java/multi-threading/java-thread-pool-executor-exampl
作者:濤哥 ( ijiangtao.net )
公眾號(hào):西召 ( westcall )
歡迎 評(píng)論、關(guān)注义郑、打賞蝶柿,轉(zhuǎn)發(fā)和點(diǎn)贊,你的鼓勵(lì)是我持續(xù)創(chuàng)作的動(dòng)力非驮。
濤哥這里交汤,干貨和濕貨,一應(yīng)俱全!