本文參考
這可能是最好的RxJava 2.x 教程(完結(jié)版)
RxJava 2.x 擁有了新的特性,其依賴于4個基礎(chǔ)接口螺男,它們分別是
Publisher
Subscriber
Subscription
Processor
其中最核心的莫過于 Publisher 和 Subscriber华望。Publisher 可以發(fā)出一系列的事件丈秩,而 Subscriber 負責和處理這些事件。
其中用的比較多的自然是 Publisher 的 Flowable烁巫,它支持背壓署隘。關(guān)于背壓給個簡潔的定義就是:
背壓是指在異步場景中,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下亚隙,一種告訴上游的被觀察者降低發(fā)送速度的策略磁餐。
簡而言之,背壓是流速控制的一種策略恃鞋。有興趣的可以看一下官方對于背壓的講解崖媚。
可以明顯地發(fā)現(xiàn),RxJava 2.x 最大的改動就是對于 backpressure 的處理恤浪,為此將原來的 Observable 拆分成了新的 Observable 和 Flowable畅哑,同時其他相關(guān)部分也同時進行了拆分,但令人慶幸的是水由,是它荠呐,是它,還是它砂客,還是我們最熟悉和最喜歡的 RxJava泥张。
觀察者模式
大家可能都知道, RxJava 以觀察者模式為骨架鞠值,在 2.0 中依舊如此媚创。
不過此次更新中,出現(xiàn)了兩種觀察者模式:
Observable ( 被觀察者 ) / Observer ( 觀察者 )
Flowable (被觀察者)/ Subscriber (觀察者)
在 RxJava 2.x 中彤恶,Observable 用于訂閱 Observer钞钙,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber 声离, 是支持背壓(Backpressure)的芒炼。
第一步:初始化 Observable
第二步:初始化 Observer
第三步:建立訂閱關(guān)系
// RxJava的鏈式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
}
}).subscribe(new Observer<Integer>() {
// 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 3. 創(chuàng)建觀察者 & 定義響應事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認最先調(diào)用復寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件" + value + "作出響應");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
線程調(diào)度
subScribeOn
用于指定發(fā)送者發(fā)送消息的線程
observeOn
用于指定接受者接送消息的線程
線程切換需要注意的
RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應手,但其中也有些需要注意的地方术徊。
簡單地說本刽,subscribeOn() 指定的就是發(fā)射事件的線程,observerOn 指定的就是訂閱者接收事件的線程。
多次指定發(fā)射事件的線程只有第一次指定的有效子寓,也就是說多次調(diào)用 subscribeOn() 只有第一次的有效暗挑,其余的會被忽略。
但多次指定訂閱者接收線程是可以的斜友,也就是說每調(diào)用一次 observerOn()窿祥,下游的線程就會切換一次。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread)蝙寨,Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
實例代碼中嗤瞎,分別用 Schedulers.newThread() 和 Schedulers.io() 對發(fā)射線程進行切換墙歪,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進行了接收線程的切換”雌妫可以看到輸出中發(fā)射線程僅僅響應了第一個 newThread虹菲,但每調(diào)用一次 observeOn() ,線程便會切換一次掉瞳,因此如果我們有類似的需求時毕源,便知道如何處理了。
RxJava 中陕习,已經(jīng)內(nèi)置了很多線程選項供我們選擇霎褐,例如有:
Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡,讀寫文件等io密集型的操作;
Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作该镣;
Schedulers.newThread() 代表一個常規(guī)的新線程冻璃;
AndroidSchedulers.mainThread() 代表Android的主線程
這些內(nèi)置的 Scheduler 已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應該使用內(nèi)置的這些選項损合,而 RxJava 內(nèi)部使用的是線程池來維護這些線程省艳,所以效率也比較高。
操作符
Create
create 操作符應該是最常見的操作符了嫁审,主要用于產(chǎn)生一個 Obserable 被觀察者對象跋炕,為了方便大家的認知,以后的教程中統(tǒng)一把被觀察者 Observable 稱為發(fā)射器(上游事件)律适,觀察者 Observer 稱為接收器(下游事件)辐烂。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete();
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private int i;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "onNext : value : " + integer + "\n" );
i++;
if (i == 2) {
// 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作擦耀,讓Observer觀察者不再接收上游事件
mDisposable.dispose();
Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" + "\n" );
}
});
需要注意的幾點是:
在發(fā)射事件中棉圈,我們在發(fā)射了數(shù)值 3 之后,直接調(diào)用了 e.onComlete()眷蜓,雖然無法接收事件分瘾,但發(fā)送事件還是繼續(xù)的。
另外一個值得注意的點是,在 RxJava 2.x 中德召,可以看到發(fā)射事件方法相比 1.x 多了一個 throws Excetion白魂,意味著我們做一些特定操作再也不用 try-catch 了。
并且 2.x 中有一個 Disposable 概念上岗,這個東西可以直接調(diào)用切斷福荸,可以看到,當它的 isDisposed() 返回為 false 的時候肴掷,接收器能正常接收事件敬锐,但當其為 true 的時候,接收器停止了接收呆瞻。所以可以通過此參數(shù)動態(tài)控制接收事件了台夺。
Map
Map 基本算是 RxJava 中一個最簡單的操作符了,熟悉 RxJava 1.x 的知道痴脾,它的作用是對上游發(fā)送的每一個事件應用一個函數(shù)颤介,使得每一個事件都按照指定的函數(shù)去變化,而在 2.x 中它的作用幾乎一致赞赖。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "this is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept : " + s +"\n" );
}
});
map 基本作用就是將一個 Observable 通過某種函數(shù)關(guān)系滚朵,轉(zhuǎn)換為另一種 Observable,上面例子中就是把我們的 Integer 數(shù)據(jù)變成了 String 類型前域。從Log日志顯而易見辕近。
Zip
zip 專用于合并事件,該合并不是連接(連接操作符后面會說)匿垄,合并事件專用,分別從兩個上游事件中各取出一個組合,一個事件只能被使用一次亏推,順序嚴格按照事件發(fā)送的順序,最終下游事件收到的是和上游事件最少的數(shù)目相同(必須兩兩配對,多余的舍棄)
Observable.zip(getStringObservable(), getInterObservable(), new BiFunction<String, Integer, String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "zip : accept : " + s + "\n");
}
});
private Observable<String> getStringObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("A");
Log.e(TAG, "String emit : A \n");
e.onNext("B");
Log.e(TAG, "String emit : B \n");
e.onNext("C");
Log.e(TAG, "String emit : C \n");
}
}
});
}
private Observable<Integer> getInterObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(1);
Log.e(TAG, "Integer emit : 1 \n");
e.onNext(2);
Log.e(TAG, "Integer emit : 2 \n");
e.onNext(3);
Log.e(TAG, "Integer emit : 3 \n");
e.onNext(4);
Log.e(TAG, "Integer emit : 4 \n");
e.onNext(5);
Log.e(TAG, "Integer emit : 5 \n");
}
}
});
}
zip 組合事件的過程就是分別從發(fā)射器 A 和發(fā)射器 B 各取出一個事件來組合年堆,并且一個事件只能被使用一次吞杭,組合的順序是嚴格按照事件發(fā)送的順序來進行的,運行后可以看到变丧,1 永遠是和 A 結(jié)合的芽狗,2 永遠是和 B 結(jié)合的。
最終接收器收到的事件數(shù)量是和發(fā)送器發(fā)送事件最少的那個發(fā)送器的發(fā)送事件數(shù)目相同痒蓬,運行后看到童擎,5 很孤單,沒有人愿意和它交往攻晒,孤獨終老的單身狗顾复。
Concat
對于單一的把兩個發(fā)射器連接成一個發(fā)射器,雖然 zip 不能完成鲁捏,但我們還是可以自力更生芯砸,官方提供的 concat 讓我們的問題得到了完美解決。
//兩個發(fā)射器連接成一個發(fā)射器
Observable.concat(Observable.just(1, 2, 3, 4), Observable.just("hello", 5, 6))
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object serializable) throws Exception {
Log.e(TAG, "concat : "+ serializable + "\n" );
}
});
FlatMap
FlatMap將一個發(fā)送事件的上游Observable變換成多個發(fā)送事件的Observables,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里假丧,flatMap 并不能保證事件的順序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 100);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MICROSECONDS);
// return Observable.fromIterable(list);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
Log.e(TAG, "flatMap : accept : " + s + "\n");
}
});
ConcatMap
ConcatMap將一個發(fā)送事件的上游Observable變換成多個發(fā)送事件Observables双揪,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里,flatMap 并不能保證事件的順序包帚,ConcatMap 可以保證事件發(fā)送的順序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.e(TAG, "flatMap : accept : " + o + "\n");
}
});
distinct
去重操作符渔期,其實就是簡單的去重
Observable.just(1, 2, 3, 1, 2, 34, 45, 55)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "distinct : " + integer + "\n");
}
});
Filter
Filter 過濾操作符】拾睿可以接受一個參數(shù)疯趟,讓其過濾掉不符合我們條件的值
Observable.just(1, 20, 65, -5, 19)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "filter : " + integer + "\n");
}
});
buffer
buffer 操作符接受兩個參數(shù),buffer(count,skip)谋梭,作用是將 Observable 中的數(shù)據(jù)按 skip (步長) 分成最大不超過 count 的 buffer 迅办,然后生成一個 Observable
image.png
Observable.just(1, 2, 3, 4, 5)
.buffer(3,2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e(TAG, "buffer size : " + integers.size() + "\n");
Log.e(TAG, "buffer value : " + integers.toString());
for (Integer i : integers) {
Log.e(TAG, i + "");
}
Log.e(TAG, "\n");
}
});
timer
在Rxjava中timer操作符既可以延遲執(zhí)行一段邏輯,也可以間隔執(zhí)行一段邏輯【注意】但在RxJava2.x已經(jīng)過時了章蚣,現(xiàn)在用interval操作符來間隔執(zhí)行,但需要注意的是姨夹,timer 和 interval 均默認在新線程纤垂。
image.png
Observable.timer(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "timer :" + aLong + " at " + DateUtil.getStringDate() + "\n");
}
});
interval
interval 操作符用于間隔時間執(zhí)行某個操作,其接受三個參數(shù)磷账,分別是第一次發(fā)送延遲峭沦,間隔時間,時間單位逃糟。默認在新線程
image.png
Disposable mDisposable;
Log.e(TAG, "interval start : " + DateUtil.getStringDate() + "\n");
mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())// 由于interval默認在新線程吼鱼,所以我們應該切回主線程
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "interval :" + aLong + " at " + DateUtil.getStringDate() + "\n");
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
doOnNext
它的作用是讓訂閱者在接收到數(shù)據(jù)之前干點有意思的事情。假如我們在獲取到數(shù)據(jù)之前想先保存一下它
Observable.just(1, 2, 3, 4)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "doOnNext 保存 " + integer + "成功" + "\n");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "doOnNext :" + integer + "\n");
}
});
skip
skip 很有意思绰咽,其實作用就和字面意思一樣菇肃,接受一個 long 型參數(shù) count ,代表跳過 count 個數(shù)目開始接收取募。
image.png
Observable.just(1, 2, 3, 4, 5).skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "skip : "+integer + "\n");
}
});
take
take琐谤,接受一個 long 型參數(shù) count ,用于指定訂閱者最多收到多少數(shù)據(jù)
Flowable.fromArray(1, 2, 3, 4, 5)
.take(2)//最多接收多少個參數(shù)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: take : "+integer + "\n" );
}
});
just
就是一個簡單的發(fā)射器依次調(diào)用 onNext() 方法玩敏。
Observable.just("1", "2",1,5,7)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.e(TAG,"accept : onNext : " + serializable + "\n" );
}
});
Single
顧名思義斗忌,Single 只會接收一個參數(shù),而 SingleObserver 只會調(diào)用 onError() 或者 onSuccess()旺聚。
Single.just(new Random().nextInt())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer value) {
Log.e(TAG, "single : onSuccess : "+value+"\n" );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
}
});
debounce
去除發(fā)送頻率過快的項 debounce(500, TimeUnit.MILLISECONDS) 去除發(fā)送間隔時間小于500毫秒的發(fā)射事件织阳、或者用于過濾掉發(fā)射速率過快的數(shù)據(jù)項
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MICROSECONDS) //去除發(fā)送間隔小于500毫秒的發(fā)送事件
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "debounce :" + integer + "\n");
}
});
Defer
簡單地時候就是每次訂閱都會創(chuàng)建一個新的 Observable,并且如果沒有被訂閱砰粹,就不會產(chǎn)生新的 Observable唧躲。
image.png
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(1, 2, 3, 4, 5);
}
});
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "defer : " + value + "\n");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
}
@Override
public void onComplete() {
Log.e(TAG, "defer : onComplete\n");
}
});
last
last 操作符僅取出可觀察到的最后一個值,或者是滿足某些條件的最后一項。
image.png
Observable.just(1, 2, 3,99,33,0)
.last(5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("RxJavaAct", "last : " + integer + "\n");
}
});
merge
merge 合并惊窖,熟悉版本控制工具的你一定不會不知道 merge 命令刽宪,而在 Rx 操作符中,merge 的作用是把多個 Observable 結(jié)合起來界酒,接受可變參數(shù)圣拄,也支持迭代器集合。注意它和 concat 的區(qū)別在于毁欣,不用等到 發(fā)射器 A 發(fā)送完所有的事件再進行發(fā)射器 B 的發(fā)送
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5, 99))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("RxJavaAct", "accept : " + integer + "\n");
}
});
reduce
reduce 操作符一次用一個方法處理一個值庇谆,可以有一個 seed 作為初始值。
Observable.just(1, 2, 3)
.reduce(new BiFunction<Integer, Integer, Integer>() {
//我們中間采用 reduce 凭疮,支持一個 function 為兩數(shù)值相加
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e("RxJavaAct", "BiFunction: apply : " + integer + " + " + integer2 + " = " + (integer + integer2) + "\n");
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("RxJavaAct", "accept: reduce : " + integer + "\n");
}
});
scan
scan 操作符作用和上面的 reduce 一致饭耳,唯一區(qū)別是 reduce 是個只追求結(jié)果的壞人,而 scan 會始終如一地把每一個步驟都輸出执解。
Observable.just(1, 2, 3)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e("RxJavaAct", "BiFunction: apply : " + integer + " + " + integer2 + " = " + (integer + integer2) + "\n");
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("RxJavaAct", "accept: reduce : " + integer + "\n");
}
});
window
按照時間劃分窗口寞肖,將數(shù)據(jù)發(fā)送給不同的Observable
Log.e("RxJavaAct", "window\n");
Observable.interval(1, TimeUnit.SECONDS)
.take(15)
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> longObservable) throws Exception {
Log.e("RxJavaAct", "Sub Divide begin...\n");
longObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("RxJavaAct", "Next:" + aLong + "\n");
}
});
}
});
PublishSubject
onNext() 會通知每個觀察者,僅此而已
PublishSubject<Integer> publishSubject = PublishSubject.create();
publishSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "First onSubscribe :"+d.isDisposed()+"\n");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("RxJavaAct", "First onNext value :"+integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "First onError:"+e.getMessage()+"\n" );
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "First onComplete!\n");
}
});
publishSubject.onNext(1);
publishSubject.onNext(2);
publishSubject.onNext(3);
publishSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "Second onSubscribe :"+d.isDisposed()+"\n");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("RxJavaAct", "Second onNext value :"+integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "Second onError:"+e.getMessage()+"\n" );
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "Second onComplete!\n");
}
});
publishSubject.onNext(4);
publishSubject.onNext(5);
publishSubject.onComplete();
AsyncSubject
在調(diào)用 onComplete() 之前衰腌,除了 subscribe() 其它的操作都會被緩存新蟆,
在調(diào)用 onComplete() 之后只有最后一個 onNext() 會生效
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
asyncSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "First onSubscribe :"+d.isDisposed()+"\n");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("RxJavaAct", "First onNext value :"+integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "First onError:"+e.getMessage()+"\n" );
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "First onComplete!\n");
}
});
asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onNext(3);
asyncSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "Second onSubscribe :"+d.isDisposed()+"\n");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("RxJavaAct", "Second onNext value :"+integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "Second onError:"+e.getMessage()+"\n" );
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "Second onComplete!\n");
}
});
asyncSubject.onNext(4);
asyncSubject.onNext(5);
asyncSubject.onComplete();
BehaviorSubject
// BehaviorSubject 的最后一次 onNext() 操作會被緩存,
// 然后在 subscribe() 后立刻推給新注冊的 Observer
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
behaviorSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "First onSubscribe :"+d.isDisposed()+"\n");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("RxJavaAct", "First onNext value :"+integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "First onError:"+e.getMessage()+"\n" );
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "First onComplete!\n");
}
});
behaviorSubject.onNext(1);
behaviorSubject.onNext(2);
behaviorSubject.onNext(3);
behaviorSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "Second onSubscribe :"+d.isDisposed()+"\n");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e("RxJavaAct", "Second onNext value :"+integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "Second onError:"+e.getMessage()+"\n" );
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "Second onComplete!\n");
}
});
behaviorSubject.onNext(4);
behaviorSubject.onNext(5);
behaviorSubject.onComplete();
Completable
// 只關(guān)心結(jié)果右蕊,也就是說 Completable 是沒有 onNext 的琼稻,
// 要么成功要么出錯,不關(guān)心過程饶囚,在 subscribe 后的某個時間點返回結(jié)果
Log.e("RxJavaAct", "Completable\n");
Completable.timer(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("RxJavaAct", "onSubscribe : d :" + d.isDisposed() + "\n");
}
@Override
public void onComplete() {
Log.e("RxJavaAct", "onComplete\n");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("RxJavaAct", "onError :" + e.getMessage() + "\n");
}
});
Flowable
專用于解決被壓問題
Flowable.just(1, 2, 3, 4)
//seed 作為初始值
.reduce(100, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.e("RxJavaAct", "reduce :" + integer + " + " + integer2 + " = " + (integer + integer2) + "\n");
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("RxJavaAct", "Flowable :" + integer + "\n");
}
});
上邊這些操作符的使用場景
采用 OkHttp3 配合 map , doOnNext , 線程切換做簡單的網(wǎng)絡請求
- 1帕翻、通過 Observable.create() 方法,調(diào)用 OkHttp 網(wǎng)絡請求;
- 2萝风、通過 map 操作符結(jié)合 Gson , 將 Response 轉(zhuǎn)換為 bean 類;
- 3嘀掸、通過 doOnNext() 方法,解析 bean 中的數(shù)據(jù)规惰,并進行數(shù)據(jù)庫存儲等操作;
- 4横殴、調(diào)度線程,在子線程進行耗時操作任務卿拴,在主線程更新 UI;
- 5衫仑、通過 subscribe(),根據(jù)請求成功或者失敗來更新 UI。
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(ObservableEmitter<Response> e) throws Exception {
Request.Builder builder = new Request.Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(Response response) throws Exception {
Log.e(TAG, "map 線程:" + Thread.currentThread().getName() + "\n");
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body().toString());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(MobileAddress mobileAddress) throws Exception {
Log.e(TAG, "doOnNext 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "doOnNext: 保存成功:" + mobileAddress.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(MobileAddress mobileAddress) throws Exception {
Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "成功:" + mobileAddress.toString() + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "失敹榛ā:" + throwable.getMessage() + "\n");
}
});
使用框架 rx2-Networking
- 1文狱、通過 Rx2AndroidNetworking 的 get() 方法獲取 Observable 對象(已解析);
- 2、調(diào)度線程缘挽,根據(jù)請求結(jié)果更新 UI瞄崇。
- 3呻粹、 implementation 'com.amitshekhar.android:rx2-android-networking:1.0.0'
Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(MobileAddress mobileAddress) throws Exception {
Log.e(TAG, "doOnNext:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "doOnNext:" + mobileAddress.toString() + "\n");
}
}).map(new Function<MobileAddress, MobileAddress.ResultBean>() {
@Override
public MobileAddress.ResultBean apply(MobileAddress mobileAddress) throws Exception {
Log.e(TAG, "\n" );
Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
return mobileAddress.getResult();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress.ResultBean>() {
@Override
public void accept(MobileAddress.ResultBean resultBean) throws Exception {
Log.e(TAG, "subscribe 成功:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "成功:" + resultBean.toString() + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName()+"\n" );
Log.e(TAG, "失敗:"+ throwable.getMessage()+"\n" );
}
});
zip操作符的使用場景
- 結(jié)合多個接口的數(shù)據(jù)再更新 UI
- zip 操作符可以把多個 Observable 的數(shù)據(jù)接口成一個數(shù)據(jù)源再發(fā)出去
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 = Network.getGankApi()
.getCategoryData("Android", 1, 1);
Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
@Override
public String apply(MobileAddress mobileAddress, CategoryResult categoryResult) throws Exception {
return "合并后的數(shù)據(jù)為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.toString();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失斔昭小:" + throwable+"\n");
}
});
flatMap 使用場景
- 多個網(wǎng)絡請求依次依賴
- 1等浊、注冊用戶前先通過接口A獲取當前用戶是否已注冊,再通過接口B注冊;
- 2摹蘑、注冊后自動登錄筹燕,先通過注冊接口注冊用戶信息,注冊成功后馬上調(diào)用登錄接口進行自動登錄衅鹿。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請求撒踪,并解析到FootList
.subscribeOn(Schedulers.io()) // 在io線程進行網(wǎng)絡請求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結(jié)果
.doOnNext(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
// 先根據(jù)獲取食品列表的響應結(jié)果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
}
})
.observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
@Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() {
@Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
}
});
concat 使用場景
- 先讀取緩存數(shù)據(jù)再讀取網(wǎng)絡請求
- 實用場景中經(jīng)常會用到緩存數(shù)據(jù),以通過減少頻繁的網(wǎng)絡請求達到節(jié)約流量:
- 1大渤、concat 可以做到不交錯的發(fā)射兩個甚至多個 Observable 的發(fā)射物;
- 2制妄、并且只有前一個 Observable 終止(onComplete)才會訂閱下一個 Observable
// 先讀取緩存再讀取網(wǎng)絡
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
@Override
public void subscribe(ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create當前線程:" + Thread.currentThread().getName());
FoodList data = CacheManager.getInstance().getFoodListJsonData();
// 在操作符concat 中,只有調(diào)用onComplete 之后才會執(zhí)行下一個Observable
if (data != null) {//如果緩存數(shù)據(jù)不為空泵三,則直接讀取緩存數(shù)據(jù)耕捞,而不讀取網(wǎng)絡數(shù)據(jù)
isFromNet = false;
Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):");
runOnUiThread(new Runnable() {
@Override
public void run() {
Log.e(TAG, "正在 : 讀取緩存數(shù)據(jù):");
}
});
e.onNext(data);
} else {
isFromNet = true;
runOnUiThread(new Runnable() {
@Override
public void run() {
Log.e(TAG, "正在 : 讀取網(wǎng)絡數(shù)據(jù):");
}
});
Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡數(shù)據(jù):");
e.onComplete();
}
}
});
// 請求網(wǎng)絡數(shù)據(jù)
Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 10 + "")
.build()
.getObjectObservable(FoodList.class);
// Concat 先讀取緩存數(shù)據(jù)并展示UI再獲取網(wǎng)絡數(shù)據(jù)刷新UI
// 1、concat 可以做到不交錯的發(fā)射兩個甚至多個 Observable 的發(fā)射物;
// 2烫幕、并且只有前一個 Observable 終止(onComplete)才會訂閱下一個 Observable
// 兩個 Observable 的泛型應當保持一致
Observable.concat(cache, network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(FoodList foodList) throws Exception {
Log.e(TAG, "subscribe 成功:" + Thread.currentThread().getName());
if (isFromNet) {
Log.e(TAG, "accept : 網(wǎng)絡獲取數(shù)據(jù)設(shè)置緩存: \n" + foodList.toString());
CacheManager.getInstance().setFoodListJsonData(foodList);
}
Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + foodList.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:" + Thread.currentThread().getName());
Log.e(TAG, "accept: 讀取數(shù)據(jù)失敯吵椤:" + throwable.getMessage());
}
});
debounce 使用場景
- 減少頻繁的網(wǎng)絡請求
- 設(shè)想情景:輸入框數(shù)據(jù)變化或者點擊一次按鈕時就要進行網(wǎng)絡請求,這樣會產(chǎn)生大量的網(wǎng)絡請求纬霞,而實際上又不需要
- 這時候可以通過 debounce 過濾掉發(fā)射頻率過快的請求。
btn_debounce = findViewById(R.id.btn_debounce);
// 減少頻繁的網(wǎng)絡請求
RxView.clicks(btn_debounce)
.debounce(2, TimeUnit.SECONDS) // 過濾掉發(fā)射頻率小于2秒的發(fā)射事件
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
clickBtn();
}
});
private void clickBtn() {
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 2 + "") // 只獲取兩條數(shù)據(jù)
.build()
.getObjectObservable(FoodList.class)
.subscribeOn(Schedulers.io()) // 在 io 線程進行網(wǎng)絡請求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程進行更新UI等操作
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
Log.e(TAG, "accept: 獲取數(shù)據(jù)成功:" + foodList.toString() + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 獲取數(shù)據(jù)失斍浴:" + throwable.getMessage() + "\n");
}
});
}
interval 使用場景
- 間隔任務實現(xiàn)心跳
- 可能我們會遇上各種即時通訊诗芜,如果是自己家開發(fā)的 IM 即時通訊,我相信在移動端一定少不了心跳包的管理
- 而我們 RxJava 2.x 的 interval 操作符棒我們解決了這個問題埃疫。
private Disposable mDisposable;
// 間隔任務實現(xiàn)心跳
mDisposable = Flowable
.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : " + aLong);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: 設(shè)置文本 :" + aLong);
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
mDisposable.dispose();
}
}
線程調(diào)度
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread)伏恐,Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
或許上邊的代碼接口有些調(diào)用不了