Rxjava操作符五

nest

nest

nest操作符有一個(gè)特殊的用途:將一個(gè)Observable轉(zhuǎn)換為一個(gè)發(fā)射這個(gè)Observable的Observable蹋盆。

條件和布爾操作

這個(gè)頁面的操作符可用于根據(jù)條件發(fā)射或變換Observables同欠,或者對(duì)它們做布爾運(yùn)算:

條件操作符

  • amb(?) — 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
  • defaultIfEmpty(?) — 發(fā)射來自原始Observable的數(shù)據(jù)震嫉,如果原始Observable沒有發(fā)射數(shù)據(jù)茵瘾,就發(fā)射一個(gè)默認(rèn)數(shù)據(jù)
  • (rxjava-computation-expressions) doWhile(?) — 發(fā)射原始Observable的數(shù)據(jù)序列,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止
  • (rxjava-computation-expressions) ifThen(?) — 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列
  • skipUntil(?) — 丟棄原始Observable發(fā)射的數(shù)據(jù)丙号,直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù)
  • skipWhile(?) — 丟棄原始Observable發(fā)射的數(shù)據(jù)缰冤,直到一個(gè)特定的條件為假犬缨,然后發(fā)射原始Observable剩余的數(shù)據(jù)
  • (rxjava-computation-expressions) switchCase(?) — 基于一個(gè)計(jì)算結(jié)果,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列
  • takeUntil(?) — 發(fā)射來自原始Observable的數(shù)據(jù)棉浸,直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知
  • takeWhile(?) and takeWhileWithIndex(?) — 發(fā)射原始Observable的數(shù)據(jù)遍尺,直到一個(gè)特定的條件為真,然后跳過剩余的數(shù)據(jù)
  • (rxjava-computation-expressions) whileDo(?) — 如果條件為true涮拗,則發(fā)射源Observable數(shù)據(jù)序列乾戏,并且只要條件保持為true就重復(fù)發(fā)射此數(shù)據(jù)序列

(rxjava-computation-expressions) — 表示這個(gè)操作符當(dāng)前是可選包 rxjava-computation-expressions 的一部分,還沒有包含在標(biāo)準(zhǔn)RxJava的操作符集合里

布爾操作符

  • all(?) — 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件
  • contains(?) — 判斷Observable是否會(huì)發(fā)射一個(gè)指定的值
  • exists(?) and isEmpty(?) — 判斷Observable是否發(fā)射了一個(gè)值
  • sequenceEqual(?) — 判斷兩個(gè)Observables發(fā)射的序列是否相等

條件和布爾操作

All

判定是否Observable發(fā)射的所有數(shù)據(jù)都滿足某個(gè)條件

all

傳遞一個(gè)謂詞函數(shù)給All操作符,這個(gè)函數(shù)接受原始Observable發(fā)射的數(shù)據(jù),根據(jù)計(jì)算返回一個(gè)布爾值帆啃。All返回一個(gè)只發(fā)射一個(gè)單個(gè)布爾值的Observable台囱,如果原始Observable正常終止并且每一項(xiàng)數(shù)據(jù)都滿足條件,就返回true;如果原始Observable的任何一項(xiàng)數(shù)據(jù)不滿足條件就返回False。

all

RxJava將這個(gè)操作符實(shí)現(xiàn)為all,它默認(rèn)不在任何特定的調(diào)度器上執(zhí)行摆出。

Amb

給定兩個(gè)或多個(gè)Observables,它只發(fā)射首先發(fā)射數(shù)據(jù)或通知的那個(gè)Observable的所有數(shù)據(jù)

amb

當(dāng)你傳遞多個(gè)Observable給Amb時(shí)首妖,它只發(fā)射其中一個(gè)Observable的數(shù)據(jù)和通知:首先發(fā)送通知給Amb的那個(gè)偎漫,不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè)onErroronCompleted通知。Amb將忽略和丟棄其它所有Observables的發(fā)射物有缆。

amb

RxJava的實(shí)現(xiàn)是amb象踊,有一個(gè)類似的對(duì)象方法ambWith温亲。例如,Observable.amb(o1,o2)o1.ambWith(o2)是等價(jià)的杯矩。

這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行栈虚。

Contains

判定一個(gè)Observable是否發(fā)射一個(gè)特定的值

contains

Contains傳一個(gè)指定的值,如果原始Observable發(fā)射了那個(gè)值史隆,它返回的Observable將發(fā)射true魂务,否則發(fā)射false。

相關(guān)的一個(gè)操作符IsEmpty用于判定原始Observable是否沒有發(fā)射任何數(shù)據(jù)泌射。

contains

contains默認(rèn)不在任何特定的調(diào)度器上執(zhí)行粘姜。

exists

RxJava中還有一個(gè)exists操作符,它通過一個(gè)謂詞函數(shù)測(cè)試原始Observable發(fā)射的數(shù)據(jù)魄幕,只要任何一項(xiàng)滿足條件就返回一個(gè)發(fā)射true的Observable相艇,否則返回一個(gè)發(fā)射false的Observable颖杏。

exists默認(rèn)不在任何特定的調(diào)度器上執(zhí)行纯陨。

isEmpty

isEmpty默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

DefaultIfEmpty

發(fā)射來自原始Observable的值留储,如果原始Observable沒有發(fā)射任何值翼抠,就發(fā)射一個(gè)默認(rèn)值

defaultIfEmtpy

DefaultIfEmpty簡(jiǎn)單的精確地發(fā)射原始Observable的值,如果原始Observable沒有發(fā)射任何數(shù)據(jù)正常終止(以onCompletedd的形式)获讳,DefaultIfEmpty返回的Observable就發(fā)射一個(gè)你提供的默認(rèn)值阴颖。

RxJava將這個(gè)操作符實(shí)現(xiàn)為defaultIfEmpty。它默認(rèn)不在任何特定的調(diào)度器上執(zhí)行丐膝。

還有一個(gè)新的操作符switchIfEmpty量愧,不在RxJava 1.0.0版中,它和defaultIfEmtpy類似帅矗,不同的是偎肃,如果原始Observable沒有發(fā)射數(shù)據(jù),它發(fā)射一個(gè)備用Observable的發(fā)射物浑此。

SequenceEqual

判定兩個(gè)Observables是否發(fā)射相同的數(shù)據(jù)序列累颂。

sequenceEqual

傳遞兩個(gè)Observable給SequenceEqual操作符,它會(huì)比較兩個(gè)Observable的發(fā)射物凛俱,如果兩個(gè)序列是相同的(相同的數(shù)據(jù)紊馏,相同的順序,相同的終止?fàn)顟B(tài))蒲犬,它就發(fā)射true朱监,否則發(fā)射false。

