線程池

在沒有看線程池的源碼的時候,我腦子里經(jīng)常會想砰诵,線程跑完怎么之后怎么歸還給線程池雨席。其實這種思想很明顯是和對象池的概念搞混淆了菩咨。
首先講對象池。對象池的核心思想是緩存和共享。那些被頻繁使用的對象旦委,在使用完后奇徒,不立即將它們釋放,而是將它們緩存起來缨硝,以供后續(xù)的應(yīng)用程序重復(fù)使用摩钙,從而減少創(chuàng)建對象和釋放對象的次數(shù),進而改善應(yīng)用程序的性能查辩。但是這個共享并不是同時共享胖笛。而是一個線程用完之后,交給另外一個線程用宜岛。就像數(shù)據(jù)庫連接池长踊。就像一把鑰匙,多個人共用萍倡,但是一次只能一個人進入身弊。我們把這種叫做串行線程關(guān)閉
串行線程封閉使得任意時刻列敲,最多僅有一個線程擁有對象的所有權(quán)阱佛。當然,這不是絕對的戴而,只要線程T1事實不會再修改對象O凑术,那么就相當于僅有T2擁有對象的所有權(quán)。串行線層封閉讓對象變得可以共享(雖然只能串行的擁有所有權(quán))所意,靈活性得到大大提高淮逊;相對的,要共享對象就涉及安全發(fā)布的問題扶踊,依靠BlockingQueue等同步工具很容易實現(xiàn)這一點泄鹏。
見前面我寫的自定義數(shù)據(jù)庫連接池http://www.reibang.com/p/545f8b1194ef
對象的復(fù)用是對象的所有權(quán)的交接。
而線程池的話是生產(chǎn)者消費者模型.
小于core個數(shù)的時候姻檀,去創(chuàng)建worker線程命满。然后調(diào)用runworker方法中的,task.run();假設(shè)core=3 ;然后不斷創(chuàng)建3個的消費者線程涝滴。即使先創(chuàng)建消費者線程1绣版,和2 。然后這個時候消費者1執(zhí)行完成了歼疮。假如這個時候再有任務(wù)杂抽。還是創(chuàng)建消費者線程3 。后面再有任務(wù)的話韩脏,堆積在BlockingQueue上缩麸。用戶不斷的submit提交任務(wù)。生產(chǎn)者赡矢。然后 3個消費者去并發(fā)消費提交的任務(wù)杭朱。他并沒有歸還給線程池阅仔。task.run();這句執(zhí)行完成之后,會進入下一個循環(huán)弧械,執(zhí)行task = getTask()方法八酒。
講完了這兩個東西的區(qū)別之后。我們來看線程池的源碼刃唐。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
    private static final ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        service.submit(new Task());
    }
}

class Task implements Runnable{
    @Override
    public void run() {
        System.out.println("hello world");
    }
}

注意羞迷,《阿里開發(fā)手冊》 上不建議用這種方式創(chuàng)建線程池。ExecutorService service = Executors.newFixedThreadPool(5); 因為這樣創(chuàng)建的LinkedBlockingQueue的長度是Integer.MaxValue画饥。會耗盡系統(tǒng)資源衔瓮。

Runnable任務(wù)的走向。

分析源碼之前抖甘,我們先畫出線程池的UML類圖热鞍。


image.png

主要的方法邏輯都在ThreadPoolExecutor上。
下面我們分析runnable task怎么走衔彻。

1 提交任務(wù)

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

1.首先不能提交空任務(wù)碍现,否者拋個異常
2.然后通過newTaskFor方法將一個Runnable類型的task包裝成一個RunnableFuture類型的ftask
3.然后調(diào)用executor方法執(zhí)行ftask
newTaskFor(task, null)這一句的目的是將Runnable接口適配成一個返回值為null的FutureTask類型的任務(wù)。當然這都不重要米奸,我們關(guān)心的是task到哪里去了

2 封裝任務(wù)成FutureTask

包裝成一個FutureTask

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
2.包裝成一個Callable交給成員變量callable

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
3.包裝成一個RunnableAdapter

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
4.交給了成員變量task,并交由call方法來執(zhí)行

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

task先封裝成 RunnableAdapter 的task屬性昼接。然后這個RunnableAdapter是FutureTask的callable屬性。
然后接下來 我們關(guān)心ftask的callable屬性悴晰。

3 執(zhí)行exuecute方法慢睡。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
           int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker方法

addWorker方法是線程池的核心方法,它負責(zé)控制線程什么情況可以提交铡溪,什么情況下執(zhí)行漂辐,什么時候拒絕新線程的加。這個方法很重要棕硫,想要學(xué)習(xí)線程池還需要了解這個方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

w = new Worker(firstTask); 先封裝成worker對象髓涯。然后執(zhí)行t.start()。
執(zhí)行worker對象 的thread屬性的 run()方法哈扮。

 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

這段代碼把線程的執(zhí)行機制纬纪,和任務(wù)的提交分離. this指代當前worker對象。也就是把當前worker對象(runnable對象)給thread執(zhí)行滑肉。但是這里有個問題包各,傳入的firstTask干嘛去了。

addWorker方法中執(zhí)行的t.start() 靶庙,啟動worker方法中的run()方法问畅。

調(diào)用worker對象的runwork方法。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這里前面的疑問解決了,先拿到前面?zhèn)魅氲膄irstTask护姆,作為task變量矾端。第一次task不為空,短路或后面語句不執(zhí)行卵皂。然后task.run(); 最后task置為空须床。while循環(huán)下次判斷。while (task != null || (task = getTask()) != null) .執(zhí)行task =getTask()方法渐裂。

執(zhí)行下一個任務(wù)

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

workQueue.take() 從阻塞隊列中獲取豺旬,任務(wù)Runnable。作為getTask()方法返回值柒凉。繼續(xù)執(zhí)行上面的runWorker方法中的task.run()方法

參考資料:
從一個run方法的經(jīng)歷看線程池https://blog.csdn.net/u011803809/article/details/77427641

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末族阅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子膝捞,更是在濱河造成了極大的恐慌坦刀,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔬咬,死亡現(xiàn)場離奇詭異鲤遥,居然都是意外死亡,警方通過查閱死者的電腦和手機林艘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門盖奈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人狐援,你說我怎么就攤上這事钢坦。” “怎么了啥酱?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵爹凹,是天一觀的道長。 經(jīng)常有香客問我镶殷,道長禾酱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任绘趋,我火速辦了婚禮颤陶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘埋心。我一直安慰自己指郁,他們只是感情好忙上,可當我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布拷呆。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪茬斧。 梳的紋絲不亂的頭發(fā)上腰懂,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機與錄音项秉,去河邊找鬼绣溜。 笑死,一個胖子當著我的面吹牛娄蔼,可吹牛的內(nèi)容都是我干的怖喻。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼岁诉,長吁一口氣:“原來是場噩夢啊……” “哼锚沸!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起涕癣,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤哗蜈,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后坠韩,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體距潘,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年只搁,在試婚紗的時候發(fā)現(xiàn)自己被綠了音比。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡氢惋,死狀恐怖硅确,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情明肮,我是刑警寧澤菱农,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站柿估,受9級特大地震影響循未,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜秫舌,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一的妖、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧足陨,春花似錦嫂粟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽零抬。三九已至,卻和暖如春宽涌,著一層夾襖步出監(jiān)牢的瞬間平夜,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工卸亮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留忽妒,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓兼贸,卻偏偏與公主長得像段直,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子溶诞,可洞房花燭夜當晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容