Java9 Flow API(譯)

原文鏈接

這篇文章中,會展示一個Java9中FlowAPI的列子昧旨,通過Publisher和Subscriber接口來構(gòu)建響應(yīng)式程序。最后你將會理解這種全新的編程模式和她的優(yōu)缺點富拗。所有的代碼可在Github上下載臼予。


Java9 Flow API介紹

JDK9響應(yīng)式編程

Java是一個“古老”并且廣泛應(yīng)用的編程語言,但Java9中引入了一些新鮮有趣的特性啃沪。這篇文章主要介紹FlowAPI這個新特性粘拾,通過FlowAPI我們僅僅使用JDK就能夠搭建響應(yīng)式應(yīng)用程序,而不需要其他額外的類庫创千,如RxJava或Project Reactor缰雇。

盡管如此,當你看到過接口文檔后你就會明白到正如字面所說追驴,這只是一個API而已械哟。她僅僅包含了一些Interface和一個實現(xiàn)類:

  • Interface Flow.Publisher<T>定義了生產(chǎn)數(shù)據(jù)和控制事件的方法。
  • Interface Flow.Subscriber<T>定義了消費數(shù)據(jù)和事件的方法殿雪。
  • Interface Flow.Subscription 定義了鏈接Publisher和Subscriber的方法暇咆。
  • Interface Flow.Processor<T,R>定義了轉(zhuǎn)換Publisher到Subscriber的方法
  • 最后,class SubmissionPublisher<T>Flow.Publisher<T>的實現(xiàn)丙曙,她可以靈活的生產(chǎn)數(shù)據(jù)爸业,同時與Reactive Stream兼容。

雖然Java9中沒有很多FlowAPI的實現(xiàn)類可供我們使用亏镰,但是依靠這些接口第三方可以提供的響應(yīng)式編程得到了規(guī)范和統(tǒng)一扯旷,比如從JDBC driver到RabbitMQ的響應(yīng)式實現(xiàn)。

Pull索抓,Push钧忽,Pull-Push

我對響應(yīng)式編程的理解是毯炮, 這是一種數(shù)據(jù)消費者控制數(shù)據(jù)流的編程方式。需要指出是耸黑,當消費速度低于生產(chǎn)速度時桃煎,消費者要求生產(chǎn)者降低速度以完全消費數(shù)據(jù)(這個現(xiàn)象稱作back-pressure)。這種處理方式不是在制造混亂崎坊,你可能已經(jīng)使用過這種模式备禀,只是最近因為在主要框架和平臺上使用才變得更流行,比如Java9奈揍,Spring5。另外在分布式系統(tǒng)中處理大規(guī)模數(shù)據(jù)傳輸時也使用到了這種模式赋续。

回顧過去可以幫我們更好的理解這種模式男翰。幾年前,最常見的消費數(shù)據(jù)模式是pull-based纽乱。client端不斷輪詢服務(wù)端以獲取數(shù)據(jù)蛾绎。這種模式的優(yōu)點是當client端資源有限時可以更好的控制數(shù)據(jù)流(停止輪詢),而缺點是當服務(wù)端沒有數(shù)據(jù)時輪詢是對計算資源和網(wǎng)絡(luò)資源的浪費鸦列。

隨著時間推移租冠,處理數(shù)據(jù)的模式轉(zhuǎn)變?yōu)閜ush-based,生產(chǎn)者不關(guān)心消費者的消費能力薯嗤,直接推送數(shù)據(jù)顽爹。這種模式的缺點是當消費資源低于生產(chǎn)資源時會造成緩沖區(qū)溢出從而數(shù)據(jù)丟失,當丟失率維持在較小的數(shù)值時還可以接受骆姐,但是當這個比率變大時我們會希望生產(chǎn)者降速以避免大規(guī)模數(shù)據(jù)丟失镜粤。

響應(yīng)式編程是一種pull-push混合模式以綜合他們的優(yōu)點,這種模式下消費者負責(zé)請求數(shù)據(jù)以控制生產(chǎn)者數(shù)據(jù)流玻褪,同時當處理資源不足時也可以選擇阻斷或者丟棄數(shù)據(jù)肉渴,接下來我們會看到一個典型案例。

