RxJava很優(yōu)勢的一個方面就是他的線程切換精绎,基本是依靠ObserveOn和SubscribeOn這兩個操作符來完成的友鼻。
先來看看什么是ObserveOn和SubscribeOn,官方對他們的定義是這樣的:
- ObserveOn
specify the Scheduler on which an observer will observe this Observable
指定一個觀察者在哪個調(diào)度器上觀察這個Observable
- SubscribeOn
specify the Scheduler on which an Observable will operate
指定Observable自身在哪個調(diào)度器上執(zhí)行
By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
As shown in this illustration, the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
從上面的圖例中可以看出知染,SubscribeOn這個操作符指定的是Observable自身在哪個調(diào)度器上執(zhí)行寺枉,而且跟調(diào)用的位置沒有關(guān)系。而ObservableOn則是指定一個觀察者在哪個調(diào)度器上觀察這個Observable幻件,當每次調(diào)用了ObservableOn這個操作符時,之后都會在選擇的調(diào)度器上進行觀察蛔溃,直到再次調(diào)用ObservableOn切換了調(diào)度器绰沥。
那么,如果多次調(diào)用SubscribeOn贺待,會有什么效果呢徽曲?寫個例子測試一下就知道了。
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-1:"+Thread.currentThread().getName());
return integer;
}
})
.subscribeOn(Schedulers.newThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-2:"+Thread.currentThread().getName());
return integer;
}
})
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-3:"+Thread.currentThread().getName());
return integer;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "subscribe:"+Thread.currentThread().getName());
}
});
在例子用麸塞,我多次調(diào)用了subscribeOn操作符秃臣,并且在每個map操作符中打印了當前線程的名稱。
從打印的日志中可以看出哪工,只有第一次調(diào)用SubscribeOn時選擇的調(diào)度器.subscribeOn(Schedulers.newThread())
有作用奥此,而后來選擇的都沒有作用。這說明了SubscribeOn這個操作符雁比,與調(diào)用的位置無關(guān)稚虎,而且只有第一次調(diào)用時會指定Observable自己在哪個調(diào)度器執(zhí)行。
其實有一種情況特殊偎捎,就是在DoOnSubscribe操作符之后調(diào)用蠢终,可以使DoOnSubscribe在指定的調(diào)度器中執(zhí)行。
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.i(TAG, "create:" + Thread.currentThread().getName());
observableEmitter.onNext(1);
observableEmitter.onComplete();
}
});
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map:" + Thread.currentThread().getName());
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.i(TAG, "doOnSubscribe:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "subscribe:" + Thread.currentThread().getName());
}
});
由此可見茴她,SubscribeOn不僅可以指定Observable自身的調(diào)度器寻拂,也可以指定DoOnSubscribe執(zhí)行的調(diào)度器。
我們知道了多次調(diào)用SubscribeOn并不會起作用丈牢,那么多次調(diào)用ObservableOn呢祭钉?還是同樣的例子,將所有SubscribeOn換為ObservableOn試試赡麦。
Observable.just(1)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-1:"+Thread.currentThread().getName());
return integer;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-2:"+Thread.currentThread().getName());
return integer;
}
})
.observeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
Log.i(TAG, "map-3:"+Thread.currentThread().getName());
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "subscribe:"+Thread.currentThread().getName());
}
});
從打印的日志中可以看出朴皆,每次調(diào)用了ObservableOn操作符之后,之后的Map和Subscribe操作符都會發(fā)生在指定的調(diào)度器中泛粹,實現(xiàn)了線程的切換遂铡。
平時我們見到最多的使用場景,可能就是官方圖例上描述的那樣晶姊,配合Retrofit從網(wǎng)絡(luò)拿回數(shù)據(jù)扒接、在io線程或子線程執(zhí)行某些耗時操作,比如一些變換操作,然后再切換到主線程去更新UI钾怔,可以用RxJava的線程切換很方便的實現(xiàn)碱呼。但是這只是很簡單的應(yīng)用場景之一,RxJava能做的遠不止這些宗侦,我仍在不斷的摸索中愚臀。