線程調度器
上游默認在主線程發(fā)送事件,下游默認也是主線程中接收事件昼伴,
上下游默認是在同一個線程工作
//create創(chuàng)建一個上游 Observable(被觀察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射內容1
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext收到:" + integer);
}
};
observable.subscribe(consumer);
而我們更多時候想要的是,在子線程中做耗時的操作, 然后回到主線程操作UI。通過RxJava內置的線程調度器美浦,我們可以很輕松的做到這一點,如下面的例子:
//create創(chuàng)建一個上游 Observable(被觀察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射內容1
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext收到:" + integer);
}
};
//subscribeOn() 指定上游發(fā)送事件的線程, observeOn() 指定下游接收事件的線程.
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
subscribeOn只能調用一次项栏,如果調用多次浦辨,只有第一次有效。
而observeOn可以多次調用沼沈,每次調用下游都可以切換一次線程荤牍。
如下面的例子:
//create創(chuàng)建一個上游 Observable(被觀察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射內容1
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Consumer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext收到:" + integer);
}
};
//subscribeOn() 指定上游發(fā)送事件的線程, observeOn() 指定下游接收事件的線程.
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())//多次調用subscribeOn()只有第一次的有效
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "observeOn(mainThread) current thread is: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())//每調用一次observeOn() , 下游的線程就會切換一次
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "observeOn(io) current thread is : " + Thread.currentThread().getName());
}
})
.subscribe(consumer);
RxJava內置了很多線程選項供我們選擇
1案腺、Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡,讀寫文件等io密集型的操作
2、Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
3康吵、Schedulers.newThread() 代表一個常規(guī)的新線程
4劈榨、AndroidSchedulers.mainThread() 代表Android的主線程
這些內置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應該使用內置的這些選項晦嵌,在RxJava內部使用的是線程池同辣,效率也比較高。