【JAVA】應(yīng)用中的并行

下面看一個服務(wù)的調(diào)用鏈


屏幕快照 2019-03-22 下午10.56.41.png

設(shè)想一下這5個查詢服務(wù)曲秉,平均每次消耗50ms二蓝,那么本次調(diào)用至少是250ms翘盖,我們細(xì)想一下,在這個這五個服務(wù)其實并沒有任何的依賴瓶摆,誰先獲取誰后獲取都可以凉逛,那么我們可以想想,是否可以用多重影分身之術(shù)群井,同時獲取這五個服務(wù)的信息呢状飞?優(yōu)化如下:

屏幕快照 2019-03-22 下午10.58.07.png

將這五個查詢服務(wù)并行查詢,在理想情況下可以優(yōu)化至50ms书斜。當(dāng)然說起來簡單叭爱,我們真正如何落地呢造寝?

1.CountDownLatch

CountDownLatch钳垮,可以將其看成是一個計數(shù)器蝗拿,await()方法可以阻塞至超時或者計數(shù)器減至0,其他線程當(dāng)完成自己目標(biāo)的時候可以減少1样屠,利用這個機制我們可以將其用來做并發(fā)穿撮。 可以用如下的代碼實現(xiàn)我們上面的下訂單的需求:

public class CountDownTask {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_SIZE)
    );

    public static void main(String[] args) throws InterruptedException {
        // 新建一個為5的計數(shù)器
        CountDownLatch countDownLatch = new CountDownLatch(5);

        OrderInfo orderInfo = new OrderInfo();
        THREAD_POOL.execute(() -> {
            System.out.println("當(dāng)前任務(wù)Customer,線程名字為:"+ Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當(dāng)前任務(wù)Discount,線程名字為:"+ Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當(dāng)前任務(wù)Food,線程名字為:"+ Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當(dāng)前任務(wù)Tenant,線程名字為:"+ Thread.currentThread().getName());
            orderInfo.setTenantInfo(new TenantInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當(dāng)前任務(wù)OtherInfo,線程名字為:"+ Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
            countDownLatch.countDown();
        });
        countDownLatch.await(1, TimeUnit.SECONDS);
        System.out.println("主線程:"+ Thread.currentThread().getName());
    }
}

建立一個線程池(具體配置根據(jù)具體業(yè)務(wù),具體機器配置)痪欲,進行并發(fā)的執(zhí)行我們的任務(wù)(生成用戶信息悦穿,菜品信息等),最后利用await方法阻塞等待結(jié)果成功返回勤揩。

2.CompletableFuture

CountDownLatch雖然能實現(xiàn)我們需要滿足的功能但是其任然有個問題是,在我們的業(yè)務(wù)代碼需要耦合CountDownLatch的代碼秘蛔,比如在我們獲取用戶信息之后我們會執(zhí)行countDownLatch.countDown()陨亡,很明顯我們的業(yè)務(wù)代碼顯然不應(yīng)該關(guān)心這一部分邏輯傍衡,并且在開發(fā)的過程中萬一寫漏了,那我們的await方法將只會被各種異常喚醒负蠕。

在JDK1.8中提供了一個類CompletableFuture蛙埂,它是一個多功能的非阻塞的Future。(什么是Future:用來代表異步結(jié)果遮糖,并且提供了檢查計算完成绣的,等待完成,檢索結(jié)果完成等方法欲账。)

我們將每個任務(wù)的計算完成的結(jié)果都用CompletableFuture來表示屡江,利用CompletableFuture.allOf匯聚成一個大的CompletableFuture,那么利用get()方法就可以阻塞赛不。

public class CompletableFutureParallel {

    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_SIZE)
    );

    public static void main(String[] args) throws Exception {
        OrderInfo orderInfo = new OrderInfo();

        List futures = new ArrayList<>();

        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當(dāng)前任務(wù)Customer,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
        }, THREAD_POOL));

        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當(dāng)前任務(wù)Discount,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
        }, THREAD_POOL));

        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當(dāng)前任務(wù)Food,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
        }, THREAD_POOL));

        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("當(dāng)前任務(wù)Other,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
        }, THREAD_POOL));

        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        allDoneFuture.get(10, TimeUnit.SECONDS);
        System.out.println(orderInfo);
    }
}

