Tomcat線程池詳解

寫在前面的話

最近一直都在研究Java的線程池ThreadPoolExecutor蚣录,但是雖然它那么好,但是在實際的用途中怎么去用眷篇,對于我來說就不知道如何下手了萎河,還好有開源社區(qū)我們可以了解很多項目中所運用到的線程池,比如最熟悉的就是Apache Tomcat了铅歼,相信都對它不默生公壤,一個Apache軟件基金下的一個開源Web容器,所以今天就來聊一下Tomcat的線程池實現(xiàn)椎椰。

準備工作

首先去Apache Tomcat的官網(wǎng)下載Tomcat的源代碼厦幅,這里給出<a >Tomcat源碼鏈接</a>,下載下來之后,它是一個zip文件慨飘,需要把它進行解壓到相應(yīng)的文件夾下面确憨,以便我們能方便的查看其源代碼译荞。分析源碼最行之有效的方法就是知道這個類有哪些方法,哪些字段休弃,繼承了哪些類吞歼,實現(xiàn)了哪些接口,所以我們這里推薦一款UML工具塔猾, astah-professional篙骡,可自行下載安裝,這是一個收費軟件丈甸,但是它有50天的試用期糯俗,所以我們可以以使用的身份使用該軟件。準備工作做好之后就可以進行下一步的操作了睦擂。

初探Tomcat線程池

Tomcat的線程池的類文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面得湘,定位到這個文件夾下面可以看到StandardThreadExecutor.java就是我們找尋的類了,用文本工具打開就可以查看其源碼了顿仇。這里源碼如下:
StandardThreadExecutor.java

public class StandardThreadExecutor extends LifecycleMBeanBase
        implements Executor, ResizableExecutor {
    //默認線程的優(yōu)先級
    protected int threadPriority = Thread.NORM_PRIORITY;
    //守護線程
    protected boolean daemon = true;
    //線程名稱的前綴
    protected String namePrefix = "tomcat-exec-";
    //最大線程數(shù)默認200個
    protected int maxThreads = 200;
    //最小空閑線程25個
    protected int minSpareThreads = 25;
    //超時時間為6000
    protected int maxIdleTime = 60000;
    //線程池容器
    protected ThreadPoolExecutor executor = null;
    //線程池的名稱
    protected String name;
     //是否提前啟動線程
    protected boolean prestartminSpareThreads = false;
    //隊列最大大小
    protected int maxQueueSize = Integer.MAX_VALUE;
    //為了避免在上下文停止之后淘正,所有的線程在同一時間段被更新,所以進行線程的延遲操作
    protected long threadRenewalDelay = 1000L;
    //任務(wù)隊列
    private TaskQueue taskqueue = null;

    //容器啟動時進行,具體可參考org.apache.catalina.util.LifecycleBase#startInternal()
    @Override
    protected void startInternal() throws LifecycleException {
        //實例化任務(wù)隊列
        taskqueue = new TaskQueue(maxQueueSize);
        //自定義的線程工廠類,實現(xiàn)了JDK的ThreadFactory接口
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        //這里的ThreadPoolExecutor是tomcat自定義的,不是JDK的ThreadPoolExecutor
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        //是否提前啟動線程臼闻,如果為true鸿吆,則提前初始化minSpareThreads個的線程,放入線程池內(nèi)
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        //設(shè)置任務(wù)容器的父級線程池對象
        taskqueue.setParent(executor);
        //設(shè)置容器啟動狀態(tài)
        setState(LifecycleState.STARTING);
    }
    
  //容器停止時的生命周期方法,進行關(guān)閉線程池和資源清理
    @Override
    protected void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);
        if ( executor != null ) executor.shutdownNow();
        executor = null;
        taskqueue = null;
    }
    
    //這個執(zhí)行線程方法有超時的操作些阅,參考org.apache.catalina.Executor接口
    @Override
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        if ( executor != null ) {
            executor.execute(command,timeout,unit);
        } else { 
            throw new IllegalStateException("StandardThreadExecutor not started.");
        }
    }

    //JDK默認操作線程的方法,參考java.util.concurrent.Executor接口
    @Override
    public void execute(Runnable command) {
        if ( executor != null ) {
            try {
                executor.execute(command);
            } catch (RejectedExecutionException rx) {
                //there could have been contention around the queue
                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
            }
        } else throw new IllegalStateException("StandardThreadPool not started.");
    }

    //由于繼承了org.apache.tomcat.util.threads.ResizableExecutor接口伞剑,所以可以重新定義線程池的大小
    @Override
    public boolean resizePool(int corePoolSize, int maximumPoolSize) {
        if (executor == null)
            return false;

        executor.setCorePoolSize(corePoolSize);
        executor.setMaximumPoolSize(maximumPoolSize);
        return true;
    }
}

??看完了上面的源碼之后,不知此刻的你是一面茫然還是認為小菜一碟呢市埋,不管怎樣黎泣,我們先來看下UML類圖吧,了解一下具體的繼承關(guān)系缤谎,你就明白了抒倚,廢話不多說,能用圖片解決的東西盡量少用文字坷澡。

