今天講一個牛逼而實用的概念干像,串行線程封閉
。對象池
是串行線程封閉的典型應用場景狞膘;線程池
糅合了對象池技術什乙,但核心實現(xiàn)不依賴于對象池已球,很容易產(chǎn)生誤會智亮。本文從串行線程封閉和對象池入手,最后通過源碼分析線程池的核心原理阔蛉,厘清對象池與線程池之間的誤會状原。
JDK版本:oracle java 1.8.0_102
線程封閉與串行線程封閉
線程封閉
線程封閉是一種常見的線程安全設計策略:僅在固定的一個線程內訪問對象,不對其他線程共享削锰。
使用線程封閉技術毕莱,對象O始終只對一個線程T1可見,“單線程”中自然不存在線程安全的問題蛹稍。
ThreadLocal是常用的線程安全工具唆姐,見源碼|ThreadLocal的實現(xiàn)原理。線程封閉在Servlet及高層的web框架Spring等中應用不少厦酬。
串行線程封閉
線程封閉雖然好用胆描,卻限制了對象的共享昌讲。串行線程封閉改進了這一點:對象O只能由單個線程T1擁有减噪,但可以通過安全的發(fā)布對象O來轉移O的所有權;在轉移所有權后醋闭,也只有另一個線程T2能獲得這個O的所有權朝卒,并且發(fā)布O的T1不會再訪問O抗斤。
所謂“所有權”,指修改對象的權利龙宏。
相對于線程封閉伤疙,串行線程封閉使得任意時刻徒像,最多僅有一個線程擁有對象的所有權。當然衅澈,這不是絕對的谬墙,只要線程T1事實不會再修改對象O经备,那么就相當于僅有T2擁有對象的所有權侵蒙。串行線層封閉讓對象變得可以共享(雖然只能串行的擁有所有權)傅蹂,靈活性得到大大提高份蝴;相對的,要共享對象就涉及安全發(fā)布的問題浸卦,依靠BlockingQueue等同步工具很容易實現(xiàn)這一點案糙。
對象池是串行線程封閉的經(jīng)典應用場景时捌,如數(shù)據(jù)庫連接池等。
對象池
對象池利用了串行封閉:將對象O“借給”一個請求線程T1稚叹,T1使用完再交還給對象池禽笑,并保證“未擅自發(fā)布該對象”且“以后不再使用”佳镜;對象池收回O后凡桥,等T2來借的時候再把它借給T2,完成對象所有權的傳遞啊掏。
猴子擼了一個簡化版的線程池衰猛,用戶只需要覆寫newObject()方法:
public abstract class AbstractObjectPool<T> {
protected final int min;
protected final int max;
protected final List<T> usings = new LinkedList<>();
protected final List<T> buffer = new LinkedList<>();
private volatile boolean inited = false;
public AbstractObjectPool(int min, int max) {
this.min = min;
this.max = max;
if (this.min < 0 || this.min > this.max) {
throw new IllegalArgumentException(String.format(
"need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
}
}
public void init() {
for (int i = 0; i < min; i++) {
buffer.add(newObject());
}
inited = true;
}
protected void checkInited() {
if (!inited) {
throw new IllegalStateException("not inited");
}
}
abstract protected T newObject();
public synchronized T getObject() {
checkInited();
if (usings.size() == max) {
return null;
}
if (buffer.size() == 0) {
T newObj = newObject();
usings.add(newObj);
return newObj;
}
T oldObj = buffer.remove(0);
usings.add(oldObj);
return oldObj;
}
public synchronized void freeObject(T obj) {
checkInited();
if (!usings.contains(obj)) {
throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
}
usings.remove(usings.indexOf(obj));
buffer.add(obj);
}
}
AbstractObjectPool具有以下特性:
- 支持設置最小娜睛、最大容量
- 對象一旦申請就不再釋放,避免了GC
雖然很簡單方库,但大可以用于一些時間敏感障斋、資源充裕的場景垃环。如果時間進一步敏感,可將getObject()被济、freeObject()改寫為并發(fā)程度更高的版本涧团,但記得保證安全發(fā)布安全回收泌绣;如果資源不那么充裕,可以適當增加對象回收策略元媚。
可以看到苗沧,一個對象池的基本行為包括:
- 創(chuàng)建對象newObject()
- 借取對象getObject()
- 歸還對象freeObject()
典型的對象池有各種連接池待逞、常量池等,應用非常多嗤无,模型也大同小異怜庸,不做解析割疾。令人迷惑的是線程池,很容易讓人誤以為線程池的核心原理也是對象池拓诸,下面來追一遍源碼。
線程池
首先擺出結論:線程池糅合了對象池模型趣钱,但核心原理是生產(chǎn)者-消費者模型首有。
繼承結構如下:
用戶可以將Runnable(或Callables)實例提交給線程池枢劝,線程池會異步執(zhí)行該任務您旁,返回響應的結果(完成/返回值)。
猴子最喜歡的是submit(Callable<T> task)
方法蚕脏。我們從該方法入手侦锯,逐步深入函數(shù)棧尺碰,探究線程池的實現(xiàn)原理。
submit()
submit()方法在ExecutorService接口中定義洛心,AbstractExecutorService實現(xiàn)词身,ThreadPoolExecutor直接繼承悼凑。
public abstract class AbstractExecutorService implements ExecutorService {
...
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
...
}
AbstractExecutorService#newTaskFor()創(chuàng)建一個RunnableFuture類型的FutureTask户辫。
核心是execute()方法嗤锉。
execute()
execute()方法在Executor接口中定義瘟忱,ThreadPoolExecutor實現(xiàn)苫幢。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
我們暫且忽略線程池的池化策略韩肝。關注一個最簡單的場景九榔,看能不能先回答一個問題:線程池中的任務如何執(zhí)行哲泊?
核心是addWorker()方法。以8行的參數(shù)為例,此時,線程池中的線程數(shù)未達到最小線程池大小corePoolSize敢伸,通吃海可以直接在9行返回截酷。
addWorker()
簡化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN) {
workers.add(w);
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
...
}
我去掉了很多用于管理線程池迂苛、維護線程安全的代碼。假設線程池未關閉就漾,worker(即w抑堡,下同)添加成功朗徊,則必然能夠將worker添加至workers中爷恳。workers是一個HashSet:
private final HashSet<Worker> workers = new HashSet<Worker>();
哪里是對象池?
如果說與對象池有關棚壁,那么workers即相當于示例代碼中的using,應用了對象池模型;只不過這里的using是一直增長的史隆,直到達到最大線程池大小maximumPoolSize泌射。
但是很明顯蚣驼,線程池并沒有將線程發(fā)布出去颖杏,workers也僅僅完成using“保存線程”的功能。那么翼抠,線程池中的任務如何執(zhí)行呢获讳?跟線程池有沒有關系丐膝?
哪里又不是?
注意9偎肃、17累颂、24行:
- 9行將我們提交到線程池的firstTask封裝入一個worker凛俱。
- 17行將worker加入workers蒲犬,維護起來
- 24行則啟動了worker中的線程t
核心在與這三行,但線程池并沒有直接在addWorker()中啟動任務firstTask赌朋,代之以啟動一個worker篇裁。最終任務必然被啟動达布,那么我們繼續(xù)看Worker如何啟動這個任務。
Worker
Worker實現(xiàn)了Runnable接口:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
...
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
}
為什么要將構造Worker時的參數(shù)命名為firstTask?因為當且僅當需要建立新的Worker以執(zhí)行任務task時产还,才會調用構造函數(shù)脐区。因此,任務task對于新Worker而言炕柔,是第一個任務firstTask匕累。
Worker的實現(xiàn)非常簡單:將自己作為Runable實例默伍,構造時在內部創(chuàng)建并持有一個線程thread也糊。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法框弛,它直接調用了runWorker()方法瑟枫。
runWorker()
敲黑板V冈堋T试谩!
重頭戲來了架馋。簡化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
...
}
我們在前面將要執(zhí)行的任務賦值給firstTask萍启,5-6行首先取出任務task屏鳍,并將firstTask置為null钓瞭。因為接下來要執(zhí)行task,firstTask字段就沒有用了堤结。
重點是10-31行的while循環(huán)佳鳖。下面分情況討論系吩。
case1:第一次進入循環(huán),task不為null
case1對應前面作出的諸多假設月弛。
第一次進入循環(huán)時帽衙,task==firstTask贞绵,不為null榨崩,使10行布爾短路直接進入循環(huán)母蛛;從而16行執(zhí)行的是firstTask的run()方法;異常處理不表前弯;最后,finally代碼塊中剃根,task會被置為null狈醉,導致下一輪循環(huán)會進入case2。
case2:非第一次進入循環(huán)班巩,task為null
case2是更普遍的情況逊桦,也就是線程池的核心玲躯。
case1中,task被置為了null劳坑,使10行布爾表達式執(zhí)行第二部分(task = getTask()) != null
(getTask()稍后再講距芬,它返回一個用戶已提交的任務)。假設task得到了一個已提交的任務舀武,從而16行執(zhí)行的是新獲得的任務task的run()方法银舱。后同case1跛梗,最后task仍然會被置為null核偿,以后循環(huán)都將進入case2。
getTask()
任務從哪來呢聂薪?簡化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
...
}
我們先看最簡單的藏澳,19-28行翔悠。
首先蓄愁,workQueue是一個線程安全的BlockingQueue狞悲,大部分時候使用的實現(xiàn)類是LinkedBlockingQueue摇锋,見源碼|并發(fā)一枝花之BlockingQueue:
private final BlockingQueue<Runnable> workQueue;
假設timed為false丹拯,則調用阻塞的take()方法站超,返回的r一定不是null,從而12行退出乖酬,將任務交給了某個worker線程死相。
一個小細節(jié)有點意思:前面每個worker線程runWorker()方法時,在循環(huán)中加鎖粒度在worker級別咬像,直接使用的lock同步算撮;但因為每一個woker都會調用getTask(),考慮到性能因素县昂,源碼中getTask()中使用樂觀的CAS+SPIN實現(xiàn)無鎖同步。關于樂觀鎖和CAS七芭,可以參考我的另一篇文章源碼|并發(fā)一枝花之ConcurrentLinkedQueue【偽】素挽。
workQueue中的元素從哪來呢?這就要回顧execute()方法了狸驳。
execute()
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
前面以8行的參數(shù)為例,此時缩赛,線程池中的線程數(shù)未達到最小線程池大小corePoolSize耙箍,通常可以直接在9行返回酥馍。進入8行的條件是“當前worker數(shù)小于最小線程池大小corePoolSize”
辩昆。
如果不滿足,會繼續(xù)執(zhí)行到12行旨袒。isRunning(c)
判斷線程池是否未關閉汁针,我們關注未關閉的情況;則會繼續(xù)執(zhí)行布爾表達式的第二部分workQueue.offer(command)
砚尽,嘗試將任務command放入隊列workQueue施无。
workQueue.offer()的行為取決于線程池持有的BlockingQueue實例。Executors.newFixedThreadPool()必孤、Executors.newSingleThreadExecutor()創(chuàng)建的線程池使用LinkedBlockingQueue猾骡,而Executors.newCachedThreadPool()創(chuàng)建的線程池則使用SynchronousQueue。以LinkedBlockingQueue為例敷搪,創(chuàng)建時不配置容量兴想,即創(chuàng)建為無界隊列,則LinkedBlockingQueue#offer()永遠返回true赡勘,從而進入12-18行嫂便。
更細節(jié)的內容不必關心了,當workQueue.offer()返回true時闸与,已經(jīng)將任務command放入了隊列workQueue毙替。當未來的某個時刻曼振,某worker執(zhí)行完某一個任務之后,會從workQueue中再取出一個任務繼續(xù)執(zhí)行蔚龙,直到線程池關閉冰评,直到海枯石爛木羹。
CachedThreadPool是一種無界線程池甲雅,使用SynchronousQueue能進一步提升性能,簡化代碼結構坑填。留給讀者分析抛人。
case2小結
可以看到,實際上脐瑰,線程池的核心原理與對象池模型無關妖枚,而是生產(chǎn)者-消費者模型:
- 生產(chǎn)者(調用submit()或execute()方法)將任務task放入隊列
- 消費者(worker線程)循環(huán)從隊列中取出任務處理任務(執(zhí)行task.run())。
鉤子方法
回到runWorker()方法苍在,在執(zhí)行任務的過程中绝页,線程池保留了一些鉤子方法,如beforeExecute()寂恬、afterExecute()续誉。用戶可以在實現(xiàn)自己的線程池時,可以通過覆寫鉤子方法為線程池添加功能初肉。
但猴子不認為鉤子方法是一種好的設計酷鸦。因為鉤子方法大多依賴于源碼實現(xiàn),那么除非了解源碼或API聲明絕對的嚴謹正確牙咏,否則很難正確使用鉤子方法臼隔。等發(fā)生錯誤時再去了解實現(xiàn),可能就太晚了妄壶。說到底摔握,還是不要使用類似extends這種表達“擴展”語義的語法來實現(xiàn)繼承,詳見Java中如何恰當?shù)谋磉_“繼承”與“擴展”的語義盯拱?盒发。
當然,鉤子方法也是極其方便的狡逢。權衡看待宁舰。
總結
相對于線程封閉,串行線程封閉離用戶的距離更近一些奢浑,簡單靈活蛮艰,實用性強,很容易掌握雀彼。而線程封閉更多淪為單純的設計策略壤蚜,單純使用線程封閉的場景不多即寡。
線程池與串行線程封閉、對象池的關系不大袜刷,但經(jīng)常被混為一談聪富;沒看過源碼的很難想到其實現(xiàn)方案,面試時也能立分高下著蟹。
線程池的實現(xiàn)很有意思墩蔓。在追源碼之前,猴子一直以為線程池就是把線程存起來萧豆,用的時候取出來執(zhí)行任務奸披;看了源碼才知道實現(xiàn)如此之妙,簡潔優(yōu)雅效率高涮雷。源碼才是最好的老師阵面。
本文鏈接:源碼|從串行線程封閉到對象池、線程池
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識共享署名-相同方式共享 4.0 國際許可協(xié)議發(fā)布洪鸭,歡迎轉載样刷,演繹或用于商業(yè)目的,但是必須保留本文的署名及鏈接卿嘲。