【RxJava】- 變換操作符源碼分析

【RxJava】- 創(chuàng)建操作符源碼分析
【RxJava】- 過濾操作符源碼分析
【RxJava】- 結(jié)合操作符源碼分析
【RxJava】- 連接操作符源碼分析

注意

文章中說的被觀察者和觀察者可以看------RxJava的基本執(zhí)行流程。比如Observable.create傳入的參數(shù)是被觀察者鳄虱,而subscribe傳入的參數(shù)是觀察者娇昙,因?yàn)榍罢呤鞘录陌l(fā)射地匆绣,而后者是接收事件的地方强衡,事件發(fā)射地的變化,我們都能第一時(shí)間得知畜普,就好像我們在觀察前者一樣蛀序。

Compose

通過對其應(yīng)用特定操作。比如在網(wǎng)絡(luò)請求中挂捻,需要在觀察者接收通知前對返回的數(shù)據(jù)做一定的處理后碉纺,再通知觀察者。

FlatMap

FlatMap將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables刻撒,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable骨田。

FlatMap功能由

ObservableFlatMap
ObservableMapNotification
ObservableInternalHelper
FlatMapWithCombinerOuter
ObservableFlatMapCompletableCompletable
ObservableFlattenIterable
FlatMapIntoIterable
ObservableFlatMapMaybe
ObservableFlatMapSingle

實(shí)現(xiàn)

ObservableFlatMap

首先調(diào)用ObservableFlatMap的subscribeActual方法,并觀察者實(shí)例声怔。

public void subscribeActual(Observer<? super U> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)){return;}
     source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
tryScalarXMapSubscribe方法

如果被觀察者(比如Observable.create傳入的參數(shù))是Supplier類型态贤,返回true。

直接調(diào)用Supplier的get方法獲取事件數(shù)據(jù)醋火,然后調(diào)用FlatMap傳入的數(shù)據(jù)轉(zhuǎn)換包裝實(shí)例的apply方法悠汽,得到一個(gè)包裝后,類型為ObservableSource的c胎撇。

如果得到的數(shù)據(jù)包裝實(shí)例也是Supplier類型介粘,者直接調(diào)用Supplier的get方法獲取新的數(shù)據(jù),新數(shù)據(jù)可以被轉(zhuǎn)換晚树,也可以沒有轉(zhuǎn)換姻采。然后調(diào)用觀察者的onSubscribe方法,緊接著調(diào)用觀察者的onNext爵憎,onComplete等方法慨亲。

如果得到的數(shù)據(jù)包裝實(shí)例不是Supplier類型,這直接調(diào)用數(shù)據(jù)包裝實(shí)例的subscribe方法宝鼓,并把觀察者實(shí)例傳過去刑棵。這樣,可以由數(shù)據(jù)包裝實(shí)例對數(shù)據(jù)轉(zhuǎn)換后自己給觀察者發(fā)出通知愚铡。

source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize))

調(diào)用被觀察者subscribe方法蛉签,傳入MergeObserver實(shí)例,看上去是不是像是又創(chuàng)建了一個(gè)新的被觀察者和觀察者關(guān)系沥寥。在觀察者subscribe里實(shí)現(xiàn)事件發(fā)射邏輯碍舍,在MergeObserver接收事件。下面重點(diǎn)看一下onNext方法邑雅。

  • onNext
    首先獲取數(shù)據(jù)包裝實(shí)例片橡,如果數(shù)據(jù)個(gè)數(shù)大于最大允許的個(gè)數(shù),將不在往隊(duì)列里面插入淮野,直接返回捧书。

  • subscribeInner
    如果數(shù)據(jù)包裝實(shí)例是Supplier類型吹泡,者從隊(duì)列取出數(shù)據(jù)包裝實(shí)例,調(diào)用Supplier的get方法獲取新的轉(zhuǎn)換數(shù)據(jù)经瓷,然后通知觀察者爆哑。

    如果不是Supplier類型,創(chuàng)建InnerObserver實(shí)例了嚎,傳入上面創(chuàng)建的MergeObserver實(shí)例泪漂,添加到,然后調(diào)用數(shù)據(jù)包裝實(shí)例的subscribe方法歪泳,就好像數(shù)據(jù)包裝實(shí)例做被觀察者萝勤,InnerObserver作為觀察者。發(fā)射的數(shù)據(jù)會被保存在InnerObserver中的隊(duì)列queue中呐伞,然后一一發(fā)射給觀察者敌卓。queue是線程可見的隊(duì)列,這樣可以運(yùn)用到多線程中伶氢。

    RxJava很多操作符都對多線程進(jìn)行了支持和處理趟径,如果想在多線程中使用,可以自己查看一下源碼癣防。也不是很難蜗巧。

