CompletionStage源碼
/**
* A stage of a possibly asynchronous computation, that performs an
* action or computes a value when another CompletionStage completes.
* A stage completes upon termination of its computation, but this may
* in turn trigger other dependent stages. The functionality defined
* in this interface takes only a few basic forms, which expand out to
* a larger set of methods to capture a range of usage styles: <ul>
*當另外一個CompletionStage完成時,此時正處于執(zhí)行一個動作或者計算一個值的異步計算階段筹淫。
該階段在其計算終止時完成就斤,但這又可能觸發(fā)其他從屬階段。該定義的功能僅采用幾種基本形式苟径,這些形式擴展為更大的方法集,以捕獲各種使用樣式
* <li>The computation performed by a stage may be expressed as a
* Function, Consumer, or Runnable (using methods with names including
* <em>apply</em>, <em>accept</em>, or <em>run</em>, respectively)
* depending on whether it requires arguments and/or produces results.
* For example, {@code stage.thenApply(x -> square(x)).thenAccept(x ->
* System.out.print(x)).thenRun(() -> System.out.println())}. An
* additional form (<em>compose</em>) applies functions of stages
* themselves, rather than their results. </li>
*根據(jù)階段是否需要參數(shù)和/或產(chǎn)生結(jié)果躬审,可以將階段執(zhí)行的計算表示為“Function”,“Consumer”或“Runnable ”(使用名稱分別包括“apply”蟆盐,“accept”或“run”的方法)承边。
* <li> One stage's execution may be triggered by completion of a
* single stage, or both of two stages, or either of two stages.
* Dependencies on a single stage are arranged using methods with
* prefix <em>then</em>. Those triggered by completion of
* <em>both</em> of two stages may <em>combine</em> their results or
* effects, using correspondingly named methods. Those triggered by
* <em>either</em> of two stages make no guarantees about which of the
* results or effects are used for the dependent stage's
* computation.</li>
*一個階段的執(zhí)行可以通過完成一個階段,兩個階段或兩個階段之一來觸發(fā)石挂。
*對單個階段的依賴關(guān)系使用帶有前綴的方法來安排博助。
*通過兩個階段都完成而觸發(fā)的結(jié)果可以使用相應(yīng)命名的方法來組合其結(jié)果或效果。
*由兩個階段中的任何一個觸發(fā)的事件都不能保證將哪個結(jié)果或效果用于從屬階段的計算痹愚。
* <li> Dependencies among stages control the triggering of
* computations, but do not otherwise guarantee any particular
* ordering. Additionally, execution of a new stage's computations may
* be arranged in any of three ways: default execution, default
* asynchronous execution (using methods with suffix <em>async</em>
* that employ the stage's default asynchronous execution facility),
* or custom (via a supplied {@link Executor}). The execution
* properties of default and async modes are specified by
* CompletionStage implementations, not this interface. Methods with
* explicit Executor arguments may have arbitrary execution
* properties, and might not even support concurrent execution, but
* are arranged for processing in a way that accommodates asynchrony.
*階段之間的依賴關(guān)系控制著計算的觸發(fā)富岳,但不能保證任何特定的順序。
另外拯腮,新階段計算的執(zhí)行可以三種方式安排:默認執(zhí)行窖式,
默認異步執(zhí)行(使用帶有后綴異步的方法,采用該階段的默認異步執(zhí)行工具)或通過提供的執(zhí)行程序進行自定義动壤。
默認模式和異步模式的執(zhí)行屬性由CompletionStage實現(xiàn)而不是此接口指定萝喘。
具有顯式Executor自變量的方法可能具有任意執(zhí)行屬性,甚至可能不支持并發(fā)執(zhí)行
琼懊,但被安排為以適應(yīng)異步的方式進行處理
* <li> Two method forms support processing whether the triggering
* stage completed normally or exceptionally: Method {@link
* #whenComplete whenComplete} allows injection of an action
* regardless of outcome, otherwise preserving the outcome in its
* completion. Method {@link #handle handle} additionally allows the
* stage to compute a replacement result that may enable further
* processing by other dependent stages. In all other cases, if a
* stage's computation terminates abruptly with an (unchecked)
* exception or error, then all dependent stages requiring its
* completion complete exceptionally as well, with a {@link
* CompletionException} holding the exception as its cause. If a
* stage is dependent on <em>both</em> of two stages, and both
* complete exceptionally, then the CompletionException may correspond
* to either one of these exceptions. If a stage is dependent on
* <em>either</em> of two others, and only one of them completes
* exceptionally, no guarantees are made about whether the dependent
* stage completes normally or exceptionally. In the case of method
* {@code whenComplete}, when the supplied action itself encounters an
* exception, then the stage exceptionally completes with this
* exception if not already completed exceptionally.</li>
* </ul>
*有兩種方法形式支持處理過程阁簸,無論觸發(fā)階段是正常完成還是異常完成:方法whenComplete允許不考慮結(jié)
*果而注入動作,否則將結(jié)果保留在完成時哼丈。方法句柄還允許該階段計算替換結(jié)果启妹,
*該替換結(jié)果使其他從屬階段可以進行進一步處理。
*在所有其他情況下醉旦,如果一個階段的計算由于(未經(jīng)檢查的)異橙拿祝或錯誤而突然終止,
*那么所有需要完成該過程的從屬階段也會異常完成车胡,并且CompletionException將異常作為其原因咙崎。
*如果一個階段同時依賴于兩個階段,并且都異常完成吨拍,則CompletionException可能對應(yīng)于這些異常中的任何一個褪猛。
*如果一個階段依賴于其他兩個階段中的任何一個,并且其中只有一個異常完成羹饰,則不能保證從屬階段是正常完成還是異常完成伊滋。
*在方法whenComplete的情況下碳却,如果提供的操作本身遇到異常,則該階段將以該異常異常完成(如果尚未異常完成)笑旺。
* <p>All methods adhere to the above triggering, execution, and
* exceptional completion specifications (which are not repeated in
* individual method specifications). Additionally, while arguments
* used to pass a completion result (that is, for parameters of type
* {@code T}) for methods accepting them may be null, passing a null
* value for any other parameter will result in a {@link
* NullPointerException} being thrown.
*所有方法都遵循上述觸發(fā)昼浦,執(zhí)行和特殊完成規(guī)范(在各個方法規(guī)范中不再重復(fù))。
* 此外筒主,雖然用于傳遞完成結(jié)果的參數(shù)(即关噪,對于T類型參數(shù)接受方法的參數(shù)可能為null,
*但為其他任何參數(shù)傳遞null值將導(dǎo)致拋出NullPointerException
* <p>This interface does not define methods for initially creating,
* forcibly completing normally or exceptionally, probing completion
* status or results, or awaiting completion of a stage.
* Implementations of CompletionStage may provide means of achieving
* such effects, as appropriate. Method {@link #toCompletableFuture}
* enables interoperability among different implementations of this
* interface by providing a common conversion type.
*該接口未定義用于初始創(chuàng)建乌妙,強制正呈雇茫或例外地完成,探測完成狀態(tài)或結(jié)果或等待階段完成的方法藤韵。
*CompletionStage的實現(xiàn)可以酌情提供實現(xiàn)這種效果的方法虐沥。
*方法toCompletableFuture通過提供常見的轉(zhuǎn)換類型來實現(xiàn)此接口的不同實現(xiàn)之間的互操作性
* @author Doug Lea
* @since 1.8
*/
public interface CompletionStage<T>
在這之前,先學(xué)習(xí)三個接口泽艘,Supplier欲险,Function,Consumer:
Supplier
可以簡單理解為數(shù)據(jù)的提供者
/**
* Represents a supplier of results.
*代表結(jié)果的提供者匹涮。
* <p>There is no requirement that a new or distinct result be returned each
* time the supplier is invoked.
*不需要每次調(diào)用supplier 時都返回新的或不同的結(jié)果天试。
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #get()}.
*
* @param <T> the type of results supplied by this supplier
*
* @since 1.8
*/
@FunctionalInterface
public interface Supplier<T>
Function
可以理解數(shù)據(jù)轉(zhuǎn)換,接收一個參數(shù)然低,然后返回一個數(shù)據(jù)
/**
* Represents a function that accepts one argument and produces a result.
*表示接受一個參數(shù)并產(chǎn)生一個結(jié)果的功能秋秤。
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #apply(Object)}.
*
* @param <T> the type of the input to the function
*函數(shù)輸入的數(shù)據(jù)類型
* @param <R> the type of the result of the function
*函數(shù)返回的數(shù)據(jù)類型
* @since 1.8
*/
@FunctionalInterface
public interface Function<T, R>
Consumer
可以理解為消費掉數(shù)據(jù),沒有返回值
/**
* Represents an operation that accepts a single input argument and returns no
* result. Unlike most other functional interfaces, {@code Consumer} is expected
* to operate via side-effects.
*表示一個接受單個輸入?yún)?shù)且不返回結(jié)果的操作脚翘。 與大多數(shù)其他功能接口不同灼卢,消費者有望通過副作用進行操作。
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #accept(Object)}.
*
* @param <T> the type of the input to the operation
*
* @since 1.8
*/
@FunctionalInterface
public interface Consumer<T>
總結(jié):
其實這三者的角色可以理解為數(shù)據(jù)的產(chǎn)生来农、數(shù)據(jù)的轉(zhuǎn)換以及數(shù)據(jù)的消費鞋真。無論是數(shù)據(jù)的轉(zhuǎn)換,還是數(shù)據(jù)的消費沃于,前提條件就是你得有數(shù)據(jù)不是涩咖,所以這三者中,Suplier在其中扮演著必不可少的角色繁莹。
可以站在分工的角度類比一下工作流檩互。任務(wù)是有時序關(guān)系的,比如有串行關(guān)系咨演、并行關(guān)系闸昨、匯聚關(guān)系等,CompletionStage 接口可以清晰地描述任務(wù)之間的這種時序關(guān)系.下面一一介紹,CompletionStage 接口如何描述串行關(guān)系、AND 聚合關(guān)系饵较、OR 聚合關(guān)系以及異常處理拍嵌。
1. 描述串行關(guān)系
CompletionStage 接口里面描述串行關(guān)系,主要是 thenApply循诉、thenAccept横辆、thenRun 和 thenCompose 這四個系列的接口
thenApply系列
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<R> thenApplyAsync(fn,executor);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenAcceptAsync(consumer,executor);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<Void> thenRunAsync(action,executor);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
CompletionStage<R> thenComposeAsync(fn,executor);
thenApply(Function<? super T,? extends U> fn)
返回一個新的CompletionStage,當此階段正常完成時茄猫,將使用該階段的結(jié)果作為所提供函數(shù)的參數(shù)來執(zhí)行
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage's result as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
thenApplyAsync(Function<? super T,? extends U> fn)
返回一個新的CompletionStage狈蚤,當此階段正常完成時,將使用該階段的默認異步執(zhí)行工具執(zhí)行該階段脆侮,并將該階段的結(jié)果作為所提供函數(shù)的參數(shù)
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage's result as the argument to
* the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn);
thenApplyAsync (Function<? super T,? extends U> fn,Executor executor)
返回一個新的CompletionStage,當此階段正常完成時阿浓,將使用該階段的默認異步執(zhí)行工具執(zhí)行該階段,并將該階段的結(jié)果作為所提供函數(shù)的參數(shù)蹋绽。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> thenApplyAsync (Function<? super T,? extends U> fn,Executor executor);
Show the code
public class ThenApplyDemo {
public static void main(String[] args) {
CompletableFuture thenApply=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return s+"---";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return s+"World";
}
});
try {
System.out.println((String)thenApply.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
Hello---World
Process finished with exit code 0
thenAccept系列
thenAccept(Consumer<? super T> action)
返回一個新的CompletionStage芭毙,當此階段正常完成時,將使用該階段的結(jié)果作為所提供操作的參數(shù)來執(zhí)行該階段卸耘。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage's result as the argument
* to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
thenAcceptAsync(Consumer<? super T> action)
返回一個新的CompletionStage退敦,當該階段正常完成時,將使用該階段的默認異步執(zhí)行工具執(zhí)行該階段蚣抗,并將該階段的結(jié)果作為所提供操作的參數(shù)侈百。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage's result as the argument to
* the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
thenAcceptAsync(Consumer<? super T> action,Executor executor)
返回一個新的CompletionStage,當此階段正常完成時翰铡,將使用提供的Executor執(zhí)行此階段钝域,并將該階段的結(jié)果作為提供的操作的參數(shù)。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
Show the code
public class ThenAcceptDemo {
public static void main(String[] args) {
CompletableFuture thenAccept=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello thenAccept";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
try {
System.out.println((String) thenAccept.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
Hello thenAccept
null
Process finished with exit code 0
Show the summary
1锭魔、Consumer已經(jīng)正常消費掉數(shù)據(jù)了
2例证、從打印的數(shù)據(jù)可以驗證,Consumer是沒有返回數(shù)據(jù)的迷捧,所以獲取到的數(shù)據(jù)為null织咧。
thenRun系列
thenRun(Runnable action)
返回一個新的CompletionStage,當此階段正常完成時漠秋,該CompletionStage執(zhí)行給定的操作
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenRun(Runnable action);
thenRunAsync(Runnable action)
返回一個新的CompletionStage笙蒙,當此階段正常完成時,將使用此階段的默認異步執(zhí)行工具來執(zhí)行給定的動作庆锦。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using this stage's default
* asynchronous execution facility.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenRunAsync(Runnable action);
thenRunAsync(Runnable action,Executor executor)
返回一個新的CompletionStage捅位,該階段正常完成時,將使用提供的Executor執(zhí)行給定的動作。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using the supplied Executor.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public CompletionStage<Void> thenRunAsync(Runnable action,
Executor executor);
thenCompose系列
thenCompose (Function<? super T, ? extends CompletionStage<U>> fn)
返回一個新的CompletionStage绿渣,當此階段正常完成時朝群,將使用此階段作為所提供函數(shù)的參數(shù)來執(zhí)行
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public <U> CompletionStage<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn);
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
返回一個新的CompletionStage,當此階段正常完成時中符,將使用該階段的默認異步執(zhí)行工具將其作為提供的函數(shù)的參數(shù)來執(zhí)行
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage as the argument to the
* supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public <U> CompletionStage<U> thenComposeAsync
(Function<? super T, ? extends CompletionStage<U>> fn);
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
返回一個新的CompletionStage姜胖,當此階段正常完成時,將使用提供的Executor執(zhí)行此階段淀散,并將該階段的結(jié)果作為提供的函數(shù)的參數(shù)右莱。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public <U> CompletionStage<U> thenComposeAsync
(Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor);
Show the code
public class ThenComposeDemo {
public static void main(String[] args) {
CompletableFuture thenCompose=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello supplyAsync";
}
}).thenCompose(new Function<String, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s) {
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return s+"--"+"Hello thenCompose";
}
});
}
});
try {
System.out.println(thenCompose.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
Hello supplyAsync--Hello thenCompose
Process finished with exit code 0
2. 描述 AND 匯聚關(guān)系
CompletionStage 接口里面描述 AND 匯聚關(guān)系,主要是 thenCombine档插、thenAcceptBoth 和 runAfterBoth 系列的接口慢蜓,這些接口的區(qū)別也是源自 fn、consumer郭膛、action 這三個核心參數(shù)不同
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<R> thenCombineAsync(other, fn,executor);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer,executor);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
CompletionStage<Void> runAfterBothAsync(other, action,executor);
thenCombine的第二個參數(shù)是BiFunction,前面我們解釋過晨抡,F(xiàn)unction類型的參數(shù)會接收一個參數(shù),并且最后會返回一個結(jié)果则剃;thenAcceptBoth的第二個參數(shù)是BiConsumer,它是個消費型的耘柱,只接收參數(shù)并且消費掉該數(shù)據(jù);runAfterBoth的第二個參數(shù)是Runnable棍现。
thenCombine系列
thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
返回一個新的CompletionStage调煎,當此階段和另一個給定階段都正常完成時,將使用兩個結(jié)果作為所提供函數(shù)的參數(shù)來執(zhí)行己肮。
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, is executed with the two
* results as arguments to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
返回一個新的CompletionStage士袄,當此階段和另一個給定階段正常完成時,將使用該階段的默認異步執(zhí)行工具來執(zhí)行谎僻,并將兩個結(jié)果作為所提供函數(shù)的參數(shù)娄柳。
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using this stage's
* default asynchronous execution facility, with the two results
* as arguments to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
thenCombineAsync (CompletionStage<? extends U> other,BiFunction<? super T,? super U,?extends V> fn,Executor executor)
返回一個新的CompletionStage,當此階段和另一個給定階段正常完成時艘绍,將使用提供的executor將其執(zhí)行西土,并將兩個結(jié)果作為提供函數(shù)的參數(shù)。
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using the supplied
* executor, with the two results as arguments to the supplied
* function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor);
Show the code
public class ThenCombineDemo {
public static void main(String[] args) {
CompletableFuture supplyAsync=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello";
}
});
CompletableFuture thenCombine=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "ThenCombine";
}
});
CompletableFuture result=supplyAsync.thenCombine(thenCombine, new BiFunction<String,String,String>() {
@Override
public String apply(String o, String o2) {
return o+"--"+o2;
}
});
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result
Hello--ThenCombine
Process finished with exit code 0
Show the summary
上面的例子我們可以看出鞍盗,thenCombine和thenApply一樣都可以對返回的數(shù)據(jù)進行轉(zhuǎn)換操作需了,不同的是,thenCombine針對的是兩個CompletableFuture的返回結(jié)果般甲,而thenApply是針對同一個CompletableFuture的返回結(jié)果進行轉(zhuǎn)換操作的肋乍。
thenAcceptBoth系列
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, is executed with the two
* results as arguments to the supplied action.
*返回一個新的CompletionStage,當此階段和另一個給定階段都正常完成時敷存,將使用兩個結(jié)果作為所提供操作的參數(shù)來執(zhí)行墓造。
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @return the new CompletionStage
*/
public <U> CompletionStage<Void> thenAcceptBoth
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
runAfterBoth系列
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, executes the given action.
*返回一個新的CompletionStage堪伍,當此階段和另一個給定階段都正常完成時,執(zhí)行給定操作
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
Show the code
public class RunAfterBothDemo {
public static void main(String[] args) {
CompletableFuture runAsync=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync:"+System.currentTimeMillis());
}
});
CompletableFuture other=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("other:"+System.currentTimeMillis());
}
});
runAsync.runAfterBoth(other, new Runnable() {
@Override
public void run() {
System.out.println("runAfterBoth:"+System.currentTimeMillis());
}
});
try {
runAsync.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
other:1621507355629
runAsync:1621507356634
runAfterBoth:1621507356634
Process finished with exit code 0
由于在runAsync里面加入了sleep(),所有other先于runAsync運行觅闽。
3. 描述 OR 匯聚關(guān)系
CompletionStage 接口里面描述 OR 匯聚關(guān)系帝雇,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口蛉拙,這些接口的區(qū)別也是源自 fn尸闸、consumer、action 這三個核心參數(shù)不同孕锄。
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage applyToEitherAsync(other, fn,executor);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage acceptEitherAsync(other, consumer,executor);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
CompletionStage runAfterEitherAsync(other, action,executor);
applyToEither系列
applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn)
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed with the
* corresponding result as argument to the supplied function.
*返回一個新的CompletionStage吮廉,當此階段或另一個給定階段正常完成時,將使用相應(yīng)的結(jié)果作為所提供函數(shù)的參數(shù)來執(zhí)行
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> applyToEither
(CompletionStage<? extends T> other,
Function<? super T, U> fn);
Show the wrong code
public class ApplyToEitherDemo {
public static void main(String[] args) {
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "First";
}
});
CompletableFuture other = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Other";
}
});
supplyAsync.applyToEither(other, new Function<String, String>() {
@Override
public String apply(String o) {
return 0 + "--applyToEither";
}
});
try {
System.out.println(supplyAsync.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the wrong result of code
First
Process finished with exit code 0
我故意在第一個里面做了sleep操作畸肆,讓Other優(yōu)先完成宦芦,但是輸出的結(jié)果卻不是自己想要的,折騰了很久轴脐,也沒有折騰出來调卑,第二天才發(fā)現(xiàn),問題出在最后的get方法上大咱,不是調(diào)用第一個CompletableFuture的get方法恬涧,而是調(diào)用新生成的CompletableFuture的get方法。這個點著實坑了自己一把徽级。
Show the code
public class ApplyToEitherDemo {
public static void main(String[] args) {
CompletableFuture first=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}
});
CompletableFuture second=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "World";
}
});
CompletableFuture applyToEither = first.applyToEither(second, new Function<String, String>() {
@Override
public String apply(String s) {
System.out.println("apply:" + s);
return s + "---->applyToEither";
}
});
try {
System.out.println("Result:"+applyToEither.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
apply:World
Result:World---->applyToEither
Process finished with exit code 0
由于第一個CompletableFuture睡眠了1秒气破,所以第二個CompletableFuture會先完成聊浅,所以最后得到的數(shù)據(jù)為第二個CompletableFuture返回的數(shù)據(jù)
acceptEither系列
acceptEither (CompletionStage<? extends T> other,Consumer<? super T> action)
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed with the
* corresponding result as argument to the supplied action.
*返回一個新的CompletionStage餐抢,當此階段或另一個給定階段正常完成時,
將使用相應(yīng)的結(jié)果作為所提供操作的參數(shù)來執(zhí)行低匙。
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> acceptEither
(CompletionStage<? extends T> other,
Consumer<? super T> action);
Show the code
public class AcceptEitherDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf1=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}
});
CompletableFuture other=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Other";
}
});
CompletableFuture result=cf1.acceptEither(other, new Consumer<String>() {
@Override
public void accept(String o) {
System.out.println("result:"+o);
}
});
System.out.println(result.get());
}
}
Show the result of code
result:Other
null
Process finished with exit code 0
Remove the sleep()
public class AcceptEitherDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf1=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello";
}
});
CompletableFuture other=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Other";
}
});
CompletableFuture result=cf1.acceptEither(other, new Consumer<String>() {
@Override
public void accept(String o) {
System.out.println("result:"+o);
}
});
System.out.println(result.get());
}
}
Show the result of remove sleep
result:Hello
null
Process finished with exit code 0
如果移除第一個里面的sleep(),則第一個會先執(zhí)行完畢旷痕,則得到的數(shù)據(jù)為第一個的返回的數(shù)據(jù)
runAfterEither系列
* runAfterEither(CompletionStage<?> other,Runnable action)*
有一個執(zhí)行完則執(zhí)行后面給定的操作
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*返回一個新的CompletionStage,當此階段或另一個給定階段正常完成時顽冶,將執(zhí)
行給定操作欺抗。
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
Show the code
public class RunAfterEitherDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cf");
}
});
CompletableFuture other=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("Other");
}
});
CompletableFuture runAfterEither=cf.runAfterEither(other, new Runnable() {
@Override
public void run() {
System.out.println("runAfterEither");
}
});
runAfterEither.get();
}
}
Show the result of code
Other
runAfterEither
Process finished with exit code 0
由于第一個里面增加了sleep方法,所以后面的會先于第一個執(zhí)行完畢强重。
4. 異常處理
雖然上面我們提到的 fn绞呈、consumer、action 它們的核心方法都不允許拋出可檢查異常间景,但是卻無法限制它們拋出運行時異常佃声,例如下面的代碼,執(zhí)行 7/0 就會出現(xiàn)除零錯誤這個運行時異常倘要。非異步編程里面圾亏,我們可以使用 try{}catch{}來捕獲并處理異常,那在異步編程里面,異常該如何處理呢志鹃?
CompletionStage 接口給我們提供的方案非常簡單夭问,比 try{}catch{}還要簡單,下面是相關(guān)的方法曹铃,使用這些方法進行異常處理和串行操作是一樣的缰趋,都支持鏈式編程方式
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
Handle系列
handle (BiFunction<? super T, Throwable, ? extends U> fn)
參數(shù)里面需要傳入一個BiFunction類型的,所以該方法屬于轉(zhuǎn)換類型的铛只,既有入?yún)⒉号郑灿蟹祷刂怠?/p>
/**
* Returns a new CompletionStage that, when this stage completes
* either normally or exceptionally, is executed with this stage's
* result and exception as arguments to the supplied function.
*返回一個新的CompletionStage,當此階段正炒就妫或異常完成時直撤,將使用該階段的結(jié)果和異常作為所提供函數(shù)的參數(shù)來執(zhí)行。
* <p>When this stage is complete, the given function is invoked
* with the result (or {@code null} if none) and the exception (or
* {@code null} if none) of this stage as arguments, and the
* function's result is used to complete the returned stage.
*完成此階段后蜕着,將使用該階段的結(jié)果(如果沒有則為null)和該階段的異常(如果沒有則為null)作為參
數(shù)調(diào)用給定函數(shù)谋竖,
然后使用函數(shù)的結(jié)果來完成返回的階段
* @param fn the function to use to compute the value of the
* returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> handle
(BiFunction<? super T, Throwable, ? extends U> fn);
Show the code
public class HandleDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf=CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int result=10/0;
return result;
}
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer integer, Throwable throwable) {
int handle=-1;
if (throwable==null){
handle=0;
}else{
handle=-1;
System.out.println(throwable.getMessage());
}
return handle;
}
});
System.out.println(cf.get());
}
}
Show the result of code
java.lang.ArithmeticException: / by zero
-1
Process finished with exit code 0
WhemComplete系列
whenComplete(BiConsumer<? super T, ? super Throwable> action)
由第一個入?yún)iConsumer可以得知,該方法屬于消費型承匣,沒有返回值蓖乘。
/**
* Returns a new CompletionStage with the same result or exception as
* this stage, that executes the given action when this stage completes.
*該階段完成執(zhí)行給定的操作返回具有與該階段相同的結(jié)果或異常的新CompletionStage
* <p>When this stage is complete, the given action is invoked with the
* result (or {@code null} if none) and the exception (or {@code null}
* if none) of this stage as arguments. The returned stage is completed
* when the action returns. If the supplied action itself encounters an
* exception, then the returned stage exceptionally completes with this
* exception unless this stage also completed exceptionally.
*完成此階段后,將使用該階段的結(jié)果(如果沒有則為null)和該階段的異常(如果沒有則為null)作為參
數(shù)來調(diào)用給定操作韧骗。 動作返回時嘉抒,返回的階段已完成。 如果所提供的操作本身遇到異常袍暴,那么返回階段
將異常終止些侍,除非該階段也異常完成。
* @param action the action to perform
* @return the new CompletionStage
*/
public CompletionStage<T> whenComplete
(BiConsumer<? super T, ? super Throwable> action);
Normal code
the normal code
public class WhenCompleteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int result = 10 / 5;
return result;
}
});
CompletableFuture whenCompleteAsync = cf.whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer integer, Throwable throwable) {
if (throwable == null) {
System.out.println(integer);
} else {
System.out.println(throwable.getMessage());
}
}
});
System.out.println("結(jié)果:"+whenCompleteAsync.get());
}
}
the result of normal code
2
結(jié)果:2
Process finished with exit code 0
Except code
the except code
public class WhenCompleteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int result = 10 / 0;
return result;
}
});
CompletableFuture whenCompleteAsync = cf.whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer integer, Throwable throwable) {
if (throwable == null) {
System.out.println(integer);
} else {
System.out.println(throwable.getMessage());
}
}
});
System.out.println("結(jié)果:"+whenCompleteAsync.get());
}
}
the result of except code
java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.yandroid.thread.future.WhenCompleteDemo.main(WhenCompleteDemo.java:26)
Caused by: java.lang.ArithmeticException: / by zero
at com.yandroid.thread.future.WhenCompleteDemo$2.get(WhenCompleteDemo.java:13)
at com.yandroid.thread.future.WhenCompleteDemo$2.get(WhenCompleteDemo.java:10)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Process finished with exit code 1
雖然也打印出了異常信息政模,但是與Handle不同的是岗宣,該方法導(dǎo)致了main線程的異常;而Handle方法打印出了異常信息淋样,但是main線程還是正常的耗式。
exceptionally
exceptionally(Function<Throwable, ? extends T> fn)
/**
* Returns a new CompletionStage that, when this stage completes
* exceptionally, is executed with this stage's exception as the
* argument to the supplied function. Otherwise, if this stage
* completes normally, then the returned stage also completes
* normally with the same value.
*返回一個新的CompletionStage,當此階段異常完成時趁猴,將以該階段的異常作為提供的函數(shù)的參數(shù)執(zhí)行該
新的CompletionStage刊咳。 否則,如果此階段正常完成儡司,則返回的階段也將以相同的值正常完成娱挨。
* @param fn the function to use to compute the value of the
* returned CompletionStage if this CompletionStage completed
* exceptionally
* @return the new CompletionStage
*/
public CompletionStage<T> exceptionally
(Function<Throwable, ? extends T> fn);
Except code
the except code
public class ExceptionallyDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return 6 / 0;
}
});
CompletableFuture whenComplete = cf.exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println(throwable.getMessage());
return -1;
}
});
System.out.println(whenComplete.get());
}
}
the result of except code
java.lang.ArithmeticException: / by zero
-1
Process finished with exit code 0
Show the normal
normal code
public class ExceptionallyDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return 6 / 3;
}
});
CompletableFuture whenComplete = cf.exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println(throwable.getMessage());
return -1;
}
});
System.out.println(whenComplete.get());
}
}
the result of normal code
2
Process finished with exit code 0