RxJava源碼分析

從創(chuàng)建一個(gè)數(shù)組的Observable說(shuō)起:

public static <T> Observable<T> from(T[] array) {
    ......
    return create(new OnSubscribeFromArray<T>(array));
}

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f)); //RxJavaHooks提供了改寫(xiě)OnSubscribe的鉤子女蜈,可以認(rèn)為直接返回參數(shù)
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

OnSubsribeFromArray:

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
    final T[] array;
    public OnSubscribeFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

當(dāng)調(diào)用Observable的subscribe時(shí)倘零,會(huì)創(chuàng)建一個(gè)ObserverSubscribe包裝Observer

    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    
        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            observable.onSubscribe.call(subscriber);
            return subscriber;
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

顯示調(diào)用subscriber的onStart,然后用SafeSubscriber包裝subscriber碟婆,SafeSubscriber是為了保證OnError和OnComplete只會(huì)執(zhí)行其中之一逞频,并且執(zhí)行后不再執(zhí)行onNext

接著調(diào)用OnSubscribeFromArray.call(subscriber)

    @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

    static final class FromArrayProducer<T>
    extends AtomicLong
    implements Producer {
        /** */
        private static final long serialVersionUID = 3534218984725836979L;

        final Subscriber<? super T> child;
        final T[] array;

        int index;

        public FromArrayProducer(Subscriber<? super T> child, T[] array) {
            this.child = child;
            this.array = array;
        }

        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }

        void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

        void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;

            long e = 0L;
            int i = index;

            for (;;) {

                while (r != 0L && i != n) {
                    if (child.isUnsubscribed()) {
                        return;
                    }

                    child.onNext(array[i]);

                    i++;

                    if (i == n) {
                        if (!child.isUnsubscribed()) {
                            child.onCompleted();
                        }
                        return;
                    }

                    r--;
                    e--;
                }

                r = get() + e;

                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }
    }

Subscriber.setProducer:如果包裝了其他subscriber,則轉(zhuǎn)交給其他subscriber任岸,如果沒(méi)有包裝subscriber再榄,沒(méi)有調(diào)用request,會(huì)調(diào)用p.request(Long.MAX_VALUE)享潜。如果之前調(diào)用了request困鸥,則調(diào)用p.request(n),n為累積的數(shù)量剑按。

    public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

FromArrayProducer.request(n)

        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }

如果n 為 Long.MAX_VALUE疾就,如果之前沒(méi)有請(qǐng)求過(guò)澜术,則調(diào)用fastPath

如果n不為0,則說(shuō)明之前調(diào)用了request猬腰,則要支持背壓鸟废,調(diào)用slowPath

        void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

只要沒(méi)有unsubscribe,會(huì)對(duì)數(shù)組中每個(gè)元素調(diào)用Observer的onNext姑荷,然后調(diào)用Observer.onCompleted()

Scheduler

通過(guò)subscribeOn和observeOn可以改變操作符執(zhí)行的線程

subscribeOn只能調(diào)用一次盒延,表示subscribe操作所在的線程,第一次ObserveOn之前所以操作都在subscribeOn所指定的線程

observeOn可以調(diào)用多次厢拭,調(diào)用之后所有的操作都在observeOn指定的線程上進(jìn)行操作

subscribeOn兰英,創(chuàng)建一個(gè)OperatorSubscribeOn將原來(lái)的Observable包裝起來(lái)。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

從scheduler中create一個(gè)worker供鸠,調(diào)用worker的schedule執(zhí)行包裝Observable的subscribe方法畦贸。而且保證Producer的request方法在worker線程上調(diào)用。

RxJava中的Scheduler有

  • NewThreadScheduler楞捂,NewThreadWorker(包裝了一個(gè)ScheduledExecutorService)
  • ComputationScheduler:EventLoopsScheduler薄坏,根據(jù)cpu數(shù)量確定worker的數(shù)量,EventLoopWorker包裝了PoolWorker(實(shí)際是NewThreadWorker)
  • IOScheduler: CachedThreadScheduler,ThreadWorker: unsubscribe的ThreadWorker可以重用寨闹,長(zhǎng)期不用的會(huì)被清理胶坠。

Map操作符

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

創(chuàng)建一個(gè)OnSubscribeMap包裝map函數(shù)和原Observable。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

subscribe時(shí)會(huì)生成一個(gè)MapSubscriber包裝原來(lái)subscriber繁堡。

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

當(dāng)MapSubscriber的onNext方法被調(diào)用時(shí)沈善,先用map函數(shù)轉(zhuǎn)換一下,然后傳給包裝subscriber的onNext

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末椭蹄,一起剝皮案震驚了整個(gè)濱河市闻牡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌绳矩,老刑警劉巖罩润,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異翼馆,居然都是意外死亡割以,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門应媚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)严沥,“玉大人,你說(shuō)我怎么就攤上這事中姜∠” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)莱找。 經(jīng)常有香客問(wèn)我,道長(zhǎng)嗜桌,這世上最難降的妖魔是什么奥溺? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮骨宠,結(jié)果婚禮上浮定,老公的妹妹穿的比我還像新娘。我一直安慰自己层亿,他們只是感情好桦卒,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著匿又,像睡著了一般方灾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上碌更,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天裕偿,我揣著相機(jī)與錄音,去河邊找鬼痛单。 笑死嘿棘,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的旭绒。 我是一名探鬼主播鸟妙,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼挥吵!你這毒婦竟也來(lái)了重父?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蔫劣,失蹤者是張志新(化名)和其女友劉穎坪郭,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體脉幢,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡歪沃,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嫌松。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沪曙。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖萎羔,靈堂內(nèi)的尸體忽然破棺而出液走,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布缘眶,位于F島的核電站嘱根,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏巷懈。R本人自食惡果不足惜该抒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望顶燕。 院中可真熱鬧凑保,春花似錦、人聲如沸涌攻。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)恳谎。三九已至芝此,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間惠爽,已是汗流浹背癌蓖。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留婚肆,地道東北人租副。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像较性,于是被迫代替她去往敵國(guó)和親用僧。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容