線程池任務(wù)執(zhí)行過程

前言

JAVA通過多線程的方式實(shí)現(xiàn)并發(fā)俐筋,為了方便線程池的管理知纷,JAVA采用線程池的方式對(duì)線線程的整個(gè)生命周期進(jìn)行管理免猾。1.5后引入的Executor框架的最大優(yōu)點(diǎn)是把任務(wù)的提交和執(zhí)行解耦岂嗓。

要執(zhí)行任務(wù)的人只需把Task描述清楚互站,然后提交即可私蕾。這個(gè)Task是怎么被執(zhí)行的,被誰(shuí)執(zhí)行的胡桃,什么時(shí)候執(zhí)行的踩叭,提交的人就不用關(guān)心了。

具體點(diǎn)講翠胰,提交一個(gè)Callable對(duì)象給ExecutorService(如最常用的線程池ThreadPoolExecutor)容贝,將得到一個(gè)Future對(duì)象,調(diào)用Future對(duì)象的get方法等待執(zhí)行結(jié)果就好了之景。

一個(gè)簡(jiǎn)單的例子

// 一個(gè)有5個(gè)作業(yè)線程的線程池斤富,
//老大的老大找到一個(gè)管5個(gè)人的小團(tuán)隊(duì)的老大
   ExecutorService executorService = Executors.newFixedThreadPool(5);
     //提交作業(yè)給老大,作業(yè)內(nèi)容封裝在Callable中锻狗,約定好了輸出的類型是String满力。
        String outputs = executorService.submit(
                 new Callable<String>() {
                     public String call() throws Exception {
                         return "I am a task";
                     }
                     //提交后就等著結(jié)果吧,到底是手下5個(gè)作業(yè)中誰(shuí)領(lǐng)到任務(wù)了屋谭,老大是不關(guān)心的脚囊。
                 }).get();

        System.out.println(outputs);

使用上非常簡(jiǎn)單,通過一個(gè)工場(chǎng)類Executors創(chuàng)建了一個(gè)工作類桐磁,工場(chǎng)類返回一個(gè)ExecutorService對(duì)象悔耘。

執(zhí)行過程

任務(wù)提交

ExecutorService是一個(gè)接口,沒有具體實(shí)現(xiàn)我擂,最后的具體實(shí)現(xiàn)應(yīng)該由ThreadPoolExecutor實(shí)現(xiàn)的衬以。

Executor 定義了一個(gè)execute接口,ExecutorService繼承了Executor校摩,并定義了管理線程生命周期的接口看峻,可以接受提交任務(wù)、執(zhí)行任務(wù)衙吩、關(guān)閉服務(wù)互妓。

抽象類AbstractExecutorService 實(shí)現(xiàn)了ExecutorService接口,也實(shí)現(xiàn)了接口定義的默認(rèn)行為;ThreadPoolExecutor繼承了AbstractExecutorService。

AbstractExecutorService任務(wù)提交的submit方法有三個(gè)實(shí)現(xiàn)冯勉。

  1. 接收一個(gè)Runnable的Task澈蚌,沒有執(zhí)行結(jié)果;

     public Future<?> submit(Runnable task) {
         if (task == null) throw new NullPointerException();
         RunnableFuture<Void> ftask = newTaskFor(task, null);
         execute(ftask);
         return ftask;
     }
    
  2. 接收兩個(gè)參數(shù):一個(gè)任務(wù)灼狰,一個(gè)執(zhí)行結(jié)果宛瞄;

     public <T> Future<T> submit(Runnable task, T result) {
         if (task == null) throw new NullPointerException();
         RunnableFuture<T> ftask = newTaskFor(task, result);
         execute(ftask);
         return ftask;
     }
    
  3. 接收一個(gè)Callable,本身就包含執(zhí)任務(wù)內(nèi)容和執(zhí)行結(jié)果交胚。

     public <T> Future<T> submit(Callable<T> task) {
         if (task == null) throw new NullPointerException();
         RunnableFuture<T> ftask = newTaskFor(task);
         execute(ftask);
         return ftask;
     }
    

submit方法的返回結(jié)果是Future類型份汗,調(diào)用該接口定義的get方法即可獲得執(zhí)行結(jié)果。 V get() 方法的返回值類型V是在提交任務(wù)時(shí)就約定好了的蝴簇。

分析
  1. 看AbstractExecutorService中submit(Callable<T> task)杯活,構(gòu)造好一個(gè)FutureTask對(duì)象后,調(diào)用execute()方法執(zhí)行任務(wù)熬词。

    public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

  2. Submit傳入的參數(shù)都被封裝成了FutureTask類型來execute的轩猩,對(duì)應(yīng)前面三個(gè)不同的參數(shù)類型都會(huì)封裝成FutureTask。

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

  3. Executor接口中定義的void execute(Runnable command)方法的作用就是執(zhí)行提交的任務(wù)荡澎,該方法在抽象類AbstractExecutorService的子類ThreadPoolExecutor中實(shí)現(xiàn)。

一個(gè)任務(wù)的執(zhí)行過程

execute

