RxJava如何結(jié)合觀察者與鏈?zhǔn)教幚?/h1>

RxJava如何結(jié)合觀察者與鏈?zhǔn)教幚?/h1>

Author: Dorae
Date: 2018年12月3日17:10:31
轉(zhuǎn)載請注明出處


一性芬、概述

首先問自己幾個(gè)問題慨蓝,如果非常清楚這幾個(gè)問題的目的與答案,那么恭喜你竹伸,不用繼續(xù)往下看了-_-泥栖。

  1. RxJava是干什么的簇宽;
  2. 鏈?zhǔn)秸{(diào)用中當(dāng)存在數(shù)個(gè)Observable.subscribeOn()時(shí)的處理方式;
  3. 鏈?zhǔn)秸{(diào)用中當(dāng)存在數(shù)個(gè)Observable.observeOn()時(shí)的處理方式吧享;
  4. 數(shù)據(jù)是如何經(jīng)過操作符進(jìn)行的處理魏割。

回顧

觀察者模式

如圖1-1所示

[圖片上傳失敗...(image-f38e70-1545098102748)]

圖 1-1

Java8的stream

參見這里

二、RxJava是什么

一款為了簡化異步調(diào)用钢颂,且功能比較全面的框架钞它。

三、RxJava如何結(jié)合了觀察者模式與鏈?zhǔn)教幚?/h2>

參見Java8中的sink鏈殊鞭,在RxJava中同樣實(shí)現(xiàn)了鏈?zhǔn)教幚碓舛狻H绱a片段code1-1所示,我們對其結(jié)構(gòu)進(jìn)行分析:

code1-1

Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Dorae");
        }
    })
    .filter(e -> e.contains("o"))
    .map(e -> "AfterMap: " + e)
    .filter(e -> e.contains("D"))
    .subscribe(new Observer<String>() {

                @Override
                public void onNext(@NonNull String o) {
                    System.out.println("觀察者 onNext: " + o);
                }

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("觀察者onSubscribe: " + d + "### " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {

                }
            });

1操灿、首先看下其輸出結(jié)果:

觀察者onSubscribe: io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver@52d455b8### main

觀察者 onNext: AfterMap: Dorae

2锯仪、現(xiàn)在來對如何輸出這段結(jié)果進(jìn)行分析

首先了解下RxJava中的幾種基本角色:

  • Observable,是RxJava描述的事件流趾盐,可以說其與Observer構(gòu)成了RxJava的基礎(chǔ)庶喜。在鏈?zhǔn)秸{(diào)用中,事件從創(chuàng)建到加工到最后被Observer接受谤碳,其實(shí)就是由一條Observerable鏈串起來的溃卡。
  • Observer溢豆,RxJava中的訂閱者蜒简,也就是需要對事件進(jìn)行響應(yīng)的一個(gè)角色。其實(shí)除了我們通常自己實(shí)現(xiàn)的一個(gè)Observer漩仙,在鏈中的每一步都會(huì)產(chǎn)生一個(gè)Observer搓茬,然后這些Observer構(gòu)成一條鏈,最終完成整個(gè)鏈的計(jì)算队他。
  • ObservableOnSubscribe卷仑,整個(gè)事件流的源頭,通常需要我們自己實(shí)現(xiàn)麸折,其依賴一個(gè)Emitter锡凝。
  • Emitter,可以將其理解為觸發(fā)器垢啼,推動(dòng)整個(gè)流程的運(yùn)轉(zhuǎn)窜锯。
  • Scheduler,這個(gè)其實(shí)不用太過關(guān)心芭析,RxJava用其封裝了Thread锚扎,用于完成線程切換等任務(wù)。

是不是感覺上邊的一堆廢話非衬倨簦枯燥驾孔?先上一張RxJava的核心結(jié)構(gòu),如圖3-1所示。

圖 3-1

圖 3-1

現(xiàn)在我們再來看看code1-1翠勉,其最終形成的Observable鏈如圖3-2所示妖啥,每次調(diào)用map、filter等操作对碌,都會(huì)生成一個(gè)新對象迹栓,并且保持了一個(gè)對上游的引用(用于生成Observer鏈)。

[圖片上傳失敗...(image-e79420-1545098102748)]

圖 3-2

Observer鏈如圖3-3所示俭缓,整個(gè)事件流程由CreateEmitter觸發(fā)克伊,最終交由我們的實(shí)現(xiàn)Observer$1處理。

[圖片上傳失敗...(image-da5c92-1545098102748)]

圖 3-3

看了上邊幾張圖之后华坦,是不是感覺清晰了很多愿吹?那么讓我們進(jìn)一步看下Rxjava如何完成了一鍵線程切換。

四惜姐、RxJava如何實(shí)現(xiàn)線程切換

通常我們使用RxJava的線程切換功能時(shí)犁跪,只需要在調(diào)用鏈中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述歹袁,其實(shí)就是一個(gè)包裝了ThreadPool的調(diào)度器坷衍。那么我們先來看下相關(guān)源碼。

1条舔、subscribeOn

如代碼code4-1所示枫耳,為subscribeOn的核心代碼。很明顯孟抗,其中在新線程中只是簡單的直接調(diào)用了source迁杨,也就是說這里之后的所有操作均在一個(gè)新線程中進(jìn)行,和單線程并沒有什么區(qū)別凄硼。

code 4-1

public final Observable<T> subscribeOn(Scheduler scheduler) {
    return new ObservableSubscribeOn<T>() {
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            scheduler.createWorker().schedule(new SubscribeTask() {
                @Override
                public void run() {
                    source.subscribe(e);
                }
            });
        }
    };
}