可以看見我們使用CompletableFuture能很快的完成的需求惩嘉,當(dāng)然這還不夠。

3.Fork/Join

我們上面用CompletableFuture完成了我們對多組任務(wù)并行執(zhí)行踢故,但是其依然是依賴我們的線程池文黎,在我們的線程池中使用的是阻塞隊列,也就是當(dāng)我們某個線程執(zhí)行完任務(wù)的時候需要通過這個阻塞隊列進行殿较,那么肯定會發(fā)生競爭耸峭,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

屏幕快照 2019-03-22 下午11.17.16.png

ForkJoinPool中每個線程都有自己的工作隊列淋纲,并且采用Work-Steal算法防止線程饑餓劳闹。 Worker線程用LIFO的方法取出任務(wù),但是會用FIFO的方法去偷取別人隊列的任務(wù)帚戳,這樣就減少了鎖的沖突玷或。

屏幕快照 2019-03-22 下午11.18.20.png

網(wǎng)上這個框架的例子很多,我們看看如何使用代碼其完成我們上面的下訂單需求:

public class OrderTask extends RecursiveTask {
    
    @Override
    protected OrderInfocompute() {
        System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());                

        CustomerTask customerTask = new CustomerTask();

        TenantTask tenantTask = new TenantTask();

        DiscountTask discountTask = new DiscountTask();

        FoodTask foodTask = new FoodTask();

        OtherTask otherTask = new OtherTask();

        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);

        OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());
        returnorderInfo;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1);
        System.out.println(forkJoinPool.invoke(new OrderTask()));
    }
}

class CustomerTask extends RecursiveTask {
    @Override
    protected CustomerInfocompute() {
        System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
        return new CustomerInfo();
    }
}

class TenantTask extends RecursiveTask {
    @Override
    protected TenantInfocompute() {
        System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
        return new TenantInfo();
    }
}

class DiscountTask extends RecursiveTask {
    @Override
    protected DiscountInfocompute() {
        System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
        return new DiscountInfo();
    }
}

class FoodTask extends RecursiveTask {
    @Override
    protected FoodListInfocompute() {
        System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
        return new FoodListInfo();
    }
}

class OtherTask extends RecursiveTask {
    @Override
    protected OtherInfocompute() {
        System.out.println("執(zhí)行" + this.getClass().getSimpleName() + "線程名字為:" + Thread.currentThread().getName());
        return new OtherInfo();
    }
}

我們定義一個OrderTask并且定義五個獲取信息的任務(wù)片任,在compute中分別fork執(zhí)行這五個任務(wù)偏友,最后在將這五個任務(wù)的結(jié)果通過Join獲得,最后完成我們的并行化的需求对供。

4. parallelStream

在jdk1.8中提供了并行流的API位他,當(dāng)我們使用集合的時候能很好的進行并行處理,下面舉了一個簡單的例子從1加到100:

public class ParallelStream {
    public static void main(String[] args) {
        ArrayList list = new ArrayList();

        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }

        LongAdder sum = new LongAdder();
        list.parallelStream().forEach(integer -> {
            System.out.println("當(dāng)前線程" + Thread.currentThread().getName());
            sum.add(integer);
        });
        System.out.println(sum);
    }
}

parallelStream中底層使用的那一套也是Fork/Join的那一套,默認(rèn)的并發(fā)程度是可用CPU數(shù)-1产场。

5.分片

可以想象有這么一個需求鹅髓,每天定時對id在某個范圍之間的用戶發(fā)券,比如這個范圍之間的用戶有幾百萬京景,如果給一臺機器發(fā)的話窿冯,可能全部發(fā)完需要很久的時間,所以分布式調(diào)度框架比如:elastic-job确徙。都提供了分片的功能醒串,比如你用50臺機器执桌,那么id%50=0的在第0臺機器上,=1的在第1臺機器上發(fā)券芜赌,那么我們的執(zhí)行時間其實就分?jǐn)偟搅瞬煌臋C器上了仰挣。

