RxJava詳解-線程切換原理篇

概要

RxJava最神秘的部分莫過于此,我為了編寫這篇文章也是一遍一遍的查看源碼尋找它的運行原理秩霍,同樣的也查閱了很多的相關(guān)資料,但是慚愧的是并未找到實質(zhì)性有用的資料蚁阳,很多都是避開這話題避而不談铃绒,在此我們詳細的對它進行剖析。
不得不說這個框架編寫的真的很棒螺捐,相關(guān)類也是錯綜復(fù)雜颠悬,不過作為程序員我們必須拿出積極的態(tài)度來學(xué)習它的實現(xiàn),以此提高自身的價值定血;好了廢話不多說我們馬上開始赔癌。

源碼剖析

我們先來看一下外部的實現(xiàn)調(diào)用

public void onMineTask() {
        //聲明一個ObservableCreate類型的 被觀察者對象
        Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("ONE");
                e.onNext("TWO");
                e.onNext("THREE");
                e.onNext("FOUR");
            }
        });
        //聲明一個Observer類型的觀察者對象
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
                Log.e("回調(diào)執(zhí)行了onSubscribe函數(shù)->", "觀察者已成功訂閱");
            }

            @Override
            public void onNext(String value) {
                if (disposable.isDisposed())
                    onComplete();

                value = mine_result.getText().toString() + "\n" + value;
                mine_result.setText(value);
                Log.e("回調(diào)執(zhí)行了onNext函數(shù)->", value);

            }

            @Override
            public void onError(Throwable e) {
                Log.e("回調(diào)執(zhí)行了onError函數(shù)->", e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("回調(diào)執(zhí)行了onComplete函數(shù)->", "本次結(jié)束!");
            }
        };
       //指定被觀察者和觀察者運行的線程
       //指定 被觀察者線程      ObservableSubscribeOn類型
        mObservable.subscribeOn(Schedulers.io())  
                 //指定 觀察者線程    ObservableObserveOn類型
                .observeOn(AndroidSchedulers.mainThread()) 
                 //事件發(fā)布
                .subscribe(observer);
    }

我們通過上述的代碼可以看出mObservable它是ObservableCreate類型,通過他調(diào)取了[1]subscribeOn(Schedulers.io())來指定被觀察者的執(zhí)行線程澜沟,并返回一個Observable類型的返回值灾票,當然這個Observable類型是個抽象類型;
而獲取到Observable類型的返回值之后茫虽,我們有調(diào)用了[2]observeOn(AndroidSchedulers.mainThread())函數(shù)方法刊苍,實現(xiàn)了對觀察者執(zhí)行線程的指定,此時我們得到返回值依舊是Observable類型濒析;
最終我們依舊使用Observable類型的返回值正什,通過調(diào)取[3]subscribe(observer)函數(shù)實現(xiàn)主題發(fā)布,并攜帶著observer觀察者實例号杏。
那么我們接下來婴氮,就按序講解他們的執(zhí)行的過程:

[ 1 ] subscribeOn(Schedulers.io())函數(shù)方法,源碼解析

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //此處查Null操作
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //這里是一個鉤子函數(shù)馒索,在無擴展性特殊操作情況下莹妒,
        //原封不動的返回一個ObservableSubscribeOn類型的值,
        //而由于它的基類是 Observable類型绰上,所以我們直接將其基類類型
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

我們來看RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))函數(shù)方法旨怠,它其實是一個鉤子函數(shù),在無擴展性特殊操作情況下蜈块,會原封不動的返回一個ObservableSubscribeOn類型的值鉴腻,而由于它的基類是 Observable類型,所以我們直接將其基類類型返回百揭。

而我們重點來看這方法中的參數(shù)new ObservableSubscribeOn<T>(this, scheduler)爽哎,它是聲明了一個ObservableSubscribeOn類型對象,并傳入了一個Observable類型的被觀察者器一,但由于Observable實現(xiàn)了ObservableSource的接口课锌,而我們也只需要ObservableSource這部分內(nèi)容,所以最終聲明ObservableSubscribeOn類型實例,我們傳入的是一個ObservableSource類型參數(shù)和一個指定運行線程的Scheduler類型參數(shù).