2铅协、observeOn

如代碼段code4-2所示,為observeOn的核心邏輯摊沉,可以看出其在訂閱階段(生成Observer鏈的階段)還是在當(dāng)前線程執(zhí)行狐史,只有觸發(fā)之后,到了ObserverOn的Observer的節(jié)點(diǎn)時(shí)才會(huì)真正的切換到新線程说墨。

code 4-2

public final Observable<T> observeOn(Scheduler scheduler) {
    return new ObservableOnSubscribe<T>() {
        @Override
        public void subscribeActual(@NonNull Observer<Object> e) { 
            source.subscribe(new Observer<T>() {

                @Override
                public void onNext(T var1) {
                    scheduler.createWorker().schedule(new Runnable() {
                        @Override
                        public void run() {
                            e.onNext(var1);
                        }
                    });
                }
            });
        }
    };
}

多次Observable.subscribeOn()骏全、多次Observable.observeOn()會(huì)發(fā)生什么

通過上述code4-1、code4-2的分析婉刀,是不是可以推斷出當(dāng)多次subscribeOn時(shí)會(huì)發(fā)生什么吟温?沒錯(cuò),雖然每次subscribeOn都會(huì)產(chǎn)生一次線程切換突颊,但是真正起作用的只有最開始的一次subscribeOn鲁豪,也就相當(dāng)于只在最初的位置調(diào)用了subscribeOn潘悼;對于observeOn也是類似,每次都會(huì)產(chǎn)生新線程爬橡,但是每次都會(huì)產(chǎn)生一定的影響治唤,也就是每個(gè)線程都承擔(dān)了一部分工作。

小結(jié)

通過本文糙申,我們可以簡要了解到RxJava的基本原理宾添,但是對于其豐富的api還需要在實(shí)踐中進(jìn)行磨合。但是柜裸,RxJava既然作為一個(gè)異步框架缕陕,其必然有一定的局限,比如其切換線程時(shí)無法阻塞當(dāng)前線程(這種對于Android等需要渲染或者網(wǎng)絡(luò)IO的需求來說非常適用)疙挺,但是對于常見的服務(wù)端業(yè)務(wù)來說扛邑,還需要額外引入阻塞當(dāng)前線程的操作(因?yàn)榇蟛糠值膕erver代碼還是單線程模型),倘若完全不用線程切換在服務(wù)端強(qiáng)行引入铐然,可能會(huì)得不償失蔬崩。個(gè)人更推薦Java8的CompletableFuture。

參考

理解RxJava(一)基本流程源碼分析

RxJava基本原理分析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者

  • 序言:七十年代末搀暑,一起剝皮案震驚了整個(gè)濱河市沥阳,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌自点,老刑警劉巖桐罕,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異樟氢,居然都是意外死亡冈绊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進(jìn)店門埠啃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人伟恶,你說我怎么就攤上這事碴开。” “怎么了博秫?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵潦牛,是天一觀的道長。 經(jīng)常有香客問我挡育,道長巴碗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任即寒,我火速辦了婚禮橡淆,結(jié)果婚禮上召噩,老公的妹妹穿的比我還像新娘。我一直安慰自己逸爵,他們只是感情好具滴,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著师倔,像睡著了一般构韵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上趋艘,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天疲恢,我揣著相機(jī)與錄音,去河邊找鬼瓷胧。 笑死冈闭,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的抖单。 我是一名探鬼主播萎攒,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼矛绘!你這毒婦竟也來了耍休?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤货矮,失蹤者是張志新(化名)和其女友劉穎羊精,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體囚玫,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡喧锦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了抓督。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片燃少。...
    茶點(diǎn)故事閱讀 40,742評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖铃在,靈堂內(nèi)的尸體忽然破棺而出阵具,到底是詐尸還是另有隱情,我是刑警寧澤定铜,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布阳液,位于F島的核電站,受9級特大地震影響揣炕,放射性物質(zhì)發(fā)生泄漏帘皿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一畸陡、第九天 我趴在偏房一處隱蔽的房頂上張望鹰溜。 院中可真熱鬧虽填,春花似錦、人聲如沸奉狈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽仁期。三九已至桑驱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間跛蛋,已是汗流浹背熬的。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赊级,地道東北人押框。 一個(gè)月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像理逊,于是被迫代替她去往敵國和親橡伞。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內(nèi)容