Java 線程池的使用作岖,是面試必問(wèn)的割以。下面我們來(lái)從使用到源碼整理一下义钉。
1、構(gòu)造線程池
- 通過(guò)Executors來(lái)構(gòu)造線程池
1典唇、構(gòu)造一個(gè)固定線程數(shù)目的線程池,配置的corePoolSize與maximumPoolSize大小相同胯府,
同時(shí)使用了一個(gè)無(wú)界LinkedBlockingQueue存放阻塞任務(wù)介衔,因此多余的任務(wù)將存在阻塞隊(duì)列,
不會(huì)由RejectedExecutionHandler處理
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2盟劫、構(gòu)造一個(gè)緩沖功能的線程池夜牡,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE侣签,
keepAliveTime=60s,以及一個(gè)無(wú)容量的阻塞隊(duì)列 SynchronousQueue塘装,因此任務(wù)提交之后,
將會(huì)創(chuàng)建新的線程執(zhí)行影所;線程空閑超過(guò)60s將會(huì)銷毀
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3蹦肴、構(gòu)造一個(gè)只支持一個(gè)線程的線程池,配置corePoolSize=maximumPoolSize=1猴娩,
無(wú)界阻塞隊(duì)列LinkedBlockingQueue阴幌;保證任務(wù)由一個(gè)線程串行執(zhí)行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4勺阐、構(gòu)造有定時(shí)/延時(shí)功能的線程池,配置corePoolSize矛双,無(wú)界延遲阻塞隊(duì)列DelayedWorkQueue渊抽;
有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是無(wú)界隊(duì)列议忽,
所以這個(gè)值是沒(méi)有意義的
對(duì)于一些不能及時(shí)處理懒闷,需要延時(shí)處理的操作,用ScheduledExecutorService處理很方便栈幸,
比如我們?cè)谀承l件下需要清理redis/mysql中數(shù)據(jù)時(shí)愤估,但是可能當(dāng)前有些地方還需要用到(并發(fā)),這時(shí)用ScheduledExecutorService處理非常合適速址,
雖然也可以用定時(shí)任務(wù)處理玩焰,但是定時(shí)任務(wù)會(huì)一直執(zhí)行,而這里的場(chǎng)景是滿足一定條件去執(zhí)行芍锚,而執(zhí)行的機(jī)會(huì)又很少昔园。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
注:阻塞隊(duì)列與普通隊(duì)列的區(qū)別在于,當(dāng)隊(duì)列是空的時(shí)闹炉,從隊(duì)列中獲取元素的操作將會(huì)被阻塞蒿赢,
或者當(dāng)隊(duì)列是滿時(shí),往隊(duì)列里添加元素的操作會(huì)被阻塞渣触。
試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞羡棵,直到其他的線程往空的隊(duì)列插入新的元素。
同樣嗅钻,試圖往已滿的阻塞隊(duì)列中添加新元素的線程同樣也會(huì)被阻塞皂冰,直到其他的線程使隊(duì)列重新變得空閑起來(lái),
如從隊(duì)列中移除一個(gè)或者多個(gè)元素养篓,或者完全清空隊(duì)列.
- 通過(guò)ThreadPoolExecutor自定義線程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler )
* corePoolSize 核心線程池大小----1
* maximumPoolSize 最大線程池大小----3
* keepAliveTime 線程池中超過(guò)corePoolSize數(shù)目的空閑線程最大存活時(shí)間----30
* keepAliveTime時(shí)間單位----TimeUnit.MINUTES
* workQueue 阻塞隊(duì)列----new ArrayBlockingQueue<Runnable>(5)----阻塞隊(duì)列的容量是5
* threadFactory 新建線程工廠----new CustomThreadFactory()----定制的線程工廠
* rejectedExecutionHandler 當(dāng)提交任務(wù)數(shù)超過(guò)maxmumPoolSize+workQueue之和(3+5)秃流,
即當(dāng)提交第9個(gè)任務(wù)時(shí)(前面線程都沒(méi)有執(zhí)行完,此測(cè)試方法中用 sleep(30),
任務(wù)會(huì)交給RejectedExecutionHandler來(lái)處理
new ThreadPoolExecutor(
1,
3,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(5),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
package com.vendor.control.web.device;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by zhangkai on 2019/8/12.
*/
public class CustomThreadPoolExecutor
{
private ThreadPoolExecutor pool = null;
public void init() {
pool = new ThreadPoolExecutor(
1,
3,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(5),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
public void destory() {
if(pool != null) {
pool.shutdownNow();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory
{
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 當(dāng)使用blockingqueue的offer插入數(shù)據(jù)時(shí),如果隊(duì)列已滿柳弄,那么阻塞指定時(shí)間等待隊(duì)列可用舶胀,
//等待期間如果被中斷,那么拋出InterruptedException碧注。
// 如果插入成功嚣伐,那么返回true,如果在達(dá)到指定時(shí)間后仍然隊(duì)列不可用萍丐,
//那么返回false轩端。===========超時(shí)退出
// 使用put 時(shí),插入數(shù)據(jù)時(shí)逝变,如果隊(duì)列已滿基茵,那么阻塞等待隊(duì)列可用奋构,等待期間如果被中斷,
//那么拋出InterruptedException拱层。 ============= 一直阻塞:
System.out.println("拒絕任務(wù)");
executor.getQueue().offer(r); //會(huì)有任務(wù)線程不執(zhí)行
//executor.getQueue().put(r); //不會(huì)有任務(wù)線程不執(zhí)行
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 測(cè)試構(gòu)造的線程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for(int i=1; i<=10; i++) {
System.out.println("提交第" + i + "個(gè)任務(wù)!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(30);
System.out.println(">>>task is running=====");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 2.銷毀----此處不能銷毀,因?yàn)槿蝿?wù)沒(méi)有提交執(zhí)行完,如果銷毀線程池,任務(wù)也就無(wú)法執(zhí)行了
// exec.destory();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2弥臼、線程池執(zhí)行流程
源碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1、工作線程 < 核心線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2舱呻、運(yùn)行態(tài)醋火,并嘗試將任務(wù)加入隊(duì)列悠汽;如果能加入箱吕,說(shuō)明隊(duì)列沒(méi)滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} // 3、工作線程 < 核心線程柿冲,并且隊(duì)列滿了茬高,那么繼續(xù)新建線程,嘗試使用最大線程運(yùn)行
else if (!addWorker(command, false))
reject(command);
}
從上面這個(gè)圖中可以看出假抄,在創(chuàng)建了線程池后怎栽,默認(rèn)情況下,線程池中并沒(méi)有任何線程宿饱,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù)熏瞄,除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出谬以,是預(yù)創(chuàng)建線程的意思强饮,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下为黎,在創(chuàng)建了線程池后邮丰,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后铭乾,并且工作線程數(shù)<核心線程數(shù)時(shí)剪廉,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后炕檩,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中斗蒋;
在核心線程數(shù)創(chuàng)建以后,就不會(huì)再關(guān)閉了笛质,這個(gè)創(chuàng)建過(guò)程類似懶加載泉沾,只有需要用的時(shí)候才去創(chuàng)建
當(dāng)核心線程數(shù)執(zhí)行完其第一個(gè)任務(wù)以后,就會(huì)阻塞经瓷,等待從隊(duì)列中獲取任務(wù)(getTask)爆哑,獲取到的話,線程就繼續(xù)執(zhí)行任務(wù)舆吮。見(jiàn)下面runWorker源碼揭朝。
ThreadPoolExecutor執(zhí)行順序總結(jié):
當(dāng)線程數(shù)小于核心線程數(shù)時(shí)队贱,創(chuàng)建線程。
當(dāng)線程數(shù)大于等于核心線程數(shù)潭袱,且任務(wù)隊(duì)列未滿時(shí)柱嫌,將任務(wù)放入任務(wù)隊(duì)列。
當(dāng)線程數(shù)大于等于核心線程數(shù)屯换,且任務(wù)隊(duì)列已滿
若線程數(shù)小于最大線程數(shù)编丘,創(chuàng)建線程
若線程數(shù)等于最大線程數(shù),拋出異常彤悔,拒絕任務(wù)
簡(jiǎn)單的說(shuō)嘉抓,
- addWorker(command, true): 創(chuàng)建核心線程執(zhí)行任務(wù);
- addWorker(command, false):創(chuàng)建非核心線程執(zhí)行任務(wù)晕窑;
- addWorker(null, false): 創(chuàng)建非核心線程抑片,當(dāng)前任務(wù)為空;
- addWorker(null,true) : 預(yù)先創(chuàng)建corePoolSize個(gè)線程杨赤;
addWorker源碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動(dòng)線程敞斋,執(zhí)行任務(wù),這里調(diào)用的其實(shí)是worker的run方法疾牲,見(jiàn)下面Worker構(gòu)造方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//Worker構(gòu)造方法植捎,thread變量構(gòu)造的線程是它本身,即當(dāng)調(diào)用Worker中thread.start()時(shí)阳柔,
//最終執(zhí)行的是Worker類的run方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//執(zhí)行任務(wù)時(shí)焰枢,最后調(diào)用的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//第一次執(zhí)行task時(shí),task肯定不為空盔沫,當(dāng)firstTask執(zhí)行完以后医咨,while循環(huán)等待,
//指導(dǎo)從隊(duì)列中獲取到task架诞,即getTask()不為空時(shí)拟淮,getTask就是從隊(duì)列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
3、拒絕策略
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize時(shí)谴忧,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略很泊,通常有以下四種策略:
- ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù)沾谓,但是不拋出異常委造。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)
線程池的默認(rèn)拒絕策略為AbortPolicy均驶,即丟棄任務(wù)并拋出RejectedExecutionException異常昏兆。
4、線程池優(yōu)雅關(guān)閉
從源碼中可以看到妇穴,有兩種關(guān)閉方式爬虱,shutdown和shutdownNow隶债。
- executorService.shutdown():線程池拒接收新提交的任務(wù),同時(shí)立馬關(guān)閉線程池跑筝,線程池里的任務(wù)不再執(zhí)行死讹。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//設(shè)置線程池狀態(tài)為SHUTDOWN,之后就不能再向線程池提交任務(wù)了
advanceRunState(SHUTDOWN);
//遍歷所有未執(zhí)行任務(wù)的線程曲梗,對(duì)其設(shè)置中斷
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//判斷所有任務(wù)是否都已退出赞警,如果退出則設(shè)置標(biāo)記 “TERMINATED”
//在這里會(huì)死循環(huán)一直等到所有線程都執(zhí)行完任務(wù)后,再次中斷線程
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//中斷時(shí)虏两,先獲取鎖愧旦,runWorker中執(zhí)行任務(wù)時(shí),會(huì)先lock加鎖(見(jiàn)上面runWorker源碼)
//所以碘举,這里其實(shí)只會(huì)對(duì)中斷空閑線程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
}
} finally {
mainLock.unlock();
}
}
從上面的源碼中可以看出忘瓦,當(dāng)我們調(diào)用線程池的shuwdown方法時(shí),如果線程正在執(zhí)行線程池里的任務(wù)引颈,即便任務(wù)處于阻塞狀態(tài),線程也不會(huì)被中斷境蜕,而是繼續(xù)執(zhí)行(因?yàn)橛屑渔i蝙场,所以interruptIdleWorkers中worker獲取不到鎖,所以執(zhí)行不了中斷)粱年。
如果線程池阻塞等待從隊(duì)列里讀取任務(wù)getTask()售滤,則會(huì)被喚醒,但是會(huì)繼續(xù)判斷隊(duì)列是否為空台诗,如果不為空會(huì)繼續(xù)從隊(duì)列里讀取任務(wù)完箩,為空則線程退出。
- executorService.shutdownNow():線程池拒接收新提交的任務(wù)拉队,同時(shí)立馬關(guān)閉線程池弊知,線程池里的任務(wù)不再執(zhí)行。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改線程池的狀態(tài)為STOP狀態(tài)
advanceRunState(STOP);
//遍歷線程池里的所有工作線程粱快,然后調(diào)用線程的interrupt方法
interruptWorkers();
//將隊(duì)列里還沒(méi)有執(zhí)行的任務(wù)放到列表里秩彤,返回給調(diào)用方
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//直接中斷
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
//移除工作隊(duì)列中任務(wù),同時(shí)把其返回給調(diào)用方
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
當(dāng)我們調(diào)用線程池的shutdownNow時(shí)事哭,如果線程正在getTask方法中執(zhí)行漫雷,則會(huì)通過(guò)for循環(huán)進(jìn)入到if語(yǔ)句,于是getTask返回null鳍咱,從而線程退出(getTask源碼中會(huì)先判斷線程狀態(tài)降盹,而上一步已經(jīng)把線程狀態(tài)修改為STOP了)。不管線程池里是否有未完成的任務(wù)谤辜。
如果線程因?yàn)閳?zhí)行提交到線程池里的任務(wù)而處于阻塞狀態(tài)蓄坏,則會(huì)導(dǎo)致報(bào)錯(cuò)(如果任務(wù)里沒(méi)有捕獲InterruptedException異常)仅胞,否則線程會(huì)執(zhí)行完當(dāng)前任務(wù),然后通過(guò)getTask方法返回為null來(lái)退出剑辫。
總結(jié):調(diào)用完shutdownNow和shuwdown方法后干旧,并不代表線程池已經(jīng)完成關(guān)閉操作,它只是異步的通知線程池進(jìn)行關(guān)閉處理妹蔽。如果要同步等待線程池徹底關(guān)閉后才繼續(xù)往下執(zhí)行椎眯,需要調(diào)用awaitTermination方法進(jìn)行同步等待。
5胳岂、ThreadPoolExecutor參數(shù)設(shè)置
5.1 默認(rèn)值
- corePoolSize=1
- queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
- keepAliveTime=60s
- allowCoreThreadTimeout=false
- rejectedExecutionHandler=AbortPolicy()
5.2 自定義線程池參數(shù)的合理設(shè)置
為了說(shuō)明合理設(shè)置的條件编整,我們首先確定有以下?個(gè)相關(guān)參數(shù):
- 1.tasks,程序每秒需要處理的最?任務(wù)數(shù)量(假設(shè)系統(tǒng)每秒任務(wù)數(shù)為100~1000)
- 2.tasktime乳丰,單線程處理?個(gè)任務(wù)所需要的時(shí)間(每個(gè)任務(wù)耗時(shí)0.1秒)
- 3.responsetime掌测,系統(tǒng)允許任務(wù)最?的響應(yīng)時(shí)間(每個(gè)任務(wù)的響應(yīng)時(shí)間不得超過(guò)2秒)
corePoolSize:核心線程數(shù)
每個(gè)任務(wù)需要tasktime秒處理,則每個(gè)線程每秒可處理1/tasktime個(gè)任務(wù)产园。系統(tǒng)每秒有tasks個(gè)任務(wù)需要處理汞斧,則需要的線程數(shù)為:tasks/(1/tasktime),即tasks*tasktime個(gè)線程數(shù)什燕。
假設(shè)系統(tǒng)每秒任務(wù)數(shù)為100到1000之間粘勒,每個(gè)任務(wù)耗時(shí)0.1秒,則需要100x0.1?1000x0.1屎即,即10到100個(gè)線程庙睡。
那么corePoolSize應(yīng)該設(shè)置為大于10。具體數(shù)字最好根據(jù)8020原則技俐,即80%情況下系統(tǒng)每秒任務(wù)數(shù)乘陪,若系統(tǒng)80%的情況下任務(wù)數(shù)小于200,最多時(shí)為1000雕擂,則corePoolSize可設(shè)置為20啡邑。
queueCapacity:任務(wù)隊(duì)列的長(zhǎng)度:
任務(wù)隊(duì)列的長(zhǎng)度要根據(jù)核心線程數(shù),以及系統(tǒng)對(duì)任務(wù)響應(yīng)時(shí)間的要求有關(guān)捂刺。隊(duì)列長(zhǎng)度可以設(shè)置為(corePoolSize/tasktime) * responsetime=(20/0.1) * 2=400谣拣,即隊(duì)列長(zhǎng)度可設(shè)置為400。
如果隊(duì)列長(zhǎng)度設(shè)置過(guò)?族展,會(huì)導(dǎo)致任務(wù)響應(yīng)時(shí)間過(guò)長(zhǎng)森缠,如以下寫(xiě)法:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
這實(shí)際上是將隊(duì)列長(zhǎng)度設(shè)置為Integer.MAX_VALUE,將會(huì)導(dǎo)致線程數(shù)量永遠(yuǎn)為corePoolSize仪缸,再也不會(huì)增加贵涵,當(dāng)任務(wù)數(shù)量陡增時(shí),任務(wù)響應(yīng)時(shí)間也將隨之陡增。
maxPoolSize:最大線程數(shù)
當(dāng)系統(tǒng)負(fù)載達(dá)到最?值時(shí)宾茂,核心線程數(shù)已無(wú)法按時(shí)處理完所有任務(wù)瓷马,這時(shí)就需要增加線程。
每秒200個(gè)任務(wù)需要20個(gè)線程跨晴,那么當(dāng)每秒達(dá)到1000個(gè)任務(wù)時(shí)欧聘,則需要(1000-queueCapacity) * 0.1,即60個(gè)線程端盆,可將maxPoolSize設(shè)置為60怀骤。
keepAliveTime:
線程數(shù)量只增加不減少也不?。當(dāng)負(fù)載降低時(shí)焕妙,可減少線程數(shù)量蒋伦,如果?個(gè)線程空閑時(shí)間達(dá)到keepAliveTiime,該線程就退出焚鹊。默認(rèn)情況下線程池最少會(huì)保持corePoolSize個(gè)線程痕届。keepAliveTiime設(shè)定值可根據(jù)任務(wù)峰值持續(xù)時(shí)間來(lái)設(shè)定。
rejectedExecutionHandler:
根據(jù)具體情況來(lái)決定末患,任務(wù)不重要可丟棄研叫,任務(wù)重要?jiǎng)t要利用一些緩沖機(jī)制來(lái)處理
以上關(guān)于線程數(shù)量的計(jì)算并沒(méi)有考慮CPU的情況。若結(jié)合CPU的情況阻塑,比如蓝撇,當(dāng)線程數(shù)量達(dá)到50時(shí),CPU達(dá)到100%陈莽,則將maxPoolSize設(shè)置為60也不合適,此時(shí)若系統(tǒng)負(fù)載長(zhǎng)時(shí)間維持在每秒1000個(gè)任務(wù)虽抄,則超出線程池處理能?走搁,應(yīng)設(shè)法降低每個(gè)任務(wù)的處理時(shí)間(tasktime)。
補(bǔ)充:線程中斷
在程序中迈窟,我們是不能隨便中斷一個(gè)線程的私植,因?yàn)檫@是極其不安全的操作,我們無(wú)法知道這個(gè)線程正運(yùn)行在什么狀態(tài)车酣,它可能持有某把鎖曲稼,強(qiáng)行中斷可能導(dǎo)致鎖不能釋放的問(wèn)題;或者線程可能在操作數(shù)據(jù)庫(kù)湖员,強(qiáng)行中斷導(dǎo)致數(shù)據(jù)不一致混亂的問(wèn)題贫悄。正因此,JAVA里將Thread的stop方法設(shè)置為過(guò)時(shí)娘摔,以禁止大家使用窄坦。
一個(gè)線程什么時(shí)候可以退出呢?當(dāng)然只有線程自己才能知道。
所以我們這里要說(shuō)的Thread的interrrupt方法鸭津,本質(zhì)不是用來(lái)中斷一個(gè)線程彤侍。是將線程設(shè)置一個(gè)中斷狀態(tài)。
當(dāng)我們調(diào)用線程的interrupt方法逆趋,它有兩個(gè)作用:
- 1盏阶、如果此線程處于阻塞狀態(tài)(比如調(diào)用了wait方法,io等待)闻书,則會(huì)立馬退出阻塞名斟,并拋出InterruptedException異常,線程就可以通過(guò)捕獲InterruptedException來(lái)做一定的處理惠窄,然后讓線程退出蒸眠。
- 2、如果此線程正處于運(yùn)行之中杆融,則線程不受任何影響楞卡,繼續(xù)運(yùn)行,僅僅是線程的中斷標(biāo)記被設(shè)置為true脾歇。所以線程要在適當(dāng)?shù)奈恢猛ㄟ^(guò)調(diào)用isInterrupted方法來(lái)查看自己是否被中斷蒋腮,并做退出操作。
如果線程的interrupt方法先被調(diào)用藕各,然后線程調(diào)用阻塞方法進(jìn)入阻塞狀態(tài)池摧,InterruptedException異常依舊會(huì)拋出。
如果線程捕獲InterruptedException異常后激况,繼續(xù)調(diào)用阻塞方法作彤,將不再觸發(fā)InterruptedException異常。
參考:
http://www.reibang.com/p/f030aa5d7a28
http://www.reibang.com/p/23cb8b903d2c
https://www.cnblogs.com/zedosu/p/6665306.html
https://blog.csdn.net/mayongzhan_csdn/article/details/80790966