一.正常執(zhí)行流程分析
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("123456");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
Log.i("TAG","String=" + string);
}
});
這是一個(gè)簡(jiǎn)單的構(gòu)建RxJava從被觀察者到觀察者的流程,這里我們首先還是先聲明一些概念性的東西
- Observable 被觀察者
- Subscriber 觀察者
- OnSubscribe 從設(shè)計(jì)模式角度理解锚扎,OnSubscribe.call()可以理解為觀察者模式中被觀察者用來通知觀察者的notifyObservers()方法
- subscribe() 實(shí)現(xiàn)觀察者與被觀察者訂閱關(guān)系的方法
同樣我們的解析流程也是從創(chuàng)建Observable鲫剿,創(chuàng)建Subscriber到最后的訂閱關(guān)系來進(jìn)行解析
1.創(chuàng)建Observable(被觀察者)
創(chuàng)建Observable通過Observable.create()方法衣洁,我們看一下這個(gè)方法的實(shí)現(xiàn)
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
我們看一下RxJavaHooks.onCreate()方法
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
這里其實(shí)就是返回onCreate傳入的OnSubscribe,也就是我們創(chuàng)建的OnSubscribe
至此我們做下邏輯梳理:Observable.create()方法構(gòu)造了一個(gè)被觀察者Observable對(duì)象,同時(shí)將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe毫玖。
2.創(chuàng)建Subscriber(觀察者)
創(chuàng)建其實(shí)很簡(jiǎn)單炼蹦,我們這里簡(jiǎn)單看一下Subscriber的源碼
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList subscriptions;//訂閱事件集羡宙,所有發(fā)送給當(dāng)前Subscriber的事件都會(huì)保存在這里
...
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
...
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
public void onStart() {
}
...
}
Subscriber實(shí)現(xiàn)了Observer接口,提供了onCompleted()掐隐,onError()與onNext()狗热,需要我們繼承并進(jìn)行相關(guān)操作。
Subscriber實(shí)現(xiàn)了Subscription接口虑省,從而對(duì)外提供isUnsubscribed()和unsubscribe()方法匿刮。前者用于判斷是否已經(jīng)取消訂閱;后者用于將訂閱事件列表(也就是當(dāng)前觀察者的成員變量subscriptions)中的所有Subscription取消訂閱探颈,并且不再接受觀察者Observable發(fā)送的后續(xù)事件僻焚。
3.subscribe()源碼分析
部分源碼如下
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
上述代碼中最關(guān)鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。這里的RxJavaHooks和之前提到的一樣膝擂,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二個(gè)入?yún)bservable.onSubscribe虑啤,也就是當(dāng)前observable的成員變量onSubscribe。而這個(gè)成員變量我們前面提到過架馋,它是我們?cè)贠bservable.create()的時(shí)候new出來的狞山。
所以這段代碼可以簡(jiǎn)化為onSubscribe.call(subscriber),通過這樣就建立了觀察者與被觀察者的聯(lián)系
所以最后我們?cè)倏偨Y(jié)一下整個(gè)流程
1.調(diào)用crate()創(chuàng)建一個(gè)觀察者叉寂,同時(shí)創(chuàng)建一個(gè)OnSubscribe作為該方法的入?yún)?br>
2.調(diào)用subscribe()來訂閱我們自己創(chuàng)建的觀察者Subscriber
3.一旦調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行OnSubscribe.call()
4.我們就可以在call方法調(diào)用觀察者subscriber的onNext(),onCompleted(),onError()
二.操作符原理分析
我們以map操作符為例子萍启,使用如下
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(123456);
subscriber.onCompleted();
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "This is" + integer;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String string) {
Log.i("TAG","String=" + string);
}
});
我們直接看map操作相關(guān)的代碼部分,如下
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
從這里我們看見,是創(chuàng)建了一個(gè)新的被觀察者勘纯,然后這個(gè)新的被觀察者與觀察者建立關(guān)聯(lián)局服,這里與前面的觀察者建立的區(qū)別在于OnSubscribe成員變量的不同。
這里是一個(gè)新的OnSubscribe成員變量OnSubscribeMap驳遵,繼承于OnSubscribe淫奔,代碼如下:
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
...
actual.onError(e);
}
@Override
public void onCompleted() {
...
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
我們看下OnSubscribeMap構(gòu)造函數(shù)的傳入的兩個(gè)變量,一個(gè)是之前創(chuàng)建的Observable(被觀察者)堤结,另外一個(gè)是我們新操作的Func1函數(shù)唆迁。
然后在調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行OnSubscribeMap.call()方法,我們仔細(xì)看一下這個(gè)方法竞穷,
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
首先是通過創(chuàng)建了觀察者o和轉(zhuǎn)換函數(shù)Func1構(gòu)造了一個(gè)新的觀察者M(jìn)apSubscriber唐责,接下來最后調(diào)用了source也就是的unsafeSubscribe()方法,而這個(gè)source通過前后關(guān)系瘾带,我們可以知道是之前創(chuàng)建的Observable(被觀察者)鼠哥。
接下來看一下unsafeSubscribe函數(shù)
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
這里是不是很熟悉,和前面說過的一樣看政,也就是調(diào)用了onSubscribe.call()方法朴恳,注意這里的onSubscribe是最早創(chuàng)建的Observable(被觀察者)的onSubscribe,而subscriber則是新構(gòu)建的觀察者M(jìn)apSubscriber帽衙。
所有當(dāng)onSubscribe.call()方法調(diào)用后菜皂,我們看下調(diào)用的方法贞绵,如下
subscriber.onNext(123456);
subscriber.onCompleted();
所有這里其實(shí)是調(diào)用MapSubscriber的onNext方法與onCompleted方法厉萝,我們分析下onNext方法,如下
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
可以看到這里首先做了預(yù)處理mapper.call(t)榨崩,這里的mapper就是我們重寫的Func1中的方法谴垫。
所有其實(shí)我們執(zhí)行的順序是
1.MapSubscriber.onNext(123456)。
2.map中的call方法進(jìn)行處理母蛛。
3.執(zhí)行原始的觀察者的onNext(T)方法翩剪。
所以最后我們?cè)倏偨Y(jié)一下整個(gè)流程
1.調(diào)用crate()創(chuàng)建一個(gè)觀察者A,同時(shí)創(chuàng)建一個(gè)OnSubscribe作為該方法的入?yún)?/p>
2.調(diào)用map()創(chuàng)建一個(gè)觀察者B彩郊,同時(shí)創(chuàng)建一個(gè)OnSubscribeMap作為該方法入?yún)⑶巴洌瑒?chuàng)建OnSubscribeMap傳入觀察者A與轉(zhuǎn)換函數(shù)Func1
3.調(diào)用subscribe()來訂閱我們自己創(chuàng)建的觀察者Subscriber
4.一旦調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行MapOnSubscribe.call(),這個(gè)方法內(nèi)創(chuàng)建新的觀察者M(jìn)apSubscriber秫逝,通過觀察者o和轉(zhuǎn)換函數(shù)Func1構(gòu)建
5.MapOnSubscribe.call()調(diào)用了unsafeSubscribe函數(shù)恕出,這里面調(diào)用了onSubscribe.call()方法,這里的onSubscribe是最早創(chuàng)建的Observable(被觀察者)的onSubscribe违帆,而subscriber則是新構(gòu)建的觀察者M(jìn)apSubscriber浙巫。
6.接下來就是執(zhí)行相關(guān)的傳遞工作,MapSubscriber.onNext方法,map中的call方法進(jìn)行處理的畴,執(zhí)行原始的觀察者的onNext(T)方法渊抄。
至此結(jié)束。
三.線程調(diào)到分析
RxJava其中一個(gè)很重要的特性就是對(duì)于線程調(diào)度想對(duì)來說比較容易丧裁,可以很方便的通過subscribeOn()和observeOn()來指定數(shù)據(jù)流的每一部分運(yùn)行在哪個(gè)線程护桦。
subscribeOn()指定了處理Observable(被觀察者)的全部的過程(包括發(fā)射數(shù)據(jù)和通知)的線程。
observeOn()指定了Subscriber(觀察者)的onNext(), onError()和onCompleted()執(zhí)行的線程渣慕。
我們首先看一下線程調(diào)度的代碼
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(123456);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String string) {
Log.i("TAG","String=" + string);
}
});
1.subscribeOn()源碼分析
經(jīng)過調(diào)用嘶炭,代碼如下
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
是不是看到了熟悉的一幕,接下來我們看下OperatorSubscribeOn這個(gè)類
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
inner.schedule(parent);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
...
}
}
所有在調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行OperatorSubscribeOn.call()方法逊桦,這里調(diào)用了scheduler的createWorker方法
這里的scheduler是我們定義傳進(jìn)來的眨猎,即Schedulers.io()
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}
private Schedulers() {
...
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
...
}
經(jīng)過調(diào)用,發(fā)現(xiàn)我們實(shí)現(xiàn)的Schedulers是CachedThreadScheduler强经,接下來我們看下createWorker方法睡陪,如下
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
這里返回了一個(gè)新的EventLoopWorker對(duì)象,而這個(gè)EventLoopWorker是CachedThreadScheduler的內(nèi)部類匿情。
接下來回去看call方法兰迫,我們會(huì)發(fā)現(xiàn)執(zhí)行了EventLoopWorker的schedule方法如下
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
這里執(zhí)行了scheduleActual方法,我們看一下這個(gè)方法的實(shí)現(xiàn)
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
這個(gè)方法主要是將傳入的Action0包裝成ScheduledAction炬称,然后線程池運(yùn)行這個(gè)ScheduledAction汁果,那么這個(gè)ScheduledAction肯定是繼承Runable或者Callable的
找到ScheduledAction的run方法如下
@Override
public void run() {
...
action.call();
...
}
這里調(diào)用的傳入Action0的call方法,我們看下定義的Action0的call方法玲躯,這個(gè)方法是調(diào)用了schedule方法時(shí)候傳入Action0的call方法
通過查看据德,我們知道這個(gè)Action0即是SubscribeOnSubscriber,接下來看一個(gè)這個(gè)SubscribeOnSubscriber的call方法
@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}
這里的source是之前創(chuàng)建的Observable(被觀察者)跷车,接下來調(diào)用unsafeSubscribe方法棘利,如下
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
和前面操作符一致,這里的onSubscribe是最早創(chuàng)建的Observable(被觀察者)的onSubscribe朽缴,而subscriber則是新構(gòu)建的觀察者SubscribeOnSubscriber善玫。
所以這時(shí)候onSubscribe的call操作其實(shí)已經(jīng)是在異步的線程中執(zhí)行了。