RxJava源碼解析(一)從一個(gè)例子開始

注:本篇文章代碼基于Rxjava1.x
RxJava是目前非常流行的一個(gè)響應(yīng)是編程框架,它用了Java的語(yǔ)法特性來(lái)模擬出一套流式過(guò)程化的寫法,并可以通過(guò)線程調(diào)度器,非常方便的實(shí)現(xiàn)線程切換贡茅。本系列文章假設(shè)讀者已經(jīng)是使用過(guò)Rxjava或者RxAndroid的開發(fā)者,如果你還未使用過(guò),不妨看下下面的幾篇文章:
1.謎之RxJava
2.給 Android 開發(fā)者的 RxJava 詳解
3.深入淺出RxJava

本篇將使用一個(gè)非常簡(jiǎn)單的例子做引線,引出在Rxjava中一些核心類和核心對(duì)象,如果你尚未使用過(guò)Rxjava,請(qǐng)?jiān)陂喿x過(guò)上面幾篇文章后,編寫過(guò)一些Rxjava相關(guān)代碼后再閱讀本文章许布。
我們來(lái)看下我們的例子:

Observable.just("str1", "str2", "str3", "str4")
        .map(new Func1<String, String>() {
            @Override
            public String call(String t) {
                // TODO Auto-generated method stub
                return "[" + t + "]";
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            @Override
            public void onNext(String t) {
                System.out.println("onNext " + t);
            }

            @Override
            public void onError(Throwable e) {}
        });

上面的例子非常簡(jiǎn)單我們換成我們的自然語(yǔ)言可以分成以下步驟:
1.構(gòu)建一個(gè)String[]數(shù)組的Observable
2.通過(guò)映射方法map,將返回值映射成為"["+t+"]"格式
3.被一個(gè)訂閱者所訂閱
最后我們將在控制臺(tái)輸出:

output:
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted

雖然是短短幾行代碼,簡(jiǎn)單幾個(gè)類,但是已經(jīng)包含了大部分RxJava中的核心元素,本章就以這個(gè)簡(jiǎn)單的例子為引子,引出RxJava的基本體系結(jié)構(gòu)和一些核心功能類厚柳。我們先來(lái)看下

Observable.just("str1", "str2", "str3", "str4")

這個(gè)方法的輸入是一堆數(shù)組,輸出是一個(gè)Observable對(duì)象匆绣。實(shí)際上它就是一個(gè)靜態(tài)工廠,我們看下它的源碼:

public static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
        return from((T[])new Object[] { t1, t2, t3, t4 });
}// call from

public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);//選擇構(gòu)造方法
        }
        return unsafeCreate(new OnSubscribeFromArray<T>(array));//使用OnSubscribeFromArray為參數(shù)構(gòu)造
    }

just代碼中調(diào)用了from方法來(lái)執(zhí)行構(gòu)造,而在from方法中,會(huì)先進(jìn)行數(shù)組空判斷和長(zhǎng)度判斷,目的是為了選擇不同的構(gòu)造方法。最后我們所傳入的數(shù)組對(duì)象,將被包裝成為一個(gè)OnSubscribeFromArray對(duì)象倡怎。而這個(gè)對(duì)象作為一個(gè)參數(shù)被unsafeCreate方法所調(diào)用。unsafeCreate是構(gòu)造Observable最核心的方法,不論哪種方式的構(gòu)造器最終都會(huì)調(diào)用到這個(gè)方法。我們現(xiàn)在看下這個(gè)方法的聲明:

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

首先,我們傳入的OnSubscribe對(duì)象將被RxJavaHooks的onCreate給hook住,轉(zhuǎn)化為一個(gè)OnSubscribe對(duì)象瞎颗。這里,如果你對(duì)aop不陌生的話,相信這塊很好理解,實(shí)際上相當(dāng)于你在構(gòu)造Observable的時(shí)候做了一層攔截,或者說(shuō)一次hook。我們不妨深入一點(diǎn),看下RxJavaHooks里面究竟到底做了什么轉(zhuǎn)換:

//code RxJavaHooks.java
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

RxJavaHooks.onCreate方法用到了一個(gè)函數(shù)對(duì)象onObservableCreate捌议。這里所定義的函數(shù)對(duì)象很類似我們?cè)趧?dòng)態(tài)語(yǔ)言中定義的閉包對(duì)象哼拔。我們看下onObservableCreate對(duì)象是怎么被賦值的:

public class RxJavaHooks {
  static {
        init();
    }
  static void init() { 
      ...
      initCreate();
  }
static void initCreate() {
        ...
        onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
            }
        };
     ...
}

RxJavaHooks類在類初始化的時(shí)候通過(guò)調(diào)用init->initCreate方法給onObservableCreate函數(shù)對(duì)象賦值。而賦值函數(shù)會(huì)調(diào)用

RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);

也就是說(shuō)RxJavaHooks只是提供一個(gè)簡(jiǎn)單的接口和初始化操作瓣颅。實(shí)際調(diào)用者在RxJavaPlugins中倦逐。我們看下RxJavaPlugins.getObservableExecutionHook函數(shù):

public RxJavaObservableExecutionHook getObservableExecutionHook() {
        if (observableExecutionHook.get() == null) {
            // check for an implementation from System.getProperty first
            Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());//通過(guò)系統(tǒng)配置獲取一個(gè)RxJavaObservableExecutionHook對(duì)象。
            if (impl == null) {
                // nothing set via properties so initialize with default
                observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
//如果沒有配置對(duì)象則使用默認(rèn)對(duì)象
                // we don't return from here but call get() again in case of thread-race so the winner will always get returned
            } else {
                // we received an implementation from the system property so use it
                observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
            }
        }
        return observableExecutionHook.get();
    }

實(shí)際上, getObservableExecutionHook()方法得到的對(duì)象也是一個(gè)單利,但是是非線程安全的,這段代碼主要做以下事情:

1.如果RxJavaObservableExecutionHook對(duì)象不存在,會(huì)先通過(guò)調(diào)用getPluginImplementationViaProperty方法,也就是通過(guò)查看系統(tǒng)配置參數(shù)查看是否有實(shí)現(xiàn)類,如果有,將生成一個(gè)具體的RxJavaObservableExecutionHook實(shí)例返回
2.如果通過(guò)步驟1無(wú)法生成一個(gè)RxJavaObservableExecutionHook對(duì)象,將返回一個(gè)默認(rèn)的RxJavaObservableExecutionHookDefault. getInstance()對(duì)象
3.最后將通過(guò)1,2獲取的對(duì)象記錄在全局變量中

getObservableExecutionHook函數(shù)流程圖

這里引出一個(gè)問(wèn)題,就是我們?nèi)绾巫⑷胍粋€(gè)hook函數(shù)呢檬姥?這就需要深入到getPluginImplementationViaProperty的具體實(shí)現(xiàn)中去:

{//code getPluginImplementationViaProperty()
final String classSimpleName = pluginClass.getSimpleName();
        String pluginPrefix = "rxjava.plugin.";
        String defaultKey = pluginPrefix + classSimpleName + ".implementation";
...
}

首先,getPluginImplementationViaProperty會(huì)先定義一個(gè)key,這個(gè)key的基本結(jié)構(gòu)為:rxjava.plugin.[classSimpleName].implementation粉怕。而這里的classSimpleName依賴于我們傳入的pluginClass對(duì)象贫贝。我們回到剛才的調(diào)用鏈:

 Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
            

在調(diào)用getPluginImplementationViaProperty函數(shù)的時(shí)候,我們傳入的是一個(gè)RxJavaObservableExecutionHook類型,因此這里的classSimpleName 值對(duì)應(yīng)的應(yīng)該是"RxJavaObservableExecutionHook",所以我們就得到了配置的key為:

