學(xué)習(xí)響應(yīng)式編程 Reactor (3) - reactor 基礎(chǔ)

Reactor

Reactor 項(xiàng)目的主要 artifact 是 reactor-core纫骑,這是一個(gè)基于 Java 8 的實(shí)現(xiàn)了響應(yīng)式流規(guī)范的響應(yīng)式庫(kù)用狱。

Reactor 提供了實(shí)現(xiàn) Publisher 的響應(yīng)式類 Flux 和 Mono校辩,以及豐富的操作符隐孽。一個(gè) Flux 代表 0...N 個(gè)元素的響應(yīng)式流侠草;一個(gè) Mono 代表 0|1 個(gè)元素的響應(yīng)式流。

Flux 和 Mono 之間可以轉(zhuǎn)換罩旋,比如 Flux 的 count 操作(計(jì)算流中元素個(gè)數(shù))返回 Mono,Mono 的 concatWith 操作(連接另一個(gè)響應(yīng)式流)返回 Flux眶诈。

Flux

Flux<T> 是一個(gè)能夠發(fā)出 0 到 N 個(gè)元素的標(biāo)準(zhǔn) Publisher<T>涨醋,它會(huì)被一個(gè)完成(completion)或錯(cuò)誤(error)信號(hào)終止。因此册养,一個(gè) Flux 的可能結(jié)果是 value东帅、completion
或 value,這三個(gè)分別會(huì)傳遞給訂閱者中的 onNext球拦、onComplete靠闭、onError 方法。

注意:所有的信號(hào)事件坎炼,包括代表終止的信號(hào)事件都是可選的愧膀。如果沒(méi)有 onNext 事件,但是有 onComplete 事件谣光,那么發(fā)出的就是空的有限流檩淋;如果去掉 onComplete
就得到一個(gè) 無(wú)限的空數(shù)據(jù)流。無(wú)限的數(shù)據(jù)流可以不是空的萄金,比如 Flux.interval(Duration) 生成的是一個(gè) Flux<Long>蟀悦,這是一個(gè)無(wú)限周期性發(fā)出規(guī)律整數(shù)的時(shí)鐘數(shù)據(jù)流。

下圖展示的是 Flux 基于時(shí)間線的彈珠交互圖氧敢,通過(guò)操作符轉(zhuǎn)換 Flux 中元素:

04_reactor_flux_transform.png
  • 上面那條線表示的是 Flux 數(shù)據(jù)流時(shí)間線日戈,時(shí)間從左至右
  • 上面那條線中的彈珠代表示的是 Flux 發(fā)出的 數(shù)據(jù)元素
  • 上面那條線最后的垂直線表示的是 Flux 已經(jīng)完成成功事件
  • 中間的箭頭虛線和框表示的是 Flux 中的元素正在被轉(zhuǎn)換,框內(nèi)的文字表示的是轉(zhuǎn)換的方式(包含操作符)
  • 下面那條線表示的是 FLux 經(jīng)過(guò)轉(zhuǎn)換后的新數(shù)據(jù)流
  • 如果由于某種原因?qū)е?Flux 的轉(zhuǎn)換終止孙乖,將使用 X 來(lái)代替 垂直線

后續(xù)在學(xué)習(xí)操作符的過(guò)程中浙炼,我們將見(jiàn)到很多類似的彈珠圖份氧,請(qǐng)大家詳細(xì)了解清楚該圖各部分的含義。

Mono

Mono<T> 是一種特殊的Publisher<T>弯屈,它最多只能發(fā)出一個(gè)元素蜗帜,然后(可選的)終止于 onComplete 或 onError 信號(hào)。

Mono 中的操作符是 Flux 中操作符的子集资厉,即 Flux 中只有部分操作符適用于 Mono厅缺,有些操作符是將 Mono 和另一個(gè) Publisher 連接轉(zhuǎn)換為 Flux。例如酌住,Mono#concatWith(Publisher
) 轉(zhuǎn)換為 Flux店归,Mono#then(Mono) 返回另一個(gè) Mono。

