關(guān)于 RxJava 最友好的文章—— RxJava 2.0 全新來襲

前言

之前寫RxJava相關(guān)文章的時(shí)候先口,就有人想讓我談?wù)凴xJava2.0的新特性,說實(shí)話宇驾,一開始我是拒絕的倍靡。因?yàn)樵谖铱磥恚琑xJava2.0雖然是版本的重大升級(jí)课舍,但總歸還是RxJava塌西,升級(jí)一個(gè)版本還能上天是咋的他挎?了解一下它的更新文檔不就好了么?真的有必要單出一篇文章來談這個(gè)么捡需?

但是詳細(xì)的了解了RxJava2.0以及部分源碼之后办桨,我覺得還是有必要對(duì)RxJava2.0做一個(gè)說明,幫助大家對(duì)于RxJava有更好的認(rèn)識(shí)站辉。


鋪墊

假如你還不是很熟悉RxJava呢撞,或者對(duì)于背壓這個(gè)概念(2.0更新中會(huì)涉及到背壓的概念)很模糊,希望你也能讀一讀下面兩篇鋪墊的文章:

  • 關(guān)于RxJava最友好的文章
  • 關(guān)于RxJava最友好的文章----背壓

關(guān)于背壓的那篇文章本來是本文的一部分饰剥,因?yàn)槠^大殊霞,被剝離出去了,所以建議大家有時(shí)間也一并閱讀捐川。


正文

RxJava2.0有很多的更新脓鹃,一些改動(dòng)甚至沖擊了我之前的文章里的內(nèi)容,這也是我想寫這篇文章的原因之一古沥。不過想要寫這篇文章其實(shí)也是有難度的瘸右,因?yàn)橄嚓P(guān)的資料去其實(shí)是很少的,還得自己硬著頭皮上....不過俗話說得好岩齿,有困難要上太颤,沒有困難創(chuàng)造困難也要上。

在這里盹沈,我會(huì)按照我們之前關(guān)于RxJava的文章的講述順序:觀察者模式龄章,操作符,線程調(diào)度乞封,這三個(gè)方面依次看有哪些更新做裙。


添加依賴

這個(gè)估計(jì)得放在最前面。

Android端使用RxJava需要依賴新的包名:

    //RxJava的依賴包(我使用的最新版本)
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依賴包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    

觀察者模式

首先聲明肃晚,RxJava以觀察者模式為骨架锚贱,在2.0中依然如此

不過此次更新中关串,出現(xiàn)了兩種觀察者模式:

  • Observable(被觀察者)/Observer(觀察者)
  • Flowable(被觀察者)/Subscriber(觀察者)

RxJava2.X中拧廊,Observeable用于訂閱Observer,是不支持背壓的晋修,而Flowable用于訂閱Subscriber吧碾,是支持背壓(Backpressure)的。

關(guān)于背壓這個(gè)概念以及它在1.0版本中的缺憾在上一篇文章中我已經(jīng)介紹到了墓卦,如果你不是很清楚倦春,我在這里在做一個(gè)介紹:背壓是指在異步場(chǎng)景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略溅漾,在1.0中山叮,關(guān)于背壓最大的遺憾,就是集中在Observable這個(gè)類中添履,導(dǎo)致有的Observable支持背壓,有的不支持脑又。為了解決這種缺憾暮胧,新版本把支持背壓和不支持背壓的Observable區(qū)分開來。

Observable/Observer

Observable正常用法:

  Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });
        