"rxjava.plugin.RxJavaObservableExecutionHook.implementation"

之后,getPluginImplementationViaProperty函數(shù)會(huì)通過(guò)這個(gè)key,從System.property中尋找具體的實(shí)現(xiàn)類,然后通過(guò)反射構(gòu)建出具體的實(shí)現(xiàn)對(duì)象。

//code getPluginImplementationViaProperty()
{
...
String implementingClass = props.getProperty(defaultKey);
 try {
                Class<?> cls = Class.forName(implementingClass);
                // narrow the scope (cast) to the type we're expecting
                cls = cls.asSubclass(pluginClass);
                return cls.newInstance();
            }
...
}

我們不妨來(lái)試一下這種寫法,還是基于上面的簡(jiǎn)單例子,我們?cè)诖a前增加一段話:

{
System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", 
                "demos.rx.RxJavaObservableExecutionHookImpl");
//配置hook實(shí)現(xiàn)類
Observable.just("str1", "str2", "str3", "str4")...
}

RxJavaObservableExecutionHookImpl是我們實(shí)現(xiàn)的一個(gè)RxJavaObservableExecutionHook類型:

public class RxJavaObservableExecutionHookImpl extends  RxJavaObservableExecutionHook{
    
    @Override
    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        System.out.println("perform intercept onCreate");
        return super.onCreate(f);
    }
}

我們執(zhí)行以下輸出:

output:
perform intercept onCreate //被我們的hook函數(shù)攔截
perform intercept onCreate //被我們的hook函數(shù)攔截
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted
我們可以從輸出日志看出,我們所配置的hook類,確實(shí)被構(gòu)造,并且成功實(shí)現(xiàn)了hook操作。根據(jù)上面所述,如果我們不采用配置Hook類的方式,RxJava將調(diào)用一個(gè)默認(rèn)的實(shí)現(xiàn)類:RxJavaObservableExecutionHookDefault.getInstance()也搓。而這個(gè)類的主要操作實(shí)際上就是直接返回,不進(jìn)行任何的攔截:

//code RxJavaObservableExecutionHookDefault
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;//直接返回,不進(jìn)行任何攔截和轉(zhuǎn)換
    }

好的,我們花了很大的篇幅就是講了RxJavaHooks的onCreate函數(shù),我們?cè)跊]有配置任何的hook函數(shù)的情況下,返回值就是我們所傳入的OnSubscribe對(duì)象棍辕。那么什么是OnSubscribe對(duì)象呢暮现?我們先來(lái)看下OnSubscribe這個(gè)類吧:

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
public interface Action1<T> extends Action {
    void call(T t);
}

OnSubscribe繼承于Action1,OnSubscribe限定了在Action1聲明中的泛型變量T,是一個(gè)Subscriber類型,而T變量聲明應(yīng)用在Action1的call函數(shù)中,所以,OnSubscribe實(shí)際上是限定了OnSubscribe中的call方法的參數(shù)類型是一個(gè)Subscriber類型。但這并沒有解釋OnSubscribe是個(gè)什么東西,我們來(lái)看下OnSubscribe的繼承樹:

OnSubscribe的繼承樹

在OnSubscribe類型的頂端是一個(gè)Function楚昭。Function就是一個(gè)函數(shù)或者說(shuō)一個(gè)過(guò)程,那么OnSubscribe是一個(gè)什么樣的過(guò)程呢栖袋?OnSubscribe是一個(gè)當(dāng)訂閱者訂閱的時(shí)候,執(zhí)行的一個(gè)過(guò)程。正如OnSubscribe這個(gè)類名所描述的那樣,這個(gè)過(guò)程的觸發(fā)在Subscribe的時(shí)候抚太。這實(shí)際上是一種策略的模式,根據(jù)不同的需求構(gòu)建不同的過(guò)程策略,比如我們回到上面說(shuō)的例子中,當(dāng)我們傳入一個(gè)數(shù)組對(duì)象的時(shí)候:

public static <T> Observable<T> from(T[] array) {
        ....
        return unsafeCreate(new OnSubscribeFromArray<T>(array));
    }

RxJava將采用一個(gè)叫做OnSubscribeFromArray的策略對(duì)象傳遞給unsafeCreate函數(shù)塘幅。為了繼續(xù)說(shuō)明這點(diǎn)我們不妨在來(lái)看下map函數(shù):

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }

正如我上面所說(shuō)的一樣,在map函數(shù)中,基于我們上次構(gòu)造的Observable對(duì)象又生成了一個(gè)新的Observable對(duì)象,而新生成的對(duì)象,將采用OnSubscribeMap策略來(lái)處理訂閱事件。這種包裝的寫法實(shí)際上是一種職責(zé)鏈模式尿贫〉缦保回顧一下我們上面簡(jiǎn)單例子的那個(gè)流程:
1.通過(guò)just生成一個(gè)數(shù)組Observable對(duì)象-Observable1
2.通過(guò)map完成映射,在Observable1之上包裝,生成一個(gè)新的Observable對(duì)象Observable2
3.通過(guò)subscribe函數(shù)訂閱Observable2對(duì)象

引用關(guān)系圖

通過(guò)上面的"引用關(guān)系圖"我們可以很清楚的看到Observable類型的整條職責(zé)鏈,那么當(dāng)我們調(diào)用Observable.subscribe的時(shí)候發(fā)生了什么呢?

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

這個(gè)方法中調(diào)用了一個(gè)靜態(tài)方法subscribe(subscriber, this)來(lái)生成這種訂閱關(guān)系庆亡。

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        ....pre check
        subscriber.onStart();
        ....
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
              ....
            }
            return Subscriptions.unsubscribed();
        }
    }

這個(gè)函數(shù)中,會(huì):
1.進(jìn)行pre check檢查參數(shù)是否合法
2.回調(diào)subscriber.onStart方法,告訴訂閱者我這邊已經(jīng)準(zhǔn)備開始了
3.之后就是我們的老朋友RxJavaHooks對(duì)象執(zhí)行onObservableStart匾乓,用來(lái)在onSubscribe函數(shù)執(zhí)行前做一次hook。(如何hook根據(jù)我們上面的方法可以實(shí)現(xiàn),不再贅述)
4.通過(guò)調(diào)用onSubscribe對(duì)象的call方法執(zhí)行函數(shù)操作
5.通過(guò)RxJavaHooks的onObservableReturn去hook訂閱操作執(zhí)行結(jié)束以后的返回值

根據(jù)我們上面的"引用關(guān)系圖",我們可以知道訂閱者發(fā)生訂閱的時(shí)候,最初執(zhí)行的onSubscribe對(duì)象是OnSubscribeMap類型,我們來(lái)看下這個(gè)類型的實(shí)現(xiàn):

//code OnSubscribeMap.java

public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
@Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

OnSubscribeMap在構(gòu)造的時(shí)候需要傳遞兩個(gè)參數(shù)
1.輸入源Observable對(duì)象source
2.映射函數(shù):transformer

當(dāng)調(diào)用call方法的時(shí)候OnSubscribeMap會(huì)生成一個(gè)新的訂閱對(duì)象MapSubscriber,然后注冊(cè)到source對(duì)象(對(duì)應(yīng)例子中的Observable1)的訂閱者中又谋。unsafeSubscribe執(zhí)行代碼:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            subscriber.onStart();
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
}

這時(shí)候unsafeSubscribe中的訂閱Observable的onSubscribe函數(shù)對(duì)象就是"引用關(guān)系圖"中的Observable1.OnSubscribeFromArray對(duì)象拼缝。一樣,我們看下OnSubscribeFromArray的call方法:

 @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

