RxJava2外傳Ⅰ:官方README翻譯

有些翻譯仍不準確弄兜,會持續(xù)改進萝挤。

術(shù)語

上游蹂窖,下游

RxJava中的數(shù)據(jù)流包括一個數(shù)據(jù)源、0個或多個中間步驟抡锈、一個數(shù)據(jù)消費者或組合子步驟(其中的步驟負責以某種方式使用數(shù)據(jù)流):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

如果我們想象自己站在operator2上疾忍,向左看向source,叫做上游床三;向右看向subscriber/consumer一罩,叫做下游。當像下面這樣每個元素單獨寫一行時看的更加明顯:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

運動的對象

在RxJava的文檔中撇簿, emission, emits, item, event, signal, data and message都是近義詞聂渊,都表示沿著數(shù)據(jù)流移動的對象。

背壓

當數(shù)據(jù)流通過異步步驟運行時四瘫,每個步驟可能以不同的速度執(zhí)行不同的事情汉嗽。為了避免那些由于臨時緩沖或需要跳過/刪除數(shù)據(jù)而導致內(nèi)存使用量增加的步驟被淹沒,因此應用了所謂的背壓找蜜,這是一種流控制形式饼暑,從而步驟可以表示準備處理多少項數(shù)據(jù)。使用背壓允許當前步驟在通常無法知道上游將發(fā)送多少項數(shù)據(jù)的情況下限制數(shù)據(jù)流的內(nèi)存使用洗做。

在RxJava中弓叛,Flowable類被設(shè)計成支持背壓,Observable類專用于非背壓操作诚纸。Single, Maybe and Completable也不支持背壓撰筷。

裝配時間Assembly time

通過應用各式各樣的中間操作符來準備數(shù)據(jù)流,發(fā)生在裝配時間畦徘。

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

在當前點上闭专,數(shù)據(jù)還沒有流動,也沒有發(fā)生副作用旧烧。

訂閱時間 Subscription time

這是在內(nèi)部建立處理步驟鏈的流上調(diào)用subscribe()時的臨時狀態(tài):

flow.subscribe(System.out::println)

這時會觸發(fā)訂閱副作用影钉。有些源在這種狀態(tài)下會立即阻塞或開始發(fā)送數(shù)據(jù)項。

運行時 Runtime

這是數(shù)據(jù)流主動發(fā)出數(shù)據(jù)項掘剪、錯誤或完成信號時的狀態(tài):

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

實際上平委,這是上面給定示例的主體執(zhí)行的時候。

簡單的后臺計算

RxJava的一個常見用例是在后臺線程上運行一些計算夺谁、網(wǎng)絡(luò)請求廉赔,并在UI線程上顯示結(jié)果(或錯誤)

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

這種鏈式調(diào)用方法的形式稱為流式API,類似于builder模式匾鸥。然而蜡塌,RxJava的響應類型是不可變的;每個方法調(diào)用都返回一個添加了行為的新的Flowable。我們可以把上面的例子改寫為下面這樣:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

通常勿负,您可以通過subscribeOn將計算或阻塞IO移動到其他線程馏艾。一旦數(shù)據(jù)準備好,您就可以通過observeOn確保它們在前臺或GUI線程上得到處理。

調(diào)度者Schedulers

RxJava操作符不直接使用Thread或ExecutorServices琅摩,而是使用所謂的調(diào)度器Scheduler铁孵,這些調(diào)度器將并發(fā)源抽象到統(tǒng)一API后面。RxJava 2提供了幾個可通過scheduler類訪問的標準調(diào)度器房资。

  • Schedulers.computation(): 在后臺的固定數(shù)量的專用線程上運行計算密集型工作蜕劝。大多數(shù)異步操作符將此作為其默認調(diào)度程序。
  • Schedulers.io(): 在一組動態(tài)變化的線程上運行類似I/ o或阻塞操作轰异。
  • Schedulers.single(): 以順序和FIFO方式在單個線程上運行工作岖沛。
  • Schedulers.trampoline(): 在參與的線程中以順序和FIFO方式運行工作,通常用于測試目的搭独。

這些在所有JVM平臺上都可用婴削,但是在某些特定的平臺上,例如android戳稽,有它們特有的調(diào)度器:AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

