本文對 Java 中的線程池的使用進(jìn)行學(xué)習(xí)封孙。是對以下文章的摘錄:
- JDK 源碼
- 阿里編程規(guī)范插件提示
Executor
Executor 接口
Java 類庫中任務(wù)執(zhí)行的抽象接口是 Executor
。這個接口使得任務(wù)提交和任務(wù)如何被執(zhí)行(包括線程使用細(xì)節(jié)亿乳、調(diào)度等)得到了解耦硝拧。該接口如下所示:
/*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
/**
* 在將來某個時刻執(zhí)行任務(wù)( command )。這個任務(wù)可以通過這些方式執(zhí)行:在一個新的線程中執(zhí)行葛假、
* 在一個緩存的線程中執(zhí)行障陶、在調(diào)用者線程中執(zhí)行。具體取決于 Executor 的實現(xiàn)方式聊训。
*
* @param command 需要運(yùn)行的任務(wù)
* @throws RejectedExecutionException 如果執(zhí)行器不接受任務(wù)抱究,則拋出改異常。
* @throws NullPointerException 如果任務(wù)為 null
*/
void execute(Runnable command);
}
使用 Executor 執(zhí)行任務(wù)
Executor
經(jīng)常被用來代替顯示地創(chuàng)建線程带斑。
自己手工創(chuàng)建線程執(zhí)行(不建議):
new Thread(new RunnableTask()).start()
使用 Executor
執(zhí)行:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
Executor 的執(zhí)行方案
Executor
接口并不強(qiáng)制異步執(zhí)行任務(wù)鼓寺。一個簡單的例子是,執(zhí)行器可以在調(diào)用線程中立刻運(yùn)行提交任務(wù)勋磕。
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}}
更加典型的方案是妈候,在其它線程而非調(diào)用者線程中執(zhí)行任務(wù)。下面的 executor 為每個任務(wù)創(chuàng)建了一個執(zhí)行線程挂滓。
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}}
很多 Executor
的實現(xiàn)對任務(wù)何時以何種方式執(zhí)行都會添加一些限制苦银。下面的執(zhí)行器將提交的任務(wù)按順序在另一個執(zhí)行器中執(zhí)行,成為一個組合執(zhí)行器杂彭。
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
// 每個任務(wù)執(zhí)行完后墓毒,執(zhí)行下一個任務(wù)
scheduleNext();
}
}
});
// 沒有任務(wù)的時候吓揪,啟動此任務(wù)
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// 當(dāng)下一個任務(wù)存在時亲怠,執(zhí)行任務(wù)
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}
ExecutorService
ExecutorService 接口
Executor
有一個擴(kuò)展接口 ExecutorService
。ExecutorService
能夠提供方法來管理終止柠辞,也能夠創(chuàng)建 Future
來跟蹤一個或多個任務(wù)的異步執(zhí)行過程团秽。該接口如下圖所示:
ExecutorService 關(guān)閉
ExecutorService
可以被關(guān)閉,關(guān)閉后它會拒絕新的任務(wù)。有兩種關(guān)閉 ExecutorService
的方法:
-
shutdown
:該方法將會保證關(guān)閉之前提交的任務(wù)會在關(guān)閉前被執(zhí)行习勤。 -
shutdownNow
:將會阻止任務(wù)的啟動踪栋,并且嘗試停止當(dāng)前執(zhí)行的任務(wù)。
一旦 ExecutorService
被關(guān)閉(termination)图毕,執(zhí)行器將會沒有正在運(yùn)行的任務(wù)夷都、沒有正在等待執(zhí)行的任務(wù)、沒有新的任務(wù)被提交予颤。一個沒有使用的 ExecutorService
應(yīng)該被關(guān)閉囤官,從而使得資源可以被回收,
任務(wù)執(zhí)行
submit
方法擴(kuò)展于 Executor
中的基本方法 execute(Runnable)
蛤虐,該方法創(chuàng)建和返回了一個 Future
對象党饮,通過這個對象可以取消任務(wù)的執(zhí)行或者等待任務(wù)的結(jié)束。
invokeAny
和 invokeAll
方法可以用來執(zhí)行非常通用有效的批量執(zhí)行驳庭。執(zhí)行一批任務(wù)刑顺,等待它們中的一個或者全部執(zhí)行完成。ExecutorCompletionService
可以用來實現(xiàn)這些方法的變體饲常。
使用例子
下面是一個網(wǎng)絡(luò)服務(wù)的例子蹲堂。該例子通過線程池中的線程來服務(wù)到達(dá)的請求,使用到了Executors
中的 newFixedThreadPool
方法不皆。
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}}
下面的代碼展示了通過兩個階段關(guān)閉 ExecutorService
:
- 首先贯城,調(diào)用
shutdown
來拒絕之后到達(dá)的任務(wù)。 - 然后霹娄,如果有必要的話能犯,可調(diào)用
shutdownNow
來取消滯留的任務(wù)。
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}}
ThreadPoolExecutor
ThreadPoolExecutor
類提供了一個可擴(kuò)展的線程池實現(xiàn)犬耻。
基本介紹
線程池用來解決兩個不同方面的問題:
- 由于減少了每個任務(wù)的調(diào)用開銷踩晶,通常可以在執(zhí)行大量異步任務(wù)的時候提高性能枕磁。
- 提供了資源控制和管理的渡蜻。
每個 ThreadPoolExecutor
具有一些基本的統(tǒng)計,例如:任務(wù)執(zhí)行完成的數(shù)量计济。
為了適應(yīng)廣泛的應(yīng)用場景茸苇,該類提高了很多可以調(diào)整的參數(shù)以及可以擴(kuò)展的鉤子。此外沦寂,更簡單的一種方式是使用 Executors
中的一些工廠方法学密,包括:
-
newCachedThreadPool
:沒有邊界,自帶線程復(fù)用传藏。 -
newFixedThreadPool
:固定線程池大小腻暮。 -
newSingleThreadExecutor
: 單線程彤守。
這些覆蓋了大部分通用的場景。
構(gòu)造函數(shù)
當(dāng)需要手工配置這個類或者對其參數(shù)進(jìn)行調(diào)整時哭靖,就需要了解其構(gòu)造函數(shù)具垫。ThreadPoolExecutor
有很多構(gòu)造函數(shù),下面是最常見的形式试幽。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
corePoolSize and maximumPoolSize
ThreadPoolExecutor
將自動通過基本大畜莶稀(corePoolSize)、最大大衅涛搿(maximumPoolSize)來調(diào)整線程池的大小饰及。
當(dāng)有新的任務(wù)通過 execute(Runnable)
方法提交時:
- 如果當(dāng)前運(yùn)行的線程數(shù)目小于基本大小(corePoolSize)康震,即使有其它空閑線程燎含,也將會創(chuàng)建一個新的線程來處理這個請求。
- 如果當(dāng)前運(yùn)行的線程數(shù)目大于基本大型榷獭(corePoolSize)屏箍,小于最大大小(maximumPoolSize)橘忱,并且緩存的隊列(queue)滿的時候赴魁,將會創(chuàng)建新的線程。
設(shè)置示例:
- 通過設(shè)置
corePoolSize
和maximumPoolSize
相同钝诚,你可以創(chuàng)建一個固定大杏庇(fixed-size)的線程池。 - 通過將
maximumPoolSize
設(shè)置成一個很大的值凝颇,比如:Integer.MAX_VALUE
潘拱,可以使線程池容納任意數(shù)量的并發(fā)任務(wù)。
通常拧略,這些參數(shù)在構(gòu)造函數(shù)中被設(shè)置芦岂,但是它們也可以通過這些方法動態(tài)改變:
setCorePoolSize
setMaximumPoolSize
keepAliveTime
如果線程池中當(dāng)前執(zhí)行的線程數(shù)目大于 corePoolSize
,那么對于多出的線程垫蛆,當(dāng)它們空閑的時間超過 keepAliveTime
時禽最,它們將被終止。
這樣的機(jī)制袱饭,使得當(dāng)線程池不活躍的時候川无,可以減少資源的消耗。當(dāng)線程池變得活躍的時候虑乖,新的線程會被創(chuàng)建懦趋。這個參數(shù)也通過 setKeepAliveTime(long TimeUnit)
方法動態(tài)改變。
通過設(shè)置這個參數(shù)為 Long.MAX_VALUE
TimeUnit.NANOSECONDS
决左,可以禁止空閑的線程被終止愕够。
默認(rèn)情況下,這個 keep-alive
策略只會在當(dāng)前線程數(shù)目超過 corePoolSize
的時候才會起作用佛猛,也可以通過 allowCoreThreadTimeOut(boolean)
來控制惑芭,此時 keepAliveTime
的值不能為0.
Queuing
BlockingQueue
可以用來轉(zhuǎn)移和持有提交的任務(wù)。它的使用時和線程池的大小相關(guān)的:
- 如果小于
corePoolSize
數(shù)目的線程在運(yùn)行继找,那么Executor
傾向于創(chuàng)建新的線程來執(zhí)行任務(wù)遂跟,而不是將任務(wù)緩存到隊列中。 - 如果大于等于
corePoolSize
數(shù)目的線程在運(yùn)行婴渡,那么Executor
傾向于將任務(wù)緩存到隊列中幻锁,而不是創(chuàng)建新的線程。 - 如果請求達(dá)到限制無法被緩存到隊列中边臼,那么一個新的線程將會被創(chuàng)建哄尔,當(dāng)創(chuàng)建的線程數(shù)將要超過
maximumPoolSize
時,新的任務(wù)將會被拒絕柠并。
以下是一些通用的隊列策略:
- 直接切換:工作隊列一個好的默認(rèn)選擇可以是
SynchronousQueue
岭接,它可以將任務(wù)交給線程執(zhí)行,并且不需要緩存任務(wù)臼予。當(dāng)沒有線程可用的時候鸣戴,嘗試緩存任務(wù)到隊列中將會立即失敗,因此一個新的線程將會被創(chuàng)建粘拾。這個策略可以避免由于任務(wù)間存在內(nèi)部依賴造成的死機(jī)窄锅。直接切換通常需要一個沒有限制的maximumPoolSizes
,從而避免拒絕新的任務(wù)缰雇,但當(dāng)處理不夠及時杜漠,線程數(shù)目也會持續(xù)增加访敌。 - 無界隊列:使用無界隊列(例如:
LinkedBlockingQueue
沒有預(yù)先定義隊列容量),如果所有的corePoolSize
線程都很忙碌,那么新的任務(wù)將會被保存在隊列中進(jìn)行等待俱饿。因此,線程數(shù)永遠(yuǎn)不會超過corePoolSize
谋逻,而maximumPoolSize
的值在這里也不會起作用塔插。這樣的策略比較適合于每個任務(wù)都是獨立執(zhí)行的場景。例如糯崎,在 Web 頁面服務(wù)中几缭,隊列可以用來平滑瞬時的訪問高峰。 - 有界隊列:一個有界限的隊列(例如:
ArrayBlockingQueue
)可以防止maximumPoolSizes
被設(shè)置為無限大時造成的資源耗盡沃呢。隊列的大小和線程池大小可以互相調(diào)和:使用大的隊列和小的線程池可以降低 CPU 使用率年栓、操作系統(tǒng)資源占有、上下文切換的開銷薄霜,但是會導(dǎo)致較低的吞吐量某抓。如果任務(wù)經(jīng)常被阻塞(I/O 受限)纸兔,系統(tǒng)可能會調(diào)度更多的線程,超過你開始的限制否副。當(dāng)使用小隊列的時候汉矿,需要使用較大的線程池大小,這可以使得 CPU 更加忙碌备禀,但是可能會由于頻繁的上下文切換洲拇,導(dǎo)致吞吐量下降。
ThreadFactory
ThreadFactory
是用來創(chuàng)建新的線程曲尸。下面是該接口的聲明赋续。
/**
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory {
/**
* 構(gòu)建線程 Thread。實現(xiàn)中可以設(shè)置優(yōu)先權(quán)另患、名字纽乱、守護(hù)狀態(tài)、線程組等昆箕。
*
* @param r 一個可以被線程實例運(yùn)行的任務(wù)
* @return 創(chuàng)建的線程或者 null (創(chuàng)建被拒絕)
*/
Thread newThread(Runnable r);
}
該接口一個簡單的實現(xiàn)為:
class SimpleThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
return new Thread(r);
}
}}
Executors
中的 defaultThreadFactory
方法提供了一個更加簡單實用的實現(xiàn)迫淹,可以設(shè)置線程環(huán)境為已知值。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
Rejected tasks
在以下兩種情況下为严,新的任務(wù)將會被拒絕:
-
Executor
被關(guān)閉敛熬。 -
Executor
使用有界的線程池大小和工作隊列容量后,達(dá)到飽和第股。
任何一種情況下应民,都會調(diào)用 RejectedExecutionHandler
中的 rejectedExecution
方法。
預(yù)先定義的一些拒絕策略包括:
- 默認(rèn)是
ThreadPoolExecutor.AbortPolicy
夕吻,它在拒絕時會拋出一個運(yùn)行時異常RejectedExecutionException
诲锹。 -
ThreadPoolExecutor.CallerRunsPolicy
策略是讓調(diào)用者自己來運(yùn)行這個任務(wù)。這實現(xiàn)了一個簡單的反饋控制機(jī)制涉馅,來降低新任務(wù)的提交速率归园。 -
ThreadPoolExecutor.DiscardPolicy
方法直接丟棄任務(wù)。 -
ThreadPoolExecutor.DiscardOldestPolicy
如果 executor 沒有被關(guān)閉稚矿,那么丟棄隊列頭上的任務(wù)庸诱,任務(wù)執(zhí)行會再次嘗試。
擴(kuò)展例子
很多基于該類的擴(kuò)展都是覆蓋一個或多個受保護(hù)的函數(shù)晤揣。下面例子就展示了一個子類桥爽,該之類添加了簡單的暫停和恢復(fù)特性:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
Executors
Executors
類提供了方便的工廠方法來創(chuàng)建這些執(zhí)行器。
newFixedThreadPool
該方法創(chuàng)建的線程池可以重復(fù)使用固定數(shù)目的線程昧识,使用無限制的隊列钠四。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor
單線程執(zhí)行,使用無限制的隊列跪楞。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool
使用無限制的線程池缀去,空線程超過60秒被回收侣灶,線程執(zhí)行采用直接交付策略。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newScheduledThreadPool
采用 ScheduledThreadPoolExecutor
缕碎。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newWorkStealingPool
采用 ForkJoinPool
, 開始于 JDK 1.8褥影。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
阿里編程規(guī)范
使用說明
線程池不允許實用 Executor
去創(chuàng)建,而是通過 ThreadPoolExecutor
的方式阎曹,這樣的處理方式可以更加明確線程次的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險煞檩。
Executor
各個方法的弊端說明:
-
newFixedThreadPool
和newSingleThreadPool
:主要問題是堆積的請求處理隊列可能會耗費非常大的內(nèi)存处嫌,甚至 OOM。 -
newCachedThreadPool
和newScheduledThreadPool
:主要問題是線程的最大數(shù)是Integer.MAX_VALUE
斟湃,可能會創(chuàng)建非常多的線程數(shù)熏迹,甚至 OOM。
使用示例
例子1:
// org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
例子2:
// com.google.common.util.concurrent.ThreadFactoryBuilder
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
// Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5,200,
0L,TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(1024),namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown(); // gracefully shutdown
例子3:
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="thradFactory" value= thradFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler">
</property>
</bean>