Rxjava源碼解析

先上代碼:

ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
         final int max = 100;
         for (int i = 1; i <= max; i++) {
             e.onNext(max);
         }
         e.onComplete();
     }
 };
 Observer<Integer> observer = new Observer<Integer>() {
     @Override
     public void onSubscribe(Disposable d) {
     }
     @Override
     public void onNext(Integer integer) {
     }
     @Override
     public void onError(Throwable e) {
     }
     @Override
     public void onComplete() {
     }
 };

 Observable.create(oos)
           .observeOn(AndroidSchedulers.mainThread())
           .subscribeOn(Schedulers.computation())
           .subscribe(observer);

上面是Rxjava最簡單的實現(xiàn)模型盆繁。
從鏈式調(diào)用的返回值來看:

  Observable.create()------》ObservableCreate extends Observable
  ObservableCreate.observerOn()------->ObservableObserveOn extends  AbstractObservableWithUpstream  extends  Observable
  ObservableObserveOn.subscribeOn()------->ObservableSubscribeOn extends  AbstractObservableWithUpstream  extends  Observable

所以最后的調(diào)用對象是

  ObservableSubscribeOn.subscribe(observer)

從上面的返回值可以看出中間任一一個的返回值返回的都是observable的子對象。

為什么要強調(diào)中間幾個的返回值都是observable的返回值怜瞒,這里要先明確一下夜焦,待會會大量用到subscribe()方法搏予,在Observable(子類)中的subscribe()方法:

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //鉤子点把,如果未設(shè)置的話,返回值還是observer
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //空檢驗
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //核心
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        ...... //handle exception 
    }
}

因此下面的分析代碼中幌衣,如果是調(diào)用上面4個對象的subscribe()方法的時候矾削,直接看subscribeActual()方法即可。

那就從最后一層 ObservableSubscribeOn 的 subscribeActual() 方法開始分析泼掠。

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

這里的這個s就是我們縮寫的observer
第一行:先把我們的observer封裝成了SubscribeOnObserver
第二行:調(diào)用了observer.onSubscribe()方法
??????????????也就是observer訂閱Observable時候的方法怔软,一般這個時候可以做一些操作
第三行:
parent.setDisposable() 以及scheduler調(diào)度器先不論,待會再分析择镇,這里先看SubscribeTask這個類:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

看到runnable,熟悉線程的同學(xué)已經(jīng)可以猜到source.subscribe(parent) 這句代碼很可能在子線程中執(zhí)行挡逼,這里先mark一下,待會回到這個地方再具體看腻豌。

這里要先插入一下source和Observer的問題:

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

//后面兩個類繼承抽象類家坎,調(diào)用super(source)方法嘱能。
AbstractObservableWithUpstream(ObservableSource<T> source) {
    this.source = source;
}

通過上面的代碼可以看到所有的這三個關(guān)鍵類,source都是通過構(gòu)造傳入進來的虱疏,
而后兩個類都還有schedule參數(shù)惹骂,這個涉及線程調(diào)度,待會也會說做瞪,也mark一下对粪。
通過以上代碼可以分析:

ObservableCreate 的 source 是 oos
ObservableObserveOn 的 source 是 ObservableCreate
ObservableSubscribeOn 的 source 是 ObservableObserveOn

至于Observer,通過代碼可以分析:

 (ObservableObserveOn)source
    .subscribe(parent(SubscribeOnObserver));

 (ObservableCreate)source
    .subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));


 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 (oos)source.subscribe(parent);

也即是:

ObservableObserveOn 的 observer 是 SubscribeOnObserver
ObservableCreate 的 observer 是 ObserveOnObserver
oos 的 observer 是 CreateEmitter

這里有點一級一級調(diào)用的意味了装蓬,而這個意味就是Rxjava的一個很重要的點著拭。

插入結(jié)束,繼續(xù)回到剛才的 SubscribeTask
結(jié)合上面的分析:

 source.subscribe(parent)

也就意味著

ObservableObserveOn.subscribeActual()

這里轉(zhuǎn)了兩個彎牍帚,各位可以稍微思考一下
而在ObservableObserveOn中:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        //這個暫時忽略儡遮,未設(shè)置的時候不走這里
        source.subscribe(observer);
    } else {
        //線程調(diào)度,待會再分析
        Scheduler.Worker w = scheduler.createWorker();
        //最終會調(diào)用這個暗赶,又是很熟悉的source subscribe()方法
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

繼續(xù)往上走鄙币,走到 ObservableCreate 中,這里省略了重復(fù)流程蹂随。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

走到最上層十嘿,這個最上層的source就是我們前面寫的oos。
這個observer是在 ObservableObserverOn 中的 ObserveOnObserver糙及。這個名字有點像详幽,汗

第一行:先把 ObserveOnObserver 封裝成 CreateEmitter
第二行:調(diào)用 ObserveOnObserver.onSubscribe()方法。

@Override
public void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        if (s instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) s;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                actual.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                actual.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        actual.onSubscribe(this);
    }
}

這個方法比較長浸锨,但是對我們的流程分析關(guān)鍵的代碼其實就一句

actual.onSubscribe(this);

根據(jù)前面的observer的分析,這個observer其實就是 ObservableSubscribeOn 的 SubscribeOnObserver
最后找到源碼版姑,調(diào)用了 SubscribeOnObserver 的 onSubscribe()方法柱搜。

    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

涉及到CAS的操作,感興趣的同學(xué)可以研究一下剥险,這里對我們的流程沒有太大影響聪蘸。

第三行:至此,整個的流程終于回到了我們的oos表制。

