Java高并發(fā)系列——檢視閱讀(五)

Java高并發(fā)系列——線程池

JAVA線程池

線程池實現(xiàn)原理

類似于一個工廠的運作充尉。

當(dāng)向線程池提交一個任務(wù)之后飘言,線程池的處理流程如下:

  1. 判斷是否達到核心線程數(shù),若未達到驼侠,則直接創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù)姿鸿,否則進入下個流程
  2. 線程池中的工作隊列是否已滿,若未滿倒源,則將任務(wù)丟入工作隊列中先存著等待處理般妙,否則進入下個流程
  3. 是否達到最大線程數(shù),若未達到相速,則創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù)碟渺,否則交給線程池中的飽和策略進行處理。
image.png

java中的線程池

jdk中提供了線程池的具體實現(xiàn)突诬,實現(xiàn)類是:java.util.concurrent.ThreadPoolExecutor苫拍,主要構(gòu)造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize核心線程大小,當(dāng)提交一個任務(wù)到線程池時旺隙,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù)绒极,即使有其他空閑線程可以處理任務(wù)也會創(chuàng)新線程,等到工作的線程數(shù)大于核心線程數(shù)時就不會在創(chuàng)建了蔬捷。如果調(diào)用了線程池的prestartAllCoreThreads方法垄提,線程池會提前把核心線程都創(chuàng)造好,并啟動周拐。(prestartCoreThread:啟動一個核心線程或 prestartAllCoreThreads:啟動全部核心線程 )

maximumPoolSize線程池允許創(chuàng)建的最大線程數(shù)铡俐。如果隊列滿了,并且以創(chuàng)建的線程數(shù)小于最大線程數(shù)妥粟,則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)审丘。如果我們使用了無界隊列(或者大小是Integer.MAX_VALUE,可能還沒達到就OOM了)勾给,那么所有的任務(wù)會加入隊列滩报,這個參數(shù)就沒有什么效果了锅知。

keepAliveTime線程池的工作線程空閑后,保持存活的時間脓钾。如果沒有任務(wù)處理了售睹,有些線程會空閑,空閑的時間超過了這個值可训,會被回收掉昌妹。如果任務(wù)很多,并且每個任務(wù)的執(zhí)行時間比較短沉噩,避免線程重復(fù)創(chuàng)建和回收捺宗,可以調(diào)大這個時間,提高線程的利用率

unitkeepAliveTIme的時間單位川蒙,可以選擇的單位有天蚜厉、小時、分鐘畜眨、毫秒昼牛、微妙、千分之一毫秒和納秒康聂。類型是一個枚舉java.util.concurrent.TimeUnit贰健,這個枚舉也經(jīng)常使用,有興趣的可以看一下其源碼

workQueue工作隊列恬汁,用于緩存待處理任務(wù)的阻塞隊列伶椿,常見的有4種(ArrayBlockingQueueLinkedBlockingQueue 氓侧、SynchronousQueue 脊另、PriorityBlockingQueue

threadFactory線程池中創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字

handler飽和策略约巷,當(dāng)線程池?zé)o法處理新來的任務(wù)了偎痛,那么需要提供一種策略處理提交的新任務(wù),默認有4種策略(AbortPolicy 独郎、CallerRunsPolicy 踩麦、DiscardOldestPolicyDiscardPolicy

調(diào)用線程池的execute方法處理任務(wù)氓癌,執(zhí)行execute方法的過程:

  1. 判斷線程池中運行的線程數(shù)是否小于corepoolsize谓谦,是:則創(chuàng)建新的線程來處理任務(wù),否:執(zhí)行下一步
  2. 試圖將任務(wù)添加到workQueue指定的隊列中顽铸,如果無法添加到隊列茁计,進入下一步
  3. 判斷線程池中運行的線程數(shù)是否小于maximumPoolSize,是:則新增線程處理當(dāng)前傳入的任務(wù)谓松,否:將任務(wù)傳遞給handler對象rejectedExecution方法處理

線程池的使用步驟:

  1. 調(diào)用構(gòu)造方法創(chuàng)建線程池
  2. 調(diào)用線程池的方法處理任務(wù)
  3. 關(guān)閉線程池

線程池中常見5種工作隊列

任務(wù)太多的時候星压,工作隊列用于暫時緩存待處理的任務(wù),jdk中常見的4種阻塞隊列:

ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列鬼譬,此隊列按照先進先出原則對元素進行排序

LinkedBlockingQueue:是一個基于鏈表結(jié)構(gòu)的阻塞隊列娜膘,此隊列按照先進先出排序元素,吞吐量通常要高于ArrayBlockingQueue优质。靜態(tài)工廠方法Executors.newFixedThreadPool使用了這個隊列竣贪。

SynchronousQueue一個不存儲元素的阻塞隊列,每個插入操作必須等到另外一個線程調(diào)用移除操作巩螃,否則插入操作一直處理阻塞狀態(tài)演怎,吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用這個隊列避乏。

PriorityBlockingQueue優(yōu)先級隊列爷耀,進入隊列的元素按照優(yōu)先級會進行排序。

SynchronousQueue隊列的線程池

使用Executors.newCachedThreadPool()創(chuàng)建線程池拍皮,看一下的源碼:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool()使用了SynchronousQueue同步隊列歹叮,這種隊列比較特殊,放入元素必須要有另外一個線程去獲取這個元素铆帽,否則放入元素會失敗或者一直阻塞在那里直到有線程取走咆耿,示例中任務(wù)處理休眠了指定的時間,導(dǎo)致已創(chuàng)建的工作線程都忙于處理任務(wù)爹橱,所以新來任務(wù)之后萨螺,將任務(wù)丟入同步隊列會失敗,丟入隊列失敗之后愧驱,會嘗試新建線程處理任務(wù)慰技。使用上面的方式創(chuàng)建線程池需要注意,如果需要處理的任務(wù)比較耗時冯键,會導(dǎo)致新來的任務(wù)都會創(chuàng)建新的線程進行處理惹盼,可能會導(dǎo)致創(chuàng)建非常多的線程,最終耗盡系統(tǒng)資源惫确,觸發(fā)OOM手报。

//SynchronousQueue隊列默認是false,采用先進后出的棧處理,也可以是公平隊列先進先出改化。
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
PriorityBlockingQueue優(yōu)先級隊列的線程池

輸出中掩蛤,除了第一個任務(wù),其他任務(wù)按照優(yōu)先級高低按順序處理陈肛。原因在于:創(chuàng)建線程池的時候使用了優(yōu)先級隊列揍鸟,進入隊列中的任務(wù)會進行排序,任務(wù)的先后順序由Task中的i變量決定句旱。向PriorityBlockingQueue加入元素的時候阳藻,內(nèi)部會調(diào)用代碼中Task的compareTo方法決定元素的先后順序晰奖。

示例:

public class ThreadPoolExecutorPriorityTest {
    /**
     * 優(yōu)先級隊列執(zhí)行的任務(wù)要實現(xiàn)Comparable比較
     */
    static class Task implements Runnable, Comparable<Task> {
        private int i;
        private String name;
        public Task(int i, String name) {
            this.i = i;
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "處理" + this.name);
        }
        @Override
        public int compareTo(Task o) {
            return Integer.compare(o.i, this.i);
        }
    }

    //自定義線程工廠,優(yōu)先級隊列
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60,
            TimeUnit.SECONDS, new PriorityBlockingQueue<>(), new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        for (int i = 1; i <= 10; i++) {
            int j = i;
            String taskName = "task" + j;
            executor.execute(new Task(j,taskName));
        }

        for (int i = 90; i <= 100; i++) {
            int j = i;
            String taskName = "task" + j;
            executor.execute(new Task(j,taskName));
        }
        executor.shutdown();
    }
}

輸出:

From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task1
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task100
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task99
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task98
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task97
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task96
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task95
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task94
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task93
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task92
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task91
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task90
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task10
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task9
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task8
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task7
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task6
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task5
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task4
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task3
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task2

自定義創(chuàng)建線程的工廠

給線程池中線程起一個有意義的名字,在系統(tǒng)出現(xiàn)問題的時候腥泥,通過線程堆棧信息可以更容易發(fā)現(xiàn)系統(tǒng)中問題所在匾南。通過jstack查看線程的堆棧信息,也可以看到我們自定義的名稱 蛔外。

自定義創(chuàng)建工廠需要實現(xiàn)java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法蛆楞,參數(shù)為傳入的任務(wù),需要返回一個工作線程夹厌。

示例:

public class ThreadPoolExecutorTest {

    //默認線程創(chuàng)建
    /*  private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60,
              TimeUnit.SECONDS, new LinkedBlockingQueue<>(15), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());*/

    //自定義線程工廠1
/*    private static final AtomicInteger nextId = new AtomicInteger(1);
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(12), (r -> {
        Thread t = new Thread(r);
        t.setName("示范線程" + nextId.getAndIncrement());
        return t;
    }), new ThreadPoolExecutor.AbortPolicy());*/

    //自定義線程工廠2 豹爹,推薦
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(15), new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        //提前啟動所有核心線程
        executor.prestartAllCoreThreads();
        //提前啟動一個核心線程
        executor.prestartCoreThread();
        for (int i = 1; i <= 20; i++) {
            int j = i;
            String taskName = "task" + j;
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "線程執(zhí)行" + taskName + "完畢!");
            });
        }
        executor.shutdown();
    }
}

輸出:

From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢矛纹!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task2完畢臂聋!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task3完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task4完畢崖技!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task5完畢逻住!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task6完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢迎献!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task8完畢瞎访!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task9完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task10完畢吁恍!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-6線程執(zhí)行task17完畢扒秸!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task11完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-7線程執(zhí)行task20完畢冀瓦!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task12完畢伴奥!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task13完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task14完畢翼闽!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task15完畢拾徙!

四種常見飽和策略

當(dāng)線程池中隊列已滿,并且線程池已達到最大線程數(shù)感局,線程池會將任務(wù)傳遞給飽和策略進行處理尼啡。這些策略都實現(xiàn)了RejectedExecutionHandler接口。接口中有個方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor)

參數(shù)說明:

r:需要執(zhí)行的任務(wù)

executor:當(dāng)前線程池對象

JDK中提供了4種常見的飽和策略:

AbortPolicy:直接拋出異常询微。

CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運行任務(wù)崖瞭,即誰丟來的任務(wù),由他自己去處理撑毛。

DiscardOldestPolicy:丟棄隊列中最老的一個任務(wù)书聚,即丟棄隊列頭部的一個任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)。

DiscardPolicy:不處理雌续,直接丟棄掉斩个,方法內(nèi)部為空。

解釋:

//自定義線程工廠
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
        new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.CallerRunsPolicy());
        
AbortPolicy:直接拋出異常西雀。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    e.toString());
}
輸出:到飽和策略時拋出異常記錄萨驶,丟棄掉任務(wù)11個歉摧。
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.self.current.ThreadPoolExecutorTest$$Lambda$1/1915503092@50134894 rejected from java.util.concurrent.ThreadPoolExecutor@2957fcb0[Running, pool size = 5, active threads = 4, queued tasks = 5, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.self.current.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:47)
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢艇肴!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task2完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task6完畢叁温!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task3完畢再悼!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task8完畢膝但!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task9完畢冲九!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task4完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task5完畢跟束!

        
CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運行任務(wù)莺奸,即隨丟來的任務(wù),由他自己去處理冀宴。如main方法調(diào)用的線程池灭贷,則如果走到飽和策略處理時,由main方法處理這個任務(wù)略贮。不會丟棄任何一個任務(wù)甚疟,但執(zhí)行會變得很慢。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }    
輸出:
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢逃延!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task2完畢览妖!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task6完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task3完畢揽祥!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task8完畢讽膏!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task9完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task4完畢拄丰!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task10完畢府树!
main線程執(zhí)行task11完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task5完畢愈案!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢挺尾!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task12完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task13完畢站绪!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task14完畢遭铺!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task15完畢!
main線程執(zhí)行task17完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task16完畢魂挂!

DiscardOldestPolicy:丟棄隊列中最老的一個任務(wù)甫题,即丟棄隊列頭部的一個任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)涂召。這時候線程池會在執(zhí)行到飽和策略時丟棄掉頭部最老的認為坠非,沒有任何記錄,任務(wù)就丟掉了果正。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
    }
}

輸出:20個任務(wù)被無聲無息地丟掉了11個
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task6完畢炎码!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task7完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task8完畢秋泳!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task9完畢潦闲!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task16完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task17完畢迫皱!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task18完畢歉闰!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task19完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task20完畢卓起!

DiscardPolicy:不處理和敬,直接丟棄掉,方法內(nèi)部為空戏阅。沒處理Runnable r就表示丟棄了昼弟。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
輸出:20個任務(wù)被無聲無息地丟掉了10個
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task1完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task2完畢饲握!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task3完畢私杜!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task7完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task8完畢救欧!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4線程執(zhí)行task9完畢衰粹!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5線程執(zhí)行task10完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1線程執(zhí)行task4完畢笆怠!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2線程執(zhí)行task5完畢铝耻!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3線程執(zhí)行task6完畢!

自定義飽和策略

需要實現(xiàn)RejectedExecutionHandler接口蹬刷。任務(wù)無法處理的時候瓢捉,我們想記錄一下日志,我們需要自定義一個飽和策略办成。記錄了任務(wù)的日志泡态,對于無法處理多任務(wù),我們最好能夠記錄一下迂卢,讓開發(fā)人員能夠知道某弦。 任務(wù)進入了飽和策略桐汤,說明線程池的配置可能不是太合理,或者機器的性能有限靶壮,需要做一些優(yōu)化調(diào)整怔毛。

實例:

public class ThreadPoolExecutorRejectHandlerTest {
    static class Task implements Runnable {
        String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "處理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }
    //自定義包含策略:可以直接用函數(shù)式方法定義,也可以實現(xiàn)RejectedExecutionHandler自定義
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
            new DemoThreadFactory("訂單創(chuàng)建組"), (r,executor)->{
        //自定義飽和策略
        //記錄一下無法處理的任務(wù)
        System.out.println("無法處理的任務(wù):" + r.toString());
    });

    public static void main(String[] args) {
        //提前啟動所有核心線程
        executor.prestartAllCoreThreads();
        //提前啟動一個核心線程
        executor.prestartCoreThread();
        for (int i = 1; i <= 20; i++) {
            int j = i;
            String taskName = "task" + j;
            executor.execute(new Task(taskName));
        }
        executor.shutdown();
    }
}

輸出:

