22.線程池的使用

[TOC]

執(zhí)行器(Executor)層次

圖片來(lái)自參考資料1

Executor

Executor即為執(zhí)行器,是執(zhí)行器框架的頂層接口软能,定義最為基礎(chǔ)的框架功能:執(zhí)行任務(wù)。

  • 接口定義如下:
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • Runnable:任務(wù)抽象

執(zhí)行器接口定義了執(zhí)行器的任務(wù)執(zhí)行模型,指定了任務(wù)的抽象為Runnable接口逸寓。Runnable接口:

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Runnable是一個(gè)函數(shù)式接口若治,內(nèi)部唯一抽象方法run方法無(wú)異常拋出慨蓝,無(wú)返回值。Thread類(lèi)是Runnable接口的實(shí)現(xiàn)類(lèi)端幼,所以可以把一個(gè)線程抽象成一個(gè)任務(wù)提交給執(zhí)行器來(lái)執(zhí)行礼烈,在這種應(yīng)用場(chǎng)景下,Thread實(shí)例不是作為一個(gè)線程婆跑,而是作為一個(gè)任務(wù)的封裝此熬,因?yàn)橹皇钦{(diào)用其run方法,不會(huì)觸及線程的start()滑进。

無(wú)返回值且無(wú)異常拋出的run方法給任務(wù)模型帶來(lái)局限性犀忱,即無(wú)法拋出異常也沒(méi)有返回值。但是有一個(gè)wrap類(lèi)FutureTask扶关,能夠把Callable接口包裝成Runnable和Future峡碉,使得執(zhí)行器接口的任務(wù)可以擁有Callable和Future的特性。(Callable和Future另行筆記:21.Future和Callable.md)

ExecutorService

ExecutorService接口繼承Executor接口驮审,是執(zhí)行器框架的估價(jià)接口鲫寄,它定義了任務(wù)的提交、取消疯淫、執(zhí)行器的關(guān)閉等方法地来。是為執(zhí)行器的最終執(zhí)行“提供服務(wù)”的接口。

  • 接口定義:
public interface ExecutorService extends Executor {

    /**
     * 啟動(dòng)有序關(guān)閉熙掺,其中先前提交的任務(wù)將被執(zhí)行未斑,但不會(huì)接受任何新任務(wù)。 
     * 如果已經(jīng)關(guān)閉币绩,調(diào)用沒(méi)有其他影響蜡秽。
     * 此方法不會(huì)等待先前提交的任務(wù)完成執(zhí)行。 
     * 使用awaitTermination來(lái)做到這一點(diǎn)缆镣。{@link #awaitTermination awaitTermination}
     */
    void shutdown();

    /**
     * 阻止等待任務(wù)啟動(dòng)并試圖停止當(dāng)前正在執(zhí)行的任務(wù)芽突,返回等待執(zhí)行的任務(wù)list。
     * 此方法不會(huì)等待先前提交的任務(wù)完成執(zhí)行董瞻。 
     * 使用awaitTermination來(lái)做到這一點(diǎn)寞蚌。{@link #awaitTermination awaitTermination}
     */
    List<Runnable> shutdownNow();

    boolean isShutdown();

    /**
     * 以shutdown為前提
     * Returns {@code true} if all tasks have completed following shut down.
     * Note that {@code isTerminated} is never {@code true} unless
     * either {@code shutdown} or {@code shutdownNow} was called first.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    boolean isTerminated();

    /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     */
    Future<?> submit(Runnable task);

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results when all complete.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results
     * when all complete or the timeout expires, whichever happens first.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do. Upon normal or exceptional return,
     * tasks that have not completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do before the given timeout elapses.
     * Upon normal or exceptional return, tasks that have not
     * completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

抽象類(lèi)AbstractExecutorService為ExecutorService提供默認(rèn)公共實(shí)現(xiàn)田巴。

ScheduledExecutorService

繼承ExcutorService,是一個(gè)支持“定時(shí)”調(diào)度任務(wù)的ExecutorService挟秤。

    // 延時(shí)執(zhí)行
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

    // 延時(shí)執(zhí)行壹哺,Callable版本
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    
    // 定期執(zhí)行,在初始時(shí)延結(jié)束后艘刚,以period為周期執(zhí)行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
    
    // 定期執(zhí)行管宵,在初始的延遲結(jié)束后周期運(yùn)行任務(wù),在一次調(diào)用完成和下一次調(diào)用開(kāi)始之間有delay的延遲攀甚,這個(gè)版本會(huì)等待一個(gè)線程執(zhí)行完成箩朴。
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

ThreadPoolExecutor

是執(zhí)行器框架中最常用的執(zhí)行器:線程池。在其之下還擴(kuò)展了其他的執(zhí)行器云稚。如:ScheduledThreadPoolExecutor隧饼。線程池是執(zhí)行器框架的功能實(shí)現(xiàn)者,池化的線程作為任務(wù)的執(zhí)行者存在于執(zhí)行器內(nèi)部來(lái)完成任務(wù)的最終執(zhí)行静陈,執(zhí)行器控制內(nèi)部的線程(執(zhí)行者)的生命周期以及任務(wù)的調(diào)度策略燕雁。

線程池的簡(jiǎn)單理解就是將線程創(chuàng)建好,緩存在內(nèi)存中被多個(gè)任務(wù)復(fù)用鲸拥,減少線程創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo)拐格。和自行動(dòng)手創(chuàng)建線程執(zhí)行任務(wù)相比,線程池負(fù)責(zé)線程的創(chuàng)建和任務(wù)的分發(fā)刑赶,以及控制線程的生命周期捏浊。

Executors

Executors是執(zhí)行器框架的一個(gè)工具類(lèi),提供了使用執(zhí)行器的一些靜態(tài)方法撞叨,如返回某個(gè)類(lèi)型的線程池實(shí)例等金踪,在《阿里巴巴java開(kāi)發(fā)手冊(cè)》中,并不推薦使用Executors來(lái)創(chuàng)建線程池牵敷,因?yàn)檠谏w了創(chuàng)建線程池的細(xì)節(jié)胡岔,失去對(duì)線程池細(xì)節(jié)的配置。如newCachedThreadPool()內(nèi)部設(shè)置的最大線程數(shù)為Integer.MAX_VALUE,這可能會(huì)使得OOM發(fā)生枷餐。

