前言:
這是一個關(guān)于Rx的博文系列 篮迎,也是本人第一次通過網(wǎng)絡(luò)來記錄、整理自己的學(xué)習(xí)歷程示姿。若有紕漏之處甜橱,尤其是錯誤的理解和觀點,請各位大神拍磚栈戳。
干貨在后頭呢岂傲。
Rx——Reactive Extension,使用可觀測的序列(流)來組成異步的子檀、基于事件的程序的庫镊掖。我們使用Observable(數(shù)據(jù)流)描述數(shù)據(jù)序列,通過Operator(操作符)查詢褂痰、改變數(shù)據(jù)流亩进,并且支持以參數(shù)的形式利用Scheduler(調(diào)度器)控制異步數(shù)據(jù)流的并發(fā)。簡而言之:Rx = Observable + Operator + Scheduler缩歪。
若無特別說明归薛,本系列文章中所出現(xiàn)的數(shù)據(jù)流和事件流均代表Observable。
我更愿意稱之為一種編程方式——響應(yīng)式編程匪蝙。實際上Rx不止一個庫主籍,它包括了多個實現(xiàn)了Rx方式的各種編程語言的開源庫。如非特別說明逛球,本系列文章均參考Rx的Java實現(xiàn)——RxJava2——進行闡述千元。
Rx擴展了觀察者模式,使得該模式從對一個數(shù)據(jù)或事件的觀察升級為對一個序列的數(shù)據(jù)或事件的觀察颤绕,更強大的是它可通過操作符將多個數(shù)據(jù)序列組合成一個序列幸海。借助于Rx我們能夠更加專注于程序的邏輯祟身,而不用過多地關(guān)注諸如多線程、同步物独、線程安全袜硫、并發(fā)、非阻塞IO這些底層的東東议纯。
看到這里有人會質(zhì)疑了:沒有Rx我照樣能得到想要的數(shù)據(jù)序列啊,談何“升級”溢谤?這里以吃飯來舉例瞻凤。
場景一和場景二:
來到公司樓下的食堂——沒錯,就是那種去晚了菜都涼了的地方:
一)中午胃口不錯世杀,于是點了個兩葷一素:Iterable<Dish> lunch = getDish(); eat(lauch);
二)晚上想減肥阀参,只要了一個素菜:Dish dinner = getDish(); eat(dinner);
食堂的菜已經(jīng)提前準(zhǔn)備好了,想吃多少任君挑選瞻坝。點好菜就開吃不用等待蛛壳,可見在食堂吃飯是一個同步的操作。
場景三:
某個下午堆代碼上癮錯過食堂飯點所刀,只好去樓下的拉面館吃面:
三) 老板衙荐,來碗牛肉炒刀削:Future<Dish> dinner = waitDish(); eat(lauch);
給牛肉炒刀削點贊!接下來就是等面出鍋浮创,可見吃面一個異步的操作忧吟。
場景四和場景五:
周末好不容易約了女神一起吃中餐:
為了這頓飯我餓了3天,服務(wù)員菜單遞過來我底氣十足地點了5個硬菜斩披,女神卻說要減肥(不早說)溜族。為了體現(xiàn)我的紳士風(fēng)度,餓死也必須等女神先吃垦沉。
四) 然而女神說等菜全部上齊了再吃:Future<Iterable<Dish>> lunch = waitDishAll(); eat(lauch);
五) 如果第一道菜上桌就開始吃我就不用餓那么久了:Iterable<Future<Dish>> lunch = waitDishAny(); eat(lauch);
可見吃大餐是一個由多個異步過程組合的操作煌抒,組合的方式由具體業(yè)務(wù)決定。
毫無疑問厕倍,我們能從容面對場景一和場景二寡壮;對于場景三,不同語言提供了類似功能的工具讹弯,比如Java提供了Future(配合Callback)來應(yīng)對這類單一異步操作诬像。然而在場景四和場景五中,存在多個闸婴、甚至可能嵌套的異步操作坏挠,考慮到異步操作在時間上地不確定性,盡管Future仍然可以作為一種解決方案邪乍,但隨之而來的復(fù)雜性也不容忽視降狠,程序員需要提供一些額外的邏輯來判斷異步操作何時結(jié)束以及后續(xù)的操作(對應(yīng)本場景中的吃)何時開始对竣。
吃瓜群眾開始起哄:Rx行那Rx上啊。
Observable<Dish> lauch = getDish(); // 得到數(shù)據(jù)流
5道菜上齊再吃: lauch.buffer(5).subscribe(eat); // 轉(zhuǎn)換數(shù)據(jù)流 榜配、訂閱
第一道菜上桌就吃: lauch.firstElement().subscribe(eat); // 過濾數(shù)據(jù)流 否纬、訂閱
如何獲取和響應(yīng)包含多個數(shù)據(jù)的異步數(shù)據(jù)流,Java的Future+Callback方案顯得有點笨拙蛋褥,程序員不得不分散精力去處理核心業(yè)務(wù)之外的邏輯临燃,編寫出的代碼也容易產(chǎn)生臭名昭著的“回調(diào)地獄”。相比之下烙心,Rx表現(xiàn)得游刃有余膜廊,它提供了豐富的操作符(此處暫且按下不表),原始數(shù)據(jù)流(上游數(shù)據(jù)流)經(jīng)若干次操作符處理后變成目標(biāo)數(shù)據(jù)流(下游數(shù)據(jù)流)淫茵。觀察者訂閱了目標(biāo)數(shù)據(jù)流后爪瓜,剩下的工作就是集中精力處理后續(xù)業(yè)務(wù)。
Rx不但可以方便地處理單個數(shù)據(jù)——更重要的是——它在需要處理多個匙瘪、甚至無限多個數(shù)據(jù)的場景中表現(xiàn)優(yōu)異铆铆。因此,Rx的準(zhǔn)則是:(幾乎)一切都可以成為一個數(shù)據(jù)流(哪怕只有單個數(shù)據(jù)丹喻、甚至沒有任何數(shù)據(jù))薄货。
Rx相比經(jīng)典的觀察者模式增加了兩個能力:
1) 被觀察者向觀察者發(fā)送“沒有更多數(shù)據(jù)”通知的能力(調(diào)用觀察者的onComplete方法),類比于Iterable因 !hasNext()正常結(jié)束遍歷碍论。
2) 被觀察者向觀察者發(fā)送“異撤坡浚”通知的能力 (調(diào)用觀察者的onError方法),類比于Iterable遍歷過程中拋出異常 提前結(jié)束遍歷骑冗。
觀察者只能收到以上兩個通知其中之一赊瞬。一旦收到任何一個通知,之后觀察者將不會收到任何數(shù)據(jù)或通知贼涩。
獲得以上兩個能力的加持后巧涧,Rx的Observable與Iterable看起來就像是孿生兄弟,我們可以像使用Iterable一樣使用Observable遥倦。
通過Iterable處理數(shù)據(jù)序列谤绳,用的是pull(拉取)的方式袒哥,處理過程發(fā)生在當(dāng)前線程:
getDataFromLocalMemory()
? ? .skip(10)
? ? .take(5)
? ? ? .map({ s -> s + " transformed" })
? ? .forEach({ println "next -> " + it })
通過Observable處理數(shù)據(jù)序列缩筛,用的是push(推送)的方式,可以靈活地選擇同步或異步地發(fā)送處理結(jié)果:
getDataFromNetwork()
? ? .skip(10)
? ? .take(5)
? ? .map({ s ->s + " transformed" })
? ? .subscribe({ println "onNext -> " + it })
Observable比它的孿生兄弟Iterable多了一對隱形的翅膀——在處理異步的數(shù)據(jù)流時堡称,我們便可以打開這對翅膀瞎抛。
Rx使用非常靈活,盡管我們通常用它來處理異步數(shù)據(jù)流却紧,事實上桐臊,Rx完全不關(guān)心產(chǎn)生數(shù)據(jù)的方式胎撤,無論是通過線程池、event loops断凶、non-blocking I/O伤提、 actors。即便你已經(jīng)通過線程池來產(chǎn)生數(shù)據(jù)认烁,仍然可以改造成其他方式肿男,而完全不用改變現(xiàn)有觀察者的工作方式。
數(shù)據(jù)是同步or異步獲取的却嗡?
數(shù)據(jù)序列是否需要在多個不同線程中計算并依次返回給調(diào)用者舶沛?
是否通過異步的網(wǎng)絡(luò)請求來獲取數(shù)據(jù)?
是否通過callback線程獲取數(shù)據(jù)稽穆?
Rx把一切與Observable的交互視為異步的冠王。