【一起學(xué)習(xí)Reactor】響應(yīng)式編程簡介

IMG_3558.JPG

Reactor是響應(yīng)式編程規(guī)范的一個實現(xiàn)弃锐,維基百科對響應(yīng)式編程的總結(jié)如下:

響應(yīng)式編程是一種異步編程范例袄友,主要關(guān)注數(shù)據(jù)流和數(shù)據(jù)變化通知。這意味著可以使用編程語言輕松表達靜態(tài)(數(shù)組)或動態(tài)(事件發(fā)射器)數(shù)據(jù)流霹菊。更多有關(guān)響應(yīng)式編程的描述可以參考Reactive programming

響應(yīng)式編程邁出的第一步是微軟在.NET系統(tǒng)中創(chuàng)建了響應(yīng)式擴展庫(Rx)剧蚣。在微軟創(chuàng)建Rx之后,RxJava在JVM上實現(xiàn)了響應(yīng)式編程旋廷。隨著時間推移鸠按,經(jīng)過Reactive Streams的不斷努力制定了Java實現(xiàn)響應(yīng)式編程的規(guī)范,規(guī)范為JVM上的響應(yīng)式庫定義了一組接口和交互規(guī)則饶碘。Java 9的Flow類已經(jīng)實現(xiàn)了規(guī)范定義的接口(從Java 9 開始目尖,java開始默認(rèn)支持響應(yīng)式編程,有條件的小伙伴該考慮升級Java版本了)扎运。

在面向?qū)ο蟮木幊陶Z言中瑟曲,響應(yīng)式編程通常作為觀察者模式的一種擴展。如果對比迭代器設(shè)計模式和主流的響應(yīng)式流模式對比豪治,會發(fā)現(xiàn)在幾乎所有的庫中Iterable-Iterator 都有雙重性(可以互相轉(zhuǎn)換)洞拨。兩者主要的區(qū)別是:迭代器設(shè)計模式基于,響應(yīng)式流基于负拟。

迭代器是命令式編程模式烦衣,盡管訪問數(shù)據(jù)的方法僅由Iterable負(fù)責(zé)。實際上在使用迭代器時由開發(fā)者決定何時選擇序列中的next()元素掩浙。在響應(yīng)式流中花吟,和上面Iterable-Iterator對應(yīng)的是Publisher-Subscriber,新值出現(xiàn)時Publisher 會通知Subscriber 涣脚,推送是響應(yīng)的關(guān)鍵示辈。同樣,對推送值的操作是聲明式而不是命令式遣蚀,代碼表達計算的邏輯矾麻,而不是描述其精確的控制流纱耻。

響應(yīng)式流除了推送值之外,同樣以良好的方式定義了錯誤處理和操作完成险耀。一個Publisher可以向其Subscriber推送新的值弄喘,也可以發(fā)送錯誤信號或者完成信號。錯誤信號和完成信號都會終止序列甩牺,下面的表達式準(zhǔn)確簡練的描述了這個邏輯:

onNext x 0..N [onError | onComplete]

這種模式非常的靈活蘑志,可以支持沒有值,一個值或n個值(包括無限序列贬派,比如時間)急但。但是為什么我們首先需要這樣一個異步響應(yīng)式庫呢?

阻塞是一種浪費

現(xiàn)在應(yīng)用有大量的并發(fā)用戶,盡管現(xiàn)代化硬件的能力在不斷提升搞乏,但是軟件性能依然是一個關(guān)鍵問題波桩。有兩種方法可以提高軟件的性能:

  • 并行使用更多的線程和更多的硬件資源,
  • 提高現(xiàn)有資源的使用率请敦。

通常镐躲,Java 開發(fā)者使用阻塞代碼開發(fā)程序,這種方法在沒有性能瓶頸之前非常的完美侍筛,因為阻塞代碼更容易理解也更容易編寫萤皂。當(dāng)程序出現(xiàn)性能瓶頸時,引入另外的線程來運行相同的阻塞代碼(活多了需要加人)匣椰。但是裆熙,資源的這種擴展會迅速引入競爭和并發(fā)問題。更糟糕的是窝爪,阻塞會浪費資源弛车。如果程序遇到延遲(特別是I/O操作,比如數(shù)據(jù)庫請求或網(wǎng)絡(luò)調(diào)用)蒲每,因為線程需要等待數(shù)據(jù)而處于空閑狀態(tài)進而導(dǎo)致資源的浪費纷跛。

