從線程池使用進(jìn)行實(shí)現(xiàn)分析
一.自定義線程池
1.自定義線程池
2.構(gòu)造完成之后狀態(tài)
3.關(guān)鍵參數(shù)介紹
二.執(zhí)行任務(wù)
1.execute一個(gè)任務(wù)
2.執(zhí)行分析
三.線程池停止
1.shutDown分析
2.shutDownNow分析
四.線程池常見(jiàn)問(wèn)題
一.自定義線程池
1.自定義線程池
//阻塞隊(duì)列
LinkedBlockingQueue blockingQueue= new LinkedBlockingQueue<Runnable>(10);
//線程工廠
ThreadFactory threadFactory=new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("test");
return t;
}
};
//拒絕策略
RejectedExecutionHandler rejectedExecutionHandler=new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定義拒絕策略");
}
};
//核心線程數(shù)
Integer corePoolSize =3;
//最大線程數(shù)
Integer maxPoolSize=10;
//空閑線程等待時(shí)間
Integer keepAliveTime=7;
//構(gòu)造方法
ExecutorService threadPoolExecutor=new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime,
TimeUnit.SECONDS, blockingQueue, threadFactory, rejectedExecutionHandler);
2.構(gòu)造完成之后狀態(tài)
3.幾個(gè)關(guān)鍵參數(shù)
ctl:保存線程池存活狀態(tài),線程池內(nèi)線程數(shù)
workqueque:自定義的任務(wù)隊(duì)列
mainLock:線程池的鎖
termination:mainLock.newCondition()阻塞/通知線程;
keepAliveTime:非核心線程的存活時(shí)間
corePoolSize:3
maximumPoolSize:10
workers:線程集合 線程池內(nèi)存活的線程數(shù)=0
二.執(zhí)行任務(wù)
1.execute一個(gè)任務(wù)
workers: 0 到 1
2.執(zhí)行分析
1.首先檢查存活線程數(shù)量
2.根據(jù)存活數(shù)量進(jìn)行不同處理
3.開(kāi)辟新的線程/添加到任務(wù)隊(duì)列/開(kāi)新線程至最大線程數(shù)/拒絕任務(wù)
4.如果成功的開(kāi)了線程,調(diào)用線程的start()開(kāi)始處理
5.worker用while循環(huán)不斷獲取任務(wù).
6.直到當(dāng)前任務(wù)和任務(wù)隊(duì)列都為空,判斷是否阻塞直至有新任務(wù)到來(lái).
下面開(kāi)始就源碼進(jìn)行分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
1.獲取線程池狀態(tài)
int c = ctl.get();
2.取workers數(shù)量 和如果小于核心線程數(shù),生成一個(gè)新的worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
3.workers.count到達(dá)了核心線程數(shù),線程池是運(yùn)行狀態(tài),把任務(wù)添加到任務(wù)隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
4.再次檢查狀態(tài),如果不是運(yùn)行狀態(tài),將任務(wù)移除出隊(duì)列
if (! isRunning(recheck) && remove(command))
reject(command);
5.如果此時(shí)線程池內(nèi)沒(méi)有線程了,再生成一個(gè)線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
6.如果worker加不上去了,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
從上面可以看到每來(lái)一個(gè)任務(wù)都線程數(shù)量會(huì)判斷使用策略
1.如果沒(méi)有到核心線程,添加新線程執(zhí)行任務(wù)
2.如果到核心線程,添加到任務(wù)隊(duì)列
3.如果隊(duì)列都滿了,那就添加新線程,知道最大線程數(shù).
分析addworker之前需要對(duì)worker這個(gè)內(nèi)部類進(jìn)行了解
可以看到,本質(zhì)上就是對(duì)線程的包裝.主要是繼承了AQS,通過(guò)鎖對(duì)線程進(jìn)行保護(hù).
下面開(kāi)始看addWorks方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
1.進(jìn)來(lái)先獲取線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
2.校驗(yàn)線程池狀態(tài)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
3.先看是否達(dá)到最大容量.再判斷是否到達(dá)自定義規(guī)定容量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if
4.這里采取的cas算法讓worker計(jì)數(shù)器+1.如果cas失敗,重復(fù)1234.
(compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
5.下面開(kāi)始正式創(chuàng)建worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
6.獲得線程池的鎖
final ReentrantLock mainLock = this.mainLock;
7.初始化worker,構(gòu)造方法里創(chuàng)建了線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
8.因?yàn)閣orks是hashSet不是線程安全的,所以這里需要加鎖處理.里面添加worker,更新相關(guān)計(jì)數(shù)器
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
9.如果worker添加成功.直接啟動(dòng)線程
if (workerAdded) {
11.start方法實(shí)際調(diào)用worker runWorker()方法
t.start();
workerStarted = true;
}
}
} finally {
10.如果線程啟動(dòng)失敗,worker數(shù)量減少,嘗試中斷失敗線程.
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
對(duì)runWorker進(jìn)行分析
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
猜測(cè)這里unlock主要避免執(zhí)行shutdownNow時(shí)并發(fā)問(wèn)題
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
1.這里有兩種方式獲取任務(wù) getTask會(huì)根據(jù)情況是否阻塞線程.
getTask下面分析
while (task != null || (task = getTask()) != null) {
2.這里采用的獨(dú)占鎖,保證了任務(wù)開(kāi)始執(zhí)行后,只有在鎖之前已經(jīng)產(chǎn)生的終止?fàn)顟B(tài)才能使自我中斷设江。
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
3.這里終于開(kāi)始執(zhí)行任務(wù)了!<枇痢岂却!
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
4.最后根據(jù)狀態(tài)判斷線程是否可以正常退出.
processWorkerExit(w, completedAbruptly);
}
}
線程不斷獲取任務(wù)執(zhí)行,并且用獨(dú)占鎖保護(hù)線程執(zhí)行過(guò)程,這樣就保證了shutdown時(shí)后,正在執(zhí)行任務(wù)的縣城不會(huì)被中斷,但是shutdownNow不受此影響.可以直接粗暴的中斷線程.
getTask()分析
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
1.如果線程池調(diào)用了shutDown,線程池狀態(tài)就是shutdown.如果調(diào)用shutdown線程池狀態(tài)就是stop,這2個(gè)策略主要是后續(xù)任務(wù)處理方式不一樣.
如果是shutdown狀態(tài),隊(duì)列必須不為空,那么可以繼續(xù)從任務(wù)隊(duì)列獲取任務(wù)進(jìn)行處理
如果是stop狀態(tài),隊(duì)列不為空也不進(jìn)行處理.(shutDownNow時(shí)任務(wù)隊(duì)列已經(jīng)把任務(wù)返回了)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
2.判斷是阻塞一定時(shí)間還是一直阻塞直到取到任務(wù).
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這里通過(guò)多重判斷,來(lái)決定線程是阻塞在這里等任務(wù),還是存活keepAliveTime.
1.如果線程數(shù)是最大線程數(shù),并且任務(wù)隊(duì)列為空,那么阻塞空閑線程還是存活keepAliveTime時(shí)間,期間沒(méi)有任務(wù)到來(lái),讓其向后執(zhí)行,自然中斷.
2.如果線程數(shù)是核心線程數(shù),根據(jù)allowCoreThreadTimeOut處理站欺。true情況下阻塞keepAliveTime時(shí)間,期間沒(méi)有任務(wù)到來(lái),讓其向后執(zhí)行,自然中斷.false情況下,一直阻塞,直到有任務(wù)加入隊(duì)列.
三.線程池停止
1.shutDown分析
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
1.這里應(yīng)該是做一些權(quán)限檢查
checkShutdownAccess();
2.利用CAS將變線程池轉(zhuǎn)變成shutDown狀態(tài)
advanceRunState(SHUTDOWN);
3.中斷空閑線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
4.最后嘗試將線程池狀態(tài)設(shè)置為TIDYING狀態(tài)
tryTerminate();
}
interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
1.首先線程不是中斷狀態(tài),嘗試獲取線程的鎖.這里tryLock的實(shí)現(xiàn)就決定了
if (!t.isInterrupted() && w.tryLock()) {
try {
2.如果能拿到鎖終端線程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
這里是先獲取線程鎖,獲取不到不終止,所以不能中斷任務(wù)中的線程,并且會(huì)把隊(duì)列中的任務(wù)處理完
tryLock
tryLock里面調(diào)用了tryAcquire
protected boolean tryAcquire(int unused) {
1.這里采用了獨(dú)占鎖的實(shí)現(xiàn)方式 0到1才能獲取鎖.所以保證了正在執(zhí)行任務(wù)的線程不會(huì)被shutdown中斷
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
這是自定義了aqs獲取鎖的方式
2.shutDownNow分析
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
1.第1個(gè)區(qū)別嘗試中斷所有線程
interruptWorkers();
1.第2個(gè)區(qū)別,把隊(duì)列中的任務(wù)返回給調(diào)用者處理
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
interruptWorkers();
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
1.這里就粗暴多了,直接終止,可能導(dǎo)致有些線程不能正常終止,處于異常狀態(tài).
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
====================================================
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
直接粗暴的終止線程,不管線程上沒(méi)上鎖.
線程池常見(jiàn)問(wèn)題
1.線程池對(duì)線程包裝的作用
2.關(guān)鍵參數(shù)(allowCoreThreadTimeOut,keepAliveTime,corePoolSize,maximumPoolSize,ctl)
3.關(guān)鍵技術(shù)(CAS,AbstractQueuedSynchronizer,ReentrantLock,LinkedB lockingQueue)
4.線程池線程數(shù)動(dòng)態(tài)變化過(guò)程
5.shutDown和shutDownNow執(zhí)行上有什么區(qū)別
6.excute和submit區(qū)別
7.核心線程是什么情況下減少到0