線程池

線程池基本屬性

  • private volatile long keepAliveTime

corePoolSize之外的線程的空轉(zhuǎn)存活時(shí)間(默認(rèn)納秒)

  • private volatile boolean allowCoreThreadTimeOut;

false時(shí)靶瘸,core線程始終保持存活,否則使用keepAliveTime來(lái)控制core線程的空轉(zhuǎn)時(shí)間毛肋,默認(rèn)false怨咪;

  • private volatile int corePoolSize;

最小線程數(shù),如果開(kāi)啟了allowCoreThreadTimeOut润匙,最小線程可能會(huì)是0

  • private volatile int maximumPoolSize;

線程池中允許的最大線程數(shù)诗眨,實(shí)際數(shù)量還需依賴底層限制。線程池的阻塞隊(duì)列滿了之后趁桃,如果還有任務(wù)提交辽话,如果當(dāng)前的線程數(shù)小于maximumPoolSize肄鸽,則會(huì)新建線程來(lái)執(zhí)行任務(wù)卫病。 如果使用的是無(wú)界隊(duì)列油啤,該參數(shù)也就沒(méi)有什么效果了。

  • private final BlockingQueue<Runnable> workQueue;

任務(wù)隊(duì)列蟀苛,當(dāng)任務(wù)提交后無(wú)法為其分配線程執(zhí)行時(shí)益咬,會(huì)暫存到阻塞的任務(wù)隊(duì)列中,當(dāng)任務(wù)隊(duì)列滿了(如果有界),就會(huì)創(chuàng)建新的線程來(lái)hold任務(wù)(如果沒(méi)有達(dá)到maximumPoolSize)帜平。需要注意使用的是有界隊(duì)列還是無(wú)界隊(duì)列幽告。ArrayBlockingQueue(需要指定容量創(chuàng)建)和LinkedBlockingQueue(默認(rèn)Integer.MAX_VALUE)是有界隊(duì)列,PriorityBlockingQueue(默認(rèn)容量11裆甩,會(huì)自動(dòng)擴(kuò)容的二叉堆)和SynchronousQueue(不存儲(chǔ)元素的阻塞隊(duì)列冗锁,每個(gè)插入操作都必須等待一個(gè)移出操作)是無(wú)界隊(duì)列。

  • 狀態(tài):

RUNNING嗤栓、SHUTDOWN冻河、STOP、TIDYING茉帅、TERMINATED

狀態(tài)含義和狀態(tài)如何遷移:

    /* The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     */
  • 工作者數(shù)量:workerCount

workerCount和state共用一個(gè)int型變量叨叙,前者使用低29位,后者使用高3位堪澎。