無法處理的任務(wù):Task{name='task10'}
無法處理的任務(wù):Task{name='task11'}
無法處理的任務(wù):Task{name='task12'}
無法處理的任務(wù):Task{name='task13'}
無法處理的任務(wù):Task{name='task14'}
無法處理的任務(wù):Task{name='task15'}
無法處理的任務(wù):Task{name='task16'}
無法處理的任務(wù):Task{name='task17'}
無法處理的任務(wù):Task{name='task18'}
無法處理的任務(wù):Task{name='task19'}
無法處理的任務(wù):Task{name='task20'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task1
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2處理task6
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3處理task7
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4處理task8
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-5處理task9
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2處理task2
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task3
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-4處理task5
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3處理task4

線程池中的2個關(guān)閉方法

線程池提供了2個關(guān)閉方法:shutdownshutdownNow腾降,當(dāng)調(diào)用者兩個方法之后拣度,線程池會遍歷內(nèi)部的工作線程,然后調(diào)用每個工作線程的interrrupt方法給線程發(fā)送中斷信號螃壤,內(nèi)部如果無法響應(yīng)中斷信號的可能永遠無法終止抗果,所以如果內(nèi)部有無線循環(huán)的,最好在循環(huán)內(nèi)部檢測一下線程的中斷信號映穗,合理的退出窖张。調(diào)用者兩個方法中任意一個,線程池的isShutdown方法(是否執(zhí)行了關(guān)閉線程池命令)就會返回true蚁滋,當(dāng)所有的任務(wù)線程都關(guān)閉之后,才表示線程池關(guān)閉成功赘淮,這時調(diào)用isTerminaed方法(是否關(guān)閉成功)會返回true辕录。

調(diào)用shutdown方法之后,線程池將不再接受新任務(wù)梢卸,內(nèi)部會將所有已提交的任務(wù)處理完畢走诞,處理完畢之后,工作線程自動退出蛤高。

而調(diào)用shutdownNow方法后蚣旱,線程池會將還未處理的(在隊里等待處理的任務(wù))任務(wù)移除,將正在處理中的處理完畢之后戴陡,工作線程自動退出塞绿。

至于調(diào)用哪個方法來關(guān)閉線程,應(yīng)該由提交到線程池的任務(wù)特性決定恤批,多數(shù)情況下調(diào)用shutdown方法來關(guān)閉線程池异吻,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法喜庞。

擴展線程池

ThreadPoolExecutor內(nèi)部提供了幾個方法beforeExecute诀浪、afterExecuteterminated延都,可以由開發(fā)人員自己去重寫實現(xiàn)這些方法雷猪。

看一下線程池內(nèi)部的源碼:

try {
    beforeExecute(wt, task);//任務(wù)執(zhí)行之前調(diào)用的方法
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x;
        throw x;
    } catch (Error x) {
        thrown = x;
        throw x;
    } catch (Throwable x) {
        thrown = x;
        throw new Error(x);
    } finally {
        afterExecute(task, thrown);//任務(wù)執(zhí)行完畢之后調(diào)用的方法
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

beforeExecute:任務(wù)執(zhí)行之前調(diào)用的方法,有2個參數(shù)晰房,第1個參數(shù)是執(zhí)行任務(wù)的線程求摇,第2個參數(shù)是任務(wù)

protected void beforeExecute(Thread t, Runnable r) { }

afterExecute:任務(wù)執(zhí)行完成之后調(diào)用的方法酵颁,2個參數(shù),第1個參數(shù)表示任務(wù)月帝,第2個參數(shù)表示任務(wù)執(zhí)行時的異常信息躏惋,如果無異常,第二個參數(shù)為null

protected void afterExecute(Runnable r, Throwable t) { }

terminated:線程池最終關(guān)閉之后調(diào)用的方法嚷辅。所有的工作線程都退出了簿姨,最終線程池會退出,退出時調(diào)用該方法

實例:

public class ThreadPoolExecutorExtensionTest {
    static class Task implements Runnable {
        String name;

        public Task(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "處理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }
    //擴展線程池,可以繼承也可以直接重寫
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(15),
            new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy()){
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println(t.getName() + ",開始執(zhí)行任務(wù):" + r.toString());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println(Thread.currentThread().getName() + ",任務(wù):" + r.toString() + ",執(zhí)行完畢!");
        }

        @Override
        protected void terminated() {
            System.out.println(Thread.currentThread().getName() + ",關(guān)閉線程池!");
        }
    };

    public static void main(String[] args) {
        for (int i = 1; i <= 3; i++) {
            int j = i;
            String taskName = "task" + j;
            executor.execute(new Task(taskName));
        }
        executor.shutdown();
    }
}

輸出:

From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1,開始執(zhí)行任務(wù):Task{name='task1'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1處理task1
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2,開始執(zhí)行任務(wù):Task{name='task2'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2處理task2
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3,開始執(zhí)行任務(wù):Task{name='task3'}
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3處理task3
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-1,任務(wù):Task{name='task1'}耘纱,執(zhí)行完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-2,任務(wù):Task{name='task2'}贷岸,執(zhí)行完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3,任務(wù):Task{name='task3'},執(zhí)行完畢!
From DemoThreadFactory's 訂單創(chuàng)建組-Worker-3糊探,關(guān)閉線程池!

合理地配置線程池

要想合理的配置線程池,需要先分析任務(wù)的特性,可以沖一下四個角度分析:

  • 任務(wù)的性質(zhì):CPU密集型任務(wù)暇务、IO密集型任務(wù)和混合型任務(wù)
  • 任務(wù)的優(yōu)先級:高、中怔软、低
  • 任務(wù)的執(zhí)行時間:長垦细、中、短
  • 任務(wù)的依賴性:是否依賴其他的系統(tǒng)資源挡逼,如數(shù)據(jù)庫連接括改。

性質(zhì)不同任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)該盡可能小的線程家坎,如配置cpu數(shù)量+1個線程的線程池嘱能。由于IO密集型任務(wù)并不是一直在執(zhí)行任務(wù),不能讓cpu閑著虱疏,則應(yīng)配置盡可能多的線程惹骂,如:cup數(shù)量2混合型的任務(wù)订框,如果可以拆分析苫,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù)*,只要這2個任務(wù)執(zhí)行的時間相差不是太大穿扳,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量衩侥。可以通過Runtime.getRuntime().availableProcessors()方法獲取cpu數(shù)量矛物。優(yōu)先級不同任務(wù)可以對線程池采用優(yōu)先級隊列來處理茫死,讓優(yōu)先級高的先執(zhí)行。

使用隊列的時候建議使用有界隊列履羞,有界隊列增加了系統(tǒng)的穩(wěn)定性峦萎,如果采用無界隊列屡久,任務(wù)太多的時候可能導(dǎo)致系統(tǒng)OOM,直接讓系統(tǒng)宕機爱榔。

線程池中線程數(shù)量的配置

線程池中總線程大小對系統(tǒng)的性能有一定的影響被环,我們的目標(biāo)是希望系統(tǒng)能夠發(fā)揮最好的性能,過多或者過小的線程數(shù)量無法有效的使用機器的性能详幽。在Java Concurrency in Practice書中給出了估算線程池大小的公式:

Ncpu = CUP的數(shù)量
Ucpu = 目標(biāo)CPU的使用率筛欢,0<=Ucpu<=1
W/C = 等待時間與計算時間的比例
為保存處理器達到期望的使用率,最優(yōu)的線程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)
線程池數(shù)量 = CUP的數(shù)量 * 目標(biāo)CPU的使用率 * 等待時間與計算時間的比例

使用建議

在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供唇聘,不允許在應(yīng)用中自行顯示的創(chuàng)建線程版姑,這樣一方面是線程的創(chuàng)建更加規(guī)范,可以合理控制開辟線程的數(shù)量迟郎;另一方面線程的細節(jié)管理交給線程池處理剥险,優(yōu)化了資源的開銷。而線程池不允許使用Executors去創(chuàng)建宪肖,而要通過ThreadPoolExecutor方式表制,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()匈庭、newCachedThreadPool()等創(chuàng)建線程池的方法夫凸,但都有其局限性,不夠靈活阱持;另外由于前面幾種方法內(nèi)部也是通過ThreadPoolExecutor方式實現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則魔熏,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池衷咽,避免資源耗盡的風(fēng)險。

線程池不允許使用Executors去創(chuàng)建蒜绽,而是通過ThreadPoolExecutor的方式镶骗,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險躲雅。

說明:Executors返回的線程池對象的弊端如下:

1) FixedThreadPool和SingleThreadPool:

允許的請求隊列:LinkedBlockingQueue長度為Integer.MAX_VALUE鼎姊,可能會堆積大量的請求,從而導(dǎo)致OOM相赁。

2) CachedThreadPool:

允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE相寇,可能會創(chuàng)建大量的線程,從而導(dǎo)致OOM钮科。

疑問:

Q:LinkedBlockingQueue吞吐量通常要高于ArrayBlockingQueue唤衫,為什么?

LinkedBlockingQueue底層是鏈表绵脯,增刪效率比較高佳励,而ArrayBlockingQueue底層是數(shù)組休里,增刪效率比較低。

最主要的是ArrayBlockingQueue數(shù)據(jù)的插入與取出共用同一個鎖赃承,因此ArrayBlockingQueue并不能實現(xiàn)生產(chǎn)妙黍、消費同時進行;
LinkedBlockingQueue中用于阻塞生產(chǎn)者瞧剖、消費者的鎖是兩個(鎖分離:注意這里是JDK8之前的設(shè)計拭嫁,JDK8之后是用一個鎖實現(xiàn)),因此生產(chǎn)與消費是可以同時進行的筒繁。

JUC中的Executor框架

Excecutor框架主要包含3部分的內(nèi)容:

  1. 任務(wù)相關(guān)的:包含被執(zhí)行的任務(wù)要實現(xiàn)的接口:Runnable接口或Callable接口
  2. 任務(wù)的執(zhí)行相關(guān)的:包含任務(wù)執(zhí)行機制的核心接口Executor噩凹,以及繼承自ExecutorExecutorService接口。Executor框架中有兩個關(guān)鍵的類實現(xiàn)了ExecutorService接口(ThreadPoolExecutorScheduleThreadPoolExecutor
  3. 異步計算結(jié)果相關(guān)的:包含接口Future實現(xiàn)Future接口的FutureTask類

Executors框架包括:

  • Executor
  • ExecutorService
  • ThreadPoolExecutor
  • Executors
  • Future
  • Callable
  • FutureTask
  • CompletableFuture
  • CompletionService
  • ExecutorCompletionService

Executor接口

Executor接口中定義了方法execute(Runable able)接口毡咏,該方法接受一個Runable實例驮宴,他來執(zhí)行一個任務(wù),任務(wù)即實現(xiàn)一個Runable接口的類呕缭。

ExecutorService接口

ExecutorService繼承于Executor接口堵泽,他提供了更為豐富的線程實現(xiàn)方法,比如ExecutorService提供關(guān)閉自己的方法恢总,以及為跟蹤一個或多個異步任務(wù)執(zhí)行狀況而生成Future的方法迎罗。

ExecutorService有三種狀態(tài):運行、關(guān)閉片仿、終止纹安。創(chuàng)建后便進入運行狀態(tài),當(dāng)調(diào)用了shutdown()方法時砂豌,便進入了關(guān)閉狀態(tài)厢岂,此時意味著ExecutorService不再接受新的任務(wù),但是他還是會執(zhí)行已經(jīng)提交的任務(wù)阳距,當(dāng)所有已經(jīng)提交了的任務(wù)執(zhí)行完后塔粒,便達到終止?fàn)顟B(tài)。如果不調(diào)用shutdown方法筐摘,ExecutorService方法會一直運行下去卒茬,系統(tǒng)一般不會主動關(guān)閉。

ThreadPoolExecutor類

線程池類咖熟,實現(xiàn)了ExecutorService接口中所有方法圃酵,參考線程池的使用。

ScheduleThreadPoolExecutor定時器

ScheduleThreadPoolExecutor繼承自ThreadPoolExecutor(實現(xiàn)了線程池的核心功能),實現(xiàn)了ScheduledExecutorService(實現(xiàn)了定時器調(diào)度功能)球恤,他主要用來延遲執(zhí)行任務(wù)辜昵,或者定時執(zhí)行任務(wù)。功能和Timer類似咽斧,但是ScheduleThreadPoolExecutor更強大堪置、更靈活一些躬存。Timer后臺是單個線程,而ScheduleThreadPoolExecutor可以在創(chuàng)建的時候指定多個線程舀锨。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
            public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
        }
schedule:延遲執(zhí)行任務(wù)1次

使用ScheduleThreadPoolExecutor的schedule方法岭洲,看一下這個方法的聲明:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

3個參數(shù):

command:需要執(zhí)行的任務(wù)

delay:需要延遲的時間

unit:參數(shù)2的時間單位,是個枚舉坎匿,可以是天盾剩、小時、分鐘替蔬、秒告私、毫秒、納秒等

實例:

//只延遲調(diào)度一次
public static void main(String[] args) {
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3,
            new DemoThreadFactory("延遲調(diào)度線程池"));
    scheduledThreadPool.schedule(()->{
        System.out.println(System.currentTimeMillis()+"開始執(zhí)行調(diào)度承桥!");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis()+"執(zhí)行調(diào)度結(jié)束驻粟!");
    },3,TimeUnit.SECONDS);
}

輸出:

1598509985652開始執(zhí)行調(diào)度!
1598509990653執(zhí)行調(diào)度結(jié)束凶异!
scheduleAtFixedRate:固定的頻率執(zhí)行任務(wù)

使用ScheduleThreadPoolExecutor的scheduleAtFixedRate方法蜀撑,該方法設(shè)置了執(zhí)行周期,下一次執(zhí)行時間相當(dāng)于是上一次的執(zhí)行時間加上period剩彬,任務(wù)每次執(zhí)行完畢之后才會計算下次的執(zhí)行時間酷麦。

看一下這個方法的聲明:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

4個參數(shù):

command:表示要執(zhí)行的任務(wù)

initialDelay:表示延遲多久執(zhí)行第一次

period:連續(xù)執(zhí)行之間的時間間隔

unit:參數(shù)2和參數(shù)3的時間單位,是個枚舉喉恋,可以是天沃饶、小時、分鐘轻黑、秒绍坝、毫秒、納秒等

假設(shè)系統(tǒng)調(diào)用scheduleAtFixedRate的時間是T1苔悦,那么執(zhí)行時間如下:

第1次:T1+initialDelay

第2次:T1+initialDelay+period(這時候如果第一次執(zhí)行完后時間大于固定頻率的時間,就會被馬上調(diào)度起來)

第3次:T1+initialDelay+2*period

第n次:T1+initialDelay+(n-1)*period

實例:

//scheduleAtFixedRate()表示每次方法的執(zhí)行周期是多久關(guān)注的是執(zhí)行周期椎咧,如果已經(jīng)到了執(zhí)行周期玖详,就會立即開啟調(diào)度任務(wù),時間間隔是調(diào)度任務(wù)開始時間加周期
public static void main2(String[] args) throws ExecutionException, InterruptedException {
    //任務(wù)執(zhí)行計數(shù)器
    AtomicInteger count = new AtomicInteger(1);
    ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(3,
            new DemoThreadFactory("延遲調(diào)度線程池"),new ThreadPoolExecutor.AbortPolicy());
    ScheduledFuture<?> schedule = scheduledThreadPool.scheduleAtFixedRate(() -> {
        int currCount = count.getAndIncrement();
        System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
    }, 1,3, TimeUnit.SECONDS);

}

輸出:

