runAsync
rxjava-async
模塊還包含一個runAsync
操作符。它很特殊轴或,返回一個叫做StoppableObservable
的特殊Observable。
傳遞一個Action
和一個Scheduler
給runAsync
基公,它返回一個使用這個Action
產(chǎn)生數(shù)據(jù)的StoppableObservable
需纳。這個Action
接受一個Observable
和一個Subscription
作為參數(shù),它使用Subscription
檢查unsubscribed
條件蛤织,一旦發(fā)現(xiàn)條件為真就立即停止發(fā)射數(shù)據(jù)赴叹。在任何時(shí)候你都可以使用unsubscribe
方法手動停止一個StoppableObservable
(這會同時(shí)取消訂閱與這個StoppableObservable
關(guān)聯(lián)的Subscription
)。
由于runAsync
會立即調(diào)用Action
并開始發(fā)射數(shù)據(jù)指蚜,在你創(chuàng)建StoppableObservable之后到你的觀察者準(zhǔn)備好接受數(shù)據(jù)之前這段時(shí)間里乞巧,可能會有一部分?jǐn)?shù)據(jù)會丟失。如果這不符合你的要求摊鸡,可以使用runAsync
的一個變體绽媒,它也接受一個Subject
參數(shù),傳遞一個ReplaySubject
給它免猾,你可以獲取其它丟失的數(shù)據(jù)了是辕。
在RxJava中還有一個版本的From
操作符可以將Future轉(zhuǎn)換為Observable,與start
相似猎提。
Timer
創(chuàng)建一個Observable获三,它在一個給定的延遲后發(fā)射一個特殊的值。
Timer
操作符創(chuàng)建一個在給定的時(shí)間段之后返回一個特殊值的Observable。
RxJava將這個操作符實(shí)現(xiàn)為timer
函數(shù)疙教。
timer
返回一個Observable棺聊,它在延遲一段給定的時(shí)間后發(fā)射一個簡單的數(shù)字0。
timer
操作符默認(rèn)在computation
調(diào)度器上執(zhí)行贞谓。有一個變體可以通過可選參數(shù)指定Scheduler躺屁。
- Javadoc: timer(long,TimeUnit)
- Javadoc: timer(long,TimeUnit,Scheduler)
變換操作
這個頁面展示了可用于對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作的各種操作符。
- map(?) — 對序列的每一項(xiàng)都應(yīng)用一個函數(shù)來變換Observable發(fā)射的數(shù)據(jù)序列
- flatMap(?), concatMap(?), and flatMapIterable(?) — 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合经宏,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個單獨(dú)的Observable
- switchMap(?) — 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
- scan(?) — 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù)驯击,然后按順序依次發(fā)射每一個值
- groupBy(?) — 將Observable分拆為Observable集合烁兰,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個Observable發(fā)射一組不同的數(shù)據(jù)
- buffer(?) — 它定期從Observable收集數(shù)據(jù)到一個集合徊都,然后把這些數(shù)據(jù)集合打包發(fā)射沪斟,而不是一次發(fā)射一個
- window(?) — 定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口暇矫,而不是每次發(fā)射一項(xiàng)
- cast(?) — 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型
Buffer
定期收集Observable的數(shù)據(jù)放進(jìn)一個數(shù)據(jù)包裹主之,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個值李根。
Buffer
操作符將一個Observable變換為另一個槽奕,原來的Observable正常發(fā)射數(shù)據(jù),變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合房轿。Buffer
操作符在很多語言特定的實(shí)現(xiàn)中有很多種變體粤攒,它們在如何緩存這個問題上存在區(qū)別。
注意:如果原來的Observable發(fā)射了一個onError
通知囱持,Buffer
會立即傳遞這個通知夯接,而不是首先發(fā)射緩存的數(shù)據(jù),即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)纷妆。
Window
操作符與Buffer
類似盔几,但是它在發(fā)射之前把收集到的數(shù)據(jù)放進(jìn)單獨(dú)的Observable,而不是放進(jìn)一個數(shù)據(jù)結(jié)構(gòu)掩幢。
在RxJava中有許多Buffer
的變體:
buffer(count)
buffer(count)
以列表(List)的形式發(fā)射非重疊的緩存逊拍,每一個緩存至多包含來自原始Observable的count項(xiàng)數(shù)據(jù)(最后發(fā)射的列表數(shù)據(jù)可能少于count項(xiàng))
- Javadoc: buffer(int)
buffer(count, skip)
buffer(count,?skip)
從原始Observable的第一項(xiàng)數(shù)據(jù)開始創(chuàng)建新的緩存,此后每當(dāng)收到skip
項(xiàng)數(shù)據(jù)粒蜈,用count
項(xiàng)數(shù)據(jù)填充緩存:開頭的一項(xiàng)和后續(xù)的count-1
項(xiàng)顺献,它以列表(List)的形式發(fā)射緩存,取決于count
和skip
的值枯怖,這些緩存可能會有重疊部分(比如skip < count時(shí))注整,也可能會有間隙(比如skip > count時(shí))。
- Javadoc: buffer(int,int)
buffer(bufferClosingSelector)
當(dāng)它訂閱原來的Observable時(shí),buffer(bufferClosingSelector)
開始將數(shù)據(jù)收集到一個List
肿轨,然后它調(diào)用bufferClosingSelector
生成第二個Observable寿冕,當(dāng)?shù)诙€Observable發(fā)射一個TClosing
時(shí),buffer
發(fā)射當(dāng)前的List
椒袍,然后重復(fù)這個過程:開始組裝一個新的List
驼唱,然后調(diào)用bufferClosingSelector
創(chuàng)建一個新的Observable并監(jiān)視它。它會一直這樣做直到原來的Observable執(zhí)行完成驹暑。
- Javadoc: buffer(Func0)
buffer(boundary)
buffer(boundary)
監(jiān)視一個名叫boundary
的Observable玫恳,每當(dāng)這個Observable發(fā)射了一個值,它就創(chuàng)建一個新的List
開始收集來自原始Observable的數(shù)據(jù)并發(fā)射原來的List
优俘。
- Javadoc: buffer(Observable)
- Javadoc: buffer(Observable,int)
buffer(bufferOpenings, bufferClosingSelector)
buffer(bufferOpenings,?bufferClosingSelector)
監(jiān)視這個叫bufferOpenings
的Observable(它發(fā)射BufferOpening
對象)京办,每當(dāng)bufferOpenings
發(fā)射了一個數(shù)據(jù)時(shí),它就創(chuàng)建一個新的List
開始收集原始Observable的數(shù)據(jù)帆焕,并將bufferOpenings
傳遞給closingSelector
函數(shù)惭婿。這個函數(shù)返回一個Observable。buffer
監(jiān)視這個Observable叶雹,當(dāng)它檢測到一個來自這個Observable的數(shù)據(jù)時(shí)财饥,就關(guān)閉List
并且發(fā)射它自己的數(shù)據(jù)(之前的那個List)。
- Javadoc: buffer(Observable,Func1)
buffer(timespan, unit[, scheduler])
buffer(timespan,?unit)
定期以List
的形式發(fā)射新的數(shù)據(jù)折晦,每個時(shí)間段钥星,收集來自原始Observable的數(shù)據(jù)(從前面一個數(shù)據(jù)包裹之后,或者如果是第一個數(shù)據(jù)包裹筋遭,從有觀察者訂閱原來的Observale之后開始)打颤。還有另一個版本的buffer
接受一個Scheduler
參數(shù),默認(rèn)情況下會使用computation
調(diào)度器漓滔。
- Javadoc: buffer(long,TimeUnit)
- Javadoc: buffer(long,TimeUnit,Scheduler)
buffer(timespan, unit, count[, scheduler])
每當(dāng)收到來自原始Observable的count項(xiàng)數(shù)據(jù)编饺,或者每過了一段指定的時(shí)間后,buffer(timespan,?unit,?count)
就以List
的形式發(fā)射這期間的數(shù)據(jù)响驴,即使數(shù)據(jù)項(xiàng)少于count項(xiàng)透且。還有另一個版本的buffer
接受一個Scheduler
參數(shù),默認(rèn)情況下會使用computation
調(diào)度器豁鲤。
- Javadoc: buffer(long,TimeUnit,int)
- Javadoc: buffer(long,TimeUnit,int,Scheduler)
buffer(timespan, timeshift, unit[, scheduler])
buffer(timespan,?timeshift,?unit)
在每一個timeshift
時(shí)期內(nèi)都創(chuàng)建一個新的List
,然后用原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)填充這個列表(在把這個List
當(dāng)做自己的數(shù)據(jù)發(fā)射前秽誊,從創(chuàng)建時(shí)開始,直到過了timespan
這么長的時(shí)間)琳骡。如果timespan
長于timeshift
锅论,它發(fā)射的數(shù)據(jù)包將會重疊,因此可能包含重復(fù)的數(shù)據(jù)項(xiàng)楣号。
還有另一個版本的buffer
接受一個Scheduler
參數(shù)最易,默認(rèn)情況下會使用computation
調(diào)度器怒坯。
- Javadoc: buffer(long,long,TimeUnit)
- Javadoc: buffer(long,long,TimeUnit,Scheduler)
buffer-backpressure
你可以使用Buffer
操作符實(shí)現(xiàn)反壓backpressure
(意思是,處理這樣一個Observable:它產(chǎn)生數(shù)據(jù)的速度可能比它的觀察者消費(fèi)數(shù)據(jù)的速度快)藻懒。
Buffer操作符可以將大量的數(shù)據(jù)序列縮減為較少的數(shù)據(jù)緩存序列剔猿,讓它們更容易處理。例如嬉荆,你可以按固定的時(shí)間間隔归敬,定期關(guān)閉和發(fā)射來自一個爆發(fā)性O(shè)bservable的數(shù)據(jù)緩存。這相當(dāng)于一個緩沖區(qū)鄙早。
示例代碼
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
或者汪茧,如果你想更進(jìn)一步,可以在爆發(fā)期將數(shù)據(jù)收集到緩存限番,然后在爆發(fā)期終止時(shí)發(fā)射這些數(shù)據(jù)陆爽,使用 Debounce
操作符給buffer
操作符發(fā)射一個緩存關(guān)閉指示器(buffer closing indicator
)可以做到這一點(diǎn)。
代碼示例:
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
參見
FlatMap
FlatMap
將一個發(fā)射數(shù)據(jù)的Observable變換為多個Observables扳缕,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個單獨(dú)的Observable
FlatMap
操作符使用一個指定的函數(shù)對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個函數(shù)返回一個本身也發(fā)射數(shù)據(jù)的Observable别威,然后FlatMap
合并這些Observables發(fā)射的數(shù)據(jù)躯舔,最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射。
這個方法是很有用的省古,例如粥庄,當(dāng)你有一個這樣的Observable:它發(fā)射一個數(shù)據(jù)序列,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable豺妓,因此你可以創(chuàng)建一個新的Observable發(fā)射這些次級Observable發(fā)射的數(shù)據(jù)的完整集合惜互。
注意:FlatMap
對這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge
)操作,因此它們可能是交錯的琳拭。
在許多語言特定的實(shí)現(xiàn)中训堆,還有一個操作符不會讓變換后的Observables發(fā)射的數(shù)據(jù)交錯,它按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù)白嘁,這個操作符通常被叫作ConcatMap
或者類似的名字坑鱼。
RxJava將這個操作符實(shí)現(xiàn)為flatMap
函數(shù)。
注意:如果任何一個通過這個flatMap
操作產(chǎn)生的單獨(dú)的Observable調(diào)用onError
異常終止了絮缅,這個Observable自身會立即調(diào)用onError
并終止懈贺。
這個操作符有一個接受額外的int
參數(shù)的一個變體逊抡。這個參數(shù)設(shè)置flatMap
從原來的Observable映射Observables的最大同時(shí)訂閱數(shù)。當(dāng)達(dá)到這個限制時(shí),它會等待其中一個終止然后再訂閱另一個搬泥。
- Javadoc: flatMap(Func1)
- Javadoc: flatMap(Func1,int)
還有一個版本的flatMap
為原始Observable的每一項(xiàng)數(shù)據(jù)和每一個通知創(chuàng)建一個新的Observable(并對數(shù)據(jù)平坦化)。
它也有一個接受額外int
參數(shù)的變體隐绵。
- Javadoc: flatMap(Func1,Func1,Func0)
- Javadoc: flatMap(Func1,Func1,Func0,int)
還有一個版本的flatMap
會使用原始Observable的數(shù)據(jù)觸發(fā)的Observable組合這些數(shù)據(jù)愧怜,然后發(fā)射這些數(shù)據(jù)組合缠局。它也有一個接受額外int
參數(shù)的版本。
- Javadoc: flatMap(Func1,Func2)
- Javadoc: flatMap(Func1,Func2,int)
flatMapIterable
flatMapIterable
這個變體成對的打包數(shù)據(jù)蔼两,然后生成Iterable而不是原始數(shù)據(jù)和生成的Observables甩鳄,但是處理方式是相同的。
- Javadoc: flatMapIterable(Func1)
- Javadoc: flatMapIterable(Func1,Func2)
concatMap
還有一個concatMap
操作符额划,它類似于最簡單版本的flatMap
妙啃,但是它按次序連接而不是合并那些生成的Observables,然后產(chǎn)生自己的數(shù)據(jù)序列俊戳。
- Javadoc: concatMap(Func1)
switchMap
RxJava還實(shí)現(xiàn)了switchMap
操作符揖赴。它和flatMap
很像,除了一點(diǎn):當(dāng)原始Observable發(fā)射一個新的數(shù)據(jù)(Observable)時(shí)抑胎,它將取消訂閱并停止監(jiān)視產(chǎn)生執(zhí)之前那個數(shù)據(jù)的Observable燥滑,只監(jiān)視當(dāng)前這一個。
- Javadoc: switchMap(Func1)
split
在特殊的StringObservable
類(默認(rèn)沒有包含在RxJava中)中還有一個split
操作符阿逃。它將一個發(fā)射字符串的Observable轉(zhuǎn)換為另一個發(fā)射字符串的Observable铭拧,只不過,后者將原始的數(shù)據(jù)序列當(dāng)做一個數(shù)據(jù)流恃锉,使用一個正則表達(dá)式邊界分割它們搀菩,然后合并發(fā)射分割的結(jié)果。
GroupBy
將一個Observable分拆為一些Observables集合破托,它們中的每一個發(fā)射原始Observable的一個子序列
GroupBy
操作符將原始Observable分拆為一些Observables集合肪跋,它們中的每一個發(fā)射原始Observable數(shù)據(jù)序列的一個子序列。哪個數(shù)據(jù)項(xiàng)由哪一個Observable發(fā)射是由一個函數(shù)判定的土砂,這個函數(shù)給每一項(xiàng)指定一個Key州既,Key相同的數(shù)據(jù)會被同一個Observable發(fā)射。
RxJava實(shí)現(xiàn)了groupBy
操作符萝映。它返回Observable的一個特殊子類GroupedObservable
吴叶,實(shí)現(xiàn)了GroupedObservable
接口的對象有一個額外的方法getKey
,這個Key用于將數(shù)據(jù)分組到指定的Observable序臂。
有一個版本的groupBy
允許你傳遞一個變換函數(shù)晤郑,這樣它可以在發(fā)射結(jié)果GroupedObservable
之前改變數(shù)據(jù)項(xiàng)。
注意:groupBy
將原始Observable分解為一個發(fā)射多個GroupedObservable
的Observable贸宏,一旦有訂閱造寝,每個GroupedObservable
就開始緩存數(shù)據(jù)。因此吭练,如果你忽略這些GroupedObservable
中的任何一個诫龙,這個緩存可能形成一個潛在的內(nèi)存泄露。因此鲫咽,如果你不想觀察签赃,也不要忽略GroupedObservable
谷异。你應(yīng)該使用像take(0)
這樣會丟棄自己的緩存的操作符。
如果你取消訂閱一個GroupedObservable
锦聊,那個Observable將會終止歹嘹。如果之后原始的Observable又發(fā)射了一個與這個Observable的Key匹配的數(shù)據(jù),groupBy
將會為這個Key創(chuàng)建一個新的GroupedObservable
孔庭。
groupBy
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行尺上。
- Javadoc: groupBy(Func1)
- Javadoc: groupBy(Func1,Func1)
Map
對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù),執(zhí)行變換操作
Map
操作符對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個你選擇的函數(shù)圆到,然后返回一個發(fā)射這些結(jié)果的Observable怎抛。
RxJava將這個操作符實(shí)現(xiàn)為map
函數(shù)。這個操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行芽淡。
- Javadoc: map(Func1)
cast
cast
操作符將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個指定的類型马绝,然后再發(fā)射數(shù)據(jù),它是map
的一個特殊版本挣菲。
- Javadoc: cast(Class)
下面是常用的操作符列表:
- 創(chuàng)建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
- 變換操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
- 過濾操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
- 組合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
- 錯誤處理 Catch和Retry
- 輔助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
- 條件和布爾操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
- 算術(shù)和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
- 轉(zhuǎn)換操作 To
- 連接操作 Connect, Publish, RefCount, Replay
- 反壓操作富稻,用于增加特殊的流程控制策略的操作符
這些操作符并不全都是ReactiveX的核心組成部分,有一些是語言特定的實(shí)現(xiàn)或可選的模塊白胀。
RxJava
在RxJava中唉窃,一個實(shí)現(xiàn)了Observer接口的對象可以訂閱(subscribe)一個Observable 類的實(shí)例。訂閱者(subscriber)對Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應(yīng)纹笼。這種模式簡化了并發(fā)操作,因?yàn)樗恍枰枞却齇bservable發(fā)射數(shù)據(jù)苟跪,而是創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵廷痘,哨兵在未來某個時(shí)刻響應(yīng)Observable的通知。
Single
介紹
RxJava(以及它派生出來的RxGroovy和RxScala)中有一個名為Single的Observable變種件已。
Single類似于Observable笋额,不同的是,它總是只發(fā)射一個值篷扩,或者一個錯誤通知兄猩,而不是發(fā)射一系列的值。
因此鉴未,不同于Observable需要三個方法onNext, onError, onCompleted枢冤,訂閱Single只需要兩個方法:
- onSuccess - Single發(fā)射單個的值到這個方法
- onError - 如果無法發(fā)射需要的值,Single發(fā)射一個Throwable對象到這個方法
Single只會調(diào)用這兩個方法中的一個铜秆,而且只會調(diào)用一次淹真,調(diào)用了任何一個方法之后,訂閱關(guān)系終止连茧。
Single的操作符
Single也可以組合使用多種操作核蘸,一些操作符讓你可以混合使用Observable和Single:
操作符 | 返回值 | 說明 |
---|---|---|
compose | Single | 創(chuàng)建一個自定義的操作符 |
concat and concatWith | Observable | 連接多個Single和Observable發(fā)射的數(shù)據(jù) |
create | Single | 調(diào)用觀察者的create方法創(chuàng)建一個Single |
error | Single | 返回一個立即給訂閱者發(fā)射錯誤通知的Single |
flatMap | Single | 返回一個Single巍糯,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果 |
flatMapObservable | Observable | 返回一個Observable,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果 |
from | Single | 將Future轉(zhuǎn)換成Single |
just | Single | 返回一個發(fā)射一個指定值的Single |
map | Single | 返回一個Single客扎,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行map操作后的結(jié)果 |
merge | Single | 將一個Single(它發(fā)射的數(shù)據(jù)是另一個Single祟峦,假設(shè)為B)轉(zhuǎn)換成另一個Single(它發(fā)射來自另一個Single(B)的數(shù)據(jù)) |
merge and mergeWith | Observable | 合并發(fā)射來自多個Single的數(shù)據(jù) |
observeOn | Single | 指示Single在指定的調(diào)度程序上調(diào)用訂閱者的方法 |
onErrorReturn | Single | 將一個發(fā)射錯誤通知的Single轉(zhuǎn)換成一個發(fā)射指定數(shù)據(jù)項(xiàng)的Single |
subscribeOn | Single | 指示Single在指定的調(diào)度程序上執(zhí)行操作 |
timeout | Single | 它給原有的Single添加超時(shí)控制,如果超時(shí)了就發(fā)射一個錯誤通知 |
toSingle | Single | 將一個發(fā)射單個值的Observable轉(zhuǎn)換為一個Single |
zip and zipWith | Single | 將多個Single轉(zhuǎn)換為一個徙鱼,后者發(fā)射的數(shù)據(jù)是對前者應(yīng)用一個函數(shù)后的結(jié)果 |
操作符圖示
詳細(xì)的圖解可以參考英文文檔:Single
Subject
Subject可以看成是一個橋梁或者代理宅楞,在某些ReactiveX實(shí)現(xiàn)中(如RxJava),它同時(shí)充當(dāng)了Observer和Observable的角色疆偿。因?yàn)樗且粋€Observer咱筛,它可以訂閱一個或多個Observable;又因?yàn)樗且粋€Observable杆故,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù)迅箩,也可以發(fā)射新的數(shù)據(jù)。
由于一個Subject訂閱一個Observable处铛,它可以觸發(fā)這個Observable開始發(fā)射數(shù)據(jù)(如果那個Observable是"冷"的--就是說饲趋,它等待有訂閱才開始發(fā)射數(shù)據(jù))。因此有這樣的效果撤蟆,Subject可以把原來那個"冷"的Observable變成"熱"的奕塑。
Subject的種類
針對不同的場景一共有四種類型的Subject。他們并不是在所有的實(shí)現(xiàn)中全部都存在家肯,而且一些實(shí)現(xiàn)使用其它的命名約定(例如龄砰,在RxScala中Subject被稱作PublishSubject)。
AsyncSubject
一個AsyncSubject只在原始Observable完成后讨衣,發(fā)射來自原始Observable的最后一個值换棚。(如果原始Observable沒有發(fā)射任何值,AsyncObject也不發(fā)射任何值)它會把這最后一個值發(fā)射給任何后續(xù)的觀察者反镇。BehaviorSubject
當(dāng)觀察者訂閱BehaviorSubject時(shí),它開始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時(shí)還沒有收到任何數(shù)據(jù)惊豺,它會發(fā)射一個默認(rèn)值)燎孟,然后繼續(xù)發(fā)射其它任何來自原始Observable的數(shù)據(jù)。PublishSubject
PublishSubject只會把在訂閱發(fā)生的時(shí)間點(diǎn)之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者碍沐。需要注意的是狸捅,PublishSubject可能會一創(chuàng)建完成就立刻開始發(fā)射數(shù)據(jù)(除非你可以阻止它發(fā)生),因此這里有一個風(fēng)險(xiǎn):在Subject被創(chuàng)建后到有觀察者訂閱它之前這個時(shí)間段內(nèi)累提,一個或多個數(shù)據(jù)可能會丟失尘喝。如果要確保來自原始Observable的所有數(shù)據(jù)都被分發(fā),你需要這樣做:或者使用Create創(chuàng)建那個Observable以便手動給它引入"冷"Observable的行為(當(dāng)所有觀察者都已經(jīng)訂閱時(shí)才開始發(fā)射數(shù)據(jù))斋陪,或者改用ReplaySubject朽褪。ReplaySubject
ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時(shí)訂閱的友题。也有其它版本的ReplaySubject嗤堰,在重放緩存增長到一定大小的時(shí)候或過了一段時(shí)間后會丟棄舊的數(shù)據(jù)(原始Observable發(fā)射的)。
如果你把ReplaySubject當(dāng)作一個觀察者使用度宦,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法)踢匣,這可能導(dǎo)致同時(shí)(非順序)調(diào)用,這會違反Observable協(xié)議戈抄,給Subject的結(jié)果增加了不確定性离唬。
RxJava的對應(yīng)類
假設(shè)你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口划鸽,你可以調(diào)用它的asObservable方法输莺,這個方法返回一個Observable。具體使用方法可以參考Javadoc文檔裸诽。
串行化
如果你把 Subject
當(dāng)作一個 Subscriber
使用嫂用,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導(dǎo)致同時(shí)(非順序)調(diào)用崭捍,這會違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性啰脚。
要避免此類問題殷蛇,你可以將 Subject
轉(zhuǎn)換為一個 SerializedSubject
,類似于這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
調(diào)度器 Scheduler
如果你想給Observable操作符鏈添加多線程功能橄浓,你可以指定操作符(或者特定的Observable)在特定的調(diào)度器(Scheduler)上執(zhí)行粒梦。
某些ReactiveX的Observable操作符有一些變體,它們可以接受一個Scheduler參數(shù)荸实。這個參數(shù)指定操作符將它們的部分或全部任務(wù)放在一個特定的調(diào)度器上執(zhí)行匀们。
使用ObserveOn和SubscribeOn操作符,你可以讓Observable在一個特定的調(diào)度器上執(zhí)行准给,ObserveOn指示一個Observable在一個特定的調(diào)度器上調(diào)用觀察者的onNext, onError和onCompleted方法泄朴,SubscribeOn更進(jìn)一步重抖,它指示Observable將全部的處理過程(包括發(fā)射數(shù)據(jù)和通知)放在特定的調(diào)度器上執(zhí)行。
RxJava示例
調(diào)度器的種類
下表展示了RxJava中可用的調(diào)度器種類:
調(diào)度器類型 | 效果 |
---|---|
Schedulers.computation(?) | 用于計(jì)算任務(wù)祖灰,如事件循環(huán)或和回調(diào)處理钟沛,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量 |
Schedulers.from(executor) | 使用指定的Executor作為調(diào)度器 |
Schedulers.immediate(?) | 在當(dāng)前線程立即開始執(zhí)行任務(wù) |
Schedulers.io(?) | 用于IO密集型任務(wù)局扶,如異步阻塞IO操作恨统,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計(jì)算任務(wù)三妈,請使用Schedulers.computation()畜埋;Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器 |
Schedulers.newThread(?) | 為每個任務(wù)創(chuàng)建一個新線程 |
Schedulers.trampoline(?) | 當(dāng)其它排隊(duì)的任務(wù)完成后畴蒲,在當(dāng)前線程排隊(duì)開始執(zhí)行 |
默認(rèn)調(diào)度器
在RxJava中悠鞍,某些Observable操作符的變體允許你設(shè)置用于操作執(zhí)行的調(diào)度器,其它的則不在任何特定的調(diào)度器上執(zhí)行饿凛,或者在一個指定的默認(rèn)調(diào)度器上執(zhí)行狞玛。下面的表格個列出了一些操作符的默認(rèn)調(diào)度器:
操作符 | 調(diào)度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan,?count) | computation |
buffer(timespan,?timeshift) | computation |
debounce(timeout,?unit) | computation |
delay(delay,?unit) | computation |
delaySubscription(delay,?unit) | computation |
interval | computation |
repeat | trampoline |
replay(time,?unit) | computation |
replay(buffersize,?time,?unit) | computation |
replay(selector,?time,?unit) | computation |
replay(selector,?buffersize,?time,?unit) | computation |
retry | trampoline |
sample(period,?unit) | computation |
skip(time,?unit) | computation |
skipLast(time,?unit) | computation |
take(time,?unit) | computation |
takeLast(time,?unit) | computation |
takeLast(count,?time,?unit) | computation |
takeLastBuffer(time,?unit) | computation |
takeLastBuffer(count,?time,?unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector,?timeoutSelector) | immediate |
timeout(timeoutSelector,?other) | immediate |
timeout(timeout,?timeUnit) | computation |
timeout(firstTimeoutSelector,?timeoutSelector,?other) | immediate |
timeout(timeout,?timeUnit,?other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan,?count) | computation |
window(timespan,?timeshift) | computation |
使用調(diào)度器
除了將這些調(diào)度器傳遞給RxJava的Observable操作符,你也可以用它們調(diào)度你自己的任務(wù)涧窒。下面的示例展示了Scheduler.Worker的用法:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
遞歸調(diào)度器
要調(diào)度遞歸的方法調(diào)用心肪,你可以使用schedule,然后再用schedule(this)纠吴,示例:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// some time later...
worker.unsubscribe();
檢查或設(shè)置取消訂閱狀態(tài)
Worker類的對象實(shí)現(xiàn)了Subscription接口硬鞍,使用它的isUnsubscribed和unsubscribe方法,所以你可以在訂閱取消時(shí)停止任務(wù)戴已,或者從正在調(diào)度的任務(wù)內(nèi)部取消訂閱固该,示例:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
Worker同時(shí)是Subscription,因此你可以(通常也應(yīng)該)調(diào)用它的unsubscribe方法通知可以掛起任務(wù)和釋放資源了糖儡。
延時(shí)和周期調(diào)度器
你可以使用schedule(action,delayTime,timeUnit)在指定的調(diào)度器上延時(shí)執(zhí)行你的任務(wù)伐坏,下面例子中的任務(wù)將在500毫秒之后開始執(zhí)行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另一個版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法讓你可以安排一個定期執(zhí)行的任務(wù)握联,下面例子的任務(wù)將在500毫秒之后執(zhí)行桦沉,然后每250毫秒執(zhí)行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
測試調(diào)度器
TestScheduler讓你可以對調(diào)度器的時(shí)鐘表現(xiàn)進(jìn)行手動微調(diào)。這對依賴精確時(shí)間安排的任務(wù)的測試很有用處金闽。這個調(diào)度器有三個額外的方法:
- advanceTimeTo(time,unit) 向前波動調(diào)度器的時(shí)鐘到一個指定的時(shí)間點(diǎn)
- advanceTimeBy(time,unit) 將調(diào)度器的時(shí)鐘向前撥動一個指定的時(shí)間段
- triggerActions(?) 開始執(zhí)行任何計(jì)劃中的但是未啟動的任務(wù)纯露,如果它們的計(jì)劃時(shí)間等于或者早于調(diào)度器時(shí)鐘的當(dāng)前時(shí)間
操作符分類
ReactiveX的每種編程語言的實(shí)現(xiàn)都實(shí)現(xiàn)了一組操作符的集合。不同的實(shí)現(xiàn)之間有很多重疊的部分代芜,也有一些操作符只存在特定的實(shí)現(xiàn)中埠褪。每種實(shí)現(xiàn)都傾向于用那種編程語言中他們熟悉的上下文中相似的方法給這些操作符命名。
本文首先會給出ReactiveX的核心操作符列表和對應(yīng)的文檔鏈接,后面還有一個決策樹用于幫助你根據(jù)具體的場景選擇合適的操作符钞速。最后有一個語言特定實(shí)現(xiàn)的按字母排序的操作符列表贷掖。
如果你想實(shí)現(xiàn)你自己的操作符,可以參考這里:實(shí)現(xiàn)自定義操作符
創(chuàng)建操作
用于創(chuàng)建Observable的操作符
-
Create
— 通過調(diào)用觀察者的方法從頭創(chuàng)建一個Observable -
Defer
— 在觀察者訂閱之前不創(chuàng)建這個Observable玉工,為每一個觀察者創(chuàng)建一個新的Observable -
Empty/Never/Throw
— 創(chuàng)建行為受限的特殊Observable -
From
— 將其它的對象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable -
Interval
— 創(chuàng)建一個定時(shí)發(fā)射整數(shù)序列的Observable -
Just
— 將對象或者對象集合轉(zhuǎn)換為一個會發(fā)射這些對象的Observable -
Range
— 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable -
Repeat
— 創(chuàng)建重復(fù)發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable -
Start
— 創(chuàng)建發(fā)射一個函數(shù)的返回值的Observable -
Timer
— 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable
變換操作
這些操作符可用于對Observable發(fā)射的數(shù)據(jù)進(jìn)行變換羽资,詳細(xì)解釋可以看每個操作符的文檔
-
Buffer
— 緩存,可以簡單的理解為緩存遵班,它定期從Observable收集數(shù)據(jù)到一個集合屠升,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個 -
FlatMap
— 扁平映射狭郑,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合腹暖,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個單獨(dú)的Observable,可以認(rèn)為是一個將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過程翰萨。 -
GroupBy
— 分組脏答,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組亩鬼,每一個Observable發(fā)射一組不同的數(shù)據(jù) -
Map
— 映射殖告,通過對序列的每一項(xiàng)都應(yīng)用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對序列中的每一項(xiàng)執(zhí)行一個函數(shù)雳锋,函數(shù)的參數(shù)就是這個數(shù)據(jù)項(xiàng) -
Scan
— 掃描黄绩,對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù),然后按順序依次發(fā)射這些值 -
Window
— 窗口玷过,定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口爽丹,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)辛蚊。類似于Buffer粤蝎,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable袋马,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集
過濾操作
這些操作符用于從Observable發(fā)射的數(shù)據(jù)中進(jìn)行選擇
-
Debounce
— 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù)初澎,通俗的說,就是如果一段時(shí)間沒有操作虑凛,就執(zhí)行一次操作 -
Distinct
— 去重碑宴,過濾掉重復(fù)數(shù)據(jù)項(xiàng) -
ElementAt
— 取值,取特定位置的數(shù)據(jù)項(xiàng) -
Filter
— 過濾卧檐,過濾掉沒有通過謂詞測試的數(shù)據(jù)項(xiàng)墓懂,只發(fā)射通過測試的 -
First
— 首項(xiàng)焰宣,只發(fā)射滿足條件的第一條數(shù)據(jù) -
IgnoreElements
— 忽略所有的數(shù)據(jù)霉囚,只保留終止通知(onError或onCompleted) -
Last
— 末項(xiàng),只發(fā)射最后一條數(shù)據(jù) -
Sample
— 取樣匕积,定期發(fā)射最新的數(shù)據(jù)盈罐,等于是數(shù)據(jù)抽樣榜跌,有的實(shí)現(xiàn)里叫ThrottleFirst -
Skip
— 跳過前面的若干項(xiàng)數(shù)據(jù) -
SkipLast
— 跳過后面的若干項(xiàng)數(shù)據(jù) -
Take
— 只保留前面的若干項(xiàng)數(shù)據(jù) -
TakeLast
— 只保留后面的若干項(xiàng)數(shù)據(jù)
組合操作
組合操作符用于將多個Observable組合成一個單一的Observable
-
And/Then/When
— 通過模式(And條件)和計(jì)劃(Then次序)組合兩個或多個Observable發(fā)射的數(shù)據(jù)集 -
CombineLatest
— 當(dāng)兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時(shí),通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù))盅粪,然后發(fā)射這個函數(shù)的結(jié)果 -
Join
— 無論何時(shí)钓葫,如果一個Observable發(fā)射了一個數(shù)據(jù)項(xiàng),只要在另一個Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi)票顾,就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射 -
Merge
— 將兩個Observable發(fā)射的數(shù)據(jù)組合并成一個 -
StartWith
— 在發(fā)射原來的Observable的數(shù)據(jù)序列之前础浮,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng) -
Switch
— 將一個發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù) -
Zip
— 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起奠骄,然后將這個函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射
錯誤處理
這些操作符用于從錯誤通知中恢復(fù)
-
Catch
— 捕獲豆同,繼續(xù)序列操作,將錯誤替換為正常的數(shù)據(jù)含鳞,從onError通知中恢復(fù) -
Retry
— 重試影锈,如果Observable發(fā)射了一個錯誤通知,重新訂閱它蝉绷,期待它正常終止
輔助操作
一組用于處理Observable的操作符
-
Delay
— 延遲一段時(shí)間發(fā)射結(jié)果數(shù)據(jù) -
Do
— 注冊一個動作占用一些Observable的生命周期事件鸭廷,相當(dāng)于Mock某個操作 -
Materialize/Dematerialize
— 將發(fā)射的數(shù)據(jù)和通知都當(dāng)做數(shù)據(jù)發(fā)射,或者反過來 -
ObserveOn
— 指定觀察者觀察Observable的調(diào)度程序(工作線程) -
Serialize
— 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且功能是有效的 -
Subscribe
— 收到Observable發(fā)射的數(shù)據(jù)和通知后執(zhí)行的操作 -
SubscribeOn
— 指定Observable應(yīng)該在哪個調(diào)度程序上執(zhí)行 -
TimeInterval
— 將一個Observable轉(zhuǎn)換為發(fā)射兩個數(shù)據(jù)之間所耗費(fèi)時(shí)間的Observable -
Timeout
— 添加超時(shí)機(jī)制熔吗,如果過了指定的一段時(shí)間沒有發(fā)射數(shù)據(jù)辆床,就發(fā)射一個錯誤通知 -
Timestamp
— 給Observable發(fā)射的每個數(shù)據(jù)項(xiàng)添加一個時(shí)間戳 -
Using
— 創(chuàng)建一個只在Observable的生命周期內(nèi)存在的一次性資源
條件和布爾操作
這些操作符可用于單個或多個數(shù)據(jù)項(xiàng),也可用于Observable
-
All
— 判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個條件 -
Amb
— 給定多個Observable磁滚,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù) -
Contains
— 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項(xiàng) -
DefaultIfEmpty
— 發(fā)射來自原始Observable的數(shù)據(jù)佛吓,如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個默認(rèn)數(shù)據(jù) -
SequenceEqual
— 判斷兩個Observable是否按相同的數(shù)據(jù)序列 -
SkipUntil
— 丟棄原始Observable發(fā)射的數(shù)據(jù)垂攘,直到第二個Observable發(fā)射了一個數(shù)據(jù)维雇,然后發(fā)射原始Observable的剩余數(shù)據(jù) -
SkipWhile
— 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個特定的條件為假晒他,然后發(fā)射原始Observable剩余的數(shù)據(jù) -
TakeUntil
— 發(fā)射來自原始Observable的數(shù)據(jù)吱型,直到第二個Observable發(fā)射了一個數(shù)據(jù)或一個通知 -
TakeWhile
— 發(fā)射原始Observable的數(shù)據(jù),直到一個特定的條件為真陨仅,然后跳過剩余的數(shù)據(jù)
算術(shù)和聚合操作
這些操作符可用于整個數(shù)據(jù)序列
-
Average
— 計(jì)算Observable發(fā)射的數(shù)據(jù)序列的平均值津滞,然后發(fā)射這個結(jié)果 -
Concat
— 不交錯的連接多個Observable的數(shù)據(jù) -
Count
— 計(jì)算Observable發(fā)射的數(shù)據(jù)個數(shù),然后發(fā)射這個結(jié)果 -
Max
— 計(jì)算并發(fā)射數(shù)據(jù)序列的最大值 -
Min
— 計(jì)算并發(fā)射數(shù)據(jù)序列的最小值 -
Reduce
— 按順序?qū)?shù)據(jù)序列的每一個應(yīng)用某個函數(shù)灼伤,然后返回這個值 -
Sum
— 計(jì)算并發(fā)射數(shù)據(jù)序列的和
連接操作
一些有精確可控的訂閱行為的特殊Observable
-
Connect
— 指示一個可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者 -
Publish
— 將一個普通的Observable轉(zhuǎn)換為可連接的 -
RefCount
— 使一個可連接的Observable表現(xiàn)得像一個普通的Observable -
Replay
— 確保所有的觀察者收到同樣的數(shù)據(jù)序列触徐,即使他們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱
轉(zhuǎn)換操作
操作符決策樹
幾種主要的需求
- 直接創(chuàng)建一個Observable(創(chuàng)建操作)
- 組合多個Observable(組合操作)
- 對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
- 從Observable發(fā)射的數(shù)據(jù)中取特定的值(過濾操作)
- 轉(zhuǎn)發(fā)Observable的部分值(條件/布爾/過濾操作)
- 對Observable發(fā)射的數(shù)據(jù)序列求值(算術(shù)/聚合操作)
這個頁面展示了創(chuàng)建Observable的各種方法。
- just(?) — 將一個或多個對象轉(zhuǎn)換成發(fā)射這個或這些對象的一個Observable
- from(?) — 將一個Iterable, 一個Future, 或者一個數(shù)組轉(zhuǎn)換成一個Observable
- repeat(?) — 創(chuàng)建一個重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable
- repeatWhen(?) — 創(chuàng)建一個重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable狐赡,它依賴于另一個Observable發(fā)射的數(shù)據(jù)
- create(?) — 使用一個函數(shù)從頭創(chuàng)建一個Observable
- defer(?) — 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable撞鹉;為每個訂閱創(chuàng)建一個新的Observable
- range(?) — 創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable
- interval(?) — 創(chuàng)建一個按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable
- timer(?) — 創(chuàng)建一個在給定的延時(shí)之后發(fā)射單個數(shù)據(jù)的Observable
- empty(?) — 創(chuàng)建一個什么都不做直接通知完成的Observable
- error(?) — 創(chuàng)建一個什么都不做直接通知錯誤的Observable
- never(?) — 創(chuàng)建一個不發(fā)射任何數(shù)據(jù)的Observable