線程池工作邏輯:

  • 任務(wù)提交到線程池擂错,如果線程池中的線程數(shù)小于corePoolSize,就創(chuàng)建新線程來(lái)執(zhí)行任務(wù)
  • 當(dāng)任務(wù)到達(dá)時(shí)線程數(shù)達(dá)到corePoolSize樱蛤,任務(wù)被放入woreQueue中等待線程空閑钮呀,當(dāng)線程執(zhí)行完一個(gè)任務(wù),會(huì)取隊(duì)列中的任務(wù)昨凡;
  • 當(dāng)任務(wù)到達(dá)時(shí)隊(duì)列已滿爽醋,如果當(dāng)前線程數(shù)小于Maxsize,就創(chuàng)建一個(gè)新線程處理新來(lái)的任務(wù)土匀,如果當(dāng)前線程數(shù)已經(jīng)達(dá)到上限子房,就使用指定的拒絕任務(wù)處理器處理任務(wù)。

線程池創(chuàng)建

線程池主要有常規(guī)的任務(wù)調(diào)度線程池和定時(shí)(延遲)任務(wù)的線程池兩種就轧,前者對(duì)應(yīng)ThreadPoolExecutor類(lèi)和ForkJoinPool(java8)证杭,后者對(duì)應(yīng)的是ScheduledThreadPoolExecutor。

使用構(gòu)造器(推薦)

ThreadPoolExecutor
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

前幾個(gè)參數(shù)的含義前文已經(jīng)說(shuō)明妒御,threadFactory是指定線程創(chuàng)建工廠解愤,handler是當(dāng)任務(wù)工作隊(duì)列已經(jīng)滿了且線程數(shù)已經(jīng)達(dá)到上限時(shí)的任務(wù)拒絕處理器。這個(gè)構(gòu)造器有幾個(gè)重載的構(gòu)造器:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue);
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory);
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler);
  • Test
    @Test
    public void testThreadPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        pool.prestartAllCoreThreads();// 預(yù)先啟動(dòng)所有核心線程乎莉,讓其空轉(zhuǎn)等待
//      pool.prestartCoreThread(); // 預(yù)先啟動(dòng)一個(gè)核心線程
        System.out.println(pool.getCorePoolSize());
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS));
        System.out.println(pool.getActiveCount());
        System.out.println(pool.getLargestPoolSize());
        System.out.println(pool.getCompletedTaskCount());
        System.out.println(pool.getPoolSize());
        System.out.println(pool.getTaskCount());
        System.out.println(pool.getQueue().size());
        pool.shutdown(); // 關(guān)閉線程池
    }
// 5
// 60
// 0
// 5
// 0
// 5
// 0
// 0
ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實(shí)現(xiàn)ScheduledExecutorService接口送讲,為一個(gè)提供周期任務(wù)執(zhí)行的線程池奸笤。
構(gòu)造器:

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        // 基于父類(lèi)ThreadPoolExecutor的構(gòu)造器,最大poolsize是Integer.MAX_VALUE哼鬓,
        // 使用的任務(wù)隊(duì)列是DelayedWorkQueue
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
    /*幾個(gè)重載的構(gòu)造器监右,重載的風(fēng)格類(lèi)似于父類(lèi)ThreadPoolExecutor*/
    public ScheduledThreadPoolExecutor(int corePoolSize);
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory)异希;
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler)健盒;

  • Test
    @Test
    public void testSchedulePool() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        pool.shutdown();
    }
ForkJoinPool

暫略.

使用Executors工具類(lèi)

Executors類(lèi)提供了眾多public static工廠方法來(lái)創(chuàng)建線程池:

固定線程數(shù)的線程池

@since 1.5

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
// 重載版本
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// 重載版本
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);
支持任務(wù)密取的線程池

(ForkJoinPool) @since 1.8

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
// 并行數(shù)為處理器數(shù)
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
} 
CachedThreadPool

初始化是線程池線程數(shù)為0称簿,使用時(shí)按需創(chuàng)建線程

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
// 重載
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)扣癣;
ScheduledThreadPoolExecutor

執(zhí)行周期性任務(wù)的線程池

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
// 重載
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) ;

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 重載
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) 憨降;

注: ThreadFactory threadFactory是創(chuàng)建線程的工廠接口父虑,通過(guò)此參數(shù)可以自定義線程創(chuàng)建的過(guò)程。

線程池任務(wù)執(zhí)行