From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:17 CST 2020 第1次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:22 CST 2020 第1次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:22 CST 2020 第2次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:36:27 CST 2020 第2次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:36:27 CST 2020 第3次開始執(zhí)行
任務(wù)當(dāng)前執(zhí)行完畢之后會計算下次執(zhí)行時間勤讽,下次執(zhí)行時間為上次執(zhí)行的開始時間+period蟋座,這個時間小于第一次結(jié)束的時間了,說明小于系統(tǒng)當(dāng)前時間了脚牍,會立即執(zhí)行向臀。
scheduleWithFixedDelay:固定的間隔執(zhí)行任務(wù)

使用ScheduleThreadPoolExecutor的scheduleWithFixedDelay方法,該方法設(shè)置了執(zhí)行周期诸狭,與scheduleAtFixedRate方法不同的是券膀,下一次執(zhí)行時間是上一次任務(wù)執(zhí)行完的系統(tǒng)時間加上period君纫,因而具體執(zhí)行時間不是固定的,但周期是固定的芹彬,是采用相對固定的延遲來執(zhí)行任務(wù)蓄髓。看一下這個方法的聲明:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

4個參數(shù):

command:表示要執(zhí)行的任務(wù)

initialDelay:表示延遲多久執(zhí)行第一次

period:表示下次執(zhí)行時間和上次執(zhí)行結(jié)束時間之間的間隔時間

unit:參數(shù)2和參數(shù)3的時間單位,是個枚舉舒帮,可以是天会喝、小時、分鐘玩郊、秒肢执、毫秒、納秒等

假設(shè)系統(tǒng)調(diào)用scheduleAtFixedRate的時間是T1译红,那么執(zhí)行時間如下:

第1次:T1+initialDelay预茄,執(zhí)行結(jié)束時間:E1(執(zhí)行結(jié)束時間是不固定的)

第2次:E1+period,執(zhí)行結(jié)束時間:E2

第3次:E2+period临庇,執(zhí)行結(jié)束時間:E3

第4次:E3+period反璃,執(zhí)行結(jié)束時間:E4

第n次:上次執(zhí)行結(jié)束時間+period

實例:

//scheduleWithFixedDelay()表示每次方法執(zhí)行完后延遲多久執(zhí)行,關(guān)注的是延遲時間假夺,時間間隔是調(diào)度任務(wù)結(jié)束時間加延遲時間
public static void main(String[] args) throws ExecutionException, InterruptedException {
    //任務(wù)執(zhí)行計數(shù)器
    AtomicInteger count = new AtomicInteger(1);
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
            new DemoThreadFactory("延遲調(diào)度線程池"));
    ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
        int currCount = count.getAndIncrement();
        System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
    }, 1,3, TimeUnit.SECONDS);

}

輸出:

From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:16 CST 2020 第1次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:22 CST 2020 第1次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:25 CST 2020 第2次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:39:30 CST 2020 第2次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:39:33 CST 2020 第3次開始執(zhí)行
延遲1秒之后執(zhí)行第1次淮蜈,后面每次的執(zhí)行時間和上次執(zhí)行結(jié)束時間間隔3秒。

定時任務(wù)有異骋丫恚——沒有對異常處理則定時任務(wù)會結(jié)束

先說補充點知識:schedule梧田、scheduleAtFixedRate、scheduleWithFixedDelay這幾個方法有個返回值ScheduledFuture侧蘸,通過ScheduledFuture可以對執(zhí)行的任務(wù)做一些操作裁眯,如判斷任務(wù)是否被取消、是否執(zhí)行完成讳癌。

再回到上面代碼穿稳,任務(wù)中有個10/0的操作,會觸發(fā)異常晌坤,發(fā)生異常之后沒有任何現(xiàn)象逢艘,被ScheduledExecutorService內(nèi)部給吞掉了,然后這個任務(wù)再也不會執(zhí)行了骤菠,scheduledFuture.isDone()輸出true它改,表示這個任務(wù)已經(jīng)結(jié)束了,再也不會被執(zhí)行了商乎。所以如果程序有異常央拖,開發(fā)者自己注意try-catch處理一下,不然跑著跑著發(fā)現(xiàn)任務(wù)怎么不跑了,也沒有異常輸出鲜戒。

實例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //任務(wù)執(zhí)行計數(shù)器
    AtomicInteger count = new AtomicInteger(1);
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
            new DemoThreadFactory("延遲調(diào)度線程池"));
    ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
        int currCount = count.getAndIncrement();
        System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
    /*    try {
            System.out.println(10/0);
        } catch (Exception e) {
            e.printStackTrace();
        }*/
        System.out.println(10/0);
        System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
    }, 1,3, TimeUnit.SECONDS);
    TimeUnit.SECONDS.sleep(3);
    System.out.println(schedule.isCancelled());
    System.out.println(schedule.isDone());
}

輸出:

From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:45:09 CST 2020 第1次開始執(zhí)行
false
true

取消定時任務(wù)的執(zhí)行——調(diào)用ScheduledFuturecancel方法

可能任務(wù)執(zhí)行一會专控,想取消執(zhí)行,可以調(diào)用ScheduledFuturecancel方法袍啡,參數(shù)表示是否給任務(wù)發(fā)送中斷信號踩官。

示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //任務(wù)執(zhí)行計數(shù)器
        AtomicInteger count = new AtomicInteger(1);
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4,
                new DemoThreadFactory("延遲調(diào)度線程池"));
        ScheduledFuture<?> schedule = scheduledThreadPool.scheduleWithFixedDelay(() -> {
            int currCount = count.getAndIncrement();
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "開始執(zhí)行");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+":"+new Date(System.currentTimeMillis()) + " 第" + currCount + "次" + "結(jié)束執(zhí)行");
        }, 1,3, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(5);
        schedule.cancel(false);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("任務(wù)是否被取消:"+schedule.isCancelled());
        System.out.println("任務(wù)是否已完成:"+schedule.isDone());
    }
}

輸出:

From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:53:12 CST 2020 第1次開始執(zhí)行
任務(wù)是否被取消:true
任務(wù)是否已完成:true
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:53:17 CST 2020 第1次結(jié)束執(zhí)行

Executors類——線程池工具類

Executors類,提供了一系列工廠方法用于創(chuàng)建線程池境输,返回的線程池都實現(xiàn)了ExecutorService接口蔗牡。常用的方法有:

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作嗅剖,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)辩越。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它信粮。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行黔攒。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù),任務(wù)如果比較多强缘,單線程如果處理不過來督惰,會導(dǎo)致隊列堆滿,引發(fā)OOM旅掂。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

創(chuàng)建固定大小的線程池赏胚。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小商虐。線程池的大小一旦達到最大值就會保持不變觉阅,在提交新任務(wù),任務(wù)將會進入等待隊列中等待秘车。如果某個線程因為執(zhí)行異常而結(jié)束典勇,那么線程池會補充一個新線程。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù)叮趴,任務(wù)如果比較多割笙,如果處理不過來,會導(dǎo)致隊列堆滿眯亦,引發(fā)OOM咳蔚。

newCachedThreadPool

public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程搔驼,

那么就會回收部分空閑(60秒處于等待任務(wù)到來)的線程,當(dāng)任務(wù)數(shù)增加時侈询,此線程池又可以智能的添加新線程來處理任務(wù)舌涨。此線程池的最大值是Integer的最大值(2^31-1)。內(nèi)部使用了SynchronousQueue同步隊列來緩存任務(wù),此隊列的特性是放入任務(wù)時必須要有對應(yīng)的線程獲取任務(wù)囊嘉,任務(wù)才可以放入成功温技。如果處理的任務(wù)比較耗時,任務(wù)來的速度也比較快扭粱,會創(chuàng)建太多的線程引發(fā)OOM舵鳞。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求琢蛤。

