RxJava2源碼解析系列——1.使用流程解析

概述

Rx系列現(xiàn)在火遍全球盼忌,網(wǎng)上也紛紛涌現(xiàn)各類教程、博客。作為一個Android開發(fā)人員目溉,我認為掌握RxJava已經(jīng)成為了一項必不可少的專業(yè)技能,然而一眛的去看網(wǎng)上已有的教程和博客菱农,并不能讓自己深入理解RxJava缭付,于是有了本系列,也當作為自己的一個總結(jié)循未。本系列章節(jié)打算從最常見的使用開始陷猫,然后進入源碼具體分析。

使用

最簡單的使用RxJava的一個例子的妖,我們需要三個元素

  1. Observable(被觀察者)
  2. Observer(觀察者)
  3. subscribe(訂閱關(guān)系)

廢話不多說绣檬,就是上代碼

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                Log.d(TAG, "subscribe");
                e.onNext("This is String");
                e.onNext("This is String");
                e.onComplete();
            }
        })
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(@NonNull final Disposable d) {
                        Log.d(TAG, "onSubScribe");
                    }

                    @Override
                    public void onNext(@NonNull Object o) {
                        Log.d(TAG, "onNext");

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

1.首先我們需要創(chuàng)建Observable,創(chuàng)建Observable的操作符有很多嫂粟,這里不一一寫出娇未,本文中先使用create操作符創(chuàng)建Observable對象。我們來看一下create方法:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  1. create方法中需要一個ObservableOnSubscribe<T>類型參數(shù)
  2. 方法中最先使用ObjectHelper做了一次判空處理赋元。
  3. 使用RxJavaPlugins做了某些事情
mark

ObservableOnSubscribe<T>是一個接口忘蟹,因此需要實現(xiàn)當中的subscribe方法,而subscribe方法中存在ObservableEmitter<T>類型的參數(shù)搁凸。

mark
mark

ObservableEmitter<T>繼承Emitter<T>接口媚值,Emitter<T>接口中定義了我們熟知的onNext、onError护糖、onComplete方法褥芒。

ObjectHelper是個工具類,在這就不多說了。

mark

RxJavaPlugins顧名思義锰扶,應該叫做插件類献酗,具體的作用后續(xù)再做詳解,這里大概說明一下坷牛,該類的onAssembly方法跟hook相關(guān)罕偎。而這里hook不影響我們主流程,因此傳進去什么參數(shù)就返回什么(我們這里傳進去的是ObservableCreate<T>對象)京闰,在ObservableCreate<T>中定義了具體訂閱時的邏輯以及發(fā)射器的邏輯颜及。

從這個流程我們看出,當我們使用create操作符創(chuàng)建一個Observable時蹂楣,我們需要傳入一個實現(xiàn)了ObservableOnSubscribe<T>接口的對象俏站,在這個實現(xiàn)中,存在發(fā)射器ObservableEmitter<T>痊土,通過它可以讓使用者自由定義數(shù)據(jù)流向肄扎。并且在create操作符過程中,ObservableOnSubscribe<T>對象與ObservableCreate<T>相關(guān)聯(lián)赁酝。

2.接下來我們需要創(chuàng)建Observer犯祠,Observer是個接口,因此我們只需要實現(xiàn)該接口即可赞哗。

new Observer() {
                    @Override
                    public void onSubscribe(@NonNull final Disposable d) {
                        Log.d(TAG, "onSubScribe");
                    }

                    @Override
                    public void onNext(@NonNull Object o) {
                        Log.d(TAG, "onNext");

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                }

創(chuàng)建Observer很簡單雷则,實現(xiàn)接口當中定義的4個方法即可辆雾。

  1. onSubscribe(@NonNull final Disposable d),當訂閱時會回調(diào)該方法肪笋,Disposable 用來取消訂閱關(guān)系。
  2. onNext(@NonNull Object o)度迂,發(fā)射數(shù)據(jù)
  3. onComplete()藤乙,當onNext方法全部執(zhí)行完畢,執(zhí)行該方法惭墓。
  4. onError(@NonNull Throwable e)坛梁,當數(shù)據(jù)流向出現(xiàn)問題或使用者自己調(diào)用時執(zhí)行。

3.完成訂閱過程腊凶,首先我們來看看如何完成訂閱的划咐。

observable.subscribe(observer) 

當我們使用創(chuàng)建操作符create創(chuàng)建Observable時,還記得create方法返回的是什么類型嗎钧萍?ok褐缠,就是Observable類型,但具體的實現(xiàn)類是ObservableCreate<T>风瘦。

進入Observable的subscribe方法看看:

mark
  1. 判空處理
  2. hook相關(guān)
  3. 執(zhí)行subscribeActual(observer)
  4. 拋出異常

在這4個流程當中队魏,subscribeActual(observer)方法才是我們應該關(guān)注的。在Observable類中万搔,subscribeActual(observer)是個抽象方法胡桨,因此我們需要尋找它的具體實現(xiàn)官帘。上面已經(jīng)提到使用create操作符,具體的實現(xiàn)類是ObservableCreate<T>昧谊。

mark

在ObservableCreate<T>類的subscribeActual(observer)中刽虹,所聲明的方法參數(shù)便是我們外部傳進來的實現(xiàn)Observer接口的對象。

  1. 將Observer(觀察者)與發(fā)射器相關(guān)聯(lián)
  2. 調(diào)用observer的onSubscribe方法呢诬,為了方便觀察者可隨時解除訂閱關(guān)系状婶。
  3. 執(zhí)行使用者自定義的subscribe方法中的邏輯,同時也將發(fā)射器與Observable(被觀察者)做關(guān)聯(lián)
  4. 發(fā)生異常馅巷,回調(diào)onError

因此膛虫,當執(zhí)行到subscribeActual(observer)時,才是真正的訂閱钓猬。

而當執(zhí)行到

 source.subscribe(parent);  

將ObservableOnSubscribe(源頭)與CreateEmitter(Observer稍刀,終點)聯(lián)系起來。

這里的souce就是

mark

這里的parent就是 CreateEmitter<T>敞曹。因此账月,會執(zhí)行parent.onNext(), parent.onComplete(),parent.onError()

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
  
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
  
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            ...
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ...
    }

到這,最簡單的一個使用RxJava的流程就結(jié)束了澳迫。

最后總結(jié)

1.創(chuàng)建Observable過程

  1. 需要傳入一個實現(xiàn)了ObservableOnSubscribe<T>接口的對象
  2. 在ObservableOnSubscribe<T>實現(xiàn)中局齿,通過ObservableEmitter<T>可以讓使用者自由定義數(shù)據(jù)流向
  3. create操作符過程中,采用適配器模式橄登,將ObservableOnSubscribe<T>通過ObservableCreate<T>適配為Observable<T>對象抓歼,讓ObservableOnSubscribe<T>與ObservableCreate<T>相關(guān)聯(lián)

2.訂閱過程

  1. 真正訂閱的方法在subscribeActual(Observer<? super T> observer)
  2. source.subscribe(parent); 這行代碼執(zhí)行時,才開始發(fā)射數(shù)據(jù)拢锹,在ObservableOnSubscribe<T>中通過ObservableEmitter<T>發(fā)送數(shù)據(jù)給Observer
  3. 當Observable與Observer訂閱關(guān)系被dispose時谣妻,不會執(zhí)行onXXX方法。
  4. Observer 的 onComplete() 和 onError() 互斥只能執(zhí)行一次卒稳,因為CreateEmitter 在回調(diào)他們兩中任意一個后蹋半,都會自動 dispose()。
  5. 先 error 后 complete充坑,complete 不顯示减江。 反之會 crash
  6. 還有一點要注意的是 onSubscribe() 是在我們執(zhí)行 subscribe() 這句代碼的那個線程回調(diào)的,并不受線程調(diào)度影響
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末捻爷,一起剝皮案震驚了整個濱河市辈灼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌役衡,老刑警劉巖茵休,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡榕莺,警方通過查閱死者的電腦和手機俐芯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來钉鸯,“玉大人吧史,你說我怎么就攤上這事∵氲瘢” “怎么了贸营?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長岩睁。 經(jīng)常有香客問我钞脂,道長,這世上最難降的妖魔是什么捕儒? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任冰啃,我火速辦了婚禮,結(jié)果婚禮上刘莹,老公的妹妹穿的比我還像新娘阎毅。我一直安慰自己,他們只是感情好点弯,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布扇调。 她就那樣靜靜地躺著,像睡著了一般抢肛。 火紅的嫁衣襯著肌膚如雪狼钮。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天雌团,我揣著相機與錄音燃领,去河邊找鬼士聪。 笑死锦援,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的剥悟。 我是一名探鬼主播灵寺,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼区岗!你這毒婦竟也來了略板?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤慈缔,失蹤者是張志新(化名)和其女友劉穎叮称,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡瓤檐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年赂韵,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挠蛉。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡祭示,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出谴古,到底是詐尸還是另有隱情质涛,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布掰担,位于F島的核電站汇陆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏带饱。R本人自食惡果不足惜瞬测,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纠炮。 院中可真熱鬧月趟,春花似錦、人聲如沸恢口。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽耕肩。三九已至因妇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間猿诸,已是汗流浹背婚被。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梳虽,地道東北人址芯。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像窜觉,于是被迫代替她去往敵國和親谷炸。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容