轉(zhuǎn)載請標(biāo)明出處:
http://www.reibang.com/p/23c38a4ed360
本文出自:【張旭童的簡書】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)
概述
最近事情太多了桅锄,現(xiàn)在公司內(nèi)部的變動又固,自己崗位的變化脆诉,以及最近決定找工作棵红。所以博客耽誤了衷咽,準(zhǔn)備面試中演熟,打算看一看RxJava2的源碼景描,遂有了這篇文章堰乔。
不會對RxJava2的源碼逐字逐句的閱讀万哪,只尋找關(guān)鍵處侠驯,我們平時接觸得到的那些代碼。
背壓實際中接觸較少奕巍,故只分析了Observable
.
分析的源碼版本為:2.0.1
我們的目的:
- 知道源頭(
Observable
)是如何將數(shù)據(jù)發(fā)送出去的吟策。 - 知道終點(
Observer
)是如何接收到數(shù)據(jù)的。 - 何時將源頭和終點關(guān)聯(lián)起來的
- 知道線程調(diào)度是怎么實現(xiàn)的
- 知道操作符是怎么實現(xiàn)的
本文先達(dá)到目的1 的止,2 踊挠,3。
我個人認(rèn)為主要還是適配器模式的體現(xiàn)冲杀,我們接觸的就只有Observable
和Observer
效床,其實內(nèi)部有大量的中間對象在適配:將它們兩聯(lián)系起來,加入一些額外功能权谁,例如考慮dispose和hook等剩檀。
從create開始。
這是一段不涉及操作符和線程切換的簡單例子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});
拿 create來說旺芽,
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//.....
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
返回值是Observable
,參數(shù)是ObservableOnSubscribe
,定義如下:
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
ObservableOnSubscribe
是一個接口沪猴,里面就一個方法,也是我們實現(xiàn)的那個方法:
該方法的參數(shù)是 ObservableEmitter
,我認(rèn)為它是關(guān)聯(lián)起 Disposable
概念的一層:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}
ObservableEmitter
也是一個接口采章。里面方法很多运嗜,它也繼承了 Emitter<T>
接口。
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
Emitter<T>
定義了 我們在ObservableOnSubscribe
中實現(xiàn)subscribe()
方法里最常用的三個方法悯舟。
好担租,我們回到原點,create()
方法里就一句話return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
抵怎,其中提到RxJavaPlugins.onAssembly()
:
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
可以看到這是一個關(guān)于hook的方法奋救,關(guān)于hook我們暫且不表岭参,不影響主流程,我們默認(rèn)使用中都沒有hook尝艘,所以這里就是直接返回source
,即傳入的對象演侯,也就是new ObservableCreate<T>(source)
.
ObservableCreate
我認(rèn)為算是一種適配器的體現(xiàn),create()
需要返回的是Observable
,而我現(xiàn)在有的是(方法傳入的是)ObservableOnSubscribe
對象背亥,ObservableCreate
將ObservableOnSubscribe
適配成Observable
秒际。
其中subscribeActual()
方法表示的是被訂閱時真正被執(zhí)行的方法,放后面解析:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
OK,至此狡汉,創(chuàng)建流程結(jié)束程癌,我們得到了Observable<T>
對象,其實就是ObservableCreate<T>
.
到訂閱subscribe 結(jié)束
subscribe()
:
public final void subscribe(Observer<? super T> observer) {
...
try {
//1 hook相關(guān)轴猎,略過
observer = RxJavaPlugins.onSubscribe(this, observer);
...
//2 真正的訂閱處
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//3 錯誤處理嵌莉,
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
//4 hook錯誤相關(guān),略過
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
關(guān)于hook的代碼:
可以看到如果沒有hook捻脖,即相應(yīng)的對象是null锐峭,則是傳入什么返回什么的。
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @param observer the observer
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
//1 默認(rèn)onObservableSubscribe(可理解為一個flatmap的操作)是null
BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
//2 所以這句跳過可婶,不會對其進(jìn)行apply
if (f != null) {
return apply(f, source, observer);
}
//3 返回參數(shù)2
return observer;
}
我也是驗證了一下 三個Hook相關(guān)的變量沿癞,確實是null:
Consumer<Throwable> errorHandler = RxJavaPlugins.getErrorHandler();
BiFunction<Observable, Observer, Observer> onObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
Function<Observable, Observable> onObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
Log.e(TAG, "errorHandler = [" + errorHandler + "]");
Log.e(TAG, "onObservableSubscribe = [" + onObservableSubscribe + "]");
Log.e(TAG, "onObservableAssembly = [" + onObservableAssembly + "]");
所以訂閱時的重點就是:
//2 真正的訂閱處
subscribeActual(observer);
我們將第一節(jié)提到的ObservableCreate
里的subscribeActual()
方法拿出來看看:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1 創(chuàng)建CreateEmitter,也是一個適配器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2 onSubscribe()參數(shù)是Disposable 矛渴,所以CreateEmitter可以將Observer->Disposable 椎扬。還有一點要注意的是`onSubscribe()`是在我們執(zhí)行`subscribe()`這句代碼的那個線程回調(diào)的,并不受線程調(diào)度影響具温。
observer.onSubscribe(parent);
try {
//3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer蚕涤,終點)聯(lián)系起來
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//4 錯誤回調(diào)
parent.onError(ex);
}
}
Observer
是一個接口,里面就四個方法铣猩,我們在開頭的例子中已經(jīng)全部實現(xiàn)(打印Log)揖铜。
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
重點在這一句:
//3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer,終點)聯(lián)系起來
source.subscribe(parent);
source
即ObservableOnSubscribe
對象达皿,在本文中是:
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}
則會調(diào)用parent.onNext()
和parent.onComplete()
天吓,parent
是CreateEmitter
對象,如下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
...
//如果沒有被dispose峦椰,會調(diào)用Observer的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
...
//1 如果沒有被dispose龄寞,會調(diào)用Observer的onError()方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//2 一定會自動dispose()
dispose();
}
} else {
//3 如果已經(jīng)被dispose了,會拋出異常汤功。所以onError物邑、onComplete彼此互斥,只能被調(diào)用一次
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
//1 如果沒有被dispose,會調(diào)用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//2 一定會自動dispose()
dispose();
}
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
總結(jié)重點:
-
Observable
和Observer
的關(guān)系沒有被dispose
拂封,才會回調(diào)Observer
的onXXXX()
方法 -
Observer
的onComplete()
和onError()
互斥只能執(zhí)行一次,因為CreateEmitter
在回調(diào)他們兩中任意一個后鹦蠕,都會自動dispose()
冒签。根據(jù)第一點,驗證此結(jié)論钟病。 -
Observable
和Observer
關(guān)聯(lián)時(訂閱時)萧恕,Observable
才會開始發(fā)送數(shù)據(jù)。 -
ObservableCreate
將ObservableOnSubscribe
(真正的源)->Observable
. -
ObservableOnSubscribe
(真正的源)需要的是發(fā)射器ObservableEmitter
. -
CreateEmitter
將Observer
->ObservableEmitter
,同時它也是Disposable
. - 先
error
后complete
肠阱,complete
不顯示票唆。 反之會crash,感興趣的可以寫如下代碼驗證屹徘。
e.onNext("1");
//先error后complete走趋,complete不顯示。 反之 會crash
//e.onError(new IOException("sb error"));
e.onComplete();
e.onError(new IOException("sb error"));
一個好玩的地方DisposableHelper
原本到這里噪伊,最簡單的一個流程我們算是搞清了簿煌。
還值得一提的是,DisposableHelper.dispose(this);
DisposableHelper
很有趣鉴吹,它是一個枚舉姨伟,這是利用枚舉實現(xiàn)了一個單例disposed state
,即是否disposed,如果Disposable
類型的變量的引用等于DISPOSED
,則起點和終點已經(jīng)斷開聯(lián)系豆励。
其中大多數(shù)方法 都是靜態(tài)方法夺荒,所以isDisposed()
方法的實現(xiàn)就很簡單,直接比較引用即可.
其他的幾個方法良蒸,和AtomicReference
類攪基在了一起技扼。
這是一個實現(xiàn)引用原子操作的類,對象引用的原子更新
嫩痰,常用方法如下:
//返回當(dāng)前的引用淮摔。
V get()
//如果當(dāng)前值與給定的expect引用相等,(注意是引用相等而不是equals()相等)始赎,更新為指定的update值和橙。
boolean compareAndSet(V expect, V update)
//原子地設(shè)為給定值并返回舊值。
V getAndSet(V newValue)
OK,鋪墊完了我們看看源碼吧:
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
//1 通過斷點查看造垛,默認(rèn)情況下,field的值是"null"魔招,并非引用是null哦!大坑大坑大坑
//但是current是null引用
Disposable current = field.get();
Disposable d = DISPOSED;
//2 null不等于DISPOSED
if (current != d) {
//3 field是DISPOSED了五辽,current還是null
current = field.getAndSet(d);
if (current != d) {
//4 默認(rèn)情況下 走不到這里办斑,這里是在設(shè)置了setCancellable()后會走到。
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
總結(jié)
- 在
subscribeActual()
方法中,源頭和終點關(guān)聯(lián)起來乡翅。 -
source.subscribe(parent);
這句代碼執(zhí)行時鳞疲,才開始從發(fā)送ObservableOnSubscribe
中利用ObservableEmitter
發(fā)送數(shù)據(jù)給Observer
。即數(shù)據(jù)是從源頭push給終點的蠕蚜。 -
CreateEmitter
中么夫,只有Observable
和Observer
的關(guān)系沒有被dispose
其掂,才會回調(diào)Observer
的onXXXX()
方法 -
Observer
的onComplete()
和onError()
互斥只能執(zhí)行一次帝美,因為CreateEmitter
在回調(diào)他們兩中任意一個后骂澄,都會自動dispose()
。根據(jù)上一點挣柬,驗證此結(jié)論潮酒。 - 先
error
后complete
,complete
不顯示邪蛔。 反之會crash - 還有一點要注意的是
onSubscribe()
是在我們執(zhí)行subscribe()
這句代碼的那個線程回調(diào)的急黎,并不受線程調(diào)度影響。
轉(zhuǎn)載請標(biāo)明出處:
http://www.reibang.com/p/23c38a4ed360
本文出自:【張旭童的簡書】 (http://www.reibang.com/users/8e91ff99b072/latest_articles)