Rxjava2.2.1(4) observeOn 線程切換-源碼分析

rxjava代碼

Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("有情況");
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {
            Log.e("qwer", s);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

然后create和subscribe也不講了(可以看前面文章)
1、直接看observeOn

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

再進(jìn)入重載的observeOn方法

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

再進(jìn)入ObservableObserveOn

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

同樣的套路,完成賦值
通過(guò)前面的三篇文章,我們已經(jīng)知道接下來(lái)會(huì)進(jìn)入ObservableObserveOn的subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

很顯然,scheduler 不是 TrampolineScheduler類(lèi)型孵构,也就是進(jìn)入else代碼中,調(diào)用了scheduler.createWorker();
2、這個(gè)時(shí)候必須要先具體了解一下scheduler了
這個(gè)scheduler就是我們傳的AndroidSchedulers.mainThread()
進(jìn)入該方法

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

其實(shí)這里onMainThreadScheduler返回的就是本身
3累榜、所以繼續(xù)看MAIN_THREAD

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
    new Callable<Scheduler>() {
        @Override public Scheduler call() throws Exception {
            return MainHolder.DEFAULT;
        }
    });

進(jìn)入RxAndroidPlugins.initMainThreadScheduler

public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
    if (scheduler == null) {
        throw new NullPointerException("scheduler == null");
    }
    Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    if (f == null) {
        return callRequireNonNull(scheduler);
    }
    return applyRequireNonNull(f, scheduler);
}

因?yàn)閒為空(為什么為空,就是我沒(méi)有對(duì)onInitMainThreadHandler進(jìn)行賦值)灵嫌,所以返回了callRequireNonNull(scheduler)壹罚,進(jìn)入該方法

static Scheduler callRequireNonNull(Callable<Scheduler> s) {
    try {
        Scheduler scheduler = s.call();
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    } catch (Throwable ex) {
        throw Exceptions.propagate(ex);
    }
}

其實(shí)就是返回了s.call(),而s.call()是什么寿羞,不錯(cuò)猖凛,就是步驟3最開(kāi)始的
return MainHolder.DEFAULT;
4、繼續(xù)查看 MainHolder.DEFAULT

private static final class MainHolder {
    static final Scheduler DEFAULT
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

P髂隆1嬗尽虱岂!發(fā)現(xiàn)重點(diǎn)
這里返回了一個(gè)HandlerScheduler,構(gòu)造函數(shù)參數(shù)還放了一個(gè)
new Handler(Looper.getMainLooper())菠红,是不是感覺(jué)有點(diǎn)感覺(jué)
進(jìn)入HandlerScheduler

HandlerScheduler(Handler handler, boolean async) {
    this.handler = handler;
    this.async = async;
}

只是賦值第岖,一個(gè)主線程handler,然后async是false
而這個(gè)時(shí)候我們聯(lián)系步驟1最后是不是應(yīng)該看它的createWorker方法

public Worker createWorker() {
    return new HandlerWorker(handler, async);
}

好,所以此時(shí)回到步驟1的最后试溯,Scheduler.Worker就是HandlerWorker
5蔑滓、順著步驟1最后的else代碼繼續(xù)看
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
source.subscribe就不用講了,直接看ObserveOnObserver

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.downstream = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

一系列的賦值操作
嗯哼遇绞?是不是好像結(jié)束了键袱??
不存在的D∶觥8茏荨!
注意它可是Observer钩骇,注意它的方法
6比藻、首先看onSubscribe方法,因?yàn)樗鼤?huì)被第一個(gè)調(diào)用(看過(guò)第一篇文章的都知道)倘屹,它也是我們自己new的觀察者的第一個(gè)回調(diào)方法

public void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        if (d instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) d;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                downstream.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                downstream.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        downstream.onSubscribe(this);
    }
}

根據(jù)之前的代碼我們知道這個(gè)d不是QueueDisposable
所以直接跳過(guò)if
對(duì)queue 進(jìn)行了初始化
這里queue 說(shuō)一下银亲,雖然不知道具體細(xì)節(jié),單可以肯定的是纽匙,他是一個(gè)隊(duì)列
然后繼續(xù)往下看downstream.onSubscribe(this)完成Disposable的繼續(xù)傳遞
7务蝠、接下來(lái)再看什么?是不是就是我們的onNext()方法了

