說說關(guān)于RxJava源碼部分解讀

一.正常執(zhí)行流程分析

 Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("123456");
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String string) {
                Log.i("TAG","String=" + string);
            }
        });

這是一個(gè)簡(jiǎn)單的構(gòu)建RxJava從被觀察者到觀察者的流程,這里我們首先還是先聲明一些概念性的東西

  • Observable 被觀察者
  • Subscriber 觀察者
  • OnSubscribe 從設(shè)計(jì)模式角度理解锚扎,OnSubscribe.call()可以理解為觀察者模式中被觀察者用來通知觀察者的notifyObservers()方法
  • subscribe() 實(shí)現(xiàn)觀察者與被觀察者訂閱關(guān)系的方法

同樣我們的解析流程也是從創(chuàng)建Observable鲫剿,創(chuàng)建Subscriber到最后的訂閱關(guān)系來進(jìn)行解析

1.創(chuàng)建Observable(被觀察者)

創(chuàng)建Observable通過Observable.create()方法衣洁,我們看一下這個(gè)方法的實(shí)現(xiàn)

public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
}

我們看一下RxJavaHooks.onCreate()方法

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

這里其實(shí)就是返回onCreate傳入的OnSubscribe,也就是我們創(chuàng)建的OnSubscribe

至此我們做下邏輯梳理:Observable.create()方法構(gòu)造了一個(gè)被觀察者Observable對(duì)象,同時(shí)將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe毫玖。

2.創(chuàng)建Subscriber(觀察者)

創(chuàng)建其實(shí)很簡(jiǎn)單炼蹦,我們這里簡(jiǎn)單看一下Subscriber的源碼

public abstract class Subscriber<T> implements Observer<T>, Subscription {

    private final SubscriptionList subscriptions;//訂閱事件集羡宙,所有發(fā)送給當(dāng)前Subscriber的事件都會(huì)保存在這里

    ...

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    ...

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    public void onStart() {
    }

    ...
}

Subscriber實(shí)現(xiàn)了Observer接口,提供了onCompleted()掐隐,onError()與onNext()狗热,需要我們繼承并進(jìn)行相關(guān)操作。

Subscriber實(shí)現(xiàn)了Subscription接口虑省,從而對(duì)外提供isUnsubscribed()和unsubscribe()方法匿刮。前者用于判斷是否已經(jīng)取消訂閱;后者用于將訂閱事件列表(也就是當(dāng)前觀察者的成員變量subscriptions)中的所有Subscription取消訂閱探颈,并且不再接受觀察者Observable發(fā)送的后續(xù)事件僻焚。

3.subscribe()源碼分析

部分源碼如下

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    ...

    subscriber.onStart();

    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

上述代碼中最關(guān)鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。這里的RxJavaHooks和之前提到的一樣膝擂,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二個(gè)入?yún)bservable.onSubscribe虑啤,也就是當(dāng)前observable的成員變量onSubscribe。而這個(gè)成員變量我們前面提到過架馋,它是我們?cè)贠bservable.create()的時(shí)候new出來的狞山。

所以這段代碼可以簡(jiǎn)化為onSubscribe.call(subscriber),通過這樣就建立了觀察者與被觀察者的聯(lián)系

所以最后我們?cè)倏偨Y(jié)一下整個(gè)流程

1.調(diào)用crate()創(chuàng)建一個(gè)觀察者叉寂,同時(shí)創(chuàng)建一個(gè)OnSubscribe作為該方法的入?yún)?br> 2.調(diào)用subscribe()來訂閱我們自己創(chuàng)建的觀察者Subscriber
3.一旦調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行OnSubscribe.call()
4.我們就可以在call方法調(diào)用觀察者subscriber的onNext(),onCompleted(),onError()

二.操作符原理分析

我們以map操作符為例子萍启,使用如下

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(123456);
        subscriber.onCompleted();
    }
}).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return "This is" + integer;
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onNext(String string) {
        Log.i("TAG","String=" + string);
    }
});

我們直接看map操作相關(guān)的代碼部分,如下

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

從這里我們看見,是創(chuàng)建了一個(gè)新的被觀察者勘纯,然后這個(gè)新的被觀察者與觀察者建立關(guān)聯(lián)局服,這里與前面的觀察者建立的區(qū)別在于OnSubscribe成員變量的不同。

這里是一個(gè)新的OnSubscribe成員變量OnSubscribeMap驳遵,繼承于OnSubscribe淫奔,代碼如下:

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;
    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;
        final Func1<? super T, ? extends R> mapper;
        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            ...
            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            ...
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

