在前邊RxJava實(shí)戰(zhàn)技巧大全一文中,我們介紹了RxJava使用過程中常見的應(yīng)用場景和可能遇到的問題,其中我們談到利用RxLifeCycle來管理RxJava的生命周期乐设,避免內(nèi)存泄漏問題晚树,今天自己動手打生命周期管理框RxLife來加深對RxJava的認(rèn)識。
詳解Subject
什么是Subject
在RxJava當(dāng)中崇堵,有四個對象值得我們關(guān)注:Observable型诚,Subject,Observer鸳劳,Subscriber狰贯,它們之間的關(guān)系如下:
對于Observable,Observer赏廓,Subscriber我們比較熟悉涵紊,故不做說明,重點(diǎn)來看Subject幔摸。
通過上面的圖我們可以看出Subject繼承自O(shè)bservable摸柄,也就意味著Subject可以作為被觀察者,另外既忆,它又實(shí)現(xiàn)了Observer接口驱负,這意味著它也可以作為觀察者。不難看出患雇,Subject既能作為Observer訂閱Observable,又能作為Observable被其他Observer訂閱跃脊。總之庆亡,Subject承擔(dān)了這么一種角色:對上作為觀察者匾乓,對下作為被觀察者。
和Observable必須有訂閱者才能發(fā)射數(shù)據(jù)不一樣又谋,無論Subject是否有訂閱者拼缝,它都可以發(fā)射數(shù)據(jù)。這有點(diǎn)類似廣播電臺彰亥,不會因?yàn)槲覀冴P(guān)閉收音機(jī)就停止廣播咧七,在收聽的人自然收聽的到,沒收聽的人也無關(guān)緊要任斋。
常見的Subject
從上面的uml中我們看出继阻,RxJava為我們提供了四種常用的Subject,
即syncSubject,BehabviorSubject,PublishSubject,ReplaySubject,下面我們對這四者進(jìn)行說明瘟檩。
AsyncSubject
AsyncSubject會緩存最后一個數(shù)據(jù)并在調(diào)用onCompleted()
時將該數(shù)據(jù)發(fā)送給訂閱者抹缕,原理如下:
在該過程中,一旦發(fā)生任何異常都不會發(fā)送數(shù)據(jù)到訂閱者墨辛,而是發(fā)送給訂閱者一個異常通知卓研,即訂閱者只能接受到一個異常的通知,如下:
舉例來說明AsyncSubject的用法:
asyncSubject.onNext("1");
asyncSubject.onNect("2");
asyncSubject.onCompleted();//必須調(diào)用才會開始發(fā)送數(shù)據(jù)
以上代碼執(zhí)行后睹簇,訂閱者接受到的數(shù)據(jù)是2.
BehaviorSubject
當(dāng)BehaviorSubject被訂閱后奏赘,它首先會發(fā)送原始Observable最近發(fā)射的數(shù)據(jù),如果最近沒有太惠,會發(fā)射一個默認(rèn)值磨淌,接下繼續(xù)發(fā)射原始Observable的數(shù)據(jù),如下圖:
如果原始的Observable因?yàn)榘l(fā)生了錯誤而終止凿渊,那么BehaviorSubject在發(fā)送一個錯誤通知后不再發(fā)射數(shù)據(jù)梁只,如下:
我們舉例來說明BehabviorSubject的用法:
behaviorSubject.onNext("1");
behaviorSubject.onNect("2");
behaviorSubject.onNext("3");
behaviorSubject.subscribe(new Action<String>(){
@Override
public void call(String s){
System.out.println(“result:”+s);
}
});
behaviorSubject.onNext("4");
輸出結(jié)果是3,4.
PublishSubject
默認(rèn)情況下,RxJava中的Observable一旦被訂閱就開始發(fā)送事件埃脏,這和我們傳統(tǒng)的觀察者模式有所區(qū)別敛纲。而PublishSuject的行為則類似傳統(tǒng)的觀察這模式,觀察者可以先訂閱被觀察者剂癌,然后在某個時刻手動調(diào)用方法來發(fā)射數(shù)據(jù)(訂閱之后的數(shù)據(jù))到所有的觀察者。如下圖:
如果原始的Observable因?yàn)榘l(fā)生了錯誤而終止翰绊,那么PublishSubject在發(fā)送一個錯誤通知后不再發(fā)射數(shù)據(jù)佩谷,如下:
舉例來說明PublishSubject的用法:
publishSubject.onNext("1");
publishSubject.onNect("2");
publishSubject.onNext("3");//訂閱之前不會被發(fā)送
publishSubject.subscribe(new Action<String>(){
@Override
public void call(String s){
System.out.println(“result:”+s);
}
});
publishSubject.onNect("4");
publishSubject.onNect("5");
1,2监嗜,3是在訂閱之前的數(shù)據(jù)谐檀,不會被發(fā)射,最終輸出結(jié)果是4裁奇,5桐猬。
ReplaySubject
ReplaySubject會緩存所有已經(jīng)發(fā)射的數(shù)據(jù),當(dāng)一個新的訂閱關(guān)系產(chǎn)生時刽肠,ReplaySuject會將所有數(shù)據(jù)都發(fā)送給他溃肪。另外,ReplaySubject支持設(shè)置緩存數(shù)據(jù)和緩存時間音五。如下圖:
舉例來說明ReplaySubject的用法:
replaySubject.onNext("1");
replaySubject.onNect("2");
replaySubject.onNext("3");
replaySubject.subscribe(new Action<String>(){
@Override
public void call(String s){
System.out.println(“result:”+s);
}
});
replaySubject.onNect("4");
默認(rèn)情況下ReplaySubject會緩存所有的數(shù)據(jù)惫撰,因此最終數(shù)據(jù)的結(jié)果如下:
result:1
result:2
result:3
result:4
小結(jié)
回顧上面所談的,不難看出不同的Subject最大的區(qū)別在于發(fā)送數(shù)據(jù)的行為不同躺涝,簡單概括如下:
Subject | 發(fā)射行為 |
---|---|
AsyncSubject | 不論訂閱發(fā)生在什么時候厨钻,只會發(fā)射最后一個數(shù)據(jù) |
BehaviorSubject | 發(fā)送訂閱之前一個數(shù)據(jù)和訂閱之后的全部數(shù)據(jù) |
ReplaySubject | 不論訂閱發(fā)生在什么時候,都發(fā)射全部數(shù)據(jù) |
PublishSubject | 發(fā)送訂閱之后全部數(shù)據(jù) |
關(guān)于Subject更詳細(xì)的使用方法請直接查閱api doc.
實(shí)現(xiàn)生命周期管理框架(RxLife)
在了解Subject之后就可以開始考慮如何實(shí)現(xiàn)一個生命周期管理框架。每當(dāng)Activity或者Fragment的生命周期發(fā)生變化時我們都希望產(chǎn)生一個對應(yīng)的事件來通知當(dāng)前所有的訂閱者夯膀,這樣我們就可以根據(jù)對應(yīng)的事件去確定是否取消訂閱關(guān)系了诗充。
從上面的描述中,我們有兩個問題要解決:
- 如何監(jiān)聽Activity或Fragmeng生命周期變化并將其發(fā)送出去诱建。
- 原有的觀察者如何接受生命周期蝴蜓,并在某生命周期下中斷原有的事件流。
通過以上兩個問題涂佃,我們知道我們需要一個既能夠發(fā)射生命周期励翼,又能接受生命周期的觀察者,因此不難想到這里需要Subject辜荠。生命周期是連續(xù)產(chǎn)生的汽抚,無論是否有訂閱者,我們只關(guān)注最最近的生命周期伯病,因此我們選擇使用BehaviorSubject造烁。
現(xiàn)在我們來考慮如何監(jiān)聽Activity或Fragment的生命周期,并利用BehaviorSubject發(fā)射生命周期午笛。這里我們以Activity為例進(jìn)行說明惭蟋。
生命周期事件監(jiān)聽
定義生命周期事件
我們根據(jù)Activity的生命周期,定義相應(yīng)的事件药磺。
public enum ActivityEvent {
CREATE,
RESUME,
START,
PAUSE,
STOP,
DESTORY
}
監(jiān)聽生命周期
為了能在Activitiy生命周期變化時發(fā)送相應(yīng)的事件告组,我們定義了RxAppcompatActivity,該類繼承了AppCompatActivity并重寫器生命周期方法:在不同方法中發(fā)射事件到BehaviorSubject中癌佩。這就好像我們的BehaviorSubject對象在不斷的觀察Activity生命周期的變化木缝。當(dāng)然,由于Subject的特性围辙,BehaviorSubject也具備了將這些事件發(fā)射出去的能力我碟。
public class RxAppCompatActivity extends AppCompatActivity {
protected final BehaviorSubject<ActivityEvent> lifeSubject = BehaviorSubject.create();
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifeSubject.onNext(ActivityEvent.CREATE);
}
@Override
protected void onResume() {
super.onResume();
lifeSubject.onNext(ActivityEvent.RESUME);
}
@Override
protected void onStart() {
super.onStart();
lifeSubject.onNext(ActivityEvent.START);
}
@Override
protected void onPause() {
super.onPause();
lifeSubject.onNext(ActivityEvent.PAUSE);
}
@Override
protected void onStop() {
super.onStop();
lifeSubject.onNext(ActivityEvent.STOP);
}
@Override
protected void onDestroy() {
super.onDestroy();
lifeSubject.onNext(ActivityEvent.DESTORY);
}
}
Observable自動停止發(fā)射數(shù)據(jù)
到現(xiàn)在我們已經(jīng)利用Subject來監(jiān)視生命周期的變化,那又如何讓原有的Observable(比如網(wǎng)絡(luò)請求的Observable)來監(jiān)視Subject發(fā)射的數(shù)據(jù)呢姚建,并根據(jù)Subject的狀態(tài)自動停止原始數(shù)據(jù)的發(fā)射矫俺?換言之就是一個Observable如何在發(fā)射數(shù)據(jù)的同時監(jiān)視另一個Observable?
TakeUtil操作符
令人高興的是掸冤,RxJava中提供的TakeUntil操作符來實(shí)現(xiàn)上述需求厘托。TakeUntil訂閱原始的Observable并發(fā)射數(shù)據(jù),此外它還監(jiān)視你提供的第二個Observable贩虾。當(dāng)?shù)诙€Observable發(fā)射了一項(xiàng)數(shù)據(jù)或者發(fā)射一項(xiàng)終止的通知時(onError通知或者onCompleted通知)催烘,TakeUntil返回的Observable會停止發(fā)射原始的Observable,如下圖所示:
我們用一個簡單的例子來展示TakeUntil操作符的使用:
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.d("MainActivity", "num:" + num);
}
});
上面的代碼每隔2s進(jìn)行輸出缎罢,現(xiàn)在我們希望5s后自動停止輸出,就可以這樣做:
Observable.interval(2, TimeUnit.SECONDS).takeUntil(Observable.timer(5,TimeUnit.SECONDS)).subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.d("MainActivity", "num:" + num);
}
});
為了讓以上代碼更通用伊群,我們利用compose操作符進(jìn)行改寫(對compose不熟悉的童鞋自行查閱資料):
private void startIntervalTask1() {
Observable.interval(2, TimeUnit.SECONDS).compose(bindUntilDelay(5)).subscribe(new Action1<Long>() {
@Override
public void call(Long num) {
Log.d("MainActivity", "num:" + num);
}
});
}
@NonNull
private Observable.Transformer<Long, Long> bindUntilDelay(final int delaySecond) {
return new Observable.Transformer<Long, Long>() {
@Override
public Observable<Long> call(Observable<Long> longObservable) {
return longObservable.takeUntil(timer(delaySecond,TimeUnit.SECONDS));
}
};
}
回到正題考杉,現(xiàn)在我們已經(jīng)有了可以發(fā)射生命周期事件的BehaviorSubject,再結(jié)合TakeUntil不就可以實(shí)現(xiàn)在指定生命周期發(fā)生時自動停止原有的Observable了嗎舰始?
結(jié)合BehaviorSubject與TakeUntil
有了上面的知識做鋪墊崇棠,實(shí)現(xiàn)生命周期管理框架也就顯得輕而易舉了。
為了方便使用丸卷,我們在RxAppcompatActivity中提供了bindUntilEvent(ActivityEvent nindEvent)方法:
public class RxAppCompatActivity extends AppCompatActivity {
protected final BehaviorSubject<ActivityEvent> lifeSubject = BehaviorSubject.create();
public <T> Observable.Transformer<T, T> bindUntilEvent(final ActivityEvent bindEvent) {
//被監(jiān)視的Observable
final Observable<ActivityEvent> observable = lifeSubject.takeFirst(new Func1<ActivityEvent, Boolean>() {
@Override
public Boolean call(ActivityEvent event) {
return event.equals(bindEvent);
}
});
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> sourceOb) {
return sourceOb.takeUntil(observable);
}
};
}
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifeSubject.onNext(ActivityEvent.CREATE);
}
@Override
protected void onResume() {
super.onResume();
lifeSubject.onNext(ActivityEvent.RESUME);
}
@Override
protected void onStart() {
super.onStart();
lifeSubject.onNext(ActivityEvent.START);
}
@Override
protected void onPause() {
super.onPause();
lifeSubject.onNext(ActivityEvent.PAUSE);
}
@Override
protected void onStop() {
super.onStop();
lifeSubject.onNext(ActivityEvent.STOP);
}
@Override
protected void onDestroy() {
super.onDestroy();
lifeSubject.onNext(ActivityEvent.DESTORY);
}
}
接下來枕稀,我們用同樣的方式來處理Fragment或者其他組件即可。
具體使用
新建的Activity需要繼承我們的RxAppcompatActivity谜嫉,新建的Fragment則繼承我們的RxFragment萎坷,就是這么簡單。
我們同樣還是以師父說為例沐兰,由于我們的方法基本和RxLifeCycle保持一致哆档,因此只要簡單的改動就可以讓RxLife工作起來,現(xiàn)在就可以用RxLife來代替RxLifeCycle住闯。
仍然做個簡單的示例:
ApiFactory.getWXApi().getWXHot(AppConstant.KEY_WX, getPageSize(), mCurrentPage + 1).compose(this.bindUntilEvent(FragmentEvent.STOP))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);
總結(jié)
通過自行實(shí)現(xiàn)一個RxJava生命周期管理框架(RxLife)加深le我們對RxJava中Subject的理解瓜浸。另外,Subject的應(yīng)用非常廣泛比原,在下篇文章中插佛,我們將會進(jìn)一步深入,利用Subject來打造自己的事件通信總線RxBus量窘。