訂閱分為:同步訂閱 異步訂閱
同步訂閱Rxjava1與Rxjava2中 同步訂閱沒有用到緩沖區(qū)扩氢,只要上游事件數(shù)量不大于請(qǐng)求數(shù)量不會(huì)觸發(fā) MissingBackpressureException 異常留攒,但是這種方式在數(shù)量多了以后會(huì)出現(xiàn)大量的內(nèi)存使用率上升蚂维,有可能導(dǎo)致OOM陶耍。
同步訂閱赏陵,存在問題徒像,因?yàn)椴淮嬖诰彌_區(qū),發(fā)送事件超過接收事件拋出異常笨枯。這種可以通過 FlowableEmitter類的requested()控制被觀察者發(fā)送速度薪丁,就是發(fā)送前先獲取到觀察者能接收多少個(gè)遇西。異步訂閱Rxjava1中的Observable與少量的操作符采用了背壓處理馅精。但是緩沖區(qū)的大小為16,如果上游數(shù)據(jù)超過16個(gè)同時(shí)發(fā)送事件會(huì)觸發(fā)MissBackPressureException粱檀。
Rxjava2中的Observalbe不在支持背壓策略洲敢。但是新增了Flowable支持背壓策略。相對(duì)同步訂閱存在緩沖區(qū)茄蚯,多發(fā)送的事件會(huì)被存在緩沖區(qū)压彭。事件積壓超過一定數(shù)量會(huì)拋出MissingBackpressureException。
RxJava2采取了更先進(jìn)的背壓
策略模式1:BackpressureStrategy.ERROR?
問題:發(fā)送事件速度 > 接收事件 速度渗常,即流速不匹配具體
表現(xiàn):出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)壮不、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
處理方式:直接拋出異常MissingBackpressureException
模式2:BackpressureStrategy.MISSING
問題:發(fā)送事件速度 > 接收事件 速度,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)皱碘、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
處理方式:友好提示:緩存區(qū)滿了
模式3:BackpressureStrategy.BUFFER
問題:發(fā)送事件速度 > 接收事件 速度询一,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
處理方式:將緩存區(qū)大小設(shè)置成無限大?即被觀察者可無限發(fā)送事件觀察者癌椿,但實(shí)際上是存放在緩存區(qū)?但要注意內(nèi)存情況健蕊,防止出現(xiàn)OOM
模式4: BackpressureStrategy.DROP
問題:發(fā)送事件速度 > 接收事件 速度,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)踢俄、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
處理方式:超過緩存區(qū)大兴豕Α(128)的事件丟棄?如發(fā)送了150個(gè)事件,僅保存第1 - 第128個(gè)事件都办,第129 -第150事件將被丟棄
模式5:BackpressureStrategy.LATEST
問題:發(fā)送事件速度 > 接收事件 速度嫡锌,即流速不匹配
具體表現(xiàn)是:出現(xiàn)當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí)
處理方式:只保存最新(最后)事件琳钉,超過緩存區(qū)大惺澜ⅰ(128)的事件丟棄?即如果發(fā)送了150個(gè)事件,緩存區(qū)里會(huì)保存129個(gè)事件(第1-第128 + 第150事件)