源碼|從串行線程封閉到對象池辛慰、線程池

今天講一個牛逼而實用的概念干像,串行線程封閉對象池是串行線程封閉的典型應用場景狞膘;線程池糅合了對象池技術什乙,但核心實現(xiàn)不依賴于對象池已球,很容易產(chǎn)生誤會智亮。本文從串行線程封閉和對象池入手,最后通過源碼分析線程池的核心原理阔蛉,厘清對象池與線程池之間的誤會状原。

JDK版本:oracle java 1.8.0_102

線程封閉與串行線程封閉

線程封閉

線程封閉是一種常見的線程安全設計策略:僅在固定的一個線程內訪問對象,不對其他線程共享削锰。

使用線程封閉技術毕莱,對象O始終只對一個線程T1可見,“單線程”中自然不存在線程安全的問題蛹稍。

ThreadLocal是常用的線程安全工具唆姐,見源碼|ThreadLocal的實現(xiàn)原理。線程封閉在Servlet及高層的web框架Spring等中應用不少厦酬。

串行線程封閉

線程封閉雖然好用胆描,卻限制了對象的共享昌讲。串行線程封閉改進了這一點:對象O只能由單個線程T1擁有减噪,但可以通過安全的發(fā)布對象O來轉移O的所有權;在轉移所有權后醋闭,也只有另一個線程T2能獲得這個O的所有權朝卒,并且發(fā)布O的T1不會再訪問O抗斤。

所謂“所有權”,指修改對象的權利龙宏。

相對于線程封閉伤疙,串行線程封閉使得任意時刻徒像,最多僅有一個線程擁有對象的所有權。當然衅澈,這不是絕對的谬墙,只要線程T1事實不會再修改對象O经备,那么就相當于僅有T2擁有對象的所有權侵蒙。串行線層封閉讓對象變得可以共享(雖然只能串行的擁有所有權)傅蹂,靈活性得到大大提高份蝴;相對的,要共享對象就涉及安全發(fā)布的問題浸卦,依靠BlockingQueue等同步工具很容易實現(xiàn)這一點案糙。

對象池是串行線程封閉的經(jīng)典應用場景时捌,如數(shù)據(jù)庫連接池等。

對象池

對象池利用了串行封閉:將對象O“借給”一個請求線程T1稚叹,T1使用完再交還給對象池禽笑,并保證“未擅自發(fā)布該對象”且“以后不再使用”佳镜;對象池收回O后凡桥,等T2來借的時候再把它借給T2,完成對象所有權的傳遞啊掏。

猴子擼了一個簡化版的線程池衰猛,用戶只需要覆寫newObject()方法:

public abstract class AbstractObjectPool<T> {
  protected final int min;
  protected final int max;
  protected final List<T> usings = new LinkedList<>();
  protected final List<T> buffer = new LinkedList<>();
  private volatile boolean inited = false;