在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供蜓堕,不允許在應(yīng)用中自行顯示的創(chuàng)建線程,這樣一方面是線程的創(chuàng)建更加規(guī)范博其,可以合理控制開辟線程的數(shù)量套才;另一方面線程的細節(jié)管理交給線程池處理,優(yōu)化了資源的開銷慕淡。而線程池不允許使用Executors去創(chuàng)建背伴,而要通過ThreadPoolExecutor方式,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()峰髓、newSingleThreadExecutor()傻寂、newCachedThreadPool()等創(chuàng)建線程池的方法,但都有其局限性携兵,不夠靈活疾掰;另外由于前面幾種方法內(nèi)部也是通過ThreadPoolExecutor方式實現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則眉孩,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池个绍,避免資源耗盡的風(fēng)險。

Future浪汪、Callable接口

Future巴柿、Callable接口需要結(jié)合ExecutorService來使用,需要有線程池的支持死遭。

Future接口定義了操作異步異步任務(wù)執(zhí)行一些方法广恢,如獲取異步任務(wù)的執(zhí)行結(jié)果、取消任務(wù)的執(zhí)行呀潭、判斷任務(wù)是否被取消钉迷、判斷任務(wù)執(zhí)行是否完畢等。

Callable接口中定義了需要有返回的任務(wù)需要實現(xiàn)的方法钠署】反希——相當(dāng)于有返回值的Runnable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

比如主線程讓一個子線程去執(zhí)行任務(wù),子線程可能比較耗時谐鼎,啟動子線程開始執(zhí)行任務(wù)后舰蟆,主線程就去做其他事情了,過了一會才去獲取子任務(wù)的執(zhí)行結(jié)果。

Future其他方法介紹一下

cancel:取消在執(zhí)行的任務(wù)身害,參數(shù)表示是否對執(zhí)行的任務(wù)發(fā)送中斷信號味悄,方法聲明如下:

boolean cancel(boolean mayInterruptIfRunning);

isCancelled:用來判斷任務(wù)是否被取消

isDone:判斷任務(wù)是否執(zhí)行完畢。

調(diào)用線程池的submit方法執(zhí)行任務(wù)塌鸯,submit參數(shù)為Callable接口:表示需要執(zhí)行的任務(wù)有返回值侍瑟,submit方法返回一個Future對象,F(xiàn)uture相當(dāng)于一個憑證丙猬,可以在任意時間拿著這個憑證去獲取對應(yīng)任務(wù)的執(zhí)行結(jié)果(調(diào)用其get方法)涨颜,代碼中調(diào)用了result.get()方法之后,此方法會阻塞當(dāng)前線程直到任務(wù)執(zhí)行結(jié)束淮悼。

實例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
            String taskName = "task";
        Future<String> future = executor.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
           // System.out.println(Thread.currentThread().getName() + "線程執(zhí)行" + taskName + "完畢咐低!");
            return "finished";
        });
        TimeUnit.SECONDS.sleep(1);
        //取消正在執(zhí)行的任務(wù),mayInterruptIfRunning:是否發(fā)送中斷信息
        future.cancel(false);
        System.out.println(future.isCancelled());
        System.out.println(future.isDone());
        //System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + future.get());
        try {
            //超時獲取異步任務(wù)執(zhí)行結(jié)果
            System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + future.get(10,TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}

輸出:

Exception in thread "main" java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at com.self.current.FutureTest.main(FutureTest.java:46)
true
true

FutureTask類

FutureTask除了實現(xiàn)Future接口袜腥,還實現(xiàn)了Runnable接口见擦,因此FutureTask可以交給Executor執(zhí)行,也可以交給線程執(zhí)行執(zhí)行(Thread有個Runnable的構(gòu)造方法)羹令,FutureTask表示帶返回值結(jié)果的任務(wù)鲤屡。線程池的submit方法返回的Future實際類型正是FutureTask對象

疑問:

Q:線程池執(zhí)行submit()方法是如何調(diào)用Callable任務(wù)的福侈?

A:Callable通過線程池執(zhí)行的過程酒来,封裝為Runnable。線程池執(zhí)行submit()方法會把Callable包裝成FutrueTask對象肪凛,此對象實現(xiàn)了Runnable接口堰汉,當(dāng)調(diào)用FutrueTask的run方法時,會把其屬性中的Callable拿出來執(zhí)行call()方法伟墙。示例代碼如下:

  public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
        public void run() {
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        }
    }

Q:多線程并行處理定時任務(wù)時翘鸭,Timer運行多個TimeTask時,只要其中之一沒有捕獲拋出的異常戳葵,其它任務(wù)便會自動終止運行就乓,使用ScheduledExecutorService則沒有這個問題。是因為ScheduledExecutorService是多線程么拱烁?

A:是因為Timer只有一個線程在運行生蚁,while(true)循環(huán)不斷地從隊列中獲取任務(wù)執(zhí)行,而當(dāng)線程被被殺死或者中斷時戏自,就相當(dāng)于關(guān)閉了Timer.

Q: ScheduleThreadPoolExecutor定時器并不關(guān)心線程數(shù)多少邦投,他不是并發(fā)的執(zhí)行多任務(wù),只關(guān)心調(diào)度一個定時任務(wù)擅笔,線程數(shù)的多少只是影響多個任務(wù)再調(diào)度時需要多個線程尼摹,這樣理解對么见芹?

A:我認為這樣理解是對的,而這樣也可以解釋上面Timer運行多個TimeTask時蠢涝,只要其中之一沒有捕獲拋出的異常,其它任務(wù)便會自動終止運行的原因阅懦,是因為Timer只有一個線程在運行和二,while(true)循環(huán)不斷地從隊列中獲取任務(wù)執(zhí)行,而當(dāng)線程被被殺死或者中斷時耳胎,就相當(dāng)于關(guān)閉了Timer.下面是多個任務(wù)調(diào)度時會創(chuàng)建多個線程去執(zhí)行惯吕。

From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:22:22 CST 2020 第1次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:22:22 CST 2020 第2次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-3:Thu Aug 27 14:22:22 CST 2020 第3次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:22:27 CST 2020 第1次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:22:27 CST 2020 第2次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-3:Thu Aug 27 14:22:27 CST 2020 第3次結(jié)束執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-1:Thu Aug 27 14:22:30 CST 2020 第4次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-4:Thu Aug 27 14:22:30 CST 2020 第5次開始執(zhí)行
From DemoThreadFactory's 延遲調(diào)度線程池-Worker-2:Thu Aug 27 14:22:30 CST 2020 第6次開始執(zhí)行

CompletionService接口——獲取線程池中已經(jīng)完成的任務(wù)

CompletionService相當(dāng)于一個執(zhí)行任務(wù)的服務(wù),通過submit丟任務(wù)給這個服務(wù)怕午,服務(wù)內(nèi)部去執(zhí)行任務(wù)废登,可以通過服務(wù)提供的一些方法獲取服務(wù)中已經(jīng)完成的任務(wù)。

接口內(nèi)的幾個方法:

Future<V> submit(Callable<V> task);

用于向服務(wù)中提交有返回結(jié)果的任務(wù)郁惜,并返回Future對象

Future<V> submit(Runnable task, V result);

用戶向服務(wù)中提交有返回值的任務(wù)去執(zhí)行堡距,并返回Future對象。Runnable會被包裝成有返回值的Callable兆蕉,返回值為傳入的result羽戒。

Future<V> take() throws InterruptedException;

從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù),如果獲取不到虎韵,會一致阻塞到有返回值為止易稠。此方法會響應(yīng)線程中斷。

Future<V> poll();

從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù)包蓝,如果內(nèi)部沒有已經(jīng)完成的任務(wù)驶社,則返回空,此方法會立即響應(yīng)测萎。

Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