Observer mObserver=new Observer<Integer>() {
            //這是新加入的方法问麸,在訂閱后發(fā)送數(shù)據(jù)之前往衷,
            //回首先調(diào)用這個(gè)方法,而Disposable可用于取消訂閱
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        
mObservable.subscribe(mObserver);

這種觀察者模型是不支持背壓的严卖。

啥叫不支持背壓呢席舍?

當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理哮笆,即使數(shù)據(jù)大量堆積来颤,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過大只會(huì)OOM

我在測(cè)試的時(shí)候,快速發(fā)送了100000個(gè)整形數(shù)據(jù)稠肘,下游延遲接收福铅,結(jié)果被觀察者的數(shù)據(jù)全部發(fā)送出去了,內(nèi)存確實(shí)明顯增加了项阴,遺憾的是沒有OOM滑黔。

所以,當(dāng)我們使用Observable/Observer的時(shí)候环揽,我們需要考慮的是略荡,數(shù)據(jù)量是不是很大(官方給出以1000個(gè)事件為分界線,僅供各位參考)

Flowable/Subscriber

        Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //當(dāng)訂閱后歉胶,會(huì)首先調(diào)用這個(gè)方法汛兜,其實(shí)就相當(dāng)于onStart(),
            //傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });

輸出如下:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->10
onComplete
onsubscribe end

Flowable是支持背壓的跨扮,也就是說序无,一般而言,上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求衡创,下游調(diào)用request(n)來告訴上游發(fā)送多少個(gè)數(shù)據(jù)帝嗡。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平璃氢。

當(dāng)然哟玷,F(xiàn)lowable也可以通過creat()來創(chuàng)建:

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //需要指定背壓策略
        , BackpressureStrategy.BUFFER);

Flowable雖然可以通過create()來創(chuàng)建,但是你必須指定背壓的策略,以保證你創(chuàng)建的Flowable是支持背壓的(這個(gè)在1.0的時(shí)候就很難保證巢寡,可以說RxJava2.0收緊了create()的權(quán)限)喉脖。

根據(jù)上面的代碼的結(jié)果輸出中可以看到,當(dāng)我們調(diào)用subscription.request(n)方法的時(shí)候抑月,不等onSubscribe()中后面的代碼執(zhí)行树叽,就會(huì)立刻執(zhí)行到onNext方法,因此谦絮,如果你在onNext方法中使用到需要初始化的類時(shí)题诵,應(yīng)當(dāng)盡量在subscription.request(n)這個(gè)方法調(diào)用之前做好初始化的工作;

當(dāng)然,這也不是絕對(duì)的层皱,我在測(cè)試的時(shí)候發(fā)現(xiàn)性锭,通過create()自定義Flowable的時(shí)候,即使調(diào)用了subscription.request(n)方法叫胖,也會(huì)等onSubscribe()方法中后面的代碼都執(zhí)行完之后草冈,才開始調(diào)用onNext。

TIPS: 盡可能確保在request()之前已經(jīng)完成了所有的初始化工作瓮增,否則就有空指針的風(fēng)險(xiǎn)怎棱。

其他觀察者模式

當(dāng)然,除了上面這兩種觀察者钉赁,還有一類觀察者

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

其實(shí)這三者都差不多蹄殃,Maybe/MaybeObserver可以說是前兩者的復(fù)合體,因此以Maybe/MaybeObserver為例簡(jiǎn)單介紹一下這種觀察者模式的用法

//判斷是否登陸
Maybe.just(isLogin())
    //可能涉及到IO操作你踩,放在子線程
    .subscribeOn(Schedulers.newThread())
    //取回結(jié)果傳到主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }

            @Override
            public void onError(Throwable e) {
                
            }

            @Override
            public void onComplete() {

            }
        });

上面就是Maybe/MaybeObserver的普通用法诅岩,你可以看到,實(shí)際上带膜,這種觀察者模式并不用于發(fā)送大量數(shù)據(jù)吩谦,而是發(fā)送單個(gè)數(shù)據(jù),也就是說膝藕,當(dāng)你只想要某個(gè)事件的結(jié)果(true or false)的時(shí)候式廷,你可以用這種觀察者模式


這是上面那些被觀察者的上層接口:

//Observable接口
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

其實(shí)我們可以看到,每一種觀察者都繼承自各自的接口芭挽,這也就把他們能完全的區(qū)分開滑废,各自獨(dú)立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操作符不會(huì)相互影響。

例如flatMap操作符實(shí)現(xiàn):

//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

