[TOC]
執(zhí)行器(Executor)層次
Executor
Executor即為執(zhí)行器,是執(zhí)行器框架的頂層接口软能,定義最為基礎(chǔ)的框架功能:執(zhí)行任務(wù)。
- 接口定義如下:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
- Runnable:任務(wù)抽象
執(zhí)行器接口定義了執(zhí)行器的任務(wù)執(zhí)行模型,指定了任務(wù)的抽象為Runnable接口逸寓。Runnable接口:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Runnable是一個(gè)函數(shù)式接口若治,內(nèi)部唯一抽象方法run方法無(wú)異常拋出慨蓝,無(wú)返回值。Thread類(lèi)是Runnable接口的實(shí)現(xiàn)類(lèi)端幼,所以可以把一個(gè)線程抽象成一個(gè)任務(wù)提交給執(zhí)行器來(lái)執(zhí)行礼烈,在這種應(yīng)用場(chǎng)景下,Thread實(shí)例不是作為一個(gè)線程婆跑,而是作為一個(gè)任務(wù)的封裝此熬,因?yàn)橹皇钦{(diào)用其run方法,不會(huì)觸及線程的start()滑进。
無(wú)返回值且無(wú)異常拋出的run方法給任務(wù)模型帶來(lái)局限性犀忱,即無(wú)法拋出異常也沒(méi)有返回值。但是有一個(gè)wrap類(lèi)FutureTask扶关,能夠把Callable接口包裝成Runnable和Future峡碉,使得執(zhí)行器接口的任務(wù)可以擁有Callable和Future的特性。(Callable和Future另行筆記:21.Future和Callable.md)
ExecutorService
ExecutorService接口繼承Executor接口驮审,是執(zhí)行器框架的估價(jià)接口鲫寄,它定義了任務(wù)的提交、取消疯淫、執(zhí)行器的關(guān)閉等方法地来。是為執(zhí)行器的最終執(zhí)行“提供服務(wù)”的接口。
- 接口定義:
public interface ExecutorService extends Executor {
/**
* 啟動(dòng)有序關(guān)閉熙掺,其中先前提交的任務(wù)將被執(zhí)行未斑,但不會(huì)接受任何新任務(wù)。
* 如果已經(jīng)關(guān)閉币绩,調(diào)用沒(méi)有其他影響蜡秽。
* 此方法不會(huì)等待先前提交的任務(wù)完成執(zhí)行。
* 使用awaitTermination來(lái)做到這一點(diǎn)缆镣。{@link #awaitTermination awaitTermination}
*/
void shutdown();
/**
* 阻止等待任務(wù)啟動(dòng)并試圖停止當(dāng)前正在執(zhí)行的任務(wù)芽突,返回等待執(zhí)行的任務(wù)list。
* 此方法不會(huì)等待先前提交的任務(wù)完成執(zhí)行董瞻。
* 使用awaitTermination來(lái)做到這一點(diǎn)寞蚌。{@link #awaitTermination awaitTermination}
*/
List<Runnable> shutdownNow();
boolean isShutdown();
/**
* 以shutdown為前提
* Returns {@code true} if all tasks have completed following shut down.
* Note that {@code isTerminated} is never {@code true} unless
* either {@code shutdown} or {@code shutdownNow} was called first.
*
* @return {@code true} if all tasks have completed following shut down
*/
boolean isTerminated();
/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*/
<T> Future<T> submit(Runnable task, T result);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*/
Future<?> submit(Runnable task);
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do before the given timeout elapses.
* Upon normal or exceptional return, tasks that have not
* completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService
抽象類(lèi)AbstractExecutorService為ExecutorService提供默認(rèn)公共實(shí)現(xiàn)田巴。
ScheduledExecutorService
繼承ExcutorService,是一個(gè)支持“定時(shí)”調(diào)度任務(wù)的ExecutorService挟秤。
// 延時(shí)執(zhí)行
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
// 延時(shí)執(zhí)行壹哺,Callable版本
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
// 定期執(zhí)行,在初始時(shí)延結(jié)束后艘刚,以period為周期執(zhí)行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
// 定期執(zhí)行管宵,在初始的延遲結(jié)束后周期運(yùn)行任務(wù),在一次調(diào)用完成和下一次調(diào)用開(kāi)始之間有delay的延遲攀甚,這個(gè)版本會(huì)等待一個(gè)線程執(zhí)行完成箩朴。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
ThreadPoolExecutor
是執(zhí)行器框架中最常用的執(zhí)行器:線程池。在其之下還擴(kuò)展了其他的執(zhí)行器云稚。如:ScheduledThreadPoolExecutor隧饼。線程池是執(zhí)行器框架的功能實(shí)現(xiàn)者,池化的線程作為任務(wù)的執(zhí)行者存在于執(zhí)行器內(nèi)部來(lái)完成任務(wù)的最終執(zhí)行静陈,執(zhí)行器控制內(nèi)部的線程(執(zhí)行者)的生命周期以及任務(wù)的調(diào)度策略燕雁。
線程池的簡(jiǎn)單理解就是將線程創(chuàng)建好,緩存在內(nèi)存中被多個(gè)任務(wù)復(fù)用鲸拥,減少線程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo)拐格。和自行動(dòng)手創(chuàng)建線程執(zhí)行任務(wù)相比,線程池負(fù)責(zé)線程的創(chuàng)建和任務(wù)的分發(fā)刑赶,以及控制線程的生命周期捏浊。
Executors
Executors是執(zhí)行器框架的一個(gè)工具類(lèi),提供了使用執(zhí)行器的一些靜態(tài)方法撞叨,如返回某個(gè)類(lèi)型的線程池實(shí)例等金踪,在《阿里巴巴java開(kāi)發(fā)手冊(cè)》中,并不推薦使用Executors來(lái)創(chuàng)建線程池牵敷,因?yàn)檠谏w了創(chuàng)建線程池的細(xì)節(jié)胡岔,失去對(duì)線程池細(xì)節(jié)的配置。如newCachedThreadPool()
內(nèi)部設(shè)置的最大線程數(shù)為Integer.MAX_VALUE
,這可能會(huì)使得OOM發(fā)生枷餐。
線程池
線程池基本屬性
- private volatile long keepAliveTime
corePoolSize之外的線程的空轉(zhuǎn)存活時(shí)間(默認(rèn)納秒)
- private volatile boolean allowCoreThreadTimeOut;
false時(shí)靶瘸,core線程始終保持存活,否則使用keepAliveTime來(lái)控制core線程的空轉(zhuǎn)時(shí)間毛肋,默認(rèn)false怨咪;
- private volatile int corePoolSize;
最小線程數(shù),如果開(kāi)啟了allowCoreThreadTimeOut润匙,最小線程可能會(huì)是0
- private volatile int maximumPoolSize;
線程池中允許的最大線程數(shù)诗眨,實(shí)際數(shù)量還需依賴底層限制。線程池的阻塞隊(duì)列滿了之后趁桃,如果還有任務(wù)提交辽话,如果當(dāng)前的線程數(shù)小于maximumPoolSize肄鸽,則會(huì)新建線程來(lái)執(zhí)行任務(wù)卫病。 如果使用的是無(wú)界隊(duì)列油啤,該參數(shù)也就沒(méi)有什么效果了。
- private final BlockingQueue<Runnable> workQueue;
任務(wù)隊(duì)列蟀苛,當(dāng)任務(wù)提交后無(wú)法為其分配線程執(zhí)行時(shí)益咬,會(huì)暫存到阻塞的任務(wù)隊(duì)列中,當(dāng)任務(wù)隊(duì)列滿了(如果有界),就會(huì)創(chuàng)建新的線程來(lái)hold任務(wù)(如果沒(méi)有達(dá)到maximumPoolSize)帜平。需要注意使用的是有界隊(duì)列還是無(wú)界隊(duì)列幽告。ArrayBlockingQueue
(需要指定容量創(chuàng)建)和LinkedBlockingQueue
(默認(rèn)Integer.MAX_VALUE)是有界隊(duì)列,PriorityBlockingQueue
(默認(rèn)容量11裆甩,會(huì)自動(dòng)擴(kuò)容的二叉堆)和SynchronousQueue
(不存儲(chǔ)元素的阻塞隊(duì)列冗锁,每個(gè)插入操作都必須等待一個(gè)移出操作)是無(wú)界隊(duì)列。
- 狀態(tài):
RUNNING嗤栓、SHUTDOWN冻河、STOP、TIDYING茉帅、TERMINATED
狀態(tài)含義和狀態(tài)如何遷移:
/* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*/
- 工作者數(shù)量:workerCount
workerCount和state共用一個(gè)int型變量叨叙,前者使用低29位,后者使用高3位堪澎。
線程池工作邏輯:
- 任務(wù)提交到線程池擂错,如果線程池中的線程數(shù)小于corePoolSize,就創(chuàng)建新線程來(lái)執(zhí)行任務(wù)
- 當(dāng)任務(wù)到達(dá)時(shí)線程數(shù)達(dá)到corePoolSize樱蛤,任務(wù)被放入woreQueue中等待線程空閑钮呀,當(dāng)線程執(zhí)行完一個(gè)任務(wù),會(huì)取隊(duì)列中的任務(wù)昨凡;
- 當(dāng)任務(wù)到達(dá)時(shí)隊(duì)列已滿爽醋,如果當(dāng)前線程數(shù)小于Maxsize,就創(chuàng)建一個(gè)新線程處理新來(lái)的任務(wù)土匀,如果當(dāng)前線程數(shù)已經(jīng)達(dá)到上限子房,就使用指定的拒絕任務(wù)處理器處理任務(wù)。
線程池創(chuàng)建
線程池主要有常規(guī)的任務(wù)調(diào)度線程池和定時(shí)(延遲)任務(wù)的線程池兩種就轧,前者對(duì)應(yīng)ThreadPoolExecutor類(lèi)和ForkJoinPool(java8)证杭,后者對(duì)應(yīng)的是ScheduledThreadPoolExecutor。
使用構(gòu)造器(推薦)
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
前幾個(gè)參數(shù)的含義前文已經(jīng)說(shuō)明妒御,threadFactory是指定線程創(chuàng)建工廠解愤,handler是當(dāng)任務(wù)工作隊(duì)列已經(jīng)滿了且線程數(shù)已經(jīng)達(dá)到上限時(shí)的任務(wù)拒絕處理器。這個(gè)構(gòu)造器有幾個(gè)重載的構(gòu)造器:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);
- Test
@Test
public void testThreadPool() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
pool.prestartAllCoreThreads();// 預(yù)先啟動(dòng)所有核心線程乎莉,讓其空轉(zhuǎn)等待
// pool.prestartCoreThread(); // 預(yù)先啟動(dòng)一個(gè)核心線程
System.out.println(pool.getCorePoolSize());
System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS));
System.out.println(pool.getActiveCount());
System.out.println(pool.getLargestPoolSize());
System.out.println(pool.getCompletedTaskCount());
System.out.println(pool.getPoolSize());
System.out.println(pool.getTaskCount());
System.out.println(pool.getQueue().size());
pool.shutdown(); // 關(guān)閉線程池
}
// 5
// 60
// 0
// 5
// 0
// 5
// 0
// 0
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實(shí)現(xiàn)ScheduledExecutorService接口送讲,為一個(gè)提供周期任務(wù)執(zhí)行的線程池奸笤。
構(gòu)造器:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 基于父類(lèi)ThreadPoolExecutor的構(gòu)造器,最大poolsize是Integer.MAX_VALUE哼鬓,
// 使用的任務(wù)隊(duì)列是DelayedWorkQueue
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
/*幾個(gè)重載的構(gòu)造器监右,重載的風(fēng)格類(lèi)似于父類(lèi)ThreadPoolExecutor*/
public ScheduledThreadPoolExecutor(int corePoolSize);
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory)异希;
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler)健盒;
- Test
@Test
public void testSchedulePool() {
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.shutdown();
}
ForkJoinPool
暫略.
使用Executors工具類(lèi)
Executors類(lèi)提供了眾多public static工廠方法來(lái)創(chuàng)建線程池:
固定線程數(shù)的線程池
@since 1.5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 重載版本
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 重載版本
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);
支持任務(wù)密取的線程池
(ForkJoinPool) @since 1.8
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
// 并行數(shù)為處理器數(shù)
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
CachedThreadPool
初始化是線程池線程數(shù)為0称簿,使用時(shí)按需創(chuàng)建線程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 重載
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)扣癣;
ScheduledThreadPoolExecutor
執(zhí)行周期性任務(wù)的線程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
// 重載
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) ;
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 重載
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) 憨降;
注: ThreadFactory threadFactory是創(chuàng)建線程的工廠接口父虑,通過(guò)此參數(shù)可以自定義線程創(chuàng)建的過(guò)程。
線程池任務(wù)執(zhí)行
應(yīng)用思路
- 創(chuàng)建線程池 (使用構(gòu)造器創(chuàng)建線程池)
- submit()提交任務(wù) 或 execute()執(zhí)行任務(wù)
- 關(guān)閉線程池
submit是AbstractExecutorService提供的模板方法授药,其內(nèi)部執(zhí)行了Executor接口的execute方法士嚎,實(shí)際上execute是交給子類(lèi)類(lèi)來(lái)實(shí)現(xiàn)的。execute方法在ThreadPoolExecutor中有實(shí)現(xiàn)烁焙。
Test
固定線程數(shù)線程池
@Test
public void testFixCountPool() {
// 固定線程數(shù)為10航邢,任務(wù)隊(duì)列100
ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
for (int i = 0; i < 100; i++) {
Runnable r = () -> {
System.out.println(Thread.currentThread().getName() + ",task start");
System.out.println(Thread.currentThread().getName() + ",task end");
// latch.countDown();
};
pool.submit(r);
}
// 等待任務(wù)完成后再shutdown,還可以使用CountDownLatch來(lái)完成此需求
while (pool.getCompletedTaskCount() < 100) {
}
pool.shutdown();
}
@Test
public void testPool2() throws InterruptedException, ExecutionException {
ExecutorService pool = new ThreadPoolExecutor(1,1,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>(20));
// 使用有返回結(jié)果的Callable任務(wù)
FutureTask<Integer> task = new FutureTask<Integer>(() -> {
System.out.println("This is a callable");
return 10;
});
pool.submit(task);
// 阻塞等待結(jié)果
System.out.println(task.get());
// pool支持Callable任務(wù)
Future<Integer> future = pool.submit(() -> {
System.out.println("this is a pure callable");
return 100;
});
System.out.println(future.get());
pool.shutdown();
}
cachedPool
CachedThreadPool初始化時(shí)不會(huì)創(chuàng)建線程骄蝇,在有任務(wù)提交的時(shí)候回按需創(chuàng)建線程膳殷,線程有idle有效期,過(guò)期會(huì)自動(dòng)銷(xiāo)毀九火。
ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
schedulePool
@Test
public void testSchedulePool() throws InterruptedException, ExecutionException {
ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(1);
// 延遲三秒執(zhí)行
ScheduledFuture<String> future = pool.schedule(() -> {
System.out.println("hhh");
return "end";
}, 3, TimeUnit.SECONDS);
System.out.println(System.currentTimeMillis());
System.out.println(future.get());
System.out.println(System.currentTimeMillis());
pool.shutdowm();
}
// 指定周期執(zhí)行
@Test
public void testSchedulePool2() throws InterruptedException, ExecutionException, TimeoutException {
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
pool.scheduleAtFixedRate(() -> {
System.out.println("hhh");
}, 2, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(10);
pool.shutdown();
pool.awaitTermination(3,TimeUnit.SECONDS);
}
// 指定延遲的周期執(zhí)行
@Test
public void testScheduledPool() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
// 1秒后執(zhí)行任務(wù)赚窃,任務(wù)完成后按間隔兩秒再次執(zhí)行任務(wù)
pool.scheduleWithFixedDelay(() -> {
try {
System.out.println("hi");
// 任務(wù)休眠三秒,差不多任務(wù)是5秒執(zhí)行一次
TimeUnit.SECONDS.sleep(3);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}, 1, 2, TimeUnit.SECONDS);
latch.await();
pool.shutdown();
}
invokeAll等
@Test
public void testInvokeAny() {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Callable<String>> tasks = new ArrayList<>(100);
Callable<String> task = () -> {
Thread.sleep(new Random().nextInt(3000));
return Thread.currentThread().getName();
};
for (int i = 0; i < 100; i++) {
tasks.add(task);
}
try {
// 返回一個(gè)完成的任務(wù)結(jié)果岔激,沒(méi)完成的其他的都被取消
String res = executor.invokeAny(tasks);
System.out.println(res);
executor.shutdown();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* invokeAll
*/
@Test
public void testInvokeAll() {
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Callable<Integer>> tasks = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
final int v = i;
Callable<Integer> task = () -> {
Thread.sleep(100);
return v;
};
tasks.add(task);
}
try {
// 所有任務(wù)完成后會(huì)返回,結(jié)果是按提交順序存到list中的
List<Future<Integer>> res = executor.invokeAll(tasks);
// 遍歷
res.stream().forEach((Future<Integer> f) -> {
try {
System.out.println(f.get());
} catch (Exception e) {
e.printStackTrace();
}
});
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 按照獲得結(jié)果的順序存儲(chǔ)結(jié)果
*/
@Test
public void testInvokeAll2() {
ExecutorService executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
List<Callable<Integer>> tasks = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
final int v = i;
Callable<Integer> task = () -> {
Thread.sleep(100);
return v;
};
tasks.add(task);
}
try {
ExecutorCompletionService<Integer> service = new ExecutorCompletionService<>(executor);
for (Callable<Integer> task : tasks)
service.submit(task);
for (int i = 0; i < tasks.size(); i++) {
System.out.println(service.take().get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
小結(jié)
向?qū)嵗蟮膒ool提交任務(wù)勒极,需要返回值的task使用Callable接口定義,否則使用Runnable虑鼎。
線程池關(guān)閉
API提供了兩種關(guān)閉池的方式辱匿,shutdown和shutdownNow,兩者執(zhí)行后炫彩,線程池都會(huì)拒絕新任務(wù)的提交匾七,前者會(huì)等待已啟動(dòng)的任務(wù)執(zhí)行結(jié)束,隊(duì)列中的任務(wù)執(zhí)行會(huì)啟動(dòng)江兢,但是線程池不會(huì)阻塞等待任務(wù)完成昨忆,后者是會(huì)嘗試中斷啟動(dòng)的任務(wù)、不會(huì)啟動(dòng)隊(duì)列中的任務(wù)杉允,暴力關(guān)閉pool邑贴。
shutdown
@Test
public void testShutdown() throws InterruptedException {
ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
// 讓主線程等待任務(wù)完成(防止jvm退出席里,另一種方案是輪詢pool.getActiveCount()
CountDownLatch latch = new CountDownLatch(6);
Callable<String> task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("hi");
return "hi";
} finally {
latch.countDown();
}
};
p.submit(task);
// 提交5個(gè)任務(wù),會(huì)直接進(jìn)入到隊(duì)列中
for (int i = 0; i < 5; i++)
p.submit(task);
System.out.println("shutdown");
p.shutdown();
System.out.println(p.isShutdown());// true
System.out.println(p.getActiveCount());
// p.submit(task);// RejectedExecutionException
latch.await();
System.out.println(p.getActiveCount());
}
// 輸出:
// shutdown
// true
// 1
// hi
// hi
// hi
// hi
// hi
// hi
// 0
輸出結(jié)果證明提交的任務(wù)以及隊(duì)列中的任務(wù)能夠正常執(zhí)行完成拢驾,如果主進(jìn)程沒(méi)有退出的話奖磁。
- 使用awaitTermination阻塞等待
@Test
public void testShutdownAwait() throws InterruptedException {
ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
Callable<String> task = () -> {
TimeUnit.SECONDS.sleep(1);
System.out.println("hi");
return "hi";
};
p.submit(task);
// 提交5個(gè)任務(wù),會(huì)直接進(jìn)入到隊(duì)列中
for (int i = 0; i < 5; i++)
p.submit(task);
System.out.println("shutdown");
p.shutdown();
// 阻塞直達(dá)池中任務(wù)全部完成或者超時(shí)
p.awaitTermination(1000, TimeUnit.SECONDS);
System.out.println(p.isShutdown());// true
System.out.println(p.getActiveCount());
}
shutdownNow
@Test
public void testShutdownNow() throws InterruptedException {
ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
Callable<String> task = () -> {
System.out.println("start");
TimeUnit.SECONDS.sleep(3);
System.out.println("hi");
return "hi";
};
p.submit(task);
// 提交5個(gè)任務(wù)独旷,會(huì)直接進(jìn)入到隊(duì)列中
for (int i = 0; i < 5; i++)
p.submit(task);
System.out.println("shutdown");
p.shutdownNow();
System.out.println(p.isShutdown());// true
System.out.println(p.getActiveCount());
while (p.getActiveCount() > 0) {
}
System.out.println("end");
}
// 輸出:
// shutdown
// start
// true
// 0
// end
第一個(gè)任務(wù)啟動(dòng)后沒(méi)有等到完成就被取消(shutdownNow執(zhí)行后getActiveCount就變成了0)署穗,隊(duì)列中任務(wù)不會(huì)啟動(dòng)寥裂。
shutdownNow使用Thread#interrupt來(lái)中斷任務(wù)嵌洼,如果任務(wù)不響應(yīng)中斷,就不會(huì)被中斷封恰。awaitTermination()調(diào)用后會(huì)阻塞等待沒(méi)有被結(jié)束成功的任務(wù)執(zhí)行完成麻养。如果任務(wù)在shutdownNow時(shí)就全部被中斷,任務(wù)就都已經(jīng)結(jié)束了诺舔,awaitTermination()也不會(huì)阻塞鳖昌。
這里引出停止線程的一個(gè)方法:讓線程響應(yīng)中斷。
@Test
public void testShutdownNowAwait() throws InterruptedException {
ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
Callable<String> task = () -> {
System.out.println("start");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
System.out.println(e);
}
System.out.println("hi");
return "hi";
};
p.submit(task);
// 提交5個(gè)任務(wù)低飒,會(huì)直接進(jìn)入到隊(duì)列中
for (int i = 0; i < 5; i++)
p.submit(task);
System.out.println("shutdown");
p.shutdownNow();
System.out.println(p.isShutdown());// true
System.out.println(p.getActiveCount());
// 阻塞等待active的任務(wù)執(zhí)行完成
p.awaitTermination(1000, TimeUnit.SECONDS);
System.out.println("end");
}
// 輸出
// start
// true
// 1
// java.lang.InterruptedException: sleep interrupted
// hi
// end
小結(jié)
shutdown和shutdownNow的區(qū)別是许昨,前者只是拒絕新任務(wù),如果主線程不消亡褥赊,池中的活躍任務(wù)和隊(duì)列中的任務(wù)都會(huì)執(zhí)行糕档,可以使用awaitTermination()指定pool阻塞等待它們完成;shutdownNow也會(huì)拒絕新任務(wù)拌喉,它會(huì)通過(guò)中斷試著終止活躍任務(wù)速那,但是不一定能終止成功,隊(duì)列中的任務(wù)被取消尿背,如果活躍任務(wù)沒(méi)有被殺死端仰,只要宿主進(jìn)程沒(méi)死也是會(huì)繼續(xù)執(zhí)行的,但是隊(duì)列中的任務(wù)不會(huì)啟動(dòng)田藐,它也可以使用awaitTermination()指定pool阻塞等待沒(méi)被殺死的活躍任務(wù)完成荔烧。
參考資料
[1] 【死磕Java并發(fā)】-----J.U.C之線程池:線程池的基礎(chǔ)架構(gòu)
[2] 【死磕Java并發(fā)】-----J.U.C之線程池:ThreadPoolExecutor