1.Scan
連續(xù)地對數(shù)據(jù)序列的每一項應(yīng)用一個函數(shù),然后連續(xù)發(fā)射結(jié)果
操作符對原始Observable發(fā)射的第一項數(shù)據(jù)應(yīng)用一個函數(shù)吨凑,然后將那個函數(shù)的結(jié)果作為自己的第一項數(shù)據(jù)發(fā)射捍歪。它將函數(shù)的結(jié)果同第二項數(shù)據(jù)一起填充給這個函數(shù)來產(chǎn)生它自己的第二項數(shù)據(jù)。它持續(xù)進行這個過程來產(chǎn)生剩余的數(shù)據(jù)序列鸵钝。這個操作符在某些情況下被叫做accumulator
看完文檔糙臼,感覺Scan
叫的太牽強,不過自己也想不出別這個更好的詞來恩商,當(dāng)然accumulator
更加得形象变逃。它接受的參數(shù)可以只是一個固定有兩個參數(shù)一個輸出的Func2<>
。
Character[] strs = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h','i'};
Observable<Character> origin = Observable.from(strs);
origin.scan("",new Func2<String, Character, String>() {
@Override
public String call(String s, Character character) {
String result = s+character.charValue();
Log.d("scan","拼接字符 "+s +" + "+character.charValue() +" = " +result);
return result;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String result) {
Log.d("scan","接收到的 result = "+result);
}
});
可以看到Scan
還有一個參數(shù)怠堪,這里是一個空字符串揽乱,這是另一種形式,第一個參數(shù)是你預(yù)先設(shè)定好的初值粟矿,第二個參數(shù)則是你的函數(shù)凰棉,初值會作為第一個值加入到方法中去。
注意
A = C , B隨意
scan("",new Func2<A, B, C>() {
@Override
public C call(A a, B b) {
return .....;
}
})
這是因為Scan
會接受上一個scan
返回值作為第一個參數(shù)陌粹,當(dāng)然如果你寫不一樣了撒犀,AS會提醒你的,一片紅色波浪線...
2.Debounce & ThrottleWithTimeout
先貼上官方說明
僅在過了一段指定的時間還沒發(fā)射數(shù)據(jù)時才發(fā)射一個數(shù)據(jù),會過濾掉發(fā)射速率過快的數(shù)據(jù)項。
一開我的理解是這樣的或舞,在一定時間段內(nèi)發(fā)送在這個時間段內(nèi)輸出的最后一個數(shù)據(jù)
如下圖
于是乎荆姆,我寫了以下代碼做測試
Observable.interval(1, TimeUnit.SECONDS)//每隔1秒發(fā)送一個Long型的從0自增的數(shù)
.debounce(3, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d("debounce","aLong num = "+aLong);
}
});
然而,打印出來的是一片空白映凳。啥都沒有胆筒!我懵逼了,跟想象中不一樣拔嚎怼腐泻!
于是乎請教了搜索引擎,找到呼嘯而過11寫的代碼
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(subscriber.isUnsubscribed()) return;
try {
//產(chǎn)生結(jié)果的間隔時間分別為100队询、200派桩、300...900毫秒
for (int i = 1; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i * 100);
}
subscriber.onCompleted();
}catch(Exception e){
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.newThread())
.debounce(400, TimeUnit.MILLISECONDS) //超時時間為400毫秒
.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debounce","onnext aLong num = "+integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d("debounce","onerror ");
}
}, new Action0() {
@Override
public void call() {
Log.d("debounce","oncompleted ");
}
});
運行結(jié)果如下:
Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
completed!
恍然大悟,其真正含義是在輸出了一個數(shù)據(jù)后的一段時間內(nèi)蚌斩,沒有再次輸出新的數(shù)據(jù)铆惑,則把這個數(shù)據(jù)真正的發(fā)送出去;假如在這段時間內(nèi)有新的數(shù)據(jù)輸出送膳,則以這個數(shù)據(jù)作為將要發(fā)送的數(shù)據(jù)項员魏,并且重置這個時間段,重新計時叠聋。
我把之前的測試代碼按照我的理解修改了一下
/**debounce設(shè)置時間間隔為3秒撕阎,interval每一秒發(fā)送一個數(shù)據(jù)項,中間用filter過濾掉了第1碌补、2虏束、3,6厦章、
7镇匀,11、12袜啃、13汗侵、14秒的數(shù)據(jù)項,就是為了看第0項群发,第5項晰韵,第10項能否發(fā)送,按理解熟妓,在3秒的時間間隔
內(nèi)沒有新數(shù)據(jù)項到達才會發(fā)送最后一個數(shù)據(jù)項雪猪,即0、10項是可以被發(fā)送的滑蚯,而5不可以;y以及使用map打印
出真正到達debounce的數(shù)據(jù)項,方便理解
*/
Observable.interval(1, TimeUnit.SECONDS)//每隔1秒發(fā)送一個Long型的從0自增的數(shù)
.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
if(aLong==1) return false;
if(aLong==2) return false;
if(aLong==3) return false; //發(fā)送0
if(aLong==6) return false;
if(aLong==7) return false; //不能發(fā)送5
if(aLong==11) return false;//發(fā)送 10
if(aLong==12) return false;
if(aLong==13) return false;
if(aLong==14) return false;
return true;
}
})
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
Log.d("debounce","發(fā)送了 "+aLong);
return aLong;
}
})
.debounce(3, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d("debounce","aLong num = "+aLong);
}
});
Debounce
有另一種形式,使用一個Func1<?, Observable<?>>
的函數(shù)來限制發(fā)送的數(shù)據(jù)告材。
來自bobo_wang的代碼
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.debounce(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
//如果%2==0坤次,則發(fā)射數(shù)據(jù)并調(diào)用了onCompleted結(jié)束,則不會被丟棄
if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
subscriber.onNext(integer);
subscriber.onCompleted();
}
}
});
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("debounce","integer = "+integer);
}
});
輸出為
debounce:2
debounce:4
debounce:6
debounce:8
debounce:9
而ThrottleWithTimeout
則只有跟使用時間參數(shù)來限流的Debounce
一樣的功能斥赋。
這兩個操作符的理解沒有那么容易缰猴,需要寫多幾個例子來加深自己的印象。