Java線程池詳解(二)

三缤至、ThreadPoolExecutor解析

上文中描述了Java中線程池相關(guān)的架構(gòu)牡辽,了解了這些內(nèi)容其實(shí)我們就可以使用java的線程池為我們工作了,使用其提供的線程池我們可以很方便的寫出高質(zhì)量的多線程代碼街夭,本節(jié)將分析ThreadPoolExecutor的實(shí)現(xiàn)祸轮,來探索線程池的運(yùn)行原理。下面的圖片展示了ThreadPoolExecutor的類圖:

Paste_Image.png

ThreadPoolExecutor的類圖
下面是幾個(gè)比較關(guān)鍵的類成員:

private final BlockingQueue<Runnable> workQueue;  // 任務(wù)隊(duì)列哀峻,我們的任務(wù)會(huì)添加到該隊(duì)列里面涡相,線程將從該隊(duì)列獲取任務(wù)來執(zhí)行
 
 private final HashSet<Worker> workers = new HashSet<Worker>();//任務(wù)的執(zhí)行值集合,來消費(fèi)workQueue里面的任務(wù)
  
 private volatile ThreadFactory threadFactory;//線程工廠
      
 private volatile RejectedExecutionHandler handler;//拒絕策略剩蟀,默認(rèn)會(huì)拋出異異常催蝗,還要其他幾種拒絕策略如下:
  
 1、CallerRunsPolicy:在調(diào)用者線程里面運(yùn)行該任務(wù)
 2育特、DiscardPolicy:丟棄任務(wù)
 3丙号、DiscardOldestPolicy:丟棄workQueue的頭部任務(wù)
  
private volatile int corePoolSize;//最下保活work數(shù)量
 
private volatile int maximumPoolSize;//work上限

我們嘗試執(zhí)行submit方法,下面是執(zhí)行的關(guān)鍵路徑犬缨,總結(jié)起來就是:如果Worker數(shù)量還沒達(dá)到上限則繼續(xù)創(chuàng)建喳魏,否則提交任務(wù)到workQueue,然后讓worker來調(diào)度運(yùn)行任務(wù)遍尺。

step 1: <ExecutorService>
Future<?> submit(Runnable task); 
 
step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
 
step 3:<Executor>
void execute(Runnable command);
 
step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我們的額任務(wù)到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作為邊界
        reject(command); //還不行截酷?拒絕提交的任務(wù)
}
 
step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core)
 
 
step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包裝任務(wù)
final Thread t = w.thread; //獲取線程(包含任務(wù))
workers.add(w);   // 任務(wù)被放到works中
t.start(); //執(zhí)行任務(wù)

上面的流程是高度概括的涮拗,實(shí)際情況遠(yuǎn)比這復(fù)雜得多乾戏,但是我們關(guān)心的是怎么打通整個(gè)流程,所以這樣分析問題是沒有太大的問題的三热。觀察上面的流程鼓择,我們發(fā)現(xiàn)其實(shí)關(guān)鍵的地方在于Worker,如果弄明白它是如何工作的就漾,那么我們也就大概明白了線程池是怎么工作的了呐能。下面分析一下Worker類。

Paste_Image.png

worker類圖
上面的圖片展示了Worker的類關(guān)系圖抑堡,關(guān)鍵在于他實(shí)現(xiàn)了Runnable接口摆出,所以問題的關(guān)鍵就在于run方法上。在這之前首妖,我們來看一下Worker類里面的關(guān)鍵成員:

final Thread thread;
 
Runnable firstTask; //我們提交的任務(wù)偎漫,可能被立刻執(zhí)行,也可能被放到隊(duì)列里面

thread是Worker的工作線程有缆,上面的分析我們也發(fā)現(xiàn)了在addWorker中會(huì)獲取worker里面的thread然后start象踊,也就是這個(gè)線程的執(zhí)行,而Worker實(shí)現(xiàn)了Runnable接口棚壁,所以在構(gòu)造thread的時(shí)候Worker將自己傳遞給了構(gòu)造函數(shù)杯矩,thread.start執(zhí)行的其實(shí)就是Worker的run方法。下面是run方法的內(nèi)容:

  public void run() {
        runWorker(this);
    }
     
    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我們來分析一下runWorker這個(gè)方法袖外,這就是整個(gè)線程池的核心史隆。首先獲取到了我們剛提交的任務(wù)firstTask,然后會(huì)循環(huán)從workQueue里面獲取任務(wù)來執(zhí)行曼验,獲取任務(wù)的方法如下:

private Runnable getTask() {
       boolean timedOut = false; // Did the last poll() time out?
 
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);
 
           // Check if queue empty only if necessary.
           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
               decrementWorkerCount();
               return null;
           }
 
           int wc = workerCountOf(c);
 
           // Are workers subject to culling?
           boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
           if ((wc > maximumPoolSize || (timed && timedOut))
               && (wc > 1 || workQueue.isEmpty())) {
               if (compareAndDecrementWorkerCount(c))
                   return null;
               continue;
           }
 
           try {
               Runnable r = timed ?
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   workQueue.take();
               if (r != null)
                   return r;
               timedOut = true;
           } catch (InterruptedException retry) {
               timedOut = false;
           }
       }
   }

其實(shí)核心也就一句:

Runnable r = timed ?
           workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
           workQueue.take();

我們再回頭看一下execute逆害,其實(shí)我們上面只走了一條邏輯,在execute的時(shí)候蚣驼,我們的worker的數(shù)量還沒有到達(dá)我們設(shè)定的corePoolSize的時(shí)候魄幕,會(huì)走上面我們分析的邏輯,而如果達(dá)到了我們設(shè)定的閾值之后颖杏,execute中會(huì)嘗試去提交任務(wù)纯陨,如果提交成功了就結(jié)束,否則會(huì)拒絕任務(wù)的提交。我們上面還提到一個(gè)成員:maximumPoolSize翼抠,其實(shí)線程池的最大的Worker數(shù)量應(yīng)該是maximumPoolSize咙轩,但是我們上面的分析是corePoolSize,這是因?yàn)槲覀兊膒rivate boolean addWorker(Runnable firstTask, boolean core)的參數(shù)core的值來控制的阴颖,core為true則使用corePoolSize來設(shè)定邊界活喊,否則使用maximumPoolSize來設(shè)定邊界盗尸。直觀的解釋一下霎肯,當(dāng)線程池里面的Worker數(shù)量還沒有到corePoolSize,那么新添加的任務(wù)會(huì)伴隨著產(chǎn)生一個(gè)新的worker匣砖,如果Worker的數(shù)量達(dá)到了corePoolSize偎肃,那么就將任務(wù)存放在阻塞隊(duì)列中等待Worker來獲取執(zhí)行煞烫,如果沒有辦法再向阻塞隊(duì)列放任務(wù)了,那么這個(gè)時(shí)候maximumPoolSize就變得有用了累颂,新的任務(wù)將會(huì)伴隨著產(chǎn)生一個(gè)新的Worker滞详,如果線程池里面的Worker已經(jīng)達(dá)到了maximumPoolSize,那么接下來提交的任務(wù)只能被拒絕策略拒絕了紊馏×霞ⅲ可以參考下面的描述來理解:

* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle.  If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full.  By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}.

在此需要說明一點(diǎn),有一個(gè)重要的成員:keepAliveTime朱监,當(dāng)線程池里面的線程數(shù)量超過corePoolSize了岸啡,那么超出的線程將會(huì)在空閑keepAliveTime之后被terminated《呐螅可以參考下面的文檔:

* If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).

歡迎加入學(xué)習(xí)交流群569772982凰狞,大家一起學(xué)習(xí)交流。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末沛慢,一起剝皮案震驚了整個(gè)濱河市赡若,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌团甲,老刑警劉巖逾冬,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異躺苦,居然都是意外死亡身腻,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門匹厘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嘀趟,“玉大人,你說我怎么就攤上這事愈诚∷矗” “怎么了牛隅?”我有些...
    開封第一講書人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長酌泰。 經(jīng)常有香客問我媒佣,道長,這世上最難降的妖魔是什么陵刹? 我笑而不...
    開封第一講書人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任默伍,我火速辦了婚禮,結(jié)果婚禮上衰琐,老公的妹妹穿的比我還像新娘也糊。我一直安慰自己,他們只是感情好碘耳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開白布显设。 她就那樣靜靜地躺著框弛,像睡著了一般辛辨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瑟枫,一...
    開封第一講書人閱讀 51,578評(píng)論 1 305
  • 那天斗搞,我揣著相機(jī)與錄音,去河邊找鬼慷妙。 笑死僻焚,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的膝擂。 我是一名探鬼主播虑啤,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼架馋!你這毒婦竟也來了狞山?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤叉寂,失蹤者是張志新(化名)和其女友劉穎萍启,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屏鳍,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡勘纯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了钓瞭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驳遵。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖山涡,靈堂內(nèi)的尸體忽然破棺而出堤结,到底是詐尸還是另有隱情搏讶,我是刑警寧澤,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布霍殴,位于F島的核電站媒惕,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏来庭。R本人自食惡果不足惜妒蔚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望月弛。 院中可真熱鬧肴盏,春花似錦、人聲如沸帽衙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽厉萝。三九已至恍飘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間谴垫,已是汗流浹背章母。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留翩剪,地道東北人乳怎。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像前弯,于是被迫代替她去往敵國和親蚪缀。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容