前言
本系列文章適用于已經(jīng)了解 RxJava 的讀者震糖,深入貫徹其原理总珠,加深對(duì)其的認(rèn)識(shí)。如果從未了解過(guò) RxJava 的讀者們溉仑,建議先熟悉它挖函。
RxJava 0.x
RxJava 最早是 Netflix 參照微軟的 Rx.Net,在 Java 上實(shí)現(xiàn)一套類似的庫(kù)浊竟,0.x 其實(shí)就是社區(qū)內(nèi)部迭代開(kāi)發(fā)的時(shí)代挪圾。
在 0.x 的迭代過(guò)程中,API 還不穩(wěn)定逐沙,在長(zhǎng)期的變更中,逐步完善了 Observable洼畅,Publisher吩案,Subscriber,Scheduler 等接口以及大量的操作符帝簇。
Reactive Streams
在開(kāi)發(fā) RxJava 早期版本的過(guò)程中徘郭,開(kāi)發(fā)組也參與了制定 Reactive Streams 規(guī)范。
但是 RxJava1 并沒(méi)有遵循這個(gè)規(guī)范丧肴,因?yàn)榭紤]到下面幾個(gè)原因:
- Subscriber.onComplete 和 當(dāng)時(shí)的 Observer.onCompleted 相差一個(gè)字母不见,導(dǎo)致 API 不能兼容雅任。
- RxJava 中的 Subscription 和 RS 中的有些細(xì)微的區(qū)別,它不能在沒(méi)有較大變動(dòng)的情況下實(shí)現(xiàn) request(n) 方法,因此 RxJava 引入了 Producer 這個(gè)接口希痴,并混在了 Subscriber 中。
- RS 彼時(shí)不是穩(wěn)定版本棒掠,也在持續(xù)修訂中厉萝。
因此 RxJava 開(kāi)發(fā)組決定在 2.0 版本中正式支持 RS 規(guī)范,在 1.x 版本中實(shí)現(xiàn)類似的機(jī)制瘤旨,而不像在 2.0 中直接使用 RS 的接口梯啤。
Backpressure
由于 RS 的影響,在 0.20.0-RC1 中 RxJava 第一次引入 Backpressure 的概念存哲,從此 RxJava 變成了一個(gè)讓人愛(ài)恨交織的庫(kù)因宇。事實(shí)上 RxJava 開(kāi)發(fā)組也曾表示七婴,在 RxJava 早期版本中,在 Observable 混入 Backpressure 是一個(gè)重大的失誤察滑。
- 由于 Backpressure 是在中間版本引入的打厘,因此部分操作符支持,部分操作符不支持杭棵,導(dǎo)致對(duì)使用者有些混亂和不友好婚惫。
- 在熱數(shù)據(jù)源中(如點(diǎn)擊事件),是無(wú)法被正確地 backpressured 魂爪,從而導(dǎo)致經(jīng)常出現(xiàn)意外的
MissingBackpressureException
。
事實(shí)上正如現(xiàn)在 2.x 中做的那樣蒋川,正確的做法是應(yīng)該把模塊分為 支持 Backpressure 和不支持的兩類撩笆。在 io.reactivex.Observable
中徹底移除了 Backpressure捺球,而 io.reactivex.Flowable
則遵循 RS 規(guī)范支持 Backpressure。
RxJava 1.x
經(jīng)過(guò)兩年多的迭代夕冲,RxJava 在 14 年 9 月發(fā)布了 1.0.0 正式版氮兵。
上面有提到歹鱼,其實(shí) 1.x 是一個(gè)類似 RS 的版本,但是不依賴 RS 的接口弥姻。同時(shí)對(duì)比 0.x南片,做了如下的更改。
依賴方式
groupId | artifactId | |
---|---|---|
0.x | com.netflix.rxjava | rxjava-core |
1.x | io.reactivex | rxjava |
拆分項(xiàng)目
在 RxJava 1.0.0 發(fā)布之際庭敦,把 JVM 上其他語(yǔ)言的實(shí)現(xiàn)和子工程都獨(dú)立出去了,而在 RxJava 庫(kù)中只保留了 Java 版的實(shí)現(xiàn)秧廉。
如:
其他
新增和廢棄了部分操作符,修復(fù)了大量的 BUG赔癌。
RxJava 2.x
RxJava 2.0.0 正式版發(fā)布于 2016 年 9 月底澜沟。
筆者也曾寫(xiě)過(guò)一篇 《淺談RxJava與2.0的新特性》,不過(guò)寫(xiě)那篇文章時(shí)還在版本還在 2.0.0-RC1茫虽,以現(xiàn)在的角度看起來(lái)不免顯得不夠全面既们,因此最好的理解方式還是看官方的 Wiki正什。
類型
1.x | 2.x | |
---|---|---|
Single/Completable | Maybe | 0 或 1 數(shù)據(jù)源 |
Observable | Flowable | 多數(shù)據(jù)源 |
Subject | Processor | 熱數(shù)據(jù)源 |
正如上面所說(shuō)的,RxJava2 遵循了 RS 規(guī)范斯棒,其冷數(shù)據(jù)源真正實(shí)現(xiàn)的類型就是 Flowable主经,熱數(shù)據(jù)源的實(shí)現(xiàn)則在io.reactivex.processors
包中。
同時(shí)也把 Observable 中舊的 Backpressure 徹底移除罩驻,因此在 RxJava2 中使 用 Observable 再也不會(huì)拋出MissingBackpressureException
。
Observable 與 Flowable
在 RxJava2 中砾跃, Flowable 和 Observable 雖然實(shí)現(xiàn)的代碼復(fù)用了一部分节吮,但是機(jī)制卻大相徑庭。這里要涉及到數(shù)據(jù)源的三種模型:
數(shù)據(jù)源
- 響應(yīng)式推:如鼠標(biāo)點(diǎn)擊事件透绩。此時(shí)生產(chǎn)者無(wú)條件產(chǎn)生數(shù)據(jù),消費(fèi)者負(fù)責(zé)配合生產(chǎn)者。
- 同步拉:如
Iterable
的迭代请毛。此時(shí)消費(fèi)者提出要求,生產(chǎn)者配合消費(fèi)者下發(fā)固棚,數(shù)據(jù)源是確定的仙蚜。 - 異步拉:如
Future
,Processor
呜师。消費(fèi)者提出要求贾节,生產(chǎn)者據(jù)此下發(fā)數(shù)據(jù)衷畦,但是數(shù)據(jù)到來(lái)不確定知牌。
Observable
在 Observable 中,消費(fèi)者是無(wú)權(quán)提出要求的角寸,即數(shù)據(jù)都是生產(chǎn)者提供的,消費(fèi)者只能被動(dòng)接受沮峡。雖然數(shù)據(jù)源不確定纹磺,但是對(duì)消費(fèi)者是透明的,只能被動(dòng)等待數(shù)據(jù)秘症。由于 Observable 已經(jīng)徹底移除了 Backpressure式矫,因此對(duì)于消費(fèi)速度和生產(chǎn)速度不協(xié)調(diào)時(shí),中間操作符可能會(huì)創(chuàng)建 Buffer(如 observeOn)來(lái)緩存數(shù)據(jù)采转。因此數(shù)據(jù)在無(wú)限的積累中可能會(huì)導(dǎo)致 OOM, 但不再會(huì)拋出MissingBackpressureException
板熊。
Flowable
在 Flowable 中察绷,消費(fèi)者通過(guò)Subscription
主動(dòng)的向生產(chǎn)者提出自己需求的數(shù)量,上游據(jù)此發(fā)射數(shù)據(jù)容劳。從而就有生產(chǎn)和消費(fèi)的矛盾闸度。如果是數(shù)據(jù)源是響應(yīng)式推或者異步拉時(shí),可能會(huì)導(dǎo)致MissingBackpressureException
莺禁。
舉例
這么說(shuō)有點(diǎn)抽象,我們舉個(gè)例子肪获。Subject / Processor 就是對(duì)應(yīng)異步拉的數(shù)據(jù)源,也是熱數(shù)據(jù)源孝赫。消費(fèi)者在訂閱他們后青柄,無(wú)論是否可以 request,數(shù)據(jù)的是否產(chǎn)生以及產(chǎn)生速度也是未知的致开。
在接受到onNext
時(shí):
- Subject 直接轉(zhuǎn)發(fā)給下游
- Prosessor 檢查下游的 request 數(shù)目,如果少于已經(jīng)發(fā)送的數(shù)目虹蒋,則拋出
MissingBackpressureException
因此使用Prosessor
稍有不慎就會(huì)出錯(cuò)哦飒货。當(dāng)然在實(shí)際使用中Flowable.subscribe()
時(shí),內(nèi)置的 Subscriber 通常都會(huì)在 onSubscribe 時(shí)直接向生產(chǎn)者request(Long.MAX_VALUE)
晃虫,在 RxJava2 Long.MAX_VALUE 是一個(gè)特殊值扣墩,意味著無(wú)限流。大家可參見(jiàn) subscribers
選擇
對(duì)于 Observable 與 Flowable 的選擇官方也有提示:
-
選擇 Observable:
- 處理不超過(guò) 1000 個(gè)數(shù)據(jù)時(shí)呻惕,因?yàn)閿?shù)據(jù)很少,一般不會(huì)觸發(fā) OOM
- 處理 GUI 或者點(diǎn)擊事件時(shí)草巡,因?yàn)檫@些事件是異步推的型酥,很難被 backpressured 也一般不這樣做
- 數(shù)據(jù)源本質(zhì)上是同步的查乒,但是平臺(tái)不支持 Java Stream API 或者你想用 RxJava 豐富的操作符,因?yàn)?Observable 比 Flowable 的性能更好
-
選擇 Flowable:
- 處理 10k+ 數(shù)據(jù)且數(shù)據(jù)源是可控的
- 基于拉的且阻塞的數(shù)據(jù)源
拆分之爭(zhēng)
事實(shí)上 RxJava 自 16年 開(kāi)始社區(qū)一度有討論是否要把 RxJava2 拆分成多個(gè)庫(kù)由境,因?yàn)椋?/p>
- 區(qū)分是否支持 Backpressure
- 將通用的代碼獨(dú)立出去,如
Scheduler
讥蟆、SimpleQueue
- 減少包體積的大小
但是最終經(jīng)過(guò)討論后還是放棄了:
- 使用 Proguard 來(lái)壓縮 RxJava2 的效果非常好纺阔,基本上是用多少保留多少代碼
- 如果拆分開(kāi),強(qiáng)行逼迫使用者需要了解 Backpressure 的概念笛钝,且增加了使用方的麻煩,因?yàn)樗麄冃枰獏^(qū)分自己到底需要哪些庫(kù)
- 多個(gè)不同版本子庫(kù)的組合可能會(huì)導(dǎo)致兼容問(wèn)題
- 拆分后结榄,Observable 和 Flowable 互不依賴囤捻,互轉(zhuǎn)需要使用靜態(tài)方法,打斷了鏈?zhǔn)讲僮?/li>
- ...
結(jié)語(yǔ)
如果您作為一個(gè) Android 開(kāi)發(fā)者视哑,正在糾結(jié)于 RxJava 帶來(lái)的好處和他的龐大的體積瘟则,那么您可以打消這個(gè)顧慮了,只要正確地配置了 Proguard醋拧,RxJava 對(duì)您包體積大小的影響微乎其微。反過(guò)來(lái)說(shuō)庆械,如果沒(méi)有配置 Proguard菌赖,那么是否引入 RxJava 確實(shí)是值得思考的一件事。
事實(shí)上堕绩,做一個(gè) Android 開(kāi)發(fā)者邑时,筆者認(rèn)為 RxJava 簡(jiǎn)直是為 Android 而生的,天生響應(yīng)式的事件與 RxJava 的結(jié)合能大幅提供您的工作效率晶丘。前提是您有一些函數(shù)式編程的思維唐含,能把流程拆解成一個(gè)個(gè)操作符沫浆。