RxJava2.0學習筆記

序言

由于我是先學習了1.0版本接著繼續(xù)學習2.0休涤,所以本文可能不太適合沒有接觸過RxJava的同學,所以可以先看一下诫舅,1.0的學習筆記,傳送門 http://www.reibang.com/p/a8cedc061ab1

首先要使用RxJava2羽利,先要添加依賴:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

類介紹

與RxJava1相比:

  • 相同點:
    都有Observable,Observer刊懈,subscribe()
  • 不同點:
  1. 在RxJava2的Observable中重寫的方法變?yōu)閟ubscribe(ObservableEmitter<String> emitter),其中的ObservableEmitter: Emitter是發(fā)射器的意思这弧,就是用來發(fā)出事件的。

  2. 在RxJava2的observer重寫方法中新添加了一個方法onSubscribe(Disposable d),其中Disposable是一次性用品虚汛,用完就丟棄匾浪,對與這個參數(shù)可以理解為一個攔截器,將所有發(fā)送過來的數(shù)據(jù)攔截下倆卷哩,讓observer不會收到蛋辈。

  3. 在RxJava2中subscribe()重載方法的參數(shù)變了。

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}
    

在實際項目中我們一般只關(guān)心onNext(),和onError(),所以我們一般只會重載兩個參數(shù)的殉疼。

然后我們用代碼來理解一下上面的相同點和不同點

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            Log.e(TAG, "subscribe: 1" );
            e.onNext(1);
            Log.e(TAG, "subscribe: 2" );
            e.onNext(2);
            Log.e(TAG, "subscribe: 3" );
            e.onNext(3);
            e.onComplete();
        }
    });

    Observer<Integer> observer = new Observer<Integer>() {
        private Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onSubscribe: " + d.isDisposed());
            disposable = d;
        }

        @Override
        public void onNext(Integer value) {
            Log.e(TAG, "onNext: " + value);
            if (value == 2) {
                disposable.dispose();
                Log.e(TAG, "onNext: " + disposable.isDisposed());
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete: ");
        }
    };

    observable.subscribe(observer);

運行結(jié)果如下所示:

com.example.cosima.rxjava2learn E/MainActivity: onSubscribe: false
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 1
com.example.cosima.rxjava2learn E/MainActivity: onNext: 1
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: true
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 3

可以看到在disposable為true的時候梯浪,Observable可以發(fā)送數(shù)據(jù),但是在Observer沒有接收到數(shù)據(jù)瓢娜。

注:與RxJava1相同挂洛,當Observable發(fā)送onComplete之后,Observable在onComplete之后的數(shù)據(jù)可以發(fā)送眠砾,Observer在接收到onComplete之后不再繼續(xù)接收事件虏劲。onError與onComplete的原理一樣,但是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然褒颈。

創(chuàng)建方式

想對于RxJava1來說柒巫,Observable的創(chuàng)建方式心添加了幾種方式:

  • fromIterable((Iterable<? extends T> list)方式
    遍歷集合,發(fā)送每個item相當于多次調(diào)用onNext().

注Collection接口是Iterable接口的子接口谷丸,所以所有Collection接口的實現(xiàn)類都可以作為Iterable對象直接傳入fromIterable()方法堡掏。

  • fromArray(T... items)方式
    遍歷集合,發(fā)送每個item相當于多次調(diào)用onNext().

  • interval(long period, TimeUnit unit)方式
    創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable刨疼,可用作定時器 period:時間間隔

  • interval(long initialDelay, long period, TimeUnit unit)方式
    initialDelay:開始值泉唁,默認為0。

  • range(final int start, final int count)方式
    創(chuàng)建一個發(fā)射特定整數(shù)序列的Observable揩慕,第一個參數(shù)為起始值亭畜,第二個為發(fā)送的個數(shù),如果為0則不發(fā)送迎卤,負數(shù)則拋異常拴鸵。

  • timer(long delay, TimeUnit unit)方式
    一個給定的延遲后發(fā)射一個特殊的值,即表示延遲2秒后蜗搔,調(diào)用onNext()方法劲藐。

用于實現(xiàn)觀察者模式方式有很多種:
<center>

</center>

現(xiàn)在我們來實現(xiàn)一個簡單的倒計時功能:

封裝Observable

private Observable<Integer> initEvent2(final int time) {
    return Observable.interval(0, 1, TimeUnit.SECONDS)
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return time - aLong.intValue();
                }
            })
            .take(time + 1);//限制循環(huán)次數(shù)
}

使用如下:

initEvent2(5).doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            Log.e(TAG, "accept: 開始計時");
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept: 當前計時" + integer);
                }
            });

打印結(jié)果為

