線程池工作機制與原理

書接上文喇勋,<a href="http://www.reibang.com/p/aa5884bcd032">Java線程池</a>涤姊。
接下來記錄一下線程池的工作機制和原理

線程池的兩個核心隊列:

  • 線程等待池籽懦,即線程隊列BlockingQueue财骨。
  • 任務處理池(PoolWorker)肋拔,即正在工作的Thread列表(HashSet<Worker>)锈津。

線程池的核心參數(shù):

  • 核心池大小(corePoolSize)只损,即固定大小一姿,設定好之后,線程池的穩(wěn)定峰值跃惫,達到這個值之后池的線程數(shù)大小不會釋放叮叹。
  • 最大處理線程池數(shù)(maximumPoolSize),當線程池里面的線程數(shù)超過corePoolSize爆存,小于maximumPoolSize時會動態(tài)創(chuàng)建與回收線程池里面的線程池資源蛉顽。

線程池的運行機制:
舉個栗子。假如有一個工廠先较,工廠里面有10個人携冤,每個工人同時只能做一件事情。因此只要當10個工人中有工人是空閑的闲勺,來了任務就分配給空閑的工人做曾棕;當10個工人都有任務時,如果還來任務菜循,就把任務進行排隊等待翘地。
如果說新任務數(shù)目增長的速度遠遠大于工作做任務的速度,那么此時工廠的主管可能就需要采取補救措施了癌幕,比如重新招4個工人進來衙耕;然后就將任務分配給這4個剛招進來的工人處理。
如果說這14個工人做任務的速度還是不夠勺远,此時工廠主管就要考慮不再接受新的任務或者拋棄前面的一些任務了橙喘。當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢胶逢,工廠主管就要考慮辭掉4個臨時工了厅瞎,只保持原來10個工人饰潜,比較額外的工人是需要花費的。
而這個栗子中永遠等待干活的10個工人機制就是workerQueue磁奖。這個栗子中的corePoolSize就是10囊拜,而maximumPoolSize就是14(10+4)。也就是說corePoolSize就是線程池的大小比搭,maximumPoolSize在我看來就是一種線程池任務超過負荷的一種補救措施冠跷,即任務量突然過大時的一種補救措施。再看看下面圖好好理解一下身诺。工人永遠在等待干活蜜托,就像workerQueue永遠在循環(huán)干活一樣,除非霉赡,整個線程池停止了橄务。

線程池原理圖

線程池里面的線程的時序圖如下圖所示:

線程的時序圖

自定義線程池與ExecutorService

自定義線程池需要用到ThreadFactory,本節(jié)將通過創(chuàng)建一個線程的例子對ExecutorService及其參數(shù)進行詳細講解穴亏。

1.認識ExecutorService家族

ExecutorService家族成員如下所示:

ExecutorService家族

使用startUML畫的蜂挪,我是UML菜鳥,所以湊合著看下嗓化。

上圖中主要元素說明如下:
Executor:線程池的頂級接口棠涮,但是嚴格意義上講Executor并不是一個線程池,而只是一個執(zhí)行線程的工具刺覆。
ExecutorService:真正線程池接口严肪。這個接口繼承了Executor接口,并聲明了一些方法:
submit谦屑、invokeAll驳糯、invokeAny以及shutDown等。
AbstractExecutorService實現(xiàn)了ExecutorService接口氢橙,基本實現(xiàn)了ExecutorService中聲明的所有方法酝枢。
ThreadPoolExecutor:ExecutorService的默認實現(xiàn),繼承了類AbstractExecutorService悍手。
ScheduledExecutorService:與Timer/TimerTask類似隧枫,解決那些需要任務重復執(zhí)行的問題。
ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor的ScheduledExecutorService接口實現(xiàn)谓苟,周期性任務調(diào)度的類實現(xiàn)。
Executors是個線程工廠類协怒,方便我們快速地創(chuàng)建線程池涝焙。

2.利用ThreadFactory創(chuàng)建一個線程

java.util.concurrent.ThreadFactory提供了一個創(chuàng)建線程的工廠的接口。
ThreadFactory源碼如下:

public interface ThreadFactory{
  @override
  public Thread newThread(Runnable r);
}

我們可以看到上面的接口類中有一個newThread()的方法孕暇,為此我們自己手動定義一個線程工廠類仑撞,有木有激動啊赤兴,呵呵,下面我們就手動寫一個自己的線程工廠類吧隧哮!
MyThreadFactory.java

public class MyThreadFactory implements ThreadFactory{
  @Override
  public Thread newThread(Runnable r){
        return new Thread(r);
  }
}

上面已經(jīng)創(chuàng)建好了我們自己的線程工廠類桶良,但是啥都沒有做,就是直接new了一個Thread就返回回去了沮翔,我們一般在創(chuàng)建線程的時候陨帆,都需要定義其線程的名字,因為我們在定義了線程的名字之后就能在出現(xiàn)問題的時候根據(jù)監(jiān)視工具來查找錯誤的來源采蚀,所以我們來看下官方實現(xiàn)的ThreadFactory吧疲牵!
這個類在java.util.concurrent.Executors類中的靜態(tài)類中DefaultThreadFactory

