序言
由于我是先學習了1.0版本接著繼續(xù)學習2.0休涤,所以本文可能不太適合沒有接觸過RxJava的同學,所以可以先看一下诫舅,1.0的學習筆記,傳送門 http://www.reibang.com/p/a8cedc061ab1
首先要使用RxJava2羽利,先要添加依賴:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
類介紹
與RxJava1相比:
-
相同點:
都有Observable,Observer刊懈,subscribe() - 不同點:
在RxJava2的Observable中重寫的方法變?yōu)閟ubscribe(ObservableEmitter<String> emitter),其中的ObservableEmitter: Emitter是發(fā)射器的意思这弧,就是用來發(fā)出事件的。
在RxJava2的observer重寫方法中新添加了一個方法onSubscribe(Disposable d),其中Disposable是一次性用品虚汛,用完就丟棄匾浪,對與這個參數(shù)可以理解為一個攔截器,將所有發(fā)送過來的數(shù)據(jù)攔截下倆卷哩,讓observer不會收到蛋辈。
-
在RxJava2中subscribe()重載方法的參數(shù)變了。
public final Disposable subscribe() {} public final Disposable subscribe(Consumer<? super T> onNext) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} public final void subscribe(Observer<? super T> observer) {}
在實際項目中我們一般只關(guān)心onNext(),和onError(),所以我們一般只會重載兩個參數(shù)的殉疼。
然后我們用代碼來理解一下上面的相同點和不同點
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: 1" );
e.onNext(1);
Log.e(TAG, "subscribe: 2" );
e.onNext(2);
Log.e(TAG, "subscribe: 3" );
e.onNext(3);
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: " + d.isDisposed());
disposable = d;
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "onNext: " + value);
if (value == 2) {
disposable.dispose();
Log.e(TAG, "onNext: " + disposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
observable.subscribe(observer);
運行結(jié)果如下所示:
com.example.cosima.rxjava2learn E/MainActivity: onSubscribe: false
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 1
com.example.cosima.rxjava2learn E/MainActivity: onNext: 1
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: true
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 3
可以看到在disposable為true的時候梯浪,Observable可以發(fā)送數(shù)據(jù),但是在Observer沒有接收到數(shù)據(jù)瓢娜。
注:與RxJava1相同挂洛,當Observable發(fā)送onComplete之后,Observable在onComplete之后的數(shù)據(jù)可以發(fā)送眠砾,Observer在接收到onComplete之后不再繼續(xù)接收事件虏劲。onError與onComplete的原理一樣,但是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然褒颈。
創(chuàng)建方式
想對于RxJava1來說柒巫,Observable的創(chuàng)建方式心添加了幾種方式:
-
fromIterable((Iterable<? extends T> list)方式
遍歷集合,發(fā)送每個item相當于多次調(diào)用onNext().
注Collection接口是Iterable接口的子接口谷丸,所以所有Collection接口的實現(xiàn)類都可以作為Iterable對象直接傳入fromIterable()方法堡掏。
fromArray(T... items)方式
遍歷集合,發(fā)送每個item相當于多次調(diào)用onNext().interval(long period, TimeUnit unit)方式
創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable刨疼,可用作定時器 period:時間間隔interval(long initialDelay, long period, TimeUnit unit)方式
initialDelay:開始值泉唁,默認為0。range(final int start, final int count)方式
創(chuàng)建一個發(fā)射特定整數(shù)序列的Observable揩慕,第一個參數(shù)為起始值亭畜,第二個為發(fā)送的個數(shù),如果為0則不發(fā)送迎卤,負數(shù)則拋異常拴鸵。timer(long delay, TimeUnit unit)方式
一個給定的延遲后發(fā)射一個特殊的值,即表示延遲2秒后蜗搔,調(diào)用onNext()方法劲藐。
用于實現(xiàn)觀察者模式方式有很多種:
<center>
現(xiàn)在我們來實現(xiàn)一個簡單的倒計時功能:
封裝Observable
private Observable<Integer> initEvent2(final int time) {
return Observable.interval(0, 1, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return time - aLong.intValue();
}
})
.take(time + 1);//限制循環(huán)次數(shù)
}
使用如下:
initEvent2(5).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "accept: 開始計時");
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: 當前計時" + integer);
}
});
打印結(jié)果為 在上述代碼中我們使用到了map()操作符,接下來我們就一起來看下RxJava中的操作符
RxJava操作符
1.map()操作符:
把原來的Observable對象轉(zhuǎn)換成另一個Observable對象
2.flatMap()操作符:
flatMap和Map的相同點就是把一個對象轉(zhuǎn)化為另一個對象返回碍扔,但是不同的是flatMap()返回的是個Observable對象瘩燥,并且這個對象并不是直接發(fā)送到了回調(diào)方法中,而是把這個對象激活不同,之后將他發(fā)送到回調(diào)方法中厉膀。
3.filter()操作符:
根據(jù)自己的需求加入判斷邏輯,他的返回值是true或者是false二拐,用于表示是否需要被過濾服鹅。
4.take()操作符:
再上面的代碼中已經(jīng)用到過,具體含義就是限制輸出次數(shù)
5.doOnNext()操作符:
在輸出前可以做一個額外的操作
6.delay()操作符:
延遲Observer的輸出
線程控制Scheduler
該部分與RxJava1相比沒有變換百新,在平常使用中較多的都是以下幾個:
- Schedulers.newThread(): 總是啟用新線程企软,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件饭望、讀寫數(shù)據(jù)庫仗哨、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler形庭。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是用一個無數(shù)量上限的線程池厌漂,可以重用空閑的線程萨醒,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中苇倡,可以避免創(chuàng)建不必要的線程富纸。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算旨椒,即不會被 I/O 等操作限制性能的操作晓褪,例如圖形的計算。這個 Scheduler 使用的固定的線程池综慎,大小為 CPU 核數(shù)涣仿。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU示惊。
- Android 還有一個專用的 AndroidSchedulers.mainThread()变过,它指定的操作將在 Android 主線程運行。
都是通過subscribeOn()和 observeOn() 兩個方法來對線程進行控制涝涤。
具體可見RxJava1.0學習筆記
與Retrofit結(jié)合使用
與1.0版本總體相差不多媚狰,只需要修改相應(yīng)的參數(shù)就可以了,比如subcribe()方法中的參數(shù)阔拳,請求如下所示:
主要內(nèi)容為紅框所示
接下來講一下RxJava2特有的了崭孤。
Flowable及Backpressure
來由
Rxjava2中,還有一個很大的變化就是Backpressure(背壓)糊肠,何為背壓辨宠,就是觀察者來不及處理被觀察者發(fā)出的事件(產(chǎn)生事件的速度大于處理事件的速度),導(dǎo)致事件被無限堆積货裹,最后產(chǎn)生異常嗤形。Flowable就是由此產(chǎn)生,專門用來處理這類問題弧圆。將原來的Observable拆分成了新的Observable和Flowable赋兵,同時其他相關(guān)部分也同時進行了拆分。
注意:處理Backpressure的策略僅僅是處理Subscriber接收事件的方式搔预,并不影響Flowable發(fā)送事件的方法霹期。即使采用了處理Backpressure的策略,F(xiàn)lowable原來以什么樣的速度產(chǎn)生事件拯田,現(xiàn)在還是什么樣的速度不會變化历造,主要處理的是Subscriber接收事件的方式。
處理Backpressure的策略
產(chǎn)生原因:
生產(chǎn)者和消費者不在同一線程下长已,生產(chǎn)者的速度大于消費者的速度褂删,就會產(chǎn)生Backpressure問題冯勉。如果生產(chǎn)者和消費者在同一線程下侈玄,不會產(chǎn)生Backpressure問題,所以可以說成同步不會產(chǎn)生問題祥国,異步可能產(chǎn)生問題培他。-
處理策略:
1. ERROR策略
產(chǎn)生Backpressure問題的時候直接拋出異常(MissingBackpressureException)Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("我"); e.onNext("愛"); e.onNext("你"); e.onComplete(); } }, BackpressureStrategy.ERROR); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { Log.e(TAG, "onSubscribe: "); s.request(Long.MAX_VALUE); } @Override public void onNext(String s) { Log.e(TAG, "onNext: " + s); } @Override public void onError(Throwable t) { Log.e(TAG, "onError: " + t.getMessage()); } @Override public void onComplete() { Log.e(TAG, "onComplete: " ); } }; flowable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
上述代碼中在Flowable(被觀察者)添加了一個參數(shù)绸罗,并且onSubscribe(Subscription s)中傳給我們的不再是Disposable了, 而是Subscription荒典。然而Subscription也可以用于切斷觀察者與被觀察者之間的聯(lián)系,調(diào)用Subscription.cancel()方法吞鸭,并在此方法中添加了s.request(long count);這個方法就是用來向生產(chǎn)者申請可以消費的事件數(shù)量寺董。這樣就可以根據(jù)自身的消費能力進行消費。在異步調(diào)用時刻剥,RxJava中有個緩存池遮咖,用來緩存消費者處理不了暫時緩存下來的數(shù)據(jù),緩存池的默認大小為128造虏,即只能緩存128個事件御吞。無論request()中傳入的數(shù)字比128大或小,緩存池中在剛開始都會存入128個事件漓藕。當然如果本身并沒有這么多事件需要發(fā)送陶珠,則不會存128個事件。
在ERROR策略下享钞,如果緩存池溢出揍诽,就會立刻拋出MissingBackpressureException異常。
注:如果不調(diào)用request表示消費能力為0栗竖。如果不限制想request()中傳入任意參數(shù)暑脆,超過消費能力,也會造成資源浪費狐肢,產(chǎn)生OOM添吗。
2. BUFFER策略
BUFFER就是把RxJava中默認的只能存128個事件的緩存池換成一個大的緩存池,支持存很多很多的數(shù)據(jù)份名。
這樣碟联,消費者通過request()即使傳入一個很大的數(shù)字,生產(chǎn)者也會生產(chǎn)事件僵腺,并將處理不了的事件緩存玄帕。
但是這種方式任然比較消耗內(nèi)存,除非是我們比較了解消費者的消費能力想邦,能夠把握具體情況裤纹,不會產(chǎn)生OOM。BUFFER要慎用
3. DROP策略
當消費者處理不了事件,就丟棄鹰椒。
消費者通過request()傳入其需求n锡移,然后生產(chǎn)者把n個事件傳遞給消費者供其消費。其他消費不掉的事件就丟掉漆际。
4. LATEST策略
LATEST與DROP功能基本一致淆珊。
消費者通過request()傳入其需求n,然后生產(chǎn)者把n個事件傳遞給消費者供其消費奸汇。其他消費不掉的事件就丟掉施符。
唯一的區(qū)別就是LATEST總能使消費者能夠接收到生產(chǎn)者產(chǎn)生的最后一個事件。
還是以上述例子展示擂找,唯一的區(qū)別就是Flowable不再無限發(fā)事件戳吝,只發(fā)送1000000個。
參考:http://www.reibang.com/p/d149043d103a
源碼地址:https://github.com/MrMJL/RxJava2Demo
由于只是學習筆記贯涎,源碼可能會有點亂听哭,又不對或者不明白歡迎評論多多交流。推薦一個Android實習&&經(jīng)驗交流群:541144061