在上述代碼中我們使用到了map()操作符,接下來我們就一起來看下RxJava中的操作符

RxJava操作符

1.map()操作符:

把原來的Observable對象轉(zhuǎn)換成另一個Observable對象
2.flatMap()操作符:


flatMap和Map的相同點就是把一個對象轉(zhuǎn)化為另一個對象返回碍扔,但是不同的是flatMap()返回的是個Observable對象瘩燥,并且這個對象并不是直接發(fā)送到了回調(diào)方法中,而是把這個對象激活不同,之后將他發(fā)送到回調(diào)方法中厉膀。
3.filter()操作符:


根據(jù)自己的需求加入判斷邏輯,他的返回值是true或者是false二拐,用于表示是否需要被過濾服鹅。
4.take()操作符:


再上面的代碼中已經(jīng)用到過,具體含義就是限制輸出次數(shù)
5.doOnNext()操作符:


在輸出前可以做一個額外的操作
6.delay()操作符:


延遲Observer的輸出

線程控制Scheduler

該部分與RxJava1相比沒有變換百新,在平常使用中較多的都是以下幾個:

  • Schedulers.newThread(): 總是啟用新線程企软,并在新線程執(zhí)行操作。
  • Schedulers.io(): I/O 操作(讀寫文件饭望、讀寫數(shù)據(jù)庫仗哨、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler形庭。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是用一個無數(shù)量上限的線程池厌漂,可以重用空閑的線程萨醒,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中苇倡,可以避免創(chuàng)建不必要的線程富纸。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算旨椒,即不會被 I/O 等操作限制性能的操作晓褪,例如圖形的計算。這個 Scheduler 使用的固定的線程池综慎,大小為 CPU 核數(shù)涣仿。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU示惊。
  • Android 還有一個專用的 AndroidSchedulers.mainThread()变过,它指定的操作將在 Android 主線程運行。

都是通過subscribeOn()observeOn() 兩個方法來對線程進行控制涝涤。
具體可見RxJava1.0學習筆記

與Retrofit結(jié)合使用

與1.0版本總體相差不多媚狰,只需要修改相應(yīng)的參數(shù)就可以了,比如subcribe()方法中的參數(shù)阔拳,請求如下所示:
主要內(nèi)容為紅框所示

接下來講一下RxJava2特有的了崭孤。

Flowable及Backpressure

來由

Rxjava2中,還有一個很大的變化就是Backpressure(背壓)糊肠,何為背壓辨宠,就是觀察者來不及處理被觀察者發(fā)出的事件(產(chǎn)生事件的速度大于處理事件的速度),導(dǎo)致事件被無限堆積货裹,最后產(chǎn)生異常嗤形。Flowable就是由此產(chǎn)生,專門用來處理這類問題弧圆。將原來的Observable拆分成了新的Observable和Flowable赋兵,同時其他相關(guān)部分也同時進行了拆分。

注意:處理Backpressure的策略僅僅是處理Subscriber接收事件的方式搔预,并不影響Flowable發(fā)送事件的方法霹期。即使采用了處理Backpressure的策略,F(xiàn)lowable原來以什么樣的速度產(chǎn)生事件拯田,現(xiàn)在還是什么樣的速度不會變化历造,主要處理的是Subscriber接收事件的方式。

處理Backpressure的策略

  • 產(chǎn)生原因:
    生產(chǎn)者和消費者不在同一線程下长已,生產(chǎn)者的速度大于消費者的速度褂删,就會產(chǎn)生Backpressure問題冯勉。如果生產(chǎn)者和消費者在同一線程下侈玄,不會產(chǎn)生Backpressure問題,所以可以說成同步不會產(chǎn)生問題祥国,異步可能產(chǎn)生問題培他。

  • 處理策略:
    1. ERROR策略
    產(chǎn)生Backpressure問題的時候直接拋出異常(MissingBackpressureException)

          Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
          @Override
          public void subscribe(FlowableEmitter<String> e) throws Exception {
              e.onNext("我");
              e.onNext("愛");
              e.onNext("你");
              e.onComplete();
          }
          }, BackpressureStrategy.ERROR);
    
          Subscriber<String> subscriber = new Subscriber<String>() {
              @Override
              public void onSubscribe(Subscription s) {
                  Log.e(TAG, "onSubscribe: ");
                  s.request(Long.MAX_VALUE);
              }
    
              @Override
              public void onNext(String s) {
                  Log.e(TAG, "onNext: " + s);
              }
    
              @Override
              public void onError(Throwable t) {
                  Log.e(TAG, "onError: " + t.getMessage());
              }
    
              @Override
              public void onComplete() {
                  Log.e(TAG, "onComplete: " );
              }
          };
          flowable.subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(subscriber);
    

上述代碼中在Flowable(被觀察者)添加了一個參數(shù)绸罗,并且onSubscribe(Subscription s)中傳給我們的不再是Disposable了, 而是Subscription荒典。然而Subscription也可以用于切斷觀察者與被觀察者之間的聯(lián)系,調(diào)用Subscription.cancel()方法吞鸭,并在此方法中添加了s.request(long count);這個方法就是用來向生產(chǎn)者申請可以消費的事件數(shù)量寺董。這樣就可以根據(jù)自身的消費能力進行消費。在異步調(diào)用時刻剥,RxJava中有個緩存池遮咖,用來緩存消費者處理不了暫時緩存下來的數(shù)據(jù),緩存池的默認大小為128造虏,即只能緩存128個事件御吞。無論request()中傳入的數(shù)字比128大或小,緩存池中在剛開始都會存入128個事件漓藕。當然如果本身并沒有這么多事件需要發(fā)送陶珠,則不會存128個事件。
在ERROR策略下享钞,如果緩存池溢出揍诽,就會立刻拋出MissingBackpressureException異常。

注:如果不調(diào)用request表示消費能力為0栗竖。如果不限制想request()中傳入任意參數(shù)暑脆,超過消費能力,也會造成資源浪費狐肢,產(chǎn)生OOM添吗。

2. BUFFER策略
BUFFER就是把RxJava中默認的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數(shù)據(jù)份名。
這樣碟联,消費者通過request()即使傳入一個很大的數(shù)字,生產(chǎn)者也會生產(chǎn)事件僵腺,并將處理不了的事件緩存玄帕。
但是這種方式任然比較消耗內(nèi)存,除非是我們比較了解消費者的消費能力想邦,能夠把握具體情況裤纹,不會產(chǎn)生OOM。BUFFER要慎用
3. DROP策略
當消費者處理不了事件,就丟棄鹰椒。
消費者通過request()傳入其需求n锡移,然后生產(chǎn)者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉漆际。
4. LATEST策略
LATEST與DROP功能基本一致淆珊。
消費者通過request()傳入其需求n,然后生產(chǎn)者把n個事件傳遞給消費者供其消費奸汇。其他消費不掉的事件就丟掉施符。
唯一的區(qū)別就是LATEST總能使消費者能夠接收到生產(chǎn)者產(chǎn)生的最后一個事件。
還是以上述例子展示擂找,唯一的區(qū)別就是Flowable不再無限發(fā)事件戳吝,只發(fā)送1000000個。

參考:http://www.reibang.com/p/d149043d103a

源碼地址:https://github.com/MrMJL/RxJava2Demo
由于只是學習筆記贯涎,源碼可能會有點亂听哭,又不對或者不明白歡迎評論多多交流。推薦一個Android實習&&經(jīng)驗交流群:541144061

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末塘雳,一起剝皮案震驚了整個濱河市陆盘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌败明,老刑警劉巖隘马,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異妻顶,居然都是意外死亡祟霍,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門盈包,熙熙樓的掌柜王于貴愁眉苦臉地迎上來沸呐,“玉大人,你說我怎么就攤上這事呢燥≌柑恚” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵叛氨,是天一觀的道長呼渣。 經(jīng)常有香客問我,道長寞埠,這世上最難降的妖魔是什么屁置? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮仁连,結(jié)果婚禮上蓝角,老公的妹妹穿的比我還像新娘阱穗。我一直安慰自己,他們只是感情好使鹅,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布揪阶。 她就那樣靜靜地躺著,像睡著了一般患朱。 火紅的嫁衣襯著肌膚如雪鲁僚。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天裁厅,我揣著相機與錄音冰沙,去河邊找鬼。 笑死执虹,一個胖子當著我的面吹牛拓挥,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播声畏,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼姻成!你這毒婦竟也來了插龄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤科展,失蹤者是張志新(化名)和其女友劉穎均牢,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體才睹,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡徘跪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了琅攘。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片垮庐。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖坞琴,靈堂內(nèi)的尸體忽然破棺而出哨查,到底是詐尸還是另有隱情,我是刑警寧澤剧辐,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布寒亥,位于F島的核電站,受9級特大地震影響荧关,放射性物質(zhì)發(fā)生泄漏溉奕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一忍啤、第九天 我趴在偏房一處隱蔽的房頂上張望加勤。 院中可真熱鬧,春花似錦、人聲如沸胸竞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽卫枝。三九已至煎饼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間校赤,已是汗流浹背吆玖。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留马篮,地道東北人沾乘。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像浑测,于是被迫代替她去往敵國和親翅阵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容