我們用Observable提供的靜態(tài)方法just來寫一個簡單的列子
Observable.just(1).subscribe(object: Observer<Int>{
override fun onComplete() {
Log.d(Constants.TAG, "rx -- onComplete")
}
override fun onSubscribe(d: Disposable) {
Log.d(Constants.TAG, "rx -- onSubscribe")
}
override fun onNext(t: Int) {
Log.d(Constants.TAG, "rx -- onNext --$t")
}
override fun onError(e: Throwable) {
Log.d(Constants.TAG, "rx -- onError")
}
})
打印結(jié)果
D/cat: rx -- onSubscribe
D/cat: rx -- onNext --1
D/cat: rx -- onComplete
RxJava采用觀察者模式實現(xiàn)的草则,我們知道簡單的觀察者模式潭陪,一定是可觀察者(Observable)發(fā)生變化雄妥,觸發(fā)觀察者(Observer)的方法,實現(xiàn)實時監(jiān)聽
那么問題來了依溯,僅僅一個
just
方法老厌,如何實現(xiàn)觸發(fā)操作的呢?
我們來看源碼
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
比較重要是創(chuàng)建了ObservableJust
實例黎炉,ObservableJust是Observable的實例類
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
看到這枝秤,可能也沒有明白,構(gòu)造方法只是傳入個一個value值慷嗜,還是沒有講解到如何觸發(fā)淀弹。
我們先去看下Observable的subscribe方法丹壕,代碼最終執(zhí)行
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
...
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
從上面可以看到最終會調(diào)用subscribeActual
方法。
再來分析subscribeActual
方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
// 會執(zhí)行到 onSubscribe()方法
observer.onSubscribe(sd);
// next complete方法會在這里執(zhí)行
sd.run();
}
這個ScalarDisposable又是什么呢垦页?
Represents a Disposable that signals one onNext followed by an onComplete.
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 執(zhí)行onNext()方法
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
執(zhí)行onComplete()方法
observer.onComplete();
}
}
}
從整個流程可以看到雀费,當(dāng)我們Observer訂閱了Observable之后,才會觸發(fā)Observable產(chǎn)出數(shù)據(jù)痊焊,進而觸發(fā)Observer的監(jiān)聽方法盏袄。