前言
網(wǎng)絡(luò)上很多關(guān)于 RxJava 的文章都是基于「方法論」的,很少?gòu)膶?shí)現(xiàn)原理的角度去透析生兆。本文希望通過(guò)深入簡(jiǎn)出地描述 RxJava 的一些重要原理卧斟,讓讀者大概知道 RxJava 是如何 Work 的链烈。
核心對(duì)象
ReactiveX 是基于觀察者模式設(shè)計(jì)的社搅,核心對(duì)象只有 Observable 和 Observer驻债。它們最簡(jiǎn)單的代碼為:
interfaceObservable{voidsubscribe(Observerobserver);}interfaceObserver{voidonNext(Tt);}
Observable 的核心方法是 subscribe()乳规,它接收一個(gè) Observer。當(dāng)調(diào)用 subscribe() 的時(shí)候合呐,就開(kāi)始通過(guò)調(diào)用 Observer 的 onNext() 方法發(fā)射數(shù)據(jù)暮的。
上下游
以下代碼中:
ob1=Observable.create(Func1);ob2=ob1.map(Func2);ob3=ob2.subscribeOn(SchedulerA);
ob1 是 ob2 的上游,ob3 是 ob2 的下游淌实《潮纾可以看出,對(duì) Observable 進(jìn)行一次「操作」后會(huì)得到一個(gè)新的 Observable翩伪。
官方定義:Doubt about the terms Upstream vs Downstream · Issue #5022 · ReactiveX/RxJava
操作符
Rx 里的操作符微猖,例如上面的 map(Func2)谈息,其內(nèi)部實(shí)現(xiàn)是這樣(官方命名稍有不同):
ob2=newMapObservable(ob1,Func2);
所以上一小節(jié)的代碼可以寫成:
ob1=newCreateObservable(Func1);ob2=newMapObservable(ob1,Func2);ob3=newSubscribeOnObservable(ob2,SchedulerA);
可以說(shuō) Rx 里最重要的是「組合操作符缘屹,加工數(shù)據(jù)流」。
官方操作符文檔:ReactiveX - Operators
操作符的內(nèi)部實(shí)現(xiàn)的核心思想是「Wrap」侠仇,例如我們可以通過(guò)?Wrap Observable?來(lái)實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的沒(méi)有任何操作的操作符:
classNoOpObservable{Observableupstream;NoOpObservable(Observableupstream){this.upstream=upstream;}voidsubscribe(Observerdownstream){upstream.subscribe(downstream);}}
可以看到轻姿,在我們 NoOpObservable 的核心方法 subscribe() 里,我們直接通過(guò)調(diào)用上游 Observable 的 subscribe() 方法逻炊,把下游的 Observer 往上游傳互亮。這樣,我們就成功把下游和上游之間建立聯(lián)系了余素。
接下來(lái)我們?cè)黾与y度豹休,通過(guò)?Wrap Observer(注意是 Observer)來(lái)實(shí)現(xiàn)一個(gè)對(duì)數(shù)據(jù)流進(jìn)行加工的操作符。舉個(gè)例子桨吊,MapObservable 就是一個(gè)能對(duì)數(shù)據(jù)流進(jìn)行加工的操作符威根,它在構(gòu)造時(shí)傳入一個(gè) Func 參數(shù),類型為:
(T) -> U
這個(gè) Func 的作用视乐,就是把上游發(fā)射的數(shù)據(jù) T 加工成 U 然后繼續(xù)往下游傳遞洛搀。例如,我們可以使用 Func 把上游發(fā)射的數(shù)據(jù)轉(zhuǎn)換為 String 再傳遞給下游:
(T t) -> t.toString();
前面有說(shuō)到佑淀,Observable 是通過(guò)調(diào)用 onNext() 來(lái)向 Observer 發(fā)射數(shù)據(jù)的留美,為了避免 Observable 直接向最下游的 Observer 直接發(fā)射數(shù)據(jù)(因?yàn)槲覀冞€要進(jìn)行加工),所以我們需要對(duì) Observer 也進(jìn)行 Wrap伸刃,于是我們可以把 MapObservable 設(shè)計(jì)成這樣:
classMapObservable{MapObservable(Observableupstream,FuncmapFunc){/* ... */}voidsubscribe(Observerdownstream){upstream.subscribe(newWrapObserver(downstream,mapFunc));}classWrapObserver(){WrapObserver(Observerdownstream,FuncmapFunc){/* ... */}voidonNext(Tdata){UnewData=mapFunc.apply(data);downstream.onNext(newData);}}}
subscribeOn & observeOn
這是 RxJava 中最常用谎砾,但又不好理解的兩個(gè)操作符。因?yàn)樗鼈冏陨聿粚?duì)數(shù)據(jù)進(jìn)行任何加工捧颅,而是對(duì)其它操作符產(chǎn)生「副作用」景图。
為了更好地理解這兩個(gè)操作符,我制作了個(gè)動(dòng)畫來(lái)展示它們的工作流程:
圖中分別有 A B C 三個(gè)不同的 Scheduler,它們會(huì)把 Runnable/Func 扔到不同的線程上去執(zhí)行,而上圖中不同的顏色代表被不同的 Scheduler 執(zhí)行袁翁。
可以看到卤恳,subscribeOn() 其實(shí)影響的是上游操作符中的 subscribe() 操作程梦,而 observeOn() 影響的是下游操作符中的 onNext() 操作(這里是泛指窃判,當(dāng)然還包括 onComplete()斜姥、onError() 等)惑惶。
所以设塔,想要確定操作符上的某個(gè) Func 是在哪個(gè) Scheduler 上工作時(shí)凄吏,先要確定這個(gè) Func 是在操作符上的 subscribe() 還是 onNext() 上執(zhí)行的。例如 Create 操作符傳入的 Func 是在 subscribe() 上執(zhí)行的闰蛔,所以它優(yōu)先受下游最近的 subscribeOn() 調(diào)度痕钢。而 Map 操作符傳入的 Func 是在 onNext() 上執(zhí)行的,所以它優(yōu)先受上游最近的 observeOn() 調(diào)度序六。
再例如 Collect 操作符:
.collect(()->V func1, (T)->U func2)
傳入的 func1 是在 subscribe() 上執(zhí)行的任连,而 func2 是在 onNext() 上執(zhí)行,所以 func1 和 func2 可能被兩個(gè)不同的 Scheduler 調(diào)度例诀。
附《Android核心知識(shí)筆記2020》分享
前段時(shí)間我和圈子里的幾位架構(gòu)師朋友一起閑聊時(shí)的突發(fā)奇想随抠,我們?cè)趯W(xué)習(xí)Android開(kāi)發(fā)的時(shí)候或多或少也受到了一些前輩的指導(dǎo),所以想把這份情懷延續(xù)下去繁涂。三個(gè)月后拱她,這套資料就出來(lái)了,需要這份資料的朋友加Android學(xué)習(xí)交流群1049273031即可獲取扔罪。