本文所涉及到的RxJava操作符:
-
compose
你完全可以將apply
中的內(nèi)容轉(zhuǎn)移到外部,因?yàn)樗⒉划a(chǎn)生新的上游數(shù)據(jù)源,通常使用它來(lái)包含一系列通用的對(duì)上游數(shù)據(jù)源的處理 -
takeUntil
直到某一種條件達(dá)成時(shí)昼接,會(huì)發(fā)送onComplete結(jié)束數(shù)據(jù)流。 -
combineLatest
聯(lián)合多個(gè)上游數(shù)據(jù)源施绎,當(dāng)有一個(gè)上游發(fā)送數(shù)據(jù)時(shí),就會(huì)將所有上游所發(fā)送的最新的數(shù)據(jù)聯(lián)合發(fā)送贞绳。 -
filter
只有符合過(guò)濾條件的上游數(shù)據(jù)才會(huì)被繼續(xù)發(fā)送 -
take
只有前n個(gè)數(shù)量的上游數(shù)據(jù)才會(huì)被發(fā)送 -
skip
從n+1個(gè)數(shù)量的上游數(shù)據(jù)開(kāi)始才會(huì)被發(fā)送
直接上代碼,以下代碼為根據(jù)RxLifeCycle的實(shí)現(xiàn)提取出的一個(gè)完整的上游數(shù)據(jù)源到下游數(shù)據(jù)源所經(jīng)過(guò)的歷程
致稀,注釋已經(jīng)寫(xiě)的差不多了
Ob
.compose(new ObservableTransformer<Object, Object>() {
@Override
public ObservableSource<Object> apply(Observable<Object> upstream) {
//一個(gè)subject既是Observable又是Observer
Observable<ActivityEvent> activityEventObservable = Activity.this.provideLifecycleSubject()
.share();
return upstream
//takeUntil有兩種實(shí)現(xiàn)
// 一種是:當(dāng)返回true時(shí)冈闭,就發(fā)送onComplete
// 另一種是:當(dāng)(參數(shù)指定的)數(shù)據(jù)源發(fā)射事件時(shí),發(fā)送onComplete
//這里采用第二種抖单,同時(shí)借助filter操作符(控制數(shù)據(jù)源發(fā)送的事件是否會(huì)被攔截)萎攒,實(shí)現(xiàn)當(dāng)?shù)竭_(dá)對(duì)應(yīng)生命周期時(shí)發(fā)送onComplete的效果
.takeUntil(
//每當(dāng)聯(lián)合的數(shù)據(jù)源中遇八,有數(shù)據(jù)源發(fā)送新的事件時(shí),就會(huì)將所有的數(shù)據(jù)源發(fā)送的最新事件合并起來(lái)耍休,發(fā)送一個(gè)新的事件
//這里利用這個(gè)特性刃永,在每個(gè)生命周期方法的最后調(diào)用onNext發(fā)送生命周期事件
Observable.combineLatest(
//將訂閱時(shí)的生命周期轉(zhuǎn)化為停止發(fā)射事件的生命周期
activityEventObservable
.take(1)
.map(new Function<ActivityEvent, ActivityEvent>() {
@Override
public ActivityEvent apply(ActivityEvent activityEvent) throws Exception {
return activityEvent;
}
}),
//跳過(guò)訂閱時(shí)的生命周期,當(dāng)后續(xù)生命周期發(fā)送
activityEventObservable
.skip(1),
new BiFunction<ActivityEvent, ActivityEvent, Boolean>() {
@Override
public Boolean apply(ActivityEvent bindUntilEvent, ActivityEvent lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
//是否攔截事件
.filter(b -> b));
}
})