sequenceEqual

它還有一個(gè)版本接受第三個(gè)參數(shù)原叮,可以傳遞一個(gè)函數(shù)用于比較兩個(gè)數(shù)據(jù)項(xiàng)是否相同赌朋。

這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行凰狞。

SkipUntil

丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)

skipUntil

SkipUntil訂閱原始的Observable沛慢,但是忽略它的發(fā)射物赡若,直到第二個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)那一刻,它開始發(fā)射原始Observable团甲。

RxJava中對(duì)應(yīng)的是skipUntil逾冬,它默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

SkipWhile

丟棄Observable發(fā)射的數(shù)據(jù)躺苦,直到一個(gè)指定的條件不成立

skipWhile

SkipWhile訂閱原始的Observable身腻,但是忽略它的發(fā)射物,直到你指定的某個(gè)條件變?yōu)閒alse的那一刻匹厘,它開始發(fā)射原始Observable嘀趟。

skipWhile默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

TakeUntil

當(dāng)?shù)诙€(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)或者終止時(shí)愈诚,丟棄原始Observable發(fā)射的任何數(shù)據(jù)

takeUntil

TakeUntil訂閱并開始發(fā)射原始Observable她按,它還監(jiān)視你提供的第二個(gè)Observable。如果第二個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)或者發(fā)射了一個(gè)終止通知炕柔,TakeUntil返回的Observable會(huì)停止發(fā)射原始Observable并終止酌泰。

takeUntil

RxJava中的實(shí)現(xiàn)是takeUntil。注意:第二個(gè)Observable發(fā)射一項(xiàng)數(shù)據(jù)或一個(gè)onError通知或一個(gè)onCompleted通知都會(huì)導(dǎo)致takeUntil停止發(fā)射數(shù)據(jù)匕累。

takeUntil默認(rèn)不在任何特定的調(diào)度器上執(zhí)行陵刹。

takeUntil

還有一個(gè)版本的takeUntil,不在RxJava 1.0.0版中欢嘿,它使用一個(gè)謂詞函數(shù)而不是第二個(gè)Observable來判定是否需要終止發(fā)射數(shù)據(jù)衰琐,它的行為類似于takeWhile

TakeWhile

發(fā)射Observable發(fā)射的數(shù)據(jù)炼蹦,直到一個(gè)指定的條件不成立

takeWhile

TakeWhile發(fā)射原始Observable羡宙,直到你指定的某個(gè)條件不成立的那一刻,它停止發(fā)射原始Observable框弛,并終止自己的Observable辛辨。

RxJava中的takeWhile操作符返回一個(gè)鏡像原始Observable行為的Observable,直到某一項(xiàng)數(shù)據(jù)你指定的函數(shù)返回false那一刻瑟枫,這個(gè)新的Observable發(fā)射onCompleted終止通知斗搞。

takeWhile默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

算術(shù)和聚合操作

本頁展示的操作符用于對(duì)整個(gè)序列執(zhí)行算法操作或其它操作慷妙,由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù))僻焚,它們對(duì)于非常長(zhǎng)或者無限的序列來說是危險(xiǎn)的,不推薦使用膝擂。

rxjava-math 模塊的操作符

其它聚合操作符

  • concat(?) — 順序連接多個(gè)Observables
  • count(?) and countLong(?) — 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果
  • reduce(?) — 對(duì)序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果
  • collect(?) — 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中虑啤,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable
  • toList(?) — 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表隙弛,然后返回這個(gè)列表
  • toSortedList(?) — 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表
  • toMap(?) — 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map狞山,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的
  • toMultiMap(?) — 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表全闷,同時(shí)也是一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的

算術(shù)和聚合操作

Average

計(jì)算原始Observable發(fā)射數(shù)字的平均值并發(fā)射它

average

Average操作符操作符一個(gè)發(fā)射數(shù)字的Observable萍启,并發(fā)射單個(gè)值:原始Observable發(fā)射的數(shù)字序列的平均值总珠。

這個(gè)操作符不包含在RxJava核心模塊中,它屬于不同的rxjava-math模塊勘纯。它被實(shí)現(xiàn)為四個(gè)操作符:averageDouble, averageFloat, averageInteger, averageLong局服。

average

如果原始Observable不發(fā)射任何數(shù)據(jù),這個(gè)操作符會(huì)拋異常:IllegalArgumentException驳遵。

Min

發(fā)射原始Observable的最小值

min

Min操作符操作一個(gè)發(fā)射數(shù)值的Observable并發(fā)射單個(gè)值:最小的那個(gè)值淫奔。

RxJava中,min屬于rxjava-math模塊堤结。

min

min接受一個(gè)可選參數(shù)唆迁,用于比較兩項(xiàng)數(shù)據(jù)的大小,如果最小值的數(shù)據(jù)超過一項(xiàng)霍殴,min會(huì)發(fā)射原始Observable最近發(fā)射的那一項(xiàng)媒惕。

minBy

minBy類似于min系吩,但是它發(fā)射的不是最小值来庭,而是發(fā)射Key最小的項(xiàng),Key由你指定的一個(gè)函數(shù)生成穿挨。

Max

發(fā)射原始Observable的最大值

max

Max操作符操作一個(gè)發(fā)射數(shù)值的Observable并發(fā)射單個(gè)值:最大的那個(gè)值月弛。

RxJava中,max屬于rxjava-math模塊科盛。

max

max接受一個(gè)可選參數(shù)帽衙,用于比較兩項(xiàng)數(shù)據(jù)的大小,如果最大值的數(shù)據(jù)超過一項(xiàng)贞绵,max會(huì)發(fā)射原始Observable最近發(fā)射的那一項(xiàng)厉萝。

maxBy

maxBy類似于max,但是它發(fā)射的不是最大值榨崩,而是發(fā)射Key最大的項(xiàng)谴垫,Key由你指定的一個(gè)函數(shù)生成。

Count

計(jì)算原始Observable發(fā)射物的數(shù)量母蛛,然后只發(fā)射這個(gè)值

count

Count操作符將一個(gè)Observable轉(zhuǎn)換成一個(gè)發(fā)射單個(gè)值的Observable翩剪,這個(gè)值表示原始Observable發(fā)射的數(shù)據(jù)的數(shù)量。

