RxJava2源碼(三)

subscribeOn

  1. 找到ObservableSubscribeOn類

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        // 傳入上游的Observable和調(diào)度器Scheduler
        this.scheduler = scheduler;
    }
    
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 用下游的observer創(chuàng)建一個(gè)新的observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
        // scheduler直接執(zhí)行SubscribeTask
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
  2. 查看SubscribeOnObserver類钝凶,除了實(shí)現(xiàn)Disposable接口還實(shí)現(xiàn)了Observer接口,說明他還是個(gè)新的observer筐乳;

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
    
        final AtomicReference<Disposable> s;
    
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;// 下游的observer
            this.s = new AtomicReference<Disposable>();
        }
    
        @Override
        public void onSubscribe(Disposable s) {
            // 上游的onSubscribe會(huì)調(diào)用致稀,但是因?yàn)閠his.s的disposable不為null杂瘸,大部分情況一直都是直接跳過
            DisposableHelper.setOnce(this.s, s);
        }
    
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
    
        @Override
        public void onComplete() {
            actual.onComplete();
        }
    
        @Override
        public void dispose() {
            DisposableHelper.dispose(s); // 這里dispose多了個(gè)步驟,沒明白?仪媒??谢鹊?
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
    
  3. 查看SubscribeTask规丽,這是個(gè)Runnable,并且是ObservableSubscribeOn的內(nèi)部類

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            // 上游的observable執(zhí)行subscribe撇贺,傳入的是下游的obser
            source.subscribe(parent);
        }
    }
    
  4. 查看scheduler.scheduleDirect赌莺,這個(gè)方法就是用scheduler立即執(zhí)行傳入的Runnable任務(wù)

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
  5. 所以總結(jié)下

    1. ObservableSubscribeOn起到一個(gè)橋接的功能,執(zhí)行source.subscribe(parent)松嘶,處理上游的Observable
    2. 傳入的scheduler用來控制怎么執(zhí)行

observeOn

  1. 找到ObservableObserveOn類

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler; // 調(diào)度器
            this.delayError = delayError;// 
            this.bufferSize = bufferSize;// 任務(wù)隊(duì)列大小艘狭,默認(rèn)128
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                // 如果是TrampolineScheduler,放個(gè)屁啥都不干翠订?
                source.subscribe(observer);
            } else {
                // scheduler.createWorker
                Scheduler.Worker w = scheduler.createWorker();
                // 下游的observer包裝成ObserveOnObserver巢音,傳給上游的source.subscribe
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
  2. 查看ObserveOnObserver類:AtomicInteger的子類,實(shí)現(xiàn)了QueueDisposable尽超,Observer<T>, Runnable

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
    
        SimpleQueue<T> queue; // 一個(gè)隊(duì)列
    
        Disposable s;
    
        Throwable error;
        volatile boolean done;// 標(biāo)記是否結(jié)束官撼,和disposable不一樣的是如果done為true,還是會(huì)走OnComplete或者onError
        volatile boolean cancelled;// 標(biāo)記disposable狀態(tài)
    
        int sourceMode;
    
        boolean outputFused;
    
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
  3. 查看ObserveOnObserver類的onSubscribe方法

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
    // 這里省略了一部分FuseMode的代碼似谁,默認(rèn)情況不會(huì)走
            // 初始化隊(duì)列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            // 下游observer的onSubscribe回調(diào)
            actual.onSubscribe(this);
        }
    }
    
  4. 查看disposable的邏輯傲绣,cancelled變量標(biāo)記,dispose同時(shí)還會(huì)清空隊(duì)列和dispose任務(wù)worker

    @Override
    public void dispose() {
        if (!cancelled) {
            cancelled = true;
            s.dispose();
            worker.dispose();// 中斷Scheduler任務(wù)
            if (getAndIncrement() == 0) {
                // 清空隊(duì)列
                queue.clear();
            }
        }
    }
    
    @Override
    public boolean isDisposed() {
        return cancelled;
    }
    
  5. 查看ObserveOnObserver的onNext巩踏,把數(shù)據(jù)塞到隊(duì)列里秃诵,并且只有AtomicInteger值為0,才執(zhí)行任務(wù)worker.schedule

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
    
        if (sourceMode != QueueDisposable.ASYNC) {
            // 入隊(duì)列
            queue.offer(t);
        }
        schedule();
    }
    
    void schedule() {
        if (getAndIncrement() == 0) {
            // AtomicInteger的值為0就執(zhí)行worker.schedule塞琼,傳入的this是Runnable菠净;AtomicInteger加1
            worker.schedule(this);
        }
    }
    
  6. ObserveOnObserver的Runnable實(shí)現(xiàn)run方法

    @Override
    public void run() {
        if (outputFused) {
            // 默認(rèn)為false,先不管
            drainFused();
        } else {
            drainNormal();
        }
    }
    
    void drainNormal() {
        int missed = 1;// 這里成功接收一次數(shù)據(jù)missed就加1,默認(rèn)為1
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                // disposable或者done的情況會(huì)返回true
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();// 取出隊(duì)列一個(gè)數(shù)據(jù)v
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                    // 再次檢查是否已經(jīng)done或者diposable
                    return;
                }
    
                if (empty) {
                    // 如果隊(duì)列空了跳出里面的for循環(huán)
                    break;
                }
                // 下游的Observer回調(diào)onNext
                a.onNext(v);
            }
    
            // 更新missed值毅往,表示還剩幾個(gè)走了schedule但是還沒有被調(diào)用onNext的任務(wù)
            missed = addAndGet(-missed);
            if (missed == 0) {
                // missed為0跳出for循環(huán)
                break;
            }
        }
    }
    

    這里解釋下為什么要兩個(gè)for循環(huán)和為什么用AtomicInteger標(biāo)記schedule次數(shù)牵咙,因?yàn)橐紤]上游事件發(fā)送和下游事件接受速度是不一樣,而且worker.schedule導(dǎo)致上下游不在一個(gè)線程攀唯,比如下面幾個(gè)例子

    1. 發(fā)送數(shù)據(jù)(1霜大,2,3革答,4)很快战坤,接收數(shù)據(jù)很慢:那么異步情況下,很快的會(huì)調(diào)用四次schedule残拐,getAndIncrement只有第一次為0途茫,只會(huì)走一次worker.schedule(this),那么run方法就只會(huì)走一次溪食,會(huì)在外面的for循環(huán)跳出囊卜,也就是missed=0結(jié)束
    2. 發(fā)送數(shù)據(jù)(1,2错沃,3栅组,4,complete)很快枢析,接收數(shù)據(jù)很慢:基本同上玉掸,但是因?yàn)榘l(fā)送了complete導(dǎo)致狀態(tài)為disposable,會(huì)在里面的for循環(huán)return醒叁,因?yàn)閏heckTerminated(d, empty, a)返回了true
    3. 發(fā)送數(shù)據(jù)(1司浪,2)很慢,發(fā)送數(shù)據(jù)(3把沼,4)很快啊易,接收數(shù)據(jù)(2)很慢:那么異步情況下,有可能在2執(zhí)行完onNext饮睬,剛剛跳出里面的for循環(huán)租谈,這時(shí)候發(fā)送數(shù)據(jù)3了導(dǎo)致隊(duì)列不為空,miss不為空了所以不會(huì)跳出外面的for循環(huán)捆愁。這樣就不用worker.schedule(this);
  7. 總結(jié)下

    1. 同樣是橋接割去,這里對(duì)下游的observer做處理
    2. 傳入的scheduler用來控制怎么執(zhí)行

