RxJava系列(二)

讀了RxJava系列(一)的童鞋們,應(yīng)該大致明白了Rxjava的普遍寫法是做什么的了,但是真正的Rxjava的內(nèi)部結(jié)構(gòu)呢?
常見的Rxjava代碼形式:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

代碼分析:

Observable.create

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

通過new ObservableCreate<T>(source)可以知道實(shí)際上就是new了一個ObservableCreate類,ObservableCreate的結(jié)構(gòu):


   public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {

只截取了重要的部分,可以看到subscribeActual方法中調(diào)用了source.subscribe(parent)方法,但是subscribeActual方法又是誰調(diào)用的呢?

.subscribe

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

可以看到在調(diào)用訂閱方法的時候,調(diào)用了subscribeActual方法,

 source.subscribe(parent);

然后下游在onNext,onError,onComplete中就獲取到了數(shù)據(jù)

 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

同時下游通過Disposable 中dispose方法,來決定之后的數(shù)據(jù)要不要發(fā)送到下游.
總結(jié):
1>new一個匿名類ObservableCreate,重寫了subscribe這個方法,然后該類的構(gòu)造函數(shù)需要ObservableOnSubscribe接口
2>調(diào)用該類實(shí)現(xiàn)的父類的方法subscribe來調(diào)用subscribeActual,進(jìn)而調(diào)用observer.onSubscribe(parent)與匿名類ObservableCreate的subscribe方法
小記:本來是打算寫的詳細(xì)一點(diǎn),但是發(fā)現(xiàn)本身很簡單.....

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子线梗,更是在濱河造成了極大的恐慌汁胆,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遵班,死亡現(xiàn)場離奇詭異屠升,居然都是意外死亡潮改,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門腹暖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來汇在,“玉大人,你說我怎么就攤上這事脏答「庋常” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵殖告,是天一觀的道長阿蝶。 經(jīng)常有香客問我,道長丛肮,這世上最難降的妖魔是什么赡磅? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮宝与,結(jié)果婚禮上焚廊,老公的妹妹穿的比我還像新娘。我一直安慰自己习劫,他們只是感情好咆瘟,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著诽里,像睡著了一般袒餐。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谤狡,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天灸眼,我揣著相機(jī)與錄音,去河邊找鬼墓懂。 笑死籽腕,一個胖子當(dāng)著我的面吹牛斧散,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼敌买,長吁一口氣:“原來是場噩夢啊……” “哼媒殉!你這毒婦竟也來了什黑?” 一聲冷哼從身側(cè)響起碧库,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎钓葫,沒想到半個月后悄蕾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡础浮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年笼吟,在試婚紗的時候發(fā)現(xiàn)自己被綠了库物。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡贷帮,死狀恐怖戚揭,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情撵枢,我是刑警寧澤民晒,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站锄禽,受9級特大地震影響潜必,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜沃但,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一磁滚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧宵晚,春花似錦垂攘、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至逸贾,卻和暖如春陨仅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背铝侵。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工灼伤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咪鲜。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓饺蔑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親嗜诀。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容