RxJava2筆記(四、觀察者線程切換)

在上一篇文章RxJava2筆記(三熟吏、訂閱線程切換)中,我們分析了訂閱線程是如何切換的玄窝,即調用subscribeOn()來切換訂閱線程時都執(zhí)行了哪些操作牵寺。在本文我們將繼續(xù)介紹觀察者線程切換,也就是將線程由子線程切換回UI線程恩脂。

繼續(xù)在前面的基礎上修改代碼帽氓,在訂閱線程切換方法后調用observeOn(AndroidSchedulers.mainThread())將線程切換回主線程:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        Log.i(TAG, "subscribe--運行線程:" + Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onNext(2);
        try {
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        emitter.onNext(3);
        emitter.onComplete();
    }
})
    //將線程由UI線程切換到子線程執(zhí)行IO請求
    .subscribeOn(Schedulers.io())
    //將線程切換回UI線程,方面后續(xù)操作更新UI界面
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

看下運行結果:

I/MainActivity: onSubscribe--運行線程:main
I/MainActivity: subscribe--運行線程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --運行線程:main
I/MainActivity: onNext: 2 --運行線程:main
I/MainActivity: onNext: 3 --運行線程:main
I/MainActivity: onComplete--運行線程:main

可以看到俩块,subscribe方法運行在子線程中(也就是訂閱線程運行在名為RxCachedThreadScheduler-1的一個子線程中黎休,上文提到該線程是由RxJava實現(xiàn)的一個工廠類創(chuàng)建的)浓领,而observer運行在名為main的線程中,這個main線程就是UI線程奋渔。


看完了輸出結果,接下來就看看這個observeOn(AndroidSchedulers.mainThread())是如何將線程切換到UI線程的壮啊,點進去看下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    //當出現(xiàn)異常時嫉鲸,默認無延遲發(fā)送錯誤。bufferSize()是緩沖區(qū)大小歹啼,RxJava設置了一個默認大小玄渗,為128。
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //包裝類狸眼,保存上游observable
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

熟悉的套路藤树,跟調用subscribe.on方法時類似,只是多了一個驗證緩沖區(qū)大小不為空的代碼拓萌,這些我們都略過岁钓,直接看ObservableObserveOn這個類:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //如果傳入的調度器是TrampolineScheduler,則不切換線程微王,在當前線程調度
        //但是調度的任務并不是馬上執(zhí)行屡限,而是等待當前任務執(zhí)行完畢再執(zhí)行
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //創(chuàng)建工作者worker
            Scheduler.Worker w = scheduler.createWorker();
            //上游的subscribe,該方法會觸發(fā)上游的subscribeActual炕倘,
            //ObserveOnObserver也是一個包裝類钧大,保存下游的observer
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    //......代碼省略
}

熟悉的裝飾器模式:

  • 1、ObservableObserveOn繼承了AbstractObservableWithUpstream罩旋,其繼承的ObservableSource類型的source成員變量用于保存上游的observable啊央。
  • 2、AndroidSchedulers.mainThread()為本次傳入的scheduler涨醋,負責將線程切換到UI線程瓜饥。
  • 3、下游調用subscribe方法是觸發(fā)=>當前observable的subscribeActual方法=>觸發(fā)上游observable的subscribe方法=>傳入?yún)?shù)包裝類ObserveOnObserver(包裝了下游的觀察者observer)浴骂。

這里簡要介紹下步驟3:
這里的source是上游的observable對象压固,source.subscribe()方法實際調用的是上游observable對象的subscribeActual方法,并將下游observer對象的包裝類ObserveOnObserver作為參數(shù)傳遞進去靠闭,在上游observable對象的subscribeActual方法內帐我,調用ObserveOnObserver包裝類中的onSubscribe,onNext等方法愧膀,進而調用下游observer的onSubscribe拦键,onNext等方法。

