??樓主最近在找實習(xí)工作,由于簡歷上說了解RxJava,所以在面試的時候應(yīng)該會問到RxJava的知識,于是樓主結(jié)合RxJava的源碼怜姿,對RxJava的工作原理進行初步的了解。也只敢說是初步了解疼燥,因為自己也是第一次看RxJava的源碼沧卢,理解的程度肯定不是很深。還是那樣醉者,如果有錯誤之處但狭,希望各位指正!
??本文參考:
??1.除非特殊說明撬即,源碼來自:2.2.0版本
??2.RxJava從源碼到應(yīng)用 移動端開發(fā)效率秒提速
1.概述
??樓主打算將RxJava的源碼分析寫成一個系列文章立磁,所以這個是這個系列的第一篇文章,在概述里面還是對RxJava是什么簡單的介紹一下剥槐,本系列文章不會對RxJava的基本用法進行展開唱歧,如果有老哥對RxJava的基本使用掌握的不是很好的話,推薦這個系列的文章:給初學(xué)者的RxJava2.0教程(一)粒竖。
??簡單的說一下RxJava颅崩,RxJava是基于觀察者模式的一個框架,在RxJava中有兩個角色蕊苗,一個Observable沿后,通常被稱為被觀察者,一個是Observer朽砰,通常被稱為觀察者得运∠ヲ冢總體的架構(gòu)是,由Observable來處理任務(wù)或者發(fā)送事件熔掺,然后在Observer里面來接受到Observable發(fā)送過來的信息饱搏。
??RxJava有很多的優(yōu)勢,比如線程調(diào)度置逻,在Android里面推沸,耗時操作必須放在子線程中,但是同時還需要主線程來更細(xì)UI券坞,所以線程調(diào)度就顯得尤為重要鬓催。當(dāng)然RxJava還有很多重要的操作符,使得我們的開發(fā)變得非常的方便恨锚。本系列文章不會對每個操作符的基本使用展開宇驾,而是對一些比較常用的操作源碼分析,所說的常用猴伶,也是指樓主用到的?紊帷!畢竟是菜雞他挎,肯定有很多的東西都不太懂筝尾。
2.基本元素
??想要對RxJava的基本原理有一個更好的了解,必須對它的基本有一個大概的了解办桨。我們先通過一個簡單的案例筹淫,來對RxJava的基本元素進行提取。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
??在這個簡單的案例當(dāng)中呢撞,我們可以提取的元素有:Observable
, ObservableOnSubscribe
, ObservableEmitter
,Observer
损姜。
??元素還是挺少的,我們現(xiàn)在對每個元素的類結(jié)構(gòu)來進行簡單的分析一下殊霞。
(1).Observable
public abstract class Observable<T> implements ObservableSource<T> {
}
??我們發(fā)現(xiàn)Observable本身是一個抽象類薛匪,并且實現(xiàn)了ObservableSource接口,在來看看ObservableSource接口里面有什么脓鹃。
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
??ObservableSource接口里面只有一個subscribe
方法,也就是說逸尖,RxJava將注冊觀察者這部分的功能提取成一個接口,從而可以看出來瘸右,面向接口編程是多么的重要????娇跟。。太颤。
??再分別來看看我們上面案例中使用的兩個方法--create
和subscribe
苞俘。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 先省略代碼部分,待會詳細(xì)的分析龄章。
}
??啊吃谣,嚇我一跳乞封,我以為create方法的參數(shù)又是一個接口類型,還好是ObservableOnSubscribe
類型岗憋,也是上面提取出來的元素其中之一肃晚,關(guān)于這個類,待會會詳細(xì)的分析仔戈。
public final void subscribe(Observer<? super T> observer) {
//...
}
??這個方法就更加的簡單了关串,就是傳遞了一個Observer接口的對象。不過需要注意的是這個方法有很多的重載监徘,其中以Consumer類型的操作最為多晋修,不過這個也沒什么,最后還是Consumer轉(zhuǎn)換成為了Observer凰盔,這個就涉及到Observer接口的一個實現(xiàn)類--LambdaObserver
墓卦。不要害怕,待會都會一一的講解的户敬。
(2).Observer
??說了被觀察者落剪,我們先來看看觀察者--Observer
。
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
??哎呀呀山叮,更加的簡單了, Observer只是簡單的接口添履,不過我們需要注意的是這個接口定義的4個方法屁倔,這里不講解四個方法的作用,畢竟我們這里將Observable的基本原理????暮胧。
(3).ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
??一如既往的接口锐借,subscribe
方法里面就是具體做事情的地方,這個相信大佬們應(yīng)該都知道往衷,我這里就班門弄斧的提醒一下????钞翔。
(4).ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
??ObservableEmitter
也是一個接口,同時繼承了Emitter
接口席舍,我們來看看Emitter
接口的定義
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
??作為一個發(fā)射器布轿,Emitter
里面定義了很多關(guān)于發(fā)送消息給Observer
的方法,Emitter
的onNext
對應(yīng)著Observer
的onNext
方法,其他的方法也是類似的来颤。
3.Observable的工作原理
(1).create方法
??我們對相關(guān)部分的基本元素有了一個基本的了解汰扭,現(xiàn)在我們來對整個流程的工作原理進行分析。首先我們create
方法入手
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
??create方法沒有我們想象中的那么難福铅,就只有兩行代碼萝毛,還有一行用來check的????。對于ObservableCreate
類這里先不進行分析滑黔,我們來看看 RxJavaPlugins
的onAssembly
方法笆包。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
??這里提醒一下环揽,onAssembly
方法的參數(shù)類型是Observable
類型,也就是說ObservableCreate
本身就是一個Observable
庵佣。好了歉胶,扯了題外話,來看看onAssembly
方法具體是干嘛的秧了。
??整個方法的執(zhí)行過程比較簡單跨扮,如果onObservableAssembly
為null,直接就返回了source
,也就是說返回了ObservableCreate
本身验毡。而我們在整個Observable的源碼中發(fā)現(xiàn)衡创,onObservableAssembly
初始值本身為null。
public static void reset() {
//······
setOnObservableAssembly(null);
//······
}
??為什么需要這樣子繞圈子的做呢晶通?這里就是做了鉤子璃氢,以便于以后的擴展。
??所以Observable
的create
方法就是返回了一個ObservableCreate
對象狮辽,不過需要注意的是ObservableCreate
包裹了一個ObservableOnSubscribe
對象一也,也就是我們在create方法里面new的那個ObservableOnSubscribe
對象。
??我們先來不急著去理解ObservableCreate
是什么喉脖,還是來看看subscribe
方法為我們做了什么椰苟。
(2). subscribe方法
??當(dāng)我們通過Observable的create方法來獲取一個Observable對象時,通常還會調(diào)用Observable的subscribe方法來注冊一個觀察者∈鬟矗現(xiàn)在我們來看看subscribe方法的實現(xiàn)舆蝴。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
??整個過程也不是想象中的那么神秘,除去check相關(guān)的方法不看题诵,歸根結(jié)底就是兩行代碼洁仗,先是通過RxJavaPlugins
的onSubscribe
方法來獲取Observer
對象,具體操作這里就不說了性锭,肯定跟RxJavaPlugins
的onAssembly
方法差不多赠潦,最后返回的是observer本身,最后調(diào)用了subscribeActual
方法草冈。這個subscribeActual
方法是干嘛的她奥?
protected abstract void subscribeActual(Observer<? super T> observer);
??臥了個槽?抽象方法怎棱!那我怎么知道調(diào)用的是哪個類的subscribeActual
方法方淤?不急哈,記得我們之前在create
方法返回的Observable
對象是哪個類的對象嗎蹄殃?想起來了吧携茂,是ObservableCreate
(3). ObservableCreate
??先來看看ObservableCreate
類結(jié)構(gòu)。
public final class ObservableCreate<T> extends Observable<T> {
}
??我們發(fā)現(xiàn)诅岩,ObservableCreate
繼承了Observable
,其實在分析create方法時讳苦,我也說過喲带膜。
??在ObservableCreate
類中,只有一個ObservableOnSubscribe
類型的成員變量鸳谜,這個成員變量就是我們在create
方法里面new的ObservableOnSubscribe
對象
??我們再來看看ObservableCreate
對subscribeActual
方法的實現(xiàn)
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
??在subscribeActual
方法里面膝藕,先是對Observer
對象進行一次包裝,將它包裝在CreateEmitter
類中咐扭。然后我們會發(fā)現(xiàn)兩個比較眼熟的方法onSubscribe
方法和subscribe
方法芭挽。其中onSubscribe
方法在Observer
里面看到過,而這里恰好是通過Observer
對象來調(diào)用的蝗肪,沒錯袜爪,這個的observer
就是在subscribe
方法里面new的對象⊙ι粒可是我們記得onSubscribe
方法的參數(shù)類型是Disposable
,而這里是一個CreateEmitter
辛馆。我們來看看CreateEmitter的類結(jié)構(gòu):
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//······
}
??沒錯,CreateEmitter
實現(xiàn)了Disposable
接口,所以CreateEmitter
本身可以充當(dāng)Disposable
的角色豁延。
??調(diào)用了Observer
的onSubscribe
方法之后昙篙,然后就會調(diào)用ObservableOnSubscribe
的subscribe
方法。
??到這里诱咏,我們應(yīng)該徹底的明白了整個Observable
的工作流程苔可。我們通過create方法創(chuàng)建一個ObservableCreate
方法,然后調(diào)用了subscribe
方法來注冊了一個觀察者袋狞,在subscribe
方法里面又調(diào)用了subscribeActual
方法焚辅,在subscribeActual
方法里面先是調(diào)用了Observer
的onSubscribe
方法,然后調(diào)用了
ObservableOnSubscribe
的subscribe
方法硕并,在ObservableOnSubscribe
的subscribe
方法當(dāng)中法焰,具體的做的事有兩件:1.做我們自己的事情秧荆,比如從服務(wù)器上獲取數(shù)據(jù)之類倔毙;2.將發(fā)送信息到Observer
去。
??理解了整個流程的工作原理乙濒,我們現(xiàn)在來看看CreateEmitter
是怎么信息發(fā)給Observer
的陕赃。
4. CreateEmitter的工作原理
??我們知道,我們在ObservableOnSubscribe
的subscribe
方法里面使用ObservableEmitter
來發(fā)射信息到Observer
“涔桑現(xiàn)在我們來看看整個CreateEmitter
的工作原理么库,不過,我們還是先來看看這個類的結(jié)構(gòu)甘有,雖然上面已經(jīng)看了诉儒,但是擔(dān)心大佬們忘了:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//······
}
??在上面已經(jīng)說了CreateEmitter
實現(xiàn)了Disposable
接口,可以作為Disposable
對象來操作亏掀,在接下來忱反,我們將重點介紹Disposable
是怎么控制Observer
對信息的接收泛释,同時還會介紹CreateEmitter
作為ObservableEmitter
接口的那部分功能。
??之前在分析基本元素時温算,已經(jīng)說了ObservableEmitter
這個接口怜校,它實現(xiàn)了Emitter
接口。在Emitter
接口里面有三個方法用來發(fā)送信息給Observer
注竿,分別是:onNext
茄茁,onError
,onComplete
巩割。而CreateEmitter
類則是具體的實現(xiàn)了這三個方法裙顽,我們來看看。
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
??代碼是非常的簡單喂分,直接調(diào)用了Observer
的onNext
方法锦庸,也沒用什么高逼格的東西????。其余兩個方法也是如此蒲祈。只不過是甘萧,在調(diào)用onNext
方法時做了一個isDisposed
的判斷。
??所以感覺Disposable
才是這個類的核心梆掸。我們來看看isDisposed
方法:
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
??在isDisposed
方法里面調(diào)用了DisposableHelper
的isDisposed
方法扬卷。不過這里需要注意的是這里傳遞過去的是get方法的返回值,這個返回值什么意思酸钦?
??回到CreateEmitter
的類結(jié)構(gòu)怪得,發(fā)現(xiàn)它繼承了AtomicReference
類,所以get方法返回的是一個Disposable
對象卑硫。
??同時徒恋,我們發(fā)現(xiàn)CreateEmitter
的dispose
方法也是通過DisposableHelper
類進行進行操作的,看看要理解Disposable
的功能欢伏,必須了解DisposableHelper
是怎么操作的入挣。
5.DisposableHelper
??從感官上來說,一個發(fā)射器是否dispose
硝拧,直接設(shè)置一個boolean
類型的flag就OK了径筏,為什么搞得這么復(fù)雜,又是AtomicReference
障陶,又是DisposableHelper
滋恬。這一切,我們從DisposableHelper
來尋找答案抱究。
??首先我們還是來看看DisposableHelper
的結(jié)構(gòu):
public enum DisposableHelper implements Disposable {
DISPOSED
;
}
??DisposableHelper
本身是一個enum類型恢氯,同時實現(xiàn)了Disposable
接口。這里使用enum主要是為了做一個DISPOSED
的單例。然后在通過isDisposed
方法來判斷是否dispose
勋拟,可以直接與DISPOSED
比較遏暴。
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
??既然判斷是否dispose
是直接與DISPOSED
比較,那么如果dispose
的話指黎,應(yīng)該是將AtomicReference
里面的值設(shè)置為DISPOSED
吧朋凉?我們來看一下dispose
方法:
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
??果然,跟我們猜測一樣的醋安,AtomicReference
里面的值設(shè)置為DISPOSED
杂彭。只是,這里為了線程安全吓揪,做了很多的判斷操作亲怠。
??從這里我們可以得到,為什么需要設(shè)置DisposableHelper
來控制dispose
的狀態(tài)柠辞,那是因為線程安全团秽,如果直接設(shè)置一個flag,在有些情況下叭首,可能存在線程不安全的風(fēng)險习勤。同時為了代碼的優(yōu)雅,如果這部分的邏輯寫在CreateEmitter
里面焙格,會不會顯得冗雜呢图毕?
6.總結(jié)
??寫到這里,我感覺也差不多了眷唉。這里對著部分的知識做一個總結(jié)予颤。
??1.在整個流程中,基本有Observable
,ObservableOnSubscribe
,ObservableEmitter
,Observer
冬阳,如果想要對整個過程有一個大概的理解蛤虐,必須對這幾個元素有基本的認(rèn)識。
??2.Observer
的onNext
之類方法的觸發(fā)時機肝陪,實際上是Observable
的subscribe
方法驳庭,因為subscribe
方法調(diào)用了Observable
的subscribeActual
方法,而在subscribeActual
方法里面做了兩部分的操作:1.直接調(diào)用了Observer
的onSubscribe
方法见坑;2.使用ObservableEmitter
將Observer
包裹起來嚷掠,所以我們在ObservableOnSubscribe
的subscribe
方法用ObservableEmitter
來發(fā)射信息捏检,相當(dāng)于調(diào)用了Observer
的相關(guān)方法。
??3.在ObservableEmitter
的onNext
之類方法里面,存在一種類似AOP的代碼魄懂,因為在調(diào)用Observer
的相關(guān)方法鸭叙,做了一些其他的操作。