接續(xù)上篇: Rxjava2 Observable的數(shù)據(jù)變換詳解及實(shí)例(一)
1. Window
定期將來自原始Observable的數(shù)據(jù)分解為一個(gè)Observable窗口,發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)數(shù)據(jù)畴蹭。
Window
和 Buffer
類似卫枝,但不是發(fā)射來自原始Observable的數(shù)據(jù)包捕虽,它發(fā)射的是 Observables簿盅,這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集诅病,最后發(fā) 射一個(gè) onCompleted 通知。
和 Buffer
一樣卓鹿,Window 有很多變體菱魔,每一種都以自己的方式將原始Observable分解為多個(gè)作為結(jié)果的Observable,每一個(gè)都包含一個(gè)映射原始數(shù)據(jù)的 window
吟孙。用 Window操作符的術(shù)語(yǔ)描述就是澜倦,當(dāng)一個(gè)窗口打開(when a window "opens")意味著一個(gè)新的Observable已經(jīng)發(fā)射 (產(chǎn)生)了,而且這個(gè)Observable開始發(fā)射來自原始Observable的數(shù)據(jù)杰妓;當(dāng)一個(gè)窗口關(guān)閉 (when a window "closes")意味著發(fā)射(產(chǎn)生)的Observable停止發(fā)射原始Observable的數(shù)據(jù)藻治, 并且發(fā)射終止通知 onCompleted 給它的觀察者們。
在RxJava中有許多種Window
操作符的方法稚失。
1.1 window(closingSelector)
window
的這個(gè)方法會(huì)立即打開它的第一個(gè)窗口栋艳。每當(dāng)它觀察到closingSelector
返回的 Observable發(fā)射了一個(gè)對(duì)象時(shí),它就關(guān)閉當(dāng)前打開的窗口并立即打開一個(gè)新窗口句各。用這個(gè)方法吸占,這種 window 變體發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)是一一對(duì)應(yīng)的凿宾。
解析: 一開始開啟一個(gè)
window
接收原始數(shù)據(jù)矾屯,每當(dāng)它觀察到closingSelector
返回的 Observable發(fā)射了一個(gè)對(duì)象時(shí),它就關(guān)閉當(dāng)前打開的窗口并取消此時(shí)訂閱closingSelector 的Observable ( 此時(shí)可能是沒有數(shù)據(jù) window
)并立即打開一個(gè)新窗口初厚,注意: 每個(gè)窗口開啟前都會(huì)去訂閱一個(gè)closingSelector
返回的 Observable件蚕。
實(shí)例代碼:
// 1. window(Callable boundary)
// 開啟一個(gè)window,并訂閱觀察boundary返回的Observable發(fā)射了一個(gè)數(shù)據(jù)产禾,
// 則關(guān)閉此window排作,將收集的數(shù)據(jù)以O(shè)bservable發(fā)送, 重新訂閱boundary返回的Observable,開啟新window
Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
.window(new Callable<Observable<Long>>() {
@Override
public Observable<Long> call() throws Exception {
System.out.println("--> call(1)");
return Observable.timer(2, TimeUnit.SECONDS); // 兩秒后關(guān)閉當(dāng)前窗口
}
}).subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
// 接受每個(gè)window接受的數(shù)據(jù)的Observable
t.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept(1): " + t);
}
});
}
});
輸出:
--> call(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> call(1)
--> accept(1): 4
--> accept(1): 5
--> call(1)
--> accept(1): 6
--> accept(1): 7
--> call(1)
--> accept(1): 8
--> accept(1): 9
--> call(1)
--> accept(1): 10
Javadoc: window(closingSelector)
Javadoc: window(closingSelector, bufferSize)
1.2 window(openingIndicator, closingIndicator)
當(dāng) openingIndicator
發(fā)射一個(gè)數(shù)據(jù)亚情,就會(huì)打開一個(gè) window
, 同時(shí)訂閱 closingIndicator
返回的Observable妄痪,當(dāng)這個(gè)Observable發(fā)射一個(gè)數(shù)據(jù),就結(jié)束此 window 和 ,發(fā)送收集數(shù)據(jù)的 Observable楞件。
無論何時(shí)衫生,只要 window 觀察到 windowOpenings 這個(gè)Observable發(fā)射了一個(gè)
Opening
對(duì)象,它就打開一個(gè)窗口土浸,并且同時(shí)調(diào)用 closingSelector
生成一個(gè)與那個(gè)窗口關(guān)聯(lián)的關(guān)閉 (closing)Observable
罪针。當(dāng)這個(gè)關(guān)閉 (closing)Observable 發(fā)射了一個(gè)對(duì)象時(shí),window
操作符就會(huì)關(guān)閉那個(gè)窗口以及關(guān)聯(lián)的closingSelector
的 Observable黄伊。
注意: 對(duì)這個(gè)方法來說泪酱,由于當(dāng)前窗口的關(guān)閉和新窗口的打開是由單獨(dú)的 Observable 管理的,它創(chuàng)建的窗口可能會(huì)存在重疊(重復(fù)某些來自原始Observable的數(shù)據(jù)) 或間隙(丟棄某些來自原始Observable的數(shù)據(jù))。
實(shí)例代碼:
// 2. window(ObservableSource openingIndicator, Function<T, ObservableSource<R>> closingIndicator)
// 當(dāng)openingIndicator發(fā)射一個(gè)數(shù)據(jù)西篓,就會(huì)打開一個(gè)window, 同時(shí)訂閱closingIndicator返回的Observable愈腾,
// 當(dāng)這個(gè)Observable發(fā)射一個(gè)數(shù)據(jù),就結(jié)束此window以及對(duì)應(yīng)的closingIndicator,發(fā)送收集數(shù)據(jù)的 Observable岂津。
Observable<Long> openingIndicator = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable t) throws Exception {
System.out.println("--> openingIndicator is subscribe!");
}
}).doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("--> openingIndicator is completed!");
}
}).doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> openingIndicator emitter: " + t);
}
});
Observable<Long> dataSource = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable t) throws Exception {
System.out.println("--> DataSource is subscribe!");
}
}).doOnNext(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> DataSource emitter: " + t);
}
});
dataSource.window(openingIndicator, new Function<Long, Observable<Long>>() {
@Override
public Observable<Long> apply(Long t) throws Exception {
System.out.println("--> apply(2): " + t);
return Observable.timer(2, TimeUnit.SECONDS).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable t) throws Exception {
System.out.println("--> closingIndicator is subscribe!");
}
});
}
}).subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
System.out.println("-------------------> new window data");
t.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept(2): " + t);
}
});
}
});
輸出:
--> DataSource is subscribe!
--> openingIndicator is subscribe!
--> openingIndicator emitter: 1
--> DataSource emitter: 1
-------------------> new window data
--> apply(2): 1
--> closingIndicator is subscribe!
--> openingIndicator emitter: 2
--> DataSource emitter: 2
-------------------> new window data
--> apply(2): 2
--> closingIndicator is subscribe!
--> accept(2): 2
--> accept(2): 2
--> openingIndicator emitter: 3
--> DataSource emitter: 3
-------------------> new window data
--> apply(2): 3
--> closingIndicator is subscribe!
--> accept(2): 3
--> accept(2): 3
--> accept(2): 3
--> DataSource emitter: 4
--> openingIndicator emitter: 4
--> accept(2): 4
--> accept(2): 4
-------------------> new window data
--> apply(2): 4
--> closingIndicator is subscribe!
--> DataSource emitter: 5
--> accept(2): 5
--> accept(2): 5
--> openingIndicator emitter: 5
Javadoc: window(openingIndicator, closingIndicator)
Javadoc: window(openingIndicator, closingIndicator虱黄,bufferSize)
1.3 window(count)
這個(gè) window
的方法立即打開它的第一個(gè)窗口。每當(dāng)當(dāng)前窗口發(fā)射了 count
項(xiàng)數(shù)據(jù)吮成,它就關(guān)閉當(dāng)前窗口并打開一個(gè)新窗口橱乱。如果從原始Observable收到了 onError
或 onCompleted
通知它也會(huì)關(guān)閉當(dāng)前窗口。
這種 window 方法發(fā)射一系列不重疊的窗口粱甫,這些窗口的數(shù)據(jù)集合與原始 Observable發(fā)射的數(shù)據(jù)是 一一對(duì)應(yīng) 的泳叠。
實(shí)例代碼:
// 3. window(count)
// 以count為緩存大小收集的不重疊的Observables對(duì)象,接受的數(shù)據(jù)與原數(shù)據(jù)彼此對(duì)應(yīng)
Observable.range(1, 20)
.window(5) // 設(shè)置緩存大小為5
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> t) throws Exception {
System.out.println("--------------> new data window");
t.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept window(3): " + t);
}
});
}
});
輸出:
--------------> new data window
--> accept window(3): 1
--> accept window(3): 2
--> accept window(3): 3
--> accept window(3): 4
--> accept window(3): 5
--------------> new data window
--> accept window(3): 6
--> accept window(3): 7
--> accept window(3): 8
--> accept window(3): 9
--> accept window(3): 10
--------------> new data window
--> accept window(3): 11
--> accept window(3): 12
--> accept window(3): 13
--> accept window(3): 14
--> accept window(3): 15
--------------> new data window
--> accept window(3): 16
--> accept window(3): 17
--> accept window(3): 18
--> accept window(3): 19
--> accept window(3): 20
Javadoc: window(count)
1.4 window(count, skip)
這個(gè) window
的方法立即打開它的第一個(gè)窗口茶宵。原始Observable每發(fā)射 skip
項(xiàng)數(shù)據(jù)它就打開 一個(gè)新窗口(例如危纫,如果 skip 等于3,每到第三項(xiàng)數(shù)據(jù)乌庶,它會(huì)創(chuàng)建一個(gè)新窗口)种蝶。每當(dāng)當(dāng)前窗口發(fā)射了 count
項(xiàng)數(shù)據(jù),它就關(guān)閉當(dāng)前窗口并打開一個(gè)新窗口瞒大。如果從原始Observable 收到了onError
或 onCompleted
通知它也會(huì)關(guān)閉當(dāng)前窗口螃征。
解析: window 一開始打開一個(gè) window,每發(fā)射 skip 項(xiàng)數(shù)據(jù)就會(huì)打開一個(gè) window 獨(dú)立收集 原始數(shù)據(jù)透敌,當(dāng) window 收集了 count 個(gè)數(shù)據(jù)就會(huì)關(guān)閉盯滚,開啟另外一個(gè)。當(dāng)原始Observable發(fā)送了onError或者onCompleted通知也會(huì)關(guān)閉當(dāng)前窗口酗电。
- skip = count: 會(huì)依次順序接受原始數(shù)據(jù)魄藕,同window(count)
- skip > count: 兩個(gè)窗口可能會(huì)有 skip-count 項(xiàng)數(shù)據(jù)丟失
- skip < count: 兩個(gè)窗口可能會(huì)有 count-skip 項(xiàng)數(shù)據(jù)重疊
實(shí)例代碼:
// 4. window(count,skip)
// window一開始打開一個(gè)window,每發(fā)射skip項(xiàng)數(shù)據(jù)就會(huì)打開一個(gè)window獨(dú)立收集原始數(shù)據(jù)
// 當(dāng)window收集了count個(gè)數(shù)據(jù)就會(huì)關(guān)閉window撵术,開啟另外一個(gè)背率。
// 當(dāng)原始Observable發(fā)送了onError 或者 onCompleted 通知也會(huì)關(guān)閉當(dāng)前窗口。
// 4.1 skip = count: 會(huì)依次順序接受原始數(shù)據(jù)荷荤,同window(count)
Observable.range(1, 10)
.window(2, 2) // skip = count, 數(shù)據(jù)會(huì)依次順序輸出
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept window(4-1): " + t +" , ThreadID: "+ Thread.currentThread().getId());
}
});
}
});
// 4.2 skip > count: 兩個(gè)窗口可能會(huì)有 skip-count 項(xiàng)數(shù)據(jù)丟失
Observable.range(1, 10)
.window(2, 3) // skip > count, 數(shù)據(jù)會(huì)存在丟失
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept window(4-2): " + t +" , ThreadID: "+ Thread.currentThread().getId());
}
});
}
});
// 4.3 skip < count: 兩個(gè)窗口可能會(huì)有 count-skip 項(xiàng)數(shù)據(jù)重疊
Observable.range(1, 10)
.window(3, 2) // skip < count, 數(shù)據(jù)會(huì)重疊
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept window(4-3): " + t +" , ThreadID: "+ Thread.currentThread().getId());
}
});
}
});
輸出:
--> accept window(4-1): 1 , ThreadID: 11
--> accept window(4-1): 2 , ThreadID: 11
--> accept window(4-1): 4 , ThreadID: 12
--> accept window(4-1): 3 , ThreadID: 11
--> accept window(4-1): 5 , ThreadID: 12
--> accept window(4-1): 6 , ThreadID: 12
--> accept window(4-1): 7 , ThreadID: 13
--> accept window(4-1): 8 , ThreadID: 13
--> accept window(4-1): 9 , ThreadID: 13
--> accept window(4-1): 10 , ThreadID: 14
--> accept window(4-2): 1 , ThreadID: 15
--> accept window(4-2): 2 , ThreadID: 15
--> accept window(4-2): 4 , ThreadID: 16
--> accept window(4-2): 5 , ThreadID: 16
--> accept window(4-2): 7 , ThreadID: 17
--> accept window(4-2): 8 , ThreadID: 17
--> accept window(4-2): 10 , ThreadID: 18
--> accept window(4-3): 1 , ThreadID: 19
--> accept window(4-3): 2 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 20
--> accept window(4-3): 4 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 21
--> accept window(4-3): 6 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 22
--> accept window(4-3): 8 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 23
--> accept window(4-3): 10 , ThreadID: 23
Javadoc: window(count, skip)
1.5 window(timespan, TimeUnit)
這個(gè) window
的方法立即打開它的第一個(gè)窗口收集數(shù)據(jù)退渗。每當(dāng)過了 timespan
這么長(zhǎng)的時(shí)間段它就關(guān)閉當(dāng)前窗口并打開一個(gè)新窗口(時(shí)間單位是 unit
移稳,可選在調(diào)度器 scheduler
上執(zhí)行)收集數(shù)據(jù)蕴纳。如果從原始 Observable 收到了 onError 或 onCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口。
這種 window 方法發(fā)射一系列不重疊的窗口个粱,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)也是 一一對(duì)應(yīng) 的古毛。
實(shí)例代碼:
// 5. window(long timespan, TimeUnit unit)
// 每當(dāng)過了 timespan 的時(shí)間段,它就關(guān)閉當(dāng)前窗口并打開另一個(gè)新window收集數(shù)據(jù)
Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
.window(2, TimeUnit.SECONDS) // 間隔2秒關(guān)閉當(dāng)前 window 并打開一個(gè)新 window 收集數(shù)據(jù)
// .window(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept window(5): " + t +" , ThreadID: "+ Thread.currentThread().getId() );
}
});
}
});
輸出:
--> accept window(5): 1 , ThreadID: 11
--> accept window(5): 2 , ThreadID: 11
--> accept window(5): 3 , ThreadID: 11
--> accept window(5): 4 , ThreadID: 14
--> accept window(5): 5 , ThreadID: 14
--> accept window(5): 6 , ThreadID: 15
--> accept window(5): 7 , ThreadID: 16
--> accept window(5): 8 , ThreadID: 16
--> accept window(5): 9 , ThreadID: 17
--> accept window(5): 10 , ThreadID: 17
Javadoc: window(timespan, TimeUnit)
Javadoc: window(timespan, TimeUnit, scheduler)
1.6 window(timespan, TimeUnit, count)
這個(gè) window
的方法立即打開它的第一個(gè)窗口。這個(gè)變體是 window(count) 和 window(timespan, unit[, scheduler]) 的結(jié)合稻薇,每當(dāng)過了 timespan
的時(shí)長(zhǎng)或者當(dāng)前窗口收到了 count
項(xiàng)數(shù)據(jù)嫂冻,它就關(guān)閉當(dāng)前窗口并打開另一個(gè)。如果從原始 Observable收到了 onError
或 onCompleted
通知它也會(huì)關(guān)閉當(dāng)前窗口塞椎。
這種window方法發(fā)射 一系列不重疊的窗口桨仿,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)也是 一一對(duì)應(yīng) 的。
實(shí)例代碼:
// 6. window(long timespan, TimeUnit unit, long count)
// 每當(dāng)過了timespan的時(shí)間段或者當(dāng)前窗口收到了count項(xiàng)數(shù)據(jù)案狠,它就關(guān)閉當(dāng)前window并打開另一個(gè)window收集數(shù)據(jù)
Observable.intervalRange(1, 12, 0, 500, TimeUnit.MILLISECONDS)
.window(2, TimeUnit.SECONDS, 5) // 每隔2秒關(guān)閉當(dāng)前收集數(shù)據(jù)的window并開啟一個(gè)window收集5項(xiàng)數(shù)據(jù)
// .window(2, TimeUnit.SECONDS,Schedulers.newThread(), 5 ) // 指定在 newThread 線程中
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept window(6): " + t + " , ThreadID: "+ Thread.currentThread().getId() );
}
});
}
});
輸出:
--> accept window(6): 1 , ThreadID: 11
--> accept window(6): 2 , ThreadID: 11
--> accept window(6): 3 , ThreadID: 11
--> accept window(6): 4 , ThreadID: 11
--> accept window(6): 5 , ThreadID: 11
--> accept window(6): 6 , ThreadID: 14
--> accept window(6): 7 , ThreadID: 14
--> accept window(6): 8 , ThreadID: 14
--> accept window(6): 9 , ThreadID: 14
--> accept window(6): 10 , ThreadID: 14
--> accept window(6): 11 , ThreadID: 15
--> accept window(6): 12 , ThreadID: 15
Javadoc: window(timespan, TimeUnit, count)
Javadoc: window(timespan, TimeUnit, scheduler, count)
1.7 window(timespan, timeskip, TimeUnit)
這個(gè) window
的方法立即打開它的第一個(gè)窗口服傍。隨后每當(dāng)過了 timeskip
的時(shí)長(zhǎng)就打開一個(gè)新窗口(時(shí)間單位是 unit
,可選在調(diào)度器 scheduler
上執(zhí)行)骂铁,當(dāng)窗口打開的時(shí)長(zhǎng)達(dá) 到 timespan
吹零,它就關(guān)閉當(dāng)前打開的窗口。如果從原始Observable收到 了 onError 或 onCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口拉庵。窗口的數(shù)據(jù)可能重疊也可能有間隙灿椅,取決于你設(shè)置的 timeskip
和 timespan
的值。
解析: 在每一個(gè) timeskip 時(shí)期內(nèi)都創(chuàng)建一個(gè)新的 window钞支,然后獨(dú)立收集 timespan 時(shí)間段的原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)茫蛹。注意:因?yàn)槊總€(gè) window 都是獨(dú)立接收數(shù)據(jù),當(dāng)接收數(shù)據(jù)的時(shí)間與創(chuàng)建新 window 的時(shí)間不一致時(shí)會(huì)有數(shù)據(jù)項(xiàng)重復(fù)伸辟,丟失等情況麻惶。
- skip = timespan: 會(huì)依次順序接受原始數(shù)據(jù),同window(count)
- skip > timespan: 兩個(gè)窗口可能會(huì)有 skip-timespan 項(xiàng)數(shù)據(jù)丟失
- skip < timespan: 兩個(gè)窗口可能會(huì)有 timespan-skip 項(xiàng)數(shù)據(jù)重疊
實(shí)例代碼:
// 7. window(long timespan, long timeskip, TimeUnit unit)
// 在每一個(gè)timeskip時(shí)期內(nèi)都創(chuàng)建一個(gè)新的window,然后獨(dú)立收集timespan時(shí)間段的原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)信夫,
// 如果timespan長(zhǎng)于timeskip窃蹋,它發(fā)射的數(shù)據(jù)包將會(huì)重疊,因此可能包含重復(fù)的數(shù)據(jù)項(xiàng)静稻。
// 7.1 skip = timespan: 會(huì)依次順序接受原始數(shù)據(jù)警没,同window(count)
Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
.window(1, 1, TimeUnit.SECONDS) // 設(shè)置每秒創(chuàng)建一個(gè)window,收集2秒的數(shù)據(jù)
// .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept window(7-1): " + t + " , ThreadID: "+ Thread.currentThread().getId());
}
});
}
});
// 7.2 skip > timespan: 兩個(gè)窗口可能會(huì)有 skip-timespan 項(xiàng)數(shù)據(jù)丟失
Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
.window(1, 2, TimeUnit.SECONDS) // 設(shè)置每秒創(chuàng)建一個(gè)window振湾,收集2秒的數(shù)據(jù)
// .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept window(7-2): " + t + " , ThreadID: "+ Thread.currentThread().getId());
}
});
}
});
// 7.3 skip < timespan: 兩個(gè)窗口可能會(huì)有 timespan-skip 項(xiàng)數(shù)據(jù)重疊
Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
.window(2, 1, TimeUnit.SECONDS) // 設(shè)置每秒創(chuàng)建一個(gè)window杀迹,收集2秒的數(shù)據(jù)
// .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> t) throws Exception {
t.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long t) throws Exception {
System.out.println("--> accept window(7-3): " + t + " , ThreadID: "+ Thread.currentThread().getId());
}
});
}
});
輸出:
--> accept window(7-1): 1 , ThreadID: 11
--> accept window(7-1): 2 , ThreadID: 11
--> accept window(7-1): 3 , ThreadID: 14
--> accept window(7-1): 4 , ThreadID: 15
--> accept window(7-1): 5 , ThreadID: 17
----------------------------------------------------------------------
--> accept window(7-2): 1 , ThreadID: 11
--> accept window(7-2): 3 , ThreadID: 14
--> accept window(7-2): 5 , ThreadID: 15
----------------------------------------------------------------------
--> accept window(7-3): 1 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 17
Javadoc: window(timespan, timeskip, TimeUnit)
Javadoc: window(timespan, timeskip, TimeUnit, scheduler)
2. GroupBy
將一個(gè) Observable 分拆為一些 Observables 集合,它們中的每一個(gè)發(fā)射原始 Observable 的一個(gè)子序列押搪。
RxJava實(shí)現(xiàn)了 groupBy
操作符树酪。它返回Observable的一個(gè)特殊子類 GroupedObservable
,實(shí)現(xiàn)了GroupedObservable
接口的對(duì)象有一個(gè)額外的方法 getKey 大州,這個(gè) Key 用于將數(shù)據(jù)分組到指定的Observable续语。有一個(gè)版本的 groupBy 允許你傳遞一個(gè)變換函數(shù),這樣它可以在發(fā)射結(jié)果 GroupedObservable 之前改變數(shù)據(jù)項(xiàng)厦画。
如果你取消訂閱一個(gè) GroupedObservable 疮茄,那個(gè) Observable 將會(huì)終止滥朱。如果之后原始的 Observable又發(fā)射了一個(gè)與這個(gè)Observable的Key匹配的數(shù)據(jù), groupBy 將會(huì)為這個(gè) Key 創(chuàng)建一個(gè)新的 GroupedObservable力试。
注意: groupBy
將原始 Observable 分解為一個(gè)發(fā)射多個(gè) GroupedObservable
的Observable徙邻,一旦有訂閱,每個(gè) GroupedObservable 就開始緩存數(shù)據(jù)畸裳。因此缰犁,如果你忽略這 些 GroupedObservable 中的任何一個(gè),這個(gè)緩存可能形成一個(gè)潛在的內(nèi)存泄露怖糊。因此民鼓,如果你不想觀察,也不要忽略 GroupedObservable 蓬抄。你應(yīng)該使用像 take(0)
這樣會(huì)丟棄自己的緩存的操作符丰嘉。
2.1 groupBy(keySelector)
GroupBy
操作符將原始 Observable 分拆為一些 Observables
集合,它們中的每一個(gè)發(fā)射原始 Observable 數(shù)據(jù)序列的一個(gè)子序列嚷缭。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè) Observable 發(fā)射是由一個(gè)函數(shù)判定的饮亏,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)Key
,Key相同的數(shù)據(jù)會(huì)被同一個(gè) Observable 發(fā)射阅爽。還有一個(gè) delayError
參數(shù)的方法路幸,指定是否延遲 Error
通知的Observable。
實(shí)例代碼:
// 1. groupBy(keySelector)
// 將原始數(shù)據(jù)處理后加上分組tag付翁,通過GroupedObservable發(fā)射分組數(shù)據(jù)
Observable.range(1, 10)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
// 不同的key將會(huì)產(chǎn)生不同分組的Observable
return t % 2 == 0 ? "Even" : "Odd"; // 將數(shù)據(jù)奇偶數(shù)進(jìn)行分組,
}
}).observeOn(Schedulers.newThread())
.subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(GroupedObservable<String, Integer> grouped) throws Exception {
// 得到每個(gè)分組數(shù)據(jù)的的Observable
grouped.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
// 得到數(shù)據(jù)
System.out.println("--> accept groupBy(1): groupKey: " + grouped.getKey() + ", value: " + t);
}
});
}
});
輸出:
--> accept groupBy(1): groupKey: Odd, value: 1
--> accept groupBy(1): groupKey: Odd, value: 3
--> accept groupBy(1): groupKey: Odd, value: 5
--> accept groupBy(1): groupKey: Odd, value: 7
--> accept groupBy(1): groupKey: Odd, value: 9
--> accept groupBy(1): groupKey: Even, value: 2
--> accept groupBy(1): groupKey: Even, value: 4
--> accept groupBy(1): groupKey: Even, value: 6
--> accept groupBy(1): groupKey: Even, value: 8
--> accept groupBy(1): groupKey: Even, value: 10
Javadoc: groupBy(keySelector)
Javadoc: groupBy(keySelector, delayError)
2.2 groupBy(keySelector, valueSelector)
GroupBy
操作符通過 keySelector
將原始 Observable 按照 Key
分組简肴,產(chǎn)生不同的 Observable,再通過 valueSelector
對(duì)原始的數(shù)據(jù)進(jìn)行處理百侧,在發(fā)送每一個(gè)被處理完成的數(shù)據(jù)砰识。
實(shí)例代碼:
// 2. groupBy(Function(T,R),F(xiàn)unction(T,R))
// 第一個(gè)func對(duì)原數(shù)據(jù)進(jìn)行分組處理(僅僅分組添加key佣渴,不處理原始數(shù)據(jù))辫狼,第二個(gè)func對(duì)原始數(shù)據(jù)進(jìn)行處理
Observable.range(1, 10)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
// 對(duì)原始數(shù)據(jù)進(jìn)行分組處理
return t % 2 == 0 ? "even" : "odd";
}
},new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
// 對(duì)原始數(shù)據(jù)進(jìn)行數(shù)據(jù)轉(zhuǎn)換處理
return t + " is " + (t % 2 == 0 ? "even" : "odd");
}
}).observeOn(Schedulers.newThread()).subscribe(new Consumer<GroupedObservable<String, String>>() {
@Override
public void accept(GroupedObservable<String, String> grouped) throws Exception {
grouped.subscribe(new Consumer<String>() {
@Override
public void accept(String t) throws Exception {
// 接受最終的分組處理以及原數(shù)據(jù)處理后的數(shù)據(jù)
System.out.println("--> accept groupBy(2): groupKey = " + grouped.getKey()
+ ", value = " + t);
}
});
}
});
輸出:
--> accept groupBy(2): groupKey = odd, value = 1 is odd
--> accept groupBy(2): groupKey = odd, value = 3 is odd
--> accept groupBy(2): groupKey = odd, value = 5 is odd
--> accept groupBy(2): groupKey = odd, value = 7 is odd
--> accept groupBy(2): groupKey = odd, value = 9 is odd
--> accept groupBy(2): groupKey = even, value = 2 is even
--> accept groupBy(2): groupKey = even, value = 4 is even
--> accept groupBy(2): groupKey = even, value = 6 is even
--> accept groupBy(2): groupKey = even, value = 8 is even
--> accept groupBy(2): groupKey = even, value = 10 is even
Javadoc: groupBy(keySelector, valueSelector)
Javadoc: groupBy(keySelector, valueSelector, delayError)
Javadoc: groupBy(keySelector, valueSelector, delayError, bufferSize)
3. Scan
連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù),然后連續(xù)發(fā)射結(jié)果辛润。
3.1 scan(accumulator)
Scan
操作符對(duì)原始 Observable 發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)膨处,然后將那個(gè)函數(shù)的結(jié)果作為 自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)砂竖。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列真椿。這個(gè)操作符在某些情況下被叫做 accumulator
。
解析: 先發(fā)送原始數(shù)據(jù)第一項(xiàng)數(shù)據(jù)乎澄,然后將這個(gè)數(shù)據(jù)與下一個(gè)原始數(shù)據(jù)作為參數(shù)傳遞給
accumulator
突硝, 處理后發(fā)送這個(gè)數(shù)據(jù),并與下一個(gè)原始數(shù)據(jù)一起傳遞到下一次 accumulator 三圆,直到數(shù)據(jù)序列結(jié)束狞换。類似一個(gè)累積的過程。
實(shí)例代碼:
// 1. scan(BiFunction(Integer sum, Integer t2))
// 接受數(shù)據(jù)序列舟肉,從第二個(gè)數(shù)據(jù)開始修噪,每次會(huì)將上次處理數(shù)據(jù)和現(xiàn)在接受的數(shù)據(jù)進(jìn)行處理后發(fā)送
Observable.range(1, 10)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer LastItem, Integer item) throws Exception {
System.out.println("--> apply: LastItem = " + LastItem + ", CurrentItem = " + item);
return LastItem + item; // 實(shí)現(xiàn)求和操作
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept scan(1): " + t);
}
});
輸出:
--> accept scan(1): 1
--> apply: LastItem = 1, CurrentItem = 2
--> accept scan(1): 3
--> apply: LastItem = 3, CurrentItem = 3
--> accept scan(1): 6
--> apply: LastItem = 6, CurrentItem = 4
--> accept scan(1): 10
--> apply: LastItem = 10, CurrentItem = 5
--> accept scan(1): 15
--> apply: LastItem = 15, CurrentItem = 6
--> accept scan(1): 21
--> apply: LastItem = 21, CurrentItem = 7
--> accept scan(1): 28
--> apply: LastItem = 28, CurrentItem = 8
--> accept scan(1): 36
--> apply: LastItem = 36, CurrentItem = 9
--> accept scan(1): 45
--> apply: LastItem = 45, CurrentItem = 10
--> accept scan(1): 55
Javadoc: scan(accumulator)
3.2 scan(initialValue, accumulator)
有一個(gè) scan
操作符的方法,你可以傳遞一個(gè)種子值給累加器函數(shù)的第一次調(diào)用(Observable 發(fā)射的第一項(xiàng)數(shù)據(jù))路媚。如果你使用這個(gè)版本黄琼,scan
將發(fā)射種子值作為自己的第一項(xiàng)數(shù)據(jù)。
注意: 傳遞 null
作為種子值與不傳遞是不同的整慎,null
種子值是合法的脏款。
解析: 指定初始種子值,第一次發(fā)送種子值裤园,后續(xù)發(fā)送原始數(shù)據(jù)序列以及累計(jì)處理數(shù)據(jù)撤师。
實(shí)例代碼:
// 2. scan(R,Func2)
// 指定初始種子值,第一次發(fā)送種子值,后續(xù)發(fā)送原始數(shù)據(jù)序列以及累計(jì)處理數(shù)據(jù)
Observable.range(1, 10)
.scan(100, new BiFunction<Integer, Integer, Integer>() { // 指定初始種子數(shù)據(jù)為100
@Override
public Integer apply(Integer lastValue, Integer item) throws Exception {
System.out.println("--> apply: lastValue = " + lastValue + ", item = " + item);
return lastValue + item; // 指定初值的求和操作
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("--> accept sacn(2) = " + t);
}
});
輸出:
--> accept sacn(2) = 100
--> apply: lastValue = 100, item = 1
--> accept sacn(2) = 101
--> apply: lastValue = 101, item = 2
--> accept sacn(2) = 103
--> apply: lastValue = 103, item = 3
--> accept sacn(2) = 106
--> apply: lastValue = 106, item = 4
--> accept sacn(2) = 110
--> apply: lastValue = 110, item = 5
--> accept sacn(2) = 115
--> apply: lastValue = 115, item = 6
--> accept sacn(2) = 121
--> apply: lastValue = 121, item = 7
--> accept sacn(2) = 128
--> apply: lastValue = 128, item = 8
--> accept sacn(2) = 136
--> apply: lastValue = 136, item = 9
--> accept sacn(2) = 145
--> apply: lastValue = 145, item = 10
--> accept sacn(2) = 155
注意: 這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行拧揽。
Javadoc: scan(initialValue, accumulator)
4. Cast
Cast
將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個(gè)指定的類型剃盾,然后再發(fā)射數(shù)據(jù),它是 map 的一個(gè)特殊版本淤袜。轉(zhuǎn)換失敗會(huì)有Error通知痒谴。
4.1 cast(clazz)
將原始數(shù)據(jù)強(qiáng)制轉(zhuǎn)換為指定的 clazz
類型,如果轉(zhuǎn)換成功發(fā)送轉(zhuǎn)換后的數(shù)據(jù)铡羡,否則發(fā)送Error
通知积蔚。一般用于 數(shù)據(jù)類型的轉(zhuǎn)換 和 數(shù)據(jù)實(shí)際類型的檢查(多態(tài))。
實(shí)例代碼:
// cast(clazz)
// 1. 基本類型轉(zhuǎn)換
Observable.range(1, 5)
.cast(Integer.class)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Exception {
System.out.println("-- accept cast(1): " + t);
}
});
// 2. 轉(zhuǎn)換失敗通知
System.out.println("------------------------------------");
Observable.just((byte)1)
.cast(Integer.class)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Integer t) {
System.out.println("--> onNext(2) = " + t);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2) = " + e.toString());
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.out.println("------------------------------------");
class Animal{
public int id;
}
class Dog extends Animal{
public String name;
@Override
public String toString() {
return "Dog [name=" + name + ", id=" + id + "]";
}
}
// 3. 多態(tài)轉(zhuǎn)換烦周,檢查數(shù)據(jù)的實(shí)際類型
Animal animal = new Dog();
animal.id = 666;
Observable.just(animal)
.cast(Dog.class)
.subscribe(new Consumer<Dog>() {
@Override
public void accept(Dog t) throws Exception {
System.out.println("--> accept cast(3): " + t);
}
});
輸出:
-- accept cast(1): 1
-- accept cast(1): 2
-- accept cast(1): 3
-- accept cast(1): 4
-- accept cast(1): 5
------------------------------------
--> onSubscribe(2)
--> onError(2) = java.lang.ClassCastException: Cannot cast java.lang.Byte to java.lang.Integer
------------------------------------
--> accept cast(3): Dog [name=null, id=666]
Javadoc: cast(clazz)
小結(jié):
在實(shí)際開發(fā)場(chǎng)景中尽爆,比如網(wǎng)絡(luò)數(shù)據(jù)請(qǐng)求場(chǎng)景,原始的數(shù)據(jù)格式或類型可能并不滿足開發(fā)的實(shí)際需要读慎,需要對(duì)數(shù)據(jù)進(jìn)行處理教翩。數(shù)據(jù)變換操作在實(shí)際開發(fā)場(chǎng)景中還是非常多的,所以數(shù)據(jù)的變換是非常重要的贪壳。使用Rx的數(shù)據(jù)變換操作可以輕松完成大多數(shù)場(chǎng)景的數(shù)據(jù)變換操作饱亿,提高開發(fā)效率。
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實(shí)例
實(shí)例代碼: