Rxjava--背壓(Backpressure)

前言:Rxjava是通過觀察者模式設(shè)計的異步任務(wù)框架,他的有點在于簡潔性,不是代碼的簡潔性,而是邏輯的簡潔性,隨著項目的中異步任務(wù)邏輯越來越復(fù)雜,Rxjava可以一直保持代碼邏輯的簡潔,不會像handler,ThreadHandler這些在幾次線程間切換以后可能都已經(jīng)暈頭轉(zhuǎn)向了.Rxjava提供了多種類的操作符,比如ObserverOn可以為每次業(yè)務(wù)的處理的處理制定線程,flatmap這種操作符來幫助我們處理循環(huán)遍歷的問題,總的來說rxjava就是時代碼簡潔的異步任務(wù)框架.

但是在rxjava1.x版本中存在一個比較深的坑就是,有些操作符支持熬粗,背壓叶眉,有些操作符不支持背壓米罚,當我們使用不支持背壓的操作符時會報出rx.exceptions.MissingBackpressureException,當我們不理解Backpressure概念時導(dǎo)致我們很難處理這種錯誤嘱支。下面我們就來學(xué)習(xí)rxjava中背壓的概念。

Backpressure
Rx 中的數(shù)據(jù)流是從一個地方發(fā)射到另外一個地方。每個地方處理數(shù)據(jù)的速度是不一樣的零抬。如果生產(chǎn)者發(fā)射數(shù)據(jù)的速度比消費者處理的快會出現(xiàn)什么情況?在同步操作中宽涌,這不是個問題平夜,例如:

Observable<Integer> producer = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onCompleted();
});
// Consume
producer.subscribe(i -> {
    try {
        Thread.sleep(1000);
        System.out.println(i);
    } catch (Exception e) { }
});

雖然上面的消費者處理數(shù)據(jù)的速度慢,但是由于是同步調(diào)用的卸亮,所以當 o.onNext(1) 執(zhí)行后忽妒,一直阻塞到消費者處理完才執(zhí)行 o.onNext(2)。 但是生產(chǎn)者和消費者異步處理的情況很常見兼贸。如果是在異步的情況下會出現(xiàn)什么情況呢段直?

在傳統(tǒng)的 pull 模型中,當消費者請求數(shù)據(jù)的時候溶诞,如果生產(chǎn)者比較慢柬甥,則消費者會阻塞等待辛慰。如果生產(chǎn)者比較快猫妙,則生產(chǎn)者會等待消費者處理完后再生產(chǎn)新的數(shù)據(jù)串绩。

而 Rx 為 push 模型。 在 Rx 中枉圃,只要生產(chǎn)者數(shù)據(jù)好了就發(fā)射出去了功茴。如果生產(chǎn)者比較慢,則消費者就會等待新的數(shù)據(jù)到來讯蒲。如果生產(chǎn)者快痊土,則就會有很多數(shù)據(jù)發(fā)射給消費者,而不管消費者當前有沒有能力處理數(shù)據(jù)墨林。這樣會導(dǎo)致一個問題赁酝,例如:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

結(jié)果;

0
1
rx.exceptions.MissingBackpressureException

上面的 MissingBackpressureException 告訴我們,生產(chǎn)者太快了旭等,我們的操作函數(shù)無法處理這種情況酌呆。

解決:

RxJava 實現(xiàn)了一種通過 Subscriber 來通知 Observable 發(fā)射數(shù)據(jù)的方式。Subscriber 有個函數(shù) request(n)搔耕,調(diào)用該函數(shù)用來通知 Observable 現(xiàn)在 Subscriber 準備接受下面 n 個數(shù)據(jù)了隙袁。在 Subscriber 的 onStart 函數(shù)里面調(diào)用 request 函數(shù)則就開啟了reactive pull backpressure痰娱。這并不是傳統(tǒng)的 pull 模型,并不會阻塞調(diào)用菩收。只是 Subscriber 通知 Observable 當前 Subscriber 的處理能力梨睁。 通過調(diào)用 request 可以發(fā)射更多的數(shù)據(jù)。

pull

觀察者可以根據(jù)自身實際情況按需拉取數(shù)據(jù)娜饵,而不是被動接收(也就相當于告訴上游觀察者把速度慢下來)坡贺,最終實現(xiàn)了上游被觀察者發(fā)送事件的速度的控制,實現(xiàn)了背壓的策略箱舞。

class MySubscriber extends Subscriber<T> {
    @Override
    public void onStart() {
      request(1); //要在onStart中通知被觀察者先發(fā)送一個事件
    }

    @Override
    public void onCompleted() {
        ...
    }

    @Override
    public void onError(Throwable e) {
        ...
    }

    @Override
    public void onNext(T n) {
        ...
        request(1); //處理完畢之后遍坟,在通知被觀察者發(fā)送下一個事件
    }
}


Observable observable=Observable.range(1,100000);
observable.observeOn(Schedulers.newThread())
            .subscribe(new MySubscriber());

