FluxJava 最初的設(shè)計(jì)就是以 Add-on 的方式來(lái)提供對(duì)于 RxJava 的支持炎功,所以這次增加 RxJava2 的部份也依照相同的模式枚冗,在 Project 中加上了 fluxjava-rx2 的 Module。新的 Module 功能上與 fluxjava-rx 大致上相同蛇损,只是原本以 RxJava 規(guī)格運(yùn)作的部份赁温,改為使用 RxJava2。
由于 RxJava 與 RxJava2 不太有機(jī)會(huì)共存在同一個(gè) Module 里淤齐,所以 fluxjava-rx2 沿用了 fluxjava-rx 的 Package 名稱(chēng)股囊,在使用上這二個(gè) Add-on 必須要擇一引用。不過(guò)也帶來(lái)了一個(gè)額外的好處更啄,如果想要由 fluxjava-rx 升級(jí)到 fluxjava-rx2 時(shí)稚疹,只要修改成 RxJava2 的調(diào)用規(guī)格,不用再特別調(diào)整 Import 的內(nèi)容祭务。
配合 RxJava2 的更新贫堰,按照慣例增加了一篇文章手把手教你如何一起用 FluxJava 與 RxJava2 做為使用上的參考穆壕。更多深入的文章請(qǐng)參閱簡(jiǎn)書(shū) http://www.reibang.com/u/fea63707e07f 或 wznote.blogspot.com。
在 fluxjava-rx2 中與 fluxjava-rx 最大的差異其屏,主要是因應(yīng) RxJava2 把原本的 Observable 分成了有背壓版本的 Flowable 與沒(méi)有背壓版本的 Observable喇勋。因此 RxBus 與 RxStore 中都分別再提供了 toFlowable
的 Method 來(lái)取得 Flowable,不過(guò) Observable 本來(lái)就可以再轉(zhuǎn)換為 Flowable偎行,此處的功能只是為了增加便利性川背、簡(jiǎn)化源代碼之用。
同時(shí)蛤袒,利用這篇文章補(bǔ)充說(shuō)明一下一些使用上的技術(shù)細(xì)節(jié)熄云。在 RxStore 中使用的是沒(méi)有背壓版本的 Observable 來(lái)接收外部傳來(lái)的信息,只是接收到之后就會(huì)被分派到不同的 Thread 上去處理后續(xù)的工作妙真,所以在這個(gè)部份是不大有機(jī)會(huì)遇上 MissingBackpressureException 問(wèn)題的缴允。但是,這并不代表背壓所造成的情況就不存在了珍德,只是瓶頸移到了 ThreadPool 的承受能力或是執(zhí)行的環(huán)境可以產(chǎn)生 Thread 的數(shù)量上练般。
就算是使用有背壓功能的 Flowable,也不代表就可以高枕無(wú)憂了锈候。說(shuō)穿了薄料,背壓本身不是什么神奇的黑科技,原理上只是在上下游中間加了個(gè)水池泵琳,讓下游有喘息的空間摄职。水池畢竟還是有一定的物理限制,沒(méi)有控制好获列,依舊會(huì)讓水池承載不下而出現(xiàn)錯(cuò)誤谷市。
因此,不論使用哪一種方法击孩,前端發(fā)送的數(shù)量仍然應(yīng)該要被謹(jǐn)慎地控制迫悠,避免海量的信息把接收端給淹沒(méi)了。像是把 RecyclerView 滾動(dòng)時(shí)產(chǎn)生位移的 Event 毫無(wú)選擇地往 RxStore 送。
在發(fā)送端就要篩選信息,除了要減少 MissingBackpressureException 可能會(huì)出現(xiàn)的機(jī)會(huì)外薪寓,還有一個(gè)目的是要節(jié)省執(zhí)行成本。當(dāng)信息數(shù)量大到無(wú)法處理時(shí)验烧,能做的只有挑選值得處理的部份來(lái)進(jìn)行。當(dāng)挑選的工作被移到發(fā)送后又跛,RxJava 的傳送機(jī)制表面上看起來(lái)就是簡(jiǎn)單地轉(zhuǎn)了一手碍拆,但是如果去追蹤其源代碼,就可以發(fā)現(xiàn)其實(shí)底下做了不少的工作。而每一次傳送都要運(yùn)行這些內(nèi)容感混,可以想見(jiàn)在數(shù)量到達(dá)一定的程度之后端幼,就會(huì)顯現(xiàn)出可觀的效能差距。
由于 Observable 在定義上所形成的限制使然弧满,同一個(gè)發(fā)送源無(wú)法把的信息分配至不同的 Thread 上送出婆跑。RxStore 為了要讓每個(gè)資料處理要求可以獨(dú)立、同步地進(jìn)行庭呜,所以才會(huì)在接收到信息后滑进,以不同的 Thread 進(jìn)行后續(xù)的工作。
在 RxStore 里提供了一個(gè) getExecutor
Method募谎,可以使用 ThreadPool 來(lái)做為背壓的替代方案扶关,但是并沒(méi)有像背壓一樣有可以控制上游的功能。在實(shí)作 getExecutor
時(shí)要注意数冬,不要直接在回傳時(shí) New 一個(gè) ThreadPool 的 Instance节槐。因?yàn)?getExecutor
是在每一次收到信息后調(diào)用一次。以上的做法拐纱,也等同于每一次收到信息就拿到一個(gè) ThreadPool 的 Instance铜异,每一個(gè) ThreadPool 都只會(huì)產(chǎn)生一個(gè) Thread,這就失去了使用 ThreadPool 的用意戳玫。
最容易出現(xiàn)以上問(wèn)題的情況是使用 Executors 來(lái)取得 ThreadPool熙掺,一般的情況下很容易忽略 Executors 其實(shí)是每次調(diào)用就產(chǎn)生一個(gè) Instance未斑。所以當(dāng)以 return Executors.newFixedThreadPool(poolSize);
的方式回傳 getExecutor
時(shí)咕宿,一開(kāi)始也許顯示不出問(wèn)題而被略過(guò),但是一但信息爆量后蜡秽,就會(huì)因?yàn)楫a(chǎn)生 Thread 的數(shù)量到達(dá)上限而中止運(yùn)作府阀。
以上,是這次補(bǔ)充的內(nèi)容芽突,讓使用 FluxJava 的朋友做為參考试浙。