RxJava和Retrofit的依賴
? ??//rxjava依賴
? ??implementation'io.reactivex.rxjava2:rxjava:2.0.1'
? ??implementation'io.reactivex.rxjava2:rxandroid:2.0.1'
? ??//retrofit
? ??compile'com.squareup.retrofit2:retrofit:2.1.0'
? ??//Gson converter
? ??compile'com.squareup.retrofit2:converter-gson:2.1.0'
? ??/RxJava2 Adapter
? ??compile'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
? ??//okhttp
? ??compile'com.squareup.okhttp3:okhttp:3.4.1'
? ??compile'com.squareup.okhttp3:logging-interceptor:3.4.1'
1. RxJava鏈?zhǔn)綄懛?線程切換
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext("1");
Log.d(TAG, "emitter 2");
emitter.onNext("2");
Log.d(TAG, "emitter onComplete");
emitter.onComplete();
Log.d(TAG, "emitter 3");
emitter.onNext("3");
}
}).subscribeOn(Schedulers.newThread())?
.subscribeOn(Schedulers.io())??
.observeOn(AndroidSchedulers.mainThread())?
.observeOn(Schedulers.newThread())
.subscribe(new Observer() {
private int i;
@Override
public void onSubscribe(Disposable disposable) {
TestRxJavaActivity.this.disposable = disposable;
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext value:" + value);
i++;
if (i == 2) {
Log.d(TAG, "dispose");
disposable.dispose();
Log.d(TAG, "isDisposed:" + disposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}});
ObservableEmitter:發(fā)射器,用來發(fā)出事件,可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value)陪踩、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件潮瓶、complete事件和error事件
? ? 不可以隨意亂七八糟發(fā)射事件圣絮,需要滿足一定的規(guī)則:
? ? ? ?① 上游可以發(fā)送無限個(gè)onNext, 下游也可以接收無限個(gè)onNext.
????????②當(dāng)上游發(fā)送了一個(gè)onComplete后, 上游onComplete之后的事件將會(huì)繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
????????③當(dāng)上游發(fā)送了一個(gè)onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
? ? ? ? ④上游可以不發(fā)送onComplete或onError.
? ? ? ? ⑤最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError, 也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError, 反之亦然
注: 關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, **并不一定會(huì)導(dǎo)致程序崩潰. ** 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.??
2. RxJava中內(nèi)置的線程
????Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
? ??Schedulers.computation() 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作
????Schedulers.newThread() 代表一個(gè)常規(guī)的新線程
? ??AndroidSchedulers.mainThread() 代表Android的主線程
這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng), 在RxJava內(nèi)部使用的是線程池來維護(hù)這些線程, 所以效率也比較高.
線程切換:
? ??.subscribeOn(Schedulers.newThread()) //指定的是上游發(fā)送事件的線程
????.observeOn(AndroidSchedulers.mainThread()) //指定的是下游接收事件的線程.
多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn()?只有第一次的有效, 其余的會(huì)被忽略.
多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn()?, 下游的線程就會(huì)切換一次.
3. 在Activity退出時(shí)要切斷水管(訂閱),調(diào)用Disposable的dispose()方法切斷連接,如果有多個(gè)Disposable,使用RxJava中內(nèi)置的容器CompositeDisposable, 每得到一個(gè)Disposable就調(diào)用?CompositeDisposable.add(),將它添加到容器中,在Activity退出的時(shí)候, 調(diào)用CompositeDisposable.clear()?即可切斷所有的水管(訂閱)
4. 最簡(jiǎn)單的map轉(zhuǎn)換操作符:
? ? 通過map, 可以將上游發(fā)來的事件轉(zhuǎn)換為任意的類型, 可以是一個(gè)Object, 也可以是一個(gè)集合.
? ? 示例代碼:
? ??????.map(new Function<Integer, String>() {
? ? ? ? ? ? //將Integer轉(zhuǎn)成String類型
????????????@Override
????????????public??String apply(Integer integer) throws Exception{
????????????return"This is result "+ integer;?
?????????} })
5. flatMap操作符:
? ??上游每發(fā)送一個(gè)事件, flatMap都將創(chuàng)建一個(gè)新的水管, 然后發(fā)送轉(zhuǎn)換之后的新的事件, 下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù).這里需要注意的是, flatMap并不保證事件的順序,也就是圖中所看到的, 并不是事件1就在事件2的前面. 如果需要保證順序則需要使用concatMap,用法一模一樣
????示例代碼:
? ? ? ? .concatMap(newFunction<String, ObservableSource<Integer>>() {
????????????@Override
????????????public ObservableSource??apply(Integer integer)throws Exception{
????????????????final List list =new ArrayList<>();
????????????????for(int i =0; i <3; i++) {?
?????????????????????list.add("I am value "+ integer);?
?????????????????}
? ? ? ? ?????????return Observable.fromIterable(list);
? ? ? } })
????實(shí)踐示例:可以實(shí)現(xiàn)嵌套請(qǐng)求,如注冊(cè)完成去登錄就可以使用concatMap或者flatMap了
6. zip操作符通過一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起耸黑,然后發(fā)送這些組合到一起的事件. 它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)盈简。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。并且一個(gè)事件只能被使用一次,?組合的順序是嚴(yán)格按照事件發(fā)送的順序來進(jìn)行的.并且發(fā)送的事件都是在同一線程,需要切換線程,發(fā)送事件才可以在不同線程同步進(jìn)行發(fā)送.
? ? 示例代碼:
? ??????Observable.zip(observable1, observable2,new BiFunction<Integer,String,String>(){
????????????@Override
?????????????public String apply(Integer integer, String s)throws Exception{
????????????????????return? integer + s;
?????????} })
? ? 實(shí)踐示例:比如一個(gè)界面需要展示用戶的一些信息, 而這些信息分別要從兩個(gè)服務(wù)器接口中獲取, 而只有當(dāng)兩個(gè)都獲取到了之后才能進(jìn)行展示, 這個(gè)時(shí)候就可以用Zip了
7. filter:過濾操作符,過濾上游事件
? ? ? ? 示例代碼:
? ??????????.filter(new Predicate() {
????????????????@Override
? ????????????? public boolean test(Integer integer)throws Exception {
????????????????????return integer%10==0;
????????????????}
????????????})
8. sample:取樣操作符,每隔指定的時(shí)間就從上游中取出一個(gè)事件發(fā)送給下游
? ? 示例代碼:.sample(2, TimeUnit.SECONDS)//沒個(gè)兩秒sample取樣一次上游事件
9.Flowable的使用
? ? 上游是Flowable,下游是Subscriber,鏈接不變,和Observable用法一樣,只是多了一個(gè)?背壓策略參數(shù):BackpressureStrategy.ERROR;
Flowable在設(shè)計(jì)的時(shí)候采用了一種新的思路也就是響應(yīng)式拉取的方式來更好的解決上下游流速不均衡的問題;
FLowable相比Observable, 在性能方面有些不足, 畢竟FLowable內(nèi)部為了實(shí)現(xiàn)響應(yīng)式拉取做了更多的操作, 性能有所丟失也是在所難免;
Flowable默認(rèn)最多可以存放128個(gè)事件;
背壓策略:
? ?? BackpressureStrategy.ERROR:當(dāng)下游沒有處理事件能力是拋出 MissingBackpressureException異常
? ?? BackpressureStrategy.BUFFER:上游水缸沒有大小限制;
? ? ?BackpressureStrategy.DROP: 直接把存不下的事件丟棄;
? ?? BackpressureStrategy.LATEST:只保留最新的事件;
? ? 注:Latest和Drop的區(qū)別在于Latest總是能獲取到最后最新的事件;
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>(){
????????@Override?
????????public void subscribe(FlowableEmitter<Integer> emitter)throws Exception{? ? ? ? ? ? ? ? ? ?????????????Log.d(TAG,"emit 1");?
?????????????emitter.onNext(1);
?????????????Log.d(TAG,"emit 2");
?????????????emitter.onNext(2);?
?????????????Log.d(TAG,"emit 3");
?????????????emitter.onNext(3);
?????????????Log.d(TAG,"emit complete");
?????????????emitter.onComplete();
?????????}??}, BackpressureStrategy.ERROR);//增加了一個(gè)參數(shù),背壓策略
? ??????Subscriber<Integer> downstream =new Subscriber<Integer>() {
? ? ? ? ? ? ? ?@Override
????????????????publicvoid onSubscribe(Subscription s){?
?????????????????????Log.d(TAG,"onSubscribe");?
?????????????????????s.request(Long.MAX_VALUE);//注意這句代碼
????????????????}
????????????@Override?
????????????public void onNext(Integer integer){
?????????????????Log.d(TAG,"onNext: "+ integer); }
????????????@Override
????????????public void onError(Throwable t){?
?????????????????Log.w(TAG,"onError: ", t); }
????????????@Override
? ? ? ? ? ? ?public void onComplete(){?
?????????????????Log.d(TAG,"onComplete");
? ? ? ? ? } };?
?????????upstream.subscribe(downstream);
10.下游Subscriber的onSubcribe方法中傳遞的是一個(gè)Subscription,它也是上下游的一個(gè)開關(guān),調(diào)用Subscription.cancel()也可以切斷水管,同時(shí)也增加一個(gè)Subscription.requset(3)方法,該方法代表下游處理事件的能力,參數(shù)是幾就是代表能處理幾個(gè)事件
11.同一線程中在上游使用FlowableEmitter.requested()獲取下游處理事件能力的多少;
? ??當(dāng)上下游工作在不同的線程里時(shí)陶因,每一個(gè)線程里都有一個(gè)requested方法堰汉,而我們調(diào)用request(1000)時(shí),實(shí)際上改變的是下游線程中的requested费什,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的钾恢,這個(gè)調(diào)用會(huì)在合適的時(shí)候自動(dòng)觸發(fā)。
注:不同線程中下游每消費(fèi)96個(gè)事件便會(huì)自動(dòng)觸發(fā)內(nèi)部的request()去設(shè)置上游的requested的值, 發(fā)送事件前先判斷當(dāng)前的requested的值是否大于0鸳址,若等于0則說明下游處理不過來了瘩蚪,則需要等待
12.只有onNext()事件才會(huì)消耗事件,complete和error事件不會(huì)消耗requested;