RxJava操作符圖譜
創(chuàng)建操作符
create
完整創(chuàng)建1個被觀察者對象(Observable)
just
- 快速創(chuàng)建1個被觀察者對象(Observable)
- 發(fā)送事件的特點:直接發(fā)送 傳入的事件
快速創(chuàng)建 被觀察者對象(Observable) & 發(fā)送10個以下事件
from
fromeArray
- 快速創(chuàng)建1個被觀察者對象(Observable)
- 發(fā)送事件的特點:直接發(fā)送 傳入的數(shù)組數(shù)據(jù)
將數(shù)組元素一次發(fā)射出查吊,可以用來遍歷數(shù)組
fromIterable
- 快速創(chuàng)建1個被觀察者對象(Observable)
- 發(fā)送事件的特點:直接發(fā)送 傳入的集合List數(shù)據(jù)
同上,可用來遍歷集合
發(fā)送事件
下列方法一般用于測試使用
<-- empty() -->
// 該方法創(chuàng)建的被觀察者對象發(fā)送事件的特點:僅發(fā)送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即觀察者接收后會直接調(diào)用onCompleted()
<-- error() -->
// 該方法創(chuàng)建的被觀察者對象發(fā)送事件的特點:僅發(fā)送Error事件器腋,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收后會直接調(diào)用onError()
<-- never() -->
// 該方法創(chuàng)建的被觀察者對象發(fā)送事件的特點:不發(fā)送任何事件
Observable observable3=Observable.never();
// 即觀察者接收后什么都不調(diào)用
延時操作符
- 定時操作:在經(jīng)過了x秒后,需要自動執(zhí)行y操作
- 周期性操作:每隔x秒后饭入,需要自動執(zhí)行y操作
delay
使得被觀察者延遲一段時間再發(fā)送事件
// 1. 指定延遲時間
// 參數(shù)1 = 時間;參數(shù)2 = 時間單位
delay(long delay,TimeUnit unit)
// 2. 指定延遲時間 & 調(diào)度器
// 參數(shù)1 = 時間;參數(shù)2 = 時間單位五芝;參數(shù)3 = 線程調(diào)度器
delay(long delay,TimeUnit unit,mScheduler scheduler)
// 3. 指定延遲時間 & 錯誤延遲
// 錯誤延遲,即:若存在Error事件辕万,則如常執(zhí)行枢步,執(zhí)行后再拋出錯誤異常
// 參數(shù)1 = 時間;參數(shù)2 = 時間單位蓄坏;參數(shù)3 = 錯誤延遲參數(shù)
delay(long delay,TimeUnit unit,boolean delayError)
// 4. 指定延遲時間 & 調(diào)度器 & 錯誤延遲
// 參數(shù)1 = 時間价捧;參數(shù)2 = 時間單位;參數(shù)3 = 線程調(diào)度器涡戳;參數(shù)4 = 錯誤延遲參數(shù)
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間并添加調(diào)度器结蟋,錯誤通知可以設置是否延遲
defer
直到有觀察者(Observer )訂閱時,才動態(tài)創(chuàng)建被觀察者對象(Observable) & 發(fā)送事件
- 通過 Observable工廠方法創(chuàng)建被觀察者對象(Observable)
- 每次訂閱后渔彰,都會得到一個剛創(chuàng)建的最新的Observable對象嵌屎,這可以確保Observable對象里的數(shù)據(jù)是最新的
<-- 1. 第1次對i賦值 ->>
Integer i = 10;
// 2. 通過defer 定義被觀察者對象
// 注:此時被觀察者對象還沒創(chuàng)建
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
<-- 2. 第2次對i賦值 ->>
i = 15;
<-- 3. 觀察者開始訂閱 ->>
// 注:此時,才會調(diào)用defer()創(chuàng)建被觀察者對象(Observable)
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整數(shù)是"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
timer
- 快速創(chuàng)建1個被觀察者對象(Observable)
- 發(fā)送事件的特點:延遲指定時間后恍涂,發(fā)送1個數(shù)值0(Long類型)
timer操作符默認運行在一個新線程上
也可自定義線程調(diào)度器(第3個參數(shù)):timer(long, TimeUnit, Scheduler)
interval
快速創(chuàng)建1個被觀察者對象(Observable)
-
發(fā)送事件的特點:每隔指定時間就發(fā)送事件
/** * initialDelay 初始延時時間 * unit 時間單位 * period 間隔時間 * scheduler 線程調(diào)度器 */ public static Observable<Long> interval(long interval, TimeUnit unit) { return interval(interval, interval, unit, Schedulers.computation()); } public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) { return interval(interval, interval, unit, scheduler); } public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) { return interval(initialDelay, period, unit, Schedulers.computation()); }
range/rangeLong
快速創(chuàng)建1個被觀察者對象(Observable)
-
發(fā)送事件的特點:連續(xù)發(fā)送1個事件序列宝惰,可指定范圍
/** * start 起始數(shù)字 * count 數(shù)量 */ public static Observable<Integer> range(int start, int count)
過濾操作符
take, takeFirst, takeLast
skip, skipFirst, skipLast
first
last
-
firstOrDefault, lastOrDefault (只發(fā)射最后一項(或者滿足某個條件的最后一項)數(shù)據(jù),可以指定默認值再沧。)
// 跳過前面幾項 public final Observable<T> skip(int count) // 跳過前面的時間尼夺,之后產(chǎn)生的數(shù)據(jù)提交 public final Observable<T> skip(long time, TimeUnit unit) // skipLast和skip相反,跳過后面的幾項炒瘸。 // 忽略最后時間單位內(nèi)產(chǎn)生的數(shù)據(jù) skipLast(long time,TimeUnit) // 并不是娶第n個淤堵,而是取前面n個數(shù)據(jù) take(n) // 是在制定時間內(nèi)取數(shù)據(jù),如果超過了這個時間源Observable產(chǎn)生的數(shù)據(jù)將作廢 take(long time, TimeUnit unit)
takeFirst操作符和first操作符類似,取滿足條件的第一個
區(qū)別:first取不到要拋異常顷扩,takeFirst不會takeLast操作符與last操作符相似拐邪。區(qū)別在于,如果取不到滿足條件的值隘截,last將拋出異常
filter
過濾數(shù)據(jù)扎阶,不滿足條件的數(shù)據(jù)將被過濾不發(fā)射。
filter(Fun) 自定義過濾條件
ofType
過濾指定類型的數(shù)據(jù)
Observable.just(1,2,"3")
.ofType(Integer.class)
.subscribe(item -> Log.d("JG",item.toString()));
elementAt/elementAtOrDefault/elementAtOrError
發(fā)射某一項數(shù)據(jù)婶芭,如果超過了范圍可以指定默認值东臀。內(nèi)部通過OperatorElementAt過濾。
Observable.just(3,4,5,6)
.elementAt(2)
.subscribe(item->Log.d("JG",item.toString())); //5
firstElement/lastElement
僅選取第1個元素 / 最后一個元素
ignoreElements
丟棄所有數(shù)據(jù)雕擂,只發(fā)射錯誤或正常終止的通知啡邑。內(nèi)部通過OperatorIgnoreElements實現(xiàn)。
distinct
過濾重復數(shù)據(jù)井赌,內(nèi)部通過OperatorDistinct實現(xiàn)谤逼。
distinctUntilChanged
過濾掉連續(xù)重復的數(shù)據(jù)贵扰。內(nèi)部通過OperatorDistinctUntilChanged實現(xiàn)
Observable.just(3,4,5,6,3,3,4,9)
.distinctUntilChanged()
.subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
timeout
如果原始Observable過了指定的一段時長沒有發(fā)射任何數(shù)據(jù),就發(fā)射一個異沉鞑浚或者使用備用的Observable戚绕。
Debounce/throtleWithTimeout
根據(jù)你指定的時間間隔進行限流
發(fā)送數(shù)據(jù)事件時,若2次發(fā)送事件的間隔<指定時間枝冀,就會丟棄前一次的數(shù)據(jù)舞丛,直到指定時間內(nèi)都沒有新數(shù)據(jù)發(fā)射時才會發(fā)送后一次的數(shù)據(jù)
條件操作符
single/singleOrDefault
檢測源Observable產(chǎn)生的數(shù)據(jù)項是否只有一個,否則報錯
onError()
java.lang.IllegalArgumentException: Sequence contains too many elements
all
all操作符接收一個函數(shù)參數(shù),創(chuàng)建并返回一個單布爾值的Observable,
如果原Observable正常終止并且每一項數(shù)據(jù)都滿足條件果漾,就返回true,
如果原Observable的任何一項數(shù)據(jù)不滿足條件或者非正常終止就返回False球切。
判斷所有的數(shù)據(jù)項是否滿足某個條件,內(nèi)部通過OperatorAll實現(xiàn)绒障。
amb/ambWith
amb操作符對于給定兩個或多個Observables吨凑,它只發(fā)射首先發(fā)射數(shù)據(jù)或通知的那個Observable的所有數(shù)據(jù)。
當你傳遞多個Observable給amb操作符時户辱,該操作符只發(fā)射其中一個Observable的數(shù)據(jù)和通知:首先發(fā)送通知給amb操作符的的那個Observable鸵钝,不管發(fā)射的是一項數(shù)據(jù)還是一個onError或onCompleted通知,amb將忽略和丟棄其它所有Observables的發(fā)射物庐镐。
amb(T o1, T ... o2)(可接受2到9個參數(shù))
給定多個Observable恩商,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù),其他Observable將會被忽略必逆。
contains
contains操作符將接收一個特定的值作為一個參數(shù)怠堪,判定原Observable是否發(fā)射該值,若已發(fā)射名眉,則創(chuàng)建并返回的Observable將發(fā)射true研叫,否則發(fā)射false。
判斷在發(fā)射的所有數(shù)據(jù)項中是否包含指定的數(shù)據(jù)璧针,內(nèi)部調(diào)用的其實是exists
contains操作符默認不在任何特定的調(diào)度器上執(zhí)行。
可用來判斷Observable發(fā)射的值中是否包含該值渊啰。
exists
exists操作符類似與contains操作符探橱,不同的是,其接受一個函數(shù)參數(shù)绘证,在函數(shù)中隧膏,對原Observable發(fā)射的數(shù)據(jù),設定比對條件并做判斷嚷那。若任何一項滿足條件就創(chuàng)建并返回一個發(fā)射true的Observable胞枕,否則返回一個發(fā)射false的Observable。
該操作符默認不在任何特定的調(diào)度器上執(zhí)行魏宽。
判斷是否存在數(shù)據(jù)項滿足某個條件腐泻。內(nèi)部通過OperatorAny實現(xiàn)决乎。
isEmpty
isEmpty操作符用于判定原始Observable是否沒有發(fā)射任何數(shù)據(jù)。若原Observable未發(fā)射任何數(shù)據(jù)派桩,創(chuàng)建創(chuàng)建并返回一個發(fā)射true的Observable构诚,否則返回一個發(fā)射false的Observable。
isEmpty操作符默認不在任何特定的調(diào)度器上執(zhí)行铆惑。
可以用來判斷是否沒有數(shù)據(jù)發(fā)射范嘱。
defaultIfEmpty
defaultIfEmpty操作接受一個備用數(shù)據(jù),在原Observable沒有發(fā)射任何數(shù)據(jù)正常終止(以onCompletedd的形式)员魏,該操作符以備用數(shù)據(jù)創(chuàng)建一個Observable并將數(shù)據(jù)發(fā)射出去丑蛤。
RxJava將這個操作符實現(xiàn)為defaultIfEmpty。它默認不在任何特定的調(diào)度器上執(zhí)行撕阎。
switchIfEmpty
如果原始Observable正常終止后仍然沒有發(fā)射任何數(shù)據(jù)受裹,就使用備用的Observable。
如果原始Observable正常終止后仍然沒有發(fā)射任何數(shù)據(jù)
defaultIfEmpty使用默認值發(fā)射闻书,switchIfEmpty使用默認Observable發(fā)射
sequenceEqual
sequenceEqual(Observable,Observable,Func2)變體接收兩個Observable參數(shù)和一個函數(shù)參數(shù)名斟,在函數(shù)參數(shù)中,可以比較兩個參數(shù)是否相同魄眉。
該操作符默認不在任何特定的調(diào)度器上執(zhí)行砰盐。
用于判斷兩個Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù),發(fā)射順序坑律,終止狀態(tài))
skipUntil
skipUntil操作符在觀察者訂閱原Observable時岩梳,該操作符將是忽略原Observable的發(fā)射的數(shù)據(jù),直到第二個Observable發(fā)射了一項數(shù)據(jù)那一刻晃择,它才 開始發(fā)射原Observable發(fā)射的數(shù)據(jù)冀值。
該操作符默認不在任何特定的調(diào)度器上執(zhí)行。
skipWhile
skipWhile操作符丟棄原Observable發(fā)射的數(shù)據(jù)宫屠,直到發(fā)射的數(shù)據(jù)不滿足一個指定的條件列疗,才開始發(fā)射原Observable發(fā)射的數(shù)據(jù)。
在觀察者訂閱原Observable時浪蹂,skipWhile操作符將忽略原Observable的發(fā)射物抵栈,直到你指定的某個條件變?yōu)閒alse時,它開始發(fā)射原Observable發(fā)射的數(shù)據(jù)坤次。
skipWhile操作符默認不在任何特定的調(diào)度器上執(zhí)行古劲。
takeUntil
takeUntil操作符與skipUntil操作符作用相反,當?shù)诙€Observable發(fā)射了一項數(shù)據(jù)或者終止時缰猴,丟棄原Observable發(fā)射的任何數(shù)據(jù)产艾。
takeUntil(Func1)變體接受一個函數(shù)參數(shù),當滿足條件時終止發(fā)射數(shù)據(jù)。
takeWhile
takeWhile操作符與skipWhile操作符作用相反闷堡。在觀察者訂閱原Observable時隘膘,takeWhile創(chuàng)建并返回原Oservable的鏡像Observable,暫命名為_observable缚窿,發(fā)射原Observable發(fā)射的數(shù)據(jù)棘幸。當你指定的某個條件變?yōu)閒alse時,_observable發(fā)射onCompleted終止通知倦零。
takeWhile操作符默認不在任何特定的調(diào)度器上執(zhí)行误续。
變換操作符
map
對被觀察者發(fā)送的每1個事件都通過指定的函數(shù)處理,從而變換成另外一種事件
即扫茅,將被觀察者發(fā)送的事件轉(zhuǎn)換為任意的類型事件蹋嵌。
如果是list,可對list的每個元素進行類型轉(zhuǎn)換葫隙,最后tolist發(fā)射轉(zhuǎn)換后的list栽烂。
flatmap
對Observable發(fā)射的數(shù)據(jù)都應用(apply)一個函數(shù)破喻,這個函數(shù)返回一個Observable伪阶,然后合并這些Observables,并且發(fā)送(emit)合并的結(jié)果逆瑞。 flatMap和map操作符很相像糟描,flatMap發(fā)送的是合并后的Observables怀喉,map操作符發(fā)送的是應用函數(shù)后返回的結(jié)果集
將原Observable發(fā)射的每個數(shù)據(jù)轉(zhuǎn)換為新的Observable,發(fā)射每一個轉(zhuǎn)換的Observable
新合并生成的事件序列順序是無序的船响,即與舊序列發(fā)送事件的順序無關
concatMap
作用同flatMap
與flatMap的區(qū)別是躬拢,新合并生成的事件序列順序是有序的
switchMap
當源Observable發(fā)射一個新的數(shù)據(jù)項時,如果舊數(shù)據(jù)項訂閱還未完成见间,就取消舊訂閱數(shù)據(jù)和停止監(jiān)視那個數(shù)據(jù)項產(chǎn)生的Observable,開始監(jiān)視新的數(shù)據(jù)項.
cast
cast操作符將原始Observable發(fā)射的每一項數(shù)據(jù)都強制轉(zhuǎn)換為一個指定的類型聊闯,然后再發(fā)射數(shù)據(jù),它是map的一個特殊版本
所相互轉(zhuǎn)換的類之間需要存在某種關系米诉,如繼承菱蔬、實現(xiàn)
concat
組合多個被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行
按發(fā)送順序串行執(zhí)行
merge
組合多個被觀察者一起發(fā)送數(shù)據(jù)史侣,合并后 按時間線并行執(zhí)行
區(qū)別上述concat()操作符:同樣是組合多個被觀察者一起發(fā)送數(shù)據(jù)汗销,但concat()操作符合并后是按發(fā)送順序串行執(zhí)行
并行執(zhí)行
zip
合并多個被觀察者(Observable)發(fā)送的事件,生成一個新的事件序列(即組合過后的事件序列)抵窒,并最終發(fā)送
事件組合方式 = 嚴格按照原先事件序列 進行對位合并
最終合并的事件數(shù)量 = 多個被觀察者(Observable)中數(shù)量最少的數(shù)量
reduce
把被觀察者需要發(fā)送的事件聚合成1個事件 & 發(fā)送
聚合的邏輯根據(jù)需求撰寫,但本質(zhì)都是前2個數(shù)據(jù)聚合叠骑,然后與后1個數(shù)據(jù)繼續(xù)進行聚合李皇,依次類推
自定義聚合條件,前2個數(shù)據(jù)聚合得到結(jié)果與第三個數(shù)據(jù)再聚合。以此類推...
collect
將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結(jié)構(gòu)里
Observable.just(1, 2, 3, 4)
.collect(new Func0<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() {
//創(chuàng)建收集容器
return new ArrayList<>();
}
}, new Action2<ArrayList<Integer>, Integer>() {
@Override
public void call(ArrayList<Integer> list1, Integer integer) {
//開始收集每一項數(shù)據(jù)
list1.add(integer);
}
}).subscribe(new Action1<ArrayList<Integer>>() {
@Override
public void call(ArrayList<Integer> integers) {
//得到收集后的數(shù)據(jù)
}
});
startWith
在一個被觀察者發(fā)送事件前掉房,追加發(fā)送一些數(shù)據(jù)或是一個新的被觀察者
//源碼是通過concat實現(xiàn)茧跋,在前面追加一個Observable
public final Observable<T> startWith(Observable<T> values) {
return concat(values, this);
}
compose
其他操作符
retry
重試,即當出現(xiàn)錯誤時卓囚,讓被觀察者(Observable)重新發(fā)射數(shù)據(jù)
retryUntil
出現(xiàn)錯誤后瘾杭,判斷是否需要重新發(fā)送數(shù)據(jù)
retryWhen
遇到錯誤時,將發(fā)生的錯誤傳遞給一個新的被觀察者(Observable)哪亿,并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件
repeat
無條件地粥烁、重復發(fā)送 被觀察者事件
具備重載方法,可設置重復創(chuàng)建次數(shù)
repeatWhen
有條件地蝇棉、重復發(fā)送 被觀察者事件
count
統(tǒng)計被觀察者發(fā)送事件的數(shù)量