RxJava 線程切換源碼的一些體會和思考

前言

RxJava 是在今年年初的時候上的車,接觸也快要滿一年了没宾。從最初只知道幾個操作符凌彬,寫寫 Demo 琼梆,或者跟著別人的項目和經(jīng)驗依葫蘆畫瓢偎行,到目前終于有點初窺門徑的地步舰攒。

RxJava 對于 Android 來說诞外,最直觀地便利就在于線程切換羞海。所以本篇內(nèi)容就是學(xué)習(xí) RxJava 是如何實現(xiàn)切換線程环戈。

希望讀者閱讀此篇文章宵膨,是有用過 RxJava 的童鞋犯建。

本章內(nèi)容基于源碼版本

RxJava: 1.2.4

目錄

準備

答案我會放在文章末尾

先來一道開胃菜:

指出下列程序操作符所運行的線程迁酸。

Observable.just() //1
          .subscribeOn(Schedulers.newThread())
          .map() //2
          .subscribeOn(Schedulers.io())
          .map() //3
          .observeOn(Schedulers.computation())
          .map() //4
          .observeOn(Schedulers.newThread())
          .subscribe() //5

開胃菜就到上面結(jié)束先鱼,如果你能夠清楚明白每個操作運行的線程,說明對于 RxJava 的線程切換的理解很正確胁出。

再具體分析 RxJava 是如何線程切換的型型,希望能清楚以下幾個 RxJava 中名詞的意思。

  • Create()
  • OnSubscribe
  • Operator

如果你特別明白這幾個 RxJava 類/方法的作用全蝶,可以直接跳過看切換這部分闹蒜。

  1. Create()

    /**
     * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
     * it.
     */
    
    public static <T> Observable<T> create(OnSubscribe<T> f) {
       return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    

    方法注釋上說明,當訂閱者訂閱之后抑淫,該函數(shù)會返回將會執(zhí)行具體功能的流绷落。操作符進入源碼會發(fā)現(xiàn)他們最終都會調(diào)用到 create() 函數(shù)。

  2. OnSubscribe

    /**
     * Invoked when Observable.subscribe is called.
     * @param <T> the output value type
     */
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}
    

    首先我們知道這是一個繼承 Action1 的接口始苇,并且是在 Observable.subscribe 流進行訂閱操作后回調(diào)砌烁。而且回顧剛剛 create() 源碼中也發(fā)現(xiàn)參數(shù)就是這個 OnSubscribeAction 的作用就是執(zhí)行其中的 call() 方法催式。

    Observable.OnSubscribe 有點像 Todo List 函喉,里面都是一個一個待處理的事務(wù),并且這個 List 是有序的(這個很關(guān)鍵)荣月。

  3. Operator

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
      // cover for generics insanity
    }
    

    簡單來說它的職責(zé)就是將一個 Subscriber 變成另外一個 Subscriber管呵。

切換

上面知識點是一些小鋪墊,因為后面的內(nèi)容核心其實就是上面幾個類的作用哺窄。

SubscribeOn

追蹤這個方法捐下,核心是在這個類:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }
}

我先貼出這個類的账锹,構(gòu)造方法和成員變量,因為很重要坷襟,我們先把前因弄清楚奸柬。

首先我們發(fā)現(xiàn)這個類是實現(xiàn)了 OnSubscribe 接口,之前復(fù)習(xí)到這個的作用就是在該流被訂閱之后執(zhí)行 call() 方法婴程,這里面就是后果廓奕,待會我們來看。

前因其實很簡單档叔,就是傳入兩個參數(shù):

  1. 一個是 Scheduler 懂从,調(diào)度器,它的具體實現(xiàn)在 Schedulers 里蹲蒲。

  2. Observable<T> source 這個其實就是當前這個流。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
      if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
      }
      return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
    

接下來看看 call() 核心代碼里做的事情:


    // 因為是 OnSubscribe 類侵贵,這里 call() 中傳入的參數(shù)是 Observable.subscribe(s) 中的 s
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        // 根據(jù)傳入的調(diào)度器届搁,創(chuàng)建一個 Worker 對象 inner
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        // 在 Worker 對象 inner 中執(zhí)行(意思就是,在我們指定的調(diào)度器創(chuàng)建的線程中運行)
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                // 對訂閱者包裝
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    ······
                };

                // 這一句位置很關(guān)鍵
                // 首先 source 是之前傳入的流(也就是當前流)窍育,在 Worker 內(nèi)部進行了訂閱操作卡睦,所以該流所有操作都執(zhí)行在其中
                source.unsafeSubscribe(s);
            }
        });
    }

