Rxjava源碼解讀筆記:線程、map數(shù)據(jù)操作

一點(diǎn)牢騷:
前段時(shí)間捧毛,接到需求观堂,舊項(xiàng)目要增添許多功能;舊項(xiàng)目是這樣的:功能以及代碼量就非常龐大呀忧,加上各種代碼不規(guī)范师痕、可讀性很差、代碼耦合度有點(diǎn)小高而账;
聽(tīng)到這個(gè)消息真的讓我腦袋大了一圈胰坟,
如果真的要在原有架構(gòu)上做開(kāi)發(fā),肯定會(huì)導(dǎo)致小組成員開(kāi)發(fā)沖突以及眾多的冗余代碼泞辐,浪費(fèi)時(shí)間和精力在非必要的事情上笔横,之前自身也知道舊項(xiàng)目有這個(gè)問(wèn)題 但由于新項(xiàng)目開(kāi)發(fā)呀嫌棄舊項(xiàng)目一直沒(méi)有決心去改動(dòng),這下好了完全推不了 那就改架構(gòu)吧咐吼,新的模式是 組件化+Rxjava.Retrofit+MVP模式吹缔,最近一直在忙著項(xiàng)目代碼架構(gòu)調(diào)整,相對(duì)應(yīng)的代碼模板編寫(xiě)等等锯茄,雖然說(shuō)改架構(gòu)是被逼的厢塘,但改著改著還是有成長(zhǎng)以及很有成就感的一件事情; 再接再厲肌幽。


說(shuō)實(shí)話晚碾,rxjava的源碼太難了,一直沒(méi)有去時(shí)間(懶癌)去學(xué)習(xí)牍颈; 包括現(xiàn)在項(xiàng)目比較緊張迄薄,每天下班后更是不太想去學(xué)習(xí),那么現(xiàn)在我就和大家一起看一下rxjava的源碼吧;

1、正常簡(jiǎn)易流程煮岁;
2讥蔽、帶線程切換流程;
3画机、map之后冶伞;
4、一些總結(jié)

1步氏、正常簡(jiǎn)易流程

基于以下這段代碼查看源碼

   Observable.just("11")
            .subscribe(observer);

大家應(yīng)該都知道或者聽(tīng)過(guò)响禽,Rxjava采用的是 增強(qiáng)版的觀察者模式,在訂閱的那一瞬間開(kāi)始執(zhí)行整個(gè)流程,那么現(xiàn)在看一下訂閱方法subscribe(Observer<? super T> observer)

Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //..
        // 實(shí)際訂閱
            subscribeActual(observer);
        //...
    }
    
    
RxJavaPlugins.class
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
    
    static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
        try {
            return f.apply(t, u);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

看到這里實(shí)際訂閱是發(fā)生在 observable 的 subscribeActual 中 而 subscribeActual是個(gè)抽象方法芋类; 那么我們又要去找它的實(shí)現(xiàn)隆嗅;
這邊通過(guò)Observable.just開(kāi)始看

Observable.calss

  public static <T> Observable<T> just(T item) {
        //...
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

ObservableJust.class
    
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        //調(diào)用 observer的 onSubsribe方法
        s.onSubscribe(sd);
        //執(zhí)行
        sd.run();
    }
    
ScalarDisposable.calss
   public void run() {
   // 判斷什么的
    if (get() == START && compareAndSet(START, ON_NEXT)) {
    // 
        observer.onNext(value);
        if (get() == ON_NEXT) {
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}

可以看到 run是直接執(zhí)行的;
整體的一個(gè)簡(jiǎn)單正常的流程就是: observable.subscribe(Observer) -> observable.subscribeActual -> Observer.onSubscribe( Disposable ) -> ScalarDisposable.run -> observer.onNext(value) -> observer.onComplete();

簡(jiǎn)易源碼流程——01

其中正常完整流程都會(huì)執(zhí)行標(biāo)紅部分的方法;其中其它部分先放著,只是判斷有沒(méi)有完成完成所有數(shù)據(jù)流的發(fā)射

2侯繁、線程切換流程

基于以下這段代碼查看源碼

Observable.just("11")
        .subscribeOn(Schedulers.io())//指定Observable 在哪個(gè)線程上創(chuàng)建執(zhí)行操作
        .observeOn(AndroidSchedulers.mainThread()) //在指定下一事件發(fā)生的線程
        .subscribe(observer);

2.1胖喳、 流向 Observable.subscribe 都經(jīng)歷了什么

先看下 Observable.subscribeOn都做了些什么

Observable.class
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
ObservableSubscribeOn.class    本質(zhì)上繼承 Observable
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //保存以及初始化
        super(source);
        this.scheduler = scheduler;
    }

可以看就就是轉(zhuǎn)換變成了 ObservableSubscribeOn

再看下 Observable.observeOn(Scheduler scheduler) 做了些什么

Observable.class  這邊應(yīng)該是: ObservableSubscribeOn extends .... Observable
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn.class 
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

本質(zhì)是將Observable轉(zhuǎn)換成ObservableObserveOn ,在這個(gè)流程中是將 ObservableSubscribeOn 轉(zhuǎn)換成ObservableObserveOn;

我們的Observable變換是這樣子的,ObservableJust->ObservableSubscribeOn->ObservableObserveOn
一層一層被包含

Obserable轉(zhuǎn)換

2.2贮竟、流向 -> Observer.onSubscribe 都經(jīng)歷了什么

那么又到了我們的 訂閱方法subscribe(Observer<? super T> observer)了,只不過(guò)我們中間多了幾層轉(zhuǎn)換; 我們?cè)賮?lái)看一下

Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //...
        // 實(shí)際訂閱
            subscribeActual(observer);//...
    }
