操作符分類
操作符分類有十三種:
名稱 | 解析 |
---|---|
創(chuàng)建操作 | 用于創(chuàng)建Observable的操作符 |
變換操作 | 這些操作符可用于對Observable發(fā)射的數(shù)據(jù)進(jìn)行變換 |
過濾操作 | 這些操作符用于從Observable發(fā)射的數(shù)據(jù)中進(jìn)行選擇 |
組合操作 | 組合操作符用于將多個(gè)Observable組合成一個(gè)單一的Observable |
錯(cuò)誤處理 | 這些操作符用于從錯(cuò)誤通知中恢復(fù) |
輔助操作 | 一組用于處理Observable的操作符 |
條件和布爾操作 | 用于單個(gè)或多個(gè)數(shù)據(jù)項(xiàng)白翻,也可用于Observable |
算術(shù)和聚合操作 | 用于整個(gè)數(shù)據(jù)序列 |
異步操作 | 單獨(dú)的 rxjava-async 模塊,它們用于將同步對象轉(zhuǎn)換為Observable |
連接操作 | 一些有精確可控的訂閱行為的特殊Observable |
轉(zhuǎn)換操作 | 只有一個(gè)To绢片,將Observable轉(zhuǎn)換為其它的對象或數(shù)據(jù)結(jié)構(gòu) |
阻塞操作 | 阻塞Observable的操作符 |
字符串操作 | 一些用于處理字符串序列和流的特殊操作符 |
變換操作符
操作符 | 解析 |
---|---|
buffer() | 緩存滤馍,可以簡單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個(gè)集合杉畜,然后把這些數(shù)據(jù)集合打包發(fā)射纪蜒,而不是一次發(fā)射一個(gè) |
map() | 對序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)來變換Observable發(fā)射的數(shù)據(jù)序列 |
flatMap()(無序), concatMap() (有序), flatMapIterable() | 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable |
switchMap() | 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合此叠,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù) |
scan() | 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值 |
groupBy() | 將Observable分拆為Observable集合随珠,將原始Observable發(fā)射的數(shù)據(jù)按Key分組灭袁,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù) |
buffer() | 它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射窗看,而不是一次發(fā)射一個(gè) |
window() | 定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口茸歧,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng) |
cast() | 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型 |
過濾操作符列表
方法 | 含義 |
---|---|
filter() | 過濾數(shù)據(jù) |
takeLast() | 只發(fā)射最后的N項(xiàng)數(shù)據(jù) |
last() | 只發(fā)射最后的一項(xiàng)數(shù)據(jù) |
lastOrDefault() | 只發(fā)射最后的一項(xiàng)數(shù)據(jù)显沈,如果Observable為空就發(fā)射默認(rèn)值 |
takeLastBuffer() | 將最后的N項(xiàng)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)射 |
skip() | 跳過開始的N項(xiàng)數(shù)據(jù) |
skipLast() | 跳過最后的N項(xiàng)數(shù)據(jù) |
take() | 只發(fā)射開始的N項(xiàng)數(shù)據(jù) |
first() , takeFirst() | 只發(fā)射第一項(xiàng)數(shù)據(jù)软瞎,或者滿足某種條件的第一項(xiàng)數(shù)據(jù) |
firstOrDefault() | 只發(fā)射第一項(xiàng)數(shù)據(jù),如果Observable為空就發(fā)射默認(rèn)值 |
elementAt() | 發(fā)射第N項(xiàng)數(shù)據(jù) |
elementAtOrDefault() | 發(fā)射第N項(xiàng)數(shù)據(jù)拉讯,如果Observable數(shù)據(jù)少于N項(xiàng)就發(fā)射默認(rèn)值 |
sample() , throttleLast() | 定期發(fā)射Observable最近的數(shù)據(jù) |
throttleFirst() | 定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù) |
throttleWithTimeout() , debounce() | 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù)涤浇,通俗的說,就是如果一段時(shí)間沒有操作魔慷,就執(zhí)行一次操作 |
timeout() | 如果在一個(gè)指定的時(shí)間段后還沒發(fā)射數(shù)據(jù)只锭,就發(fā)射一個(gè)異常 |
distinct() | 過濾掉重復(fù)數(shù)據(jù) |
distinctUntilChanged() | 過濾掉連續(xù)重復(fù)的數(shù)據(jù) |
ofType() | 只發(fā)射指定類型的數(shù)據(jù) |
ignoreElements() | 丟棄所有的正常數(shù)據(jù),只發(fā)射錯(cuò)誤或完成通知 |
組合操作符
操作符 | 解析 |
---|---|
and() , then() , when() | 通過模式(And條件)和計(jì)劃(Then次序)組合兩個(gè)或多個(gè)Observable發(fā)射的數(shù)據(jù)集 |
combineLatest() | 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí)院尔,通過一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù))蜻展,然后發(fā)射這個(gè)函數(shù)的結(jié)果 |
join() , groupJoin() | 無論何時(shí)喉誊,如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi)纵顾,就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射 |
merge() | 將多個(gè)Observable合并為一個(gè) |
startWith() | 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù)伍茄,在發(fā)射原來的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng) |
switch() | 將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù) |
switchOnNext() | 將一個(gè)發(fā)射Observables的Observable轉(zhuǎn)換成另一個(gè)Observable施逾,后者發(fā)射這些Observables最近發(fā)射的數(shù)據(jù) |
zip() | 打包幻林,使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射 |
mergeDelayError() | 合并多個(gè)Observables音念,讓沒有錯(cuò)誤的Observable都完成后再發(fā)射錯(cuò)誤通知 |
錯(cuò)誤處理操作符
名稱 | 解析 |
---|---|
onErrorResumeNext() | 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)數(shù)據(jù)序列 |
onErrorReturn() | 讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止沪饺。方法返回一個(gè)鏡像原有Observable行為的新Observable,后者會(huì)忽略前者的onError調(diào)用闷愤,不會(huì)將錯(cuò)誤傳遞給觀察者整葡,作為替代,它會(huì)發(fā)發(fā)射一個(gè)特殊的項(xiàng)并調(diào)用觀察者的onCompleted方法讥脐。 |
onExceptionResumeNext() | 指示Observable遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射數(shù)據(jù) |
retry() | 指示Observable遇到錯(cuò)誤時(shí)重試 |
retryWhen() | 指示Observable遇到錯(cuò)誤時(shí)遭居,將錯(cuò)誤傳遞給另一個(gè)Observable來決定是否要重新給訂閱這個(gè)Observable |
retryUntil() | 指示Observable遇到錯(cuò)誤時(shí),是否讓Observable重新訂閱 |
輔助操作符
名稱 | 解析 |
---|---|
materialize() | 將Observable轉(zhuǎn)換成一個(gè)通知列表 |
dematerialize() | 將上面的結(jié)果逆轉(zhuǎn)回一個(gè)Observable |
timestamp() | 給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳 |
serialize() | 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的 |
cache() | 記住Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者 |
observeOn() | 指定觀察者觀察Observable的調(diào)度器 |
subscribeOn() | 指定Observable執(zhí)行任務(wù)的調(diào)度器 |
doOnEach() | 注冊一個(gè)動(dòng)作旬渠,對Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用 |
doOnCompleted() | 注冊一個(gè)動(dòng)作俱萍,對正常完成的Observable使用 |
doOnError() | 注冊一個(gè)動(dòng)作,對發(fā)生錯(cuò)誤的Observable使用 |
doOnTerminate() | Observable終止之前會(huì)被調(diào)用告丢,無論是正常還是異常終止 |
doOnSubscribe() | 注冊一個(gè)動(dòng)作枪蘑,在觀察者訂閱時(shí)使用 |
doOnUnsubscribe() | 注冊一個(gè)動(dòng)作,在觀察者取消訂閱時(shí)使用 |
doOnNext() | 在onNext前執(zhí)行 |
doAfterNext() | 在onNext之后執(zhí)行 |
doAfterTerminate | 終止發(fā)送時(shí)候調(diào)用 |
doOnLifecycle | 可以在訂閱之后 設(shè)置是否取消訂閱 |
doFinally | 在最后執(zhí)行 |
finallyDo() | 注冊一個(gè)動(dòng)作岖免,在Observable完成時(shí)使用 |
delay() | 延時(shí)發(fā)射Observable的結(jié)果 |
delaySubscription() | 延時(shí)處理訂閱請求 |
timeInterval() | 轉(zhuǎn)換獲取數(shù)據(jù)發(fā)送的時(shí)間間隔 |
using() | 創(chuàng)建一個(gè)只在Observable生命周期存在的資源 |
single() | 強(qiáng)制返回單個(gè)數(shù)據(jù)岳颇,否則拋出異常 |
singleOrDefault() | 如果Observable完成時(shí)返回了單個(gè)數(shù)據(jù),就返回它颅湘,否則返回默認(rèn)數(shù)據(jù) |
toFuture(),toIterable(),toList() | 將Observable轉(zhuǎn)換為其它對象或數(shù)據(jù)結(jié)構(gòu) |
條件操作符
名稱 | 解析 |
---|---|
amb() | 給定多個(gè)Observable话侧,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù) |
defaultIfEmpty() | 發(fā)射來自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù)闯参,就發(fā)射一個(gè)默認(rèn)數(shù)據(jù) |
switchIfEmpty() | 如果原始Observable沒有發(fā)射數(shù)據(jù)闷营,它發(fā)射一個(gè)備用Observable的發(fā)射物 |
(rxjava-computation-expressions) doWhile() | 發(fā)射原始Observable的數(shù)據(jù)序列扯旷,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止 |
(rxjava-computation-expressions) ifThen() 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列 | |
skipUntil() | 丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)唉锌,然后發(fā)射原始Observable的剩余數(shù)據(jù) |
skipWhile() | 丟棄原始Observable發(fā)射的數(shù)據(jù)行拢,直到一個(gè)特定的條件為假旭愧,然后發(fā)射原始Observable剩余的數(shù)據(jù) |
(rxjava-computation-expressions) switchCase() | 基于一個(gè)計(jì)算結(jié)果辽狈,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列 |
takeUntil() | 發(fā)射來自原始Observable的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知 |
takeWhile(),takeWhileWithIndex() | 發(fā)射原始Observable的數(shù)據(jù),直到一個(gè)特定的條件為真涩蜘,然后跳過剩余的數(shù)據(jù) |
(rxjava-computation-expressions)whileDo() | 如果滿足一個(gè)條件嚼贡,發(fā)射原始Observable的數(shù)據(jù),然后重復(fù)發(fā)射直到不滿足這個(gè)條件為止 |
(rxjava-computation-expressions): 表示這個(gè)操作符當(dāng)前是可選包 rxjava-
computation-expressions 的一部分同诫,還沒有包含在標(biāo)準(zhǔn)RxJava的操作符集合里,需要自己導(dǎo)包粤策,但是這個(gè)包用的是Rxjava1.0.0-rc3的版本,所以在Rxjava2.0用的話误窖,會(huì)出現(xiàn)問題叮盘。 可以使用repeat操作符代替
compile 'io.reactivex:rxjava-computation-expressions:0.21.0'
布爾操作符
名稱 | 解析 |
---|---|
all() | 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件 |
contains() | 判斷Observable是否會(huì)發(fā)射一個(gè)指定的值 |
isEmpty() | 判斷Observable是否發(fā)射了一個(gè)值 |
sequenceEqual() | 判斷兩個(gè)Observables發(fā)射的序列是否相等 |
算術(shù)和聚合操作符
這個(gè)模塊需要導(dǎo)下面這個(gè)包,不支持Rxjava2,所以Rxjava2用不了噢:
compile 'io.reactivex:rxjava-math:1.0.0'
名稱 | 解析 |
---|---|
averageInteger() | 求序列平均數(shù)并發(fā)射 |
averageLong() | 求序列平均數(shù)并發(fā)射 |
averageFloat() | 求序列平均數(shù)并發(fā)射 |
averageDouble() | 求序列平均數(shù)并發(fā)射 |
max() | 求序列最大值并發(fā)射 |
maxBy() | 求最大key對應(yīng)的值并發(fā)射 |
min() | 求最小值并發(fā)射 |
minBy() | 求最小Key對應(yīng)的值并發(fā)射 |
sumInteger() | 求和并發(fā)射 |
sumLong() | 求和并發(fā)射 |
sumFloat() | 求和并發(fā)射 |
sumDouble() | 求和并發(fā)射 |
其他聚合操作符:
名稱 | 解析 |
---|---|
concat() / concatArray | 順序連接多個(gè)Observables |
count() | 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果 |
reduce() | 按順序?qū)bservable發(fā)射的每項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)并發(fā)射最終的值 |
collect() | 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中霹俺,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable |
toList() | 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表柔吼,然后返回這個(gè)列表 |
toSortedList() | 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表 |
toMap() | 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map丙唧,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的 |
toMultiMap() | 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表愈魏,同時(shí)也是一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的 |
ps: 由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù))想际,它們對于非常長或者無限的序列來說是危險(xiǎn)的培漏,不推薦使用。
異步操作符
異步操作符屬于單獨(dú)的rxjava-async模塊胡本,它們用于將同步對象轉(zhuǎn)換為Observable牌柄。不支持Rxjava2.0,如果使用Rxjava1.0的話侧甫,可以導(dǎo)入下面的包就可以使用異步操作符了珊佣。
compile 'io.reactivex:rxjava-async-util:0.21.0'
名稱 | 解析 |
---|---|
start() | 創(chuàng)建一個(gè)Observable,它發(fā)射一個(gè)函數(shù)的返回值 |
toAsync() / asyncAction()/ asyncFunc() | 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable闺骚,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值 |
startFuture() | 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable彩扔,它發(fā)射Future的返回值 |
deferFuture() | 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable,但是并不嘗試獲取這個(gè)Future返回的Observable僻爽,直到有訂閱者訂閱它 |
forEachFuture() | 傳遞Subscriber方法給一個(gè)Subscriber,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成 |
fromAction() | 將一個(gè)Action轉(zhuǎn)換為Observable贾惦,當(dāng)一個(gè)訂閱者訂閱時(shí)胸梆,它執(zhí)行這個(gè)action并發(fā)射它的返回值 |
fromCallable() | 將一個(gè)Callable轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí)须板,它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值碰镜,或者發(fā)射異常 |
fromRunnable() | 將一個(gè)Runnable轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí)习瑰,它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值 |
runAsync() | 返回一個(gè)StoppableObservable绪颖,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions |
連接操作符
一個(gè)可連接的Observable與普通的Observable差不多,除了這一點(diǎn):可連接的Observabe在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù)甜奄,只有在它的connect()被調(diào)用時(shí)才開始柠横。用這種方法窃款,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開始發(fā)射數(shù)據(jù)。
名稱 | 解析 |
---|---|
ConnectableObservable.connect() | 指示一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù) |
Observable.publish() | 將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable |
Observable.replay() | 確保所有的訂閱者看到相同的數(shù)據(jù)序列牍氛,即使它們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱 |
ConnectableObservable.refCount() | 讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable |
阻塞操作符
在Rxjava1中的BlockingObservable已經(jīng)在Rxjava2中去掉了晨继,在Rxjava2中已經(jīng)集成到了Observable中。
官方說明:
https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0
名稱 | 解析 |
---|---|
blockingForEach() | 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)調(diào)用一個(gè)方法搬俊,會(huì)阻塞直到Observable完成 |
blockingFirst() | 阻塞直到Observable發(fā)射了一個(gè)數(shù)據(jù)紊扬,然后返回第一項(xiàng)數(shù)據(jù) |
blockingMostRecent() | 返回一個(gè)總是返回Observable最近發(fā)射的數(shù)據(jù)的iterable |
blockingLatest() | 返回一個(gè)iterable,會(huì)阻塞直到或者除非Observable發(fā)射了一個(gè)iterable沒有返回的值唉擂,然后返回這個(gè)值 |
blockingNext() | 返回一個(gè)iterable,阻塞直到返回另外一個(gè)值 |
blockingLast() | 阻塞直到Observable終止餐屎,然后返回最后一項(xiàng)數(shù)據(jù) |
blockingIterable() | 將Observable轉(zhuǎn)換返回一個(gè)iterable. |
blockingSingle() | 如果Observable終止時(shí)只發(fā)射了一個(gè)值,返回那個(gè)值玩祟,否則拋出異常 |
blockingSubscribe() | 在當(dāng)前線程訂閱腹缩,和forEach類似 |