一参咙、線程池實(shí)現(xiàn)原理
Java支持多線程表蝙,多線程可以提高任務(wù)的執(zhí)行效率瘪弓。但是Java里面的線程跟操作系統(tǒng)的線程是一一對(duì)應(yīng)的宠哄,所以創(chuàng)建過(guò)多的線程會(huì)對(duì)系統(tǒng)資源產(chǎn)生很大的消耗壹将,同時(shí)過(guò)多的線程競(jìng)爭(zhēng)CPU,也會(huì)產(chǎn)生頻繁的上下文切換毛嫉,結(jié)果可能適得其反诽俯,降低系統(tǒng)的運(yùn)行效率。
線程池的作用就是對(duì)線程的重復(fù)利用承粤,把線程數(shù)量控制在合理的范圍內(nèi)暴区,避免上述情況的產(chǎn)生。
線程池有幾個(gè)部分組成:
- 任務(wù)隊(duì)列:要執(zhí)行的任務(wù)集合
- 活躍的線程:用來(lái)執(zhí)行任務(wù)的線程
- 任務(wù)調(diào)度:控制任務(wù)執(zhí)行
所以線程池的基本原理可以用下圖來(lái)表示:
二辛臊、Java線程池類圖
- Executor : 定義任務(wù)提交執(zhí)行execute仙粱,執(zhí)行的線程可能是新的線程,線程池的線程浪讳,或者調(diào)用者的線程缰盏,取決于線程池的實(shí)現(xiàn)涌萤。
- ExecutorService : 定義了有返回值的任務(wù)提交submit淹遵,線程池的中斷。
- AbstractExecutorService : 實(shí)現(xiàn)submit方法
- ScheduledExecutorService : 定義定時(shí)執(zhí)行的方法
- ThreadPoolExecutor : 線程池的主要實(shí)現(xiàn)類
- ScheduledThreadPoolExecutor : 實(shí)現(xiàn)定時(shí)執(zhí)行線程池的邏輯
三负溪、ThreadPoolExecutor源碼分析
ThreadPoolExecutor是線程池的核心類透揣,包含線程池的主要實(shí)現(xiàn)邏輯。
3.1川抡、主要成員變量
- AtomicInteger ctl :表示線程池當(dāng)前的狀態(tài)(高3位)&有效線程數(shù)量(低29位)
- BlockingQueue<Runnable> workQueue : 任務(wù)隊(duì)列
- HashSet<Worker> workers : 活躍線程集合
- int corePoolSize : 核心線程數(shù)
- int maximumPoolSize : 最大線程數(shù)
- RejectedExecutionHandler defaultHandler : 任務(wù)拒絕策略
3.2辐真、execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
提交任務(wù)時(shí)须尚,如果當(dāng)前線程數(shù)量小于核心線程數(shù),不管是否有線程空閑侍咱,都會(huì)新建一個(gè)線程來(lái)執(zhí)行任務(wù)耐床,當(dāng)線程數(shù)量大于核心線程,小于最大線程時(shí)楔脯,只有當(dāng)workQueue滿的時(shí)候撩轰,才會(huì)新建線程來(lái)執(zhí)行,當(dāng)線程數(shù)量等于最大線程數(shù)時(shí)昧廷,并且workQueue滿的時(shí)候堪嫂,會(huì)執(zhí)行任務(wù)拒絕策略。
addWorder
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker封裝了執(zhí)行任務(wù)的線程木柬,它可以用一個(gè)任務(wù)初始化皆串。新增一個(gè)worker首先判斷當(dāng)前線程池的狀態(tài)還是否允許新增一個(gè)線程。如果可以眉枕,則會(huì)CAS更新workCount恶复。然后new一個(gè)worker,調(diào)用worker線程的start方法齐遵。
worker線程是實(shí)現(xiàn)了Runnable的方法寂玲,worker線程會(huì)調(diào)用它的run方法,進(jìn)而調(diào)用runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這個(gè)方法就會(huì)判斷一些狀態(tài)梗摇,然后調(diào)用Task的run方法拓哟,執(zhí)行任務(wù)。
四伶授、ScheduledThreadPoolExecutor源碼分析
ScheduledThreadPoolExecutor 繼承自ThreadPoolExecutor断序,本身自帶了線程池的能力,只需要實(shí)現(xiàn)定時(shí)執(zhí)行功能即可糜烹。
定時(shí)執(zhí)行的基本原理是存儲(chǔ)任務(wù)的隊(duì)列是一個(gè)優(yōu)先級(jí)隊(duì)列违诗,按執(zhí)行時(shí)間的先后排序,任務(wù)線程去隊(duì)列獲取最近執(zhí)行的任務(wù)疮蹦,如果任務(wù)還沒(méi)有到執(zhí)行時(shí)間诸迟,則等待直到可以執(zhí)行。
ScheduledThreadPoolExecutor 會(huì)把要定時(shí)執(zhí)行的任務(wù)封裝成一個(gè)ScheduledFutureTask愕乎。
ScheduledFutureTask 有幾個(gè)成員變量:
- time 需要執(zhí)行的時(shí)間
- period 執(zhí)行周期
- sequenceNumber 序列號(hào)阵苇,如果時(shí)間相同,序列號(hào)小的先執(zhí)行
利用這三個(gè)參數(shù)感论,結(jié)合優(yōu)先級(jí)隊(duì)列绅项,可以實(shí)現(xiàn)定時(shí)執(zhí)行。
DelayedWorkQueue.take
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
先去獲取位于隊(duì)首的任務(wù)比肄,如果為NULL則wait,然后看時(shí)間是否到達(dá)快耿,如果到達(dá)了囊陡,返回任務(wù),如果沒(méi)有掀亥,則等待現(xiàn)在距離任務(wù)執(zhí)行之間的時(shí)間差撞反。
四、線程池使用注意
- 盡量使用 new ThreadPoolExecutor來(lái)構(gòu)建線程池搪花,這樣可以自己去控制和判斷線程池的大小痢畜。
- 如果是CPU密集型,線程數(shù)量可以設(shè)置為CPU核數(shù)+1
- 如果是IO密集型鳍侣,線程數(shù)量可以設(shè)置為2*CPU核數(shù)+1