閱讀Observable的xxx操作符的步驟
- 找到Observable的子類ObservableXXX
RxJavaPlugins.onAssembly(new ObservableXXX<T>()); - 查看ObservableXXX的subscribeActual(Observer<? super T> s)函數,一般做下面三件事
- 一般會創(chuàng)建一個Disposable接口的實現類d
- 調用s.onSubscribe(d);
- 具體subscribe實現代碼
- 具體subscribe實現代碼需要關注的幾個點
- disposed的實現
- observer的onNext面睛,OnComplete巡社,OnError何時被調用
以Observable.just為例
找到ObservableFromArray
-
查看ObservableFromArray的subscribeActual函數慈格,發(fā)現主要邏輯都在FromArrayDisposable的run方法里
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> s) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array); s.onSubscribe(d); if (d.fusionMode) { // 根據fusion mode值得來的,具體看QueueFuseable疹启,默認為false,暫時先不管 return; } d.run(); }
-
查看FromArrayDisposable的run
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> actual; final T[] array; int index; boolean fusionMode; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.actual = actual; this.array = array; } @Override public void dispose() { // 用一個boolean變量來標記 disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { // 如果沒有Disposed則遍歷array數組 T value = a[i]; if (value == null) { // 如果value為null走error,跳出for循環(huán) actual.onError(new NullPointerException("The " + i + "th element is null")); return; } // 走onNext actual.onNext(value); } if (!isDisposed()) { // 如果沒有Disposed唉铜,走onComplete actual.onComplete(); } } }
所以總結下:Just操作符依次發(fā)送數組中的事件,并且碰到null就中斷律杠;并且除非手動dispose狀態(tài)一直都不會變
PS
我的github:https://github.com/nppp1990/MyTips