目錄
【RxJava】- 創(chuàng)建操作符源碼分析
【RxJava】- 變換操作符源碼分析
【RxJava】- 過(guò)濾操作符源碼分析
【RxJava】- 結(jié)合操作符源碼分析
【RxJava】- 連接操作符源碼分析
簡(jiǎn)介
在Activity中使用RxJava的時(shí)候诚撵,由于回調(diào)拌倍,RxJava持有Activity引用。當(dāng)Activity銷毀時(shí)舆逃,RxJava中的耗時(shí)任務(wù)還沒有完成米者,如果這時(shí)候沒有收到調(diào)用對(duì)應(yīng)的dispose()方法汉矿,那么RxJava中持有的資源得不到釋放,從而引起Activity的內(nèi)存泄露。如果在Activity中手動(dòng)調(diào)用动猬,這樣麻煩又不優(yōu)雅,所以這時(shí)候可以使用RxLifecycle來(lái)解決表箭。
使用
具體使用可以參考RxLifecycle赁咙,下面注意講解RxLifecycle源碼,帶你一步步了解RxLifecycle實(shí)現(xiàn)的真相免钻。
流程
分析之前彼水,先用RxJava創(chuàng)建一個(gè)觀察者模型任務(wù)。
Observable.create(emitter -> {}).compose(bindToLifecycle()).subscribe();
這里只分析bindToLifecycle()方法中的內(nèi)容极舔,其它都是RxJava中的操作符凤覆。
跟蹤到
public static <T> LifecycleTransformer<T> bindActivity(@NonNull Observable<ActivityEvent> lifecycle) {
return RxLifecycle.bind(lifecycle, ACTIVITY_LIFECYCLE);
}
- ACTIVITY_LIFECYCLE:Function實(shí)例,對(duì)Activity生命周期時(shí)間做映射拆魏。
- lifecycle:是新創(chuàng)建的BehaviorSubject實(shí)例盯桦。
下一步
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
...
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents)方法構(gòu)建一個(gè)新的Observable實(shí)例,即ObservableFilter對(duì)象渤刃∮德停·
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
參數(shù)lifecycle.share()
public final Observable<T> share() {return publish().refCount();}
public Observable<T> refCount() {
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(onRefCount()));
}
private ConnectableObservable<T> onRefCount() {
if (this instanceof ObservablePublishClassic) {
return RxJavaPlugins.onAssembly(
new ObservablePublishAlt<T>(((ObservablePublishClassic<T>)this).publishSource())
);
}
return this;
}
最后調(diào)用bind(@Nonnull final Observable<R> lifecycle)返回一個(gè)持有上面創(chuàng)建的Observable實(shí)例的LifecycleTransformer對(duì)象。
- subscribe()
訂閱
發(fā)射Activity生命周期事件
發(fā)射Activity生命周期事件卖子,封裝RxLifecycle中的RxAppCompatActivity類里面(我繼承的是RxAppCompatActivity)略号。比如onCreate:
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
this.lifecycleSubject.onNext(ActivityEvent.CREATE);
}
onNext方法
public void onNext(T t) {
...
Object o = NotificationLite.next(t);
setCurrent(o);
for (BehaviorDisposable<T> bs : subscribers.get()) {
bs.emitNext(o, index);
}
}
subscribers中的值是一個(gè)BehaviorDisposable數(shù)組,在subscribeActual方法中進(jìn)行添加洋闽。
protected void subscribeActual(Observer<? super T> observer) {
BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
if (add(bs)) {...};
...
}
emitNext方法中調(diào)用 test(value)來(lái)發(fā)射數(shù)據(jù)玄柠,如果已經(jīng)有數(shù)據(jù)處于發(fā)射中,者將數(shù)據(jù)保存起來(lái)诫舅,然后返回羽利。
public boolean test(Object o) {
return cancelled || NotificationLite.accept(o, downstream);
}
public static <T> boolean accept(Object o, Observer<? super T> observer) {
if (o == COMPLETE) {
observer.onComplete();
return true;
} elseif (o instanceof ErrorNotification) {
observer.onError(((ErrorNotification)o).e);
return true;
}
observer.onNext((T)o);
return false;
}
調(diào)用
Observable.create(emitter -> {}).compose(bindToLifecycle()).subscribe();
.subscribe()
執(zhí)行.compose中返回實(shí)例的subscribeActual(Observer<? super T> observer)方法,observer是.subscribe()傳入的訂閱實(shí)例骚勘,即觀察者铐伴。-
.compose(bindToLifecycle())
bindToLifecycle()返回的是LifecycleTransformer實(shí)例∏味铮看一下compose(bindToLifecycle())實(shí)現(xiàn)当宴。
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) { return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this)); }
public static <T> Observable<T> wrap(ObservableSource<T> source) { ObjectHelper.requireNonNull(source, "source is null"); if (source instanceof Observable) { return RxJavaPlugins.onAssembly((Observable<T>)source); } return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source)); }
首先調(diào)用LifecycleTransformer的apply方法,傳入被觀察者對(duì)象泽疆,即.create返回的實(shí)例户矢。
apply方法做了什么public ObservableSource<T> apply(Observable<T> upstream) { return upstream.takeUntil(observable); }
對(duì)被觀察者使用takeUntil操作符,傳入值為takeUntilCorrespondingEvent返回的ObservableFilter實(shí)例殉疼,新創(chuàng)建一個(gè)ObservableTakeUntil實(shí)例并返回梯浪。
由于ObservableTakeUntil是Observable的子類捌年,所以.compose返回ObservableTakeUntil對(duì)象。
-
ObservableTakeUntil
接下來(lái)就執(zhí)行到ObservableTakeUntil中的subscribeActual(Observer<? super T> child)方法挂洛。ObservableTakeUntil中的兩個(gè)參數(shù):
- other
takeUntilCorrespondingEvent返回的ObservableFilter實(shí)例 - source
被觀察者礼预,即.create返回的實(shí)例。
public void subscribeActual(Observer<? super T> child) { TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<T, U>(child); child.onSubscribe(parent); other.subscribe(parent.otherObserver); source.subscribe(parent); }
child是觀察者實(shí)例虏劲,即.subscribe()中的訂閱者托酸。
調(diào)用觀察者的onSubscribe方法,傳入持有觀察者實(shí)例TakeUntilMainObserver實(shí)例柒巫。
調(diào)用ObservableFilter的subscribe方法励堡,調(diào)用被觀察者(即.create返回的實(shí)例)中的subscribe方法。前者傳入OtherObserver實(shí)例堡掏,后者傳入TakeUntilMainObserver實(shí)例应结。
看一下takeUntilCorrespondingEvent方法:
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle, final Function<R, R> correspondingEvents) { return Observable.combineLatest( lifecycle.take(1).map(correspondingEvents), lifecycle.skip(1), new BiFunction<R, R, Boolean>() { @Override public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception { return lifecycleEvent.equals(bindUntilEvent); } }) .onErrorReturn(Functions.RESUME_FUNCTION) .filter(Functions.SHOULD_COMPLETE); }
這里面的邏輯什么觸發(fā)呢,當(dāng)在Activity生命周期發(fā)射事件時(shí)泉唁,也就是上面講的RxAppCompatActivity生命周期發(fā)射事件鹅龄。
.filter返回的是ObservableFilter對(duì)象。而ObservableFilter中的downstream是ObservableTakeUntil中的OtherObserver實(shí)例游两±悖看一下ObservableFilter中的onNext方法。
public void onNext(T t) { if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { downstream.onNext(t); } } else { downstream.onNext(null); } }
boolean b = filter.test(t)是使用combineLatest操作符返回的值贱案,規(guī)則是肛炮,當(dāng)需要觀察的生命周期事件(即ifecycle.take(1))和在Activity中發(fā)射的生命周期事件相等時(shí)返回true,否則返回false宝踪。
比如觀察的是onStop生命周期侨糟,那么觀察的事件就是ActivityEvent.STOP,當(dāng)Activity調(diào)用onStop時(shí)瘩燥,那么filter.test(t)返回true秕重。
downstream.onNext(t)調(diào)用ObservableTakeUntil中的onNext方法。
public void onNext(U t) { DisposableHelper.dispose(this); otherComplete(); }
這里基本上就是去清理RxJava的相關(guān)數(shù)據(jù)了厉膀。其它情況類似溶耘。
- other
bindUntilEvent(@NonNull ActivityEvent event)
指定觀察那個(gè)生命周期事件。
其它
// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle3:rxlifecycle-components-preference:3.1.0'
// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle3:rxlifecycle-android-lifecycle:3.1.0'
這兩個(gè)是在其它情況下事情 服鹅,具體自己看庫(kù)里面的代碼凳兵,很少。