RxJava2 vs RxJava1

官方WIKI What's different in 2.0

RxJava2已經(jīng)發(fā)布了兩周了乙嘀,相比RxJava1,它的改動(dòng)還是很大的:


Observable and Flowable

在前一個(gè)版本里backpressure被集成到了Observable中,官方也提供了很多方法讓我們來處理backpressure問題禀倔。但是有一些特殊的場(chǎng)景根本無法用其來解決熬芜,最常見的例如UI事件。而不處理backpressure有可能導(dǎo)致MissingBackpressureException的出現(xiàn)曙强。

關(guān)于backpressure的概念可以看一下RxJava中backpressure這個(gè)概念的理解

為了解決這個(gè)問題残拐,在RxJava2里,引入了Flowable這個(gè)類:Observable不包含backpressure處理碟嘴,而Flowable包含溪食。

例如:

Flowable<Long> flowable = 
    Flowable.create((FlowableOnSubscribe<Long>) e -> {
        Observable.interval(10, TimeUnit.MILLISECONDS)
            .take(Integer.MAX_VALUE)
            .subscribe(e::onNext);
}, FlowableEmitter.BackpressureMode.DROP);


Observable<Long> observable = 
    Observable.create((ObservableOnSubscribe<Long>) e -> {
        Observable.interval(10, TimeUnit.MILLISECONDS)
            .take(Integer.MAX_VALUE)
            .subscribe(e::onNext);
});

兩個(gè)對(duì)象都以10毫秒一次派發(fā)數(shù)據(jù),假設(shè)訂閱他們的方法都是:

i -> {
    Thread.sleep(100);
    Log.v("TEST", "out : " + i);
}

以100毫秒一次消費(fèi)數(shù)據(jù)娜扇,消費(fèi)數(shù)據(jù)的效率是生產(chǎn)的1/10错沃。那么

  • 對(duì)于observable
    他會(huì)按照0,1,2,3,4...的順序依次消費(fèi)栅组,并輸出log,而沒有消費(fèi)的數(shù)據(jù)將會(huì)都存在內(nèi)存中枢析。如果在RxJava1中玉掸,內(nèi)存數(shù)據(jù)超過128個(gè)時(shí)將會(huì)拋出MissingBackpressureException錯(cuò)誤;而在RxJava2中并不會(huì)報(bào)錯(cuò)醒叁,數(shù)據(jù)會(huì)一直放到內(nèi)存中司浪,直到發(fā)生OutOfMemoryError

  • 對(duì)于flowable, 在創(chuàng)建時(shí)我們?cè)O(shè)定了FlowableEmitter.BackpressureMode.DROP把沼,一開始他會(huì)輸出0,1,2,3....127但之后會(huì)忽然跳躍到966,967,968 ...啊易。中間的部分?jǐn)?shù)據(jù)由于緩存不了,被拋棄掉了饮睬。


Single

和Observable租谈,F(xiàn)lowable一樣會(huì)發(fā)送數(shù)據(jù),不同的是訂閱后只能接受到一次:

Single<Long> single = Single.just(1l);

single.subscribe(new SingleObserver<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onSuccess(Long value) {
        // 和onNext是一樣的
    }

    @Override
    public void onError(Throwable e) {
    }
});

普通Observable可以使用toSingle轉(zhuǎn)換:Observable.just(1).toSingle()


Completable

與Single類似捆愁,只能接受到完成(onComplete)和錯(cuò)誤(onError)

同樣也可以由普通的Observable轉(zhuǎn)換而來:Observable.just(1).toCompletable()


Base reactive interfaces

和Flowable的接口Publisher類似垦垂,Observable、Single牙瓢、Completable也有類似的基類

interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}

interface CompletableSource {
    void subscribe(CompletableObserver observer);
}

因此許多操作符接受的參數(shù)從以前的具體對(duì)象劫拗,變成了現(xiàn)在的接口:

Flowable<R> flatMap(
    Function<? super T, ? extends Publisher<? extends R>> mapper
);

Observable<R> flatMap(
    Function<? super T, ? extends ObservableSource<? extends R>> mapper
);