如果原始Observable發(fā)生錯(cuò)誤終止彩郊,Count不發(fā)射數(shù)據(jù)而是直接傳遞錯(cuò)誤通知前弯。如果原始Observable永遠(yuǎn)不終止蚪缀,Count既不會(huì)發(fā)射數(shù)據(jù)也不會(huì)終止。

RxJava的實(shí)現(xiàn)是countcountLong恕出。

示例代碼

String[] items = new String[] { "one", "two", "three" };
assertEquals( new Integer(3), Observable.from(items).count().toBlocking().single() );

Sum

計(jì)算Observable發(fā)射的數(shù)值的和并發(fā)射這個(gè)和

sum

Sum操作符操作一個(gè)發(fā)射數(shù)值的Observable询枚,僅發(fā)射單個(gè)值:原始Observable所有數(shù)值的和。

RxJava的實(shí)現(xiàn)是sumDouble, sumFloat, sumInteger, sumLong浙巫,它們不是RxJava核心模塊的一部分哩盲,屬于rxjava-math模塊。

sum.f

你可以使用一個(gè)函數(shù)狈醉,計(jì)算Observable每一項(xiàng)數(shù)據(jù)的函數(shù)返回值的和廉油。

StringObservable類(這個(gè)類不是RxJava核心模塊的一部分)中有一個(gè)stringConcat操作符,它將一個(gè)發(fā)射字符串序列的Observable轉(zhuǎn)換為一個(gè)發(fā)射單個(gè)字符串的Observable苗傅,后者這個(gè)字符串表示的是前者所有字符串的連接抒线。

St.join

StringObservable類還有一個(gè)join操作符,它將一個(gè)發(fā)射字符串序列的Observable轉(zhuǎn)換為一個(gè)發(fā)射單個(gè)字符串的Observable渣慕,后者這個(gè)字符串表示的是前者所有字符串以你指定的分界符連接的結(jié)果嘶炭。

Concat

不交錯(cuò)的發(fā)射兩個(gè)或多個(gè)Observable的發(fā)射物

concat

Concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable逊桦,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面眨猎,以此類推。

直到前面一個(gè)Observable終止强经,Concat才會(huì)訂閱額外的一個(gè)Observable睡陪。注意:因此,如果你嘗試連接一個(gè)"熱"Observable(這種Observable在創(chuàng)建后立即開始發(fā)射數(shù)據(jù)匿情,即使沒有訂閱者)兰迫,Concat將不會(huì)看到也不會(huì)發(fā)射它之前發(fā)射的任何數(shù)據(jù)。

在ReactiveX的某些實(shí)現(xiàn)中有一種ConcatMap操作符(名字可能叫concat_all, concat_map, concatMapObserver, for, forIn/for_in, mapcat, selectConcatselectConcatObserver)炬称,他會(huì)變換原始Observable發(fā)射的數(shù)據(jù)到一個(gè)對(duì)應(yīng)的Observable汁果,然后再按觀察和變換的順序進(jìn)行連接操作。

StartWith操作符類似于Concat玲躯,但是它是插入到前面据德,而不是追加那些Observable的數(shù)據(jù)到原始Observable發(fā)射的數(shù)據(jù)序列。

Merge操作符也差不多跷车,它結(jié)合兩個(gè)或多個(gè)Observable的發(fā)射物棘利,但是數(shù)據(jù)可能交錯(cuò),而Concat不會(huì)讓多個(gè)Observable的發(fā)射物交錯(cuò)姓赤。

concat

RxJava中的實(shí)現(xiàn)叫concat赡译。

還有一個(gè)實(shí)例方法叫concatWith,這兩者是等價(jià)的:Observable.concat(a,b)a.concatWith(b)不铆。

Reduce

按順序?qū)bservable發(fā)射的每項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)并發(fā)射最終的值

reduce

Reduce操作符對(duì)原始Observable發(fā)射數(shù)據(jù)的第一項(xiàng)應(yīng)用一個(gè)函數(shù)蝌焚,然后再將這個(gè)函數(shù)的返回值與第二項(xiàng)數(shù)據(jù)一起傳遞給函數(shù)裹唆,以此類推,持續(xù)這個(gè)過程知道原始Observable發(fā)射它的最后一項(xiàng)數(shù)據(jù)并終止只洒,此時(shí)Reduce返回的Observable發(fā)射這個(gè)函數(shù)返回的最終值许帐。

在其它場(chǎng)景中,這種操作有時(shí)被稱為累積毕谴,聚集成畦,壓縮折疊涝开,注射等循帐。

reduce

注意如果原始Observable沒有發(fā)射任何數(shù)據(jù),reduce拋出異常IllegalArgumentException舀武。

reduce默認(rèn)不在任何特定的調(diào)度器上執(zhí)行拄养。

reduce

還有一個(gè)版本的reduce額外接受一個(gè)種子參數(shù)。注意傳遞一個(gè)值為null的種子是合法的银舱,但是與不傳種子參數(shù)的行為是不同的瘪匿。如果你傳遞了種子參數(shù),并且原始Observable沒有發(fā)射任何數(shù)據(jù)寻馏,reduce操作符將發(fā)射這個(gè)種子值然后正常終止棋弥,而不是拋異常。

提示:不建議使用reduce收集發(fā)射的數(shù)據(jù)到一個(gè)可變的數(shù)據(jù)結(jié)構(gòu)诚欠,那種場(chǎng)景你應(yīng)該使用collect顽染。

collect

collectreduce類似,但它的目的是收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)可變的數(shù)據(jù)結(jié)構(gòu)聂薪,collect生成的這個(gè)Observable會(huì)發(fā)射這項(xiàng)數(shù)據(jù)家乘。它需要兩個(gè)參數(shù):

  1. 一個(gè)函數(shù)返回可變數(shù)據(jù)結(jié)構(gòu)
  2. 另一個(gè)函數(shù)蝗羊,當(dāng)傳遞給它這個(gè)數(shù)據(jù)結(jié)構(gòu)和原始Observable發(fā)射的數(shù)據(jù)項(xiàng)時(shí)藏澳,適當(dāng)?shù)匦薷臄?shù)據(jù)結(jié)構(gòu)。

collect默認(rèn)不在任何特定的調(diào)度器上執(zhí)行耀找。

異步操作

