RxAndroid簡(jiǎn)單分析

先來(lái)個(gè)RxAndroid的github地址

https://github.com/ReactiveX/RxAndroid


官方例子

Observable.just("one", "two", "three", "four", "five")
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())吧
    .subscribe(/* an Observer */);

- - - 
###簡(jiǎn)化例子
```java
Observer<String> observer = new Observer<String>() {
         @Override
         public void onSubscribe(Disposable d) {
             Log.i(LOG_TAG,"[onSubscribe]  " + Thread.currentThread().getId());
         }

         @Override
         public void onNext(String value) {
             Log.i(LOG_TAG,"[onNext]  "+value + Thread.currentThread().getId());
         }

         @Override
         public void onError(Throwable e) {
             Log.i(LOG_TAG,"[onError]  "+e);
         }

         @Override
         public void onComplete() {
             Log.i(LOG_TAG,"[onComplete]  "+Thread.currentThread().getId());
         }
     };
Observable.just("next -- >  1","next  -->  2")
    .subscribe(observer);
  • 接下來(lái)看看Observable.just()方法的實(shí)現(xiàn)
  public static <T> Observable<T> just(T item1, T item2) {
      ObjectHelper.requireNonNull(item1, "The first item is null");
      ObjectHelper.requireNonNull(item2, "The second item is null");
      /*
          ObjectHelper.requireNonNull(item1, "The first item is null");
          這個(gè)方法僅僅是判斷item1是不是null 
          public static <T> T requireNonNull(T object, String message) {
              if (object == null) {
                  throw new NullPointerException(message);
               }
            return object;
          }
      */
      return fromArray(item1, item2);
  }
  • 可以看到j(luò)ust方法最后調(diào)用了 fromArray() 方法 接下來(lái)看看fromArray方法的實(shí)現(xiàn)
      public static <T> Observable<T> fromArray(T... items) {
          ObjectHelper.requireNonNull(items, "items is null");
          if (items.length == 0) {
              return return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
          } else
          if (items.length == 1) {
              return return RxJavaPlugins.onAssembly(new ObservableJust<T>(items[0]));
          }
          return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
       }
    
    • 可以看到fromArray方法就是調(diào)用了RxJavaPlugins.onAssembly這個(gè)方法怀喉,根據(jù)items的長(zhǎng)度不同傳遞了不同的參數(shù)
    • 先分析items.length == 1的情況迅诬,這個(gè)情況下傳入的實(shí)例是:new ObservableJust<T>(items[0]))刺覆,對(duì)應(yīng)上面的例子就是new ObservableJust<String>("one"));
    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
      private final T value;
      public ObservableJust(final T value) {
          this.value = value;
      }
    
      @Override
      protected void subscribeActual(Observer<? super T> s) {
          ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
          s.onSubscribe(sd);
          sd.run();
      }
    
      @Override
      public T call() {
          return value;
      }
    

}

  - 這里有些疑問(wèn)。subscribeActual()這個(gè)方法是干什么的?以及里面的**ScalarDisposable**又是做什么的肘交?先放一放后面再說(shuō)

- 再來(lái)看看**RxJavaPlugins.onAssembly**這個(gè)方法
 ```java
public static <T> Observable<T> onAssembly(Observable<T> source) {
      Function<Observable, Observable> f = onObservableAssembly;
      if (f != null) {
          return apply(f, source);
      }
      return source;
  }