應(yīng)用思路

  • 創(chuàng)建線程池 (使用構(gòu)造器創(chuàng)建線程池
  • submit()提交任務(wù) 或 execute()執(zhí)行任務(wù)
  • 關(guān)閉線程池

submit是AbstractExecutorService提供的模板方法授药,其內(nèi)部執(zhí)行了Executor接口的execute方法士嚎,實(shí)際上execute是交給子類(lèi)類(lèi)來(lái)實(shí)現(xiàn)的。execute方法在ThreadPoolExecutor中有實(shí)現(xiàn)烁焙。

Test

固定線程數(shù)線程池
    @Test
    public void testFixCountPool() {
        // 固定線程數(shù)為10航邢,任務(wù)隊(duì)列100
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        for (int i = 0; i < 100; i++) {
            Runnable r = () -> {
                System.out.println(Thread.currentThread().getName() + ",task start");
                System.out.println(Thread.currentThread().getName() + ",task  end");
                // latch.countDown();
            };
            pool.submit(r);
        }
        // 等待任務(wù)完成后再shutdown,還可以使用CountDownLatch來(lái)完成此需求
        while (pool.getCompletedTaskCount() < 100) {
        }
        pool.shutdown();
    }
    @Test
    public void testPool2() throws InterruptedException, ExecutionException {
        ExecutorService pool = new ThreadPoolExecutor(1,1,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>(20));
        // 使用有返回結(jié)果的Callable任務(wù)
        FutureTask<Integer> task = new FutureTask<Integer>(() -> {
            System.out.println("This is a callable");
            return 10;
        });
        pool.submit(task);
        // 阻塞等待結(jié)果
        System.out.println(task.get());
        // pool支持Callable任務(wù)
        Future<Integer> future = pool.submit(() -> {
            System.out.println("this is a pure callable");
            return 100;
        });
        System.out.println(future.get());
        pool.shutdown();
    }
cachedPool

CachedThreadPool初始化時(shí)不會(huì)創(chuàng)建線程骄蝇,在有任務(wù)提交的時(shí)候回按需創(chuàng)建線程膳殷,線程有idle有效期,過(guò)期會(huì)自動(dòng)銷(xiāo)毀九火。

ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
schedulePool
    @Test
    public void testSchedulePool() throws InterruptedException, ExecutionException {
        ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(1);
        // 延遲三秒執(zhí)行
        ScheduledFuture<String> future = pool.schedule(() -> {
            System.out.println("hhh");
            return "end";
        }, 3, TimeUnit.SECONDS);
        System.out.println(System.currentTimeMillis());
        System.out.println(future.get());
        System.out.println(System.currentTimeMillis());
        pool.shutdowm();
    }
    // 指定周期執(zhí)行
    @Test
    public void testSchedulePool2() throws InterruptedException, ExecutionException, TimeoutException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleAtFixedRate(() -> {
            System.out.println("hhh");
        }, 2, 1, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(10);
        pool.shutdown();
        pool.awaitTermination(3,TimeUnit.SECONDS);
        
    }
    // 指定延遲的周期執(zhí)行
    @Test
    public void testScheduledPool() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        // 1秒后執(zhí)行任務(wù)赚窃,任務(wù)完成后按間隔兩秒再次執(zhí)行任務(wù)
        pool.scheduleWithFixedDelay(() -> {
            try {
                System.out.println("hi");
                // 任務(wù)休眠三秒,差不多任務(wù)是5秒執(zhí)行一次
                TimeUnit.SECONDS.sleep(3);
                latch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 1, 2, TimeUnit.SECONDS);
        latch.await();
        pool.shutdown();
    }
invokeAll等
    @Test
    public void testInvokeAny() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<Callable<String>> tasks = new ArrayList<>(100);
        Callable<String> task = () -> {
            Thread.sleep(new Random().nextInt(3000));
            return Thread.currentThread().getName();
        };
        for (int i = 0; i < 100; i++) {
            tasks.add(task);
        }
        try {
            // 返回一個(gè)完成的任務(wù)結(jié)果岔激,沒(méi)完成的其他的都被取消
            String res = executor.invokeAny(tasks);
            System.out.println(res);
            executor.shutdown();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    /**
     * invokeAll
     */
    @Test
    public void testInvokeAll() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<Callable<Integer>> tasks = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            final int v = i;
            Callable<Integer> task = () -> {
                Thread.sleep(100);
                return v;
            };
            tasks.add(task);
        }
        try {
            // 所有任務(wù)完成后會(huì)返回,結(jié)果是按提交順序存到list中的
            List<Future<Integer>> res = executor.invokeAll(tasks);
            // 遍歷
            res.stream().forEach((Future<Integer> f) -> {
                try {
                    System.out.println(f.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            executor.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 按照獲得結(jié)果的順序存儲(chǔ)結(jié)果
     */
    @Test
    public void testInvokeAll2() {
        ExecutorService executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        List<Callable<Integer>> tasks = new ArrayList<>(100);
        for (int i = 0; i < 100; i++) {
            final int v = i;
            Callable<Integer> task = () -> {
                Thread.sleep(100);
                return v;
            };
            tasks.add(task);
        }
        try {
            ExecutorCompletionService<Integer> service = new ExecutorCompletionService<>(executor);
            for (Callable<Integer> task : tasks)
                service.submit(task);
            for (int i = 0; i < tasks.size(); i++) {
                System.out.println(service.take().get());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

小結(jié)

向?qū)嵗蟮膒ool提交任務(wù)勒极,需要返回值的task使用Callable接口定義,否則使用Runnable虑鼎。

線程池關(guān)閉

API提供了兩種關(guān)閉池的方式辱匿,shutdown和shutdownNow,兩者執(zhí)行后炫彩,線程池都會(huì)拒絕新任務(wù)的提交匾七,前者會(huì)等待已啟動(dòng)的任務(wù)執(zhí)行結(jié)束,隊(duì)列中的任務(wù)執(zhí)行會(huì)啟動(dòng)江兢,但是線程池不會(huì)阻塞等待任務(wù)完成昨忆,后者是會(huì)嘗試中斷啟動(dòng)的任務(wù)、不會(huì)啟動(dòng)隊(duì)列中的任務(wù)杉允,暴力關(guān)閉pool邑贴。

shutdown

    @Test
    public void testShutdown() throws InterruptedException {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        // 讓主線程等待任務(wù)完成(防止jvm退出席里,另一種方案是輪詢pool.getActiveCount()
        CountDownLatch latch = new CountDownLatch(6);
        Callable<String> task = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("hi");
                return "hi";
            } finally {
                latch.countDown();
            }
        };
        p.submit(task);
        // 提交5個(gè)任務(wù),會(huì)直接進(jìn)入到隊(duì)列中
        for (int i = 0; i < 5; i++)
            p.submit(task);
        System.out.println("shutdown");
        p.shutdown();
        System.out.println(p.isShutdown());// true
        System.out.println(p.getActiveCount());
        // p.submit(task);// RejectedExecutionException
        latch.await();
        System.out.println(p.getActiveCount());
    }
// 輸出:
// shutdown
// true
// 1
// hi
// hi
// hi
// hi
// hi
// hi
// 0

輸出結(jié)果證明提交的任務(wù)以及隊(duì)列中的任務(wù)能夠正常執(zhí)行完成拢驾,如果主進(jìn)程沒(méi)有退出的話奖磁。

  • 使用awaitTermination阻塞等待
    @Test
    public void testShutdownAwait() throws InterruptedException {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        Callable<String> task = () -> {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("hi");
            return "hi";
        };
        p.submit(task);
        // 提交5個(gè)任務(wù),會(huì)直接進(jìn)入到隊(duì)列中
        for (int i = 0; i < 5; i++)
            p.submit(task);
        System.out.println("shutdown");
        p.shutdown();
        // 阻塞直達(dá)池中任務(wù)全部完成或者超時(shí)
        p.awaitTermination(1000, TimeUnit.SECONDS);
        System.out.println(p.isShutdown());// true
        System.out.println(p.getActiveCount());

    }

shutdownNow

    @Test
    public void testShutdownNow() throws InterruptedException {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        Callable<String> task = () -> {
            System.out.println("start");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("hi");
            return "hi";
        };
        p.submit(task);
        // 提交5個(gè)任務(wù)独旷,會(huì)直接進(jìn)入到隊(duì)列中
        for (int i = 0; i < 5; i++)
            p.submit(task);
        System.out.println("shutdown");
        p.shutdownNow();
        System.out.println(p.isShutdown());// true
        System.out.println(p.getActiveCount());
        while (p.getActiveCount() > 0) {
        }
        System.out.println("end");
    }
// 輸出:
// shutdown
// start
// true
// 0
// end

第一個(gè)任務(wù)啟動(dòng)后沒(méi)有等到完成就被取消(shutdownNow執(zhí)行后getActiveCount就變成了0)署穗,隊(duì)列中任務(wù)不會(huì)啟動(dòng)寥裂。
shutdownNow使用Thread#interrupt來(lái)中斷任務(wù)嵌洼,如果任務(wù)不響應(yīng)中斷,就不會(huì)被中斷封恰。awaitTermination()調(diào)用后會(huì)阻塞等待沒(méi)有被結(jié)束成功的任務(wù)執(zhí)行完成麻养。如果任務(wù)在shutdownNow時(shí)就全部被中斷,任務(wù)就都已經(jīng)結(jié)束了诺舔,awaitTermination()也不會(huì)阻塞鳖昌。
這里引出停止線程的一個(gè)方法:讓線程響應(yīng)中斷。

    @Test
    public void testShutdownNowAwait() throws InterruptedException {
        ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        Callable<String> task = () -> {
            System.out.println("start");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                System.out.println(e);
            }
            System.out.println("hi");
            return "hi";
        };
        p.submit(task);
        // 提交5個(gè)任務(wù)低飒,會(huì)直接進(jìn)入到隊(duì)列中
        for (int i = 0; i < 5; i++)
            p.submit(task);
        System.out.println("shutdown");
        p.shutdownNow(); 
        System.out.println(p.isShutdown());// true
        System.out.println(p.getActiveCount());
        // 阻塞等待active的任務(wù)執(zhí)行完成
        p.awaitTermination(1000, TimeUnit.SECONDS);
        System.out.println("end");
    }
// 輸出
// start
// true
// 1
// java.lang.InterruptedException: sleep interrupted
// hi
// end

小結(jié)

shutdown和shutdownNow的區(qū)別是许昨,前者只是拒絕新任務(wù),如果主線程不消亡褥赊,池中的活躍任務(wù)和隊(duì)列中的任務(wù)都會(huì)執(zhí)行糕档,可以使用awaitTermination()指定pool阻塞等待它們完成;shutdownNow也會(huì)拒絕新任務(wù)拌喉,它會(huì)通過(guò)中斷試著終止活躍任務(wù)速那,但是不一定能終止成功,隊(duì)列中的任務(wù)被取消尿背,如果活躍任務(wù)沒(méi)有被殺死端仰,只要宿主進(jìn)程沒(méi)死也是會(huì)繼續(xù)執(zhí)行的,但是隊(duì)列中的任務(wù)不會(huì)啟動(dòng)田藐,它也可以使用awaitTermination()指定pool阻塞等待沒(méi)被殺死的活躍任務(wù)完成荔烧。

參考資料

[1] 【死磕Java并發(fā)】-----J.U.C之線程池:線程池的基礎(chǔ)架構(gòu)
[2] 【死磕Java并發(fā)】-----J.U.C之線程池:ThreadPoolExecutor

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市汽久,隨后出現(xiàn)的幾起案子鹤竭,更是在濱河造成了極大的恐慌,老刑警劉巖回窘,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诺擅,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡啡直,警方通過(guò)查閱死者的電腦和手機(jī)烁涌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)苍碟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人撮执,你說(shuō)我怎么就攤上這事微峰。” “怎么了抒钱?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵蜓肆,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我谋币,道長(zhǎng)仗扬,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任蕾额,我火速辦了婚禮早芭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘诅蝶。我一直安慰自己退个,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布调炬。 她就那樣靜靜地躺著语盈,像睡著了一般。 火紅的嫁衣襯著肌膚如雪缰泡。 梳的紋絲不亂的頭發(fā)上刀荒,一...
    開(kāi)封第一講書(shū)人閱讀 49,837評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音匀谣,去河邊找鬼照棋。 笑死,一個(gè)胖子當(dāng)著我的面吹牛武翎,可吹牛的內(nèi)容都是我干的烈炭。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼宝恶,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼符隙!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起垫毙,我...
    開(kāi)封第一講書(shū)人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤霹疫,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后综芥,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體丽蝎,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了屠阻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片红省。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖国觉,靈堂內(nèi)的尸體忽然破棺而出吧恃,到底是詐尸還是另有隱情,我是刑警寧澤麻诀,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布痕寓,位于F島的核電站,受9級(jí)特大地震影響蝇闭,放射性物質(zhì)發(fā)生泄漏呻率。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一丁眼、第九天 我趴在偏房一處隱蔽的房頂上張望筷凤。 院中可真熱鬧,春花似錦苞七、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至乾蓬,卻和暖如春惠啄,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背任内。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工撵渡, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人死嗦。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓趋距,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親越除。 傳聞我的和親對(duì)象是個(gè)殘疾皇子节腐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容