Reactor
Reactor 項(xiàng)目的主要 artifact 是 reactor-core纫骑,這是一個(gè)基于 Java 8 的實(shí)現(xiàn)了響應(yīng)式流規(guī)范的響應(yīng)式庫(kù)用狱。
Reactor 提供了實(shí)現(xiàn) Publisher 的響應(yīng)式類 Flux 和 Mono校辩,以及豐富的操作符隐孽。一個(gè) Flux 代表 0...N 個(gè)元素的響應(yīng)式流侠草;一個(gè) Mono 代表 0|1 個(gè)元素的響應(yīng)式流。
Flux 和 Mono 之間可以轉(zhuǎn)換罩旋,比如 Flux 的 count 操作(計(jì)算流中元素個(gè)數(shù))返回 Mono,Mono 的 concatWith 操作(連接另一個(gè)響應(yīng)式流)返回 Flux眶诈。
Flux
Flux<T> 是一個(gè)能夠發(fā)出 0 到 N 個(gè)元素的標(biāo)準(zhǔn) Publisher<T>涨醋,它會(huì)被一個(gè)完成(completion)或錯(cuò)誤(error)信號(hào)終止。因此册养,一個(gè) Flux 的可能結(jié)果是 value东帅、completion
或 value,這三個(gè)分別會(huì)傳遞給訂閱者中的 onNext球拦、onComplete靠闭、onError 方法。
注意:所有的信號(hào)事件坎炼,包括代表終止的信號(hào)事件都是可選的愧膀。如果沒(méi)有 onNext 事件,但是有 onComplete 事件谣光,那么發(fā)出的就是空的有限流檩淋;如果去掉 onComplete
就得到一個(gè) 無(wú)限的空數(shù)據(jù)流。無(wú)限的數(shù)據(jù)流可以不是空的萄金,比如 Flux.interval(Duration) 生成的是一個(gè) Flux<Long>蟀悦,這是一個(gè)無(wú)限周期性發(fā)出規(guī)律整數(shù)的時(shí)鐘數(shù)據(jù)流。
下圖展示的是 Flux 基于時(shí)間線的彈珠交互圖氧敢,通過(guò)操作符轉(zhuǎn)換 Flux 中元素:
- 上面那條線表示的是 Flux 數(shù)據(jù)流時(shí)間線日戈,時(shí)間從左至右
- 上面那條線中的彈珠代表示的是 Flux 發(fā)出的 數(shù)據(jù)元素
- 上面那條線最后的垂直線表示的是 Flux 已經(jīng)完成成功事件
- 中間的箭頭虛線和框表示的是 Flux 中的元素正在被轉(zhuǎn)換,框內(nèi)的文字表示的是轉(zhuǎn)換的方式(包含操作符)
- 下面那條線表示的是 FLux 經(jīng)過(guò)轉(zhuǎn)換后的新數(shù)據(jù)流
- 如果由于某種原因?qū)е?Flux 的轉(zhuǎn)換終止孙乖,將使用 X 來(lái)代替 垂直線
后續(xù)在學(xué)習(xí)操作符的過(guò)程中浙炼,我們將見(jiàn)到很多類似的彈珠圖份氧,請(qǐng)大家詳細(xì)了解清楚該圖各部分的含義。
Mono
Mono<T> 是一種特殊的Publisher<T>弯屈,它最多只能發(fā)出一個(gè)元素蜗帜,然后(可選的)終止于 onComplete 或 onError 信號(hào)。
Mono 中的操作符是 Flux 中操作符的子集资厉,即 Flux 中只有部分操作符適用于 Mono厅缺,有些操作符是將 Mono 和另一個(gè) Publisher 連接轉(zhuǎn)換為 Flux。例如酌住,Mono#concatWith(Publisher
) 轉(zhuǎn)換為 Flux店归,Mono#then(Mono) 返回另一個(gè) Mono。
注意:可以使用 Mono<Void> 來(lái)創(chuàng)建一個(gè)只有完成概念的空值異步處理過(guò)程(類似于 Runnable)酪我。
下圖展示的是 Mono 基于時(shí)間線的彈珠交互圖:
創(chuàng)建 Flux 和 Mono
如同創(chuàng)建 Java Stream 一樣消痛,Reactor 也為我們提供了 多個(gè)工廠方法用來(lái)創(chuàng)建 Flux 和 Mono,有了 Stream 的基礎(chǔ)都哭,創(chuàng)建的基本方法我們來(lái)快速過(guò)一下秩伞。
下面的創(chuàng)建方法,如果是 Flux 或 Mono 獨(dú)有的欺矫,會(huì)在方法名前增加類名前綴纱新。
下面的示例代碼中都有用到 subscribe 方法,下面會(huì)講到穆趴,大家先了解它是響應(yīng)式流的訂閱方法脸爱,用于觸發(fā)流,類似于 Java Stream 中的終端操作未妹。
just
使用提供的元素發(fā)出數(shù)據(jù)然后結(jié)束的流簿废。
Mono.just("hello, world").subscribe(System.out::println);
Mono.justOrEmpty(str).subscribe(System.out::println);
Mono.justOrEmpty(optional).subscribe(System.out::println);
Flux.just("hello", "world").subscribe(System.out::println);
Flux.just("hello").subscribe(System.out::println);
Flux#fromXxx
Flux 提供了 fromArray(從數(shù)組)、fromIterable(從迭代器)络它、fromStream(從 Java Stream 流) 的方式來(lái)創(chuàng)建 Flux族檬。
String[] array = new String[]{"hello", "reactor", "flux"};
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux.fromArray(array).subscribe(System.out::println);
Flux.fromIterable(iterable).subscribe(System.out::println);
Flux.fromStream(Arrays.stream(array)).subscribe(System.out::println);
Flux#range
從 start 開(kāi)始構(gòu)建一個(gè) Flux,該 Flux 僅發(fā)出一系列遞增計(jì)數(shù)的整數(shù)化戳。 也就是說(shuō)单料,在 start(包括)和 start + count(排除)之間發(fā)出整數(shù),然后完成点楼。見(jiàn)圖識(shí)意:
Flux.range(3, 5).subscribe(System.out::println);
Flux#interval
在全局計(jì)時(shí)器上創(chuàng)建一個(gè) Flux扫尖,該 Flux 在初始延遲后,發(fā)出從0開(kāi)始并以指定的時(shí)間間隔遞增的長(zhǎng)整數(shù)掠廓。 如果未及時(shí)產(chǎn)生藏斩,則會(huì)通過(guò)溢出 IllegalStateException 發(fā)出 onError
信號(hào),詳細(xì)說(shuō)明無(wú)法發(fā)出的原因却盘。 在正常情況下狰域,F(xiàn)lux 將永遠(yuǎn)不會(huì)完成。interval 提供了 3 個(gè)重載方法黄橘,三者的區(qū)別主要在于是否延遲發(fā)出兆览、以及使用的調(diào)度器。
interval 生成的是一個(gè)無(wú)限數(shù)據(jù)流塞关。
Flux<Long> interval(Duration period)
Flux<Long> interval(Duration delay, Duration period)
Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
- 第 1 個(gè)方法抬探,沒(méi)有延遲,按照 period 的周期立即發(fā)出帆赢,默認(rèn)使用 Schedulers.parallel() 調(diào)度器
- 第 2 個(gè)方法小压,以 delay 延遲,按照 period 的周期發(fā)出椰于,默認(rèn)使用 Schedulers.parallel() 調(diào)度器
- 第 3 個(gè)方法怠益,以 delay 延遲,按照 period 的周期發(fā)出瘾婿,使用指定的調(diào)度器
見(jiàn)圖識(shí)意:
Flux.interval(Duration.ofMillis(30), Duration.ofMillis(500)).subscribe(System.out::println);
empty
生成一個(gè)空的有限流蜻牢。見(jiàn)圖識(shí)意:
Flux.empty().subscribe(System.out::println, System.out::println, () -> System.out.println("結(jié)束"));
never
生成一個(gè)空的無(wú)限流。見(jiàn)圖識(shí)意:
Flux.never().subscribe(System.out::println, System.out::println, () -> System.out.println("結(jié)束"));
error
生成一個(gè)錯(cuò)誤流偏陪。error 有 3 個(gè)重載方法抢呆,它們的主要區(qū)別是否立即生成錯(cuò)誤及是否由 Supplier 提供,見(jiàn)圖識(shí)意:
Flux.error(new IllegalStateException(), true)
.log()
.subscribe(System.out::println, System.err::println);
其它
Flux 和 Mono 還提供了編程式的創(chuàng)建數(shù)據(jù)流的方法笛谦,諸如 create抱虐、generate、push饥脑、handle 等的方式恳邀,這些內(nèi)容暫時(shí)不是我們的重點(diǎn),這里我們不細(xì)展開(kāi)好啰,感興趣的可看 Api 進(jìn)行研究下轩娶。
訂閱 Flux 和 Mono
在上面創(chuàng)建 Flux 和 Mono 筆記的示例代碼中,我們已經(jīng)提到了 subscribe 訂閱框往,在 subscribe 訂閱中鳄抒,F(xiàn)lux 和 Mono 支持 Java 8 Lambda 表達(dá)式。下面我們來(lái)看看 Reactor
為我們提供了哪些訂閱方法椰弊。
subscribe(); // ①
subscribe(Consumer<? super T> consumer); // ②
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer); // ③
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer); // ④
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer); // ⑤
subscribe(Subscriber<? super T> actual); // ⑥
- 序號(hào)① 訂閱并觸發(fā)響應(yīng)式流许溅。
- 序號(hào)② 對(duì)每個(gè)生成的元素進(jìn)行消費(fèi)。
- 序號(hào)③ 對(duì)正常元素進(jìn)行消費(fèi)秉版,對(duì)錯(cuò)誤進(jìn)行響應(yīng)處理贤重。
- 序號(hào)④ 對(duì)正常元素和錯(cuò)誤均有響應(yīng),還定義了響應(yīng)流正常完成后的回調(diào)清焕。
- 序號(hào)⑤ 對(duì)正常元素并蝗、錯(cuò)誤信號(hào)和完成信號(hào)均有響應(yīng)祭犯,同時(shí)也定義了 對(duì)該 subscribe 返回的 Subscription 的回調(diào)處理。
- 序號(hào)⑥ 通過(guò)自定義實(shí)現(xiàn) Subscriber 接口來(lái)訂閱滚停。
注意:序號(hào)⑤ 變量傳遞一個(gè) Subscription 的引用沃粗,如果不再需要更多元素時(shí),可以通過(guò)它來(lái)取消訂閱键畴。取消訂閱時(shí)最盅,源頭會(huì)停止生成數(shù)據(jù),并清理相關(guān)的資源起惕。取消和清理的操作是在 Disposable 接口中定義的涡贱。
來(lái)看下序號(hào) ⑤ 的 subscribe 的彈珠圖:
Flux.range(1, 4)
.subscribe(System.out::println,
error -> System.err.println("發(fā)生錯(cuò)誤:" + error),
() -> System.out.println("完成"),
sub -> {
System.out.println("已訂閱");
// 理解背壓
// 嘗試修改下 request 中的值,看看有啥變化
sub.request(10);
});
注意:序號(hào)⑥ 的方式支持背壓等操作惹想,不在我們本次筆記的范疇问词,我們還是先略過(guò),后期在學(xué)習(xí)勺馆。
補(bǔ)充
在上節(jié)我們講解 Reactor 調(diào)試部分時(shí)戏售,遺漏了記錄數(shù)據(jù)流的日志方法,再此做下補(bǔ)充:除了基于 stack trace 的方式調(diào)試分析草穆,我們還可以使用 log
操作符灌灾,來(lái)跟蹤響應(yīng)式流并記錄日志。將它添加到操作鏈上之后悲柱,它會(huì)讀取每一個(gè)再其上游的 Flux 和 Mono 事件(包括 onNext锋喜、onError、onComplete豌鸡、Subscribe嘿般、Cancel 和 Request)。
// 嘗試交換下 take 和 log 的順序涯冠,看看有啥變化
Flux.range(1, 10)
// .log()
.take(3)
.log()
.subscribe();
總結(jié)
本篇我們介紹了 Reactor 的基礎(chǔ)知識(shí):先是了解了 Reactor 為我們提供的響應(yīng)式流類 Flux 和 Mono炉奴,之后學(xué)習(xí)了如何創(chuàng)建他們和訂閱他們,因?yàn)橛兄?Stream
的基礎(chǔ)蛇更,想來(lái)大家對(duì)這些知識(shí)點(diǎn)都好理解和接受瞻赶。
今天的內(nèi)容就學(xué)到這里,我們下篇開(kāi)始學(xué)習(xí) Reactor 的操作符派任。
源碼詳見(jiàn):https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorBasicLearningTest 測(cè)試類砸逊。