迷之RxJava (三)—— 線程切換

【謎之RxJava (二) —— Magic Lift】

Rxjava -- 一個(gè)異步庫(kù)

RxJava最迷人的是什么夜涕?
答案就是把異步序列寫到一個(gè)工作流里朱嘴!javascriptPromise/A如出一轍哄尔。
OK振愿,在java中做異步的事情在我們傳統(tǒng)理解過(guò)來(lái)可不方便,而且红选,如果要讓異步按照我們的工作流來(lái)澜公,就更困難了。

但是在RxJava中喇肋,我們只要調(diào)用調(diào)用
subscribOn()observeOn()就能切換我們的工作線程坟乾,是不是讓小伙伴都驚呆了?

然后結(jié)合RxJavaOperator蝶防,寫異步的時(shí)候甚侣,想切換線程就是一行代碼的事情,整個(gè)workflow還非常清晰:

Observable.create()
// do something on io thread
.work() // work.. work..
.subscribeOn(Schedulers.io())
// observeOn android main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe();

我們?cè)僖膊挥萌懯裁匆?jiàn)鬼的new ThreadHandler了慧脱,在這么幾行代碼里渺绒,我們實(shí)現(xiàn)了在io線程上做我們的工作(work),在main線程上,更新UI

Subscribe On

先看下subscribeOn干了什么

 public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

啊宗兼,原來(lái)也是個(gè)lift躏鱼,就是從一個(gè)Observable生成另外一個(gè)Observable咯,這個(gè)nest是干嘛用殷绍?

 public final Observable<Observable<T>> nest() {
    return just(this);
}

這里返回類型告訴我們染苛,它是產(chǎn)生一個(gè)Observable<Observable<T>>
講到這里,會(huì)有點(diǎn)暈主到,先記著這個(gè)茶行,然后我們看OperatorSubscribeOn這個(gè)操作符,

構(gòu)造函數(shù)是

public OperatorSubscribeOn(Scheduler scheduler) {
    this.scheduler = scheduler;
}

OK,這里保存了scheduler對(duì)象登钥,然后就是我們前一章說(shuō)過(guò)的轉(zhuǎn)換方法畔师。

 @Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);
    return new Subscriber<Observable<T>>(subscriber) {

        @Override
        public void onCompleted() {
            // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
        }

        @Override
        public void onError(Throwable e) {
            subscriber.onError(e);
        }

        @Override
        public void onNext(final Observable<T> o) {
            inner.schedule(new Action0() {

                @Override
                public void call() {
                    final Thread t = Thread.currentThread();
                    o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                        @Override
                        public void onCompleted() {
                            subscriber.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            subscriber.onError(e);
                        }

                        @Override
                        public void onNext(T t) {
                            subscriber.onNext(t);
                        }

                        @Override
                        public void setProducer(final Producer producer) {
                            subscriber.setProducer(new Producer() {

                                @Override
                                public void request(final long n) {
                                    if (Thread.currentThread() == t) {
                                        // don't schedule if we're already on the thread (primarily for first setProducer call)
                                        // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                        producer.request(n);
                                    } else {
                                        inner.schedule(new Action0() {

                                            @Override
                                            public void call() {
                                                producer.request(n);
                                            }
                                        });
                                    }
                                }

                            });
                        }

                    });
                }
            });
        }

    };
}

讓人糾結(jié)的類模板

看完這段又臭又長(zhǎng)的,先深呼吸一口氣牧牢,我們慢慢分析下看锉。
首先要注意RxJava里面最讓人頭疼的模板問(wèn)題,那么OperatorMap這個(gè)類的聲明是

public final class OperatorMap<T, R> implements Operator<R, T>

Operator這個(gè)接口繼承Func1

public interface Func1<T, R> extends Function {
    R call(T t);
}

我們這里不要記TR塔鳍,記住傳入左邊的模板是形參伯铣,傳入右邊的模板是返回值

好了轮纫,那么這里的call就是從一個(gè)T轉(zhuǎn)換成一個(gè)Observable<T>的過(guò)程了腔寡。

總結(jié)一下,我們這一次調(diào)用subscribeOn掌唾,做了兩件事

1放前、nest()Observable<T>生成了一個(gè)Observable<Observable<T>>
2、lift() 對(duì)Observalbe<Observalbe<T>>進(jìn)行一個(gè)變化郑兴,變回Observable<T>

因?yàn)?code>lift是一個(gè)模板函數(shù)犀斋,它的返回值的類型是參照它的形參來(lái),而他的形參是Operator<T, Observable<T>> 這個(gè)結(jié)論非常重要G榱叽粹!
OK,到這里我們已經(jīng)存儲(chǔ)了所有的序列却舀,等著我們調(diào)用了虫几。

調(diào)用鏈

首先,記錄我們?cè)谡{(diào)用這條指令之前的Observable<T>挽拔,記為Observable$1
然后辆脸,經(jīng)過(guò)lift生成的Observable<T>記為Observable$2

好了,現(xiàn)在我們拿到的依然是Observable<T>這個(gè)對(duì)象螃诅,但是它不是原始的Observable$1啡氢,要深深記住這一點(diǎn)状囱,它是由lift生成的Observable$2,這時(shí)候進(jìn)行subscribe倘是,那看到首先調(diào)用的就是OnSubscribe.call方法亭枷,好,直接進(jìn)入lift當(dāng)中生成的那個(gè)地方搀崭。

我們知道這一層liftoperator就是剛剛的OperatorSubscribOn叨粘,那么調(diào)用它的call方法,生成的是一個(gè)Subscriber<Observable<T>>

Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
    // new Subscriber created and being subscribed with so 'onStart' it
    st.onStart();
    onSubscribe.call(st);
} catch (Throwable e) {
...
}