下面的這些操作符屬于單獨(dú)的rxjava-async模塊翔悠,它們用于將同步對(duì)象轉(zhuǎn)換為Observable。

  • start(?) — 創(chuàng)建一個(gè)Observable野芒,它發(fā)射一個(gè)函數(shù)的返回值
  • toAsync(?) or asyncAction(?) or asyncFunc(?) — 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable蓄愁,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值
  • startFuture(?) — 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable,它發(fā)射Future的返回值
  • deferFuture(?) — 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable狞悲,但是并不嘗試獲取這個(gè)Future返回的Observable撮抓,直到有訂閱者訂閱它
  • forEachFuture(?) — 傳遞Subscriber方法給一個(gè)Subscriber,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成
  • fromAction(?) — 將一個(gè)Action轉(zhuǎn)換為Observable摇锋,當(dāng)一個(gè)訂閱者訂閱時(shí)丹拯,它執(zhí)行這個(gè)action并發(fā)射它的返回值
  • fromCallable(?) — 將一個(gè)Callable轉(zhuǎn)換為Observable站超,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值乖酬,或者發(fā)射異常
  • fromRunnable(?) — convert a Runnable into an Observable that invokes the runable and emits its result when a Subscriber subscribes將一個(gè)Runnable轉(zhuǎn)換為Observable死相,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值
  • runAsync(?) — 返回一個(gè)StoppableObservable咬像,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions

連接操作

這一節(jié)解釋ConnectableObservable 和它的子類以及它們的操作符:

一個(gè)可連接的Observable與普通的Observable差不多,除了這一點(diǎn):可連接的Observable在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù)县昂,只有在它的connect()被調(diào)用時(shí)才開始肮柜。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開始發(fā)射數(shù)據(jù)倒彰。

publishConnect

The following example code shows two Subscribers subscribing to the same Observable. In the first case, they subscribe to an ordinary Observable; in the second case, they subscribe to a Connectable Observable that only connects after both Subscribers subscribe. Note the difference in the output: 下面的示例代碼展示了兩個(gè)訂閱者訂閱同一個(gè)Observable的情況素挽。第一種情形,它們訂閱一個(gè)普通的Observable狸驳;第二種情形预明,它們訂閱一個(gè)可連接的Observable,并且在兩個(gè)都訂閱后再連接耙箍。注意輸出的不同:

示例 #1:

def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
    { println("Subscriber #2:" + it); },       // onNext
    { println("Error: " + it.getMessage()); }, // onError
    { println("Sequence #2 complete"); }       // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete

示例 #2:

def firstMillion  = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();

firstMillion.subscribe(
   { println("Subscriber #1:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #1 complete"); }       // onCompleted
);

firstMillion.subscribe(
   { println("Subscriber #2:" + it); },       // onNext
   { println("Error: " + it.getMessage()); }, // onError
   { println("Sequence #2 complete"); }       // onCompleted
);

firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete

Connect

讓一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者

connect

可連接的Observable (connectable Observable)與普通的Observable差不多撰糠,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù),而是直到使用了Connect操作符時(shí)才會(huì)開始辩昆。用這個(gè)方法阅酪,你可以等待所有的觀察者都訂閱了Observable之后再開始發(fā)射數(shù)據(jù)。

connect

RxJava中connectConnectableObservable接口的一個(gè)方法汁针,使用publish操作符可以將一個(gè)普通的Observable轉(zhuǎn)換為一個(gè)ConnectableObservable术辐。

調(diào)用ConnectableObservableconnect方法會(huì)讓它后面的Observable開始給發(fā)射數(shù)據(jù)給訂閱者。

connect方法返回一個(gè)Subscription對(duì)象施无,可以調(diào)用它的unsubscribe方法讓Observable停止發(fā)射數(shù)據(jù)給觀察者辉词。

即使沒有任何訂閱者訂閱它,你也可以使用connect方法讓一個(gè)Observable開始發(fā)射數(shù)據(jù)(或者開始生成待發(fā)射的數(shù)據(jù))猾骡。這樣瑞躺,你可以將一個(gè)"冷"的Observable變?yōu)?熱"的。

Publish

將普通的Observable轉(zhuǎn)換為可連接的Observable

publish

可連接的Observable (connectable Observable)與普通的Observable差不多兴想,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù)幢哨,而是直到使用了Connect操作符時(shí)才會(huì)開始。用這種方法嫂便,你可以在任何時(shí)候讓一個(gè)Observable開始發(fā)射數(shù)據(jù)捞镰。

publish

RxJava的實(shí)現(xiàn)為publish

publish

有一個(gè)變體接受一個(gè)函數(shù)作為參數(shù)。這個(gè)函數(shù)用原始Observable發(fā)射的數(shù)據(jù)作為參數(shù)岸售,產(chǎn)生一個(gè)新的數(shù)據(jù)作為ConnectableObservable給發(fā)射几迄,替換原位置的數(shù)據(jù)項(xiàng)。實(shí)質(zhì)是在簽名的基礎(chǔ)上添加一個(gè)Map操作冰评。

RefCount

讓一個(gè)可連接的Observable行為像普通的Observable

refCount

可連接的Observable (connectable Observable)與普通的Observable差不多映胁,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù),而是直到使用了Connect操作符時(shí)才會(huì)開始甲雅。用這種方法解孙,你可以在任何時(shí)候讓一個(gè)Observable開始發(fā)射數(shù)據(jù)。

RefCount操作符把從一個(gè)可連接的Observable連接和斷開的過程自動(dòng)化了抛人。它操作一個(gè)可連接的Observable弛姜,返回一個(gè)普通的Observable。當(dāng)?shù)谝粋€(gè)訂閱者訂閱這個(gè)Observable時(shí)妖枚,RefCount連接到下層的可連接Observable廷臼。RefCount跟蹤有多少個(gè)觀察者訂閱它,直到最后一個(gè)觀察者完成才斷開與下層可連接Observable的連接绝页。

refCount

RxJava中的實(shí)現(xiàn)為refCount荠商,還有一個(gè)操作符叫share,它的作用等價(jià)于對(duì)一個(gè)Observable同時(shí)應(yīng)用publishrefCount操作续誉。

Replay

保證所有的觀察者收到相同的數(shù)據(jù)序列莱没,即使它們?cè)贠bservable開始發(fā)射數(shù)據(jù)之后才訂閱

replay

可連接的Observable (connectable Observable)與普通的Observable差不多,不過它并不會(huì)在被訂閱時(shí)開始發(fā)射數(shù)據(jù)酷鸦,而是直到使用了Connect操作符時(shí)才會(huì)開始饰躲。用這種方法,你可以在任何時(shí)候讓一個(gè)Observable開始發(fā)射數(shù)據(jù)臼隔。

如果在將一個(gè)Observable轉(zhuǎn)換為可連接的Observable之前對(duì)它使用Replay操作符嘹裂,產(chǎn)生的這個(gè)可連接Observable將總是發(fā)射完整的數(shù)據(jù)序列給任何未來的觀察者,即使那些觀察者在這個(gè)Observable開始給其它觀察者發(fā)射數(shù)據(jù)之后才訂閱摔握。