/**
*  The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory{
  private static final AtomicInteger poolNumber=new AtomicInteger(1);
  private final ThreadGroup group;
  private final AtomicInteger threadNumber=new AtomicInteger(1);
  private final String namePrefix;

  DefaultThreadFactory(){
    SecurityManager s=System.getSecurityManager();
    group=(s!=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
    namePrefix="pool-"+poolNumber.getAndIncrement()+"-thread-";
  }
  public Thread newThread(Runnable r){
      Thread t=new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
      if((t.isDaemon())
          t.setDaemon(false);
      if(t.getPriority()!=Thread.NORM_PRIORITY)
          t.setPriority(Thread.NORM_PRIORITY);
      return t;
  }
}

3.了解線程池的拒絕策略(RejectExecutionHandler)

當調(diào)用ThreadPoolExecutor的execute方法時,而此時線程池處于一個飽和的狀態(tài)榆鼠,并且任務隊列也已經(jīng)滿了那么就需要做丟棄處理纲爸,RejectExecutionHandler就是這樣的一個處理接口類。
RejectExecutionHandler.java

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

在JDK里面有4中拒絕策略妆够,如下圖所示:

線程池拒絕策略
  • AbortPolicy:一言不合就拋異常(默認使用策略)识啦。
  • CallerRunsPolicy:只用調(diào)用者所在線程來運行任務。
  • DiscardOldestPolicy:丟棄隊列里最近的一個任務神妹,并執(zhí)行當前任務颓哮。
  • DiscardPolicy:不處理,直接丟棄灾螃。

來看下源碼吧:
AbortPolicy : 一言不合就拋異常的

   /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

CallerRunsPolicy:調(diào)用者所在線程來運行任務

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

DiscardOldestPolicy :丟棄隊列里面最近的一個任務,并執(zhí)行當前任務

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy : 不處理题翻,直接丟棄

/**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

思考問題:
為什么有任務拒絕的情況發(fā)生呢:
這里先假設有一個前提:線程池里面有一個任務隊列,用于緩存所有待處理的任務腰鬼,正在處理的任務將從任務隊列中移除嵌赠。因此,在任務隊列長度有限的情況下熄赡,就會出現(xiàn)現(xiàn)任務的拒絕情況姜挺,需要一種策略來處理發(fā)生這種已滿無法加入的情況。另外彼硫,在線程池關閉的時候炊豪,也需要對任務加入隊列操作進行額外的協(xié)調(diào)處理。

4.ThreadPoolExecutor詳解

ThreadPoolExecutor類是線程池中最核心的一個類拧篮,因此如果要想透徹的了解Java線程池词渤,必須先了解這個大BOSS,下面來看下其源碼:
4種構造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

通過源碼我們清楚的看到串绩,最終構造函數(shù)調(diào)用了最后一個構造函數(shù)缺虐,后面的那個構造函數(shù)才是真正的構造函數(shù),接下來研究一下參數(shù)礁凡。

  • int corePoolSize:核心池大小高氮,這個參數(shù)跟后面講的線程池原理有很大的關系慧妄。在創(chuàng)建了線程池之后,默認情況下剪芍,線程池中并沒有任何線程塞淹,而是等待所有的任務到來之時才進行創(chuàng)建線程去執(zhí)行任務,除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法 罪裹,從這個兩個方法的名字可以知道是預創(chuàng)建線程的意思饱普,即在沒有任務來臨之前先創(chuàng)建好corePoolSize個線程或者一個線程。默認情況下坊谁,在創(chuàng)建好線程池之后费彼,線程池中的線程數(shù)為0,當有任務來之后口芍,就會創(chuàng)建一個線程去執(zhí)行任務箍铲,當線程池中的線程數(shù)量達到corePoolSize后,就會把達到的任務放到緩存隊列中去鬓椭。
  • int maximumPoolSize:線程池最大線程數(shù)量颠猴,這是個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建線程的數(shù)量小染;在corePoolSize和maximumPoolSize的線程數(shù)會被自動釋放翘瓮,而小于corePoolSize的則不會。
  • long keepAliveTime:表示線程沒有執(zhí)行任務時最多保持多久時間會終止裤翩。默認情況下资盅,只有當線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會生效,直到線程池數(shù)量不大于corePoolSize踊赠,即只有當線程池數(shù)量大于corePoolSize數(shù)量呵扛,超出這個數(shù)量的線程一旦到達keepAliveTime就會終止。但是如果調(diào)用了allowCoreThreadTimeout(boolean)方法筐带,即使線程池的線程數(shù)量不大于corePoolSize今穿,線程也會在keepAliveTime之后就終止,知道線程池的數(shù)量為0為止伦籍。
  • TimeUnit unit:參數(shù)keepAliveTime的時間單位蓝晒,一個時間單位枚舉類。
  • BlockingQueue workQueue:一個阻塞隊列帖鸦,用來存儲等待執(zhí)行任務的隊列芝薇,這個參數(shù)選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響作儿,一般來說剩燥,這里的阻塞隊列就是(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue灭红;)。
  • ThreadFactory ThreadFactory:線程工廠口注,主要用來創(chuàng)建線程变擒;可以是一個自定義的線程工廠,默認就是Executors.defaultThreadFactory()寝志。用來在線程池中創(chuàng)建線程娇斑。
  • RejectedExecutionHandler handler:表示當拒絕處理任務時的策略,也是可以自定義的材部,默認是我們前面的4種取值:
  • ThreadPoolExecutor.AbortPolicy(默認的毫缆,一言不合即拋異常的)
  • ThreadPoolExecutor.DiscardPolicy(一言不合就丟棄任務)
  • ThreadPoolExecutor.DiscardOldestPolicy(一言不合就把最近的任務給拋棄,然后執(zhí)行當前任務)
  • ThreadPoolExecutor.CallerRunsPolicy(由調(diào)用者所在線程來執(zhí)行任務)

所以想自定義線程池就可以從上面的幾個參數(shù)入手乐导。接下來具體看下代碼,了解一下實現(xiàn)原理:

   // 默認異常處理機制
   private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
   //任務緩存隊列苦丁,用來存放等待執(zhí)行的任務
   private final BlockingQueue<Runnable> workQueue;
   //線程池的主要狀態(tài)鎖,對線程狀態(tài)(比如線程大小物臂、runState等)的改變都需要這個鎖
   private final ReentrantLock mainLock = new ReentrantLock();
   //用來存放工作集
   private final HashSet<Worker> workers = new HashSet<Worker>();
   //volatile 可變變量關鍵字旺拉,寫的時候用mainLock做鎖,讀的時候無鎖棵磷,高性能
   private volatile long keepAliveTime;
   //是否允許核心線程超時
   private volatile boolean allowCoreThreadTimeOut;
   //核心線程數(shù)量
   private volatile int corePoolSize;
   //線程最大線程數(shù)量
   private volatile int maximumPoolSize;
   //任務拒絕策略
   private volatile RejectedExcutionHandler handler;

結合之前的知識蛾狗,大概就能猜出里面是怎么實現(xiàn)的了,具體可以參考一下JDK的源代碼仪媒,這樣我們就能做到了解原理又會用了沉桌。

5.自定義實現(xiàn)一個簡單的Web請求連接池

我們來自定義一個簡單的Web請求線程池。模仿Web服務的需求場景說明如下:

  • 服務器可容納的最小請求數(shù)是多少算吩。
  • 可以動態(tài)擴充的請求數(shù)大小是多少留凭。
  • 多久回收多余線程數(shù)即請求數(shù)。
  • 用戶訪問量打了怎么處理赌莺。
  • 線程隊列機制采取有優(yōu)先級的排隊的執(zhí)行機制冰抢。
    根據(jù)上面的場景,看下這個線程池如何編寫艘狭?
public class MyExecutors extends Executors{
    //利用默認線程工廠和PriorityBlockingQueue隊列機制鹦倚,當然了沮明,我們還可以自定義ThreadFactory和繼承queue進行自定義擴展
   public static ExecutorService newMyWebThreadPool(int minSpareThreads,int maxThreads,int maxIdleTime){
    return new ThreadPoolExecutor(minSpareThread,maxThreads,maxIdleTime,TimeUnit.MILLISECONDS,
          new PriorityBlockingQueue<Runnable>());
  }
}

6.線程池在工作中的錯誤使用

  • (1)分不清楚線程是單例還是多對象。
  • (2)線程池數(shù)量設置很大邑贴。
  • (3)注意死鎖問題
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市愧哟,隨后出現(xiàn)的幾起案子劣挫,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掠哥,死亡現(xiàn)場離奇詭異巩踏,居然都是意外死亡,警方通過查閱死者的電腦和手機续搀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門塞琼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人禁舷,你說我怎么就攤上這事彪杉。” “怎么了牵咙?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵派近,是天一觀的道長。 經(jīng)常有香客問我洁桌,道長渴丸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任战坤,我火速辦了婚禮曙强,結果婚禮上,老公的妹妹穿的比我還像新娘途茫。我一直安慰自己碟嘴,他們只是感情好,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布囊卜。 她就那樣靜靜地躺著娜扇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪栅组。 梳的紋絲不亂的頭發(fā)上雀瓢,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機與錄音玉掸,去河邊找鬼刃麸。 笑死,一個胖子當著我的面吹牛司浪,可吹牛的內(nèi)容都是我干的泊业。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼啊易,長吁一口氣:“原來是場噩夢啊……” “哼吁伺!你這毒婦竟也來了?” 一聲冷哼從身側響起租谈,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤篮奄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窟却,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡昼丑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了夸赫。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片矾克。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖憔足,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情酒繁,我是刑警寧澤滓彰,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站州袒,受9級特大地震影響揭绑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜郎哭,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一他匪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧夸研,春花似錦邦蜜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至姐扮,卻和暖如春絮供,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背茶敏。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工壤靶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惊搏。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓贮乳,卻偏偏與公主長得像,于是被迫代替她去往敵國和親胀屿。 傳聞我的和親對象是個殘疾皇子塘揣,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容