線程池
Executors 目前提供了 5 種不同的線程池創(chuàng)建配置:
- newCachedThreadPool()峰弹,它是一種用來處理大量短時間工作任務(wù)的線程池,具有幾個鮮明特點:它會試圖緩存線程并重用,當(dāng)無緩存線程可用時,就會創(chuàng)建新的工作線程桨啃;如果線程閑置的時間超過 60 秒凰盔,則被終止并移出緩存;長時間閑置時紊扬,這種線程池,不會消耗什么資源唉擂。其內(nèi)部使用 SynchronousQueue 作為工作隊列餐屎。
- newFixedThreadPool(int nThreads),重用指定數(shù)目(nThreads)的線程池玩祟,其背后使用的是無界的工作隊列腹缩,任何時候最多有 nThreads 個工作線程是活動的。這意味著空扎,如果任務(wù)數(shù)量超過了活動隊列數(shù)目藏鹊,將在工作隊列中等待空閑線程出現(xiàn);如果有工作線程退出勺卢,將會有新的工作線程被創(chuàng)建伙判,以補足指定的數(shù)目 nThreads。
- newSingleThreadExecutor()黑忱,它的特點在于工作線程數(shù)目被限制為 1宴抚,操作一個無界的工作隊列,所以它保證了所有任務(wù)的都是被順序執(zhí)行甫煞,最多會有一個任務(wù)處于活動狀態(tài)菇曲,并且不允許使用者改動線程池實例,因此可以避免其改變線程數(shù)目抚吠。
- newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize)常潮,創(chuàng)建的是個 ScheduledExecutorService,可以進行定時或周期性的工作調(diào)度楷力,區(qū)別在于單一工作線程還是多個工作線程喊式。
-
newWorkStealingPool(int parallelism),這是一個經(jīng)常被人忽略的線程池萧朝,Java 8 才加入這個創(chuàng)建方法岔留,其內(nèi)部會構(gòu)建ForkJoinPool,利用Work-Stealing算法检柬,并行地處理任務(wù)献联,不保證處理順序。
線程池構(gòu)造方法幾個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
- corePoolSize : 該線程池中核心線程數(shù)最大值
核心線程:線程池新建線程的時候,如果當(dāng)前線程總數(shù)小于 corePoolSize 里逆,則新建的是核心線程进胯;如果超過corePoolSize,則新建的是非核心線程原押。核心線程默認情況下會一直存活在線程池中胁镐,即使這個核心線程啥也不干(閑置狀態(tài))。如果指定ThreadPoolExecutor的 allowCoreThreadTimeOut 這個屬性為true诸衔,那么核心線程如果不干活(閑置狀態(tài))的話希停,超過一定時間( keepAliveTime),就會被銷毀掉. - maximumPoolSize :該線程池中線程總數(shù)的最大值
線程總數(shù)計算公式 = 核心線程數(shù) + 非核心線程數(shù)署隘。 - keepAliveTime :該線程池中非核心線程閑置超時時長
注意:一個非核心線程,如果不干活(閑置狀態(tài))的時長亚隙,超過這個參數(shù)所設(shè)定的時長磁餐,就會被銷毀掉。但是阿弃,如果設(shè)置了 allowCoreThreadTimeOut = true诊霹,則會作用于核心線程。 - unit :(時間單位)
首先渣淳,TimeUnit是一個枚舉類型脾还,翻譯過來就是時間單位,我們最常用的時間單位包括:
MILLISECONDS : 1毫秒 入愧、SECONDS : 秒鄙漏、MINUTES : 分、HOURS : 小時棺蛛、DAYS : 天 - BlockingQueue<Runnable> workQueue :( 阻塞隊列)怔蚌,主要有四種阻塞隊列
- SynchronousQueue:(同步隊列)這個隊列接收到任務(wù)的時候,會直接提交給線程處理旁赊,而不保留它(名字定義為 同步隊列)桦踊,所有該隊列跟設(shè)置的corePoolSize無效。但有一種情況终畅,假設(shè)所有線程都在工作怎么辦籍胯?這種情況下,SynchronousQueue就會新建一個線程來處理這個任務(wù)离福。所以為了保證不出現(xiàn)(線程數(shù)達到了maximumPoolSize而不能新建線程)的錯誤杖狼,使用這個類型隊列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE术徊,即無限大本刽,去規(guī)避這個使用風(fēng)險。
- LinkedBlockingQueue(鏈表阻塞隊列):這個隊列接收到任務(wù)的時候,如果當(dāng)前線程數(shù)小于核心線程數(shù)子寓,則新建線程(核心線程)處理任務(wù)暗挑;如果當(dāng)前線程數(shù)等于核心線程數(shù),則進入隊列等待斜友。由于這個隊列沒有最大值限制炸裆,即所有超過核心線程數(shù)的任務(wù)都將被添加到隊列中,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效鲜屏,因為總線程數(shù)永遠不會超過corePoolSize
- ArrayBlockingQueue(數(shù)組阻塞隊列):可以限定隊列的長度(既然是數(shù)組烹看,那么就限定了大小)洛史,接收到任務(wù)的時候惯殊,如果沒有達到corePoolSize的值,則新建線程(核心線程)執(zhí)行任務(wù)也殖,如果達到了土思,則入隊等候,如果隊列已滿忆嗜,則新建線程(非核心線程)執(zhí)行任務(wù)己儒,又如果總線程數(shù)到了maximumPoolSize,并且隊列也滿了捆毫,則發(fā)生錯誤
- DelayQueue(延遲隊列):隊列內(nèi)元素必須實現(xiàn)Delayed接口闪湾,這就意味著你傳進去的任務(wù)必須先實現(xiàn)Delayed接口。這個隊列接收到任務(wù)時绩卤,首先先入隊途样,只有達到了指定的延時時間,才會執(zhí)行任務(wù)
- ThreadFactory threadFactory = > 創(chuàng)建線程的方式省艳,這是一個接口娘纷,new它的時候需要實現(xiàn)他的Thread newThread(Runnable r)方法
- RejectedExecutionHandler handler = > 這個主要是用來拋異常的
當(dāng)線程無法執(zhí)行新任務(wù)時(一般是由于線程池中的線程數(shù)量已經(jīng)達到最大數(shù)或者線程池關(guān)閉導(dǎo)致的),默認情況下跋炕,當(dāng)線程池?zé)o法處理新線程時赖晶,會拋出一個RejectedExecutionException。
以 LinkedBlockingQueue辐烂、ArrayBlockingQueue 和 SynchronousQueue 為例遏插,我們一起來分析一下,根據(jù)需求可以從很多方面考量:
ArrayBlockingQueue ,LinkedBlockingQueue ,SynchronousQueue
- 從空間利用角度纠修,數(shù)組結(jié)構(gòu)的 ArrayBlockingQueue 要比 LinkedBlockingQueue 緊湊胳嘲,因為其不需要創(chuàng)建所謂節(jié)點,但是其初始分配階段就需要一段連續(xù)的空間扣草,所以初始內(nèi)存需求更大了牛。
- 通用場景中颜屠,LinkedBlockingQueue 的吞吐量一般優(yōu)于 ArrayBlockingQueue,因為它實現(xiàn)了更加細粒度的鎖操作鹰祸。
- ArrayBlockingQueue 實現(xiàn)比較簡單甫窟,性能更好預(yù)測,屬于表現(xiàn)穩(wěn)定的“選手”蛙婴。
- 如果我們需要實現(xiàn)的是兩個線程之間接力性(handoff)的場景粗井,按照專欄上一講的例子,你可能會選擇 CountDownLatch街图,但是SynchronousQueue也是完美符合這種場景的浇衬,而且線程間協(xié)調(diào)和數(shù)據(jù)傳輸統(tǒng)一起來,代碼更加規(guī)范餐济。
- 可能令人意外的是耘擂,很多時候 SynchronousQueue 的性能表現(xiàn),往往大大超過其他實現(xiàn)絮姆,尤其是在隊列元素較小的場景梳星。
重點源碼分享
okhttp
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
AsyncTask
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}