replay

RxJava的實(shí)現(xiàn)為replay寄狼,它有多個(gè)接受不同參數(shù)的變體,有的可以指定replay的最大緩存數(shù)量盒发,有的還可以指定調(diào)度器例嘱。

replay

有一種 replay返回一個(gè)普通的Observable闺阱。它可以接受一個(gè)變換函數(shù)為參數(shù)际看,這個(gè)函數(shù)接受原始Observable發(fā)射的數(shù)據(jù)項(xiàng)為參數(shù)法牲,返回結(jié)果Observable要發(fā)射的一項(xiàng)數(shù)據(jù)。因此蛮艰,這個(gè)操作符其實(shí)是replay變換之后的數(shù)據(jù)項(xiàng)。

實(shí)現(xiàn)自己的操作符

你可以實(shí)現(xiàn)你自己的Observable操作符雀彼,本文展示怎么做壤蚜。

如果你的操作符是被用于創(chuàng)造一個(gè)Observable即寡,而不是變換或者響應(yīng)一個(gè)Observable,使用 create(?) 方法袜刷,不要試圖手動(dòng)實(shí)現(xiàn) Observable聪富。另外,你可以按照下面的用法說明創(chuàng)建一個(gè)自定義的操作符著蟹。

如果你的操作符是用于Observable發(fā)射的單獨(dú)的數(shù)據(jù)項(xiàng)墩蔓,按照下面的說明做:Sequence Operators 。如果你的操作符是用于變換Observable發(fā)射的整個(gè)數(shù)據(jù)序列萧豆,按照這個(gè)說明做:Transformational Operators 奸披。

提示: 在一個(gè)類似于Groovy的語言Xtend中,你可以以 extension methods 的方式實(shí)現(xiàn)你自己的操作符 ,不使用本文的方法涮雷,它們也可以鏈?zhǔn)秸{(diào)用阵面。詳情參見 RxJava and Xtend

序列操作符

下面的例子向你展示了怎樣使用lift(?)操作符將你的自定義操作符(在這個(gè)例子中是 myOperator)與標(biāo)準(zhǔn)的RxJava操作符(如ofTypemap)一起使用:

fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new MyOperator<T>()).map({"transformed by myOperator: " + it});

下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與lift()搭配使用洪鸭。

實(shí)現(xiàn)你的操作符

將你的自定義操作符定義為實(shí)現(xiàn)了 Operator 接口的一個(gè)公開類, 就像這樣:

public class MyOperator<T> implements Operator<T> {
  public MyOperator( /* any necessary params here */ ) {
    /* 這里添加必要的初始化代碼 */
  }

  @Override
  public Subscriber<? super T> call(final Subscriber<? super T> s) {
    return new Subscriber<t>(s) {
      @Override
      public void onCompleted() {
        /* 這里添加你自己的onCompleted行為样刷,或者僅僅傳遞完成通知: */
        if(!s.isUnsubscribed()) {
          s.onCompleted();
        }
      }

      @Override
      public void onError(Throwable t) {
        /* 這里添加你自己的onError行為, 或者僅僅傳遞錯(cuò)誤通知:*/
        if(!s.isUnsubscribed()) {
          s.onError(t);
        }
      }

      @Override
      public void onNext(T item) {
        /* 這個(gè)例子對(duì)結(jié)果的每一項(xiàng)執(zhí)行排序操作,然后返回這個(gè)結(jié)果 */
        if(!s.isUnsubscribed()) {
          transformedItem = myOperatorTransformOperation(item);
          s.onNext(transformedItem);
        }
      }
    };
  }
}

變換操作符

下面的例子向你展示了怎樣使用 compose(?) 操作符將你得自定義操作符(在這個(gè)例子中览爵,是一個(gè)名叫myTransformer的操作符颂斜,它將一個(gè)發(fā)射整數(shù)的Observable轉(zhuǎn)換為發(fā)射字符串的)與標(biāo)準(zhǔn)的RxJava操作符(如ofTypemap)一起使用:

fooObservable = barObservable.ofType(Integer).map({it*2}).compose(new MyTransformer<Integer,String>()).map({"transformed by myOperator: " + it});

下面這部分向你展示了你的操作符的腳手架形式,以便它能正確的與compose()搭配使用拾枣。

實(shí)現(xiàn)你的變換器

將你的自定義操作符定義為實(shí)現(xiàn)了 Transformer 接口的一個(gè)公開類沃疮,就像這樣:

public class MyTransformer<Integer,String> implements Transformer<Integer,String> {
  public MyTransformer( /* any necessary params here */ ) {
    /* 這里添加必要的初始化代碼 */
  }

  @Override
  public Observable<String> call(Observable<Integer> source) {
    /* 
     * 這個(gè)簡(jiǎn)單的例子Transformer應(yīng)用一個(gè)map操作,
     * 這個(gè)map操作將發(fā)射整數(shù)變換為發(fā)射整數(shù)的字符串表示梅肤。
     */
    return source.map( new Func1<Integer,String>() {
      @Override
      public String call(Integer t1) {
        return String.valueOf(t1);
      }
    } );
  }
}

參見

其它需要考慮的

  • 在發(fā)射任何數(shù)據(jù)(或者通知)給訂閱者之前司蔬,你的序列操作符可能需要檢查它的 Subscriber.isUnsubscribed(?) 狀態(tài),如果沒有訂閱者了姨蝴,沒必要浪費(fèi)時(shí)間生成數(shù)據(jù)項(xiàng)俊啼。

  • 請(qǐng)注意:你的序列操作符必須復(fù)合Observable協(xié)議的核心原則:

    • 它可能調(diào)用訂閱者的 onNext(?) 方法任意次,但是這些調(diào)用必須是不重疊的左医。
    • 它只能調(diào)用訂閱者的 onCompleted(?)onError(?) 正好一次授帕,但不能都調(diào)用,而且不能在這之后調(diào)用訂閱者的 onNext(?) 方法浮梢。
    • 如果你不能保證你得操作符遵從這兩個(gè)原則跛十,你可以給它添加 serialize(?) 操作符,它會(huì)強(qiáng)制保持正確的行為秕硝。
  • 請(qǐng)關(guān)注這里 Issue #1962 &mdash芥映;需要有一個(gè)計(jì)劃創(chuàng)建一個(gè)測(cè)試腳手架,你可以用它來寫測(cè)試驗(yàn)證你的新操作符遵從了Observable協(xié)議。

  • 不要讓你的操作符阻塞別的操作奈偏。

  • When possible, you should compose new operators by combining existing operators, rather than implementing them with new code. RxJava itself does this with some of its standard operators, for example:

  • 如果可能坞嘀,你應(yīng)該組合現(xiàn)有的操作符創(chuàng)建你的新操作符,而不是從零開始實(shí)現(xiàn)它惊来。RxJava自身的標(biāo)準(zhǔn)操作符也是這樣做的丽涩,例如:

  • 如果你的操作符使用了函數(shù)或者lambda表達(dá)式作為參數(shù),請(qǐng)注意它們可能是異常的來源裁蚁,而且要準(zhǔn)備好捕獲這些異常矢渊,并且使用

