Buffer
定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值。
Buffer
操作符將一個(gè)Observable變換為另一個(gè),原來的Observable正常發(fā)射數(shù)據(jù)啊掏,變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合。Buffer
操作符在很多語言特定的實(shí)現(xiàn)中有很多種變體衰猛,它們?cè)谌绾尉彺孢@個(gè)問題上存在區(qū)別迟蜜。
注意:如果原來的Observable發(fā)射了一個(gè)onError
通知,Buffer
會(huì)立即傳遞這個(gè)通知啡省,而不是首先發(fā)射緩存的數(shù)據(jù)娜睛,即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)。
Window
操作符與Buffer
類似卦睹,但是它在發(fā)射之前把收集到的數(shù)據(jù)放進(jìn)單獨(dú)的Observable畦戒,而不是放進(jìn)一個(gè)數(shù)據(jù)結(jié)構(gòu)。
在RxJava中有許多Buffer
的變體:
buffer(count)
buffer(count)以列表(List)的形式發(fā)射非重疊的緩存结序,每一個(gè)緩存至多包含來自原始Observable的count項(xiàng)數(shù)據(jù)(最后發(fā)射的列表數(shù)據(jù)可能少于count項(xiàng))
buffer(count, skip)
buffer(count,?skip)從原始Observable的第一項(xiàng)數(shù)據(jù)開始創(chuàng)建新的緩存障斋,此后每當(dāng)收到skip
項(xiàng)數(shù)據(jù),用count
項(xiàng)數(shù)據(jù)填充緩存:開頭的一項(xiàng)和后續(xù)的count-1
項(xiàng)徐鹤,它以列表(List)的形式發(fā)射緩存垃环,取決于count
和skip
的值,這些緩存可能會(huì)有重疊部分(比如skip < count時(shí))返敬,也可能會(huì)有間隙(比如skip > count時(shí))
FlatMap
FlatMap
將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables遂庄,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable
FlatMap
操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable救赐,然后FlatMap
合并這些Observables發(fā)射的數(shù)據(jù)涧团,最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射只磷。
這個(gè)方法是很有用的经磅,例如泌绣,當(dāng)你有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable预厌,因此你可以創(chuàng)建一個(gè)新的Observable發(fā)射這些次級(jí)Observable發(fā)射的數(shù)據(jù)的完整集合阿迈。
注意:FlatMap
對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge
)操作,因此它們可能是交錯(cuò)的轧叽。
在許多語言特定的實(shí)現(xiàn)中苗沧,還有一個(gè)操作符不會(huì)讓變換后的Observables發(fā)射的數(shù)據(jù)交錯(cuò),它按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù)炭晒,這個(gè)操作符通常被叫作ConcatMap
或者類似的名字待逞。
注意:如果任何一個(gè)通過這個(gè)flatMap
操作產(chǎn)生的單獨(dú)的Observable調(diào)用onError
異常終止了,這個(gè)Observable自身會(huì)立即調(diào)用onError
并終止网严。
這個(gè)操作符有一個(gè)接受額外的int
參數(shù)的一個(gè)變體识樱。這個(gè)參數(shù)設(shè)置flatMap
從原來的Observable映射Observables的最大同時(shí)訂閱數(shù)。當(dāng)達(dá)到這個(gè)限制時(shí)震束,它會(huì)等待其中一個(gè)終止然后再訂閱另一個(gè)怜庸。
GroupBy
將一個(gè)Observable分拆為一些Observables集合,它們中的每一個(gè)發(fā)射原始Observable的一個(gè)子序列
GroupBy
操作符將原始Observable分拆為一些Observables集合垢村,它們中的每一個(gè)發(fā)射原始Observable數(shù)據(jù)序列的一個(gè)子序列割疾。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè)Observable發(fā)射是由一個(gè)函數(shù)判定的,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)Key嘉栓,Key相同的數(shù)據(jù)會(huì)被同一個(gè)Observable發(fā)射宏榕。
RxJava實(shí)現(xiàn)了groupBy
操作符。它返回Observable的一個(gè)特殊子類GroupedObservable
侵佃,實(shí)現(xiàn)了GroupedObservable
接口的對(duì)象有一個(gè)額外的方法getKey
麻昼,這個(gè)Key用于將數(shù)據(jù)分組到指定的Observable。
有一個(gè)版本的groupBy
允許你傳遞一個(gè)變換函數(shù)趣钱,這樣它可以在發(fā)射結(jié)果GroupedObservable
之前改變數(shù)據(jù)項(xiàng)涌献。
注意:groupBy
將原始Observable分解為一個(gè)發(fā)射多個(gè)GroupedObservable
的Observable,一旦有訂閱首有,每個(gè)GroupedObservable
就開始緩存數(shù)據(jù)燕垃。因此,如果你忽略這些GroupedObservable
中的任何一個(gè)井联,這個(gè)緩存可能形成一個(gè)潛在的內(nèi)存泄露卜壕。因此,如果你不想觀察烙常,也不要忽略GroupedObservable
轴捎。你應(yīng)該使用像take(0)
這樣會(huì)丟棄自己的緩存的操作符鹤盒。
如果你取消訂閱一個(gè)GroupedObservable
,那個(gè)Observable將會(huì)終止侦副。如果之后原始的Observable又發(fā)射了一個(gè)與這個(gè)Observable的Key匹配的數(shù)據(jù)侦锯,groupBy
將會(huì)為這個(gè)Key創(chuàng)建一個(gè)新的GroupedObservable
。
groupBy
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行秦驯。
Map
對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)尺碰,執(zhí)行變換操作
Map
操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù),然后返回一個(gè)發(fā)射這些結(jié)果的Observable译隘。
RxJava將這個(gè)操作符實(shí)現(xiàn)為map
函數(shù)亲桥。這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
Scan
連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù)固耘,然后連續(xù)發(fā)射結(jié)果
scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)题篷,然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)厅目。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列番枚。這個(gè)操作符在某些情況下被叫做accumulator
。
RxJava實(shí)現(xiàn)了scan
操作符璧瞬。
示例代碼:
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
輸出
Next: 1
Next: 3
Next: 6
Next: 10
Next: 15
Sequence complete.