RxJava2.0的簡(jiǎn)單使用
RxJava2.0---創(chuàng)建被觀察者
基于RxJava的事件總線RxBus
在RxJava中喉钢,我們可以創(chuàng)建出各種各樣的被觀察者(數(shù)據(jù)源),也對(duì)應(yīng)各種各樣的創(chuàng)建方法酷窥,下面就一一來(lái)解釋一下语御。能力有限,如有錯(cuò)誤請(qǐng)指出宽堆,謝謝郑原。
Create
使用Create操作符從頭開始創(chuàng)建一個(gè)Observable唉韭,給這個(gè)操作符傳遞一個(gè)接受觀察者作為參數(shù)的函數(shù),編寫這個(gè)函數(shù)讓它的行為表現(xiàn)為一個(gè)Observable--恰當(dāng)?shù)恼{(diào)用觀察者的onNext犯犁,onError和onCompleted方法属愤。這里只講非背壓的,背壓的也是類型創(chuàng)建方法酸役,也可以點(diǎn)擊這里進(jìn)行查看
/**
* 非背壓
* Observable對(duì)應(yīng)Observer
*/
private void createObservable() {
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This");
e.onNext("is");
e.onNext("RxJava");
e.onComplete();
e.onNext("Oh");//就算觀察者接收onComplete住诸,
// 被觀察者還是會(huì)繼續(xù)發(fā)送數(shù)據(jù),只是觀察者不接收
Log.i(TAG, "subscribe: ");
}
});
//觀察者
Observer<String> observer = new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
//取消訂閱
if (!disposable.isDisposed()) {
disposable.dispose();
}
}
};
observable.subscribe(observer);
}
just涣澡、fromArray贱呐、fromIterable
可以將其它種類的對(duì)象和數(shù)據(jù)類型轉(zhuǎn)換為Observable
/**
* 三種遍歷數(shù)組集合的Observable
*/
private void justFromObservable() {
Log.i(TAG, "justFrom: ");
//just
Observable<Integer> observable = Observable.just(1, 2, 3);
// //fromArray
// observable = Observable.fromArray(1, 2, 3);
//
// ArrayList<Integer> list = new ArrayList<>();
// list.add(1);
// //fromIterable
// observable = Observable.fromIterable(list);
observable.subscribe(new Observer<Integer>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
if (!disposable.isDisposed())
disposable.dispose();
}
});
}
defer
直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable入桂,例如下面創(chuàng)建deferObservable時(shí)傳入的num=10的(并非真正創(chuàng)建奄薇,訂閱時(shí)才真正創(chuàng)建),然后在訂閱前把num=20,訂閱后收到的數(shù)據(jù)是20事格,并非10惕艳。
int num = 10;
/**
* 延遲創(chuàng)建使用defer
*/
private void deferObservable() {
Observable<Integer> observable = Observable.just(num);
//使用num=10來(lái)創(chuàng)建deferObservable
Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(num);
}
});
num = 20;
deferObservable.subscribe(new Observer<Integer>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer integer) {
//輸出20
Log.i(TAG, "deferObserve onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
if (!disposable.isDisposed())
disposable.dispose();
}
});
observable.subscribe(new Observer<Integer>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer integer) {
//輸出10
Log.i(TAG, "observable onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
if (!disposable.isDisposed())
disposable.dispose();
}
});
}
empty、Never驹愚、Throw
1.創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
2.創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable
3.創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable
這三個(gè)操作符生成的Observable行為非常特殊和受限。測(cè)試的時(shí)候很有用劣纲,有時(shí)候也用于結(jié)合其它的Observables逢捺,或者作為其它需要Observable的操作符的參數(shù)。
private void emptyNeverThrow() {
Observable.empty().subscribe();
Observable.never().subscribe();
Observable.error(new NullPointerException()).subscribe();
}
interval
創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable
private void timerObservable() {
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.i(TAG, Thread.currentThread().getName() + " accept: " + aLong);
}
});
}
range(1, 10)
創(chuàng)建一個(gè)發(fā)射特定整數(shù)序列的Observable(這里是1到10)
private void rangeObservable() {
Observable.range(1, 10).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, Thread.currentThread().getName() + " accept: " + integer);
}
});
}
timer
在一個(gè)給定的延遲后發(fā)射一個(gè)特殊的值
private void timerObservable() {
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.i(TAG, Thread.currentThread().getName() + " accept: " + aLong);
}
});
}
repeat癞季、repeatWhen
創(chuàng)建一個(gè)發(fā)射特定數(shù)據(jù)重復(fù)多次的Observable劫瞳,但是repeatWhen它不是緩存和重放原始Observable的數(shù)據(jù)序列,而是有條件的重新訂閱和發(fā)射原來(lái)的Observable绷柒。repeatWhen操作符默認(rèn)在trampoline調(diào)度器上執(zhí)行志于。
private void repeatObservable() {
Observable<String> observable = Observable.just("AAA", "BBB", "CCC");
// observable.repeat(5).subscribe(new Consumer<String>() {
// @Override
// public void accept(@NonNull String s) throws Exception {
// Log.i(TAG, Thread.currentThread().getName() + " accept: " + s);
// }
// });
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
return objectObservable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, Thread.currentThread().getName() + " accept: " + s);
}
});
}