RxJava2源碼分析

響應(yīng)式編程

說道rxjava,就要提到響應(yīng)式編程
響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。
數(shù)據(jù)流就像一條河:它可以被觀測课舍,被過濾霹俺,被操作,或者為新的消費者與另外一條流合并為一條新的流掰邢。

簡介

RxJava可以濃縮為異步兩個字牺陶,其核心的東西不外乎兩個伟阔, Observable(被觀察者) 和 Observer(觀察者)。Observable可以發(fā)出一系列的 事件(例如網(wǎng)絡(luò)請求掰伸、復(fù)雜計算皱炉、數(shù)據(jù)庫操作、文件讀取等)碱工,事件執(zhí)行結(jié)束后交給Observer的回調(diào)處理娃承。

使用

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("5555555");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String o) {
                Log.e("onNext",o);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

看源碼

先看被觀察者Observable的create方法,Observable是一個抽象類

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //null判斷
        ObjectHelper.requireNonNull(source, "source is null");
        //追蹤該代碼
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

  @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        //將Observable返回給我們
        return source;
    }
//ObservableCreate是Observable(被觀察者)的子類
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

從上述代碼可以看出,最終將Observable返回給了我們,那我們看看ObservableOnSubscribe是什么

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

接下我們看Observer,這幾個方法很熟悉

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();
}

然后我們看Observable的subscribe方法

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //該方法是Observable的抽象方法,是Observable的子類實現(xiàn)的
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

//我們知道ObservableCreate是Observable的子類,所以我們看ObservableCreate中實現(xiàn)的subscribeActual方法
 @Override
    protected void subscribeActual(Observer observer) {
        //是ObservableCreate的靜態(tài)內(nèi)部類
        CreateEmitter<T> parent = new CreateEmitter(observer);
        observer.onSubscribe(parent);
        try {
            //source就是我們在Observable.create方法中傳入的ObservableOnSubscribe
            source.subscribe(parent);
        } catch (Exception e) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);      
        }
    }

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //被觀察者和觀察者發(fā)生了關(guān)系....
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

線程調(diào)度

subscribeOn

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //這里代碼和 Observable.create方法類似,直接看ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //Observable實現(xiàn)了ObservableSource這個接口的,所以這里的source是指我們剛才創(chuàng)建的那個Observable
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //Observer的onSubscribe方法,因為此時的Observable的subscribe
        //方法發(fā)生在當(dāng)前線程怕篷,所以O(shè)bserver的onSubscribe方法的執(zhí)行
        //線程和當(dāng)前調(diào)用Observable的subscribe方法的線程一致
        s.onSubscribe(parent);
        //SubscribeTask是一個Runnable,scheduler是線程調(diào)度對象,我們跟蹤scheduleDirect方法
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
      //將新的 Disposable 設(shè)置給 parent 历筝,方便取消訂閱關(guān)系,

    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //執(zhí)行了source(ObservableSource)的subscribe方法
            //即上游Observable的subscribe方法
            source.subscribe(parent);
        }
    }
}

subscribeOn方法返回了一個新的Observable廊谓,而這個新的Observable里面持有一個上一層Observable的引用梳猪。那個引用就是source。調(diào)用subscribeOn方法后蒸痹,在他之前的和在他之后的代碼執(zhí)行的線程都是subscribeOn指定的線程,onSubscribe方法除外,因為Observable的subscribe方法發(fā)生在當(dāng)前線程春弥,所以O(shè)bserver的onSubscribe方法的執(zhí)行線程和當(dāng)前調(diào)用Observable的subscribe方法的線程一致!

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //包裝了一下Runnable,DisposeTask還是一個Runnable
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //schedule是Worker的抽象方法,在這我們追蹤下IoScheduler的schedule方法
        w.schedule(task, delay, unit);

        return task;
    }

  @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

 @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                //通過線程池執(zhí)行Runnable 
                f = executor.submit((Callable<Object>)sr);
            } else {
                //通過線程池執(zhí)行Runnable 
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

我們看到最終調(diào)用了NewThreadWorker的中的線程池去執(zhí)行我們傳入的Runnable的run方法,即訂閱方法中的線程始終由上游observable.subscribeOn方法來決定

subscribeOn第一次有效原理

由于訂閱是由下而上的,所以最上面的subscribeOn方法是最后執(zhí)行了,而subscribeOn方法返回的一個新的Observable里持有一個上一層Observable的引用,所以這里的線程是由上游observable.subscribeOn方法中傳入的線程來決定,個人理解就像遞歸調(diào)用一樣,直至調(diào)用到最上層的第一個subscribeOn方法.

舉個栗子

public class Rx {

    String name;
    Rx rx;

    public Rx(Rx rx,String name) {
        this.name = name;
        this.rx = rx;
    }

    public void subscribe(){
        System.out.println(name+"subscribe線程:"+Thread.currentThread().getName());
        if (rx == null){
            System.out.println("最終執(zhí)行"+name+"線程:"+Thread.currentThread().getName());
            return;
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                //調(diào)用上游rx的subscribe方法
                rx.subscribe();
            }
        }).start();
    }
}

  public static void main(String[] args) {
        Rx rx1 = new Rx(null, "rx1");
        Rx rx2 = new Rx(rx1, "rx2");
        Rx rx3 = new Rx(rx2, "rx3");
        Rx rx4 = new Rx(rx3, "rx4");
        rx4.subscribe();
    }