這里我們引入了一個(gè)新的類Producer,而OnSubscribeFromArray中所引用的實(shí)現(xiàn)類是FromArrayProducer。我們根據(jù)上面的調(diào)用鏈可以知道此時(shí),傳入OnSubscribeFromArray.call方法中的參數(shù)child對(duì)象,對(duì)應(yīng)著已經(jīng)被OnSubscribeMap裝飾過(guò)的MapSubscriber對(duì)象彰亥。而在OnSubscribeFromArray.call方法中調(diào)用了Subscriber的setProducer方法,我們看下這個(gè)方法是干什么的:

public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                if (toRequest == NOT_SET) {
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

Producer顧名思義,就是對(duì)一個(gè)生產(chǎn)者的一個(gè)抽象,而生產(chǎn)什么東西呢咧七?生產(chǎn)的是數(shù)據(jù),Producer.request(int n)函數(shù)中的n參數(shù)代表讓生產(chǎn)者生產(chǎn)多少的數(shù)據(jù)對(duì)象。為什么需要這個(gè)方法呢任斋?toRequest變量又是從何而來(lái)呢继阻?toRequest由Subscriber的成員變量requested,而requested通過(guò)Subscriber的request函數(shù)進(jìn)行賦值:

protected final void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }
        Producer producerToRequestFrom;
        synchronized (this) {
            if (producer != null) {
                producerToRequestFrom = producer;
            } else {
                addToRequested(n);//如果沒有producer需要計(jì)數(shù)
                return;
            }
        }
        // after releasing lock (we should not make requests holding a lock)
        producerToRequestFrom.request(n);
    }

代碼寫的很清楚,當(dāng)你的這個(gè)訂閱者對(duì)象Subscriber并沒有對(duì)應(yīng)的producer的時(shí)候,每一次請(qǐng)求數(shù)據(jù)的操作都會(huì)被記錄到你的requested變量中,這樣,當(dāng)你進(jìn)行設(shè)置了producer的時(shí)候,就可以知道自己請(qǐng)求了多少次,需要多少個(gè)數(shù)據(jù)對(duì)象。那么我們回到Subscriber的setProducer方法中去,當(dāng)代碼執(zhí)行到最后,Subscriber會(huì)調(diào)用Producer的request方法來(lái)請(qǐng)求數(shù)據(jù),而這里所對(duì)應(yīng)的Producer對(duì)象,就是在OnSubscribeFromArray.call方法中傳遞進(jìn)來(lái)的FromArrayProducer類型對(duì)象废酷。

public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

根據(jù)我們最終輸出的日志,我們可以推測(cè)FromArrayProducer是進(jìn)行了一次數(shù)組的迭代遍歷,那么是不是這樣呢瘟檩?我們看下FromArrayProducer的request方法:

@Override
        public void request(long n) {
            if (n < 0) {//異常參數(shù)檢查
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }
訂閱函數(shù)調(diào)用時(shí)序圖

這里,在FromArrayProducer的request處理的時(shí)候執(zhí)行了兩個(gè)分支分別對(duì)應(yīng)執(zhí)行fastPath方法和slowPath方法。而在執(zhí)行之前有判斷了一個(gè)條件:

BackpressureUtils.getAndAddRequest(this, n)
//code BackpressureUtils.getAndAddRequest()
public static long getAndAddRequest(AtomicLong requested, long n) {
        // add n to field but check for overflow
        while (true) {
            long current = requested.get();
            long next = addCap(current, n);
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

注意這里的傳遞對(duì)象:
1.requested參數(shù)對(duì)應(yīng)的是FromArrayProducer對(duì)象
2.n對(duì)應(yīng)的就是我們所傳遞的請(qǐng)求總數(shù)
requested初始值為0,通過(guò)addCap將數(shù)值加入到requested對(duì)象中,這樣就完成了生成對(duì)象的統(tǒng)計(jì)操作澈蟆。

public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {//防止越界
            u = Long.MAX_VALUE;
        }
        return u;
    }

同時(shí)我們也可以看到,由于產(chǎn)生請(qǐng)求以后,FromArrayProducer統(tǒng)計(jì)數(shù)增加,因此返回的(BackpressureUtils.getAndAddRequest(this, n) 必不為0芒帕。所以每一個(gè)數(shù)據(jù)生產(chǎn)者FromArrayProducer對(duì)象只能被使用一次,這時(shí)候有人會(huì)問(wèn)了如果我用以下的代碼,數(shù)據(jù)可以被回調(diào)兩次的。

Observable<String> ob = Observable.just("str1", "str2", "str3", "str4");
        ob.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            @Override
            public void onNext(String t) {
                System.out.println("onNext " + t);
            }

            @Override
            public void onError(Throwable e) {}
        });
        System.out.println("----------");
        ob.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            @Override
            public void onNext(String t) {
                System.out.println("onNext " + t);
            }

            @Override
            public void onError(Throwable e) {}
        });

