迷之RxJava —— subscribeOn 和 observeOn 的區(qū)別

開頭

之前我們分析過subscribeOn這個函數,
現在我們來看下subscribeOnobserveOn這兩個函數到底有什么異同纺非。

用過rxjava的旁友都知道派草,subscribeOnobserveOn都是用來切換線程用的须蜗,可是我什么時候用subscribeOn硅确,什么時候用observeOn呢,我們很少知道這兩個區(qū)別是啥唠粥。

友情提示疏魏,如果不想看分析過程的,可以直接跳到下面的總結部分晤愧。

subscribeOn

先看下OperatorSubscribeOn的核心代碼:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
        
            @Override
            public void call() {
               
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    ....
                };
                
                source.unsafeSubscribe(s);
            }
        });
    }
}

這里注意兩點:

  1. 因為OperatorSubscribeOn是個OnSubscribe對象大莫,所以在call參數中傳入的subscriber就是我們在外面使用Observable.subscribe(a)傳入的對象a
  1. 這里source對象指向的是調用subscribeOn之前的那個Observable序列官份。

明確了這兩點只厘,我們就很好的知道了subscribeOn是如何工作,產生神奇的效果了舅巷。
其實最最主要的就是一行函數

source.unsafeSubscribe(s);

并且要注意它所在的位置羔味,是在worker的call里面,說白了钠右,就是把source.subscribe這一行調用放在指定的線程里赋元,那么總結起來的結論就是:

subscribeOn的調用,改變了調用前序列所運行的線程飒房。

observeOn

同樣看下OperatorObserveOn這個類的主要代碼:

public final class OperatorObserveOn<T> implements Operator<T, T> {

    private final Scheduler scheduler;
    private final boolean delayError;

    /**
     * @param scheduler the scheduler to use
     * @param delayError delay errors until all normal events are emitted in the other thread?
     */
    public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        ....
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
        parent.init();
        return parent;
    }

    /** Observe through individual queue per observer. */
    private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final NotificationLite<T> on;
        final boolean delayError;
        final Queue<Object> queue;
        
        // the status of the current stream
        volatile boolean finished;

        final AtomicLong requested = new AtomicLong();
        
        final AtomicLong counter = new AtomicLong();
        
        /** 
         * The single exception if not null, should be written before setting finished (release) and read after
         * reading finished (acquire).
         */
        Throwable error;

        // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
        // not prevent anything downstream from consuming, which will happen if the Subscription is chained
        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            this.delayError = delayError;
            this.on = NotificationLite.instance();
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
            } else {
                queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
            }
        }
        
        void init() {
            // don't want this code in the constructor because `this` can escape through the 
            // setProducer call
            Subscriber<? super T> localChild = child;
            
            localChild.setProducer(new Producer() {

                @Override
                public void request(long n) {
                    if (n > 0L) {
                        BackpressureUtils.getAndAddRequest(requested, n);
                        schedule();
                    }
                }

            });
            localChild.add(recursiveScheduler);
            localChild.add(this);
        }

        @Override
        public void onStart() {
            // signal that this is an async operator capable of receiving this many
            request(RxRingBuffer.SIZE);
        }

        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            finished = true;
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
                return;
            }
            error = e;
            finished = true;
            schedule();
        }

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }

        // only execute this from schedule()
        @Override
        public void call() {
            long emitted = 0L;

            long missed = 1L;

            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            
            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each RxRingBuffer.SIZE elements)
            
            for (;;) {
                long requestAmount = requested.get();
                long currentEmission = 0L;
                
                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;
                    
                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }
                    
                    if (empty) {
                        break;
                    }
                    
                    localChild.onNext(localOn.getValue(v));

                    currentEmission++;
                    emitted++;
                }
                
                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }
                
                if (currentEmission != 0L) {
                    BackpressureUtils.produced(requested, currentEmission);
                }
                
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
            
            if (emitted != 0L) {
                request(emitted);
            }
        }
        
        boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
            if (a.isUnsubscribed()) {
                q.clear();
                return true;
            }
            
            if (done) {
                if (delayError) {
                    if (isEmpty) {
                        Throwable e = error;
                        try {
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onCompleted();
                            }
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                    }
                } else {
                    Throwable e = error;
                    if (e != null) {
                        q.clear();
                        try {
                            a.onError(e);
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                        return true;
                    } else
                    if (isEmpty) {
                        try {
                            a.onCompleted();
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                        return true;
                    }
                }
                    
            }
            
            return false;
        }
    }
}

這里的代碼有點長搁凸,我們先注意到它是一個Operator,它沒有對上層Observable做任何的控制或者包裝狠毯。

既然是Operator护糖,那么它的職責就是把一個Subscriber轉換成另外一個Subscriber, 我們來關注下轉換后的Subscriber對轉換前的Subscriber做了些什么事嚼松。

首先它是一個ObserveOnSubscriber類嫡良, 既然是Subscriber那么肯定有onNext, onCompleteonError 看最主要的onNext

@Override
public void onNext(final T t) {
    if (isUnsubscribed() || finished) {
        return;
    }
    if (!queue.offer(on.next(t))) {
        onError(new MissingBackpressureException());
        return;
    }
    schedule();
}

好了锰扶,這里做了兩件事,首先把結果緩存到一個隊列里寝受,然后調用schedule啟動傳入的worker

我們這里需要注意下:

在調用observeOn前的序列坷牛,把結果傳入到onNext就是它的工作,它并不關心后續(xù)的流程很澄,所以工作就到這里就結束了漓帅,剩下的交給ObserveOnSubscriber繼續(xù)。

protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
    }
}

recursiveScheduler 就是之前我們傳入的Scheduler痴怨,我們一般會在observeOn傳入AndroidScheluders.mainThread()對吧、

接下去器予,我們看下在scheduler中調用的call方法浪藻,這里只列出主要帶代碼

@Override
public void call() {
    ...
    final Subscriber<? super T> localChild = this.child;
    for (;;) {
        ...
        boolean done = finished;
        Object v = q.poll();
        boolean empty = v == null;
        
        if (checkTerminated(done, empty, localChild, q)) {
            return;
        }
        
        if (empty) {
            break;
        }
        
        localChild.onNext(localOn.getValue(v));

        ...
    }
    
    if (emitted != 0L) {
        request(emitted);
    }
}

OK,在Scheduler啟動后乾翔, 我們在Observable.subscribe(a)傳入的a就是這里的child爱葵, 我們看到,在call中終于調用了它的onNext方法反浓,把真正的結果傳了出去萌丈,但是在這里,我們是工作在observeOn的線程上的雷则。

那么總結起來的結論就是:

  1. observeOn 對調用之前的序列默不關心辆雾,也不會要求之前的序列運行在指定的線程上
  2. observeOn 對之前的序列產生的結果先緩存起來,然后再在指定的線程上月劈,推送給最終的subscriber

復雜情況

我們經常多次使用subscribeOn切換線程度迂,那么以后是否可以組合observeOnsubscribeOn達到自由切換的目的呢?

組合是可以的猜揪,但是他們的執(zhí)行順序是有條件的惭墓,如果仔細分析的話,可以知道observeOn調用之后而姐,再調用subscribeOn是無效的腊凶,原因是什么?

因為subscribeOn改變的是subscribe這句調用所在的線程拴念,大多數情況钧萍,產生內容和消費內容是在同一線程的,所以改變了產生內容所在的線程丈莺,就改變了消費內容所在的線程划煮。

經過上面的闡述,我們知道缔俄,observeOn的工作原理是把消費結果先緩存弛秋,再切換到新線程上讓原始消費者消費器躏,它和生產者是沒有一點關系的,就算subscribeOn調用了蟹略,也只是改變observeOn這個消費者所在的線程登失,和OperatorObserveOn中存儲的原始消費者一點關系都沒有,它還是由observeOn控制挖炬。

總結

如果我們有一段這樣的序列

Observable
.map                    // 操作1
.flatMap                // 操作2
.subscribeOn(io)
.map                    //操作3
.flatMap                //操作4
.observeOn(main)
.map                    //操作5
.flatMap                //操作6
.subscribeOn(io)        //!!特別注意
.subscribe(handleData)

假設這里我們是在主線程上調用這段代碼揽浙,
那么

  1. 操作1操作2是在io線程上意敛,因為之后subscribeOn切換了線程
  2. 操作3媒吗,操作4也是在io線程上,因為在subscribeOn切換了線程之后舍哄,并沒有發(fā)生改變氛改。
  3. 操作5操作6是在main線程上撩独,因為在他們之前的observeOn切換了線程敞曹。
  4. 特別注意那一段,對于操作5操作6是無效的
    再簡單點總結就是
  1. subscribeOn的調用切換之前的線程综膀。
  2. observeOn的調用切換之后的線程澳迫。
  3. observeOn之后,不可再調用subscribeOn 切換線程
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末剧劝,一起剝皮案震驚了整個濱河市橄登,隨后出現的幾起案子,更是在濱河造成了極大的恐慌担平,老刑警劉巖示绊,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異暂论,居然都是意外死亡面褐,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門取胎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來展哭,“玉大人,你說我怎么就攤上這事闻蛀》税” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵觉痛,是天一觀的道長役衡。 經常有香客問我,道長薪棒,這世上最難降的妖魔是什么手蝎? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任榕莺,我火速辦了婚禮,結果婚禮上棵介,老公的妹妹穿的比我還像新娘钉鸯。我一直安慰自己,他們只是感情好邮辽,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布唠雕。 她就那樣靜靜地躺著,像睡著了一般吨述。 火紅的嫁衣襯著肌膚如雪岩睁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天揣云,我揣著相機與錄音笙僚,去河邊找鬼。 笑死灵再,一個胖子當著我的面吹牛,可吹牛的內容都是我干的亿笤。 我是一名探鬼主播翎迁,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼净薛!你這毒婦竟也來了汪榔?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤肃拜,失蹤者是張志新(化名)和其女友劉穎痴腌,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體燃领,經...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡士聪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了猛蔽。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片剥悟。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖曼库,靈堂內的尸體忽然破棺而出区岗,到底是詐尸還是另有隱情,我是刑警寧澤毁枯,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布慈缔,位于F島的核電站,受9級特大地震影響种玛,放射性物質發(fā)生泄漏藐鹤。R本人自食惡果不足惜瓤檐,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望教藻。 院中可真熱鬧距帅,春花似錦、人聲如沸括堤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悄窃。三九已至讥电,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間轧抗,已是汗流浹背恩敌。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留横媚,地道東北人纠炮。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像灯蝴,于是被迫代替她去往敵國和親恢口。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內容