Rxjava雖然在項(xiàng)目中使用了很久振劳,但是卻一直沒有時(shí)間去了解其實(shí)現(xiàn)原理,最近空了下來梧乘,也把源碼走讀了一遍澎迎,加上看大神的博客,大致弄懂了其中的兩個(gè)關(guān)鍵點(diǎn)选调;
- Rxjava中鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的夹供?
- Rxjava中的線程是如何切換的?
Rxjava操作符功能就不在本文中提及仁堪,以如下代碼進(jìn)行調(diào)試哮洽,了解第一個(gè)問題,Rxjava中鏈?zhǔn)秸{(diào)用怎么實(shí)現(xiàn)的弦聂?鸟辅。
Observable.just("a") //Observable1
.map(new Func1<String, String>() { //Observable2
@Override
public String call(String s) {
System.out.print(Thread.currentThread().getName() + ":first--" + s +"\n");
return s + s;
}
})
.subscribe(new Subscriber<String>() { //代碼⑥ Subscriber
@Override
public void onCompleted() {
System.out.print(Thread.currentThread().getName()+"\n");
System.out.print("completed"+"\n");
}
@Override
public void onError(Throwable e) {
System.out.print("error");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
}
先說說自己的理解,若把整個(gè)鏈條看成一個(gè)整體對象莺葫,那么just創(chuàng)建被觀察者對象匪凉,而subscribe()里的Subscriber作為觀察者;若每一步都分開看捺檬,just()和subscribe()中間的操作符即是觀察者再层,又是被觀察者。
Observable中每個(gè)操作符基本都會(huì)創(chuàng)建出一個(gè)新的Observable;因此可以解理成后一級的操作符去觀察前一個(gè)Observable對象聂受;以上例來說蒿秦,.subscribe的Subscriber所觀察的對象就是.map返回的Observable2,而.map的Subscriber所觀察的對象就是 Observable.just("a")得到的對象Observable1蛋济;
下面擼一擼其實(shí)現(xiàn)代碼棍鳖,整個(gè)鏈?zhǔn)秸{(diào)用真正開始的地方是.subscribe(),我們就從這里開始擼碗旅。省略掉一些代碼渡处,只看關(guān)鍵部分如下:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代碼①
return hook.onSubscribeReturn(subscriber);
}
...
}
hook.onSubscribeStart(observable, observable.onSubscribe)得到的對象就是observable.onSubscribe,而此處的observable明顯就是this扛芽,也就是上例中的observable2對象骂蓖,即把subscriber傳入到了observable2里面以供其調(diào)用。
再跟著代碼進(jìn)入observable2(.map操作符)的實(shí)現(xiàn)川尖。其主要實(shí)現(xiàn)是lift和OperatorMap登下。如下:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
lift和OperatorMap各自干了什么事情呢?先看OperatorMap,F(xiàn)unc1也作為構(gòu)造參數(shù)傳入肪获。關(guān)鍵代碼:
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) { //代碼②
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
這里new出了一個(gè)觀察者對象Subscriber,它實(shí)現(xiàn)了什么功能通過 o.onNext(transformer.call(t));即將例子中的Func1代碼執(zhí)行后將結(jié)果傳入到下一層畔濒。即這里運(yùn)行了Func1的代碼
再看lift()操作符,看其返回值也就是我們定義的observable2對象锣咒。因此subscribe里的"代碼①"的call即是此處observable2里OnSubscribe的call方法侵状;再看call方法,“代碼④”部分則是調(diào)用到了observable1對象里OnSubscribe的call方法毅整,而“代碼③”將Func1操作動(dòng)作轉(zhuǎn)變?yōu)镾ubscriber趣兄,通過call(o)完成對下一級Subscriber的引用。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o); //代碼③
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st); //代碼④
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
到這里“代碼④”執(zhí)行悼嫉,即到了observable1對象艇潭,也就是例子中 Observable.just("a")所得到對象的OnSubscribe的call()方法,如下:
public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
ScalarSynchronousObservable類代碼如下:
public static final <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
protected ScalarSynchronousObservable(final T t) {
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
/*
* We don't check isUnsubscribed as it is a significant performance impact in the fast-path use cases.
* See PerfBaseline tests and https://github.com/ReactiveX/RxJava/issues/1383 for more information.
* The assumption here is that when asking for a single item we should emit it and not concern ourselves with
* being unsubscribed already. If the Subscriber unsubscribes at 0, they shouldn't have subscribed, or it will
* filter it out (such as take(0)). This prevents us from paying the price on every subscription.
*/
s.onNext(t); //代碼⑤
s.onCompleted();
}
});
this.t = t;
}
其中"代碼⑤"是關(guān)鍵點(diǎn)戏蔑,t即是我們just傳入的"a",s則是代碼④傳入的st蹋凝,它其實(shí)是observable2的Subscriber(觀察者),相當(dāng)于observable1持有observable2的引用总棵。通過 s.onNext(t)鳍寂,完成了observable1向下一層的observable2的回調(diào),也就是Func1對象所在的Subscriber(OperatorMap)情龄,再通過 o.onNext(transformer.call(t));回到例子中“代碼⑥”伐割,至此候味,整個(gè)調(diào)用鏈完成。
上面的分析比較混亂隔心,重新梳理代碼執(zhí)行流程 :
1、subscribe里尚胞,hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //代碼①
2硬霍、map里,通過lift()將Func1操作符生成Subserber笼裳,Subscriber<? super T> st = hook.onLift(operator).call(o); //代碼③
onSubscribe.call(st); //代碼④
3唯卖、just里create(), s.onNext(t); //代碼⑤
4躬柬、map里拜轨, OperatorMap里對象, o.onNext(transformer.call(t));
5允青、subscribe 的Subscriber();
Observable的所有鏈?zhǔn)秸{(diào)用橄碾,知道兩個(gè)其兩個(gè)關(guān)鍵點(diǎn)即可梳理清楚整個(gè)數(shù)據(jù)流傳遞原理;
- Observable.onSubscribe對象颠锉,完成以call方法來向上一層傳遞法牲;
- Subserber向下一層的Subserber調(diào)用;
至于其中的線程調(diào)度琼掠,只需要知道線程調(diào)度并不影響鏈?zhǔn)秸{(diào)用的數(shù)據(jù)流傳遞拒垃,其原理我們下一節(jié)再梳理;