ObservableObserveOn.class
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
        //創(chuàng)建一個(gè)  Scheduler.Worker
            Scheduler.Worker w = scheduler.createWorker();
        //   new一個(gè)新的 ObserveOnObserver implements Observer 再次循環(huán)  Observable.subscribe
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
ObserveOnObserver.class .... implements Observer<T>, Runnable
    
Observable.class
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //..
        // 實(shí)際訂閱
            subscribeActual(observer);
        //...
    }

ObservableSubscribeOn.class 
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //直接執(zhí)行丽焊,what? Observer.onSubscribe 不能指定線程   
        // 記錄一下   Observer.onSubscribe 的入口是
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

Observer的轉(zhuǎn)變是這樣的 Observer->ObserveOnObserver->SubscribeOnObserver

以上面為準(zhǔn),先看下 s.onSubscribe(parent)所經(jīng)歷的事情

ObserveOnObserver.class 
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) s;

                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    done = true;
                    actual.onSubscribe(this);
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    return;
                }
            }

            queue = new SpscLinkedArrayQueue<T>(bufferSize);

//   看這里  actual 其實(shí)是  Observer ;
            actual.onSubscribe(this);
        }
    }
    
Observer.class 
    onSubscribe(sd){...}

這里究竟可以看到 執(zhí)行到 最初observeronSubscribe的一條完整的線路咕别;
ObserveOnObserver.subscribeActual -> ObservableSubscribeOn.subscribeActual -> ObserveOnObserver.onSubscribe -> Observer.onSubscribe ;
不知道有沒(méi)有細(xì)心的同學(xué)發(fā)現(xiàn)了沒(méi)有技健,'onSubscribe'的執(zhí)行沒(méi)有SubscribeOnObserver什么事情,雖然說(shuō)上面有一層轉(zhuǎn)換成功了SubscribeOnObserver
畫(huà)成圖應(yīng)該就是下面這樣:

onSubscribe執(zhí)行鏈

我們發(fā)現(xiàn)了 從訂閱開(kāi)始一直到執(zhí)行我們的 observer.onSubscribe() 中間沒(méi)有任何切換線程的影子惰拱;
所以我們得出了一個(gè)

observer的 onSubscribe 運(yùn)行與訂閱動(dòng)作發(fā)生在同一線程雌贱,不受線程指定方法(observeOn subscribeOn)影響

2.3、流向 -> observer.next弓颈、onComplete 都經(jīng)歷了什么

ObservableSubscribeOn.class 
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);

//      new 出一個(gè)  SubscribeTask
//      scheduler.scheduleDirect 切換線程執(zhí)行  SubscribeTask
//      SubscribeOnObserver.setDisposable方法

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

可以看到上面最后一段代碼做個(gè)這樣事情帽芽,一件一件去看一下:

// new 出一個(gè) SubscribeTask
// scheduler.scheduleDirect 切換線程執(zhí)行 SubscribeTask
// SubscribeOnObserver.setDisposable方法

先看一下SubscribeTaskrun 里面是干嘛的

ObservableSubscribeOn.class
    class SubscribeOnObserver
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        
    class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //其中  source 是  ObservableJust  
            source.subscribe(parent);
        }
    }

由第一節(jié)的分析我們可以知道删掀,這邊最終會(huì)執(zhí)行到 SubscribeOnObserver.onNext() -> ObserveOnObserver.onNext()->Observer.onNext() 這邊一層一層調(diào)用出來(lái);

SubscribeTask.run 最終執(zhí)行我們的 最初observer.onNext() onComplete(); 這邊還沒(méi)有涉及到線程切換

再看我們的 scheduler.scheduleDirect(new SubscribeTask)
我們上面用的是 Scheduler.IO 實(shí)際上是 IoScheduler;

IoScheduler extends Scheduler.class
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();    

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        //指定工作線程
        w.schedule(task, delay, unit);

        return task;
    }
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    EventLoopWorker extends Scheduler.Worker 
       @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