Flow與Stream

響應(yīng)式編程并不是為了替換傳統(tǒng)編程带射,其實兩者相互兼容而且可以互相協(xié)作完成任務(wù)同规。Java8中引入的StreamAPI通過map,reduce以及其他操作可以完美的處理數(shù)據(jù)集窟社,而FlowAPI則專注于處理數(shù)據(jù)的流通券勺,比如對數(shù)據(jù)的請求,減速桥爽,丟棄朱灿,阻塞等。同時你可以使用Streams作為數(shù)據(jù)源(publisher)钠四,當必要時阻塞丟棄其中的數(shù)據(jù)盗扒。你也可以在Subscriber中使用Streams以進行數(shù)據(jù)的歸并操作跪楞。更值得一提的時reactive streams不僅兼容傳統(tǒng)編程方式,而且還支持函數(shù)式編程以極大的提高可讀性和可維護性侣灶。

有一點可能會使我們感到困惑:如果你需要在兩個系統(tǒng)間傳輸數(shù)據(jù)甸祭,同時進行轉(zhuǎn)形操作,如何使用Flows和Streams來完成褥影?這種情況下池户,我們使用Java8的Function來做數(shù)據(jù)轉(zhuǎn)換,但是如何在Publisher和Subscriber之間使用StreamAPI呢凡怎?答案是我們可以在Publisher和Subscriber之間再加一個subscriber校焦,她可以從最初的publisher獲取數(shù)據(jù),轉(zhuǎn)換统倒,然后再作為一個新的publisher寨典,而使最初的subscriber訂閱這個新的publisher,也是Java9中的接口Flow.Processor<T,R>房匆,我們只需要實現(xiàn)這個接口并編寫轉(zhuǎn)換數(shù)據(jù)的functions耸成。

從技術(shù)上講,我們完全可以使用Flows來替換Streams浴鸿,但任何時候都這么做就顯得過于偏激井氢。比如,我們創(chuàng)建一個Publisher來作為int數(shù)組的數(shù)據(jù)源岳链,然后在Processor中轉(zhuǎn)換Integer為String花竞,最后創(chuàng)建一個Subscriber來歸并到一個String中。這個時候就完全沒有必要使用Flows宠页,因為這不是在控制兩個模塊或兩個線程間的數(shù)據(jù)通信左胞,這個時候使用Streams更為合理。

一個雜志出版商的使用場景

image

本文中給出的示例代碼是以雜志出版商為模型举户。假設(shè)出版商有兩個訂閱客戶烤宙。

出版商將為每個訂閱客戶出版20本雜志。出版商知道他們的客戶有時在郵遞雜志時會不在家俭嘁,而當他們的郵箱(subscriber buffer)不巧被塞滿時郵遞員會退回或丟棄雜志躺枕,出版商不希望出現(xiàn)這種情況。

于是出版商發(fā)明了一個郵遞系統(tǒng):當客戶在家時再給出版商致電供填,出版商會立即郵遞一份雜志拐云。出版商打算在辦公室為每個客戶保留一個小號的郵箱以防當雜志出版時客戶沒有第一時間致電獲取。出版商認為為每個客戶預(yù)留一個可以容納8份雜志的郵件已經(jīng)足夠(publisher buffer)近她。

于是一名員工提出了以下不同的場景:

  1. 如果客戶請求雜志足夠迅速叉瘩,將不會存在郵箱容量的問題。
  2. 如果客戶沒有以雜志出版的速度發(fā)出請求粘捎,那么郵箱將被塞滿薇缅。這位員工提出以下幾種處理方案:
    a. 增加郵箱容量危彩,為每位客戶提供可容納20份雜志的郵箱。(publisher增加buffer)
    b. 直到客戶請求下一份雜志之前停止印刷泳桦,并且根據(jù)客戶請求的速度降低印刷速度以清空郵箱汤徽。
    c. 新的雜志直接丟掉。
    d. 一個折中的方案: 如果郵箱滿了灸撰,在下次打印之前等待一段時間谒府,如果還是沒有足夠的空間則丟棄新的雜志。