ObservableSubscribeOn類中我們對subscribeActual(final Observer<? super T> observer)抽象方法進行了實現(xiàn).但目前還未進行調(diào)用

[ 2 ]observeOn(AndroidSchedulers.mainThread())函數(shù)方法,源碼解析

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        //調(diào)用其自身的多態(tài)方法
        return observeOn(scheduler, false, bufferSize());
    }

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //省略...
        //鉤子函數(shù)渺贤,若無擴展性特殊邏輯實現(xiàn)依舊返回一個`ObservableObserveOn`類型的值雏胃,
        //由于`Observable`是其基類,所以這里直接將其基類類型返回值
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

此方法與[ 1 ]中的方法一樣這里不再贅述志鞍,我們的重點是其參數(shù)new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)瞭亮,它返回是一個ObservableObserveOn類型的值,但由于它的基類是Observable類型固棚,所以此處直接按基類類型返回统翩,它攜帶參數(shù)包括
ObservableSource類型的thisScheduler類型的執(zhí)行線程此洲;boolean類型的延遲錯誤厂汗,指示onError通知是否不能在另一側(cè)的onNext通知之前切換;int類型的緩沖區(qū)大小.

ObservableObserveOn類同樣對`subscribeActual(final Observer<? super T> observer)抽象方法進行了實現(xiàn).但目前還未進行調(diào)用.

[ 3 ]subscribe(observer)函數(shù)實現(xiàn)主題發(fā)布,攜帶著觀察者參數(shù)呜师。源碼解析:

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
       //省略...
            //調(diào)取 subscribeActual抽象方法
            subscribeActual(observer);
        } 
      //省略...
    }

我們來看subscribeActual(observer)這個函數(shù)面徽,我們的被觀察者的ObservableSubscribeOn和觀察者的ObservableObserveOn分別都重寫了此函數(shù),那么此處的subscribeActual(observer)將進入哪個類呢匣掸?
我們將外部的調(diào)用鏈式表達分解一下來看:

     Observable observableSubscribeOn1 = mObservable.subscribeOn(Schedulers.io()); //第一次
        //指定 觀察者線程    ObservableObserveOn類型
        Observable observeObserveOn2 = observableSubscribeOn1.observeOn(AndroidSchedulers.mainThread());
        //發(fā)布執(zhí)行
        observeObserveOn2.subscribe(observer);

很清晰的我們能看到,最終對subscribe(observer)函數(shù)方法發(fā)起調(diào)用的是觀察者ObservableObserveOn類,所以我們將會執(zhí)行ObservableObserveOn類中的subscribeActual(observer)實現(xiàn)

 public ObservableObserveOn(ObservableSource<T> source, Scheduler 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
     //省略...
            //創(chuàng)建一個線程氮双,當前創(chuàng)建出來的是一個Handler線程
            Scheduler.Worker w = scheduler.createWorker();
            //此處我們調(diào)用上一個Observable類型對象的發(fā)布事件
            //即 被觀察者對象的發(fā)布事件
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

首先明確一點此處的source對象是被觀察者的Observable類型實例碰酝,由于在指定運行線程時我們都傳入一個Observable類型對象,而傳遞的Observable都是上一個對象的實例戴差。
此處的參數(shù)new ObserveOnObserver<T>(observer, w, delayError, bufferSize)是聲明的ObservableObserveOn類的靜態(tài)內(nèi)部類的實例送爸。

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
}

可以看出它實現(xiàn)了Observer接口,而我們在source.subscribe(...)方法中也僅需要這部分內(nèi)容暖释,至此我們又回到了Observable類中的subscribe(Observer<? super T> observer)函數(shù)方法袭厂。

public final void subscribe(Observer<? super T> observer) {
       //省略...
            subscribeActual(observer);
       //省略...

而再次調(diào)用subscribeActual(observer)函數(shù)方法,不會再進入到ObservableObserveOn類中球匕,而是進入ObservableSubscribeOn類纹磺,執(zhí)行其subscribeActual(final Observer<? super T> observer)實現(xiàn)方法

@Override
    public void subscribeActual(final Observer<? super T> observer) {
  //將觀察者包裝成ObservableObserveOn類型
  final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //[ 3.1 ]調(diào)用此方法會進入觀察者`ObservableObserveOn`類中的實現(xiàn)方法
        //可以看出此處還沒進行被觀察者的線程指定
        observer.onSubscribe(parent);
        //[ 3.2 ]在此處設(shè)置被觀察者的運行線程
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

[ 3.1 ] 我們來看看observer.onSubscribe(parent),它在ObservableObserveOn類的內(nèi)部類ObserveOnObserveronSubscribe(Disposable d)做了什么

 @Override
        public void onSubscribe(Disposable d) {
               //省略...
                downstream.onSubscribe(this);
            }
        }

downstream對象使我們在創(chuàng)建ObserveOnObserver類型實例時傳入的Observer類型實例亮曹,即觀察者實例橄杨,而它調(diào)取onSubscribe(this)方法后直接回調(diào)至我們的外部實現(xiàn)中所創(chuàng)建的觀察者對象的onSubscribe(Disposable d)中,通知觀察者訂閱成功照卦。

[ 3.2 ] 接下來分析一下parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))方法是如何指定被觀察者運行線程的,

先來分析他參數(shù)實現(xiàn)scheduler.scheduleDirect(new SubscribeTask(parent)),

[ 3.2.1 ] 來看new SubscribeTask(parent)做了什么

//實現(xiàn)Runnable 接口
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
             //調(diào)取事件發(fā)布事件,而此處的source是`ObservableCreate`類型
            source.subscribe(parent);
        }
    }

此處實現(xiàn)了Runnable接口式矫,并在重寫的run()方法中調(diào)取了source.subscribe(parent),首先明確此時的source對象它的類型是誰?答案是ObservableCreate役耕,因為在創(chuàng)建ObservableSubscribeOn類型實例時傳入的ObservableSource類型參數(shù)是上一個對象采转,所以在執(zhí)行subscribe(parent)方法后我們會通過subscribeActual(observer)方法函數(shù),進入到ObservableCreate類中對subscribeActual(observer)方法實現(xiàn)瞬痘;而在此處我們還未進行有效的調(diào)用.

[ 3.2.2 ] 分析完SubscribeTask后我們接著分析他外層scheduler.scheduleDirect(new SubscribeTask(parent))方法做了什么

   @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
  @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //創(chuàng)建一個Worker線程,
        final Worker w = createWorker();
         //鉤子函數(shù),若無擴展性特殊處理則返回參數(shù)本身
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //聲明一個處理任務(wù)故慈,將Runnable和Work封裝成DisposeTask
        DisposeTask task = new DisposeTask(decoratedRun, w);
        //調(diào)取woker對象的schedule方法
        w.schedule(task, delay, unit);
        return task;
    }

<3.2.2.1> 我們先調(diào)用了createWorker()函數(shù)創(chuàng)建了一個EventLoopWorker類型對象板熊,利用它實現(xiàn)線程調(diào)度;而EventLoopWorker繼承于Scheduler.Worker抽象類

static final class EventLoopWorker extends Scheduler.Worker {
       //省略... 
         //結(jié)束操作
        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();
                // 釋放線程池操作
                pool.release(threadWorker);
            }
        }
        //是否結(jié)束
        @Override
        public boolean isDisposed() {
            return once.get();
        }
       //重寫進程調(diào)度
        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            //執(zhí)行線程調(diào)度執(zhí)行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

<3.2.2.2> 然后聲明了一個DisposeTask類型對象惯悠,將Runnable對象和線程調(diào)度器包裝在一起邻邮,而DisposeTask類實現(xiàn)了Runnable接口,重寫了run()函數(shù)方法克婶,

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
      //省略...
        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                 //執(zhí)行Runnable
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        //省略...
    }

<3.2.2.3> 完成上述預(yù)備工作后筒严,我們開始調(diào)用w.schedule(task, delay, unit)方法進行任務(wù)執(zhí)行

     @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
           //判定任務(wù)是否成功被訂閱
            if (tasks.isDisposed()) {
                // 若沒有被訂閱,則不作任何處理
                return EmptyDisposable.INSTANCE;
            }
            //進行線程調(diào)度
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

<3.2.2.4> 調(diào)用 scheduleActual()進行線程任務(wù)調(diào)度

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
 //      省略...
        try {
//      線程提交
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

線程被提交后,會交給Executor來執(zhí)行情萤,調(diào)用Runnable接口的run方法鸭蛙,那么這就簡單了.

小結(jié)

在上述操作中我們成功的將[被觀察者]提交給了指定的線程去執(zhí)行,那么接下來就是線程向上執(zhí)行調(diào)取被觀察者run()的流程了筋岛,別頭大娶视,這個框架的復(fù)雜度就是那么繞~~~,想要成長我們就得邁過這道坎,畢竟成長的就是這么痛苦U鲈住肪获!
這里我建議一下,閱讀這樣復(fù)雜的框架的時候我們最好在Debug模式下,跟隨著框架流程進行學(xué)習····好了休息一下吧柒傻!我們馬上開始下半場的學(xué)習孝赫。

[ 3.3 ]我們在<3.2.2.4>()中執(zhí)行的scheduleActual()中將Runnable封裝成了ScheduledRunnable類型,那么首先執(zhí)行進入它的里面,看看它的run()方法做了什么詭譎的操作.

public void run() {
        lazySet(THREAD_INDEX, Thread.currentThread());
        try {
            try {
                //獲取的上層的Runnable對象,執(zhí)行其run方法
                actual.run();
            } catch (Throwable e) {
               //若發(fā)生異常,則直接調(diào)用onError通知觀察者
                RxJavaPlugins.onError(e);
            }
        //省略....
    }

可以看到ScheduledRunnable類中的run()并未做什么特殊操作,而是執(zhí)行了上層的Runnable類型對象的run()方法红符,那么它的上層是誰呢青柄?

[ 3.4 ] 目光轉(zhuǎn)向[ 3.2.2 ]的源碼,我們將其中把RunnableWork類型對象關(guān)聯(lián)了起來预侯,并封裝成了一個DisposeTask類型對象致开,那么廢話不多說我們直接去看看其中的源碼

  @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

[ 3.5 ]依舊是繼續(xù)向上調(diào)取,我最終回到了ObservableSubscribeOn類中內(nèi)部類SubscribeOnObserver中的內(nèi)部類的SubscribeTaskrun()方法萎馅,饒了這么大一圈最終還是回到了最初始調(diào)取的地方才有實質(zhì)的處理操作_!! 來看看吧

 final class SubscribeTask implements Runnable {
//      省略...
        @Override
        public void run() {
            //調(diào)取被觀察者的 subscribe方法
            source.subscribe(parent);
        }
    }

[ 3.6 ] 此時的source就是外部第一次聲明的Observable類型對象的實例了双戳,即ObservableCreate類,那么我們將再次回到Observable類中執(zhí)行其subscribe(Observer<? super T> observer)函數(shù)糜芳,老套路來看吧拣技!

 public final void subscribe(Observer<? super T> observer) {
        //省略...
            subscribeActual(observer);
      //省略...
    }

[ 3.7 ]我們又一次要執(zhí)行subscribeActual(observer)方法了,那么這次我們要進入哪個類呢耍目?猜對了就是ObservableCreate

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //將觀察者封裝成CreateEmitter類型
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
         //回調(diào)觀察者的接口膏斤,告訴觀察者訂閱成功,
         //并將新封裝的觀察者傳遞過去
        observer.onSubscribe(parent);
        try {
            //回調(diào)外部被觀察者的接口,告訴被觀察者主題已被訂閱,
           //可進行接下來相關(guān)操作了,并將最新封裝的觀察者對象傳入
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

[ 3.8 ] 終于出去了=_=!! 我們首先通知觀察者成功訂閱主題邪驮,然后再告訴被觀察者被成功訂閱,可以新進行接下來的主題發(fā)布了!!

 Observable mObservable = new ObservableCreate(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                //[ 4 ]發(fā)布下一個
                e.onNext("ONE");
                e.onNext("TWO");
                e.onNext("THREE");
                e.onNext("FOUR");
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("回調(diào)執(zhí)行了onSubscribe函數(shù)->", "觀察者已成功訂閱");
            }
            @Override
            public void onNext(String value) {
                Log.e("回調(diào)執(zhí)行了onNext函數(shù)->", value);
            }
            @Override
            public void onError(Throwable e) {
                Log.e("回調(diào)執(zhí)行了onError函數(shù)->", e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.e("回調(diào)執(zhí)行了onComplete函數(shù)->", "本次結(jié)束!");
            }
        };

[ 3.8.1 ]調(diào)取ObservableEmitter類型的觀察者的onNext(Object value)方法

 @Override
        public void onNext(T t) {
          //省略...
            if (!isDisposed()) {
                //調(diào)取未進行包裝的觀察者的onNext方法函數(shù)
                observer.onNext(t);
            }
        }

[ 3.8.2 ]這里的并沒有進行任何處理莫辨,而是調(diào)取了上層的SubscribeOnObserver類型的觀察者的onNext(Object value)

   @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

[ 3.8.3 ] 這里依舊是繼續(xù)向上調(diào)取,通過調(diào)取上層的ObserveOnObserver類型的觀察者的onNext(Object value)方法

 @Override
        public void onNext(T t) {
           //省略
            schedule();
        }
  void schedule() {
            if (getAndIncrement() == 0) {
                //進行線程調(diào)度,并將觀察者傳入
                worker.schedule(this);
            }
        }

[ 3.8.4 ]此處進行觀察者執(zhí)行線程的調(diào)度,由于我們在外部指定觀察者運行于UI線程沮榜,所以此處我們HandlerScheduler類的schedule(Runnable run, long delay, TimeUnit unit)方法中

public abstract class Scheduler {
   public abstract static class Worker implements Disposable {     
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
   }
}
final class HandlerScheduler extends Scheduler {
   private static final class HandlerWorker extends Worker {
      public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            //省略...
             //將handle和run關(guān)聯(lián)起來包裝成新對象
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
            if (async) {
                message.setAsynchronous(true);
            }
           //handler發(fā)送消息
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            //省略...
            return scheduled;
        }
    }
}

[ 3.8.5 ] 我們在這里將Runnable包裝成Message同過Handler進行發(fā)送盘榨,這將進入到Hanlder類中的runWithScissors(final Runnable r, long timeout)方法

public final boolean runWithScissors(final Runnable r, long timeout) {
        //省略
        if (Looper.myLooper() == mLooper) {
            //執(zhí)行Runnable接口的run()方法
            r.run();
            return true;
        }
        //省略...
    }

[ 3.8.6 ] 通過判定我們當前運行的線程是否跟Handler所在的線程是否一致 ,如果一致直接運行Runnablerun()方法蟆融,即我們再次回到了HandlerScheduler類中的內(nèi)部類HandlerWorker下草巡,執(zhí)行其run()方法

 @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

[ 3.8.7 ]沒有意外,這沒有進行任何處理型酥,繼續(xù)向上傳遞進入ObservableObserveOn類的run()

 @Override
        public void run() {
         //省略...
         drainNormal();
        }
void drainNormal() {
           //省略...
                    a.onNext(v);
          //省略...
        }

[ 3.8.8 ] 可以看到在此處我們回調(diào)了外部聲明觀察者接口的onNext(Object value)方法山憨,進而執(zhí)行我們自定義的業(yè)務(wù)!C趾怼郁竟!

    @Override
            public void onNext(String value) {
                if (disposable.isDisposed())
                    onComplete();
                value = mine_result.getText().toString() + "\n" + value;
                mine_result.setText(value);
                Log.e("回調(diào)執(zhí)行了onNext函數(shù)->", value);
            }

小結(jié)

至此完成了線程切換的全部過程,不得不說這個框架的復(fù)雜程度尤其之高由境,為了寫這篇文章也是累的自己一頭的汗=_=!! 一開始分析源碼的時候真是一個頭兩個大棚亩,主要是其中的類粘連性太強了,特別容易把自己搞得暈頭轉(zhuǎn)向虏杰,不過分析完之后腦中也就豁然開朗了讥蟆。
由于篇幅太長,我們將整體流程大致的再進行一下梳理纺阔,之后我會將流程圖奉上9パ!

流程梳理

    1. 我們在外部聲明一個ObservableCreate類型被觀察者 0實例和Observer類型的觀察者實例
    1. 調(diào)取[被觀察者]的subscribeOn(Schedulers.io())方法指定被觀察者的執(zhí)行線程州弟,方法執(zhí)行期間會聲明一個ObservableSubscribeOn類型對象作為參數(shù)傳入,它的父類繼承于Observable抽象類,隨后獲取一個新的被觀察者1低零;
    1. 然后再調(diào)取observeOn(AndroidSchedulers.mainThread())方法來指定觀察者的執(zhí)行線程婆翔,方法執(zhí)行期間會聲明一個ObservableObserveOn類型對象作為參數(shù)傳入,它的父類同樣繼承于Observable抽象類,再次返回一個新的被觀察者2
  • 4.使用最新的[被觀察者]調(diào)取subscribe(observer)方法進行主題發(fā)布掏婶,并將觀察者作為參數(shù)傳入啃奴。
  • 5.調(diào)用新的被觀察者2 對象的subscribeActual(Observer<? super T> observer)方法,創(chuàng)建一個執(zhí)行線程,調(diào)取新的被觀察者1subscribe(Observer<? super T> observer)方法雄妥,比創(chuàng)建一個ObserveOnObserver類型對象傳入
  • 6.調(diào)用新的被觀察者1 對象的subscribeActual(Observer<? super T> observer)方法最蕾,創(chuàng)建一個執(zhí)行線程,在線程的run()方法中調(diào)取被觀察者 0subscribe(Observer<? super T> observer)方法老厌,聲明一個SubscribeOnObserver類型的對象傳入
  • 7.調(diào)用被觀察者0 對象的subscribeActual(Observer<? super T> observer)方法瘟则,通過調(diào)取被觀察者subscribe(ObservableEmitter e)方法,回調(diào)到了我們的外部實現(xiàn).
  • 8.在外部的subscribe(...)方法枝秤,調(diào)取觀察者onNext(Object value)方法醋拧,而此時的觀察者經(jīng)過一套流程下來也是被進行的層層包裝,最終回調(diào)到了我們的外部實現(xiàn).

這篇文章篇幅我有點長,希望有興趣的同學(xué)慢慢讀丹壕,之后我把流程圖梳理完畢后會如數(shù)奉上~~~

此文章只代表個人理解觀點庆械,如有差錯希望積極之處,我們共同交流!!!

This ALL! Thanks EveryBody!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末菌赖,一起剝皮案震驚了整個濱河市缭乘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌琉用,老刑警劉巖堕绩,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異辕羽,居然都是意外死亡逛尚,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進店門刁愿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绰寞,“玉大人,你說我怎么就攤上這事铣口÷饲” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵脑题,是天一觀的道長件缸。 經(jīng)常有香客問我,道長叔遂,這世上最難降的妖魔是什么萍虽? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮窟社,結(jié)果婚禮上唁盏,老公的妹妹穿的比我還像新娘。我一直安慰自己哩掺,他們只是感情好凿叠,可當我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嚼吞,像睡著了一般盒件。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上舱禽,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天炒刁,我揣著相機與錄音,去河邊找鬼誊稚。 笑死切心,一個胖子當著我的面吹牛飒筑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绽昏,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼协屡,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了全谤?” 一聲冷哼從身側(cè)響起肤晓,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎认然,沒想到半個月后补憾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡卷员,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年盈匾,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毕骡。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡削饵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出未巫,到底是詐尸還是另有隱情窿撬,我是刑警寧澤,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布叙凡,位于F島的核電站劈伴,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏握爷。R本人自食惡果不足惜跛璧,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望新啼。 院中可真熱鬧追城,春花似錦、人聲如沸师抄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽叨吮。三九已至,卻和暖如春瞬矩,著一層夾襖步出監(jiān)牢的瞬間茶鉴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工景用, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留涵叮,地道東北人惭蹂。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像割粮,于是被迫代替她去往敵國和親盾碗。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容