Rxjava(一)——鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的咪辱?

Rxjava雖然在項(xiàng)目中使用了很久振劳,但是卻一直沒有時(shí)間去了解其實(shí)現(xiàn)原理,最近空了下來梧乘,也把源碼走讀了一遍澎迎,加上看大神的博客,大致弄懂了其中的兩個(gè)關(guān)鍵點(diǎn)选调;

  • Rxjava中鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的夹供?
  • Rxjava中的線程是如何切換的?

Rxjava操作符功能就不在本文中提及仁堪,以如下代碼進(jìn)行調(diào)試哮洽,了解第一個(gè)問題,Rxjava中鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的弦聂?鸟辅。

        Observable.just("a")     //Observable1
                .map(new Func1<String, String>() {  //Observable2   
                    @Override
                    public String call(String s) {
                        System.out.print(Thread.currentThread().getName() + ":first--" + s +"\n");
                        return s + s;
                    }
                })
                .subscribe(new Subscriber<String>() { //代碼⑥ Subscriber
                    @Override
                    public void onCompleted() {
                        System.out.print(Thread.currentThread().getName()+"\n");
                        System.out.print("completed"+"\n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.print("error");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                });
    }

先說說自己的理解,若把整個(gè)鏈條看成一個(gè)整體對象莺葫,那么just創(chuàng)建被觀察者對象匪凉,而subscribe()里的Subscriber作為觀察者;若每一步都分開看捺檬,just()和subscribe()中間的操作符即是觀察者再层,又是被觀察者。

Observable中每個(gè)操作符基本都會(huì)創(chuàng)建出一個(gè)新的Observable;因此可以解理成后一級的操作符去觀察前一個(gè)Observable對象聂受;以上例來說蒿秦,.subscribe的Subscriber所觀察的對象就是.map返回的Observable2,而.map的Subscriber所觀察的對象就是 Observable.just("a")得到的對象Observable1蛋济;

下面擼一擼其實(shí)現(xiàn)代碼棍鳖,整個(gè)鏈?zhǔn)秸{(diào)用真正開始的地方是.subscribe(),我們就從這里開始擼碗旅。省略掉一些代碼渡处,只看關(guān)鍵部分如下:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
      ...
      try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代碼①
            return hook.onSubscribeReturn(subscriber);
        }
        ...
}

hook.onSubscribeStart(observable, observable.onSubscribe)得到的對象就是observable.onSubscribe,而此處的observable明顯就是this扛芽,也就是上例中的observable2對象骂蓖,即把subscriber傳入到了observable2里面以供其調(diào)用。

再跟著代碼進(jìn)入observable2(.map操作符)的實(shí)現(xiàn)川尖。其主要實(shí)現(xiàn)是lift和OperatorMap登下。如下:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

lift和OperatorMap各自干了什么事情呢?先看OperatorMap,F(xiàn)unc1也作為構(gòu)造參數(shù)傳入肪获。關(guān)鍵代碼:

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {  //代碼②
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

這里new出了一個(gè)觀察者對象Subscriber,它實(shí)現(xiàn)了什么功能通過 o.onNext(transformer.call(t));即將例子中的Func1代碼執(zhí)行后將結(jié)果傳入到下一層畔濒。即這里運(yùn)行了Func1的代碼

再看lift()操作符,看其返回值也就是我們定義的observable2對象锣咒。因此subscribe里的"代碼①"的call即是此處observable2里OnSubscribe的call方法侵状;再看call方法,“代碼④”部分則是調(diào)用到了observable1對象里OnSubscribe的call方法毅整,而“代碼③”將Func1操作動(dòng)作轉(zhuǎn)變?yōu)镾ubscriber趣兄,通過call(o)完成對下一級Subscriber的引用。

 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o); //代碼③
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);  //代碼④
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

到這里“代碼④”執(zhí)行悼嫉,即到了observable1對象艇潭,也就是例子中 Observable.just("a")所得到對象的OnSubscribe的call()方法,如下:

  public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
    }

