-
向Doug Lea大佬致敬
什么是線程池
線程池(Thread Pool)是一種基于池化思想管理線程的工具
為什么要使用線程池
- 線程池可以解決線程生命周期的系統(tǒng)開銷問題党晋,同時還可以加快響應(yīng)速度姐直。因?yàn)榫€程池中的線程是可以復(fù)用的饵骨,我們只用少量的線程去執(zhí)行大量的任務(wù),這就大大減小了線程生命周期的開銷。而且線程通常不是等接到任務(wù)后再臨時創(chuàng)建,而是已經(jīng)創(chuàng)建好時刻準(zhǔn)備執(zhí)行任務(wù)疟羹,這樣就消除了線程創(chuàng)建所帶來的延遲,提升了響應(yīng)速度禀倔,增強(qiáng)了用戶體驗(yàn)榄融。
- 線程池可以統(tǒng)籌內(nèi)存和CPU的使用,避免資源使用不當(dāng)救湖。線程池會根據(jù)配置和任務(wù)數(shù)量靈活地控制線程數(shù)量愧杯,不夠的時候就創(chuàng)建,太多的時候就回收捎谨,避免線程過多導(dǎo)致內(nèi)存溢出民效,或線程太少導(dǎo)致 CPU 資源浪費(fèi),達(dá)到了一個完美的平衡涛救。
- 線程池可以統(tǒng)一管理資源畏邢。比如線程池可以統(tǒng)一管理任務(wù)隊列和線程,可以統(tǒng)一開始或結(jié)束任務(wù)检吆,比單個線程逐一處理任務(wù)要更方便舒萎、更易于管理,同時也有利于數(shù)據(jù)統(tǒng)計蹭沛,比如我們可以很方便地統(tǒng)計出已經(jīng)執(zhí)行過的任務(wù)的數(shù)量臂寝。
怎樣使用線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i < 15; i++) {
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("線程池中線程數(shù)目:" + executor.getPoolSize() + ",隊列中等待執(zhí)行的任務(wù)數(shù)目:" +
executor.getQueue().size() + "摊灭,已執(zhí)行玩別的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
線程池原理(畫圖)
線程池各個參數(shù)的意義
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 線程工廠 (ThreadFactory)
對于線程工廠threadFactory這個參數(shù)咆贬,我們可以使用默認(rèn)的defaultThreadFactory,也可以傳入自定義的有額外能力的線程工廠帚呼,因?yàn)槲覀兛赡苡卸鄠€線程池掏缎,而不同的線程池之間有必要通過不同的名字來進(jìn)行區(qū)分皱蹦,所以可以傳入能根據(jù)業(yè)務(wù)信息進(jìn)行命名的線程工廠,以便后續(xù)可以根據(jù)線程名區(qū)分不同的業(yè)務(wù)進(jìn)而快速定位問題代碼眷蜈。比如可以通過com.google.common.util.concurrent.ThreadFactory
Builder 來實(shí)現(xiàn)沪哺,如代碼所示。
ThreadFactoryBuilderbuilder = new ThreadFactoryBuilder();
ThreadFactoryrpcFactory = builder.setNameFormat("rpc-pool-%d").build();
我們生成了名字為rpcFactory的ThreadFactory酌儒,它的nameFormat為"rpc-pool-%d"辜妓,那么它生成的線程的名字是有固定格式的,它生成的線程的名字分別為"rpc-pool-1"忌怎,"rpc-pool-2" 籍滴,以此類推。
- 拒絕策略
最后一個參數(shù)是拒絕策略榴啸,我們可以根據(jù)業(yè)務(wù)需要异逐,選擇默認(rèn)的四種拒絕策略之一來使用:AbortPolicy,DiscardPolicy插掂,DiscardOldestPolicy或者CallerRunsPolicy。除此之外腥例,我們還可以通過實(shí)現(xiàn) RejectedExecutionHandler 接口來實(shí)現(xiàn)自己的拒絕策略辅甥,在接口中我們需要實(shí)現(xiàn) rejectedExecution 方法,在 rejectedExecution 方法中燎竖,執(zhí)行例如打印日志璃弄、暫存任務(wù)、重新執(zhí)行等自定義的拒絕策略构回,以便滿足業(yè)務(wù)需求夏块。如代碼所示。
private static class CustomRejectionHandler implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r , ThreadPoolExecutor executor){
//打印日志纤掸、暫存任務(wù)脐供、重新執(zhí)行等拒絕策略
}
}
幾種常見的線程池
- FixedThreadPool
- CachedThreadPool
- ScheduledThreadPool
- SingleThreadExecutor
- SingleThreadScheduledExecutor
FixedThreadPool
它的核心線程數(shù)和最大線程數(shù)是一樣的,所以可以把它看作是固定線程數(shù)的線程池借跪,它的特點(diǎn)是線程池中的線程數(shù)除了初始階段需要從 0 開始增加外政己,之后的線程數(shù)量就是固定的,就算任務(wù)數(shù)超過線程數(shù)掏愁,線程池也不會再創(chuàng)建更多的線程來處理任務(wù)歇由,而是會把超出線程處理能力的任務(wù)放到任務(wù)隊列中進(jìn)行等待。而且就算任務(wù)隊列滿了果港,到了本該繼續(xù)增加線程數(shù)的時候沦泌,由于它的最大線程數(shù)和核心線程數(shù)是一樣的,所以也無法再增加新的線程了辛掠。
CachedThreadPool
第二種線程池是CachedThreadPool谢谦,可以稱作可緩存線程池,它的特點(diǎn)在于線程數(shù)是幾乎可以無限增加的(實(shí)際最大可以達(dá)到Integer.MAX_VALUE,為2^31-1船侧,這個數(shù)非常大,所以基本不可能達(dá)到)厅各,而當(dāng)線程閑置時還可以對線程進(jìn)行回收镜撩。也就是說該線程池的線程數(shù)量不是固定不變的,當(dāng)然它也有一個用于存儲提交任務(wù)的隊列队塘,但這個隊列是SynchronousQueue袁梗,隊列的容量為0,實(shí)際不存儲任何任務(wù)憔古,它只負(fù)責(zé)對任務(wù)進(jìn)行中轉(zhuǎn)和傳遞遮怜,所以效率比較高。
ScheduledThreadPool
第三個線程池是 ScheduledThreadPool鸿市,它支持定時或周期性執(zhí)行任務(wù)锯梁。比如每隔 10 秒鐘執(zhí)行一次任務(wù),而實(shí)現(xiàn)這種功能的方法主要有 3 種焰情,如代碼所示:
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(new Task(), 10, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
第一種方法 schedule 比較簡單陌凳,表示延遲指定時間后執(zhí)行一次任務(wù),如果代碼中設(shè)置參數(shù)為 10 秒内舟,也就是 10 秒后執(zhí)行一次任務(wù)后就結(jié)束合敦。
第二種方法scheduleAtFixedRate表示以固定的頻率執(zhí)行任務(wù),它的第二個參數(shù)initialDelay表示第一次延時時間验游,第三個參數(shù) period 表示周期充岛,也就是第一次延時后每次延時多長時間執(zhí)行一次任務(wù)。
第三種方法scheduleWithFixedDelay與第二種方法類似耕蝉,也是周期執(zhí)行任務(wù)崔梗,區(qū)別在于對周期的定義,之前的scheduleAtFixedRate是以任務(wù)開始的時間為時間起點(diǎn)開始計時赔硫,時間到就開始執(zhí)行第二次任務(wù)炒俱,而不管任務(wù)需要花多久執(zhí)行;而 scheduleWithFixedDelay 方法以任務(wù)結(jié)束的時間為下一次循環(huán)的時間起點(diǎn)開始計時爪膊。
SingleThreadExecutor
它會使用唯一的線程去執(zhí)行任務(wù)权悟,原理和FixedThreadPool是一樣的,只不過這里線程只有一個推盛,如果線程在執(zhí)行任務(wù)的過程中發(fā)生異常峦阁,線程池也會重新創(chuàng)建一個線程來執(zhí)行后續(xù)的任務(wù)。這種線程池由于只有一個線程耘成,所以非常適合用于所有任務(wù)都需要按被提交的順序依次執(zhí)行的場景榔昔,而前幾種線程池不一定能夠保障任務(wù)的執(zhí)行順序等于被提交的順序驹闰,因?yàn)樗鼈兪嵌嗑€程并行執(zhí)行的。
SingleThreadScheduledExecutor
第五個線程池是SingleThreadScheduledExecutor撒会,它實(shí)際和第三種ScheduledThreadPool線程池非常相似嘹朗,它只是 ScheduledThreadPool 的一個特例,內(nèi)部只有一個線程诵肛,如源碼所示:
new ScheduledThreadPoolExecutor(1)
為什么不應(yīng)該使用工廠方法自動創(chuàng)建線程池屹培?
怎樣停止線程池
shutdown()
第一種方法叫作shutdown(),它可以安全地關(guān)閉一個線程池怔檩,調(diào)用shutdown()方法之后線程池并不是立刻就被關(guān)閉褪秀,因?yàn)檫@時線程池中可能還有很多任務(wù)正在被執(zhí)行,或是任務(wù)隊列中有大量正在等待被執(zhí)行的任務(wù)薛训,調(diào)用shutdown()方法后線程池會在執(zhí)行完正在執(zhí)行的任務(wù)和隊列中等待的任務(wù)后才徹底關(guān)閉媒吗。但這并不代表shutdown()操作是沒有任何效果的,調(diào)用 shutdown() 方法后如果還有新的任務(wù)被提交乙埃,線程池則會根據(jù)拒絕策略直接拒絕后續(xù)新提交的任務(wù)闸英。
shutdownNow()
最后一個方法是shutdownNow(),也是5種方法里功能最強(qiáng)大的介袜,它與第一種shutdown方法不同之處在于名字中多了一個單詞Now自阱,也就是表示立刻關(guān)閉的意思。在執(zhí)行shutdownNow方法之后米酬,首先會給所有線程池中的線程發(fā)送interrupt中斷信號,嘗試中斷這些任務(wù)的執(zhí)行趋箩,然后會將任務(wù)隊列中正在等待的所有任務(wù)轉(zhuǎn)移到一個List中并返回赃额,我們可以根據(jù)返回的任務(wù) List 來進(jìn)行一些補(bǔ)救的操作,例如記錄在案并在后期重試叫确。shutdownNow() 的源碼如下所示跳芳。