在之前的文章中我們介紹了 RxJava 2 的常用的 API 的方法總結澜建、背壓的概念以及 RxJava 2 在項目中的實際應用署恍。在本節(jié)中碧查,我們將要對 RxJava 2 的源碼進行分析约啊。下面是之前文章的一些鏈接喻喳,如果對 RxJava 2 的使用比較感興趣另玖,你可以通過下面的文章進行學習:
下面我們就從 RxJava 2 的一個簡單的示例來分析下 RxJava 2 是的主流程、設計模式以及 RxJava 2 是如何實現(xiàn)線程切換的表伦。
1谦去、RxJava 的主流程源碼分析
下面是 RxJava 的一個非常典型的使用示例,在該示例中蹦哼,我們在 IO 線程中執(zhí)行業(yè)務邏輯鳄哭,在主線程中對執(zhí)行的結果進行后續(xù)的處理。
Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
// 在這里執(zhí)行業(yè)務邏輯
emitter.onNext(new Object());
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
// 在主線程中進行后續(xù)的處理
}
});
disposable.dispose();
我們將這段程序分成四個階段來進行分析:1). 調用 create()
方法的執(zhí)行過程纲熏;2). 調用 subscribeOn(Schedulers.io())
和 observeOn(AndroidSchedulers.mainThread())
實現(xiàn)線程切換的過程妆丘;3). 使用 subscribe()
進行訂閱的工程锄俄;4). 調用 dispose()
方法取消訂閱的過程。
下面先來看第一個階段的執(zhí)行過程勺拣。
1.1 create() 和 subscribe() 方法的執(zhí)行過程
下面是調用了 create()
方法之后的執(zhí)行過程奶赠,在下面的代碼中,我們省略了 null 的檢測相關的邏輯药有。在當前的小節(jié)中毅戈,我們假設沒有指定線程切換相關的邏輯。也就是調用了 create()
之后塑猖,緊接著調用了 subscribe()
方法竹祷。
對于 RxJavaPlugins 的靜態(tài)方法,比如 onAssembly()
等羊苟,暫時我們先不考慮它的用途塑陵。你可以將其看作直接將傳入的參數的值返回。比如下面的 create()
方法將返回 ObservableCreate
的實例蜡励。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 看作直接返回了 new ObservableCreate<T>(source) 即可
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 對傳入的觀察者進行包裝
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 調用觀察者的訂閱回調方法
observer.onSubscribe(parent);
try {
// 真正執(zhí)行訂閱的地方
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext ..."));
return;
}
if (!isDisposed()) {
// 調用傳入的觀察者的 onNext() 方法
observer.onNext(t);
}
}
@Override
public void dispose() {
// 取消訂閱
DisposableHelper.dispose(this);
}
// ...
}
// ...
}
上面是第一個階段的執(zhí)行過程令花。這里我們省去了一些代碼,只保留了比較具有代表性的一些方法凉倚。也許你現(xiàn)在還對這部分代碼看得云里霧里兼都,沒關系,看了下面的內容你會慢慢理解稽寒。
接下來我們看下當調用了 subscribe()
方法之后的處理扮碧。
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
// 將三種類型的觀察者回調統(tǒng)一包裝到 LambdaObserver 方法中
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Observer<? super T> observer) {
try {
// 看作直接返回 observer 即可
observer = RxJavaPlugins.onSubscribe(this, observer);
// 調用了 subscribeActual() 方法
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
throw new NullPointerException("Actually not, but can't throw other exceptions due to RS");
}
}
上面的這些方法都定義在 Observable 中,區(qū)別只在于調用的對象杏糙。所以慎王,為了更清晰地分析這個過程,我們使用大寫字母來進行分析:
首先宏侍,整體的執(zhí)行過程是赖淤,
D = Observalbe.create(S).subscribe(X,Y,Z);
它可以被拆解成下面的兩個步驟來分析(下面是一份偽代碼,只是按照時間的調用順序來排序的):
A = Observable.create(S);
D = A.subscribe(X,Y,Z);
然后谅河,調用 A 的 subscribe()
方法的時候咱旱,實際上會調用到 Observable 的 subscribe()
方法(就是上面的代碼)。所以绷耍,按照調用的過程吐限,上面的偽代碼將變成下面這個樣子,
A = Observable.create(S)
O = LambdaObserver(X,Y,Z)
D = A.subscribe(O)
A.subscribeActual(O)
于是我們可以得知锨天,當調用了 subscribe()
方法的時候毯盈,實際上調用了 A 的 subscribeActual()
方法,并將 B 作為參數傳入病袄。B 是 LambdaObserver搂赋,由我們調用 subscribe()
的時候傳入的三個參數組成观话。那么 A 呢样眠?回到之前的 create()
代碼中税手,我們得知它就是 ObservableCreate
的實例赖钞。這里會調用到它的 subscribeActual()
方法。按照字母表示的方式宋欺,該方法將會成為下面這個樣子轰豆,
@Override
protected void subscribeActual(O) {
P = new CreateEmitter<T>(O);
O.onSubscribe(P);
S.subscribe(P);
}
這里的 S 是由 ObservableCreate 的構造方法傳入的,也就是我們在 create()
方法中傳入的對象齿诞。首先酸休,這里會將 O 作為構造方法的參數傳入到 CreateEmitter
實例中。然后祷杈,回調 O 的 onSubscribe()
方法并將 P 傳出斑司。這是我們常用的 RxJava 的回調方法之一。第三步中但汞,我們調用了 S 的 subscribe()
方法并將 P 傳出宿刮。所以,當我們按照示例代碼的方式調用下面這行代碼的時候私蕾,
emitter.onNext(new Object());
實際上是調用了這里的 P 的方法僵缺。那么,我們來看 P 的 onNext()
方法踩叭,
@Override
public void onNext(T t) {
O.onNext(t);
}
它通過調用 O 的 onNext()
方法實現(xiàn)磕潮。所以,到頭來容贝,其實還是回調了我們的在 subscribe()
方法中傳入的 Consumer 的方法揉抵。這樣就通過回調的方式把我們發(fā)送的值,傳遞給了我們的觀察方法嗤疯。
1.2 dispose()
方法的執(zhí)行過程
上面分析了 create()
和 subscribe()
方法的主流程。那么 dispose()
方法呢闺兢?
按照上面給出的代碼茂缚,它的定義如下。也就是通過 DisposableHelper
的 dispose()
方法來最終完成取消訂閱屋谭。
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
DisposableHelper
的 dispose()
方法的定義如下脚囊。按照上面的分析,dispose()
的時候傳入的 this 就是 CreateEmitter. 并且它是繼承了 AtomicReference<Disposable>
的桐磁。
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
對 AtomicReference悔耘,相比大家都不陌生,它是一個原子類型的引用我擂。這里正式通過對該原子類型引用的賦值來完成取消訂閱的——通過一個原子操作將其設置為 DISPOSED.
1.3 RxJava 執(zhí)行過程的總結
上面我們總結了 RxJava 的 Observable 從 create()
到 subscribe()
到 dispose()
方法的執(zhí)行過程衬以。雖然缓艳,我們依靠自己的邏輯能夠把整個流程梳理下來,但是這太笨拙了看峻。除了掌握了整個流程阶淘,我想我們更應該分析下它使用的設計思想。
一開始互妓,當我們分析到上面的流程的時候溪窒,我也是云里霧里,但是當我繼續(xù)分析了 subscribeOn()
的時候才恍然大悟——它整體的設計使用的設計模式和 Java 中的流是一致的冯勉。在真正分析 subscribeOn()
之前澈蚌,我們先來看下它的代碼,
public final Observable<T> subscribeOn(Scheduler scheduler) {
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ...
}
對比一下 subscribeOn()
方法和 create()
方法灼狰,我們可以很容易地發(fā)現(xiàn)宛瞄,它們的邏輯幾乎是一致的。都是傳入了一個 ObservableSource 之后對其進行包裝伏嗜,然后在 subscribeActual()
方法中坛悉,得到一個 parent,然后調用 onSubscribe()
繼而進行后續(xù)處理……也就是它和 Java 的 IO 體系一樣承绸,都使用了裝飾者設計模式裸影。
在 Java 的 IO 體系中,我們經尘可見下面的代碼轩猩。
InputStream is = new FileInputStream(fileToCopy);
BufferedInputStream bis = new BufferedInputStream(is, 15 * 1024);
DataInputStream dis = new DataInputStream(bis);
這里的 FileInputStream 是節(jié)點流,用來打開磁盤上面的輸入流荡澎。后續(xù)的 BufferedInputStream 和 DataInputStream 都用來對節(jié)點流進行修飾均践。它們各自只需要完成自己的功能,前者主要負責緩存以提升讀取速率摩幔,后者用來將得到的流轉換成我們需要的數據類型彤委。如果我們由其他的需求只需要在這個鏈的基礎上實現(xiàn)一個自定義的裝飾器即可。
回想一下我們在實際的開發(fā)過程中是不是經常使用鏈式來調用一大串或衡,中間的各個環(huán)節(jié)分別來實現(xiàn)自己的功能焦影,比如轉換、過濾封断、統(tǒng)計等等斯辰。使用了裝飾者模式之后,鏈的每個環(huán)節(jié)只需要實現(xiàn)自己的功能坡疼,使用者可以根據自己的需求在鏈上面增加環(huán)節(jié)彬呻。所以,類似于轉換、過濾闸氮、統(tǒng)計等等剪况,每個類的責任變得單一了,從整個調用鏈上面解耦出來湖苞。真是不得不佩服 RxJava 的這種設計拯欧!
知道了 RxJava 的整體使用的是裝飾者設計模式,我們理解其它的一些特性來就容易得多财骨。按照裝飾者設計模式的思路镐作,RxJava 的包裝過程和調用 subscribe()
方法之后的回調過程將如下所示:
所以,為什么 RxJava 為人詬病其調用棧太長隆箩,就是因為當我們使用一個個的裝飾器套起來的時候该贾,導致整個調用的棧變得很長。
另外捌臊,捎帶說一下所謂的線程切換的問題杨蛋。假如我們在上述調用過程中的 4 處使用了 subscribeOn()
方法,并指定處理的線程為 A理澎;在 5 處同樣調用該方法逞力,但是指定的線程為 B,那么之前的 1~3 的過程會被包裝成一個對象糠爬,放在 4 指定的線程中執(zhí)行寇荧;然后 4 又被包裝成一個對象放在 5 所在的線程。因此执隧,如果我們在 2 中獲取當前線程揩抡,那么肯定得到的是 4 所在的線程。也就是當使用兩個 subscribeOn()
的時候镀琉,通常會被認為只有第一個有效的原因峦嗤。其實兩個都有效,只是 A 是在 B 中執(zhí)行的屋摔,而 1~3 又是在 A 中執(zhí)行的烁设。所以,所謂的線程切換到奧秘啊钓试,就是依靠這層包裹的關系實現(xiàn)的署尤。一個線程里面把任務執(zhí)行完了,自然就切換到另一個線程里了亚侠。(subscribeOn()
和 observeOn()
實現(xiàn)線程的時候稍有區(qū)別,詳情看下文俗扇。)
1.4 RxJava 的線程切換的執(zhí)行過程
上面我們也提到過 subscribeOn()
和 observeOn()
實現(xiàn)線程切換的方式有所不同硝烂。所以,在下面的文章中铜幽,我們分成兩種情況來分別對其進行分析滞谢。
當調用 subscribeOn()
方法的時候串稀,上流傳入的 Observable 將會被進一步裝飾成 ObservableSubscribeOn 對象。按照我們上面的分析狮杨,當最終調用 subscribe()
方法的時候母截,將會沿著裝飾器構成的鏈,直到 ObservableSubscribeOn 的 subscribeActual()
方法中橄教。下面就是該方法的定義清寇,
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
除了上面分析的內容,這里多了一個 scheduler护蝶,它就是我們調用 subscribeOn()
方法時指定的線程华烟。這里會直接調用它的 scheduleDirect()
方法將任務添加到線程池當中執(zhí)行。這里傳入的是 SubscribeTask 對象持灰,它實現(xiàn)了 Runnable
接口盔夜,并且會在覆寫的 run()
方法中調用傳入的 parent 的 subscribe()
方法。因此堤魁,它可以被放入任何線程池當中執(zhí)行喂链,并且當被執(zhí)行的時候會調用傳入的 Observable 的 subscribe()
方法來讓上流的任務在該線程池當中執(zhí)行。
下面是 RxJava 中異步任務執(zhí)行的流程圖妥泉,
這里的傳入的 Schduler 是一個頂層的類椭微,當我們調用 Schedulers.io()
等方法的時候,會獲取其實現(xiàn)類的實例涛漂,比如 IOScheduler. 上面調用 scheduleDirect()
方法之后會先使用 Scheduler 的模板方法 createWorker()
中獲取到一個 Worker. 這個類用來對 RxJava 的任務進行管理赏表。它會進一步調用自己的 schedule()
方法來進一步安排任務的執(zhí)行。圖中的 Worker 也是一個抽象類匈仗,上面用到的 NewThreadWorker 是它的一個實現(xiàn)瓢剿。NewThreadWorker 中維護了一個線程池,當調用了它的 scheduler()
方法的時候悠轩,它就會進一步把該任務放進線程池當中執(zhí)行间狂。因此,我們的異步任務就在該線程池當中被執(zhí)行了火架。
然后鉴象,我們再來看下 observeOn()
方法是如何進行任務調度的。
當我們調用 observeOn()
方法的時候何鸡,該任務會被包裝成 ObservableObserveOn 的實例纺弊。同樣,我們來看它的 subscribeActual()
方法骡男,
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
它會直接調用 Scheduler 的模板方法得到 Worker淆游,然后將 Worker 和傳入的 Observer 一起包裝到 ObserveOnObserver 中。它會被繼續(xù)向上傳遞到 ObservableCreate 中,然后它的 onNext()
等方法將會被頂層的類觸發(fā)犹菱。接下來拾稳,我們就看下 ObserveOnObserver 的定義,這里我們仍然只以 onNext()
為例腊脱,其方法源碼如下访得,
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
因此,可以看出陕凹,在 observeOn()
方法中也是通過將任務放進某個 Worker 中執(zhí)行來實現(xiàn)的悍抑,只是具體的線程將取決于 Scheduler 和 Worker 的具體實現(xiàn)。
而 Android 中的將任務放進主線程當中去執(zhí)行就是通過向主線程的 Handler 發(fā)送消息來實現(xiàn)的捆姜。如果按照 subscribeOn()
的解釋传趾,那么當 A 線程啟動 B 線程執(zhí)行任務,那么 B 執(zhí)行完自然就到了 A 了泥技。那么為什么 Android 中還需要向主線程中發(fā)送消息呢浆兰?我們使用下面的圖來解釋。
subscribeOn()
是一個向上回調的過程珊豹,當 A 線程啟動 B 線程執(zhí)行任務簸呈,那么 B 執(zhí)行完自然就到了 A 了,沒有問題店茶。但 observeOn()
是一個向下調用的過程蜕便,從上面的代碼中也可以看出,它直接在線程池當中調用 onNext()
的時候會沿著回調相反的路線從上往下執(zhí)行贩幻,因此 observeOn()
之后所有的邏輯在它指定的線程中執(zhí)行轿腺。
2、總結
在本篇文章中丛楚,我們總結了 RxJava 2 的源碼族壳。雖然 RxJava 的功能非常強大,但是其核心的實現(xiàn)卻僅僅依賴兩個設計模式趣些,一個是觀察者模式仿荆,另一個是裝飾器模式。它采用了類似于 Java 的流的設計坏平,每個裝飾器負責自己一種任務拢操,這復合單一責任原則;各個裝飾器之間相互協(xié)作舶替,來完成復雜的功能令境。從上面的源碼分析過程中我們也可以看出,RxJava 的缺點也是非常明顯的顾瞪,大量的自定義類展父,在完成一個功能的時候各裝飾器之間不斷包裝返劲,導致調用的棧非常長。至于線程的切換栖茉,它依賴于自己的裝飾器模式,因為一個裝飾器可以決定其上游的 Observable 在哪些線程當中執(zhí)行孵延;兩個裝飾器處于不同的線程的時候吕漂,從一個線程中執(zhí)行完畢自然進入到另一個線程中執(zhí)行就完成了線程切換的過程。
以上就是 RxJava 的源碼分析尘应,如有疑問惶凝,歡迎評論區(qū)交流:)