RxJava2.0的簡單使用
RxJava2.0---創(chuàng)建被觀察者
基于RxJava的事件總線RxBus
RxJava是什么相艇?
一個基于觀察者模式(事件流)的異步任務(wù)庫誊册∠沈龋可以很簡潔地完成一個異步任務(wù)萄窜,當(dāng)任務(wù)復(fù)雜時也能清晰地表達(dá)邏輯。GitHub地址循榆。析恢,具體的一些理論可以查看拋物線
這邊文章《給 Android 開發(fā)者的 RxJava 詳解》,很好的入門教程。
基本使用
在RxJava2.0中秧饮,把背壓
和非背壓
分兩種觀察者模式映挂。
背壓:事件產(chǎn)生的速度遠(yuǎn)遠(yuǎn)快于事件消費(fèi)的速度朝扼,最終導(dǎo)致數(shù)據(jù)積累越來越多卖漫,從而導(dǎo)致OOM等異常。
1现诀、非背壓
/**
* 非背壓
* Observable對應(yīng)Observer
*/
private void createObservable() {
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This");
e.onNext("is");
e.onNext("RxJava");
e.onComplete();
}
});
//觀察者
Observer<String> observer = new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
//取消訂閱
if (!disposable.isDisposed()) {
disposable.dispose();
}
}
};
observable.subscribe(observer);
}
2泼各、背壓
/**
* 背壓(在異步過程中鞍时,由于被觀察者發(fā)射數(shù)據(jù)過快,而觀察者處理數(shù)據(jù)不及時,
* 導(dǎo)致內(nèi)存里堆積了太多數(shù)據(jù)扣蜻,從而OOM逆巍,可以選擇不同的策略處理該問題)
* Flowable對應(yīng)subscriber
*/
private void createFlowable() {
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
if (!e.isCancelled()) {
e.onNext("This");
e.onNext("is");
e.onNext("RxJava");
e.onComplete();
}
}
//拋棄策略
}, BackpressureStrategy.DROP);
Subscriber<String> subscriber = new Subscriber<String>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
//請求一個數(shù)據(jù)
subscription.request(1);
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
//處理完后,再請求一個數(shù)據(jù)
subscription.request(1);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
//取消訂閱
subscription.cancel();
}
};
flowable.subscribe(subscriber);
}