本文參考
Java線程池---addWorker方法解析
Java線程池ThreadPoolExecutor實(shí)現(xiàn)原理
線程池如何復(fù)用
Executor(Interface):
執(zhí)行提交的線程任務(wù)的對象。這個(gè)接口提供了一種將任務(wù)提交與每個(gè)任務(wù)將如何運(yùn)行實(shí)現(xiàn)了分離亡问,包括線程使用、調(diào)度等細(xì)節(jié)昵仅。該接口只定義了一個(gè)execute()方法邪媳。
// 用來執(zhí)行一個(gè)指令任務(wù),這個(gè)任務(wù)可能在一個(gè)新的線程中執(zhí)行,可能在線程池已有的線程中執(zhí)行巫延,也可能在當(dāng)前線程執(zhí)行,由Executor接口的實(shí)現(xiàn)方?jīng)Q定地消;
void execute(Runnable command);
ExecutorService(Interface):
提供用于管理終止的方法如 shutDown()和shutDownNow()用于關(guān)閉線程池的方法以及判斷線程池是否關(guān)閉的方法如炉峰,isShutdown(),isTerminated()的方法脉执,還提供了involveAll()和submit()
// 關(guān)閉線程池疼阔,不接受新的任務(wù),繼續(xù)執(zhí)行已經(jīng)提交的任務(wù),但是不等待已經(jīng)提交的任務(wù)完成婆廊,使用awaitTermination接口
void shutdown();
// 判斷當(dāng)前線程池是否被關(guān)閉
boolean isShutdown();
// 判斷是否所有提交的任務(wù)在showDown()后已經(jīng)執(zhí)行完畢
boolean isTerminated();
// 執(zhí)行一個(gè)任務(wù)迅细,并且返回這個(gè)任務(wù)的Future對象,用于表示這個(gè)任務(wù)的掛起結(jié)果
<T> Future<T> submit(Callable<T> task);
// 執(zhí)行許多任務(wù)淘邻,返回任務(wù)的results
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 執(zhí)行許多任務(wù)茵典,返回許多Future(包含status and results)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
Callable:
和Runnable類似,是java開啟子線程的一種方式宾舅,用Callable開啟的線程可以取消執(zhí)行统阿;
- call():類似run方法
Future:
一個(gè)代表線程執(zhí)行結(jié)果的類,可以用來判斷線程是否運(yùn)行結(jié)束贴浙,主動終止線程
- get():等待線程執(zhí)行完畢砂吞,返回線程的結(jié)果
- cancel():停止線程
- isDown():判斷線程是否結(jié)束
- isCancel():判斷線程是否取消
ThreadPoolExecutor:
AbstractExecutorService的實(shí)現(xiàn)類,java線程池的核心實(shí)現(xiàn)類崎溃,下面我們重點(diǎn)看這個(gè)類蜻直;
重要變量:
// 構(gòu)造方法中的參數(shù)
private volatile int corePoolSize;// 核心線程數(shù)
private volatile int maximumPoolSize;// 最大線程數(shù)
private volatile long keepAliveTime; // 空閑線程的可存活時(shí)間
private final BlockingQueue<Runnable> workQueue; // 工作隊(duì)列,存放任務(wù)的數(shù)據(jù)結(jié)構(gòu)
private volatile ThreadFactory threadFactory; // 線程工廠袁串,創(chuàng)建新線程的工廠
private volatile RejectedExecutionHandler handler; // 拋棄線程策略
// 線程池狀態(tài)變量(這是一個(gè)atomic integer)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
深入解析Java AtomicInteger 原子類型
在線程池的內(nèi)部概而,封裝了一個(gè)Worker對象,Worker對象是線程池維護(hù)的用來處理用戶提交的任務(wù)的線程囱修;
// worker的本質(zhì)是一個(gè)任務(wù)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
execute():
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 如果正在運(yùn)行的線程數(shù)小于核心線程數(shù)
if (addWorker(command, true)) // 直接創(chuàng)建一個(gè)新線程執(zhí)行赎瑰,然后return
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 如果任務(wù)可以成功的加入阻塞隊(duì)列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 雙重檢查:如果線程池被關(guān)閉并且任務(wù)成功的從阻塞隊(duì)列中取出,就放棄這個(gè)任務(wù)
reject(command);
else if (workerCountOf(recheck) == 0) // 如果當(dāng)前的線程數(shù)為0(線程池沒有被關(guān)閉)破镰,開啟新線程執(zhí)行
addWorker(null, false);
}
else if (!addWorker(command, false)) // 如果無法創(chuàng)建新線程去執(zhí)行這個(gè)任務(wù) 餐曼,也放棄這個(gè)任務(wù)
reject(command);
}
execute()的流程分為三部分(上面有詳細(xì)的英文注釋):
- 如果比正在運(yùn)行的核心線程數(shù)量小,則嘗試開啟一個(gè)新線程鲜漩,將這個(gè)任務(wù)作為他的第一個(gè)任務(wù)源譬,并且調(diào)用addWorker()去檢查 runState 和workerCount,避免不必要的錯(cuò)誤警報(bào)
- 如果一個(gè)任務(wù)可以成功的加入阻塞隊(duì)列中排隊(duì)孕似,我們需要再次雙重檢驗(yàn)是否我們需要?jiǎng)?chuàng)建一個(gè)新的線程踩娘,如果線程池被關(guān)閉(由于這個(gè)任務(wù)的某個(gè)邏輯導(dǎo)致)或者現(xiàn)存的線程死掉了,我們就調(diào)用handler去拒絕這個(gè)任務(wù)喉祭,如果當(dāng)前線程池沒有線程养渴,則創(chuàng)建一個(gè)新的線程
- 如果這個(gè)線程無法進(jìn)入阻塞隊(duì)列又無法創(chuàng)建新線程去執(zhí)行闰集,我們就用handler丟棄他钞它;
addWorker():
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 使用CAS機(jī)制輪詢線程池的狀態(tài)氧敢,如果線程池處于SHUTDOWN及大于它的狀態(tài)則拒絕執(zhí)行任務(wù)
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果是STOP坊秸,TIDYING,TERMINATED狀態(tài)的話乘盖,則會返回false哎迄,如果現(xiàn)在狀態(tài)是SHUTDOWN鲜戒,但是firstTask不為空或者workQueue為空的話咙冗,那么直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲得當(dāng)前workerCount,如果當(dāng)前workerCount大于容量/核心線程數(shù)/最大線程數(shù)中捆,return false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 判斷當(dāng)前的線程池狀態(tài)是否可以創(chuàng)建新的worker鸯匹,可以的話 break retry,創(chuàng)建新worker
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS的自旋泄伪,再次比較ctl殴蓬,如果不一致retry
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 創(chuàng)建新的worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 在鎖內(nèi)部在進(jìn)行一次判斷是否滿足創(chuàng)建worker的條件
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker:
根據(jù)當(dāng)前線程池的狀態(tài)和給定的線程數(shù)大小(core/maximum)判斷是否可以添加一個(gè)新的worker蟋滴,如果可以染厅,創(chuàng)建一個(gè)新的worker,當(dāng)前任務(wù)作為他的first task津函,當(dāng)線程池被關(guān)閉或者工廠創(chuàng)建線程失敗肖粮,這個(gè)方法返回false;
CAS機(jī)制
什么是retry尔苦?
自帶的ThreadPoolExecutor對象
- newFixedThreadPool:一個(gè)固定線程數(shù)量的線程池:
- newCachedThreadPool:不固定線程數(shù)量涩馆,且支持最大為Integer.MAX_VALUE的線程數(shù)量:
- newSingleThreadExecutor:可以理解為線程數(shù)量為1的FixedThreadPool:
- newScheduledThreadPool:支持定時(shí)以指定周期循環(huán)執(zhí)行任務(wù):
拒絕策略(當(dāng)阻塞隊(duì)列和max都滿了做出的處理)
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:由調(diào)用者所在線程來運(yùn)行任務(wù)允坚。
- DiscardOldestPolicy:丟棄隊(duì)列里最老的一個(gè)任務(wù)魂那,并執(zhí)行當(dāng)前任務(wù)。
- DiscardPolicy:丟棄掉當(dāng)前提交的新任務(wù)稠项。
封裝的自定義的ThreadPoolExecutor
public class ThreadPoolManager {
/**
* 單例設(shè)計(jì)模式(餓漢式)
* 單例首先私有化構(gòu)造方法涯雅,然后餓漢式一開始就開始創(chuàng)建,并提供get方法
*/
private static ThreadPoolManager mInstance = new ThreadPoolManager();
public static ThreadPoolManager getInstance() {
return mInstance;
}
private int corePoolSize;//核心線程池的數(shù)量展运,同時(shí)能夠執(zhí)行的線程數(shù)量
private int maximumPoolSize;//最大線程池?cái)?shù)量活逆,表示當(dāng)緩沖隊(duì)列滿的時(shí)候能繼續(xù)容納的等待任務(wù)的數(shù)量
private long keepAliveTime = 1;//存活時(shí)間
private TimeUnit unit = TimeUnit.HOURS;
private ThreadPoolExecutor executor;
private ThreadPoolManager() {
/**
* 給corePoolSize賦值:當(dāng)前設(shè)備可用處理器核心數(shù)*2 + 1,能夠讓cpu的效率得到最大程度執(zhí)行
*/
corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
maximumPoolSize = corePoolSize; //雖然maximumPoolSize用不到,但是需要賦值拗胜,否則報(bào)錯(cuò)
executor = new ThreadPoolExecutor(
corePoolSize, //當(dāng)某個(gè)核心任務(wù)執(zhí)行完畢蔗候,會依次從緩沖隊(duì)列中取出等待任務(wù)
maximumPoolSize, //5,先corePoolSize,然后new LinkedBlockingQueue<Runnable>(),然后maximumPoolSize,但是它的數(shù)量是包含了corePoolSize的
keepAliveTime, //表示的是maximumPoolSize當(dāng)中等待任務(wù)的存活時(shí)間
unit,
new LinkedBlockingQueue<Runnable>(), //緩沖隊(duì)列,用于存放等待任務(wù)挤土,Linked的先進(jìn)先出
Executors.defaultThreadFactory(), //創(chuàng)建線程的工廠
new ThreadPoolExecutor.AbortPolicy() //用來對超出maximumPoolSize的任務(wù)的處理策略
);
}
/**
* 執(zhí)行任務(wù)
*/
public void execute(Runnable runnable) {
if (runnable == null) return;
executor.execute(runnable);
}
/**
* 從線程池中移除任務(wù)
*/
public void remove(Runnable runnable) {
if (runnable == null) return;
executor.remove(runnable);
}
}
配置線程池
任務(wù)的性質(zhì):
- CPU密集型(大量計(jì)算) :CPU數(shù)+1
- IO密集型(磁盤IO,網(wǎng)絡(luò)IO):CPU數(shù)*2+1
線程池如何復(fù)用線程误算?
一個(gè)大run()把其它小run()#1,run()#2,...給串聯(lián)起來了
前提條件:假如coreSize=3仰美,maxSize=10,當(dāng)前存在線程數(shù)是5儿礼。
(注意咖杂,存在的這5個(gè)線程,并不是你執(zhí)行ExecuteService.execute/submit時(shí)的參數(shù)蚊夫,而是為了執(zhí)行execute/submit的參數(shù)所啟動的“內(nèi)部線程”诉字。這個(gè)“內(nèi)部線程”其實(shí)是通過ThreadPoolExecutor的ThreadFactory參數(shù)生成的線程,而“execute/submit的參數(shù)”是執(zhí)行在這些“內(nèi)部線程”里面的。)
存在這5個(gè)“內(nèi)部線程”壤圃,都訪問同一個(gè)隊(duì)列陵霉,從隊(duì)列中去取任務(wù)執(zhí)行(任務(wù)就是通過execute/submit提交的Runnable參數(shù)),當(dāng)任務(wù)充足時(shí)伍绳,5個(gè)“內(nèi)部線程”都持續(xù)執(zhí)行踊挠。重點(diǎn)是沒有任務(wù)時(shí)怎么辦?
沒有任務(wù)時(shí)冲杀,這5個(gè)“內(nèi)部線程”都會做下面判斷:
如果poolSize > coreSize效床,那就從隊(duì)列里取任務(wù),當(dāng)過了keepaliveTime這么長時(shí)間還沒有得到任務(wù)的話权谁,當(dāng)前這個(gè)“內(nèi)部線程”就會結(jié)束(使用的是BlockingQueue.poll方法)剩檀。
如果poolSize <= coreSize,那就以“阻塞”的方式旺芽,去從隊(duì)列里取任務(wù)沪猴,當(dāng)?shù)玫饺蝿?wù)后,就繼續(xù)執(zhí)行甥绿。這樣的話字币,這個(gè)線程就不會結(jié)束掉。
如果沒有任務(wù)可以繼續(xù)執(zhí)行了共缕,最后只剩下coreSize那么多的“內(nèi)部線程”留在線程池里洗出,等待重用。