此外,還可以通過Scheduler .from(Executor)將現(xiàn)有的Executor(及其子類型期升,如ExecutorService)包裝到調(diào)度器Schedulers中惊奇。例如,可以使用它來擁有更大但仍然固定的線程池(不同于分別使用compute()和io())播赁。

上面例子結(jié)尾處的Thread.sleep(2000);是有意為之颂郎。在RxJava中,默認調(diào)度程序在守護線程上運行容为,這意味著一旦Java主線程退出乓序,它們就會全部停止,后臺計算可能永遠不會發(fā)生坎背。在這個示例場景中替劈,休眠一段時間可以讓您看到控制臺上數(shù)據(jù)流流的輸出。

流中的并發(fā)

RxJava中的流本質(zhì)上是順序的得滤,它們被劃分為可以彼此并發(fā)運行的處理階段

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

這個示例流將computation調(diào)度器上的數(shù)字從1平方到10陨献,并在主線程(更準確地說,是blockingSubscribe的調(diào)用線程)上處理結(jié)果懂更。然而lambda表達式v -> v * v并不是并行運行的眨业。它在同一個計算線程上依次接收1到10的值。

并行處理

并行處理數(shù)字1到10稍微復雜一些:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

實際上沮协,RxJava中的并行性意味著運行獨立的流并將它們的結(jié)果合并回單個流龄捡。操作符flatMap首先將1到10的每個數(shù)字映射到它自己的Flowable中,運行它們并合并計算的平方慷暂。

但是聘殖,請注意,flatMap不保證任何順序,來自內(nèi)部流的最終結(jié)果可能是交錯的就斤。有其他的操作符可供選擇:

  • concatMap 它每次映射并運行一個內(nèi)部流
  • concatMapEager 它“同時”運行所有內(nèi)部流悍募,但是輸出流將按照這些內(nèi)部流創(chuàng)建的順序輸出。

或者洋机,Flowable.parallel()操作符和ParallelFlowable類型可以幫助實現(xiàn)相同的并行處理模式:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

依賴子流

flatMap是一個強大的操作符坠宴,在很多情況下都有幫助。例如绷旗,給定一個返回Flowable的服務(wù)喜鼓,我們希望使用第一個服務(wù)發(fā)出的值調(diào)用另一個服務(wù):

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource.flatMap(inventoryItem ->
    erp.getDemandAsync(inventoryItem.getId())
    .map(demand 
        -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
  )
  .subscribe();

延續(xù)Continuations

有時,當一個項變得可用時衔肢,人們希望依賴它執(zhí)行一些計算庄岖。這有時稱為延續(xù),根據(jù)將要發(fā)生的情況和涉及的類型角骤,可能需要不同的操作符來實現(xiàn)隅忿。

依賴

最典型的場景是給定一個值,調(diào)用另一個服務(wù)邦尊,等待并使用其結(jié)果繼續(xù):

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

通常情況下背桐,后面的序列也需要來自前面映射的值。這可以通過將外部flatMap 移動到上一個flatMap的內(nèi)部來實現(xiàn)蝉揍,例如:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

在這里链峭,由于lambda變量捕獲,原始value將在內(nèi)部flatMap中可用又沾。

非依賴

在其他場景中弊仪,第一個源/數(shù)據(jù)流的結(jié)果是不相關(guān)的,我們希望使用獨立的另一個源繼續(xù)杖刷。flatMap 也可以勝任:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

然而励饵,在這種情況下,延續(xù)保持Observable 滑燃,而不是可能更合適的Single.(這是可以理解的曲横,因為從flatMapSingle的角度來看,sourceObservable是一個多值源不瓶,因此映射也可能導致多個值)

雖然通常有一種方法更有表現(xiàn)力(也更低的開銷)禾嫉,即使用Completable作為中介,然后使用它的操作符andThen來繼續(xù):

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

sourceObservable和someSingleSource之間唯一的依賴關(guān)系是前者應該正常完成蚊丐,以便使用后者熙参。

延遲依賴

有時,前一個序列和新序列之間存在隱式的數(shù)據(jù)依賴關(guān)系麦备,由于某些原因孽椰,該依賴關(guān)系沒有通過“常規(guī)通道”流動昭娩。有人會傾向于這樣寫延續(xù):

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

不幸的是,這輸出0黍匾,因為Single.just(count.get())是在數(shù)據(jù)流尚未運行的assembly time期間計算的栏渺。我們需要一些東西,來推遲Single來源的計算锐涯,直到主要來源完成的運行時:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

或者

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

類型轉(zhuǎn)換

有時磕诊,源或服務(wù)返回的類型與應該使用它的流不同。 例如纹腌,在上面的inventory例子中霎终,getDemandAsync 可能返回一個Single<DemandRecord>. 如果代碼示例保持不變,則會導致編譯時錯誤(但是升薯,通常會出現(xiàn)關(guān)于缺少重載的誤導性錯誤消息)莱褒。

在這種情況下,通常有兩種方式來修復轉(zhuǎn)換: 1) 轉(zhuǎn)換為所需類型 2) 查找并使用支持不同類型的特定操作符的重載涎劈。

