Mono類型解析

java8中的Consumer袄简、Function

Consummer接口

Consumer 是消費(fèi)者接口鸠蚪,被@FunctionalInterface注解修飾,這意味著它可以看做是一個(gè)方法落蝙,Consumer接口中定義了一個(gè)accept方法胰锌。

void accept(T t);

實(shí)現(xiàn)Consumer接口的消費(fèi)者需要重寫accept方法,accept方法就是消費(fèi)生產(chǎn)者生產(chǎn)出來的對(duì)象進(jìn)行消費(fèi)芹血。Consumer對(duì)象可以這樣定義:

    Consumer c0 = System.out::println;
    Consumer<Integer> c1 = x -> System.out.println(x + 1);

Consumer對(duì)象可以定義某個(gè)對(duì)象的方法蹲嚣,或者是類的靜態(tài)方法,這就是代表 在Consummer的accept方法中使用該對(duì)象去執(zhí)行該方法祟牲,或者執(zhí)行這個(gè)類的靜態(tài)方法隙畜。也可以用lambda表達(dá)式來進(jìn)行定義。

Function接口

Function接口也是被@FunctionalInterface注解修飾修飾的说贝,表示一個(gè)方法议惰。Function接口中定義了一個(gè)apply方法:

    R apply(T t);

與Consumer接口類似,F(xiàn)unction接口的實(shí)現(xiàn)類需要實(shí)現(xiàn)apply方法乡恕,通常Function類型的對(duì)象作為參數(shù)都會(huì)去調(diào)用Function對(duì)象的apply方法言询,也就是執(zhí)行這個(gè)Function。Function對(duì)象也可以按照Consumer接口一樣的方式進(jìn)行定義:

    Function f0 = WebfluxTest::aaa;//這里aaa方法必須是有返回值的靜態(tài)方法
    Function<Integer, Integer> f1 = (x) -> x + 1;

Function與Consumer的區(qū)別就在于傲宜,Consumer是沒有返回值的运杭,而Function是有返回值的,也因此函卒,如果lambda表達(dá)式只有一行辆憔,F(xiàn)unction中匿名方法中的內(nèi)容是一個(gè)表達(dá)式(表示返回的值),而Consumer中匿名方法的內(nèi)容是一條語句(執(zhí)行的邏輯)报嵌。

Mono和Flux

Mono和Flux都實(shí)現(xiàn)了Publisher接口虱咧,也就是發(fā)布者,一個(gè)Mono對(duì)象中最多發(fā)射一個(gè)信號(hào)(可能是一個(gè)值锚国,或者空腕巡,或者一個(gè)error),而Flux可以發(fā)射多個(gè)信號(hào)血筑。
Publisher接口中只定義了一個(gè)subscribe方法:

public void subscribe(Subscriber<? super T> s);

subscribe方法就是綁定一個(gè)Subscriber去訂閱這個(gè)Publisher中的信號(hào)绘沉。
Subscriber類的定義如下:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

Mono subcribe一個(gè)Consummer的過程

首先需要先創(chuàng)建一個(gè)Mono對(duì)象煎楣,Mono提供了一些方法創(chuàng)建Mono對(duì)象,如creat车伞,just等:

    public static <T> Mono<T> just(T data) {
        return onAssembly(new MonoJust<>(data));
    }

    public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
        return onAssembly(new MonoCreate<>(callback));
    }

    public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier) {
        return onAssembly(new MonoDefer<>(supplier));
    }

    public static Mono<Long> delay(Duration duration, Scheduler timer) {
        return onAssembly(new MonoDelay(duration.toMillis(), TimeUnit.MILLISECONDS, timer));
    }

    public static <T> Mono<T> empty() {
        return MonoEmpty.instance();
    }

    public static <T> Mono<T> error(Throwable error) {
        return onAssembly(new MonoError<>(error));
    }

    public static <T> Mono<T> first(Mono<? extends T>... monos) {
        return onAssembly(new MonoFirst<>(monos));
    }

創(chuàng)建出來的MonoJust择懂,MonoCreate等對(duì)象都是Mono的子類,onAssembly方法是將創(chuàng)建好的Mono對(duì)象進(jìn)行裝飾增強(qiáng)帖世。MonoJust休蟹,MonoCreate這些類都是重寫了Mono中的抽象方法:

    public abstract void subscribe(CoreSubscriber<? super T> actual);

subcribe一個(gè)Consummer對(duì)象是執(zhí)行父類Mono中定義的subcribe方法,并一直調(diào)用重載的方法日矫,最后將參數(shù)構(gòu)造為一個(gè)LambdaMonoSubscriber對(duì)象調(diào)用subscribeWith方法:

    public final Disposable subscribe(Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        return subscribe(consumer, null, null);
    }

    public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer) {
        return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
    }

    public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Context initialContext) {
        return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
                completeConsumer, null, initialContext));
    }
    
    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
        subscribe(subscriber);
        return subscriber;
    }

