學(xué)習(xí)響應(yīng)式編程 Reactor (4) - reactor 轉(zhuǎn)換類操作符(1)

Reactor 操作符

數(shù)據(jù)在響應(yīng)式流中的處理醉箕,就像流過一條裝配流水線获高。Reactor 既是傳送帶材原,又是一個個的裝配工或機器人袍冷。原材料從源頭(最初的 Publisher )流出,經(jīng)過一個個的裝配線中裝配工或機器人的工位加工(operator 操作)坛缕,最終被加工成成品墓猎,等待被推送到消費者( subscribe 操作)。

在 Reactor 中赚楚,每個操作符對 Publisher 進行處理毙沾,然后將 Publisher 包裝為另一個新的 Publisher 。就像一個鏈條宠页,數(shù)據(jù)源自第一個 Publisher 左胞,然后順鏈條而下,在每個環(huán)節(jié)進行相應(yīng)的處理举户。最終烤宙,訂閱者(Subscriber )終結(jié)這個過程。所以俭嘁, 響應(yīng)式編程按照鏈式方式進行開發(fā)躺枕。

注意,如同 Java Stream 的終端操作供填,訂閱者( Subscriber )在沒有訂閱( subscribe )到一個發(fā)布者( Publisher )之前拐云,什么也不會發(fā)生。

如同 Java Stream 的中間操作一樣近她,Reactor 的 Flux 和 Mono 也為我們提供了多種操作符(遠多于 Stream )叉瘩,我們將它們分類如下:

序號 類型 操作符
1 轉(zhuǎn)換 as, cast, collect, collectList, collectMap, collectMultimap, collectSortedList, concatMap, concatMapDelayError, concatMapIterable, elapsed, expand, expandDeep, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, groupJoin, handle, index, join, map, switchMap, switchOnFirst, then, thenEmpty, thenMany, timestamp, transform, transformDeferred
2 篩選 blockFirst, blockLast, distinct, distinctUntilChanged, elementAt, filter, filterWhen, ignoreElements, last, next, ofType, or, repeat, retry, single, singleOrEmpty, sort, take, takeLast, takeUntil, takeUntilOther, takeWhile
3 組合 concatWith, concatWithValues, mergeOrderWith, mergeWith, startWith, withLatestFrom, zipWith, zipWithIterable
4 條件 defaultIfEmpty, delayUntil, retryWhen, switchIfEmpty
5 時間 delayElements, delaySequence, delaySubscription, sample, sampleFirst, sampleTimeout, skip, skipLast, skipUntil, skipUntilOther, skipWhile, timeout
6 統(tǒng)計 count, reduce, reduceWith, scan, scanWith
7 匹配 all, any, hasElement, hasElements
8 分組 buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, groupBy, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile
9 事件 doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorMap, onErrorResume, onErrorReturn, onErrorStop
10 調(diào)試 checkpoint, hide, log
11 其它 cache, dematerialize, limitRate, limitRequest, materialize, metrics, name, onTerminateDetach, parallel, publish, publishNext, publishOn, replay, share, subscribeOn, subscriberContext, subscribeWith, tag

接下來我們來挨個學(xué)習(xí)各類的操作符,如同前面學(xué)習(xí)響應(yīng)式流創(chuàng)建一樣粘捎,講解操作符時薇缅,如果是 Flux 或 Mono 獨有的,會在方法名前增加類名前綴晌端。

轉(zhuǎn)換類操作符

轉(zhuǎn)換類的操作符數(shù)量最多捅暴,平常過程中也是使用最頻繁的。

as

將響應(yīng)式流轉(zhuǎn)換為目標類型咧纠,既可以是非響應(yīng)式對象蓬痒,也可以是 Flux 或 Mono。

Flux.range(3, 8)
    .as(Mono::from)
    .subscribe(System.out::println);

cast

將響應(yīng)式流內(nèi)的元素強轉(zhuǎn)為目標類型漆羔,如果類型不匹配(非父類類型或當前類型)梧奢,將拋出 ClassCastException ,見圖知意:

01_operator_flux_cast.png
Flux.range(1, 3)
    .cast(Number.class)
    .subscribe(System.out::println);

Flux#collect

通過應(yīng)用收集器演痒,將 Flux 發(fā)出的所有元素收集到一個容器中亲轨。當此流完成時,發(fā)出收集的結(jié)果鸟顺。 Flux 提供了 2 個重載方法惦蚊,主要區(qū)別在于應(yīng)用的收集器不同器虾,一個是 Java Stream 的 Collector, 另一個是自定義收集方法(同 Java Stream 中 collect 方法):

<R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector);
<E> Mono<E> collect(Supplier<E> containerSupplier,
                     BiConsumer<E,? super T> collector);