出版商無法承受花費過多的資源僅僅是因為一個速度慢的客戶浮毯,那將是巨大的浪費完疫,最終選擇了方案d,最大程度上減少客戶損失。

本文示例代碼中選用了方案d是因為如果我們使用了一個虛擬的無窮buffer,這對理解Reactive模式的中概念是不利的员帮,代碼也將變得過于簡易亥贸,無法與其他方案進行比較,接下來讓我們來看代碼吧讯嫂。

Java9 Flow 代碼示例

一個簡單的Subscriber(full-control of Flow)

從訂閱者開始蹦锋,MagazineSubscriber實現(xiàn)了Flow.Subscriber<Integer>,訂閱者將收到一個數(shù)字欧芽,但請假設(shè)這是一份雜志正如上面的使用場景提到的莉掂。

package com.thepracticaldeveloper;

import java.util.concurrent.Flow;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MagazineSubscriber implements Flow.Subscriber<Integer> {

  public static final String JACK = "Jack";
  public static final String PETE = "Pete";

  private static final Logger log = LoggerFactory.
    getLogger(MagazineSubscriber.class);

  private final long sleepTime;
  private final String subscriberName;
  private Flow.Subscription subscription;
  private int nextMagazineExpected;
  private int totalRead;

  MagazineSubscriber(final long sleepTime, final String subscriberName) {
    this.sleepTime = sleepTime;
    this.subscriberName = subscriberName;
    this.nextMagazineExpected = 1;
    this.totalRead = 0;
  }

  @Override
  public void onSubscribe(final Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(final Integer magazineNumber) {
    if (magazineNumber != nextMagazineExpected) {
      IntStream.range(nextMagazineExpected, magazineNumber).forEach(
        (msgNumber) ->
          log("Oh no! I missed the magazine " + msgNumber)
      );
      // Catch up with the number to keep tracking missing ones
      nextMagazineExpected = magazineNumber;
    }
    log("Great! I got a new magazine: " + magazineNumber);
    takeSomeRest();
    nextMagazineExpected++;
    totalRead++;

    log("I'll get another magazine now, next one should be: " +
      nextMagazineExpected);
    subscription.request(1);
  }

  @Override
  public void onError(final Throwable throwable) {
    log("Oops I got an error from the Publisher: " + throwable.getMessage());
  }

  @Override
  public void onComplete() {
    log("Finally! I completed the subscription, I got in total " +
      totalRead + " magazines.");
  }

  private void log(final String logMessage) {
    log.info("<=========== [" + subscriberName + "] : " + logMessage);
  }

  public String getSubscriberName() {
    return subscriberName;
  }

  private void takeSomeRest() {
    try {
      Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

class中實現(xiàn)了必要的方法如下:

  • onSubscriber(subscription) Publisher在被指定一個新的Subscriber時調(diào)用此方法。 一般來說你需要在subscriber內(nèi)部保存這個subscrition實例千扔,因為后面會需要通過她向publisher發(fā)送信號來完成:請求更多數(shù)據(jù)憎妙,或者取消訂閱。 一般在這里我們會直接請求第一個數(shù)據(jù)曲楚,正如代碼中所示厘唾。
  • onNext(magazineNumber) 每當新的數(shù)據(jù)產(chǎn)生,這個方法會被調(diào)用龙誊。在我們的示例中抚垃,我們用到了最經(jīng)典的使用方式:處理這個數(shù)據(jù)的同時再請求下一個數(shù)據(jù)。然而我們在這中間添加了一段可配置的sleep時間趟大,這樣我們可以嘗試訂閱者在不同場景下的表現(xiàn)鹤树。剩下的一段邏輯判斷僅僅是記錄下丟失的雜志(當publisher出現(xiàn)丟棄數(shù)據(jù)的時候)。
  • onError(throwable) 當publisher出現(xiàn)異常時會調(diào)用subscriber的這個方法逊朽。在我們的實現(xiàn)中publisher丟棄數(shù)據(jù)時會產(chǎn)生異常罕伯。
  • onComplete() 當publisher數(shù)據(jù)推送完畢時會調(diào)用此方法,于是整個訂閱過程結(jié)束叽讳。

通過Java9 SubmissionPublisher發(fā)送數(shù)據(jù)

我們將使用Java9 SubmissionPublisher類來創(chuàng)建publisher追他。正如javadoc所述坟募, 當subscribers消費過慢,就像Reactive Streams中的Publisher一樣她會阻塞或丟棄數(shù)據(jù)湿酸。在深入理解之前讓我們先看代碼婿屹。

package com.thepracticaldeveloper;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReactiveFlowApp {

  private static final int NUMBER_OF_MAGAZINES = 20;
  private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;
  private static final Logger log =
    LoggerFactory.getLogger(ReactiveFlowApp.class);

  public static void main(String[] args) throws Exception {
    final ReactiveFlowApp app = new ReactiveFlowApp();

    log.info("\n\n### CASE 1: Subscribers are fast, buffer size is not so " +
      "important in this case.");
    app.magazineDeliveryExample(100L, 100L, 8);

    log.info("\n\n### CASE 2: A slow subscriber, but a good enough buffer " +
      "size on the publisher's side to keep all items until they're picked up");
    app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);

    log.info("\n\n### CASE 3: A slow subscriber, and a very limited buffer " +
      "size on the publisher's side so it's important to keep the slow " +
      "subscriber under control");
    app.magazineDeliveryExample(1000L, 3000L, 8);

  }

  void magazineDeliveryExample(final long sleepTimeJack,
                               final long sleepTimePete,
                               final int maxStorageInPO) throws Exception {
    final SubmissionPublisher<Integer> publisher =
      new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPO);

    final MagazineSubscriber jack = new MagazineSubscriber(
      sleepTimeJack,
      MagazineSubscriber.JACK
    );
    final MagazineSubscriber pete = new MagazineSubscriber(
      sleepTimePete,
      MagazineSubscriber.PETE
    );

    publisher.subscribe(jack);
    publisher.subscribe(pete);

    log.info("Printing 20 magazines per subscriber, with room in publisher for "
      + maxStorageInPO + ". They have " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE +
      " seconds to consume each magazine.");
    IntStream.rangeClosed(1, 20).forEach((number) -> {
      log.info("Offering magazine " + number + " to consumers");
      final int lag = publisher.offer(
        number,
        MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
        TimeUnit.SECONDS,
        (subscriber, msg) -> {
          subscriber.onError(
            new RuntimeException("Hey " + ((MagazineSubscriber) subscriber)
              .getSubscriberName() + "! You are too slow getting magazines" +
              " and we don't have more space for them! " +
              "I'll drop your magazine: " + msg));
          return false; // don't retry, we don't believe in second opportunities
        });
      if (lag < 0) {
        log("Dropping " + -lag + " magazines");
      } else {
        log("The slowest consumer has " + lag +
          " magazines in total to be picked up");
      }
    });

    // Blocks until all subscribers are done (this part could be improved
    // with latches, but this way we keep it simple)
    while (publisher.estimateMaximumLag() > 0) {
      Thread.sleep(500L);
    }

    // Closes the publisher, calling the onComplete() method on every subscriber
    publisher.close();
    // give some time to the slowest consumer to wake up and notice
    // that it's completed
    Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
  }

  private static void log(final String message) {
    log.info("===========> " + message);
  }

}

magazineDeliveryExample中我們?yōu)閮蓚€不同的subscribers設(shè)置了兩個不同的等待時間, 并且設(shè)置了緩存容量maxStorageInPO
步驟如下:

  1. 創(chuàng)建SubmissionPublisher并設(shè)置一個標準的線程池(每個subscriber擁有一個線程)
  2. 創(chuàng)建兩個subscribers推溃,通過傳遞變量設(shè)置不同的消費時間和不同的名字昂利,以在log中方便區(qū)別
  3. 用20個數(shù)字的的stream數(shù)據(jù)集作為數(shù)據(jù)源以扮演“雜志打印機”,我們調(diào)用offer铁坎,并傳遞以下變量:
    a. 提供給subscribers的數(shù)據(jù)蜂奸。
    b. 第二和第三個變量是等待subscribers獲取雜志的最大時間。
    c. 控制器以處理數(shù)據(jù)丟棄的情況硬萍。這里我們拋出了一個異常扩所,返回false意味著告訴publisher不需要重試。
  4. 當丟棄數(shù)據(jù)發(fā)生時朴乖,offer方法返回一個負數(shù)祖屏,否則將返回publisher的最大容量(以供最慢的subscriber消費),同時打印這個數(shù)字买羞。
    5 . 最后我們添加了一個循環(huán)等待以防止主進程過早結(jié)束袁勺。這里一個是等待publisher清空緩存數(shù)據(jù),另外等待最慢的subscriber收到onComplete回調(diào)信號(close()調(diào)用之后)

main()方法中使用不同參數(shù)調(diào)用以上邏輯三次畜普,以模擬之前介紹的三種不同真是場景期丰。

  1. 消費者消費速度很快,publisher緩存區(qū)不會發(fā)生問題吃挑。
  2. 其中一個消費者速度很慢钝荡,以至緩存被填滿,然而緩存區(qū)足夠大以容納所有所有數(shù)據(jù)舶衬,不會發(fā)生丟棄埠通。
  3. 其中一個消費者速度很慢,同時緩存區(qū)不夠大约炎,這是控制器被出發(fā)了多次植阴,subscriber沒有收到所有數(shù)據(jù)。

你還可以嘗試其他組合圾浅,比如設(shè)置MAX_SECONDS_TO_WAIT_WHEN_NO_SPACE為很大的數(shù)字掠手,這時offer表象將類似于submit,或者可以嘗試將兩個消費者速度同時降低(會出現(xiàn)大量丟棄數(shù)據(jù))狸捕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末喷鸽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子灸拍,更是在濱河造成了極大的恐慌做祝,老刑警劉巖砾省,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異混槐,居然都是意外死亡编兄,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門声登,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狠鸳,“玉大人,你說我怎么就攤上這事悯嗓〖妫” “怎么了?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵脯厨,是天一觀的道長铅祸。 經(jīng)常有香客問我,道長合武,這世上最難降的妖魔是什么临梗? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮稼跳,結(jié)果婚禮上夜焦,老公的妹妹穿的比我還像新娘。我一直安慰自己岂贩,他們只是感情好,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布巷波。 她就那樣靜靜地躺著萎津,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抹镊。 梳的紋絲不亂的頭發(fā)上锉屈,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天,我揣著相機與錄音垮耳,去河邊找鬼颈渊。 笑死,一個胖子當著我的面吹牛终佛,可吹牛的內(nèi)容都是我干的俊嗽。 我是一名探鬼主播,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼铃彰,長吁一口氣:“原來是場噩夢啊……” “哼绍豁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起牙捉,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤竹揍,失蹤者是張志新(化名)和其女友劉穎敬飒,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芬位,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡无拗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了昧碉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片英染。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖晌纫,靈堂內(nèi)的尸體忽然破棺而出税迷,到底是詐尸還是另有隱情,我是刑警寧澤锹漱,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布箭养,位于F島的核電站,受9級特大地震影響哥牍,放射性物質(zhì)發(fā)生泄漏毕泌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一嗅辣、第九天 我趴在偏房一處隱蔽的房頂上張望撼泛。 院中可真熱鬧,春花似錦澡谭、人聲如沸愿题。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽潘酗。三九已至,卻和暖如春雁仲,著一層夾襖步出監(jiān)牢的瞬間仔夺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工攒砖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留缸兔,地道東北人。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓吹艇,卻偏偏與公主長得像惰蜜,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子掐暮,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理蝎抽,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • RxJava RxJava是響應(yīng)式程序設(shè)計的一種實現(xiàn)樟结。在響應(yīng)式程序設(shè)計中养交,當數(shù)據(jù)到達的時候,消費者做出響應(yīng)瓢宦。響應(yīng)式...
    Mr槑閱讀 941評論 0 5
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 171,777評論 25 707
  • 現(xiàn)今驮履,近視越來越趨向低齡化鱼辙,除了父母遺傳,更多是因為長期看電視玫镐、玩手機造成的倒戏。 18歲前的孩子身體機能還未發(fā)育成熟...
  • 一日一練打卡
    李果子2005閱讀 86評論 0 1