ScalarSynchronousObservable類代碼如下:

 public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
 }
 protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {

            @Override
            public void call(Subscriber<? super T> s) {
                /*
                 *  We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
                 *  See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
                 *  The assumption here is that when asking for a single item we should emit it and not concern ourselves with 
                 *  being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will 
                 *  filter it out (such as take(0)). This prevents us from paying the price on every subscription. 
                 */
                s.onNext(t);  //代碼⑤
                s.onCompleted();
            }

        });
        this.t = t;
    }

其中"代碼⑤"是關(guān)鍵點(diǎn)戏蔑,t即是我們just傳入的"a",s則是代碼④傳入的st蹋凝,它其實(shí)是observable2的Subscriber(觀察者),相當(dāng)于observable1持有observable2的引用总棵。通過 s.onNext(t)鳍寂,完成了observable1向下一層的observable2的回調(diào),也就是Func1對象所在的Subscriber(OperatorMap)情龄,再通過 o.onNext(transformer.call(t));回到例子中“代碼⑥”伐割,至此候味,整個(gè)調(diào)用鏈完成。

上面的分析比較混亂隔心,重新梳理代碼執(zhí)行流程 :
1、subscribe里尚胞,hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代碼①
2硬霍、map里,通過lift()將Func1操作符生成Subserber笼裳,Subscriber<? super T> st = hook.onLift(operator).call(o); //代碼③
onSubscribe.call(st); //代碼④
3唯卖、just里create(), s.onNext(t); //代碼⑤
4躬柬、map里拜轨, OperatorMap里對象, o.onNext(transformer.call(t));
5允青、subscribe 的Subscriber();

Observable的所有鏈?zhǔn)秸{(diào)用橄碾,知道兩個(gè)其兩個(gè)關(guān)鍵點(diǎn)即可梳理清楚整個(gè)數(shù)據(jù)流傳遞原理;

  • Observable.onSubscribe對象颠锉,完成以call方法來向上一層傳遞法牲;
  • Subserber向下一層的Subserber調(diào)用

至于其中的線程調(diào)度琼掠,只需要知道線程調(diào)度并不影響鏈?zhǔn)秸{(diào)用的數(shù)據(jù)流傳遞拒垃,其原理我們下一節(jié)再梳理;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瓷蛙,一起剝皮案震驚了整個(gè)濱河市悼瓮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌艰猬,老刑警劉巖横堡,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異姥宝,居然都是意外死亡翅萤,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門腊满,熙熙樓的掌柜王于貴愁眉苦臉地迎上來套么,“玉大人,你說我怎么就攤上這事碳蛋∨呙冢” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵肃弟,是天一觀的道長玷室。 經(jīng)常有香客問我零蓉,道長,這世上最難降的妖魔是什么穷缤? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任敌蜂,我火速辦了婚禮,結(jié)果婚禮上津肛,老公的妹妹穿的比我還像新娘章喉。我一直安慰自己,他們只是感情好身坐,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布秸脱。 她就那樣靜靜地躺著,像睡著了一般部蛇。 火紅的嫁衣襯著肌膚如雪摊唇。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天涯鲁,我揣著相機(jī)與錄音巷查,去河邊找鬼。 笑死撮竿,一個(gè)胖子當(dāng)著我的面吹牛吮便,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播幢踏,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼髓需,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了房蝉?” 一聲冷哼從身側(cè)響起僚匆,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎搭幻,沒想到半個(gè)月后咧擂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡檀蹋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年松申,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俯逾。...
    茶點(diǎn)故事閱讀 39,981評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贸桶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出桌肴,到底是詐尸還是另有隱情皇筛,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布坠七,位于F島的核電站水醋,受9級特大地震影響旗笔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拄踪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一蝇恶、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧惶桐,春花似錦艘包、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽卦尊。三九已至叛拷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間岂却,已是汗流浹背忿薇。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留躏哩,地道東北人署浩。 一個(gè)月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像扫尺,于是被迫代替她去往敵國和親筋栋。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容