通過我們指定的調(diào)度器,創(chuàng)建好 Worker 漱抓,之前傳入的流在 Worker 內(nèi)部表锻,對重新包裹的 subscriber 進行訂閱操作。

所以 SubscribeOn()最關(guān)鍵的地方其實是因為這行代碼在調(diào)度器創(chuàng)建的 Worker 的 call()

source.unsafeSubscribe(s);

總結(jié):

subscribeOn 其實是改變了調(diào)用前序列所運行的線程乞娄。

ObserveOn

同樣的方法來分析瞬逊,最終的回調(diào)會到:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  if (this instanceof ScalarSynchronousObservable) {
    return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
  }
  return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

其實看到關(guān)鍵字 lift 和 operator 就大約可以猜到是做什么的了。

接下來我們進入到 OperatorObserveOn 類中:

public final class OperatorObserveOn<T> implements Operator<T, T> {

    private final Scheduler scheduler;
    // 省略不必要的代碼

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
            // 省略 ···
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }
}

我們首先會注意到它是一個 Operator 仪或,并且沒有對上層 Observale 做任何修改和包裝确镊。那么它的作用就是將一個 Subscriber 變成另外一個 Subscriber。所以接下來我們的首要任務(wù)就是看轉(zhuǎn)換后的 Subscriber 做了什么改變范删。

關(guān)鍵代碼在

ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();

child 是改變前的 Subscriber 蕾域,最后返回了 parent

我們發(fā)現(xiàn) ObserveOnSubscriber 同樣也是一個 Subscriber 類到旦,所以肯定含有 onNext/onError/onComplete 這三個標準方法旨巷,重要的肯定是 onNext ,所以我只貼上了該類三個有關(guān)函數(shù)添忘。

void init() {
    Subscriber<? super T> localChild = child;

    localChild.setProducer(new Producer() {

        @Override
        public void request(long n) {
            if (n > 0L) {
                BackpressureUtils.getAndAddRequest(requested, n);
                // 執(zhí)行
                schedule();
            }
        }

    });
    // recursiveScheduler 這個是構(gòu)造函數(shù)時傳入調(diào)度器創(chuàng)建的 worker
    localChild.add(recursiveScheduler);
    localChild.add(this);
}

@Override
public void onNext(final T t) {
  if (isUnsubscribed() || finished) {
    return;
  }
  // 條件判斷里先將之前流的結(jié)果緩存進隊列
  if (!queue.offer(on.next(t))) {
    onError(new MissingBackpressureException());
    return;
  }
  // 執(zhí)行
  schedule();
}


protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        // 在當前 worker 上執(zhí)行該類的 call 方法
        recursiveScheduler.schedule(this);
    }
}

call() 方法有點冗長采呐,做的事情其實很簡單,就是取出我們緩存之前流的所有值昔汉,然后在 Worker 工作線程中傳下去懈万。

總結(jié):

  1. ObserveOn 不會關(guān)心之前的流的線程
  2. ObserveOn 會先將之前的流的值緩存起來拴清,然后再在指定的線程上,將緩存推送給后面的 Subscriber

共用時各自的作用域

 Observable.just() //1
            .subscribeOn(Schedulers.newThread())
            .map() //2
            .map() //3
            .observeOn(Schedulers.computation())
            .map() //4
            .observeOn(Schedulers.newThread())
            .subscribe() //5

如果分析這個流各個操作符的執(zhí)行線程会通,我們先把第一個 subscribeOn() 之前和第一個 observeOn() 之前的 Todo Items 找出來然后求并集:

得到的結(jié)果就是 subscribeOn() 的作用域口予。

之后的線程切換簡單了,遇到 observeOn() 就切換一次涕侈。

思考

為什么subscribeOn 只有第一次調(diào)用生效沪停?

我的理解如下:

subscribeOn 的作用域就是調(diào)用前序列中所有的 Todo List 任務(wù)清單(Observable.OnSubscribe),當我們執(zhí)行 subscribe() 時裳涛,這些任務(wù)清單就會執(zhí)行在 subscribeOn 指定的工作線程木张,而第二個 subscribeOn 早就沒有任務(wù)可做了,所以無法生效端三。


知乎里這段說的比我專業(yè):

正像 StackOverflow 上那段描述的舷礼,整個 Observable 數(shù)據(jù)流工作起來是分為兩個階段(或者說是兩個 lifecycle):upstream 的 subscription-time 和 downstream 的 runtime。

