RxJava源碼

使用

首先從代碼層面來(lái)分析RxJava的每一步到底干了什么。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        emitter.onNext("A");
    }
}).map(new Function<String, Bitmap>() {
    @Override
    public Bitmap apply(String s) throws Throwable {
        return null;
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<Bitmap>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
              disposable = d; 
        }

        @Override
        public void onNext(@NonNull Bitmap bitmap) {
                
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
  });

源碼分析

new ObservableOnSubscribe

在這里ObservableOnSubscribe就是我們的被觀察者

public interface ObservableOnSubscribe<@NonNull T> {

    /**
     * Called for each {@link Observer} that subscribes.
     * @param emitter the safe emitter instance, never {@code null}
     * @throws Throwable on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
Observable.create

調(diào)用Observable.create的時(shí)候?qū)⒈挥^察者傳了進(jìn)來(lái)并且創(chuàng)建了\color{#FF0000}{ObservableCreate}對(duì)象

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
Hook

hook的含義就是在函數(shù)執(zhí)行的過(guò)程中,我們有一個(gè)鉤子函數(shù)狈邑,可以優(yōu)先執(zhí)行我們的代碼驾胆,然后再接著執(zhí)行牲证。
上面的方法中調(diào)用了RxJavaPlugins.onAssembly函數(shù)
RxJavaPlugins.onAssembly(new ObservableCreate<>(source))

/*在這里什么都沒(méi)有做纠脾,直接將source返回了亿傅,所以我們?cè)谶@里可以給
onObservableAssembly進(jìn)行賦值老玛,通過(guò)setOnObservableAssembly方法淤年。
*/
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
//如果onObservableAssembly有值的話會(huì)執(zhí)行apply方法
 static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            //這里會(huì)執(zhí)行我們的hook方法,將被觀察者傳遞進(jìn)來(lái)蜡豹,在我們實(shí)現(xiàn)的hook方法中返回觀察者
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
//自定義hook方法實(shí)現(xiàn)
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Throwable {
                //為了不影響代碼流程麸粮,我們需要將observable返回
                return observable;
            }
});
map(new Function<String, Bitmap>()

上面我們獲取到了\color{#FF0000}{ObservableCreate}對(duì)象,再調(diào)用map肯定是調(diào)用了\color{#FF0000}{ObservableCreate}對(duì)象的map方法,由于他繼承自O(shè)bservable镜廉,所以調(diào)用的還是Observable的map方法弄诲。
這里又生成了\color{#FF0000}{ObservableMap}對(duì)象

 public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
subscribeOn

\color{#FF0000}{ObservableMap}對(duì)象. subscribeOn

public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}

這里傳入了一個(gè)Scheduler對(duì)象Schedulers.io(),并且創(chuàng)建了ObservableSubscribeOn對(duì)象

schedulers.io調(diào)用過(guò)程.png

最終拿到了一個(gè)線程池娇唯。

observeOn

AndroidSchedulers.mainThread()

生成了一個(gè)主線程Handler.png

observeOn創(chuàng)建了ObservableObserveOn對(duì)象

new Observer<Bitmap>()
創(chuàng)建了一個(gè)觀察者
public interface Observer<@NonNull T> {

    /**
     * Provides the {@link Observer} with the means of cancelling (disposing) the
     * connection (channel) with the {@link Observable} in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the {@link Observer} with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@code Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
層次圖

分析到這里就有了代碼嵌套層次了


嵌套層次.png
subscribe

subscribe方法內(nèi)部調(diào)用了subscribeActual(observer);那我們可以看到調(diào)用的就是ObservableObserveOn的subscribeActual方法

@Override
protected void subscribeActual(Observer<? super T> observer) {
     //主線程的HandlerScheduler明顯不是TrampolineScheduler的子類
     if (scheduler instanceof TrampolineScheduler) {
         source.subscribe(observer);
     } else {
         Scheduler.Worker w = scheduler.createWorker();

         source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
     }
}
//調(diào)用了createWorker
@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
/*source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
這里的source就是$\color{#FF0000}{ObservableSubscribeOn}$對(duì)象齐遵,因?yàn)檎{(diào)用observeOn方法的是ObservableSubscribeOn,并且他將自己作為source傳了進(jìn)去
*/
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}

new ObserveOnObserver

在上面的subscribeActual方法我們可以看到他創(chuàng)建了一個(gè)新的ObserveOnObserver對(duì)象

//這里接收了一個(gè)observer對(duì)象塔插,而這個(gè)observer正是我們的觀察者梗摇,也就是在這里對(duì)觀察者又進(jìn)行了一次封裝
new ObserveOnObserver<>(observer, w, delayError, bufferSize)

source.subscribe,source就是ObservableSubscribeOn

//這里SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);又對(duì)observer封裝了一層想许,并且調(diào)用了onSubscribe伶授,最終會(huì)調(diào)到我們自己的觀察者的onSubscribe方法
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);
    //將觀察者放入線程池中執(zhí)行,下面的SubscribeTask可以看到在子線程里面執(zhí)行了source.subscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        //執(zhí)行了上一步的subscribe方法
        source.subscribe(parent);
    }
}

