響應(yīng)式編程入門之 Project Reactor

本文目標(biāo)

  • 理解響應(yīng)式編程

前言

之前的《聊聊 IO 多路復(fù)用》中浸卦,我們理解了非阻塞 IO 的意義主巍。但是 Spring MVC 并不能完美的應(yīng)用非阻塞編程指黎,于是 Spring 團隊開發(fā)了 WebFlux,而 WebFlux 的基礎(chǔ)正是本文要講到的 Project Reactor(下文簡稱為 Reactor)

本文以 Reactor 為例帶大家入門響應(yīng)式編程

版本

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.6</version>
    </dependency>

什么是 Reactor

Reactor 是 JVM 的非阻塞響應(yīng)式編程基礎(chǔ)亡资,支持背壓。 它直接與 Java 8 函數(shù)式 API 集成,特別是 CompletableFuture伊诵、Stream 和 Duration。 它提供了可組合的異步序列 API — Flux(用于 [N] 個元素)和 Mono(用于 [0|1] 個元素)回官,并實現(xiàn)了 Reactive Streams 規(guī)范曹宴。
在 Reactor 的基礎(chǔ)上還演化出了適合微服務(wù)架構(gòu)的 Reactor Netty 。為 HTTP(包括 Websockets)歉提、TCP 和 UDP 提供支持背壓和響應(yīng)式的網(wǎng)絡(luò)引擎笛坦。

上面是對于官方文檔的翻譯。下面來說說我自己對 Reactor 和響應(yīng)式編程的理解苔巨。

回想一下之前的非阻塞 IO 編程版扩,例如我們現(xiàn)在要用非阻塞的方式調(diào)用一個遠(yuǎn)程服務(wù),當(dāng)遠(yuǎn)程接口數(shù)據(jù)可用時去做一些業(yè)務(wù)處理侄泽。這時候代碼怎么寫呢礁芦?我們需要提供一個回調(diào)函數(shù),然后在響應(yīng)就緒的時候悼尾,去調(diào)用我們的回調(diào)函數(shù)柿扣。

從邏輯上來看,這完全沒有問題诀豁。但是如果我們的回調(diào)很復(fù)雜窄刘,代碼看起來會是什么樣呢?

// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

這個代碼說實話已經(jīng)有點回調(diào)地獄那味兒了舷胜,讓一段不是很復(fù)雜的邏輯變得很難讀了娩践。但是如果用 Reactor 寫呢?

// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);

可以看到烹骨,代碼變得非常的簡潔翻伺。唯一帶來的困擾就是,我們不知道這些函數(shù)到底是啥意思 ??

響應(yīng)式編程雖然有非常多的特性沮焕,但是它并不是什么神奇的技術(shù)吨岭,它也是建立在傳統(tǒng)命令式編程的基礎(chǔ)上。只不過它所提供的 API 以及規(guī)范更適合在非阻塞 IO 中使用峦树。雖然在非阻塞 IO 框架中幾乎只使用響應(yīng)式編程(Vertx辣辫,WebFlux)旦事,只是因為這樣做更合適,并不是說沒了響應(yīng)式編程急灭,就玩不了非阻塞 IO 了姐浮。

響應(yīng)式編程內(nèi)幕

Reactor 實現(xiàn)了 org.reactivestreams 提供的 Java 響應(yīng)式編程規(guī)范,我們只要了解 reactivestreams 中代碼是如何運轉(zhuǎn)的葬馋,再看 Reactor 相關(guān)的代碼就容易多了卖鲤。

下圖展示了 reactivestreams 中的核心接口

reactivestreams 核心接口
  • Publisher:發(fā)布者

  • Subscriber:訂閱者

  • Subscription:這個單詞中文翻譯為名詞的訂閱,在代碼中它是發(fā)布者和訂閱者之間的媒介

  • Processor:該接口繼承了發(fā)布者和訂閱者畴嘶,可以理解為發(fā)布者和訂閱者的中間操作(但是 Reactor 的中間操作并沒有實現(xiàn) Processor蛋逾,在最新版本的 Reactor 中,Processor 的相關(guān)實現(xiàn)接口已經(jīng)被棄用)

在了解了響應(yīng)式編程的核心接口之后窗悯,我們來看下響應(yīng)式編程是如何運作的

響應(yīng)式編程執(zhí)行邏輯

在 Reactor 中大部分實現(xiàn)都是按照上圖的邏輯來執(zhí)行的

  1. 首先是Subscriber(訂閱者)主動訂閱 Publisher(發(fā)布者)区匣,通過調(diào)用 Publisher 的 subscribe 方法
  2. Publisher 在向下游發(fā)送數(shù)據(jù)之前,會先調(diào)用 Subscriber 的 onSubscribe 方法蟀瞧,傳遞的參數(shù)為 Subscription(訂閱媒介)
  3. Subscriber 通過 Subscription#request 來請求數(shù)據(jù)沉颂,或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布(這就是響應(yīng)式編程中的背壓,訂閱者可以控制數(shù)據(jù)發(fā)布)
  4. Subscription 在接收到訂閱者的調(diào)用后悦污,通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)。
  5. 在數(shù)據(jù)發(fā)布完成后钉蒲,調(diào)用 Subscriber#onComplete 結(jié)束本次流切端,如果數(shù)據(jù)發(fā)布或者處理遇到錯誤會調(diào)用 Subscriber#onError