先補(bǔ)充下ThreadPoolExecutor有兩個(gè)最重要的集合屬性晤锹,分別是存儲(chǔ)接收任務(wù)的任務(wù)隊(duì)列和用來干活的作業(yè)集合摩幔。

//任務(wù)隊(duì)列
private final BlockingQueue<Runnable> workQueue;
//作業(yè)線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();

execute源碼分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. 如果當(dāng)前正在執(zhí)行的Worker數(shù)量比corePoolSize(核心線程大小)要小。
     * 直接創(chuàng)建一個(gè)新的Worker執(zhí)行任務(wù)鞭铆,會(huì)調(diào)用addWorker方法
     *
     * 2. 如果當(dāng)前正在執(zhí)行的Worker數(shù)量大于等于corePoolSize(核心線程大小)或衡。
     * 將任務(wù)放到阻塞隊(duì)列里,如果阻塞隊(duì)列沒滿并且狀態(tài)是RUNNING的話车遂,直接丟到阻塞隊(duì)列封断,否則執(zhí)行第3步。
     * 丟到阻塞隊(duì)列之后舶担,還需要再做一次驗(yàn)證(丟到阻塞隊(duì)列之后可能另外一個(gè)線程關(guān)閉了線程池或者剛剛加入到隊(duì)列的線程死了)坡疼。
     * 如果這個(gè)時(shí)候線程池不在RUNNING狀態(tài),把剛剛丟入隊(duì)列的任務(wù)remove掉衣陶,調(diào)用reject方法柄瑰,
     * 否則查看Worker數(shù)量,如果Worker數(shù)量為0剪况,起一個(gè)新的Worker去阻塞隊(duì)列里拿任務(wù)執(zhí)行
     *
     * 3. 丟到阻塞失敗的話教沾,會(huì)調(diào)用addWorker方法嘗試起一個(gè)新的Worker去阻塞隊(duì)列拿任務(wù)并執(zhí)行任務(wù),
     * 如果這個(gè)新的Worker創(chuàng)建失敗译断,調(diào)用reject方法
     */
    int c = ctl.get();

    // 第一個(gè)步驟授翻,滿足線程池中的線程大小比核心線程大小要小
    if (workerCountOf(c) < corePoolSize) {

        // addWorker方法第二個(gè)參數(shù)true表示使用基本大小
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 第二個(gè)步驟,線程池的線程大小比核心線程大小要大,
    // 并且線程池還在RUNNING狀態(tài)堪唐,阻塞隊(duì)列也沒滿的情況巡语,加阻塞隊(duì)列里
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();

    // 雖然滿足了第二個(gè)步驟,但是這個(gè)時(shí)候可能突然線程池關(guān)閉了羔杨,所以再做一層判斷
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

    // 第三個(gè)步驟捌臊,直接使用線程池最大大小。addWorker方法第二個(gè)參數(shù)false表示使用最大大小
    else if (!addWorker(command, false))
        reject(command);
}

在前面方法中都會(huì)調(diào)用addWorker(Runnable firstTask, boolean core)方法創(chuàng)建一個(gè)工作線程兜材,差別是創(chuàng)建的有些工作線程上面關(guān)聯(lián)接收到的任務(wù)firstTask理澎,有些沒有。該方法為當(dāng)前接收到的任務(wù)firstTask創(chuàng)建Worker曙寡,并將Worker添加到作業(yè)集合HashSet<Worker> workers中糠爬,并啟動(dòng)作業(yè)。

addWorker源碼分析

// 返回值是boolean類型举庶,true表示新任務(wù)被接收了执隧,并且執(zhí)行了。否則是false

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); // 線程池當(dāng)前狀態(tài)

        // 這個(gè)判斷轉(zhuǎn)換成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)户侥。 
        // 概括為3個(gè)條件:
        // 1. 線程池不在RUNNING狀態(tài)并且狀態(tài)是STOP镀琉、TIDYING或TERMINATED中的任意一種狀態(tài)

        // 2. 線程池不在RUNNING狀態(tài),線程池接受了新的任務(wù) 

        // 3. 線程池不在RUNNING狀態(tài)蕊唐,阻塞隊(duì)列為空屋摔。  滿足這3個(gè)條件中的任意一個(gè)的話,拒絕執(zhí)行任務(wù)

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c); // 線程池線程個(gè)數(shù)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) // 如果線程池線程數(shù)量超過線程池最大容量或者線程數(shù)量超過了基本大小(core參數(shù)為true替梨,core參數(shù)為false的話判斷超過最大大小)
                return false; // 超過直接返回false
            if (compareAndIncrementWorkerCount(c)) // 沒有超過各種大小的話钓试,cas操作線程池線程數(shù)量+1,cas成功的話跳出循環(huán)
                break retry;
            c = ctl.get();  // 重新檢查狀態(tài)
            if (runStateOf(c) != rs) // 如果狀態(tài)改變了副瀑,重新循環(huán)操作
            continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 參數(shù) firstTask != null, core = true
    // 驗(yàn)證可以滿足可新增線程的條件
    boolean workerStarted = false; // 任務(wù)是否成功啟動(dòng)標(biāo)識(shí)
    boolean workerAdded = false; // 任務(wù)是否添加成功標(biāo)識(shí)
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock; // 得到線程池的可重入鎖
        w = new Worker(firstTask); // 基于任務(wù)firstTask構(gòu)造worker
        final Thread t = w.thread; 
        if (t != null) { 
            mainLock.lock(); // 鎖住弓熏,防止并發(fā)
            try {
                // 在鎖住之后再重新檢測(cè)一下狀態(tài)
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) { // 如果線程池在RUNNING狀態(tài)或者線程池在SHUTDOWN狀態(tài)并且任務(wù)是個(gè)null
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException(); 
                        workers.add(w); 
                    int s = workers.size(); 
                    if (s > largestPoolSize) 
                        largestPoolSize = s;
                    workerAdded = true; // 標(biāo)識(shí)一下任務(wù)已經(jīng)添加成功
                }
            } finally {
                mainLock.unlock(); // 解鎖
            }
            if (workerAdded) { 
                t.start(); // 線程啟動(dòng),調(diào)用worker.run
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted) // 任務(wù)啟動(dòng)失敗
            addWorkerFailed(w);
    }
    return workerStarted;
}

