Rxjava源碼解析

這邊文章主要記錄使用Rxjava過程中對map方法以及flatmap方法的源碼理解主到,自認(rèn)為也是RxJava的一個精髓所在淆珊。
有關(guān)RxJava的詳細(xì)使用蛉签,網(wǎng)絡(luò)已經(jīng)有很多資料悯嗓。這里推薦[匠心寫作]的一篇文章

http://gank.io/post/560e15be2dca930e00da1083

下面進(jìn)入正題拒迅,先看下map方法

map方法解析:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {    
    return lift(new OperatorMap<T, R>(func));
}

OperatorMap是Operator接口的實現(xiàn)類骚秦,來看一下Operator接口

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {    
    // cover for generics insanity
}

而Operator又繼承了Func1,這個接口有一個只有一個方法R call(T t);
看下OperatorMap的call方法實現(xiàn)

public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

然后看lift函數(shù)

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

簡單點來說呢璧微,做了三件事
1.創(chuàng)建新的Observable作箍,負(fù)責(zé)接手原Observable發(fā)出的事件
2.hook.onLift(operator).call(o) 會執(zhí)行OperatorMap的call方法,返回一個新的Subscriber。
3.onSubscribe.call(st)新的Subscriber會傳給原Observable前硫。在原Observable發(fā)送事件是會調(diào)用新Subscriber的onNext方法胞得,會先執(zhí)行transformer.call(t) 即map(Func1)方法中參數(shù)Func1的call方法,然后執(zhí)行原Subscriber的onNext方法屹电。

faltmap方法解析

還是老樣子阶剑,先貼一下faltmap的源碼

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
}

這里會執(zhí)行 merge(map(func)) 這里其實就是merge了剛才的map方法嘛。
好了嗤详,map方法我就不說了个扰,可以看下前面的解釋,這里提示一點 這里的map方法返回的是Observable<Observable>葱色,然后看下merge方法

public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
}

這里會執(zhí)行到 source.lift(OperatorMerge.<T>instance(false))其實就是之前map方法里面介紹的lift方法递宅。這里的source類型是Observable<Observable>可以理解成一個新Observable接收所有原Observable發(fā)出的事件,組成一個新的Observable。然后執(zhí)行l(wèi)ift的方法办龄,從之前map方法的分析知道烘绽,這里會先去執(zhí)行Operator中的call方法,然后執(zhí)行原Subscriber中的call方法俐填。
這里Operator為OperatorMerge安接,看下這個類中的call方法

public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;
        
        child.add(subscriber);
        child.setProducer(producer);
        
        return subscriber;
}

這里經(jīng)過一些操作,最終會走到MergeProducer中的request方法中

public void request(long n) {
    if (n > 0) {
        if (get() == Long.MAX_VALUE) {
            return;
        }
        BackpressureUtils.getAndAddRequest(this, n);
        subscriber.emit();
    } else 
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required");
    }
}

這里注意這句subscriber.emit()會發(fā)送所有的事件英融。這樣就解釋通了
1.接收所有原Observable的事件盏檐,組成新的Observable
2.新Observable發(fā)送所有事件
3.原Subscriber接收到新事件后進(jìn)行處理

總結(jié):

不管是map還是flatmap,其實都是運用了轉(zhuǎn)換的思想

  1. 截斷原事件分發(fā)流程驶悟。
  2. 增加中間處理操作(map 增加了一個call方法回調(diào)胡野,flatmap增加了一次事件收集再發(fā)送)。
  3. 回到原事件分發(fā)流程處理事件痕鳍。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末硫豆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子笼呆,更是在濱河造成了極大的恐慌熊响,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诗赌,死亡現(xiàn)場離奇詭異汗茄,居然都是意外死亡,警方通過查閱死者的電腦和手機铭若,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進(jìn)店門剔难,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人奥喻,你說我怎么就攤上這事》羌#” “怎么了环鲤?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長憎兽。 經(jīng)常有香客問我冷离,道長,這世上最難降的妖魔是什么纯命? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任西剥,我火速辦了婚禮,結(jié)果婚禮上亿汞,老公的妹妹穿的比我還像新娘瞭空。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布咆畏。 她就那樣靜靜地躺著南捂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪旧找。 梳的紋絲不亂的頭發(fā)上溺健,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天,我揣著相機與錄音钮蛛,去河邊找鬼鞭缭。 笑死,一個胖子當(dāng)著我的面吹牛魏颓,可吹牛的內(nèi)容都是我干的岭辣。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼琼开,長吁一口氣:“原來是場噩夢啊……” “哼易结!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起柜候,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤搞动,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后渣刷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鹦肿,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年辅柴,在試婚紗的時候發(fā)現(xiàn)自己被綠了箩溃。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡碌嘀,死狀恐怖涣旨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情股冗,我是刑警寧澤霹陡,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站止状,受9級特大地震影響烹棉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜怯疤,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一浆洗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧集峦,春花似錦伏社、人聲如沸抠刺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽矫付。三九已至,卻和暖如春第焰,著一層夾襖步出監(jiān)牢的瞬間买优,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工挺举, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留杀赢,地道東北人。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓湘纵,卻偏偏與公主長得像脂崔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子梧喷,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容