見圖知意:

02_operator_flux_collect.png
Flux.range(1, 5)
    .collect(Collectors.toList())
    .subscribe(System.out::println);

Flux#collectList

當此 Flux 完成時蹦锋,將此流發(fā)出的所有元素收集到一個列表中兆沙,該列表由生成的 Mono 發(fā)出。見圖知意:

03_operator_flux_collectList.png
Flux.range(1, 5)
    .collectList()
    .subscribe(System.out::println);

Flux#collectMap

將 Flux 發(fā)出的所有元素按照鍵生成器和值生成器收集到 Map 中莉掂,之后由 Mono 發(fā)出葛圃。Flux 提供了 3 個重載方法:

<K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
                                 Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
                                 Function<? super T,? extends V> valueExtractor,
                                 Supplier<Map<K,V>> mapSupplier);

它們的主要區(qū)別在于是否提供值生成器和初始的Map,意同 Java Stream 中的 Collectors#toMap 憎妙。見圖知意:

04_operator_flux_collectMap.png
Flux.just(1, 2, 3, 4, 5, 3, 1)
    .collectMap(n -> n, n -> n + 100)
    .subscribe(System.out::println);

Flux#collectMultimap

collectMultimap 與 collectMap 的區(qū)別在于库正,map 中的 value 類型不同,一個是集合厘唾,一個是元素褥符。 collectMultimap 對于流中出現(xiàn)重復(fù)的 key 的 value,加入到了集合中阅嘶,而 collectMap 做了替換属瓣。在這點上,reactor 不如 Java Stream 中的 Collectors#toMap 方法讯柔,沒有提供 key 重復(fù)時的合并函數(shù)抡蛙。也提供了 3 個重載方法。

<K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
                                                  Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
                                                  Function<? super T,? extends V> valueExtractor,
                                                  Supplier<Map<K,Collection<V>>> mapSupplier)

見圖知意:

05_operator_flux_collectMultimap.png
Flux.just(1, 2, 3, 4, 5, 3, 1)
    .collectMultimap(n -> n, n -> n + 100)
    .subscribe(System.out::println);

Flux#collectSortedList

將 Flux 發(fā)出的元素在完成時進行排序魂迄,之后由 Mono 發(fā)出粗截。Flux 提供了 2 個重載方法:

Mono<List<T>> collectSortedList();
Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator);

見圖知意:

06_operator_flux_collectSortedList.png
Flux.just(1, 3, 5, 3, 2, 5, 1, 4)
    .collectSortedList()
    .subscribe(System.out::println);

總結(jié)

本篇我們介紹了 Reactor 操作符的分類,之后介紹了部分轉(zhuǎn)換類操作符捣炬,講解示例時都是單個操作符熊昌,相信大家都能理解。

今天的內(nèi)容就學(xué)到這里湿酸,我們下篇繼續(xù)學(xué)習(xí) Reactor 的操作符婿屹。

源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperatorTest 測試類。

參考

  1. Reactor 3 Reference Guide
  2. Reactor 3 中文指南
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末推溃,一起剝皮案震驚了整個濱河市昂利,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌铁坎,老刑警劉巖蜂奸,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異硬萍,居然都是意外死亡扩所,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門朴乖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來祖屏,“玉大人助赞,你說我怎么就攤上這事〈土樱” “怎么了嫉拐?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵哩都,是天一觀的道長魁兼。 經(jīng)常有香客問我,道長漠嵌,這世上最難降的妖魔是什么咐汞? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮儒鹿,結(jié)果婚禮上化撕,老公的妹妹穿的比我還像新娘。我一直安慰自己约炎,他們只是感情好植阴,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著圾浅,像睡著了一般掠手。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上狸捕,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天喷鸽,我揣著相機與錄音,去河邊找鬼灸拍。 笑死做祝,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的鸡岗。 我是一名探鬼主播混槐,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼轩性!你這毒婦竟也來了声登?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤炮姨,失蹤者是張志新(化名)和其女友劉穎捌刮,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體舒岸,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡绅作,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蛾派。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俄认。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡个少,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出眯杏,到底是詐尸還是另有隱情夜焦,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布岂贩,位于F島的核電站茫经,受9級特大地震影響世落,放射性物質(zhì)發(fā)生泄漏先馆。R本人自食惡果不足惜兆蕉,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一玄货、第九天 我趴在偏房一處隱蔽的房頂上張望专钉。 院中可真熱鬧代乃,春花似錦耻涛、人聲如沸烘绽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至俊嗽,卻和暖如春雾家,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背乌询。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工榜贴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人妹田。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓唬党,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鬼佣。 傳聞我的和親對象是個殘疾皇子驶拱,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345