Java分析——使用異步編排任務(wù)和線程池

| 一宿崭、如何創(chuàng)建線程池? |

1、七大參數(shù)介紹

| 1)corePoolSize |

核心線程數(shù),一直存在線程池中(除非設(shè)置了allowCoreThreadTimeOut)垃你,創(chuàng)建好就等待就緒趾娃,去執(zhí)行任務(wù)

| 2)maximumPoolSize |

最大線程數(shù)缭嫡,設(shè)置最大線程數(shù)是為了控制資源

| 3)keepAliveTime |

存活時(shí)間,如果當(dāng)前的線程數(shù)大于核心線程數(shù)抬闷,并且線程空閑的時(shí)間大于存活時(shí)間了妇蛀,則會(huì)執(zhí)行釋放線程的操作。(釋放的數(shù)量為:maximumPoolSize - corePoolSize)

| 4)unit |

時(shí)間單位

| 5)workQueue |

阻塞隊(duì)列笤成,如果任務(wù)有很多评架,就會(huì)將目前多的任務(wù)放到隊(duì)列中,當(dāng)有空閑的線程時(shí)炕泳,就會(huì)從隊(duì)列中取出新的任務(wù)繼續(xù)執(zhí)行纵诞。

| 6)threadFactory |

線程的創(chuàng)建工廠

| 7)handler |

拒絕策略,如果隊(duì)列滿了培遵,按照我們指定的拒絕策略拒絕執(zhí)行任務(wù)

有哪些拒絕策略浙芙?

  1. DiscardOldestPolicy
    如果有新的任務(wù)進(jìn)來就會(huì)丟去最舊的未執(zhí)行的任務(wù)

  2. AbortPolicy
    直接丟棄新任務(wù)登刺,拋出異常

  3. CallerRunsPolicy
    如果有新任務(wù)進(jìn)來,直接調(diào)用run()方法茁裙,同步執(zhí)行操作

  1. DiscardPolicy
    直接丟棄新進(jìn)來的任務(wù)塘砸,不會(huì)拋出異常
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

// 常見的創(chuàng)建線程的方式
// 1)Executors . newCachedThreadApol() // 核心為0,所有都可回收的線程池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
// 2)Ехесutоr? . nеwF?хеdТhrеаdРооl(xiāng)() 固定大小的線程池晤锥,不會(huì)過期
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
// 3)Executors . newScheduledThreadPool() 定時(shí)任務(wù)的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 4)Executors . newSingleThreadExecutor() 單線程的線程池掉蔬,后臺(tái)從隊(duì)列中獲取任務(wù),一個(gè)一個(gè)執(zhí)行
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

問:一個(gè)corePoolSize=7 maximumPoolSize=20 workQueue=50的線程池矾瘾,如果本次有100個(gè)并發(fā)進(jìn)來女轿,是如何執(zhí)行的?

答:7個(gè)會(huì)立即被執(zhí)行壕翩,50個(gè)會(huì)進(jìn)入隊(duì)列蛉迹,然后會(huì)另外開13個(gè)新的線程,剩余的30個(gè)線程就需要看當(dāng)前線程池的拒絕策略了放妈。

| 二北救、CompletableFeture異步編排 |

1、runAsync 創(chuàng)建異步對(duì)象的方式

    // 1)無返回值的異步操作
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
            return asyncRunStage(asyncPool, runnable);
    }
    // 2)無返回值的異步操作,可指定線程池
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                       Executor executor) {
            return asyncRunStage(screenExecutor(executor), runnable);
    }
    
    // 3)有返回值的異步操作
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
    }
    
    // 4)有返回值的異步操作,可指定線程池
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                           Executor executor) {
            return asyncSupplyStage(screenExecutor(executor), supplier);
    }

2芜抒、whenComplete 計(jì)算完成時(shí)回調(diào)的方法

1)方法介紹

    // 上一個(gè)異步完成時(shí)執(zhí)行該方法珍策,和上一個(gè)任務(wù)用同一個(gè)線程
    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }
    // 上一個(gè)異步完成時(shí)執(zhí)行該方法,異步的方式執(zhí)行
    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }
    // 上一個(gè)異步完成時(shí)執(zhí)行該方法宅倒,異步的方式執(zhí)行攘宙,可以自己指定線程池
    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }

    // 處理異常
    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

2)示例代碼

