有些翻譯仍不準確弄兜,會持續(xù)改進萝挤。
術(shù)語
上游蹂窖,下游
RxJava中的數(shù)據(jù)流包括一個數(shù)據(jù)源、0個或多個中間步驟抡锈、一個數(shù)據(jù)消費者或組合子步驟(其中的步驟負責以某種方式使用數(shù)據(jù)流):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
如果我們想象自己站在operator2上疾忍,向左看向source,叫做上游床三;向右看向subscriber/consumer一罩,叫做下游。當像下面這樣每個元素單獨寫一行時看的更加明顯:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
運動的對象
在RxJava的文檔中撇簿, emission, emits, item, event, signal, data and message都是近義詞聂渊,都表示沿著數(shù)據(jù)流移動的對象。
背壓
當數(shù)據(jù)流通過異步步驟運行時四瘫,每個步驟可能以不同的速度執(zhí)行不同的事情汉嗽。為了避免那些由于臨時緩沖或需要跳過/刪除數(shù)據(jù)而導致內(nèi)存使用量增加的步驟被淹沒,因此應用了所謂的背壓找蜜,這是一種流控制形式饼暑,從而步驟可以表示準備處理多少項數(shù)據(jù)。使用背壓允許當前步驟在通常無法知道上游將發(fā)送多少項數(shù)據(jù)的情況下限制數(shù)據(jù)流的內(nèi)存使用洗做。
在RxJava中弓叛,Flowable
類被設(shè)計成支持背壓,Observable
類專用于非背壓操作诚纸。Single, Maybe and Completable
也不支持背壓撰筷。
裝配時間Assembly time
通過應用各式各樣的中間操作符來準備數(shù)據(jù)流,發(fā)生在裝配時間畦徘。
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
在當前點上闭专,數(shù)據(jù)還沒有流動,也沒有發(fā)生副作用旧烧。
訂閱時間 Subscription time
這是在內(nèi)部建立處理步驟鏈的流上調(diào)用subscribe()時的臨時狀態(tài):
flow.subscribe(System.out::println)
這時會觸發(fā)訂閱副作用影钉。有些源在這種狀態(tài)下會立即阻塞或開始發(fā)送數(shù)據(jù)項。
運行時 Runtime
這是數(shù)據(jù)流主動發(fā)出數(shù)據(jù)項掘剪、錯誤或完成信號時的狀態(tài):
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
實際上平委,這是上面給定示例的主體執(zhí)行的時候。
簡單的后臺計算
RxJava的一個常見用例是在后臺線程上運行一些計算夺谁、網(wǎng)絡(luò)請求廉赔,并在UI線程上顯示結(jié)果(或錯誤)
import io.reactivex.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
這種鏈式調(diào)用方法的形式稱為流式API,類似于builder模式匾鸥。然而蜡塌,RxJava的響應類型是不可變的;每個方法調(diào)用都返回一個添加了行為的新的Flowable
。我們可以把上面的例子改寫為下面這樣:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常勿负,您可以通過subscribeOn
將計算或阻塞IO移動到其他線程馏艾。一旦數(shù)據(jù)準備好,您就可以通過observeOn
確保它們在前臺或GUI線程上得到處理。
調(diào)度者Schedulers
RxJava操作符不直接使用Thread或ExecutorServices琅摩,而是使用所謂的調(diào)度器Scheduler铁孵,這些調(diào)度器將并發(fā)源抽象到統(tǒng)一API后面。RxJava 2提供了幾個可通過scheduler類訪問的標準調(diào)度器房资。
- Schedulers.computation(): 在后臺的固定數(shù)量的專用線程上運行計算密集型工作蜕劝。大多數(shù)異步操作符將此作為其默認調(diào)度程序。
- Schedulers.io(): 在一組動態(tài)變化的線程上運行類似I/ o或阻塞操作轰异。
- Schedulers.single(): 以順序和FIFO方式在單個線程上運行工作岖沛。
- Schedulers.trampoline(): 在參與的線程中以順序和FIFO方式運行工作,通常用于測試目的搭独。
這些在所有JVM平臺上都可用婴削,但是在某些特定的平臺上,例如android戳稽,有它們特有的調(diào)度器:AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui()
.
此外,還可以通過Scheduler .from(Executor)
將現(xiàn)有的Executor
(及其子類型期升,如ExecutorService
)包裝到調(diào)度器Schedulers中惊奇。例如,可以使用它來擁有更大但仍然固定的線程池(不同于分別使用compute()和io())播赁。
上面例子結(jié)尾處的Thread.sleep(2000);
是有意為之颂郎。在RxJava中,默認調(diào)度程序在守護線程上運行容为,這意味著一旦Java主線程退出乓序,它們就會全部停止,后臺計算可能永遠不會發(fā)生坎背。在這個示例場景中替劈,休眠一段時間可以讓您看到控制臺上數(shù)據(jù)流流的輸出。
流中的并發(fā)
RxJava中的流本質(zhì)上是順序的得滤,它們被劃分為可以彼此并發(fā)運行的處理階段
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
這個示例流將computation調(diào)度器上的數(shù)字從1平方到10陨献,并在主線程(更準確地說,是blockingSubscribe的調(diào)用線程)上處理結(jié)果懂更。然而lambda表達式v -> v * v
并不是并行運行的眨业。它在同一個計算線程上依次接收1到10的值。
并行處理
并行處理數(shù)字1到10稍微復雜一些:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
實際上沮协,RxJava中的并行性意味著運行獨立的流并將它們的結(jié)果合并回單個流龄捡。操作符flatMap首先將1到10的每個數(shù)字映射到它自己的Flowable中,運行它們并合并計算的平方慷暂。
但是聘殖,請注意,flatMap不保證任何順序,來自內(nèi)部流的最終結(jié)果可能是交錯的就斤。有其他的操作符可供選擇:
-
concatMap
它每次映射并運行一個內(nèi)部流 -
concatMapEager
它“同時”運行所有內(nèi)部流悍募,但是輸出流將按照這些內(nèi)部流創(chuàng)建的順序輸出。
或者洋机,Flowable.parallel()
操作符和ParallelFlowable
類型可以幫助實現(xiàn)相同的并行處理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
依賴子流
flatMap是一個強大的操作符坠宴,在很多情況下都有幫助。例如绷旗,給定一個返回Flowable的服務(wù)喜鼓,我們希望使用第一個服務(wù)發(fā)出的值調(diào)用另一個服務(wù):
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();
延續(xù)Continuations
有時,當一個項變得可用時衔肢,人們希望依賴它執(zhí)行一些計算庄岖。這有時稱為延續(xù),根據(jù)將要發(fā)生的情況和涉及的類型角骤,可能需要不同的操作符來實現(xiàn)隅忿。
依賴
最典型的場景是給定一個值,調(diào)用另一個服務(wù)邦尊,等待并使用其結(jié)果繼續(xù):
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
通常情況下背桐,后面的序列也需要來自前面映射的值。這可以通過將外部flatMap
移動到上一個flatMap
的內(nèi)部來實現(xiàn)蝉揍,例如:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
在這里链峭,由于lambda變量捕獲,原始value
將在內(nèi)部flatMap
中可用又沾。
非依賴
在其他場景中弊仪,第一個源/數(shù)據(jù)流的結(jié)果是不相關(guān)的,我們希望使用獨立的另一個源繼續(xù)杖刷。flatMap
也可以勝任:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
然而励饵,在這種情況下,延續(xù)保持Observable 滑燃,而不是可能更合適的Single.(這是可以理解的曲横,因為從flatMapSingle的角度來看,sourceObservable是一個多值源不瓶,因此映射也可能導致多個值)
雖然通常有一種方法更有表現(xiàn)力(也更低的開銷)禾嫉,即使用Completable作為中介,然后使用它的操作符andThen來繼續(xù):
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
sourceObservable和someSingleSource之間唯一的依賴關(guān)系是前者應該正常完成蚊丐,以便使用后者熙参。
延遲依賴
有時,前一個序列和新序列之間存在隱式的數(shù)據(jù)依賴關(guān)系麦备,由于某些原因孽椰,該依賴關(guān)系沒有通過“常規(guī)通道”流動昭娩。有人會傾向于這樣寫延續(xù):
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,這輸出0黍匾,因為Single.just(count.get())
是在數(shù)據(jù)流尚未運行的assembly time期間計算的栏渺。我們需要一些東西,來推遲Single來源的計算锐涯,直到主要來源完成的運行時:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或者
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
類型轉(zhuǎn)換
有時磕诊,源或服務(wù)返回的類型與應該使用它的流不同。 例如纹腌,在上面的inventory例子中霎终,getDemandAsync
可能返回一個Single<DemandRecord>
. 如果代碼示例保持不變,則會導致編譯時錯誤(但是升薯,通常會出現(xiàn)關(guān)于缺少重載的誤導性錯誤消息)莱褒。
在這種情況下,通常有兩種方式來修復轉(zhuǎn)換: 1) 轉(zhuǎn)換為所需類型 2) 查找并使用支持不同類型的特定操作符的重載涎劈。
轉(zhuǎn)換為所需類型
每個響應基類都具有可以執(zhí)行此類轉(zhuǎn)換(包括協(xié)議轉(zhuǎn)換)的操作符广凸,以匹配其他類型。 下面的矩陣顯示了可用的轉(zhuǎn)換選項:
Flowable | Observable | Single | Maybe | Completable | |
---|---|---|---|---|---|
Flowable | toObservable |
first , firstOrError , single , singleOrError , last , lastOrError 1
|
firstElement , singleElement , lastElement
|
ignoreElements |
|
Observable |
toFlowable 2
|
first , firstOrError , single , singleOrError , last , lastOrError 1
|
firstElement , singleElement , lastElement
|
ignoreElements |
|
Single |
toFlowable 3
|
toObservable |
toMaybe |
ignoreElement |
|
Maybe |
toFlowable 3
|
toObservable |
toSingle |
ignoreElement |
|
Completable | toFlowable |
toObservable |
toSingle |
toMaybe |
1: 當將一個多值源轉(zhuǎn)換為一個單值源時蛛枚,你應該決定使用多個源中的哪一個值作為結(jié)果谅海。
2: 把一個Observable
變成Flowable
需要一個額外的決定: 如何處理Observable
源中潛在的無約束流? 通過BackpressureStrategy
參數(shù)或標準的Flowable
操作符(如onBackpressureBuffer
、onBackpressureDrop
坤候、onbackpressurerelatest
,這些操作符還允許進一步定制反壓力行為胁赢。)可以使用幾種策略(如緩沖企蹭、刪除白筹、保持最新狀態(tài))來處理。
3: 當只有(最多)一個源數(shù)據(jù)項時谅摄,背壓沒有問題徒河,因為它可以一直存儲到下游準備使用為止。
使用具有所需類型的重載
許多常用的操作符都有可以處理其他類型的重載送漠。它們通常以目標類型的后綴命名:
Operator | Overloads |
---|---|
flatMap |
flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable
|
concatMap |
concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable
|
switchMap |
switchMapSingle , switchMapMaybe , switchMapCompletable
|
這些操作符具有后綴而不是簡單地使用具有不同簽名的相同名稱的原因是類型擦除顽照。 Java認為像operator(Function<T, Single<R>>)
和 operator(Function<T, Maybe<R>>)
這樣的簽名是相同的(與c#不同),并且由于擦除的原因闽寡,這兩operator
最終會成為具有相同簽名的重復方法代兵。
操作符命名約定
命名在編程中是最困難的事情之一,因為命名要求不長爷狈、具有表現(xiàn)力植影、容易捕捉和容易記憶。 不幸的是涎永,目標語言(和已經(jīng)存在的約定)在這方面可能不會提供太多的幫助(不可用的關(guān)鍵字思币、類型擦除鹿响、類型歧義等等)。
不可用關(guān)鍵字
在原始的Rx.NET
中谷饿,發(fā)出單個項然后完成的運算符稱為 Return(T)
.由于Java約定是以小寫字母開頭的方法名惶我,因此它應該是return(T)
,但這是Java中的關(guān)鍵字博投,因此不可用绸贡。 因此,RxJava選擇將這個操作符命名為 just(T)
. 操作符Switch
也存在同樣的限制贬堵,必須將其命名為 switchOnNext
.另一個例子是Catch
恃轩,它被命名為 onErrorResumeNext
.
類型消除
許多期望用戶提供返回響應類型的函數(shù)的操作符無法重載,因為函數(shù)Function<T, X>
周圍的類型擦除將這些方法簽名轉(zhuǎn)換為重復黎做。RxJava通過添加類型后綴來命名這些操作符:
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
類型歧義
盡管某些操作符在類型擦除方面沒有問題叉跛,但是是在使用Java 8和lambdas的情況下,它們的簽名可能會變得含糊不清蒸殿。例如筷厘,concatWith
以各種其他反應性基類型作為參數(shù)(為了在底層實現(xiàn)中提供方便和性能優(yōu)勢),會出現(xiàn)一些重載:
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
Publisher
和SingleSource
都以函數(shù)接口的形式出現(xiàn)(帶有一個抽象方法的類型)宏所,并可能鼓勵用戶嘗試提供lambda表達式:
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
不幸的是酥艳,這種方法不起作用,示例根本不打印2
爬骤。 實際上充石,從2.1.10版本開始,它甚至不能編譯霞玄,因為至少有4個concatWith
重載存在骤铃,編譯器發(fā)現(xiàn)上面的代碼不明確。
在這種情況下坷剧,用戶可能希望延遲一些計算惰爬,直到someSource
完成,因此正確的明確操作符應該是defer
:
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有時惫企,添加后綴是為了避免邏輯歧義撕瞧,這些歧義可能會編譯,但會在流中產(chǎn)生錯誤的類型:
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
當函數(shù)接口類型涉及到類型參數(shù)T
時狞尔,這也會變得含糊不清丛版。
錯誤處理
數(shù)據(jù)流可能失敗,此時錯誤被發(fā)送到使用者偏序。 但是页畦,有時多個源可能會失敗,這時可以選擇是否等待所有源完成或失敗禽车。為了表示這種機會寇漫,許多操作符的名稱都添加了DelayError
單詞的后綴(而其他操作符的重載中則添加了delayError
或delayErrors
boolean標志):
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
當然刊殉,各種各樣的后綴可能出現(xiàn)在一起:
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
基類vs基類型
由于基類上的靜態(tài)方法和實例方法的數(shù)量太多,因此可以認為基類很重州胳。 RxJava 2的設(shè)計深受響應流規(guī)范的影響记焊,因此,該庫為每種響應類型提供了一個類和一個接口:
Type | Class | Interface | Consumer |
---|---|---|---|
0..N backpressured | Flowable |
Publisher 1
|
Subscriber |
0..N unbounded | Observable |
ObservableSource 2
|
Observer |
1 element or error | Single |
SingleSource |
SingleObserver |
0..1 element or error | Maybe |
MaybeSource |
MaybeObserver |
0 element or error | Completable |
CompletableSource |
CompletableObserver |
1The org.reactivestreams.Publisher
是外部響應流庫的一部分栓撞。它是通過受響應流規(guī)范控制的標準化機制與其他響應庫進行交互的主要類型遍膜。
2接口的命名約定是將Source
追加到半傳統(tǒng)的類名中。 因為Publisher
是由響應流庫提供的瓤湘,所以沒有FlowableSource
(而且子類型對互操作也沒有幫助)瓢颅。 然而,這些接口并不是響應流規(guī)范意義上的標準接口弛说,而且目前僅針對RxJava挽懦。