前言
RxJava的學習筆記(一)基礎概念
RxJava的學習筆記(二)操作符
RxJava的學習筆記(三)線程調度
上一節(jié)筆記二中記錄了RxJava2常用操作符的使用,在本節(jié)中繼續(xù)學習RxJava2中最強大、最牛逼的地方傀顾,那就是可以在各個事件產生和傳遞的過程中能夠自由的切換線程。
要在Android中使用RxAndroid, 先繼續(xù)添加Gradle配置:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
1、Scheduler
Scheduler顧名思義被稱為線程調度器。有了這個東西能夠讓RxJava中可以自由的在主線程和子線程之間切換攀隔,在RxJava中內置了以下Scheduler線程調度器:
調度器類型 | 用途 |
---|---|
Schedulers.computation() | 用于計算任務,如事件循環(huán)或和回調處理栖榨,不要用于IO操作(IO操作請使用Schedulers.io())昆汹;默認線程數等于處理器的數量 |
Schedulers.from(executor) | 使用指定的Executor線程池作為自定義的調度器 |
Schedulers.single() | RxJava2新增調度器,返回一個默認的婴栽,共享的满粗,單線程支持的Scheduler實例,用于在同一個后臺線程上強制執(zhí)行的工作愚争。 |
Schedulers.immediate(?) | RxJava1.0里面有該調度器焦匈,到2.0之后已被移除翩伪。在當前線程立即開始執(zhí)行任務 |
Schedulers.io(?) | 用于IO密集型任務着倾,如異步阻塞IO操作寺旺,這個調度器的線程池會根據需要增長;對于普通的計算任務鞍陨,請使用Schedulers.computation()步淹;Schedulers.io(?)默認是一個CachedThreadScheduler,類似一個有線程緩存的新線程調度器 |
Schedulers.newThread(?) | 為每個任務創(chuàng)建一個新線程 |
Schedulers.trampoline(?) | 當其它排隊的任務完成后诚撵,在當前線程排隊開始執(zhí)行 |
AndroidSchedulers.mainThread() | 此調度器為RxAndroid特有贤旷,顧名思義,運行在Android UI線程上 |
2砾脑、Scheduler線程調度的實踐
在RxJava中可以利用subscribeOn() 結合 observeOn() 來實現線程控制,讓事件的產生和消費發(fā)生在不同的線程艾杏。對于subscribeOn和observeOn方法的使用特點和對應的作用分下面幾種情況:
- 1韧衣、不進行線程Scheduler切換控制時都是在當前主線程里面運行。
示例:
public static void noScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的結果:
08-15 18:30:04.854 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[main,5,main]
- 2购桑、subscribeOn無論放置在哪個位置畅铭,它的線程切換發(fā)生在該Observable的OnSubscribe 中,即在它通知上一級 OnSubscribe 時勃蜘,這時事件還沒有開始發(fā)送硕噩,因此 subscribeOn() 的線程控制可以從事件發(fā)出的開端就造成影響。當使用多個subscribeOn() 的時候缭贡,只有第一個 subscribeOn() 起作用炉擅。
示例:
public static void subscribeOnScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的結果是:
08-15 18:33:31.654 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 18:33:31.671 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxCachedThreadScheduler-1,5,main]
只有第一個subscribeOn()生效辉懒,而最后的subscribeOn就沒有效果了。
- 3谍失、通過observeOn進行線程切換時眶俩,發(fā)生在自己內部構建的Observable被訂閱者中,也就是在它即將給下一級的訂閱者發(fā)送事件時快鱼,因此颠印,ObserveOn()控制的是后面的線程,可以多個ObserveOn()一起使用抹竹。每使用一次ObserveOn()线罕,它后面的線程就跟著變換一次。
示例:
public static void observeOnScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.observeOn(Schedulers.computation())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的結果是:
08-15 10:52:51.998 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 10:52:52.003 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 10:52:52.004 2498-2577/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxNewThreadScheduler-1,5,main]
08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxNewThreadScheduler-1,5,main]
- 4窃判、下游的onSubscrible的回調是在subscrible()訂閱動作的時候就被調用了钞楼,因此不能先直接指定onSubscrible執(zhí)行的線程,而只能在subscribe()被調用時的線程中兢孝。
- 5窿凤、對于設置doOnSubscribe回調默認跟onSubscribe的回調規(guī)則那樣在執(zhí)行subscribe()訂閱動作的時候被調用了,但是如果在 doOnSubscribe() 之后有 subscribeOn() 跨蟹,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程雳殊。
示例:
public static void subscribeAndobserveOnScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.observeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
}
})
//doOnSubscribe后面設置了subscribeOn后執(zhí)行離它最近的subscribeOn所指定的線程
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return Observable.just(s + "-map0");
}
})
.observeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到結果是:
08-15 10:58:41.261 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 10:58:41.267 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 10:58:41.268 2498-2606/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-2,5,main]
08-15 10:58:41.268 2498-2605/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-2,5,main]
08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxComputationThreadPool-2,5,main]
08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-2,5,main]
- 6、subscribeOn和observeOn嵌套以及多個Observable聯合使用時窗轩,一層一層的線程由父Observable到子Observable的切換夯秃,同時也可以在doOnSubscribe()、doOnNext()痢艺、doOnComplete()這些方法前后自由通過observeOn()切換這些方法回調的線程仓洼。
示例:
public static void subscribeAndobserveOnNestScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.observeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
}
})
//doOnSubscribe后面設置了subscribeOn后執(zhí)行離它最近的subscribeOn所指定的線程
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
Log.d(XqTag.TAG, "flatMap1.thread:" + Thread.currentThread());
Observable<String> flatMapObservable = Observable.just(s + "-flatMap1")
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(XqTag.TAG, "flatMapObservable.doOnSubscribe.thread:" + Thread.currentThread());
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "apply.map2.thread:" + Thread.currentThread());
return s + "-map2";
}
})
.observeOn(Schedulers.single())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(XqTag.TAG, "doOnNext.accept" + s + ".map2.thread:" + Thread.currentThread());
}
})
.observeOn(Schedulers.newThread())
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(XqTag.TAG, "doOnComplete.accept" + s + ".map2.thread:" + Thread.currentThread());
}
});
return flatMapObservable;
}
})
.observeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的結果是:
08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 11:08:09.279 2498-2646/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-3,5,main]
08-15 11:08:09.279 2498-2645/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-3,5,main]
08-15 11:08:09.280 2498-2647/cn.jltx.rxjava.rx D/XQTAG: flatMap1.thread:Thread[RxNewThreadScheduler-4,5,main]
08-15 11:08:09.282 2498-2498/cn.jltx.rxjava.rx D/XQTAG: flatMapObservable.doOnSubscribe.thread:Thread[main,5,main]
08-15 11:08:09.283 2498-2645/cn.jltx.rxjava.rx D/XQTAG: apply.map2.thread:Thread[RxCachedThreadScheduler-3,5,main]
08-15 11:08:09.284 2498-2648/cn.jltx.rxjava.rx D/XQTAG: doOnNext.acceptdata-map0-flatMap1-map2.map2.thread:Thread[RxSingleScheduler-1,5,main]
08-15 11:08:09.287 2498-2649/cn.jltx.rxjava.rx D/XQTAG: doOnComplete.acceptdata-map0.map2.thread:Thread[RxNewThreadScheduler-5,5,main]
08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0-flatMap1-map2.thread:Thread[RxComputationThreadPool-3,5,main]
08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-3,5,main]
寫在最后
每天堅持寫點日志心得。
最后附上源碼:
https://git.oschina.net/jltx/RxJavaRetrofitDemoPro