// 示例代碼線程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
    int n = 10 / 0;
    return n;
}, executor).whenComplete((result,excption) -> {
    System.out.println("運(yùn)行結(jié)果:" + result + "異常:" + excption);
}).exceptionally(throwable -> { 
    // 出現(xiàn)異常 exceptionally感知并處理異常,返回最終結(jié)果
        return 10;
});

Integer integer = future.get();
System.out.println("最終運(yùn)行結(jié)果:" + integer);  // 10

3拐迁、handleAsync 方法

1)方法介紹

    // 上一個(gè)方法執(zhí)行后作出的處理
    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return uniHandleStage(screenExecutor(executor), fn);
    }

2)示例代碼

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
    int n = 10 / 0;
    return n;
}, executor).handle((res, exception) -> {
    if (res != null) {
        // 如果上一個(gè)任務(wù)沒出現(xiàn)異常蹭劈,修改返回結(jié)果
        return res * 10;
    }
    if (exception != null) {
        // 上一個(gè)任務(wù)出現(xiàn)了異常
        return 0;
    }
    return 0;
});

4、線程串行化方法

1)方法介紹

thenApply方法:當(dāng)一個(gè)線程依賴另一個(gè) 線程時(shí)线召,獄取上一個(gè)任務(wù)返回的結(jié)果铺韧,開返回當(dāng)前任務(wù)的返回值。

thenAccept方法:消費(fèi)處理結(jié)果缓淹。接收任務(wù)的處理結(jié)果祟蚀,并消費(fèi)處理,無返回結(jié)果割卖。

thenRun方法:只要上面的任務(wù)執(zhí)行完成前酿,就開始執(zhí)行thenRun,只是處理完任務(wù)后,執(zhí)行thenRun的后續(xù)操作


    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

2)示例代碼

| ① thenRunAsync |

thenRunAsync 不能獲取上一步執(zhí)行結(jié)果

    // thenRunAsync 不能獲取上一步執(zhí)行結(jié)果
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
        int n = 10 / 0;
        return n;
    }, executor).thenRunAsync(() -> {
        System.out.println("線程2運(yùn)行了鹏溯!");
    }, executor);

| ② thenAcceptAsync |

thenAcceptAsync可以獲取上一個(gè)任務(wù)執(zhí)行的結(jié)果罢维,但是無法對(duì)其進(jìn)行修改

    // thenAcceptAsync可以獲取上一個(gè)任務(wù)執(zhí)行的結(jié)果,但是無法對(duì)其進(jìn)行修改
   CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
        int n = 10;
        return n;
    }, executor).thenAcceptAsync((res) -> {
        // 如果上一個(gè)任務(wù)產(chǎn)生異常或者執(zhí)行失敗肺孵,則不執(zhí)行該任務(wù)
        System.out.println("上一個(gè)任務(wù)獲取的結(jié)果:" + res);
    }, executor);

| ③ thenApplyAsync |

thenApplyAsync 可以獲取上一個(gè)任務(wù)返回的結(jié)果匀借,并對(duì)其進(jìn)行修改再返回

 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
            int n = 10;
            return n;
        }, executor).thenApplyAsync((res) -> {
            return res * 2;
        }, executor);

        Integer result = future.get();

        System.out.println("最終返回結(jié)果:" + result);

5、組合任務(wù)平窘,一個(gè)完成

1)方法介紹

applyToEitherAsync:阻塞等待吓肋,只要有一個(gè)任務(wù)完成了,就執(zhí)行該任務(wù)

   public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

2)示例代碼

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
    int n = 5;
    // 模擬這個(gè)任務(wù)比較慢完成瑰艘,讓future2先完成是鬼,測(cè)試applyToEitherAsync 只要有一個(gè)任務(wù)完成就執(zhí)行
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return n;
}, executor);

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
    int n = 10;
    return n;
}, executor);

future1.applyToEitherAsync(future2, res -> {
    System.out.println(res);
    return res + 1;
}, executor);

6、組合任務(wù)紫新,所有的完成

1)方法介紹

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
}

2)示例代碼

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任務(wù)1當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
    int n = 5;
    return n;
}, executor);

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任務(wù)2當(dāng)前線程號(hào) -> " + Thread.currentThread().getId());
    int n = 10;
    return n;
}, executor);

CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
// 阻塞等待所有的任務(wù)執(zhí)行完成
allOf.get();

Integer result1 = future1.get();
Integer result2 = future2.get();


讓我們來試試項(xiàng)目中如何使用異步編排吧均蜜!

| 三、異步編排實(shí)際開發(fā) |

1芒率、配置線程池