  public AbstractObjectPool(int min, int max) {
    this.min = min;
    this.max = max;
    if (this.min < 0 || this.min > this.max) {
      throw new IllegalArgumentException(String.format(
          "need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
    }
  }

  public void init() {
    for (int i = 0; i < min; i++) {
      buffer.add(newObject());
    }
    inited = true;
  }

  protected void checkInited() {
    if (!inited) {
      throw new IllegalStateException("not inited");
    }
  }

  abstract protected T newObject();

  public synchronized T getObject() {
    checkInited();

    if (usings.size() == max) {
      return null;
    }
    if (buffer.size() == 0) {
      T newObj = newObject();
      usings.add(newObj);
      return newObj;
    }
    T oldObj = buffer.remove(0);
    usings.add(oldObj);
    return oldObj;
  }

  public synchronized void freeObject(T obj) {
    checkInited();
    if (!usings.contains(obj)) {
      throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
    }

    usings.remove(usings.indexOf(obj));
    buffer.add(obj);
  }
}

AbstractObjectPool具有以下特性:

  • 支持設置最小娜睛、最大容量
  • 對象一旦申請就不再釋放,避免了GC

雖然很簡單方库,但大可以用于一些時間敏感障斋、資源充裕的場景垃环。如果時間進一步敏感,可將getObject()被济、freeObject()改寫為并發(fā)程度更高的版本涧团,但記得保證安全發(fā)布安全回收泌绣;如果資源不那么充裕,可以適當增加對象回收策略元媚。

可以看到苗沧,一個對象池的基本行為包括:

  • 創(chuàng)建對象newObject()
  • 借取對象getObject()
  • 歸還對象freeObject()

典型的對象池有各種連接池待逞、常量池等,應用非常多嗤无,模型也大同小異怜庸,不做解析割疾。令人迷惑的是線程池,很容易讓人誤以為線程池的核心原理也是對象池拓诸,下面來追一遍源碼。

線程池

首先擺出結論:線程池糅合了對象池模型趣钱,但核心原理是生產(chǎn)者-消費者模型首有。

繼承結構如下:

image.png

用戶可以將Runnable(或Callables)實例提交給線程池枢劝,線程池會異步執(zhí)行該任務您旁,返回響應的結果(完成/返回值)。

猴子最喜歡的是submit(Callable<T> task)方法蚕脏。我們從該方法入手侦锯,逐步深入函數(shù)棧尺碰,探究線程池的實現(xiàn)原理。

submit()

submit()方法在ExecutorService接口中定義洛心,AbstractExecutorService實現(xiàn)词身,ThreadPoolExecutor直接繼承悼凑。

public abstract class AbstractExecutorService implements ExecutorService {
...
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
...
}

AbstractExecutorService#newTaskFor()創(chuàng)建一個RunnableFuture類型的FutureTask户辫。

核心是execute()方法嗤锉。

execute()

execute()方法在Executor接口中定義瘟忱,ThreadPoolExecutor實現(xiàn)苫幢。

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
...
}

我們暫且忽略線程池的池化策略韩肝。關注一個最簡單的場景九榔,看能不能先回答一個問題:線程池中的任務如何執(zhí)行哲泊?

核心是addWorker()方法。以8行的參數(shù)為例,此時,線程池中的線程數(shù)未達到最小線程池大小corePoolSize敢伸,通吃海可以直接在9行返回截酷。

addWorker()

簡化如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private boolean addWorker(Runnable firstTask, boolean core) {
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN) {
                        workers.add(w);
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
...
}

我去掉了很多用于管理線程池迂苛、維護線程安全的代碼。假設線程池未關閉就漾,worker(即w抑堡,下同)添加成功朗徊,則必然能夠將worker添加至workers中爷恳。workers是一個HashSet:

private final HashSet<Worker> workers = new HashSet<Worker>();

哪里是對象池?

如果說與對象池有關棚壁,那么workers即相當于示例代碼中的using,應用了對象池模型;只不過這里的using是一直增長的史隆,直到達到最大線程池大小maximumPoolSize泌射。

但是很明顯蚣驼,線程池并沒有將線程發(fā)布出去颖杏,workers也僅僅完成using“保存線程”的功能。那么翼抠,線程池中的任務如何執(zhí)行呢获讳?跟線程池有沒有關系丐膝?

哪里又不是?

注意9偎肃、17累颂、24行:

  • 9行將我們提交到線程池的firstTask封裝入一個worker凛俱。
  • 17行將worker加入workers蒲犬,維護起來
  • 24行則啟動了worker中的線程t

核心在與這三行,但線程池并沒有直接在addWorker()中啟動任務firstTask赌朋,代之以啟動一個worker篇裁。最終任務必然被啟動达布,那么我們繼續(xù)看Worker如何啟動這個任務。

Worker

Worker實現(xiàn)了Runnable接口:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
...
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
...
}

為什么要將構造Worker時的參數(shù)命名為firstTask?因為當且僅當需要建立新的Worker以執(zhí)行任務task時产还,才會調用構造函數(shù)脐区。因此,任務task對于新Worker而言炕柔,是第一個任務firstTask匕累。

Worker的實現(xiàn)非常簡單:將自己作為Runable實例默伍,構造時在內部創(chuàng)建并持有一個線程thread也糊。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法框弛,它直接調用了runWorker()方法瑟枫。

runWorker()

敲黑板V冈堋T试谩!

重頭戲來了架馋。簡化如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
...
}

