ThreadPoolExecutor線程池原理

本文參考
Java線程池---addWorker方法解析
Java線程池ThreadPoolExecutor實(shí)現(xiàn)原理
線程池如何復(fù)用

Executor(Interface):

執(zhí)行提交的線程任務(wù)的對象。這個(gè)接口提供了一種將任務(wù)提交與每個(gè)任務(wù)將如何運(yùn)行實(shí)現(xiàn)了分離亡问,包括線程使用、調(diào)度等細(xì)節(jié)昵仅。該接口只定義了一個(gè)execute()方法邪媳。

// 用來執(zhí)行一個(gè)指令任務(wù),這個(gè)任務(wù)可能在一個(gè)新的線程中執(zhí)行,可能在線程池已有的線程中執(zhí)行巫延,也可能在當(dāng)前線程執(zhí)行,由Executor接口的實(shí)現(xiàn)方?jīng)Q定地消;
void execute(Runnable command);

ExecutorService(Interface):

提供用于管理終止的方法如 shutDown()和shutDownNow()用于關(guān)閉線程池的方法以及判斷線程池是否關(guān)閉的方法如炉峰,isShutdown(),isTerminated()的方法脉执,還提供了involveAll()和submit()

// 關(guān)閉線程池疼阔,不接受新的任務(wù),繼續(xù)執(zhí)行已經(jīng)提交的任務(wù),但是不等待已經(jīng)提交的任務(wù)完成婆廊,使用awaitTermination接口
void shutdown();

// 判斷當(dāng)前線程池是否被關(guān)閉
boolean isShutdown();

// 判斷是否所有提交的任務(wù)在showDown()后已經(jīng)執(zhí)行完畢 
boolean isTerminated(); 

// 執(zhí)行一個(gè)任務(wù)迅细,并且返回這個(gè)任務(wù)的Future對象,用于表示這個(gè)任務(wù)的掛起結(jié)果
<T> Future<T> submit(Callable<T> task);

// 執(zhí)行許多任務(wù)淘邻,返回任務(wù)的results
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

// 執(zhí)行許多任務(wù)茵典,返回許多Future(包含status and results)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
Callable:

和Runnable類似,是java開啟子線程的一種方式宾舅,用Callable開啟的線程可以取消執(zhí)行统阿;

  • call():類似run方法
Future:

一個(gè)代表線程執(zhí)行結(jié)果的類,可以用來判斷線程是否運(yùn)行結(jié)束贴浙,主動終止線程

  • get():等待線程執(zhí)行完畢砂吞,返回線程的結(jié)果
  • cancel():停止線程
  • isDown():判斷線程是否結(jié)束
  • isCancel():判斷線程是否取消

ThreadPoolExecutor:

AbstractExecutorService的實(shí)現(xiàn)類,java線程池的核心實(shí)現(xiàn)類崎溃,下面我們重點(diǎn)看這個(gè)類蜻直;


線程池執(zhí)行任務(wù)的流程
重要變量:
//  構(gòu)造方法中的參數(shù)
private volatile int corePoolSize;// 核心線程數(shù)
private volatile int maximumPoolSize;// 最大線程數(shù)
private volatile long keepAliveTime; // 空閑線程的可存活時(shí)間 
private final BlockingQueue<Runnable> workQueue; // 工作隊(duì)列,存放任務(wù)的數(shù)據(jù)結(jié)構(gòu)
private volatile ThreadFactory threadFactory; // 線程工廠袁串,創(chuàng)建新線程的工廠
private volatile RejectedExecutionHandler handler;  // 拋棄線程策略

// 線程池狀態(tài)變量(這是一個(gè)atomic integer)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

深入解析Java AtomicInteger 原子類型
在線程池的內(nèi)部概而,封裝了一個(gè)Worker對象,Worker對象是線程池維護(hù)的用來處理用戶提交的任務(wù)的線程囱修;

// worker的本質(zhì)是一個(gè)任務(wù)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable

execute():

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 如果正在運(yùn)行的線程數(shù)小于核心線程數(shù)
            if (addWorker(command, true))          // 直接創(chuàng)建一個(gè)新線程執(zhí)行赎瑰,然后return
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {    // 如果任務(wù)可以成功的加入阻塞隊(duì)列
            int recheck = ctl.get();                          
            if (! isRunning(recheck) && remove(command))  //  雙重檢查:如果線程池被關(guān)閉并且任務(wù)成功的從阻塞隊(duì)列中取出,就放棄這個(gè)任務(wù)
                reject(command);
            else if (workerCountOf(recheck) == 0) // 如果當(dāng)前的線程數(shù)為0(線程池沒有被關(guān)閉)破镰,開啟新線程執(zhí)行
                addWorker(null, false);
        }
        else if (!addWorker(command, false))  // 如果無法創(chuàng)建新線程去執(zhí)行這個(gè)任務(wù) 餐曼,也放棄這個(gè)任務(wù)  
            reject(command);
    }