Example

下面代碼是我們很常見的一個(gè)例子,發(fā)送數(shù)據(jù)(1牙瓢,2)在IO線程執(zhí)行劫拗,Observer在UI線程中執(zhí)行

Observable.just(1,2)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e("yj", "---onSubscribe==" + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull Integer i) {
                Log.e("yj", "---onNext==" + i);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e("yj", "---onError==" + e);
            }

            @Override
            public void onComplete() {
                Log.e("yj", "---onComplete");
            }
        });
  1. 依次創(chuàng)建了三個(gè)Observable:ObservableFromArray间校,ObservableSubscribeOn矾克,ObservableObserveOn
  2. 從下往上看subscribeActual方法調(diào)用:ObservableObserveOn不做處理,ObservableSubscribeOn使subscribe在io線程中執(zhí)行憔足,ObservableFromArray順序發(fā)送1胁附,2
  3. Observer的回調(diào)只有ObservableObserveOn處理了酒繁,使其在UI線程中被調(diào)用

再舉個(gè)的例子

下面的代碼執(zhí)行了兩次subscribeOn和observeOn,那么just和accept在哪個(gè)線程執(zhí)行呢?

Observable.just(1,2)
        .subscribeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                
            }
        });

答案:在兩個(gè)不同的IO線程

PS

我的github:https://github.com/nppp1990/MyTips

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末控妻,一起剝皮案震驚了整個(gè)濱河市州袒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌弓候,老刑警劉巖郎哭,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異菇存,居然都是意外死亡夸研,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門依鸥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來亥至,“玉大人,你說我怎么就攤上這事贱迟〗惆纾” “怎么了?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵衣吠,是天一觀的道長茶敏。 經(jīng)常有香客問我,道長缚俏,這世上最難降的妖魔是什么睡榆? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮袍榆,結(jié)果婚禮上胀屿,老公的妹妹穿的比我還像新娘。我一直安慰自己包雀,他們只是感情好宿崭,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著才写,像睡著了一般葡兑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上赞草,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天讹堤,我揣著相機(jī)與錄音,去河邊找鬼厨疙。 笑死洲守,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播梗醇,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼知允,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了叙谨?” 一聲冷哼從身側(cè)響起温鸽,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎手负,沒想到半個(gè)月后涤垫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡竟终,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年雹姊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衡楞。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡吱雏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出瘾境,到底是詐尸還是另有隱情歧杏,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布迷守,位于F島的核電站犬绒,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏兑凿。R本人自食惡果不足惜凯力,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望礼华。 院中可真熱鬧咐鹤,春花似錦、人聲如沸圣絮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至犁珠,卻和暖如春驱入,著一層夾襖步出監(jiān)牢的瞬間悯许,已是汗流浹背傍药。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來泰國打工嘿般, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人力麸。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓可款,卻偏偏與公主長得像育韩,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子筑舅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容