本文目標(biāo)
- 理解響應(yīng)式編程
前言
之前的《聊聊 IO 多路復(fù)用》中浸卦,我們理解了非阻塞 IO 的意義主巍。但是 Spring MVC 并不能完美的應(yīng)用非阻塞編程指黎,于是 Spring 團隊開發(fā)了 WebFlux,而 WebFlux 的基礎(chǔ)正是本文要講到的 Project Reactor(下文簡稱為 Reactor)
本文以 Reactor 為例帶大家入門響應(yīng)式編程
版本
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.6</version>
</dependency>
什么是 Reactor
Reactor 是 JVM 的非阻塞響應(yīng)式編程基礎(chǔ)亡资,支持背壓。 它直接與 Java 8 函數(shù)式 API 集成,特別是 CompletableFuture伊诵、Stream 和 Duration。 它提供了可組合的異步序列 API — Flux(用于 [N] 個元素)和 Mono(用于 [0|1] 個元素)回官,并實現(xiàn)了 Reactive Streams 規(guī)范曹宴。
在 Reactor 的基礎(chǔ)上還演化出了適合微服務(wù)架構(gòu)的 Reactor Netty 。為 HTTP(包括 Websockets)歉提、TCP 和 UDP 提供支持背壓和響應(yīng)式的網(wǎng)絡(luò)引擎笛坦。
上面是對于官方文檔的翻譯。下面來說說我自己對 Reactor 和響應(yīng)式編程的理解苔巨。
回想一下之前的非阻塞 IO 編程版扩,例如我們現(xiàn)在要用非阻塞的方式調(diào)用一個遠(yuǎn)程服務(wù),當(dāng)遠(yuǎn)程接口數(shù)據(jù)可用時去做一些業(yè)務(wù)處理侄泽。這時候代碼怎么寫呢礁芦?我們需要提供一個回調(diào)函數(shù),然后在響應(yīng)就緒的時候悼尾,去調(diào)用我們的回調(diào)函數(shù)柿扣。
從邏輯上來看,這完全沒有問題诀豁。但是如果我們的回調(diào)很復(fù)雜窄刘,代碼看起來會是什么樣呢?
// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId, new Callback<List<String>>() {
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
這個代碼說實話已經(jīng)有點回調(diào)地獄那味兒了舷胜,讓一段不是很復(fù)雜的邏輯變得很難讀了娩践。但是如果用 Reactor 寫呢?
// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
可以看到烹骨,代碼變得非常的簡潔翻伺。唯一帶來的困擾就是,我們不知道這些函數(shù)到底是啥意思 ??
響應(yīng)式編程雖然有非常多的特性沮焕,但是它并不是什么神奇的技術(shù)吨岭,它也是建立在傳統(tǒng)命令式編程的基礎(chǔ)上。只不過它所提供的 API 以及規(guī)范更適合在非阻塞 IO 中使用峦树。雖然在非阻塞 IO 框架中幾乎只使用響應(yīng)式編程(Vertx辣辫,WebFlux)旦事,只是因為這樣做更合適,并不是說沒了響應(yīng)式編程急灭,就玩不了非阻塞 IO 了姐浮。
響應(yīng)式編程內(nèi)幕
Reactor 實現(xiàn)了 org.reactivestreams
提供的 Java 響應(yīng)式編程規(guī)范,我們只要了解 reactivestreams 中代碼是如何運轉(zhuǎn)的葬馋,再看 Reactor 相關(guān)的代碼就容易多了卖鲤。
下圖展示了 reactivestreams 中的核心接口
Publisher:發(fā)布者
Subscriber:訂閱者
Subscription:這個單詞中文翻譯為名詞的訂閱,在代碼中它是發(fā)布者和訂閱者之間的媒介
Processor:該接口繼承了發(fā)布者和訂閱者畴嘶,可以理解為發(fā)布者和訂閱者的中間操作(但是 Reactor 的中間操作并沒有實現(xiàn) Processor蛋逾,在最新版本的 Reactor 中,Processor 的相關(guān)實現(xiàn)接口已經(jīng)被棄用)
在了解了響應(yīng)式編程的核心接口之后窗悯,我們來看下響應(yīng)式編程是如何運作的
在 Reactor 中大部分實現(xiàn)都是按照上圖的邏輯來執(zhí)行的
- 首先是Subscriber(訂閱者)主動訂閱 Publisher(發(fā)布者)区匣,通過調(diào)用 Publisher 的 subscribe 方法
- Publisher 在向下游發(fā)送數(shù)據(jù)之前,會先調(diào)用 Subscriber 的 onSubscribe 方法蟀瞧,傳遞的參數(shù)為 Subscription(訂閱媒介)
- Subscriber 通過 Subscription#request 來請求數(shù)據(jù)沉颂,或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布(這就是響應(yīng)式編程中的背壓,訂閱者可以控制數(shù)據(jù)發(fā)布)
- Subscription 在接收到訂閱者的調(diào)用后悦污,通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)。
- 在數(shù)據(jù)發(fā)布完成后钉蒲,調(diào)用 Subscriber#onComplete 結(jié)束本次流切端,如果數(shù)據(jù)發(fā)布或者處理遇到錯誤會調(diào)用 Subscriber#onError
調(diào)用 Subscriber#onNext,onComplete顷啼,onError 這三個方法踏枣,可能是在 Publisher 中做的,也可能是在 Subscription 中做的钙蒙,根據(jù)不同的場景有不同的實現(xiàn)方式茵瀑,并沒有什么嚴(yán)格的要求」幔可以認(rèn)為 Publisher 和 Subscription 共同配合完成了數(shù)據(jù)發(fā)布
其實 Reactor 中 API 實現(xiàn)原理也都是這個套路马昨,我這邊也自己寫了個例子便于讓讀者加深對響應(yīng)式編程的理解
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author tianwen.yin
*/
public class SimpleReactiveStream {
/**
* 實現(xiàn)一個簡單的響應(yīng)式編程發(fā)布者
* 邏輯:當(dāng)訂閱者發(fā)起訂閱時,像下游發(fā)送一個 HelloWorld扛施,發(fā)布邏輯由 SimpleSubscription 完成
*/
static class SimplePublisher implements Publisher {
@Override
public void subscribe(Subscriber s) {
// 2. Publisher 發(fā)布數(shù)據(jù)之前鸿捧,調(diào)用 Subscriber 的 onSubscribe
s.onSubscribe(new SimpleSubscription(data(), s));
}
private String data() {
return "Hello World";
}
}
static class SimpleSubscriber implements Subscriber {
@Override
public void onSubscribe(Subscription s) {
// 3. Subscriber 通過 Subscription#request 來請求數(shù)據(jù)
// 或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable t) {
System.out.println("error");
}
@Override
public void onComplete() {
System.out.println("complete");
}
}
static class SimpleSubscription implements Subscription {
String data;
Subscriber actual;
boolean isCanceled;
public SimpleSubscription(String data, Subscriber actual) {
this.data = data;
this.actual = actual;
}
@Override
public void request(long n) {
if (!isCanceled) {
try {
// 4. Subscription 在接收到訂閱者的調(diào)用后
// 通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)
actual.onNext(data);
// 5. 在數(shù)據(jù)發(fā)布完成后,調(diào)用 Subscriber#onComplete 結(jié)束本次流
actual.onComplete();
} catch (Exception e) {
// 如果數(shù)據(jù)發(fā)布或者處理遇到錯誤會調(diào)用 Subscriber#onError
actual.onError(e);
}
}
}
@Override
public void cancel() {
isCanceled = true;
}
}
public static void main(String[] args) {
// 1. Subscriber ”訂閱“ Publisher
new SimplePublisher().subscribe(new SimpleSubscriber());
}
}
響應(yīng)式編程思想
響應(yīng)式編程疙渣,就像裝配一條流水線匙奴。Publisher 規(guī)定了數(shù)據(jù)如何生產(chǎn),中間會有 Operators(操作符)對流水線的數(shù)據(jù)進(jìn)行解析妄荔,校驗泼菌,轉(zhuǎn)換等等操作谍肤,最終處理好的數(shù)據(jù)流轉(zhuǎn)到 Subscriber。
這條流水線還有一個特點哗伯。大部分情況下當(dāng) Publisher 的 subscribe 方法被調(diào)用之前谣沸,什么都不會發(fā)生。在被訂閱之前我們只是在定義流水線該如何工作笋颤,直到真正有人需要的時候乳附,流水線才會啟動。
Reactor 中的 Operator
Operators 怎么理解呢伴澄?對于上游來說赋除,Operators 像一個訂閱者,而對于它的下游來說非凌,它像一個發(fā)布者(我們上文說過了 Reactor 中的中間操作并沒有實現(xiàn) Processor 接口)
Mono.just("hello")
.map(a -> a + "world")
.subscribe(System.out::println);
舉個簡單的例子举农,在上面的代碼中,map 就是一個 Operator敞嗡,它的實現(xiàn)思路是什么颁糟?來看下面的代碼
// 注意,這是我基于 Reactor API 實現(xiàn)的偽代碼喉悴!
public static class MonoMap implements Publisher {
// 我們自定義的轉(zhuǎn)換邏輯
private Function mapper;
// source 代表當(dāng)前操作符的上游發(fā)布者
private Publisher source;
public MonoMap(Publisher source, Function mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(Subscriber actual) {
source.subscribe(new MonoMapSubscriber(mapper, actual));
}
}
public static class MonoMapSubscriber implements Subscriber {
// 我們自定義的轉(zhuǎn)換邏輯
private Function mapper;
// 真正的下游
private Subscriber actual;
public MonoMapSubscriber(Function mapper, Subscriber actual) {
this.mapper = mapper;
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(s);
}
@Override
public void onNext(Object o) {
// 當(dāng)上游數(shù)據(jù)發(fā)送過來時棱貌,先進(jìn)行轉(zhuǎn)換再發(fā)送給下游
Object result = mapper.apply(o);
actual.onNext(result);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
上述代碼是我自己實現(xiàn)的一個偽代碼,用于讓大家理解操作符的實現(xiàn)思路箕肃,實際 Reactor 代碼也是這個思路婚脱,只不過實現(xiàn)的更加巧妙和嚴(yán)謹(jǐn)
我們首先來分析一下 Mono.just("hello").map(a -> a + "world")
這句話
當(dāng)執(zhí)行到 Mono.just 時,會新建一個 MonoJust 對象作為當(dāng)前的 Publisher勺像。該發(fā)布者的邏輯是障贸,當(dāng)訂閱時,向下游發(fā)送數(shù)據(jù) "hello"
-
當(dāng)執(zhí)行到 map 方法時吟宦,會新建一個 MonoMap 對象替作為當(dāng)前的 Publisher篮洁,MonoJust 成為了 MonoMap 中的一個屬性 source(實際的上游)
當(dāng) MonoMap 被訂閱時,會先將它的下游 actual 做一層包裝殃姓,也就是我們上面的 MonoMapSubscriber袁波。然后去調(diào)用 source 的 subscribe 方法。上游發(fā)布數(shù)據(jù)時辰狡,MonoMapSubscriber 先對數(shù)據(jù)進(jìn)行轉(zhuǎn)換(我們上面的拼接字符串操作)锋叨,然后再發(fā)送給 actual(它的下游)
當(dāng) MonoMap 被再次轉(zhuǎn)換時,MonoMap 就變成了下游操作符的 source...
最后通過一張圖來總結(jié)一下
Reactor 該如何學(xué)習(xí)
本文并沒有介紹太多 Reactor 的細(xì)節(jié)宛篇,因為這些東西實在是太多了娃磺。我想聊聊我自己是如何學(xué)習(xí) Reactor 的
如果你已經(jīng)通過本文理解了響應(yīng)式編程的核心接口是如何工作的了,那恭喜你已經(jīng)邁向了成功的第一步了叫倍。接下來就是閱讀官方文檔偷卧,不斷的練習(xí)和閱讀 Reactor 的源碼豺瘤。源碼追蹤的方向已經(jīng)很明確了,當(dāng)我們想了解一個發(fā)布者的實現(xiàn)原理是什么听诸,我就要去關(guān)注這個發(fā)布者的 subscribe 方法和 Subscription 都做了什么坐求。想了解消費者的邏輯,就看它的 onNext晌梨,onComplete桥嗤,onError。
最后
如果覺得我的文章對你有幫助仔蝌,動動小手點下關(guān)注泛领,你的支持是對我最大的幫助