并行化注意事項

線程安全:在parallelStream中我們列舉的代碼中使用的是LongAdder,并沒有直接使用我們的Integer和Long缠沈,這個是因為在多線程環(huán)境下Integer和Long線程不安全膘壶。所以線程安全我們需要特別注意。
合理參數(shù)配置:可以看見我們需要配置的參數(shù)比較多洲愤,比如我們的線程池的大小颓芭,等待隊列大小,并行度大小以及我們的等待超時時間等等禽篱,我們都需要根據(jù)自己的業(yè)務(wù)不斷的調(diào)優(yōu)防止出現(xiàn)隊列不夠用或者超時時間不合理等等畜伐。

CompletableFuture詳細(xì)介紹

JDK8以前的Future

public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return Thread.currentThread().getName();
            }
        });

        doSomethingElse();//在我們異步操作的同時一樣可以做其他操作

        try {
            String res = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

上面展示了我們的線程可以并發(fā)方式調(diào)用另一個線程去做我們耗時的操作。當(dāng)我們必須依賴我們的異步結(jié)果的時候我們就可以調(diào)用get方法去獲得躺率。當(dāng)我們調(diào)用get方法的時候如果我們的任務(wù)完成就可以立馬返回玛界,但是如果任務(wù)沒有完成就會阻塞,直到超時為止悼吱。

Future底層是怎么實現(xiàn)的呢慎框? 我們首先來到我們ExecutorService的代碼中submit方法這里會返回一個Future

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

在sumbmit中會對我們的Callable進行包裝封裝成我們的FutureTask,我們最后的Future其實也是Future的實現(xiàn)類FutureTask后添,F(xiàn)utureTask實現(xiàn)了Runnable接口所以這里直接調(diào)用execute笨枯。在FutureTask代碼中的run方法代碼如下:

public void run() {
        if (state != NEW ||  !UNSAFE.compareAndSwapObject(this, runnerOffset,  null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
        .......
    }

可以看見當(dāng)我們執(zhí)行完成之后會set(result)來通知我們的結(jié)果完成了。set(result)代碼如下:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

首先用CAS置換狀態(tài)為完成遇西,以及替換結(jié)果馅精,當(dāng)替換結(jié)果完成之后,才會替換為我們的最終狀態(tài)粱檀,這里主要是怕我們設(shè)置完COMPLETING狀態(tài)之后最終值還沒有真正的賦值出去洲敢,而我們的get就去使用了,所以還會有個最終狀態(tài)茄蚯。我們的get()方法的代碼如下:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

首先獲得當(dāng)前狀態(tài)压彭,然后判斷狀態(tài)是否完成,如果沒有完成則進入awaitDone循環(huán)等待渗常,這也是我們阻塞的代碼壮不,然后返回我們的最終結(jié)果。

缺陷
我們的Future使用很簡單皱碘,這也導(dǎo)致了如果我們想完成一些復(fù)雜的任務(wù)可能就比較難询一。比如下面一些例子:
將兩個異步計算合成一個異步計算,這兩個異步計算互相獨立,同時第二個又依賴第一個的結(jié)果健蕊。
當(dāng)Future集合中某個任務(wù)最快結(jié)束時缓醋,返回結(jié)果。
等待Future結(jié)合中的所有任務(wù)都完成绊诲。
通過編程方式完成一個Future任務(wù)的執(zhí)行。
應(yīng)對Future的完成時間褪贵。也就是我們的回調(diào)通知掂之。

CompletableFuture
CompletableFuture是JDK8提出的一個支持非阻塞的多功能的Future,同樣也是實現(xiàn)了Future接口脆丁。

下面會寫一個比較簡單的例子:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        new Thread(()->{
            completableFuture.complete(Thread.currentThread().getName());
        }).start();

        doSomethingelse();//做你想做的其他操作
        
        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

用法上來說和Future有一點不同世舰,我們這里fork了一個新的線程來完成我們的異步操作,在異步操作中我們會設(shè)置值槽卫,然后在外部做我們其他操作跟压。在complete中會用CAS替換result,然后當(dāng)我們get如果可以獲取到值得時候就可以返回了歼培。

錯誤處理
上面介紹了正常情況下但是當(dāng)我們在我們異步線程中產(chǎn)生了錯誤的話就會非常的不幸震蒋,錯誤的異常不會告知給你,會被扼殺在我們的異步線程中躲庄,而我們的get方法會被阻塞查剖。

對于我們的CompletableFuture提供了completeException方法可以讓我們返回我們異步線程中的異常,代碼如下:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        new Thread(()->{
            completableFuture.completeExceptionally(new RuntimeException("error"));
            completableFuture.complete(Thread.currentThread().getName());
        }).start();

       doSomethingelse();//做你想做的耗時操作

        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
--------------
輸出:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
    at futurepackge.jdk8Future.main(jdk8Future.java:19)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: error
    at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
    at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

在我們新建的異步線程中直接New一個異常拋出,在我們客戶端中依然可以獲得異常噪窘。

工廠方法創(chuàng)建CompletableFuture

我們的上面的代碼雖然不復(fù)雜笋庄,但是我們的java8依然對其提供了大量的工廠方法,用這些方法更容易完成整個流程倔监。如下面的例子:

public static void main(String[] args) {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
             return Thread.currentThread().getName();
        });

       doSomethingelse();//做你想做的耗時操作

        try {
            System.out.println(completableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
---------
輸出:
ForkJoinPool.commonPool-worker-1

上面的例子通過工廠方法supplyAsync提供了一個Completable直砂,在異步線程中的輸出是ForkJoinPool可以看出當(dāng)我們不指定線程池的時候會使用ForkJoinPool,而我們上面的compelte的操作在我們的run方法中做了,源代碼如下:

public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; 
                fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }

上面代碼中通過d.completeValue(f.get());設(shè)置了我們的值浩习。同樣的構(gòu)造方法還有runasync等等静暂。

計算結(jié)果完成時的處理
當(dāng)CompletableFuture計算結(jié)果完成時,我們需要對結(jié)果進行處理,或者當(dāng)CompletableFuture產(chǎn)生異常的時候需要對異常進行處理瘦锹。有如下幾種方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面的四種方法都返回了CompletableFuture籍嘹,當(dāng)我們Action執(zhí)行完畢的時候,future返回的值和我們原始的CompletableFuture的值是一樣的弯院。上面以Async結(jié)尾的會在新的線程池中執(zhí)行辱士,上面沒有一Async結(jié)尾的會在之前的CompletableFuture執(zhí)行的線程中執(zhí)行。例子代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println(v);
        });
        System.out.println("Main" + Thread.currentThread().getName());
        System.out.println(f.get());
    }