我們看下OnSubscribeMap構(gòu)造函數(shù)的傳入的兩個(gè)變量,一個(gè)是之前創(chuàng)建的Observable(被觀察者)堤结,另外一個(gè)是我們新操作的Func1函數(shù)唆迁。

然后在調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行OnSubscribeMap.call()方法,我們仔細(xì)看一下這個(gè)方法竞穷,

MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);

首先是通過創(chuàng)建了觀察者o和轉(zhuǎn)換函數(shù)Func1構(gòu)造了一個(gè)新的觀察者M(jìn)apSubscriber唐责,接下來最后調(diào)用了source也就是的unsafeSubscribe()方法,而這個(gè)source通過前后關(guān)系瘾带,我們可以知道是之前創(chuàng)建的Observable(被觀察者)鼠哥。

接下來看一下unsafeSubscribe函數(shù)

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

這里是不是很熟悉,和前面說過的一樣看政,也就是調(diào)用了onSubscribe.call()方法朴恳,注意這里的onSubscribe是最早創(chuàng)建的Observable(被觀察者)的onSubscribe,而subscriber則是新構(gòu)建的觀察者M(jìn)apSubscriber帽衙。

所有當(dāng)onSubscribe.call()方法調(diào)用后菜皂,我們看下調(diào)用的方法贞绵,如下

subscriber.onNext(123456);
subscriber.onCompleted();

所有這里其實(shí)是調(diào)用MapSubscriber的onNext方法與onCompleted方法厉萝,我們分析下onNext方法,如下

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
    actual.onNext(result);
}

可以看到這里首先做了預(yù)處理mapper.call(t)榨崩,這里的mapper就是我們重寫的Func1中的方法谴垫。

所有其實(shí)我們執(zhí)行的順序是

1.MapSubscriber.onNext(123456)。

2.map中的call方法進(jìn)行處理母蛛。

3.執(zhí)行原始的觀察者的onNext(T)方法翩剪。

所以最后我們?cè)倏偨Y(jié)一下整個(gè)流程

1.調(diào)用crate()創(chuàng)建一個(gè)觀察者A,同時(shí)創(chuàng)建一個(gè)OnSubscribe作為該方法的入?yún)?/p>

2.調(diào)用map()創(chuàng)建一個(gè)觀察者B彩郊,同時(shí)創(chuàng)建一個(gè)OnSubscribeMap作為該方法入?yún)⑶巴洌瑒?chuàng)建OnSubscribeMap傳入觀察者A與轉(zhuǎn)換函數(shù)Func1

3.調(diào)用subscribe()來訂閱我們自己創(chuàng)建的觀察者Subscriber

4.一旦調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行MapOnSubscribe.call(),這個(gè)方法內(nèi)創(chuàng)建新的觀察者M(jìn)apSubscriber秫逝,通過觀察者o和轉(zhuǎn)換函數(shù)Func1構(gòu)建

5.MapOnSubscribe.call()調(diào)用了unsafeSubscribe函數(shù)恕出,這里面調(diào)用了onSubscribe.call()方法,這里的onSubscribe是最早創(chuàng)建的Observable(被觀察者)的onSubscribe违帆,而subscriber則是新構(gòu)建的觀察者M(jìn)apSubscriber浙巫。

6.接下來就是執(zhí)行相關(guān)的傳遞工作,MapSubscriber.onNext方法,map中的call方法進(jìn)行處理的畴,執(zhí)行原始的觀察者的onNext(T)方法渊抄。

至此結(jié)束。

三.線程調(diào)到分析

RxJava其中一個(gè)很重要的特性就是對(duì)于線程調(diào)度想對(duì)來說比較容易丧裁,可以很方便的通過subscribeOn()和observeOn()來指定數(shù)據(jù)流的每一部分運(yùn)行在哪個(gè)線程护桦。

subscribeOn()指定了處理Observable(被觀察者)的全部的過程(包括發(fā)射數(shù)據(jù)和通知)的線程。

observeOn()指定了Subscriber(觀察者)的onNext(), onError()和onCompleted()執(zhí)行的線程渣慕。

我們首先看一下線程調(diào)度的代碼

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(123456);
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onNext(String string) {
        Log.i("TAG","String=" + string);
    }
});
1.subscribeOn()源碼分析

經(jīng)過調(diào)用嘶炭,代碼如下

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

