Github 相關(guān)代碼: Github地址
一直感覺 RxJava2 的取消訂閱有點混亂, 這樣也能取消, 那樣也能取消, 沒能系統(tǒng)起來的感覺就像掉進(jìn)了盤絲洞, 迷亂...
下面說說這幾種情況
幾種取消的情況
-
subscribe 時返回了 disposable:
-
subscribe 不返回 disposable, 從 observer 的 onSubscribe 中獲取:
-
之前從網(wǎng)上看的, 使用繼承 DisposableObserver 的 observer, 這個 observer 可以直接 dispose
今天打起精神, 看了點源碼, 搞懂了這到底是什么鬼.
源碼分析
" 啰嗦啥啊, 這么簡單的東西還需要貼源碼? "
" 大哥, 下面有總結(jié).... "
從第一種情況開始看, 我們進(jìn)入到 .subscribe((s) -> {})
中看, 發(fā)現(xiàn)它是返回了一個四參數(shù)的重載方法
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
...
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
可以看到, 這個方法里創(chuàng)建了一個 LambdaObserver
, 這個 Observer
實現(xiàn)了Disposable
接口, 所以可以直接作為 Disposable
返回到最上級, 這就是為什么第一種情況中的 subscribe
能返回 disposable
的原因.
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
...
}
而第二種情況的 subscribe
其實就是上面方法里的 subscribe(LambdaObserver)
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
這個方法內(nèi)執(zhí)行了 subscribeActual(observer)
抽象方法, 交給了 Observale
的子類重寫.
第三種情況 中出現(xiàn)的 DisposableObserver
與 LambdaObserver
差不多, 甚至更簡單
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
@Override
public final void onSubscribe(@NonNull Disposable s) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
protected void onStart() { }
@Override
public final boolean isDisposed() {
return s.get() == DisposableHelper.DISPOSED;
}
@Override
public final void dispose() {
DisposableHelper.dispose(s);
}
}
簡單總結(jié)
稍微一看我們就明白了, 當(dāng)傳入 Observer
接口的四個方法時, subscribe
在內(nèi)部構(gòu)建了一個 LambdaObserver
, 而這個 LambdaObserver
和第三種情況的 DisposableObserver
都實現(xiàn)了 Disposable
接口, 所以可以作為 Disposable
返回, 就是這么簡單.
另外第三種情況里出現(xiàn)的 CompositeDisposable
, 簡單說就是一個 Disposable
集合( 由 RxJava 內(nèi)部提供的OpenHashSet 維護(hù), 線程安全 ), CompositeDisposable.dispose()
時會遍歷內(nèi)部的所有 Disposable
執(zhí)行 dispose
操作.
/**
* Dispose the contents of the OpenHashSet by suppressing non-fatal
* Throwables till the end.
* @param set the OpenHashSet to dispose elements of
*/
void dispose(OpenHashSet<Disposable> set) {
...
for (Object o : array) {
if (o instanceof Disposable) {
try {
((Disposable) o).dispose();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (errors == null) {
errors = new ArrayList<Throwable>();
}
errors.add(ex);
}
}
}
...
}
幾種方式的適用情況##
-
如果是零星使用的話, 第一種最方便,
observer
的四個方法可以按需使用, 相同邏輯的方法有多種可供選擇
image 如果使用的
Observer
有很多共同邏輯, 則可以寫一個BaseObserver
繼承DisposableObserver
或者LambdaObserver
, 直接使用xxObserver.dispose()
open class BaseObserver<T>() : DisposableObserver<T>()
- 如果有很多的
diposable
需要取消的話, 使用CompositeDisposable
會更簡單一些