因此,并行不是靈丹妙藥邀杏。充分使用硬件的能力十分必要贫奠,但是推理過程十分復(fù)雜而且更加容易造成浪費。

異步是一副良藥嗎望蜡?

通過編寫異步唤崭、非阻塞代碼,可以切換到另一個活動的使用相同基礎(chǔ)資源的任務(wù)并在異步處理完成后返回到當(dāng)前的任務(wù)脖律。通過異步代碼可以提高資源的使用率谢肾,減少資源浪費。

Java提供了下面兩種異步編程模型:

  • Callbacks: 異步方法沒有返回值小泉,但是需要一個額外的callback參數(shù)(可以是lambda表達式或匿名類)芦疏,當(dāng)結(jié)果可用時會被調(diào)用冕杠。
  • Futures::方法立即返回一個Future<T> 。異步計算T的值酸茴,但是Future 封裝了對T值的訪問分预。T值可能不是立即可用,而且Future對象支持輪詢直到值T可用薪捍。Java的ExecutorService 執(zhí)行Callable<T> 時返回一個Future 對象笼痹。

Java提供的這兩種編寫異步代碼的技術(shù)足夠好了嗎?這些技術(shù)很好酪穿,但并不適用于每一種場景凳干,而且都有各自的局限性。Callbacks的缺點是很難組合在一起使用昆稿,而且多個回調(diào)組合在一起使用時纺座,代碼很快就會變的難以閱讀和維護(通常稱為回調(diào)地獄)息拜。

下面以在用戶界面顯示用戶前五個收藏夾為樣例說明Callbacks的局限性溉潭。業(yè)務(wù)場景為:顯時用戶前五個收藏夾,如果用戶沒有收藏夾則顯示建議少欺。

userService.getFavorites(userId, new Callback<List<String>>() { // 1
  public void onSuccess(List<String> list) { // 2
    if (list.isEmpty()) { // 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { // 4
          UiUtils.submitOnUiThread(() -> { // 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); // 6
            });
        }

        public void onError(Throwable error) { // 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() //8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

基于callback的實現(xiàn)有很多的代碼喳瓣,這些代碼難以理解,要想一步一步弄懂邏輯也比較困難赞别,而且代碼還有部分重復(fù)畏陕。下面是基于Reactor的等價實現(xiàn):

userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

基于callback的代碼實現(xiàn)如果要增加超時邏輯會十分的困難,但是基于Reactor的實現(xiàn)只要使用timeout方法即可輕松完成:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800))
           .onErrorResume(cacheService.cachedFavoritesFor(userId))
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

Future對象比回調(diào)好一點仿滔,但它們在組合方面仍然做得不好惠毁,盡管CompletableFuture在Java 8中帶來了改進。編排多個Future 對象可行但是并不容易崎页。除此之外鞠绰,Future 還有其他問題:

  • 調(diào)用get() 方法很容易導(dǎo)致Future 對象阻塞,
  • 不支持懶加載/懶計算飒焦,
  • 它們?nèi)狈Χ嘀岛透呒夊e誤處理的支持蜈膨。

考慮另一個例子:我們獲得一個id列表,我們想從中獲取一個名稱和一個統(tǒng)計信息牺荠,并將它們成對組合翁巍,所有這些都是異步的.。下面的代碼使用一個 CompletableFuture列表來實現(xiàn)該功能:

CompletableFuture<List<String>> ids = ifhIds();

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> {
                CompletableFuture<String> nameTask = ifhName(i);
                CompletableFuture<Integer> statTask = ifhStat(i);

                return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
            });
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
    return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
});