public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

因?yàn)槲覀冎肋@個(gè)t肯定要往下傳烛缔,所以done為false,sourceMode 也不等于QueueDisposable.ASYNC
然后我們被觀察者發(fā)送的數(shù)據(jù)t就被壓入了隊(duì)列queue里去了
8馏段、然后執(zhí)行schedule()

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

這里getAndIncrement()應(yīng)該是線程任務(wù)數(shù)(我猜的,返回這里肯定要為0)
然后進(jìn)入schedule方法

public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
走了重載的方法践瓷,而且schedule是個(gè)抽象方法

9院喜、那此時(shí)你還記得這個(gè)worker是誰(shuí)了么?
不錯(cuò)晕翠,看步驟4的最后喷舀,就是HandlerWorker,我們看它的schedule方法

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

在代碼里只需要注意重點(diǎn)
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run)
這個(gè)handler是主線程handler(步驟4)
這個(gè)run就是步驟5里new的那個(gè)ObserveOnObserver淋肾,它實(shí)現(xiàn)了Runnable接口
下面執(zhí)行代碼
handler.sendMessageDelayed(message, unit.toMillis(delay))
好硫麻,此時(shí)run線程被執(zhí)行(如果不知道為什么可以好好看看handler基礎(chǔ)),而且是在主線程執(zhí)行7俊D美ⅰ!說(shuō)明線程已經(jīng)切換到主線程了
那么接下來(lái)呢碌尔?
不錯(cuò)浇辜!回到步驟5里new的那個(gè)ObserveOnObserver的run方法

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

這里outputFused為false(賦值true的地方我沒(méi)有執(zhí)行券敌,其實(shí)具體看drainFused和drainNormal兩個(gè)方法的代碼,也可以辨別出其為false)
10奢赂、然后進(jìn)入drainNormal方法

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

直接說(shuō)重點(diǎn)
首先開(kāi)啟for循環(huán)陪白,當(dāng)隊(duì)列不為空的時(shí)候
v = q.poll();
拿到隊(duì)列里的值(就是步驟7里offer的那個(gè))
然后a.onNext(v)完成事件傳遞(往上面看颈走,a就是downstream)

自此就算結(jié)束了
總結(jié)---
關(guān)鍵步驟在9膳灶,通過(guò)主線程的hanlder完成線程切換

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市立由,隨后出現(xiàn)的幾起案子轧钓,更是在濱河造成了極大的恐慌,老刑警劉巖锐膜,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件毕箍,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡道盏,警方通過(guò)查閱死者的電腦和手機(jī)而柑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)荷逞,“玉大人媒咳,你說(shuō)我怎么就攤上這事≈衷叮” “怎么了涩澡?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)坠敷。 經(jīng)常有香客問(wèn)我妙同,道長(zhǎng),這世上最難降的妖魔是什么膝迎? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任粥帚,我火速辦了婚禮,結(jié)果婚禮上限次,老公的妹妹穿的比我還像新娘茎辐。我一直安慰自己,他們只是感情好掂恕,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布拖陆。 她就那樣靜靜地躺著,像睡著了一般懊亡。 火紅的嫁衣襯著肌膚如雪依啰。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,631評(píng)論 1 305
  • 那天店枣,我揣著相機(jī)與錄音速警,去河邊找鬼叹誉。 笑死,一個(gè)胖子當(dāng)著我的面吹牛闷旧,可吹牛的內(nèi)容都是我干的长豁。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼忙灼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼匠襟!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起该园,我...
    開(kāi)封第一講書(shū)人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤酸舍,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后里初,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體啃勉,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年双妨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了淮阐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡刁品,死狀恐怖泣特,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情哑诊,我是刑警寧澤群扶,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站镀裤,受9級(jí)特大地震影響竞阐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜暑劝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一骆莹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧担猛,春花似錦幕垦、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至蒸走,卻和暖如春仇奶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背比驻。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工该溯, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留岛抄,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓狈茉,卻偏偏與公主長(zhǎng)得像夫椭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子氯庆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容