【RxJava】- 創(chuàng)建操作符源碼分析
【RxJava】- 過濾操作符源碼分析
【RxJava】- 結(jié)合操作符源碼分析
【RxJava】- 連接操作符源碼分析
注意
文章中說的被觀察者和觀察者可以看------RxJava的基本執(zhí)行流程。比如Observable.create傳入的參數(shù)是被觀察者鳄虱,而subscribe傳入的參數(shù)是觀察者娇昙,因?yàn)榍罢呤鞘录陌l(fā)射地匆绣,而后者是接收事件的地方强衡,事件發(fā)射地的變化,我們都能第一時(shí)間得知畜普,就好像我們在觀察前者一樣蛀序。
Compose
通過對其應(yīng)用特定操作。比如在網(wǎng)絡(luò)請求中挂捻,需要在觀察者接收通知前對返回的數(shù)據(jù)做一定的處理后碉纺,再通知觀察者。
FlatMap
FlatMap將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables刻撒,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable骨田。
FlatMap功能由
ObservableFlatMap
ObservableMapNotification
ObservableInternalHelper
FlatMapWithCombinerOuter
ObservableFlatMapCompletableCompletable
ObservableFlattenIterable
FlatMapIntoIterable
ObservableFlatMapMaybe
ObservableFlatMapSingle
實(shí)現(xiàn)
ObservableFlatMap
首先調(diào)用ObservableFlatMap的subscribeActual方法,并觀察者實(shí)例声怔。
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)){return;}
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
tryScalarXMapSubscribe方法
如果被觀察者(比如Observable.create傳入的參數(shù))是Supplier類型态贤,返回true。
直接調(diào)用Supplier的get方法獲取事件數(shù)據(jù)醋火,然后調(diào)用FlatMap傳入的數(shù)據(jù)轉(zhuǎn)換包裝實(shí)例的apply方法悠汽,得到一個(gè)包裝后,類型為ObservableSource的c胎撇。
如果得到的數(shù)據(jù)包裝實(shí)例也是Supplier類型介粘,者直接調(diào)用Supplier的get方法獲取新的數(shù)據(jù),新數(shù)據(jù)可以被轉(zhuǎn)換晚树,也可以沒有轉(zhuǎn)換姻采。然后調(diào)用觀察者的onSubscribe方法,緊接著調(diào)用觀察者的onNext爵憎,onComplete等方法慨亲。
如果得到的數(shù)據(jù)包裝實(shí)例不是Supplier類型,這直接調(diào)用數(shù)據(jù)包裝實(shí)例的subscribe方法宝鼓,并把觀察者實(shí)例傳過去刑棵。這樣,可以由數(shù)據(jù)包裝實(shí)例對數(shù)據(jù)轉(zhuǎn)換后自己給觀察者發(fā)出通知愚铡。
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize))
調(diào)用被觀察者subscribe方法蛉签,傳入MergeObserver實(shí)例,看上去是不是像是又創(chuàng)建了一個(gè)新的被觀察者和觀察者關(guān)系沥寥。在觀察者subscribe里實(shí)現(xiàn)事件發(fā)射邏輯碍舍,在MergeObserver接收事件。下面重點(diǎn)看一下onNext方法邑雅。
onNext
首先獲取數(shù)據(jù)包裝實(shí)例片橡,如果數(shù)據(jù)個(gè)數(shù)大于最大允許的個(gè)數(shù),將不在往隊(duì)列里面插入淮野,直接返回捧书。-
subscribeInner
如果數(shù)據(jù)包裝實(shí)例是Supplier類型吹泡,者從隊(duì)列取出數(shù)據(jù)包裝實(shí)例,調(diào)用Supplier的get方法獲取新的轉(zhuǎn)換數(shù)據(jù)经瓷,然后通知觀察者爆哑。如果不是Supplier類型,創(chuàng)建InnerObserver實(shí)例了嚎,傳入上面創(chuàng)建的MergeObserver實(shí)例泪漂,添加到,然后調(diào)用數(shù)據(jù)包裝實(shí)例的subscribe方法歪泳,就好像數(shù)據(jù)包裝實(shí)例做被觀察者萝勤,InnerObserver作為觀察者。發(fā)射的數(shù)據(jù)會被保存在InnerObserver中的隊(duì)列queue中呐伞,然后一一發(fā)射給觀察者敌卓。queue是線程可見的隊(duì)列,這樣可以運(yùn)用到多線程中伶氢。
RxJava很多操作符都對多線程進(jìn)行了支持和處理趟径,如果想在多線程中使用,可以自己查看一下源碼癣防。也不是很難蜗巧。
ObservableMapNotification
這個(gè)類大概就是通過傳入的數(shù)據(jù)包裝實(shí)例對數(shù)據(jù)做轉(zhuǎn)換,然后通知觀察者蕾盯,具體邏輯可以自己查看源碼幕屹,源碼很少很簡單。
FlatMapWithCombinerOuter
簡單看了一下代碼级遭,大概功能是望拖,將得到的兩個(gè)包裝數(shù)據(jù)實(shí)例進(jìn)行合并,前一個(gè)包裝數(shù)據(jù)實(shí)例會被傳入到第二包裝實(shí)例里面挫鸽,然后返回最終的包裝數(shù)據(jù)说敏。
當(dāng)有兩個(gè)數(shù)據(jù)需要合并成一個(gè)新的數(shù)據(jù)實(shí)例時(shí),可以用這個(gè)丢郊。
ObservableFlatMapCompletableCompletable
將值序列映射到CompletableSources中盔沫,并等待其終止。代碼不多枫匾,自己查看迅诬。
ObservableFlattenIterable
將序列映射為Iterable并發(fā)出其值。然后遍歷迭代器發(fā)射數(shù)據(jù)婿牍。
flatMapIntoIterable
功能和ObservableFlattenIterable差不多,只是被觀察者本身就是一個(gè)迭代器惩歉。
ObservableFlatMapMaybe
將上游值映射到MaybeSources中等脂,并將其信號合并為一個(gè)序列俏蛮。
ObservableFlatMapSingle
將上游值映射到SingleSources并將其信號合并為一個(gè)序列。
GroupBy
將一個(gè)Observable分拆為一些Observables集合上遥,它們中的每一個(gè)發(fā)射原始Observable的一個(gè)子序列搏屑,GroupBy操作符將原始Observable分拆為一些Observables集合,它們中的每一個(gè)發(fā)射原始Observable數(shù)據(jù)序列的一個(gè)子序列粉楚。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè)Observable發(fā)射是由一個(gè)函數(shù)判定的辣恋,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)Key,Key相同的數(shù)據(jù)會被同一個(gè)Observable發(fā)射模软。
只有ObservableGroupBy實(shí)現(xiàn)類伟骨。在ObservableGroupBy維護(hù)著 groups(Map<Object, GroupedUnicast<K, V>>)這樣一個(gè)數(shù)組,當(dāng)發(fā)射數(shù)據(jù)是燃异,首先獲取key携狭,然后從groups獲取key對應(yīng)的GroupedUnicast實(shí)例,GroupedUnicast中有 State<T, K> state實(shí)例回俐,而state維護(hù)著queue(SpscLinkedArrayQueue)數(shù)組來存放發(fā)射的數(shù)據(jù)逛腿。
group.onNext(v);
if (newGroup) {
downstream.onNext(group);
if (group.state.tryAbandon()) {
cancel(key);
group.onComplete();
}
}
如果不是新創(chuàng)建的GroupedUnicast實(shí)例,那么可以推斷直接已經(jīng)發(fā)射過仅颇,所以在GroupedUnicast中已經(jīng)存在觀察者實(shí)例单默。如果是新創(chuàng)建的,執(zhí)行downstream.onNext(group)忘瓦。進(jìn)入后面的流程搁廓。
GroupJoin
由ObservableGroupBy實(shí)現(xiàn)。將不同數(shù)據(jù)來源的數(shù)據(jù)進(jìn)行合并政冻,合并規(guī)則自己查看源碼枚抵。
Map
由ObservableMap實(shí)現(xiàn)。對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)明场,執(zhí)行變換操作汽摹。就相當(dāng)于FlatMap中的獲取包裝數(shù)據(jù),然后直接通知觀察者苦锨。源碼很簡單逼泣,自己查看。
Scan
實(shí)現(xiàn)類有
ObservableScan
ObservableScanSeed
連續(xù)地對數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù)舟舒,然后連續(xù)發(fā)射結(jié)果拉庶。Scan操作符對原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射秃励。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)氏仗。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列。這個(gè)操作符在某些情況下被叫做accumulator夺鲜。
源碼很簡單皆尔,自己查看呐舔。
Window
這個(gè)就帶大家分析源碼了,有興趣的可以自己查看慷蠕,如果只是想知道用法珊拼,請參考Window
總結(jié)
其實(shí)網(wǎng)上有很多對RxJava操作符的講解,還有對官網(wǎng)文檔的翻譯流炕,我覺得澎现,要想對RxJava用好,還是要自己去查看源碼每辟。其實(shí)RxJava雖然很多剑辫,但是每個(gè)操作符都有獨(dú)立的實(shí)現(xiàn),分析源碼其實(shí)也不難影兽。只要你了解了被觀察者和觀察者的調(diào)用關(guān)系揭斧,很多操作符只不過是在中間有添加了被觀察者和觀察者過程。相當(dāng)于嵌套峻堰。如果你了解這個(gè)執(zhí)行過程讹开,你可以很快找到關(guān)鍵的執(zhí)行代碼。