上一篇文章中定義了Rx = Observable + Operator + Scheduler图柏。Rx以經(jīng)典觀察者模式為骨架、并擴(kuò)展之使得我們能夠以類似使用Iterable的方式使用Observable。
Rx最為重要的兩個(gè)要素是:數(shù)據(jù)流和異步(實(shí)際上Rx把數(shù)據(jù)流都視作異步的)。今天的主角便是數(shù)據(jù)流——Observable。根據(jù)上下文語義的需要馆里,本系列文中可能另稱之為數(shù)據(jù)序列、事件流可柿、被觀察者鸠踪。
觀察者
在Rx的世界中,(幾乎)每一個(gè)故事都從“觀察者訂閱了數(shù)據(jù)流”開始趾痘。觀察者——Observer——好比哨兵慢哈,時(shí)刻監(jiān)視著數(shù)據(jù)流的動(dòng)靜,一旦有數(shù)據(jù)發(fā)射或通知發(fā)送便立即響應(yīng)永票。觀察者實(shí)現(xiàn)了以下三個(gè)方法的子集:
-
onNext
-- 當(dāng)數(shù)據(jù)流發(fā)射流中任意一個(gè)數(shù)據(jù)時(shí)會(huì)調(diào)用觀察者的onNext
方法,并將發(fā)射的數(shù)據(jù)作為參數(shù)滥沫。 -
onError
-- 當(dāng)數(shù)據(jù)流產(chǎn)生數(shù)據(jù)失敗或發(fā)生其他異常時(shí)會(huì)調(diào)用觀察者的onError
方法侣集,并將失敗原因(Throwable
)作為參數(shù)。 -
onComplete
-- 當(dāng)數(shù)據(jù)流中的所有數(shù)據(jù)全部正常發(fā)射完會(huì)調(diào)用觀察著的onComplete
方法兰绣。
當(dāng)數(shù)據(jù)流調(diào)用觀察者的onError/onComplete時(shí)世分,我們稱它發(fā)送了錯(cuò)誤/完成通知。觀察者只能收到來自某個(gè)數(shù)據(jù)流的一個(gè)通知缀辩,也就是說如果收到了流的錯(cuò)誤通知臭埋,就不可能再收到該流的完成通知,反之亦然臀玄。一旦觀察者收到了通知瓢阴,便不能接收任何由該流發(fā)射的數(shù)據(jù)。
注:數(shù)據(jù)流可以發(fā)送多個(gè)通知健无,也可以在發(fā)送通知之后繼續(xù)發(fā)送數(shù)據(jù)荣恐,只是觀察者收到通知后就單方面把該流“拉黑”了而已。有時(shí)候?yàn)榱藢?shí)現(xiàn)一些特殊功能,我們不得不允許Observer
不受限制地接收數(shù)據(jù)和通知(RxJava2的源碼中也存在著這樣的實(shí)現(xiàn)叠穆,比如:ObservableConcatMap.SourceObserver.InnerObserver
就可以多次接收onComplete
通知)少漆。
Rx編程模型
我們先看一個(gè)常規(guī)的方法調(diào)用過程,程序會(huì)按照代碼書寫的順序逐步地執(zhí)行指令并返回結(jié)果硼被,以同步的方式完成任務(wù):
- 先調(diào)用某個(gè)方法示损。
- 把方法的返回值賦值給某個(gè)變量。
- 使用該變量執(zhí)行后續(xù)指令以完成任務(wù)嚷硫。
在Rx中检访,數(shù)據(jù)流用于定義產(chǎn)生、處理數(shù)據(jù)的機(jī)制论巍,一旦有觀察者訂閱(subscribe
)了該流烛谊,其預(yù)定義的機(jī)制立即生效,觀察者等待數(shù)據(jù)發(fā)射或通知發(fā)送并響應(yīng):
- 定義一個(gè)數(shù)據(jù)流嘉汰,該流定義了一個(gè)異步操作丹禀,可以產(chǎn)生一個(gè)或多個(gè)數(shù)據(jù)。
- 定義一個(gè)觀察者鞋怀,并為它定義一個(gè)方法(
onNext
)双泪,該方法用來消費(fèi)第一步的異步操作發(fā)射的數(shù)據(jù)。 - 觀察者訂閱數(shù)據(jù)流(于是故事開始了)密似,數(shù)據(jù)流的異步操作被觸發(fā)焙矛,然后生產(chǎn)發(fā)射數(shù)據(jù),或發(fā)送通知(以結(jié)束整個(gè)故事)残腌。
如果程序需要完成多個(gè)不存在互相依賴的任務(wù)村斟,由于Rx中指令可以異步并發(fā)地執(zhí)行,我們可以同時(shí)啟動(dòng)多個(gè)任務(wù)抛猫,而不用依次地等待某個(gè)任務(wù)完成再啟動(dòng)下一個(gè)蟆盹。
Observable操作符
掌握數(shù)據(jù)流和觀察者之后,我們能比以前更好地處理數(shù)據(jù)序列(而不限于單個(gè)數(shù)據(jù))闺金。然而Rx真正的核武器是操作符Operator逾滥。我們先了解一下Rx有哪些操作符。
-
創(chuàng)建型
Create
,Defer
,Empty
/Never
/Throw
,From
,Interval
,Just
,Range
,Repeat
,Start
,Timer
-
變換型
Buffer
,FlatMap
,GroupBy
,Map
,Scan
,Window
-
過濾型
Debounce
,Distinct
,ElementAt
,Filter
,First
,IgnoreElements
,Last
,Sample
,Skip
,SkipLast
,Take
,TakeLast
-
組合型
And
/Then
/When
,CombineLatest
,Join
,Merge
,StartWith
,Switch
,Zip
-
容錯(cuò)型
Catch
,Retry
-
工具型
Delay
,Do
,Materialize
/Dematerialize
,ObserveOn
,Serialize
,Subscribe
,SubscribeOn
,TimeInterval
,Timeout
,Timestamp
,Using
-
條件型
All
,Amb
,Contains
,DefaultIfEmpty
,SequenceEqual
,SkipUntil
,SkipWhile
,TakeUntil
,TakeWhile
-
聚合型
Average
,Concat
,Count
,Max
,Min
,Reduce
,Sum
-
轉(zhuǎn)換型
To
-
連接型
Connect
,Publish
,RefCount
,Replay
- 背壓型
多數(shù)操作符仍然返回一個(gè)數(shù)據(jù)流败匹,這種方式允許我們?cè)诔绦蛑墟準(zhǔn)降貙?duì)數(shù)據(jù)流調(diào)用操作符——聯(lián)想一下builder(構(gòu)建者)模式的鏈?zhǔn)秸{(diào)用——與builder模式不同的是寨昙,Observable
的操作符返回了一個(gè)新Observable
,這個(gè)新Observable
是原Observable
的代理掀亩。
應(yīng)用了操作符后舔哪,單單用“數(shù)據(jù)流”已經(jīng)無法準(zhǔn)確描述Observable
的含義。我增加“原始流”归榕、“上游”和“下游”以及“支流(流中流)”來區(qū)分不同意義的Observable
尸红。“數(shù)據(jù)流”是Observable
的泛稱。
- 原始流——全稱為“原始數(shù)據(jù)流”外里,指代由創(chuàng)建型操作符返回的
Observable
怎爵。 - 上游和下游——二者必須成對(duì)地出現(xiàn)。對(duì)
Observable
調(diào)用非創(chuàng)建型操作符后盅蝗,“上游”指代原Observable
鳖链,“下游”指代返回的新Observable
。 - 支流或流中流——僅僅在應(yīng)用
FlatMap
和ConcatMap
操作符的場(chǎng)景中使用這一稱謂墩莫≤轿“支流”指代這兩個(gè)操作符的mapper
返回的子Observable
,“支流”亦稱“流中流”狂秦。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
從flatMap
的方法簽名分析灌侣,它接受一個(gè)mapper參數(shù),此mapper將上游中的數(shù)據(jù)變換成一個(gè)ObservableSource
裂问,新Observable
中的數(shù)據(jù)是ObservableSource
類型——頗有子Observable
的味道——這不就是“流中流”(“流中流”自帶解釋功能侧啼,理解之后還是叫“支流”比較自然)嗎?
需要注意的是:最終Rx將整合子Observable
(支流)中的所有數(shù)據(jù)而不是子Observable
本身匯入下游堪簿。后面的文章會(huì)詳細(xì)地對(duì)FlatMap
和ConcatMap
進(jìn)行源碼分析痊乾。
RxJava2#Observable類(源碼基于v2.1.5)
Observable
是一個(gè)抽象類,實(shí)現(xiàn)了ObservableSource(void subscribe())
接口椭更。該類有且僅有一個(gè)抽象方法subscribeActual
哪审,其他非private
(private
方法也就3個(gè))方法要么是static
的,要么是final
的虑瀑。這意味著定義自己的ObservableCustom
是件非常簡(jiǎn)單的事情湿滓,Observable
類已經(jīng)完成了99.99%的工作,我們只需要override
subscribeActual
方法就夠了舌狗。
Observable
所有的創(chuàng)建型操作符都是靜態(tài)的茉稠,比如Just:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 通過RxJavaPlugins的setters可以在運(yùn)行時(shí)改變默認(rèn)的行為
// 如果程序中沒有調(diào)用RxJavaPlugins.setOnObservableAssembly(xxx),下面一行代碼跟其后一行注釋完全等效
return RxJavaPlugins.onAssembly(new ObservableJust(item));
// return new ObservableJust(item);
}
我們可以看到Just
操作符本質(zhì)上構(gòu)造了一個(gè)ObservableJust
對(duì)象把夸。RxJava2內(nèi)置了大量的ObservableXXX
(XXX
往往是操作符的名字比如Just
)。
再來看一個(gè)非創(chuàng)建型的操作符Map:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}
以及ObservableMap
類的核心代碼:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
public void subscribeActual(Observer<? super U> t) {
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}
}
重點(diǎn)看ObservableMap
構(gòu)造方法铭污,它接收ObservableSource
類型的對(duì)象作為第一個(gè)參數(shù)——回憶一下代理模式——?jiǎng)?chuàng)建了原Observable
的代理恋日,也就是新ObservableMap
實(shí)例。
RxJava2中大量運(yùn)用了代理模式嘹狞,細(xì)心的你或許已經(jīng)發(fā)現(xiàn):在subscribeActual
方法中岂膳,還創(chuàng)建了一個(gè)原Observer
的代理——MapObserver
的實(shí)例。