RXJava是什么
RxJava 在 GitHub 主頁上官方解釋為:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.(一個(gè)在 Java VM 上使用可觀測的序列來組成異步的连躏、基于事件的程序的庫)
為什么使用RXJava
上面說了RXJava是一個(gè)異步事件庫,Android中關(guān)于異步的處理有Handler消息機(jī)制贞滨、AsyncTask異步任務(wù)入热,但是使用RXJava來實(shí)行異步處理更加簡潔,鏈?zhǔn)秸{(diào)用書寫起來更方便晓铆,邏輯也更加清晰勺良,而且RXJava有很多適合各種場景的操作符,功能非常強(qiáng)大骄噪,使用起來非常方便尚困。
觀察者模式
觀察者模式面向的需求是:A 對(duì)象(觀察者)對(duì) B 對(duì)象(被觀察者)的某種變化高度敏感,A對(duì)象需要在 B 變化的一瞬間做出反應(yīng)链蕊。
RxJava 有兩個(gè)基本概念:Observable (可觀察者事甜,即被觀察者)、 Observer (觀察者)滔韵、 subscribe (訂閱)逻谦。Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer陪蜻。
RXJava基本使用
Observable.create(new ObservableOnSubscribe<TestBean>() {
@Override
public void subscribe(ObservableEmitter<TestBean> e) throws Exception {
Log.d(TAG, "---subscribe---");
e.onNext(new TestBean());
e.onComplete();
}
})
.map(new Function<TestBean, String>() {
@Override
public String apply(TestBean testBean) throws Exception {
Log.d(TAG, "---map操作符---");
return "test";
}
})
// .subscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "---onSubscribe---");
}
@Override
public void onNext(@NonNull String string) {
Log.d(TAG, "---onNext---");
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "---onError---");
}
@Override
public void onComplete() {
Log.d(TAG, "---onComplete---");
}
});
}
-
Observable.create(...)
產(chǎn)生一個(gè)繼承抽象類Observable的子類對(duì)象(ObservableCreate對(duì)象)邦马,RxJavaPlugins.onAssembly(...)
為hook相關(guān)操作,一般的調(diào)用f==null宴卖,直接返回傳進(jìn)去的參數(shù)滋将,即ObservableCreate對(duì)象
public abstract class Observable<T> implements ObservableSource<T> {
......
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判空操作乙濒,如果為空直接拋空指針異常
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
......
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
2.Observable.map(...)
方法會(huì)產(chǎn)生一個(gè)ObservableMap對(duì)象徊哑,此對(duì)象的構(gòu)造方法會(huì)保存this(這里的this為Observable對(duì)象(多態(tài)),即上面返回的ObservableCreate對(duì)象)和傳進(jìn)來的mapper邻吞。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空操作齿兔,如果為空直接拋空指針異常
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
3.Observable.subscribe(...)
讓觀察者和被觀察者產(chǎn)生訂閱關(guān)系(第1次訂閱
),在里面會(huì)調(diào)用subscribeActual(observer)
方法,并將第1個(gè)觀察者
observer回調(diào)過去。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribeActual(observer)
為Observable的抽象方法分苇,實(shí)際上會(huì)執(zhí)行子類ObservableMap的subscribeActual(observer)
如下:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
- 該方法中的source為ObservableMap構(gòu)造函數(shù)中保存的Observable的子類(多態(tài))添诉,即ObservableCreate對(duì)象
-
source.subscribe(new MapObserver<T, U>(t, function))
,又產(chǎn)生了一次訂閱(第2次訂閱
)医寿,和上面的第3步一樣栏赴,會(huì)走Observable的抽象方法subscribeActual(observer)
,并將MapObserver對(duì)象(第2個(gè)觀察者
)回調(diào)回去靖秩,其中MapObserver對(duì)象的構(gòu)造方法中會(huì)保存t(第1次訂閱回調(diào)過來的的observer)和function(mapper) - 實(shí)際會(huì)執(zhí)行ObservableCreate對(duì)象中的
subscribeActual(observer)
方法如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
該方法會(huì)執(zhí)行兩個(gè)方法observer.onSubscribe(parent)
,source.subscribe(parent)
- 第一個(gè)方法中回調(diào)回來的observer為上面的mapObserver,
parent為上面的CreateEmitter對(duì)象(下面再介紹CreateEmitter里面做了什么)
Observer是一個(gè)接口,調(diào)用onSubscribe(parent)
實(shí)際走的是mapObserver的父類BasicFuseableObserver(Observer的實(shí)現(xiàn)類)中的onSubscribe(...)
方法,如下:
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
......
@SuppressWarnings("unchecked")
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable<T>)s;
}
if (beforeDownstream()) {
actual.onSubscribe(this);
afterDownstream();
}
}
}
......
}
其中actual為在MapObserver構(gòu)造方法中保存的observer即第一次回調(diào)回來的observer须眷,并將mapObserver回調(diào)回去
終于回到了最初第一次new的observer,接著就走到了下圖中的onSubscribe(@NonNull Disposable d)
,d為回調(diào)回來的mapObserver對(duì)象沟突。
- 第二個(gè)方法
source.subscribe(parent)
,source為上圖中new出來的ObservableOnSubscribe匿名對(duì)象花颗,parent為第一個(gè)方法里的CreateEmitter對(duì)象,執(zhí)行回調(diào)方法subscribe(ObservableEmitter<TestBean> e)
惠拭,回調(diào)回來的ObservableEmitter對(duì)象(CreateEmitter對(duì)象),CreateEmitter構(gòu)造中保存了第2次訂閱回調(diào)回來的觀察者MapObserver對(duì)象如下圖扩劝, CreateEmitter對(duì)象可以調(diào)用我們常用的方法如事件的發(fā)射emitter.onNext(...)
和完成事件emitter.onComplete()
- 首先看
emitter.onNext(...)
做了什么操作
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
}
observer為第2次訂閱回調(diào)回來的觀察者mapObserver,調(diào)用他的onNext(T t)
方法职辅,在onNext(T t)
方法中會(huì)執(zhí)行mapper.apply(t)并返回一個(gè)泛型U v,實(shí)際會(huì)回調(diào)到下圖中箭頭所指的apply(...)
方法
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
}
緊接著又會(huì)調(diào)用actual.onNext(v)
,最終會(huì)回到第一次訂閱回調(diào)回來的觀察者的onNext()
棒呛。
- 接著當(dāng)調(diào)用
emitter.onComplete()
時(shí)會(huì)走到mapObserver父類BasicFuseableObserver的onComplete()
,其中actual為第一次回調(diào)回來的觀察者observer域携,最終會(huì)回到它的onComplete()
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}
到此為止一個(gè)簡單的調(diào)用流程就走完了簇秒,當(dāng)被觀察者和觀察者產(chǎn)生訂閱關(guān)系時(shí),各個(gè)方法的調(diào)用順序?yàn)橄聢D中的1秀鞭、2趋观、3、4气筋、5拆内、6
總結(jié)
Observable.create(new ObservableOnSubscribe<TestBean>() {...})
.map(new Function<TestBean, String>(){...} )
//.subscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {...})
- Observable通過
.XXX(...)
都會(huì)依次生成一個(gè)Observable對(duì)象(多態(tài)),每個(gè)Observable對(duì)象都會(huì)依次保存上一個(gè)
Observable對(duì)象的引用和傳進(jìn)來的參數(shù)宠默,參數(shù)最終會(huì)被保存到觀察者observer中(鏈?zhǔn)秸{(diào)用連起來的關(guān)鍵)麸恍。 - 當(dāng)通過
Observable.subscribe(new Observer<String>(){...})
產(chǎn)生訂閱時(shí),抽象類Observable會(huì)在subscribe(observer)
方法中調(diào)用subscribeActual(observer)
(子類已復(fù)寫該方法),實(shí)際上就是Observable子類(ObservableCreate搀矫、ObservableMap等均繼承Observable)執(zhí)行此方法抹沪,在subscribeActual(observer)
方法中會(huì)調(diào)用source.subscribe(parent)
利用保存的上個(gè)Observable(source)
又一次產(chǎn)生訂閱,也就是自下而上產(chǎn)生訂閱直到第一個(gè)source開始發(fā)射事件
瓤球。 -
subscribeActual(observer)
會(huì)生成一個(gè)新的observer(如MapObserver對(duì)象),新的observer會(huì)自下而上的保存上個(gè)observer
,當(dāng)發(fā)射事件時(shí)會(huì)利用observer自上而下將事件依次發(fā)送到最下面的observer上
融欧。
結(jié)束
第一次寫blog,表達(dá)能力也不好,希望用來記錄自己的學(xué)習(xí)過程卦羡,以上的RXJava只是最基本的調(diào)用(注釋了線程切換噪馏,如果加上線程切換整體流程大同小異)麦到,有不對(duì)的地方歡迎指正,下次準(zhǔn)備學(xué)習(xí)RXJava的Schedulers線程調(diào)度欠肾。