從ObservableSubscribeOn的subscribe()方法歷盡千辛萬苦終于調(diào)用了oos的subscribe()方法健爬。

    ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            final int max = 100;
            for (int i = 1; i <= max; i++) {
                e.onNext(max);
            }
            e.onComplete();
        }
    };

首先創(chuàng)建了 ObservableEmitter ,然后調(diào)用emmiter.onNext()方法么介。

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

異常先不考慮娜遵,最終是調(diào)用 observer.onNext()方法。
根據(jù)上面的分析壤短,這個Observer是ObserveOnObserver
第一行:先把 ObserveOnObserver 封裝成 CreateEmitter设拟,而CreateEmmiter的構(gòu)造:

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

所以慨仿,可以知道這個observer就是最前面的 ObserveOnObserver

也就是e.Next(n)------>最終會調(diào)用ObserveOnObserver.onNext(n)

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }

最終調(diào)用了 schedule() 方法。

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

將當(dāng)前對象添加到worker中纳胧,這個是線程調(diào)度的問題了镰吆,待會分析。

再看一下ObserveOnObserver 類的聲明:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable{}

實現(xiàn)了Runnable接口跑慕,所以關(guān)鍵代碼就在run()方法之中万皿。

   @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }

    //我們看drainNormal()的方法
    void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            for (;;) {
                boolean d = done;
                T v;

                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;

                if (checkTerminated(d, empty, a)) {
                    return;
                }

                if (empty) {
                    break;
                }

                a.onNext(v);
            }

            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

終于看到了a.onNext()方法,也就是actual.onNext()方法核行。
通過 ObserveOnObserver 的構(gòu)造:

//構(gòu)造方法
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.actual = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

//創(chuàng)建對象的時候
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        //這里new了ObserveOnObserver對象牢硅。
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

可以看出,這個actual對象钮科,其實就是傳入進來的observer唤衫。
而這個observer結(jié)合SubscribeTask代碼,可以知道:
這個observer其實就是講我們的observer封裝起來的SubscribeOnObserver對象绵脯。

而SubscribeOnObserver的onNext()方法:

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }

其實就是我們的o.next()方法佳励。

七轉(zhuǎn)八彎,經(jīng)歷這個這么多,也是本文最核心的:

subscribe()方法蛆挫,先一層一層往上回調(diào)赃承,調(diào)用了我們的oos的onNext()方法,
而onNext()里面又一層一層往下回調(diào)悴侵,調(diào)用了我們的obsrever的onNext()方法瞧剖,實現(xiàn)了數(shù)據(jù)的傳遞。

然后是線程切換問題:

還記得我們之前說ObservableSubscribeOn, ObservableObserveOn這兩個對象的構(gòu)造都會傳入一個 schedule 的調(diào)度器嗎可免?

先看 ObservableSubscribeOn

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

再結(jié)合前面的代碼抓于,我們知道這個 scheduler 是Schedulers.computation()

最后走到了:

public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
    ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
    try {
        Future<?> f;
        if (delayTime <= 0L) {
            f = executor.submit(task);
        } else {
            f = executor.schedule(task, delayTime, unit);
        }
        task.setFuture(f);
        return task;
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }
}

executor,我們非常熟悉的線程池〗浇瑁看到這捉撮,也就大概明白了我們的 source.subscribe(parent)
以及其對應(yīng)的一層層往上回調(diào)都是在subscribeOn(線程) 所調(diào)用的線程之中

然后線程什么時候會再度切換呢妇垢?
是在ObservableObserveOn中的 schedule() 方法中:

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }

這個worker一層層追蹤溯源巾遭,找到了其初始化的地方,是在ObservableObserveOn的subscribeActual()方法之中:

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

這個schedule就是observerOn所對應(yīng)的線程闯估。
AndroidSchedulers.mainThread() 的實現(xiàn)是 HandlerScheduler

    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        if (async) {
            message.setAsynchronous(true);
        }

        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

最終通過hanler進行了線程的切換灼舍。
也就是最后我們的observer.onNext()方法執(zhí)行的線程是由observeOn()所對應(yīng)的線程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市涨薪,隨后出現(xiàn)的幾起案子骑素,更是在濱河造成了極大的恐慌,老刑警劉巖尤辱,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件砂豌,死亡現(xiàn)場離奇詭異厢岂,居然都是意外死亡,警方通過查閱死者的電腦和手機阳距,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進店門塔粒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人筐摘,你說我怎么就攤上這事卒茬。” “怎么了咖熟?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵圃酵,是天一觀的道長。 經(jīng)常有香客問我馍管,道長郭赐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任确沸,我火速辦了婚禮捌锭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘罗捎。我一直安慰自己观谦,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布桨菜。 她就那樣靜靜地躺著豁状,像睡著了一般。 火紅的嫁衣襯著肌膚如雪倒得。 梳的紋絲不亂的頭發(fā)上泻红,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天,我揣著相機與錄音霞掺,去河邊找鬼承桥。 笑死,一個胖子當(dāng)著我的面吹牛根悼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蜀撑,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼挤巡,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了酷麦?” 一聲冷哼從身側(cè)響起矿卑,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎沃饶,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡撤蟆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年大猛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片荆忍。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出抖拦,到底是詐尸還是另有隱情,我是刑警寧澤舷暮,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布态罪,位于F島的核電站,受9級特大地震影響下面,放射性物質(zhì)發(fā)生泄漏复颈。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一沥割、第九天 我趴在偏房一處隱蔽的房頂上張望耗啦。 院中可真熱鬧,春花似錦驯遇、人聲如沸芹彬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舒帮。三九已至,卻和暖如春陡叠,著一層夾襖步出監(jiān)牢的瞬間玩郊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工枉阵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留译红,地道東北人。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓兴溜,卻偏偏與公主長得像侦厚,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子拙徽,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容