------
// 以前
Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {

由于接收的都是接口,在使用其他遵循Reactive-Streams設(shè)計(jì)的第三方庫的時(shí)候矾克,就不需要把他自定義的Flowable轉(zhuǎn)換成標(biāo)準(zhǔn)Flowable了页慷。


Subjects and Processors

io.reactivex.subjects.AsyncSubject,
io.reactivex.subjects.BehaviorSubject,
io.reactivex.subjects.PublishSubject,
io.reactivex.subjects.ReplaySubject,
io.reactivex.subjects.UnicastSubject

在RxJava2中依然存在,但現(xiàn)在他們不支持backpressure胁附。新出現(xiàn)的

io.reactivex.processors.AsyncProcessor,
io.reactivex.processors.BehaviorProcessor,
io.reactivex.processors.PublishProcessor,
io.reactivex.processors.ReplayProcessor
io.reactivex.processors.UnicastProcessor

支持backpressure


Other classes

rx.observables.ConnectableObservable變成了io.reactivex.observables.ConnectableObservable<T>io.reactivex.flowables.ConnectableFlowable<T>

類似的還有rx.observables.GroupedObservable酒繁。


Functional interfaces

需要注意的一點(diǎn)是,現(xiàn)在RxJava2的接口方法里加上了throws Exception:

ublic interface Consumer<T> {
    void accept(T t) throws Exception;
}

意味著在這些方法里調(diào)用一些會(huì)發(fā)生異常的方法不需要try-catch


Actions

另外大部分接口方法都按照J(rèn)ava8的接口方法名進(jìn)行了相應(yīng)的修改控妻,比如上面那個(gè)Consumer<T>接口原來叫Action1<T>州袒,而Action2<T>改名成了BiConsumer

Action3-Action9被刪掉了,大概因?yàn)闆]人用弓候。郎哭。


Functions

同上,基本就是名字的修改和不常用類的刪除


Subscriber

RxJava1里Subscriber只是一個(gè)空接口菇存,在新版里Subscriber被賦予了更多的作用夸研,有幾個(gè)實(shí)現(xiàn)類可以供我們使用,例如

ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
    @Override
    public void onStart() {
        request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
};

request()方法可以控制當(dāng)前subscriber需要接收幾個(gè)事件依鸥。而且亥至,還可以調(diào)用subscriber.dispose()來斷開對(duì)信號(hào)的監(jiān)聽。

同時(shí),onCompleted方法改成了onComplete姐扮。意味著完成時(shí)調(diào)用這個(gè)方法絮供,而不是完成后d

由于Subscription被改掉了(下面會(huì)講到)。如果需要類似以前CompositeSubscription的用法茶敏,可以使用:

CompositeDisposable composite2 = new CompositeDisposable();
composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));

注意這里需要使用subscribeWith而不是subscribe杯缺,因?yàn)閟ubscribe方法現(xiàn)在返回void


Subscription

在RxJava1里,Subscription起到的是訂閱橋梁的作用睡榆。在2中萍肆,由于Subscription本身和Reactive-Streams里的另外一個(gè)同名概念沖突。因此把原本的Subscription改名成了Disposable胀屿。

除了上一節(jié)里subscribe(Subscriber )方法返回void塘揣,其他名為subscribe的方法都返回Disposable

相應(yīng)的,

  • CompositeSubscription 改名成了 CompositeDisposable
  • SerialSubscriptionMultipleAssignmentSubscription被合并到了SerialDisposable里. set() 方法會(huì)處理掉就的值宿崭,而replace()方法不會(huì)亲铡。
  • RefCountSubscription被移除了

Backpressure

在第一節(jié)Observable and Flowable里已經(jīng)說到了這個(gè)問題,在2中葡兑,Observable將不會(huì)處理backpressure奖蔓,也就不會(huì)發(fā)生MissingBackpressureException問題,但內(nèi)存仍然會(huì)緩存多余的數(shù)據(jù)讹堤。

而在使用Flowable時(shí)吆鹤,如果配置Backpressure有問題,那么MissingBackpressureException依然存在


Schedulers

RxJava2里仍然包含了computation, io, newThreadtrampoline這些默認(rèn)線程調(diào)度洲守。而immediate被移除了疑务,因?yàn)樗?jīng)常被人錯(cuò)誤使用。同時(shí)Schedulers.test也被移除了梗醇。


