Rxjava 過(guò)程分析一
說(shuō)明
- 該文章是基于 Rxjava2 源碼牙言。
- 該篇只是講述 Rxjava 建議的用法掸犬,不涉及操作符和線(xiàn)程切換聋伦, 后兩個(gè)會(huì)有新的篇幅去寫(xiě)。 一步一步的來(lái)蛉拙。
- 在源碼中那些判空還有 Rxjava 中 RxJavaPlugins 鉤子等在分析中去除(只關(guān)注用法和思想尸闸, 和主流程不管的暫時(shí)剔除)。
- 由于習(xí)慣孕锄, 和 Rxjava2 中的命名吮廉。 我稱(chēng) emitter 為上游, 也就是發(fā)射水(數(shù)據(jù))的源頭畸肆, 結(jié)果回調(diào)給外部的 FlowableSubscriber宦芦, 我稱(chēng)它為下游。 上游流水流到下游轴脐!
最簡(jiǎn)單的使用
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// emitter.onNext("");
// emitter.onError();
// emitter.onComplete();
}
}, BackpressureStrategy.LATEST).subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
引發(fā)的思考
- 調(diào)用 emitter 的 onNext调卑、 onError抡砂、 onComplete, 就會(huì)回調(diào) FlowableSubscriber 中對(duì)應(yīng)的方法恬涧。 那么這兩個(gè)對(duì)象是一個(gè)嗎注益? 有什么聯(lián)系呢?
- 我們把上述代碼寫(xiě)好后溯捆, 會(huì)自動(dòng)調(diào)用并回調(diào)丑搔, 那么上游發(fā)射器 emitter 是什么時(shí)候觸發(fā)的呢? 該方法是什么時(shí)機(jī)和誰(shuí)調(diào)用的呢提揍?
源碼分析
從創(chuàng)建開(kāi)始
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
return new FlowableCreate<T>(source, mode);
}
簡(jiǎn)單嗎大兄弟啤月, 僅僅是創(chuàng)建了一個(gè) FlowableCreate 類(lèi)而已。 并對(duì)成員變量賦值劳跃。
簡(jiǎn)單說(shuō)說(shuō)下游
在開(kāi)發(fā)中顽冶, 可能很多都有在用回調(diào)吧。 再次機(jī)會(huì)我也想說(shuō)說(shuō)回調(diào)是咋回事售碳。 其實(shí) java 中的常用的內(nèi)部類(lèi)回調(diào)强重, 還是 c 的函數(shù)指針, 或者其語(yǔ)言的閉包(swift), 其實(shí)不要把它們想的多么神奇贸人。 就這么想间景, 我把一個(gè)實(shí)例地址或者函數(shù)地址給你了, 你在內(nèi)部去調(diào)用我的方法艺智, 自然就運(yùn)行到了外面了倘要。
訂閱
public final void subscribe(FlowableSubscriber<? super T> s) {
try {
Subscriber<? super T> z = s;
subscribeActual(z);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
}
}
信息量很少, 只是調(diào)用了當(dāng)前 Flowable 的 subscribeActual() 方法十拣。 我們前面知道當(dāng)前的 Flowable 是 FlowableCreate 對(duì)象封拧, 所以進(jìn) FlowableCreate 中去看看做了什么事情。
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
我們這里不去討論背壓等問(wèn)題夭问, 所以我們只是關(guān)注主要流程和關(guān)鍵方法泽西。 其中一眼就可以看到關(guān)鍵的一個(gè)就是在 try 塊中的 source.subscribe(emitter); source 是什么呀? 就是我們?cè)?new FlowableCreate 傳進(jìn)來(lái)的 Flowable.create(new FlowableOnSubscribe<String>()) 中 FlowableOnSubscribe 對(duì)象缰趋。 source 的 subscribe 這不就是運(yùn)行了 外部 FlowableOnSubscribe 的 subscribe嘛捧杉, 所以外部調(diào)用 onNext, onError, onComplete 方法, 其實(shí)調(diào)用了 內(nèi)部 emitter 中對(duì)應(yīng)的方法秘血。 我們以背壓為 LATEST 為例看看 LatestAsyncEmitter 被調(diào)用的方法做了什么事情味抖。 先多說(shuō)一句, 初始化 emitter 時(shí)我們傳入的是下游哦灰粮, 下游相應(yīng)的方法調(diào)用了仔涩, 那么外部的就會(huì)看似回調(diào)出去拿到結(jié)果了!
我們以 onNext 為例粘舟, 看看 LatestAsyncEmitter 被調(diào)用到 onNext 做了什么事情熔脂。
public void onNext(T t) {
queue.set(t);
drain();
}
看到是先把結(jié)果存到了隊(duì)列中佩研, 我們不考慮背壓, 所以我們看主要的大致流程哈锤悄。 顯然下一個(gè)有用的代碼就是 drain() 了韧骗。
void drain() {
final Subscriber<? super T> a = downstream;
final AtomicReference<T> q = queue;
// ......
T o = q.getAndSet(null);
// ......
a.onNext(o);
// ......
}
其中 downstream 就是我們外部的 FlowableSubscriber 及下游了嘉抒。 我們可以看到零聚, 簡(jiǎn)單的從隊(duì)列中取出數(shù)據(jù), 直接調(diào)用了下游的 onNext些侍。 就這樣數(shù)據(jù)就被從上游流向了下游隶症。
前面的疑惑問(wèn)題
- 上游和下游是一個(gè)東西嗎? 它們的關(guān)系是什么岗宣?
這個(gè)問(wèn)題從上面的分析已經(jīng)很明顯了蚂会。 上游和下游不是一個(gè)東西, 上游 emitter 調(diào)用相應(yīng)的方法去回調(diào)下游的方法耗式。
- 在哪一個(gè)時(shí)刻觸發(fā)的事件流動(dòng)呢胁住?
其實(shí)是在上游 emitter 調(diào)用相應(yīng)的方法那一刻, 比如調(diào)用 onNext刊咳。 那么是在哪一個(gè)時(shí)機(jī)觸發(fā)調(diào)用的呢彪见? 很明顯是在訂閱時(shí), 調(diào)用了 subscribeActual 中又調(diào)用了上游的 subscribe(emitter) 觸發(fā)了數(shù)據(jù)的流動(dòng)娱挨。