exceptionally方法返回一個新的CompletableFuture听绳,當(dāng)原始的CompletableFuture拋出異常的時候颂碘,就會觸發(fā)這個CompletableFuture的計算,調(diào)用function計算值,否則如果原始的CompletableFuture正常計算完后头岔,這個新的CompletableFuture也計算完成塔拳,它的值和原始的CompletableFuture的計算的值相同。也就是這個exceptionally方法用來處理異常的情況峡竣。

上面我們討論了如何計算結(jié)果完成時進行的處理靠抑,接下來我們討論如何對計算結(jié)果完成時,對結(jié)果進行轉(zhuǎn)換适掰。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

這里同樣也是返回CompletableFuture颂碧,但是這個結(jié)果會由我們自定義返回去轉(zhuǎn)換他,同樣的不以Async結(jié)尾的方法由原來的線程計算类浪,以Async結(jié)尾的方法由默認(rèn)的線程池ForkJoinPool.commonPool()或者指定的線程池executor運行载城。Java的CompletableFuture類總是遵循這樣的原則,下面就不一一贅述了费就。
例子代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
        System.out.println(f.get());
    }

上面的最終結(jié)果會輸出11诉瓦,我們成功將其用兩個thenApply轉(zhuǎn)換為String。

計算結(jié)果完成時的消費
上面已經(jīng)講了結(jié)果完成時的處理和轉(zhuǎn)換力细,他們最后的CompletableFuture都會返回對應(yīng)的值睬澡,這里還會有一個只會對計算結(jié)果消費不會返回任何結(jié)果的方法。

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

