我學 rxjava 2(4)- subscribeOn/observeOn 切換的是誰的線程

rxjava 的東西是很多的马澈,難免有理解錯誤的地方,這兩天面試碰到有人問 subscribeOn/observeOn 線程切換的問題弄息,我回答完痊班,面試官明顯不滿意,回來找了找資料摹量,還真是自己理解錯了涤伐,有必要專門寫一篇文章出來馒胆。

例子1 : subscribeOn/observeOn 最簡單使用


        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_12.png

按照之前的理解:

  • subscribeOn 是決定 observable 中產(chǎn)生數(shù)據(jù)的方法執(zhí)行在哪個線程
  • observeOn 是決定 observer 消費數(shù)據(jù)的方法執(zhí)行在哪個線程

我們看這個最簡單的例子,的確是這樣凝果,那么更復雜的情況呢祝迂。

例子2:subscribeOn/observeOn 連著重復寫,哪個為準


還是以上面那個最簡單的例子來

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_13.png

從結(jié)果來看:

  • 多個 subscribeOn 連著寫器净,以第一個為準
  • 多個 observeOn 連著寫型雳,以最后一個為準

例子3 :添加多個操作符呢


rxjava 中的操作符基本都會生成一個新的 observable 出來,上下游的關(guān)系就復雜了山害,情況會不會有變化呢纠俭,這個例子就復雜了

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第1次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第2次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第3次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第4次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_17.png

從結(jié)果看:

  • subscribeOn 決定 Observable.create 的執(zhí)行線程,之后再寫 subscribeOn 浪慌,無論是挨著寫冤荆,還是隔著操作符寫都沒有意思
  • subscribeOn 決定數(shù)據(jù)源的執(zhí)行線程后,也會當前線程置為這個線程眷射,若無其他設(shè)置匙赞,之后操作符的操作也是在當前線程執(zhí)行,也就是 subscribeOn 指定的線程
  • observeOn 不僅僅可以決定 .subscribe 執(zhí)行的線程妖碉,更能夠更改 observeOn 之后書寫的操作符的執(zhí)行線程涌庭,也就是可以切換當前線程。

例子4:用 observeOn 給多個操作符切換線程


        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
                emitter.onNext("");
            }
        })
                .subscribeOn(Schedulers.computation())
                .observeOn( Schedulers.io() )
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第1次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第2次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第3次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d("AA", "第4次變化" + Thread.currentThread().getName());
                        return "";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
                    }
                });
Snip20180421_19.png

從結(jié)果看:

  • observeOn 的的確確是可以 rxjava 所在線程

好了來說說原理


因為 observeOn() 指定的是 Subscriber 的線程欧宜,而這個 Subscriber 并不是(嚴格說應該為『不一定是』坐榆,但這里不妨理解為『不是』)subscribe() 參數(shù)中的 Subscriber ,而是 observeOn() 執(zhí)行時的當前 Observable 所對應的 Subscriber 冗茸,即它的直接下級 Subscriber 席镀。換句話說,observeOn() 指定的是它之后的操作所在的線程夏漱。因此如果有多次切換線程的需求豪诲,只要在每個想要切換線程的位置調(diào)用一次 observeOn() 即可。 ——扔物線

摘自:擁抱RxJava(番外篇):關(guān)于RxJava的Tips & Tricks 挂绰,推薦大家去看看原文

我們翻翻源碼呢屎篱,看看能不能簡單的走一下邏輯


1.png
2.png
3.png
  • 我們可以看到 map 是生成了一個新的 observable 出來,這個 observable 還有我們的變化數(shù)據(jù)的接口類
  • 在這個 新的 observable 里面葵蒂,有上面的 observable 對象引用交播,然后給這個上面的 observable 對象注冊了一個新的觀察者進來
  • 這個新的觀察者即是一個 observer,但同時還是一個 observable 践付,這個新的觀察者在數(shù)據(jù)生成方法中接受上一級 observable 發(fā)送過來的數(shù)據(jù)秦士,然后根據(jù)我們傳入的數(shù)據(jù)變換接口對象計算出新的數(shù)據(jù),最后發(fā)送給消費者或是下一級

不是很好理解永高,但是大概應該是這個意思

換個更容易理解的描述:

  • subscribeOn 決定的是上游線程隧土,上游切換到哪個線程提针,下游要是不改的話,rxjava 就在這個線程一直跑
  • observeOn 決定的是下游線程
  • 整個 rxjava 中嚴格說來真正的上游只有一個次洼,那就是產(chǎn)生數(shù)據(jù)的位置关贵,比如 .just / ,create,其他任何變換和操作符卖毁,注冊都是下游揖曾。
  • 所以 subscribeOn 只有第一次切換有效,作用范圍也是最小的亥啦,就是 .just / ,create
  • 基本上操作符都會生成一個新的 observable 出來炭剪,和之前的 observable 關(guān)聯(lián)(其實也是注冊到之前的 observable)。所以在一個操作的范圍來看翔脱,前一個 observable 發(fā)送數(shù)據(jù)給我奴拦,算是上游,我這個操作符消費數(shù)據(jù)届吁,產(chǎn)生新的 observable 错妖,算是下游
  • 所以 observeOn 可以多次切換他之后的操作符的線程

參考資料:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市疚沐,隨后出現(xiàn)的幾起案子暂氯,更是在濱河造成了極大的恐慌,老刑警劉巖亮蛔,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痴施,死亡現(xiàn)場離奇詭異,居然都是意外死亡究流,警方通過查閱死者的電腦和手機辣吃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芬探,“玉大人神得,你說我怎么就攤上這事⊥捣拢” “怎么了哩簿?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長炎疆。 經(jīng)常有香客問我卡骂,道長国裳,這世上最難降的妖魔是什么形入? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮缝左,結(jié)果婚禮上亿遂,老公的妹妹穿的比我還像新娘浓若。我一直安慰自己,他們只是感情好蛇数,可當我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布挪钓。 她就那樣靜靜地躺著,像睡著了一般耳舅。 火紅的嫁衣襯著肌膚如雪碌上。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天浦徊,我揣著相機與錄音馏予,去河邊找鬼。 笑死盔性,一個胖子當著我的面吹牛霞丧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播冕香,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蛹尝,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了悉尾?” 一聲冷哼從身側(cè)響起突那,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎焕襟,沒想到半個月后陨收,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡鸵赖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年务漩,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片它褪。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡饵骨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出茫打,到底是詐尸還是另有隱情居触,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布老赤,位于F島的核電站轮洋,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏抬旺。R本人自食惡果不足惜弊予,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望开财。 院中可真熱鬧汉柒,春花似錦误褪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至正塌,卻和暖如春嘀略,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背乓诽。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工屎鳍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人问裕。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓逮壁,卻偏偏與公主長得像,于是被迫代替她去往敵國和親粮宛。 傳聞我的和親對象是個殘疾皇子窥淆,可洞房花燭夜當晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容