java8中的Consumer袄简、Function
Consummer接口
Consumer 是消費(fèi)者接口鸠蚪,被@FunctionalInterface注解修飾,這意味著它可以看做是一個(gè)方法落蝙,Consumer接口中定義了一個(gè)accept方法胰锌。
void accept(T t);
實(shí)現(xiàn)Consumer接口的消費(fèi)者需要重寫accept方法,accept方法就是消費(fèi)生產(chǎn)者生產(chǎn)出來的對(duì)象進(jìn)行消費(fèi)芹血。Consumer對(duì)象可以這樣定義:
Consumer c0 = System.out::println;
Consumer<Integer> c1 = x -> System.out.println(x + 1);
Consumer對(duì)象可以定義某個(gè)對(duì)象的方法蹲嚣,或者是類的靜態(tài)方法,這就是代表 在Consummer的accept方法中使用該對(duì)象去執(zhí)行該方法祟牲,或者執(zhí)行這個(gè)類的靜態(tài)方法隙畜。也可以用lambda表達(dá)式來進(jìn)行定義。
Function接口
Function接口也是被@FunctionalInterface注解修飾修飾的说贝,表示一個(gè)方法议惰。Function接口中定義了一個(gè)apply方法:
R apply(T t);
與Consumer接口類似,F(xiàn)unction接口的實(shí)現(xiàn)類需要實(shí)現(xiàn)apply方法乡恕,通常Function類型的對(duì)象作為參數(shù)都會(huì)去調(diào)用Function對(duì)象的apply方法言询,也就是執(zhí)行這個(gè)Function。Function對(duì)象也可以按照Consumer接口一樣的方式進(jìn)行定義:
Function f0 = WebfluxTest::aaa;//這里aaa方法必須是有返回值的靜態(tài)方法
Function<Integer, Integer> f1 = (x) -> x + 1;
Function與Consumer的區(qū)別就在于傲宜,Consumer是沒有返回值的运杭,而Function是有返回值的,也因此函卒,如果lambda表達(dá)式只有一行辆憔,F(xiàn)unction中匿名方法中的內(nèi)容是一個(gè)表達(dá)式(表示返回的值),而Consumer中匿名方法的內(nèi)容是一條語句(執(zhí)行的邏輯)报嵌。
Mono和Flux
Mono和Flux都實(shí)現(xiàn)了Publisher接口虱咧,也就是發(fā)布者,一個(gè)Mono對(duì)象中最多發(fā)射一個(gè)信號(hào)(可能是一個(gè)值锚国,或者空腕巡,或者一個(gè)error),而Flux可以發(fā)射多個(gè)信號(hào)血筑。
Publisher接口中只定義了一個(gè)subscribe方法:
public void subscribe(Subscriber<? super T> s);
subscribe方法就是綁定一個(gè)Subscriber去訂閱這個(gè)Publisher中的信號(hào)绘沉。
Subscriber類的定義如下:
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Mono subcribe一個(gè)Consummer的過程
首先需要先創(chuàng)建一個(gè)Mono對(duì)象煎楣,Mono提供了一些方法創(chuàng)建Mono對(duì)象,如creat车伞,just等:
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate<>(callback));
}
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier) {
return onAssembly(new MonoDefer<>(supplier));
}
public static Mono<Long> delay(Duration duration, Scheduler timer) {
return onAssembly(new MonoDelay(duration.toMillis(), TimeUnit.MILLISECONDS, timer));
}
public static <T> Mono<T> empty() {
return MonoEmpty.instance();
}
public static <T> Mono<T> error(Throwable error) {
return onAssembly(new MonoError<>(error));
}
public static <T> Mono<T> first(Mono<? extends T>... monos) {
return onAssembly(new MonoFirst<>(monos));
}
創(chuàng)建出來的MonoJust择懂,MonoCreate等對(duì)象都是Mono的子類,onAssembly方法是將創(chuàng)建好的Mono對(duì)象進(jìn)行裝飾增強(qiáng)帖世。MonoJust休蟹,MonoCreate這些類都是重寫了Mono中的抽象方法:
public abstract void subscribe(CoreSubscriber<? super T> actual);
subcribe一個(gè)Consummer對(duì)象是執(zhí)行父類Mono中定義的subcribe方法,并一直調(diào)用重載的方法日矫,最后將參數(shù)構(gòu)造為一個(gè)LambdaMonoSubscriber對(duì)象調(diào)用subscribeWith方法:
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {
return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Context initialContext) {
return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
completeConsumer, null, initialContext));
}
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
subscribe(subscriber);
return subscriber;
}
接著調(diào)用Mono類中實(shí)現(xiàn)Publisher接口中定義的subscribe方法:
@Override
@SuppressWarnings("unchecked")
public final void subscribe(Subscriber<? super T> actual) {
CorePublisher publisher = Operators.onLastAssembly(this);
CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
try {
//... 省略部分代碼
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Operators.reportThrowInSubscribe(subscriber, e);
return;
}
}
然后調(diào)用實(shí)現(xiàn)CorePublisher接口中定義的subscribe方法赂弓,也是上面提到的需要子類去實(shí)現(xiàn)的Mono類中的抽象方法,以MonoJust為例哪轿,其實(shí)現(xiàn)如下:
public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
調(diào)用actual的onSubcribe方法盈魁,actual就是剛才創(chuàng)建的LambdaMonoSubscriber對(duì)象,參數(shù)是一個(gè)ScalarSubscription類型的對(duì)象窃诉,其onSubcribe方法如下:
public final void onSubscribe(Subscription s) {
if (Operators.validate(subscription, s)) {
this.subscription = s;
if (subscriptionConsumer != null) {
try {
subscriptionConsumer.accept(s);
}
catch (Throwable t) {
Exceptions.throwIfFatal(t);
s.cancel();
onError(t);
}
}
else {
s.request(Long.MAX_VALUE);
}
}
}
如果有異常會(huì)調(diào)用Subscription對(duì)象的cancel方法和LambdaMonoSubscriber對(duì)象自身的onError方法杨耙,否則會(huì)調(diào)用Subscription對(duì)象的request方法,這里的Subscription對(duì)象的實(shí)際類型為ScalarSubscription類型飘痛,其request方法如下:
public void request(long n) {
if (validate(n)) {
if (ONCE.compareAndSet(this, 0, 1)) {
Subscriber<? super T> a = actual;
a.onNext(value);
if(once != 2) {
a.onComplete();
}
}
}
}
request方法中匯調(diào)用LambdaMonoSubscriber對(duì)象的onNext方法和onComplete方法
基于以上的源碼珊膜,可以總結(jié)如下:
- Mono提供了一些方法創(chuàng)建Mono對(duì)象(MonoCreate,MonoJust等)宣脉,這些對(duì)象都是Publisher
- Publisher可以subcribe Subcriber车柠,subcribe過程就是構(gòu)造一個(gè)Subcription,然后作為參數(shù)執(zhí)行Subcriber的onSubcribe方法塑猖,如果有異常就執(zhí)行Subscription的cancel方法和自身的onError方法
- Subcriber的onSubcribe方法中執(zhí)行Subcription的request方法竹祷,request方法里面會(huì)執(zhí)行Subcriber的onNext方法和onComplete方法
- Mono對(duì)象可以直接subcribe一個(gè)Consumer對(duì)象,其實(shí)是把這個(gè)consummer變成一個(gè)Subcriber對(duì)象