JDK9新特性 Reactive Stream 響應式流

JDK9新特性 Reactive Stream 響應式流

?本篇主要講解 JDK9特性 Reactive Stream 響應式流噪猾,介紹 Reactive Stream是什么 背壓是什么反番,以及JDK9中提供的關于Reactive Stream的接口和 2個使用案例包括如何使用Processor夺刑。

?1.Reactive Stream 概念

?Reactive Stream (響應式流/反應流) 是JDK9引入的一套標準茬末,是一套基于發(fā)布/訂閱模式的數(shù)據(jù)處理規(guī)范同诫。響應式流從2013年開始,作為提供非阻塞背壓的異步流處理標準的倡議耿戚。 它旨在解決處理元素流的問題——如何將元素流從發(fā)布者傳遞到訂閱者窒典,而不需要發(fā)布者阻塞蟆炊,或訂閱者有無限制的緩沖區(qū)或丟棄。更確切地說瀑志,Reactive流目的是“找到最小的一組接口涩搓,方法和協(xié)議,用來描述必要的操作和實體以實現(xiàn)這樣的目標:以非阻塞背壓方式實現(xiàn)數(shù)據(jù)的異步流”劈猪。

?反應式流 (Reactive Stream) 規(guī)范誕生昧甘,定義了如下四個接口:

Subscription 接口定義了連接發(fā)布者和訂閱者的方法
Publisher<T> 接口定義了發(fā)布者的方法
Subscriber<T> 接口定義了訂閱者的方法
Processor<T,R> 接口定義了處理器

?Reactive Stream 規(guī)范誕生后,RxJava 從 RxJava 2 開始實現(xiàn) Reactive Stream 規(guī)范 战得, 同時 Spring提供的Reactor 框架(WebFlux的基礎) 等也相繼實現(xiàn)了 Reactive Stream 規(guī)范

?下圖展示了訂閱者和發(fā)布者之間的交互

Xnip20200225_131010.png

?2.背壓(back pressure)概念

?如果生產者發(fā)出的信息比消費者能夠處理消息最大量還要多充边,消費者可能會被迫一直在抓消息,耗費越來越多的資源常侦,埋下潛在的崩潰風險浇冰。為了防止這一點,需要有一種機制使消費者可以通知生產者聋亡,降低消息的生成速度肘习。生產者可以采用多種策略來實現(xiàn)這一要求,這種機制稱為背壓坡倔。

?簡單來說就是

  • 背壓指的發(fā)布者和訂閱者之間的互動
  • 訂閱者可以告訴發(fā)布者自己需要多少數(shù)據(jù)漂佩,可以調節(jié)數(shù)據(jù)流量,不會導致發(fā)布者發(fā)布數(shù)據(jù)過多導致數(shù)據(jù)浪費或壓垮訂閱者

?3.JDK9中 Reactive Stream規(guī)范的實現(xiàn)

?JDK9中Reactive Stream的實現(xiàn)規(guī)范 通常被稱為 Flow API 罪塔,通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現(xiàn)響應式流

?在JDK9里Reactive Stream的主要接口聲明在Flow類里投蝉,F(xiàn)low 類中定義了四個嵌套的靜態(tài)接口,用于建立流量控制的組件征堪,發(fā)布者在其中生成一個或多個供訂閱者使用的數(shù)據(jù)項:

  • Publisher:數(shù)據(jù)項發(fā)布者墓拜、生產者
  • Subscriber:數(shù)據(jù)項訂閱者、消費者
  • Subscription:發(fā)布者與訂閱者之間的關系紐帶请契,訂閱令牌
  • Processor:數(shù)據(jù)處理器

?
Xnip20200225_132212.png

??3.1 發(fā)布者 Publisher

??Publisher 將數(shù)據(jù)流發(fā)布給注冊的 Subscriber咳榜。 它通常使用 Executor 異步發(fā)布項目給訂閱者。 Publisher 需要確保每個訂閱的 Subscriber 方法嚴格按順序調用爽锥。

  • subscribe:訂閱者訂閱發(fā)布者
    @FunctionalInterface 
    public static interface Flow.Publisher<T> { 
      public void subscribe(Subscriber<? super T> subscriber); 
    }