List<String> results = result.join();
assertThat(results).contains(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121");

由于Reactor有更多的可開箱的組合操作符休雌,上面的過程可以簡化如下:

Flux<String> ids = ifhrIds();

Flux<String> combinations =
        ids.flatMap(id -> {
            Mono<String> nameTask = ifhrName(id);
            Mono<Integer> statTask = ifhrStat(id);

            return nameTask.zipWith(statTask,
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); 
assertThat(results).containsExactly(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

使用回調(diào)和Future對象的危險是相似的灶壶,這也是響應(yīng)式編程通過Publisher-Subscriber對解決的問題。

3.3. 從命令式編程到響應(yīng)式編程

諸如Reactor的響應(yīng)式編程庫旨在解決JVM上“經(jīng)典”異步方法的缺點杈曲,同時也著重對以下方面進行改進:

  • 可組合性可讀性驰凛,
  • 把數(shù)據(jù)當(dāng)做流處理孝情,同時提供豐富的操作方法,
  • 訂閱之前不會發(fā)生任何事情洒嗤,
  • 背壓消費者向生產(chǎn)者發(fā)送信號通知數(shù)據(jù)生產(chǎn)速率過高或過低的能力箫荡,
  • 對并發(fā)不可知更高價值和更高級的抽象。

3.3.1. 可組合性和可讀性

“可組合性”指的是編排多個異步任務(wù)的能力渔隶,可以將前面任務(wù)的結(jié)果作為后續(xù)任務(wù)的輸入羔挡。當(dāng)然也可以以fork-join的方式運行多個任務(wù)。此外间唉,我們可以在更高級的系統(tǒng)中把異步任務(wù)作為離散組件重用绞灼。

編排任務(wù)的能力與代碼的可讀性和可維護性緊密相關(guān)。隨著異步處理層的數(shù)量和復(fù)雜性的增加呈野,編寫和閱讀代碼變得越來越困難低矮。正如我們所見,callback模型十分簡單被冒,但是callback一個缺點就是處理變的復(fù)雜军掂,一個callback需要在另外一個callback中執(zhí)行,這樣一層一層的嵌套昨悼。這就是“回調(diào)地獄”蝗锥,這種代碼難以閱讀和分析邏輯。

Reactor提供了豐富的組合操作率触,代碼可以反應(yīng)對處理過程抽象的組織终议,一切盡量保持在同一層(盡量減少嵌套,這也是和callback模式相比最大的改進之一)葱蝗。

3.3.2. 類比工廠的生產(chǎn)線

數(shù)據(jù)在響應(yīng)式程序中的處理過程穴张,可以被看作是數(shù)據(jù)在組裝流水線中移動。Reactor既是傳送帶又是工作站两曼。原材料從來源(第一個Publisher)傾瀉而出(中間經(jīng)過多道工序加工)皂甘,最終成為可以推送給消費者(Subscriber)的成品。

原材料可以經(jīng)過各種轉(zhuǎn)換和其他中間步驟合愈,也可以成為將中間零件組裝在一起的更大裝配線的一部分叮贩。如果某一點出現(xiàn)故障或堵塞(某到工序耗時長),那么出問題的工作站可以向上游發(fā)出信號佛析,以限制原材料的流動(有問題及時向上游反饋益老,上游做出響應(yīng),避免進一步惡化)寸莫。

3.3.3. Operators

在Reactor中捺萌,operator就是流水線中的工作站。每個operator都會將行為添加到Publisher 中膘茎,并將上一步的Publisher 包裝到新實例中桃纯。這樣構(gòu)建了一個完整的鏈接酷誓,數(shù)據(jù)從第一個Publisher 向下游移動并由每一個鏈接進行轉(zhuǎn)換,最后态坦,由一個Subscriber 結(jié)束數(shù)據(jù)的數(shù)據(jù)處理過程盐数。在Subscriber 訂閱之前數(shù)據(jù)不會被處理也不會向下游移動。

盡管響應(yīng)式流規(guī)范根本沒有定義operator伞梯,但是像Reactor的響應(yīng)式庫提供的最佳附加值之一就是提供了豐富的operator玫氢,從簡單的轉(zhuǎn)換和過濾到復(fù)雜的編排和錯誤處理,這些內(nèi)容涉及很多的領(lǐng)域谜诫。

3.3.4. 不 subscribe()不會發(fā)生任何事情

在Reactor中漾峡,當(dāng)你編寫了一個Publisher 鏈,默認(rèn)數(shù)據(jù)不會注入喻旷。實際上只是創(chuàng)建了一個異步處理過程的抽象描述(這有助于重用和組合)生逸。通過subscribing 動作,將Publisher 綁定到Subscriber 且预,這會觸發(fā)數(shù)據(jù)在整個鏈路中移動槽袄。內(nèi)部實現(xiàn)是通過Subscriber 發(fā)送Request 信號,信號被傳播到上游一直到Publisher 辣之。request 也是實現(xiàn)背壓的關(guān)鍵方法掰伸。

3.3.5. 背壓

向上游傳播信號通常用來實現(xiàn)背壓,在和流水線的類比中描述為當(dāng)工作站處理比上游工作站的處理速度慢時怀估,沿著流水線向上游反饋。背壓其實就是下游向上游發(fā)送信號合搅,并影響上游數(shù)據(jù)處理的一種機制多搀。

響應(yīng)式流規(guī)范定義的實際機制可以簡單的概括為:一個subscriber可以以“無界” 模式工作,并讓數(shù)據(jù)源以最快的速度推送所有數(shù)據(jù)灾部,或者使用request 機制向數(shù)據(jù)源發(fā)送信息康铭,向數(shù)據(jù)源反饋已經(jīng)準(zhǔn)備好處理n個元素。

中間operator可以在中途改變request赌髓。想象一下一個buffer 以十個元素為一組將元素進行分組从藤。如果subscriber請求一個buffer,數(shù)據(jù)源發(fā)送十個元素是可以被接受锁蠕。一些operator也實現(xiàn)了預(yù)拉取策略 夷野,這避免了request(1) 不斷往返。如果在請求之前生成元素的成本很低荣倾,這種操作就非常的有幫助悯搔,可以顯著的提高處理效率。

這會將推模式轉(zhuǎn)換為推拉混合模式舌仍,如果上游已經(jīng)準(zhǔn)備了數(shù)據(jù)妒貌,下游則可以從上游獲取n個元素通危。但是如果數(shù)據(jù)還沒有準(zhǔn)備好,那么當(dāng)有數(shù)據(jù)時上游就會將數(shù)據(jù)推送到下游灌曙。

3.3.6. Hot vs Cold

Rx響應(yīng)式庫家族將響應(yīng)序列分為兩大類:“熱”和“冷”菊碟。這種區(qū)別主要與響應(yīng)式流對subscriber的響應(yīng)有關(guān):

  • 對于每一個Subscriber,包括在數(shù)據(jù)源位置在刺,冷序列都會重新開始框沟。例如,如果源包裝了HTTP調(diào)用增炭,則將為每個subscription發(fā)出一個新的HTTP請求忍燥。
  • 對于每一個Subscriber ,熱序列并非都會從頭開始隙姿。相反摄杂,后面的subscriber只能收到訂閱完成之后產(chǎn)生的數(shù)據(jù)。但是一些熱響應(yīng)式流可以緩存或者對歷史數(shù)據(jù)全部或部分重放场靴,也就是說遲來的subscriber可以收到在完成訂閱動作之前的數(shù)據(jù)玩祟。從一般的角度來看,即使沒有訂閱者在訂閱數(shù)據(jù)欲鹏,熱序列甚至?xí)l(fā)出數(shù)據(jù)(“訂閱之前什么也沒有發(fā)生”規(guī)則的例外)机久。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市赔嚎,隨后出現(xiàn)的幾起案子膘盖,更是在濱河造成了極大的恐慌,老刑警劉巖尤误,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件侠畔,死亡現(xiàn)場離奇詭異,居然都是意外死亡损晤,警方通過查閱死者的電腦和手機软棺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尤勋,“玉大人喘落,你說我怎么就攤上這事∽畋” “怎么了瘦棋?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锌奴。 經(jīng)常有香客問我兽狭,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任箕慧,我火速辦了婚禮服球,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘颠焦。我一直安慰自己斩熊,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布伐庭。 她就那樣靜靜地躺著粉渠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪圾另。 梳的紋絲不亂的頭發(fā)上霸株,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機與錄音集乔,去河邊找鬼去件。 笑死,一個胖子當(dāng)著我的面吹牛扰路,可吹牛的內(nèi)容都是我干的尤溜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼汗唱,長吁一口氣:“原來是場噩夢啊……” “哼宫莱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起哩罪,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤授霸,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后识椰,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绝葡,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年腹鹉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片敷硅。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡功咒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出绞蹦,到底是詐尸還是另有隱情力奋,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布幽七,位于F島的核電站景殷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜猿挚,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一咐旧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧绩蜻,春花似錦铣墨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至孕蝉,卻和暖如春屡律,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背降淮。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工超埋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人骤肛。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓纳本,卻偏偏與公主長得像,于是被迫代替她去往敵國和親腋颠。 傳聞我的和親對象是個殘疾皇子繁成,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,512評論 2 359