任務(wù)執(zhí)行

private boolean addWorker(Runnable firstTask, boolean core) {
            ...
            ...
    
            if (workerAdded) { 
                t.start(); // 線程啟動(dòng)糠睡,調(diào)用worker.run
                workerStarted = true;
            }
            ...
    return workerStarted;
}

Worker中的線程start的時(shí)候挽鞠,調(diào)用Worker本身run方法

run()內(nèi)部調(diào)用runWorker(Worker w),就是在一直調(diào)用getTask()方法獲取任務(wù),然后調(diào)用 runTask(task)方法執(zhí)行任務(wù)铜幽。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    ...
    ...
        //循環(huán)從線程池的任務(wù)隊(duì)列獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
                    ...
                    ...
                try {
                    //執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                            ...
                            ...
                } finally {
                    //執(zhí)行正常完成
                    afterExecute(task, thrown);
                }
        }finally {
            //調(diào)用processWorkerExit方法進(jìn)行回收
            processWorkerExit(w, completedAbruptly);
        }
           
}

getTask()方法是ThreadPoolExecutor提供給其內(nèi)部類Worker的的方法滞谢。作用就是一個(gè),從任務(wù)隊(duì)列中取任務(wù)除抛,源源不斷地輸出任務(wù)狮杨。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
       ...
    for (;;) {
       ...

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                //從任務(wù)隊(duì)列的頭部取任務(wù)
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果發(fā)生了以下四件事中的任意一件,那么Worker需要被回收:

  1. Worker個(gè)數(shù)比線程池最大大小要大

  2. 線程池處于STOP狀態(tài)

  3. 線程池處于SHUTDOWN狀態(tài)并且阻塞隊(duì)列為空

  4. 使用超時(shí)時(shí)間從阻塞隊(duì)列里拿數(shù)據(jù)到忽,并且超時(shí)之后沒有拿到數(shù)據(jù)(allowCoreThreadTimeOut || workerCount > corePoolSize)

如果getTask返回的是null橄教,那說明阻塞隊(duì)列已經(jīng)沒有任務(wù)了清寇,那么會(huì)調(diào)用processWorkerExit方法進(jìn)行回收:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 集合移除掉需要回收的Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 嘗試結(jié)束線程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        //如果Worker不是異常而死亡結(jié)束流程
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //異常結(jié)束,新開啟一個(gè)Worker
        //開啟條件:由于用戶任務(wù)異常退出; Worker任務(wù)少于corePoolSize或者工作正在運(yùn)行;阻塞隊(duì)列不為空,但沒有workers护蝶。
        addWorker(null, false);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末华烟,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子持灰,更是在濱河造成了極大的恐慌盔夜,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件堤魁,死亡現(xiàn)場(chǎng)離奇詭異喂链,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)妥泉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門椭微,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人盲链,你說我怎么就攤上這事蝇率。” “怎么了刽沾?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵本慕,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我侧漓,道長(zhǎng)间狂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任火架,我火速辦了婚禮,結(jié)果婚禮上忙菠,老公的妹妹穿的比我還像新娘何鸡。我一直安慰自己,他們只是感情好牛欢,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布骡男。 她就那樣靜靜地躺著,像睡著了一般傍睹。 火紅的嫁衣襯著肌膚如雪隔盛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天拾稳,我揣著相機(jī)與錄音吮炕,去河邊找鬼。 笑死访得,一個(gè)胖子當(dāng)著我的面吹牛龙亲,可吹牛的內(nèi)容都是我干的陕凹。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼鳄炉,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼杜耙!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拂盯,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤佑女,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后谈竿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體团驱,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年榕订,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了店茶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡劫恒,死狀恐怖贩幻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情两嘴,我是刑警寧澤丛楚,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站憔辫,受9級(jí)特大地震影響趣些,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜贰您,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一坏平、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧锦亦,春花似錦舶替、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至抛蚁,卻和暖如春陈醒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瞧甩。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工钉跷, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人肚逸。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓尘应,卻偏偏與公主長(zhǎng)得像惶凝,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子犬钢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容