嘗試在指定的時間內(nèi)從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù)亡电,等待的時間超時還是沒有獲取到已完成的任務(wù),則返回空绳泉。此方法會響應(yīng)線程中斷

通過submit向內(nèi)部提交任意多個任務(wù)逊抡,通過take方法可以獲取已經(jīng)執(zhí)行完成的任務(wù),如果獲取不到將等待零酪。

ExecutorCompletionService

ExecutorCompletionService類是CompletionService接口的具體實現(xiàn)冒嫡。

說一下其內(nèi)部原理,ExecutorCompletionService創(chuàng)建的時候會傳入一個線程池四苇,調(diào)用submit方法傳入需要執(zhí)行的任務(wù)经窖,任務(wù)由內(nèi)部的線程池來處理;ExecutorCompletionService內(nèi)部有個阻塞隊列迄埃,任意一個任務(wù)完成之后卵渴,會將任務(wù)的執(zhí)行結(jié)果(Future類型)放入阻塞隊列中瓣赂,然后其他線程可以調(diào)用它take、poll方法從這個阻塞隊列中獲取一個已經(jīng)完成的任務(wù)片拍,獲取任務(wù)返回結(jié)果的順序和任務(wù)執(zhí)行完成的先后順序一致煌集,所以最先完成的任務(wù)會先返回。

看一下構(gòu)造方法:

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

構(gòu)造方法需要傳入一個Executor對象捌省,這個對象表示任務(wù)執(zhí)行器苫纤,所有傳入的任務(wù)會被這個執(zhí)行器執(zhí)行。

completionQueue是用來存儲任務(wù)結(jié)果的阻塞隊列纲缓,默認用采用的是LinkedBlockingQueue卷拘,也支持開發(fā)自己設(shè)置。通過submit傳入需要執(zhí)行的任務(wù)祝高,任務(wù)執(zhí)行完成之后栗弟,會放入completionQueue中。

任務(wù)完成入隊操作原理:

還是通過線程池execute()方法執(zhí)行一個FutureTask包裝的Callable任務(wù)工闺,F(xiàn)utureTask里的run方法會調(diào)用Callable任務(wù)call()方法執(zhí)行具體的行為乍赫,并在執(zhí)行結(jié)算后執(zhí)行set(result);設(shè)置返回值操作,而設(shè)置返回值操作中的finishCompletion()方法會調(diào)用鉤子方法done(),ExecutorCompletionService里定義的QueueingFuture繼承了FutureTask斤寂,重寫了鉤子方法耿焊,把完成的方法入隊保存起來了。

場景:買新房了遍搞,然后在網(wǎng)上下單買冰箱罗侯、洗衣機,電器商家不同溪猿,所以送貨耗時不一樣钩杰,然后等他們送貨,快遞只愿送到樓下诊县,然后我們自己將其搬到樓上的家中讲弄。 這時候我們需要根據(jù)異步先完成的快遞,拿個先到對其獲取做處理——搬上樓依痊。

示例:

public class ExecutorCompletionServiceTest {

    static class GoodsModel {
        //商品名稱
        String name;
        //購物開始時間
        long startime;
        //送到的時間
        long endtime;

        public GoodsModel(String name, long startime, long endtime) {
            this.name = name;
            this.startime = startime;
            this.endtime = endtime;
        }

        @Override
        public String toString() {
            return name + "避除,下單時間[" + this.startime + "," + endtime + "],耗時:" + (this.endtime - this.startime);
        }
    }
    /**
     * 將商品搬上樓
     *
     * @param goodsModel
     * @throws InterruptedException
     */
    static void moveUp(GoodsModel goodsModel) throws InterruptedException {
        //休眠5秒胸嘁,模擬搬上樓耗時
        TimeUnit.SECONDS.sleep(5);
        System.out.println("將商品搬上樓瓶摆,商品信息:" + goodsModel);
    }

    /**
     * 模擬下單
     *
     * @param name     商品名稱
     * @param costTime 耗時
     * @return
     */
    static Callable<GoodsModel> buyGoods(String name, long costTime) {
        return () -> {
            long startTime = System.currentTimeMillis();
            System.out.println(startTime + "購買" + name + "下單!");
            //模擬送貨耗時
            try {
                TimeUnit.SECONDS.sleep(costTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long endTime = System.currentTimeMillis();
            System.out.println(endTime + name + "送到了!");
            return new GoodsModel(name, startTime, endTime);
        };
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long st = System.currentTimeMillis();
        System.out.println(st + "開始購物!");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());
        ExecutorCompletionService<GoodsModel> completionService = new ExecutorCompletionService<>(executor);
        //異步下單購買
        completionService.submit(buyGoods("電視機", 3));
        completionService.submit(buyGoods("洗碗機", 5));
        executor.shutdown();
        for (int i = 0; i < 2; i++) {
            //可以獲取到最先到的商品
            GoodsModel goodsModel = completionService.take().get();
            //將最先到的商品送上樓
            moveUp(goodsModel);
        }
        long et = System.currentTimeMillis();
        System.out.println(et + "貨物已送到家里咯,哈哈哈性宏!");
        System.out.println("總耗時:" + (et - st));
    }
}
1598583792616開始購物!
1598583792707購買電視機下單!
1598583792708購買洗碗機下單!
1598583795708電視機送到了!
1598583797709洗碗機送到了!
將商品搬上樓群井,商品信息:電視機,下單時間[1598583792707,1598583795708]毫胜,耗時:3001
將商品搬上樓书斜,商品信息:洗碗機诬辈,下單時間[1598583792708,1598583797709],耗時:5001
1598583805710貨物已送到家里咯荐吉,哈哈哈焙糟!
總耗時:13094

異步執(zhí)行一批任務(wù),有一個完成立即返回样屠,其他取消——線程池invokeAny ()方法

如果是要返回所有的任務(wù)結(jié)果酬荞,則調(diào)用 invokeAll(Collection<? extends Callable<T>> tasks)方法,invokeAny ()和invokeAll()都有超時調(diào)用方法瞧哟。如果超時時間到了,調(diào)用結(jié)束后還沒有全部完成枪向,會對所有工作線程發(fā)送中斷信號中斷操作勤揩。

方法聲明如下:

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

示例:

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long st = System.currentTimeMillis();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                new DemoThreadFactory("訂單創(chuàng)建組"), new ThreadPoolExecutor.AbortPolicy());
        List<Callable<Integer>> list = new ArrayList<>();
        int taskCount = 5;
        for (int i = taskCount; i > 0; i--) {
            int j = i * 2;
            String taskName = "任務(wù)"+i;
            list.add(() -> {
                TimeUnit.SECONDS.sleep(j);
                System.out.println(taskName+"執(zhí)行完畢!");
                return j;
            });
        }
        //Integer integer = invokeAny(executor, list);
        //ExecutorService提供異步執(zhí)行一批任務(wù),有一個完成立即返回秘蛔,其他取消
        Integer integer = executor.invokeAny(list);
        System.out.println("耗時:" + (System.currentTimeMillis() - st) + ",執(zhí)行結(jié)果:" + integer);
        executor.shutdown();
    }

    private static <T> T invokeAny(ThreadPoolExecutor executor, List<Callable<T>> list) throws InterruptedException, ExecutionException {
        ExecutorCompletionService<T> completionService = new ExecutorCompletionService(executor);
        List<Future<T>> futureList = new ArrayList<>();
        for (Callable<T> s : list) {
            futureList.add(completionService.submit(s));
        }
        int n = list.size();
        try {
            for (int i = 0; i < n; ++i) {
                T r = completionService.take().get();
                if (r != null) {
                    return r;
                }
            }
        } finally {
            for (Future<T> future : futureList) {
                future.cancel(true);
            }
        }
        return null;
    }
}

輸出:

