引言
關(guān)于RxJava2的用法網(wǎng)上的資料很多,這里我們只學(xué)習(xí)它的實現(xiàn)原理握侧。本文專題目的:
1.知道源頭(Observable)是如何將數(shù)據(jù)發(fā)送出去的。
2.知道終點(Observer)是如何接收到數(shù)據(jù)的。
3.何時將源頭和終點關(guān)聯(lián)起來的
今天我們先從最簡單的無背壓(Observable)的create操作符說起远剩,來解決前三個問題汛兜。
樣例
//1.創(chuàng)建被觀察者巴粪,生產(chǎn)事件
final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
//2.訂閱的時候發(fā)送事件
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// emitter.onError(new Throwable("haha"));
emitter.onComplete();//onComplete事件發(fā)送后,后面的所有事件無效,且后面不能發(fā)送錯誤事件
emitter.onNext(3);
}
});
//3.定義觀察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "開始采用subscribe連接");
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "對Next事件" + value + "作出響應(yīng)");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e(TAG, "對Complete事件作出響應(yīng)");
}
};
//4建立聯(lián)系
observable.subscribe(observer);
我們看到出現(xiàn)了一下幾個角色:
- Observable:被觀察者粥谬,是數(shù)據(jù)的源頭肛根,通過subscribe訂閱被觀察者;
- ObservableOnSubscribe:從代碼結(jié)構(gòu)上看,Observable的構(gòu)造方法需要它漏策,且持有subscribe方法派哲,這里暫時理解為觀察者和被觀察者的中間件,具體作用后面再看;
- ObservableEmitter:顧名思義,是數(shù)據(jù)發(fā)射器掺喻,被觀察者通過它發(fā)送事件;
- Observer:被觀察者芭届,數(shù)據(jù)接受者,持有onNext感耙、onError褂乍、onComplete、onSubscribe方法即硼。
Observable
public abstract class Observable<T> implements ObservableSource<T> {
...
}
實現(xiàn)了ObservableSource接口:
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
接口很簡單逃片,提供了訂閱觀察者的功能,Observable中該方法的實現(xiàn)后面再看只酥。我們先看看create操作符干了些啥:
public abstract class Observable<T> implements ObservableSource<T> {
...
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判空
ObjectHelper.requireNonNull(source, "source is null");
//構(gòu)造ObservableCreate對象
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
...
}
最后通過用戶構(gòu)造的ObservableOnSubscribe對象褥实,返回了ObservableCreate對象。我們先看看ObservableOnSubscribe裂允。
數(shù)據(jù)發(fā)射封裝-ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter<T> e) throws Exception;
}
是個接口损离,用戶發(fā)射數(shù)據(jù)就是在這個接口實現(xiàn)中完成,具體見樣例代碼绝编,這里流一個疑問:入?yún)bservableEmitter是怎么來的僻澎,別急后面馬上會講到!
create操作符的產(chǎn)物-ObservableCreate
然后再看Observable的子類ObservableCreate十饥,根據(jù)名字我們可以猜到它是由create操作符創(chuàng)建的被觀察者:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;//用戶實現(xiàn)具體的數(shù)據(jù)發(fā)射操作
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//調(diào)用中間件ObservableOnSubscribe的訂閱方法窟勃,開始調(diào)用發(fā)射數(shù)據(jù)的代碼了!
//說明1
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}
這里的source由用戶構(gòu)造绷跑,實現(xiàn)發(fā)射數(shù)據(jù)操作拳恋,subscribeActual方法是核心,當(dāng)訂閱觀察者是砸捏,最終會執(zhí)行subscribeActual方法谬运,后面會具體說明隙赁,不過看方法名也應(yīng)該能猜到。
前面我們講ObservableOnSubscribe的subscribe方法時梆暖,關(guān)于入?yún)⒌膩碓戳粝铝艘粋€疑問伞访,這里看說明1的代碼:入?yún)arent類型為CreateEmitter,很明顯它必然是ObservableEmitter的子類或者子接口轰驳。
事件發(fā)射器--ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
...
}
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
可見發(fā)射器接口提供了發(fā)射數(shù)據(jù)的功能厚掷,在回過頭來看看CreateEmitter,它是ObservableCreate的內(nèi)部類.
public final class ObservableCreate<T> extends Observable<T> {
...
...
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
//持有觀察者,發(fā)射器的發(fā)送數(shù)據(jù)的方法其實是調(diào)用觀察者對應(yīng)的方法
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//調(diào)用觀察者的接收數(shù)據(jù)的方法
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//發(fā)送完成事件后,斷開連接级解,不接受后序事件
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
}
饒了半天冒黑,我們終于找到被觀察者被調(diào)用的地方了,用戶調(diào)用發(fā)射器的發(fā)送數(shù)據(jù)的方法最終會通過ObservableCreate中的CreateEmitter實現(xiàn)調(diào)用勤哗,而CreateEmitter最終又會調(diào)用觀察者的接收數(shù)據(jù)方法抡爹,到此為止,下游接收數(shù)據(jù)的流程如下:
1.create操作符通過ObservableOnSubscribe對象構(gòu)造ObservableCreate對象;
- ObservableCreate在執(zhí)行訂閱方法subscribeActual時芒划,通過Observer對象構(gòu)造發(fā)射器CreateEmitter;
- CreateEmitter發(fā)射數(shù)據(jù)最終會調(diào)用Observer對應(yīng)的接收數(shù)據(jù)方法冬竟。
Observable的訂閱方法
上面我們理解了接收數(shù)據(jù)的流程,下面我們瞅瞅Observable和Observer建立聯(lián)系的訂閱方法:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//subscribeActual核心代碼C癖啤1门埂!
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
lei了lei了拼苍!看到subscribeActual方法就像遇到親人了笑诅,之前我們了解到Create操作法創(chuàng)建的是ObservableCreate對象,這里用戶執(zhí)行訂閱方法時會調(diào)用subscribeActual映屋,我們再回頭看看ObservableCreate的subscribeActual實現(xiàn):
@Override
protected void subscribeActual(Observer<? super T> observer) {
//根據(jù)Observer構(gòu)造發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//這里由用戶實現(xiàn)發(fā)射數(shù)據(jù)操作
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
讀到這里苟鸯,發(fā)現(xiàn)整個數(shù)據(jù)的產(chǎn)生和接收終于打通同蜻,訂閱方法通過 source.subscribe(parent)由用戶發(fā)射數(shù)據(jù)棚点,在本樣例中就是:
final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
//2.訂閱的時候發(fā)送事件
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// emitter.onError(new Throwable("haha"));
emitter.onComplete();//onComplete事件發(fā)送后,后面的所有事件無效,且后面不能發(fā)送錯誤事件
emitter.onNext(3);
}
});
emitter參數(shù)就是CreateEmitter類型發(fā)射器parent湾蔓。相信看到這里瘫析,整個數(shù)據(jù)的流程應(yīng)該比較清晰了。數(shù)據(jù)流向如下:
Observable訂閱Observer--> Observable執(zhí)行subscribeActual方法--> ObservableOnSubscribe執(zhí)行subscribe方法--> ObservableEmitter執(zhí)行發(fā)射數(shù)據(jù)方法--> Observer執(zhí)行接收數(shù)據(jù)方法默责。
我們可以看到贬循,RxJava規(guī)定了數(shù)據(jù)從Observable到Observer的統(tǒng)一流程,至于用戶發(fā)送什么數(shù)據(jù)桃序、按什么順序發(fā)都通過中間件ObservableOnSubscribe和ObservableEmitter實現(xiàn)杖虾。