下面是常用的操作符列表:
- 創(chuàng)建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
- 變換操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
- 過濾操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
- 組合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
- 錯誤處理 Catch和Retry
- 輔助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
- 條件和布爾操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
- 算術和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
- 轉(zhuǎn)換操作 To
- 連接操作 Connect, Publish, RefCount, Replay
- 反壓操作下硕,用于增加特殊的流程控制策略的操作符
這些操作符并不全都是ReactiveX的核心組成部分,有一些是語言特定的實現(xiàn)或可選的模塊辛掠。
RxJava
在RxJava中没咙,一個實現(xiàn)了Observer接口的對象可以訂閱(subscribe)一個Observable 類的實例牌捷。訂閱者(subscriber)對Observable發(fā)射(emit)的任何數(shù)據(jù)或數(shù)據(jù)序列作出響應捉捅。這種模式簡化了并發(fā)操作寄月,因為它不需要阻塞等待Observable發(fā)射數(shù)據(jù),而是創(chuàng)建了一個處于待命狀態(tài)的觀察者哨兵,哨兵在未來某個時刻響應Observable的通知耕蝉。
Single
介紹
RxJava(以及它派生出來的RxGroovy和RxScala)中有一個名為Single的Observable變種扔亥。
Single類似于Observable伞鲫,不同的是柒瓣,它總是只發(fā)射一個值傍药,或者一個錯誤通知拣挪,而不是發(fā)射一系列的值乙埃。
因此出吹,不同于Observable需要三個方法onNext, onError, onCompleted,訂閱Single只需要兩個方法:
- onSuccess - Single發(fā)射單個的值到這個方法
- onError - 如果無法發(fā)射需要的值,Single發(fā)射一個Throwable對象到這個方法
Single只會調(diào)用這兩個方法中的一個,而且只會調(diào)用一次,調(diào)用了任何一個方法之后,訂閱關系終止。
Single的操作符
Single也可以組合使用多種操作,一些操作符讓你可以混合使用Observable和Single:
操作符 | 返回值 | 說明 |
---|---|---|
compose | Single | 創(chuàng)建一個自定義的操作符 |
concat and concatWith | Observable | 連接多個Single和Observable發(fā)射的數(shù)據(jù) |
create | Single | 調(diào)用觀察者的create方法創(chuàng)建一個Single |
error | Single | 返回一個立即給訂閱者發(fā)射錯誤通知的Single |
flatMap | Single | 返回一個Single,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果 |
flatMapObservable | Observable | 返回一個Observable,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行flatMap操作后的結(jié)果 |
from | Single | 將Future轉(zhuǎn)換成Single |
just | Single | 返回一個發(fā)射一個指定值的Single |
map | Single | 返回一個Single,它發(fā)射對原Single的數(shù)據(jù)執(zhí)行map操作后的結(jié)果 |
merge | Single | 將一個Single(它發(fā)射的數(shù)據(jù)是另一個Single,假設為B)轉(zhuǎn)換成另一個Single(它發(fā)射來自另一個Single(B)的數(shù)據(jù)) |
merge and mergeWith | Observable | 合并發(fā)射來自多個Single的數(shù)據(jù) |
observeOn | Single | 指示Single在指定的調(diào)度程序上調(diào)用訂閱者的方法 |
onErrorReturn | Single | 將一個發(fā)射錯誤通知的Single轉(zhuǎn)換成一個發(fā)射指定數(shù)據(jù)項的Single |
subscribeOn | Single | 指示Single在指定的調(diào)度程序上執(zhí)行操作 |
timeout | Single | 它給原有的Single添加超時控制索烹,如果超時了就發(fā)射一個錯誤通知 |
toSingle | Single | 將一個發(fā)射單個值的Observable轉(zhuǎn)換為一個Single |
zip and zipWith | Single | 將多個Single轉(zhuǎn)換為一個况木,后者發(fā)射的數(shù)據(jù)是對前者應用一個函數(shù)后的結(jié)果 |
操作符圖示
詳細的圖解可以參考英文文檔:Single
Subject
Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現(xiàn)中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer阔逼,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù),也可以發(fā)射新的數(shù)據(jù)。
由于一個Subject訂閱一個Observable,它可以觸發(fā)這個Observable開始發(fā)射數(shù)據(jù)(如果那個Observable是"冷"的--就是說烘贴,它等待有訂閱才開始發(fā)射數(shù)據(jù))趴樱。因此有這樣的效果,Subject可以把原來那個"冷"的Observable變成"熱"的疏虫。
Subject的種類
針對不同的場景一共有四種類型的Subject官扣。他們并不是在所有的實現(xiàn)中全部都存在,而且一些實現(xiàn)使用其它的命名約定(例如,在RxScala中Subject被稱作PublishSubject)。
AsyncSubject
一個AsyncSubject只在原始Observable完成后,發(fā)射來自原始Observable的最后一個值。(如果原始Observable沒有發(fā)射任何值,AsyncObject也不發(fā)射任何值)它會把這最后一個值發(fā)射給任何后續(xù)的觀察者锋华。BehaviorSubject
當觀察者訂閱BehaviorSubject時边琉,它開始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時還沒有收到任何數(shù)據(jù)厌丑,它會發(fā)射一個默認值),然后繼續(xù)發(fā)射其它任何來自原始Observable的數(shù)據(jù)。PublishSubject
PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者攻柠。需要注意的是球订,PublishSubject可能會一創(chuàng)建完成就立刻開始發(fā)射數(shù)據(jù)(除非你可以阻止它發(fā)生),因此這里有一個風險:在Subject被創(chuàng)建后到有觀察者訂閱它之前這個時間段內(nèi)瑰钮,一個或多個數(shù)據(jù)可能會丟失。如果要確保來自原始Observable的所有數(shù)據(jù)都被分發(fā)浪谴,你需要這樣做:或者使用Create創(chuàng)建那個Observable以便手動給它引入"冷"Observable的行為(當所有觀察者都已經(jīng)訂閱時才開始發(fā)射數(shù)據(jù))开睡,或者改用ReplaySubject。ReplaySubject
ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者凶杖,無論它們是何時訂閱的胁艰。也有其它版本的ReplaySubject,在重放緩存增長到一定大小的時候或過了一段時間后會丟棄舊的數(shù)據(jù)(原始Observable發(fā)射的)智蝠。
如果你把ReplaySubject當作一個觀察者使用腾么,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調(diào)用杈湾,這會違反Observable協(xié)議解虱,給Subject的結(jié)果增加了不確定性。
RxJava的對應類
假設你有一個Subject漆撞,你想把它傳遞給其它的代理或者暴露它的Subscriber接口殴泰,你可以調(diào)用它的asObservable方法于宙,這個方法返回一個Observable。具體使用方法可以參考Javadoc文檔艰匙。
串行化
如果你把 Subject
當作一個 Subscriber
使用限煞,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調(diào)用员凝,這會違反Observable協(xié)議署驻,給Subject的結(jié)果增加了不確定性。
要避免此類問題健霹,你可以將 Subject
轉(zhuǎn)換為一個 SerializedSubject
旺上,類似于這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
調(diào)度器 Scheduler
如果你想給Observable操作符鏈添加多線程功能,你可以指定操作符(或者特定的Observable)在特定的調(diào)度器(Scheduler)上執(zhí)行糖埋。
某些ReactiveX的Observable操作符有一些變體宣吱,它們可以接受一個Scheduler參數(shù)。這個參數(shù)指定操作符將它們的部分或全部任務放在一個特定的調(diào)度器上執(zhí)行瞳别。
使用ObserveOn和SubscribeOn操作符征候,你可以讓Observable在一個特定的調(diào)度器上執(zhí)行,ObserveOn指示一個Observable在一個特定的調(diào)度器上調(diào)用觀察者的onNext, onError和onCompleted方法祟敛,SubscribeOn更進一步疤坝,它指示Observable將全部的處理過程(包括發(fā)射數(shù)據(jù)和通知)放在特定的調(diào)度器上執(zhí)行。
RxJava示例
調(diào)度器的種類
下表展示了RxJava中可用的調(diào)度器種類:
調(diào)度器類型 | 效果 |
---|---|
Schedulers.computation(?) | 用于計算任務馆铁,如事件循環(huán)或和回調(diào)處理跑揉,不要用于IO操作(IO操作請使用Schedulers.io());默認線程數(shù)等于處理器的數(shù)量 |
Schedulers.from(executor) | 使用指定的Executor作為調(diào)度器 |
Schedulers.immediate(?) | 在當前線程立即開始執(zhí)行任務 |
Schedulers.io(?) | 用于IO密集型任務埠巨,如異步阻塞IO操作历谍,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計算任務辣垒,請使用Schedulers.computation()望侈;Schedulers.io(?)默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器 |
Schedulers.newThread(?) | 為每個任務創(chuàng)建一個新線程 |
Schedulers.trampoline(?) | 當其它排隊的任務完成后勋桶,在當前線程排隊開始執(zhí)行 |
默認調(diào)度器
在RxJava中脱衙,某些Observable操作符的變體允許你設置用于操作執(zhí)行的調(diào)度器,其它的則不在任何特定的調(diào)度器上執(zhí)行哥遮,或者在一個指定的默認調(diào)度器上執(zhí)行。下面的表格個列出了一些操作符的默認調(diào)度器:
操作符 | 調(diào)度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan,?count) | computation |
buffer(timespan,?timeshift) | computation |
debounce(timeout,?unit) | computation |
delay(delay,?unit) | computation |
delaySubscription(delay,?unit) | computation |
interval | computation |
repeat | trampoline |
replay(time,?unit) | computation |
replay(buffersize,?time,?unit) | computation |
replay(selector,?time,?unit) | computation |
replay(selector,?buffersize,?time,?unit) | computation |
retry | trampoline |
sample(period,?unit) | computation |
skip(time,?unit) | computation |
skipLast(time,?unit) | computation |
take(time,?unit) | computation |
takeLast(time,?unit) | computation |
takeLast(count,?time,?unit) | computation |
takeLastBuffer(time,?unit) | computation |
takeLastBuffer(count,?time,?unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector,?timeoutSelector) | immediate |
timeout(timeoutSelector,?other) | immediate |
timeout(timeout,?timeUnit) | computation |
timeout(firstTimeoutSelector,?timeoutSelector,?other) | immediate |
timeout(timeout,?timeUnit,?other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan,?count) | computation |
window(timespan,?timeshift) | computation |
使用調(diào)度器
除了將這些調(diào)度器傳遞給RxJava的Observable操作符陵究,你也可以用它們調(diào)度你自己的任務眠饮。下面的示例展示了Scheduler.Worker的用法:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
遞歸調(diào)度器
要調(diào)度遞歸的方法調(diào)用,你可以使用schedule铜邮,然后再用schedule(this)仪召,示例:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// some time later...
worker.unsubscribe();
檢查或設置取消訂閱狀態(tài)
Worker類的對象實現(xiàn)了Subscription接口寨蹋,使用它的isUnsubscribed和unsubscribe方法,所以你可以在訂閱取消時停止任務扔茅,或者從正在調(diào)度的任務內(nèi)部取消訂閱已旧,示例:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
Worker同時是Subscription,因此你可以(通常也應該)調(diào)用它的unsubscribe方法通知可以掛起任務和釋放資源了召娜。
延時和周期調(diào)度器
你可以使用schedule(action,delayTime,timeUnit)在指定的調(diào)度器上延時執(zhí)行你的任務运褪,下面例子中的任務將在500毫秒之后開始執(zhí)行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另一個版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法讓你可以安排一個定期執(zhí)行的任務玖瘸,下面例子的任務將在500毫秒之后執(zhí)行秸讹,然后每250毫秒執(zhí)行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
測試調(diào)度器
TestScheduler讓你可以對調(diào)度器的時鐘表現(xiàn)進行手動微調(diào)。這對依賴精確時間安排的任務的測試很有用處雅倒。這個調(diào)度器有三個額外的方法:
- advanceTimeTo(time,unit) 向前波動調(diào)度器的時鐘到一個指定的時間點
- advanceTimeBy(time,unit) 將調(diào)度器的時鐘向前撥動一個指定的時間段
- triggerActions(?) 開始執(zhí)行任何計劃中的但是未啟動的任務璃诀,如果它們的計劃時間等于或者早于調(diào)度器時鐘的當前時間
操作符分類
ReactiveX的每種編程語言的實現(xiàn)都實現(xiàn)了一組操作符的集合。不同的實現(xiàn)之間有很多重疊的部分蔑匣,也有一些操作符只存在特定的實現(xiàn)中劣欢。每種實現(xiàn)都傾向于用那種編程語言中他們熟悉的上下文中相似的方法給這些操作符命名。
本文首先會給出ReactiveX的核心操作符列表和對應的文檔鏈接裁良,后面還有一個決策樹用于幫助你根據(jù)具體的場景選擇合適的操作符凿将。最后有一個語言特定實現(xiàn)的按字母排序的操作符列表。
如果你想實現(xiàn)你自己的操作符趴久,可以參考這里:實現(xiàn)自定義操作符
創(chuàng)建操作
用于創(chuàng)建Observable的操作符
-
Create
— 通過調(diào)用觀察者的方法從頭創(chuàng)建一個Observable -
Defer
— 在觀察者訂閱之前不創(chuàng)建這個Observable丸相,為每一個觀察者創(chuàng)建一個新的Observable -
Empty/Never/Throw
— 創(chuàng)建行為受限的特殊Observable -
From
— 將其它的對象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable -
Interval
— 創(chuàng)建一個定時發(fā)射整數(shù)序列的Observable -
Just
— 將對象或者對象集合轉(zhuǎn)換為一個會發(fā)射這些對象的Observable -
Range
— 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable -
Repeat
— 創(chuàng)建重復發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable -
Start
— 創(chuàng)建發(fā)射一個函數(shù)的返回值的Observable -
Timer
— 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable
變換操作
這些操作符可用于對Observable發(fā)射的數(shù)據(jù)進行變換,詳細解釋可以看每個操作符的文檔
-
Buffer
— 緩存彼棍,可以簡單的理解為緩存灭忠,它定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射座硕,而不是一次發(fā)射一個 -
FlatMap
— 扁平映射弛作,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable华匾,可以認為是一個將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過程映琳。 -
GroupBy
— 分組,將原來的Observable分拆為Observable集合蜘拉,將原始Observable發(fā)射的數(shù)據(jù)按Key分組萨西,每一個Observable發(fā)射一組不同的數(shù)據(jù) -
Map
— 映射,通過對序列的每一項都應用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù)旭旭,實質(zhì)是對序列中的每一項執(zhí)行一個函數(shù)谎脯,函數(shù)的參數(shù)就是這個數(shù)據(jù)項 -
Scan
— 掃描,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù)持寄,然后按順序依次發(fā)射這些值 -
Window
— 窗口源梭,定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口娱俺,然后發(fā)射這些窗口,而不是每次發(fā)射一項废麻。類似于Buffer荠卷,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable烛愧,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集
過濾操作
這些操作符用于從Observable發(fā)射的數(shù)據(jù)中進行選擇
-
Debounce
— 只有在空閑了一段時間后才發(fā)射數(shù)據(jù)油宜,通俗的說,就是如果一段時間沒有操作屑彻,就執(zhí)行一次操作 -
Distinct
— 去重验庙,過濾掉重復數(shù)據(jù)項 -
ElementAt
— 取值,取特定位置的數(shù)據(jù)項 -
Filter
— 過濾社牲,過濾掉沒有通過謂詞測試的數(shù)據(jù)項粪薛,只發(fā)射通過測試的 -
First
— 首項,只發(fā)射滿足條件的第一條數(shù)據(jù) -
IgnoreElements
— 忽略所有的數(shù)據(jù)搏恤,只保留終止通知(onError或onCompleted) -
Last
— 末項违寿,只發(fā)射最后一條數(shù)據(jù) -
Sample
— 取樣,定期發(fā)射最新的數(shù)據(jù)熟空,等于是數(shù)據(jù)抽樣藤巢,有的實現(xiàn)里叫ThrottleFirst -
Skip
— 跳過前面的若干項數(shù)據(jù) -
SkipLast
— 跳過后面的若干項數(shù)據(jù) -
Take
— 只保留前面的若干項數(shù)據(jù) -
TakeLast
— 只保留后面的若干項數(shù)據(jù)
組合操作
組合操作符用于將多個Observable組合成一個單一的Observable
-
And/Then/When
— 通過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發(fā)射的數(shù)據(jù)集 -
CombineLatest
— 當兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù))息罗,然后發(fā)射這個函數(shù)的結(jié)果 -
Join
— 無論何時掂咒,如果一個Observable發(fā)射了一個數(shù)據(jù)項,只要在另一個Observable發(fā)射的數(shù)據(jù)項定義的時間窗口內(nèi)迈喉,就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射 -
Merge
— 將兩個Observable發(fā)射的數(shù)據(jù)組合并成一個 -
StartWith
— 在發(fā)射原來的Observable的數(shù)據(jù)序列之前绍刮,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項 -
Switch
— 將一個發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù) -
Zip
— 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起挨摸,然后將這個函數(shù)的結(jié)果作為單項數(shù)據(jù)發(fā)射
錯誤處理
這些操作符用于從錯誤通知中恢復
-
Catch
— 捕獲孩革,繼續(xù)序列操作种冬,將錯誤替換為正常的數(shù)據(jù)蝶溶,從onError通知中恢復 -
Retry
— 重試,如果Observable發(fā)射了一個錯誤通知吕粗,重新訂閱它熔掺,期待它正常終止
輔助操作
一組用于處理Observable的操作符
-
Delay
— 延遲一段時間發(fā)射結(jié)果數(shù)據(jù) -
Do
— 注冊一個動作占用一些Observable的生命周期事件饱搏,相當于Mock某個操作 -
Materialize/Dematerialize
— 將發(fā)射的數(shù)據(jù)和通知都當做數(shù)據(jù)發(fā)射,或者反過來 -
ObserveOn
— 指定觀察者觀察Observable的調(diào)度程序(工作線程) -
Serialize
— 強制Observable按次序發(fā)射數(shù)據(jù)并且功能是有效的 -
Subscribe
— 收到Observable發(fā)射的數(shù)據(jù)和通知后執(zhí)行的操作 -
SubscribeOn
— 指定Observable應該在哪個調(diào)度程序上執(zhí)行 -
TimeInterval
— 將一個Observable轉(zhuǎn)換為發(fā)射兩個數(shù)據(jù)之間所耗費時間的Observable -
Timeout
— 添加超時機制置逻,如果過了指定的一段時間沒有發(fā)射數(shù)據(jù)推沸,就發(fā)射一個錯誤通知 -
Timestamp
— 給Observable發(fā)射的每個數(shù)據(jù)項添加一個時間戳 -
Using
— 創(chuàng)建一個只在Observable的生命周期內(nèi)存在的一次性資源
條件和布爾操作
這些操作符可用于單個或多個數(shù)據(jù)項,也可用于Observable
-
All
— 判斷Observable發(fā)射的所有的數(shù)據(jù)項是否都滿足某個條件 -
Amb
— 給定多個Observable诽偷,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù) -
Contains
— 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項 -
DefaultIfEmpty
— 發(fā)射來自原始Observable的數(shù)據(jù)坤学,如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個默認數(shù)據(jù) -
SequenceEqual
— 判斷兩個Observable是否按相同的數(shù)據(jù)序列 -
SkipUntil
— 丟棄原始Observable發(fā)射的數(shù)據(jù)报慕,直到第二個Observable發(fā)射了一個數(shù)據(jù)深浮,然后發(fā)射原始Observable的剩余數(shù)據(jù) -
SkipWhile
— 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個特定的條件為假眠冈,然后發(fā)射原始Observable剩余的數(shù)據(jù) -
TakeUntil
— 發(fā)射來自原始Observable的數(shù)據(jù)飞苇,直到第二個Observable發(fā)射了一個數(shù)據(jù)或一個通知 -
TakeWhile
— 發(fā)射原始Observable的數(shù)據(jù),直到一個特定的條件為真蜗顽,然后跳過剩余的數(shù)據(jù)
算術和聚合操作
這些操作符可用于整個數(shù)據(jù)序列
-
Average
— 計算Observable發(fā)射的數(shù)據(jù)序列的平均值布卡,然后發(fā)射這個結(jié)果 -
Concat
— 不交錯的連接多個Observable的數(shù)據(jù) -
Count
— 計算Observable發(fā)射的數(shù)據(jù)個數(shù),然后發(fā)射這個結(jié)果 -
Max
— 計算并發(fā)射數(shù)據(jù)序列的最大值 -
Min
— 計算并發(fā)射數(shù)據(jù)序列的最小值 -
Reduce
— 按順序?qū)?shù)據(jù)序列的每一個應用某個函數(shù)雇盖,然后返回這個值 -
Sum
— 計算并發(fā)射數(shù)據(jù)序列的和
連接操作
一些有精確可控的訂閱行為的特殊Observable
-
Connect
— 指示一個可連接的Observable開始發(fā)射數(shù)據(jù)給訂閱者 -
Publish
— 將一個普通的Observable轉(zhuǎn)換為可連接的 -
RefCount
— 使一個可連接的Observable表現(xiàn)得像一個普通的Observable -
Replay
— 確保所有的觀察者收到同樣的數(shù)據(jù)序列忿等,即使他們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱
轉(zhuǎn)換操作
操作符決策樹
幾種主要的需求
- 直接創(chuàng)建一個Observable(創(chuàng)建操作)
- 組合多個Observable(組合操作)
- 對Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
- 從Observable發(fā)射的數(shù)據(jù)中取特定的值(過濾操作)
- 轉(zhuǎn)發(fā)Observable的部分值(條件/布爾/過濾操作)
- 對Observable發(fā)射的數(shù)據(jù)序列求值(算術/聚合操作)
這個頁面展示了創(chuàng)建Observable的各種方法。
- just(?) — 將一個或多個對象轉(zhuǎn)換成發(fā)射這個或這些對象的一個Observable
- from(?) — 將一個Iterable, 一個Future, 或者一個數(shù)組轉(zhuǎn)換成一個Observable
- repeat(?) — 創(chuàng)建一個重復發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable
- repeatWhen(?) — 創(chuàng)建一個重復發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable崔挖,它依賴于另一個Observable發(fā)射的數(shù)據(jù)
- create(?) — 使用一個函數(shù)從頭創(chuàng)建一個Observable
- defer(?) — 只有當訂閱者訂閱才創(chuàng)建Observable贸街;為每個訂閱創(chuàng)建一個新的Observable
- range(?) — 創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable
- interval(?) — 創(chuàng)建一個按照給定的時間間隔發(fā)射整數(shù)序列的Observable
- timer(?) — 創(chuàng)建一個在給定的延時之后發(fā)射單個數(shù)據(jù)的Observable
- empty(?) — 創(chuàng)建一個什么都不做直接通知完成的Observable
- error(?) — 創(chuàng)建一個什么都不做直接通知錯誤的Observable
- never(?) — 創(chuàng)建一個不發(fā)射任何數(shù)據(jù)的Observable
創(chuàng)建操作
Create
使用一個函數(shù)從頭開始創(chuàng)建一個Observable
你可以使用Create
操作符從頭開始創(chuàng)建一個Observable,給這個操作符傳遞一個接受觀察者作為參數(shù)的函數(shù)狸相,編寫這個函數(shù)讓它的行為表現(xiàn)為一個Observable--恰當?shù)恼{(diào)用觀察者的onNext薛匪,onError和onCompleted方法。
一個形式正確的有限Observable必須嘗試調(diào)用觀察者的onCompleted正好一次或者它的onError正好一次脓鹃,而且此后不能再調(diào)用觀察者的任何其它方法逸尖。
RxJava將這個操作符實現(xiàn)為 create
方法。
建議你在傳遞給create
方法的函數(shù)中檢查觀察者的isUnsubscribed
狀態(tài)瘸右,以便在沒有觀察者的時候娇跟,讓你的Observable停止發(fā)射數(shù)據(jù)或者做昂貴的運算。
示例代碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
輸出:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
create
方法默認不在任何特定的調(diào)度器上執(zhí)行尊浓。
- Javadoc:
create(OnSubscribe)
Defer
直到有觀察者訂閱時才創(chuàng)建Observable逞频,并且為每個觀察者創(chuàng)建一個新的Observable
Defer
操作符會一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個Observable栋齿。它對每個觀察者都這樣做苗胀,因此盡管每個訂閱者都以為自己訂閱的是同一個Observable,事實上每個訂閱者獲取的是它們自己的單獨的數(shù)據(jù)序列瓦堵。
在某些情況下基协,等待直到最后一分鐘(就是知道訂閱發(fā)生時)才生成Observable可以確保Observable包含最新的數(shù)據(jù)。
RxJava將這個操作符實現(xiàn)為 defer
方法菇用。這個操作符接受一個你選擇的Observable工廠函數(shù)作為單個參數(shù)澜驮。這個函數(shù)沒有參數(shù),返回一個Observable惋鸥。
defer
方法默認不在任何特定的調(diào)度器上執(zhí)行杂穷。
- Javadoc:
defer(Func0)
switchCase
可選包 rxjava-computation-expressions
中有一個類似的操作符悍缠。switchCase
操作符有條件的創(chuàng)建并返回一個可能的Observables集合中的一個。
可選包 rxjava-computation-expressions
中還有一個更簡單的操作符叫ifThen
耐量。這個操作符檢查某個條件飞蚓,然后根據(jù)結(jié)果,返回原始Observable的鏡像廊蜒,或者返回一個空Observable趴拧。
Empty/Never/Throw
Empty
創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
Never
創(chuàng)建一個不發(fā)射數(shù)據(jù)也不終止的Observable
Throw
創(chuàng)建一個不發(fā)射數(shù)據(jù)以一個錯誤終止的Observable
這三個操作符生成的Observable行為非常特殊和受限。測試的時候很有用山叮,有時候也用于結(jié)合其它的Observables著榴,或者作為其它需要Observable的操作符的參數(shù)。
RxJava將這些操作符實現(xiàn)為 empty
屁倔,never
和error
脑又。error
操作符需要一個Throwable
參數(shù),你的Observable會以此終止锐借。這些操作符默認不在任何特定的調(diào)度器上執(zhí)行挂谍,但是empty
和error
有一個可選參數(shù)是Scheduler,如果你傳遞了Scheduler參數(shù)瞎饲,它們會在這個調(diào)度器上發(fā)送通知口叙。
- Javadoc: empty()
- Javadoc: never()
- Javadoc: error(java.lang.Throwable)
From
將其它種類的對象和數(shù)據(jù)類型轉(zhuǎn)換為Observable
當你使用Observable時,如果你要處理的數(shù)據(jù)都可以轉(zhuǎn)換成展現(xiàn)為Observables嗅战,而不是需要混合使用Observables和其它類型的數(shù)據(jù)妄田,會非常方便。這讓你在數(shù)據(jù)流的整個生命周期中驮捍,可以使用一組統(tǒng)一的操作符來管理它們疟呐。
例如,Iterable可以看成是同步的Observable东且;Future启具,可以看成是總是只發(fā)射單個數(shù)據(jù)的Observable。通過顯式地將那些數(shù)據(jù)轉(zhuǎn)換為Observables珊泳,你可以像使用Observable一樣與它們交互鲁冯。
因此,大部分ReactiveX實現(xiàn)都提供了將語言特定的對象和數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observables的方法色查。
在RxJava中薯演,from
操作符可以轉(zhuǎn)換Future、Iterable和數(shù)組秧了。對于Iterable和數(shù)組跨扮,產(chǎn)生的Observable會發(fā)射Iterable或數(shù)組的每一項數(shù)據(jù)。
示例代碼
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);
myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);
輸出
0
1
2
3
4
5
Sequence complete
對于Future,它會發(fā)射Future.get()方法返回的單個數(shù)據(jù)衡创。from
方法有一個可接受兩個可選參數(shù)的版本帝嗡,分別指定超時時長和時間單位。如果過了指定的時長Future還沒有返回一個值璃氢,這個Observable會發(fā)射錯誤通知并終止丈探。
from
默認不在任何特定的調(diào)度器上執(zhí)行。然而你可以將Scheduler作為可選的第二個參數(shù)傳遞給Observable拔莱,它會在那個調(diào)度器上管理這個Future。
- Javadoc: from(array)
- Javadoc: from(Iterable)
- Javadoc: from(Future)
- Javadoc: from(Future,Scheduler)
- Javadoc: from(Future,timeout, timeUnit)
RxJavaAsyncUtil
此外隘竭,在可選包 RxJavaAsyncUtil
中塘秦,你還可以用下面這些操作符將actions,callables动看,functions和runnables轉(zhuǎn)換為發(fā)射這些動作的執(zhí)行結(jié)果的Observable:
- fromAction
- fromCallable
- fromFunc0
- fromRunnable
在這個頁面 Start 查看關于這些操作符的更多信息尊剔。
注意:還有一個可選的StringObservable
類中也有一個from
方法,它將一個字符流或者一個REader轉(zhuǎn)換為一個發(fā)射字節(jié)數(shù)組或字符串的Observable菱皆。
runAsync2
注意:這里與后面start
操作符里的runAsync
說明重復了
在單獨的RxJavaAsyncUtil
包中(默認不包含在RxJava中),還有一個runAsync
函數(shù)须误。傳遞一個Action
和一個Scheduler
給runAsync
,它會返回一個StoppableObservable
仇轻,這個Observable使用Action
產(chǎn)生發(fā)射的數(shù)據(jù)項京痢。
傳遞一個Action
和一個Scheduler
給runAsync
,它返回一個使用這個Action
產(chǎn)生數(shù)據(jù)的StoppableObservable
篷店。這個Action
接受一個Observable
和一個Subscription
作為參數(shù)祭椰,它使用Subscription
檢查unsubscribed
條件,一旦發(fā)現(xiàn)條件為真就立即停止發(fā)射數(shù)據(jù)疲陕。在任何時候你都可以使用unsubscribe
方法手動停止一個StoppableObservable
(這會同時取消訂閱與這個StoppableObservable
關聯(lián)的Subscription
)方淤。
由于runAsync
會立即調(diào)用Action
并開始發(fā)射數(shù)據(jù),在你創(chuàng)建StoppableObservable之后到你的觀察者準備好接受數(shù)據(jù)之前這段時間里蹄殃,可能會有一部分數(shù)據(jù)會丟失携茂。如果這不符合你的要求,可以使用runAsync
的一個變體诅岩,它也接受一個Subject
參數(shù)讳苦,傳遞一個ReplaySubject
給它,你可以獲取其它丟失的數(shù)據(jù)了吩谦。
decode
StringObservable
類不是默認RxJava的一部分医吊,包含一個decode
操作符,這個操作符將一個多字節(jié)字符流轉(zhuǎn)換為一個發(fā)射字節(jié)數(shù)組的Observable逮京,這些字節(jié)數(shù)組按照字符的邊界劃分卿堂。
Interval
創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable
Interval
操作符返回一個Observable,它按固定的時間間隔發(fā)射一個無限遞增的整數(shù)序列。
RxJava將這個操作符實現(xiàn)為interval
方法草描。它接受一個表示時間間隔的參數(shù)和一個表示時間單位的參數(shù)览绿。
- Javadoc: interval(long,TimeUnit)
- Javadoc: interval(long,TimeUnit,Scheduler)
還有一個版本的interval
返回一個Observable,它在指定延遲之后先發(fā)射一個零值穗慕,然后再按照指定的時間間隔發(fā)射遞增的數(shù)字饿敲。這個版本的interval
在RxJava 1.0.0中叫做timer
,但是那個方法已經(jīng)不建議使用了逛绵,因為一個名叫interval
的操作符有同樣的功能怀各。
Javadoc: interval(long,long,TimeUnit) Javadoc: interval(long,long,TimeUnit,Scheduler)
interval
默認在computation
調(diào)度器上執(zhí)行。你也可以傳遞一個可選的Scheduler參數(shù)來指定調(diào)度器术浪。
Just
創(chuàng)建一個發(fā)射指定值的Observable
Just將單個數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個數(shù)據(jù)的Observable瓢对。
Just類似于From,但是From會將數(shù)組或Iterable的數(shù)據(jù)取出然后逐個發(fā)射胰苏,而Just只是簡單的原樣發(fā)射硕蛹,將數(shù)組或Iterable當做單個數(shù)據(jù)。
注意:如果你傳遞null
給Just硕并,它會返回一個發(fā)射null
值的Observable法焰。不要誤認為它會返回一個空Observable(完全不發(fā)射任何數(shù)據(jù)的Observable),如果需要空Observable你應該使用Empty操作符倔毙。
RxJava將這個操作符實現(xiàn)為just
函數(shù)埃仪,它接受一至九個參數(shù),返回一個按參數(shù)列表順序發(fā)射這些數(shù)據(jù)的Observable陕赃。
示例代碼:
Observable.just(1, 2, 3)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
輸出
Next: 1
Next: 2
Next: 3
Sequence complete.
- Javadoc: just(item) (還有其它接受二到九個參數(shù)的版本)
Range
創(chuàng)建一個發(fā)射特定整數(shù)序列的Observable
Range操作符發(fā)射一個范圍內(nèi)的有序整數(shù)序列,你可以指定范圍的起始和長度凯正。
RxJava將這個操作符實現(xiàn)為range
函數(shù)廊散,它接受兩個參數(shù)米者,一個是范圍的起始值韭畸,一個是范圍的數(shù)據(jù)的數(shù)目宇智。如果你將第二個參數(shù)設為0,將導致Observable不發(fā)射任何數(shù)據(jù)(如果設置為負數(shù)胰丁,會拋異常)随橘。
range
默認不在任何特定的調(diào)度器上執(zhí)行。有一個變體可以通過可選參數(shù)指定Scheduler锦庸。
- Javadoc: range(int,int)
- Javadoc: range(int,int,Scheduler)
Repeat
創(chuàng)建一個發(fā)射特定數(shù)據(jù)重復多次的Observable
Repeat重復地發(fā)射數(shù)據(jù)机蔗。某些實現(xiàn)允許你重復的發(fā)射某個數(shù)據(jù)序列,還有一些允許你限制重復的次數(shù)甘萧。
RxJava將這個操作符實現(xiàn)為repeat
方法萝嘁。它不是創(chuàng)建一個Observable,而是重復發(fā)射原始Observable的數(shù)據(jù)序列扬卷,這個序列或者是無限的牙言,或者通過repeat(n)
指定重復次數(shù)。
repeat
操作符默認在trampoline
調(diào)度器上執(zhí)行邀泉。有一個變體可以通過可選參數(shù)指定Scheduler。
Javadoc: repeat() Javadoc: repeat(long) Javadoc: repeat(Scheduler) Javadoc: repeat(long,Scheduler)
repeatWhen
還有一個叫做repeatWhen
的操作符钝鸽,它不是緩存和重放原始Observable的數(shù)據(jù)序列汇恤,而是有條件的重新訂閱和發(fā)射原來的Observable。
將原始Observable的終止通知(完成或錯誤)當做一個void
數(shù)據(jù)傳遞給一個通知處理器拔恰,它以此來決定是否要重新訂閱和發(fā)射原來的Observable因谎。這個通知處理器就像一個Observable操作符,接受一個發(fā)射void
通知的Observable為輸入颜懊,返回一個發(fā)射void
數(shù)據(jù)(意思是财岔,重新訂閱和發(fā)射原始Observable)或者直接終止(意思是,使用repeatWhen
終止發(fā)射數(shù)據(jù))的Observable河爹。
repeatWhen
操作符默認在trampoline
調(diào)度器上執(zhí)行匠璧。有一個變體可以通過可選參數(shù)指定Scheduler。
- Javadoc: repeatWhen(Func1)
- Javadoc: repeatWhen(Func1,Scheduler)
doWhile
doWhile
屬于可選包rxjava-computation-expressions
咸这,不是RxJava標準操作符的一部分夷恍。doWhile
在原始序列的每次重復后檢查某個條件,如果滿足條件才重復發(fā)射媳维。
whileDo
whileDo
屬于可選包rxjava-computation-expressions
酿雪,不是RxJava標準操作符的一部分。whileDo
在原始序列的每次重復前檢查某個條件侄刽,如果滿足條件才重復發(fā)射
Start
返回一個Observable指黎,它發(fā)射一個類似于函數(shù)聲明的值
編程語言有很多種方法可以從運算結(jié)果中獲取值,它們的名字一般叫functions, futures, actions, callables, runnables
等等州丹。在Start
目錄下的這組操作符可以讓它們表現(xiàn)得像Observable醋安,因此它們可以在Observables調(diào)用鏈中與其它Observable搭配使用。
Start
操作符的多種RxJava實現(xiàn)都屬于可選的rxjava-async
模塊。
rxjava-async
模塊包含start
操作符茬故,它接受一個函數(shù)作為參數(shù)盖灸,調(diào)用這個函數(shù)獲取一個值,然后返回一個會發(fā)射這個值給后續(xù)觀察者的Observable磺芭。
注意:這個函數(shù)只會被執(zhí)行一次赁炎,即使多個觀察者訂閱這個返回的Observable。
toAsync
rxjava-async
模塊還包含這幾個操作符:toAsync
, asyncAction
, 和asyncFunc
钾腺。它們接受一個函數(shù)或一個Action作為參數(shù)徙垫。
對于函數(shù)(functions),這個操作符調(diào)用這個函數(shù)獲取一個值放棒,然后返回一個會發(fā)射這個值給后續(xù)觀察者的Observable(和start
一樣)姻报。對于動作(Action),過程類似间螟,但是沒有返回值吴旋,在這種情況下,這個操作符在終止前會發(fā)射一個null
值厢破。
注意:這個函數(shù)或動作只會被執(zhí)行一次荣瑟,即使多個觀察者訂閱這個返回的Observable。
startFuture
rxjava-async
模塊還包含一個startFuture
操作符摩泪,傳遞給它一個返回Future
的函數(shù)笆焰,startFuture
會立即調(diào)用這個函數(shù)獲取Future
對象,然后調(diào)用Future
的get()
方法嘗試獲取它的值见坑。它返回一個發(fā)射這個值給后續(xù)觀察者的Observable嚷掠。
deferFuture
rxjava-async
模塊還包含一個deferFuture
操作符,傳遞給它一個返回Future
的函數(shù)(這個Future
返回一個Observable
)荞驴,deferFuture
返回一個Observable不皆,但是不會調(diào)用你提供的函數(shù),直到有觀察者訂閱它返回的Observable熊楼。這時粟焊,它立即調(diào)用Future
的get()
方法,然后鏡像發(fā)射get()
方法返回的Observable發(fā)射的數(shù)據(jù)孙蒙。
用這種方法项棠,你可以在Observables調(diào)用鏈中包含一個返回Observable的Future
對象。
fromAction
rxjava-async
模塊還包含一個fromAction
操作符挎峦,它接受一個Action
作為參數(shù)香追,返回一個Observable,一旦Action終止坦胶,它發(fā)射這個你傳遞給fromAction
的數(shù)據(jù)透典。
fromCallable
rxjava-async
模塊還包含一個fromCallable
操作符晴楔,它接受一個Callable
作為參數(shù),返回一個發(fā)射這個Callable
的結(jié)果的Observable峭咒。
fromRunnable
rxjava-async
模塊還包含一個fromRunnable
操作符税弃,它接受一個Runnable
作為參數(shù),返回一個Observable凑队,一旦Runnable終止则果,它發(fā)射這個你傳遞給fromRunnable
的數(shù)據(jù)。
forEachFuture
rxjava-async
模塊還包含一個forEachFuture
操作符漩氨。它其實不算Start
操作符的一個變體西壮,而是有一些自己的特點。你傳遞一些典型的觀察者方法(如onNext, onError和onCompleted)給它叫惊,Observable會以通常的方式調(diào)用它款青。但是forEachFuture
自己返回一個Future
并且在get()
方法處阻塞,直到原始Observable執(zhí)行完成霍狰,然后它返回抡草,完成還是錯誤依賴于原始Observable是完成還是錯誤。
如果你想要一個函數(shù)阻塞直到Observable執(zhí)行完成蔗坯,可以使用這個操作符康震。