轉(zhuǎn)換為所需類型

每個響應基類都具有可以執(zhí)行此類轉(zhuǎn)換(包括協(xié)議轉(zhuǎn)換)的操作符广凸,以匹配其他類型。 下面的矩陣顯示了可用的轉(zhuǎn)換選項:

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Observable toFlowable2 first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Single toFlowable3 toObservable toMaybe ignoreElement
Maybe toFlowable3 toObservable toSingle ignoreElement
Completable toFlowable toObservable toSingle toMaybe

1: 當將一個多值源轉(zhuǎn)換為一個單值源時蛛枚,你應該決定使用多個源中的哪一個值作為結(jié)果谅海。

2: 把一個Observable變成Flowable 需要一個額外的決定: 如何處理Observable源中潛在的無約束流? 通過BackpressureStrategy參數(shù)或標準的Flowable操作符(如onBackpressureBufferonBackpressureDrop坤候、onbackpressurerelatest,這些操作符還允許進一步定制反壓力行為胁赢。)可以使用幾種策略(如緩沖企蹭、刪除白筹、保持最新狀態(tài))來處理。

3: 當只有(最多)一個源數(shù)據(jù)項時谅摄,背壓沒有問題徒河,因為它可以一直存儲到下游準備使用為止。

使用具有所需類型的重載

許多常用的操作符都有可以處理其他類型的重載送漠。它們通常以目標類型的后綴命名:

Operator Overloads
flatMap flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMap concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMap switchMapSingle, switchMapMaybe, switchMapCompletable

