傳統(tǒng)的觀察者模式
RxJava 四個(gè)要素
- 被觀察者
- 觀察者
- 訂閱
- 事件
- 創(chuàng)建被觀察者
subscriber就是觀察者
- 創(chuàng)建被觀察者
- 創(chuàng)建觀察者
- 訂閱
核心
操作符
map
//調(diào)用lift方法,創(chuàng)建一個(gè)OperatorMap對(duì)象作為參數(shù)
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
//創(chuàng)建新的觀察者對(duì)象
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
//使用傳入的觀察者調(diào)用onNext方法回調(diào)Func1的方法
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
//lift方法中創(chuàng)建新的被觀察者對(duì)象衅疙,相當(dāng)于代理瘩绒,負(fù)責(zé)接收原始的被觀察者的事件
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//拿到map方法中創(chuàng)建的觀察者對(duì)象
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
總結(jié):
map操作符是通過(guò)創(chuàng)建一個(gè)Observable<R>響應(yīng)實(shí)現(xiàn)的是Observable<T>中的觀察者Subscriber<? super T>
這里有點(diǎn)繞癌压,hook.onLift(operator).call(o)這行代碼是調(diào)用OperatorMap方法中的call方法傳入新創(chuàng)建的Subscriber觀察者黄橘,在新創(chuàng)建的觀察者中的onNext犯法中執(zhí)行的是原觀察者的Func1中的call回調(diào)泳梆。
flatMap
這個(gè)其實(shí)就是把多個(gè)map串聯(lián)起來(lái)統(tǒng)一處理
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
RxJava 線程調(diào)度
Schedulers
subscribeOn 被觀察者處理觀察者的call回調(diào)(發(fā)出事件)
observeOn 觀察者的回調(diào)處理
總結(jié):
subscribeOn只能調(diào)用一次羔飞,因?yàn)閟ubscribeOn作用域是全局的每次創(chuàng)建新的Observable拼岳,subscribeOn指定Observable的線程執(zhí)行位置偿荷。Observable只有一個(gè)窘游。
observeOn指定是它之后的subscriber觀察者回調(diào)線程執(zhí)行位置。subscriber是多個(gè)跳纳。