RxJava概述
- RxJava 是一種響應式編程,來創(chuàng)建基于事件的異步操作庫∽樘猓基于事件流的鏈式調(diào)用橱脸、邏輯清晰簡潔础米。
- RxJava 我的理解是將事件從起點(上游)流向終點(下游),中間有很多卡片對數(shù)據(jù)進操作并傳遞添诉,每個卡片獲取上一個卡片傳遞下來的結(jié)果然后對事件進行處理然后將結(jié)果傳遞給下一個卡片屁桑,這樣事件就從起點通過卡片一次次傳遞直到流向終點。
RxJava觀察者模式
- 傳統(tǒng)觀察者是一個被觀察者多過觀察者栏赴,當被觀察者發(fā)生改變時候及時通知所有觀察者
- RXjava是一個觀察者多個被觀察者蘑斧,被觀察者像鏈條一樣串起來,數(shù)據(jù)在被觀察者之間朝著一個方向傳遞须眷,直到傳遞給觀察者 竖瘾。
RxJava原理理解
-
被觀察者通過訂閱將事件按順序依次傳遞給觀察者,
image.png
//RxAndroid中包含RxJava的內(nèi)容花颗,只引入RxAndroid還是會報錯
dependencies {
......
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
image.png
創(chuàng)建Observer(觀察者)
Observer<Integer> observer = new Observer<Integer>() {
// 觀察者接收事件前 捕传,當 Observable 被訂閱時,觀察者onSubscribe方法會自動被調(diào)用
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 當被觀察者生產(chǎn)Next事件
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件作出響應" + value);
}
// 當被觀察者生產(chǎn)Error事件
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
// 當被觀察者生產(chǎn)Complete事件
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
};
//Subscriber類 = RxJava 內(nèi)置的一個實現(xiàn)了 Observer 的抽象類扩劝,對 Observer 接口進行了擴展
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
// 觀察者接收事件前 庸论,當 Observable 被訂閱時,觀察者onSubscribe方法會自動被調(diào)用
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 當被觀察者生產(chǎn)Next事件
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件作出響應" + value);
}
// 當被觀察者生產(chǎn)Error事件
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
// 當被觀察者生產(chǎn)Complete事件
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
};
Subscriber 抽象類與Observer 接口的區(qū)別
- 二者基本使用方式一致(在RxJava的subscribe過程中棒呛,Observer會先被轉(zhuǎn)換成Subscriber再使用)
- Subscriber抽象類對 Observer 接口進行了擴展聂示,新增了兩個方法:
1. onStart():在還未響應事件前調(diào)用,用于做一些初始化工作簇秒,他是在subscribe 所在的線程調(diào)用鱼喉,不能切換線程,所以不能進行界面UI更新比如彈框這些。
2. unsubscribe():用于取消訂閱蒲凶。在該方法被調(diào)用后气筋,觀察者將不再接收響應事件,比如在onStop方法中可以調(diào)用此方法結(jié)束訂閱旋圆。調(diào)用該方法前宠默,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用灵巧。
創(chuàng)建 Observable (被觀察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通過 ObservableEmitter類對象產(chǎn)生事件并通知觀察者
// ObservableEmitter:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
emitter.onNext(1);
emitter.onComplete();
}
});
RxJava 提供了其他方法用于 創(chuàng)建被觀察者對象Observable
// 方法1:just(T...):直接將傳入的參數(shù)依次發(fā)送出來
Observable observable = Observable.just("A", "B", "C");
// 將會依次調(diào)用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
// 方法2:fromArray(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組 / Iterable 拆分成具體對象后搀矫,依次發(fā)送出來
String[] words = {"A", "B", "C"};
Observable observable = Observable.fromArray(words);
// 將會依次調(diào)用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
以上兩種方法創(chuàng)建出來的觀察者都是繼承Observable,比如ObservableCreate刻肄、ObservableFromArray瓤球、ObservableMap...,
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
--------------------------------------------------------------------------------------------------------
public abstract class Observable<T> implements ObservableSource<T> {
...
protected abstract void subscribeActual(Observer<? super T> observer);
@Override
public final void subscribe(Observer<? super T> observer) {
...
try {
...
subscribeActual(observer);
} catch (Throwable e) {
...
}
}
}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
@Override
public void onNext(T t) {
if (t == null) {
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
...
}
}
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> actual; //對應觀察者
final T[] array;
...
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
...
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t),
"The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t),
"The mapper function returned a null value.") : null;
}
}
}
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
---------------------------------------------------------------------------------------------
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
final Observer<? super T> observer;
final T value;
@Override
public void dispose() {
set(ON_COMPLETE);
}
....
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
觀察者和被觀察者通過subscribe訂閱敏弃,訂閱完成后被觀察者就可以像觀察者發(fā)送數(shù)據(jù)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
}
}
image.png
image.png
鏈式調(diào)用
image.png
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return null;
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return null;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
這個訂閱的過程就如同洋蔥一樣一層層封裝卦羡,當訂閱完成后就像剝洋蔥一樣一層層剝,用發(fā)射器發(fā)送數(shù)據(jù)麦到,用onNext方法一層層發(fā)送绿饵,發(fā)送給每一層的時候就回調(diào)每一層的Function類apply方法,這個方法由開發(fā)者實現(xiàn)瓶颠,該方法處理數(shù)據(jù)后就返回處理后的數(shù)據(jù)拟赊,然后數(shù)據(jù)又往下一層傳遞,直到傳遞到觀察者手里粹淋,然后觀察者接收數(shù)據(jù)
image.png