寫在前面的話
最近一直都在研究Java的線程池ThreadPoolExecutor蚣录,但是雖然它那么好,但是在實際的用途中怎么去用眷篇,對于我來說就不知道如何下手了萎河,還好有開源社區(qū)我們可以了解很多項目中所運用到的線程池,比如最熟悉的就是Apache Tomcat了铅歼,相信都對它不默生公壤,一個Apache軟件基金下的一個開源Web容器,所以今天就來聊一下Tomcat的線程池實現(xiàn)椎椰。
準備工作
首先去Apache Tomcat的官網(wǎng)下載Tomcat的源代碼厦幅,這里給出<a >Tomcat源碼鏈接</a>,下載下來之后,它是一個zip文件慨飘,需要把它進行解壓到相應(yīng)的文件夾下面确憨,以便我們能方便的查看其源代碼译荞。分析源碼最行之有效的方法就是知道這個類有哪些方法,哪些字段休弃,繼承了哪些類吞歼,實現(xiàn)了哪些接口,所以我們這里推薦一款UML工具塔猾, astah-professional篙骡,可自行下載安裝,這是一個收費軟件丈甸,但是它有50天的試用期糯俗,所以我們可以以使用的身份使用該軟件。準備工作做好之后就可以進行下一步的操作了睦擂。
初探Tomcat線程池
Tomcat的線程池的類文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面得湘,定位到這個文件夾下面可以看到StandardThreadExecutor.java就是我們找尋的類了,用文本工具打開就可以查看其源碼了顿仇。這里源碼如下:
StandardThreadExecutor.java
public class StandardThreadExecutor extends LifecycleMBeanBase
implements Executor, ResizableExecutor {
//默認線程的優(yōu)先級
protected int threadPriority = Thread.NORM_PRIORITY;
//守護線程
protected boolean daemon = true;
//線程名稱的前綴
protected String namePrefix = "tomcat-exec-";
//最大線程數(shù)默認200個
protected int maxThreads = 200;
//最小空閑線程25個
protected int minSpareThreads = 25;
//超時時間為6000
protected int maxIdleTime = 60000;
//線程池容器
protected ThreadPoolExecutor executor = null;
//線程池的名稱
protected String name;
//是否提前啟動線程
protected boolean prestartminSpareThreads = false;
//隊列最大大小
protected int maxQueueSize = Integer.MAX_VALUE;
//為了避免在上下文停止之后淘正,所有的線程在同一時間段被更新,所以進行線程的延遲操作
protected long threadRenewalDelay = 1000L;
//任務(wù)隊列
private TaskQueue taskqueue = null;
//容器啟動時進行,具體可參考org.apache.catalina.util.LifecycleBase#startInternal()
@Override
protected void startInternal() throws LifecycleException {
//實例化任務(wù)隊列
taskqueue = new TaskQueue(maxQueueSize);
//自定義的線程工廠類,實現(xiàn)了JDK的ThreadFactory接口
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
//這里的ThreadPoolExecutor是tomcat自定義的,不是JDK的ThreadPoolExecutor
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
//是否提前啟動線程臼闻,如果為true鸿吆,則提前初始化minSpareThreads個的線程,放入線程池內(nèi)
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
//設(shè)置任務(wù)容器的父級線程池對象
taskqueue.setParent(executor);
//設(shè)置容器啟動狀態(tài)
setState(LifecycleState.STARTING);
}
//容器停止時的生命周期方法,進行關(guān)閉線程池和資源清理
@Override
protected void stopInternal() throws LifecycleException {
setState(LifecycleState.STOPPING);
if ( executor != null ) executor.shutdownNow();
executor = null;
taskqueue = null;
}
//這個執(zhí)行線程方法有超時的操作些阅,參考org.apache.catalina.Executor接口
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
if ( executor != null ) {
executor.execute(command,timeout,unit);
} else {
throw new IllegalStateException("StandardThreadExecutor not started.");
}
}
//JDK默認操作線程的方法,參考java.util.concurrent.Executor接口
@Override
public void execute(Runnable command) {
if ( executor != null ) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
//there could have been contention around the queue
if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
}
} else throw new IllegalStateException("StandardThreadPool not started.");
}
//由于繼承了org.apache.tomcat.util.threads.ResizableExecutor接口伞剑,所以可以重新定義線程池的大小
@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
if (executor == null)
return false;
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maximumPoolSize);
return true;
}
}
??看完了上面的源碼之后,不知此刻的你是一面茫然還是認為小菜一碟呢市埋,不管怎樣黎泣,我們先來看下UML類圖吧,了解一下具體的繼承關(guān)系缤谎,你就明白了抒倚,廢話不多說,能用圖片解決的東西盡量少用文字坷澡。
??接下來托呕,我們來看一下ResizableExecutor這個接口:
import java.util.concurrent.Executor;
public interface ResizableExecutor extends Executor {
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize();
public int getMaxThreads();
/**
* Returns the approximate number of threads that are actively executing
* tasks.
*
* @return the number of threads
*/
public int getActiveCount();
public boolean resizePool(int corePoolSize, int maximumPoolSize);
public boolean resizeQueue(int capacity);
}
??實現(xiàn)這個接口之后,就能動態(tài)改變線程池的大小和任務(wù)隊列的大小了频敛,它是繼承自JDK的Executor接口的项郊,其它的接口不再多說,可自行查看源碼斟赚。
Tomcat線程池的實現(xiàn)
??Tomcat的線程池的名字也叫作ThreadPoolExecutor着降,剛開始看源代碼的時候還以為是使用了JDK的ThreadPoolExecutor了呢,后面仔細查看才知道是Tomcat自己實現(xiàn)的一個ThreadPoolExecutor拗军,不過基本上都差不多任洞,都是在JDK之上封裝了一些自己的東西蓄喇,上源碼:
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
/**
* 已經(jīng)提交但尚未完成的任務(wù)數(shù)量。
* 這包括已經(jīng)在隊列中的任務(wù)和已經(jīng)交給工作線程的任務(wù)但還未開始執(zhí)行的任務(wù)
* 這個數(shù)字總是大于getActiveCount()的
**/
private final AtomicInteger submittedCount = new AtomicInteger(0);
private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
/**
* 最近的時間在ms時交掏,一個線程決定殺死自己來避免
* 潛在的內(nèi)存泄漏妆偏。 用于調(diào)節(jié)線程的更新速率。
*/
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
//延遲2個線程之間的延遲盅弛。 如果為負钱骂,不更新線程。
private long threadRenewalDelay = 1000L;
//4個構(gòu)造方法 ... 省略
public long getThreadRenewalDelay() {
return threadRenewalDelay;
}
public void setThreadRenewalDelay(long threadRenewalDelay) {
this.threadRenewalDelay = threadRenewalDelay;
}
/**
* 方法在完成給定Runnable的執(zhí)行時調(diào)用挪鹏。
* 此方法由執(zhí)行任務(wù)的線程調(diào)用罐柳。 如果
* 非null,Throwable是未捕獲的{@code RuntimeException}
* 或{@code Error}狰住,導致執(zhí)行突然終止。...
* @param r 已完成的任務(wù)
* @param t 引起終止的異常齿梁,如果執(zhí)行正常完成則為null
**/
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedCount.decrementAndGet();
if (t == null) {
stopCurrentThreadIfNeeded();
}
}
//如果當前線程在上一次上下文停止之前啟動催植,則拋出異常,以便停止當前線程勺择。
protected void stopCurrentThreadIfNeeded() {
if (currentThreadShouldBeStopped()) {
long lastTime = lastTimeThreadKilledItself.longValue();
if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
if (lastTimeThreadKilledItself.compareAndSet(lastTime,
System.currentTimeMillis() + 1)) {
// OK, it's really time to dispose of this thread
final String msg = sm.getString(
"threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
Thread.currentThread().getName());
throw new StopPooledThreadException(msg);
}
}
}
}
//當前線程是否需要被終止
protected boolean currentThreadShouldBeStopped() {
if (threadRenewalDelay >= 0
&& Thread.currentThread() instanceof TaskThread) {
TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
//線程創(chuàng)建的時間<上下文停止的時間,則可以停止該線程
if (currentTaskThread.getCreationTime() <
this.lastContextStoppedTime.longValue()) {
return true;
}
}
return false;
}
public int getSubmittedCount() {
return submittedCount.get();
}
@Override
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
public void contextStopping() {
this.lastContextStoppedTime.set(System.currentTimeMillis());
int savedCorePoolSize = this.getCorePoolSize();
TaskQueue taskQueue =
getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
if (taskQueue != null) {
taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
}
// setCorePoolSize(0) wakes idle threads
this.setCorePoolSize(0);
if (taskQueue != null) {
// ok, restore the state of the queue and pool
taskQueue.setForcedRemainingCapacity(null);
}
this.setCorePoolSize(savedCorePoolSize);
}
}
Tomcat的線程池根據(jù)文檔來說:和java.util.concurrent一樣,但是它實現(xiàn)了一個高效的方法getSubmittedCount()方法用來處理工作隊列创南。具體可以查看上面的注釋和源碼就知道了。把UML圖獻上省核。
Tomcat線程工廠
??想要自定義線程工廠類稿辙,只需要實現(xiàn)JDK的ThreadFactory接口就可以了,我們來看看Tomcat是如何實現(xiàn)的吧:
public class TaskThreadFactory implements ThreadFactory {
//線程組
private final ThreadGroup group;
//線程增長因子
private final AtomicInteger threadNumber = new AtomicInteger(1);
//名稱前綴
private final String namePrefix;
//是否是守護線程
private final boolean daemon;
//線程優(yōu)先級
private final int threadPriority;
public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
this.daemon = daemon;
this.threadPriority = priority;
}
@Override
public Thread newThread(Runnable r) {
TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(threadPriority);
return t;
}
}
Tomcat的線程工廠類和JDK實現(xiàn)的線程工廠類相差無幾气忠,具體可以參考一下JDK線程工廠Executors.DefaultThreadFactory工廠類的實現(xiàn)邻储。
Tomcat的線程類
??Tomcat自己定義了TaskThread用于線程的執(zhí)行,里面增加了creationTime字段用于定義線程創(chuàng)建的開始時間旧噪,以便后面線程池獲取這個時間來進行優(yōu)化吨娜。
/**
* 一個實現(xiàn)創(chuàng)建時間紀錄的線程類
*/
public class TaskThread extends Thread {
private static final Log log = LogFactory.getLog(TaskThread.class);
private final long creationTime;
public TaskThread(ThreadGroup group, Runnable target, String name) {
super(group, new WrappingRunnable(target), name);
this.creationTime = System.currentTimeMillis();
}
public TaskThread(ThreadGroup group, Runnable target, String name,
long stackSize) {
super(group, new WrappingRunnable(target), name, stackSize);
this.creationTime = System.currentTimeMillis();
}
public final long getCreationTime() {
return creationTime;
}
/**
* 封裝{@link Runnable}以接受任何{@link StopPooledThreadException},而不是讓它走淘钟,并可能在調(diào)試器中觸發(fā)中斷宦赠。
*/
private static class WrappingRunnable implements Runnable {
private Runnable wrappedRunnable;
WrappingRunnable(Runnable wrappedRunnable) {
this.wrappedRunnable = wrappedRunnable;
}
@Override
public void run() {
try {
wrappedRunnable.run();
} catch(StopPooledThreadException exc) {
//expected : we just swallow the exception to avoid disturbing
//debuggers like eclipse's
log.debug("Thread exiting on purpose", exc);
}
}
}
}
按照Tomcat的注解可知,它就是一個普通的線程類然后增加一個紀錄線程創(chuàng)建的時間紀錄而已米母,后面還使用動態(tài)內(nèi)部類封裝了一個Runnable勾扭,用于調(diào)試出發(fā)中斷。
Tomcat任務(wù)隊列
??Tomcat的線程隊列由org.apache.tomcat.util.threads.TaskQueue來處理铁瞒,它集成自LinkedBlockingQueue(一個阻塞的鏈表隊列)妙色,來看下源代碼吧。
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;
private ThreadPoolExecutor parent = null;
// no need to be volatile, the one times when we change and read it occur in
// a single thread (the one that did stop a context and fired listeners)
private Integer forcedRemainingCapacity = null;
public TaskQueue() {
super();
}
public TaskQueue(int capacity) {
super(capacity);
}
public TaskQueue(Collection<? extends Runnable> c) {
super(c);
}
public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}
public boolean force(Runnable o) {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {
// the poll timed out, it gives an opportunity to stop the current
// thread if needed to avoid memory leaks.
parent.stopCurrentThreadIfNeeded();
}
return runnable;
}
@Override
public Runnable take() throws InterruptedException {
if (parent != null && parent.currentThreadShouldBeStopped()) {
return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
// yes, this may return null (in case of timeout) which normally
// does not occur with take()
// but the ThreadPoolExecutor implementation allows this
}
return super.take();
}
@Override
public int remainingCapacity() {
if (forcedRemainingCapacity != null) {
return forcedRemainingCapacity.intValue();
}
return super.remainingCapacity();
}
public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
this.forcedRemainingCapacity = forcedRemainingCapacity;
}
}
TaskQueue這個任務(wù)隊列是專門為線程池而設(shè)計的精拟。優(yōu)化任務(wù)隊列以適當?shù)乩镁€程池執(zhí)行器內(nèi)的線程燎斩。
如果你使用一個普通的隊列虱歪,執(zhí)行器將產(chǎn)生線程,當有空閑線程栅表,你不能強制項目到隊列本身笋鄙。
總結(jié)
從0到1分析一下Apache Tomcat的線程池,感覺心好累啊怪瓶,不過有收獲萧落,至少多線程池這一塊又加強了,首先是定位到了StandardThreadExecutor這個類洗贰,然后由此展開找岖,ResizableExecutor(動態(tài)大小的線程池接口) 、ThreadPoolExecutor (Tomcat線程池具體實現(xiàn)對象)敛滋、TaskThreadFactory(Tomcat線程工廠)许布、TaskThread(Tomcat線程類-一個紀錄創(chuàng)建時間的線程類)、TaskQueue(Tomcat的任務(wù)隊列-一個專門為線程池而設(shè)計優(yōu)化的任務(wù)隊列)绎晃,喝口水蜜唾,壓壓驚。