execute()的流程分為三部分(上面有詳細(xì)的英文注釋):
  • 如果比正在運(yùn)行的核心線程數(shù)量小,則嘗試開啟一個(gè)新線程鲜漩,將這個(gè)任務(wù)作為他的第一個(gè)任務(wù)源譬,并且調(diào)用addWorker()去檢查 runState 和workerCount,避免不必要的錯(cuò)誤警報(bào)
  • 如果一個(gè)任務(wù)可以成功的加入阻塞隊(duì)列中排隊(duì)孕似,我們需要再次雙重檢驗(yàn)是否我們需要?jiǎng)?chuàng)建一個(gè)新的線程踩娘,如果線程池被關(guān)閉(由于這個(gè)任務(wù)的某個(gè)邏輯導(dǎo)致)或者現(xiàn)存的線程死掉了,我們就調(diào)用handler去拒絕這個(gè)任務(wù)喉祭,如果當(dāng)前線程池沒有線程养渴,則創(chuàng)建一個(gè)新的線程
  • 如果這個(gè)線程無法進(jìn)入阻塞隊(duì)列又無法創(chuàng)建新線程去執(zhí)行闰集,我們就用handler丟棄他钞它;

addWorker():

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //  使用CAS機(jī)制輪詢線程池的狀態(tài)氧敢,如果線程池處于SHUTDOWN及大于它的狀態(tài)則拒絕執(zhí)行任務(wù)
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. 
            // 如果是STOP坊秸,TIDYING,TERMINATED狀態(tài)的話乘盖,則會返回false哎迄,如果現(xiàn)在狀態(tài)是SHUTDOWN鲜戒,但是firstTask不為空或者workQueue為空的話咙冗,那么直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                 //  獲得當(dāng)前workerCount,如果當(dāng)前workerCount大于容量/核心線程數(shù)/最大線程數(shù)中捆,return false
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 判斷當(dāng)前的線程池狀態(tài)是否可以創(chuàng)建新的worker鸯匹,可以的話 break retry,創(chuàng)建新worker
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                 // CAS的自旋泄伪,再次比較ctl殴蓬,如果不一致retry
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //   創(chuàng)建新的worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 在鎖內(nèi)部在進(jìn)行一次判斷是否滿足創(chuàng)建worker的條件
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker:

根據(jù)當(dāng)前線程池的狀態(tài)和給定的線程數(shù)大小(core/maximum)判斷是否可以添加一個(gè)新的worker蟋滴,如果可以染厅,創(chuàng)建一個(gè)新的worker,當(dāng)前任務(wù)作為他的first task津函,當(dāng)線程池被關(guān)閉或者工廠創(chuàng)建線程失敗肖粮,這個(gè)方法返回false;
CAS機(jī)制
什么是retry尔苦?

自帶的ThreadPoolExecutor對象
  • newFixedThreadPool:一個(gè)固定線程數(shù)量的線程池:
  • newCachedThreadPool:不固定線程數(shù)量涩馆,且支持最大為Integer.MAX_VALUE的線程數(shù)量:
  • newSingleThreadExecutor:可以理解為線程數(shù)量為1的FixedThreadPool:
  • newScheduledThreadPool:支持定時(shí)以指定周期循環(huán)執(zhí)行任務(wù):
拒絕策略(當(dāng)阻塞隊(duì)列和max都滿了做出的處理)
  • AbortPolicy:直接拋出異常。
  • CallerRunsPolicy:由調(diào)用者所在線程來運(yùn)行任務(wù)允坚。
  • DiscardOldestPolicy:丟棄隊(duì)列里最老的一個(gè)任務(wù)魂那,并執(zhí)行當(dāng)前任務(wù)。
  • DiscardPolicy:丟棄掉當(dāng)前提交的新任務(wù)稠项。

封裝的自定義的ThreadPoolExecutor

public class ThreadPoolManager {
    /**
     * 單例設(shè)計(jì)模式(餓漢式)
     * 單例首先私有化構(gòu)造方法涯雅,然后餓漢式一開始就開始創(chuàng)建,并提供get方法
     */
    private static ThreadPoolManager mInstance = new ThreadPoolManager();

    public static ThreadPoolManager getInstance() {
        return mInstance;
    }

    private int corePoolSize;//核心線程池的數(shù)量展运,同時(shí)能夠執(zhí)行的線程數(shù)量
    private int maximumPoolSize;//最大線程池?cái)?shù)量活逆,表示當(dāng)緩沖隊(duì)列滿的時(shí)候能繼續(xù)容納的等待任務(wù)的數(shù)量
    private long keepAliveTime = 1;//存活時(shí)間
    private TimeUnit unit = TimeUnit.HOURS;
    private ThreadPoolExecutor executor;

