一、RxJava2 整體功能分析
下面這段代碼很簡單宵膨,就是事件源會(huì)在當(dāng)前線程通過 e.onNext() 的方式發(fā)送 "1"涯曲,"2","3" 三個(gè)事件秃臣,最后發(fā)送 e.onComplete() 第四個(gè)事件蠢终,那么在訂閱者 Observer 中就可以收到這個(gè)幾個(gè)由事件源發(fā)送的事件梦染。接下來通過源碼的角度分析下面這段代碼的整體邏輯
在分析代碼之前需要明白一個(gè)原則,那就是了解一個(gè)類首先先了解這個(gè)的頂層接口晶姊,通過頂層接口就可以明白這個(gè)類的框架體系的大體功能了,子類只是對(duì)這個(gè)體系的功能擴(kuò)展而已。這就好比學(xué)習(xí)集合框架一樣姑裂,我們首先會(huì)去了解 Collection 接口內(nèi)部的所有的方法,知道了這些方法之后泞辐,我們心里就大概知道這個(gè) Collection 體系大概的功能了,然后再慢慢的去了解它的實(shí)現(xiàn)類對(duì)這些功能的具體實(shí)現(xiàn)喂急。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mD = null;
@Override
public void onSubscribe(Disposable d) {
mD = d;
}
@Override
public void onNext(String s) {
if ("2".equals(s)) {
mD.dispose();
}
System.out.println("s = " + s);
}
@Override
public void onError(Throwable e) {
System.out.println(e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
二格嘁、Observable 的繼承關(guān)系
Observable 是一個(gè)抽象類,是 ObservableSource 的實(shí)現(xiàn)類廊移,而 ObservableSource 類是一個(gè)接口糕簿,它表示事件源。內(nèi)部只有一個(gè)方法 subscribe 該方法表示通過 Observer 訂閱當(dāng)前的事件源狡孔。那么事件發(fā)布的事件懂诗,在 Observer 訂閱者中就會(huì)被收到。了解了 Observable 的頂層接口之后苗膝,我們就知道該體系最重要的一個(gè)功能那就是 subscribe 方法了殃恒,因此我們就重點(diǎn)關(guān)注子類的 subscribe 方法。
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
跟蹤 Observable 中 subscribe 的調(diào)用關(guān)系荚醒,最后可以知道最終會(huì)調(diào)用到一個(gè)方法第 31 行代碼 subscribeActual(observer); 期間做了多次轉(zhuǎn)換操作芋类,這些我們不用管。我說過現(xiàn)在分析是整體流程界阁,所以沒有必要去分析細(xì)枝末節(jié)的東西侯繁,不然會(huì)迷失方向的。所以大膽的得出一個(gè)結(jié)論泡躯,只要是 ObservableSource 的子類贮竟,那么我們只要關(guān)心 subscribeActual(observer); 這個(gè)方法就好的。
public abstract class Observable<T> implements ObservableSo
urce<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//這段代碼是核心代碼较剃。
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS")
npe.initCause(e);
throw npe;
}
}
三咕别、Observable#create(ObservableOnSubscribe)
我們?cè)?create 方法中傳入一個(gè) ObsevableOnSubscribe 對(duì)象,而這個(gè)對(duì)象就是一個(gè) Observable 的父類写穴。而 create 方法顧名思義就適用于創(chuàng)建 Observable 對(duì)象的惰拱。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//非空校驗(yàn)
ObjectHelper.requireNonNull(source, "source is null");
//內(nèi)部就是創(chuàng)建一個(gè) ObservableCreate 對(duì)象
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
四、ObservableCreate
上面的 create 方法中內(nèi)部實(shí)際返回的是一個(gè) ObservableCreate 對(duì)象啊送,而這個(gè)類實(shí)際上就是 Observable 的子類偿短。通過構(gòu)造方法方法可以知道當(dāng)前創(chuàng)建的 ObservableCreate 內(nèi)部維護(hù)了上一級(jí)創(chuàng)建的 ObsevableOnSubscribe 對(duì)象欣孤,這個(gè)對(duì)象就是用戶在 create 方法傳入的對(duì)象。這里很重要昔逗,因?yàn)橄旅婷恳患?jí)都會(huì)創(chuàng)建一個(gè)新的 Observable 對(duì)象降传,內(nèi)部都會(huì)保存上一級(jí)的 ObservableOnSubscribe 對(duì)象。如果不太理解的話勾怒,先放下婆排,等下面分析了應(yīng)該就會(huì)明白了。到這里我們就知道 Observable.create() 方法會(huì)返回一個(gè) Observable 類型的對(duì)象笔链。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T>
source) {
//內(nèi)部保存了上一級(jí)創(chuàng)建的 ObservableOnSubscribe 對(duì)象的引用段只。
this.source = source;
}
}
五、觸發(fā) subscribe 方法
這個(gè)方法大家都知道卡乾,就是用來發(fā)生訂閱關(guān)系的翼悴。在 RxJava 中事件源 Observable 只有發(fā)生了訂閱才會(huì)發(fā)送事件。我們知道剛才通過 create 方法的分析可以知道幔妨,內(nèi)部是創(chuàng)建了 ObservableCreate 這個(gè) Observable 子類的鹦赎,那么就分析 ObservableCreate 的 subscribe 的內(nèi)部實(shí)現(xiàn)即可。
- ObservableCreate#subscribeActual
在上面已經(jīng)分析過了误堡,只要是 Observable 類型的對(duì)象古话,在調(diào)用 subscribe(observer) 最終都會(huì)調(diào)用調(diào) subscribeActual(observer) 方法。
@Override
//subscribe 方法內(nèi)部會(huì)調(diào)用 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
//發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回調(diào)給 observer#onSubscribe
observer.onSubscribe(parent);
try {
//告訴上一級(jí)的 observable 你可以發(fā)送事件了锁施。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
- 分析事件源是如何發(fā)送事件的陪踩?
在文章開頭,我們?cè)?ObservableOnSubscribe#subscribe 方法內(nèi)部發(fā)送的了 4 個(gè)事件悉抵。那么這個(gè) ObservableOnSubscribe#subscribe(ObservableEmitter) 是在哪里調(diào)用的呢肩狂?還記得 ObservableCreate 類中的 subscribeActual 的實(shí)現(xiàn)嗎?它的內(nèi)部調(diào)用 source.subscribe(parent); 這個(gè)方法姥饰,目的就是將發(fā)射器 CreateEmitter 傳遞給上一級(jí)創(chuàng)建的 ObservableOnSubscribe 對(duì)象傻谁。
@Override
//subscribe 方法內(nèi)部會(huì)調(diào)用 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
//發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回調(diào)給 observer#onSubscribe
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這樣上面這 4 個(gè)事件就可以通過 ObservableEmitter 對(duì)象發(fā)送了,由于多態(tài)的原理列粪,實(shí)際上是由 CreateEmitter 去發(fā)送這四個(gè)事件的审磁。
**CreateEmitter 就是上面描述的 Emitter 的實(shí)現(xiàn)類。 **
//發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//將發(fā)送器對(duì)象傳入給上一級(jí)創(chuàng)建的 ObservableOnSubscribe 對(duì)象岂座,其實(shí)也就類似于接口回調(diào)的方式去通知 Observable 您的訂閱者 Observer 已準(zhǔn)備好了态蒂,您可以發(fā)送事件了。
source.subscribe(parent);
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
}
});
- ObservableCreate#CreateEmitter
這個(gè)類是一個(gè)發(fā)射器费什,它是 Emitter 的實(shí)現(xiàn)類钾恢,主要用于發(fā)射事件的。內(nèi)部封裝了 Observer 對(duì)象,這個(gè) Observer 就是通過 subscribe(observer) 參數(shù)傳入的 observer 對(duì)象瘩蚪,那么在 CreateEmitter 中調(diào)用 onNext,onError,onComplete 方法內(nèi)部都去調(diào)用該 observber 對(duì)象對(duì)應(yīng)的 onNext(t),onError(t),onComplete() 方法刑桑。這樣就實(shí)現(xiàn)了事件源 Emitter 發(fā)送事件,在訂閱者 Observer 收到事件了募舟。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//onNext 的參數(shù)不能為 null
if (t == null) {
return;
}
if (!isDisposed()) {
//回調(diào) observer 對(duì)應(yīng)的方法
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
//onError 的參數(shù)不能為 null
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."
}
if (!isDisposed()) {
try {
//回調(diào) observer 對(duì)應(yīng)的方法
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//回調(diào) observer 對(duì)應(yīng)的方法
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
- Emitter
發(fā)射器頂層接口,定義 onNext,onError,onComplete 方法闻察。
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
- Disposable 的作用
Disposable 可以理解為一個(gè)事件源和訂閱者的一個(gè)連接器拱礁,當(dāng)調(diào)用 dispose() 方法之后,這個(gè)連接器就關(guān)閉了辕漂,那么事件源將不會(huì)往該訂閱者 observer 發(fā)送事件了呢灶。isDisposed() 就是用于判斷該連接器是否被中斷了。
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
- Disposable 的使用
還是回到 ObservableCreate 這個(gè)類的 subscribeActual 方法钉嘹,這個(gè)方法中是發(fā)生訂閱的時(shí)候調(diào)用的鸯乃。在其內(nèi)部有這段代碼
observer.onSubscribe(parent); 這個(gè) parent 就是先前創(chuàng)建的 CreateEmitter 對(duì)象,從上面的源碼可以看到該類實(shí)現(xiàn)了 Emitter 接口外跋涣,還實(shí)現(xiàn)了 Disposable 接口缨睡。那么在外部的Observer 中的 onSubscribe 這個(gè)方法可以收到 Disposable 對(duì)象,那么用戶就可以在適當(dāng)?shù)臅r(shí)候進(jìn)行關(guān)閉連接器操作了陈辱。下面的代碼示例中奖年,在 onNext 方法中當(dāng)收到的事件為 "2" 時(shí),那么就調(diào)用 dispose() 關(guān)閉連接器沛贪。而關(guān)閉之后事件源在發(fā)送下一個(gè)事件的時(shí)候就會(huì)判斷該連接器是否是關(guān)閉的陋守,具體代碼看 CreateEmitter#onNext 方法,它內(nèi)部會(huì)判斷 if (!isDisposed()) 判斷利赋。如果已經(jīng)關(guān)水评,那么將不會(huì)再往該 Observer 發(fā)送事件了。
//CreateEmitter 類繼承結(jié)構(gòu)
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable
//開始訂閱
.subscribe(new Observer<String>() {
private Disposable mD = null;
//onSubscribe 方法用于接收一個(gè) Dispoable 對(duì)象媚送。
@Override
public void onSubscribe(Disposable d) {
mD = d;
}
@Override
public void onNext(String s) {
//當(dāng)接收到的事件為 "2" 時(shí)中燥,那么就關(guān)閉連接器。
if ("2".equals(s)) {
mD.dispose();
}
System.out.println("s = " + s);
}
@Override
public void onError(Throwable e) {
System.out.println(e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
六季希、總結(jié)
1褪那、在 RxJava 中最重要的就是每一次 Observable 的創(chuàng)建都會(huì)保存上一級(jí)的創(chuàng)建的 Observable 對(duì)象,這個(gè)有什么用呢式塌?其實(shí)每一個(gè) Observable 都要進(jìn)行 subscribe 發(fā)生訂閱關(guān)系的博敬。在當(dāng)前 Observable 調(diào)用了 subscribe 之后,還需要調(diào)用上一級(jí)創(chuàng)建的 Observable.subscribe() 進(jìn)行訂閱峰尝,這樣一級(jí)級(jí)往上發(fā)生訂閱關(guān)系偏窝。這個(gè)作用是可以在下一節(jié)分析線程切換時(shí)就用體現(xiàn)了,到時(shí)再分析咯。
2祭往、分析整體流程不要在意細(xì)枝末節(jié)伦意,先接觸頂層接口,了解體系功能硼补。