concat操作符可以接收若干個Observables滤否,并且保證發(fā)射的數(shù)據(jù)是有序的片择。
官方文檔:Returns an Observable that emits the items emitted by three Observables, one after the other, without interleaving them.
onNext榴嗅、onComplete的觸發(fā)順序
//關(guān)鍵代碼示例
Observable.concat(firstObservable, secondObservable)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(concatObserver);
以上面的代碼為例乾蛤,總結(jié)一下onNext荣挨、onComplete的執(zhí)行順序锰什。
1嫂粟、concatObserver按順序接收到firstObservable的onNext傳遞的數(shù)據(jù)降瞳,secondObservable的onNext傳遞的數(shù)據(jù)昔瞧,最后再觸發(fā)onComplete吟吝。
2菱父、firstObservable必須要執(zhí)行emitter.onComplete后,secondObservable的emitter.onNext才能傳遞到concatObserver的onNext方法剑逃。
3浙宜、firstObservable和secondObservable必須都要調(diào)用emitter.onComplete才能執(zhí)行concatObserver的onComplete方法。
4蛹磺、firstObservable粟瞬、secondObservable在emitter.onComplete方法后調(diào)用的emitter.onNext并不會抵達(dá)concatObserver的onNext方法。emitter.onError方法后的emitter.onNext方法同上萤捆。但不要再emitter.onComplete后調(diào)用emitter.onError亩钟,否則出現(xiàn)io.reactivex.exceptions.UndeliverableException
onNext、onError的觸發(fā)順序
1鳖轰、firstObservable執(zhí)行emitter.onError后清酥,secondObservable的emitter.onNext不會觸發(fā),且secondObservable的subscribe都沒有觸發(fā)蕴侣。
onNext焰轻、onError的坑
一般情況,eg不切換線程昆雀,secondObservable必須等firstObservable的onComplete之后才會觸發(fā)辱志。但是在開發(fā)中遇到一個場景,firstObservable查詢db緩存正常狞膘,觸發(fā)emitter.onNext揩懒,emitter.onComplete方法。但是secondObservable因網(wǎng)絡(luò)異常立即返回了Exception觸發(fā)emitter.onError挽封。這時concatObserver竟然沒有觸發(fā)onNext已球,只觸發(fā)了一次onError。
各種排查后,看到stackoverflow上一個提問智亮,ReactiveX concat doesn't produce onNext from first observable if second fails immediately恍然大悟忆某。
根據(jù)observerOn默認(rèn)方法的javadoc說明,onError事件可能插隊(duì)到onNext之前執(zhí)行阔蛉。說明如下:
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous.
可以用observerOn的一個重載方法弃舒,增加一個delayError參數(shù)為true。
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
我理解的是firstObservable在其observeOn的線程準(zhǔn)備觸發(fā)concat的observer状原;secondObservable在其observeOn線程觸發(fā)聋呢;二者最終都需要在concat的observeOn上運(yùn)行。在這個過程中颠区,如果firstObservable和secondObservable還有concat的Observer都不在一個線程坝冕,就可能出現(xiàn)時序問題,導(dǎo)致onError截?cái)嗟給nNext之前瓦呼。
所以另一種方案也生效:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
參考文檔:
1喂窟、Rxjava2中Concat操作符onNext,OnError,OnComplte的執(zhí)行順序
2、 ReactiveX concat doesn't produce onNext from first observable if second fails immediately