subscription-time 的階段郊闯,是為了發(fā)起和驅(qū)動數(shù)據(jù)流的啟動妻献,在內(nèi)部實現(xiàn)上體現(xiàn)為 OnSubscribe 向上游的逐級調(diào)用(控制流向上游傳遞)。支持 backpressure 的 producer request 也屬于這個階段团赁。除了 producer request 的情況之外育拨,subscription-time 階段一般就是從下游到上游調(diào)用一次就結(jié)束了,最終到達生產(chǎn)者(以最上游的那個 OnSubscribe 來體現(xiàn))欢摄。接下來數(shù)據(jù)流就開始向下游流動了熬丧。

Rxjava 中, subscribeOn 及 observeOn 方法切換線程發(fā)生的位置為什么設(shè)計為不同的怀挠? - 知乎

doOnSubscribe 的例外

我們再改動下開胃菜的代碼:

Observable.just() //1
          .subscribeOn(Schedulers.newThread())
          .map() //2
          .subscribeOn(Schedulers.io())
          .map() //3
          .observeOn(Schedulers.computation())
          .map() //4
          .doOnSubscribe() //6
          .observeOn(Schedulers.newThread())
          .subscribe() //5

只添加了一行.doOnSubscribe() //6 析蝴,也是探討這個操作符執(zhí)行的線程。

public class OperatorDoOnSubscribe<T> implements Operator<T, T> {
    private final Action0 subscribe;

    public OperatorDoOnSubscribe(Action0 subscribe) {
        this.subscribe = subscribe;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        // 執(zhí)行我們的 Action
        subscribe.call();
        // Wrap 里面是包裝成一個新的 Subscriber 返回唆香,不對這個流做任何改變
        return Subscribers.wrap(child);
    }
}

doOnSubscribe 執(zhí)行的線程其實就是 subscribe.call(); 所在的線程嫌变。這里觸發(fā)的時機就是,當我們進行 Observable.subscribe() 時躬它,如果我們沒有在緊接之后SubscribeOn 指定線程腾啥,那么它就會運行在默認線程,然后返回一個新的流冯吓。


關(guān)于 doOnSubscribe() 留一個問題

Observable.just()
          .doOnSubscribe() // 1
          .doOnSubscribe() // 2
          .subscribe()

問題是倘待,對于 1 和 2 的執(zhí)行順序?

在開發(fā)中组贺,我們肯定不會像問題那樣寫代碼凸舵,只是自己在看 doOnSubscribe 源碼的時候,在問自己為什么它在其他操作符之前失尖,拓展到了 RxJava 流的一個執(zhí)行順序啊奄,也是自己想要明白的地方渐苏。所以下次準備探討學(xué)習(xí)。

對了菇夸,老司機說 RxJava 很像洋蔥琼富,一層一層。

進行分析學(xué)習(xí)的時候可以類比幫助理解庄新。

參考

Thomas Nield: RxJava- Understanding observeOn() and subscribeOn()

SubscribeOn 和 ObserveOn |Piasy Blog

答案:

1 newThread

2 newThread

3 newThread

4 computation

5 newThread

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鞠眉,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子择诈,更是在濱河造成了極大的恐慌械蹋,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羞芍,死亡現(xiàn)場離奇詭異哗戈,居然都是意外死亡,警方通過查閱死者的電腦和手機荷科,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門谱醇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人步做,你說我怎么就攤上這事∧胃剑” “怎么了全度?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長斥滤。 經(jīng)常有香客問我将鸵,道長,這世上最難降的妖魔是什么佑颇? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任顶掉,我火速辦了婚禮,結(jié)果婚禮上挑胸,老公的妹妹穿的比我還像新娘痒筒。我一直安慰自己,他們只是感情好茬贵,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布簿透。 她就那樣靜靜地躺著,像睡著了一般解藻。 火紅的嫁衣襯著肌膚如雪老充。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天螟左,我揣著相機與錄音啡浊,去河邊找鬼觅够。 笑死,一個胖子當著我的面吹牛巷嚣,可吹牛的內(nèi)容都是我干的喘先。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼涂籽,長吁一口氣:“原來是場噩夢啊……” “哼苹祟!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起评雌,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤树枫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后景东,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體砂轻,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年斤吐,在試婚紗的時候發(fā)現(xiàn)自己被綠了搔涝。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡和措,死狀恐怖庄呈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情派阱,我是刑警寧澤诬留,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站贫母,受9級特大地震影響文兑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜腺劣,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一绿贞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧橘原,春花似錦籍铁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至歼冰,卻和暖如春靡狞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背隔嫡。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工甸怕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留甘穿,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓梢杭,卻偏偏與公主長得像温兼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子武契,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容