注:本文分析的是RxJava 2.0.0 閱讀之前诅诱,希望你對(duì)RxJava1已經(jīng)有所了解奶赠。
首先了解幾個(gè)基本接口
public interface Emitter<T>
void onNext(T value);
void onError(Throwable error);
void onComplete();
public interface ObservableEmitter<T> extends Emitter<T>
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
public interface Disposable {
void dispose();
boolean isDisposed();
和第一版的Subscriber挺像的近刘,把Subscriber拆分出了幾部分
再看一個(gè)接口
public interface Observer<T>
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
其中onSubscribe和第一版Subscriber的onStart方法類似
第二版多了個(gè)Disposable參數(shù),讓你可以取消橘沥。
我們可以寫(xiě)這個(gè)接口的子類窗轩,訂閱的時(shí)候使用
作為開(kāi)始,先寫(xiě)一個(gè)超級(jí)簡(jiǎn)單的例子:
Observable
.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1"); } })
.subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(String value) { }
@Override public void onError(Throwable e) {}
@Override public void onComplete() {}
});
(排版有點(diǎn)糟糕座咆,先將就一下~~)
create方法接受一個(gè)ObservableOnSubscribe對(duì)象
ObservableOnSubscribe是一個(gè)接口品姓,只有一個(gè)方法
public interface ObservableOnSubscribe<T>
void subscribe(ObservableEmitter<T> e) throws Exception;
ObservableEmitter這個(gè)接口在開(kāi)始的時(shí)候提到過(guò)
看看create的源碼:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
先判空
RxJavaPlugins.onAssembly是一個(gè)hook,類似于監(jiān)聽(tīng)箫措,不設(shè)置的話傳入什么就返回什么
所以這里返回的是ObservableCreate對(duì)象
ObservableCreate繼承了Observable腹备,Observable是一個(gè)基類,也是一個(gè)抽象類
Observable規(guī)定了被觀察者的基本流程斤蔓,具體實(shí)現(xiàn)由子類完成
ObservableCreate的構(gòu)造植酥,僅僅是接口保存起來(lái)
現(xiàn)在來(lái)看subscribe方法
里面經(jīng)過(guò)一些檢查后,會(huì)調(diào)用subscribeActual方法
這個(gè)方法是abstract的
前面我們得到的實(shí)際上是ObservableCreate對(duì)象弦牡,所以這個(gè)具體實(shí)現(xiàn)在ObservableCreate里面
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
可以看到友驮,這里用CreateEmitter包裝了observer,這個(gè)observer就是訂閱時(shí)傳入的observer
然后驾锰,調(diào)用onSubscribe方法
之后卸留,source就是create的時(shí)候傳入的接口,終于回到我們的自定義方法了
然而椭豫,這個(gè)時(shí)候耻瑟,我們和observer還隔著一個(gè)CreateEmitter
先看看CreateEmitter部分代碼:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
onNext: 檢查發(fā)射的值是否為null, observer是否已取消觀察, 然后才交給observer
注意赏酥,異常需要自己處理
onComplete:檢查observer是否已取消觀察喳整, 交給observer, 最后調(diào)用dispose
onError: 檢查異常是否為null, observer是否已取消觀察裸扶, 然后才交給observer框都, 最后dispose
注意,要保證onComplete和onError只能調(diào)用一次
再來(lái)看看取消觀察
CreateEmitter繼承了AtomicReference
也就是說(shuō)這個(gè)類有一個(gè)變量呵晨,這個(gè)變量用來(lái)標(biāo)志是否已經(jīng)取消魏保。
調(diào)用dispose最終會(huì)調(diào)用這個(gè)方法
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
簡(jiǎn)單來(lái)說(shuō),就是把這個(gè)標(biāo)志設(shè)置為DISPOSED摸屠,DISPOSED就是一個(gè)單例谓罗。
isDispose方法:
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
很簡(jiǎn)單,看這個(gè)標(biāo)志是否等于DISPOSE
回顧一下餐塘,在create的時(shí)候返回ObservableCreate對(duì)象妥衣,這個(gè)對(duì)象保存了傳入的接口
subscribe的時(shí)候,傳入的observer會(huì)到達(dá)ObservableCreate,進(jìn)行包裝税手,然后交給create時(shí)我們的自定義處理
取消訂閱就是設(shè)置標(biāo)志位蜂筹,本身并不會(huì)做些什么,這和Thread的停止是一樣的道理