接下來看下ObserveOnObserver這個下游observer包裝類:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
    private static final long serialVersionUID = 6576896619930983584L;
    //下游observer
    final Observer<? super T> actual;
    //調度工作者
    final Scheduler.Worker worker;
    //當訂閱任務執(zhí)行出錯時檩淋,是否延遲發(fā)送錯誤消息芬为,默認為false萄金,也就是不延遲
    final boolean delayError;
    //緩沖區(qū)大小,緩存上游發(fā)送的事件
    final int bufferSize;
    //存儲上游observable發(fā)出的數(shù)據(jù)隊列
    SimpleQueue<T> queue;
    //存儲管理下游observer的訂閱狀態(tài)disposable
    Disposable s;
    //訂閱任務執(zhí)行出錯時媚朦,存儲錯誤信息
    Throwable error;
    //訂閱任務是否終止
    volatile boolean done;
    //訂閱任務是否被取消
    volatile boolean cancelled;
    //任務執(zhí)行模式----同步還是異步
    int sourceMode;
    //是否輸出融合(通常情況下該選項為false)
    boolean outputFused;

    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable s) {
        //當前的disposable為null氧敢,上游subscribe產生的disposable不為null,則驗證通過
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            //如果訂閱時獲取的disposable對象s是QueueDisposable類型的
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                //新建QueueDisposable隊列并將訂閱時獲取的disposable對象s強轉為QueueDisposable询张,然后賦值給queue
                QueueDisposable<T> qd = (QueueDisposable<T>) s;
                //獲取任務運行模式
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                //判斷運行模式孙乖,并且調用下游observer的onSubscribe方法將當前observer在訂閱時產生的disposable傳遞給下游observer
                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    //為true 使得接下來的onXXX等方法均不會執(zhí)行
                    done = true;
                    actual.onSubscribe(this);
                    //worker直接調度任務
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    //在異步模式下,等待onXXX方法中的worker調度
                    return;
                }
            }
            //否則創(chuàng)建一個支持單一生產者單一消費者的隊列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            //調用下游observer的onSubscribe方法將當前observer在訂閱時產生的disposable傳遞給下游observer
            actual.onSubscribe(this);
        }
    }

    //......代碼省略
}

onSubscribe方法調用后份氧,就開始執(zhí)行onXXX方法了唯袄,首先是onNext方法,這個方法可以反復調用:

@Override
public void onNext(T t) {
    //訂閱模式是同步模式或者執(zhí)行過onComplete/onError方法時蜗帜,此時done為true恋拷,直接返回
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        //將上游數(shù)據(jù)源發(fā)射的數(shù)據(jù)添加到緩存隊列中
        queue.offer(t);
    }
    //開始worker調度任務
    //(這里面調用了handler,將數(shù)據(jù)發(fā)送到主線程所在的消息隊列厅缺,進而更新UI界面蔬顾,這里稍后分析)
    schedule();
}

其次是onError和onComplete,這兩個方法只能執(zhí)行一次并且是互斥的:

@Override
public void onError(Throwable t) {
    //如果任務狀態(tài)已經(jīng)是終止狀態(tài)湘捎,再執(zhí)行該任務是就會拋出異常
    if (done) {
        RxJavaPlugins.onError(t);
        return;
    }
    error = t;
    //設置任務狀態(tài)為終止狀態(tài)
    done = true;
    //worker任務調度
    schedule();
}

@Override
public void onComplete() {
    //如果任務是終止狀態(tài)阎抒,直接返回
    if (done) {
        return;
    }
    //設置任務狀態(tài)為終止狀態(tài)
    done = true;
    //worker任務調度
    schedule();
}

這里onNext,onComplete消痛,onError最后都調用schedule()來調度任務:

//調用worker.schedule(this)開始任務調度
void schedule() {
    //這里通過getAndIncrement() == 0原子性的保證了worker.schedule(this)在調度完之前不會再次被調度
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

上面在執(zhí)行worker.schedule(this)時傳入了this且叁,也就是當前對象ObserveOnObserver,ObserveOnObserver類實現(xiàn)了Runnable接口秩伞,因此worker.schedule(this)調度的任務就是自己run()實現(xiàn)方法中的任務:

@Override
public void run() {
    //outputFused通常情況下為false
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

drainFused()通常情況下不會執(zhí)行逞带,我們只需要關注drainNormal()方法即可,在查看該方法之前纱新,先看下drainNormal()內部調用的一個驗證方法checkTerminated(boolean d, boolean empty, Observer<? super T> a)展氓,該方法主要是檢測任務是否已終止:

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
    //如果訂閱被取消,清空數(shù)據(jù)緩存隊列
    if (cancelled) {
        queue.clear();
        return true;
    }
    
    //這個d就是done
    if (d) {
        //done為true時脸爱,有兩種情況遇汞,在onNext調度完畢后執(zhí)行onComplete或onError
        Throwable e = error;
        if (delayError) {
            //如果是延遲發(fā)送錯誤的情況,必須等到queue(緩存上游observable發(fā)出的數(shù)據(jù))為空的情況下才能發(fā)送錯誤(有錯誤的情況下)
            if (empty) {
                if (e != null) {
                    a.onError(e);
                } else {
                    a.onComplete();
                }
                //終止worker任務調度
                worker.dispose();
                return true;
            }
        } else {
            //不延遲發(fā)送錯誤時簿废,直接調用
            if (e != null) {
                //如果任務執(zhí)行出錯空入,即調用了onNext方法,清空queue
                queue.clear();
                //調用下游observer的onError
                a.onError(e);
                //終止worker調度
                worker.dispose();
                return true;
            } else
            if (empty) {
                //任務正常執(zhí)行族檬,未出現(xiàn)錯誤
                //調用下游observer的onComplete歪赢,并終止worker調度
                a.onComplete();
                worker.dispose();
                return true;
            }
        }
    }
    //否則返回false,任務還未終止
    return false;
}

任務結束情況分為以下兩種

  • 1单料、訂閱被取消埋凯,即cancelled==true点楼,此時任務為終止狀態(tài)。
  • 2白对、任務運行標記done==true掠廓,即onNext調用完畢,調用onComplete正常結束甩恼;或者在onNext調用過程中出現(xiàn)錯誤蟀瞧,調用了onNext,此時是異常結束媳拴,會發(fā)送異常信息黄橘。

說完了這個方法兆览,我們繼續(xù)看drainNormal()這個方法:

void drainNormal() {
    //這個變量只是一個控制變量屈溉,用來確保drainNormal()方法能被原子性調用
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    
    //外層死循環(huán)
    for (;;) {
        //根據(jù)數(shù)據(jù)緩存隊列是否為空檢查任務是否已終止
        //為空就會直接跳出外層循環(huán),方法執(zhí)行結束抬探,內層的循環(huán)也不會執(zhí)行
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        //內層死循環(huán)子巾,負責具體的任務處理
        for (;;) {
            boolean d = done;
            T v;

            try {
                //從隊列中獲取數(shù)據(jù)
                v = q.poll();
            } catch (Throwable ex) {
                //任務執(zhí)行出現(xiàn)錯誤,拋出致命錯誤信息小压,結束循環(huán)线梗,終止本次任務
                Exceptions.throwIfFatal(ex);
                //終止訂閱
                s.dispose();
                //清空緩存隊列
                q.clear();
                a.onError(ex);
                //停止worker調度
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            //判斷從隊列中取出的數(shù)據(jù)是否為空,即判斷隊列是否為空
            if (checkTerminated(d, empty, a)) {
                return;
            }
            //隊列中沒有了數(shù)據(jù)怠益,直接退出
            if (empty) {
                break;
            }
            //調用下游observer的onNext(onComplete和onError均在checkTerminated方法里調用)
            a.onNext(v);
        }
        //這里主要是確保在同一時間只有一個worker.schedule(this)正在執(zhí)行仪搔。
        //missed變量在方法最開始初始化為1,這里missed會被重置為0蜻牢,這樣下面的missed==0成立烤咧,當前任務結束。
        //addAndGet(-missed)方法也會將AtomicInteger內部的VALUE值設置為0抢呆。
        //同時煮嫌,run()方法中的判定方法getAndIncrement() == 0成立,繼續(xù)執(zhí)行下一個worker.schedule(this)抱虐。
        //如果程序沒有走到這里昌阿,那么missed==0也就不成立,相應的run()方法中的getAndIncrement() == 0不成立恳邀,也就不會執(zhí)行下一個worker.schedule(this)懦冰。
        //這樣就原子性的確保了同一時間只有一個worker.schedule(this)正在執(zhí)行,即同一時間只有一個drainNormal()方法在執(zhí)行谣沸。
        missed = addAndGet(-missed);
        //程序走到這里儿奶,表示當前任務正常結束,退出循環(huán)鳄抒。
        //run()方法繼續(xù)執(zhí)行闯捎,getAndIncrement() == 0成立椰弊,開始執(zhí)行下一個worker.schedule(this)。
        if (missed == 0) {
            break;
        }
    }
}

到這里瓤鼻,ObserveOnObserver這個下游observer包裝類也就介紹的差不多了秉版,簡要總結下:

  • 1、構造方法接收了外部創(chuàng)建的工作者worker茬祷,負責任務調度清焕。
  • 2、內部定義了一個緩存隊列祭犯,緩存上游observable發(fā)射的數(shù)據(jù)秸妥。
  • 3、ObserveOnObserver的父類繼承了AtomicInteger類沃粗,這樣在多線程環(huán)境中粥惧,可以通過AtomicInteger的CAS操作確保線程安全。
  • 4最盅、ObserveOnObserver類實現(xiàn)了Runnable接口突雪,在run()方法內,使用worker進行任務調度涡贱,并通過getAndIncrement()==0來確保worker任務調度的原子性操作咏删。
  • 5、drainNormal()為實際調度執(zhí)行的方法问词,其內部聲明了一個局部變量missed督函,并初始化為1。然后開始死循環(huán)激挪,在外層循環(huán)末尾處(內層循環(huán)正常結束辰狡,消息隊列中的數(shù)據(jù)都被取了出來并處理完畢,此時消息隊列為空灌灾,結束內層循環(huán))搓译,調用missed = addAndGet(-missed)后,ObserveOnObserver內部的VALUE值被重置為0锋喜,最后判定missed==0成立些己,程序跳出死循環(huán),drainNormal()方法執(zhí)行完畢嘿般。最后run()方法繼續(xù)執(zhí)行段标,此時getAndIncrement()==0成立,開始下一個worker任務調度炉奴。
    如果在循環(huán)途中因為某些原因(任務已執(zhí)行完畢或出現(xiàn)異常)而中途退出循環(huán)沒能執(zhí)行到循環(huán)末尾逼庞,那么其內部的VALUE值就不為0,getAndIncrement()==0不成立瞻赶,不會執(zhí)行worker調度赛糟,因此整個訂閱也就結束了派任。

在上一篇文章中,訂閱線程最終是通過調度器來執(zhí)行具體切換過程的璧南。同樣的掌逛,對于觀察者線程切換執(zhí)行的也是類似的過程。前面分析道司倚,ObservableObserveOn構造方法接收我們傳入的調度器scheduler豆混,并通過scheduler創(chuàng)建工作者worker,將其傳入到ObserveOnObserver的構造方法中动知,最后在run()方法中執(zhí)行具體的任務調度皿伺。因此觀察者的線程切換肯定是發(fā)生在worker的調度過程中。

先從observeOn(AndroidSchedulers.mainThread())的參數(shù)AndroidSchedulers.mainThread()開始盒粮,點進去看下:

public final class AndroidSchedulers {

    private static final class MainHolder {
        //構造一個與UI線程關聯(lián)的Handler鸵鸥,并將其作為參數(shù)構造HandlerScheduler調度器
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    //返回與主線程相關的scheduler調度器
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    //自己指定線程Looper自定義觀察者線程切換
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

AndroidSchedulers.mainThread()最終返回HandlerScheduler對象,HandlerScheduler也是繼承自Scheduler拆讯,其構造方法接收一個Handler類型的參數(shù)脂男,這個Handler通過Looper.getMainLooper()與UI線程關聯(lián)起來养叛,這樣通過Handler發(fā)送的消息就能被UI線程接收种呐,從而更新UI界面。具體返回HandlerScheduler對象的步驟與上文所介紹的訂閱線程切換生成IoScheduler一致弃甥,不再詳述爽室。

熟悉Handler的童鞋都知道,Handler在安卓的消息機制中占有重要的地位淆攻,它貫穿著整個安卓的體系阔墩,在很多地方我們都能見到它的身影。在剛開始接觸安卓開發(fā)的時候我們都寫過類似下面的代碼:

private Handler handler = new Handler(){
    @Override
    public void handleMessage(Message msg) {
        super.handleMessage(msg);
        //更新UI線程代碼
        //獲取子線程發(fā)送的message消息里面的請求數(shù)據(jù)瓶珊,更新UI界面
        //操作省略
    }
};

public void getData(){
    new Thread(new Runnable() {
        @Override
        public void run() {
            //1啸箫、網(wǎng)絡請求數(shù)據(jù)并返回
            //2、獲取Message對象伞芹,并將請求到的數(shù)據(jù)用Message包裹起來
            //3忘苛、調用handler的sendMessage等方法發(fā)送Message
        }
    }).start();
}

隨著現(xiàn)在各種各樣網(wǎng)絡請求框架的出現(xiàn),大大簡化了我們網(wǎng)絡請求更新UI的操作唱较,上面的代碼我們很少再去寫了扎唾,但并不意味著它就不重要了,尤其是Handler南缓。其實很多的網(wǎng)絡請求框架都只不過是將上面的操作封裝起來了而已胸遇,RxJava雖然與網(wǎng)絡請求無關,但在觀察者線程切換里面同樣也是將上面的過程封裝起來汉形,方便我們使用纸镊。

我們接著來看HandlerScheduler這個類:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    //......代碼省略

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    //......代碼省略
}

HandlerScheduler繼承自Scheduler倍阐,并實現(xiàn)了createWorker()方法生成任務調度worker,這里返回的是一個HandlerWorker對象逗威,前面提到的worker.schedule(this)中的worker實際上就是這個HandlerWorker收捣。

private static final class HandlerWorker extends Worker {
    //保存外部傳進來的Handler,這里保存的是與UI線程關聯(lián)的Handler庵楷,具體參見上面介紹
    private final Handler handler;
    //事件訂閱狀態(tài)標志位
    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    //worker.schedule(this)最終調用的方法罢艾,這里delay為0,表示handler無延遲發(fā)送消息
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
        //如果訂閱已終止尽纽,返回帶有終止狀態(tài)的disposable
        if (disposed) {
            return Disposables.disposed();
        }
        //對任務做一些自己的處理(默認情況下沒做任何處理)
        run = RxJavaPlugins.onSchedule(run);
        //包裝類
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        //獲取message咐蚯,設置其target成員為handler,callback成員為scheduled
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        //將消息發(fā)送到UI關聯(lián)的消息隊列中弄贿,此時handler中的queue是UI線程中的queue
        //(參考new Handler(Looper.getMainLooper()))
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        //再次檢查訂閱是否被終止春锋,若已被終止,移除handler中的callback差凹,并返回帶有終止狀態(tài)的disposable
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }
    
    //訂閱終止
    @Override
    public void dispose() {
        disposed = true;
        handler.removeCallbacksAndMessages(this /* token */);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

在HandlerWorker中期奔,將任務run用ScheduledRunnable包裝起來,在設置到Message的callback中危尿,生成一個Message呐萌,熟悉handler的都知道,handler通過dispatchMessage(Message msg)方法來進行消息的分發(fā)處理谊娇,這個方法的處理順序如下:

  • 1肺孤、判斷message中的callback(該callback是一個Runnable)是否為空,若不為空济欢,調用handleCallback(msg)赠堵,在handleCallback(msg)中,直接執(zhí)行message.callback.run()法褥。若為空茫叭,進入步驟2。
  • 2半等、判斷handler中的callback(該callback是Handler內部的一個內部接口揍愁,由我們自己實現(xiàn))是否為空,若不為空酱鸭,執(zhí)行callback.handleMessage(msg)吗垮。若為空,進入步驟3凹髓。
  • 3烁登、若步驟1和步驟2中的判斷均不成立,執(zhí)行handleMessage(msg),也就是我們前面寫的樣例代碼饵沧。
    在這里由于Message中的callback不為空锨络,因此執(zhí)行執(zhí)行callback中的run()方法,也就是前面提到的ObserveOnObserver(實現(xiàn)了Runnable接口)內部的run()方法狼牺,進而調用drainNormal()方法羡儿,最終調用下游observer的onNext。

生成Message后是钥,調用handler的sendMessageDelayed方法發(fā)送消息(這里delay參數(shù)為0掠归,因此是立即發(fā)送),由于這里的handler是與UI線程關聯(lián)在一起的悄泥,因此ObserveOnObserver(實現(xiàn)了Runnable接口)內部的run()方法就被發(fā)送到了UI線程中的消息隊列中虏冻,最終通過handler的dispatchMessage(Message msg)方法調用handleCallback(msg),最后調用message.callback.run()執(zhí)行run()方法里面的代碼弹囚,完成UI線程更新厨相。

在文章的最后,我們來看下這個run任務的包裝類ScheduledRunnable:

private static final class ScheduledRunnable implements Runnable, Disposable {
    private final Handler handler;
    private final Runnable delegate;

    private volatile boolean disposed;

    //保存外部傳入的handler和runnable任務
    ScheduledRunnable(Handler handler, Runnable delegate) {
        this.handler = handler;
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            //執(zhí)行run方法鸥鹉,即傳入的ObserveOnObserver(實現(xiàn)了Runnable接口)內部的run()方法
            delegate.run();
        } catch (Throwable t) {
            IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
            RxJavaPlugins.onError(ie);
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
        }
    }

    //訂閱終止蛮穿,從handler中移除該runnable
    //(實際上是從UI線程內部的隊列中將包裝這個runnable的message移除,如果這個message還未處理的話)
    @Override
    public void dispose() {
        disposed = true;
        handler.removeCallbacks(this);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }
}

至此毁渗,整個觀察者線程切換也就介紹完了践磅,最后我們再來梳理下思路:

  • 1、observeOn(AndroidSchedulers.mainThread())返回一個ObservableObserveOn對象祝蝠,該對象內部(source)保存了上游傳遞過來的observable音诈。
  • 2幻碱、下游執(zhí)行subscribe時绎狭,實際上執(zhí)行的是ObservableObserveOn對象內部的subscribeActual方法,該方法內部首先判斷調度器的類型(具體參看上面介紹)褥傍,這里傳入的調度器類型是HandlerScheduler儡嘶。
  • 3、將下游observer對象用ObserveOnObserver包裝起來恍风,生成ObserveOnObserver對象蹦狂。
  • 4、執(zhí)行source.subscribe方法朋贬,并將步驟3中生成的ObserveOnObserver對象作為參數(shù)傳遞進去凯楔。
  • 5、步驟4中的subscribe方法實際上執(zhí)行的是上游observable的subscribeActual方法锦募,在該方法中ObserveOnObserver內部的onSubscribe方法首先被執(zhí)行摆屯,在這個onSubscribe方法內部調用下游observer的onSubscribe方法,建立訂閱關系糠亩。
  • 6虐骑、執(zhí)行完onSubscribe方法后准验,ObserveOnObserver內部的onNext和onComplete或者onNext依次被執(zhí)行(onNext可被反復調用)。
  • 7廷没、在ObserveOnObserver內部的onNext方法中糊饱,將上游發(fā)射的數(shù)據(jù)緩存到隊列中,然后調用schedule方法颠黎,通過HandlerScheduler生成的worker對象(HandlerWorker)開始調度任務另锋,通過步驟1中調用AndroidSchedulers.mainThread()時生成的關聯(lián)UI線程的handler將Runnable任務發(fā)送到UI線程的消息隊列中。
  • 8狭归、ObserveOnObserver實現(xiàn)了Runnable接口砰蠢,在其實現(xiàn)方法run()中,最終調用了drainNormal()方法唉铜,該方法內有兩個嵌套的循環(huán)台舱,且都是死循環(huán)。外層循環(huán)結束的條件是當次訂閱任務被終止(外部取消或者任務執(zhí)行完畢或任務執(zhí)行出錯)潭流,內層循環(huán)結束的條件是當次訂閱任務執(zhí)行完畢或執(zhí)行出現(xiàn)錯誤竞惋。在內層循環(huán)內部,不斷的從緩存隊列中取出數(shù)據(jù)灰嫉,最后調用下游observer的onNext方法拆宛。在步驟7中,Runnable任務最后被發(fā)送到了UI線程內的消息隊列中讼撒,因此這個run()方法也是運行在UI線程浑厚,最終在UI線程中我們可以用這些數(shù)據(jù)更新UI界面。
  • 9根盒、ObserveOnObserver內部的onComplete和onError執(zhí)行過程與onNext類似钳幅,區(qū)別是onComplete和onNext只會調用一次并且是互斥調用(參見上面源碼分析)。同樣炎滞,最終也會調用drainNormal()方法敢艰,只不過在drainNormal()方法的外層死循環(huán)內,首先調用的是checkTerminated方法册赛,用來判斷當前訂閱任務是否已被終止钠导,下游observer的onComplete和onError就是在這個方法里調用的。

下一章RxJava2筆記(五森瘪、訂閱流程梳理以及線程切換次數(shù)有效性)將對前面做一個流程梳理牡属,以此來結束RxJava的學習。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末扼睬,一起剝皮案震驚了整個濱河市逮栅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖证芭,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瞳浦,死亡現(xiàn)場離奇詭異,居然都是意外死亡废士,警方通過查閱死者的電腦和手機叫潦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來官硝,“玉大人矗蕊,你說我怎么就攤上這事∏饧埽” “怎么了傻咖?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長岖研。 經(jīng)常有香客問我卿操,道長,這世上最難降的妖魔是什么孙援? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任害淤,我火速辦了婚禮,結果婚禮上拓售,老公的妹妹穿的比我還像新娘窥摄。我一直安慰自己,他們只是感情好础淤,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布崭放。 她就那樣靜靜地躺著,像睡著了一般鸽凶。 火紅的嫁衣襯著肌膚如雪币砂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天吱瘩,我揣著相機與錄音道伟,去河邊找鬼。 笑死使碾,一個胖子當著我的面吹牛,可吹牛的內容都是我干的祝懂。 我是一名探鬼主播票摇,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼砚蓬!你這毒婦竟也來了矢门?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎祟剔,沒想到半個月后隔躲,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡物延,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年宣旱,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叛薯。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡浑吟,死狀恐怖,靈堂內的尸體忽然破棺而出耗溜,到底是詐尸還是另有隱情组力,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布抖拴,位于F島的核電站燎字,受9級特大地震影響,放射性物質發(fā)生泄漏阿宅。R本人自食惡果不足惜轩触,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一澄成、第九天 我趴在偏房一處隱蔽的房頂上張望肮柜。 院中可真熱鬧,春花似錦酸舍、人聲如沸拉馋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽煌茴。三九已至随闺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蔓腐,已是汗流浹背矩乐。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留回论,地道東北人散罕。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像傀蓉,于是被迫代替她去往敵國和親欧漱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內容