Rxjava源碼解析--observeOn指定線程

基于rxjava1.1.0 rxandroid 1.0.1

用例代碼↓
        Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> subscriber1 = new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
                Log.e("haha",s);
            }
        };
 observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1);
observeOn源碼精簡↓
public final Observable<T> observeOn(Scheduler scheduler) {
        return lift(new OperatorObserveOn<T>(scheduler));
    }
AndroidSchedulers 源碼↓
public final class AndroidSchedulers {
    private AndroidSchedulers() {
        throw new AssertionError("No instances");
    }

    private static final Scheduler MAIN_THREAD_SCHEDULER =
            new HandlerScheduler(new Handler(Looper.getMainLooper()));

    ①
    public static Scheduler mainThread() {
        Scheduler scheduler =
                RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
        return scheduler != null ? scheduler : MAIN_THREAD_SCHEDULER;
    }
}
lift精簡源碼↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        ②
        //create Observable2  OnSubscribe2
        return new Observable<R>(new OnSubscribe<R>() {
            ③
            @Override
            public void call(Subscriber<? super R> o) {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                st.onStart();
                ⑤
                onSubscribe.call(st);//onSubscribe1.call(subscriber2)
            }
        });
    }
OperatorObserveOn源碼片段↓
    ④
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {//child = subscriber1
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;
        }
    }

        ⑥
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed()) {
                return;
            }
            ⑦
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

final Action0 action = new Action0() {
            @Override
            public void call() {
                pollQueue();
            }
        };

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(action);
            }
        }

pollQueue精簡版↓
void pollQueue() {
        Object o = queue.poll();
        if (o != null) {
            ⑧
            child.onNext(on.getValue(o));
        } else {
            break;
        }
    }

OperatorObserveOn.ObserveOnSubscriber源碼片段↓
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
            } else {
                queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
            }
            this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
        }

代碼調(diào)用流程由①到最后
代碼分解
observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1) =
observable1.lift(operatorObserveOn(func)).subscribe(subscriber1)=
observable2.subscribe(subscriber1)

執(zhí)行代碼首先在①創(chuàng)建一個(gè)HandlerScheduler 其Looper為主線程的Looper
繼續(xù)執(zhí)行②創(chuàng)建observable2 OnSubscribe2 此時(shí)訂閱關(guān)系變成observable2 .subscribe(subscriber1) 執(zhí)行observable2.OnSubscribe2.call(subscriber1)到達(dá)③傳入subscriber1到④中作為call()的入?yún)?此時(shí)child = subscriber1創(chuàng)建subscriber2

繼續(xù)執(zhí)行到達(dá)⑤等價(jià)執(zhí)行onSubscribe1.call(subscriber2) 即subscriber2.onNext("1")到達(dá)⑥其中subscriber2.onNext方法中在節(jié)點(diǎn)⑦把數(shù)據(jù)存放在隊(duì)列中然后執(zhí)行schedule();在節(jié)點(diǎn)⑧會(huì)在指定的線程從隊(duì)列中取出數(shù)據(jù)重新發(fā)射出來child.onNext(on.getValue(o));其中child為subscriber1 即調(diào)用subscriber1.onNext("123"));

至此流程完結(jié)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市操软,隨后出現(xiàn)的幾起案子嘁锯,更是在濱河造成了極大的恐慌,老刑警劉巖聂薪,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件家乘,死亡現(xiàn)場離奇詭異,居然都是意外死亡藏澳,警方通過查閱死者的電腦和手機(jī)仁锯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來笆载,“玉大人扑馁,你說我怎么就攤上這事涯呻。” “怎么了腻要?”我有些...
    開封第一講書人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵复罐,是天一觀的道長。 經(jīng)常有香客問我雄家,道長效诅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任趟济,我火速辦了婚禮乱投,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘顷编。我一直安慰自己戚炫,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開白布媳纬。 她就那樣靜靜地躺著双肤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪钮惠。 梳的紋絲不亂的頭發(fā)上茅糜,一...
    開封第一講書人閱讀 49,792評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音素挽,去河邊找鬼蔑赘。 笑死,一個(gè)胖子當(dāng)著我的面吹牛预明,可吹牛的內(nèi)容都是我干的缩赛。 我是一名探鬼主播,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼贮庞,長吁一口氣:“原來是場噩夢啊……” “哼峦筒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起窗慎,我...
    開封第一講書人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎卤材,沒想到半個(gè)月后遮斥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡扇丛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年术吗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帆精。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡较屿,死狀恐怖隧魄,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情隘蝎,我是刑警寧澤购啄,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站嘱么,受9級(jí)特大地震影響狮含,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜曼振,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一几迄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧冰评,春花似錦映胁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至务荆,卻和暖如春妆距,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背函匕。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國打工娱据, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盅惜。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓中剩,卻偏偏與公主長得像,于是被迫代替她去往敵國和親抒寂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子结啼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容