??3.2 訂閱者 Subscriber

??Subscriber 訂閱 Publisher 的數(shù)據(jù)流涌韩,并接受回調。 如果 Subscriber 沒有發(fā)出請求氯夷,就不會收到數(shù)據(jù)臣樱。對于給定 訂閱合同(Subscription),調用 Subscriber 的方法是嚴格按順序的腮考。

  • onSubscribe:發(fā)布者調用訂閱者的這個方法來異步傳遞訂閱 雇毫, 這個方法在 publisher.subscribe方法調用后被執(zhí)行
  • onNext:發(fā)布者調用這個方法傳遞數(shù)據(jù)給訂閱者
  • onError:當 Publisher 或 Subscriber 遇到不可恢復的錯誤時調用此方法,之后不會再調用其他方法
  • onComplete:當數(shù)據(jù)已經發(fā)送完成踩蔚,且沒有錯誤導致訂閱終止時棚放,調用此方法,之后不再調用其他方法

??3.3 訂閱合同 Subscription

??Subscription 用于連接 Publisher 和 Subscriber馅闽。Subscriber 只有在請求時才會收到項目飘蚯,并可以通過 Subscription 取消訂閱。Subscription 主要有兩個方法:

  • request:訂閱者調用此方法請求數(shù)據(jù)

  • cancel:訂閱者調用這個方法來取消訂閱福也,解除訂閱者與發(fā)布者之間的關系

      public static interface Flow.Subscription {
        public void request(long n);
        public void cancel();
      }
    

??3.4 處理器 Processor

??Processor 位于 Publisher 和 Subscriber 之間局骤,用于做數(shù)據(jù)轉換”┐眨可以有多個 Processor 同時使用峦甩,組成一個處理鏈,鏈中最后一個處理器的處理結果發(fā)送給 Subscriber现喳。JDK 沒有提供任何具體的處理器凯傲。處理器同時是訂閱者和發(fā)布者,接口的定義也是繼承了兩者 即作為訂閱者也作為發(fā)布者 拿穴,作為訂閱者接收數(shù)據(jù)泣洞,然后進行處理,處理完后作為發(fā)布者默色,再發(fā)布出去球凰。

/**
 * A component that acts as both a Subscriber and Publisher.
 *
 * @param <T> the subscribed item type
 * @param <R> the published item type
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
Xnip20200225_133449.png

?4.JDK9 中Reactive Stream(Flow API )規(guī)范調用流程

?Publisher是能夠發(fā)出元素的發(fā)布者,Subscriber是接收元素并做出響應的訂閱者腿宰。當執(zhí)行Publisher里的subscribe方法時呕诉,發(fā)布者會回調訂閱者的onSubscribe方法,這個方法中吃度,通常訂閱者會借助傳入的Subscription向發(fā)布者請求n個數(shù)據(jù)甩挫。然后發(fā)布者通過不斷調用訂閱者的onNext方法向訂閱者發(fā)出最多n個數(shù)據(jù)。如果數(shù)據(jù)全部發(fā)完椿每,則會調用onComplete告知訂閱者流已經發(fā)完伊者;如果有錯誤發(fā)生英遭,則通過onError發(fā)出錯誤數(shù)據(jù),同樣也會終止流亦渗。

?其中挖诸,Subscription相當于是連接Publisher和Subscriber的“紐帶(合同)”。因為當發(fā)布者調用subscribe方法注冊訂閱者時法精,會通過訂閱者的回調方法onSubscribe傳入Subscription對象多律,之后訂閱者就可以使用這個Subscription對象的request方法向發(fā)布者“要”數(shù)據(jù)了。背壓機制正是基于此來實現(xiàn)的搂蜓。

Xnip20200225_133733.png

?5.案例一 響應式基礎使用案例

??5.1 以下代碼簡單演示了SubmissionPublisher 和這套發(fā)布-訂閱框架的基本使用方式:

??注意要使用JDK9以上的版本

/**
* @author johnny
* @create 2020-02-24 下午5:44
**/
@Slf4j
public class ReactiveStreamTest {


public static void main(String[] args) throws InterruptedException {


    //1.創(chuàng)建 生產者Publisher JDK9自帶的 實現(xiàn)了Publisher接口
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //2.創(chuàng)建 訂閱者 Subscriber狼荞,需要自己去實現(xiàn)內部方法

    Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("訂閱成功。帮碰。");
            subscription.request(1);
            System.out.println("訂閱方法里請求一個數(shù)據(jù)");
        }

