學(xué)習(xí)資料
1. 變換操作符
作用:用于對(duì)Observable發(fā)射的數(shù)據(jù)進(jìn)行變換
1.1 Buffer緩沖
定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹继薛,而不是一次發(fā)射一個(gè)值
注意:如果原來的Observable
發(fā)射了一個(gè)onError
通知,Buffer
會(huì)立即傳遞這個(gè)通知,而不是首先發(fā)射緩存的數(shù)據(jù)定续,即使在這之前緩存中包含了原始Observable
發(fā)射的數(shù)據(jù)
1.1.1 buffer(count)
將原始Observable
產(chǎn)生的數(shù)據(jù)以List
非重疊的形式緩存,一次最多緩存count個(gè)禾锤,然后產(chǎn)生的新的Observable
一次性將List
發(fā)送出去
/**
* buffer(3)
* 緩存3個(gè)原始數(shù)據(jù)私股,存進(jìn)List
*/
private static void bufferCount() {
Observable
.from(Stream.iterate(1, new UnaryOperator<Integer>() {
@Override
public Integer apply(Integer integer) {
return integer + 1;
}
}).limit(30).collect(Collectors.toList()))
.buffer(3)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println(integers);
}
});
}
運(yùn)行結(jié)果:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10, 11, 12]
[13, 14, 15]
[16, 17, 18]
[19, 20, 21]
[22, 23, 24]
[25, 26, 27]
[28, 29, 30]
1.1.2 buffer(count , skip)
從原始的Observable
的第一項(xiàng)數(shù)據(jù)開始進(jìn)行緩存,發(fā)出了skip
個(gè)數(shù)據(jù)后时肿,將著skip
個(gè)數(shù)據(jù)看作一組庇茫,從當(dāng)前這組第一項(xiàng)數(shù)據(jù)開始,直到count
個(gè)數(shù)據(jù)螃成,存進(jìn)List
集合旦签,由新的Observable
發(fā)出。根據(jù)count寸宏,skip
大小宁炫,會(huì)出現(xiàn)重疊或者間隙
count < skip ,出現(xiàn)間隙:
private static void bufferSkip() {
Observable
.from(Stream.iterate(1, new UnaryOperator<Integer>() {
@Override
public Integer apply(Integer integer) {
return integer + 1;
}
}).limit(30).collect(Collectors.toList()))
//.buffer(3,2)//重疊
.buffer(3,5)//間隙
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
integers.forEach(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.print(integer + " ");
}
});
}
});
}
運(yùn)行結(jié)果:
1 2 3 6 7 8 11 12 13 16 17 18 21 22 23 26 27 28
每當(dāng)收到5數(shù)據(jù)時(shí),這5個(gè)數(shù)據(jù)就是一組的氮凝,就從第一個(gè)開始羔巢,將緩存的3個(gè)數(shù)據(jù)存進(jìn)List
,剩下2個(gè)就丟棄,這樣就導(dǎo)致產(chǎn)生間隙
count > skip 竿秆,出現(xiàn)重疊:
...
.buffer(3,2)
...
運(yùn)行結(jié)果:
1 2 3 3 4 5 5 6 7 7 8 9 9 10 11 11 12 13 13 14 15 15 16 17 17 18 19 19 20 21 21 22 23 23 24 25 25 26 27 27 28 29 29 30
每當(dāng)收到2個(gè)數(shù)據(jù)后启摄,這2個(gè)數(shù)據(jù)是一組,但由于是將緩存的3個(gè)數(shù)據(jù)存進(jìn)List
幽钢,就將下一組數(shù)據(jù)補(bǔ)充進(jìn)來歉备,導(dǎo)致重疊
1.1.3 buffer(func0)
原始的Observable
產(chǎn)生數(shù)據(jù)后,當(dāng)buffer(func0)
訂閱了后匪燕,將收到的數(shù)據(jù)存進(jìn)List
中蕾羊,Func0
會(huì)返回一個(gè)Observable
對(duì)象具有監(jiān)視作用,適當(dāng)條件下這個(gè)Observable
發(fā)出一個(gè)通知時(shí)(感覺隨意發(fā)出一個(gè)數(shù)據(jù)就可以)帽驯,buffer
就會(huì)將當(dāng)前存放緩存數(shù)據(jù)的List
發(fā)出去
private static void bufferFunc() {
Observable
//在當(dāng)前線程龟再,每隔100毫秒產(chǎn)生一個(gè)整數(shù)
.interval(0,100,TimeUnit.MILLISECONDS,Schedulers.immediate())
//限制為30個(gè)
.limit(30)
//每隔500毫秒發(fā)一個(gè)整數(shù),發(fā)出的數(shù)據(jù)尼变,可以看作通知
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
return Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.newThread());
}
})
//將List集合打印
.subscribe(System.out::println);
}
運(yùn)行結(jié)果:
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17, 18, 19]
[20, 21, 22, 23, 24]
[25, 26, 27, 28, 29]
1.1.4 buffer(Observable,Func1)
buffer(bufferOpenings,?bufferClosingSelector)
原始Obseravble
開始發(fā)送數(shù)據(jù)之后利凑,buffer
會(huì)監(jiān)視bufferOpenings
這個(gè)Observable
,每當(dāng)bufferOpenings
發(fā)送出一個(gè)數(shù)據(jù)后,會(huì)創(chuàng)建出一個(gè)新的List
開始存放原始的Obseravble
發(fā)出的數(shù)據(jù)享甸,相當(dāng)于Open
標(biāo)記截碴。bufferOpenings
發(fā)出的數(shù)據(jù)叫bufferClosingSelector
的Func1
會(huì)接收到,當(dāng)bufferClosingSelector
接收到這個(gè)信號(hào)后蛉威,根據(jù)需求做相應(yīng)的處理。Func1
會(huì)返回一個(gè)新的Observable
走哺,當(dāng)buffer
監(jiān)測(cè)到這個(gè)新的Observable
時(shí)蚯嫌,就會(huì)關(guān)閉List
,然后將List
發(fā)送出去
private static void bufferOpenClose() {
Observable
//在當(dāng)前線程 每隔100毫秒 從0開始 發(fā)出整數(shù)序列
.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.limit(30)
//buffer(bufferOpenings,?bufferClosingSelector)
//bufferOpenings丙躏,每隔500毫秒發(fā)出一個(gè)整數(shù)择示,打開信號(hào)
//bufferClosingSelector,接到bufferOpenings發(fā)來的通知晒旅,延遲200毫秒后發(fā)出第一個(gè)關(guān)閉信號(hào)栅盲,之后每隔500毫秒發(fā)出一個(gè)整數(shù)進(jìn)行通知
.buffer(Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func1<Long, Observable<?>>() {
@Override
public Observable<?> call(Long aLong) {
System.out.println(aLong);
//200毫秒后,發(fā)出一個(gè)整數(shù)0废恋,作用關(guān)閉信號(hào)
return Observable.timer(200,TimeUnit.MILLISECONDS,Schedulers.newThread());
}
})
.subscribe(System.out::println);
}
運(yùn)行結(jié)果:
0
[5, 6]
1
[10, 11]
2
[15, 16]
3
[20, 21]
4
[25, 26]
當(dāng)bufferOpenings
發(fā)出第一個(gè)打開信號(hào)之后200毫秒那一時(shí)刻谈秫,bufferClosing
會(huì)接到一個(gè)關(guān)閉關(guān)閉信號(hào),這200毫秒內(nèi)的數(shù)據(jù)項(xiàng)就會(huì)被存進(jìn)一個(gè)List
集合內(nèi)鱼鼓,緊接著List
遍被原始的Observable
發(fā)送出去拟烫。第一次buffer
便完成
在bufferOpenings
發(fā)出第一個(gè)信號(hào)500毫秒時(shí),發(fā)出第二個(gè)打開信號(hào)迄本,之后200毫秒那一時(shí)刻硕淑,bufferClosing
會(huì)接到一個(gè)關(guān)閉關(guān)閉信號(hào),如此循環(huán),重復(fù)置媳,直到數(shù)據(jù)項(xiàng)沒有
估計(jì)也就只有我自己看得懂了
1.2 GroupBy分組
GroupBy
將原始的Observable
拆分成多個(gè)組于樟,每個(gè)組可以有一個(gè)自己的key
,同一個(gè)key
的數(shù)據(jù)由一個(gè)Obsvervable
來發(fā)送
GroupBy
返回的是Observable
的一個(gè)特殊子類GroupedObservable
拇囊,實(shí)現(xiàn)了GroupedObservable
接口的對(duì)象有一個(gè)額外的方法getKey
迂曲,根據(jù)拿到的key
可以做對(duì)應(yīng)的操作
默認(rèn)不在任何特定的調(diào)度器上執(zhí)行
1.2.1 groupBy(Func1)
簡單使用:
/**
*將產(chǎn)生的數(shù)據(jù)中為偶數(shù)的輸出打印
*
*/
private static void groupByFunc1() {
Observable
.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.limit(10)
//根據(jù)奇偶類型 返回不同的key 偶數(shù)為 "1"
.groupBy(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return aLong % 2 == 0 ? "1" : "2";
}
})
//根據(jù)stringLongGroupedObservable的key類型 輸出
.subscribe(new Action1<GroupedObservable<String, Long>>() {
@Override
public void call(GroupedObservable<String, Long> stringLongGroupedObservable) {
if (stringLongGroupedObservable.getKey().equals("1")) {
stringLongGroupedObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.print(aLong +",");
}
});
}
}
});
}
運(yùn)行結(jié)果:
0,2,4,6,8,
1.2.2 groupBy(Func1,Func1)
這個(gè)方法可以對(duì)原始數(shù)據(jù)進(jìn)行修改
簡單使用:
/**
* 修改產(chǎn)生的數(shù)據(jù)中結(jié)果大于4的值
*/
private static void groupByMap() {
Observable
.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.limit(10)
.groupBy(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return aLong % 2 == 0 ? "1" : "2";
}
}, new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return aLong > 4 ? (aLong + " -- > 哈") : (aLong + "");
}
}
// , new Func1<Action1<String>, Map<String, Object>>() {
// @Override
// public Map<String, Object> call(Action1<String> stringAction1) {
// return null;
// }
// }
)
.subscribe(new Action1<GroupedObservable<String, String>>() {
@Override
public void call(GroupedObservable<String, String> stringLongGroupedObservable) {
if (stringLongGroupedObservable.getKey().equals("1")) {
stringLongGroupedObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.print(s + ",");
}
});
}
}
});
}
運(yùn)行結(jié)果:
0,2,4,6 -- > 哈,8 -- > 哈,
方法中注釋掉的是3個(gè)參數(shù)的方法,不知道怎么用的寂拆,看源碼中的注釋也沒看明白奢米,先不管了
1.3 Window窗口
定期將來自原始Observable的數(shù)據(jù)分解為一個(gè)Observable窗口,發(fā)射這些窗口纠永,而不是每次發(fā)射一項(xiàng)數(shù)據(jù)
Window
和Buffer
有些類似鬓长,Buffer
發(fā)送的是存放原始數(shù)據(jù)的List
包裹,而Window
發(fā)送的是Observable
尝江,發(fā)送的每個(gè)Observable
都包含原始的Observable
的數(shù)據(jù)子集涉波,最后會(huì)發(fā)送一個(gè)onCompleted
通知
1.3.1 window(Func0)
- window(closingSelector)
window(Func0)
會(huì)打開一個(gè)窗口,當(dāng)監(jiān)測(cè)到closingSelector
返回了一個(gè)Obsvable
對(duì)象后炭序,就會(huì)關(guān)閉當(dāng)前的窗口打開一個(gè)新的窗口啤覆,并將在當(dāng)前窗口打開期間的收集數(shù)據(jù)的Observable
發(fā)送出去
發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable
發(fā)射的數(shù)據(jù)是一一對(duì)應(yīng)的惭聂。
簡單使用:
private static void windowFunc0() {
Observable
.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.limit(10)
.window(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
return Observable.timer(500, TimeUnit.MILLISECONDS);
}
})
.subscribe(new Subscriber<Observable<Long>>() {
@Override
public void onCompleted() {
System.out.println(" onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(Observable<Long> longObservable) {
longObservable.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.print(aLong + " ,");
}
});
}
});
}
運(yùn)行結(jié)果:
0 ,1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 , onCompleted
1.3,2 window(int)
- window(count)
window
打開一個(gè)窗口后窗声,每當(dāng)接收到count
個(gè)數(shù)據(jù)后,就會(huì)關(guān)閉當(dāng)前的窗口辜纲,打開下一個(gè)窗口笨觅。如果從原始Observable
收到了onError
或onCompleted
通知它也會(huì)關(guān)閉當(dāng)前窗口。
簡單使用:
private static void windowCount() {
Observable
.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.limit(10)
.window(3)
.subscribe(longObservable -> longObservable.forEach(System.out::print));
}
運(yùn)行結(jié)果:
0123456789
在輸出結(jié)果時(shí)耕腾,是3個(gè)數(shù)據(jù)一起打印的见剩,012
有一種一瞬間一起打出來,然后停頓一下扫俺,接著打印下面的一組
1.3.3 window(long,TimeUnit,Scheduler)
- window(timespan, unit[, scheduler])
window
打開一個(gè)窗口后苍苞,每當(dāng)?shù)搅似谙?code>timespan后,就會(huì)關(guān)閉當(dāng)前窗后狼纬,打開一個(gè)新的羹呵。時(shí)間單位是設(shè)置的unit
,scheduler
指定調(diào)度器
簡單使用:
private static void windowTimeSpan() {
Observable
.interval(0, 100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.limit(10)
.window(300,TimeUnit.MILLISECONDS,Schedulers.newThread())
.subscribe(new Action1<Observable<Long>>() {
@Override
public void call(Observable<Long> longObservable) {
longObservable.forEach((along)-> System.out.print(along +" ,"));
}
});
}
運(yùn)行結(jié)果還是0~9
十個(gè)數(shù)字
1.4 Sacn掃描
連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù)畸颅,然后連續(xù)發(fā)射結(jié)果,默認(rèn)不在任何特定的調(diào)度器上執(zhí)行
- scan(Func2)
當(dāng)原始數(shù)據(jù)發(fā)送第一個(gè)數(shù)據(jù)后担巩,Scan
操作符會(huì)將Func2
中指定的函數(shù)應(yīng)用到第一個(gè)數(shù)據(jù)上,并將操作結(jié)果作為Scan
自身第一個(gè)數(shù)據(jù)發(fā)送出去没炒。后續(xù)第二個(gè)數(shù)據(jù)作為Fun2.call()
方法的第二個(gè)參數(shù)涛癌,而第一次函數(shù)操作的結(jié)果犯戏,作為第一個(gè)參數(shù),再次待用函數(shù)拳话。之后數(shù)據(jù)項(xiàng)都會(huì)重復(fù)先匪,前一次的結(jié)果作為call()
第一個(gè)參數(shù),后一個(gè)數(shù)據(jù)項(xiàng)作為第2個(gè)參數(shù)弃衍,直到最后一個(gè)數(shù)據(jù)項(xiàng)呀非,最后會(huì)發(fā)送onCompleted
通知。Scan
操作符在某些情況下被叫做accumulator
簡單使用:
/**
* 前n項(xiàng)累加和
*/
private static void scanSum() {
Observable
.range(1,10)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
})
.subscribe((n) -> System.out.print(n + " ,"));
}
運(yùn)行結(jié)果:
1 ,3 ,6 ,10 ,15 ,21 ,28 ,36 ,45 ,55 ,
2. 最后
Map,FlatMap
之前已經(jīng)學(xué)習(xí)了解過了镜盯,這里不想再次重復(fù)
這兩天《你的名字》上映了岸裙,聽說很不錯(cuò),約不到妹紙速缆,一會(huì)就在電腦看了降允,哈哈
本人很菜,有錯(cuò)誤請(qǐng)指出
共勉 :)