java源碼-ThreadPoolExecutor(2)

開篇

?這篇文章的主要目標是為了講解清楚ThreadPoolExecutor的提交任務的過程,非常推薦靜下心來仔細閱讀。
java源碼-ThreadPoolExecutor(1)
java源碼-ThreadPoolExecutor(2)
java源碼-ThreadPoolExecutor(3)


ThreadPoolExecutor狀態(tài)介紹

?ThreadPoolExecutor針對線程池一共維護了五種狀態(tài)空幻,實現(xiàn)上用用高3位表示ThreadPoolExecutor的執(zhí)行狀態(tài),低29位維持線程池線程個數(shù)社牲,分別是:

  • RUNNING = -1 << COUNT_BITS = -1<<29 高三位為111
  • SHUTDOWN = 0 << COUNT_BITS = 0<<29 高三位為000
  • STOP = 1 << COUNT_BITS = 1<<29 高三位為001
  • TIDYING = 2 << COUNT_BITS = 2<<29 高三位為010
  • TERMINATED = 3 << COUNT_BITS = 3<<29 高三位為011
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Integer.SIZE=32,Integer.SIZE-3=29,COUNT_BITS=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 線程池最大線程數(shù)=536870911(2^29-1),CAPACITY二進制中低29為為1肝谭,高3位為0
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 用高3位表示ThreadPoolExecutor的執(zhí)行狀態(tài)
    // RUNNING=111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN=000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP=001
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING=010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED=110
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    
    // runStateOf通過獲取高3位來對比
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // workerCountOf通過比較低29位來獲取線程數(shù)
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


ThreadPoolExecutor任務提交過程

?ThreadPoolExecutor提交任務代碼是在AbstractExecutorService當中通過submit()方法實現(xiàn)的,按照兩個步驟來實現(xiàn):

  • 通過newTaskFor()方法創(chuàng)建待提交任務穴肘,該方法內(nèi)部的實現(xiàn)后面再分析歇盼。
  • 通過execute()方法提交task,execute的在ThreadPoolExecutor類中實現(xiàn)重寫评抚。
  • 進一步跟進ThreadPoolExecutor的execute方法豹缀。
public abstract class AbstractExecutorService implements ExecutorService {

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}



?整個ThreadPoolExecutor的execute其實在源碼自帶的注釋中已經(jīng)寫的很清楚了,怕自己翻譯的不是特別所以這次直接把注釋也貼在代碼當中了慨代,整個過程分為三個過程:

  • 1邢笙、當前的線程數(shù)是否小于corePoolSize,新建core線程并運行第一個任務侍匙。
  • 2氮惯、如果第一步不滿足條件,那么就把任務提交到workQueue代表的隊列當中想暗。
  • 3妇汗、如果第二步不滿足條件,那么就就新建不屬于corePoolSize計數(shù)的線程(也就是新建core以外的線程)來進行處理说莫。
  • 4杨箭、如果都失敗那么就直接通過rejectHandler拒絕任務,步驟123當中任何檢測到線程池關閉的情況直接執(zhí)行任務拒絕储狭。
public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
}



? ThreadPoolExecutor的addWorker方法有兩個參數(shù)互婿,Runnable firstTask代表待執(zhí)行任務, boolean core代表是否啟動核心線程,整個啟動過程主要分為三個步驟:

  • 前置檢查:檢查線程池是否處于關閉狀態(tài)晶密,在正常運行的情況下增加工作線程計數(shù)擒悬。
  • 正常處理:創(chuàng)建Worker對象并在加鎖的條件下將新建worker添加到workers集合當中,并通過調(diào)用t.start()方法啟動線程稻艰。
  • 后置處理:判斷啟動線程是否失敗懂牧,如果失敗那么就嘗試中止線程池。
public class ThreadPoolExecutor extends AbstractExecutorService {
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 判斷是否超出線程限制,corePoolSize和core線程數(shù)僧凤,
                // maximumPoolSize代表超出core部分的線程數(shù)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
}


ThreadPoolExecutor的worker介紹

? ThreadPoolExecutor的worker實現(xiàn)Runnable接口畜侦,在worker的內(nèi)部run()方法中通過執(zhí)行runWorker()方法來啟動task,啟動方式會調(diào)用task.run()方法躯保,所以從這個角度來看旋膳,task的執(zhí)行線程其實ThreadPoolExecutor線程池中的worker。

  • Worker類內(nèi)部包含:Thread thread工作線程用于執(zhí)行task途事、Runnable firstTask標識待執(zhí)行任務验懊。
  • runWorker()方法內(nèi)部負責執(zhí)行來自提交的firstTask或者阻塞從任務隊列通過getTask()方法取得待執(zhí)行任務
  • runWorker()方法內(nèi)部通過執(zhí)行task.run()負責真正執(zhí)行任務。
public class ThreadPoolExecutor extends AbstractExecutorService {

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    }



?runWorker內(nèi)部主要做兩件事情尸变,分別是:

  • 獲取任務:通過直接傳進來firstTask或者通過getTask從任務隊列中獲取任務
  • 執(zhí)行任務:task.run()執(zhí)行真正的task任務
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }



?getTask()方法外層是一個for循環(huán)义图,然后內(nèi)部從workQueue獲取任務,區(qū)分設置超時或者阻塞等待召烂。

  • 阻塞等待直至線程獲取到可消費任務碱工。
  • 超時等待使用的是keepAliveTime,用于超時后設置線程超時標記然后線程退出工作奏夫。
  • 線程退出循環(huán)是通過返回task=null怕篷,外層循環(huán)直接結束實現(xiàn)。
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 標記線程退出工作部分的邏輯酗昼,通過返回task=null廊谓,從而在外層調(diào)用方實現(xiàn)退出while循環(huán)
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}


ThreadPoolExecutor的task介紹

? ThreadPoolExecutor的newTaskFor()方法負責創(chuàng)建task,創(chuàng)建的FutureTask的實例本身實現(xiàn)了Runnable仔雷、Future的接口蹂析。

  • FutureTask內(nèi)部可以創(chuàng)建入?yún)镽unnable的對象的時候會創(chuàng)建一個代理器
  • RunnableAdapter舔示,創(chuàng)建入?yún)镃allable的對象就比較直接了碟婆。
  • FutureTask的運行函數(shù)run()負責執(zhí)行Callable對象的call()方法并將返回值通過set()方法設置到outcome對象。
  • FutureTask的get()方法負責獲取返回值惕稻,就是我們submit()后返回的future的get()調(diào)用竖共。
public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}
public class Executors {
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
}




public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}




public class FutureTask<V> implements RunnableFuture<V> {
    /**
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome; // non-volatile, protected by state reads/writes
    private volatile Thread runner;
    private volatile WaitNode waiters;

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    // 核心的邏輯,負責調(diào)用對象的call方法并賦值返回值
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}


參考文章

ThreadPoolExecutor解析-主要源碼研究
ThreadPoolExecutor(五)——線程池關閉相關操作
ThreadPoolExecutor(六)——線程池關閉之后

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末俺祠,一起剝皮案震驚了整個濱河市公给,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蜘渣,老刑警劉巖淌铐,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蔫缸,居然都是意外死亡腿准,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進店門吐葱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人灾前,你說我怎么就攤上這事∶霞” “怎么了哎甲?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵饲嗽,是天一觀的道長。 經(jīng)常有香客問我喝噪,道長础嫡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任酝惧,我火速辦了婚禮,結果婚禮上巫财,老公的妹妹穿的比我還像新娘哩陕。我一直安慰自己,他們只是感情好闽瓢,可當我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布心赶。 她就那樣靜靜地躺著,像睡著了一般缨叫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上销钝,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天琐簇,我揣著相機與錄音,去河邊找鬼纵装。 笑死,一個胖子當著我的面吹牛诗箍,可吹牛的內(nèi)容都是我干的挽唉。 我是一名探鬼主播滤祖,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼瓶籽,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了汤求?” 一聲冷哼從身側響起严拒,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎挤牛,沒想到半個月后种蘸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡诫硕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年痘括,在試婚紗的時候發(fā)現(xiàn)自己被綠了滔吠。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挠日。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡嚣潜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情庇麦,我是刑警寧澤喜德,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布舍悯,位于F島的核電站,受9級特大地震影響萌衬,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜朴艰,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一混移、第九天 我趴在偏房一處隱蔽的房頂上張望沫屡。 院中可真熱鬧,春花似錦沮脖、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至故俐,卻和暖如春紊婉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背槽片。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留碌廓,地道東北人剩盒。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像波材,于是被迫代替她去往敵國和親身隐。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,585評論 2 359

推薦閱讀更多精彩內(nèi)容