目錄
- 一:創(chuàng)建操作
- 二:合并操作
- 三:過(guò)濾操作
- 四:切換線程
- 五:條件/布爾操作
- 六:聚合操作
- 七:轉(zhuǎn)換操作
- 八:變換操作
- 九:錯(cuò)誤處理/重試機(jī)制
- 十:連接操作
- 十一:阻塞操作
- 十二:工具集
- 十三:Flowable (2.0出來(lái)的) 非操作符
前言
RxJava,這個(gè)詞爸邢,如果是android 開(kāi)發(fā)的小伙伴韵吨,估計(jì)早就聽(tīng)過(guò)不知道多少遍了茫蛹,如果你對(duì)RxJava 一點(diǎn)都不了解,推薦RxJava 入門(mén) 拋物線寫(xiě)的會(huì)讓你對(duì)RxJava罢浇,有個(gè)認(rèn)識(shí),本文只要是記錄操作符,畢竟太多了抓歼,記不住啊
一:創(chuàng)建操作
create
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("item1");
subscriber.onNext("item2");
subscriber.onCompleted();
}
});
just
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
empty
創(chuàng)建一個(gè)什么都不做直接通知完成的Observable
error
創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的Observable
never
創(chuàng)建一個(gè)什么都不做的Observable
Observable observable1=Observable.empty();//直接調(diào)用onCompleted。
Observable observable2=Observable.error(new RuntimeException());//直接調(diào)用onError示绊。這里可以自定義異常
Observable observable3=Observable.never();//啥都不做
timer
創(chuàng)建一個(gè)在給定的延時(shí)之后
Observable.timer(1000,TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d("JG",aLong.toString()); // 0
}
});
interval
創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射從0開(kāi)始的整數(shù)序列的
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//每隔1秒發(fā)送數(shù)據(jù)項(xiàng)锭部,從0開(kāi)始計(jì)數(shù)
//0,1,2,3....
}
});
range:
創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable<Integer>
Observable.range(2,5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG",integer.toString());// 2,3,4,5,6 從2開(kāi)始發(fā)射5個(gè)數(shù)據(jù)
}
});
defer
只有當(dāng)訂閱者訂閱才創(chuàng)建Observable,為每個(gè)訂閱創(chuàng)建一個(gè)新的Observable面褐。內(nèi)部通過(guò)OnSubscribeDefer在訂閱時(shí)調(diào)用Func0創(chuàng)建Observable拌禾。
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just("hello");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("JG",s);
}
});
二:合并操作
concat
按順序連接多個(gè)Observables。需要注意的是Observable.concat(a,b)等價(jià)于a.concatWith(b)展哭。
Observable<Integer> observable1=Observable.just(1,2,3,4);
Observable<Integer> observable2=Observable.just(4,5,6);
Observable.concat(observable1,observable2)
.subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
startWith
在數(shù)據(jù)序列的開(kāi)頭增加一項(xiàng)數(shù)據(jù)湃窍。startWith的內(nèi)部也是調(diào)用了concat
Observable.just(1,2,3,4,5)
.startWith(6,7,8)
.subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
merge
將多個(gè)Observable合并為一個(gè)。不同于concat匪傍,merge不是按照添加順序連接您市,而是按照時(shí)間線來(lái)連接。其中mergeDelayError
將異常延遲到其它沒(méi)有錯(cuò)誤的Observable發(fā)送完畢后才發(fā)射役衡。而merge則是一遇到異常將停止發(fā)射數(shù)據(jù)茵休,發(fā)送onError通知。
zip
使用一個(gè)函數(shù)組合多個(gè)Observable發(fā)射的數(shù)據(jù)集合手蝎,然后再發(fā)射這個(gè)結(jié)果榕莺。如果多個(gè)Observable發(fā)射的數(shù)據(jù)量不一樣,則以最少的Observable為標(biāo)準(zhǔn)進(jìn)行壓合棵介。內(nèi)部通過(guò)OperatorZip進(jìn)行壓合
Observable<Integer> observable1=Observable.just(1,2,3,4);
Observable<Integer> observable2=Observable.just(4,5,6);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer item1, Integer item2) {
return item1+"and"+item2;
}
})
.subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
combineLatest
當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí)钉鸯,通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果邮辽。類似于zip唠雕,但是,不同的是zip只有在每個(gè)Observable都發(fā)射了數(shù)據(jù)才工作吨述,而combineLatest任何一個(gè)發(fā)射了數(shù)據(jù)都可以工作岩睁,每次與另一個(gè)Observable最近的數(shù)據(jù)壓合。具體請(qǐng)看下面流程圖锐极。
zip工作流程
三:過(guò)濾操作
filter
過(guò)濾數(shù)據(jù)笙僚。內(nèi)部通過(guò)OnSubscribeFilter過(guò)濾數(shù)據(jù)。
Observable.just(3,4,5,6)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>4;
}
})
.subscribe(item->Log.d("JG",item.toString())); //5,6
ofType
過(guò)濾指定類型的數(shù)據(jù)灵再,與filter類似肋层,
Observable.just(1,2,"3")
.ofType(Integer.class)
.subscribe(item -> Log.d("JG",item.toString()));
take
只發(fā)射開(kāi)始的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)亿笤。內(nèi)部通過(guò)OperatorTake和OperatorTakeTimed過(guò)濾數(shù)據(jù)。
Observable.just(3,4,5,6)
.take(3)//發(fā)射前三個(gè)數(shù)據(jù)項(xiàng)
.take(100, TimeUnit.MILLISECONDS)//發(fā)射100ms內(nèi)的數(shù)據(jù)
takeLast
只發(fā)射最后的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)栋猖。內(nèi)部通過(guò)OperatorTakeLast和OperatorTakeLastTimed過(guò)濾數(shù)據(jù)净薛。takeLastBuffer和takeLast類似,不同點(diǎn)在于takeLastBuffer會(huì)收集成List后發(fā)射蒲拉。
Observable.just(3,4,5,6)
.takeLast(3)
.subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
first/firstOrDefault:
只發(fā)射第一項(xiàng)(或者滿足某個(gè)條件的第一項(xiàng))數(shù)據(jù)肃拜,可以指定默認(rèn)值。
Observable.just(3,4,5,6)
.first()
.subscribe(integer -> Log.d("JG",integer.toString()));//3
Observable.just(3,4,5,6)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
}) .subscribe(integer -> Log.d("JG",integer.toString()));//4
last/lastOrDefault:
只發(fā)射最后一項(xiàng)(或者滿足某個(gè)條件的最后一項(xiàng))數(shù)據(jù)雌团,可以指定默認(rèn)值燃领。
skip
跳過(guò)開(kāi)始的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)。內(nèi)部通過(guò)OperatorSkip和OperatorSkipTimed實(shí)現(xiàn)過(guò)濾锦援。
Observable.just(3,4,5,6)
.skip(1)
.subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
skipLast
跳過(guò)最后的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)猛蔽。內(nèi)部通過(guò)OperatorSkipLast和OperatorSkipLastTimed實(shí)現(xiàn)過(guò)濾。
Observable.just(3,4,5,6)
.elementAt(2)
.subscribe(item->Log.d("JG",item.toString())); //5
ignoreElements
丟棄所有數(shù)據(jù)灵寺,只發(fā)射錯(cuò)誤或正常終止的通知曼库。內(nèi)部通過(guò)OperatorIgnoreElements實(shí)現(xiàn)。
distinct
過(guò)濾重復(fù)數(shù)據(jù)略板,內(nèi)部通過(guò)OperatorDistinct實(shí)現(xiàn)毁枯。
Observable.just(3,4,5,6,3,3,4,9)
.distinct()
.subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,9
distinctUntilChanged
過(guò)濾掉連續(xù)重復(fù)的數(shù)據(jù)。內(nèi)部通過(guò)OperatorDistinctUntilChanged實(shí)現(xiàn)
Observable.just(3,4,5,6,3,3,4,9)
.distinctUntilChanged()
.subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
throttleFirst:
定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù)叮称。內(nèi)部通過(guò)OperatorThrottleFirst實(shí)現(xiàn)种玛。
Observable.create(subscriber -> {
subscriber.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}).throttleFirst(999, TimeUnit.MILLISECONDS)
.subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為1,3,4
throttleWithTimeout/debounce:
發(fā)射數(shù)據(jù)時(shí),如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時(shí)間瓤檐,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒(méi)有新數(shù)據(jù)發(fā)射時(shí)
才進(jìn)行發(fā)射
Observable.create(subscriber -> {
subscriber.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}).debounce(999, TimeUnit.MILLISECONDS)//或者為throttleWithTimeout(1000, TimeUnit.MILLISECONDS)
.subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為3,5
sample/throttleLast:
定期發(fā)射Observable最近的數(shù)據(jù)蒂誉。內(nèi)部通過(guò)OperatorSampleWithTime實(shí)現(xiàn)。
Observable.create(subscriber -> {
subscriber.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}).sample(999, TimeUnit.MILLISECONDS)//或者為throttleLast(1000, TimeUnit.MILLISECONDS)
.subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為2,3,5
timeout:
如果原始Observable過(guò)了指定的一段時(shí)長(zhǎng)沒(méi)有發(fā)射任何數(shù)據(jù)距帅,就發(fā)射一個(gè)異常或者使用備用的Observable括堤。
Observable.create(( subscriber) -> {
subscriber.onNext(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
subscriber.onCompleted();
}).timeout(999, TimeUnit.MILLISECONDS,Observable.just(99,100))//如果不指定備用Observable將會(huì)拋出異常
.subscribe(item-> Log.d("JG",item.toString()),error->Log.d("JG","onError")); //結(jié)果為1,99,100 如果不指定備用Observable結(jié)果為1,onError
}
四:切換線程
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
- subscribeOn
- 發(fā)送事件的線程
- observeOn
- 接收事件的線程
線程選項(xiàng)
- Schedulers.io()
- 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫(xiě)文件等io密集型的操作
- Schedulers.computation()
- 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作
- Schedulers.newThread()
- 代表一個(gè)常規(guī)的新線程
- AndroidSchedulers.mainThread()
- 代表Android的主線程
五:條件/布爾操作
all:
判斷所有的數(shù)據(jù)項(xiàng)是否滿足某個(gè)條件碌秸,內(nèi)部通過(guò)OperatorAll實(shí)現(xiàn)。
Observable.just(2,3,4,5)
.all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>3;
}
})
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
Log.d("JG",aBoolean.toString()); //false
}
});
exists:
判斷是否存在數(shù)據(jù)項(xiàng)滿足某個(gè)條件悄窃。內(nèi)部通過(guò)OperatorAny實(shí)現(xiàn)讥电。
Observable.just(2,3,4,5)
.exists(integer -> integer>3)
.subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
contains:
判斷在發(fā)射的所有數(shù)據(jù)項(xiàng)中是否包含指定的數(shù)據(jù),內(nèi)部調(diào)用的其實(shí)是exists
Observable.just(2,3,4,5)
.contains(3)
.subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
sequenceEqual:
用于判斷兩個(gè)Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù)轧抗,發(fā)射順序恩敌,終止?fàn)顟B(tài))。
Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5))
.subscribe(aBoolean -> Log.d("JG",aBoolean.toString()));//true
isEmpty:
用于判斷Observable發(fā)射完畢時(shí)横媚,有沒(méi)有發(fā)射數(shù)據(jù)纠炮。有數(shù)據(jù)false月趟,如果只收到了onComplete通知?jiǎng)t為true。
Observable.just(3,4,5,6)
.isEmpty()
.subscribe(item -> Log.d("JG",item.toString()));//false
amb:
給定多個(gè)Observable恢口,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)孝宗,其他Observable將會(huì)被忽略。
Observable<Integer> observable1=Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
subscriber.onError(e);
}
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable<Integer> observable2=Observable.create(subscriber -> {
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
});
Observable.amb(observable1,observable2)
.subscribe(integer -> Log.d("JG",integer.toString())); //3,4
switchIfEmpty:
如果原始Observable正常終止后仍然沒(méi)有發(fā)射任何數(shù)據(jù)耕肩,就使用備用的Observable因妇。
Observable.empty()
.switchIfEmpty(Observable.just(2,3,4))
.subscribe(o -> Log.d("JG",o.toString())); //2,3,4
defaultIfEmpty:
如果原始Observable正常終止后仍然沒(méi)有發(fā)射任何數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)值,內(nèi)部調(diào)用的switchIfEmpty猿诸。
takeUntil:
當(dāng)發(fā)射的數(shù)據(jù)滿足某個(gè)條件后(包含該數(shù)據(jù))婚被,或者第二個(gè)Observable發(fā)送完畢,終止第一個(gè)Observable發(fā)送數(shù)據(jù)梳虽。
Observable.just(2,3,4,5)
.takeUntil(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer==4;
}
}).subscribe(integer -> Log.d("JG",integer.toString())); //2,3,4
takeWhile:
當(dāng)發(fā)射的數(shù)據(jù)滿足某個(gè)條件時(shí)(不包含該數(shù)據(jù))址芯,Observable終止發(fā)送數(shù)據(jù)。
Observable.just(2,3,4,5)
.takeWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer==4;
}
})
.subscribe(integer -> Log.d("JG",integer.toString())); //2,3
skipUntil:
丟棄Observable發(fā)射的數(shù)據(jù)怖辆,直到第二個(gè)Observable發(fā)送數(shù)據(jù)是复。(丟棄條件數(shù)據(jù))
skipWhile:
丟棄Observable發(fā)射的數(shù)據(jù),直到一個(gè)指定的條件不成立(不丟棄條件數(shù)據(jù))
六:聚合操作
reduce:
對(duì)序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果,內(nèi)部使用OnSubscribeReduce實(shí)現(xiàn)竖螃。
Observable.just(2,3,4,5)
.reduce(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum+item;
}
})
.subscribe(integer -> Log.d("JG",integer.toString()));//14
collect:
使用collect收集數(shù)據(jù)到一個(gè)可變的數(shù)據(jù)結(jié)構(gòu)淑廊。
Observable.just(3,4,5,6)
.collect(new Func0<List<Integer>>() { //創(chuàng)建數(shù)據(jù)結(jié)構(gòu)
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new Action2<List<Integer>, Integer>() { //收集器
@Override
public void call(List<Integer> integers, Integer integer) {
integers.add(integer);
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
}
});
count/countLong:
計(jì)算發(fā)射的數(shù)量,內(nèi)部調(diào)用的是reduce.
doOnNext()
允許我們?cè)诿看屋敵鲆粋€(gè)元素之前做一些額外的事情特咆。
Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("準(zhǔn)備工作");
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
System.out.println((String)s);
}
});
七:轉(zhuǎn)換操作
toList:
收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表季惩,然后返回這個(gè)列表.
Observable.just(2,3,4,5)
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
}
});
toSortedList:
收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表腻格。
Observable.just(6,2,3,4,5)
.toSortedList(new Func2<Integer, Integer, Integer>() {//自定義排序
@Override
public Integer call(Integer integer, Integer integer2) {
return integer-integer2; //>0 升序 画拾,<0 降序
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.d("JG",integers.toString()); // [2, 3, 4, 5, 6]
}
});
toMap:
將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map。我們可以根據(jù)數(shù)據(jù)項(xiàng)生成key和生成value菜职。
Observable.just(6,2,3,4,5)
.toMap(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "key:" + integer; //根據(jù)數(shù)據(jù)項(xiàng)生成map的key
}
}, new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "value:"+integer; //根據(jù)數(shù)據(jù)項(xiàng)生成map的kvalue
}
}).subscribe(new Action1<Map<String, String>>() {
@Override
public void call(Map<String, String> stringStringMap) {
Log.d("JG",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
}
});
toMultiMap
: 類似于toMap青抛,不同的地方在于map的value是一個(gè)集合。
八:變換操作
map
: 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù)來(lái)變換酬核。
Observable.just(6,2,3,4,5)
.map(integer -> "item:"+integer)
.subscribe(s -> Log.d("JG",s));//item:6,item:2....
cast:
在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型
flatMap:
將Observable發(fā)射的數(shù)據(jù)變換為Observables集合蜜另,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable,內(nèi)部采用merge合并嫡意。
Observable.just(2,3,5)
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.create(subscriber -> {
subscriber.onNext(integer*10+"");
subscriber.onNext(integer*100+"");
subscriber.onCompleted();
});
}
})
.subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500
flatMapIterable:
和flatMap的作用一樣举瑰,只不過(guò)生成的是Iterable而不是Observable。
Observable.just(2,3,5)
.flatMapIterable(new Func1<Integer, Iterable<String>>() {
@Override
public Iterable<String> call(Integer integer) {
return Arrays.asList(integer*10+"",integer*100+"");
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
concatMap:
類似于flatMap蔬螟,由于內(nèi)部使用concat合并此迅,所以是按照順序連接發(fā)射。
switchMap:
和flatMap很像,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合耸序,當(dāng)原始Observable發(fā)射一個(gè)新的數(shù)據(jù)(Observable)時(shí)忍些,它將取消訂閱前一個(gè)Observable。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=1;i<4;i++){
subscriber.onNext(i);
Utils.sleep(500,subscriber);//線程休眠500ms
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread())
.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
//每當(dāng)接收到新的數(shù)據(jù)佑吝,之前的Observable將會(huì)被取消訂閱
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(integer*10);
Utils.sleep(500,subscriber);
subscriber.onNext(integer*100);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread());
}
})
.subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300
scan:
與reduce很像坐昙,對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值芋忿。
Observable.just(2,3,5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum+item;
}
})
.subscribe(integer -> Log.d("JG",integer.toString())) //2,5,10
Observable.just(1, 2, 3, 4, 5)
.scan(-1,new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
})
groupBy:
將Observable分拆為Observable集合炸客,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù)戈钢。
Observable.just(2,3,5,6)
.groupBy(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {//分組
return integer%2==0?"偶數(shù)":"奇數(shù)";
}
})
.subscribe(new Action1<GroupedObservable<String, Integer>>() {
@Override
public void call(GroupedObservable<String, Integer> o) {
o.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG",o.getKey()+":"+integer.toString()); //偶數(shù):2痹仙,奇數(shù):3,...
}
});
}
})
buffer:
它定期從Observable收集數(shù)據(jù)到一個(gè)集合殉了,然后把這些數(shù)據(jù)集合打包發(fā)射开仰,而不是一次發(fā)射一個(gè)
Observable.just(2,3,5,6)
.buffer(3)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
}
})
window:
定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口薪铜,而不是每次發(fā)射一項(xiàng)众弓。
Observable.just(2,3,5,6)
.window(3)
.subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(Observable<Integer> integerObservable) {
integerObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
});
}
})
九:錯(cuò)誤處理/重試機(jī)制
onErrorResumeNext:
當(dāng)原始Observable在遇到錯(cuò)誤時(shí),使用備用Observable隔箍。谓娃。
Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(1,2,3))
.subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
;
onExceptionResumeNext:
當(dāng)原始Observable在遇到異常時(shí),使用備用的Observable蜒滩。與onErrorResumeNext類似滨达,區(qū)別在于onErrorResumeNext可以處理所有的錯(cuò)誤,onExceptionResumeNext只能處理異常俯艰。
onErrorReturn:
當(dāng)原始Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特定的數(shù)據(jù)捡遍。
Observable.just(1,"2",3)
.cast(Integer.class)
.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 4;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG",integer.toString());1,4
}
});
retry:
當(dāng)原始Observable在遇到錯(cuò)誤時(shí)進(jìn)行重試。
Observable.just(1,"2",3)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
;//1,1,1,1,onError
retryWhen
: 當(dāng)原始Observable在遇到錯(cuò)誤竹握,將錯(cuò)誤傳遞給另一個(gè)Observable來(lái)決定是否要重新訂閱這個(gè)Observable,內(nèi)部調(diào)用的是retry画株。
Observable.just(1,"2",3)
.cast(Integer.class)
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
@Override
public Observable<Long> call(Observable<? extends Throwable> observable) {
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
//1,1
十:連接操作
ConnectableObservable與普通的Observable差不多,但是可連接的Observable在被訂閱時(shí)并不開(kāi)始發(fā)射數(shù)據(jù)啦辐,只有在它的connect()被調(diào)用時(shí)才開(kāi)始污秆。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開(kāi)始發(fā)射數(shù)據(jù)昧甘。
ConnectableObservable.connect()
指示一個(gè)可連接的Observable開(kāi)始發(fā)射數(shù)據(jù).
Observable.publish()
將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable
Observable.replay()
確保所有的訂閱者看到相同的數(shù)據(jù)序列的ConnectableObservable,即使它們?cè)贠bservable開(kāi)始發(fā)射數(shù)據(jù)之后才訂閱战得。
ConnectableObservable.refCount()
讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable充边。
ConnectableObservable<Integer> co= Observable.just(1,2,3)
.publish();
co .subscribe(integer -> Log.d("JG",integer.toString()) );
co.connect();//此時(shí)開(kāi)始發(fā)射數(shù)據(jù)
十一:阻塞操作
BlockingObservable是一個(gè)阻塞的Observable。普通的Observable 轉(zhuǎn)換為 BlockingObservable,可以使用 Observable.toBlocking(?)方法或者BlockingObservable.from(?)方法浇冰。內(nèi)部通過(guò)CountDownLatch實(shí)現(xiàn)了阻塞操作贬媒。。
以下的操作符可以用于BlockingObservable肘习,如果是普通的Observable际乘,務(wù)必使用Observable.toBlocking()轉(zhuǎn)為阻塞Observable后使用,否則達(dá)不到預(yù)期的效果漂佩。
在Rxjava1中的BlockingObservable已經(jīng)在Rxjava2中去掉了脖含,在Rxjava2中已經(jīng)集成到了Observable中。
名稱 | 解析 |
---|---|
blockingForEach() | 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)調(diào)用一個(gè)方法投蝉,會(huì)阻塞直到Observable完成 |
blockingFirst() | 阻塞直到Observable發(fā)射了一個(gè)數(shù)據(jù)养葵,然后返回第一項(xiàng)數(shù)據(jù) |
blockingMostRecent() | 返回一個(gè)總是返回Observable最近發(fā)射的數(shù)據(jù)的iterable |
blockingLatest() | 返回一個(gè)iterable,會(huì)阻塞直到或者除非Observable發(fā)射了一個(gè)iterable沒(méi)有返回的值瘩缆,然后返回這個(gè)值 |
blockingNext() | 返回一個(gè)iterable,阻塞直到返回另外一個(gè)值 |
blockingLast() | 阻塞直到Observable終止关拒,然后返回最后一項(xiàng)數(shù)據(jù) |
blockingIterable() | 將Observable轉(zhuǎn)換返回一個(gè)iterable. |
blockingSingle() | 如果Observable終止時(shí)只發(fā)射了一個(gè)值,返回那個(gè)值庸娱,否則拋出異常 |
blockingSubscribe() | 在當(dāng)前線程訂閱着绊,和forEach類似 |
十二:工具集
materialize:
將Observable轉(zhuǎn)換成一個(gè)通知列表。
Observable.just(1,2,3)
.materialize()
.subscribe(new Action1<Notification<Integer>>() {
@Override
public void call(Notification<Integer> notification) {
Log.d("JG",notification.getKind()+" "+notification.getValue());
//OnNext 1
//OnNext 2
//OnNext 3
//OnCompleted null
}
});
dematerialize:
與上面的作用相反熟尉,將通知逆轉(zhuǎn)回一個(gè)Observable归露。
timestamp:
給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳。
Observable.just(1,2,3)
.timestamp()
.subscribe(new Action1<Timestamped<Integer>>() {
@Override
public void call(Timestamped<Integer> timestamped) {
Log.d("JG",timestamped.getTimestampMillis()+" "+timestamped.getValue());
//1472627510548 1
//1472627510549 2
//1472627510549 3
}
});
timeInterval:
給Observable發(fā)射的兩個(gè)數(shù)據(jù)項(xiàng)間添加一個(gè)時(shí)間差臣樱,實(shí)現(xiàn)在OperatorTimeInterval中
timeInterval
serialize:
強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的
cache:
緩存Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者
observeOn:
指定觀察者觀察Observable的調(diào)度器
subscribeOn:
指定Observable執(zhí)行任務(wù)的調(diào)度器
doOnEach:
注冊(cè)一個(gè)動(dòng)作靶擦,對(duì)Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用
Observable.just(2,3)
.doOnEach(new Action1<Notification<? super Integer>>() {
@Override
public void call(Notification<? super Integer> notification) {
Log.d("JG","--doOnEach--"+notification.toString());
}
})
.subscribe(integer -> Log.d("JG",integer.toString()));
//結(jié)果為:
// --doOnEach--[rx.Notification@133c40b0 OnNext 2]
// 2
// --doOnEach--[rx.Notification@133c40b0 OnNext 3]
// 3
// --doOnEach--[rx.Notification@df4db0e OnCompleted]
doOnCompleted':
注冊(cè)一個(gè)動(dòng)作,對(duì)正常完成的Observable使用
doOnError:
注冊(cè)一個(gè)動(dòng)作雇毫,對(duì)發(fā)生錯(cuò)誤的Observable使用
doOnTerminate
:注冊(cè)一個(gè)動(dòng)作玄捕,對(duì)完成的Observable使用,無(wú)論是否發(fā)生錯(cuò)誤
Observable.just(2,3)
.doOnTerminate(new Action0() {
@Override
public void call() {
Log.d("JG","--doOnTerminate--");
}
})
.subscribe(integer -> Log.d("JG",integer.toString()));
// 2 , 3 , --doOnTerminate--
doOnSubscribe:
注冊(cè)一個(gè)動(dòng)作棚放,在觀察者訂閱時(shí)使用枚粘。內(nèi)部由OperatorDoOnSubscribe實(shí)現(xiàn)doOnSubscribe
doOnUnsubscribe
: 注冊(cè)一個(gè)動(dòng)作,在觀察者取消訂閱時(shí)使用飘蚯。內(nèi)部由OperatorDoOnUnsubscribe實(shí)現(xiàn)馍迄,在call中加入一個(gè)解綁動(dòng)作。
doOnUnsubscribe
finallyDo/doAfterTerminate:
注冊(cè)一個(gè)動(dòng)作局骤,在Observable完成時(shí)使用
Observable.just(2,3)
.doAfterTerminate(new Action0() {
@Override
public void call() {
Log.d("JG","--doAfterTerminate--");
}
})
.subscribe(integer -> Log.d("JG",integer.toString()));
//2,3, --doAfterTerminate--
delay:
延時(shí)發(fā)射Observable的結(jié)果攀圈。即讓原始Observable在發(fā)射每項(xiàng)數(shù)據(jù)之前都暫停一段指定的時(shí)間段。效果是Observable發(fā)射的數(shù)據(jù)項(xiàng)在時(shí)間上向前整體平移了一個(gè)增量(除了onError峦甩,它會(huì)即時(shí)通知)赘来。
delaySubscription:
延時(shí)處理訂閱請(qǐng)求现喳。實(shí)現(xiàn)在OnSubscribeDelaySubscription中
delaySubscription
using
: 創(chuàng)建一個(gè)只在Observable生命周期存在的資源,當(dāng)Observable終止時(shí)這個(gè)資源會(huì)被自動(dòng)釋放犬辰。
Observable.using(new Func0<File>() {//資源工廠
@Override
public File call() {
File file = new File(getCacheDir(), "a.txt");
if(!file.exists()){
try {
Log.d("JG","--create--");
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return file;
}
}, new Func1<File, Observable<String>>() { //Observable
@Override
public Observable<String> call(File file) {
return Observable.just(file.exists() ? "exist" : "no exist");
}
}, new Action1<File>() {//釋放資源動(dòng)作
@Override
public void call(File file) {
if(file!=null&&file.exists()){
Log.d("JG","--delete--");
file.delete();
}
}
})
.subscribe(s -> Log.d("JG",s))
;
//--create--
//exist
//--delete--
single/singleOrDefault:
強(qiáng)制返回單個(gè)數(shù)據(jù)嗦篱,否則拋出異常或默認(rèn)數(shù)據(jù)幌缝。
十三:Flowable (2.0出來(lái)的) 非操作符
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一個(gè)參數(shù)
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意這句代碼
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
s.request(Long.MAX_VALUE); //注意這句代碼
處理事件 默認(rèn)處理 128 灸促,當(dāng)發(fā)送的事件 > 處理的事件 MissingBackpressureException
異常
BackpressureStrategy
- BackpressureStrategy.BUFFER
- 沒(méi)有128 的限制
- BackpressureStrategy.ERROR
- 默認(rèn)128
- BackpressureStrategy.DROP
- 直接把存不下的事件丟棄
- BackpressureStrategy.LATEST
- 只保留最新的事件
寫(xiě)在最后
感謝作者maplejaw
RxJava操作符大全
推薦Rxjava系列
給初學(xué)者的RxJava2.0教程
官網(wǎng)學(xué)習(xí)筆記