概述
Rx系列現(xiàn)在火遍全球盼忌,網(wǎng)上也紛紛涌現(xiàn)各類教程、博客。作為一個Android開發(fā)人員目溉,我認為掌握RxJava已經(jīng)成為了一項必不可少的專業(yè)技能,然而一眛的去看網(wǎng)上已有的教程和博客菱农,并不能讓自己深入理解RxJava缭付,于是有了本系列,也當作為自己的一個總結(jié)循未。本系列章節(jié)打算從最常見的使用開始陷猫,然后進入源碼具體分析。
使用
最簡單的使用RxJava的一個例子的妖,我們需要三個元素
- Observable(被觀察者)
- Observer(觀察者)
- subscribe(訂閱關(guān)系)
廢話不多說绣檬,就是上代碼
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe");
e.onNext("This is String");
e.onNext("This is String");
e.onComplete();
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(@NonNull final Disposable d) {
Log.d(TAG, "onSubScribe");
}
@Override
public void onNext(@NonNull Object o) {
Log.d(TAG, "onNext");
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
1.首先我們需要創(chuàng)建Observable,創(chuàng)建Observable的操作符有很多嫂粟,這里不一一寫出娇未,本文中先使用create操作符創(chuàng)建Observable對象。我們來看一下create方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- create方法中需要一個ObservableOnSubscribe<T>類型參數(shù)
- 方法中最先使用ObjectHelper做了一次判空處理赋元。
- 使用RxJavaPlugins做了某些事情
ObservableOnSubscribe<T>是一個接口忘蟹,因此需要實現(xiàn)當中的subscribe方法,而subscribe方法中存在ObservableEmitter<T>類型的參數(shù)搁凸。
ObservableEmitter<T>繼承Emitter<T>接口媚值,Emitter<T>接口中定義了我們熟知的onNext、onError护糖、onComplete方法褥芒。
ObjectHelper是個工具類,在這就不多說了。
RxJavaPlugins顧名思義锰扶,應該叫做插件類献酗,具體的作用后續(xù)再做詳解,這里大概說明一下坷牛,該類的onAssembly方法跟hook相關(guān)罕偎。而這里hook不影響我們主流程,因此傳進去什么參數(shù)就返回什么(我們這里傳進去的是ObservableCreate<T>對象)京闰,在ObservableCreate<T>中定義了具體訂閱時的邏輯以及發(fā)射器的邏輯颜及。
從這個流程我們看出,當我們使用create操作符創(chuàng)建一個Observable時蹂楣,我們需要傳入一個實現(xiàn)了ObservableOnSubscribe<T>接口的對象俏站,在這個實現(xiàn)中,存在發(fā)射器ObservableEmitter<T>痊土,通過它可以讓使用者自由定義數(shù)據(jù)流向肄扎。并且在create操作符過程中,ObservableOnSubscribe<T>對象與ObservableCreate<T>相關(guān)聯(lián)赁酝。
2.接下來我們需要創(chuàng)建Observer犯祠,Observer是個接口,因此我們只需要實現(xiàn)該接口即可赞哗。
new Observer() {
@Override
public void onSubscribe(@NonNull final Disposable d) {
Log.d(TAG, "onSubScribe");
}
@Override
public void onNext(@NonNull Object o) {
Log.d(TAG, "onNext");
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
}
創(chuàng)建Observer很簡單雷则,實現(xiàn)接口當中定義的4個方法即可辆雾。
- onSubscribe(@NonNull final Disposable d),當訂閱時會回調(diào)該方法肪笋,Disposable 用來取消訂閱關(guān)系。
- onNext(@NonNull Object o)度迂,發(fā)射數(shù)據(jù)
- onComplete()藤乙,當onNext方法全部執(zhí)行完畢,執(zhí)行該方法惭墓。
- onError(@NonNull Throwable e)坛梁,當數(shù)據(jù)流向出現(xiàn)問題或使用者自己調(diào)用時執(zhí)行。
3.完成訂閱過程腊凶,首先我們來看看如何完成訂閱的划咐。
observable.subscribe(observer)
當我們使用創(chuàng)建操作符create創(chuàng)建Observable時,還記得create方法返回的是什么類型嗎钧萍?ok褐缠,就是Observable類型,但具體的實現(xiàn)類是ObservableCreate<T>风瘦。
進入Observable的subscribe方法看看:
- 判空處理
- hook相關(guān)
- 執(zhí)行subscribeActual(observer)
- 拋出異常
在這4個流程當中队魏,subscribeActual(observer)方法才是我們應該關(guān)注的。在Observable類中万搔,subscribeActual(observer)是個抽象方法胡桨,因此我們需要尋找它的具體實現(xiàn)官帘。上面已經(jīng)提到使用create操作符,具體的實現(xiàn)類是ObservableCreate<T>昧谊。
在ObservableCreate<T>類的subscribeActual(observer)中刽虹,所聲明的方法參數(shù)便是我們外部傳進來的實現(xiàn)Observer接口的對象。
- 將Observer(觀察者)與發(fā)射器相關(guān)聯(lián)
- 調(diào)用observer的onSubscribe方法呢诬,為了方便觀察者可隨時解除訂閱關(guān)系状婶。
- 執(zhí)行使用者自定義的subscribe方法中的邏輯,同時也將發(fā)射器與Observable(被觀察者)做關(guān)聯(lián)
- 發(fā)生異常馅巷,回調(diào)onError
因此膛虫,當執(zhí)行到subscribeActual(observer)時,才是真正的訂閱钓猬。
而當執(zhí)行到
source.subscribe(parent);
將ObservableOnSubscribe(源頭)與CreateEmitter(Observer稍刀,終點)聯(lián)系起來。
這里的souce就是
這里的parent就是 CreateEmitter<T>敞曹。因此账月,會執(zhí)行parent.onNext(), parent.onComplete(),parent.onError()
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
...
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
到這,最簡單的一個使用RxJava的流程就結(jié)束了澳迫。
最后總結(jié)
1.創(chuàng)建Observable過程
- 需要傳入一個實現(xiàn)了ObservableOnSubscribe<T>接口的對象
- 在ObservableOnSubscribe<T>實現(xiàn)中局齿,通過ObservableEmitter<T>可以讓使用者自由定義數(shù)據(jù)流向
- create操作符過程中,采用適配器模式橄登,將ObservableOnSubscribe<T>通過ObservableCreate<T>適配為Observable<T>對象抓歼,讓ObservableOnSubscribe<T>與ObservableCreate<T>相關(guān)聯(lián)
2.訂閱過程
- 真正訂閱的方法在subscribeActual(Observer<? super T> observer)
- source.subscribe(parent); 這行代碼執(zhí)行時,才開始發(fā)射數(shù)據(jù)拢锹,在ObservableOnSubscribe<T>中通過ObservableEmitter<T>發(fā)送數(shù)據(jù)給Observer
- 當Observable與Observer訂閱關(guān)系被dispose時谣妻,不會執(zhí)行onXXX方法。
- Observer 的 onComplete() 和 onError() 互斥只能執(zhí)行一次卒稳,因為CreateEmitter 在回調(diào)他們兩中任意一個后蹋半,都會自動 dispose()。
- 先 error 后 complete充坑,complete 不顯示减江。 反之會 crash
- 還有一點要注意的是 onSubscribe() 是在我們執(zhí)行 subscribe() 這句代碼的那個線程回調(diào)的,并不受線程調(diào)度影響