ObservableMapNotification

這個(gè)類大概就是通過傳入的數(shù)據(jù)包裝實(shí)例對數(shù)據(jù)做轉(zhuǎn)換,然后通知觀察者蕾盯,具體邏輯可以自己查看源碼幕屹,源碼很少很簡單。

FlatMapWithCombinerOuter

簡單看了一下代碼级遭,大概功能是望拖,將得到的兩個(gè)包裝數(shù)據(jù)實(shí)例進(jìn)行合并,前一個(gè)包裝數(shù)據(jù)實(shí)例會被傳入到第二包裝實(shí)例里面挫鸽,然后返回最終的包裝數(shù)據(jù)说敏。

當(dāng)有兩個(gè)數(shù)據(jù)需要合并成一個(gè)新的數(shù)據(jù)實(shí)例時(shí),可以用這個(gè)丢郊。

ObservableFlatMapCompletableCompletable

將值序列映射到CompletableSources中盔沫,并等待其終止。代碼不多枫匾,自己查看迅诬。

ObservableFlattenIterable

將序列映射為Iterable并發(fā)出其值。然后遍歷迭代器發(fā)射數(shù)據(jù)婿牍。

flatMapIntoIterable

功能和ObservableFlattenIterable差不多,只是被觀察者本身就是一個(gè)迭代器惩歉。

ObservableFlatMapMaybe

將上游值映射到MaybeSources中等脂,并將其信號合并為一個(gè)序列俏蛮。

ObservableFlatMapSingle

將上游值映射到SingleSources并將其信號合并為一個(gè)序列。

GroupBy

將一個(gè)Observable分拆為一些Observables集合上遥,它們中的每一個(gè)發(fā)射原始Observable的一個(gè)子序列搏屑,GroupBy操作符將原始Observable分拆為一些Observables集合,它們中的每一個(gè)發(fā)射原始Observable數(shù)據(jù)序列的一個(gè)子序列粉楚。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè)Observable發(fā)射是由一個(gè)函數(shù)判定的辣恋,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)Key,Key相同的數(shù)據(jù)會被同一個(gè)Observable發(fā)射模软。

只有ObservableGroupBy實(shí)現(xiàn)類伟骨。在ObservableGroupBy維護(hù)著 groups(Map<Object, GroupedUnicast<K, V>>)這樣一個(gè)數(shù)組,當(dāng)發(fā)射數(shù)據(jù)是燃异,首先獲取key携狭,然后從groups獲取key對應(yīng)的GroupedUnicast實(shí)例,GroupedUnicast中有 State<T, K> state實(shí)例回俐,而state維護(hù)著queue(SpscLinkedArrayQueue)數(shù)組來存放發(fā)射的數(shù)據(jù)逛腿。

group.onNext(v);

if (newGroup) {
   downstream.onNext(group);
   if (group.state.tryAbandon()) {
       cancel(key);
       group.onComplete();
   }
}

如果不是新創(chuàng)建的GroupedUnicast實(shí)例,那么可以推斷直接已經(jīng)發(fā)射過仅颇,所以在GroupedUnicast中已經(jīng)存在觀察者實(shí)例单默。如果是新創(chuàng)建的,執(zhí)行downstream.onNext(group)忘瓦。進(jìn)入后面的流程搁廓。

GroupJoin

由ObservableGroupBy實(shí)現(xiàn)。將不同數(shù)據(jù)來源的數(shù)據(jù)進(jìn)行合并政冻,合并規(guī)則自己查看源碼枚抵。

