通常我們使用RxJava時會這樣寫:
Subscription subscription = getUsers()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
}
});
得到一個Subscription對象之后,在Activity銷毀的時候去取消訂閱以防內存泄漏:
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
}
這樣做固然沒錯, 可是真的每次都需要手動取消訂閱嗎? 答案顯然不是.
我們來看看源碼:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
}
當調用subscribe時,調用了第二個方法, 當subscriber不是SafeSubscriber的實例時, 則創(chuàng)建一個SafeSubscriber對象. 那么來看看這個SafeSubscriber是個什么鬼:
public class SafeSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {
...
try {
actual.onCompleted();
} catch (Throwable e) {
...
} finally { // NOPMD
try {
unsubscribe();
} catch (Throwable e) {}
}
}
@Override
public void onError(Throwable e) {
...
_onError(e);
}
@Override
public void onNext(T args) {}
protected void _onError(Throwable e) { // NOPMD
...
try {
unsubscribe();
} catch (Throwable unsubscribeException) {}
}
}
可以看出該類僅僅是對Subscriber做了個包裝.
仔細查看代碼, 發(fā)現(xiàn)在onComplated() 方法和onError() 方法中, 都調用了unsubscribe() 方法進行取消訂閱.
因此我們可以大膽猜測, 只要OnCompleted() 和 onError() 執(zhí)行之后, 都會取消訂閱.
下面就簡單的進行一下測試:
Subscription subscription = Observable.just(1)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer:" + integer);
}
});
boolean flag = subscription.isUnsubscribed();
System.out.println("flag:" + flag);
運行結果為:
integer:1
complete
flag: true
說明onCompleted執(zhí)行之后的確取消了訂閱. 再來看看OnError:
Subscription subscription = Observable.just(1)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException("Oops!");
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer:" + integer);
}
});
boolean flag = subscription.isUnsubscribed();
System.out.println("flag:" + flag);
運行結果為:
error
flag: true
說明OnError執(zhí)行之后也取消了訂閱.
那么究竟什么時候需要手動取消訂閱呢? 我們再看一個例子:
final Subscription subscription = Observable.just(1)
.subscribeOn(Schedulers.io())
.delay(2, SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onNext(Integer integer) {
System.out.println("integer:" + integer);
}
});
new Thread(new Runnable() {
@Override
public void run() {
while (true){
System.out.println("flag:" + subscription.isUnsubscribed());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
我們模擬平時App網(wǎng)絡請求的例子, 在后臺線程中發(fā)起網(wǎng)絡請求, 在主線程更新界面, 并延時兩秒模擬網(wǎng)絡不好的情況, 同時新啟一個線程, 每隔一秒打印一次當前Subscription的狀態(tài), 下面是運行結果:
I/System.out: flag:false
I/System.out: flag:false
I/System.out: integer:1
I/System.out: complete
I/System.out: flag:true
I/System.out: flag:true
可以看到在前兩秒時由于還在后臺執(zhí)行網(wǎng)絡請求,所以并沒有取消訂閱, 而當complete執(zhí)行之后, 訂閱就取消了.
因此我們在開發(fā)中, 如果有異步的操作還正在進行, 在Activity銷毀時才需要手動取消訂閱, 而如果是同步的操作, 或者異步操作已經(jīng)完成, 則并不需要手動取消訂閱.