注:只包含標準包中的操作符个绍,用于個人學習及備忘
參考博客:http://blog.csdn.net/maplejaw_/article/details/52396175
本篇將介紹rxjava中的創(chuàng)建操作巴柿、合并操作篮洁、過濾操作、條件/布爾操作蜗侈、聚合操作、轉換操作以及變換操作该面,只針對用法不涉及原理信卡,對RxJava不熟悉的可參考:http://gank.io/post/560e15be2dca930e00da1083
創(chuàng)建操作
-
create:使用OnSubscrib直接創(chuàng)建一個Observable
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("item1"); subscriber.onNext("item2"); subscriber.onCompleted(); } });
-
from:將數(shù)組或集合拆分成具體對象后,轉換成發(fā)送這些對象的Observable
String[] arr = {"item1", "item2", "item3"}; Observable.from(arr) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //調用多次猾瘸,分別打印出item1,item2,item3 } });
-
just:將一個或多個對象轉換成發(fā)送這些對象的Obserbable
Observable.just("item1","item2","item3") .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //調用多次,分別打印出item1,item2,item3 } });
empty:創(chuàng)建一個直接通知完成的Observable
error:創(chuàng)建一個直接通知錯誤的Observable
-
never:創(chuàng)建一個什么都不做的Observable
Observable observable1 = Observable.empty(); //直接調用onCompleted()方法 Observable observable2 = Observable.error(new RuntimeException()); //直接調用onError()方法 Observable observable3 = Observable.never(); //onNext(),onCompleted(),onError()均不調用
-
timer:創(chuàng)建一個延時發(fā)射數(shù)據(jù)的Observable
Observable.timer(1000, TimeUnit.MILLISECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //aLong為0 } });
-
interval:創(chuàng)建一個按照給定的時間間隔發(fā)射送0開始的整數(shù)序列的Obervable
Observable.interval(2, 1, TimeUnit.SECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //等待2秒后開始發(fā)射數(shù)據(jù)揽思,發(fā)射的時間間隔為1秒,從0開始計數(shù) } }); Observable.interval(1, TimeUnit.SECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //等待1秒后開始發(fā)射數(shù)據(jù)儡湾,發(fā)射的時間間隔為1秒徐钠,從0開始計數(shù) //相當于Observable.interval(1, 1, TimeUnit.SECONDS) } });
-
range:創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable
Observable.range(3, 4) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString());//依次發(fā)射3,4,5,6,從3開始發(fā)射4個數(shù)據(jù) } });
-
defer:觀察者訂閱時才創(chuàng)建Observable,每次訂閱返回一個新的Observable
Observable.defer(new Func0<Observable<String>>() { @Override public Observable<String> call() { return Observable.just("s"); } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //打印s } });
合并操作(用于組合多個Observavle)
-
concat:按順序連接多個Observable,注:Observable.concat(a,b)等價于a.concatWith(b)
Observable<Integer> observable1 = Observable.just(1, 2, 3); Observable<Integer> observable2 = Observable.just(4, 5, 6); Observable.concat(observable1, observable2) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2,3,4,5,6 } });
-
startWith:在數(shù)據(jù)序列的開頭增加一項數(shù)據(jù)爹袁,內部調用concat
Observable.just(1, 2, 3) .startWith(Observable.just(4, 5)) //添加一個Observable .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4,5,1,2,3 } }); Observable.just(1,2,3) .startWith(4,5) //添加多個數(shù)據(jù) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4,5,1,2,3 } }); List<Integer> integers = new ArrayList<>(); integers.add(4); integers.add(5); Observable.just(1,2,3) .startWith(integers) //添加一個集合 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4,5,1,2,3 } });
merge / mergeDelayError:將多個Observable合并為一個譬淳。不同于concat邻梆,merge不是按照添加順序連接,而是按照時間線來連接剂娄。其中mergeDelayError將異常延遲到其它沒有錯誤的Observable發(fā)送完畢后才發(fā)射。而merge則是一遇到異常將停止發(fā)射數(shù)據(jù),發(fā)送onError通知
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
SystemClock.sleep(1000);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
SystemClock.sleep(500);
subscriber.onNext(4);
subscriber.onNext(5);
SystemClock.sleep(1000);
subscriber.onNext(6);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable.merge(observable1, observable2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1庭砍,4怠缸,5揭北,2,3疚俱,6
}
});
- zip:使用一個函數(shù)組合多個Observable發(fā)射的數(shù)據(jù)集合呆奕,再發(fā)射這個結果。如果多個Observable發(fā)射的數(shù)據(jù)量不一樣姆泻,則以最少的Observable為標準進行壓合
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4, 5);
Observable<String> observable2 = Observable.just("A", "B", "C", "D");
Observable.zip(observable1, observable2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("debug", s); //打印1A,2B,3C,4D
}
});
- combineLatest:當兩個Observable中任何一個發(fā)射了一個數(shù)據(jù)時四苇,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù))蛔琅,然后發(fā)射這個函數(shù)的結果罗售。類似于zip,但是寨躁,不同的是zip只有在每個Observable都發(fā)射了數(shù)據(jù)才工作,而combineLatest任何一個發(fā)射了數(shù)據(jù)都可以工作放钦,每次與另一個Observable最近的數(shù)據(jù)壓合
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
SystemClock.sleep(500);
subscriber.onNext(2);
SystemClock.sleep(1000);
subscriber.onNext(3);
SystemClock.sleep(300);
subscriber.onNext(4);
SystemClock.sleep(500);
subscriber.onNext(5);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
SystemClock.sleep(300);
subscriber.onNext("A");
SystemClock.sleep(300);
subscriber.onNext("B");
SystemClock.sleep(500);
subscriber.onNext("C");
subscriber.onNext("D");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable.combineLatest(observable1, observable2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("debug", s); //打印1A,2A,2B,2C,2D,3D,4D,5D
}
});
過濾操作
-
filter:過濾數(shù)據(jù)
Observable.just(1, 2, 3, 4) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; //過濾偶數(shù) } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2横腿,4 } });
-
ofType:過濾指定類型數(shù)據(jù)
Observable.just(1, "2", 3, "4") .ofType(Integer.class) //過濾整形數(shù)據(jù) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1耿焊,3 } });
-
take:只發(fā)射前n項數(shù)據(jù)或者一定時間內的數(shù)據(jù)(無需考慮索引越界問題器腋,配合interval操作符可作為定時器使用)
Observable.just(1, 2, 3, 4) .take(2) //只發(fā)射前2項 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1蒂培,2 } }); Observable.interval(1, TimeUnit.SECONDS) .take(3, TimeUnit.SECONDS) //只發(fā)射3秒內的數(shù)據(jù) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //打印0,1(打印出來的并不是相像中的0,1,2,應該與代碼代碼執(zhí)行的時間有關媳荒,使用時需要注意G怼) Log.d("debug", aLong.toString()); } });
-
takeLast:只發(fā)射最后的N項數(shù)據(jù)或者一定時間內的數(shù)據(jù)(無需考慮索引越界問題)
Observable.just(1, 2, 3, 4) .takeLast(3) //只發(fā)射后3項 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2衔沼,3指蚁,4 } }); Observable.interval(1, TimeUnit.SECONDS) .take(10) //每1秒發(fā)射一個數(shù)據(jù)凝化,發(fā)射10秒 .takeLast(3, TimeUnit.SECONDS) //只發(fā)射最后3秒的數(shù)據(jù) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印6,7枪向,8遣疯,9(同樣存在些許誤差数苫,使用時需注意!) } }); Observable.interval(1, TimeUnit.SECONDS) .take(10) .takeLast(2, 3, TimeUnit.SECONDS) //只發(fā)射最后3秒內的最后2個數(shù)據(jù) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印8虐急,9 } });
-
takeFirst:只發(fā)射滿足條件的第一項(其實就是filter+take)
Observable.just(1, 2, 3, 4) .takeFirst(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 1; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2 } });
*first / firstOrDefault:只發(fā)射第一項或者滿足條件的第一項數(shù)據(jù)被辑,其中firstOrDefault可以指定默認值(建議使用firstOrDefault盼理,找不到對應元素時first會報異常)
Observable.just(1, 2, 3)
.first() //發(fā)射第一項
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1
}
});
Observable.just(1, 2, 3, 4)
.first(new Func1<Integer, Boolean>() { //發(fā)射大于2的第一項
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3
}
});
Integer[] arr = {};
Observable.from(arr)
.firstOrDefault(2) //發(fā)射第一項奏路,沒有可發(fā)射的數(shù)據(jù)時鸽粉,發(fā)射默認值2
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2
}
});
-
last / lastOrDefault:只發(fā)射最后一項或滿足條件的最后一項,其中l(wèi)astOrDefault可以指定默認值
Observable.just(1, 2, 3) .last() //發(fā)射最后一項 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3 } }); Observable.just(1, 2, 3, 4) .last(new Func1<Integer, Boolean>() { //發(fā)射大于2的最后一項 @Override public Boolean call(Integer integer) { return integer > 2; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印4 } }); Integer[] arr = {}; Observable.from(arr) .lastOrDefault(2) //發(fā)射最后一項,沒有可發(fā)射的數(shù)據(jù)時椒舵,發(fā)射默認值2 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印2 } });
-
skip:跳過開始的n項數(shù)據(jù)或者一定時間內的數(shù)據(jù)(與take類似)
Observable.just(1, 2, 3, 4) .skip(2) //跳過前2項 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3,4 } }); Observable.interval(1, TimeUnit.SECONDS) .take(5) .skip(3, TimeUnit.SECONDS) //跳過前3秒 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印2泼橘,3炬灭,4,同樣存在誤差! } });
-
skipLast:跳過最后的n項數(shù)據(jù)或一定時間內的數(shù)據(jù)
Observable.just(1, 2, 3, 4) .skipLast(2) //跳過最后2項 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1鼻吮,2 } }); Observable.interval(1, TimeUnit.SECONDS) .take(7) .skipLast(3, TimeUnit.SECONDS) //跳過最后3秒 .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印0,1香椎,2,同樣存在誤差畜伐! } });
-
elementAt / elementAtOrDefault:發(fā)射某一項數(shù)據(jù)讼积,其中elementAtOrDefault可以指定索引越界時發(fā)射的默認值
Observable.just(1, 2, 3, 4) .elementAt(2) //發(fā)射索引為2的數(shù)據(jù) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3 } }); Observable.just(1, 2, 3) .elementAtOrDefault(4, 5) //發(fā)射索引為4的數(shù)據(jù)勤众,索引越界時發(fā)射默認數(shù)據(jù)5 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印5 } });
ignoreElements:丟棄所有數(shù)據(jù),只發(fā)射錯誤或正常終止的通知窥突,即只觸發(fā)觀察者的onError()或onCompleted()方法
-
distinct:過濾重復數(shù)據(jù),可指定判定唯一的標準
Observable.just(1, 1, 2, 3, 2, 4) .distinct() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1称近,2刨秆,3,4 } }); Observable.just(1, 1, 2, 3, 2, 4) //根據(jù)發(fā)射的數(shù)據(jù)生成對應的key缓醋,通過key值來判斷唯一改衩,如果兩個數(shù)據(jù)的key相同竭鞍,則只發(fā)射第一個 .distinct(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { //奇數(shù)對應的key為1冯乘,偶數(shù)對應的key為2 if (integer % 2 == 0) { return 2; } return 1; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1裆馒,2 } });
-
distinctUntilChanged:過濾掉連續(xù)重復的數(shù)據(jù)翔横,可指定判定唯一的標準
Observable.just(1, 1, 2, 3, 2, 4) .distinctUntilChanged() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1禾唁,2,3掘托,2闪盔,4 } }); Observable.just(1, 1, 2, 3, 2, 4) //根據(jù)發(fā)射的數(shù)據(jù)生成對應的key,通過key值來判斷唯一族淮,如果兩個數(shù)據(jù)的key相同,則只發(fā)射第一個 .distinctUntilChanged(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { //奇數(shù)對應的key為1蝙斜,偶數(shù)對應的key為2 if (integer % 2 == 0) { return 2; } return 1; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1,2稚伍,3,2 } }); Observable.just(1, 1, 2, 3, 2, 4) //傳入比較器的方式 .distinctUntilChanged(new Func2<Integer, Integer, Boolean>() { @Override public Boolean call(Integer integer, Integer integer2) { return integer % 2 == integer2 % 2; //同為奇數(shù)或偶數(shù)返回true } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1垦搬,2对雪,3慌植,2 } });
-
throttleFirst:定期發(fā)射Observable在該時間段發(fā)射的第一項數(shù)據(jù)
Observable.interval(0, 500, TimeUnit.MILLISECONDS) .take(10) //每500毫秒發(fā)射一次數(shù)據(jù),發(fā)射10次 .throttleFirst(1000, TimeUnit.MILLISECONDS) //每1秒發(fā)射該秒內發(fā)射數(shù)據(jù)中的第一項數(shù)據(jù) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //打印0,2芙扎,5戒洼,8(即第一秒發(fā)射0,1磷蜀,第二秒發(fā)射2,3庶弃,4,第三秒發(fā)射5,6,7鸡捐,第四秒發(fā)射8煎源,9)歇僧,同樣存在誤差兽埃! Log.d("debug", aLong.toString()); } });
-
throttleWithTimeout / debounce(兩者使用及效果相同):發(fā)射數(shù)據(jù)時舷夺,如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時間,就會丟棄前一次的數(shù)據(jù),直到指定時間內都沒有新數(shù)據(jù)發(fā)射時才進行發(fā)射
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { //依次發(fā)射1-6,發(fā)射間隔不同 subscriber.onNext(1); SystemClock.sleep(500); subscriber.onNext(2); SystemClock.sleep(500); subscriber.onNext(3); SystemClock.sleep(1000); subscriber.onNext(4); SystemClock.sleep(1000); subscriber.onNext(5); SystemClock.sleep(500); subscriber.onNext(6); subscriber.onCompleted(); } }).throttleWithTimeout(700, TimeUnit.MILLISECONDS) //指定最小發(fā)射間隔時間為700毫秒 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3蔓同,4,6 } });
-
sample / throttleLast(兩者使用及效果相同):定期發(fā)射Observable在該時間段發(fā)射的最后一項數(shù)據(jù)弃揽,與throttleFirst相反
Observable.interval(0, 500, TimeUnit.MILLISECONDS) .take(10) //每500毫秒發(fā)射一次數(shù)據(jù)矿微,發(fā)射10次 .throttleLast(1000, TimeUnit.MILLISECONDS) //每1秒發(fā)射該秒內發(fā)射數(shù)據(jù)中的最后一項數(shù)據(jù) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { //打印1,3掖举,5,7励负,9(即第一秒發(fā)射0熄守,1耗跛,第二秒發(fā)射2调塌,3羔砾,第三秒發(fā)射4,5政溃,第四秒發(fā)射6董虱,7愤诱,第五秒發(fā)射8捐友,9) Log.d("debug", aLong.toString()); } });
-
timeout:如果指定時間內沒有發(fā)射任何數(shù)據(jù)匣砖,就發(fā)射一個異常或者使用備用的Observavle
Observable.timer(5, TimeUnit.SECONDS) .timeout(3, TimeUnit.SECONDS) //超時則發(fā)射異常 .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.d("debug", "onError()"); //拋出異常 } @Override public void onNext(Long aLong) { Log.d("debug", aLong.toString()); } }); Observable.timer(5, TimeUnit.SECONDS) .timeout(3, TimeUnit.SECONDS, Observable.just(2L)) //設置備用Observable .subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log.d("debug", "onError()"); } @Override public void onNext(Long aLong) { Log.d("debug", aLong.toString()); //發(fā)射備用Observable影涉,打印2 } });
條件 / 布爾操作
-
all:判斷所有數(shù)據(jù)中是否都滿足某個條件
Observable.just(1, 2, 3, 4) .all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer < 5;//所有項都小于5 } }) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } }); Observable.just(1, 2, 3, 4) .all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 2;//所有項都大于2 } }) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印false } });
-
exists:判斷是否存在數(shù)據(jù)項滿足某個條件
Observable.just(1, 2, 3, 4) .exists(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 2; //存在某項大于2 } }) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
contains:判斷所有數(shù)據(jù)中是否包含指定的數(shù)據(jù)(內部調用exists)
Observable.just(1, 2, 3, 4) .contains(2) //是否包含2 .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
sequenceEqual:判斷兩個Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù)规伐,發(fā)射順序猖闪,終止狀態(tài))
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3)) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
isEmpty:用于判斷Observable是否沒有發(fā)射任何數(shù)據(jù)(發(fā)射null返回為false)
Observable.from(new ArrayList<Integer>()) //集合中沒有數(shù)據(jù) .isEmpty() .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } }); Observable.empty() .isEmpty() .subscribe(new Action1<Boolean>() { @Override public void call(Boolean aBoolean) { Log.d("debug", aBoolean.toString()); //打印true } });
-
amber:指定多個Observable,只允許第一個開始發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)培慌,其他Observable將會被會忽略
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { SystemClock.sleep(500); //延遲500毫秒 subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); } }.subscribeOn(Schedulers.computation())); //指定為新的線程 Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("a"); subscriber.onNext("b"); subscriber.onCompleted(); } }); Observable.amb(observable1, observable2) .subscribe(new Action1<Serializable>() { @Override public void call(Serializable serializable) { Log.d("debug", serializable.toString()); //打印a,b } });
-
switchIfEmpty:如果原始Observable正常終止后仍沒有發(fā)射任何數(shù)據(jù)吵护,就使用備用的Observable
Observable.from(new ArrayList<Integer>()) .switchIfEmpty(Observable.just(1, 2)) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1馅而,2 } });
-
defaultIfEmpty:如果原始Observable正常終止后仍沒有發(fā)射任何數(shù)據(jù)瓮恭,就發(fā)射一個默認值(內部調用switchIfEmpty)
Observable.from(new ArrayList<Integer>()) .defaultIfEmpty(1) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1 } });
-
takeUntil:當發(fā)射的數(shù)據(jù)滿足某個條件后(包含該數(shù)據(jù))屯蹦,或者第二個Observable發(fā)射了一項數(shù)據(jù)或發(fā)射了一個終止通知時(觀察者接受不到第二個Observable發(fā)射的數(shù)據(jù))登澜,終止第一個Observable發(fā)送數(shù)據(jù)
Observable.just(1, 2, 3, 4) .takeUntil(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer == 3; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "just" + integer.toString()); //打印1脑蠕,2空郊,3 } }); Observable.interval(0, 500, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.computation()) .takeUntil(Observable.timer(1200, TimeUnit.MILLISECONDS)) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印0狞甚,1哼审,2 } });
-
takeWhile:當發(fā)射的數(shù)據(jù)對應某個條件為false時(不包含該數(shù)據(jù))涩盾,Observable終止發(fā)送數(shù)據(jù)
Observable.just(1, 2, 3, 4) .takeWhile(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer != 3; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印1春霍,2 } });
-
skipUnit:丟棄Observable發(fā)射的數(shù)據(jù)址儒,直到第二個Observable開始發(fā)射數(shù)據(jù)或者發(fā)射一個終止通知時
Observable.interval(0, 500, TimeUnit.MILLISECONDS) .take(5) .subscribeOn(Schedulers.computation()) .skipUntil(Observable.timer(1200, TimeUnit.MILLISECONDS)) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.d("debug", aLong.toString()); //打印3莲趣,4 } });
-
skipWhile:丟棄Observable發(fā)射的數(shù)據(jù)喧伞,直到一個指定的條件不成立(不丟棄條件數(shù)據(jù))
Observable.just(1, 2, 3, 4) .skipWhile(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer < 3; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印3潘鲫,4 } });
聚合操作
-
reduce:用一個函數(shù)接收Observable發(fā)射的數(shù)據(jù)次舌,將函數(shù)的計算結果作為下次計算的參數(shù),最后輸出結果挪圾。
Observable.just(1, 2, 3, 4) .reduce(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { Log.d("debug", "integer1:" + integer + ",integer2:" + integer2); return integer + integer2; //求和操作 } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "result:" + integer); } }); /** * 日志輸出 * integer1:1,integer2:2 * integer1:3,integer2:3 * integer1:6,integer2:4 * result:10 */
-
collect:用于將數(shù)據(jù)收集到一個可變的數(shù)據(jù)結構(如List,Map)
Observable.just(1, 2, 3, 4) .collect(new Func0<List<Integer>>() { @Override public List<Integer> call() { return new ArrayList<Integer>(); //創(chuàng)建List用于收集數(shù)據(jù) } }, new Action2<List<Integer>, Integer>() { @Override public void call(List<Integer> integers, Integer integer) { integers.add(integer); //將數(shù)據(jù)添加到List中 } }) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3, 4] } }); Observable.just(1, 2, 3, 4) .collect(new Func0<Map<Integer, String>>() { @Override public Map<Integer, String> call() { return new HashMap<Integer, String>(); //創(chuàng)建Map用于收集數(shù)據(jù) } }, new Action2<Map<Integer, String>, Integer>() { @Override public void call(Map<Integer, String> integerStringMap, Integer integer) { integerStringMap.put(integer, "value" + integer); //將數(shù)據(jù)添加到Map中 } }) .subscribe(new Action1<Map<Integer, String>>() { @Override public void call(Map<Integer, String> integerStringMap) { //打印{4=value4, 1=value1, 3=value3, 2=value2},注:HashMap保存的數(shù)據(jù)是無序的 Log.d("debug", integerStringMap.toString()); } });
-
count / countLong:計算發(fā)射的數(shù)量棚赔,內部調用的是reduce
Observable.just(1, 2, 3, 4) .count() .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "integer:" + integer.toString()); //打印4 } });
轉換操作
-
toList:將Observable發(fā)射的所有數(shù)據(jù)收集到一個列表中靠益,返回這個列表
Observable.just(1, 2, 3, 4) .toList() .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3, 4] } });
-
toSortedList:將Observable發(fā)射的所有數(shù)據(jù)收集到一個有序列表中,返回這個列表
Observable.just(3, 2, 5, 4, 1) .toSortedList() //默認升序排序 .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3, 4, 5] } }); Observable.just(3, 2, 5, 4, 1) .toSortedList(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer2 - integer; //自定義排序規(guī)則(倒序) } }) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[5, 4, 3, 2, 1] } });
-
toMap:將序列數(shù)據(jù)轉換為一個Map壳快,根據(jù)數(shù)據(jù)項生成key和value
Observable.just(1, 2, 3, 4) .toMap(new Func1<Integer, String>() { //根據(jù)數(shù)據(jù)項生成key眶痰,value為原始數(shù)據(jù) @Override public String call(Integer integer) { return "key:" + integer; } }) .subscribe(new Action1<Map<String, Integer>>() { @Override public void call(Map<String, Integer> stringIntegerMap) { Log.d("debug", stringIntegerMap.toString()); //打印{key:4=4, key:2=2, key:1=1, key:3=3} } }); Observable.just(1, 2, 3, 4) .toMap(new Func1<Integer, String>() { //根據(jù)數(shù)據(jù)項生成key和value @Override public String call(Integer integer) { return "key:" + integer; } }, new Func1<Integer, String>() { @Override public String call(Integer integer) { return "value:" + integer; } }) .subscribe(new Action1<Map<String, String>>() { @Override public void call(Map<String, String> stringStringMap) { Log.d("debug", stringStringMap.toString()); //打印{key:4=value:4, key:2=value:2, key:1=value:1, key:3=value:3} } }); Observable.just(1, 2, 3, 4) .toMap(new Func1<Integer, String>() { //根據(jù)數(shù)據(jù)項生成key和value竖伯,創(chuàng)建指定類型的Map @Override public String call(Integer integer) { return "key:" + integer; } }, new Func1<Integer, String>() { @Override public String call(Integer integer) { return "value:" + integer; } }, new Func0<Map<String, String>>() { @Override public Map<String, String> call() { return new LinkedHashMap<String, String>(); //LinkedHashMap保證存取順序相同 } }) .subscribe(new Action1<Map<String, String>>() { @Override public void call(Map<String, String> stringStringMap) { Log.d("debug", stringStringMap.toString()); //打印{key:1=value:1, key:2=value:2, key:3=value:3, key:4=value:4} } });
-
toMultiMap:類似toMap宏胯,不同的地方在于map的value是一個集合,使一個key可以映射多個value婚惫,多用于分組
Observable.just(1, 2, 1, 4) .toMultimap(new Func1<Integer, String>() { //根據(jù)數(shù)據(jù)項生成key魂爪,value為原始數(shù)據(jù) @Override public String call(Integer integer) { return "key:" + integer; } }) .subscribe(new Action1<Map<String, Collection<Integer>>>() { @Override public void call(Map<String, Collection<Integer>> stringCollectionMap) { Log.d("debug", stringCollectionMap.toString()); //打印{key:4=[4], key:2=[2], key:1=[1, 1]} } }); Observable.just(1, 2, 1, 4) .toMap(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "key:" + integer; } }) .subscribe(new Action1<Map<String, Integer>>() { @Override public void call(Map<String, Integer> stringIntegerMap) { Log.d("debug", stringIntegerMap.toString()); //打印{key:4=4, key:2=2, key:1=1} } });
變換操作
-
map:對Observable發(fā)射的每一項數(shù)據(jù)都應用一個函數(shù)進行變換
Observable.just(1, 2, 3, 4) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "item:" + integer; } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //打印item:1,item:2,item:3,item:4 } });
-
cast:在發(fā)射之前強制將Observable發(fā)射的所有數(shù)據(jù)轉換為指定類型(父類強轉為子類)
List list = new ArrayList(); Observable.just(list) .cast(ArrayList.class) //將List強轉為ArrayList .subscribe(new Action1<ArrayList>() { @Override public void call(ArrayList arrayList) { } });
-
flatMap:將Observable發(fā)射的數(shù)據(jù)變換為Observables集合蒋川,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,內部使用marge合并(可用于一對多轉換或多對多轉換捺球,也可用于網(wǎng)絡請求的嵌套)
Observable.just(1, 2, 3) .flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(integer * 10); subscriber.onNext(integer * 100); subscriber.onCompleted(); } }); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印10氮兵,100泣栈,20南片,200疼进,30螺捐,300 } });
-
flatMapIterable:和flatMap作用一樣,只不過生成的是Iterable而不是Observable
Observable.just(1, 2, 3) .flatMapIterable(new Func1<Integer, Iterable<Integer>>() { @Override public Iterable<Integer> call(Integer integer) { return Arrays.asList(integer * 10, integer * 100); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", integer.toString()); //打印10,100灾票,20刊苍,200正什,30婴氮,300 } });
concatMap:類似于flatMap主经,由于內部使用concat合并,所以是按照順序連接發(fā)射
-
switchMap:和flatMap很像穗酥,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合砾跃,當原始Observable發(fā)射一個新的數(shù)據(jù)(Observable)時蜓席,它將取消訂閱前一個Observable
Observable.interval(0, 500, TimeUnit.MILLISECONDS) //每500毫秒發(fā)射一次 .take(4) .switchMap(new Func1<Long, Observable<String>>() { @Override public Observable<String> call(Long aLong) { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(aLong + "A"); SystemClock.sleep(800); //延遲800毫秒 subscriber.onNext(aLong + "B"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.newThread()); } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d("debug", s); //打印0A,1A渺贤,2A志鞍,3A固棚,3B } });
-
與reduce很像,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù)委粉,然后按順序依次發(fā)射每一個值
Observable.just(1, 2, 3, 4) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { Log.d("debug", "integer1:" + integer + ",integer2:" + integer2); return integer + integer2; } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", "result:" + integer); } }); /** * 日志輸出 * result:1 * integer1:1,integer2:2 * result:3 * integer1:3,integer2:3 * result:6 * integer1:6,integer2:4 * result:10 */
-
groupBy:將Observable分拆為Observable集合贾节,將原始Observable發(fā)射的數(shù)據(jù)按Key分組栗涂,每個Observable發(fā)射一組不同的數(shù)據(jù)(類似于toMultiMap)
Observable.just(1, 2, 3, 4) .groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { //根據(jù)數(shù)據(jù)項生成key return integer % 2 == 0 ? "偶數(shù)" : "奇數(shù)"; } }) .subscribe(new Action1<GroupedObservable<String, Integer>>() { @Override public void call(GroupedObservable<String, Integer> o) { o.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", o.getKey() + ":" + integer); //打印奇數(shù):1斤程,偶數(shù):2暖释,奇數(shù):3,偶數(shù):4 } }); } }); Observable.just(1, 2, 3, 4) .groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { //根據(jù)數(shù)據(jù)項生成key return integer % 2 == 0 ? "偶數(shù)" : "奇數(shù)"; } }, new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { //根據(jù)數(shù)據(jù)項生成value return integer * 10; } }) .subscribe(new Action1<GroupedObservable<String, Integer>>() { @Override public void call(GroupedObservable<String, Integer> o) { o.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d("debug", o.getKey() + ":" + integer); //打印奇數(shù):10,偶數(shù):20橄杨,奇數(shù):30式矫,偶數(shù):40 } }); } });
-
buffer:定期從Observable收集數(shù)據(jù)到一個集合采转,然后將這些數(shù)據(jù)集合打包發(fā)射
Observable.just(1, 2, 3, 4, 5) .buffer(3, 1) //count:表示從當前指針位置開始打包3個數(shù)據(jù)項到集合中故慈,skip:表示指針向后移1位察绷, .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", "skip" + integers.toString()); //打印[1, 2, 3]拆撼,[2, 3, 4],[3, 4, 5]蚜印,[4, 5]娶视,[5] } }); Observable.just(1, 2, 3, 4, 5) .buffer(3) //每3個打包成一個集合,內部就是.buffer(3,3) .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3]寝凌,[4, 5] } }); Observable.interval(0, 100, TimeUnit.MILLISECONDS) .take(5) .buffer(250, TimeUnit.MILLISECONDS, 2) //將每250毫秒內發(fā)射的數(shù)據(jù)收集到多個集合中,每個集合最多存放2個數(shù)據(jù) .subscribe(new Action1<List<Long>>() { @Override public void call(List<Long> longs) { //打印[0, 1]较木,[2]青柄,[3, 4]致开,[] Log.d("debug", "count:" + longs.toString()); } }); Observable.interval(0, 100, TimeUnit.MILLISECONDS) .take(5) .buffer(250, TimeUnit.MILLISECONDS) //將每250毫秒內發(fā)射的數(shù)據(jù)收集到一個集合中双戳,集合不限制大小 .subscribe(new Action1<List<Long>>() { @Override public void call(List<Long> longs) { Log.d("debug", longs.toString()); //打印[0, 1, 2]飒货,[3, 4] } }); Observable.interval(0, 100, TimeUnit.MILLISECONDS) .take(5) //從指定時間節(jié)點開始塘辅,將該節(jié)點后250毫秒內發(fā)射的數(shù)據(jù)收集的一個集合中扣墩,初始節(jié)點為0呻惕,每發(fā)射一次集合蟆融, //節(jié)點的時間增加150毫秒,即下一次收集數(shù)據(jù)從150毫秒開始查乒,收集150毫秒到400毫秒之間發(fā)射的數(shù)據(jù) .buffer(250, 150, TimeUnit.MILLISECONDS) .subscribe(new Action1<List<Long>>() { @Override public void call(List<Long> longs) { Log.d("debug", longs.toString()); //打印[0, 1, 2]玛迄,[2蓖议,3]勒虾,[4] } });
-
window:定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口修然,然后發(fā)射這些窗口质况,而不是每次發(fā)射一項结榄,類似于buffer臼朗,buffer發(fā)射的是集合依溯,而window發(fā)射的是Observable
Observable.just(1, 2, 3, 4, 5) .window(3, 1) .subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { integerObservable.toList() //將所有數(shù)據(jù)搜集成一個集合黎炉,便于觀察 .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3]慷嗜,[2, 3, 4]庆械,[3, 4, 5]缭乘,[4, 5]堕绩,[5] } }); } }); Observable.just(1, 2, 3, 4, 5) .window(3) //相當于window(3,3) .subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { integerObservable.toList() //將所有數(shù)據(jù)搜集成一個集合奴紧,便于觀察 .subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> integers) { Log.d("debug", integers.toString()); //打印[1, 2, 3]黍氮,[4, 5] } }); } }); //剩下其余重載方法也與buffer基本一樣沫浆,不重復了