拆輪子系列--RxJava理解(三)--observeOn

本系列文章如下:

上一篇文章主要介紹了RxJava中線程調(diào)度的核心方法之一subscribeOn,本篇文章繼續(xù)分析RxJava中線程調(diào)度的另一個(gè)核心方法--observeOn版扩。本篇文章基于RxJava2源碼進(jìn)行分析锈拨。
本文的大綱如下:

  • 一個(gè)具體的例子
  • observeOn源碼分析
  • 總結(jié)

1 .一個(gè)具體的例子

首先,以一個(gè)具體的例子分析observeOn的原理:

Observable.create(new ObservableOnSubscribe<String>() {
     @Override
     public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                Thread.sleep(1000);
                e.onNext("2");
                Thread.sleep(1000);
                e.onComplete();
            }
        })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        Log.e("TAG", "map1--thread=" + Thread.currentThread().getName() + "-s:" + s);
                        return Integer.valueOf(s);
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<Integer, Long>() {
                 @Override
                public Long apply(Integer integer) throws Exception {
                        Log.e("TAG", "map2--thread=" + Thread.currentThread().getName() + "-integer:" + integer);
                        return Long.valueOf(integer);
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<Long, String>() {
                    @Override
                   public String apply(Long aLong) throws Exception {
                        Log.e("TAG", "map3--thread=" + Thread.currentThread().getName() + "-aLong:" + aLong);
                        return String.valueOf(aLong);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("TAG", "Consumer--thread=" + Thread.currentThread().getName() + "-String:" + s);
                    }
                });

如果你了解map這個(gè)操作符惋鸥,那么這個(gè)例子你很快就能得運(yùn)行結(jié)果贺嫂,如果你對(duì)于map這個(gè)操作符不太清楚际起,建議回顧下之前的文章拆輪子系列--RxJava理解(一)--Map解析拾碌。接下來我們看看本例的程序運(yùn)行結(jié)果:

E/TAG: map1--thread-main-s:1
E/TAG: map2--thread-main-integer:1
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:1
E/TAG: map1--thread-main-s:2
E/TAG: map2--thread-main-integer:2
E/TAG: map3--thread-RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread-RxCachedThreadScheduler-1-String:2

細(xì)看下之前的例子吐葱,可能有些朋友已經(jīng)發(fā)現(xiàn)了一個(gè)異常操作Thread.sleep(1000);。為什么在發(fā)射元素的時(shí)候睡了一秒鐘校翔?這個(gè)是為什么呢唇撬?哈哈,先不急展融,下文將一一道來窖认。
從上面運(yùn)行的結(jié)果我們發(fā)現(xiàn),除了observeOn()下面的部分運(yùn)行在observeOn()指定的線程中告希,其余的部分運(yùn)行在subscribeOn()指定的線程扑浸,這個(gè)是為什么呢?下面再分析燕偶,這里先給個(gè)結(jié)論:RxJava中喝噪,observeOn()是用來指定下游observer回調(diào)發(fā)生的線程。對(duì)應(yīng)上面的例子指么,也就是map3與Consumer運(yùn)行的線程酝惧。

2. observeOn源碼分析

為什么會(huì)產(chǎn)生上面的結(jié)果?我們來看看源碼:

@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

從源碼中我們可以看出伯诬,調(diào)用observeOn()方法返回了一個(gè)Observable對(duì)象晚唇,而真正的操作是在ObservableObserveOn()這個(gè)方法里面,接下來我們看看ObservableObserveOn()這個(gè)方法到底干了什么事情:

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

我們主要看看ObservableObserveOn中主要的實(shí)現(xiàn)方法subscribeActual()盗似。在這個(gè)方法中哩陕,首先創(chuàng)建了一個(gè)指定的事物worker,然后將worker作為參數(shù)創(chuàng)建了一個(gè)ObserveOnObserver對(duì)象赫舒,接下來我們分析這個(gè)ObserveOnObserver中具體的邏輯:

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        ...

ObserveOnObserver實(shí)現(xiàn)了Observer這個(gè)接口悍及,重寫了Observer里面的方法,我們看看主要的方法onNext()接癌。在該方法中心赶,首先會(huì)向queue()中添加元素,我們主要關(guān)注schedule()這個(gè)方法缺猛,進(jìn)入schedule()

void schedule() {
       if (getAndIncrement() == 0) {
             worker.schedule(this);
       }
  }

上述方法將實(shí)現(xiàn)了Runnable接口的ObserveOnObserver對(duì)象放入了worker里面進(jìn)行操作缨叫,直白的說,就是該ObserveOnObserver對(duì)象的操作會(huì)被放入一個(gè)線程池中枯夜,尋找合適的線程運(yùn)行弯汰。
主要的問題來了艰山,當(dāng)ObserveOnObserver對(duì)象尋找到一條線程后執(zhí)行了什么操作呢湖雹?繼續(xù)看源碼:

 @Override
   public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        //我們主要看看drainNormal()這個(gè)方法:
        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

其實(shí)這個(gè)方法就是一個(gè)死循環(huán),它不斷的從queue取出元素然后交給由下一級(jí)傳遞上來的observer來執(zhí)行onNext()方法曙搬。而這整個(gè)從queue中取元素到由下級(jí)的observer執(zhí)行onNext()方法摔吏,都是執(zhí)行在scheduler( Scheduler.Worker w = scheduler.createWorker();)所指定的線程中鸽嫂。總的來說征讲,ObserveOnObserver會(huì)將下一級(jí)傳遞過來的observer進(jìn)行封裝据某,讓它獨(dú)立的運(yùn)行在scheduler指定的線程中去處理元素。
再回到前面的例子诗箍,我們?cè)?code>observeOn()操作符后面接著使用了一個(gè)map()操作符癣籽,那么此時(shí)的流程又是怎么樣的呢?我們以一張圖來進(jìn)行說明:

observerOn執(zhí)行流程.png

從上圖中可以看到滤祖,observeOn后面跟了一個(gè)map()筷狼,那么在drainNormal ()方法中a.onNext(v)a就是經(jīng)過map轉(zhuǎn)換過的observer,接著調(diào)用mapo.onNext(transformer.call(t))匠童,此時(shí)保證了transformer.call()方法運(yùn)行在observeOn()所指定的線程中埂材,而o就是observer2

3. 總結(jié)

使用observeOn()這個(gè)操作符汤求,會(huì)在原來Observer發(fā)射元素的時(shí)候俏险,將元素一個(gè)個(gè)的添加到一個(gè)指定的隊(duì)列中,然后異步(使用一個(gè)新的線程)的從該隊(duì)列中取出元素扬绪,將取出的元素交給下一級(jí)的observeronNext()方法來處理元素竖独。

回到前面拋出的一個(gè)問題,我們?cè)诎l(fā)射元素的時(shí)候sleep了1秒鐘挤牛,這個(gè)是為什么呢预鬓?說明一下:因?yàn)槲覀內(nèi)≡氐倪^程是異步操作的,那么很有可能出現(xiàn)某個(gè)線程的轉(zhuǎn)換執(zhí)行完畢之后才執(zhí)行另一個(gè)線程的轉(zhuǎn)換操作赊颠,最后與我們期望的結(jié)果不太一樣格二。當(dāng)我們?nèi)サ衾又?code>sleep()操作,其結(jié)果如下:

E/TAG: map1--thread=main-s:1
E/TAG: map2--thread=main-integer:1
E/TAG: map1--thread=main-s:2
E/TAG: map2--thread=main-integer:2
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:1
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:1
E/TAG: map3--thread=RxCachedThreadScheduler-1-aLong:2
E/TAG: Consumer--thread=RxCachedThreadScheduler-1-String:2

好了竣蹦,關(guān)于RxJava中線程調(diào)度的核心方法observeOn操作符已經(jīng)介紹完畢顶猜。

如果文章中有什么疏漏或者錯(cuò)誤的地方,還望各位指正痘括,你們的監(jiān)督是我最大的動(dòng)力长窄,謝謝!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末纲菌,一起剝皮案震驚了整個(gè)濱河市挠日,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌翰舌,老刑警劉巖嚣潜,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異椅贱,居然都是意外死亡懂算,警方通過查閱死者的電腦和手機(jī)只冻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來计技,“玉大人喜德,你說我怎么就攤上這事】迕剑” “怎么了舍悯?”我有些...
    開封第一講書人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)睡雇。 經(jīng)常有香客問我贱呐,道長(zhǎng),這世上最難降的妖魔是什么入桂? 我笑而不...
    開封第一講書人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任奄薇,我火速辦了婚禮,結(jié)果婚禮上抗愁,老公的妹妹穿的比我還像新娘馁蒂。我一直安慰自己,他們只是感情好蜘腌,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開白布沫屡。 她就那樣靜靜地躺著,像睡著了一般撮珠。 火紅的嫁衣襯著肌膚如雪沮脖。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評(píng)論 1 305
  • 那天芯急,我揣著相機(jī)與錄音勺届,去河邊找鬼。 笑死娶耍,一個(gè)胖子當(dāng)著我的面吹牛免姿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播榕酒,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼胚膊,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了想鹰?” 一聲冷哼從身側(cè)響起紊婉,我...
    開封第一講書人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎辑舷,沒想到半個(gè)月后喻犁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年株汉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片歌殃。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡乔妈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出氓皱,到底是詐尸還是另有隱情路召,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布波材,位于F島的核電站股淡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏廷区。R本人自食惡果不足惜唯灵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望隙轻。 院中可真熱鬧埠帕,春花似錦、人聲如沸玖绿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽斑匪。三九已至呐籽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蚀瘸,已是汗流浹背狡蝶。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留贮勃,地道東北人牢酵。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像衙猪,于是被迫代替她去往敵國(guó)和親馍乙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355