源碼閱讀 - ThreadPoolExecutor

0. ThreadPoolExecutor簡介

  • ExecutorService的一種實現(xiàn)類丢氢,提供線程池的管理方法
    ThreadPoolExecutor類圖.png

    ThreadPoolExecutor繼承了AbstractExecutorService抽象類傅联,主要提供了線程池生命周期的管理、任務(wù)提交的方法疚察。
    提交任務(wù):execute submit方法
    關(guān)閉線程池:shutdown shutdownNow方法

1. 主要屬性介紹

ctl

AtomicInteger ctl線程池的狀態(tài)及容量控制蒸走,低29位表示容量,高3位表示狀態(tài)

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

corePoolSize

核心線程數(shù)稍浆,當(dāng)前運(yùn)行線程數(shù)小于此數(shù)目時直接創(chuàng)建核心線程载碌,大于此數(shù)目時會先將任務(wù)入隊列猜嘱,入隊列失敗才會再創(chuàng)建非核心線程衅枫,但保證總數(shù)目不大于maximumPoolSize,失敗執(zhí)行reject方法朗伶。

maximumPoolSize

線程池中最多線程數(shù)目弦撩。

keepAliveTime

線程存活時間,線程數(shù)目大于corePoolSize或者allowCoreThreadTimeOuttrue時论皆,如有線程在此時間內(nèi)沒有執(zhí)行任務(wù)則會結(jié)束線程益楼。

allowCoreThreadTimeOut

是否允許核心線程到時間后結(jié)束線程。

workQueue

BlockingQueue對象点晴,存放待執(zhí)行任務(wù)感凤。

2. 主要方法介紹

構(gòu)造方法

構(gòu)造方法有4個,但是最終都是調(diào)用最后一個粒督,主要是設(shè)置一些屬性

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

execute

向線程池提交任務(wù)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 如果當(dāng)前運(yùn)行的線程數(shù)小于corePoolSize陪竿,嘗試啟動一個新線程
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 成功放入隊列,重復(fù)檢查是否需要創(chuàng)建一個新線程
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 不能入隊屠橄,嘗試添加一個新線程族跛,如果失敗,拒絕任務(wù)
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //添加一個核心線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //達(dá)到corePoolSize锐墙,嘗試放入等待隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //二次檢查礁哄,若線程池關(guān)閉,移除任務(wù)溪北,拒絕任務(wù)
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //若當(dāng)前沒有線程桐绒,添加一個非核心線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //放入等待隊列失敗夺脾,嘗試添加非核心線程
    else if (!addWorker(command, false))
        //添加非核心線程失敗,拒絕任務(wù)
        reject(command);
}

addWorker

添加一個工作線程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        //檢查隊列是否為空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            //檢查線程容量限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //線程數(shù)+1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新建一個worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //將worker放入工作集中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //啟動剛才的線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker

在創(chuàng)建worker對象時茉继,線程參數(shù)是worker自身

this.thread = getThreadFactory().newThread(this);

所以啟動worker線程時執(zhí)行的是runWorker方法

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

getTask方法負(fù)責(zé)從workQueue等待隊列中取出待執(zhí)行任務(wù)

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        //getTask表示某個worker的當(dāng)前任務(wù)完成劳翰,來取下一個任務(wù),如果線程池已經(jīng)關(guān)閉馒疹,則不繼續(xù)執(zhí)行佳簸,worker數(shù)目-1
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //數(shù)量超出限制或者超時,worker數(shù)目-1
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //限時用poll颖变,否則take阻塞
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //r為null生均,超時了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

runWorker方法執(zhí)行上面取到的task

final void runWorker(Worker w) {
    //獲取執(zhí)行線程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            //如果線程池STOP了,但是wt沒中斷腥刹,中斷之
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執(zhí)行task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //worker退出時執(zhí)行昵时,如果是異常中斷,可能會新建一個worker來代替
        processWorkerExit(w, completedAbruptly);
    }
}

reject

任務(wù)提交失敗時捏悬,拒絕任務(wù)

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

ThreadPoolExecutor提供了4中拒絕策略症昏,分別是

  • CallerRunsPolicy在調(diào)用線程中執(zhí)行
  • AbortPolicy丟棄任務(wù),拋出RejectedExecutionException
  • DiscardPolicy僅丟棄任務(wù)
  • DiscardOldestPolicy丟棄隊列中最早的任務(wù)垫卤,然后添加本任務(wù)

shutdown威彰、shutdownNow

關(guān)閉線程池

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //檢查權(quán)限,確保調(diào)用者有關(guān)閉權(quán)限
        checkShutdownAccess();
        //將線程池狀態(tài)設(shè)置為shutdown
        advanceRunState(SHUTDOWN);
        //中斷空閑線程
        interruptIdleWorkers();
        //結(jié)束回調(diào)
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //狀態(tài)設(shè)置為stop
        advanceRunState(STOP);
        //中斷所有線程
        interruptWorkers();
        //返回未執(zhí)行的任務(wù)
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

3. 線程池的使用

通過JUC包內(nèi)提供的工具類Executors來創(chuàng)建一個線程池

  1. 線程數(shù)固定的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  1. 單線程的線程池穴肘,順序執(zhí)行任務(wù)
    其中FinalizableDelegatedExecutorServiceExecutorService的另一個實現(xiàn)類歇盼,使用了代理模式,其行為全部代理給ThreadPoolExecutor對象评抚。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  1. 緩存線程池
    沒有核心線程豹缀,隨用隨建,60s內(nèi)無任務(wù)則結(jié)束
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

4. 總結(jié)

  1. 通過JUC工具類Executors創(chuàng)建線程池
  2. 通過execute submit向線程池提交任務(wù)
    • 提交Callable任務(wù)時慨代,submit會返回Future對象邢笙,可以通過此對象獲取結(jié)果
    • submit也是通過execute方法來提交任務(wù)
  3. 提交任務(wù)時
    • 如果當(dāng)前線程數(shù)小于核心線程數(shù),會創(chuàng)建一個核心線程侍匙,即使當(dāng)前有空閑線程
    • 如果大于核心線程數(shù)氮惯,任務(wù)會入隊,入隊失敗的話丈积,會創(chuàng)建一個非核心線程來處理筐骇,如果創(chuàng)建失敗,則會拒絕任務(wù)
  4. 線程的結(jié)束
    • 非核心線程在keepAliveTime時間內(nèi)未執(zhí)行任務(wù)則會結(jié)束
    • 如果allowCoreThreadTimeOuttrue江滨,核心線程在keepAliveTime時間內(nèi)未執(zhí)行任務(wù)也會結(jié)束
  5. 拒絕任務(wù)策略
    • CallerRunsPolicy在調(diào)用線程中執(zhí)行
    • AbortPolicy丟棄任務(wù)铛纬,拋出RejectedExecutionException
    • DiscardPolicy僅丟棄任務(wù)
    • DiscardOldestPolicy丟棄隊列中最早的任務(wù),然后添加本任務(wù)
  6. 線程池的結(jié)束
    • shutdown關(guān)閉線程池唬滑,不再接受新任務(wù)告唆,但是會執(zhí)行完等待隊列的任務(wù)
    • shutdownNow關(guān)閉線程池棺弊,執(zhí)行完或中斷當(dāng)前運(yùn)行線程,返回等待隊列的任務(wù)列表

5. 參考

  1. ThreadPoolExecutor源碼build 1.8.0_121-b13版本
  2. 并發(fā)編程3:線程池的使用與執(zhí)行流程
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末擒悬,一起剝皮案震驚了整個濱河市模她,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌懂牧,老刑警劉巖侈净,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異僧凤,居然都是意外死亡畜侦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門躯保,熙熙樓的掌柜王于貴愁眉苦臉地迎上來旋膳,“玉大人,你說我怎么就攤上這事途事⊙榘茫” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵尸变,是天一觀的道長义图。 經(jīng)常有香客問我,道長振惰,這世上最難降的妖魔是什么歌溉? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮骑晶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘草慧。我一直安慰自己桶蛔,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布漫谷。 她就那樣靜靜地躺著仔雷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪舔示。 梳的紋絲不亂的頭發(fā)上碟婆,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機(jī)與錄音惕稻,去河邊找鬼竖共。 笑死,一個胖子當(dāng)著我的面吹牛俺祠,可吹牛的內(nèi)容都是我干的公给。 我是一名探鬼主播借帘,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼淌铐!你這毒婦竟也來了肺然?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤腿准,失蹤者是張志新(化名)和其女友劉穎际起,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吐葱,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡加叁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了唇撬。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片它匕。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖窖认,靈堂內(nèi)的尸體忽然破棺而出豫柬,到底是詐尸還是另有隱情,我是刑警寧澤扑浸,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布烧给,位于F島的核電站,受9級特大地震影響喝噪,放射性物質(zhì)發(fā)生泄漏础嫡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一酝惧、第九天 我趴在偏房一處隱蔽的房頂上張望榴鼎。 院中可真熱鬧,春花似錦晚唇、人聲如沸巫财。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽平项。三九已至,卻和暖如春悍及,著一層夾襖步出監(jiān)牢的瞬間闽瓢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工心赶, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留扣讼,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓园担,卻偏偏與公主長得像届谈,于是被迫代替她去往敵國和親枯夜。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 為什么使用線程池 當(dāng)我們在使用線程時艰山,如果每次需要一個線程時都去創(chuàng)建一個線程湖雹,這樣實現(xiàn)起來很簡單,但是會有一個問題...
    閩越布衣閱讀 4,291評論 10 45
  • 線程池的概念和定義 在服務(wù)器端的業(yè)務(wù)應(yīng)用開發(fā)中曙搬,Web服務(wù)器(諸如Tomcat摔吏、Jetty)需要接受并處理http...
    dtdh閱讀 837評論 0 1
  • 世間最無情的便是佛。 佛家六根清凈纵装,四大皆空征讲,這便是最無情。 《大魚海棠》中橡娄,靈婆對椿說:“我告訴你什么事最可悲:...
    木貓閱讀 1,996評論 2 2
  • 清明節(jié)三天假期诗箍,和同事去她朋友家了。這三天我們是這樣度過的挽唉。 一直以來滤祖,我對于度假旅游的看法是自由,自由是旅游的核...
    獨步微云閱讀 102評論 0 0
  • 我一直都知道瓶籽,有很多事情都是在無病呻吟匠童。我并不很討厭這個世界,但是我有的時候總是感覺和這個世界不是很契合塑顺。 不知道...
    自童年起閱讀 248評論 0 1