StandardThreadExecutor類繼承關(guān)系

??接下來托呕,我們來看一下ResizableExecutor這個接口:

import java.util.concurrent.Executor;

public interface ResizableExecutor extends Executor {

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize();

    public int getMaxThreads();

    /**
     * Returns the approximate number of threads that are actively executing
     * tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}

??實現(xiàn)這個接口之后,就能動態(tài)改變線程池的大小和任務(wù)隊列的大小了频敛,它是繼承自JDK的Executor接口的项郊,其它的接口不再多說,可自行查看源碼斟赚。

Tomcat線程池的實現(xiàn)

??Tomcat的線程池的名字也叫作ThreadPoolExecutor着降,剛開始看源代碼的時候還以為是使用了JDK的ThreadPoolExecutor了呢,后面仔細查看才知道是Tomcat自己實現(xiàn)的一個ThreadPoolExecutor拗军,不過基本上都差不多任洞,都是在JDK之上封裝了一些自己的東西蓄喇,上源碼:

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    /**
     *  已經(jīng)提交但尚未完成的任務(wù)數(shù)量。
     *  這包括已經(jīng)在隊列中的任務(wù)和已經(jīng)交給工作線程的任務(wù)但還未開始執(zhí)行的任務(wù)
     *  這個數(shù)字總是大于getActiveCount()的
     **/
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    /**
    *  最近的時間在ms時交掏,一個線程決定殺死自己來避免
    *  潛在的內(nèi)存泄漏妆偏。 用于調(diào)節(jié)線程的更新速率。
    */
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    //延遲2個線程之間的延遲盅弛。 如果為負钱骂,不更新線程。
    private long threadRenewalDelay = 1000L;
    //4個構(gòu)造方法  ... 省略

    public long getThreadRenewalDelay() {
        return threadRenewalDelay;
    }

    public void setThreadRenewalDelay(long threadRenewalDelay) {
        this.threadRenewalDelay = threadRenewalDelay;
    }
    
    /**
    *  方法在完成給定Runnable的執(zhí)行時調(diào)用挪鹏。
    *  此方法由執(zhí)行任務(wù)的線程調(diào)用罐柳。 如果
    *  非null,Throwable是未捕獲的{@code RuntimeException}
    *  或{@code Error}狰住,導致執(zhí)行突然終止。...
    *  @param r 已完成的任務(wù)
    *  @param t 引起終止的異常齿梁,如果執(zhí)行正常完成則為null
    **/
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();

        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }

    //如果當前線程在上一次上下文停止之前啟動催植,則拋出異常,以便停止當前線程勺择。
    protected void stopCurrentThreadIfNeeded() {
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                        System.currentTimeMillis() + 1)) {
                    // OK, it's really time to dispose of this thread

                    final String msg = sm.getString(
                                    "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                                    Thread.currentThread().getName());

                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }
    
    //當前線程是否需要被終止
    protected boolean currentThreadShouldBeStopped() {
        if (threadRenewalDelay >= 0
            && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            //線程創(chuàng)建的時間<上下文停止的時間,則可以停止該線程
            if (currentTaskThread.getCreationTime() <
                    this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }

    public int getSubmittedCount() {
        return submittedCount.get();
    }

    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    Thread.interrupted();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

    public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());
        int savedCorePoolSize = this.getCorePoolSize();
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        if (taskQueue != null) {
            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }

        // setCorePoolSize(0) wakes idle threads
        this.setCorePoolSize(0);

        if (taskQueue != null) {
            // ok, restore the state of the queue and pool
            taskQueue.setForcedRemainingCapacity(null);
        }
        this.setCorePoolSize(savedCorePoolSize);
    }
}

Tomcat的線程池根據(jù)文檔來說:和java.util.concurrent一樣,但是它實現(xiàn)了一個高效的方法getSubmittedCount()方法用來處理工作隊列创南。具體可以查看上面的注釋和源碼就知道了。把UML圖獻上省核。

ThreadPoolExecutor

Tomcat線程工廠

??想要自定義線程工廠類稿辙,只需要實現(xiàn)JDK的ThreadFactory接口就可以了,我們來看看Tomcat是如何實現(xiàn)的吧:

public class TaskThreadFactory implements ThreadFactory {
    //線程組
    private final ThreadGroup group;
    //線程增長因子
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    //名稱前綴
    private final String namePrefix;
    //是否是守護線程
    private final boolean daemon;
    //線程優(yōu)先級
    private final int threadPriority;
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(threadPriority);
        return t;
    }

}

Tomcat的線程工廠類和JDK實現(xiàn)的線程工廠類相差無幾气忠,具體可以參考一下JDK線程工廠Executors.DefaultThreadFactory工廠類的實現(xiàn)邻储。

Tomcat的線程類

??Tomcat自己定義了TaskThread用于線程的執(zhí)行,里面增加了creationTime字段用于定義線程創(chuàng)建的開始時間旧噪,以便后面線程池獲取這個時間來進行優(yōu)化吨娜。

