Reactor 操作符
數(shù)據(jù)在響應(yīng)式流中的處理醉箕,就像流過一條裝配流水線获高。Reactor 既是傳送帶材原,又是一個個的裝配工或機器人袍冷。原材料從源頭(最初的 Publisher )流出,經(jīng)過一個個的裝配線中裝配工或機器人的工位加工(operator 操作)坛缕,最終被加工成成品墓猎,等待被推送到消費者( subscribe 操作)。
在 Reactor 中赚楚,每個操作符對 Publisher 進行處理毙沾,然后將 Publisher 包裝為另一個新的 Publisher 。就像一個鏈條宠页,數(shù)據(jù)源自第一個 Publisher 左胞,然后順鏈條而下,在每個環(huán)節(jié)進行相應(yīng)的處理举户。最終烤宙,訂閱者(Subscriber )終結(jié)這個過程。所以俭嘁, 響應(yīng)式編程按照鏈式方式進行開發(fā)躺枕。
注意,如同 Java Stream 的終端操作供填,訂閱者( Subscriber )在沒有訂閱( subscribe )到一個發(fā)布者( Publisher )之前拐云,什么也不會發(fā)生。
如同 Java Stream 的中間操作一樣近她,Reactor 的 Flux 和 Mono 也為我們提供了多種操作符(遠多于 Stream )叉瘩,我們將它們分類如下:
序號 | 類型 | 操作符 |
---|---|---|
1 | 轉(zhuǎn)換 | as, cast, collect, collectList, collectMap, collectMultimap, collectSortedList, concatMap, concatMapDelayError, concatMapIterable, elapsed, expand, expandDeep, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, groupJoin, handle, index, join, map, switchMap, switchOnFirst, then, thenEmpty, thenMany, timestamp, transform, transformDeferred |
2 | 篩選 | blockFirst, blockLast, distinct, distinctUntilChanged, elementAt, filter, filterWhen, ignoreElements, last, next, ofType, or, repeat, retry, single, singleOrEmpty, sort, take, takeLast, takeUntil, takeUntilOther, takeWhile |
3 | 組合 | concatWith, concatWithValues, mergeOrderWith, mergeWith, startWith, withLatestFrom, zipWith, zipWithIterable |
4 | 條件 | defaultIfEmpty, delayUntil, retryWhen, switchIfEmpty |
5 | 時間 | delayElements, delaySequence, delaySubscription, sample, sampleFirst, sampleTimeout, skip, skipLast, skipUntil, skipUntilOther, skipWhile, timeout |
6 | 統(tǒng)計 | count, reduce, reduceWith, scan, scanWith |
7 | 匹配 | all, any, hasElement, hasElements |
8 | 分組 | buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, groupBy, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile |
9 | 事件 | doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorMap, onErrorResume, onErrorReturn, onErrorStop |
10 | 調(diào)試 | checkpoint, hide, log |
11 | 其它 | cache, dematerialize, limitRate, limitRequest, materialize, metrics, name, onTerminateDetach, parallel, publish, publishNext, publishOn, replay, share, subscribeOn, subscriberContext, subscribeWith, tag |
接下來我們來挨個學(xué)習(xí)各類的操作符,如同前面學(xué)習(xí)響應(yīng)式流創(chuàng)建一樣粘捎,講解操作符時薇缅,如果是 Flux 或 Mono 獨有的,會在方法名前增加類名前綴晌端。
轉(zhuǎn)換類操作符
轉(zhuǎn)換類的操作符數(shù)量最多捅暴,平常過程中也是使用最頻繁的。
as
將響應(yīng)式流轉(zhuǎn)換為目標類型咧纠,既可以是非響應(yīng)式對象蓬痒,也可以是 Flux 或 Mono。
Flux.range(3, 8)
.as(Mono::from)
.subscribe(System.out::println);
cast
將響應(yīng)式流內(nèi)的元素強轉(zhuǎn)為目標類型漆羔,如果類型不匹配(非父類類型或當前類型)梧奢,將拋出 ClassCastException ,見圖知意:
Flux.range(1, 3)
.cast(Number.class)
.subscribe(System.out::println);
Flux#collect
通過應(yīng)用收集器演痒,將 Flux 發(fā)出的所有元素收集到一個容器中亲轨。當此流完成時,發(fā)出收集的結(jié)果鸟顺。 Flux 提供了 2 個重載方法惦蚊,主要區(qū)別在于應(yīng)用的收集器不同器虾,一個是 Java Stream 的 Collector, 另一個是自定義收集方法(同 Java Stream 中 collect 方法):
<R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector);
<E> Mono<E> collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector);
見圖知意:
Flux.range(1, 5)
.collect(Collectors.toList())
.subscribe(System.out::println);
Flux#collectList
當此 Flux 完成時蹦锋,將此流發(fā)出的所有元素收集到一個列表中兆沙,該列表由生成的 Mono 發(fā)出。見圖知意:
Flux.range(1, 5)
.collectList()
.subscribe(System.out::println);
Flux#collectMap
將 Flux 發(fā)出的所有元素按照鍵生成器和值生成器收集到 Map 中莉掂,之后由 Mono 發(fā)出葛圃。Flux 提供了 3 個重載方法:
<K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier);
它們的主要區(qū)別在于是否提供值生成器和初始的Map,意同 Java Stream 中的 Collectors#toMap 憎妙。見圖知意:
Flux.just(1, 2, 3, 4, 5, 3, 1)
.collectMap(n -> n, n -> n + 100)
.subscribe(System.out::println);
Flux#collectMultimap
collectMultimap 與 collectMap 的區(qū)別在于库正,map 中的 value 類型不同,一個是集合厘唾,一個是元素褥符。 collectMultimap 對于流中出現(xiàn)重復(fù)的 key 的 value,加入到了集合中阅嘶,而 collectMap 做了替換属瓣。在這點上,reactor 不如 Java Stream 中的 Collectors#toMap 方法讯柔,沒有提供 key 重復(fù)時的合并函數(shù)抡蛙。也提供了 3 個重載方法。
<K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
見圖知意:
Flux.just(1, 2, 3, 4, 5, 3, 1)
.collectMultimap(n -> n, n -> n + 100)
.subscribe(System.out::println);
Flux#collectSortedList
將 Flux 發(fā)出的元素在完成時進行排序魂迄,之后由 Mono 發(fā)出粗截。Flux 提供了 2 個重載方法:
Mono<List<T>> collectSortedList();
Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator);
見圖知意:
Flux.just(1, 3, 5, 3, 2, 5, 1, 4)
.collectSortedList()
.subscribe(System.out::println);
總結(jié)
本篇我們介紹了 Reactor 操作符的分類,之后介紹了部分轉(zhuǎn)換類操作符捣炬,講解示例時都是單個操作符熊昌,相信大家都能理解。
今天的內(nèi)容就學(xué)到這里湿酸,我們下篇繼續(xù)學(xué)習(xí) Reactor 的操作符婿屹。
源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperatorTest 測試類。