假如你想為Flowable寫一個(gè)自定義的操作符袜爪,那么只要保證Function< Publisher >中的類型實(shí)現(xiàn)了Publisher接口即可蠕趁。這么說可能很抽象,大家不理解其實(shí)也沒關(guān)系辛馆,因?yàn)椴⒉煌扑]大家自定義操作符俺陋,RxJava中的操縱符的組合已經(jīng)可以滿足大家的需求了。

當(dāng)然,你也會(huì)注意到上面那些接口中的subscribe()方法的返回類型為void了腊状,在1.X中诱咏,這個(gè)方法一般會(huì)返回一個(gè)Subscription對(duì)象,用于取消訂閱〗赏冢現(xiàn)在袋狞,這個(gè)功能的對(duì)象已經(jīng)被放到觀察者Observer或者subscriber的內(nèi)部實(shí)現(xiàn)方法中了,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

上面的實(shí)例中映屋,onSubscribe(Subscription s)傳入的參數(shù)s就肩負(fù)著取消訂閱的功能硕并,當(dāng)然,他也可以用于請(qǐng)求上游的數(shù)據(jù)秧荆。

在Observable/observer中,傳入的參數(shù)是另一個(gè)對(duì)象

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

在Observer接口中埃仪,onSubscribe(Disposable d)方法傳入的Disposable也是用于取消訂閱乙濒,基本功能是差不多的,只不過命名不一致卵蛉,大家知道就好颁股。

其實(shí)這種設(shè)計(jì)可以說還是符合邏輯的,因?yàn)槿∠嗛嗊@個(gè)動(dòng)作就只有觀察者(Observer等)才能做的傻丝,現(xiàn)在把它并入到觀察者內(nèi)部甘有,也算順理成章吧。

最后再提一點(diǎn)更新葡缰,就是被觀察者不再接收null作為數(shù)據(jù)源了亏掀。


操作符相關(guān)

這一塊其實(shí)可以說沒什么改動(dòng),大部分之前你用過的操作符都沒變泛释,即使有所變動(dòng)滤愕,也只是包名或類名的改動(dòng)。大家可能經(jīng)常用到的就是Action和Function怜校。

Action相關(guān)

之前我在文章里介紹過關(guān)于Action這類接口间影,在1.0中,這類接口是從Action0茄茁,Action1...往后排(數(shù)字代表可接受的參數(shù))魂贬,現(xiàn)在做出了改動(dòng)

Rx1.0-----------Rx2.0

Action1--------Action
Action1--------Consumer
Action2--------BiConsumer
后面的Action都去掉了,只保留了ActionN

Function相關(guān)

同上裙顽,也是命名方式的改變

上面那兩個(gè)類付燥,和RxJava1.0相比,他們都增加了throws Exception锦庸,也就是說机蔗,在這些方法做某些操作就不需要try-catch

例如:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

Files.readLines(name)這類io方法本來是需要try-catch的,現(xiàn)在直接拋出異常萝嘁,就可以放心的使用lambda 梆掸,保證代碼的簡(jiǎn)潔優(yōu)美。

doOnCancel/doOnDispose/unsubscribeOn

以doOnCancel為例牙言,大概就是當(dāng)取消訂閱時(shí)酸钦,會(huì)調(diào)用這個(gè)方法,例如:

Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);

take新操符會(huì)取消后面那些還未被發(fā)送的事件咱枉,因而會(huì)觸發(fā)doOnCancel

其他的一些操作符基本沒變卑硫,或者只是改變了名字,在這里就不一一介紹了蚕断,需要提一下的是欢伏,很多操作符都有兩套,一套用于Observable亿乳,一套用于Flowable硝拧。


線程調(diào)度

可以說這一塊兒基本也沒有改動(dòng),如果一定要說的話葛假。

  • 那就是去掉了Schedulers.immediate()這個(gè)線程環(huán)境
  • 移除的還有Schedulers.test()(我好像從來沒用過這個(gè)方法)
  • io.reactivex.Scheduler這個(gè)抽象類支持直接調(diào)度自定義線程任務(wù)(這個(gè)我也沒怎么用)