函數(shù)接口為Consumer眠蚂,就知道了只會對函數(shù)進行消費猴贰,例子代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        future.thenAccept(System.out::println);
    }

這個方法用法很簡單我就不多說了.Accept家族還有個方法是用來合并結(jié)果當(dāng)兩個CompletionStage都正常執(zhí)行的時候就會執(zhí)行提供的action,它用來組合另外一個異步的結(jié)果河狐。

public <U> CompletableFuture<Void>  thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)

runAfterBoth是當(dāng)兩個CompletionStage都正常完成計算的時候,執(zhí)行一個Runnable米绕,這個Runnable并不使用計算的結(jié)果。 示例代碼如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            return 20;
        }),(x,y) -> System.out.println(x+y)).get());
    }

CompletableFuture也提供了執(zhí)行Runnable的辦法馋艺,這里我們就不能使用我們future中的值了栅干。

public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)

對計算結(jié)果的組合
首先是介紹一下連接兩個future的方法:

public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

對于Compose可以連接兩個CompletableFuture,其內(nèi)部處理邏輯是當(dāng)?shù)谝粋€CompletableFuture處理沒有完成時會合并成一個CompletableFuture,如果處理完成捐祠,第二個future會緊接上一個CompletableFuture進行處理碱鳞。

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
    }

我們上面的thenAcceptBoth講了合并兩個future,但是沒有返回值這里將介紹一個有返回值的方法,如下:

public <U,V> CompletableFuture<V>   thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

例子比較簡單如下:

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 10;
        });
        CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
            return 20;
        }),(x,y) -> {return "計算結(jié)果:"+x+y;});
        System.out.println(f.get());
    }

上面介紹了兩個future完成的時候應(yīng)該完成的工作踱蛀,接下來介紹任意一個future完成時需要執(zhí)行的工作窿给,方法如下:

public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

上面兩個是一個是純消費不返回結(jié)果,一個是計算后返回結(jié)果率拒。

其他方法

public static CompletableFuture<Void>       allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>     anyOf(CompletableFuture<?>... cfs)

allOf方法是當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計算崩泡。
anyOf方法是當(dāng)任意一個CompletableFuture執(zhí)行完后就會執(zhí)行計算,計算的結(jié)果相同猬膨。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末角撞,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌谒所,老刑警劉巖热康,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異劣领,居然都是意外死亡姐军,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門尖淘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來庶弃,“玉大人,你說我怎么就攤上這事德澈。” “怎么了固惯?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵梆造,是天一觀的道長。 經(jīng)常有香客問我葬毫,道長镇辉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任贴捡,我火速辦了婚禮忽肛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘烂斋。我一直安慰自己屹逛,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布汛骂。 她就那樣靜靜地躺著罕模,像睡著了一般。 火紅的嫁衣襯著肌膚如雪帘瞭。 梳的紋絲不亂的頭發(fā)上淑掌,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機與錄音蝶念,去河邊找鬼抛腕。 笑死,一個胖子當(dāng)著我的面吹牛媒殉,可吹牛的內(nèi)容都是我干的担敌。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼廷蓉,長吁一口氣:“原來是場噩夢啊……” “哼柄错!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤售貌,失蹤者是張志新(化名)和其女友劉穎给猾,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體颂跨,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡敢伸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了恒削。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片池颈。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钓丰,靈堂內(nèi)的尸體忽然破棺而出躯砰,到底是詐尸還是另有隱情,我是刑警寧澤携丁,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布琢歇,位于F島的核電站,受9級特大地震影響梦鉴,放射性物質(zhì)發(fā)生泄漏李茫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一肥橙、第九天 我趴在偏房一處隱蔽的房頂上張望魄宏。 院中可真熱鬧,春花似錦存筏、人聲如沸宠互。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽名秀。三九已至,卻和暖如春藕溅,著一層夾襖步出監(jiān)牢的瞬間匕得,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工巾表, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留汁掠,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓集币,卻偏偏與公主長得像考阱,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子鞠苟,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,916評論 2 344