Reactor是響應(yīng)式編程規(guī)范的一個實現(xiàn)弃锐,維基百科對響應(yīng)式編程的總結(jié)如下:
響應(yīng)式編程是一種異步編程范例袄友,主要關(guān)注數(shù)據(jù)流和數(shù)據(jù)變化通知。這意味著可以使用編程語言輕松表達靜態(tài)(數(shù)組)或動態(tài)(事件發(fā)射器)數(shù)據(jù)流霹菊。更多有關(guān)響應(yīng)式編程的描述可以參考Reactive programming
響應(yīng)式編程邁出的第一步是微軟在.NET系統(tǒng)中創(chuàng)建了響應(yīng)式擴展庫(Rx)剧蚣。在微軟創(chuàng)建Rx之后,RxJava在JVM上實現(xiàn)了響應(yīng)式編程旋廷。隨著時間推移鸠按,經(jīng)過Reactive Streams的不斷努力制定了Java實現(xiàn)響應(yīng)式編程的規(guī)范,規(guī)范為JVM上的響應(yīng)式庫定義了一組接口和交互規(guī)則饶碘。Java 9的Flow
類已經(jīng)實現(xiàn)了規(guī)范定義的接口(從Java 9 開始目尖,java開始默認(rèn)支持響應(yīng)式編程,有條件的小伙伴該考慮升級Java版本了)扎运。
在面向?qū)ο蟮木幊陶Z言中瑟曲,響應(yīng)式編程通常作為觀察者模式的一種擴展。如果對比迭代器設(shè)計模式和主流的響應(yīng)式流模式對比豪治,會發(fā)現(xiàn)在幾乎所有的庫中Iterable-
Iterator 都有雙重性(可以互相轉(zhuǎn)換)洞拨。兩者主要的區(qū)別是:迭代器設(shè)計模式基于拉,響應(yīng)式流基于推负拟。
迭代器是命令式編程模式烦衣,盡管訪問數(shù)據(jù)的方法僅由Iterable
負(fù)責(zé)。實際上在使用迭代器時由開發(fā)者決定何時選擇序列中的next()元素掩浙。在響應(yīng)式流中花吟,和上面Iterable-
Iterator對應(yīng)的是Publisher-Subscriber
,新值出現(xiàn)時Publisher
會通知Subscriber
涣脚,推送是響應(yīng)的關(guān)鍵示辈。同樣,對推送值的操作是聲明式而不是命令式遣蚀,代碼表達計算的邏輯矾麻,而不是描述其精確的控制流纱耻。
響應(yīng)式流除了推送值之外,同樣以良好的方式定義了錯誤處理和操作完成险耀。一個Publisher
可以向其Subscriber
推送新的值弄喘,也可以發(fā)送錯誤信號或者完成信號。錯誤信號和完成信號都會終止序列甩牺,下面的表達式準(zhǔn)確簡練的描述了這個邏輯:
onNext x 0..N [onError | onComplete]
這種模式非常的靈活蘑志,可以支持沒有值,一個值或n個值(包括無限序列贬派,比如時間)急但。但是為什么我們首先需要這樣一個異步響應(yīng)式庫呢?
阻塞是一種浪費
現(xiàn)在應(yīng)用有大量的并發(fā)用戶,盡管現(xiàn)代化硬件的能力在不斷提升搞乏,但是軟件性能依然是一個關(guān)鍵問題波桩。有兩種方法可以提高軟件的性能:
- 并行使用更多的線程和更多的硬件資源,
- 提高現(xiàn)有資源的使用率请敦。
通常镐躲,Java 開發(fā)者使用阻塞代碼開發(fā)程序,這種方法在沒有性能瓶頸之前非常的完美侍筛,因為阻塞代碼更容易理解也更容易編寫萤皂。當(dāng)程序出現(xiàn)性能瓶頸時,引入另外的線程來運行相同的阻塞代碼(活多了需要加人)匣椰。但是裆熙,資源的這種擴展會迅速引入競爭和并發(fā)問題。更糟糕的是窝爪,阻塞會浪費資源弛车。如果程序遇到延遲(特別是I/O操作,比如數(shù)據(jù)庫請求或網(wǎng)絡(luò)調(diào)用)蒲每,因為線程需要等待數(shù)據(jù)而處于空閑狀態(tài)進而導(dǎo)致資源的浪費纷跛。
因此,并行不是靈丹妙藥邀杏。充分使用硬件的能力十分必要贫奠,但是推理過程十分復(fù)雜而且更加容易造成浪費。
異步是一副良藥嗎望蜡?
通過編寫異步唤崭、非阻塞代碼,可以切換到另一個活動的使用相同基礎(chǔ)資源的任務(wù)并在異步處理完成后返回到當(dāng)前的任務(wù)脖律。通過異步代碼可以提高資源的使用率谢肾,減少資源浪費。
Java提供了下面兩種異步編程模型:
-
Callbacks: 異步方法沒有返回值小泉,但是需要一個額外的
callback
參數(shù)(可以是lambda表達式或匿名類)芦疏,當(dāng)結(jié)果可用時會被調(diào)用冕杠。 -
Futures::方法立即返回一個
Future<T>
。異步計算T的值酸茴,但是Future
封裝了對T值的訪問分预。T值可能不是立即可用,而且Future對象支持輪詢直到值T可用薪捍。Java的ExecutorService
執(zhí)行Callable<T>
時返回一個Future
對象笼痹。
Java提供的這兩種編寫異步代碼的技術(shù)足夠好了嗎?這些技術(shù)很好酪穿,但并不適用于每一種場景凳干,而且都有各自的局限性。Callbacks的缺點是很難組合在一起使用昆稿,而且多個回調(diào)組合在一起使用時纺座,代碼很快就會變的難以閱讀和維護(通常稱為回調(diào)地獄)息拜。
下面以在用戶界面顯示用戶前五個收藏夾為樣例說明Callbacks的局限性溉潭。業(yè)務(wù)場景為:顯時用戶前五個收藏夾,如果用戶沒有收藏夾則顯示建議少欺。
userService.getFavorites(userId, new Callback<List<String>>() { // 1
public void onSuccess(List<String> list) { // 2
if (list.isEmpty()) { // 3
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) { // 4
UiUtils.submitOnUiThread(() -> { // 5
list.stream()
.limit(5)
.forEach(uiList::show); // 6
});
}
public void onError(Throwable error) { // 7
UiUtils.errorPopup(error);
}
});
} else {
list.stream() //8
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId, // 9
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
基于callback的實現(xiàn)有很多的代碼喳瓣,這些代碼難以理解,要想一步一步弄懂邏輯也比較困難赞别,而且代碼還有部分重復(fù)畏陕。下面是基于Reactor的等價實現(xiàn):
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
基于callback的代碼實現(xiàn)如果要增加超時邏輯會十分的困難,但是基于Reactor的實現(xiàn)只要使用timeout方法即可輕松完成:
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800))
.onErrorResume(cacheService.cachedFavoritesFor(userId))
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
Future
對象比回調(diào)好一點仿滔,但它們在組合方面仍然做得不好惠毁,盡管CompletableFuture
在Java 8中帶來了改進。編排多個Future
對象可行但是并不容易崎页。除此之外鞠绰,Future
還有其他問題:
- 調(diào)用
get()
方法很容易導(dǎo)致Future
對象阻塞, - 不支持懶加載/懶計算飒焦,
- 它們?nèi)狈Χ嘀岛透呒夊e誤處理的支持蜈膨。
考慮另一個例子:我們獲得一個id列表,我們想從中獲取一個名稱和一個統(tǒng)計信息牺荠,并將它們成對組合翁巍,所有這些都是異步的.。下面的代碼使用一個 CompletableFuture
列表來實現(xiàn)該功能:
CompletableFuture<List<String>> ids = ifhIds();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = ifhName(i);
CompletableFuture<Integer> statTask = ifhStat(i);
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join();
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
由于Reactor有更多的可開箱的組合操作符休雌,上面的過程可以簡化如下:
Flux<String> ids = ifhrIds();
Flux<String> combinations =
ids.flatMap(id -> {
Mono<String> nameTask = ifhrName(id);
Mono<Integer> statTask = ifhrStat(id);
return nameTask.zipWith(statTask,
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList();
List<String> results = result.block();
assertThat(results).containsExactly(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
使用回調(diào)和Future對象的危險是相似的灶壶,這也是響應(yīng)式編程通過Publisher-Subscriber
對解決的問題。
3.3. 從命令式編程到響應(yīng)式編程
諸如Reactor的響應(yīng)式編程庫旨在解決JVM上“經(jīng)典”異步方法的缺點杈曲,同時也著重對以下方面進行改進:
- 可組合性 和 可讀性驰凛,
- 把數(shù)據(jù)當(dāng)做流處理孝情,同時提供豐富的操作方法,
- 在訂閱之前不會發(fā)生任何事情洒嗤,
- 背壓 或 消費者向生產(chǎn)者發(fā)送信號通知數(shù)據(jù)生產(chǎn)速率過高或過低的能力箫荡,
- 對并發(fā)不可知更高價值和更高級的抽象。
3.3.1. 可組合性和可讀性
“可組合性”指的是編排多個異步任務(wù)的能力渔隶,可以將前面任務(wù)的結(jié)果作為后續(xù)任務(wù)的輸入羔挡。當(dāng)然也可以以fork-join的方式運行多個任務(wù)。此外间唉,我們可以在更高級的系統(tǒng)中把異步任務(wù)作為離散組件重用绞灼。
編排任務(wù)的能力與代碼的可讀性和可維護性緊密相關(guān)。隨著異步處理層的數(shù)量和復(fù)雜性的增加呈野,編寫和閱讀代碼變得越來越困難低矮。正如我們所見,callback模型十分簡單被冒,但是callback一個缺點就是處理變的復(fù)雜军掂,一個callback需要在另外一個callback中執(zhí)行,這樣一層一層的嵌套昨悼。這就是“回調(diào)地獄”蝗锥,這種代碼難以閱讀和分析邏輯。
Reactor提供了豐富的組合操作率触,代碼可以反應(yīng)對處理過程抽象的組織终议,一切盡量保持在同一層(盡量減少嵌套,這也是和callback模式相比最大的改進之一)葱蝗。
3.3.2. 類比工廠的生產(chǎn)線
數(shù)據(jù)在響應(yīng)式程序中的處理過程穴张,可以被看作是數(shù)據(jù)在組裝流水線中移動。Reactor既是傳送帶又是工作站两曼。原材料從來源(第一個Publisher)傾瀉而出(中間經(jīng)過多道工序加工)皂甘,最終成為可以推送給消費者(Subscriber)的成品。
原材料可以經(jīng)過各種轉(zhuǎn)換和其他中間步驟合愈,也可以成為將中間零件組裝在一起的更大裝配線的一部分叮贩。如果某一點出現(xiàn)故障或堵塞(某到工序耗時長),那么出問題的工作站可以向上游發(fā)出信號佛析,以限制原材料的流動(有問題及時向上游反饋益老,上游做出響應(yīng),避免進一步惡化)寸莫。
3.3.3. Operators
在Reactor中捺萌,operator就是流水線中的工作站。每個operator都會將行為添加到Publisher
中膘茎,并將上一步的Publisher
包裝到新實例中桃纯。這樣構(gòu)建了一個完整的鏈接酷誓,數(shù)據(jù)從第一個Publisher
向下游移動并由每一個鏈接進行轉(zhuǎn)換,最后态坦,由一個Subscriber
結(jié)束數(shù)據(jù)的數(shù)據(jù)處理過程盐数。在Subscriber
訂閱之前數(shù)據(jù)不會被處理也不會向下游移動。
盡管響應(yīng)式流規(guī)范根本沒有定義operator伞梯,但是像Reactor的響應(yīng)式庫提供的最佳附加值之一就是提供了豐富的operator玫氢,從簡單的轉(zhuǎn)換和過濾到復(fù)雜的編排和錯誤處理,這些內(nèi)容涉及很多的領(lǐng)域谜诫。
3.3.4. 不 subscribe()
不會發(fā)生任何事情
在Reactor中漾峡,當(dāng)你編寫了一個Publisher
鏈,默認(rèn)數(shù)據(jù)不會注入喻旷。實際上只是創(chuàng)建了一個異步處理過程的抽象描述(這有助于重用和組合)生逸。通過subscribing 動作,將Publisher
綁定到Subscriber
且预,這會觸發(fā)數(shù)據(jù)在整個鏈路中移動槽袄。內(nèi)部實現(xiàn)是通過Subscriber
發(fā)送Request
信號,信號被傳播到上游一直到Publisher
辣之。request
也是實現(xiàn)背壓的關(guān)鍵方法掰伸。
3.3.5. 背壓
向上游傳播信號通常用來實現(xiàn)背壓,在和流水線的類比中描述為當(dāng)工作站處理比上游工作站的處理速度慢時怀估,沿著流水線向上游反饋。背壓其實就是下游向上游發(fā)送信號合搅,并影響上游數(shù)據(jù)處理的一種機制多搀。
響應(yīng)式流規(guī)范定義的實際機制可以簡單的概括為:一個subscriber可以以“無界” 模式工作,并讓數(shù)據(jù)源以最快的速度推送所有數(shù)據(jù)灾部,或者使用request
機制向數(shù)據(jù)源發(fā)送信息康铭,向數(shù)據(jù)源反饋已經(jīng)準(zhǔn)備好處理n個元素。
中間operator可以在中途改變request赌髓。想象一下一個buffer
以十個元素為一組將元素進行分組从藤。如果subscriber請求一個buffer,數(shù)據(jù)源發(fā)送十個元素是可以被接受锁蠕。一些operator也實現(xiàn)了預(yù)拉取策略 夷野,這避免了request(1)
不斷往返。如果在請求之前生成元素的成本很低荣倾,這種操作就非常的有幫助悯搔,可以顯著的提高處理效率。
這會將推模式轉(zhuǎn)換為推拉混合模式舌仍,如果上游已經(jīng)準(zhǔn)備了數(shù)據(jù)妒貌,下游則可以從上游獲取n個元素通危。但是如果數(shù)據(jù)還沒有準(zhǔn)備好,那么當(dāng)有數(shù)據(jù)時上游就會將數(shù)據(jù)推送到下游灌曙。
3.3.6. Hot vs Cold
Rx響應(yīng)式庫家族將響應(yīng)序列分為兩大類:“熱”和“冷”菊碟。這種區(qū)別主要與響應(yīng)式流對subscriber的響應(yīng)有關(guān):
- 對于每一個
Subscriber
,包括在數(shù)據(jù)源位置在刺,冷序列都會重新開始框沟。例如,如果源包裝了HTTP調(diào)用增炭,則將為每個subscription發(fā)出一個新的HTTP請求忍燥。 - 對于每一個
Subscriber
,熱序列并非都會從頭開始隙姿。相反摄杂,后面的subscriber只能收到訂閱完成之后產(chǎn)生的數(shù)據(jù)。但是一些熱響應(yīng)式流可以緩存或者對歷史數(shù)據(jù)全部或部分重放场靴,也就是說遲來的subscriber可以收到在完成訂閱動作之前的數(shù)據(jù)玩祟。從一般的角度來看,即使沒有訂閱者在訂閱數(shù)據(jù)欲鹏,熱序列甚至?xí)l(fā)出數(shù)據(jù)(“訂閱之前什么也沒有發(fā)生”規(guī)則的例外)机久。