onError()

通知訂閱者。

  • 某些異常被認(rèn)為是致命的厘擂,對(duì)它們來說昆淡,調(diào)用 onError()毫無意義,那樣或者是無用的刽严,或者只是對(duì)問題的妥協(xié)昂灵。你可以使用 Exceptions.throwIfFatal(throwable) 方法過濾掉這些致命的異常,并重新拋出它們舞萄,而不是試圖發(fā)射關(guān)于它們的通知眨补。

  • 一般說來,一旦發(fā)生錯(cuò)誤應(yīng)立即通知訂閱者倒脓,而不是首先嘗試發(fā)射更多的數(shù)據(jù)撑螺。

  • 請(qǐng)注意 null 可能是Observable發(fā)射的一個(gè)合法數(shù)據(jù)。頻繁發(fā)生錯(cuò)誤的一個(gè)來源是:測(cè)試一些變量并且將持有一個(gè)非 null 值作為是否發(fā)射了數(shù)據(jù)的替代崎弃。一個(gè)值為 null 的數(shù)據(jù)仍然是一個(gè)發(fā)射數(shù)據(jù)項(xiàng)甘晤,它與沒有發(fā)射任何東西是不能等同的。

  • 想讓你的操作符在反壓(backpressure)場(chǎng)景中變得得好可能會(huì)非常棘手饲做∠呋椋可以參考Dávid Karnok的博客 Advanced RxJava,這里有一個(gè)涉及到的各種因素和怎樣處理它們的很值得看的討論盆均。

插件讓你可以用多種方式修改RxJava的默認(rèn)行為:

  • 修改默認(rèn)的計(jì)算塞弊、IO和新線程調(diào)度器集合
  • 為RxJava可能遇到的特殊錯(cuò)誤注冊(cè)一個(gè)錯(cuò)誤處理器
  • 注冊(cè)一個(gè)函數(shù)記錄一些常規(guī)RxJava活動(dòng)的發(fā)生

RxJavaSchedulersHook

這個(gè)插件讓你可以使用你選擇的調(diào)度器覆蓋默認(rèn)的計(jì)算、IO和新線程調(diào)度 (Scheduler)泪姨,要做到這些游沿,需要繼承 RxJavaSchedulersHook 類并覆寫這些方法:

  • Scheduler getComputationScheduler(?)
  • Scheduler getIOScheduler(?)
  • Scheduler getNewThreadScheduler(?)
  • Action0 onSchedule(action)

然后是下面這些步驟:

  1. 創(chuàng)建一個(gè)你實(shí)現(xiàn)的 RxJavaSchedulersHook 子類的對(duì)象。
  2. 使用 RxJavaPlugins.getInstance(?) 獲取全局的RxJavaPlugins對(duì)象肮砾。
  3. 將你的默認(rèn)調(diào)度器對(duì)象傳遞給 RxJavaPluginsregisterSchedulersHook(?) 方法诀黍。

完成這些后,RxJava會(huì)開始使用你的方法返回的調(diào)度器唇敞,而不是內(nèi)置的默認(rèn)調(diào)度器蔗草。

RxJavaErrorHandler

這個(gè)插件讓你可以注冊(cè)一個(gè)函數(shù)處理傳遞給 Subscriber.onError(Throwable) 的錯(cuò)誤咒彤。要做到這一點(diǎn)疆柔,需要繼承 RxJavaErrorHandler 類并覆寫這個(gè)方法:

  • void handleError(Throwable e)

然后是下面這些步驟:

  1. 創(chuàng)建一個(gè)你實(shí)現(xiàn)的 RxJavaErrorHandler 子類的對(duì)象咒精。
  2. 使用 RxJavaPlugins.getInstance(?) 獲取全局的RxJavaPlugins對(duì)象。
  3. 將你的錯(cuò)誤處理器對(duì)象傳遞給 RxJavaPluginsregisterErrorHandler(?) 方法旷档。

完成這些后模叙,RxJava會(huì)開始使用你的錯(cuò)誤處理器處理傳遞給 Subscriber.onError(Throwable) 的錯(cuò)誤。

RxJavaObservableExecutionHook

這個(gè)插件讓你可以注冊(cè)一個(gè)函數(shù)用于記錄日志或者性能數(shù)據(jù)收集鞋屈,RxJava在某些常規(guī)活動(dòng)時(shí)會(huì)調(diào)用它范咨。要做到這一點(diǎn),需要繼承 RxJavaObservableExecutionHook 類并覆寫這些方法:

方法 何時(shí)調(diào)用
onCreate(?) Observable.create(?)方法中
onSubscribeStart(?) Observable.subscribe(?)之前立刻
onSubscribeReturn(?) Observable.subscribe(?)之后立刻
onSubscribeError(?) Observable.subscribe(?)執(zhí)行失敗時(shí)
onLift(?) Observable.lift(?)方法中

然后是下面這些步驟:

  1. 創(chuàng)建一個(gè)你實(shí)現(xiàn)的 RxJavaObservableExecutionHook 子類的對(duì)象厂庇。
  2. 使用 RxJavaPlugins.getInstance(?) 獲取全局的RxJavaPlugins對(duì)象渠啊。
  3. 將你的Hook對(duì)象傳遞給 RxJavaPluginsregisterObservableExecutionHook(?) 方法。

When you do this, RxJava will begin to call your functions when it encounters the specific conditions they were designed to take note of. 完成這些后权旷,在滿足某些特殊的條件時(shí)替蛉,RxJava會(huì)開始調(diào)用你的方法。

背壓?jiǎn)栴}

背壓是指在異步場(chǎng)景中拄氯,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下躲查,一種告訴上游的被觀察者降低發(fā)送速度的策略

簡(jiǎn)而言之,背壓是流速控制的一種策略译柏。

