最近在改進項目的并發(fā)功能呐矾,但開發(fā)起來磕磕碰碰的∨成埃看了好多資料蜒犯,總算加深了認(rèn)識组橄。于是打算配合查看源代碼,總結(jié)并發(fā)編程的原理罚随。
準(zhǔn)備從用得最多的線程池開始玉工,圍繞創(chuàng)建、執(zhí)行淘菩、關(guān)閉認(rèn)識線程池整個生命周期的實現(xiàn)原理遵班。后續(xù)再研究原子變量、并發(fā)容器潮改、阻塞隊列狭郑、同步工具、鎖等等主題进陡。java.util.concurrent里的并發(fā)工具用起來不難愿阐,但不能僅僅會用微服,我們要read the fucking source code趾疚,哈哈。順便說聲以蕴,我用的JDK是1.8糙麦。
Executor框架
Executor是一套線程池管理框架,接口里只有一個方法execute丛肮,執(zhí)行Runnable任務(wù)赡磅。ExecutorService接口擴展了Executor,添加了線程生命周期的管理宝与,提供任務(wù)終止焚廊、返回任務(wù)結(jié)果等方法。AbstractExecutorService實現(xiàn)了ExecutorService习劫,提供例如submit方法的默認(rèn)實現(xiàn)邏輯咆瘟。
然后到今天的主題ThreadPoolExecutor,繼承了AbstractExecutorService诽里,提供線程池的具體實現(xiàn)袒餐。
構(gòu)造方法
下面是ThreadPoolExecutor最普通的構(gòu)造函數(shù),最多有七個參數(shù)谤狡。具體代碼不貼了灸眼,只是一些參數(shù)校驗和設(shè)置的語句。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
corePoolSize是線程池的目標(biāo)大小墓懂,也叫核心線程數(shù)焰宣。maximumPoolSize是線程池的最大上限,maximumPoolSize減去corePoolSize即是非核心線程數(shù)捕仔,或者叫空閑線程匕积。keepAliveTime指明空閑線程的存活時間佛嬉,超出存活時間的空閑線程就會被回收。unit就不用說了闸天,剩下的三個參數(shù)看后文的分析暖呕。
預(yù)設(shè)的定制線程池
ThreadPoolExecutor預(yù)設(shè)了一些已經(jīng)定制好的線程池,由Executors里的工廠方法創(chuàng)建苞氮。下面分析newSingleThreadExecutor湾揽、newFixedThreadPool、newCachedThreadPool的創(chuàng)建參數(shù)笼吟。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為傳入的固定數(shù)量库物,keepAliveTim設(shè)置為0。線程池創(chuàng)建后贷帮,線程數(shù)量將會固定不變戚揭,適合需要線程很穩(wěn)定的場合。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor是線程數(shù)量固定為1的newFixedThreadPool版本撵枢,保證池內(nèi)的任務(wù)串行民晒。注意到返回的是FinalizableDelegatedExecutorService,來看看源碼:
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
FinalizableDelegatedExecutorService繼承了DelegatedExecutorService锄禽,僅僅在gc時增加關(guān)閉線程池的操作潜必,再來看看DelegatedExecutorService的源碼:
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
//...
}
代碼很簡單,DelegatedExecutorService包裝了ExecutorService沃但,使其只暴露出ExecutorService的方法磁滚,因此不能再配置線程池的參數(shù)。本來宵晚,線程池創(chuàng)建的參數(shù)是可以調(diào)整的垂攘,ThreadPoolExecutor提供了set方法。使用newSingleThreadExecutor目的是生成單線程串行的線程池淤刃,如果還能配置線程池大小晒他,那就沒意思了。
Executors還提供了unconfigurableExecutorService方法钝凶,將普通線程池包裝成不可配置的線程池仪芒。如果不想線程池被不明所以的后人修改,可以調(diào)用這個方法耕陷。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool生成一個會緩存的線程池掂名,線程數(shù)量可以從0到Integer.MAX_VALUE,超時時間為1分鐘哟沫。線程池用起來的效果是:如果有空閑線程饺蔑,會復(fù)用線程;如果沒有空閑線程嗜诀,會新建線程猾警;如果線程空閑超過1分鐘孔祸,將會被回收。
newScheduledThreadPool
newScheduledThreadPool將會創(chuàng)建一個可定時執(zhí)行任務(wù)的線程池发皿。這個不打算在本文展開崔慧,后續(xù)會另開文章細(xì)講。
等待隊列
newCachedThreadPool的線程上限幾乎等同于無限穴墅,但系統(tǒng)資源是有限的惶室,任務(wù)的處理速度總有可能比不上任務(wù)的提交速度。因此玄货,可以為ThreadPoolExecutor提供一個阻塞隊列來保存因線程不足而等待的Runnable任務(wù)皇钞,這就是BlockingQueue。
JDK為BlockingQueue提供了幾種實現(xiàn)方式松捉,常用的有:
- ArrayBlockingQueue:數(shù)組結(jié)構(gòu)的阻塞隊列
- LinkedBlockingQueue:鏈表結(jié)構(gòu)的阻塞隊列
- PriorityBlockingQueue:有優(yōu)先級的阻塞隊列
- SynchronousQueue:不會存儲元素的阻塞隊列
newFixedThreadPool和newSingleThreadExecutor在默認(rèn)情況下使用一個無界的LinkedBlockingQueue夹界。要注意的是,如果任務(wù)一直提交隘世,但線程池又不能及時處理可柿,等待隊列將會無限制地加長,系統(tǒng)資源總會有消耗殆盡的一刻以舒。所以趾痘,推薦使用有界的等待隊列慢哈,避免資源耗盡蔓钟。但解決一個問題,又會帶來新問題:隊列填滿之后卵贱,再來新任務(wù)滥沫,這個時候怎么辦?后文會介紹如何處理隊列飽和键俱。
newCachedThreadPool使用的SynchronousQueue十分有趣兰绣,看名稱是個隊列,但它卻不能存儲元素编振。要將一個任務(wù)放進隊列缀辩,必須有另一個線程去接收這個任務(wù),一個進就有一個出踪央,隊列不會存儲任何東西臀玄。因此,SynchronousQueue是一種移交機制畅蹂,不能算是隊列健无。newCachedThreadPool生成的是一個沒有上限的線程池,理論上提交多少任務(wù)都可以液斜,使用SynchronousQueue作為等待隊列正合適累贤。
飽和策略
當(dāng)有界的等待隊列滿了之后叠穆,就需要用到飽和策略去處理,ThreadPoolExecutor的飽和策略通過傳入RejectedExecutionHandler來實現(xiàn)臼膏。如果沒有為構(gòu)造函數(shù)傳入硼被,將會使用默認(rèn)的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
AbortPolicy是默認(rèn)的實現(xiàn)渗磅,直接拋出一個RejectedExecutionException異常祷嘶,讓調(diào)用者自己處理。除此之外夺溢,還有幾種飽和策略论巍,來看一下:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardPolicy的rejectedExecution直接是空方法,什么也不干风响。如果隊列滿了嘉汰,后續(xù)的任務(wù)都拋棄掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardOldestPolicy會將等待隊列里最舊的任務(wù)踢走状勤,讓新任務(wù)得以執(zhí)行鞋怀。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
最后一種飽和策略是CallerRunsPolicy,它既不拋棄新任務(wù)持搜,也不拋棄舊任務(wù)密似,而是直接在當(dāng)前線程運行這個任務(wù)。當(dāng)前線程一般就是主線程啊葫盼,讓主線程運行任務(wù)残腌,說不定就阻塞了。如果不是想清楚了整套方案贫导,還是少用這種策略為妙抛猫。
ThreadFactory
每當(dāng)線程池需要創(chuàng)建一個新線程,都是通過線程工廠獲取孩灯。如果不為ThreadPoolExecutor設(shè)定一個線程工廠闺金,就會使用默認(rèn)的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
平時打印線程池里線程的name時,會輸出形如pool-1-thread-1之類的名稱峰档,就是在這里設(shè)置的败匹。這個默認(rèn)的線程工廠,創(chuàng)建的線程是普通的非守護線程讥巡,如果需要定制掀亩,實現(xiàn)ThreadFactory后傳給ThreadPoolExecutor即可。
不看代碼不總結(jié)不會知道尚卫,光是線程池的創(chuàng)建就可以引出很多學(xué)問归榕。別看平時創(chuàng)建線程池是一句代碼的事,其實ThreadPoolExecutor提供了很靈活的定制方法吱涉。
歡迎留言和轉(zhuǎn)發(fā)刹泄,下一篇打算分析線程池如何執(zhí)行任務(wù)外里。