JAVA中提供的線程池
Executors工廠類
Executors工具類提供了5種線程池的創(chuàng)建方法
// 線程數(shù)動態(tài)創(chuàng)建噪矛,每個空閑線程會在默認(rèn)60秒后被回收
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
// 固定線程數(shù)的線程池
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
// 定時器線程
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10);
// 單線程松邪,只有一個核心線程的線程池
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
// fork/join線程池
ExecutorService newWorkStealingPool = Executors.newWorkStealingPool();
每種線程池都有不同的作用,這里不一一展開,只從他們的原理說明缤剧。
我們看下這五種線程池的構(gòu)造方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
除了newScheduledThreadPool
和newWorkStealingPool
,其他都是通過創(chuàng)建
ThreadPoolExecutor
實現(xiàn)線程池.下面對ThreadPoolExecutor
構(gòu)造函數(shù)的參數(shù)進行了解。
public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)
int maximumPoolSize, // 最大線程數(shù)
long keepAliveTime, //空閑超時時間缎除,worker線程無任務(wù)執(zhí)行严就,等待時間
TimeUnit unit, //超時時間單位
BlockingQueue<Runnable> workQueue, //等待隊列
ThreadFactory threadFactory, //線程工廠
RejectedExecutionHandler handler //拒絕策略
)
通過對ThreadPoolExecutor
參數(shù)的不同設(shè)置,Executors
創(chuàng)建了不同功能的線程池器罐。
ThreadPoolExecutor的實現(xiàn)原理
ThreadPoolExecutor
中包含屬性Ctl使用高三位保存線程池的運行狀態(tài)梢为,低29位保存工作線程數(shù),使用位運算進行操作轰坊。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
再看一下execute
方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 當(dāng)前worker數(shù)量小于核心線程數(shù) 則添加worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果超過核心線程數(shù)則嘗試加入等待隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果這時有線程池shutdown了铸董,把剛剛添加的移除
if (! isRunning(recheck) && remove(command))
//執(zhí)行拒絕策略
reject(command);
// 添加空worker
else if (workerCountOf(recheck) == 0)
//注意此時core參數(shù)為false,用于判斷最大線程數(shù)
addWorker(null, false);
}
// worker數(shù)量達(dá)到最大線程數(shù)肴沫,添加失敗粟害,拒絕
else if (!addWorker(command, false))
reject(command);
}
再來看addWorker
方法
private boolean addWorker(Runnable firstTask, boolean core) {
//第一部分開始 主要作用判斷添加的worker是否超出數(shù)量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//工作線程數(shù)大于線程池容量,或則大于需要判斷的線程數(shù)大小颤芬,則添加失敗悲幅,進入等待隊列
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 第一部分結(jié)束
// 第二部分 添加worker信息,將線程信息封裝worker對象站蝠,添加到workers(HashSet集合)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化了一個線程汰具,firstTask為execute方法傳入的線程實現(xiàn)。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動worker初始化是創(chuàng)建的線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
// 第二部分結(jié)束
return workerStarted;
}
再來看下worker
結(jié)構(gòu)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
}
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//通過傳入當(dāng)前worker對象創(chuàng)建線程
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
worker
繼承了AQS,通過實現(xiàn)lock和unlock方法實現(xiàn)任務(wù)的執(zhí)行菱魔,防止執(zhí)行過程中被中斷留荔。并且包含一個Thread.
當(dāng)新建一個worker
的時候創(chuàng)建了一個線程,而addWorker
完成后啟動了這個線程,而這個線程傳入了worker豌习,worker實現(xiàn)了runnable接口的run方法存谎。真正啟動的是runworker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 當(dāng)前worker是不斷循環(huán),線程復(fù)用的地方是getTask肥隆,getTask從等待隊列中獲取任務(wù)既荚,如果拿到任務(wù)了就繼續(xù)運行。如果為空則銷毀非核心線程worker
while (task != null || (task = getTask()) != null) {
// 這里的作用是當(dāng)線程池shutdown時栋艳,不中斷已經(jīng)運行的線程
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//通過worker線程執(zhí)行線程的run方法到達(dá)恰聘,線程池線程復(fù)用的作用
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 執(zhí)行完成移除worker,核心線程會被復(fù)用吸占。核心線程如果需要回收需要設(shè)置allowCoreThreadTimeOut
processWorkerExit(w, completedAbruptly);
}
}
線程池的核心是復(fù)用線程晴叨,更好的管理線程的創(chuàng)建。阿里P3C規(guī)范中不建議使用Executors創(chuàng)建線程池矾屯,原因是為了讓使用者更好的控制ThreadPoolExecutor
參數(shù)兼蕊,實現(xiàn)自己想要的結(jié)果。
另外ThreadPoolExecutor
提供submit
方法支持傳入Callable
接口件蚕,并且?guī)Х祷刂邓锛肌F鋵崿F(xiàn)原理是FutureTask
維護任務(wù)的執(zhí)行狀態(tài)产禾,通過Future.get()
方法中判斷任務(wù)未完成,則調(diào)用LockSupport.park
掛起線程牵啦,當(dāng)任務(wù)執(zhí)行完成再unpark返回結(jié)果亚情。
線程池大小的設(shè)置
- 硬件和軟件上的限制。
- CPU核心數(shù)的考慮
- 程序運行任務(wù)的類別哈雏,當(dāng)時IO密集型的任務(wù)楞件,則可以設(shè)置更多的線程數(shù),例如設(shè)置CPU核心數(shù)*2裳瘪。而如果是CPU密集型的任務(wù)土浸,則設(shè)置更多線程數(shù)反而影響性能,例如可以設(shè)置CPU核心數(shù)+1盹愚。