需要強(qiáng)調(diào)兩點(diǎn):

  • 背壓策略的一個(gè)前提是異步環(huán)境镣煮,也就是說,被觀察者和觀察者處在不同的線程環(huán)境中鄙麦。
  • 背壓(Backpressure)并不是一個(gè)像flatMap一樣可以在程序中直接使用的操作符典唇,他只是一種控制事件流速的策略。

響應(yīng)式拉瓤韪(reactive pull)

首先我們回憶之前那篇《關(guān)于Rxjava最友好的文章》介衔,里面其實(shí)提到,在RxJava的觀察者模型中盟劫,被觀察者是主動(dòng)的推送數(shù)據(jù)給觀察者夜牡,觀察者是被動(dòng)接收的。而響應(yīng)式拉取則反過來侣签,觀察者主動(dòng)從被觀察者那里去拉取數(shù)據(jù)塘装,而被觀察者變成被動(dòng)的等待通知再發(fā)送數(shù)據(jù)

結(jié)構(gòu)示意圖如下:

[圖片上傳失敗...(image-cccb92-1572187975155)]

觀察者可以根據(jù)自身實(shí)際情況按需拉取數(shù)據(jù)影所,而不是被動(dòng)接收(也就相當(dāng)于告訴上游觀察者把速度慢下來)蹦肴,最終實(shí)現(xiàn)了上游被觀察者發(fā)送事件的速度的控制,實(shí)現(xiàn)了背壓的策略猴娩。

源碼

public class FlowableOnBackpressureBufferStategy{
...
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            boolean callOnOverflow = false;
            boolean callError = false;
            Deque<T> dq = deque;
            synchronized (dq) {
               if (dq.size() == bufferSize) {
                   switch (strategy) {
                   case DROP_LATEST:
                       dq.pollLast();
                       dq.offer(t);
                       callOnOverflow = true;
                       break;
                   case DROP_OLDEST:
                       dq.poll();
                       dq.offer(t);
                       callOnOverflow = true;
                       break;
                   default:
                       // signal error
                       callError = true;
                       break;
                   }
               } else {
                   dq.offer(t);
               }
            }

            if (callOnOverflow) {
                if (onOverflow != null) {
                    try {
                        onOverflow.run();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.cancel();
                        onError(ex);
                    }
                }
            } else if (callError) {
                s.cancel();
                onError(new MissingBackpressureException());
            } else {
                drain();
            }
        }
...
}

在這段源碼中阴幌,根據(jù)不同的背壓策略進(jìn)行了不同的處理措施勺阐,當(dāng)然這只是列舉了一段關(guān)于buffer背壓策略的例子。

根源

產(chǎn)生背壓?jiǎn)栴}的根源就是上游發(fā)送速度與下游的處理速度不均導(dǎo)致的矛双,所以如果想要解決這個(gè)問題就需要通過匹配兩個(gè)速率達(dá)到解決這個(gè)背壓根源的措施渊抽。

通常有兩個(gè)策略可供使用:

  1. 從數(shù)量上解決,對(duì)數(shù)據(jù)進(jìn)行采樣
  2. 從速度上解決议忽,降低發(fā)送事件的速率
  3. 利用flowable和subscriber

使用Flowable

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一個(gè)參數(shù)

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意這句代碼
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        upstream.subscribe(downstream);

我們注意到這次和Observable有些不同. 首先是創(chuàng)建Flowable的時(shí)候增加了一個(gè)參數(shù), 這個(gè)參數(shù)是用來選擇背壓,也就是出現(xiàn)上下游流速不均衡的時(shí)候應(yīng)該怎么處理的辦法, 這里我們直接用BackpressureStrategy.ERROR這種方式, 這種方式會(huì)在出現(xiàn)上下游流速不均衡的時(shí)候直接拋出一個(gè)異常,這個(gè)異常就是著名的MissingBackpressureException. 其余的策略后面再來講解.

另外的一個(gè)區(qū)別是在下游的onSubscribe方法中傳給我們的不再是Disposable了, 而是Subscription, 它倆有什么區(qū)別呢, 首先它們都是上下游中間的一個(gè)開關(guān), 之前我們說調(diào)用Disposable.dispose()方法可以切斷水管, 同樣的調(diào)用Subscription.cancel()也可以切斷水管, 不同的地方在于Subscription增加了一個(gè)void request(long n)方法, 這個(gè)方法有什么用呢, 在上面的代碼中也有這么一句代碼:

 s.request(Long.MAX_VALUE);

這是因?yàn)?code>Flowable在設(shè)計(jì)的時(shí)候采用了一種新的思路也就是響應(yīng)式拉取的方式來更好的解決上下游流速不均衡的問題, 與我們之前所講的控制數(shù)量控制速度不太一樣, 這種方式用通俗易懂的話來說就好比是葉問打鬼子, 我們把上游看成小日本, 把下游當(dāng)作葉問, 當(dāng)調(diào)用Subscription.request(1)時(shí), 葉問就說我要打一個(gè)! 然后小日本就拿出一個(gè)鬼子給葉問, 讓他打, 等葉問打死這個(gè)鬼子之后, 再次調(diào)用request(10), 葉問就又說我要打十個(gè)! 然后小日本又派出十個(gè)鬼子給葉問, 然后就在邊上看熱鬧, 看葉問能不能打死十個(gè)鬼子, 等葉問打死十個(gè)鬼子后再繼續(xù)要鬼子接著打...

所以我們把request當(dāng)做是一種能力, 當(dāng)成下游處理事件的能力, 下游能處理幾個(gè)就告訴上游我要幾個(gè), 這樣只要上游根據(jù)下游的處理能力來決定發(fā)送多少事件, 就不會(huì)造成一窩蜂的發(fā)出一堆事件來, 從而導(dǎo)致OOM. 這也就完美的解決之前我們所學(xué)到的兩種方式的缺陷, 過濾事件會(huì)導(dǎo)致事件丟失, 減速又可能導(dǎo)致性能損失. 而這種方式既解決了事件丟失的問題, 又解決了速度的問題, 完美 !

同步情況

Observable.create(new ObservableOnSubscribe<Integer>() {                         
    @Override                                                                    
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 
        for (int i = 0; ; i++) {   //無限循環(huán)發(fā)事件                                              
            emitter.onNext(i);                                                   
        }                                                                        
    }                                                                            
}).subscribe(new Consumer<Integer>() {                                           
    @Override                                                                    
    public void accept(Integer integer) throws Exception {                       
        Thread.sleep(2000);                                                      
        Log.d(TAG, "" + integer);                                                
    }                                                                            
});