我們在前面將要執(zhí)行的任務賦值給firstTask萍启,5-6行首先取出任務task屏鳍,并將firstTask置為null钓瞭。因為接下來要執(zhí)行task,firstTask字段就沒有用了堤结。

重點是10-31行的while循環(huán)佳鳖。下面分情況討論系吩。

case1:第一次進入循環(huán),task不為null

case1對應前面作出的諸多假設月弛。

第一次進入循環(huán)時帽衙,task==firstTask贞绵,不為null榨崩,使10行布爾短路直接進入循環(huán)母蛛;從而16行執(zhí)行的是firstTask的run()方法;異常處理不表前弯;最后,finally代碼塊中剃根,task會被置為null狈醉,導致下一輪循環(huán)會進入case2。

case2:非第一次進入循環(huán)班巩,task為null

case2是更普遍的情況逊桦,也就是線程池的核心玲躯。

case1中,task被置為了null劳坑,使10行布爾表達式執(zhí)行第二部分(task = getTask()) != null(getTask()稍后再講距芬,它返回一個用戶已提交的任務)。假設task得到了一個已提交的任務舀武,從而16行執(zhí)行的是新獲得的任務task的run()方法银舱。后同case1跛梗,最后task仍然會被置為null核偿,以后循環(huán)都將進入case2。

getTask()

任務從哪來呢聂薪?簡化如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private Runnable getTask() {
        boolean timedOut = false;

        for (;;) {
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
...
}

我們先看最簡單的藏澳,19-28行翔悠。

首先蓄愁,workQueue是一個線程安全的BlockingQueue狞悲,大部分時候使用的實現(xiàn)類是LinkedBlockingQueue摇锋,見源碼|并發(fā)一枝花之BlockingQueue

private final BlockingQueue<Runnable> workQueue;

假設timed為false丹拯,則調用阻塞的take()方法站超,返回的r一定不是null,從而12行退出乖酬,將任務交給了某個worker線程死相。

一個小細節(jié)有點意思:前面每個worker線程runWorker()方法時,在循環(huán)中加鎖粒度在worker級別咬像,直接使用的lock同步算撮;但因為每一個woker都會調用getTask(),考慮到性能因素县昂,源碼中getTask()中使用樂觀的CAS+SPIN實現(xiàn)無鎖同步。關于樂觀鎖和CAS七芭,可以參考我的另一篇文章源碼|并發(fā)一枝花之ConcurrentLinkedQueue【偽】素挽。

workQueue中的元素從哪來呢?這就要回顧execute()方法了狸驳。

execute()

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
...
}


前面以8行的參數(shù)為例,此時缩赛,線程池中的線程數(shù)未達到最小線程池大小corePoolSize耙箍,通常可以直接在9行返回酥馍。進入8行的條件是“當前worker數(shù)小于最小線程池大小corePoolSize”辩昆。

如果不滿足,會繼續(xù)執(zhí)行到12行旨袒。isRunning(c)判斷線程池是否未關閉汁针,我們關注未關閉的情況;則會繼續(xù)執(zhí)行布爾表達式的第二部分workQueue.offer(command)砚尽,嘗試將任務command放入隊列workQueue施无。

workQueue.offer()的行為取決于線程池持有的BlockingQueue實例。Executors.newFixedThreadPool()必孤、Executors.newSingleThreadExecutor()創(chuàng)建的線程池使用LinkedBlockingQueue猾骡,而Executors.newCachedThreadPool()創(chuàng)建的線程池則使用SynchronousQueue。以LinkedBlockingQueue為例敷搪,創(chuàng)建時不配置容量兴想,即創(chuàng)建為無界隊列,則LinkedBlockingQueue#offer()永遠返回true赡勘,從而進入12-18行嫂便。

更細節(jié)的內容不必關心了,當workQueue.offer()返回true時闸与,已經(jīng)將任務command放入了隊列workQueue毙替。當未來的某個時刻曼振,某worker執(zhí)行完某一個任務之后,會從workQueue中再取出一個任務繼續(xù)執(zhí)行蔚龙,直到線程池關閉冰评,直到海枯石爛木羹。

CachedThreadPool是一種無界線程池甲雅,使用SynchronousQueue能進一步提升性能,簡化代碼結構坑填。留給讀者分析抛人。

case2小結