那么這邊流程就比較清晰了翔冀,拿到subscribeOn 設(shè)置的Scheduler中創(chuàng)建一個(gè)Worker 設(shè)定了一個(gè) IO 線程;
看到這里 我們就該逆向地執(zhí)行我們 Observer 真正的方法了;
執(zhí)行到 SubscribeOnObserver.onNext()

ObservableSubscribeOn : SubscribeOnObserver<T> 
        @Override
        public void onNext(T t) {
        // actual 為 ObserveOnObserver
            actual.onNext(t);
        }
//  scheduler  這邊指定為  AndroidSchedulers.mainThread()    createWorker() 這邊不深究,里面轉(zhuǎn)成了 handler
Scheduler.Worker worker = scheduler.createWorker();
ObservableObserveOn : ObserveOnObserver
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        void schedule() {
            if (getAndIncrement() == 0) {
            // 這個(gè)最終 執(zhí)行在handler
                worker.schedule(this);
            }
        }

最后的流程應(yīng)該是這樣的


線程切換

3披泪、map 數(shù)據(jù)操作源碼

Observable.just(1)
        .map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(@NonNull Integer integer) throws Exception {
                return null;
            }
        }).subscribe(integer -> out("accept:" + integer));
Observable.class
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
   MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

可以看到 它是在 執(zhí)行完 function.apply在執(zhí)行 onNext();
配合上一節(jié) 纤子,流程圖就變成這樣了


加了map以后的流程

4、一些總結(jié)

來(lái)個(gè)總結(jié)吧: 估計(jì)源碼看得很混亂款票。

1控硼、對(duì)Observable指定線程、數(shù)據(jù)變換等等艾少,都采用了一種代理包裝模式卡乾; 比如 ObservableJust-> ObservableSubscribeOn -> ObservableMap -> ObservableObserveOn ; 進(jìn)行了一層包裝;
2缚够、在訂閱完成的那一刻起幔妨,反向調(diào)用 subscribe():subscribeActual()方法;比如 :(ObservableObserveOn.subscribe->ObservableObserveOn.subscribeActual())->(ObservableMap.subscribe->ObservableMap.subscribeActual())->(ObservableSubscribeOn.subscribe->ObservableSubscribeOn.subscribeActual())->(ObservableJust.subscribe->ObservableJust.subscribeActual())
3谍椅、Observer 误堡,同理包裝 Observer -> ObservableMap... 添加了指定 Schedulers.createWorker() ;
4、 Observer 的執(zhí)行順序是 Observer.onSubscribe() -> ObservableXX.onNext() -> ObsevableXXX.onNext() ->...-> Obsever.onNext() -> ObservableXX.OnComplete() -> ObsevableXXX.OnComplete() ->...-> Obsever.OnComplete();
5雏吭、 中間有些操作放入到了線程當(dāng)中.

其實(shí)有點(diǎn)坑的是:原本我就知道這個(gè)流程應(yīng)該是這樣的锁施,類似于事件分發(fā)機(jī)制成 U 字型的流程...... 本篇只是在 眾多代碼 中驗(yàn)證我的思路.................、

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市悉抵,隨后出現(xiàn)的幾起案子肩狂,更是在濱河造成了極大的恐慌,老刑警劉巖姥饰,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件婚温,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡媳否,警方通過(guò)查閱死者的電腦和手機(jī)栅螟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)篱竭,“玉大人力图,你說(shuō)我怎么就攤上這事〔舯疲” “怎么了吃媒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)吕喘。 經(jīng)常有香客問(wèn)我赘那,道長(zhǎng),這世上最難降的妖魔是什么氯质? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任募舟,我火速辦了婚禮,結(jié)果婚禮上闻察,老公的妹妹穿的比我還像新娘拱礁。我一直安慰自己,他們只是感情好辕漂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布呢灶。 她就那樣靜靜地躺著,像睡著了一般钉嘹。 火紅的嫁衣襯著肌膚如雪鸯乃。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,692評(píng)論 1 305
  • 那天跋涣,我揣著相機(jī)與錄音缨睡,去河邊找鬼。 笑死仆潮,一個(gè)胖子當(dāng)著我的面吹牛宏蛉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播性置,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼拾并,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起嗅义,我...
    開(kāi)封第一講書(shū)人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤屏歹,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后之碗,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蝙眶,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年褪那,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了幽纷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡博敬,死狀恐怖友浸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情偏窝,我是刑警寧澤收恢,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站祭往,受9級(jí)特大地震影響伦意,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜硼补,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一驮肉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧括勺,春花似錦缆八、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)栏妖。三九已至乱豆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吊趾,已是汗流浹背宛裕。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留论泛,地道東北人揩尸。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像屁奏,于是被迫代替她去往敵國(guó)和親岩榆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容