nest
nest
操作符有一個(gè)特殊的用途:將一個(gè)Observable轉(zhuǎn)換為一個(gè)發(fā)射這個(gè)Observable的Observable蹋盆。
條件和布爾操作
這個(gè)頁面的操作符可用于根據(jù)條件發(fā)射或變換Observables同欠,或者對(duì)它們做布爾運(yùn)算:
條件操作符
- amb(?) — 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
- defaultIfEmpty(?) — 發(fā)射來自原始Observable的數(shù)據(jù)震嫉,如果原始Observable沒有發(fā)射數(shù)據(jù)茵瘾,就發(fā)射一個(gè)默認(rèn)數(shù)據(jù)
- (
rxjava-computation-expressions
) doWhile(?) — 發(fā)射原始Observable的數(shù)據(jù)序列,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止 - (
rxjava-computation-expressions
) ifThen(?) — 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列 - skipUntil(?) — 丟棄原始Observable發(fā)射的數(shù)據(jù)丙号,直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù)
- skipWhile(?) — 丟棄原始Observable發(fā)射的數(shù)據(jù)缰冤,直到一個(gè)特定的條件為假犬缨,然后發(fā)射原始Observable剩余的數(shù)據(jù)
- (
rxjava-computation-expressions
) switchCase(?) — 基于一個(gè)計(jì)算結(jié)果,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列 - takeUntil(?) — 發(fā)射來自原始Observable的數(shù)據(jù)棉浸,直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知
- takeWhile(?) and takeWhileWithIndex(?) — 發(fā)射原始Observable的數(shù)據(jù)遍尺,直到一個(gè)特定的條件為真,然后跳過剩余的數(shù)據(jù)
- (
rxjava-computation-expressions
) whileDo(?) — 如果條件為true
涮拗,則發(fā)射源Observable數(shù)據(jù)序列乾戏,并且只要條件保持為true
就重復(fù)發(fā)射此數(shù)據(jù)序列
(
rxjava-computation-expressions
) — 表示這個(gè)操作符當(dāng)前是可選包rxjava-computation-expressions
的一部分,還沒有包含在標(biāo)準(zhǔn)RxJava的操作符集合里
布爾操作符
- all(?) — 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件
- contains(?) — 判斷Observable是否會(huì)發(fā)射一個(gè)指定的值
- exists(?) and isEmpty(?) — 判斷Observable是否發(fā)射了一個(gè)值
- sequenceEqual(?) — 判斷兩個(gè)Observables發(fā)射的序列是否相等
條件和布爾操作
All
判定是否Observable發(fā)射的所有數(shù)據(jù)都滿足某個(gè)條件
傳遞一個(gè)謂詞函數(shù)給All
操作符,這個(gè)函數(shù)接受原始Observable發(fā)射的數(shù)據(jù),根據(jù)計(jì)算返回一個(gè)布爾值帆啃。All
返回一個(gè)只發(fā)射一個(gè)單個(gè)布爾值的Observable台囱,如果原始Observable正常終止并且每一項(xiàng)數(shù)據(jù)都滿足條件,就返回true;如果原始Observable的任何一項(xiàng)數(shù)據(jù)不滿足條件就返回False。
RxJava將這個(gè)操作符實(shí)現(xiàn)為all
,它默認(rèn)不在任何特定的調(diào)度器上執(zhí)行摆出。
- Javadoc: all(Func1)
Amb
給定兩個(gè)或多個(gè)Observables,它只發(fā)射首先發(fā)射數(shù)據(jù)或通知的那個(gè)Observable的所有數(shù)據(jù)
當(dāng)你傳遞多個(gè)Observable給Amb
時(shí)首妖,它只發(fā)射其中一個(gè)Observable的數(shù)據(jù)和通知:首先發(fā)送通知給Amb
的那個(gè)偎漫,不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè)onError
或onCompleted
通知。Amb
將忽略和丟棄其它所有Observables的發(fā)射物有缆。
RxJava的實(shí)現(xiàn)是amb
象踊,有一個(gè)類似的對(duì)象方法ambWith
温亲。例如,Observable.amb(o1,o2)
和o1.ambWith(o2)
是等價(jià)的杯矩。
這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行栈虚。
Contains
判定一個(gè)Observable是否發(fā)射一個(gè)特定的值
給Contains
傳一個(gè)指定的值,如果原始Observable發(fā)射了那個(gè)值史隆,它返回的Observable將發(fā)射true魂务,否則發(fā)射false。
相關(guān)的一個(gè)操作符IsEmpty
用于判定原始Observable是否沒有發(fā)射任何數(shù)據(jù)泌射。
contains
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行粘姜。
- Javadoc: contains(Object)
RxJava中還有一個(gè)exists
操作符,它通過一個(gè)謂詞函數(shù)測(cè)試原始Observable發(fā)射的數(shù)據(jù)魄幕,只要任何一項(xiàng)滿足條件就返回一個(gè)發(fā)射true的Observable相艇,否則返回一個(gè)發(fā)射false的Observable颖杏。
exists
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行纯陨。
- Javadoc: exists(Func1)
isEmpty
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
- Javadoc: isEmpty()
DefaultIfEmpty
發(fā)射來自原始Observable的值留储,如果原始Observable沒有發(fā)射任何值翼抠,就發(fā)射一個(gè)默認(rèn)值
DefaultIfEmpty
簡(jiǎn)單的精確地發(fā)射原始Observable的值,如果原始Observable沒有發(fā)射任何數(shù)據(jù)正常終止(以onCompleted
d的形式)获讳,DefaultIfEmpty
返回的Observable就發(fā)射一個(gè)你提供的默認(rèn)值阴颖。
RxJava將這個(gè)操作符實(shí)現(xiàn)為defaultIfEmpty
。它默認(rèn)不在任何特定的調(diào)度器上執(zhí)行丐膝。
- Javadoc: defaultIfEmpty(T)
還有一個(gè)新的操作符switchIfEmpty
量愧,不在RxJava 1.0.0版中,它和defaultIfEmtpy
類似帅矗,不同的是偎肃,如果原始Observable沒有發(fā)射數(shù)據(jù),它發(fā)射一個(gè)備用Observable的發(fā)射物浑此。
SequenceEqual
判定兩個(gè)Observables是否發(fā)射相同的數(shù)據(jù)序列累颂。
傳遞兩個(gè)Observable給SequenceEqual
操作符,它會(huì)比較兩個(gè)Observable的發(fā)射物凛俱,如果兩個(gè)序列是相同的(相同的數(shù)據(jù)紊馏,相同的順序,相同的終止?fàn)顟B(tài))蒲犬,它就發(fā)射true朱监,否則發(fā)射false。
它還有一個(gè)版本接受第三個(gè)參數(shù)原叮,可以傳遞一個(gè)函數(shù)用于比較兩個(gè)數(shù)據(jù)項(xiàng)是否相同赌朋。
這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行凰狞。
SkipUntil
丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)
SkipUntil
訂閱原始的Observable沛慢,但是忽略它的發(fā)射物赡若,直到第二個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)那一刻,它開始發(fā)射原始Observable团甲。
RxJava中對(duì)應(yīng)的是skipUntil
逾冬,它默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
- Javadoc: skipUntil(Observable)
SkipWhile
丟棄Observable發(fā)射的數(shù)據(jù)躺苦,直到一個(gè)指定的條件不成立
SkipWhile
訂閱原始的Observable身腻,但是忽略它的發(fā)射物,直到你指定的某個(gè)條件變?yōu)閒alse的那一刻匹厘,它開始發(fā)射原始Observable嘀趟。
skipWhile
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
- Javadoc: skipWhile(Func1)
TakeUntil
當(dāng)?shù)诙€(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)或者終止時(shí)愈诚,丟棄原始Observable發(fā)射的任何數(shù)據(jù)
TakeUntil
訂閱并開始發(fā)射原始Observable她按,它還監(jiān)視你提供的第二個(gè)Observable。如果第二個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)或者發(fā)射了一個(gè)終止通知炕柔,TakeUntil
返回的Observable會(huì)停止發(fā)射原始Observable并終止酌泰。
RxJava中的實(shí)現(xiàn)是takeUntil
。注意:第二個(gè)Observable發(fā)射一項(xiàng)數(shù)據(jù)或一個(gè)onError
通知或一個(gè)onCompleted
通知都會(huì)導(dǎo)致takeUntil
停止發(fā)射數(shù)據(jù)匕累。
takeUntil
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行陵刹。
- Javadoc: takeUntil(Observable)
還有一個(gè)版本的takeUntil
,不在RxJava 1.0.0版中欢嘿,它使用一個(gè)謂詞函數(shù)而不是第二個(gè)Observable來判定是否需要終止發(fā)射數(shù)據(jù)衰琐,它的行為類似于takeWhile
。
- Javadoc: takeUntil(Func1)
TakeWhile
發(fā)射Observable發(fā)射的數(shù)據(jù)炼蹦,直到一個(gè)指定的條件不成立
TakeWhile
發(fā)射原始Observable羡宙,直到你指定的某個(gè)條件不成立的那一刻,它停止發(fā)射原始Observable框弛,并終止自己的Observable辛辨。
RxJava中的takeWhile
操作符返回一個(gè)鏡像原始Observable行為的Observable,直到某一項(xiàng)數(shù)據(jù)你指定的函數(shù)返回false
那一刻瑟枫,這個(gè)新的Observable發(fā)射onCompleted
終止通知斗搞。
takeWhile
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
- Javadoc: takeWhile(Func1)
算術(shù)和聚合操作
本頁展示的操作符用于對(duì)整個(gè)序列執(zhí)行算法操作或其它操作慷妙,由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù))僻焚,它們對(duì)于非常長(zhǎng)或者無限的序列來說是危險(xiǎn)的,不推薦使用膝擂。
rxjava-math
模塊的操作符
- averageInteger(?) — 求序列平均數(shù)并發(fā)射
- averageLong(?) — 求序列平均數(shù)并發(fā)射
- averageFloat(?) — 求序列平均數(shù)并發(fā)射
- averageDouble(?) — 求序列平均數(shù)并發(fā)射
- max(?) — 求序列最大值并發(fā)射
- maxBy(?) — 求最大key對(duì)應(yīng)的值并發(fā)射
- min(?) — 求最小值并發(fā)射
- minBy(?) — 求最小Key對(duì)應(yīng)的值并發(fā)射
- sumInteger(?) — 求和并發(fā)射
- sumLong(?) — 求和并發(fā)射
- sumFloat(?) — 求和并發(fā)射
- sumDouble(?) — 求和并發(fā)射
其它聚合操作符
- concat(?) — 順序連接多個(gè)Observables
- count(?) and countLong(?) — 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果
- reduce(?) — 對(duì)序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果
- collect(?) — 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中虑啤,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable
- toList(?) — 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表隙弛,然后返回這個(gè)列表
- toSortedList(?) — 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表
- toMap(?) — 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map狞山,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的
- toMultiMap(?) — 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表全闷,同時(shí)也是一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的
算術(shù)和聚合操作
Average
計(jì)算原始Observable發(fā)射數(shù)字的平均值并發(fā)射它
Average
操作符操作符一個(gè)發(fā)射數(shù)字的Observable萍启,并發(fā)射單個(gè)值:原始Observable發(fā)射的數(shù)字序列的平均值总珠。
這個(gè)操作符不包含在RxJava核心模塊中,它屬于不同的rxjava-math
模塊勘纯。它被實(shí)現(xiàn)為四個(gè)操作符:averageDouble
, averageFloat
, averageInteger
, averageLong
局服。
如果原始Observable不發(fā)射任何數(shù)據(jù),這個(gè)操作符會(huì)拋異常:IllegalArgumentException
驳遵。
Min
發(fā)射原始Observable的最小值
Min
操作符操作一個(gè)發(fā)射數(shù)值的Observable并發(fā)射單個(gè)值:最小的那個(gè)值淫奔。
RxJava中,min
屬于rxjava-math
模塊堤结。
min
接受一個(gè)可選參數(shù)唆迁,用于比較兩項(xiàng)數(shù)據(jù)的大小,如果最小值的數(shù)據(jù)超過一項(xiàng)霍殴,min
會(huì)發(fā)射原始Observable最近發(fā)射的那一項(xiàng)媒惕。
minBy
類似于min
系吩,但是它發(fā)射的不是最小值来庭,而是發(fā)射Key最小的項(xiàng),Key由你指定的一個(gè)函數(shù)生成穿挨。
Max
發(fā)射原始Observable的最大值
Max
操作符操作一個(gè)發(fā)射數(shù)值的Observable并發(fā)射單個(gè)值:最大的那個(gè)值月弛。
RxJava中,max
屬于rxjava-math
模塊科盛。
max
接受一個(gè)可選參數(shù)帽衙,用于比較兩項(xiàng)數(shù)據(jù)的大小,如果最大值的數(shù)據(jù)超過一項(xiàng)贞绵,max
會(huì)發(fā)射原始Observable最近發(fā)射的那一項(xiàng)厉萝。
maxBy
類似于max
,但是它發(fā)射的不是最大值榨崩,而是發(fā)射Key最大的項(xiàng)谴垫,Key由你指定的一個(gè)函數(shù)生成。
Count
計(jì)算原始Observable發(fā)射物的數(shù)量母蛛,然后只發(fā)射這個(gè)值
Count
操作符將一個(gè)Observable轉(zhuǎn)換成一個(gè)發(fā)射單個(gè)值的Observable翩剪,這個(gè)值表示原始Observable發(fā)射的數(shù)據(jù)的數(shù)量。
如果原始Observable發(fā)生錯(cuò)誤終止彩郊,Count
不發(fā)射數(shù)據(jù)而是直接傳遞錯(cuò)誤通知前弯。如果原始Observable永遠(yuǎn)不終止蚪缀,Count
既不會(huì)發(fā)射數(shù)據(jù)也不會(huì)終止。
RxJava的實(shí)現(xiàn)是count
和countLong
恕出。
示例代碼
String[] items = new String[] { "one", "two", "three" };
assertEquals( new Integer(3), Observable.from(items).count().toBlocking().single() );
- Javadoc: count()
- Javadoc: countLong()
Sum
計(jì)算Observable發(fā)射的數(shù)值的和并發(fā)射這個(gè)和
Sum
操作符操作一個(gè)發(fā)射數(shù)值的Observable询枚,僅發(fā)射單個(gè)值:原始Observable所有數(shù)值的和。
RxJava的實(shí)現(xiàn)是sumDouble
, sumFloat
, sumInteger
, sumLong
浙巫,它們不是RxJava核心模塊的一部分哩盲,屬于rxjava-math
模塊。
你可以使用一個(gè)函數(shù)狈醉,計(jì)算Observable每一項(xiàng)數(shù)據(jù)的函數(shù)返回值的和廉油。
在StringObservable
類(這個(gè)類不是RxJava核心模塊的一部分)中有一個(gè)stringConcat
操作符,它將一個(gè)發(fā)射字符串序列的Observable轉(zhuǎn)換為一個(gè)發(fā)射單個(gè)字符串的Observable苗傅,后者這個(gè)字符串表示的是前者所有字符串的連接抒线。
StringObservable
類還有一個(gè)join
操作符,它將一個(gè)發(fā)射字符串序列的Observable轉(zhuǎn)換為一個(gè)發(fā)射單個(gè)字符串的Observable渣慕,后者這個(gè)字符串表示的是前者所有字符串以你指定的分界符連接的結(jié)果嘶炭。
Concat
不交錯(cuò)的發(fā)射兩個(gè)或多個(gè)Observable的發(fā)射物
Concat
操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable逊桦,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面眨猎,以此類推。
直到前面一個(gè)Observable終止强经,Concat
才會(huì)訂閱額外的一個(gè)Observable睡陪。注意:因此,如果你嘗試連接一個(gè)"熱"Observable(這種Observable在創(chuàng)建后立即開始發(fā)射數(shù)據(jù)匿情,即使沒有訂閱者)兰迫,Concat
將不會(huì)看到也不會(huì)發(fā)射它之前發(fā)射的任何數(shù)據(jù)。
在ReactiveX的某些實(shí)現(xiàn)中有一種ConcatMap
操作符(名字可能叫concat_all
, concat_map
, concatMapObserver
, for
, forIn/for_in
, mapcat
, selectConcat
或selectConcatObserver
)炬称,他會(huì)變換原始Observable發(fā)射的數(shù)據(jù)到一個(gè)對(duì)應(yīng)的Observable汁果,然后再按觀察和變換的順序進(jìn)行連接操作。
StartWith
操作符類似于Concat
玲躯,但是它是插入到前面据德,而不是追加那些Observable的數(shù)據(jù)到原始Observable發(fā)射的數(shù)據(jù)序列。
Merge
操作符也差不多跷车,它結(jié)合兩個(gè)或多個(gè)Observable的發(fā)射物棘利,但是數(shù)據(jù)可能交錯(cuò),而Concat
不會(huì)讓多個(gè)Observable的發(fā)射物交錯(cuò)姓赤。
RxJava中的實(shí)現(xiàn)叫concat
赡译。
- Javadoc: concat(Observable)
- Javadoc: concat(Observable,Observable)
還有一個(gè)實(shí)例方法叫concatWith
,這兩者是等價(jià)的:Observable.concat(a,b)
和a.concatWith(b)
不铆。
Reduce
按順序?qū)bservable發(fā)射的每項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)并發(fā)射最終的值
Reduce
操作符對(duì)原始Observable發(fā)射數(shù)據(jù)的第一項(xiàng)應(yīng)用一個(gè)函數(shù)蝌焚,然后再將這個(gè)函數(shù)的返回值與第二項(xiàng)數(shù)據(jù)一起傳遞給函數(shù)裹唆,以此類推,持續(xù)這個(gè)過程知道原始Observable發(fā)射它的最后一項(xiàng)數(shù)據(jù)并終止只洒,此時(shí)Reduce
返回的Observable發(fā)射這個(gè)函數(shù)返回的最終值许帐。
在其它場(chǎng)景中,這種操作有時(shí)被稱為累積
毕谴,聚集
成畦,壓縮
,折疊
涝开,注射
等循帐。
注意如果原始Observable沒有發(fā)射任何數(shù)據(jù),reduce
拋出異常IllegalArgumentException
舀武。
reduce
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行拄养。
- Javadoc: reduce(Func2)
還有一個(gè)版本的reduce
額外接受一個(gè)種子參數(shù)。注意傳遞一個(gè)值為null
的種子是合法的银舱,但是與不傳種子參數(shù)的行為是不同的瘪匿。如果你傳遞了種子參數(shù),并且原始Observable沒有發(fā)射任何數(shù)據(jù)寻馏,reduce
操作符將發(fā)射這個(gè)種子值然后正常終止棋弥,而不是拋異常。
- Javadoc: reduce(R,Func2)
提示:不建議使用reduce
收集發(fā)射的數(shù)據(jù)到一個(gè)可變的數(shù)據(jù)結(jié)構(gòu)诚欠,那種場(chǎng)景你應(yīng)該使用collect
顽染。
collect
與reduce
類似,但它的目的是收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)可變的數(shù)據(jù)結(jié)構(gòu)聂薪,collect
生成的這個(gè)Observable會(huì)發(fā)射這項(xiàng)數(shù)據(jù)家乘。它需要兩個(gè)參數(shù):
- 一個(gè)函數(shù)返回可變數(shù)據(jù)結(jié)構(gòu)
- 另一個(gè)函數(shù)蝗羊,當(dāng)傳遞給它這個(gè)數(shù)據(jù)結(jié)構(gòu)和原始Observable發(fā)射的數(shù)據(jù)項(xiàng)時(shí)藏澳,適當(dāng)?shù)匦薷臄?shù)據(jù)結(jié)構(gòu)。
collect
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行耀找。
- Javadoc: collect(Func0,Action2)
異步操作
下面的這些操作符屬于單獨(dú)的rxjava-async
模塊翔悠,它們用于將同步對(duì)象轉(zhuǎn)換為Observable。
- start(?) — 創(chuàng)建一個(gè)Observable野芒,它發(fā)射一個(gè)函數(shù)的返回值
- toAsync(?) or asyncAction(?) or asyncFunc(?) — 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable蓄愁,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值
- startFuture(?) — 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable,它發(fā)射Future的返回值
- deferFuture(?) — 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable狞悲,但是并不嘗試獲取這個(gè)Future返回的Observable撮抓,直到有訂閱者訂閱它
- forEachFuture(?) — 傳遞Subscriber方法給一個(gè)Subscriber,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成
- fromAction(?) — 將一個(gè)Action轉(zhuǎn)換為Observable摇锋,當(dāng)一個(gè)訂閱者訂閱時(shí)丹拯,它執(zhí)行這個(gè)action并發(fā)射它的返回值
- fromCallable(?) — 將一個(gè)Callable轉(zhuǎn)換為Observable站超,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值乖酬,或者發(fā)射異常
- fromRunnable(?) — convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes將一個(gè)Runnable轉(zhuǎn)換為Observable死相,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值
- runAsync(?) — 返回一個(gè)StoppableObservable咬像,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions
連接操作
這一節(jié)解釋ConnectableObservable
和它的子類以及它們的操作符:
- ConnectableObservable.connect(?) — 指示一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù)
- Observable.publish(?) — 將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable
- Observable.replay(?) — 確保所有的訂閱者看到相同的數(shù)據(jù)序列算撮,即使它們?cè)贠bservable開始發(fā)射數(shù)據(jù)之后才訂閱
- ConnectableObservable.refCount(?) — 讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable
一個(gè)可連接的Observable與普通的Observable差不多,除了這一點(diǎn):可連接的Observable在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù)县昂,只有在它的connect()
被調(diào)用時(shí)才開始肮柜。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開始發(fā)射數(shù)據(jù)倒彰。
The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 下面的示例代碼展示了兩個(gè)訂閱者訂閱同一個(gè)Observable的情況素挽。第一種情形,它們訂閱一個(gè)普通的Observable狸驳;第二種情形预明,它們訂閱一個(gè)可連接的Observable,并且在兩個(gè)都訂閱后再連接耙箍。注意輸出的不同:
示例 #1:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete
示例 #2:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete
Connect
讓一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者
可連接的Observable (connectable Observable)與普通的Observable差不多撰糠,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù),而是直到使用了Connect
操作符時(shí)才會(huì)開始辩昆。用這個(gè)方法阅酪,你可以等待所有的觀察者都訂閱了Observable之后再開始發(fā)射數(shù)據(jù)。
RxJava中connect
是ConnectableObservable
接口的一個(gè)方法汁针,使用publish
操作符可以將一個(gè)普通的Observable轉(zhuǎn)換為一個(gè)ConnectableObservable
术辐。
調(diào)用ConnectableObservable
的connect
方法會(huì)讓它后面的Observable開始給發(fā)射數(shù)據(jù)給訂閱者。
connect
方法返回一個(gè)Subscription
對(duì)象施无,可以調(diào)用它的unsubscribe
方法讓Observable停止發(fā)射數(shù)據(jù)給觀察者辉词。
即使沒有任何訂閱者訂閱它,你也可以使用connect
方法讓一個(gè)Observable開始發(fā)射數(shù)據(jù)(或者開始生成待發(fā)射的數(shù)據(jù))猾骡。這樣瑞躺,你可以將一個(gè)"冷"的Observable變?yōu)?熱"的。
- Javadoc: connect()
- Javadoc: connect(Action1)
Publish
將普通的Observable轉(zhuǎn)換為可連接的Observable
可連接的Observable (connectable Observable)與普通的Observable差不多兴想,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù)幢哨,而是直到使用了Connect
操作符時(shí)才會(huì)開始。用這種方法嫂便,你可以在任何時(shí)候讓一個(gè)Observable開始發(fā)射數(shù)據(jù)捞镰。
RxJava的實(shí)現(xiàn)為publish
。
- Javadoc: publish()
有一個(gè)變體接受一個(gè)函數(shù)作為參數(shù)。這個(gè)函數(shù)用原始Observable發(fā)射的數(shù)據(jù)作為參數(shù)岸售,產(chǎn)生一個(gè)新的數(shù)據(jù)作為ConnectableObservable
給發(fā)射几迄,替換原位置的數(shù)據(jù)項(xiàng)。實(shí)質(zhì)是在簽名的基礎(chǔ)上添加一個(gè)Map
操作冰评。
- Javadoc: publish(Func1)
RefCount
讓一個(gè)可連接的Observable行為像普通的Observable
可連接的Observable (connectable Observable)與普通的Observable差不多映胁,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù),而是直到使用了Connect
操作符時(shí)才會(huì)開始甲雅。用這種方法解孙,你可以在任何時(shí)候讓一個(gè)Observable開始發(fā)射數(shù)據(jù)。
RefCount
操作符把從一個(gè)可連接的Observable連接和斷開的過程自動(dòng)化了抛人。它操作一個(gè)可連接的Observable弛姜,返回一個(gè)普通的Observable。當(dāng)?shù)谝粋€(gè)訂閱者訂閱這個(gè)Observable時(shí)妖枚,RefCount
連接到下層的可連接Observable廷臼。RefCount
跟蹤有多少個(gè)觀察者訂閱它,直到最后一個(gè)觀察者完成才斷開與下層可連接Observable的連接绝页。
RxJava中的實(shí)現(xiàn)為refCount
荠商,還有一個(gè)操作符叫share
,它的作用等價(jià)于對(duì)一個(gè)Observable同時(shí)應(yīng)用publish
和refCount
操作续誉。
- Javadoc: refCount()
- Javadoc: share()
Replay
保證所有的觀察者收到相同的數(shù)據(jù)序列莱没,即使它們?cè)贠bservable開始發(fā)射數(shù)據(jù)之后才訂閱
可連接的Observable (connectable Observable)與普通的Observable差不多,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù)酷鸦,而是直到使用了Connect
操作符時(shí)才會(huì)開始饰躲。用這種方法,你可以在任何時(shí)候讓一個(gè)Observable開始發(fā)射數(shù)據(jù)臼隔。
如果在將一個(gè)Observable轉(zhuǎn)換為可連接的Observable之前對(duì)它使用Replay
操作符嘹裂,產(chǎn)生的這個(gè)可連接Observable將總是發(fā)射完整的數(shù)據(jù)序列給任何未來的觀察者,即使那些觀察者在這個(gè)Observable開始給其它觀察者發(fā)射數(shù)據(jù)之后才訂閱摔握。
RxJava的實(shí)現(xiàn)為replay
寄狼,它有多個(gè)接受不同參數(shù)的變體,有的可以指定replay
的最大緩存數(shù)量盒发,有的還可以指定調(diào)度器例嘱。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)
有一種 replay
返回一個(gè)普通的Observable闺阱。它可以接受一個(gè)變換函數(shù)為參數(shù)际看,這個(gè)函數(shù)接受原始Observable發(fā)射的數(shù)據(jù)項(xiàng)為參數(shù)法牲,返回結(jié)果Observable要發(fā)射的一項(xiàng)數(shù)據(jù)。因此蛮艰,這個(gè)操作符其實(shí)是replay
變換之后的數(shù)據(jù)項(xiàng)。
- Javadoc: replay(Func1)
- Javadoc: replay(Func1,int)
- Javadoc: replay(Func1,long,TimeUnit)
- Javadoc: replay(Func1,int,long,TimeUnit)
實(shí)現(xiàn)自己的操作符
你可以實(shí)現(xiàn)你自己的Observable操作符雀彼,本文展示怎么做壤蚜。
如果你的操作符是被用于創(chuàng)造一個(gè)Observable即寡,而不是變換或者響應(yīng)一個(gè)Observable,使用 create(?)
方法袜刷,不要試圖手動(dòng)實(shí)現(xiàn) Observable
聪富。另外,你可以按照下面的用法說明創(chuàng)建一個(gè)自定義的操作符著蟹。
如果你的操作符是用于Observable發(fā)射的單獨(dú)的數(shù)據(jù)項(xiàng)墩蔓,按照下面的說明做:Sequence Operators 。如果你的操作符是用于變換Observable發(fā)射的整個(gè)數(shù)據(jù)序列萧豆,按照這個(gè)說明做:Transformational Operators 奸披。
提示: 在一個(gè)類似于Groovy的語言Xtend中,你可以以 extension methods 的方式實(shí)現(xiàn)你自己的操作符 ,不使用本文的方法涮雷,它們也可以鏈?zhǔn)秸{(diào)用阵面。詳情參見 RxJava and Xtend
序列操作符
下面的例子向你展示了怎樣使用lift(?)
操作符將你的自定義操作符(在這個(gè)例子中是 myOperator
)與標(biāo)準(zhǔn)的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});
下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與lift()
搭配使用洪鸭。
實(shí)現(xiàn)你的操作符
將你的自定義操作符定義為實(shí)現(xiàn)了 Operator
接口的一個(gè)公開類, 就像這樣:
public class MyOperator<T> implements Operator<T> {
public MyOperator( /* any necessary params here */ ) {
/* 這里添加必要的初始化代碼 */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* 這里添加你自己的onCompleted行為样刷,或者僅僅傳遞完成通知: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* 這里添加你自己的onError行為, 或者僅僅傳遞錯(cuò)誤通知:*/
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* 這個(gè)例子對(duì)結(jié)果的每一項(xiàng)執(zhí)行排序操作,然后返回這個(gè)結(jié)果 */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
變換操作符
下面的例子向你展示了怎樣使用 compose(?)
操作符將你得自定義操作符(在這個(gè)例子中览爵,是一個(gè)名叫myTransformer
的操作符颂斜,它將一個(gè)發(fā)射整數(shù)的Observable轉(zhuǎn)換為發(fā)射字符串的)與標(biāo)準(zhǔn)的RxJava操作符(如ofType
和map
)一起使用:
fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});
下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與compose()
搭配使用拾枣。
實(shí)現(xiàn)你的變換器
將你的自定義操作符定義為實(shí)現(xiàn)了 Transformer
接口的一個(gè)公開類沃疮,就像這樣:
public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
public MyTransformer( /* any necessary params here */ ) {
/* 這里添加必要的初始化代碼 */
}
@Override
public Observable<String> call(Observable<Integer> source) {
/*
* 這個(gè)簡(jiǎn)單的例子Transformer應(yīng)用一個(gè)map操作,
* 這個(gè)map操作將發(fā)射整數(shù)變換為發(fā)射整數(shù)的字符串表示梅肤。
*/
return source.map( new Func1<Integer,String>() {
@Override
public String call(Integer t1) {
return String.valueOf(t1);
}
} );
}
}
參見
其它需要考慮的
在發(fā)射任何數(shù)據(jù)(或者通知)給訂閱者之前司蔬,你的序列操作符可能需要檢查它的
Subscriber.isUnsubscribed(?)
狀態(tài),如果沒有訂閱者了姨蝴,沒必要浪費(fèi)時(shí)間生成數(shù)據(jù)項(xiàng)俊啼。-
請(qǐng)注意:你的序列操作符必須復(fù)合Observable協(xié)議的核心原則:
- 它可能調(diào)用訂閱者的
onNext(?)
方法任意次,但是這些調(diào)用必須是不重疊的左医。 - 它只能調(diào)用訂閱者的
onCompleted(?)
或onError(?)
正好一次授帕,但不能都調(diào)用,而且不能在這之后調(diào)用訂閱者的onNext(?)
方法浮梢。 - 如果你不能保證你得操作符遵從這兩個(gè)原則跛十,你可以給它添加
serialize(?)
操作符,它會(huì)強(qiáng)制保持正確的行為秕硝。
- 它可能調(diào)用訂閱者的
請(qǐng)關(guān)注這里 Issue #1962 &mdash芥映;需要有一個(gè)計(jì)劃創(chuàng)建一個(gè)測(cè)試腳手架,你可以用它來寫測(cè)試驗(yàn)證你的新操作符遵從了Observable協(xié)議。
不要讓你的操作符阻塞別的操作奈偏。
When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:
-
如果可能坞嘀,你應(yīng)該組合現(xiàn)有的操作符創(chuàng)建你的新操作符,而不是從零開始實(shí)現(xiàn)它惊来。RxJava自身的標(biāo)準(zhǔn)操作符也是這樣做的丽涩,例如:
-
first(?)
被定義為take(1).single(?)
-
ignoreElements(?)
被定義為filter(alwaysFalse(?))
-
reduce(a)
被定義為scan(a).last(?)
-
如果你的操作符使用了函數(shù)或者lambda表達(dá)式作為參數(shù),請(qǐng)注意它們可能是異常的來源裁蚁,而且要準(zhǔn)備好捕獲這些異常矢渊,并且使用
onError()
通知訂閱者。
某些異常被認(rèn)為是致命的厘擂,對(duì)它們來說昆淡,調(diào)用
onError()
毫無意義,那樣或者是無用的刽严,或者只是對(duì)問題的妥協(xié)昂灵。你可以使用Exceptions.throwIfFatal(throwable)
方法過濾掉這些致命的異常,并重新拋出它們舞萄,而不是試圖發(fā)射關(guān)于它們的通知眨补。一般說來,一旦發(fā)生錯(cuò)誤應(yīng)立即通知訂閱者倒脓,而不是首先嘗試發(fā)射更多的數(shù)據(jù)撑螺。
請(qǐng)注意
null
可能是Observable發(fā)射的一個(gè)合法數(shù)據(jù)。頻繁發(fā)生錯(cuò)誤的一個(gè)來源是:測(cè)試一些變量并且將持有一個(gè)非null
值作為是否發(fā)射了數(shù)據(jù)的替代崎弃。一個(gè)值為null
的數(shù)據(jù)仍然是一個(gè)發(fā)射數(shù)據(jù)項(xiàng)甘晤,它與沒有發(fā)射任何東西是不能等同的。想讓你的操作符在反壓(backpressure)場(chǎng)景中變得得好可能會(huì)非常棘手饲做∠呋椋可以參考Dávid Karnok的博客 Advanced RxJava,這里有一個(gè)涉及到的各種因素和怎樣處理它們的很值得看的討論盆均。
插件讓你可以用多種方式修改RxJava的默認(rèn)行為:
- 修改默認(rèn)的計(jì)算塞弊、IO和新線程調(diào)度器集合
- 為RxJava可能遇到的特殊錯(cuò)誤注冊(cè)一個(gè)錯(cuò)誤處理器
- 注冊(cè)一個(gè)函數(shù)記錄一些常規(guī)RxJava活動(dòng)的發(fā)生
RxJavaSchedulersHook
這個(gè)插件讓你可以使用你選擇的調(diào)度器覆蓋默認(rèn)的計(jì)算、IO和新線程調(diào)度 (Scheduler
)泪姨,要做到這些游沿,需要繼承 RxJavaSchedulersHook
類并覆寫這些方法:
Scheduler getComputationScheduler(?)
Scheduler getIOScheduler(?)
Scheduler getNewThreadScheduler(?)
Action0 onSchedule(action)
然后是下面這些步驟:
- 創(chuàng)建一個(gè)你實(shí)現(xiàn)的
RxJavaSchedulersHook
子類的對(duì)象。 - 使用
RxJavaPlugins.getInstance(?)
獲取全局的RxJavaPlugins對(duì)象肮砾。 - 將你的默認(rèn)調(diào)度器對(duì)象傳遞給
RxJavaPlugins
的registerSchedulersHook(?)
方法诀黍。
完成這些后,RxJava會(huì)開始使用你的方法返回的調(diào)度器唇敞,而不是內(nèi)置的默認(rèn)調(diào)度器蔗草。
RxJavaErrorHandler
這個(gè)插件讓你可以注冊(cè)一個(gè)函數(shù)處理傳遞給 Subscriber.onError(Throwable)
的錯(cuò)誤咒彤。要做到這一點(diǎn)疆柔,需要繼承 RxJavaErrorHandler
類并覆寫這個(gè)方法:
void handleError(Throwable e)
然后是下面這些步驟:
- 創(chuàng)建一個(gè)你實(shí)現(xiàn)的
RxJavaErrorHandler
子類的對(duì)象咒精。 - 使用
RxJavaPlugins.getInstance(?)
獲取全局的RxJavaPlugins對(duì)象。 - 將你的錯(cuò)誤處理器對(duì)象傳遞給
RxJavaPlugins
的registerErrorHandler(?)
方法旷档。
完成這些后模叙,RxJava會(huì)開始使用你的錯(cuò)誤處理器處理傳遞給 Subscriber.onError(Throwable)
的錯(cuò)誤。
RxJavaObservableExecutionHook
這個(gè)插件讓你可以注冊(cè)一個(gè)函數(shù)用于記錄日志或者性能數(shù)據(jù)收集鞋屈,RxJava在某些常規(guī)活動(dòng)時(shí)會(huì)調(diào)用它范咨。要做到這一點(diǎn),需要繼承 RxJavaObservableExecutionHook
類并覆寫這些方法:
方法 | 何時(shí)調(diào)用 |
---|---|
onCreate(?) |
在 Observable.create(?) 方法中 |
onSubscribeStart(?) |
在 Observable.subscribe(?) 之前立刻 |
onSubscribeReturn(?) |
在 Observable.subscribe(?) 之后立刻 |
onSubscribeError(?) |
在Observable.subscribe(?) 執(zhí)行失敗時(shí) |
onLift(?) |
在Observable.lift(?) 方法中 |
然后是下面這些步驟:
- 創(chuàng)建一個(gè)你實(shí)現(xiàn)的
RxJavaObservableExecutionHook
子類的對(duì)象厂庇。 - 使用
RxJavaPlugins.getInstance(?)
獲取全局的RxJavaPlugins對(duì)象渠啊。 - 將你的Hook對(duì)象傳遞給
RxJavaPlugins
的registerObservableExecutionHook(?)
方法。
When you do this, RxJava will begin to call your functions when it encounters the specific conditions they were designed to take note of. 完成這些后权旷,在滿足某些特殊的條件時(shí)替蛉,RxJava會(huì)開始調(diào)用你的方法。
背壓?jiǎn)栴}
背壓是指在異步場(chǎng)景中拄氯,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下躲查,一種告訴上游的被觀察者降低發(fā)送速度的策略
簡(jiǎn)而言之,背壓是流速控制的一種策略译柏。
需要強(qiáng)調(diào)兩點(diǎn):
- 背壓策略的一個(gè)前提是異步環(huán)境镣煮,也就是說,被觀察者和觀察者處在不同的線程環(huán)境中鄙麦。
- 背壓(Backpressure)并不是一個(gè)像flatMap一樣可以在程序中直接使用的操作符典唇,他只是一種控制事件流速的策略。
響應(yīng)式拉瓤韪(reactive pull)
首先我們回憶之前那篇《關(guān)于Rxjava最友好的文章》介衔,里面其實(shí)提到,在RxJava的觀察者模型中盟劫,被觀察者是主動(dòng)的推送數(shù)據(jù)給觀察者夜牡,觀察者是被動(dòng)接收的。而響應(yīng)式拉取則反過來侣签,觀察者主動(dòng)從被觀察者那里去拉取數(shù)據(jù)塘装,而被觀察者變成被動(dòng)的等待通知再發(fā)送數(shù)據(jù)。
結(jié)構(gòu)示意圖如下:
[圖片上傳失敗...(image-cccb92-1572187975155)]
觀察者可以根據(jù)自身實(shí)際情況按需拉取數(shù)據(jù)影所,而不是被動(dòng)接收(也就相當(dāng)于告訴上游觀察者把速度慢下來)蹦肴,最終實(shí)現(xiàn)了上游被觀察者發(fā)送事件的速度的控制,實(shí)現(xiàn)了背壓的策略猴娩。
源碼
public class FlowableOnBackpressureBufferStategy{
...
@Override
public void onNext(T t) {
if (done) {
return;
}
boolean callOnOverflow = false;
boolean callError = false;
Deque<T> dq = deque;
synchronized (dq) {
if (dq.size() == bufferSize) {
switch (strategy) {
case DROP_LATEST:
dq.pollLast();
dq.offer(t);
callOnOverflow = true;
break;
case DROP_OLDEST:
dq.poll();
dq.offer(t);
callOnOverflow = true;
break;
default:
// signal error
callError = true;
break;
}
} else {
dq.offer(t);
}
}
if (callOnOverflow) {
if (onOverflow != null) {
try {
onOverflow.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
} else if (callError) {
s.cancel();
onError(new MissingBackpressureException());
} else {
drain();
}
}
...
}
在這段源碼中阴幌,根據(jù)不同的背壓策略進(jìn)行了不同的處理措施勺阐,當(dāng)然這只是列舉了一段關(guān)于buffer背壓策略的例子。
根源
產(chǎn)生背壓?jiǎn)栴}的根源就是上游發(fā)送速度與下游的處理速度不均導(dǎo)致的矛双,所以如果想要解決這個(gè)問題就需要通過匹配兩個(gè)速率達(dá)到解決這個(gè)背壓根源的措施渊抽。
通常有兩個(gè)策略可供使用:
- 從數(shù)量上解決,對(duì)數(shù)據(jù)進(jìn)行采樣
- 從速度上解決议忽,降低發(fā)送事件的速率
- 利用flowable和subscriber
使用Flowable
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一個(gè)參數(shù)
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意這句代碼
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
upstream.subscribe(downstream);
我們注意到這次和Observable
有些不同. 首先是創(chuàng)建Flowable
的時(shí)候增加了一個(gè)參數(shù), 這個(gè)參數(shù)是用來選擇背壓,也就是出現(xiàn)上下游流速不均衡的時(shí)候應(yīng)該怎么處理的辦法, 這里我們直接用BackpressureStrategy.ERROR
這種方式, 這種方式會(huì)在出現(xiàn)上下游流速不均衡的時(shí)候直接拋出一個(gè)異常,這個(gè)異常就是著名的MissingBackpressureException
. 其余的策略后面再來講解.
另外的一個(gè)區(qū)別是在下游的onSubscribe
方法中傳給我們的不再是Disposable
了, 而是Subscription
, 它倆有什么區(qū)別呢, 首先它們都是上下游中間的一個(gè)開關(guān), 之前我們說調(diào)用Disposable.dispose()
方法可以切斷水管, 同樣的調(diào)用Subscription.cancel()
也可以切斷水管, 不同的地方在于Subscription
增加了一個(gè)void request(long n)
方法, 這個(gè)方法有什么用呢, 在上面的代碼中也有這么一句代碼:
s.request(Long.MAX_VALUE);
這是因?yàn)?code>Flowable在設(shè)計(jì)的時(shí)候采用了一種新的思路也就是響應(yīng)式拉取
的方式來更好的解決上下游流速不均衡的問題, 與我們之前所講的控制數(shù)量
和控制速度
不太一樣, 這種方式用通俗易懂的話來說就好比是葉問打鬼子
, 我們把上游
看成小日本
, 把下游
當(dāng)作葉問
, 當(dāng)調(diào)用Subscription.request(1)
時(shí), 葉問
就說我要打一個(gè)!
然后小日本
就拿出一個(gè)鬼子
給葉問, 讓他打, 等葉問打死這個(gè)鬼子之后, 再次調(diào)用request(10)
, 葉問就又說我要打十個(gè)!
然后小日本又派出十個(gè)鬼子
給葉問, 然后就在邊上看熱鬧, 看葉問能不能打死十個(gè)鬼子, 等葉問打死十個(gè)鬼子后再繼續(xù)要鬼子接著打...
所以我們把request當(dāng)做是一種能力, 當(dāng)成下游處理事件
的能力, 下游能處理幾個(gè)就告訴上游我要幾個(gè), 這樣只要上游根據(jù)下游的處理能力來決定發(fā)送多少事件, 就不會(huì)造成一窩蜂的發(fā)出一堆事件來, 從而導(dǎo)致OOM. 這也就完美的解決之前我們所學(xué)到的兩種方式的缺陷, 過濾事件會(huì)導(dǎo)致事件丟失, 減速又可能導(dǎo)致性能損失. 而這種方式既解決了事件丟失的問題, 又解決了速度的問題, 完美 !
同步情況
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) { //無限循環(huán)發(fā)事件
emitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "" + integer);
}
});
當(dāng)上下游工作在同一個(gè)線程
中時(shí), 這時(shí)候是一個(gè)同步
的訂閱關(guān)系, 也就是說上游
每發(fā)送一個(gè)事件必須
等到下游
接收處理完了以后才能接著發(fā)送下一個(gè)事件.
同步與異步的區(qū)別就在于有沒有緩存發(fā)送事件的緩沖區(qū)懒闷。
異步情況
通過subscribeOn和observeOn來確定對(duì)應(yīng)的線程,達(dá)到異步的效果栈幸,異步時(shí)會(huì)有一個(gè)對(duì)應(yīng)的緩存區(qū)來換從從上游發(fā)送的事件愤估。
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
背壓策略:
- error, 緩沖區(qū)大概在128
- buffer速址, 緩沖區(qū)在1000左右
- drop玩焰, 把存不下的事件丟棄
- latest, 只保留最新的
- missing, 缺省設(shè)置芍锚,不做任何操作
上游從哪里得知下游的處理能力呢昔园?我們來看看上游最重要的部分,肯定就是FlowableEmitter
了啊闹炉,我們就是通過它來發(fā)送事件的啊蒿赢,來看看它的源碼吧(別緊張,它的代碼灰常簡(jiǎn)單):
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c);
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}
FlowableEmitter是個(gè)接口渣触,繼承Emitter羡棵,Emitter里面就是我們的onNext(),onComplete()和onError()三個(gè)方法。我們看到FlowableEmitter中有這么一個(gè)方法:
long requested();
同步request.png
這張圖的意思就是當(dāng)上下游在同一個(gè)線程中的時(shí)候嗅钻,在下游
調(diào)用request(n)就會(huì)直接改變上游
中的requested的值皂冰,多次調(diào)用便會(huì)疊加這個(gè)值,而上游每發(fā)送一個(gè)事件之后便會(huì)去減少這個(gè)值养篓,當(dāng)這個(gè)值減少至0的時(shí)候秃流,繼續(xù)發(fā)送事件便會(huì)拋異常了。
異步request.png
可以看到柳弄,當(dāng)上下游工作在不同的線程里時(shí)舶胀,每一個(gè)線程里都有一個(gè)requested,而我們調(diào)用request(1000)時(shí)碧注,實(shí)際上改變的是下游主線程中的requested嚣伐,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的,這個(gè)調(diào)用會(huì)在合適的時(shí)候自動(dòng)觸發(fā)萍丐。
Rxjava實(shí)例開發(fā)應(yīng)用
- 網(wǎng)絡(luò)請(qǐng)求處理(輪詢轩端,嵌套,出錯(cuò)重連)
- 功能防抖
- 從多級(jí)緩存獲取數(shù)據(jù)
- 合并數(shù)據(jù)源
- 聯(lián)合判斷
- 與 Retrofit,RxBinding,EventBus結(jié)合使用
Rxjava原理
- Scheduler線程切換工作原理
- 數(shù)據(jù)的發(fā)送與接收(觀察者模式)
- lift的工作原理
- map的工作原理
- flatMap的工作原理
- merge的工作原理
- concat的工作原理
結(jié)完逝变。