轉(zhuǎn)載或分享請事先與本人取得聯(lián)系
前言
之前就寫過一篇《關(guān)于Rxjava最友好的文章》,反響很不錯攘轩,由于那篇文章的定位就是簡單友好桐猬,因此盡可能的摒棄復(fù)雜的概念插龄,只抓住關(guān)鍵的東西來講琅束,以保證大家都能看懂。
不過那篇文章寫完之后低缩,我就覺得應(yīng)該還得有一篇文章給RxJava做一個深入的講解才算完美嘉冒,于是就有了今天的進階篇。因為一個團隊里可能大家都會用RxJava咆繁,但是必須要有一個人很懂這個讳推,不然碰到問題可就麻煩了。
在前一篇文章中的最后玩般,我們得出結(jié)論:RxJava就是在觀察者模式的骨架下银觅,通過豐富的操作符和便捷的異步操作來完成對于復(fù)雜業(yè)務(wù)的處理。今天我們還是就結(jié)論中的觀察者模式和操作符來做深入的拓展坏为。
在進入正題之前究驴,還是希望大家先去我的主頁上看看《關(guān)于Rxjava最友好的文章》。
關(guān)于觀察者模式
前一篇文章首先就重點談到了觀察者模式匀伏,我們認(rèn)為觀察者模式RxJava的骨架*洒忧。在這里不是要推翻之前的結(jié)論,而是希望從深入它的內(nèi)部的去了解它的實現(xiàn)够颠。
依然使用之前文章中關(guān)于開關(guān)和臺燈的代碼
//創(chuàng)建一個被觀察者(開關(guān))
Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("On");
subscriber.onNext("Off");
subscriber.onNext("On");
subscriber.onNext("On");
subscriber.onCompleted();
}
});
//創(chuàng)建一個觀察者(臺燈)
Subscriber light=new Subscriber<String>() {
@Override
public void onCompleted() {
//被觀察者的onCompleted()事件會走到這里;
Log.d("DDDDDD","結(jié)束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現(xiàn)錯誤會調(diào)用這個方法
}
@Override
public void onNext(String s) {
//處理傳過來的onNext事件
Log.d("DDDDD","handle this---"+s)
}
//訂閱
switcher.subscribe(light);
以上就是一個RxJava觀察者架構(gòu)熙侍,
看到這樣的代碼不知道你會不會有一些疑惑:
- 被觀察者中的Observable.OnSubscribe是什么,有什么用履磨?
- call(subscriber)方法中蛉抓,subscriber哪里來的?
- 為什么只有在訂閱之后,被觀察者才會開始發(fā)送消息剃诅?
其實芝雪,這些問題都可以通過了解OnSubscribe來解決。
那我們先來看看關(guān)于OnSubscribe的定義
//上一篇文章也提到Acton1這個接口综苔,內(nèi)部只有一個待實現(xiàn)call()方法
//沒啥特別惩系,人畜無害
public interface Action1<T> extends Action {
void call(T t);
}
//OnSubscribe繼承了這個Action1接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// OnSubscribe仍然是個接口
}
那么也就是說,OnSubscribe本質(zhì)上也是和 Action1一樣的接口如筛,只不過它專門用于Observable內(nèi)部堡牡。
而在Observable觀察者的類中,OnSubscribe是它唯一的屬性,同時也是Observable構(gòu)造函數(shù)中唯一必須傳入的參數(shù)杨刨,也就是說晤柄,只要創(chuàng)建了Observable,那么內(nèi)部也一定有一個OnSubscribe對象妖胀。
當(dāng)然芥颈,Observable是沒有辦法直接new的惠勒,我們只能通過create(),just()等等方法創(chuàng)建爬坑,當(dāng)然纠屋,這些方法背后去調(diào)用了new Observable(onSubscribe)
public class Observable<T> {
//唯一的屬性
final OnSubscribe<T> onSubscribe;
//構(gòu)造函數(shù),因為protected盾计,我們只能使用create函數(shù)
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
//create(onSubscribe) 內(nèi)部調(diào)用構(gòu)造函數(shù)售担。
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
....
....
}
當(dāng)創(chuàng)建了Observable和Subscribe之后,調(diào)用subscribe(subscriber)方法時署辉,發(fā)生了什么呢族铆?
//傳入了觀察者對象
public final Subscription subscribe(final Observer<? super T> observer) {
....
//往下調(diào)用
return subscribe(new ObserverSubscriber<T>(observer));
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
//往下調(diào)用
return Observable.subscribe(subscriber, this);
}
//調(diào)用到這個函數(shù)
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
// add a significant depth to already huge call stacks.
try {
// 在這里簡單講,對onSubscribe進行封裝哭尝,不必緊張哥攘。
OnSubscribe onSubscribe=RxJavaHooks.onObservableStart(observable, observable.onSubscribe);
//這個才是重點!2酿小逝淹!
//這個調(diào)用的具體實現(xiàn)方法就是我們創(chuàng)建觀察者時
//寫在Observable.create()中的call()呀
//而調(diào)用了那個call(),就意味著事件開始發(fā)送了
onSubscribe.call(subscriber);
//不信你往回看
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
....
....
}
return Subscriptions.unsubscribed();
}
}
代碼看到這里,我們就可以對上面三個問題做統(tǒng)一的回答了:
- onSubscribe是Observable內(nèi)部唯一屬性侠姑,是連接Observable和subscriber的關(guān)鍵,相當(dāng)于連接臺燈和開關(guān)的那根電線
- call(Subscriber<? super String> subscriber)中的subscriber箩做,就是我們自己創(chuàng)建的那個觀察者
- 只有在訂閱的時候莽红,才會發(fā)生onSubscribe.call(subscriber),進而才會開始調(diào)用onNext(),onComplete()等邦邦。
到這里安吁,你是不是對于RxJava的觀察者模式了解更加清晰了呢?我們用流程圖復(fù)習(xí)一下剛才的過程燃辖。
了解了上面這些鬼店,我們就可以更進一步做以下總結(jié):
- 訂閱這個動作,實際上是觀察者(subscriber)對象把自己傳遞給被觀察者(observable)內(nèi)部的onSubscribe黔龟。
- onSubscribe的工作就是調(diào)用call(subscriber)來通知被觀察者發(fā)送消息給這個subscriber妇智。
以上的結(jié)論對于下面我們理解操作符的原理十分有幫助,因此一定要看明白氏身。
觀察者模式介紹到這里巍棱,才敢說講完了。
關(guān)于操作符
上一篇文章講了一些操作符蛋欣,并且在github上放了很多其他的操作符使用范例給大家航徙,因此在這里不會介紹更多操作符的用法,而是講解操作符的實現(xiàn)原理陷虎。他是如何攔截事件到踏,然后變換處理之后杠袱,最后傳遞到觀察者手中的呢?
相信了解相關(guān)內(nèi)容的人可能會想到lift()操作符窝稿,它本來是其他操作符做變換的基礎(chǔ)楣富,不過那已經(jīng)是好幾個版本以前的事情了。但是目前版本中RxJava已經(jīng)不一樣了了讹躯,直接把lift()的工作下放到每個操作符中菩彬,把lift的弱化了(但是依然保留了lift()操作符)。
因此潮梯,我們在這里不必講解lift骗灶,直接拿一個操作符做例子,來了解它的原理即可秉馏,因為基本上操作符的實現(xiàn)原理都是一樣的耙旦。
以map()為例,依然拿之前文章里面的例子:
Observable.just(getFilePath())
//使用map操作來完成類型轉(zhuǎn)換
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//顯然自定義的createBitmapFromPath(s)方法萝究,是一個極其耗時的操作
return createBitmapFromPath(s);
}
})
.subscribe(
//創(chuàng)建觀察者免都,作為事件傳遞的終點處理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","結(jié)束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現(xiàn)錯誤會調(diào)用這個方法
}
@Override
public void onNext(Bitmap s) {
//處理事件
showBitmap(s)
}
);
看看map背后到底做了什么
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
//創(chuàng)建了全新代理的的Observable,構(gòu)造函數(shù)傳入的參數(shù)是OnSubscribe
//OnSubscribeMap顯然是OnSubscribe的一個實現(xiàn)類帆竹,
//也就是說绕娘,OnSubscribeMap需要實現(xiàn)call()方法
//構(gòu)造函數(shù)傳入了真實的Observable對象
//和一個開發(fā)者自己實現(xiàn)的Func1的實例
return create(new OnSubscribeMap<T, R>(this, func));
}
看OnSubscribeMap的具體實現(xiàn):
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
//用于保存真實的Observable對象
final Observable<T> source;
//還有我們傳入的那個Func1的實例
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
//實現(xiàn)了call方法,我們知道call方法傳入的Subscriber
//就是訂閱之后栽连,外部傳入真實的的觀察者
@Override
public void call(final Subscriber<? super R> o) {
//把外部傳入的真實觀察者傳入到MapSubscriber险领,構(gòu)造一個代理的觀察者
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
//讓外部的Observable去訂閱這個代理的觀察者
source.unsafeSubscribe(parent);
}
//Subscriber的子類,用于構(gòu)造一個代理的觀察者
static final class MapSubscriber<T, R> extends Subscriber<T> {
//這個Subscriber保存了真實的觀察者
final Subscriber<? super R> actual;
//我們自己在外部自己定義的Func1
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
//外部的Observable發(fā)送的onNext()等事件
//都會首先傳遞到代理觀察者這里
@Override
public void onNext(T t) {
R result;
try {
//mapper其實就是開發(fā)者自己創(chuàng)建的Func1秒紧,
//call()開始變換數(shù)據(jù)
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
//調(diào)用真實的觀察者的onNext()
//從而在變換數(shù)據(jù)之后绢陌,把數(shù)據(jù)送到真實的觀察者手中
actual.onNext(result);
}
//onError()方法也是一樣
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
map操作符的原理基本上就講完了,其他的操作符和map在原理上是一致的熔恢。
假如你想創(chuàng)建自定義的操作符(這其實是不建議的)脐湾,你應(yīng)該按照上面的步驟
- 創(chuàng)建一個代理的被觀察者
- 實現(xiàn)被觀察者中onSubscribe的call方法
- 在call方法中創(chuàng)建一個代理的觀察者,讓真實的被觀察者訂閱它叙淌。
我知道你會有點暈秤掌,沒關(guān)系,我后面會寫一個自定義操作符放在我的github上,可以關(guān)注下鹰霍。
下面机杜,我們先通過一個流程圖鞏固一下前面學(xué)習(xí)的成果。
下次你使用操作符時衅谷,心里應(yīng)該清楚椒拗,每使用一個操作符,都會創(chuàng)建一個代理觀察者和一個代理被觀察者,以及他們之間是如何相互調(diào)用的蚀苛。相信有了這一層的理解在验,今后使用RxJava應(yīng)該會更加得心應(yīng)手。
不過最后堵未,我想給大家留一個思考題:使用一個操作符時腋舌,流程圖是這樣的,那么使用多個操作符呢渗蟹?
勘誤
暫無
后記
到這里块饺,關(guān)于RxJava的講解就基本可以告一段落了,
我相信雌芽,兩篇文章讀下來授艰,對于RxJava的理解應(yīng)該已經(jīng)到了比較高的一個層次了,我的目標(biāo)也就達(dá)到了世落。
接下來....
因為RxJava是一個事件的異步處理框架淮腾,理論上,他可以封裝任何其他的庫屉佳,那么.....