調(diào)用 Subscriber#onNext,onComplete顷啼,onError 這三個方法踏枣,可能是在 Publisher 中做的,也可能是在 Subscription 中做的钙蒙,根據(jù)不同的場景有不同的實現(xiàn)方式茵瀑,并沒有什么嚴(yán)格的要求」幔可以認(rèn)為 Publisher 和 Subscription 共同配合完成了數(shù)據(jù)發(fā)布

其實 Reactor 中 API 實現(xiàn)原理也都是這個套路马昨,我這邊也自己寫了個例子便于讓讀者加深對響應(yīng)式編程的理解

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * @author tianwen.yin
 */
public class SimpleReactiveStream {

    /**
     * 實現(xiàn)一個簡單的響應(yīng)式編程發(fā)布者
     * 邏輯:當(dāng)訂閱者發(fā)起訂閱時,像下游發(fā)送一個 HelloWorld扛施,發(fā)布邏輯由 SimpleSubscription 完成
     */
    static class SimplePublisher implements Publisher {
        @Override
        public void subscribe(Subscriber s) {
            // 2. Publisher 發(fā)布數(shù)據(jù)之前鸿捧,調(diào)用 Subscriber 的 onSubscribe
            s.onSubscribe(new SimpleSubscription(data(), s));
        }

        private String data() {
            return "Hello World";
        }
    }