這些操作符具有后綴而不是簡單地使用具有不同簽名的相同名稱的原因是類型擦除顽照。 Java認為像operator(Function<T, Single<R>>)operator(Function<T, Maybe<R>>)這樣的簽名是相同的(與c#不同),并且由于擦除的原因闽寡,這兩operator最終會成為具有相同簽名的重復方法代兵。

操作符命名約定

命名在編程中是最困難的事情之一,因為命名要求不長爷狈、具有表現(xiàn)力植影、容易捕捉和容易記憶。 不幸的是涎永,目標語言(和已經(jīng)存在的約定)在這方面可能不會提供太多的幫助(不可用的關(guān)鍵字思币、類型擦除鹿响、類型歧義等等)。

不可用關(guān)鍵字

在原始的Rx.NET中谷饿,發(fā)出單個項然后完成的運算符稱為 Return(T).由于Java約定是以小寫字母開頭的方法名惶我,因此它應該是return(T),但這是Java中的關(guān)鍵字博投,因此不可用绸贡。 因此,RxJava選擇將這個操作符命名為 just(T). 操作符Switch也存在同樣的限制贬堵,必須將其命名為 switchOnNext.另一個例子是Catch恃轩,它被命名為 onErrorResumeNext.

類型消除

許多期望用戶提供返回響應類型的函數(shù)的操作符無法重載,因為函數(shù)Function<T, X>周圍的類型擦除將這些方法簽名轉(zhuǎn)換為重復黎做。RxJava通過添加類型后綴來命名這些操作符:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

類型歧義

盡管某些操作符在類型擦除方面沒有問題叉跛,但是是在使用Java 8和lambdas的情況下,它們的簽名可能會變得含糊不清蒸殿。例如筷厘,concatWith以各種其他反應性基類型作為參數(shù)(為了在底層實現(xiàn)中提供方便和性能優(yōu)勢),會出現(xiàn)一些重載:

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

PublisherSingleSource都以函數(shù)接口的形式出現(xiàn)(帶有一個抽象方法的類型)宏所,并可能鼓勵用戶嘗試提供lambda表達式:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

不幸的是酥艳,這種方法不起作用,示例根本不打印2爬骤。 實際上充石,從2.1.10版本開始,它甚至不能編譯霞玄,因為至少有4個concatWith重載存在骤铃,編譯器發(fā)現(xiàn)上面的代碼不明確。

在這種情況下坷剧,用戶可能希望延遲一些計算惰爬,直到someSource完成,因此正確的明確操作符應該是defer:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有時惫企,添加后綴是為了避免邏輯歧義撕瞧,這些歧義可能會編譯,但會在流中產(chǎn)生錯誤的類型:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

當函數(shù)接口類型涉及到類型參數(shù)T時狞尔,這也會變得含糊不清丛版。

錯誤處理

數(shù)據(jù)流可能失敗,此時錯誤被發(fā)送到使用者偏序。 但是页畦,有時多個源可能會失敗,這時可以選擇是否等待所有源完成或失敗禽车。為了表示這種機會寇漫,許多操作符的名稱都添加了DelayError單詞的后綴(而其他操作符的重載中則添加了delayErrordelayErrors boolean標志):

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

當然刊殉,各種各樣的后綴可能出現(xiàn)在一起:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

基類vs基類型

由于基類上的靜態(tài)方法和實例方法的數(shù)量太多,因此可以認為基類很重州胳。 RxJava 2的設(shè)計深受響應流規(guī)范的影響记焊,因此,該庫為每種響應類型提供了一個類和一個接口:

Type Class Interface Consumer
0..N backpressured Flowable Publisher1 Subscriber
0..N unbounded Observable ObservableSource2 Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

1The org.reactivestreams.Publisher是外部響應流庫的一部分栓撞。它是通過受響應流規(guī)范控制的標準化機制與其他響應庫進行交互的主要類型遍膜。

2接口的命名約定是將Source追加到半傳統(tǒng)的類名中。 因為Publisher是由響應流庫提供的瓤湘,所以沒有FlowableSource(而且子類型對互操作也沒有幫助)瓢颅。 然而,這些接口并不是響應流規(guī)范意義上的標準接口弛说,而且目前僅針對RxJava挽懦。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市木人,隨后出現(xiàn)的幾起案子信柿,更是在濱河造成了極大的恐慌,老刑警劉巖醒第,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件渔嚷,死亡現(xiàn)場離奇詭異,居然都是意外死亡稠曼,警方通過查閱死者的電腦和手機形病,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來霞幅,“玉大人漠吻,你說我怎么就攤上這事』柔” “怎么了侥猩?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵榔至,是天一觀的道長抵赢。 經(jīng)常有香客問我,道長唧取,這世上最難降的妖魔是什么铅鲤? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮枫弟,結(jié)果婚禮上邢享,老公的妹妹穿的比我還像新娘。我一直安慰自己淡诗,他們只是感情好骇塘,可當我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布伊履。 她就那樣靜靜地躺著,像睡著了一般款违。 火紅的嫁衣襯著肌膚如雪唐瀑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天插爹,我揣著相機與錄音哄辣,去河邊找鬼。 笑死赠尾,一個胖子當著我的面吹牛力穗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播气嫁,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼当窗,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了寸宵?” 一聲冷哼從身側(cè)響起超全,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎邓馒,沒想到半個月后嘶朱,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡光酣,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年疏遏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片救军。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡财异,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出唱遭,到底是詐尸還是另有隱情戳寸,我是刑警寧澤,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布拷泽,位于F島的核電站疫鹊,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏司致。R本人自食惡果不足惜拆吆,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望脂矫。 院中可真熱鬧枣耀,春花似錦、人聲如沸庭再。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至颅围,卻和暖如春萌焰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谷浅。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工扒俯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人一疯。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓撼玄,卻偏偏與公主長得像,于是被迫代替她去往敵國和親墩邀。 傳聞我的和親對象是個殘疾皇子掌猛,可洞房花燭夜當晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容