/**
 * 一個實現(xiàn)創(chuàng)建時間紀錄的線程類
 */
public class TaskThread extends Thread {

    private static final Log log = LogFactory.getLog(TaskThread.class);
    private final long creationTime;

    public TaskThread(ThreadGroup group, Runnable target, String name) {
        super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();
    }

    public TaskThread(ThreadGroup group, Runnable target, String name,
            long stackSize) {
        super(group, new WrappingRunnable(target), name, stackSize);
        this.creationTime = System.currentTimeMillis();
    }

    public final long getCreationTime() {
        return creationTime;
    }

    /**
    *  封裝{@link Runnable}以接受任何{@link StopPooledThreadException},而不是讓它走淘钟,并可能在調(diào)試器中觸發(fā)中斷宦赠。
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;
        WrappingRunnable(Runnable wrappedRunnable) {
            this.wrappedRunnable = wrappedRunnable;
        }
        @Override
        public void run() {
            try {
                wrappedRunnable.run();
            } catch(StopPooledThreadException exc) {
                //expected : we just swallow the exception to avoid disturbing
                //debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }
    }
}

按照Tomcat的注解可知,它就是一個普通的線程類然后增加一個紀錄線程創(chuàng)建的時間紀錄而已米母,后面還使用動態(tài)內(nèi)部類封裝了一個Runnable勾扭,用于調(diào)試出發(fā)中斷。

Tomcat任務(wù)隊列

??Tomcat的線程隊列由org.apache.tomcat.util.threads.TaskQueue來處理铁瞒,它集成自LinkedBlockingQueue(一個阻塞的鏈表隊列)妙色,來看下源代碼吧。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    private ThreadPoolExecutor parent = null;

    // no need to be volatile, the one times when we change and read it occur in
    // a single thread (the one that did stop a context and fired listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }


    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }

    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity != null) {
            return forcedRemainingCapacity.intValue();
        }
        return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }
}

TaskQueue這個任務(wù)隊列是專門為線程池而設(shè)計的精拟。優(yōu)化任務(wù)隊列以適當?shù)乩镁€程池執(zhí)行器內(nèi)的線程燎斩。
如果你使用一個普通的隊列虱歪,執(zhí)行器將產(chǎn)生線程,當有空閑線程栅表,你不能強制項目到隊列本身笋鄙。

總結(jié)

從0到1分析一下Apache Tomcat的線程池,感覺心好累啊怪瓶,不過有收獲萧落,至少多線程池這一塊又加強了,首先是定位到了StandardThreadExecutor這個類洗贰,然后由此展開找岖,ResizableExecutor(動態(tài)大小的線程池接口) 、ThreadPoolExecutor (Tomcat線程池具體實現(xiàn)對象)敛滋、TaskThreadFactory(Tomcat線程工廠)许布、TaskThread(Tomcat線程類-一個紀錄創(chuàng)建時間的線程類)、TaskQueue(Tomcat的任務(wù)隊列-一個專門為線程池而設(shè)計優(yōu)化的任務(wù)隊列)绎晃,喝口水蜜唾,壓壓驚。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末庶艾,一起剝皮案震驚了整個濱河市袁余,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌咱揍,老刑警劉巖颖榜,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異煤裙,居然都是意外死亡掩完,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進店門积暖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來藤为,“玉大人,你說我怎么就攤上這事夺刑∶迮保” “怎么了?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵遍愿,是天一觀的道長存淫。 經(jīng)常有香客問我,道長沼填,這世上最難降的妖魔是什么桅咆? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮坞笙,結(jié)果婚禮上岩饼,老公的妹妹穿的比我還像新娘荚虚。我一直安慰自己,他們只是感情好籍茧,可當我...
    茶點故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布版述。 她就那樣靜靜地躺著,像睡著了一般寞冯。 火紅的嫁衣襯著肌膚如雪渴析。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天吮龄,我揣著相機與錄音俭茧,去河邊找鬼。 笑死漓帚,一個胖子當著我的面吹牛母债,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播尝抖,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼场斑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了牵署?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤喧半,失蹤者是張志新(化名)和其女友劉穎奴迅,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體挺据,經(jīng)...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡取具,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了扁耐。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片暇检。...
    茶點故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖婉称,靈堂內(nèi)的尸體忽然破棺而出块仆,到底是詐尸還是另有隱情,我是刑警寧澤王暗,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布悔据,位于F島的核電站,受9級特大地震影響俗壹,放射性物質(zhì)發(fā)生泄漏科汗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一绷雏、第九天 我趴在偏房一處隱蔽的房頂上張望头滔。 院中可真熱鬧怖亭,春花似錦、人聲如沸坤检。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缀蹄。三九已至峭跳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缺前,已是汗流浹背蛀醉。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留衅码,地道東北人拯刁。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像逝段,于是被迫代替她去往敵國和親垛玻。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,678評論 2 354

推薦閱讀更多精彩內(nèi)容