注意:可以使用 Mono<Void> 來(lái)創(chuàng)建一個(gè)只有完成概念的空值異步處理過(guò)程(類似于 Runnable)酪我。

下圖展示的是 Mono 基于時(shí)間線的彈珠交互圖:

05_reactor_mono_transform.png

創(chuàng)建 Flux 和 Mono

如同創(chuàng)建 Java Stream 一樣消痛,Reactor 也為我們提供了 多個(gè)工廠方法用來(lái)創(chuàng)建 Flux 和 Mono,有了 Stream 的基礎(chǔ)都哭,創(chuàng)建的基本方法我們來(lái)快速過(guò)一下秩伞。

下面的創(chuàng)建方法,如果是 Flux 或 Mono 獨(dú)有的欺矫,會(huì)在方法名前增加類名前綴纱新。

下面的示例代碼中都有用到 subscribe 方法,下面會(huì)講到穆趴,大家先了解它是響應(yīng)式流的訂閱方法脸爱,用于觸發(fā)流,類似于 Java Stream 中的終端操作未妹。

just

使用提供的元素發(fā)出數(shù)據(jù)然后結(jié)束的流簿废。

06_reactor_flux_just.png
Mono.just("hello, world").subscribe(System.out::println);
Mono.justOrEmpty(str).subscribe(System.out::println);
Mono.justOrEmpty(optional).subscribe(System.out::println);

Flux.just("hello", "world").subscribe(System.out::println);
Flux.just("hello").subscribe(System.out::println);

Flux#fromXxx

Flux 提供了 fromArray(從數(shù)組)、fromIterable(從迭代器)络它、fromStream(從 Java Stream 流) 的方式來(lái)創(chuàng)建 Flux族檬。

String[] array = new String[]{"hello", "reactor", "flux"};
List<String> iterable = Arrays.asList("foo", "bar", "foobar");

Flux.fromArray(array).subscribe(System.out::println);
Flux.fromIterable(iterable).subscribe(System.out::println);
Flux.fromStream(Arrays.stream(array)).subscribe(System.out::println);

Flux#range

從 start 開(kāi)始構(gòu)建一個(gè) Flux,該 Flux 僅發(fā)出一系列遞增計(jì)數(shù)的整數(shù)化戳。 也就是說(shuō)单料,在 start(包括)和 start + count(排除)之間發(fā)出整數(shù),然后完成点楼。見(jiàn)圖識(shí)意:

07_reactor_flux_range.png
Flux.range(3, 5).subscribe(System.out::println);

Flux#interval

在全局計(jì)時(shí)器上創(chuàng)建一個(gè) Flux扫尖,該 Flux 在初始延遲后,發(fā)出從0開(kāi)始并以指定的時(shí)間間隔遞增的長(zhǎng)整數(shù)掠廓。 如果未及時(shí)產(chǎn)生藏斩,則會(huì)通過(guò)溢出 IllegalStateException 發(fā)出 onError
信號(hào),詳細(xì)說(shuō)明無(wú)法發(fā)出的原因却盘。 在正常情況下狰域,F(xiàn)lux 將永遠(yuǎn)不會(huì)完成。interval 提供了 3 個(gè)重載方法黄橘,三者的區(qū)別主要在于是否延遲發(fā)出兆览、以及使用的調(diào)度器。

interval 生成的是一個(gè)無(wú)限數(shù)據(jù)流塞关。

Flux<Long> interval(Duration period)
Flux<Long> interval(Duration delay, Duration period)
Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
  • 第 1 個(gè)方法抬探,沒(méi)有延遲,按照 period 的周期立即發(fā)出帆赢,默認(rèn)使用 Schedulers.parallel() 調(diào)度器
  • 第 2 個(gè)方法小压,以 delay 延遲,按照 period 的周期發(fā)出椰于,默認(rèn)使用 Schedulers.parallel() 調(diào)度器
  • 第 3 個(gè)方法怠益,以 delay 延遲,按照 period 的周期發(fā)出瘾婿,使用指定的調(diào)度器