任務(wù)1執(zhí)行完畢!
耗時:2053,執(zhí)行結(jié)果:2

CompletableFuture——當(dāng)異步任務(wù)完成或者發(fā)生異常時陨亡,自動調(diào)用回調(diào)對象的回調(diào)方法,主線程無需等待獲取結(jié)果,異步是以守護線程執(zhí)行的深员,如果是用線程池作為執(zhí)行器則不是守護線程

使用Future獲得異步執(zhí)行結(jié)果時负蠕,要么調(diào)用阻塞方法get(),要么輪詢看isDone()是否為true倦畅,這兩種方法都不是很好遮糖,因為主線程也會被迫等待

從Java 8開始引入了CompletableFuture叠赐,它針對Future做了改進欲账,可以傳入回調(diào)對象,當(dāng)異步任務(wù)完成或者發(fā)生異常時芭概,自動調(diào)用回調(diào)對象的回調(diào)方法赛不。

我們以獲取股票價格為例,看看如何使用CompletableFuture

CompletableFuture優(yōu)點是:

  • 異步任務(wù)結(jié)束時罢洲,會自動回調(diào)某個對象的方法踢故;
  • 異步任務(wù)出錯時,會自動回調(diào)某個對象的方法惹苗;
  • 主線程設(shè)置好回調(diào)后殿较,不再關(guān)心異步任務(wù)的執(zhí)行。

如果只是實現(xiàn)了異步回調(diào)機制鸽粉,我們還看不出CompletableFuture相比Future的優(yōu)勢斜脂。CompletableFuture更強大的功能是,多個CompletableFuture可以串行執(zhí)行触机,多個CompletableFuture還可以并行執(zhí)行帚戳。

除了anyOf()可以實現(xiàn)“任意個CompletableFuture只要一個成功”玷或,allOf()可以實現(xiàn)“所有CompletableFuture都必須成功”,這些組合操作可以實現(xiàn)非常復(fù)雜的異步流程控制片任。

最后我們注意CompletableFuture的命名規(guī)則:

  • xxx():表示該方法將繼續(xù)在已有的線程中執(zhí)行偏友;
  • xxxAsync():表示將異步在線程池中執(zhí)行。

示例:

public class CompletableFutureTest {

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建異步執(zhí)行任務(wù):
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompletableFutureTest::fetchPrice);
        // 如果執(zhí)行成功:
        cf.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 如果執(zhí)行異常:
        cf.exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        // 主線程不要立刻結(jié)束对供,否則CompletableFuture默認使用的線程池會立刻關(guān)閉:
        Thread.sleep(200);
    }

    static Double fetchPrice() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }

}

定義兩個CompletableFuture位他,第一個CompletableFuture根據(jù)證券名稱查詢證券代碼,第二個CompletableFuture根據(jù)證券代碼查詢證券價格产场,這兩個CompletableFuture實現(xiàn)串行操作如下:

public class CompletableFutureSerialTest {

    public static void main(String[] args) throws InterruptedException {
        //先獲取股票代碼
        CompletableFuture<String> tesla = CompletableFuture.supplyAsync(() -> {
            return CompletableFutureSerialTest.queryCode("tesla");
        });
        //再獲取股票代碼對應(yīng)的股價
        CompletableFuture<Double> priceFuture = tesla.thenApplyAsync((code) -> {
            return CompletableFutureSerialTest.fetchPrice(code);
        });
        //打印結(jié)果
        priceFuture.thenAccept((price)->{
            System.out.println("price: " + price);
        });
        // 主線程不要立刻結(jié)束鹅髓,否則CompletableFuture默認使用的線程池會立刻關(guān)閉:
        Thread.sleep(2000);
    }

    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}

輸出:

price: 23.116752498711122

示例:同時從新浪和網(wǎng)易查詢證券代碼,只要任意一個返回結(jié)果京景,就進行下一步查詢價格窿冯,查詢價格也同時從新浪和網(wǎng)易查詢,只要任意一個返回結(jié)果确徙,就完成操作醒串。

public class CompletableFutureParallelTest {

    public static void main(String[] args) throws InterruptedException {
        // 兩個CompletableFuture執(zhí)行異步查詢:
        CompletableFuture<String> teslaSina = CompletableFuture.supplyAsync(() -> {
            return CompletableFutureParallelTest.queryCode("tesla","https://finance.sina.com.cn/code/");
        });

        CompletableFuture<String> tesla163 = CompletableFuture.supplyAsync(() -> {
            return CompletableFutureParallelTest.queryCode("tesla","https://money.163.com/code/");
        });
        // 用anyOf合并為一個新的CompletableFuture:
        CompletableFuture<Object> stockFuture = CompletableFuture.anyOf(tesla163, teslaSina);

        //再獲取股票代碼對應(yīng)的股價
        // 兩個CompletableFuture執(zhí)行異步查詢:
        CompletableFuture<Double> priceSina = stockFuture.thenApplyAsync((code) -> {
            return CompletableFutureParallelTest.fetchPrice(String.valueOf(code),"https://money.163.com/code/");
        });
        CompletableFuture<Double> price163 = stockFuture.thenApplyAsync((code) -> {
            return CompletableFutureParallelTest.fetchPrice(String.valueOf(code),"https://money.163.com/code/");
        });
        // 用anyOf合并為一個新的CompletableFuture:
        CompletableFuture<Object> priceFuture = CompletableFuture.anyOf(priceSina, price163);

        //打印結(jié)果
        priceFuture.thenAccept((price)->{
            System.out.println("price: " + price);
        });
        // 主線程不要立刻結(jié)束,否則CompletableFuture默認使用的線程池會立刻關(guān)閉:
        Thread.sleep(2000);
    }

    static String queryCode(String name, String url) {
        System.out.println("query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code, String url) {
        System.out.println("query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}
query code from https://finance.sina.com.cn/code/...
query code from https://money.163.com/code/...
query price from https://money.163.com/code/...
query price from https://money.163.com/code/...
price: 17.34369661842006
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鄙皇,一起剝皮案震驚了整個濱河市芜赌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌伴逸,老刑警劉巖缠沈,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異违柏,居然都是意外死亡博烂,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門漱竖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來禽篱,“玉大人,你說我怎么就攤上這事馍惹√陕剩” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵万矾,是天一觀的道長悼吱。 經(jīng)常有香客問我,道長良狈,這世上最難降的妖魔是什么后添? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮薪丁,結(jié)果婚禮上遇西,老公的妹妹穿的比我還像新娘馅精。我一直安慰自己,他們只是感情好粱檀,可當(dāng)我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布洲敢。 她就那樣靜靜地躺著,像睡著了一般茄蚯。 火紅的嫁衣襯著肌膚如雪压彭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天渗常,我揣著相機與錄音壮不,去河邊找鬼。 笑死皱碘,一個胖子當(dāng)著我的面吹牛忆畅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播尸执,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼缓醋!你這毒婦竟也來了如失?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤送粱,失蹤者是張志新(化名)和其女友劉穎褪贵,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抗俄,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡脆丁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了动雹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片槽卫。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖胰蝠,靈堂內(nèi)的尸體忽然破棺而出歼培,到底是詐尸還是另有隱情,我是刑警寧澤茸塞,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布躲庄,位于F島的核電站,受9級特大地震影響钾虐,放射性物質(zhì)發(fā)生泄漏噪窘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一效扫、第九天 我趴在偏房一處隱蔽的房頂上張望倔监。 院中可真熱鬧直砂,春花似錦、人聲如沸丐枉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瘦锹。三九已至籍嘹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間弯院,已是汗流浹背辱士。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留听绳,地道東北人颂碘。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像椅挣,于是被迫代替她去往敵國和親头岔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,901評論 2 345