Map

由ObservableMap實(shí)現(xiàn)。對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)明场,執(zhí)行變換操作汽摹。就相當(dāng)于FlatMap中的獲取包裝數(shù)據(jù),然后直接通知觀察者苦锨。源碼很簡單逼泣,自己查看。

Scan

實(shí)現(xiàn)類有

ObservableScan
ObservableScanSeed

連續(xù)地對數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù)舟舒,然后連續(xù)發(fā)射結(jié)果拉庶。Scan操作符對原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射秃励。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)氏仗。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列。這個(gè)操作符在某些情況下被叫做accumulator夺鲜。

源碼很簡單皆尔,自己查看呐舔。

Window

這個(gè)就帶大家分析源碼了,有興趣的可以自己查看慷蠕,如果只是想知道用法珊拼,請參考Window

總結(jié)

其實(shí)網(wǎng)上有很多對RxJava操作符的講解,還有對官網(wǎng)文檔的翻譯流炕,我覺得澎现,要想對RxJava用好,還是要自己去查看源碼每辟。其實(shí)RxJava雖然很多剑辫,但是每個(gè)操作符都有獨(dú)立的實(shí)現(xiàn),分析源碼其實(shí)也不難影兽。只要你了解了被觀察者和觀察者的調(diào)用關(guān)系揭斧,很多操作符只不過是在中間有添加了被觀察者和觀察者過程。相當(dāng)于嵌套峻堰。如果你了解這個(gè)執(zhí)行過程讹开,你可以很快找到關(guān)鍵的執(zhí)行代碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末捐名,一起剝皮案震驚了整個(gè)濱河市旦万,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌镶蹋,老刑警劉巖成艘,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異贺归,居然都是意外死亡淆两,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進(jìn)店門拂酣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來秋冰,“玉大人,你說我怎么就攤上這事婶熬〗9矗” “怎么了?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵赵颅,是天一觀的道長虽另。 經(jīng)常有香客問我,道長饺谬,這世上最難降的妖魔是什么捂刺? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上叠萍,老公的妹妹穿的比我還像新娘芝发。我一直安慰自己,他們只是感情好苛谷,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著格郁,像睡著了一般腹殿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上例书,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天锣尉,我揣著相機(jī)與錄音,去河邊找鬼决采。 笑死自沧,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的树瞭。 我是一名探鬼主播拇厢,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼晒喷!你這毒婦竟也來了孝偎?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤凉敲,失蹤者是張志新(化名)和其女友劉穎衣盾,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體爷抓,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡势决,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蓝撇。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片果复。...
    茶點(diǎn)故事閱讀 40,872評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖唉地,靈堂內(nèi)的尸體忽然破棺而出据悔,到底是詐尸還是另有隱情,我是刑警寧澤耘沼,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布极颓,位于F島的核電站,受9級特大地震影響群嗤,放射性物質(zhì)發(fā)生泄漏菠隆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望骇径。 院中可真熱鬧躯肌,春花似錦、人聲如沸破衔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽晰筛。三九已至嫡丙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間读第,已是汗流浹背曙博。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留怜瞒,地道東北人父泳。 一個(gè)月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像吴汪,于是被迫代替她去往敵國和親惠窄。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,876評論 2 361

推薦閱讀更多精彩內(nèi)容

  • ReactiveX 系列文章目錄 buffer 間隔固定個(gè)數(shù)緩存 按照規(guī)定大小緩存浇坐,每次取 count 個(gè)數(shù)睬捶,取完...
    三流之路閱讀 1,263評論 0 1
  • 記錄RxJava操作符,方便查詢(2.2.2版本) 英文文檔地址:http://reactivex.io/docu...
    凌云飛魚閱讀 826評論 0 0
  • 一近刘、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性擒贸,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測天測地測空氣閱讀 637評論 0 1
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,836評論 0 1
  • 今天,對RxJava中的變換操作符進(jìn)行學(xué)習(xí)觉渴。 一介劫、變換操作符的作用 對事件序列中的事件/整個(gè)事件序列進(jìn)行加工處理(...
    Jotyy閱讀 244評論 0 0