RxJava的學習筆記(三)線程調度

前言

RxJava的學習筆記(一)基礎概念
RxJava的學習筆記(二)操作符
RxJava的學習筆記(三)線程調度
上一節(jié)筆記二中記錄了RxJava2常用操作符的使用,在本節(jié)中繼續(xù)學習RxJava2中最強大、最牛逼的地方傀顾,那就是可以在各個事件產生和傳遞的過程中能夠自由的切換線程。
要在Android中使用RxAndroid, 先繼續(xù)添加Gradle配置:

 compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

1、Scheduler

Scheduler顧名思義被稱為線程調度器。有了這個東西能夠讓RxJava中可以自由的在主線程和子線程之間切換攀隔,在RxJava中內置了以下Scheduler線程調度器:

調度器類型 用途
Schedulers.computation() 用于計算任務,如事件循環(huán)或和回調處理栖榨,不要用于IO操作(IO操作請使用Schedulers.io())昆汹;默認線程數等于處理器的數量
Schedulers.from(executor) 使用指定的Executor線程池作為自定義的調度器
Schedulers.single() RxJava2新增調度器,返回一個默認的婴栽,共享的满粗,單線程支持的Scheduler實例,用于在同一個后臺線程上強制執(zhí)行的工作愚争。
Schedulers.immediate(?) RxJava1.0里面有該調度器焦匈,到2.0之后已被移除翩伪。在當前線程立即開始執(zhí)行任務
Schedulers.io(?) 用于IO密集型任務着倾,如異步阻塞IO操作寺旺,這個調度器的線程池會根據需要增長;對于普通的計算任務鞍陨,請使用Schedulers.computation()步淹;Schedulers.io(?)默認是一個CachedThreadScheduler,類似一個有線程緩存的新線程調度器
Schedulers.newThread(?) 為每個任務創(chuàng)建一個新線程
Schedulers.trampoline(?) 當其它排隊的任務完成后诚撵,在當前線程排隊開始執(zhí)行
AndroidSchedulers.mainThread() 此調度器為RxAndroid特有贤旷,顧名思義,運行在Android UI線程上

2砾脑、Scheduler線程調度的實踐

在RxJava中可以利用subscribeOn() 結合 observeOn() 來實現線程控制,讓事件的產生和消費發(fā)生在不同的線程艾杏。對于subscribeOn和observeOn方法的使用特點和對應的作用分下面幾種情況:

  • 1韧衣、不進行線程Scheduler切換控制時都是在當前主線程里面運行。
    示例:
    public static void noScheduler() {
        Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
        Observable.just("data")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                        return s + "-map0";
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                    }
                });
    }

得到的結果:

08-15 18:30:04.854 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[main,5,main]
  • 2购桑、subscribeOn無論放置在哪個位置畅铭,它的線程切換發(fā)生在該Observable的OnSubscribe 中,即在它通知上一級 OnSubscribe 時勃蜘,這時事件還沒有開始發(fā)送硕噩,因此 subscribeOn() 的線程控制可以從事件發(fā)出的開端就造成影響。當使用多個subscribeOn() 的時候缭贡,只有第一個 subscribeOn() 起作用炉擅。
    示例:
    public static void subscribeOnScheduler() {
        Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
        Observable.just("data")
                .subscribeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                        return s + "-map0";
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                    }
                });
    }

得到的結果是:

08-15 18:33:31.654 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 18:33:31.671 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxCachedThreadScheduler-1,5,main]

只有第一個subscribeOn()生效辉懒,而最后的subscribeOn就沒有效果了。

  • 3谍失、通過observeOn進行線程切換時眶俩,發(fā)生在自己內部構建的Observable被訂閱者中,也就是在它即將給下一級的訂閱者發(fā)送事件時快鱼,因此颠印,ObserveOn()控制的是后面的線程,可以多個ObserveOn()一起使用抹竹。每使用一次ObserveOn()线罕,它后面的線程就跟著變換一次。
52eb2279jw1f2rxd1vl7xj20hd0hzq6e.jpg

示例:

    public static void observeOnScheduler() {
        Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
        Observable.just("data")
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                        return s + "-map0";
                    }
                })
                .observeOn(Schedulers.computation())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                    }
                });
    }

得到的結果是:

08-15 10:52:51.998 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 10:52:52.003 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 10:52:52.004 2498-2577/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxNewThreadScheduler-1,5,main]
08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxNewThreadScheduler-1,5,main]
  • 4窃判、下游的onSubscrible的回調是在subscrible()訂閱動作的時候就被調用了钞楼,因此不能先直接指定onSubscrible執(zhí)行的線程,而只能在subscribe()被調用時的線程中兢孝。
  • 5窿凤、對于設置doOnSubscribe回調默認跟onSubscribe的回調規(guī)則那樣在執(zhí)行subscribe()訂閱動作的時候被調用了,但是如果在 doOnSubscribe() 之后有 subscribeOn() 跨蟹,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程雳殊。
    示例:
    public static void subscribeAndobserveOnScheduler() {
        Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
        Observable.just("data")
                .observeOn(Schedulers.io())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
                    }
                })
                //doOnSubscribe后面設置了subscribeOn后執(zhí)行離它最近的subscribeOn所指定的線程
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                        return Observable.just(s + "-map0");
                    }
                })
                .observeOn(Schedulers.computation())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                    }
                });
    }

得到結果是:

08-15 10:58:41.261 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 10:58:41.267 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 10:58:41.268 2498-2606/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-2,5,main]
08-15 10:58:41.268 2498-2605/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-2,5,main]
08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxComputationThreadPool-2,5,main]
08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-2,5,main]
  • 6、subscribeOn和observeOn嵌套以及多個Observable聯合使用時窗轩,一層一層的線程由父Observable到子Observable的切換夯秃,同時也可以在doOnSubscribe()、doOnNext()痢艺、doOnComplete()這些方法前后自由通過observeOn()切換這些方法回調的線程仓洼。
    示例:
    public static void subscribeAndobserveOnNestScheduler() {
        Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
        Observable.just("data")
                .observeOn(Schedulers.io())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
                    }
                })
                //doOnSubscribe后面設置了subscribeOn后執(zhí)行離它最近的subscribeOn所指定的線程
                .subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
                        return s + "-map0";
                    }
                })
                .observeOn(Schedulers.newThread())
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        Log.d(XqTag.TAG, "flatMap1.thread:" + Thread.currentThread());
                        Observable<String> flatMapObservable = Observable.just(s + "-flatMap1")
                                .doOnSubscribe(new Consumer<Disposable>() {
                                    @Override
                                    public void accept(Disposable disposable) throws Exception {
                                        Log.d(XqTag.TAG, "flatMapObservable.doOnSubscribe.thread:" + Thread.currentThread());
                                    }
                                })
                                .subscribeOn(AndroidSchedulers.mainThread())
                                .observeOn(Schedulers.io())
                                .map(new Function<String, String>() {
                                    @Override
                                    public String apply(String s) throws Exception {
                                        Log.d(XqTag.TAG, "apply.map2.thread:" + Thread.currentThread());
                                        return s + "-map2";
                                    }
                                })
                                .observeOn(Schedulers.single())
                                .doOnNext(new Consumer<String>() {
                                    @Override
                                    public void accept(String s) throws Exception {
                                        Log.d(XqTag.TAG, "doOnNext.accept" + s + ".map2.thread:" + Thread.currentThread());
                                    }
                                })
                                .observeOn(Schedulers.newThread())
                                .doOnComplete(new Action() {
                                    @Override
                                    public void run() throws Exception {
                                        Log.d(XqTag.TAG, "doOnComplete.accept" + s + ".map2.thread:" + Thread.currentThread());
                                    }
                                });
                        return flatMapObservable;
                    }
                })
                .observeOn(Schedulers.computation())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
                    }
                });
    }

得到的結果是:

08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 11:08:09.279 2498-2646/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-3,5,main]
08-15 11:08:09.279 2498-2645/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-3,5,main]
08-15 11:08:09.280 2498-2647/cn.jltx.rxjava.rx D/XQTAG: flatMap1.thread:Thread[RxNewThreadScheduler-4,5,main]
08-15 11:08:09.282 2498-2498/cn.jltx.rxjava.rx D/XQTAG: flatMapObservable.doOnSubscribe.thread:Thread[main,5,main]
08-15 11:08:09.283 2498-2645/cn.jltx.rxjava.rx D/XQTAG: apply.map2.thread:Thread[RxCachedThreadScheduler-3,5,main]
08-15 11:08:09.284 2498-2648/cn.jltx.rxjava.rx D/XQTAG: doOnNext.acceptdata-map0-flatMap1-map2.map2.thread:Thread[RxSingleScheduler-1,5,main]
08-15 11:08:09.287 2498-2649/cn.jltx.rxjava.rx D/XQTAG: doOnComplete.acceptdata-map0.map2.thread:Thread[RxNewThreadScheduler-5,5,main]
08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0-flatMap1-map2.thread:Thread[RxComputationThreadPool-3,5,main]
08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-3,5,main]

寫在最后

每天堅持寫點日志心得。
最后附上源碼:
https://git.oschina.net/jltx/RxJavaRetrofitDemoPro

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末堤舒,一起剝皮案震驚了整個濱河市色建,隨后出現的幾起案子,更是在濱河造成了極大的恐慌舌缤,老刑警劉巖箕戳,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異国撵,居然都是意外死亡陵吸,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門介牙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來壮虫,“玉大人,你說我怎么就攤上這事环础∏羲疲” “怎么了剩拢?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長谆构。 經常有香客問我裸扶,道長,這世上最難降的妖魔是什么搬素? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任呵晨,我火速辦了婚禮,結果婚禮上熬尺,老公的妹妹穿的比我還像新娘摸屠。我一直安慰自己,他們只是感情好粱哼,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布季二。 她就那樣靜靜地躺著,像睡著了一般揭措。 火紅的嫁衣襯著肌膚如雪胯舷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天绊含,我揣著相機與錄音桑嘶,去河邊找鬼。 笑死躬充,一個胖子當著我的面吹牛逃顶,可吹牛的內容都是我干的。 我是一名探鬼主播充甚,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼以政,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了伴找?” 一聲冷哼從身側響起盈蛮,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎技矮,沒想到半個月后眉反,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡穆役,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了梳凛。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片耿币。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖韧拒,靈堂內的尸體忽然破棺而出淹接,到底是詐尸還是另有隱情十性,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布塑悼,位于F島的核電站劲适,受9級特大地震影響,放射性物質發(fā)生泄漏厢蒜。R本人自食惡果不足惜霞势,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望斑鸦。 院中可真熱鬧愕贡,春花似錦、人聲如沸巷屿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嘱巾。三九已至憨琳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間旬昭,已是汗流浹背篙螟。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留稳懒,地道東北人闲擦。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像场梆,于是被迫代替她去往敵國和親墅冷。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內容