    static class SimpleSubscriber implements Subscriber {
        @Override
        public void onSubscribe(Subscription s) {
            // 3. Subscriber 通過 Subscription#request 來請求數(shù)據(jù)
            // 或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }

    static class SimpleSubscription implements Subscription {
        String data;
        Subscriber actual;
        boolean isCanceled;

        public SimpleSubscription(String data, Subscriber actual) {
            this.data = data;
            this.actual = actual;
        }

        @Override
        public void request(long n) {
            if (!isCanceled) {
                try {
                    // 4. Subscription 在接收到訂閱者的調(diào)用后
                    // 通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)
                    actual.onNext(data);
                    // 5. 在數(shù)據(jù)發(fā)布完成后,調(diào)用 Subscriber#onComplete 結(jié)束本次流
                    actual.onComplete();
                } catch (Exception e) {
                    // 如果數(shù)據(jù)發(fā)布或者處理遇到錯誤會調(diào)用 Subscriber#onError
                    actual.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    public static void main(String[] args) {
        // 1. Subscriber ”訂閱“ Publisher
        new SimplePublisher().subscribe(new SimpleSubscriber());
    }

}

響應(yīng)式編程思想

響應(yīng)式編程疙渣,就像裝配一條流水線匙奴。Publisher 規(guī)定了數(shù)據(jù)如何生產(chǎn),中間會有 Operators(操作符)對流水線的數(shù)據(jù)進(jìn)行解析妄荔,校驗泼菌,轉(zhuǎn)換等等操作谍肤,最終處理好的數(shù)據(jù)流轉(zhuǎn)到 Subscriber。

image.png

這條流水線還有一個特點哗伯。大部分情況下當(dāng) Publisher 的 subscribe 方法被調(diào)用之前谣沸,什么都不會發(fā)生。在被訂閱之前我們只是在定義流水線該如何工作笋颤,直到真正有人需要的時候乳附,流水線才會啟動。

Reactor 中的 Operator

Operators 怎么理解呢伴澄?對于上游來說赋除,Operators 像一個訂閱者,而對于它的下游來說非凌,它像一個發(fā)布者(我們上文說過了 Reactor 中的中間操作并沒有實現(xiàn) Processor 接口)

    Mono.just("hello")
            .map(a -> a + "world")
            .subscribe(System.out::println);

舉個簡單的例子举农,在上面的代碼中,map 就是一個 Operator敞嗡,它的實現(xiàn)思路是什么颁糟?來看下面的代碼

    // 注意,這是我基于 Reactor API 實現(xiàn)的偽代碼喉悴!
    public static class MonoMap implements Publisher {
        // 我們自定義的轉(zhuǎn)換邏輯
        private Function mapper;
        // source 代表當(dāng)前操作符的上游發(fā)布者
        private Publisher source;

        public MonoMap(Publisher source, Function mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Subscriber actual) {
            source.subscribe(new MonoMapSubscriber(mapper, actual));
        }
    }

    public static class MonoMapSubscriber implements Subscriber {
        // 我們自定義的轉(zhuǎn)換邏輯
        private Function mapper;
        // 真正的下游
        private Subscriber actual;

        public MonoMapSubscriber(Function mapper, Subscriber actual) {
            this.mapper = mapper;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(Object o) {
            // 當(dāng)上游數(shù)據(jù)發(fā)送過來時棱貌,先進(jìn)行轉(zhuǎn)換再發(fā)送給下游
            Object result = mapper.apply(o);
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

上述代碼是我自己實現(xiàn)的一個偽代碼,用于讓大家理解操作符的實現(xiàn)思路箕肃,實際 Reactor 代碼也是這個思路婚脱,只不過實現(xiàn)的更加巧妙和嚴(yán)謹(jǐn)

我們首先來分析一下 Mono.just("hello").map(a -> a + "world") 這句話

  1. 當(dāng)執(zhí)行到 Mono.just 時,會新建一個 MonoJust 對象作為當(dāng)前的 Publisher勺像。該發(fā)布者的邏輯是障贸,當(dāng)訂閱時,向下游發(fā)送數(shù)據(jù) "hello"

  2. 當(dāng)執(zhí)行到 map 方法時吟宦,會新建一個 MonoMap 對象替作為當(dāng)前的 Publisher篮洁,MonoJust 成為了 MonoMap 中的一個屬性 source(實際的上游)

    • 當(dāng) MonoMap 被訂閱時,會先將它的下游 actual 做一層包裝殃姓,也就是我們上面的 MonoMapSubscriber袁波。然后去調(diào)用 source 的 subscribe 方法。上游發(fā)布數(shù)據(jù)時辰狡,MonoMapSubscriber 先對數(shù)據(jù)進(jìn)行轉(zhuǎn)換(我們上面的拼接字符串操作)锋叨,然后再發(fā)送給 actual(它的下游)

    • 當(dāng) MonoMap 被再次轉(zhuǎn)換時,MonoMap 就變成了下游操作符的 source...

最后通過一張圖來總結(jié)一下

Operator 實現(xiàn)原理

Reactor 該如何學(xué)習(xí)

本文并沒有介紹太多 Reactor 的細(xì)節(jié)宛篇,因為這些東西實在是太多了娃磺。我想聊聊我自己是如何學(xué)習(xí) Reactor 的

如果你已經(jīng)通過本文理解了響應(yīng)式編程的核心接口是如何工作的了,那恭喜你已經(jīng)邁向了成功的第一步了叫倍。接下來就是閱讀官方文檔偷卧,不斷的練習(xí)和閱讀 Reactor 的源碼豺瘤。源碼追蹤的方向已經(jīng)很明確了,當(dāng)我們想了解一個發(fā)布者的實現(xiàn)原理是什么听诸,我就要去關(guān)注這個發(fā)布者的 subscribe 方法和 Subscription 都做了什么坐求。想了解消費者的邏輯,就看它的 onNext晌梨,onComplete桥嗤,onError。

最后

如果覺得我的文章對你有幫助仔蝌,動動小手點下關(guān)注泛领,你的支持是對我最大的幫助

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市敛惊,隨后出現(xiàn)的幾起案子渊鞋,更是在濱河造成了極大的恐慌,老刑警劉巖瞧挤,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锡宋,死亡現(xiàn)場離奇詭異,居然都是意外死亡特恬,警方通過查閱死者的電腦和手機执俩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鸵鸥,“玉大人奠滑,你說我怎么就攤上這事《恃ǎ” “怎么了?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵摊崭,是天一觀的道長讼油。 經(jīng)常有香客問我,道長呢簸,這世上最難降的妖魔是什么矮台? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮根时,結(jié)果婚禮上瘦赫,老公的妹妹穿的比我還像新娘。我一直安慰自己蛤迎,他們只是感情好确虱,可當(dāng)我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著替裆,像睡著了一般校辩。 火紅的嫁衣襯著肌膚如雪窘问。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天宜咒,我揣著相機與錄音惠赫,去河邊找鬼。 笑死故黑,一個胖子當(dāng)著我的面吹牛儿咱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播场晶,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼混埠,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了峰搪?” 一聲冷哼從身側(cè)響起岔冀,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎概耻,沒想到半個月后使套,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡鞠柄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年侦高,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片厌杜。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡奉呛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出夯尽,到底是詐尸還是另有隱情瞧壮,我是刑警寧澤,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布匙握,位于F島的核電站咆槽,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏圈纺。R本人自食惡果不足惜秦忿,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蛾娶。 院中可真熱鬧灯谣,春花似錦、人聲如沸蛔琅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至呐萨,卻和暖如春杀饵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谬擦。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工切距, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惨远。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓谜悟,卻偏偏與公主長得像,于是被迫代替她去往敵國和親北秽。 傳聞我的和親對象是個殘疾皇子葡幸,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內(nèi)容