@ConfigurationProperties(prefix = "coke.thread")
@Component
@Data
public class ThreadPoolProperties {
    private Integer coreSize;
    private Integer maxSize;
    private Integer keepAliveTime;
}

//@EnableConfigurationProperties(ThreadPoolProperties.class)  如果沒有把線程池的常量配置類放到容器中囤耳,則使用該注解
@Configuration
public class MyThreadConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolProperties pool) {
        return new ThreadPoolExecutor(
                pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

3、示例代碼

public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {

    SkuItemVo skuItemVo = new SkuItemVo();

    // supplyAsync 需要返回結(jié)果  因?yàn)?3 4 5 依賴1
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        // 1偶芍、獲取sku基本信息 pms_sku_info
        SkuInfoEntity skuInfoEntity = getById(skuId);
        skuItemVo.setInfo(skuInfoEntity);
        return skuInfoEntity;
    }, executor);

    CompletableFuture<Void> saleFuture = infoFuture.thenAcceptAsync((res) -> {
        // 3充择、獲取spu的銷售屬性組合
        List<SkuItemSaleAttrVo> skuItemSaleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
        skuItemVo.setSaleAttrs(skuItemSaleAttrVos);
    }, executor);

    CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
        // 4、獲取spu的介紹 pms_spu_info_desc
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
        skuItemVo.setDesp(spuInfoDescEntity);
    }, executor);

    CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
        // 5匪蟀、獲取spu的規(guī)格參數(shù)信息
        List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getCatalogId(), res.getSpuId());
        skuItemVo.setAttrGroups(attrGroupVos);
    }, executor);

    CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
        // 2椎麦、獲取sku的圖片信息 pms_spu_images
        List<SkuImagesEntity> skuImagesEntities = skuImagesService.getImageBySkuId(skuId);
        skuItemVo.setImages(skuImagesEntities);
    }, executor);

    // 6、查詢當(dāng)前sku是否參與秒殺優(yōu)惠
    CompletableFuture<Void> secKillFuture = CompletableFuture.runAsync(() -> {
        R skuSecKillInfo = secKillFeignService.getSkuSecKillInfo(skuId);
        if (skuSecKillInfo.getCode() == 0) {
            SecKillInfoVo skuSecKillInfoData = skuSecKillInfo.getData(new TypeReference<SecKillInfoVo>() {
            });
            skuItemVo.setSecKillInfoVo(skuSecKillInfoData);
        }
    }, executor);

    // 等到所有任務(wù)都完成
    CompletableFuture.allOf(saleFuture, descFuture, baseAttrFuture, imageFuture, secKillFuture).get();

    return skuItemVo;
}


結(jié)尾

本文到這里就結(jié)束了萄窜,感謝看到最后的朋友铃剔,都看到最后了點(diǎn)個(gè)贊再走啦撒桨,如有不對(duì)之處還請(qǐng)多多指正查刻。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市凤类,隨后出現(xiàn)的幾起案子穗泵,更是在濱河造成了極大的恐慌,老刑警劉巖谜疤,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件佃延,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡夷磕,警方通過查閱死者的電腦和手機(jī)履肃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坐桩,“玉大人尺棋,你說我怎么就攤上這事∶圊危” “怎么了膘螟?”我有些...
    開封第一講書人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵成福,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我荆残,道長(zhǎng)影涉,這世上最難降的妖魔是什么癣猾? 我笑而不...
    開封第一講書人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上柑爸,老公的妹妹穿的比我還像新娘。我一直安慰自己颁虐,他們只是感情好罕拂,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著备徐,像睡著了一般萄传。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蜜猾,一...
    開封第一講書人閱讀 52,255評(píng)論 1 308
  • 那天秀菱,我揣著相機(jī)與錄音,去河邊找鬼蹭睡。 笑死衍菱,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的肩豁。 我是一名探鬼主播脊串,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼清钥!你這毒婦竟也來了琼锋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤祟昭,失蹤者是張志新(化名)和其女友劉穎缕坎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體篡悟,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谜叹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了搬葬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片荷腊。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖急凰,靈堂內(nèi)的尸體忽然破棺而出女仰,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布董栽,位于F島的核電站码倦,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏锭碳。R本人自食惡果不足惜袁稽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望擒抛。 院中可真熱鬧推汽,春花似錦、人聲如沸歧沪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诊胞。三九已至暖夭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間撵孤,已是汗流浹背迈着。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留邪码,地道東北人裕菠。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像闭专,于是被迫代替她去往敵國和親奴潘。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359