在分析之前呢,首先需要大家打開源碼對照分析榜旦。效果更加S钠摺!
首先我們看一下RxJava 2 三步曲的一個基本實(shí)現(xiàn):
1 創(chuàng)建被觀察者(也叫數(shù)據(jù)發(fā)射者)
2 創(chuàng)建觀察者(也叫數(shù)據(jù)消費(fèi)者)
3 建立訂閱關(guān)系
代碼如下:
//第一步 創(chuàng)建被觀察者
Observable<String> observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("發(fā)射數(shù)據(jù)");
e.onComplete();
}
}
);
//第二步 創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//第三步 建立訂閱關(guān)系
observable.subscribe(observer);
新記:
我們來看一下創(chuàng)建Observable的這個靜態(tài)方法create.實(shí)際上這個是RxJava大量的操作符中的一個溅呢,create方法會返回一個Observable實(shí)例澡屡。
create方法的參數(shù)是一個實(shí)現(xiàn)了ObservableOnSubscribe接口的對象實(shí)例,該接口提供了發(fā)射數(shù)據(jù)的回調(diào)subscribe()方法,回調(diào)回來的ObservableEmitter實(shí)例就可以看成是數(shù)據(jù)發(fā)射器咐旧,用來發(fā)射數(shù)據(jù)驶鹉。
我們來看看Observable.create()的內(nèi)部實(shí)現(xiàn):
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
首先是創(chuàng)建了一個ObservableCreate實(shí)例,是Observable的子類铣墨,把之前參數(shù)中創(chuàng)建的ObservableOnSubscribe實(shí)例直接傳了進(jìn)去室埋,作了件什么事呢?實(shí)際上實(shí)現(xiàn)了一個代理的作用,代理的是誰姚淆?是Observer孕蝉,后面我們再詳細(xì)分析,我們可以確定腌逢,ObservableCreate就是一個Observable降淮。
我們再看RxJavaPlugins.onAssembly().這個方法拿了ObservableCreate(Observable)實(shí)例去做了什么:
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
RxJavaPlugins這個類是一個鉤子函數(shù)集合類,為RxJava中大量的操作符提供鉤子函數(shù)的注入搏讶。這里的鉤子函數(shù)會對Observable實(shí)例按照鉤子函數(shù)實(shí)際提供的功能進(jìn)行加工處理佳鳖,然后返回一個處理過的Observable。實(shí)際上在我們上面所寫的這個代碼示例中媒惕,這里的鉤子函數(shù)為null系吩,并沒有對Observable做任何處理就直接返回了。
實(shí)際上我們示例代碼中整個被觀察者Observable的創(chuàng)建吓笙,實(shí)際創(chuàng)建的是一個ObservableCreate實(shí)例淑玫,該實(shí)例提供了回調(diào)方法subscribe(),當(dāng)發(fā)生訂閱行為時(shí)會回調(diào)面睛,也就是示例代碼中執(zhí)行observable.subscribe(),訂閱后就可以發(fā)射數(shù)據(jù)了絮蒿。通過數(shù)據(jù)發(fā)射器ObservableEmitter來進(jìn)行數(shù)據(jù)發(fā)射。
像這種訂閱后才開始發(fā)射數(shù)據(jù)的叁鉴,我們稱為Cold Observable土涝; 另外一種稱作Hot Observable,這種是不管有沒有觀察者來訂閱都會不斷地發(fā)射數(shù)據(jù)。
我們再來看看observable.subscribe()訂閱的內(nèi)部代碼:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
...//這里把不關(guān)心的代碼省略掉了
}
首先是代碼中的RxJavaPlugins.onSubscribe()的調(diào)用與上面講的鉤子方法是一樣的幌墓,這里是通過鉤子方法對observer作了某種處理但壮。示例中實(shí)際上也并未調(diào)用實(shí)際的鉤子方法。
最后就執(zhí)行到了subscribeActual()方法常侣,我們前面講過我們的Observable是一個ObservableCreate實(shí)例蜡饵,subscribeActual方法在Observable中是一個虛方法,真正的實(shí)現(xiàn)是在ObservableCreate中胳施,代碼如下:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
代碼中的CreateEmitter是ObservableCreate的內(nèi)部類溯祸,實(shí)現(xiàn)了對observer的代理,CreateEmitter同時(shí)也實(shí)現(xiàn)了Disposable接口舞肆,該接口提供了dispose方法焦辅,可以用來停止對數(shù)據(jù)的接收。
我們接下來再看后面的幾行代碼就很明了了椿胯,首先是進(jìn)行了observer的onSubscribe回調(diào)筷登,然后是調(diào)用了observable的subscribe回調(diào),回調(diào)后就執(zhí)行數(shù)據(jù)發(fā)射操作哩盲。
整個過程就這樣前方。
最后狈醉,我們來看一下執(zhí)行的順序: