寫(xiě)在前面的話
看了源碼也不少,但是每次看源碼都僅僅鸵鸥,也就是僅僅跟著作者一步步跟進(jìn)ta的方法疾宏,點(diǎn)進(jìn)去施绎,再點(diǎn)進(jìn)去惧财,再再點(diǎn)進(jìn)去,漸漸的感覺(jué)自己當(dāng)時(shí)看的太膚淺摊崭,有很多問(wèn)題都會(huì)冒出來(lái),最多的就是為什么要這樣寫(xiě)呢杰赛,一追究這個(gè)問(wèn)題就會(huì)發(fā)現(xiàn)自己在知識(shí)的海洋里是如此的渺小呢簸。寫(xiě)了這么多,希望能夠在以后眼光放高一些乏屯,看大局根时,看別人是如何排兵布陣!
rxjava執(zhí)行原理
先從調(diào)用rxjava功能的代碼入手辰晕,首先最簡(jiǎn)單的調(diào)用就是:
Observable.create(new Observable.OnSubscribe<Integer>() {//創(chuàng)建
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
Observable.create()
首先調(diào)用
Observable.create()
創(chuàng)建一個(gè)生產(chǎn)者Observable
,同時(shí)創(chuàng)建了一個(gè)OnSubscribe
作為其參數(shù)傳給Observable
OnSubscribe
是Observable
的一個(gè)內(nèi)部接口蛤迎,從它的注釋"* Invoked when Observable.subscribe is called."可以看出當(dāng)生產(chǎn)者被消費(fèi)者訂閱的時(shí)候,它將會(huì)被激活含友。-
再看方法內(nèi)部:
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
-
接著我們看到創(chuàng)建的
OnSubscribe
又傳入了RxJavaHooks.onCreate(f)
中:public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }
這里判斷了
RxJavaHooks
中的成員變量onObservableCreate
是否為空忘苛,在RxjavaHooks
的內(nèi)部有一個(gè)靜態(tài)方法initCreate()
可以對(duì)其進(jìn)行初始化,但是我們并沒(méi)有調(diào)用它唱较,所有我們最后返回的onSubscribe
依然還是我們自己創(chuàng)建的onSubscribe
扎唾。-
倒回至Observable的構(gòu)造函數(shù),最后將這個(gè)
onSubscribe
賦值給了Observable的成員變量南缓。protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
Subscriber
創(chuàng)建了生產(chǎn)者Observable
胸遇,那么肯定還要?jiǎng)?chuàng)建消費(fèi)者Observer
,Subscriber
就是Observer
接口的一個(gè)實(shí)現(xiàn)類(lèi)汉形,同時(shí)還實(shí)現(xiàn)了Subscription
接口纸镊,這個(gè)接口的方法unsubscribe()
用于取消訂閱,還有一個(gè)isUnsubscribed()
方法判斷訂閱的狀態(tài)概疆,unsubscribe()
這個(gè)方法很關(guān)鍵逗威,因?yàn)樵?subscribe()
之后, Observable
會(huì)持有 Subscriber
的引用岔冀,這個(gè)引用如果不能及時(shí)被釋放凯旭,將有內(nèi)存泄露的風(fēng)險(xiǎn)。
subscribe()
-
接下來(lái)就是
subscribe()
方法:static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // new Subscriber so onStart it subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber<T>(subscriber); } try { RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... } return Subscriptions.unsubscribed(); } }
首先
subscriber = new SafeSubscriber<T>(subscriber)
會(huì)將我們自己寫(xiě)的subscriber
進(jìn)行包裝使套,其實(shí)也就是代理的設(shè)計(jì)模式罐呼,在我們寫(xiě)的代碼中通過(guò)代理進(jìn)行一些安全校驗(yàn),這里就保證了onCompleted()
和onError()
只會(huì)有一個(gè)執(zhí)行切只執(zhí)行一次侦高。接著看
RxJavaHooks.onObservableStart(observable, observable.onSubscribe)
就會(huì)發(fā)現(xiàn)嫉柴,同樣該方法返回的就是我們創(chuàng)建的onSubscribe
,之后還調(diào)用了它的call()
方法奉呛,也就成了
調(diào)用我們自己寫(xiě)的call()
方法计螺。而
call()
方法中的參數(shù)subscriber
其實(shí)就是我們?cè)谡{(diào)用subscribe()
訂閱時(shí)夯尽,作為參數(shù)傳進(jìn)來(lái)的subscriber
,這樣也就讓我們?cè)谏a(chǎn)者這里調(diào)用到了消費(fèi)者的方法登馒,這樣也就達(dá)到了觀察者的目的匙握。
rxjava變換原理
rxjava的變換雖然功能各有不同,但實(shí)質(zhì)上都是針對(duì)時(shí)間序列的處理和再發(fā)送谊娇,這里我們就通過(guò)map()
來(lái)了解其中的原理肺孤,一下就是利用map
的一段代碼:
Observable.just(1,2,3)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "i = " + integer;
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("s : " + s);
}
});
接著看map()
方法:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
是不是感覺(jué)很熟悉罗晕,返回的就是創(chuàng)建Observable
的方法济欢,也就是說(shuō)它將我們map()
中的func
作為onSubscribeMap
的構(gòu)造參數(shù),那么onSubscribeMap
又是什么小渊,它其實(shí)就是OnSubscribe
的一個(gè)實(shí)現(xiàn)類(lèi)法褥,先看其構(gòu)造方法:
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
這里通過(guò)內(nèi)部的成員變量保存了傳遞進(jìn)來(lái)的Observable
和Func1
,記住酬屉,這里也就是說(shuō)在外頭調(diào)用該方法的Observable
半等,也就是源Observable
保存在了source
,而我們?cè)?code>map()中寫(xiě)的Func1()
保存在了transformer
中呐萨,在往后看.subscribe()
杀饵,注意調(diào)用訂閱方法的已經(jīng)不是源Observable
,而是通過(guò)map()
內(nèi)部自己創(chuàng)建返回的一個(gè)新Observable
谬擦,也就是說(shuō)新的Observable
持有了Subscriber
的對(duì)象切距,那么,訂閱了之后自然就會(huì)激活Observable
發(fā)射數(shù)據(jù)惨远,也就是onSubscribe
中的call()
方法開(kāi)始執(zhí)行谜悟,在這里就是onSubscribeMap
的call()
方法。
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
代碼開(kāi)頭就創(chuàng)建了一個(gè)MapSubscriber
北秽,它是Subscriber
的一個(gè)子類(lèi)葡幸,構(gòu)造函數(shù)如下:
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
聯(lián)系上頭代碼,可以發(fā)現(xiàn)它將傳入的Subscriber
包裝成了MapSubscriber
贺氓,同時(shí)還講源Subscriber
和Func1
保存在了成員變量中蔚叨,之后執(zhí)行的時(shí)候肯定要執(zhí)行onNext()
方法,直接看onNext()
:
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
這里有一個(gè)泛型R辙培,也就是返回值類(lèi)型缅叠,是通過(guò)構(gòu)造函數(shù)傳入的,最重要的代碼result = mapper.call(t)
虏冻,這里的mapper
就是我們?cè)?code>map()中寫(xiě)的Func1
肤粱,接著傳入?yún)?shù),通過(guò)我們自己的方法得到我們想要的結(jié)果返回厨相。