見(jiàn)圖識(shí)意:

10_reactor_flux_interval.png
Flux.interval(Duration.ofMillis(30), Duration.ofMillis(500)).subscribe(System.out::println);

empty

生成一個(gè)空的有限流蜻牢。見(jiàn)圖識(shí)意:

08_reactor_mono_empty.png
Flux.empty().subscribe(System.out::println, System.out::println, () -> System.out.println("結(jié)束"));

never

生成一個(gè)空的無(wú)限流。見(jiàn)圖識(shí)意:

09_reactor_flux_never.png
Flux.never().subscribe(System.out::println, System.out::println, () -> System.out.println("結(jié)束"));

error

生成一個(gè)錯(cuò)誤流偏陪。error 有 3 個(gè)重載方法抢呆,它們的主要區(qū)別是否立即生成錯(cuò)誤及是否由 Supplier 提供,見(jiàn)圖識(shí)意:

13_reactor_flux_error.png
Flux.error(new IllegalStateException(), true)
    .log()
    .subscribe(System.out::println, System.err::println);

其它

Flux 和 Mono 還提供了編程式的創(chuàng)建數(shù)據(jù)流的方法笛谦,諸如 create抱虐、generate、push饥脑、handle 等的方式恳邀,這些內(nèi)容暫時(shí)不是我們的重點(diǎn),這里我們不細(xì)展開(kāi)好啰,感興趣的可看 Api 進(jìn)行研究下轩娶。

11_reactor_flux_generate.png

訂閱 Flux 和 Mono

在上面創(chuàng)建 Flux 和 Mono 筆記的示例代碼中,我們已經(jīng)提到了 subscribe 訂閱框往,在 subscribe 訂閱中鳄抒,F(xiàn)lux 和 Mono 支持 Java 8 Lambda 表達(dá)式。下面我們來(lái)看看 Reactor
為我們提供了哪些訂閱方法椰弊。

subscribe(); // ①

subscribe(Consumer<? super T> consumer);  // ②

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); // ③

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); // ④

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); // ⑤

subscribe(Subscriber<? super T> actual); // ⑥
  1. 序號(hào)① 訂閱并觸發(fā)響應(yīng)式流许溅。
  2. 序號(hào)② 對(duì)每個(gè)生成的元素進(jìn)行消費(fèi)。
  3. 序號(hào)③ 對(duì)正常元素進(jìn)行消費(fèi)秉版,對(duì)錯(cuò)誤進(jìn)行響應(yīng)處理贤重。
  4. 序號(hào)④ 對(duì)正常元素和錯(cuò)誤均有響應(yīng),還定義了響應(yīng)流正常完成后的回調(diào)清焕。
  5. 序號(hào)⑤ 對(duì)正常元素并蝗、錯(cuò)誤信號(hào)和完成信號(hào)均有響應(yīng)祭犯,同時(shí)也定義了 對(duì)該 subscribe 返回的 Subscription 的回調(diào)處理。
  6. 序號(hào)⑥ 通過(guò)自定義實(shí)現(xiàn) Subscriber 接口來(lái)訂閱滚停。

注意:序號(hào)⑤ 變量傳遞一個(gè) Subscription 的引用沃粗,如果不再需要更多元素時(shí),可以通過(guò)它來(lái)取消訂閱键畴。取消訂閱時(shí)最盅,源頭會(huì)停止生成數(shù)據(jù),并清理相關(guān)的資源起惕。取消和清理的操作是在 Disposable 接口中定義的涡贱。

來(lái)看下序號(hào) ⑤ 的 subscribe 的彈珠圖:

12_reactor_flux_subscribe.png
Flux.range(1, 4)
        .subscribe(System.out::println,
                error -> System.err.println("發(fā)生錯(cuò)誤:" + error),
                () -> System.out.println("完成"),
                sub -> {
                    System.out.println("已訂閱");
                    // 理解背壓
                    // 嘗試修改下 request 中的值,看看有啥變化
                    sub.request(10);
                });

注意:序號(hào)⑥ 的方式支持背壓等操作惹想,不在我們本次筆記的范疇问词,我們還是先略過(guò),后期在學(xué)習(xí)勺馆。

補(bǔ)充

在上節(jié)我們講解 Reactor 調(diào)試部分時(shí)戏售,遺漏了記錄數(shù)據(jù)流的日志方法,再此做下補(bǔ)充:除了基于 stack trace 的方式調(diào)試分析草穆,我們還可以使用 log
操作符灌灾,來(lái)跟蹤響應(yīng)式流并記錄日志。將它添加到操作鏈上之后悲柱,它會(huì)讀取每一個(gè)再其上游的 Flux 和 Mono 事件(包括 onNext锋喜、onError、onComplete豌鸡、Subscribe嘿般、Cancel 和 Request)。

// 嘗試交換下 take 和 log 的順序涯冠,看看有啥變化
Flux.range(1, 10)
        // .log()
        .take(3)
        .log()
        .subscribe();

總結(jié)

本篇我們介紹了 Reactor 的基礎(chǔ)知識(shí):先是了解了 Reactor 為我們提供的響應(yīng)式流類 Flux 和 Mono炉奴,之后學(xué)習(xí)了如何創(chuàng)建他們和訂閱他們,因?yàn)橛兄?Stream
的基礎(chǔ)蛇更,想來(lái)大家對(duì)這些知識(shí)點(diǎn)都好理解和接受瞻赶。

今天的內(nèi)容就學(xué)到這里,我們下篇開(kāi)始學(xué)習(xí) Reactor 的操作符派任。

源碼詳見(jiàn):https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorBasicLearningTest 測(cè)試類砸逊。

參考

  1. Reactor 3 Reference Guide
  2. Reactor 3 中文指南
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市掌逛,隨后出現(xiàn)的幾起案子师逸,更是在濱河造成了極大的恐慌,老刑警劉巖豆混,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件篓像,死亡現(xiàn)場(chǎng)離奇詭異动知,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)遗淳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門沐祷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)袖外,“玉大人,你說(shuō)我怎么就攤上這事屏富≈校” “怎么了养叛?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)宰翅。 經(jīng)常有香客問(wèn)我弃甥,道長(zhǎng),這世上最難降的妖魔是什么汁讼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任淆攻,我火速辦了婚禮,結(jié)果婚禮上嘿架,老公的妹妹穿的比我還像新娘瓶珊。我一直安慰自己,他們只是感情好耸彪,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布伞芹。 她就那樣靜靜地躺著,像睡著了一般蝉娜。 火紅的嫁衣襯著肌膚如雪唱较。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,262評(píng)論 1 308
  • 那天召川,我揣著相機(jī)與錄音南缓,去河邊找鬼。 笑死荧呐,一個(gè)胖子當(dāng)著我的面吹牛汉形,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播坛增,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼获雕,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了收捣?” 一聲冷哼從身側(cè)響起届案,我...
    開(kāi)封第一講書(shū)人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎罢艾,沒(méi)想到半個(gè)月后楣颠,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體尽纽,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年童漩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了弄贿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡矫膨,死狀恐怖差凹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情侧馅,我是刑警寧澤危尿,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站馁痴,受9級(jí)特大地震影響谊娇,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜罗晕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一济欢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧小渊,春花似錦法褥、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至梆惯,卻和暖如春酱鸭,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背垛吗。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工凹髓, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人怯屉。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓蔚舀,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親锨络。 傳聞我的和親對(duì)象是個(gè)殘疾皇子赌躺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359