- 這里有一個(gè)非空的判斷(以后再說(shuō)這個(gè))氯窍,可以先理解成就直接吧source返回
  • 小結(jié)一哈just方法,首先是判斷傳入的item是不是空锤躁,如果不是空就調(diào)用了fromArray方法搁料,在fromArray里面構(gòu)造了一個(gè)Observable對(duì)象,然后直接返回系羞。

  • just方法構(gòu)造完了以后就調(diào)用了subscribe()方法并傳入了一個(gè)Observer對(duì)象郭计。看看suubscribe的核心代碼(就兩句話)
      public final void subscribe(Observer<? super T> observer) {
              observer = RxJavaPlugins.onSubscribe(this, observer);
              subscribeActual(observer);
      }
    
    • 先來(lái)看看RxJavaPlugins.onSubscribe(this, observer)這個(gè)究竟做了什么事~~
    public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
          BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
          if (f != null) {
              return apply(f, source, observer);
          }
          return observer;
      }
    
    • 這里面又有一個(gè)非空的判斷椒振,不管這個(gè)非空判斷昭伸,也就是直接返回了傳入的observer對(duì)象
    • 現(xiàn)在代碼就比較清晰了,其實(shí)就是直接調(diào)用了subscribeActual(observer)這個(gè)方法
  • 小結(jié)一哈澎迎,Observable.just()根據(jù)參數(shù)的長(zhǎng)度構(gòu)造了一個(gè)特定的Observable對(duì)象并返回庐杨,然后調(diào)用了該對(duì)象的subscribeActual方法并傳入observer
  • 接下來(lái)再來(lái)看前面留下的問(wèn)題,fromArray方法里面有根據(jù)items的長(zhǎng)度進(jìn)行實(shí)例化不同的Observable
    • item長(zhǎng)度為1的時(shí)候 --> ObservableJust
      • 它的subscribeActual()方法
      protected void subscribeActual(Observer<? super T> s) {
      ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
      s.onSubscribe(sd);
      sd.run();
      }
      - 這里又引入了一個(gè)新的類**ScalarDisposable**夹供,來(lái)看看這個(gè)又是做什么的
      ```java
      public static final class ScalarDisposable<T>
      extends AtomicInteger //這個(gè)類是用來(lái)原子操作的類,java里面i++都不是線程安全的~~
      implements QueueDisposable<T>, Runnable {
      
        private static final long serialVersionUID = 3880992722410194083L;
      
        final Observer<? super T> observer;
      
        final T value;
      
        static final int START = 0;
        static final int FUSED = 1;
        static final int ON_NEXT = 2;
        static final int ON_COMPLETE = 3;
      
        public ScalarDisposable(Observer<? super T> observer, T value) {
            this.observer = observer;
            this.value = value;
        }
        //中間省略了一大堆方法~~
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                //上面就是比較和賦值原子操作
                observer.onNext(value);//在這里可以看到調(diào)用了onNext()
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
      }
      

    • 當(dāng)items的長(zhǎng)度大于1的時(shí)候 --> ObservableFromArray
      • 它的subscribeActual()方法
       public void subscribeActual(Observer<? super T> s) {
          FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
          s.onSubscribe(d);
          if (d.fusionMode) {
              return;
          }
          d.run();
      } 
      
      • 這里又出現(xiàn)了一個(gè)新的Disposable --> FromArrayDisposable灵份,但是不管怎么樣兒,最后都調(diào)用了d.run()方法

static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

    final Observer<? super T> actual;

    final T[] array;

    int index;

    boolean fusionMode;

    volatile boolean disposed;

    FromArrayDisposable(Observer<? super T> actual, T[] array) {
        this.actual = actual;
        this.array = array;
    }
    void run() {
        T[] a = array;
        int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {//for循環(huán)調(diào)用撒~~
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);//調(diào)用onNext方法
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }
}
    ```

小結(jié):

調(diào)用just的時(shí)候構(gòu)造了一個(gè)Observable對(duì)象罩引,并根據(jù)不同的參數(shù)實(shí)例化不同的Observable各吨,不同的Observable有不同的subscribeActual()方法實(shí)現(xiàn),subscribeActual方法里面都有一個(gè)Disposable對(duì)象,最后都調(diào)用了Disposable的run(該方法調(diào)用了onNext()方法)方法揭蜒,最后在subscribe的時(shí)候?qū)嶋H上就是調(diào)用了Observable的subscribeActual方法横浑。


線程切換分析

eg:

Observable.just("next -- >  1","next  -->  2")
              .subscribeOn(AndroidSchedulers.mainThread())
              .observeOn(Schedulers.newThread())
              .subscribe(observer);
  • subscribeOn
    public final Observable<T> subscribeOn(Scheduler scheduler) {
          ObjectHelper.requireNonNull(scheduler, "scheduler is null");
          return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
      }
    
    • 可以看到這里也是調(diào)用了RxJavaPlugins.onAsswmbly()方法,只是這里的參數(shù)變成了ObservableSubscribeOn的實(shí)例屉更。
    • ObservableSubscribeOn
      public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
      final Scheduler scheduler;
    
      public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
          super(source);
          this.scheduler = scheduler;
      }
    
      @Override
      public void subscribeActual(final Observer<? super T> s) {
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
          s.onSubscribe(parent);
    
          parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
              @Override
              public void run() {
                  source.subscribe(parent);
              }
          }));
        }
        ....省略其他代碼
    
    - 可以看到里面主要是調(diào)用了徙融,Scheduler的schedulerDirect()方法,并在這個(gè)里面調(diào)用了瑰谜,source.subscribe()
    
    • 這里我們就僅僅去看看Scheduler.newThread()的實(shí)現(xiàn)
     public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
    

//省略部分代碼....
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
return sr;
}
```
- 這里就可以看出來(lái)實(shí)際上就是用的線程池來(lái)做的~~

  • observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
  • 這個(gè)也是調(diào)用了RxJavaPluginsonAssembly方法欺冀,傳入的對(duì)象是ObservableObserveOn的實(shí)例。
    • ObservableObserveOn
     //僅僅提出了核心代碼哈
     protected void subscribeActual(Observer<? super T> observer) {
       if (scheduler instanceof TrampolineScheduler) {
           source.subscribe(observer);
       } else {
           Scheduler.Worker w = scheduler.createWorker();
    
           source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
       }
    }
    
    • 這里對(duì)傳入的Scheduler進(jìn)行了判斷萨脑,如果是TrampolineScheduler類型就直接調(diào)用了隐轩,Source的subscribe方法,這個(gè)Scource其實(shí)就是調(diào)用observeOn方法的Observable
    • 先來(lái)看看當(dāng)Scheduler是newThreadScheduler的時(shí)候渤早,可以看到實(shí)例化了一個(gè)ObserveOnObserver
      static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    
      private static final long serialVersionUID = 6576896619930983584L;
      final Observer<? super T> actual;
      final Scheduler.Worker worker;
      final boolean delayError;
      final int bufferSize;
    
      SimpleQueue<T> queue;
    
      Disposable s;
    
      Throwable error;
      volatile boolean done;
    
      volatile boolean cancelled;
    
      int sourceMode;
    
      boolean outputFused;
    
      ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
          this.actual = actual;
          this.worker = worker;
          this.delayError = delayError;
          this.bufferSize = bufferSize;
      }
      //省略了很多代碼....
      @Override
      public void onNext(T t) {
          if (done) {
              return;
          }
    
          if (sourceMode != QueueDisposable.ASYNC) {
              queue.offer(t);
          }
          schedule();
      }
        ```
      - 從源碼可以看出來(lái)职车,**ObserveOnObserver**其實(shí)就是對(duì)Observer的一個(gè)包裝
      - 在**onNext**方法中可以看到線程切換的代碼
    
    