補(bǔ)充

如果你想把自己的RxJava1.0的遷移到2.0的版本障陶,可以使用這個(gè)庫RxJava2Interop ,在github上可以找到,它可以在Rxjava1.0和2.0之間相互轉(zhuǎn)換聊训,也就是說抱究,不僅可以把1.0的代碼遷移到2.0,你還可以把2.0的代碼遷移到1.0,哈哈带斑。

補(bǔ)充2

在RxJava1.0中鼓寺,有的人會(huì)使用CompositeSubscription來收集Subscription,來統(tǒng)一取消訂閱遏暴,現(xiàn)在在RxJava2.0中侄刽,由于subscribe()方法現(xiàn)在返回void,那怎么辦呢朋凉?

其實(shí)在RxJava2.0中州丹,Flowable提供了subscribeWith這個(gè)方法可以返回當(dāng)前訂閱的觀察者,并且通過ResourceSubscriber DisposableSubscriber等觀察者來提供 Disposable的接口杂彭。

所以墓毒,如果想要達(dá)成RxJava1.0的效果,現(xiàn)在應(yīng)該是這樣做:

CompositeDisposable composite = new CompositeDisposable();

composite.add(Flowable.range(1, 8).subscribeWith(subscriber));

這個(gè)subscriber 應(yīng)該是 ResourceSubscriber 或者 DisposableSubscriber 的實(shí)例亲怠。


結(jié)尾

其實(shí)從整篇文章的分析來看所计,改動(dòng)最大的還是觀察者模式的實(shí)現(xiàn),被拆分和細(xì)化了团秽,主要分成了Observable和Flowable兩大類主胧,當(dāng)然還有與之相關(guān)聯(lián)的其他變動(dòng)叭首,總體來看這一版本可以說是對(duì)于觀察者和被觀察者的重構(gòu)

RxJava2.0的范例代碼我沒精力去寫了踪栋,也正巧有位外國朋友已經(jīng)寫了RxJava2.0的demo,下面是他的項(xiàng)目地址:

在github上搜索:RxJava2-Android-Samples

當(dāng)然焙格,學(xué)習(xí)2.0 的過程中有什么問題也可以在這里留言討論。


附錄

下面我截圖展示一下2.0相對(duì)于1.0的一些改動(dòng)的細(xì)節(jié)夷都,僅做參考眷唉。





其實(shí)這些都是官方給出的列表,截圖在這里只是方便大家觀摩囤官。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末冬阳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子党饮,更是在濱河造成了極大的恐慌肝陪,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件刑顺,死亡現(xiàn)場(chǎng)離奇詭異见坑,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)捏检,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來不皆,“玉大人贯城,你說我怎么就攤上這事∨Γ” “怎么了能犯?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)犬耻。 經(jīng)常有香客問我踩晶,道長(zhǎng),這世上最難降的妖魔是什么枕磁? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任渡蜻,我火速辦了婚禮,結(jié)果婚禮上计济,老公的妹妹穿的比我還像新娘茸苇。我一直安慰自己,他們只是感情好沦寂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布学密。 她就那樣靜靜地躺著,像睡著了一般传藏。 火紅的嫁衣襯著肌膚如雪腻暮。 梳的紋絲不亂的頭發(fā)上彤守,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音哭靖,去河邊找鬼具垫。 笑死,一個(gè)胖子當(dāng)著我的面吹牛款青,可吹牛的內(nèi)容都是我干的做修。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼抡草,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼饰及!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起康震,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤燎含,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后腿短,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屏箍,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年橘忱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了赴魁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡钝诚,死狀恐怖颖御,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情凝颇,我是刑警寧澤潘拱,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站拧略,受9級(jí)特大地震影響芦岂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜垫蛆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一禽最、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧袱饭,春花似錦弛随、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至决左,卻和暖如春愕够,著一層夾襖步出監(jiān)牢的瞬間走贪,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工惑芭, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留坠狡,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓遂跟,卻偏偏與公主長(zhǎng)得像逃沿,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子幻锁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容