    private ThreadPoolManager() {
        /**
         * 給corePoolSize賦值:當(dāng)前設(shè)備可用處理器核心數(shù)*2 + 1,能夠讓cpu的效率得到最大程度執(zhí)行
         */
        corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
        maximumPoolSize = corePoolSize; //雖然maximumPoolSize用不到,但是需要賦值拗胜,否則報(bào)錯(cuò)
        executor = new ThreadPoolExecutor(
                corePoolSize, //當(dāng)某個(gè)核心任務(wù)執(zhí)行完畢蔗候,會依次從緩沖隊(duì)列中取出等待任務(wù)
                maximumPoolSize, //5,先corePoolSize,然后new LinkedBlockingQueue<Runnable>(),然后maximumPoolSize,但是它的數(shù)量是包含了corePoolSize的
                keepAliveTime, //表示的是maximumPoolSize當(dāng)中等待任務(wù)的存活時(shí)間
                unit,
                new LinkedBlockingQueue<Runnable>(), //緩沖隊(duì)列,用于存放等待任務(wù)挤土,Linked的先進(jìn)先出
                Executors.defaultThreadFactory(), //創(chuàng)建線程的工廠
                new ThreadPoolExecutor.AbortPolicy() //用來對超出maximumPoolSize的任務(wù)的處理策略
        );
    }

    /**
     * 執(zhí)行任務(wù)
     */
    public void execute(Runnable runnable) {
        if (runnable == null) return;
        executor.execute(runnable);
    }

    /**
     * 從線程池中移除任務(wù)
     */
    public void remove(Runnable runnable) {
        if (runnable == null) return;
        executor.remove(runnable);
    }
}

配置線程池

任務(wù)的性質(zhì):

  • CPU密集型(大量計(jì)算) :CPU數(shù)+1
  • IO密集型(磁盤IO,網(wǎng)絡(luò)IO):CPU數(shù)*2+1

線程池如何復(fù)用線程误算?

一個(gè)大run()把其它小run()#1,run()#2,...給串聯(lián)起來了
前提條件:假如coreSize=3仰美,maxSize=10,當(dāng)前存在線程數(shù)是5儿礼。
(注意咖杂,存在的這5個(gè)線程,并不是你執(zhí)行ExecuteService.execute/submit時(shí)的參數(shù)蚊夫,而是為了執(zhí)行execute/submit的參數(shù)所啟動的“內(nèi)部線程”诉字。這個(gè)“內(nèi)部線程”其實(shí)是通過ThreadPoolExecutor的ThreadFactory參數(shù)生成的線程,而“execute/submit的參數(shù)”是執(zhí)行在這些“內(nèi)部線程”里面的。)
存在這5個(gè)“內(nèi)部線程”壤圃,都訪問同一個(gè)隊(duì)列陵霉,從隊(duì)列中去取任務(wù)執(zhí)行(任務(wù)就是通過execute/submit提交的Runnable參數(shù)),當(dāng)任務(wù)充足時(shí)伍绳,5個(gè)“內(nèi)部線程”都持續(xù)執(zhí)行踊挠。重點(diǎn)是沒有任務(wù)時(shí)怎么辦?
沒有任務(wù)時(shí)冲杀,這5個(gè)“內(nèi)部線程”都會做下面判斷:
如果poolSize > coreSize效床,那就從隊(duì)列里取任務(wù),當(dāng)過了keepaliveTime這么長時(shí)間還沒有得到任務(wù)的話权谁,當(dāng)前這個(gè)“內(nèi)部線程”就會結(jié)束(使用的是BlockingQueue.poll方法)剩檀。
如果poolSize <= coreSize,那就以“阻塞”的方式旺芽,去從隊(duì)列里取任務(wù)沪猴,當(dāng)?shù)玫饺蝿?wù)后,就繼續(xù)執(zhí)行甥绿。這樣的話字币,這個(gè)線程就不會結(jié)束掉。
如果沒有任務(wù)可以繼續(xù)執(zhí)行了共缕,最后只剩下coreSize那么多的“內(nèi)部線程”留在線程池里洗出,等待重用。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末图谷,一起剝皮案震驚了整個(gè)濱河市翩活,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌便贵,老刑警劉巖菠镇,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異承璃,居然都是意外死亡利耍,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門盔粹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來隘梨,“玉大人,你說我怎么就攤上這事舷嗡≈崃裕” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵进萄,是天一觀的道長捻脖。 經(jīng)常有香客問我锐峭,道長,這世上最難降的妖魔是什么可婶? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任沿癞,我火速辦了婚禮,結(jié)果婚禮上扰肌,老公的妹妹穿的比我還像新娘抛寝。我一直安慰自己,他們只是感情好曙旭,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布盗舰。 她就那樣靜靜地躺著,像睡著了一般桂躏。 火紅的嫁衣襯著肌膚如雪钻趋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天剂习,我揣著相機(jī)與錄音蛮位,去河邊找鬼。 笑死鳞绕,一個(gè)胖子當(dāng)著我的面吹牛失仁,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播们何,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼萄焦,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了冤竹?” 一聲冷哼從身側(cè)響起拂封,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鹦蠕,沒想到半個(gè)月后冒签,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡钟病,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年萧恕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肠阱。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡票唆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出辖所,到底是詐尸還是另有隱情惰说,我是刑警寧澤磨德,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布缘回,位于F島的核電站吆视,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏酥宴。R本人自食惡果不足惜啦吧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望拙寡。 院中可真熱鬧授滓,春花似錦、人聲如沸肆糕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诚啃。三九已至淮摔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間始赎,已是汗流浹背和橙。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留造垛,地道東北人魔招。 一個(gè)月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像五辽,于是被迫代替她去往敵國和親办斑。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345