小結(jié):

其實(shí)搞了半天就是一個(gè)線程池在里面切換,對(duì)對(duì)象的各種包裝鹊杖。subscribeOn就是對(duì)Observable的包裝悴灵,切換了線程來(lái)調(diào)用source.subscribe()方法,而observeOn則是對(duì)Observer的包裝骂蓖,并重寫了里面的回調(diào)方法积瞒,在回調(diào)的時(shí)候會(huì)自動(dòng)切換線程。


AndroidSchedulers.mainThread()這個(gè)Scheduler的分析

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}
  • 可以看到最后都是實(shí)例化了HandlerScheduler登下,不同的是Looper的不同茫孔,
  • 再來(lái)看看HandlerScheduler的實(shí)現(xiàn)(僅僅貼出了主要的兩個(gè)方法)

@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

    run = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
    return scheduled;
}

@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;

    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }
```
  - 最后都是用的handler.postDelayed方法來(lái)做的線程切換,so android上面的Schulder其實(shí)就是用了庐船,Handler機(jī)制~~

RxAndroid使用不當(dāng)會(huì)有內(nèi)存泄漏的哦~~


Nothing is certain in this life. The only thing i know for sure is that. I love you and my life. That is the only thing i know. have a good day

:)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末银酬,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子筐钟,更是在濱河造成了極大的恐慌揩瞪,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件篓冲,死亡現(xiàn)場(chǎng)離奇詭異李破,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)壹将,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門嗤攻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人诽俯,你說(shuō)我怎么就攤上這事妇菱。” “怎么了?”我有些...
    開封第一講書人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵闯团,是天一觀的道長(zhǎng)辛臊。 經(jīng)常有香客問(wèn)我,道長(zhǎng)房交,這世上最難降的妖魔是什么彻舰? 我笑而不...
    開封第一講書人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮候味,結(jié)果婚禮上刃唤,老公的妹妹穿的比我還像新娘。我一直安慰自己白群,他們只是感情好尚胞,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著川抡,像睡著了一般辐真。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上崖堤,一...
    開封第一講書人閱讀 51,679評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音耐床,去河邊找鬼密幔。 笑死,一個(gè)胖子當(dāng)著我的面吹牛撩轰,可吹牛的內(nèi)容都是我干的胯甩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼堪嫂,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼偎箫!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起皆串,我...
    開封第一講書人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤淹办,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后恶复,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體怜森,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年谤牡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了副硅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡翅萤,死狀恐怖恐疲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤培己,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布碳蛋,位于F島的核電站,受9級(jí)特大地震影響漱凝,放射性物質(zhì)發(fā)生泄漏疮蹦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一茸炒、第九天 我趴在偏房一處隱蔽的房頂上張望愕乎。 院中可真熱鬧,春花似錦壁公、人聲如沸感论。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)比肄。三九已至,卻和暖如春囊陡,著一層夾襖步出監(jiān)牢的瞬間芳绩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工撞反, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留妥色,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓遏片,卻偏偏與公主長(zhǎng)得像嘹害,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子吮便,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 我從去年開始使用 RxJava 笔呀,到現(xiàn)在一年多了。今年加入了 Flipboard 后髓需,看到 Flipboard 的...
    Jason_andy閱讀 5,473評(píng)論 7 62
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語(yǔ)法许师,類相關(guān)的語(yǔ)法,內(nèi)部類的語(yǔ)法授账,繼承相關(guān)的語(yǔ)法枯跑,異常的語(yǔ)法,線程的語(yǔ)...
    子非魚_t_閱讀 31,639評(píng)論 18 399
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理白热,服務(wù)發(fā)現(xiàn)敛助,斷路器,智...
    卡卡羅2017閱讀 134,659評(píng)論 18 139
  • 前言我從去年開始使用 RxJava 屋确,到現(xiàn)在一年多了纳击。今年加入了 Flipboard 后续扔,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,164評(píng)論 6 151
  • 除了夢(mèng)中有你,翻看別處焕数,再找不出你的半點(diǎn)消息纱昧,可我仍然知曉,你的存在堡赔,她人無(wú)可代替识脆,但愿我們不在謀面,彼此...
    張暴少閱讀 199評(píng)論 0 1