注:本篇文章代碼基于Rxjava1.x
RxJava是目前非常流行的一個(gè)響應(yīng)是編程框架,它用了Java的語(yǔ)法特性來(lái)模擬出一套流式過(guò)程化的寫法,并可以通過(guò)線程調(diào)度器,非常方便的實(shí)現(xiàn)線程切換贡茅。本系列文章假設(shè)讀者已經(jīng)是使用過(guò)Rxjava或者RxAndroid的開發(fā)者,如果你還未使用過(guò),不妨看下下面的幾篇文章:
1.謎之RxJava
2.給 Android 開發(fā)者的 RxJava 詳解
3.深入淺出RxJava
本篇將使用一個(gè)非常簡(jiǎn)單的例子做引線,引出在Rxjava中一些核心類和核心對(duì)象,如果你尚未使用過(guò)Rxjava,請(qǐng)?jiān)陂喿x過(guò)上面幾篇文章后,編寫過(guò)一些Rxjava相關(guān)代碼后再閱讀本文章许布。
我們來(lái)看下我們的例子:
Observable.just("str1", "str2", "str3", "str4")
.map(new Func1<String, String>() {
@Override
public String call(String t) {
// TODO Auto-generated method stub
return "[" + t + "]";
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(String t) {
System.out.println("onNext " + t);
}
@Override
public void onError(Throwable e) {}
});
上面的例子非常簡(jiǎn)單我們換成我們的自然語(yǔ)言可以分成以下步驟:
1.構(gòu)建一個(gè)String[]數(shù)組的Observable
2.通過(guò)映射方法map,將返回值映射成為"["+t+"]"格式
3.被一個(gè)訂閱者所訂閱
最后我們將在控制臺(tái)輸出:output:
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted
雖然是短短幾行代碼,簡(jiǎn)單幾個(gè)類,但是已經(jīng)包含了大部分RxJava中的核心元素,本章就以這個(gè)簡(jiǎn)單的例子為引子,引出RxJava的基本體系結(jié)構(gòu)和一些核心功能類厚柳。我們先來(lái)看下
Observable.just("str1", "str2", "str3", "str4")
這個(gè)方法的輸入是一堆數(shù)組,輸出是一個(gè)Observable對(duì)象匆绣。實(shí)際上它就是一個(gè)靜態(tài)工廠,我們看下它的源碼:
public static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
return from((T[])new Object[] { t1, t2, t3, t4 });
}// call from
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);//選擇構(gòu)造方法
}
return unsafeCreate(new OnSubscribeFromArray<T>(array));//使用OnSubscribeFromArray為參數(shù)構(gòu)造
}
just代碼中調(diào)用了from方法來(lái)執(zhí)行構(gòu)造,而在from方法中,會(huì)先進(jìn)行數(shù)組空判斷和長(zhǎng)度判斷,目的是為了選擇不同的構(gòu)造方法。最后我們所傳入的數(shù)組對(duì)象,將被包裝成為一個(gè)OnSubscribeFromArray對(duì)象倡怎。而這個(gè)對(duì)象作為一個(gè)參數(shù)被unsafeCreate方法所調(diào)用。unsafeCreate是構(gòu)造Observable最核心的方法,不論哪種方式的構(gòu)造器最終都會(huì)調(diào)用到這個(gè)方法。我們現(xiàn)在看下這個(gè)方法的聲明:
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
首先,我們傳入的OnSubscribe對(duì)象將被RxJavaHooks的onCreate給hook住,轉(zhuǎn)化為一個(gè)OnSubscribe對(duì)象瞎颗。這里,如果你對(duì)aop不陌生的話,相信這塊很好理解,實(shí)際上相當(dāng)于你在構(gòu)造Observable的時(shí)候做了一層攔截,或者說(shuō)一次hook。我們不妨深入一點(diǎn),看下RxJavaHooks里面究竟到底做了什么轉(zhuǎn)換:
//code RxJavaHooks.java
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
RxJavaHooks.onCreate方法用到了一個(gè)函數(shù)對(duì)象onObservableCreate捌议。這里所定義的函數(shù)對(duì)象很類似我們?cè)趧?dòng)態(tài)語(yǔ)言中定義的閉包對(duì)象哼拔。我們看下onObservableCreate對(duì)象是怎么被賦值的:
public class RxJavaHooks {
static {
init();
}
static void init() {
...
initCreate();
}
static void initCreate() {
...
onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable.OnSubscribe f) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
}
};
...
}
RxJavaHooks類在類初始化的時(shí)候通過(guò)調(diào)用init->initCreate方法給onObservableCreate函數(shù)對(duì)象賦值。而賦值函數(shù)會(huì)調(diào)用
RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
也就是說(shuō)RxJavaHooks只是提供一個(gè)簡(jiǎn)單的接口和初始化操作瓣颅。實(shí)際調(diào)用者在RxJavaPlugins中倦逐。我們看下RxJavaPlugins.getObservableExecutionHook函數(shù):
public RxJavaObservableExecutionHook getObservableExecutionHook() {
if (observableExecutionHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());//通過(guò)系統(tǒng)配置獲取一個(gè)RxJavaObservableExecutionHook對(duì)象。
if (impl == null) {
// nothing set via properties so initialize with default
observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
//如果沒有配置對(duì)象則使用默認(rèn)對(duì)象
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
}
}
return observableExecutionHook.get();
}
實(shí)際上, getObservableExecutionHook()方法得到的對(duì)象也是一個(gè)單利,但是是非線程安全的,這段代碼主要做以下事情:
1.如果RxJavaObservableExecutionHook對(duì)象不存在,會(huì)先通過(guò)調(diào)用getPluginImplementationViaProperty方法,也就是通過(guò)查看系統(tǒng)配置參數(shù)查看是否有實(shí)現(xiàn)類,如果有,將生成一個(gè)具體的RxJavaObservableExecutionHook實(shí)例返回
2.如果通過(guò)步驟1無(wú)法生成一個(gè)RxJavaObservableExecutionHook對(duì)象,將返回一個(gè)默認(rèn)的RxJavaObservableExecutionHookDefault. getInstance()對(duì)象
3.最后將通過(guò)1,2獲取的對(duì)象記錄在全局變量中
這里引出一個(gè)問(wèn)題,就是我們?nèi)绾巫⑷胍粋€(gè)hook函數(shù)呢檬姥?這就需要深入到getPluginImplementationViaProperty的具體實(shí)現(xiàn)中去:
{//code getPluginImplementationViaProperty()
final String classSimpleName = pluginClass.getSimpleName();
String pluginPrefix = "rxjava.plugin.";
String defaultKey = pluginPrefix + classSimpleName + ".implementation";
...
}
首先,getPluginImplementationViaProperty會(huì)先定義一個(gè)key,這個(gè)key的基本結(jié)構(gòu)為:rxjava.plugin.[classSimpleName].implementation粉怕。而這里的classSimpleName依賴于我們傳入的pluginClass對(duì)象贫贝。我們回到剛才的調(diào)用鏈:
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
在調(diào)用getPluginImplementationViaProperty函數(shù)的時(shí)候,我們傳入的是一個(gè)RxJavaObservableExecutionHook類型,因此這里的classSimpleName 值對(duì)應(yīng)的應(yīng)該是"RxJavaObservableExecutionHook",所以我們就得到了配置的key為:
"rxjava.plugin.RxJavaObservableExecutionHook.implementation"
之后,getPluginImplementationViaProperty函數(shù)會(huì)通過(guò)這個(gè)key,從System.property中尋找具體的實(shí)現(xiàn)類,然后通過(guò)反射構(gòu)建出具體的實(shí)現(xiàn)對(duì)象。
//code getPluginImplementationViaProperty()
{
...
String implementingClass = props.getProperty(defaultKey);
try {
Class<?> cls = Class.forName(implementingClass);
// narrow the scope (cast) to the type we're expecting
cls = cls.asSubclass(pluginClass);
return cls.newInstance();
}
...
}
我們不妨來(lái)試一下這種寫法,還是基于上面的簡(jiǎn)單例子,我們?cè)诖a前增加一段話:
{
System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation",
"demos.rx.RxJavaObservableExecutionHookImpl");
//配置hook實(shí)現(xiàn)類
Observable.just("str1", "str2", "str3", "str4")...
}
RxJavaObservableExecutionHookImpl是我們實(shí)現(xiàn)的一個(gè)RxJavaObservableExecutionHook類型:
public class RxJavaObservableExecutionHookImpl extends RxJavaObservableExecutionHook{
@Override
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
System.out.println("perform intercept onCreate");
return super.onCreate(f);
}
}
我們執(zhí)行以下輸出:
output:
perform intercept onCreate //被我們的hook函數(shù)攔截
perform intercept onCreate //被我們的hook函數(shù)攔截
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted
我們可以從輸出日志看出,我們所配置的hook類,確實(shí)被構(gòu)造,并且成功實(shí)現(xiàn)了hook操作。根據(jù)上面所述,如果我們不采用配置Hook類的方式,RxJava將調(diào)用一個(gè)默認(rèn)的實(shí)現(xiàn)類:RxJavaObservableExecutionHookDefault.getInstance()也搓。而這個(gè)類的主要操作實(shí)際上就是直接返回,不進(jìn)行任何的攔截:
//code RxJavaObservableExecutionHookDefault
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;//直接返回,不進(jìn)行任何攔截和轉(zhuǎn)換
}
好的,我們花了很大的篇幅就是講了RxJavaHooks的onCreate函數(shù),我們?cè)跊]有配置任何的hook函數(shù)的情況下,返回值就是我們所傳入的OnSubscribe對(duì)象棍辕。那么什么是OnSubscribe對(duì)象呢暮现?我們先來(lái)看下OnSubscribe這個(gè)類吧:
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
public interface Action1<T> extends Action {
void call(T t);
}
OnSubscribe繼承于Action1,OnSubscribe限定了在Action1聲明中的泛型變量T,是一個(gè)Subscriber類型,而T變量聲明應(yīng)用在Action1的call函數(shù)中,所以,OnSubscribe實(shí)際上是限定了OnSubscribe中的call方法的參數(shù)類型是一個(gè)Subscriber類型。但這并沒有解釋OnSubscribe是個(gè)什么東西,我們來(lái)看下OnSubscribe的繼承樹:
在OnSubscribe類型的頂端是一個(gè)Function楚昭。Function就是一個(gè)函數(shù)或者說(shuō)一個(gè)過(guò)程,那么OnSubscribe是一個(gè)什么樣的過(guò)程呢栖袋?OnSubscribe是一個(gè)當(dāng)訂閱者訂閱的時(shí)候,執(zhí)行的一個(gè)過(guò)程。正如OnSubscribe這個(gè)類名所描述的那樣,這個(gè)過(guò)程的觸發(fā)在Subscribe的時(shí)候抚太。這實(shí)際上是一種策略的模式,根據(jù)不同的需求構(gòu)建不同的過(guò)程策略,比如我們回到上面說(shuō)的例子中,當(dāng)我們傳入一個(gè)數(shù)組對(duì)象的時(shí)候:
public static <T> Observable<T> from(T[] array) {
....
return unsafeCreate(new OnSubscribeFromArray<T>(array));
}
RxJava將采用一個(gè)叫做OnSubscribeFromArray的策略對(duì)象傳遞給unsafeCreate函數(shù)塘幅。為了繼續(xù)說(shuō)明這點(diǎn)我們不妨在來(lái)看下map函數(shù):
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
正如我上面所說(shuō)的一樣,在map函數(shù)中,基于我們上次構(gòu)造的Observable對(duì)象又生成了一個(gè)新的Observable對(duì)象,而新生成的對(duì)象,將采用OnSubscribeMap策略來(lái)處理訂閱事件。這種包裝的寫法實(shí)際上是一種職責(zé)鏈模式尿贫〉缦保回顧一下我們上面簡(jiǎn)單例子的那個(gè)流程:
1.通過(guò)just生成一個(gè)數(shù)組Observable對(duì)象-Observable1
2.通過(guò)map完成映射,在Observable1之上包裝,生成一個(gè)新的Observable對(duì)象Observable2
3.通過(guò)subscribe函數(shù)訂閱Observable2對(duì)象
通過(guò)上面的"引用關(guān)系圖"我們可以很清楚的看到Observable類型的整條職責(zé)鏈,那么當(dāng)我們調(diào)用Observable.subscribe的時(shí)候發(fā)生了什么呢?
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
這個(gè)方法中調(diào)用了一個(gè)靜態(tài)方法subscribe(subscriber, this)來(lái)生成這種訂閱關(guān)系庆亡。
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
....pre check
subscriber.onStart();
....
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
....
}
return Subscriptions.unsubscribed();
}
}
這個(gè)函數(shù)中,會(huì):
1.進(jìn)行pre check檢查參數(shù)是否合法
2.回調(diào)subscriber.onStart方法,告訴訂閱者我這邊已經(jīng)準(zhǔn)備開始了
3.之后就是我們的老朋友RxJavaHooks對(duì)象執(zhí)行onObservableStart匾乓,用來(lái)在onSubscribe函數(shù)執(zhí)行前做一次hook。(如何hook根據(jù)我們上面的方法可以實(shí)現(xiàn),不再贅述)
4.通過(guò)調(diào)用onSubscribe對(duì)象的call方法執(zhí)行函數(shù)操作
5.通過(guò)RxJavaHooks的onObservableReturn去hook訂閱操作執(zhí)行結(jié)束以后的返回值
根據(jù)我們上面的"引用關(guān)系圖",我們可以知道訂閱者發(fā)生訂閱的時(shí)候,最初執(zhí)行的onSubscribe對(duì)象是OnSubscribeMap類型,我們來(lái)看下這個(gè)類型的實(shí)現(xiàn):
//code OnSubscribeMap.java
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
OnSubscribeMap在構(gòu)造的時(shí)候需要傳遞兩個(gè)參數(shù)
1.輸入源Observable對(duì)象source
2.映射函數(shù):transformer
當(dāng)調(diào)用call方法的時(shí)候OnSubscribeMap會(huì)生成一個(gè)新的訂閱對(duì)象MapSubscriber,然后注冊(cè)到source對(duì)象(對(duì)應(yīng)例子中的Observable1)的訂閱者中又谋。unsafeSubscribe執(zhí)行代碼:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
這時(shí)候unsafeSubscribe中的訂閱Observable的onSubscribe函數(shù)對(duì)象就是"引用關(guān)系圖"中的Observable1.OnSubscribeFromArray對(duì)象拼缝。一樣,我們看下OnSubscribeFromArray的call方法:
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
這里我們引入了一個(gè)新的類Producer,而OnSubscribeFromArray中所引用的實(shí)現(xiàn)類是FromArrayProducer。我們根據(jù)上面的調(diào)用鏈可以知道此時(shí),傳入OnSubscribeFromArray.call方法中的參數(shù)child對(duì)象,對(duì)應(yīng)著已經(jīng)被OnSubscribeMap裝飾過(guò)的MapSubscriber對(duì)象彰亥。而在OnSubscribeFromArray.call方法中調(diào)用了Subscriber的setProducer方法,我們看下這個(gè)方法是干什么的:
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
if (toRequest == NOT_SET) {
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
Producer顧名思義,就是對(duì)一個(gè)生產(chǎn)者的一個(gè)抽象,而生產(chǎn)什么東西呢咧七?生產(chǎn)的是數(shù)據(jù),Producer.request(int n)函數(shù)中的n參數(shù)代表讓生產(chǎn)者生產(chǎn)多少的數(shù)據(jù)對(duì)象。為什么需要這個(gè)方法呢任斋?toRequest變量又是從何而來(lái)呢继阻?toRequest由Subscriber的成員變量requested,而requested通過(guò)Subscriber的request函數(shù)進(jìn)行賦值:
protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
Producer producerToRequestFrom;
synchronized (this) {
if (producer != null) {
producerToRequestFrom = producer;
} else {
addToRequested(n);//如果沒有producer需要計(jì)數(shù)
return;
}
}
// after releasing lock (we should not make requests holding a lock)
producerToRequestFrom.request(n);
}
代碼寫的很清楚,當(dāng)你的這個(gè)訂閱者對(duì)象Subscriber并沒有對(duì)應(yīng)的producer的時(shí)候,每一次請(qǐng)求數(shù)據(jù)的操作都會(huì)被記錄到你的requested變量中,這樣,當(dāng)你進(jìn)行設(shè)置了producer的時(shí)候,就可以知道自己請(qǐng)求了多少次,需要多少個(gè)數(shù)據(jù)對(duì)象。那么我們回到Subscriber的setProducer方法中去,當(dāng)代碼執(zhí)行到最后,Subscriber會(huì)調(diào)用Producer的request方法來(lái)請(qǐng)求數(shù)據(jù),而這里所對(duì)應(yīng)的Producer對(duì)象,就是在OnSubscribeFromArray.call方法中傳遞進(jìn)來(lái)的FromArrayProducer類型對(duì)象废酷。
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
根據(jù)我們最終輸出的日志,我們可以推測(cè)FromArrayProducer是進(jìn)行了一次數(shù)組的迭代遍歷,那么是不是這樣呢瘟檩?我們看下FromArrayProducer的request方法:
@Override
public void request(long n) {
if (n < 0) {//異常參數(shù)檢查
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
這里,在FromArrayProducer的request處理的時(shí)候執(zhí)行了兩個(gè)分支分別對(duì)應(yīng)執(zhí)行fastPath方法和slowPath方法。而在執(zhí)行之前有判斷了一個(gè)條件:
BackpressureUtils.getAndAddRequest(this, n)
//code BackpressureUtils.getAndAddRequest()
public static long getAndAddRequest(AtomicLong requested, long n) {
// add n to field but check for overflow
while (true) {
long current = requested.get();
long next = addCap(current, n);
if (requested.compareAndSet(current, next)) {
return current;
}
}
}
注意這里的傳遞對(duì)象:
1.requested參數(shù)對(duì)應(yīng)的是FromArrayProducer對(duì)象
2.n對(duì)應(yīng)的就是我們所傳遞的請(qǐng)求總數(shù)
requested初始值為0,通過(guò)addCap將數(shù)值加入到requested對(duì)象中,這樣就完成了生成對(duì)象的統(tǒng)計(jì)操作澈蟆。
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {//防止越界
u = Long.MAX_VALUE;
}
return u;
}
同時(shí)我們也可以看到,由于產(chǎn)生請(qǐng)求以后,FromArrayProducer統(tǒng)計(jì)數(shù)增加,因此返回的(BackpressureUtils.getAndAddRequest(this, n) 必不為0芒帕。所以每一個(gè)數(shù)據(jù)生產(chǎn)者FromArrayProducer對(duì)象只能被使用一次,這時(shí)候有人會(huì)問(wèn)了如果我用以下的代碼,數(shù)據(jù)可以被回調(diào)兩次的。
Observable<String> ob = Observable.just("str1", "str2", "str3", "str4");
ob.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(String t) {
System.out.println("onNext " + t);
}
@Override
public void onError(Throwable e) {}
});
System.out.println("----------");
ob.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted ");
}
@Override
public void onNext(String t) {
System.out.println("onNext " + t);
}
@Override
public void onError(Throwable e) {}
});
最后輸出:
output:
//第一次輸出
onNext str1
onNext str2
onNext str3
onNext str4
onCompleted
onNext str1//第二次輸出
onNext str2
onNext str3
onNext str4
onCompleted
這是為什么呢丰介?
回答這個(gè)問(wèn)題,我們需要回到我們的OnSubscribeFromArray類中:
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
因?yàn)槊堪l(fā)生一次訂閱操作就生成一個(gè)全新的FromArrayProducer對(duì)象,因此你用到數(shù)據(jù)自然是在的背蟆。我們最后再來(lái)看下fastPath函數(shù)和slowPath函數(shù),字面意思上好像是代表快慢的路徑搜索,我們現(xiàn)在看下fastPath函數(shù):
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
代碼非常簡(jiǎn)單,就是遍歷內(nèi)存中的數(shù)組array,然后執(zhí)行訂閱者的onNext回調(diào)和onCompleted回調(diào)函數(shù)哮幢。而slowPath方法呢带膀?
void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;
long e = 0L;
int i = index;//當(dāng)前數(shù)據(jù)流索引
for (;;) {
while (r != 0L && i != n) {
child.onNext(array[i]);
i++;
if (i == n) {
return;
}
r--;
e--;
}
r = get() + e;
if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
這個(gè)函數(shù)中幾個(gè)重要的參數(shù):
1.參數(shù)r代表你的請(qǐng)求數(shù)
2.e代表數(shù)據(jù)消耗數(shù)量
3.n代表你的數(shù)組長(zhǎng)度
4.index代表你的數(shù)組數(shù)據(jù)流索引
這個(gè)函數(shù)執(zhí)行的時(shí)候會(huì)先執(zhí)行一個(gè)大循環(huán),而這個(gè)大循環(huán)中包含著一個(gè)小循環(huán):
while (r != 0L && i != n) {
child.onNext(array[i]);
i++;
if (i == n) {
return;
}
r--;
e--;
}
用來(lái)判斷數(shù)據(jù)流i是否結(jié)束,或者請(qǐng)求數(shù)r是否滿足。當(dāng)滿足一個(gè)條件以后跳出循環(huán)執(zhí)行addAndGet方法將請(qǐng)求數(shù)目加入到計(jì)數(shù)器中:
r = get() + e;
if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
按照"訂閱函數(shù)調(diào)用時(shí)序圖"我們知道,此時(shí)我們的訂閱者類是被Observable.map函數(shù)裝飾過(guò)的MapSubscriber類,前面我們說(shuō)過(guò),這個(gè)類是一個(gè)Subscriber類的裝飾器,我們來(lái)看下它的基本實(shí)現(xiàn):
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
從構(gòu)造器上看,在構(gòu)建MapSubscriber類的時(shí)候需要指定它的被裝飾對(duì)象和映射函數(shù)mapper,而當(dāng)我們回調(diào)到MapSubscriber的onNext回調(diào)的時(shí)候:
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);//映射轉(zhuǎn)換
} catch (Throwable ex) {
....
return;
}
actual.onNext(result);//回調(diào)被裝飾對(duì)象
}
我們傳入的原始數(shù)據(jù)t,將被mapper映射函數(shù)處理,轉(zhuǎn)化為一個(gè)R類型的結(jié)果result,然后把這個(gè)結(jié)果回調(diào)給被裝飾對(duì)象橙垢。按照我們上面的例子,這里面我們的映射函數(shù)就是將t外面增加"[]"的Func1接口函數(shù),actual被裝飾對(duì)象就是我們代碼中的匿名訂閱者對(duì)象垛叨。
好了我們總結(jié)一下,我們通過(guò)上面一個(gè)非常非常簡(jiǎn)單例子我們接觸到RxJava這個(gè)大家族中的很多核心類:
1.Observable是一個(gè)被觀察者對(duì)象,每個(gè)訂閱者需要通過(guò)subscribe方法與Observable對(duì)象簽訂訂閱契約
2.Observable的構(gòu)建是一系列OnSubscribe對(duì)象職責(zé)鏈?zhǔn)教幚磉^(guò)程
3.RxJava中可以在觀察的每個(gè)階段配置hook函數(shù)
4.鏈?zhǔn)教幚磉^(guò)程中,訂閱者Subscriber對(duì)象可能會(huì)被鏈條中的中間環(huán)節(jié)所包裝
5.Producer是用來(lái)定義生產(chǎn)數(shù)據(jù)的類型
6.Subscriber在函數(shù)setProducer中調(diào)用Producer的request(int n)方法用于請(qǐng)求n個(gè)數(shù)據(jù)