先看一小段代碼
Observable<String> observable = Observable.create(observer->{
observer.onNext("處理的數(shù)字是"+Math.random()*100);
observer.onComplete();
});
observable.subscribe(consumer->{
System.out.println("我處理的元素是"+consumer);
});
observable.subscribe(consumer->{
System.out.println("我處理的元素是"+consumer);
});
執(zhí)行結(jié)果是
我處理的元素是處理的數(shù)字是19.702425673460567
我處理的元素是處理的數(shù)字是9.601318081392996
先看Observable.create方法
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
參數(shù)是ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
其實(shí)我們可以把我們最開始的例子改寫成
ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe(){
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("處理的數(shù)字是"+Math.random()*100);
emitter.onComplete();
}};
Observable<String> observable = Observable.create(observableOnSubscribe);
observable.subscribe(consumer->{
System.out.println("我處理的元素是"+consumer);
});
observable.subscribe(consumer->{
System.out.println("我處理的元素是"+consumer);
});
我們把create方法參數(shù)還原成1.8之前的寫法现斋,我們一眼就看出文章一開始的代碼寫的observer是影響我們理解代碼的
observer->{
observer.onNext("處理的數(shù)字是"+Math.random()*100);
observer.onComplete();
}
其實(shí)是emitter更為恰當(dāng)
emitter->{
emitter.onNext("處理的數(shù)字是"+Math.random()*100);
emitter.onComplete();
}
這個ObservableEmitter 又是個接口锥涕,也就是說下面這幾行代碼只是定義了一個模版,subscribe的時候莱找,由ObservableEmitter的實(shí)現(xiàn)類還具體執(zhí)行onNext和onComplete。那么實(shí)現(xiàn)類在哪里呢?
ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe(){
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("處理的數(shù)字是"+Math.random()*100);
emitter.onComplete();
}};
我們再看Observable.create方法
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
也就是說
Observable<String> observable = Observable.create(observableOnSubscribe);
observable等于ObservableCreate的一個實(shí)例。這個ObservableCreate留著待用峦筒。
我們再看observable.subscribe方法
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
可以看到除了onNext函數(shù)是往下傳遞的,剩下的參數(shù)都是默認(rèn)值贸伐。
再放下跟subscribe方法
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
注意這個LambdaObserver勘天,傳遞進(jìn)來的onNext函數(shù),在這里包裝成了一個observer對象捉邢。
繼續(xù)進(jìn)入subscribe(ls);
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
沒什么重要的脯丝,繼續(xù)進(jìn)入subscribeActual(observer);
protected abstract void subscribeActual(Observer<? super T> observer);
發(fā)現(xiàn)是個抽象方法,那么自然應(yīng)該是剛剛待用的ObservableCreate的
subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
第一句把observer對象再包裝成一個emitter對象
第二句什么也沒執(zhí)行伏伐,因?yàn)閛bserver只有onNext是傳進(jìn)來一個lambda宠进,其他三個參數(shù)都是默認(rèn)的。記得是個emptyConsumer藐翎。
本文的重中之重就是下面這句
source.subscribe(parent);
source就是我們一開始定義的observableOnSubscribe
subscribe就是observableOnSubscribe的subscribe方法
參數(shù)parent就是剛剛的CreateEmitter
ObservableOnSubscribe observableOnSubscribe = new ObservableOnSubscribe(){
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("處理的數(shù)字是"+Math.random()*100);
emitter.onComplete();
}};
至此所有邏輯拼接成功
先執(zhí)行subscribe材蹬,然后再執(zhí)行我們自己定義的onNext。
done