接續(xù)上篇: Rxjava2 Observable的輔助操作詳解及實例(一)
8. TimeInterval
將一個發(fā)射數(shù)據(jù)的Observable轉(zhuǎn)換為發(fā)射那些數(shù)據(jù)發(fā)射時間間隔的Observable旭从。
TimeInterval
操作符攔截原始Observable發(fā)射的數(shù)據(jù)項胧谈,替換為發(fā)射表示相鄰發(fā)射物時間間隔的對象。
這個操作符將原始 Observable 轉(zhuǎn)換為另一個 Observable ,后者發(fā)射一個標志替換前者的數(shù)據(jù)項扮念,這個標志表示前者的兩個連續(xù)發(fā)射物之間流逝的時間長度。新的Observable的第一個發(fā)射物表示的是在觀察者訂閱原始 Observable 到原始 Observable 發(fā)射它的第一項數(shù)據(jù)之間流逝的時間長度。不存在與原始 Observable 發(fā)射最后一項數(shù)據(jù)和發(fā)射 onCompleted 通知之間時長對應(yīng)的發(fā)射物。
示例代碼:
/**
* 1. timeInterval(Scheduler scheduler)
* scheduler: 可選參數(shù)削锰,指定調(diào)度線程
* 接收原始數(shù)據(jù)項,發(fā)射射表示相鄰發(fā)射物時間間隔的對象
*/
Observable.intervalRange(1, 10, 100, 100, TimeUnit.MILLISECONDS)
.timeInterval()
// .timeInterval(Schedulers.newThread()) // 指定工作線程
.subscribe(new Observer<Timed<Long>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(1)");
}
@Override
public void onNext(Timed<Long> longTimed) {
long time = longTimed.time(); // 連續(xù)數(shù)據(jù)間的間隔時間
TimeUnit unit = longTimed.unit(); // 連續(xù)數(shù)據(jù)間的時間間隔單位
Long value = longTimed.value(); // Observable發(fā)送的數(shù)據(jù)項
System.out.println("--> onNext(1): " + longTimed.toString());
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1)");
}
});
System.in.read();
System.out.println("-------------------------------------------------");
/**
* 2. timeInterval(TimeUnit unit, Scheduler scheduler)
* 指定時間間隔單位和指定工作線程毕莱,接收原始數(shù)據(jù)項器贩,發(fā)射射表示相鄰發(fā)射物時間間隔的對象
*/
Observable.intervalRange(1, 10, 1000, 1200, TimeUnit.MILLISECONDS)
// .timeInterval(TimeUnit.SECONDS) // 指定時間間隔單位
.timeInterval(TimeUnit.SECONDS, Schedulers.newThread()) // 指定時間間隔單位和指定工作線程
.subscribe(new Observer<Timed<Long>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Timed<Long> longTimed) {
System.out.println("--> onNext(2): " + longTimed.toString());
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.in.read();
輸出:
--> onSubscribe(1)
--> onNext(1): Timed[time=104, unit=MILLISECONDS, value=1]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=2]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=3]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=4]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=5]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=6]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=7]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=8]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=9]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=10]
--> onComplete(1)
-------------------------------------------------
--> onSubscribe(2)
--> onNext(2): Timed[time=1, unit=SECONDS, value=1]
--> onNext(2): Timed[time=1, unit=SECONDS, value=2]
--> onNext(2): Timed[time=1, unit=SECONDS, value=3]
--> onNext(2): Timed[time=1, unit=SECONDS, value=4]
--> onNext(2): Timed[time=1, unit=SECONDS, value=5]
--> onNext(2): Timed[time=2, unit=SECONDS, value=6]
--> onNext(2): Timed[time=1, unit=SECONDS, value=7]
--> onNext(2): Timed[time=1, unit=SECONDS, value=8]
--> onNext(2): Timed[time=1, unit=SECONDS, value=9]
--> onNext(2): Timed[time=1, unit=SECONDS, value=10]
--> onComplete(2)
Javadoc: timeInterval()
Javadoc: timeInterval(Scheduler scheduler)
Javadoc: timeInterval(TimeUnit unit)
Javadoc: timeInterval(TimeUnit unit, Scheduler scheduler)
9. Timeout
對原始Observable的一個鏡像,如果過了一個指定的時長仍沒有發(fā)射數(shù)據(jù)朋截,它會發(fā)一個錯誤通知蛹稍。
RxJava中的實現(xiàn)為 timeout
操作符,具有多個不同的變體部服。
9.1 timeout(timeout, timeUnit)
如果原始 Observable 過了指定的一段時長沒有發(fā)射任何數(shù)據(jù)唆姐,Timeout
操作符會以一個 onError
通知終止這個Observable。
示例代碼:
/**
* 1. timeout(long timeout, TimeUnit timeUnit)
* 接受一個時長參數(shù)饲宿,如果在指定的時間段內(nèi)沒有數(shù)據(jù)項發(fā)射,將會發(fā)射一個Error通知胆描,
* 或者每當原始Observable發(fā)射了一項數(shù)據(jù)瘫想, timeout 就啟動一個計時器,
* 如果計時器超過了指定指定的時長而原始Observable沒有發(fā)射另一項數(shù)據(jù)昌讲,
* 就拋出 TimeoutException 国夜,以一個錯誤通知終止Observable。
*/
Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
// Thread.sleep(2000); // 延遲2秒后發(fā)射數(shù)據(jù)短绸,此時會有TimeoutException
emitter.onNext(1L);
Thread.sleep(2000); // 延遲2秒后發(fā)射數(shù)據(jù)车吹,此時會有TimeoutException
emitter.onNext(2L);
emitter.onComplete();
}
}).timeout(1, TimeUnit.SECONDS) // 指定超時時間段為1秒
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(1)");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext(1): " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1)");
}
});
System.in.read();
輸出:
--> onSubscribe(1)
--> onNext(1): 1
--> onError(1): java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.
9.2 timeout(timeout, timeUnit, scheduler, other)
在指定時間段后超時時會切換到使用一個你指定的備用的 Observable,而不是發(fā)onError
通知醋闭,可以通過scheduler
來指定工作線程窄驹。
示例代碼:
/**
* 2. timeout(long timeout, TimeUnit timeUnit,
* Scheduler scheduler, // 可選參數(shù),指定線程調(diào)度器
* ObservableSource other // 可選參數(shù)证逻,超時備用Observable
* )
*
* 在指定時間段后超時時會切換到使用一個你指定的備用的Observable乐埠,而不是發(fā)onError通知。
*/
Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
// Thread.sleep(2000); // 延遲2秒后發(fā)射數(shù)據(jù),此時會有TimeoutException
emitter.onNext(1L);
Thread.sleep(2000); // 延遲2秒后發(fā)射數(shù)據(jù)丈咐,此時會有TimeoutException
emitter.onNext(2L);
emitter.onComplete();
}
}).timeout(1, TimeUnit.SECONDS, // 指定超時時間段為1秒
Schedulers.newThread(), // 指定工作線程為子線程
Observable.just(888L)) // 超時后默認發(fā)射的Observable
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext(2): " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.in.read();
輸出:
--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 888
--> onComplete(2)
Javadoc: timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource other)
9.3 timeout(Function itemTimeoutIndicator, ObservableSource other)
使用一個函數(shù) itemTimeoutIndicator
針對原始 Observable 的每一項返回一個 Observable瑞眼,如果當這個 Observable 終止時原始 Observable 還沒有發(fā)射另一項數(shù)據(jù),就會認為是超時了棵逊,如果沒有指定超時備用的 other
伤疙,就拋出 TimeoutException
,以一個錯誤通知終止 bservable辆影,否則超時后發(fā)射備用的 Observable徒像。
示例代碼:
/**
* 3. timeout(Function<T, ObservableSource> itemTimeoutIndicator
* ObservableSource other // 可選參數(shù),當超時后發(fā)射的備用Observable
* )
* 對原始Observable的每一項返回一個Observable秸歧,
* 如果當這個Observable終止時原始Observable還沒有發(fā)射另一項數(shù)據(jù)厨姚,就會認為是超時了,
* 如果沒有指定超時備用的Observable键菱,就拋出TimeoutException谬墙,以一個錯誤通知終止Observable,
* 否則超時后發(fā)射備用的Observable经备。
*/
Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
emitter.onNext(1L);
Thread.sleep(3000); // 延遲3秒后發(fā)射數(shù)據(jù)拭抬,此時會有TimeoutException
emitter.onNext(2L);
emitter.onComplete();
}
}).timeout(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
// 為每一個原始數(shù)據(jù)發(fā)射一個Observable來指示下一個數(shù)據(jù)發(fā)射的Timeout,這里指定1秒超時時間
return Observable.timer(1, TimeUnit.SECONDS);
}
}, Observable.just(888L)) // 超時后默認發(fā)射的Observable
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3)");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext(3): " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(3)");
}
});
System.in.read();
輸出:
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 888
--> onComplete(3)
Javadoc: timeout(Function<T, ObservableSource> itemTimeoutIndicator)
Javadoc: timeout(Function<T, ObservableSource> itemTimeoutIndicator, ObservableSource other)
10. Timestamp
給Observable發(fā)射的數(shù)據(jù)項附加一個指定的時間戳侵蒙。
timestamp
造虎,它將一個發(fā)射Timed
類型數(shù)據(jù)的Observable轉(zhuǎn)換為一個發(fā)射類型為 Timestamped<Timed>
的數(shù)據(jù)的Observable,每一項都包含數(shù)據(jù)的原始發(fā)射時間信息和原始數(shù)據(jù)纷闺。
示例代碼:
/**
* 1. timestamp(Scheduler scheduler)
* scheduler: 可選參數(shù)算凿,指定線程調(diào)度器
*
* 給Observable發(fā)射的數(shù)據(jù)項附加一個時間戳信息
*/
Observable.intervalRange(1, 5, 1, 100, TimeUnit.MILLISECONDS)
.timestamp(Schedulers.newThread()) // 指定在子線程調(diào)度處理
.subscribe(new Observer<Timed<Long>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(1)");
}
@Override
public void onNext(Timed<Long> longTimed) {
long time = longTimed.time(); // 連續(xù)數(shù)據(jù)間的間隔時間
TimeUnit unit = longTimed.unit(); // 連續(xù)數(shù)據(jù)間的時間間隔單位
Long value = longTimed.value(); // Observable發(fā)送的數(shù)據(jù)項
System.out.println("--> onNext(1): " + longTimed);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1)");
}
});
System.in.read();
System.out.println("-------------------------------------------");
/**
* 2. timestamp(TimeUnit unit, Scheduler scheduler)
* scheduler: 可選參數(shù),指定線程調(diào)度器
*
* 給Observable發(fā)射的數(shù)據(jù)項附加一個指定單位的時間戳信息
*/
Observable.intervalRange(1, 5, 1, 1200, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.SECONDS, Schedulers.newThread()) // 指定時間單位為秒犁功,在子線程調(diào)度處理
.subscribe(new Observer<Timed<Long>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Timed<Long> longTimed) {
System.out.println("--> onNext(2): " + longTimed);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.in.read();
輸出:
--> onSubscribe(1)
--> onNext(1): Timed[time=1577455367446, unit=MILLISECONDS, value=1]
--> onNext(1): Timed[time=1577455367545, unit=MILLISECONDS, value=2]
--> onNext(1): Timed[time=1577455367645, unit=MILLISECONDS, value=3]
--> onNext(1): Timed[time=1577455367745, unit=MILLISECONDS, value=4]
--> onNext(1): Timed[time=1577455367845, unit=MILLISECONDS, value=5]
--> onComplete(1)
-------------------------------------------
--> onSubscribe(2)
--> onNext(2): Timed[time=1577455369, unit=SECONDS, value=1]
--> onNext(2): Timed[time=1577455370, unit=SECONDS, value=2]
--> onNext(2): Timed[time=1577455371, unit=SECONDS, value=3]
--> onNext(2): Timed[time=1577455373, unit=SECONDS, value=4]
--> onNext(2): Timed[time=1577455374, unit=SECONDS, value=5]
--> onComplete(2)
Javadoc: timestamp()
Javadoc: timestamp(Scheduler scheduler)
Javadoc: timestamp(TimeUnit unit)
Javadoc: timestamp(TimeUnit unit, Scheduler scheduler)
11. Using
創(chuàng)建一個只在Observable生命周期內(nèi)存在的一次性資源氓轰。
Using
操作符讓你可以指示Observable創(chuàng)建一個只在它的生命周期內(nèi)存在的資源,當Observable終止時這個資源會被自動釋放浸卦。
using 操作符接受三個參數(shù):
-
observableFactory
:一個用戶創(chuàng)建一次性資源的工廠函數(shù) -
resourceFactory
:一個用于創(chuàng)建Observable的工廠函數(shù) -
disposeFunction
:一個用于釋放資源的函數(shù)
當一個觀察者訂閱 using 返回的Observable時署鸡, using 將會使用Observable工廠函數(shù)創(chuàng)建觀察者要觀察的Observable,同時使用資源工廠函數(shù)創(chuàng)建一個你想要創(chuàng)建的資源限嫌。當觀察者取消訂閱這個Observable時靴庆,或者當觀察者終止時(無論是正常終止還是因錯誤而終止), using 使用第三個函數(shù)釋放它創(chuàng)建的資源怒医。
示例代碼:
/**
* 用于在Observable的生命周期內(nèi)存在的資源對象
*/
class MyResource {
private String resource;
public MyResource(String resource) {
this.resource = resource;
}
@Override
public String toString() {
return "MyResource{" +
"resource='" + resource + '\'' +
'}';
}
public void releaseResource() {
System.out.println("----> MyResource resource is release. ");
resource = null;
}
}
/**
* 1. using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer, boolean eager)
*
* resourceSupplier: // 一個用戶創(chuàng)建一次性資源的工廠函數(shù)
* sourceSupplier: // 一個用于創(chuàng)建Observable的工廠函數(shù)
* disposer: // 一個用于釋放資源的函數(shù)
* eager: // 可選參數(shù)炉抒,如果為true的話,則第三個函數(shù)disposer的處理在Observable的結(jié)束前執(zhí)行
*
* 當一個觀察者訂閱 using 返回的Observable時稚叹, using 將會使用Observable工廠函數(shù)創(chuàng)建觀察者要觀察的Observable端礼,
* 同時使用資源工廠函數(shù)創(chuàng)建一個你想要創(chuàng)建的資源禽笑。
* 當觀察者取消訂閱這個Observable時,或者當觀察者終止時(無論是正常終止還是因錯誤而終止)蛤奥,
* using 使用第三個函數(shù)釋放它創(chuàng)建的資源佳镜。
*/
Observable.using(
// 一個用戶創(chuàng)建一次性資源的工廠函數(shù)
new Callable<MyResource>() {
@Override
public MyResource call() throws Exception {
System.out.println("----> resourceSupplier call");
return new MyResource("This is Observable resource!");
}
},
// 一個用于創(chuàng)建Observable的工廠函數(shù),這個函數(shù)返回的Observable就是最終被觀察的Observable
new Function<MyResource, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(MyResource myResource) throws Exception {
System.out.println("----> sourceSupplier apply: " + myResource);
return Observable.rangeLong(1, 5);
}
},
// 一個用于釋放資源的函數(shù)
new Consumer<MyResource>() {
@Override
public void accept(MyResource myResource) throws Exception {
System.out.println("----> disposer accept: ");
myResource.releaseResource();
}
},
// 可選參數(shù)凡桥,如果為true的話蟀伸,則在Observable的結(jié)束前執(zhí)行釋放資源的函數(shù)
true).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
輸出:
----> resourceSupplier call(1)
----> sourceSupplier apply(1): MyResource{resource='This is Observable resource!'}
--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
----> disposer accept(1):
----> MyResource resource is release.
--> onComplete
Javadoc: using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer)
Javadoc: using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer, boolean eager)
12. To
將Observable轉(zhuǎn)換為另一個對象或數(shù)據(jù)結(jié)構(gòu)。
將 Observable 或者Observable 發(fā)射的數(shù)據(jù)序列轉(zhuǎn)換為另一個對象或數(shù)據(jù)結(jié)構(gòu)缅刽。它們中的一些會阻塞直到 Observable 終止啊掏,然后生成一個等價的對象或數(shù)據(jù)結(jié)構(gòu);另一些返回一個發(fā)射那個對象或數(shù)據(jù)結(jié)構(gòu)的 Observable衰猛。
由于 rxjava 的 To
操作符中有很多 toXXX
操作符的實現(xiàn)和不同的變體重載迟蜜,此處就不詳細的展開了,有興趣的可以查看官方的API 文檔 詳細參閱啡省。
下面幾個是常見的幾種To
操作符的:
-
toList()
:讓Observable將多項數(shù)據(jù)組合成一個List娜睛,然后調(diào)用一次onNext方法傳遞整個列表。 -
toMap(Function keySelector,Function valueSelector)
:toMap收集原始Observable發(fā)射的所有數(shù)據(jù)項到一個Map(默認是HashMap)然后發(fā)射這個Map卦睹。 你可以提供一個用于生成Map的Key的函數(shù)畦戒,還可以提供一個函數(shù)轉(zhuǎn)換數(shù)據(jù)項到Map存儲的值(默認數(shù)據(jù)項本身就是值)。 -
toSortedList()
: 它會對產(chǎn)生的列表排序结序,默認是自然升序障斋,如果發(fā)射的數(shù)據(jù)項沒有實現(xiàn)Comparable接口价匠,會拋出一個異常伴挚,你也可以傳遞一個函數(shù)作為用于比較兩個數(shù)據(jù)項。 -
toMultimap(Function keySelector, Function valueSelector)
:類似于toMap异赫,不同的是返敬,它生成的這個Map的value類型還是一個ArrayList遂庄。
示例代碼:
/**
* 1. toList()
* 讓Observable將多項數(shù)據(jù)組合成一個List,然后調(diào)用一次onNext方法傳遞整個列表救赐。
*/
range.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("--> toList accept(1): " + integers);
}
});
System.out.println("------------------------------------------");
/**
* 2. toMap(Function<? super T, ? extends K> keySelector,Function<? super T, ? extends V> valueSelector)
* toMap收集原始Observable發(fā)射的所有數(shù)據(jù)項到一個Map(默認是HashMap)然后發(fā)射這個Map涧团。
* 你可以提供一個用于生成Map的Key的函數(shù)只磷,還可以提供一個函數(shù)轉(zhuǎn)換數(shù)據(jù)項到Map存儲的值(默認數(shù)據(jù)項本身就是值)经磅。
*/
range.toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key" + integer; // 返回一個Map的key
}
}, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer; // 返回一個Map的value
}
}).subscribe(new SingleObserver<Map<String, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onSuccess(Map<String, Integer> stringIntegerMap) {
System.out.println("--> onSuccess(2): " + stringIntegerMap);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
});
System.out.println("------------------------------------------");
/**
* 3. toSortedList()
* 它會對產(chǎn)生的列表排序,默認是自然升序钮追,如果發(fā)射的數(shù)據(jù)項沒有實現(xiàn)Comparable接口预厌,會拋出一個異常。
* 然而元媚,你也可以傳遞一個函數(shù)作為用于比較兩個數(shù)據(jù)項
*/
Observable.just(5, 3, 8, 6, 9, 10)
.toSortedList()
.subscribe(new SingleObserver<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3)");
}
@Override
public void onSuccess(List<Integer> integers) {
System.out.println("--> onSuccess(3): " + integers);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
});
System.out.println("------------------------------------------");
/**
* 4. toSortedList(Comparator comparator)
*
* 傳遞一個函數(shù)comparator作為用于比較兩個數(shù)據(jù)項轧叽,它會對產(chǎn)生的列表排序
*/
Observable.just(5, 3, 8, 6, 9, 10)
.toSortedList(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
System.out.println("--> compare: o1 = " + o1 + ", o2 = " + o2);
return o1 - o2; // 比較器的排序邏輯
}
}).subscribe(new SingleObserver<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(4)");
}
@Override
public void onSuccess(List<Integer> integers) {
System.out.println("--> onSuccess(4): " + integers);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(4): " + e);
}
});
System.out.println("------------------------------------------");
/**
* 5. toMultimap(Function<T, K> keySelector, Function<T, V> valueSelector)
* 類似于 toMap 苗沧,不同的是,它生成的這個Map的value類型還是一個ArrayList
*/
range.toMultimap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key" + integer; // 返回一個Map的key
}
}, new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer; // 返回一個Map的value
}
}).subscribe(new SingleObserver<Map<String, Collection<Integer>>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(5)");
}
@Override
public void onSuccess(Map<String, Collection<Integer>> stringCollectionMap) {
System.out.println("--> onSuccess(5): " + stringCollectionMap);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(5): " + e);
}
});
輸出:
--> toList accept(1): [1, 2, 3, 4, 5]
------------------------------------------
--> onSubscribe(2)
--> onSuccess(2): {key1=1, key2=2, key5=5, key3=3, key4=4}
------------------------------------------
--> onSubscribe(3)
--> onSuccess(3): [3, 5, 6, 8, 9, 10]
------------------------------------------
--> onSubscribe(4)
--> compare: o1 = 3, o2 = 5
--> compare: o1 = 8, o2 = 3
--> compare: o1 = 8, o2 = 5
--> compare: o1 = 6, o2 = 5
--> compare: o1 = 6, o2 = 8
--> compare: o1 = 9, o2 = 6
--> compare: o1 = 9, o2 = 8
--> compare: o1 = 10, o2 = 6
--> compare: o1 = 10, o2 = 9
--> onSuccess(4): [3, 5, 6, 8, 9, 10]
------------------------------------------
--> onSubscribe(5)
--> onSuccess(5): {key1=[1], key2=[2], key5=[5], key3=[3], key4=[4]}
Javadoc: toList()
Javadoc: toMap(Function keySelector,Function valueSelector)
Javadoc: toSortedList()
Javadoc: toMultimap(Function keySelector, Function valueSelector)
小結(jié)
本節(jié)主要是介紹了 Rxjava
中的各種輔助操作符炭晒,比如延遲待逞、超時,事件監(jiān)聽等相關(guān)的輔助類型的操作网严,這在開發(fā)中是很有用處的识樱。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: