源碼篇-ThreadPoolExecutor

一、執(zhí)行任務

public void execute(Runnable command) {
    // 如果任務為空柬焕,直接拋異常
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    
    // 獲取線程池運行狀態(tài)
    int c = ctl.get();
    // 如果運行的線程數(shù)小于核心數(shù)沃饶,添加worker
    if (workerCountOf(c) < corePoolSize) {
        // 添加worker,并將core設為true石抡,表示是核心線程
        if (addWorker(command, true))
            return;
        // 如果添加失敗重新獲取線程池運行狀態(tài)
        c = ctl.get();
    }
    // 如果線程池在運行且隊列未滿
    if (isRunning(c) && workQueue.offer(command)) {     
        int recheck = ctl.get();
        // 如果線程池不在運行且刪除任務成功檐嚣,執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池工作線程為0,添加空的任務
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 走到這里啰扛,說明核心線程數(shù)用完且任務隊列已滿嚎京,那么啟用非核心線程數(shù),如果失敗隐解,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}
  • 首先用核心線程執(zhí)行任務鞍帝,如果核心線程已滿,將任務添加到任務隊列煞茫;如果隊列也滿了帕涌,那么用非核心線程執(zhí)行任務
  • addWorker(Runnable firstTask, boolean core)第一個參數(shù)是執(zhí)行的任務,第二個參數(shù)如果為true续徽,表示用的是核心線程蚓曼,false表示用的是非核心線程
  • 成員變量ctl是AtomicInteger類型,用來表示線程運行狀態(tài)和線程數(shù)钦扭,高3位表示運行狀態(tài)纫版,低29位表示運行線程數(shù)
private boolean addWorker(Runnable firstTask, boolean core) {
    
    //retry用來判斷是否可以添加任務,并更新線程數(shù)    
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果線程池已經(jīng)關閉且沒有任務土全,直接返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取工作線程數(shù)
            int wc = workerCountOf(c);
            // 如果工作線程數(shù)大于等于最大線程數(shù)捎琐,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 更新線程數(shù)会涎,如果成功裹匙,跳出retry
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到這,說明更新線程數(shù)失敗了末秃,重新獲取線程池狀態(tài)
            c = ctl.get();  // Re-read ctl
            // 如果線程池狀態(tài)變化了概页,從retry重新執(zhí)行;如果線程池狀態(tài)沒有變化练慕,繼續(xù)for循環(huán)
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 創(chuàng)建Worker對象
        w = new Worker(firstTask);
        // 獲取線程
        final Thread t = w.thread;
        // 如果線程不為空惰匙,啟動線程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                
                // 再次獲取狀態(tài)
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 因為線程還沒啟動,所以這里線程是alive铃将,說明是不正常的
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將新創(chuàng)建的worker加入到workers集合
                    workers.add(w);
                    int s = workers.size();
                    // 更新largestPoolSize值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // workerAdded標記為true
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果任務添加完成项鬼,啟動線程且將workerStarted標記為true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果啟動線程失敗,從workers刪除新創(chuàng)建的任務劲阎,且執(zhí)行tryTerminate
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 最后返回線程是否啟動成功
    return workerStarted;
}
  • 首先去更新工作的線程數(shù)
  • 創(chuàng)建Worker對象绘盟,此時會創(chuàng)建Thread類型的成員變量thread,Worker對象會傳入到該線程,因為Worker對象實現(xiàn)了Runnable方法龄毡,所以啟動線程thread時吠卷,會執(zhí)行Worker的run方法
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

二、執(zhí)行任務

1. 發(fā)起任務執(zhí)行

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    // 用來表示是否正常執(zhí)行任務沦零,true表示被打斷了祭隔,false表示未被打斷
    boolean completedAbruptly = true;
    try {
        // 如果worker對象有傳入了任務或者任務隊列有任務
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            
            // 如果線程池狀態(tài)是停止以上的級別
            // 或者
            // 線程已經(jīng)被中斷且狀態(tài)是停止以上的級別且當前線程還不是打斷狀態(tài)
            // 那么中斷線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 方法執(zhí)行前,空方法路操,留給子類實現(xiàn)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執(zhí)行完后疾渴,空方法,留給子類實現(xiàn)
                    afterExecute(task, thrown);
                }
            } finally {               
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // 被中斷標記設為false
        completedAbruptly = false;
    } finally {
        // 任務完成后的操作
        processWorkerExit(w, completedAbruptly);
    }
}        
  • 首先執(zhí)行傳入worker對象里的任務寻拂,如果為空程奠,則從任務隊列里獲取任務
  • 判斷是否需要打斷線程
  • 執(zhí)行任務
  • 任務完成相關的操作
2. 獲取任務
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // 獲取線程池狀態(tài)
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        
        
        // 如果線程池狀態(tài)關閉且任務隊列為空
        // 或者
        // 線程池狀態(tài)是停止
        // 那么將線程數(shù)減1,返回空任務
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 獲取線程數(shù)
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        
        // 如果allowCoreThreadTimeOut為真祭钉,表示核心線程也有超時時間瞄沙,一般默認為false
        // 或者
        // 工作線程數(shù)超過核心線程數(shù)
        // 那么將timed設為真,設為真的目的是為了沒任務的時候慌核,減少工作線程的數(shù)量
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果工作線程大于最大線程數(shù)或者超時了
        // 且
        // 工作線程數(shù)大于1或者任務隊列不為空
        // 那么工作線程減1
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果有超時設置距境,那么帶有超時時間獲取任務,否則就阻塞獲取任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 如果任務不為空垮卓,返回任務
            if (r != null)
                return r;
            // 如果任務為空垫桂,將超時設置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • 這部分代碼能說明線程復用,超時未獲取到任務減少線程的原理
  • 如果設置了核心線程有超時時間或者線程數(shù)超過了核心線程數(shù)粟按,那么采用帶超時的方式獲取任務诬滩,如果沒有獲取到任務,那么線程數(shù)會減1灭将;如果不采用帶超時的方式獲取任務疼鸟,那么一直等待,知道從任務隊列里獲取了任務

3. 退出任務執(zhí)行

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 從任務集合中刪除任務
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 嘗試終止任務
    tryTerminate();

    int c = ctl.get();
    // 如果線程池狀態(tài)小于STOP庙曙,說明還需要工作線程
    if (runStateLessThan(c, STOP)) {
        // 如果任務執(zhí)行被中斷了空镜,保證至少還有1個工作線程在執(zhí)行
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 相當于線程還在工作
        addWorker(null, false);
    }
}
  • 從任務集合中刪除當前任務
  • 嘗試終止線程池
  • 如果線程池狀態(tài)是小于STOP,保證有工作線程在工作
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // isRunning(c) 表示在運行捌朴,不能停止
        // runStateAtLeast(c, TIDYING)表示已經(jīng)停止了吴攒,沒必要停止
        // (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())表示關閉但是有任務,不能停止
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
       // 如果工作線程數(shù)不等于0砂蔽,停止1個空閑的工作線程洼怔,通過tryLock判斷是否空閑
       if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 走到這,說明工作線程是0了左驾,將狀態(tài)改為TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 終止線程池镣隶,空方法泽台,留給子類實現(xiàn)
                    terminated();
                } finally {
                    // 最后將狀態(tài)改為TERMINATED,通知等待線程池終止的線程
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
  • 如果線程池狀態(tài)不在運行矾缓,且工作線程數(shù)為0怀酷,那么最終將線程池狀態(tài)改為TERMINATED

三、拒絕策略

1. 直接拋異常(默認策略)
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
2. 調(diào)用者的線程執(zhí)行任務
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
3. 丟掉最早的任務
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
4. 空的策略
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

四嗜闻、線程工廠

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        // 設置線程組
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        // 設置線程前綴名
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        // 創(chuàng)建線程
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

五蜕依、常見四種線程池

1.newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • 可自定義線程數(shù)和線程工廠,核心線程數(shù)與最大線程數(shù)相等琉雳,這樣的話線程一旦創(chuàng)建样眠,就會一直運行
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
  • 只可以自定義線程工廠,核心線程數(shù)為0翠肘,最大線程數(shù)為Integer最大值
  • 相當于沒有限制工作線程數(shù)檐束,任務量大的時候,會影響機器性能
  • 空任務的時候束倍,會有1個線程在運行
3.newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}    
  • 只創(chuàng)建一個工作線程被丧,可自定義線程工廠,可以定時執(zhí)行任務
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}    
  • 可定義線程工廠和核心工作線程數(shù)绪妹,最大工作線程數(shù)為Integer.MAX_VALUE
  • 可定時執(zhí)行任務
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末甥桂,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子邮旷,更是在濱河造成了極大的恐慌黄选,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件婶肩,死亡現(xiàn)場離奇詭異办陷,居然都是意外死亡,警方通過查閱死者的電腦和手機律歼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進店門民镜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人苗膝,你說我怎么就攤上這事殃恒≈簿桑” “怎么了辱揭?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵,是天一觀的道長病附。 經(jīng)常有香客問我问窃,道長,這世上最難降的妖魔是什么完沪? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任域庇,我火速辦了婚禮嵌戈,結果婚禮上,老公的妹妹穿的比我還像新娘听皿。我一直安慰自己熟呛,他們只是感情好,可當我...
    茶點故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布尉姨。 她就那樣靜靜地躺著庵朝,像睡著了一般。 火紅的嫁衣襯著肌膚如雪又厉。 梳的紋絲不亂的頭發(fā)上九府,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天,我揣著相機與錄音覆致,去河邊找鬼侄旬。 笑死,一個胖子當著我的面吹牛煌妈,可吹牛的內(nèi)容都是我干的儡羔。 我是一名探鬼主播,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼璧诵,長吁一口氣:“原來是場噩夢啊……” “哼笔链!你這毒婦竟也來了?” 一聲冷哼從身側響起腮猖,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤鉴扫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后澈缺,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坪创,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年姐赡,在試婚紗的時候發(fā)現(xiàn)自己被綠了莱预。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,912評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡项滑,死狀恐怖依沮,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情枪狂,我是刑警寧澤危喉,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站州疾,受9級特大地震影響辜限,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜严蓖,卻給世界環(huán)境...
    茶點故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一薄嫡、第九天 我趴在偏房一處隱蔽的房頂上張望氧急。 院中可真熱鬧,春花似錦毫深、人聲如沸吩坝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽钾恢。三九已至,卻和暖如春鸳址,著一層夾襖步出監(jiān)牢的瞬間瘩蚪,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工稿黍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留疹瘦,地道東北人。 一個月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓巡球,卻偏偏與公主長得像言沐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子酣栈,可洞房花燭夜當晚...
    茶點故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內(nèi)容