響應(yīng)式編程
作為響應(yīng)式編程方向上的第一步瓦哎,微軟在.NET生態(tài)系統(tǒng)中創(chuàng)建了Rx庫(kù)(Reactive Extensions)咬腕。RxJava是在JVM上對(duì)它的實(shí)現(xiàn)践付。
響應(yīng)式編程是一個(gè)異步編程范式,通常出現(xiàn)在面向?qū)ο蟮恼Z(yǔ)言中票顾,作為觀察者模式的一個(gè)擴(kuò)展弄匕。
它關(guān)注數(shù)據(jù)的流動(dòng)颅悉、變化的傳播。這意味著可以輕易地使用編程語(yǔ)言表示靜態(tài)(如數(shù)組)或動(dòng)態(tài)(如事件發(fā)射源)數(shù)據(jù)流迁匠。
響應(yīng)式流
隨著時(shí)間的推移剩瓶,一個(gè)專(zhuān)門(mén)為Java的標(biāo)準(zhǔn)化出現(xiàn)了。它是一個(gè)規(guī)范柒瓣,定義了一些接口和交互規(guī)則儒搭,用于JVM平臺(tái)上的響應(yīng)式庫(kù)吠架。
它就是響應(yīng)式流(Reactive Streams)芙贫,它的這些接口已經(jīng)被集成到Java 9里,在java.util.concurrent.Flow這個(gè)父類(lèi)里傍药。
響應(yīng)式流和迭代器較相似磺平,不過(guò)迭代器是基于“拉”(pull)的,而響應(yīng)式流是基于“推”(push)的拐辽。
迭代器的使用其實(shí)是命令式編程拣挪,因?yàn)橛砷_(kāi)發(fā)者決定什么時(shí)候調(diào)用next()獲取下一個(gè)元素。
在響應(yīng)式流中俱诸,與上面等價(jià)的是發(fā)布者-訂閱者菠劝。但當(dāng)有新的可用元素時(shí),是由發(fā)布者推給訂閱者的睁搭。這個(gè)“推”就是響應(yīng)式的關(guān)鍵所在赶诊。
另外,對(duì)被推過(guò)來(lái)元素的操作也是以聲明的方式進(jìn)行的园骆,程序員只需表達(dá)做什么就行了舔痪,不需要管怎么做。
發(fā)布者使用onNext方法向訂閱者推送新元素锌唾,使用onError方法告知一個(gè)錯(cuò)誤锄码,使用onComplete方法告知已經(jīng)結(jié)束。
可見(jiàn),錯(cuò)誤處理和完成(結(jié)束)也是以一個(gè)良好的方式被處理滋捶。錯(cuò)誤和結(jié)束都可以終止序列痛悯。
這種方式非常靈活。這種模式支持0個(gè)(沒(méi)有)元素/1個(gè)元素/n(多)個(gè)元素(包括無(wú)限序列重窟,如果滴答的鐘表)這些情況灸蟆。
Reactor粉墨登場(chǎng)
Reactor是第四代響應(yīng)式庫(kù),是一個(gè)響應(yīng)式編程范式的實(shí)現(xiàn)亲族,用于在JVM平臺(tái)上基于響應(yīng)式流規(guī)范構(gòu)建非阻塞異步應(yīng)用炒考。
它極大地實(shí)現(xiàn)了JVM上響應(yīng)式流的規(guī)范(
http://www.reactive-streams.org/)。
它是一個(gè)完全非阻塞響應(yīng)式編程的基石霎迫,帶有高效需求管理(以管理“后壓”的形式)斋枢。
它直接集成Java函數(shù)式API,特別是CompletableFuture知给,Stream和Duration瓤帚。
它支持使用reactor-netty工程實(shí)現(xiàn)非阻塞跨進(jìn)程通信,適合微服務(wù)架構(gòu)涩赢,支持HTTP(包括Websockets)戈次,TCP和UDP。
注:Reactor要求Java 8+
講了這么多筒扒,是不是要首先思考下怯邪,為什么我們需要這樣一個(gè)異步的響應(yīng)式庫(kù)?
阻塞就是浪費(fèi)
現(xiàn)代的應(yīng)用能達(dá)到非常多的并發(fā)用戶花墩,即使現(xiàn)代硬件的能力被持續(xù)改進(jìn)悬秉,現(xiàn)代軟件的性能仍然是一個(gè)關(guān)鍵的關(guān)注點(diǎn)。
大體上有兩種方式可以改進(jìn)一個(gè)程序的性能:
1冰蘑、并行化和泌,使用更多的線程和更多的硬件資源
2、提高效率祠肥,在當(dāng)前資源用量的情況下尋求更高效率
通常武氓,Java開(kāi)發(fā)者使用阻塞代碼來(lái)寫(xiě)程序。這種實(shí)踐性很好仇箱,直到遇到性能瓶頸县恕。
此時(shí)會(huì)引入額外線程,運(yùn)行相似的阻塞代碼工碾。但是這種擴(kuò)展方法在資源利用方面會(huì)引起爭(zhēng)論和導(dǎo)致并發(fā)問(wèn)題弱睦。
更糟糕的是,阻塞浪費(fèi)資源渊额。如果你仔細(xì)看况木,一旦一個(gè)程序涉及到一些延遲(特別是I/O垒拢,像數(shù)據(jù)庫(kù)請(qǐng)求或網(wǎng)絡(luò)調(diào)用),資源就被浪費(fèi)火惊,因?yàn)榫€程現(xiàn)在是空閑的求类,在等待數(shù)據(jù)。
所以并行化方式不是銀彈屹耐。我們有必要讓硬件發(fā)揮完全的力量尸疆,但是關(guān)于資源浪費(fèi)的影響和原因也是非常復(fù)雜的。
異步性來(lái)營(yíng)救
前面提到的第二種方式是尋求更高效率惶岭,可以作為資源浪費(fèi)問(wèn)題的一個(gè)解決方案寿弱。
通過(guò)寫(xiě)異步非阻塞代碼,你能讓執(zhí)行切換到其它活動(dòng)的任務(wù)按灶,使用相同的底層資源症革,稍后再回到當(dāng)前的處理上。
但是如何產(chǎn)生異步代碼到JVM上呢鸯旁?Java提供兩種異步編程模型:
1噪矛、Callbacks,異步方法沒(méi)有返回值铺罢,但是會(huì)帶一個(gè)回調(diào)艇挨,當(dāng)結(jié)果可用時(shí)回調(diào)會(huì)被調(diào)用。
2韭赘、Futures缩滨,異步方法立即返回一個(gè)Future,異步處理過(guò)程就是計(jì)算一個(gè)T值辞居,使用Future對(duì)象包裝了對(duì)它的訪問(wèn)楷怒。這個(gè)值不是立即可用的蛋勺,該對(duì)象可以被輪詢來(lái)查看T值是否可用瓦灶。
這兩種技術(shù)都足夠好嗎?并不是對(duì)每種情況都是的抱完,兩種方式都有局限性贼陶。
回調(diào)比較難于組合在一起,很快就會(huì)導(dǎo)致代碼難以閱讀和維護(hù)(眾所周知的“回調(diào)地獄”)巧娱。
看個(gè)回調(diào)示例碉怔,展示
一個(gè)用戶的前5個(gè)最?lèi)?ài),如果沒(méi)有的話就推薦5個(gè)給他:
這么簡(jiǎn)單的功能需要如此多的代碼禁添,而且嵌套很多撮胧、且難懂。
下面是等價(jià)的用Reactor的示例:
從代碼的數(shù)量老翘、寫(xiě)法上是不是清爽了很多芹啥。
與回調(diào)相比锻离,F(xiàn)utures稍微好一點(diǎn),但是仍然在組合方面做得不好墓怀。組合多個(gè)Futures對(duì)象到一起是可行的但是并不容易汽纠。
Future也有其它問(wèn)題,很容易因?yàn)檎{(diào)用了get()方法造成了另一個(gè)阻塞傀履。
另外虱朵,它不支持延遲計(jì)算,缺乏對(duì)多個(gè)值的支持钓账,缺乏高級(jí)錯(cuò)誤處理碴犬。
從命令式到響應(yīng)式編程
像Reactor這樣的響應(yīng)式庫(kù)的目標(biāo)就是解決在JVM上“傳統(tǒng)”異步方式的弊端,同時(shí)也關(guān)注一些額外方面:
可組合性和可讀性
數(shù)據(jù)作為流梆暮,被豐富的操作符操作
什么都不會(huì)發(fā)生翅敌,直到你訂閱
后壓,消費(fèi)者通知生產(chǎn)者發(fā)射的速率太快了
高級(jí)別而不是高數(shù)值抽象
可組合性和可讀性
可組合性惕蹄,其實(shí)就是編排多個(gè)異步任務(wù)的能力蚯涮,使前一個(gè)任務(wù)的結(jié)果作為后續(xù)任務(wù)的輸入,或以fork-join(分叉-合并)的方式執(zhí)行若干個(gè)任務(wù)卖陵,或在更高的級(jí)別重復(fù)利用這些異步任務(wù)遭顶。
任務(wù)編排的能力和代碼的可讀性和可維護(hù)性緊密地耦合在一起。隨著異步處理在數(shù)量和復(fù)雜度上的增加泪蔫,組合和閱讀代碼變得更加困難棒旗。
就像我們看到的,回調(diào)模型雖然簡(jiǎn)單撩荣,但是當(dāng)回調(diào)里嵌套回調(diào)铣揉,達(dá)到多層時(shí)就會(huì)變成回調(diào)地獄。
Reactor提供豐富的組合選項(xiàng)餐曹,使嵌套級(jí)別最小逛拱,讓代碼的組織結(jié)構(gòu)能反映出在進(jìn)行什么樣的抽象處理,且通常保持在同級(jí)別上台猴。
裝配線類(lèi)比
你可以認(rèn)為響應(yīng)式應(yīng)用處理數(shù)據(jù)就像通過(guò)一個(gè)裝配(生產(chǎn))線朽合。Reactor既是傳送帶又是工作站。
原材料從一個(gè)源(原始發(fā)布者)持續(xù)不斷地獲取饱狂,以一個(gè)完成的產(chǎn)品被推送給消費(fèi)者(訂閱者)結(jié)束曹步。
原材料可以經(jīng)過(guò)許多不同的轉(zhuǎn)換,如其它的中間步驟休讳,或者是一個(gè)更大裝配線的一部分讲婚。
如果在某個(gè)地方出現(xiàn)一個(gè)小故障或阻塞了,出問(wèn)題的工作站可以向上游發(fā)出通知來(lái)限制原材料的流動(dòng)(速率)俊柔。
操作符
在Reactor里筹麸,操作符就是裝配線類(lèi)比中的工作站纳猫。每一個(gè)操作符都向一個(gè)發(fā)布者添加某些行為,把上一步的發(fā)布者包裝到一個(gè)新的實(shí)例里竹捉。整個(gè)鏈就是這樣被鏈接起來(lái)的芜辕。
所以數(shù)據(jù)一開(kāi)始從第一個(gè)發(fā)布者出來(lái),然后沿著鏈往下游移動(dòng)块差,且被每一個(gè)鏈接轉(zhuǎn)換侵续。最后,一個(gè)訂閱者結(jié)束了這個(gè)處理憨闰。
響應(yīng)式流規(guī)范并沒(méi)有明確規(guī)定操作符状蜗,不過(guò)Reactor就提供了豐富的操作符,它們涉及到很多方面鹉动,從簡(jiǎn)單的轉(zhuǎn)換轧坎、過(guò)濾到復(fù)雜的編排、錯(cuò)誤處理泽示。
只要不訂閱缸血,就什么都不發(fā)生
當(dāng)你寫(xiě)一個(gè)發(fā)布者鏈時(shí),默認(rèn)械筛,數(shù)據(jù)是不會(huì)開(kāi)始進(jìn)入鏈中的捎泻。相反,你只是創(chuàng)建了異步處理的一個(gè)抽象描述埋哟。
通過(guò)訂閱這個(gè)行為(動(dòng)作)笆豁,才把發(fā)布者和訂閱者連接起來(lái),然后才會(huì)觸發(fā)數(shù)據(jù)在鏈里流動(dòng)赤赊。
這是在內(nèi)部實(shí)現(xiàn)好的闯狱,通過(guò)來(lái)自于訂閱者的request信號(hào)往上游傳播,一路逆流而上直到最開(kāi)始的發(fā)布者那里抛计。
Reactor核心特性
Reactor引入可組合響應(yīng)式的類(lèi)型哄孤,實(shí)現(xiàn)了發(fā)布者接口,但也提供了豐富的操作符爷辱,就是Flux和Mono录豺。
Flux,流動(dòng)饭弓,表示0到N個(gè)元素。
Mono媒抠,單個(gè)弟断,表示0或1個(gè)元素。
它們之間的不同主要在語(yǔ)義上趴生,表示異步處理的
粗略基數(shù)阀趴。
如一個(gè)http請(qǐng)求只會(huì)產(chǎn)生一個(gè)響應(yīng)昏翰,把它表示為Mono顯然更有意義,且它只提供相對(duì)于0/1這樣上下文的操作符刘急,因?yàn)榇藭r(shí)count操作顯然沒(méi)有太大意義棚菊。
操作符可以
改變處理的最大基數(shù),也會(huì)切換到相關(guān)類(lèi)型上叔汁。如count操作符雖然存在于Flux上统求,但它的返回值卻是一個(gè)Mono。
Flux<T>
一個(gè)Flux是一個(gè)標(biāo)準(zhǔn)的Publisher据块,表示一個(gè)異步序列码邻,可以發(fā)射0到N個(gè)元素,可以通過(guò)一個(gè)完成信號(hào)或錯(cuò)誤信號(hào)終止另假。
就像在響應(yīng)式流規(guī)范里那樣像屋,這3種類(lèi)型的信號(hào)轉(zhuǎn)化為對(duì)一個(gè)下游訂閱者的onNext,onComplete边篮,onError3個(gè)方法的調(diào)用己莺。
這3個(gè)方法也可以理解為事件/回調(diào),且它們都是可選的戈轿。
如沒(méi)有onNext但有onComplete篇恒,表示一個(gè)空的有限序列。既沒(méi)有onNext也沒(méi)有onComplete凶杖,表示一個(gè)空的無(wú)限序列(沒(méi)有什么實(shí)際用途胁艰,可用于測(cè)試)。
無(wú)限序列也沒(méi)有必要是空的智蝠,如Flux.interval(Duration)產(chǎn)生一個(gè)Flux 腾么,它是無(wú)限的,從鐘表里發(fā)射出的規(guī)則的“嘀嗒”杈湾。
Mono<T>
一個(gè)Mono是一個(gè)特殊的Publisher解虱,最多發(fā)射一個(gè)元素,可以使用onComplete信號(hào)或onError信號(hào)來(lái)終止漆撞。
它提供的操作符只是Flux提供的一個(gè)子集殴泰,同樣,一些操作符(如把Mono和Publisher結(jié)合起來(lái))可以把它切換到一個(gè)Flux浮驳。
如Mono#concatWith(Publisher)返回一個(gè)Flux悍汛,然而Mono#then(Mono)返回的是另一個(gè)Mono。
Mono可以用于表示沒(méi)有返回值的異步處理(與Runnable相似)至会,用Mono表示离咐。
創(chuàng)建Flux或Mono,并訂閱它們
最容易的方式就是使用它們各自的工廠方法:
Flux?seq1?=?Flux.just("foo","bar","foobar");
List?iterable?=?Arrays.asList("foo","bar","foobar");
Flux?seq2?=?Flux.fromIterable(iterable);
Flux?numbersFromFiveToSeven?=?Flux.range(5,3);
Mono?noData?=?Mono.empty();
Mono?data?=?Mono.just("foo");
當(dāng)談到訂閱時(shí),可以使用Java 8的lambda表達(dá)式宵蛀,訂閱方法有多種不同的變體昆著,帶有不同的回調(diào)。
下面是方法簽名:
//訂閱并觸發(fā)序列
subscribe();
//可以對(duì)每一個(gè)產(chǎn)生的值進(jìn)行處理
subscribe(Consumer?consumer);
//還可以響應(yīng)一個(gè)錯(cuò)誤
subscribe(Consumer?consumer,
Consumer?errorConsumer);
//還可以在成功結(jié)束后執(zhí)行一些代碼
subscribe(Consumer?consumer,
Consumer?errorConsumer,
Runnable?completeConsumer);
//還可以對(duì)Subscription執(zhí)行一些操作
subscribe(Consumer?consumer,
Consumer?errorConsumer,
Runnable?completeConsumer,
Consumer?subscriptionConsumer);
使用Disposable取消訂閱
這些基于lambda的訂閱方法都返回一個(gè)Disposable類(lèi)型术陶,通過(guò)調(diào)用它的dispose()來(lái)取消這個(gè)訂閱凑懂。
對(duì)于Flux和Mono,取消就是一個(gè)信號(hào)梧宫,表明源應(yīng)該停止生產(chǎn)元素接谨。然而,不保證立即生效祟敛,一些源可能生產(chǎn)元素非嘲贪樱快,以致于還沒(méi)有收到取消信號(hào)就已經(jīng)生產(chǎn)完了馆铁。
歡迎工作一到五年的Java工程師朋友們加入Java程序員開(kāi)發(fā): 854393687
群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用跑揉、高并發(fā)、高性能及分布式埠巨、Jvm性能調(diào)優(yōu)历谍、Spring源碼,MyBatis辣垒,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來(lái)學(xué)習(xí)提升自己望侈,不要再用"沒(méi)有時(shí)間“來(lái)掩飾自己思想上的懶惰!趁年輕勋桶,使勁拼脱衙,給未來(lái)的自己一個(gè)交代!