        @Override
        public void onNext(Integer item) {
            log.info("【onNext 接受到數(shù)據(jù) item : {}】 ", item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("【onError 出現(xiàn)異诚辔叮】");
            subscription.cancel();
        }

        @Override
        public void onComplete() {
            log.info("【onComplete 所有數(shù)據(jù)接收完成】");
        }
    };

    //3。發(fā)布者和訂閱者 建立訂閱關系 就是回調訂閱者的onSubscribe方法傳入訂閱合同
    publisher.subscribe(subscriber);


    //4.發(fā)布者 生成數(shù)據(jù)
    for (int i = 1; i <= 5; i++) {
        log.info("【生產數(shù)據(jù) {} 】", i );
        //submit是一個阻塞方法收毫,此時會調用訂閱者的onNext方法
        publisher.submit(i);
    }


    //5.發(fā)布者 數(shù)據(jù)都已發(fā)布完成后攻走,關閉發(fā)送,此時會回調訂閱者的onComplete方法
    publisher.close();

    //主線程睡一會
    Thread.currentThread().join(100000);


  }
}

?打印輸出結果

Xnip20200225_134439.png

?看結果好像我們看不出來Reactive Stream有什么用 此再,其實關鍵點在 publisher.submit(i); submit它是一個阻塞方法
讓我們把代碼修改一點

1.將onNext添加耗時操作昔搂,模擬業(yè)務耗時邏輯
2.增加發(fā)布者發(fā)布數(shù)據(jù)的數(shù)量,模擬真實場景 無限數(shù)據(jù)

    @Override
        public void onNext(Integer item) {
            log.info("【onNext 接受到數(shù)據(jù) item : {}】 ", item);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscription.request(1);
        }

    //發(fā)布者 生成數(shù)據(jù)
    for (int i = 1; i <= 1000; i++) {
        log.info("【生產數(shù)據(jù) {} 】", i );
        //submit是一個阻塞方法输拇,此時會調用訂閱者的onNext方法
        publisher.submit(i);
    }

?直接看打印

?會發(fā)現(xiàn)發(fā)布者 生成數(shù)據(jù)到256后就會停止生產摘符,這是因為publisher.submit(i)方法是阻塞的,
內部有個緩沖數(shù)組最大容量就是256策吠,只有當訂閱者發(fā)送 subscription.request(1); 請求后逛裤,才會從緩沖數(shù)組里拿按照順序拿出數(shù)據(jù)傳給 onNext方法 供訂閱者處理,當subscription.request(1)這個方法被調用后猴抹,發(fā)布者發(fā)現(xiàn)數(shù)組里沒有滿才會再生產數(shù)據(jù)带族,這樣就防止了生產者一次生成過多的數(shù)據(jù)把訂閱者壓垮,從而實現(xiàn)了背壓機制

Xnip20200225_135111.png

?6.案例二 響應式帶 Processor 使用案例

??6.1創(chuàng)建自定義Processor

package com.johnny.webflux.webfluxlearn.reactivestream;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
* 自定義 Processor
*
* @author johnny
* @create 2020-02-25 下午1:56
**/
@Slf4j
public class MyProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Integer, Integer> {

private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
    log.info("【Processor 收到訂閱請求】");
    //保存訂閱關系蟀给,需要用它來給發(fā)布者 相應
    this.subscription = subscription;

    this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
    log.info("【onNext 收到發(fā)布者數(shù)據(jù)  : {} 】", item);

