一爹橱、scan語法
public Observable<Integer> getRxJavaCreateExampleData() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
emitter.onNext(2);
// Thread.sleep(5000);
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
emitter.onNext(3);
emitter.onComplete();
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
emitter.onNext(4);
}
});
}
public void rxJavaScanExample() {
Disposable disposable = model.getRxJavaCreateExampleData()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.scan( new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
LogUtils.error(TAG, "rxJavaScanExample--:" + Thread.currentThread().getName() + "-scan-:" + integer + "---" + integer2);
return integer+integer2;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
日志
08-19 15:16:15.864 12835-12880/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:1---2
08-19 15:16:15.865 12835-12880/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:3---3
08-19 15:16:15.867 12835-12835/com.example.zhang E/MainPresenter: rxJavaReduceExample--:main-consumer-:1
rxJavaReduceExample--:main-consumer-:3
rxJavaReduceExample--:main-consumer-:6
二、scan語法2
public void rxJavaScanExample() {
Disposable disposable = model.getRxJavaCreateExampleData()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.scan(10, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
LogUtils.error(TAG, "rxJavaScanExample--:" + Thread.currentThread().getName() + "-scan-:" + integer + "---" + integer2);
return integer+integer2;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
日志
08-19 15:14:12.876 12633-12692/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:10---1
08-19 15:14:12.877 12633-12692/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:11---2
rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:13---3
08-19 15:14:12.883 12633-12633/com.example.zhang E/MainPresenter: rxJavaReduceExample--:main-consumer-:10
rxJavaReduceExample--:main-consumer-:11
rxJavaReduceExample--:main-consumer-:13
rxJavaReduceExample--:main-consumer-:16
總結
1窄做、scan(BiFunction<T, T, T> accumulator) 把數據疊加起來
2愧驱、 scan(final R initialValue, BiFunction<R, ? super T, R> accumulator) initialValue給將疊加的數據添加一個初始值
3、scan與reduce的區(qū)別:reduce是只返回一次結果椭盏,scan是多次
4组砚、scan返回次數等于初始值一次+emitter發(fā)送數據size ,如果沒有初始值掏颊,則emitter發(fā)送數據的第一個當初始值