結(jié)果如圖


image.png

observeOn

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //不啰嗦,直接看ObservableObserveOn的subscribeActual方法(真正的訂閱方法)
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
         //TrampolineScheduler 表示當(dāng)前線程
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            //重點是ObserveOnObserver觀察者
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

重點看ObserveOnObserver的onNext,onError,onComplete等方法

 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //切換了線程
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            //切換了線程
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            //切換了線程
            schedule();
        }


 void schedule() {
            if (getAndIncrement() == 0) {
                //我們已IO線程為例,追蹤該方法
                worker.schedule(this);
            }
        }

在Ioscheduler中找到實現(xiàn)的schedule方法

 @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //該方法即為我們追蹤的subscribeOn中的相同的方法
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

看到這里,我們應(yīng)該了解了在線程調(diào)度過程中,subscribeOn和observeOn方法的區(qū)別,observeOn只要調(diào)用,就會切換下游事件的線程,而subscribeOn在他之前的和在他之后的代碼執(zhí)行的線程都是subscribeOn指定的線程,onSubscribe方法除外.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末叠荠,一起剝皮案震驚了整個濱河市匿沛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌榛鼎,老刑警劉巖逃呼,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異者娱,居然都是意外死亡抡笼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進(jìn)店門黄鳍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來推姻,“玉大人,你說我怎么就攤上這事框沟〔毓牛” “怎么了?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵忍燥,是天一觀的道長校翔。 經(jīng)常有香客問我,道長灾前,這世上最難降的妖魔是什么防症? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上蔫敲,老公的妹妹穿的比我還像新娘饲嗽。我一直安慰自己,他們只是感情好奈嘿,可當(dāng)我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布貌虾。 她就那樣靜靜地躺著,像睡著了一般裙犹。 火紅的嫁衣襯著肌膚如雪尽狠。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天叶圃,我揣著相機(jī)與錄音袄膏,去河邊找鬼。 笑死掺冠,一個胖子當(dāng)著我的面吹牛沉馆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播德崭,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼斥黑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了眉厨?” 一聲冷哼從身側(cè)響起锌奴,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎憾股,沒想到半個月后鹿蜀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡荔燎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年耻姥,在試婚紗的時候發(fā)現(xiàn)自己被綠了销钝。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片有咨。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蒸健,靈堂內(nèi)的尸體忽然破棺而出座享,到底是詐尸還是另有隱情,我是刑警寧澤似忧,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布渣叛,位于F島的核電站,受9級特大地震影響盯捌,放射性物質(zhì)發(fā)生泄漏淳衙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望箫攀。 院中可真熱鬧肠牲,春花似錦、人聲如沸靴跛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽梢睛。三九已至肥印,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間绝葡,已是汗流浹背深碱。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留挤牛,地道東北人莹痢。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像墓赴,于是被迫代替她去往敵國和親竞膳。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 作者寄語 很久之前就想寫一個專題诫硕,專寫Android開發(fā)框架坦辟,專題的名字叫 XXX 從入門到放棄 ,沉淀了這么久章办,...
    戴定康閱讀 7,631評論 13 85
  • 響應(yīng)式編程簡介 響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式锉走。數(shù)據(jù)流就像一條河:它可以被觀測,被過濾藕届,被操作挪蹭,或者...
    說碼解字閱讀 3,068評論 0 5
  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善閱讀 3,561評論 0 0
  • 內(nèi)容翻譯自官方文檔,水平有限休偶,僅供初學(xué)者學(xué)習(xí)交流梁厉。 官網(wǎng)文檔英文版 1. 使用 RxJava 2.0 實現(xiàn)響應(yīng)式編...
    天天吃飯呀閱讀 3,880評論 0 1
  • 而誓言如同"我愛你"好像也不足以表達(dá)她胸腔內(nèi)時刻奔騰著的情意词顾。 那感情經(jīng)年累月的沖刷著她的每一條神經(jīng),每每叫囂著什...
    蛋不分公母閱讀 168評論 0 0