Reactor 簡介
前面提到的 RxJava 庫是 JVM 上反應(yīng)式編程的先驅(qū)分蓖,也是反應(yīng)式流規(guī)范的基礎(chǔ)尔艇。RxJava 2 在 RxJava 的基礎(chǔ)上做了很多的更新。不過 RxJava 庫也有其不足的地方么鹤。RxJava 產(chǎn)生于反應(yīng)式流規(guī)范之前终娃,雖然可以和反應(yīng)式流的接口進行轉(zhuǎn)換,但是由于底層實現(xiàn)的原因蒸甜,使用起來并不是很直觀棠耕。RxJava 2 在設(shè)計和實現(xiàn)時考慮到了與規(guī)范的整合,不過為了保持與 RxJava 的兼容性柠新,很多地方在使用時也并不直觀窍荧。Reactor 則是完全基于反應(yīng)式流規(guī)范設(shè)計和實現(xiàn)的庫,沒有 RxJava 那樣的歷史包袱登颓,在使用上更加的直觀易懂顶瞒。Reactor 也是 Spring 5 中反應(yīng)式編程的基礎(chǔ)香璃。學(xué)習(xí)和掌握 Reactor 可以更好地理解 Spring 5 中的相關(guān)概念尤筐。
Reactor是JVM的完全非阻塞反應(yīng)式編程基礎(chǔ)掠河,具有高效的需求管理(以管理“背壓”的形式)颠印。它直接與Java 8功能的API瘤载,特別是整合CompletableFuture信夫,Stream和 Duration塞栅。它提供了可組合的異步序列API Flux(用于[N]元素)和Mono(用于[0 | 1]元素),廣泛地實現(xiàn)了Reactive Extensions規(guī)范放椰。這段的重點是和Java8結(jié)合利用lambda表達式簡潔的優(yōu)點。
使用
在 Reactor 中砾医,經(jīng)常使用的類并不是很多拿撩,主要有以下兩個:
Mono 實現(xiàn)了 org.reactivestreams.Publisher 接口如蚜,代表0到1個元素的發(fā)布者。
Flux 同樣實現(xiàn)了 org.reactivestreams.Publisher 接口错邦,代表0到N個元素的發(fā)表者探赫。
Scheduler 表示背后驅(qū)動反應(yīng)式流的調(diào)度器,通常由各種線程池實現(xiàn)撬呢。
創(chuàng)建 Flux
有多種不同的方式可以創(chuàng)建 Flux 序列。
Flux 類的靜態(tài)方法
第一種方式是通過 Flux 類中的靜態(tài)方法讨勤。
just():可以指定序列中包含的全部元素。創(chuàng)建出來的 Flux 序列在發(fā)布這些元素之后會自動結(jié)束潭千。
fromArray()刨晴,fromIterable()和 fromStream():可以從一個數(shù)組屉来、Iterable 對象或 Stream 對象中創(chuàng)建 Flux 對象茄靠。
empty():創(chuàng)建一個不包含任何元素蝶桶,只發(fā)布結(jié)束消息的序列。
error(Throwable error):創(chuàng)建一個只包含錯誤消息的序列脐雪。
never():創(chuàng)建一個不包含任何消息通知的序列恢共。
range(int start, int count):創(chuàng)建包含從 start 起始的 count 個數(shù)量的 Integer 對象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):創(chuàng)建一個包含了從 0 開始遞增的 Long 對象的序列脂信。其中包含的元素按照指定的間隔來發(fā)布透硝。除了間隔時間之外,還可以指定起始元素發(fā)布之前的延遲時間濒生。
intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同,只不過該方法通過毫秒數(shù)來指定時間間隔和延遲時間秋泄。
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
上面的這些靜態(tài)方法適合于簡單的序列生成规阀,當(dāng)序列的生成需要復(fù)雜的邏輯時,則應(yīng)該使用 generate() 或 create() 方法歧胁。
//通過generate生成元素
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
//通過create生成元素
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
Mono 的創(chuàng)建
Mono 的創(chuàng)建方式與之前介紹的 Flux 比較相似喊巍。Mono 類中也包含了一些與 Flux 類中相同的靜態(tài)方法。這些方法包括 just()呵曹,empty()何暮,error()和 never()等。除了這些方法之外跨新,Mono 還有一些獨有的靜態(tài)方法坏逢。
fromCallable()、fromCompletionStage()肖揣、fromFuture()贰盗、fromRunnable()和 fromSupplier():分別從 Callable舵盈、CompletionStage球化、CompletableFuture、Runnable 和 Supplier 中創(chuàng)建 Mono赴蝇。
delay(Duration duration)和 delayMillis(long duration):創(chuàng)建一個 Mono 序列巢掺,在指定的延遲時間之后,產(chǎn)生數(shù)字 0 作為唯一值考余。
ignoreElements(Publisher<T> source):創(chuàng)建一個 Mono 序列轧苫,忽略作為源的 Publisher 中的所有元素,只產(chǎn)生結(jié)束消息身冬。
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):從一個 Optional 對象或可能為 null 的對象中創(chuàng)建 Mono酥筝。只有 Optional 對象中包含值或?qū)ο蟛粸?null 時,Mono 序列才產(chǎn)生對應(yīng)的元素哀九。
還可以通過 create()方法來使用 MonoSink 來創(chuàng)建 Mono搅幅。代碼清單 4 中給出了創(chuàng)建 Mono 序列的示例。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
操作符
filter:
對流中包含的元素進行過濾息裸,只留下滿足 Predicate 指定條件的元素
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
zipWith:
zipWith 操作符把當(dāng)前流中的元素與另外一個流中的元素按照一對一的方式進行合并沪编。
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"))
.subscribe(System.out::println);
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);
reduce 和 reduceWith:
reduce 和 reduceWith 操作符對流中包含的所有元素進行累積操作蚁廓,得到一個包含計算結(jié)果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的腿时。在操作時可以指定一個初始值饭宾。如果沒有初始值看铆,則序列的第一個元素作為初始值。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
flatMap 和 flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每個元素轉(zhuǎn)換成一個流否淤,再把所有流中的元素進行合并棠隐。flatMapSequential 和 flatMap 之間的區(qū)別與 mergeSequential 和 merge 之間的區(qū)別是一樣的。
Flux.just(5, 10)
.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
調(diào)度器
前面介紹了反應(yīng)式流和在其上可以進行的各種操作汁雷,通過調(diào)度器(Scheduler)可以指定這些操作執(zhí)行的方式和所在的線程。有下面幾種不同的調(diào)度器實現(xiàn)溜嗜。
當(dāng)前線程,通過 Schedulers.immediate()方法來創(chuàng)建裹匙。
單一的可復(fù)用的線程,通過 Schedulers.single()方法來創(chuàng)建徽曲。
使用彈性的線程池哪工,通過 Schedulers.elastic()方法來創(chuàng)建偎捎。線程池中的線程是可以復(fù)用的程奠。當(dāng)所需要時距境,新的線程會被創(chuàng)建。如果一個線程閑置太長時間,則會被銷毀矾利。該調(diào)度器適用于 I/O 操作相關(guān)的流的處理察皇。
使用對并行操作優(yōu)化的線程池稻爬,通過 Schedulers.parallel()方法來創(chuàng)建翠肘。其中的線程數(shù)量取決于 CPU 的核的數(shù)量晚碾。該調(diào)度器適用于計算密集型的流的處理。
使用支持任務(wù)調(diào)度的調(diào)度器苗膝,通過 Schedulers.timer()方法來創(chuàng)建。
從已有的 ExecutorService 對象中創(chuàng)建調(diào)度器亥鬓,通過 Schedulers.fromExecutorService()方法來創(chuàng)建域庇。
某些操作符默認就已經(jīng)使用了特定類型的調(diào)度器较剃。比如 intervalMillis()方法創(chuàng)建的流就使用了由 Schedulers.timer()創(chuàng)建的調(diào)度器。通過 publishOn()和 subscribeOn()方法可以切換執(zhí)行操作的調(diào)度器。其中 publishOn()方法切換的是操作符的執(zhí)行方式啊送,而 subscribeOn()方法切換的是產(chǎn)生流中元素時的執(zhí)行方式。
使用調(diào)度器切換操作符執(zhí)行方式:
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
在以上代碼中昔逗,使用 create()方法創(chuàng)建一個新的 Flux 對象篷朵,其中包含唯一的元素是當(dāng)前線程的名稱声旺。接著是兩對 publishOn()和 map()方法,其作用是先切換執(zhí)行時的調(diào)度器鉴扫,再把當(dāng)前的線程名稱作為前綴添加澈缺。最后通過 subscribeOn()方法來改變流產(chǎn)生時的執(zhí)行方式。運行之后的結(jié)果是[elastic-2] [single-1] parallel-1莱预。最內(nèi)層的線程名字 parallel-1 來自產(chǎn)生流中元素時使用的 Schedulers.parallel()調(diào)度器项滑,中間的線程名稱 single-1 來自第一個 map 操作之前的 Schedulers.single()調(diào)度器杖们,最外層的線程名字 elastic-2 來自第二個 map 操作之前的 Schedulers.elastic()調(diào)度器。
回壓
回壓的處理有以下幾種策略:
IGNORE: 完全忽略下游背壓請求姥饰,這可能會在下游隊列積滿的時候?qū)е?IllegalStateException孝治。
ERROR: 當(dāng)下游跟不上節(jié)奏的時候發(fā)出一個 IllegalStateException 的錯誤信號谈飒。
DROP:當(dāng)下游沒有準(zhǔn)備好接收新的元素的時候拋棄這個元素。
LATEST:讓下游只得到上游最新的元素费什。
BUFFER:(默認的)緩存所有下游沒有來得及處理的元素(這個不限大小的緩存可能導(dǎo)致 OutOfMemoryError)手素。
方法簽名:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
默認(沒有第二個參數(shù)的方法)是緩存策略的瘩蚪,我們來試一下別的策略疹瘦,比如DROP的策略巡球。
@Test
public void OverflowStrategy() throws InterruptedException {
Flux flux = Flux.create(sink -> {
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("create " + i + " ---");
sink.next(i);
}
sink.complete();
}, FluxSink.OverflowStrategy.BUFFER) //1 調(diào)整不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)觀察效果酣栈,create方法默認為BUFFER;
.publishOn(Schedulers.newSingle("newSingle"), 1) //2 使用publishOn讓后續(xù)的操作符和訂閱者運行在一個單獨的名為newSingle的線程上鸯乃,第二個參數(shù)1是預(yù)取個數(shù)
.doOnComplete(() -> System.out.println("completed!")) // 結(jié)束時打印
.doOnRequest(n -> System.out.println("Request " + n + " values..."));//3 打印出每次的請求
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) { // 4
System.out.println("Subscribed and make a request...");
request(1); // 5
}
@Override
protected void hookOnNext(Integer value) { // 6
try {
TimeUnit.SECONDS.sleep(1); // 7
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Get value [" + value + "]"); // 8
request(1); // 9
}
});
Thread.sleep(100 * 1000);
}
不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)觀察到的結(jié)果不同缨睡。
Hot vs Cold
到目前為止陈辱,我們討論的發(fā)布者沛贪,無論是Flux還是Mono,都有一個特點:訂閱前什么都不會發(fā)生水评。當(dāng)我們“創(chuàng)建”了一個Flux的時候媚送,我們只是“聲明”/“組裝”了它,但是如果不調(diào)用.subscribe來訂閱它疗涉,它就不會開始發(fā)出元素吟秩。
但是我們對“數(shù)據(jù)流”(尤其是乍聽到這個詞的時候)會有種天然的感覺涵防,就是無論有沒有訂閱者,它始終在按照自己的步伐發(fā)出數(shù)據(jù)祭往。就像假設(shè)一個人沒有一個粉絲火窒,他也可以發(fā)微博一樣熏矿。
以上這兩種數(shù)據(jù)流分別稱為“冷”序列和“熱”序列。所以我們一直在介紹的Reactor3的發(fā)布者就屬于“冷”的發(fā)布者票编。不過有少數(shù)的例外慧域,比如just生成的就是一個“熱”序列,它直接在組裝期就拿到數(shù)據(jù)辛藻,如果之后有誰訂閱它互订,就重新發(fā)送數(shù)據(jù)給訂閱者仰禽。Reactor 中多數(shù)其他的“熱”發(fā)布者是擴展自Processor 的(下節(jié)會介紹到)。
下面我們通過對比了解一下兩種不同的發(fā)布者的效果规揪,首先是我們熟悉的“冷”發(fā)布者:
@Test
public void testCodeSequence() {
Flux<String> source = Flux.*fromIterable*(Arrays.*asList*("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(d -> System.*out*.println("Subscriber 1: " + d));
System.*out*.println();
source.subscribe(d -> System.*out*.println("Subscriber 2: " + d));
}
我們對發(fā)布者source進行了兩次訂閱温峭,每次訂閱都導(dǎo)致它把數(shù)據(jù)流從新發(fā)一遍:
Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE
然后再看一個“熱”發(fā)布者的例子:
@Test
public void testHotSequence() {
UnicastProcessor<String> hotSource = UnicastProcessor.*create*();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.*out*.println("Subscriber 1 to Hot Source: " + d));
hotSource.onNext("blue");
hotSource.onNext("green");
hotFlux.subscribe(d -> System.*out*.println("Subscriber 2 to Hot Source: " + d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
}
這個熱發(fā)布者是一個UnicastProcessor诚镰,我們可以使用它的onNext等方法手動發(fā)出元素清笨。上邊的例子中,hotSource發(fā)出兩個元素后第二個訂閱者才開始訂閱苛萎,所以第二個訂閱者只能收到之后的元素:
Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE
由此可見,UnicastProcessor是一個熱發(fā)布者蛙酪。
另一個示例:
@Test
public void codeTest() throws InterruptedException {
//getNameByPhoney方法執(zhí)行兩次
Mono<String> customerPhoneMono = Mono.fromSupplier(
() -> ColdMonoHttp.getNameByPhone("18611854542")
);
//getNameByPhoney方法執(zhí)行一次
Mono<String> customerPhoneMono = Mono.fromFuture(
CompletableFuture.supplyAsync(() -> ColdMonoHttp.getNameByPhone("18611854542"))
);
customerPhoneMono.subscribe(a -> System.out.println("-----訂閱一:name=" + a));
customerPhoneMono.subscribe(a -> System.out.println("-----訂閱二:name=" + a));
Thread.sleep(10000);
}
public static String getNameByPhone(String phone) {
try {
System.out.println("get name start:" + phone);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
System.out.println("get name end:" + phone);
return "zhangsan";
}
參考:
官網(wǎng):
GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM
Reactor 3 Reference Guide
GitHub & BitBucket HTML Preview
其它:
使用 Reactor 進行反應(yīng)式編程
反應(yīng)式編程介紹 - liudongdong_jlu - CSDN博客
什么是 Reactor? - Reactor 指南中文版 - 極客學(xué)院Wiki
Reactor 入門與實踐 - Go語言中文網(wǎng) - Golang中文社區(qū)