原文地址:README.md
前言
- 關(guān)于RxJava:詳情請查看另一篇文章:Android RxJava 學(xué)習(xí)筆記。
- 寫這篇文章的目的:通過閱讀RxJava官方文檔颖御,對RxJava有更全面的認識榄棵,在翻譯的過程中,提高自己的英語水平潘拱,并為他人帶來便利疹鳄。
以下內(nèi)容為譯文內(nèi)容。
RxJava:為JVM而設(shè)計的Reactive Extensions
Reactive Extensions(Rx):一個使用可觀察序列組成異步的芦岂、基于事件的程序的庫瘪弓。RxJava就是Reactive Extensions在Java虛擬機上的實現(xiàn)。
RxJava繼承了觀察者模式以支持數(shù)據(jù)盔腔、事件序列杠茬,以及添加允許你以聲明的方式組合序列的運算符,同時抽出對類似于低級線程弛随、同步瓢喉、線程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)等問題的關(guān)注。
2.x版本(API文檔)
- 單獨依賴:Reactive-Streams
- 持續(xù)支持Java 6+ 以及 Android 2.3+
- 通過設(shè)計變更以及Reactive-Streams-Commons研究項目提升了性能
- Java 8 lambda友好API
- 兼容并發(fā)源(線程舀透、池栓票、事件循環(huán)、Actors、參與者等)
- 異步或同步執(zhí)行
- 用于參數(shù)化并發(fā)的虛擬時間和調(diào)度器
版本2.x和版本1.x將會在幾年內(nèi)并存走贪。他們擁有不同的Group id(io.reactivex.rxjava2
對比 io.reactivex
)和命名空間(io.reactivex
對比 rx
)佛猛。
關(guān)于版本1.x和2.x的不同可以查看維基文章《2.0有何不同》∽菇疲可以在維基主頁從整體上了解更多RxJava的內(nèi)容继找。
1.x版本
截至2018年3月31日,1.x版本已過期逃沿。不再對其進行開發(fā)婴渡、支持、維護凯亮、PRs和更新边臼。最后一個版本1.3.8的API文檔,將可持續(xù)訪問假消。
入門
設(shè)置依賴
首先將RxJava2添加到你的項目中柠并,例如,通過Gradle方式添加compile依賴:
implementation "io.reactivex.rxjava2:rxjava:2.x.y"
(請將x和y換成最新的版本號)
Hello World
接下來我們寫一段Hello World程序:
package rxjava.examples;
import io.reactivex.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
如果你的平臺不支持Java 8 lambdas(目前為止)富拗,你需要手動新建一個Consumer
內(nèi)部類:
import io.reactivex.functions.Consumer;
Flowable.just("Hello world")
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
System.out.println(s);
}
});
基本類
RxJava 2 具有幾個基本類臼予,你可以在其中發(fā)現(xiàn)運算符:
-
io.reactivex.Flowable
:0..N個流,支持響應(yīng)流和背壓 -
io.reactivex.Observable
:0..N個流媒峡,無背壓 -
io.reactivex.Single
:一條只有一個條目或者錯誤的流 -
io.reactivex.Completable
:一條沒有條目但是只有一個完成或錯誤標志的流 -
io.reactivex.Maybe
:一條無條目或者只有一個條目或錯誤的流
一些術(shù)語
上游瘟栖,下游
RxJava中的數(shù)據(jù)流包含一個數(shù)據(jù)源,至少0個中間步驟谅阿,隨后是一個數(shù)據(jù)消費者或者組合器步驟(該步驟負責以某種方式消費數(shù)據(jù)流):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在這里,如果我們想想自己處于operator2
的位置酬滤,向左看签餐,一直到source
,被稱為上游盯串。向右看氯檐,直到consumer
,被稱為下游体捏。這當我們把每個元素分開寫時通常更容易理解:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
運動中的對象
在RxJava的文檔中冠摄,排放物(emission),發(fā)射物(emits)几缭,條目(item)河泳,信號(signal),數(shù)據(jù)(data)以及消息(message)被認為是同意詞并且代表在整個數(shù)據(jù)流中運動的對象年栓。
背壓
當數(shù)據(jù)流運行到異步步驟時拆挥,每一步可能都以不同的速度執(zhí)行不同的操作。為了避免此類步驟(通常表現(xiàn)為由于臨時緩沖或需要跳過/刪除數(shù)據(jù)而導(dǎo)致內(nèi)存使用量增加)被吞沒某抓,應(yīng)用了所謂的凡壓力纸兔,這是一種流控制形式惰瓜,使得此類步驟能明確他們準備處理的條目數(shù)。其允許當數(shù)據(jù)流中的某個步驟無法直到上游有多少個條目會傳遞過來時汉矿,限制內(nèi)存的使用崎坊。
在RxJava中,Flowable
類專用于支持背壓洲拇,而Observable
類專用于非背壓操作(短序列流强、GUI交互等)。其他類型呻待,Single
打月,Maybe
以及Completable
都不支持背壓,并且也不應(yīng)該支持背壓蚕捉;暫時存儲一個條目的空間總會有的奏篙。
裝配時間
數(shù)據(jù)流通過應(yīng)用各種中間操作的發(fā)生而做的準備被稱為裝配時間:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
在此時,數(shù)據(jù)還未流動并且沒有副作用發(fā)生迫淹。
訂閱時間
這是在流中的subscribe()
方法被調(diào)用使得隊列內(nèi)部建立起鏈條關(guān)系時的一個短暫的狀態(tài):
flow.subscribe(System.out::println)
這是觸發(fā)訂閱副作用的時候(參見doonsubscribe)秘通。在這種狀態(tài)下,某些源會立即阻止或開始發(fā)出條目敛熬。
運行時間
這是當流主動地發(fā)出條目肺稀、錯誤或完成信號時的狀態(tài):
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
實際上,是上面給出的示例的主體執(zhí)行的時候应民。
簡單的后臺計算
在RxJava中有一個很普遍的使用場景就是在后臺線程運行一些計算话原,網(wǎng)絡(luò)請求并在UI線程顯示結(jié)果(或錯誤):
import io.reactivex.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模仿復(fù)雜的計算
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- 等待流結(jié)束
這種類似于建造者模式的鏈式方法風(fēng)格被稱為流式API。然而诲锹,RxJava的響應(yīng)式類型是不可改變的繁仁;每個方法調(diào)用后返回一個攜帶了被添加的行為的新Flowable
對象。為了更容易理解归园,上方的示例代碼可以被寫為如下形式:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模仿復(fù)雜的計算
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常黄虱,你可以通過subscribeOn
將計算或者阻塞IO轉(zhuǎn)移到其他線程。一旦數(shù)據(jù)就緒庸诱,你可以通過observeOn
確保他們在前臺或者GUI線程上進行處理捻浦。
調(diào)度器
RxJava運算符不直接與Thread
或ExecutorService
直接工作,而是與所謂的Scheduler
一起工作桥爽,后者在統(tǒng)一的API背后抽象出并發(fā)源朱灿。RxJava 2 具有多個標準調(diào)度程序,可通過Schedulers
工具類訪問它們聚谁。
-
Schedulers.computation()
:在后臺中的固定數(shù)量的專用線程中運行密集的計算工作母剥。大部分異步操作將此作為默認的Scheduler
。 -
Schedulers.io()
:在一組動態(tài)變化的線程池中運行I/O類或阻塞操作。 -
Schedulers.single()
:以順序和先進先出的方式在單個線程上運行工作环疼。 -
Schedulers.trampoline()
:在一個線程中以順序和先進先出的方式運行工作习霹,通常用作測試目的。
這些可用于所有JVM平臺炫隶,但在某些特定的平臺淋叶,比如Android,定義了典型的Scheduler
:AndroidSchedulers.mainThread()
伪阶,SwingScheduler.instance()
或JavaFXSchedulers.gui()
煞檩。
此外,可以通過Schedulers.from(Executor)
方法選擇一個已有的Executor
(及其子類比如ExecutorService
)栅贴。舉個例子斟湃,持有一個更大但數(shù)量固定的線程池(與computation()
和io()
不同)時可以使用這個方法。
末尾的Thread.sleep(2000)
不會引發(fā)異常檐薯。RxJava中的默認Scheduler
運行在守護進程中凝赛,這意味著一旦Java主線程存在,它們都會停止坛缕,后臺的計算也不會發(fā)生墓猎。在這個例子中睡眠一段時間,可以讓你有足夠時間在控制臺中看到流的輸出赚楚。
流中的并發(fā)情況
RxJava中的流本質(zhì)上是連續(xù)的毙沾,分為幾個處理階段,這些處理階段可能同時運行:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
這個例子中宠页,流在計算Scheduler
中將1到10的數(shù)字作開方左胞,并在“主”線程中消耗計算結(jié)果(更精確地說,blockingSubscribe
的調(diào)用線程)勇皇。然而罩句,lambda表達式v -> v * v
并未與這個流并行運行;它在同一個計算線程上一個接一個地接收1到10的值敛摘。
并行處理
并行處理數(shù)字1到10會更復(fù)雜一些:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
實際上,并行在RxJava中其實是在不同的流中運行最后將他們的結(jié)果合并到一個單獨的流中乳愉。運算符flatMap
通過首先將1到10之間的每個數(shù)字銀蛇到它自己單獨的Flowable
中運行兄淫,然后將計算好的平方數(shù)合并起來以實現(xiàn)這種平行。
但是蔓姚,請注意捕虽,flatMap
不保證任何順序,內(nèi)部流的最終結(jié)果可能會交替出現(xiàn)坡脐。有一些其他運算符可以代替:
-
concatMap
同時在一個內(nèi)部流中進行映射以及運行 -
concatMapEager
“同時”運行所有的內(nèi)部流泄私,但是輸出流會按照這些內(nèi)部流創(chuàng)建時的順序進行排序
或者,操作符Flowable.parallel()
以及ParallelFlowable
類型可以實現(xiàn)相同的并行處理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
子流依賴
flatMap
是一個能在很多情況下使用的強大的操作符。比如晌端,給定一個返回Flowable
的服務(wù)捅暴,我們項用第一個服務(wù)發(fā)出的值調(diào)用另一個服務(wù):
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();
持續(xù)
有時候,當一個條目變得可用時咧纠,有人希望對它執(zhí)行一些依賴性的計算蓬痒。這有時被稱為持續(xù),根據(jù)應(yīng)該發(fā)生的內(nèi)容的不同以及設(shè)計的類型的不同漆羔,可能需要不同的操作符實現(xiàn)梧奢。
依賴
最典型的場景是給定一個值,調(diào)用另一個服務(wù)演痒,等待并繼續(xù)其結(jié)果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
還有一個場景是后面的序列需要用到之前的映射中的值亲轨。這可以通過將外部的flatMap
移動到之前的flatMap
中而解決,如下:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
這樣鸟顺,最初的value
就可以被內(nèi)部的flatMap
使用了惦蚊,由lambda變量捕獲提供。
非依賴
在其他場景中诊沪,第一個源/數(shù)據(jù)流的結(jié)果是無用的养筒,有人想要以另一個差不多的獨立的源繼續(xù),這種情況端姚,flatMap
同樣可用:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
但是晕粪,這個例子中的連續(xù)保持著Observable
而不是看起來更合適的Single
。(這是可以理解的渐裸,因為從flatMapSingle
的調(diào)用者sourceObservable
是一個多值的源巫湘,所以映射的可能的結(jié)果同樣也是多值的)。
通常昏鹃,即使有一種有點兒更具表現(xiàn)力(同時也降低了開銷)方式尚氛,通過使用Completable
作為中介及其操作符andThen
以用其他東西繼續(xù):
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
sourceObservable
和someSingleSource
之間的唯一的依賴是前者應(yīng)當正常完成以便后者被消費。
延遲依賴
有時洞渤,在之前的序列和新序列中有一個隱藏的依賴阅嘶,由于某種原因,并未通過“常規(guī)通道”而流動载迄。有人會想要將持續(xù)寫成如下文這樣:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是讯柔,因為Single.just(count.get())
在數(shù)據(jù)流還沒有運行的時候,作為參數(shù)組裝時已經(jīng)運行完成了护昧,所以最終打印了0
魂迄。我們需要某種方式以延遲這個Single
源的運算,直到主源完成運行時為止:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或者
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
類型變換
有時惋耙,一個源或服務(wù)返回了一個與應(yīng)該使用它的流不同的類型捣炬。比如熊昌,在上方的inventory例子中,getDemandAsync
應(yīng)當返回一個Single<DemandRecord>
湿酸。如果示例代碼保持不變婿屹,最終將引起一個便利錯誤(但是,經(jīng)常會拋出一個有關(guān)缺乏過載的誤導(dǎo)性錯誤消息)稿械。
待更新……