是不是看到了熟悉的一幕,接下來我們看下OperatorSubscribeOn這個(gè)類

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;
    final boolean requestOn;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
        this.scheduler = scheduler;
        this.source = source;
        this.requestOn = requestOn;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
        subscriber.add(parent);
        subscriber.add(inner);

        inner.schedule(parent);
    }

    static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
        ...
    }
}

所有在調(diào)用subscribe()方法后就會(huì)觸發(fā)執(zhí)行OperatorSubscribeOn.call()方法逊桦,這里調(diào)用了scheduler的createWorker方法

這里的scheduler是我們定義傳進(jìn)來的眨猎,即Schedulers.io()

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

private Schedulers() {
    ...
    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }
    ...
}

經(jīng)過調(diào)用,發(fā)現(xiàn)我們實(shí)現(xiàn)的Schedulers是CachedThreadScheduler强经,接下來我們看下createWorker方法睡陪,如下

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

這里返回了一個(gè)新的EventLoopWorker對(duì)象,而這個(gè)EventLoopWorker是CachedThreadScheduler的內(nèi)部類匿情。

接下來回去看call方法兰迫,我們會(huì)發(fā)現(xiàn)執(zhí)行了EventLoopWorker的schedule方法如下

 @Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }

    ScheduledAction s = threadWorker.scheduleActual(new Action0() {
        @Override
        public void call() {
            if (isUnsubscribed()) {
                return;
            }
            action.call();
        }
    }, delayTime, unit);
    innerSubscription.add(s);
    s.addParent(innerSubscription);
    return s;
}

這里執(zhí)行了scheduleActual方法,我們看一下這個(gè)方法的實(shí)現(xiàn)

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);

    return run;
}

這個(gè)方法主要是將傳入的Action0包裝成ScheduledAction炬称,然后線程池運(yùn)行這個(gè)ScheduledAction汁果,那么這個(gè)ScheduledAction肯定是繼承Runable或者Callable的

找到ScheduledAction的run方法如下

@Override
public void run() {
    ...
    action.call();
    ...
}

這里調(diào)用的傳入Action0的call方法,我們看下定義的Action0的call方法玲躯,這個(gè)方法是調(diào)用了schedule方法時(shí)候傳入Action0的call方法

通過查看据德,我們知道這個(gè)Action0即是SubscribeOnSubscriber,接下來看一個(gè)這個(gè)SubscribeOnSubscriber的call方法

@Override
public void call() {
    Observable<T> src = source;
    source = null;
    t = Thread.currentThread();
    src.unsafeSubscribe(this);
}

這里的source是之前創(chuàng)建的Observable(被觀察者)跷车,接下來調(diào)用unsafeSubscribe方法棘利,如下

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

和前面操作符一致,這里的onSubscribe是最早創(chuàng)建的Observable(被觀察者)的onSubscribe朽缴,而subscriber則是新構(gòu)建的觀察者SubscribeOnSubscriber善玫。

所以這時(shí)候onSubscribe的call操作其實(shí)已經(jīng)是在異步的線程中執(zhí)行了。

2.observeOn()源碼分析
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末密强,一起剝皮案震驚了整個(gè)濱河市茅郎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌或渤,老刑警劉巖系冗,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異劳坑,居然都是意外死亡毕谴,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來涝开,“玉大人循帐,你說我怎么就攤上這事∫ㄎ洌” “怎么了拄养?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長银舱。 經(jīng)常有香客問我瘪匿,道長,這世上最難降的妖魔是什么寻馏? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任棋弥,我火速辦了婚禮,結(jié)果婚禮上诚欠,老公的妹妹穿的比我還像新娘顽染。我一直安慰自己,他們只是感情好轰绵,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布粉寞。 她就那樣靜靜地躺著,像睡著了一般左腔。 火紅的嫁衣襯著肌膚如雪唧垦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天液样,我揣著相機(jī)與錄音振亮,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的袖肥。 我是一名探鬼主播衷畦,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼摇锋!你這毒婦竟也來了丹拯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤荸恕,失蹤者是張志新(化名)和其女友劉穎乖酬,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體融求,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡咬像,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片县昂。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肮柜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出倒彰,到底是詐尸還是另有隱情审洞,我是刑警寧澤,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布待讳,位于F島的核電站芒澜,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏创淡。R本人自食惡果不足惜痴晦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望琳彩。 院中可真熱鬧阅酪,春花似錦、人聲如沸汁针。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽施无。三九已至辉词,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間猾骡,已是汗流浹背瑞躺。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留兴想,地道東北人幢哨。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像嫂便,于是被迫代替她去往敵國和親捞镰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容