可以看到,實際上脐瑰,線程池的核心原理與對象池模型無關妖枚,而是生產(chǎn)者-消費者模型

image.png
  • 生產(chǎn)者(調用submit()或execute()方法)將任務task放入隊列
  • 消費者(worker線程)循環(huán)從隊列中取出任務處理任務(執(zhí)行task.run())。

鉤子方法

回到runWorker()方法苍在,在執(zhí)行任務的過程中绝页,線程池保留了一些鉤子方法,如beforeExecute()寂恬、afterExecute()续誉。用戶可以在實現(xiàn)自己的線程池時,可以通過覆寫鉤子方法為線程池添加功能初肉。

猴子不認為鉤子方法是一種好的設計酷鸦。因為鉤子方法大多依賴于源碼實現(xiàn),那么除非了解源碼或API聲明絕對的嚴謹正確牙咏,否則很難正確使用鉤子方法臼隔。等發(fā)生錯誤時再去了解實現(xiàn),可能就太晚了妄壶。說到底摔握,還是不要使用類似extends這種表達“擴展”語義的語法來實現(xiàn)繼承,詳見Java中如何恰當?shù)谋磉_“繼承”與“擴展”的語義盯拱?盒发。

當然,鉤子方法也是極其方便的狡逢。權衡看待宁舰。

總結

相對于線程封閉,串行線程封閉離用戶的距離更近一些奢浑,簡單靈活蛮艰,實用性強,很容易掌握雀彼。而線程封閉更多淪為單純的設計策略壤蚜,單純使用線程封閉的場景不多即寡。

線程池與串行線程封閉、對象池的關系不大袜刷,但經(jīng)常被混為一談聪富;沒看過源碼的很難想到其實現(xiàn)方案,面試時也能立分高下著蟹。

線程池的實現(xiàn)很有意思墩蔓。在追源碼之前,猴子一直以為線程池就是把線程存起來萧豆,用的時候取出來執(zhí)行任務奸披;看了源碼才知道實現(xiàn)如此之妙,簡潔優(yōu)雅效率高涮雷。源碼才是最好的老師阵面。


本文鏈接:源碼|從串行線程封閉到對象池、線程池
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識共享署名-相同方式共享 4.0 國際許可協(xié)議發(fā)布洪鸭,歡迎轉載样刷,演繹或用于商業(yè)目的,但是必須保留本文的署名及鏈接卿嘲。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末颂斜,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子拾枣,更是在濱河造成了極大的恐慌,老刑警劉巖盒让,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件梅肤,死亡現(xiàn)場離奇詭異,居然都是意外死亡邑茄,警方通過查閱死者的電腦和手機姨蝴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肺缕,“玉大人左医,你說我怎么就攤上這事⊥荆” “怎么了浮梢?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長彤路。 經(jīng)常有香客問我秕硝,道長,這世上最難降的妖魔是什么洲尊? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任远豺,我火速辦了婚禮奈偏,結果婚禮上,老公的妹妹穿的比我還像新娘躯护。我一直安慰自己惊来,他們只是感情好,可當我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布棺滞。 她就那樣靜靜地躺著裁蚁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪检眯。 梳的紋絲不亂的頭發(fā)上厘擂,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機與錄音锰瘸,去河邊找鬼刽严。 笑死,一個胖子當著我的面吹牛避凝,可吹牛的內容都是我干的舞萄。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼管削,長吁一口氣:“原來是場噩夢啊……” “哼倒脓!你這毒婦竟也來了?” 一聲冷哼從身側響起含思,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤崎弃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后含潘,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饲做,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年遏弱,在試婚紗的時候發(fā)現(xiàn)自己被綠了盆均。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡漱逸,死狀恐怖泪姨,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情饰抒,我是刑警寧澤肮砾,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站循集,受9級特大地震影響唇敞,放射性物質發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一疆柔、第九天 我趴在偏房一處隱蔽的房頂上張望咒精。 院中可真熱鬧,春花似錦旷档、人聲如沸模叙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽范咨。三九已至,卻和暖如春厂庇,著一層夾襖步出監(jiān)牢的瞬間渠啊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工权旷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留替蛉,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓拄氯,卻偏偏與公主長得像躲查,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子译柏,可洞房花燭夜當晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內容