在 onStart 函數(shù)中調(diào)用 request(1) 開啟了 backpressure 模式,告訴 Observable 一次只發(fā)射一個數(shù)據(jù)晴股。在 onNext 里面處理完該數(shù)據(jù)后愿伴,可以請求下一個數(shù)據(jù)。通過 quest(Long.MAX_VALUE) 可以取消 backpressure 模式电湘。
實際上隔节,在上面的代碼中,你也可以不需要調(diào)用request(n)方法去拉取數(shù)據(jù)胡桨,程序依然能完美運行官帘,這是因為range –> observeOn,這一段中間過程本身就是響應(yīng)式拉取數(shù)據(jù)瞬雹,observeOn這個操作符內(nèi)部有一個緩沖區(qū)昧谊,Android環(huán)境下長度是16,它會告訴range最多發(fā)送16個事件酗捌,充滿緩沖區(qū)即可呢诬。不過話說回來,在觀察者中使用request(n)這個方法可以使背壓的策略表現(xiàn)得更加直觀胖缤,更便于理解尚镰。
如果你足夠細心,會發(fā)現(xiàn)哪廓,在開頭展示異常情況的代碼中狗唉,使用的是interval這個操作符,但是在這里使用了range操作符涡真,為什么呢分俯?
這是因為interval操作符本身并不支持背壓策略,它并不響應(yīng)request(n)哆料,也就是說缸剪,它發(fā)送事件的速度是不受控制的,而range這類操作符是支持背壓的东亦,它發(fā)送事件的速度可以被控制杏节。

Backpressure 策略

很多 Rx 操作函數(shù)內(nèi)部都使用了 backpressure 從而避免過多的數(shù)據(jù)填滿內(nèi)部的隊列。這樣處理慢的消費者就會把這種情況傳遞給前面的消費者,前面的消費者開始緩沖數(shù)據(jù)直到他也緩存滿為止再告訴他前面的消費者奋渔。Backpressure 并沒有消除這種情況镊逝。只是讓錯誤延遲發(fā)生,我們還是需要處理這種情況嫉鲸。
Rx 中有操作函數(shù)可以用來處理這種消費者處理不過來的情況蹋半。

onBackpressureBuffer

onBackpressureBuffer 會緩存所有當前無法消費的數(shù)據(jù),直到 Observer 可以處理為止充坑。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(1000)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println
    );

上面的示例减江,生產(chǎn)者比消費者快 100 倍。使用 1000個緩沖來處理這種消費者比較慢的情況捻爷。當消費者消費 11個數(shù)據(jù)的時候辈灼,緩沖區(qū)滿了,生產(chǎn)者生產(chǎn)了 1100個數(shù)據(jù)也榄。數(shù)據(jù)流就拋出異常了巡莹。

onBackpressureDrop

如果消費者無法處理數(shù)據(jù),則 onBackpressureDrop 就把該數(shù)據(jù)丟棄了甜紫。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop()
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

結(jié)果:

0
1
2
...
126
127
12861
12862
...

這個示例中降宅,前面 128 個數(shù)據(jù)正常的被處理的,這是應(yīng)為 observeOn 在切換線程的時候囚霸, 使用了一個 128 個數(shù)據(jù)的小緩沖腰根。

在RxJava1.X中,背壓的設(shè)計并不十分完美拓型。而是希望你對背壓有一個全面清晰的認識额嘿,對于它在RxJava1.X中的設(shè)計缺陷有所了解即可×哟欤可喜的是册养,RxJava2.X中解決了背壓的問題,推出了Flowable(Observable在RxJava2.0中新的實現(xiàn))压固,而且其中的操作符全部都實現(xiàn)了背壓

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末球拦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子帐我,更是在濱河造成了極大的恐慌坎炼,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件焚刚,死亡現(xiàn)場離奇詭異点弯,居然都是意外死亡,警方通過查閱死者的電腦和手機矿咕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門抢肛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狼钮,“玉大人,你說我怎么就攤上這事捡絮“疚撸” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵福稳,是天一觀的道長涎拉。 經(jīng)常有香客問我,道長的圆,這世上最難降的妖魔是什么鼓拧? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮越妈,結(jié)果婚禮上季俩,老公的妹妹穿的比我還像新娘。我一直安慰自己梅掠,他們只是感情好酌住,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著阎抒,像睡著了一般酪我。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上且叁,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天都哭,我揣著相機與錄音,去河邊找鬼谴古。 笑死质涛,一個胖子當著我的面吹牛稠歉,可吹牛的內(nèi)容都是我干的掰担。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼怒炸,長吁一口氣:“原來是場噩夢啊……” “哼带饱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起阅羹,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤勺疼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后捏鱼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體执庐,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年导梆,在試婚紗的時候發(fā)現(xiàn)自己被綠了轨淌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迂烁。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖递鹉,靈堂內(nèi)的尸體忽然破棺而出盟步,到底是詐尸還是另有隱情,我是刑警寧澤躏结,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布却盘,位于F島的核電站,受9級特大地震影響媳拴,放射性物質(zhì)發(fā)生泄漏黄橘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一屈溉、第九天 我趴在偏房一處隱蔽的房頂上張望旬陡。 院中可真熱鬧,春花似錦语婴、人聲如沸描孟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽匿醒。三九已至,卻和暖如春缠导,著一層夾襖步出監(jiān)牢的瞬間廉羔,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工僻造, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留憋他,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓髓削,卻偏偏與公主長得像竹挡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子立膛,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容