當(dāng)上下游工作在同一個(gè)線程中時(shí), 這時(shí)候是一個(gè)同步的訂閱關(guān)系, 也就是說上游每發(fā)送一個(gè)事件必須等到下游接收處理完了以后才能接著發(fā)送下一個(gè)事件.

同步與異步的區(qū)別就在于有沒有緩存發(fā)送事件的緩沖區(qū)懒闷。

異步情況

通過subscribeOn和observeOn來確定對(duì)應(yīng)的線程,達(dá)到異步的效果栈幸,異步時(shí)會(huì)有一個(gè)對(duì)應(yīng)的緩存區(qū)來換從從上游發(fā)送的事件愤估。

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

背壓策略:

  1. error, 緩沖區(qū)大概在128
  2. buffer速址, 緩沖區(qū)在1000左右
  3. drop玩焰, 把存不下的事件丟棄
  4. latest, 只保留最新的
  5. missing, 缺省設(shè)置芍锚,不做任何操作

上游從哪里得知下游的處理能力呢昔园?我們來看看上游最重要的部分,肯定就是FlowableEmitter了啊闹炉,我們就是通過它來發(fā)送事件的啊蒿赢,來看看它的源碼吧(別緊張,它的代碼灰常簡(jiǎn)單):

public interface FlowableEmitter<T> extends Emitter<T> {
    void setDisposable(Disposable s);
    void setCancellable(Cancellable c);

    /**
     * The current outstanding request amount.
     * <p>This method is thread-safe.
     * @return the current outstanding request amount
     */
    long requested();

    boolean isCancelled();
    FlowableEmitter<T> serialize();
}

FlowableEmitter是個(gè)接口渣触,繼承Emitter羡棵,Emitter里面就是我們的onNext(),onComplete()和onError()三個(gè)方法。我們看到FlowableEmitter中有這么一個(gè)方法:

long requested();

img

同步request.png

這張圖的意思就是當(dāng)上下游在同一個(gè)線程中的時(shí)候嗅钻,在下游調(diào)用request(n)就會(huì)直接改變上游中的requested的值皂冰,多次調(diào)用便會(huì)疊加這個(gè)值,而上游每發(fā)送一個(gè)事件之后便會(huì)去減少這個(gè)值养篓,當(dāng)這個(gè)值減少至0的時(shí)候秃流,繼續(xù)發(fā)送事件便會(huì)拋異常了。

img

異步request.png

可以看到柳弄,當(dāng)上下游工作在不同的線程里時(shí)舶胀,每一個(gè)線程里都有一個(gè)requested,而我們調(diào)用request(1000)時(shí)碧注,實(shí)際上改變的是下游主線程中的requested嚣伐,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的,這個(gè)調(diào)用會(huì)在合適的時(shí)候自動(dòng)觸發(fā)萍丐。

Rxjava實(shí)例開發(fā)應(yīng)用

  1. 網(wǎng)絡(luò)請(qǐng)求處理(輪詢轩端,嵌套,出錯(cuò)重連)
  2. 功能防抖
  3. 從多級(jí)緩存獲取數(shù)據(jù)
  4. 合并數(shù)據(jù)源
  5. 聯(lián)合判斷
  6. 與 Retrofit,RxBinding,EventBus結(jié)合使用

Rxjava原理

  1. Scheduler線程切換工作原理
  2. 數(shù)據(jù)的發(fā)送與接收(觀察者模式)
  3. lift的工作原理
  4. map的工作原理
  5. flatMap的工作原理
  6. merge的工作原理
  7. concat的工作原理

結(jié)完逝变。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載基茵,如需轉(zhuǎn)載請(qǐng)通過簡(jiǎn)信或評(píng)論聯(lián)系作者奋构。
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市拱层,隨后出現(xiàn)的幾起案子弥臼,更是在濱河造成了極大的恐慌,老刑警劉巖舱呻,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件醋火,死亡現(xiàn)場(chǎng)離奇詭異悠汽,居然都是意外死亡箱吕,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門柿冲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來茬高,“玉大人,你說我怎么就攤上這事假抄≡踉裕” “怎么了?”我有些...
    開封第一講書人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵宿饱,是天一觀的道長(zhǎng)熏瞄。 經(jīng)常有香客問我,道長(zhǎng)谬以,這世上最難降的妖魔是什么强饮? 我笑而不...
    開封第一講書人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮为黎,結(jié)果婚禮上邮丰,老公的妹妹穿的比我還像新娘。我一直安慰自己铭乾,他們只是感情好剪廉,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著炕檩,像睡著了一般斗蒋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上笛质,一...
    開封第一講書人閱讀 52,196評(píng)論 1 308
  • 那天泉沾,我揣著相機(jī)與錄音,去河邊找鬼经瓷。 笑死爆哑,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的舆吮。 我是一名探鬼主播揭朝,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼队贱,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了潭袱?” 一聲冷哼從身側(cè)響起柱嫌,我...
    開封第一講書人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎屯换,沒想到半個(gè)月后编丘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡彤悔,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年嘉抓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晕窑。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡抑片,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出杨赤,到底是詐尸還是另有隱情敞斋,我是刑警寧澤,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布疾牲,位于F島的核電站植捎,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏阳柔。R本人自食惡果不足惜焰枢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望盔沫。 院中可真熱鬧医咨,春花似錦、人聲如沸架诞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谴忧。三九已至很泊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間沾谓,已是汗流浹背委造。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留均驶,地道東北人昏兆。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像妇穴,于是被迫代替她去往敵國(guó)和親爬虱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子隶债,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • RxJava正在Android開發(fā)者中變的越來越流行。唯一的問題就是上手不容易跑筝,尤其是大部分人之前都是使用命令式編...
    劉啟敏閱讀 1,871評(píng)論 1 7
  • 一死讹、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    無求_95dd閱讀 3,105評(píng)論 0 21
  • 一曲梗、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性赞警,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    測(cè)天測(cè)地測(cè)空氣閱讀 637評(píng)論 0 1
  • 記錄RxJava操作符,方便查詢(2.2.2版本) 英文文檔地址:http://reactivex.io/docu...
    凌云飛魚閱讀 826評(píng)論 0 0
  • 年薪百萬虏两,實(shí)現(xiàn)的路徑愧旦, 思考路徑,貢獻(xiàn)千萬碘举, 千萬以上流水的行業(yè)及市場(chǎng)容量忘瓦, 企業(yè)軟件,供應(yīng)鏈金融引颈,千萬以上的流水...
    leoliwinner閱讀 201評(píng)論 0 0