RxJava2.X源碼解析(四)

更多分享:http://www.cherylgood.cn

一、前言

  • 基于RxJava2.1.1
  • 我們在前面的 RxJava2.0使用詳解(一)初步分析了RxJava從創(chuàng)建到執(zhí)行的流程笨奠。RxJava2.0使用詳解(二) 中分析了RxJava的隨意終止Reactive流的能力的來源袭蝗;也明白了RxJavaonComplete();onError(t);只有一個會被執(zhí)行的秘密唤殴。RxJava2.X 源碼分析(三)中探索了RxJava2調(diào)用subscribeOn切換被觀察者線程的原理。
  • 本次我們將繼續(xù)探索RxJava2.x切換觀察者的原理到腥,分析observeOnsubscribeOn的不同之處眨八。繼續(xù)實現(xiàn)我們在第一篇中定下的小目標

二、從Demo到原理

  • OK左电,我們的Demo還是上次的demo廉侧,忘記了的小伙伴可以點擊RxJava2.X 源碼分析(三),這里就不再重復(fù)了哦篓足,我們直接進入正題段誊。
  • Ok,按照套路栈拖,我們從observeOn方法入手连舍。
  • Ok,我點~_
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler) {
      //false為默認無延遲發(fā)送錯誤涩哟,bufferSize為緩沖區(qū)大小
      return observeOn(scheduler, false, bufferSize());
  }
  • 我們繼續(xù)往下看索赏,我猜套路跟subscribeOn的逃不多,也是采用裝飾者模式贴彼,wrapper我們的ObservableObserver產(chǎn)生一個中間被觀察者和觀察中潜腻,通過中間被觀察者訂閱上游被觀察者,通過中間觀察者接收上游被觀察者下發(fā)的數(shù)據(jù)器仗,然后通過線程切換將數(shù)據(jù)傳遞給下游觀察者融涣。
  • Ok,我們來驗證下才想精钮。我覺得就是沒完全猜對威鹿,也能猜對其中的大部分。
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  }
  • Ok轨香,熟悉的RxJavaPlugins.onAssemblyhook處理忽你,略過,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)這句

      public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
          final Scheduler scheduler;
          final boolean delayError;
          final int bufferSize;
          public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
              super(source);
              this.scheduler = scheduler;
              this.delayError = delayError;
              this.bufferSize = bufferSize;
          }
    
          @Override
        protected void subscribeActual(Observersuper T> observer) {
             //1臂容、在當前線程調(diào)度科雳,但不是立即執(zhí)行,放入隊列中
              if (scheduler instanceof TrampolineScheduler) {
                  source.subscribe(observer);
              } else {
               //2策橘、本次走的是這里
                  Scheduler.Worker w = scheduler.createWorker();
                //3
                  source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
              }
          }
    
  • Ok,果然炸渡,熟悉的模式娜亿,對我們上游的Observable,下游的Observerwrapper一次丽已。
    1、ObservableObserveOn繼承了AbstractObservableWithUpstream
    2买决、source保存上游的Observable
    3沛婴、scheduler為本次的調(diào)度器
    4吼畏、在下游調(diào)用subscribe訂閱時觸發(fā)->subscribeActual->Wrapper了下游的Observer觀察者
  • 3處:source為游Observable,下游Observer被wrapper到ObserveOnObserver嘁灯,發(fā)生訂閱數(shù)件泻蚊,上游Observable開始執(zhí)行subscribeActual,調(diào)用ObserveOnObserver的onSubscribe以及onNext丑婿、onError性雄、onComplete等

  • OK,我們接著看Observer被包裝進 ObserveOnObserver的樣子羹奉,代碼有點多秒旋,我們分段講解
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        //下游的Observer
        final Observersuper T> actual;
        //調(diào)度工作者
        final Scheduler.Worker worker;
        //是否延遲錯誤,默認false
        final boolean delayError;
        //隊列大小
        final int bufferSize;
        //存儲上游Observable下發(fā)的數(shù)據(jù)隊列
        SimpleQueue<T> queue;
        //存儲下游Observer的Disposable
        Disposable s;
        //存儲錯誤信息
        Throwable error;
        //校驗是否完畢
        volatile boolean done;
        //是否被取消
        volatile boolean cancelled;
        //存儲執(zhí)行模式诀拭,同步或者異步 同步
        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
      public void onSubscribe(Disposable s) {

            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                  //1迁筛、判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        //true 后面的onXX方法都不會被調(diào)用
                        done = true;
                        actual.onSubscribe(this);
                        //2、同步模式下耕挨,直接調(diào)用schedule
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        //2细卧、異步模式下,等待schedule
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
                actual.onSubscribe(this);
            }
        }
  • OK筒占,執(zhí)行玩這里之后贪庙,就到我們的onXX方法了
  • 首先可無限調(diào)用的onNext
 @Override
  public void onNext(T t) {
       //3、數(shù)據(jù)源是同步模式或者執(zhí)行過error / complete 會是true
      if (done) {
          return;
      }
      //如果數(shù)據(jù)源不是異步類型翰苫,
      if (sourceMode != QueueDisposable.ASYNC) {
          //4插勤、上游Observable下發(fā)的數(shù)據(jù)壓入queue
          queue.offer(t);
      }
      //5、開始調(diào)度
      schedule();
  }
  • 其次只能觸發(fā)一次的onError革骨,基本差不多
 @Override
    public void onError(Throwable t) {
        if (done) {
            //6农尖、已完成再執(zhí)行會拋一場
            RxJavaPlugins.onError(t);
            return;
        }
        //7、記錄錯誤信息
        error = t;
        //8良哲、標識已完成
        done = true;
        //9盛卡、開始調(diào)度
        schedule();
    }
  • 同樣是只能觸發(fā)一次的onComplete,同樣的套路筑凫,就不說了
 @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        schedule();
    }
  • 然后就是我們的關(guān)鍵點schedule();
 //關(guān)鍵點就是直接滑沧、簡單、里面線程調(diào)度工作者調(diào)用schedule(this)巍实,傳入了this
    void schedule() {
           //getAndIncrement很關(guān)鍵滓技,他原子性的保證了worker.schedule(this);在調(diào)度完之前不會被再次調(diào)度
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
  • 什么?傳入了this棚潦?那么說明什么呢令漂?( ̄? ̄)

  • 嗯?this是個runnable,沒錯叠必,我們的ObserveOnObserver實現(xiàn)了Runnable接口

  • 那么荚孵,接下來自然是調(diào)用run方法

    @Override
    public void run() {
          //outputFused一般是false
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
  • 好吧,在看drainNormal前纬朝,我們先看一個函數(shù)
 //從名字看是檢測是否已終止
    boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {
        //1收叶、訂閱已取消
        if (cancelled) {
            //清空隊列
            queue.clear();
            return true;
        }
        //2、d其實是done共苛,
        if (d) {
            //done==ture可能的情況onNext剛被調(diào)度完判没,onError或者onCompele被調(diào)用,
            Throwable e = error;
            if (delayError) {
                //delayError==true時等到隊列為空才調(diào)用
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                //否則直接調(diào)用
                if (e != null) {
                    queue.clear();
                    a.onError(e);
                    worker.dispose();
                    return true;
                } else
     if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        //否則未終結(jié)
        return false;
    }
  • true:1隅茎、訂閱被取消cancelled==true哆致,2、done==true onNext剛被調(diào)度完患膛,onError或者onCompele被調(diào)用

  • 繼續(xù)看drainNormal

void drainNormal() {
      int missed = 1;
      final SimpleQueue<T> q = queue;
      final Observersuper T> a = actual;
      //Ok,死循環(huán)摊阀,我們來看下有哪些出口
      for (;;) {
      //Ok,出口踪蹬,該方法前面分析的
      if (checkTerminated(done, q.isEmpty(), a)) {
              return;
          }

          //在此死循環(huán)
          for (;;) {
              boolean d = done;
              T v;
              try {
                  //分發(fā)數(shù)據(jù)出隊列
                  v = q.poll();
              } catch (Throwable ex) {
                  //有異常時終止退出
                  Exceptions.throwIfFatal(ex);
                  s.dispose();
                  q.clear();
                  a.onError(ex);
                  //停止worker(線程)
                  worker.dispose();
                  return;
              }
              boolean empty = v == null;
              //判斷隊列是否為空
              if (checkTerminated(d, empty, a)) {
                  return;
              }
               //沒數(shù)據(jù)退出
              if (empty) {
                  break;
              }
              //數(shù)據(jù)下發(fā)給下游Obsever胞此,這里支付者onNext,onComplete和onError主要放在了checkTerminated里面回調(diào)
              a.onNext(v);
          }
       //保證此時確實有一個 worker.schedule(this);正在被執(zhí)行跃捣,
          missed = addAndGet(-missed);
       //為何要這樣做呢漱牵?我的理解是保證drainNormal方法被原子性調(diào)用,如果執(zhí)行了addAndGet之后getAndIncrement() == 0就成立了疚漆,此時又一個worker.schedule(this);被調(diào)用了酣胀,那么就不能執(zhí)行break了
          if (missed == 0) {
              break;
          }
      }
  }

總結(jié)

  • Ok,看到這里我們基本了解了observeOn的實現(xiàn)流程娶聘,同樣是老套路闻镶,使用裝飾者模式,中間Wrapper了我們的Observable和Observer丸升,通過中間增加一個Observable和Observer來實現(xiàn)線程的切換铆农。
  • 喜歡就給我留言哦

相關(guān)文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市狡耻,隨后出現(xiàn)的幾起案子墩剖,更是在濱河造成了極大的恐慌,老刑警劉巖夷狰,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件岭皂,死亡現(xiàn)場離奇詭異,居然都是意外死亡沼头,警方通過查閱死者的電腦和手機爷绘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門书劝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人揉阎,你說我怎么就攤上這事庄撮”嘲疲” “怎么了毙籽?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長毡庆。 經(jīng)常有香客問我坑赡,道長,這世上最難降的妖魔是什么么抗? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任毅否,我火速辦了婚禮,結(jié)果婚禮上蝇刀,老公的妹妹穿的比我還像新娘螟加。我一直安慰自己,他們只是感情好吞琐,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布捆探。 她就那樣靜靜地躺著,像睡著了一般站粟。 火紅的嫁衣襯著肌膚如雪黍图。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天奴烙,我揣著相機與錄音助被,去河邊找鬼。 笑死切诀,一個胖子當著我的面吹牛揩环,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播幅虑,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼检盼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了翘单?” 一聲冷哼從身側(cè)響起吨枉,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎哄芜,沒想到半個月后貌亭,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡认臊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年圃庭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡剧腻,死狀恐怖拘央,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情书在,我是刑警寧澤灰伟,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站儒旬,受9級特大地震影響栏账,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜栈源,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一挡爵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧甚垦,春花似錦茶鹃、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至垃杖,卻和暖如春男杈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背调俘。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工伶棒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人彩库。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓肤无,卻偏偏與公主長得像,于是被迫代替她去往敵國和親骇钦。 傳聞我的和親對象是個殘疾皇子宛渐,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)載自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657閱讀 2,016評論 1 9
  • 先來個RxAndroid的github地址 https://github.com/ReactiveX/RxAndr...
    大批閱讀 578評論 0 0
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了眯搭。今年加入了 Flipboard 后窥翩,看到 Flipboard 的...
    Jason_andy閱讀 5,456評論 7 62
  • java.util.concurrent.locks包提供了鎖和等待條件的接口和類, 可用于替代JDK1.5之前的...
    待汝豪杰只是凡夫閱讀 357評論 0 0
  • 石家莊從今天早上就開始刮很大的風! 我騎車上班的時候鳞仙,被風刮得前行很費勁兒寇蚊。在過一個小路口兒時...
    關(guān)輝閱讀 919評論 1 0