最后輸出:

output:
//第一次輸出
onNext str1
onNext str2
onNext str3
onNext str4
onCompleted
onNext str1//第二次輸出
onNext str2
onNext str3
onNext str4
onCompleted

這是為什么呢丰介?
回答這個(gè)問(wèn)題,我們需要回到我們的OnSubscribeFromArray類中:

@Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

因?yàn)槊堪l(fā)生一次訂閱操作就生成一個(gè)全新的FromArrayProducer對(duì)象,因此你用到數(shù)據(jù)自然是在的背蟆。我們最后再來(lái)看下fastPath函數(shù)和slowPath函數(shù),字面意思上好像是代表快慢的路徑搜索,我們現(xiàn)在看下fastPath函數(shù):

void fastPath() {
            final Subscriber<? super T> child = this.child;
            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onNext(t);
            }
            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

代碼非常簡(jiǎn)單,就是遍歷內(nèi)存中的數(shù)組array,然后執(zhí)行訂閱者的onNext回調(diào)和onCompleted回調(diào)函數(shù)哮幢。而slowPath方法呢带膀?

void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;
            long e = 0L;
            int i = index;//當(dāng)前數(shù)據(jù)流索引
            for (;;) {
                while (r != 0L && i != n) {
                    child.onNext(array[i]);
                    i++;
                    if (i == n) {
                        return;
                    }
                    r--;
                    e--;
                }
                r = get() + e;
                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }

這個(gè)函數(shù)中幾個(gè)重要的參數(shù):
1.參數(shù)r代表你的請(qǐng)求數(shù)
2.e代表數(shù)據(jù)消耗數(shù)量
3.n代表你的數(shù)組長(zhǎng)度
4.index代表你的數(shù)組數(shù)據(jù)流索引
這個(gè)函數(shù)執(zhí)行的時(shí)候會(huì)先執(zhí)行一個(gè)大循環(huán),而這個(gè)大循環(huán)中包含著一個(gè)小循環(huán):

while (r != 0L && i != n) {
                    child.onNext(array[i]);
                    i++;
                    if (i == n) {
                        return;
                    }
                    r--;
                    e--;
                }

用來(lái)判斷數(shù)據(jù)流i是否結(jié)束,或者請(qǐng)求數(shù)r是否滿足。當(dāng)滿足一個(gè)條件以后跳出循環(huán)執(zhí)行addAndGet方法將請(qǐng)求數(shù)目加入到計(jì)數(shù)器中:

 r = get() + e;
                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }

按照"訂閱函數(shù)調(diào)用時(shí)序圖"我們知道,此時(shí)我們的訂閱者類是被Observable.map函數(shù)裝飾過(guò)的MapSubscriber類,前面我們說(shuō)過(guò),這個(gè)類是一個(gè)Subscriber類的裝飾器,我們來(lái)看下它的基本實(shí)現(xiàn):

 public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

從構(gòu)造器上看,在構(gòu)建MapSubscriber類的時(shí)候需要指定它的被裝飾對(duì)象和映射函數(shù)mapper,而當(dāng)我們回調(diào)到MapSubscriber的onNext回調(diào)的時(shí)候:

@Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);//映射轉(zhuǎn)換
            } catch (Throwable ex) {
                ....
                return;
            }
            actual.onNext(result);//回調(diào)被裝飾對(duì)象
        }

我們傳入的原始數(shù)據(jù)t,將被mapper映射函數(shù)處理,轉(zhuǎn)化為一個(gè)R類型的結(jié)果result,然后把這個(gè)結(jié)果回調(diào)給被裝飾對(duì)象橙垢。按照我們上面的例子,這里面我們的映射函數(shù)就是將t外面增加"[]"的Func1接口函數(shù),actual被裝飾對(duì)象就是我們代碼中的匿名訂閱者對(duì)象垛叨。

好了我們總結(jié)一下,我們通過(guò)上面一個(gè)非常非常簡(jiǎn)單例子我們接觸到RxJava這個(gè)大家族中的很多核心類:
1.Observable是一個(gè)被觀察者對(duì)象,每個(gè)訂閱者需要通過(guò)subscribe方法與Observable對(duì)象簽訂訂閱契約
2.Observable的構(gòu)建是一系列OnSubscribe對(duì)象職責(zé)鏈?zhǔn)教幚磉^(guò)程
3.RxJava中可以在觀察的每個(gè)階段配置hook函數(shù)
4.鏈?zhǔn)教幚磉^(guò)程中,訂閱者Subscriber對(duì)象可能會(huì)被鏈條中的中間環(huán)節(jié)所包裝
5.Producer是用來(lái)定義生產(chǎn)數(shù)據(jù)的類型
6.Subscriber在函數(shù)setProducer中調(diào)用Producer的request(int n)方法用于請(qǐng)求n個(gè)數(shù)據(jù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子嗽元,更是在濱河造成了極大的恐慌敛纲,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,348評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件剂癌,死亡現(xiàn)場(chǎng)離奇詭異淤翔,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)佩谷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門旁壮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人谐檀,你說(shuō)我怎么就攤上這事抡谐。” “怎么了桐猬?”我有些...
    開封第一講書人閱讀 156,936評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵麦撵,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我溃肪,道長(zhǎng)免胃,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,427評(píng)論 1 283
  • 正文 為了忘掉前任乍惊,我火速辦了婚禮杜秸,結(jié)果婚禮上放仗,老公的妹妹穿的比我還像新娘润绎。我一直安慰自己,他們只是感情好诞挨,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,467評(píng)論 6 385
  • 文/花漫 我一把揭開白布莉撇。 她就那樣靜靜地躺著,像睡著了一般惶傻。 火紅的嫁衣襯著肌膚如雪棍郎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,785評(píng)論 1 290
  • 那天银室,我揣著相機(jī)與錄音涂佃,去河邊找鬼。 笑死蜈敢,一個(gè)胖子當(dāng)著我的面吹牛辜荠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播抓狭,決...
    沈念sama閱讀 38,931評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼伯病,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了否过?” 一聲冷哼從身側(cè)響起午笛,我...
    開封第一講書人閱讀 37,696評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤惭蟋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后药磺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體告组,經(jīng)...
    沈念sama閱讀 44,141評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,483評(píng)論 2 327
  • 正文 我和宋清朗相戀三年与涡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了惹谐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,625評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡驼卖,死狀恐怖氨肌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情酌畜,我是刑警寧澤怎囚,帶...
    沈念sama閱讀 34,291評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站桥胞,受9級(jí)特大地震影響恳守,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜贩虾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,892評(píng)論 3 312
  • 文/蒙蒙 一催烘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧缎罢,春花似錦伊群、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至咽袜,卻和暖如春丸卷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背询刹。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工谜嫉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人凹联。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓沐兰,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親匕垫。 傳聞我的和親對(duì)象是個(gè)殘疾皇子僧鲁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,492評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容