好瘤睹,還記得我們調(diào)用過(guò)nest么升敲?,這里的onSubscribe可是nest上下文中的噢轰传,每一次驴党,到這個(gè)地方,這個(gè)onSubscribe就是上一層ObservableonSubscribe绸吸,即Observable<Observable<T>>onSubscribe鼻弧,相當(dāng)于棧彈出了一層设江。它的call直接在SubscriberonNext中給出了最開(kāi)始的Observable<T>锦茁,我們這里就要看下剛剛在OperatorSubscribeOn中生成的Subscriber

new Subscriber<Observable<T>>(subscriber) {

    @Override
    public void onCompleted() {
        // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
    }
    
    @Override
    public void onError(Throwable e) {
        subscriber.onError(e);
    }
    
    @Override
    public void onNext(final Observable<T> o) {
        inner.schedule(new Action0() {
    
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                o.unsafeSubscribe(new Subscriber<T>(subscriber) {
    
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }
    
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                });
            }
        });
    }
}

對(duì),就是它叉存,這里要注意码俩,這里的subscriber就是我們?cè)?code>lift中,傳入的o

Subscriber<? super T> st = hook.onLift(operator).call(o);

對(duì)歼捏,就是它稿存,其實(shí)它就是SafeSubscriber

回過(guò)頭瞳秽,看看剛剛的onNext()方法瓣履,inner.schedule() 這個(gè)函數(shù),我們可以認(rèn)為就是postRun()類似的方法练俐,而onNext()中傳入的o是我們之前生成的Observable$1袖迎,是從Observable.just封裝出來(lái)的Observable<Observable<T>>中產(chǎn)生的,這里調(diào)用了Observable$1.unsafeSubscribe方法腺晾,我們暫時(shí)不關(guān)心它和subscribe有什么不同燕锥,但是我們知道最終功能是一樣的就好了。

注意它運(yùn)行時(shí)的線程C醪酢归形!在inner這個(gè)Worker上!于是它的運(yùn)行線程已經(jīng)被改了1怯伞暇榴!

好厚棵,這里的unsafeSubscribe調(diào)用的方法就是調(diào)用原先Observable$1.onSubscribe中的call方法:
這個(gè)Observable$1就是我們之前自己定義的Observable了。

綜上所述蔼紧,如果我們需要我們的Observable$1在一個(gè)別的線程上運(yùn)行的時(shí)候窟感,只需要在后面跟一個(gè)subscribeOn即可。結(jié)合扔物線大大的圖如下:

rxjavarxjava_12.png
rxjavarxjava_12.png

總結(jié)

這里邏輯著實(shí)不好理解歉井。如果還沒(méi)有理解的朋友柿祈,可以按照我前文說(shuō)的順序,細(xì)致的看下來(lái)哩至,我把邏輯過(guò)一遍之后躏嚎,發(fā)現(xiàn)lift的陷阱實(shí)在太大,內(nèi)部類用的風(fēng)生水起菩貌,一不小心卢佣,就不知道一個(gè)變量的上下文是什么,需要特別小心箭阶。

本文在不停更新中虚茶,如果有不明白的地方(可能會(huì)有很多),請(qǐng)大家給出意見(jiàn)仇参,拍磚請(qǐng)輕點(diǎn)= =

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嘹叫,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子诈乒,更是在濱河造成了極大的恐慌罩扇,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件怕磨,死亡現(xiàn)場(chǎng)離奇詭異喂饥,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)肠鲫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門员帮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人导饲,你說(shuō)我怎么就攤上這事捞高。” “怎么了帜消?”我有些...
    開(kāi)封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵棠枉,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我泡挺,道長(zhǎng)辈讶,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任娄猫,我火速辦了婚禮贱除,結(jié)果婚禮上生闲,老公的妹妹穿的比我還像新娘。我一直安慰自己月幌,他們只是感情好碍讯,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著扯躺,像睡著了一般捉兴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上录语,一...
    開(kāi)封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天倍啥,我揣著相機(jī)與錄音,去河邊找鬼澎埠。 笑死虽缕,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蒲稳。 我是一名探鬼主播氮趋,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼江耀!你這毒婦竟也來(lái)了剩胁?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤决记,失蹤者是張志新(化名)和其女友劉穎摧冀,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體系宫,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年建车,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了扩借。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡缤至,死狀恐怖潮罪,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情领斥,我是刑警寧澤嫉到,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站月洛,受9級(jí)特大地震影響何恶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜嚼黔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一细层、第九天 我趴在偏房一處隱蔽的房頂上張望惜辑。 院中可真熱鬧,春花似錦疫赎、人聲如沸盛撑。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)抵卫。三九已至,卻和暖如春胎撇,著一層夾襖步出監(jiān)牢的瞬間陌僵,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工创坞, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留碗短,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓题涨,卻偏偏與公主長(zhǎng)得像偎谁,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子纲堵,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容