Entering the reactive world

將普通方法轉(zhuǎn)換成RxJava的數(shù)據(jù)源知允,在RxJava1中,提供了Observable.create()方法叙谨,但是這個(gè)方法過于強(qiáng)大温鸽,但使用時(shí)需要注意的東西太多經(jīng)常會(huì)發(fā)生錯(cuò)誤。

因此在RxJava2中手负,把原來的fromAsync重命名成了create涤垫,fromAsync是一個(gè)和create類似但更為簡(jiǎn)單和安全的方法。這樣大部分舊代碼都能夠繼續(xù)使用虫溜。


Leaving the reactive world

之前如果想把數(shù)據(jù)源轉(zhuǎn)換成普通的數(shù)據(jù)對(duì)象雹姊,需要先轉(zhuǎn)換成BlockingObservable。而在2中衡楞,可以調(diào)用blockingXXX方法直接把數(shù)據(jù)源轉(zhuǎn)換成對(duì)象:

List<Integer> list = Flowable.range(1, 100).toList().blockingFirst();

有一點(diǎn)需要特別注意,在RxJava2里,不建議在Subscriber里拋出錯(cuò)誤瘾境,這意味著下面的代碼可能有一天就不能繼續(xù)運(yùn)行了:

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    public void onNext(Integer t) {
        if (t == 1) {
            throw new IllegalArgumentException();
        }
    }

    public void onError(Throwable e) {
        if (e instanceof IllegalArgumentException) {
            throw new UnsupportedOperationException();
        }
    }

    public void onComplete() {
        throw new NoSuchElementException();
    }
};

Flowable.just(1).subscribe(subscriber);

由于上面類似的代碼實(shí)際中出現(xiàn)得很多歧杏,因此在2中提供了safeSubscribe方法,使用它就可以繼續(xù)在subscriber里拋出錯(cuò)誤迷守。

當(dāng)然犬绒,你可以繞過subscribe(subscriber)這個(gè)方法,使用類似:

Flowable.just(1).subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);

這樣的方法兑凿,之前的代碼仍然可以繼續(xù)throw錯(cuò)誤凯力。


Operator differences

操作符的改動(dòng)不大,大部分是擴(kuò)充了參數(shù)數(shù)量礼华。
或者是加入prefetch代表可以加入預(yù)置數(shù)據(jù)咐鹤。


總結(jié)

可以明顯的看到,RxJava2最大的改動(dòng)就是對(duì)于backpressure的處理圣絮,為此將原來的Observable拆分成了新的ObservableFlowable祈惶,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分。

除此之外扮匠,他和我們最熟悉和喜愛的RxJava~


引用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末捧请,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子棒搜,更是在濱河造成了極大的恐慌疹蛉,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件力麸,死亡現(xiàn)場(chǎng)離奇詭異氧吐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)末盔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門筑舅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人陨舱,你說我怎么就攤上這事翠拣。” “怎么了游盲?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵误墓,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我益缎,道長(zhǎng)谜慌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任莺奔,我火速辦了婚禮欣范,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己恼琼,他們只是感情好妨蛹,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著晴竞,像睡著了一般蛙卤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上噩死,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天颤难,我揣著相機(jī)與錄音,去河邊找鬼已维。 笑死行嗤,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的衣摩。 我是一名探鬼主播昂验,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼艾扮!你這毒婦竟也來了既琴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤泡嘴,失蹤者是張志新(化名)和其女友劉穎甫恩,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酌予,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡磺箕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了抛虫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片松靡。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖建椰,靈堂內(nèi)的尸體忽然破棺而出雕欺,到底是詐尸還是另有隱情,我是刑警寧澤棉姐,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布屠列,位于F島的核電站,受9級(jí)特大地震影響伞矩,放射性物質(zhì)發(fā)生泄漏笛洛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一乃坤、第九天 我趴在偏房一處隱蔽的房頂上張望苛让。 院中可真熱鬧沟蔑,春花似錦、人聲如沸蝌诡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽浦旱。三九已至,卻和暖如春九杂,著一層夾襖步出監(jiān)牢的瞬間颁湖,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工例隆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留甥捺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓镀层,卻偏偏與公主長(zhǎng)得像镰禾,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子唱逢,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容