接著調(diào)用Mono類中實(shí)現(xiàn)Publisher接口中定義的subscribe方法:

    @Override
    @SuppressWarnings("unchecked")
    public final void subscribe(Subscriber<? super T> actual) {
        CorePublisher publisher = Operators.onLastAssembly(this);
        CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

        try {
            //... 省略部分代碼
            publisher.subscribe(subscriber);
        }
        catch (Throwable e) {
            Operators.reportThrowInSubscribe(subscriber, e);
            return;
        }
    }

然后調(diào)用實(shí)現(xiàn)CorePublisher接口中定義的subscribe方法赂弓,也是上面提到的需要子類去實(shí)現(xiàn)的Mono類中的抽象方法,以MonoJust為例哪轿,其實(shí)現(xiàn)如下:

    public void subscribe(CoreSubscriber<? super T> actual) {
        actual.onSubscribe(Operators.scalarSubscription(actual, value));
    }

調(diào)用actual的onSubcribe方法盈魁,actual就是剛才創(chuàng)建的LambdaMonoSubscriber對(duì)象,參數(shù)是一個(gè)ScalarSubscription類型的對(duì)象窃诉,其onSubcribe方法如下:

    public final void onSubscribe(Subscription s) {
        if (Operators.validate(subscription, s)) {
            this.subscription = s;

            if (subscriptionConsumer != null) {
                try {
                    subscriptionConsumer.accept(s);
                }
                catch (Throwable t) {
                    Exceptions.throwIfFatal(t);
                    s.cancel();
                    onError(t);
                }
            }
            else {
                s.request(Long.MAX_VALUE);
            }

        }
    }

如果有異常會(huì)調(diào)用Subscription對(duì)象的cancel方法和LambdaMonoSubscriber對(duì)象自身的onError方法杨耙,否則會(huì)調(diào)用Subscription對(duì)象的request方法,這里的Subscription對(duì)象的實(shí)際類型為ScalarSubscription類型飘痛,其request方法如下:

        public void request(long n) {
            if (validate(n)) {
                if (ONCE.compareAndSet(this, 0, 1)) {
                    Subscriber<? super T> a = actual;
                    a.onNext(value);
                    if(once != 2) {
                        a.onComplete();
                    }
                }
            }
        }

request方法中匯調(diào)用LambdaMonoSubscriber對(duì)象的onNext方法和onComplete方法

基于以上的源碼珊膜,可以總結(jié)如下:

  • Mono提供了一些方法創(chuàng)建Mono對(duì)象(MonoCreate,MonoJust等)宣脉,這些對(duì)象都是Publisher
  • Publisher可以subcribe Subcriber车柠,subcribe過程就是構(gòu)造一個(gè)Subcription,然后作為參數(shù)執(zhí)行Subcriber的onSubcribe方法塑猖,如果有異常就執(zhí)行Subscription的cancel方法和自身的onError方法
  • Subcriber的onSubcribe方法中執(zhí)行Subcription的request方法竹祷,request方法里面會(huì)執(zhí)行Subcriber的onNext方法和onComplete方法
  • Mono對(duì)象可以直接subcribe一個(gè)Consumer對(duì)象,其實(shí)是把這個(gè)consummer變成一個(gè)Subcriber對(duì)象
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末羊苟,一起剝皮案震驚了整個(gè)濱河市塑陵,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蜡励,老刑警劉巖令花,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異巍虫,居然都是意外死亡彭则,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門占遥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人输瓜,你說我怎么就攤上這事瓦胎》移迹” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵搔啊,是天一觀的道長(zhǎng)柬祠。 經(jīng)常有香客問我,道長(zhǎng)负芋,這世上最難降的妖魔是什么漫蛔? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮旧蛾,結(jié)果婚禮上莽龟,老公的妹妹穿的比我還像新娘。我一直安慰自己锨天,他們只是感情好毯盈,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著病袄,像睡著了一般搂赋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上益缠,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天脑奠,我揣著相機(jī)與錄音,去河邊找鬼幅慌。 笑死宋欺,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的欠痴。 我是一名探鬼主播迄靠,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼喇辽!你這毒婦竟也來了掌挚?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤菩咨,失蹤者是張志新(化名)和其女友劉穎吠式,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抽米,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡特占,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了云茸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片是目。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖标捺,靈堂內(nèi)的尸體忽然破棺而出懊纳,到底是詐尸還是另有隱情揉抵,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布嗤疯,位于F島的核電站冤今,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏茂缚。R本人自食惡果不足惜戏罢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望脚囊。 院中可真熱鬧龟糕,春花似錦、人聲如沸凑术。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽淮逊。三九已至催首,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間泄鹏,已是汗流浹背郎任。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留备籽,地道東北人舶治。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像车猬,于是被迫代替她去往敵國(guó)和親霉猛。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345