對(duì)觀察者進(jìn)行封裝圖

觀察者封裝圖.png

一層一層向上調(diào)用subscribe(@NonNull Observer<? super T> observer)伸刃,--->subscribeActual(observer);

當(dāng)調(diào)用到了最上層的時(shí)候source就是我們的被觀察者了

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        //注意他這里傳的可不是this
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
create的source.png

在我們自定義的被觀察者里面的subscribe調(diào)用了emitter.onNext("A");

從外層向內(nèi)層調(diào)用onNext.png
當(dāng)執(zhí)行到以后一層也就是我們自己調(diào)用subscribe方法的時(shí)候

他會(huì)執(zhí)行ObserveOnObserver的onNext

 @Override
 public void onNext(T t) {
     if (done) {
         return;
     }

     if (sourceMode != QueueDisposable.ASYNC) {
         queue.offer(t);
     }
         schedule();
}
void schedule() {
   if (getAndIncrement() == 0) {
        //這里的work就是我們創(chuàng)建出來(lái)的HandlerWork
        worker.schedule(this);
   }
}
//HandlerWork類的schedule方法
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposable.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }
    //使用了主線程的Handler發(fā)送消息
    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposable.disposed();
    }

    return scheduled;
}
handler的message中添加了該類的對(duì)象.png

當(dāng)執(zhí)行上面的run方法的時(shí)候

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                 v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }
            //*************************************************  
            //在這里最終調(diào)用了onNext的方法
            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
             break;
        }
    }
}

總結(jié)

1.執(zhí)行過(guò)程

  • 從上到下對(duì)被觀察者進(jìn)行封裝
  • 從下往上對(duì)觀察者進(jìn)行封裝
  • 然后再?gòu)纳贤聢?zhí)行onNext
    思考一個(gè)問(wèn)題谎砾?對(duì)觀察者和被觀察者的封裝層數(shù)是一樣的,那么是不是可以看成是1個(gè)被觀察者對(duì)應(yīng)一個(gè)觀察者
    對(duì)應(yīng)關(guān)系.png

2.觀察者的onSubscribe是在ObservableSubscribeOn的subscribeActual方法中執(zhí)行的捧颅,此時(shí)還沒(méi)有進(jìn)行線程的切換景图,那么就是說(shuō)在那個(gè)線程使用的RXJava那么觀察者的onSubscribe方法就執(zhí)行在哪個(gè)線程

  1. subscribeOn只會(huì)負(fù)責(zé)上層的線程調(diào)度,observeOn只有在執(zhí)行onNext的時(shí)候才起作用碉哑,也就是下層的線程調(diào)度

4.使用handler(getMainLooper)來(lái)保證主線程操作


調(diào)用流程.png

自上而下(左邊的流程)->自下而上(右邊的流程+subscribe的調(diào)用)->自上而下(onNext的調(diào)用)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末挚币,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子扣典,更是在濱河造成了極大的恐慌妆毕,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贮尖,死亡現(xiàn)場(chǎng)離奇詭異笛粘,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門薪前,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)润努,“玉大人,你說(shuō)我怎么就攤上這事示括∑探剑” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵垛膝,是天一觀的道長(zhǎng)鳍侣。 經(jīng)常有香客問(wèn)我,道長(zhǎng)吼拥,這世上最難降的妖魔是什么倚聚? 我笑而不...
    開(kāi)封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮扔罪,結(jié)果婚禮上秉沼,老公的妹妹穿的比我還像新娘。我一直安慰自己矿酵,他們只是感情好唬复,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著全肮,像睡著了一般敞咧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辜腺,一...
    開(kāi)封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天休建,我揣著相機(jī)與錄音,去河邊找鬼评疗。 笑死测砂,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的百匆。 我是一名探鬼主播砌些,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼加匈!你這毒婦竟也來(lái)了存璃?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤雕拼,失蹤者是張志新(化名)和其女友劉穎纵东,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體啥寇,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡偎球,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年洒扎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衰絮。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡逊笆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出岂傲,到底是詐尸還是另有隱情,我是刑警寧澤子檀,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布镊掖,位于F島的核電站,受9級(jí)特大地震影響褂痰,放射性物質(zhì)發(fā)生泄漏亩进。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一缩歪、第九天 我趴在偏房一處隱蔽的房頂上張望归薛。 院中可真熱鬧,春花似錦匪蝙、人聲如沸主籍。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)千元。三九已至,卻和暖如春颤绕,著一層夾襖步出監(jiān)牢的瞬間幸海,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工奥务, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留物独,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓氯葬,卻偏偏與公主長(zhǎng)得像挡篓,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子溢谤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354