    //做業(yè)務處理蝙砌。。
    if (item % 2 == 0) {
        //篩選偶數(shù) 發(fā)送給 訂閱者
        this.submit(item);
    }
    this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
    // 我們可以告訴發(fā)布者, 后面不接受數(shù)據(jù)了
    this.subscription.cancel();
 }

@Override
public void onComplete() {
    log.info("【處理器處理完畢】");
    this.close();
 }
}

??6.2 運行demo 關聯(lián)publisher 和 Processor 和 subscriber

package com.johnny.webflux.webfluxlearn.reactivestream;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
* 帶Processor的案例
*
* @author johnny
* @create 2020-02-25 下午2:17
**/
@Slf4j
public class ProcessorDemo {

public static void main(String[] args) throws InterruptedException {


    //創(chuàng)建發(fā)布者
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();


    //創(chuàng)建 Processor 即是發(fā)布者也是訂閱者
    MyProcessor myProcessor = new MyProcessor();


    //創(chuàng)建最終訂閱者
    Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            log.info("【onNext 從Processor 接受到過濾后的 數(shù)據(jù) item : {}】 ", item);
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("【onError 出現(xiàn)異嘲侠恚】");
            subscription.cancel();
        }

        @Override
        public void onComplete() {
            log.info("【onComplete 所有數(shù)據(jù)接收完成】");
        }
    };

    //建立關系 發(fā)布者和處理器择克, 此時處理器扮演 訂閱者
    publisher.subscribe(myProcessor);

    //建立關系 處理器和訂閱者  此時處理器扮演
    myProcessor.subscribe(subscriber);

    //發(fā)布者發(fā)布數(shù)據(jù)

    publisher.submit(1);
    publisher.submit(2);
    publisher.submit(3);
    publisher.submit(4);

    publisher.close();

    TimeUnit.SECONDS.sleep(2);

  }
}
Xnip20200225_143039.png

?7.總結

?本篇主要講解 JDK9特性 Reactive Stream 響應式流,介紹 Reactive Stream是什么 背壓是什么前普,以及JDK9中提供的關于Reactive Stream的接口和 2個使用案例包括如何使用Processor肚邢。

?只需要關注JDK9提供的 4個接口,以及內部的方法拭卿,對著案例敲一遍代碼 其實流程還是很簡單的 加油吧B夂<馈!

個人博客網站 https://www.askajohnny.com 歡迎來訪問勺鸦!
本文由博客一文多發(fā)平臺 OpenWrite 發(fā)布并巍!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市换途,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌刽射,老刑警劉巖军拟,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異誓禁,居然都是意外死亡懈息,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門摹恰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辫继,“玉大人,你說我怎么就攤上這事俗慈」每恚” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵闺阱,是天一觀的道長炮车。 經常有香客問我,道長酣溃,這世上最難降的妖魔是什么瘦穆? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮赊豌,結果婚禮上扛或,老公的妹妹穿的比我還像新娘。我一直安慰自己碘饼,他們只是感情好熙兔,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著派昧,像睡著了一般黔姜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蒂萎,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天秆吵,我揣著相機與錄音,去河邊找鬼五慈。 笑死纳寂,一個胖子當著我的面吹牛主穗,可吹牛的內容都是我干的。 我是一名探鬼主播毙芜,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼忽媒,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了腋粥?” 一聲冷哼從身側響起晦雨,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎隘冲,沒想到半個月后闹瞧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡展辞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年奥邮,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片罗珍。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡洽腺,死狀恐怖,靈堂內的尸體忽然破棺而出覆旱,到底是詐尸還是另有隱情蘸朋,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布通殃,位于F島的核電站度液,受9級特大地震影響,放射性物質發(fā)生泄漏画舌。R本人自食惡果不足惜堕担,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望曲聂。 院中可真熱鬧霹购,春花似錦、人聲如沸朋腋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽旭咽。三九已至贞奋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間穷绵,已是汗流浹背轿塔。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人勾缭。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓揍障,卻偏偏與公主長得像,于是被迫代替她去往敵國和親俩由。 傳聞我的和親對象是個殘疾皇子毒嫡,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內容