構(gòu)造一個線程池為什么需要幾個參數(shù)液茎?如果避免線程池出現(xiàn)OOM?Runnable
和Callable
的區(qū)別是什么严嗜?本文將對這些問題一一解答晋涣,同時還將給出使用線程池的常見場景和代碼片段笨鸡。
基礎知識
Executors創(chuàng)建線程池
Java中創(chuàng)建線程池很簡單扮叨,只需要調(diào)用Executors
中相應的便捷方法即可缤弦,比如Executors.newFixedThreadPool(int nThreads)
,但是便捷不僅隱藏了復雜性彻磁,也為我們埋下了潛在的隱患(OOM碍沐,線程耗盡)狸捅。
Executors
創(chuàng)建線程池便捷方法列表:
方法名 | 功能 |
---|---|
newFixedThreadPool(int nThreads) | 創(chuàng)建固定大小的線程池 |
newSingleThreadExecutor() | 創(chuàng)建只有一個線程的線程池 |
newCachedThreadPool() | 創(chuàng)建一個不限線程數(shù)上限的線程池,任何提交的任務都將立即執(zhí)行 |
小程序使用這些快捷方法沒什么問題累提,對于服務端需要長期運行的程序尘喝,創(chuàng)建線程池應該直接使用ThreadPoolExecutor
的構(gòu)造方法。沒錯斋陪,上述Executors
方法創(chuàng)建的線程池就是ThreadPoolExecutor
朽褪。
ThreadPoolExecutor構(gòu)造方法
Executors
中創(chuàng)建線程池的快捷方法,實際上是調(diào)用了ThreadPoolExecutor
的構(gòu)造方法(定時任務使用的是ScheduledThreadPoolExecutor
)无虚,該類構(gòu)造方法參數(shù)列表如下:
// Java線程池的完整構(gòu)造函數(shù)
public ThreadPoolExecutor(
int corePoolSize, // 線程池長期維持的線程數(shù)缔赠,即使線程處于Idle狀態(tài),也不會回收骑科。
int maximumPoolSize, // 線程數(shù)的上限
long keepAliveTime, TimeUnit unit, // 超過corePoolSize的線程的idle時長,
// 超過這個時間构拳,多余的線程會被回收咆爽。
BlockingQueue<Runnable> workQueue, // 任務的排隊隊列
ThreadFactory threadFactory, // 新線程的產(chǎn)生方式
RejectedExecutionHandler handler) // 拒絕策略
竟然有7個參數(shù),很無奈置森,構(gòu)造一個線程池確實需要這么多參數(shù)斗埂。這些參數(shù)中,比較容易引起問題的有corePoolSize
, maximumPoolSize
, workQueue
以及handler
:
-
corePoolSize
和maximumPoolSize
設置不當會影響效率凫海,甚至耗盡線程呛凶; -
workQueue
設置不當容易導致OOM; -
handler
設置不當會導致提交任務時拋出異常行贪。
正確的參數(shù)設置方式會在下文給出漾稀。
線程池的工作順序
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
corePoolSize -> 任務隊列 -> maximumPoolSize -> 拒絕策略
Runnable和Callable
可以向線程池提交的任務有兩種:Runnable
和Callable
,二者的區(qū)別如下:
- 方法簽名不同建瘫,
void Runnable.run()
,V Callable.call() throws Exception
- 是否允許有返回值崭捍,
Callable
允許有返回值 - 是否允許拋出異常,
Callable
允許拋出異常啰脚。
Callable
是JDK1.5時加入的接口殷蛇,作為Runnable
的一種補充,允許有返回值橄浓,允許拋出異常粒梦。
三種提交任務的方式:
提交方式 | 是否關心返回結(jié)果 |
---|---|
Future<T> submit(Callable<T> task) |
是 |
void execute(Runnable command) |
否 |
Future<?> submit(Runnable task) |
否,雖然返回Future荸实,但是其get()方法總是返回null |
如何正確使用線程池
避免使用無界隊列
不要使用Executors.newXXXThreadPool()
快捷方法創(chuàng)建線程池匀们,因為這種方式會使用無界的任務隊列,為避免OOM准给,我們應該使用ThreadPoolExecutor
的構(gòu)造方法手動指定隊列的最大長度:
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512), // 使用有界隊列昼蛀,避免OOM
new ThreadPoolExecutor.DiscardPolicy());
明確拒絕任務時的行為
任務隊列總有占滿的時候宴猾,這是再submit()
提交新的任務會怎么樣呢?RejectedExecutionHandler
接口為我們提供了控制方式叼旋,接口定義如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
線程池給我們提供了幾種常見的拒絕策略:
拒絕策略 | 拒絕行為 |
---|---|
AbortPolicy | 拋出RejectedExecutionException |
DiscardPolicy | 什么也不做仇哆,直接忽略 |
DiscardOldestPolicy | 丟棄執(zhí)行隊列中最老的任務,嘗試為當前提交的任務騰出位置 |
CallerRunsPolicy | 直接由提交任務者執(zhí)行這個任務 |
線程池默認的拒絕行為是AbortPolicy
夫植,也就是拋出RejectedExecutionHandler
異常讹剔,該異常是非受檢異常,很容易忘記捕獲详民。如果不關心任務被拒絕的事件延欠,可以將拒絕策略設置成DiscardPolicy
,這樣多余的任務會悄悄的被忽略沈跨。
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512),
new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略
獲取處理結(jié)果和異常
線程池的處理結(jié)果由捎、以及處理過程中的異常都被包裝到Future
中,并在調(diào)用Future.get()
方法時獲取饿凛,執(zhí)行過程中的異常會被包裝成ExecutionException
狞玛,submit()
方法本身不會傳遞結(jié)果和任務執(zhí)行過程中的異常。獲取執(zhí)行結(jié)果的代碼可以這樣寫:
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new RuntimeException("exception in call~");// 該異常會在調(diào)用Future.get()時傳遞給調(diào)用者
}
});
try {
Object result = future.get();
} catch (InterruptedException e) {
// interrupt
} catch (ExecutionException e) {
// exception in Callable.call()
e.printStackTrace();
}
上述代碼輸出類似如下:
線程池的常用場景
正確構(gòu)造線程池
int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
executorService = new ThreadPoolExecutor(poolSize, poolSize,
0, TimeUnit.SECONDS,
queue,
policy);
獲取單個結(jié)果
過submit()
向線程池提交任務后會返回一個Future
涧窒,調(diào)用V Future.get()
方法能夠阻塞等待執(zhí)行結(jié)果心肪,V get(long timeout, TimeUnit unit)
方法可以指定等待的超時時間。
獲取多個結(jié)果
如果向線程池提交了多個任務纠吴,要獲取這些任務的執(zhí)行結(jié)果硬鞍,可以依次調(diào)用Future.get()
獲得。但對于這種場景戴已,我們更應該使用ExecutorCompletionService固该,該類的take()
方法總是阻塞等待某一個任務完成,然后返回該任務的Future
對象糖儡。向CompletionService
批量提交任務后蹬音,只需調(diào)用相同次數(shù)的CompletionService.take()
方法,就能獲取所有任務的執(zhí)行結(jié)果休玩,獲取順序是任意的著淆,取決于任務的完成順序:
void solve(Executor executor, Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構(gòu)造器
for (Callable<Result> s : solvers)// 提交所有任務
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {// 獲取每一個完成的任務
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
單個任務的超時時間
V Future.get(long timeout, TimeUnit unit)
方法可以指定等待的超時時間,超時未完成會拋出TimeoutException
拴疤。
多個任務的超時時間
等待多個任務完成永部,并設置最大等待時間,可以通過CountDownLatch完成:
public void testLatch(ExecutorService executorService, List<Runnable> tasks)
throws InterruptedException{
CountDownLatch latch = new CountDownLatch(tasks.size());
for(Runnable r : tasks){
executorService.submit(new Runnable() {
@Override
public void run() {
try{
r.run();
}finally {
latch.countDown();// countDown
}
}
});
}
latch.await(10, TimeUnit.SECONDS); // 指定超時時間
}
線程池和裝修公司
以運營一家裝修公司做個比喻呐矾。公司在辦公地點等待客戶來提交裝修請求苔埋;公司有固定數(shù)量的正式工以維持運轉(zhuǎn);旺季業(yè)務較多時蜒犯,新來的客戶請求會被排期组橄,比如接單后告訴用戶一個月后才能開始裝修荞膘;當排期太多時,為避免用戶等太久玉工,公司會通過某些渠道(比如人才市場羽资、熟人介紹等)雇傭一些臨時工(注意,招聘臨時工是在排期排滿之后)遵班;如果臨時工也忙不過來屠升,公司將決定不再接收新的客戶,直接拒單狭郑。
線程池就是程序中的“裝修公司”腹暖,代勞各種臟活累活。上面的過程對應到線程池上:
// Java線程池的完整構(gòu)造函數(shù)
public ThreadPoolExecutor(
int corePoolSize, // 正式工數(shù)量
int maximumPoolSize, // 工人數(shù)量上限翰萨,包括正式工和臨時工
long keepAliveTime, TimeUnit unit, // 臨時工游手好閑的最長時間脏答,超過這個時間將被解雇
BlockingQueue<Runnable> workQueue, // 排期隊列
ThreadFactory threadFactory, // 招人渠道
RejectedExecutionHandler handler) // 拒單方式
總結(jié)
Executors
為我們提供了構(gòu)造線程池的便捷方法,對于服務器程序我們應該杜絕使用這些便捷方法亩鬼,而是直接使用線程池ThreadPoolExecutor
的構(gòu)造方法殖告,避免無界隊列可能導致的OOM以及線程個數(shù)限制不當導致的線程數(shù)耗盡等問題。ExecutorCompletionService
提供了等待所有任務執(zhí)行結(jié)束的有效方式辛孵,如果要設置等待的超時時間丛肮,則可以通過CountDownLatch
完成赡磅。