反應式編程(Reactive Programming)這種新的編程范式越來越受到開發(fā)人員的歡迎寥掐。在 Java 社區(qū)中比較流行的是 RxJava 和 RxJava 2斗塘。本文要介紹的是另外一個新的反應式編程庫 Reactor雾鬼。
反應式編程介紹
反應式編程來源于數(shù)據(jù)流和變化的傳播葱轩,意味著由底層的執(zhí)行模型負責通過數(shù)據(jù)流來自動傳播變化媚值。比如求值一個簡單的表達式 c=a+b狠毯,當 a 或者 b 的值發(fā)生變化時,傳統(tǒng)的編程范式需要對 a+b 進行重新計算來得到 c 的值褥芒。如果使用反應式編程嚼松,當 a 或者 b 的值發(fā)生變化時,c 的值會自動更新锰扶。反應式編程最早由 .NET 平臺上的 Reactive Extensions (Rx) 庫來實現(xiàn)献酗。后來遷移到 Java 平臺之后就產(chǎn)生了著名的 RxJava 庫,并產(chǎn)生了很多其他編程語言上的對應實現(xiàn)坷牛。在這些實現(xiàn)的基礎上產(chǎn)生了后來的反應式流(Reactive Streams)規(guī)范凌摄。該規(guī)范定義了反應式流的相關接口,并將集成到 Java 9 中漓帅。
在傳統(tǒng)的編程范式中,我們一般通過迭代器(Iterator)模式來遍歷一個序列痴怨。這種遍歷方式是由調用者來控制節(jié)奏的忙干,采用的是拉的方式。每次由調用者通過 next()方法來獲取序列中的下一個值浪藻。使用反應式流時采用的則是推的方式捐迫,即常見的發(fā)布者-訂閱者模式。當發(fā)布者有新的數(shù)據(jù)產(chǎn)生時爱葵,這些數(shù)據(jù)會被推送到訂閱者來進行處理施戴。在反應式流上可以添加各種不同的操作來對數(shù)據(jù)進行處理,形成數(shù)據(jù)處理鏈萌丈。這個以聲明式的方式添加的處理鏈只在訂閱者進行訂閱操作時才會真正執(zhí)行赞哗。
反應式流中第一個重要概念是負壓(backpressure)。在基本的消息推送模式中辆雾,當消息發(fā)布者產(chǎn)生數(shù)據(jù)的速度過快時肪笋,會使得消息訂閱者的處理速度無法跟上產(chǎn)生的速度,從而給訂閱者造成很大的壓力度迂。當壓力過大時藤乙,有可能造成訂閱者本身的奔潰,所產(chǎn)生的級聯(lián)效應甚至可能造成整個系統(tǒng)的癱瘓惭墓。負壓的作用在于提供一種從訂閱者到生產(chǎn)者的反饋渠道坛梁。訂閱者可以通過 request()方法來聲明其一次所能處理的消息數(shù)量,而生產(chǎn)者就只會產(chǎn)生相應數(shù)量的消息腊凶,直到下一次 request()方法調用划咐。這實際上變成了推拉結合的模式拴念。
Reactor 簡介
前面提到的 RxJava 庫是 JVM 上反應式編程的先驅,也是反應式流規(guī)范的基礎尖殃。RxJava 2 在 RxJava 的基礎上做了很多的更新丈莺。不過 RxJava 庫也有其不足的地方。RxJava 產(chǎn)生于反應式流規(guī)范之前送丰,雖然可以和反應式流的接口進行轉換缔俄,但是由于底層實現(xiàn)的原因,使用起來并不是很直觀器躏。RxJava 2 在設計和實現(xiàn)時考慮到了與規(guī)范的整合俐载,不過為了保持與 RxJava 的兼容性,很多地方在使用時也并不直觀登失。Reactor 則是完全基于反應式流規(guī)范設計和實現(xiàn)的庫遏佣,沒有 RxJava 那樣的歷史包袱,在使用上更加的直觀易懂揽浙。Reactor 也是 Spring 5 中反應式編程的基礎状婶。學習和掌握 Reactor 可以更好地理解 Spring 5 中的相關概念。
在 Java 程序中使用 Reactor 庫非常的簡單馅巷,只需要通過 Maven 或 Gradle 來添加對 io.projectreactor:reactor-core 的依賴即可膛虫,目前的版本是 3.0.5.RELEASE。
Flux 和 Mono
Flux 和 Mono 是 Reactor 中的兩個基本概念钓猬。Flux 表示的是包含 0 到 N 個元素的異步序列稍刀。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息敞曹。當消息通知產(chǎn)生時账月,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。Mono 表示的是包含 0 或者 1 個元素的異步序列澳迫。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知局齿。Flux 和 Mono 之間可以進行轉換。對一個 Flux 序列進行計數(shù)操作纲刀,得到的結果是一個 Mono對象项炼。把兩個 Mono 序列合并在一起,得到的是一個 Flux 對象示绊。
創(chuàng)建 Flux
有多種不同的方式可以創(chuàng)建 Flux 序列锭部。
Flux 類的靜態(tài)方法
第一種方式是通過 Flux 類中的靜態(tài)方法。
just():可以指定序列中包含的全部元素面褐。創(chuàng)建出來的 Flux 序列在發(fā)布這些元素之后會自動結束拌禾。
fromArray(),fromIterable()和 fromStream():可以從一個數(shù)組展哭、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象湃窍。
empty():創(chuàng)建一個不包含任何元素闻蛀,只發(fā)布結束消息的序列。
error(Throwable error):創(chuàng)建一個只包含錯誤消息的序列您市。
never():創(chuàng)建一個不包含任何消息通知的序列觉痛。
range(int start, int count):創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列茵休。其中包含的元素按照指定的間隔來發(fā)布薪棒。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間榕莺。
intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同俐芯,只不過該方法通過毫秒數(shù)來指定時間間隔和延遲時間。
代碼清單 1 中給出了上述這些方法的使用示例钉鸯。
清單 1. 通過 Flux 類的靜態(tài)方法創(chuàng)建 Flux 序列
1
2
3
4
5
6
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
上面的這些靜態(tài)方法適合于簡單的序列生成吧史,當序列的生成需要復雜的邏輯時,則應該使用 generate() 或 create() 方法唠雕。
generate()方法
generate()方法通過同步和逐一的方式來產(chǎn)生 Flux 序列贸营。序列的產(chǎn)生是通過調用所提供的 SynchronousSink 對象的 next(),complete()和 error(Throwable)方法來完成的岩睁。逐一生成的含義是在具體的生成邏輯中莽使,next()方法只能最多被調用一次。在有些情況下笙僚,序列的生成可能是有狀態(tài)的,需要用到某些狀態(tài)對象灵再。此時可以使用 generate()方法的另外一種形式 generate(Callable stateSupplier, BiFunction,S> generator)肋层,其中 stateSupplier 用來提供初始的狀態(tài)對象。在進行序列生成時翎迁,狀態(tài)對象會作為 generator 使用的第一個參數(shù)傳入栋猖,可以在對應的邏輯中對該狀態(tài)對象進行修改以供下一次生成時使用。
在代碼清單 2中汪榔,第一個序列的生成邏輯中通過 next()方法產(chǎn)生一個簡單的值蒲拉,然后通過 complete()方法來結束該序列。如果不調用 complete()方法痴腌,所產(chǎn)生的是一個無限序列雌团。第二個序列的生成邏輯中的狀態(tài)對象是一個 ArrayList 對象。實際產(chǎn)生的值是一個隨機數(shù)士聪。產(chǎn)生的隨機數(shù)被添加到 ArrayList 中锦援。當產(chǎn)生了 10 個數(shù)時,通過 complete()方法來結束序列剥悟。
清單 2. 使用 generate()方法生成 Flux 序列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
create()方法
create()方法與 generate()方法的不同之處在于所使用的是 FluxSink 對象灵寺。FluxSink 支持同步和異步的消息產(chǎn)生曼库,并且可以在一次調用中產(chǎn)生多個元素。在代碼清單 3 中略板,在一次調用中就產(chǎn)生了全部的 10 個元素毁枯。
清單 3. 使用 create()方法生成 Flux 序列
1
2
3
4
5
6
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
創(chuàng)建 Mono
Mono 的創(chuàng)建方式與之前介紹的 Flux 比較相似。Mono 類中也包含了一些與 Flux 類中相同的靜態(tài)方法叮称。這些方法包括 just()种玛,empty(),error()和 never()等颅拦。除了這些方法之外蒂誉,Mono 還有一些獨有的靜態(tài)方法。
fromCallable()距帅、fromCompletionStage()右锨、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable碌秸、CompletionStage绍移、CompletableFuture、Runnable 和 Supplier 中創(chuàng)建 Mono讥电。
delay(Duration duration)和 delayMillis(long duration):創(chuàng)建一個 Mono 序列蹂窖,在指定的延遲時間之后,產(chǎn)生數(shù)字 0 作為唯一值恩敌。
ignoreElements(Publisher source):創(chuàng)建一個 Mono 序列瞬测,忽略作為源的 Publisher 中的所有元素,只產(chǎn)生結束消息纠炮。
justOrEmpty(Optional data)和 justOrEmpty(T data):從一個 Optional 對象或可能為 null 的對象中創(chuàng)建 Mono月趟。只有 Optional 對象中包含值或對象不為 null 時,Mono 序列才產(chǎn)生對應的元素恢口。
還可以通過 create()方法來使用 MonoSink 來創(chuàng)建 Mono孝宗。代碼清單 4 中給出了創(chuàng)建 Mono 序列的示例。
清單 4. 創(chuàng)建 Mono 序列
1
2
3
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
操作符
和 RxJava 一樣耕肩,Reactor 的強大之處在于可以在反應式流上通過聲明式的方式添加多種不同的操作符因妇。下面對其中重要的操作符進行分類介紹。
buffer 和 bufferTimeout
這兩個操作符的作用是把當前流中的元素收集到集合中猿诸,并把集合對象作為流中的新元素婚被。在進行收集時可以指定不同的條件:所包含的元素的最大數(shù)量或收集的時間間隔。方法 buffer()僅使用一個條件梳虽,而 bufferTimeout()可以同時指定兩個條件摔寨。指定時間間隔時可以使用 Duration 對象或毫秒數(shù),即使用 bufferMillis()或 bufferTimeoutMillis()兩個方法怖辆。
除了元素數(shù)量和時間間隔之外是复,還可以通過 bufferUntil 和 bufferWhile 操作符來進行收集删顶。這兩個操作符的參數(shù)是表示每個集合中的元素所要滿足的條件的 Predicate 對象。bufferUntil 會一直收集直到 Predicate 返回為 true淑廊。使得 Predicate 返回 true 的那個元素可以選擇添加到當前集合或下一個集合中逗余;bufferWhile 則只有當 Predicate 返回 true 時才會收集。一旦值為 false季惩,會立即開始下一次收集录粱。
代碼清單 5 給出了 buffer 相關操作符的使用示例。第一行語句輸出的是 5 個包含 20 個元素的數(shù)組画拾;第二行語句輸出的是 2 個包含了 10 個元素的數(shù)組啥繁;第三行語句輸出的是 5 個包含 2 個元素的數(shù)組。每當遇到一個偶數(shù)就會結束當前的收集青抛;第四行語句輸出的是 5 個包含 1 個元素的數(shù)組旗闽,數(shù)組里面包含的只有偶數(shù)。
需要注意的是蜜另,在代碼清單 5 中适室,首先通過 toStream()方法把 Flux 序列轉換成 Java 8 中的 Stream 對象,再通過 forEach()方法來進行輸出举瑰。這是因為序列的生成是異步的捣辆,而轉換成 Stream 對象可以保證主線程在序列生成完成之前不會退出,從而可以正確地輸出序列中的所有元素此迅。
清單 5. buffer 相關操作符的使用示例
1
2
3
4
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
filter
對流中包含的元素進行過濾汽畴,只留下滿足 Predicate 指定條件的元素。代碼清單 6 中的語句輸出的是 1 到 10 中的所有偶數(shù)耸序。
清單 6. filter 操作符使用示例
1Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
window
window 操作符的作用類似于 buffer整袁,所不同的是 window 操作符是把當前流中的元素收集到另外的 Flux 序列中,因此返回值類型是 Flux>佑吝。在代碼清單 7 中,兩行語句的輸出結果分別是 5 個和 2 個 UnicastProcessor 字符绳匀。這是因為 window 操作符所產(chǎn)生的流中包含的是 UnicastProcessor 類的對象芋忿,而 UnicastProcessor 類的 toString 方法輸出的就是 UnicastProcessor 字符。
清單 7. window 操作符使用示例
1
2
Flux.range(1, 100).window(20).subscribe(System.out::println);
Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);
zipWith
zipWith 操作符把當前流中的元素與另外一個流中的元素按照一對一的方式進行合并疾棵。在合并時可以不做任何處理戈钢,由此得到的是一個元素類型為 Tuple2 的流;也可以通過一個 BiFunction 函數(shù)對合并的元素進行處理是尔,所得到的流的元素類型為該函數(shù)的返回值殉了。
在代碼清單 8 中,兩個流中包含的元素分別是 a拟枚,b 和 c薪铜,d众弓。第一個 zipWith 操作符沒有使用合并函數(shù),因此結果流中的元素類型為 Tuple2隔箍;第二個 zipWith 操作通過合并函數(shù)把元素類型變?yōu)?String谓娃。
清單 8. zipWith 操作符使用示例
1
2
3
4
5
6
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"))
.subscribe(System.out::println);
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);
take
take 系列操作符用來從當前流中提取元素。提取的方式可以有很多種蜒滩。
take(long n)滨达,take(Duration timespan)和 takeMillis(long timespan):按照指定的數(shù)量或時間間隔來提取。
takeLast(long n):提取流中的最后 N 個元素俯艰。
takeUntil(Predicate predicate):提取元素直到 Predicate 返回 true捡遍。
takeWhile(Predicate continuePredicate): 當 Predicate 返回 true 時才進行提取。
takeUntilOther(Publisher other):提取元素直到另外一個流開始產(chǎn)生元素竹握。
在代碼清單 9 中画株,第一行語句輸出的是數(shù)字 1 到 10;第二行語句輸出的是數(shù)字 991 到 1000涩搓;第三行語句輸出的是數(shù)字 1 到 9污秆;第四行語句輸出的是數(shù)字 1 到 10,使得 Predicate 返回 true 的元素也是包含在內的昧甘。
清單 9. take 系列操作符使用示例
1
2
3
4
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i <10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
reduce 和 reduceWith
reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作良拼,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的充边。在操作時可以指定一個初始值庸推。如果沒有初始值,則序列的第一個元素作為初始值浇冰。
在代碼清單 10 中贬媒,第一行語句對流中的元素進行相加操作,結果為 5050肘习;第二行語句同樣也是進行相加操作际乘,不過通過一個 Supplier 給出了初始值為 100,所以結果為 5150漂佩。
清單 10. reduce 和 reduceWith 操作符使用示例
1
2
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
merge 和 mergeSequential
merge 和 mergeSequential 操作符用來把多個流合并成一個 Flux 序列脖含。不同之處在于 merge 按照所有流中元素的實際產(chǎn)生順序來合并,而 mergeSequential 則按照所有流被訂閱的順序投蝉,以流為單位進行合并养葵。
代碼清單 11 中分別使用了 merge 和 mergeSequential 操作符。進行合并的流都是每隔 100 毫秒產(chǎn)生一個元素瘩缆,不過第二個流中的每個元素的產(chǎn)生都比第一個流要延遲 50 毫秒关拒。在使用 merge 的結果流中,來自兩個流的元素是按照時間順序交織在一起;而使用 mergeSequential 的結果流則是首先產(chǎn)生第一個流中的全部元素着绊,再產(chǎn)生第二個流中的全部元素谐算。
清單 11. merge 和 mergeSequential 操作符使用示例
1
2
3
4
5
6
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
flatMap 和 flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每個元素轉換成一個流,再把所有流中的元素進行合并畔柔。flatMapSequential 和 flatMap 之間的區(qū)別與 mergeSequential 和 merge 之間的區(qū)別是一樣的氯夷。
在代碼清單 12 中,流中的元素被轉換成每隔 100 毫秒產(chǎn)生的數(shù)量不同的流靶擦,再進行合并腮考。由于第一個流中包含的元素數(shù)量較少,所以在結果流中一開始是兩個流的元素交織在一起玄捕,然后就只有第二個流中的元素踩蔚。
清單 12. flatMap 操作符使用示例
1
2
3
4
Flux.just(5, 10)
.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
concatMap
concatMap 操作符的作用也是把流中的每個元素轉換成一個流,再把所有流進行合并枚粘。與 flatMap 不同的是馅闽,concatMap 會根據(jù)原始流中的元素順序依次把轉換之后的流進行合并;與 flatMapSequential 不同的是馍迄,concatMap 對轉換之后的流的訂閱是動態(tài)進行的福也,而 flatMapSequential 在合并之前就已經(jīng)訂閱了所有的流。
代碼清單 13 與代碼清單 12 類似攀圈,只不過把 flatMap 換成了 concatMap暴凑,結果流中依次包含了第一個流和第二個流中的全部元素。
清單 13. concatMap 操作符使用示例
1
2
3
4
Flux.just(5, 10)
.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
combineLatest
combineLatest 操作符把所有流中的最新產(chǎn)生的元素合并成一個新的元素赘来,作為返回結果流中的元素现喳。只要其中任何一個流中產(chǎn)生了新的元素,合并操作就會被執(zhí)行一次犬辰,結果流中就會產(chǎn)生新的元素嗦篱。在 代碼清單 14 中,流中最新產(chǎn)生的元素會被收集到一個數(shù)組中幌缝,通過 Arrays.toString 方法來把數(shù)組轉換成 String灸促。
清單 14. combineLatest 操作符使用示例
1
2
3
4
5
Flux.combineLatest(
Arrays::toString,
Flux.intervalMillis(100).take(5),
Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);
消息處理
當需要處理 Flux 或 Mono 中的消息時,如之前的代碼清單所示涵卵,可以通過 subscribe 方法來添加相應的訂閱邏輯浴栽。在調用 subscribe 方法時可以指定需要處理的消息類型≡迪幔可以只處理其中包含的正常消息,也可以同時處理錯誤消息和完成消息甩挫。代碼清單 15 中通過 subscribe()方法同時處理了正常消息和錯誤消息贴硫。
清單 15. 通過 subscribe()方法處理正常和錯誤消息
1
2
3
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
正常的消息處理相對簡單。當出現(xiàn)錯誤時,有多種不同的處理策略英遭。第一種策略是通過 onErrorReturn()方法返回一個默認值间护。在代碼清單 16 中,當出現(xiàn)錯誤時挖诸,流會產(chǎn)生默認值 0.
清單 16. 出現(xiàn)錯誤時返回默認值
1
2
3
4
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);
第二種策略是通過 switchOnError()方法來使用另外的流來產(chǎn)生元素汁尺。在代碼清單 17 中,當出現(xiàn)錯誤時多律,將產(chǎn)生 Mono.just(0)對應的流痴突,也就是數(shù)字 0。
清單 17. 出現(xiàn)錯誤時使用另外的流
1
2
3
4
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.switchOnError(Mono.just(0))
.subscribe(System.out::println);
第三種策略是通過 onErrorResumeWith()方法來根據(jù)不同的異常類型來選擇要使用的產(chǎn)生元素的流狼荞。在代碼清單 18 中辽装,根據(jù)異常類型來返回不同的流作為出現(xiàn)錯誤時的數(shù)據(jù)來源。因為異常的類型為 IllegalArgumentException相味,所產(chǎn)生的元素為-1拾积。
清單 18. 出現(xiàn)錯誤時根據(jù)異常類型來選擇流
1
2
3
4
5
6
7
8
9
10
11
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResumeWith(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
})
.subscribe(System.out::println);
當出現(xiàn)錯誤時,還可以通過 retry 操作符來進行重試丰涉。重試的動作是通過重新訂閱序列來實現(xiàn)的拓巧。在使用 retry 操作符時可以指定重試的次數(shù)。代碼清單 19 中指定了重試次數(shù)為 1一死,所輸出的結果是 1肛度,2,1摘符,2 和錯誤信息贤斜。
清單 19. 使用 retry 操作符進行重試
1
2
3
4
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);
調度器
前面介紹了反應式流和在其上可以進行的各種操作,通過調度器(Scheduler)可以指定這些操作執(zhí)行的方式和所在的線程逛裤。有下面幾種不同的調度器實現(xiàn)运褪。
當前線程,通過 Schedulers.immediate()方法來創(chuàng)建划纽。
單一的可復用的線程结洼,通過 Schedulers.single()方法來創(chuàng)建。
使用彈性的線程池蝙砌,通過 Schedulers.elastic()方法來創(chuàng)建阳堕。線程池中的線程是可以復用的。當所需要時择克,新的線程會被創(chuàng)建恬总。如果一個線程閑置太長時間,則會被銷毀肚邢。該調度器適用于 I/O 操作相關的流的處理壹堰。
使用對并行操作優(yōu)化的線程池拭卿,通過 Schedulers.parallel()方法來創(chuàng)建。其中的線程數(shù)量取決于 CPU 的核的數(shù)量贱纠。該調度器適用于計算密集型的流的處理峻厚。
使用支持任務調度的調度器,通過 Schedulers.timer()方法來創(chuàng)建谆焊。
從已有的 ExecutorService 對象中創(chuàng)建調度器惠桃,通過 Schedulers.fromExecutorService()方法來創(chuàng)建。
某些操作符默認就已經(jīng)使用了特定類型的調度器辖试。比如 intervalMillis()方法創(chuàng)建的流就使用了由 Schedulers.timer()創(chuàng)建的調度器辜王。通過 publishOn()和 subscribeOn()方法可以切換執(zhí)行操作的調度器。其中 publishOn()方法切換的是操作符的執(zhí)行方式剃执,而 subscribeOn()方法切換的是產(chǎn)生流中元素時的執(zhí)行方式誓禁。
在代碼清單 20 中,使用 create()方法創(chuàng)建一個新的 Flux 對象肾档,其中包含唯一的元素是當前線程的名稱摹恰。接著是兩對 publishOn()和 map()方法,其作用是先切換執(zhí)行時的調度器怒见,再把當前的線程名稱作為前綴添加俗慈。最后通過 subscribeOn()方法來改變流產(chǎn)生時的執(zhí)行方式。運行之后的結果是[elastic-2] [single-1] parallel-1遣耍。最內層的線程名字 parallel-1 來自產(chǎn)生流中元素時使用的 Schedulers.parallel()調度器闺阱,中間的線程名稱 single-1 來自第一個 map 操作之前的 Schedulers.single()調度器,最外層的線程名字 elastic-2 來自第二個 map 操作之前的 Schedulers.elastic()調度器舵变。
清單 20. 使用調度器切換操作符執(zhí)行方式
1
2
3
4
5
6
7
8
9
10
11
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
測試
在對使用 Reactor 的代碼進行測試時酣溃,需要用到 io.projectreactor.addons:reactor-test 庫。
使用 StepVerifier
進行測試時的一個典型的場景是對于一個序列纪隙,驗證其中所包含的元素是否符合預期赊豌。StepVerifier 的作用是可以對序列中包含的元素進行逐一驗證。在代碼清單 21 中绵咱,需要驗證的流中包含 a 和 b 兩個元素碘饼。通過 StepVerifier.create()方法對一個流進行包裝之后再進行驗證。expectNext()方法用來聲明測試時所期待的流中的下一個元素的值悲伶,而 verifyComplete()方法則驗證流是否正常結束艾恼。類似的方法還有 verifyError()來驗證流由于錯誤而終止。
清單 21. 使用 StepVerifier 驗證流中的元素
1
2
3
4
StepVerifier.create(Flux.just("a", "b"))
.expectNext("a")
.expectNext("b")
.verifyComplete();
操作測試時間
有些序列的生成是有時間要求的麸锉,比如每隔 1 分鐘才產(chǎn)生一個新的元素钠绍。在進行測試中,不可能花費實際的時間來等待每個元素的生成花沉。此時需要用到 StepVerifier 提供的虛擬時間功能柳爽。通過 StepVerifier.withVirtualTime()方法可以創(chuàng)建出使用虛擬時鐘的 StepVerifier纳寂。通過 thenAwait(Duration)方法可以讓虛擬時鐘前進。
在代碼清單 22 中泻拦,需要驗證的流中包含兩個產(chǎn)生間隔為一天的元素,并且第一個元素的產(chǎn)生延遲是 4 個小時忽媒。在通過 StepVerifier.withVirtualTime()方法包裝流之后争拐,expectNoEvent()方法用來驗證在 4 個小時之內沒有任何消息產(chǎn)生,然后驗證第一個元素 0 產(chǎn)生晦雨;接著 thenAwait()方法來讓虛擬時鐘前進一天架曹,然后驗證第二個元素 1 產(chǎn)生;最后驗證流正常結束闹瞧。
清單 22. 操作測試時間
1
2
3
4
5
6
7
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))
.expectSubscription()
.expectNoEvent(Duration.ofHours(4))
.expectNext(0L)
.thenAwait(Duration.ofDays(1))
.expectNext(1L)
.verifyComplete();
使用 TestPublisher
TestPublisher 的作用在于可以控制流中元素的產(chǎn)生绑雄,甚至是違反反應流規(guī)范的情況。在代碼清單 23 中奥邮,通過 create()方法創(chuàng)建一個新的 TestPublisher 對象万牺,然后使用 next()方法來產(chǎn)生元素,使用 complete()方法來結束流洽腺。TestPublisher 主要用來測試開發(fā)人員自己創(chuàng)建的操作符脚粟。
清單 23. 使用 TestPublisher 創(chuàng)建測試所用的流
1
2
3
4
5
6
7
8
9
final TestPublisher testPublisher = TestPublisher.create();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)
.expectNext("a")
.expectNext("b")
.expectComplete();
調試
由于反應式編程范式與傳統(tǒng)編程范式的差異性,使用 Reactor 編寫的代碼在出現(xiàn)問題時比較難進行調試蘸朋。為了更好的幫助開發(fā)人員進行調試核无,Reactor 提供了相應的輔助功能。
啟用調試模式
當需要獲取更多與流相關的執(zhí)行信息時藕坯,可以在程序開始的地方添加代碼清單 24 中的代碼來啟用調試模式团南。在調試模式啟用之后,所有的操作符在執(zhí)行時都會保存額外的與執(zhí)行鏈相關的信息炼彪。當出現(xiàn)錯誤時吐根,這些信息會被作為異常堆棧信息的一部分輸出。通過這些信息可以分析出具體是在哪個操作符的執(zhí)行中出現(xiàn)了問題霹购。
清單 24. 啟用調試模式
1Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
不過當調試模式啟用之后佑惠,記錄這些額外的信息是有代價的。一般只有在出現(xiàn)了錯誤之后齐疙,再考慮啟用調試模式膜楷。但是當為了找到問題而啟用了調試模式之后,之前的錯誤不一定能很容易重現(xiàn)出來贞奋。為了減少可能的開銷赌厅,可以限制只對特定類型的操作符啟用調試模式。
使用檢查點
另外一種做法是通過 checkpoint 操作符來對特定的流處理鏈來啟用調試模式轿塔。代碼清單 25 中特愿,在 map 操作符之后添加了一個名為 test 的檢查點仲墨。當出現(xiàn)錯誤時,檢查點名稱會出現(xiàn)在異常堆棧信息中揍障。對于程序中重要或者復雜的流處理鏈目养,可以在關鍵的位置上啟用檢查點來幫助定位可能存在的問題。
清單 25. 使用 checkpoint 操作符
1Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);
日志記錄
在開發(fā)和調試中的另外一項實用功能是把流相關的事件記錄在日志中毒嫡。這可以通過添加 log 操作符來實現(xiàn)癌蚁。在代碼清單 26 中,添加了 log 操作符并指定了日志分類的名稱兜畸。
清單 26. 使用 log 操作符記錄事件
1Flux.range(1, 2).log("Range").subscribe(System.out::println);
在實際的運行時努释,所產(chǎn)生的輸出如代碼清單 27 所示。
清單 27. log 操作符所產(chǎn)生的日志
1
2
3
4
5
6
7
8
13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
13:07:56.753 [main] INFO Range - | request(unbounded)
13:07:56.754 [main] INFO Range - | onNext(1)
1
13:07:56.754 [main] INFO Range - | onNext(2)
2
13:07:56.754 [main] INFO Range - | onComplete()
“冷”與“熱”序列
之前的代碼清單中所創(chuàng)建的都是冷序列咬摇。冷序列的含義是不論訂閱者在何時訂閱該序列伐蒂,總是能收到序列中產(chǎn)生的全部消息。而與之對應的熱序列肛鹏,則是在持續(xù)不斷地產(chǎn)生消息逸邦,訂閱者只能獲取到在其訂閱之后產(chǎn)生的消息。
在代碼清單 28 中在扰,原始的序列中包含 10 個間隔為 1 秒的元素昭雌。通過 publish()方法把一個 Flux 對象轉換成 ConnectableFlux 對象。方法 autoConnect()的作用是當 ConnectableFlux 對象有一個訂閱者時就開始產(chǎn)生消息健田。代碼 source.subscribe()的作用是訂閱該 ConnectableFlux 對象烛卧,讓其開始產(chǎn)生數(shù)據(jù)。接著當前線程睡眠 5 秒鐘妓局,第二個訂閱者此時只能獲得到該序列中的后 5 個元素总放,因此所輸出的是數(shù)字 5 到 9。
清單 28. 熱序列
1
2
3
4
5
6
7
8
9
final Flux source = Flux.intervalMillis(1000)
.take(10)
.publish()
.autoConnect();
source.subscribe();
Thread.sleep(5000);
source
.toStream()
.forEach(System.out::println);
小結
反應式編程范式對于習慣了傳統(tǒng)編程范式的開發(fā)人員來說好爬,既是一個需要進行思維方式轉變的挑戰(zhàn)局雄,也是一個充滿了更多可能的機會。Reactor 作為一個基于反應式流規(guī)范的新的 Java 庫存炮,可以作為反應式應用的基礎炬搭。本文對 Reactor 庫做了詳細的介紹,包括 Flux 和 Mono 序列的創(chuàng)建穆桂、常用操作符的使用宫盔、調度器、錯誤處理以及測試和調試技巧等享完。
參考資源 (resources)
參考 Reactor 的官方網(wǎng)站灼芭,了解 Reactor 的更多內容。
查看 Reactor 的用戶指南般又。
查看 InfoQ 上的Reactor by Example彼绷。
查看反應式流規(guī)范巍佑。
下載資源
有新評論時提醒我