Filter
RxJava讓我們使用filter()方法來過濾觀測序列中我們不想要的值敢靡。
-
先來個沒有帶過濾的
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); } }).subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
打印結(jié)果:</br>
我是張三</br>
我是李四</br>
我是王五</br>
-
現(xiàn)在在創(chuàng)建Observable之后添加filter()方法挂滓。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); } }).filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.startsWith("李"); } }).subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
可以看到添加的filter()方法,判斷字符串是否以"李"開頭啸胧,返回一個布爾值赶站,只要條件符合filter()函數(shù)就會返回true。此時纺念,該值就會發(fā)送出去贝椿。
打印結(jié)果:</br>
我是李四</br>
Take
許多時候,可能生產(chǎn)者(也就是被觀察者)訂閱了好幾個消費者(也就是觀察者)陷谱,以后就用生產(chǎn)者和消費者來敘述烙博,觀察者和被觀察者拗口得要命有木有!言歸正傳烟逊,生產(chǎn)者會產(chǎn)生一條數(shù)據(jù)流习勤,而你消費者可能僅僅只需要開頭或者結(jié)尾的幾個元素,那么RxJava也為我們提供了take()和takeLast()方法來實現(xiàn)焙格。
-
take()图毕,如果我們只想要一個觀測序列中的前兩個元素,給take()傳入?yún)?shù):整數(shù)2眷唉,就能實現(xiàn)予颤。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); } }) .take(2) .subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
打印結(jié)果:</br>
我是張三</br>
我是李四</br>
在這里的take(n),表示的是說生產(chǎn)者發(fā)送前n個數(shù)據(jù),n = 2 也就發(fā)送前兩個數(shù)據(jù)冬阳,并不是說數(shù)據(jù)全部發(fā)完蛤虐,截取前兩個數(shù)據(jù)。
-
takeLast()能夠讓我們發(fā)送后幾個數(shù)據(jù)元素肝陪。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); subscriber.onCompleted(); } }) .takeLast(1) .subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
同樣的驳庭,不能少了subscriber.onCompleted()
。
打印結(jié)果:</br>
我是王五</br>
Distinct
distinct()作用于一個完整的序列氯窍,所有重復(fù)的數(shù)據(jù)項只會發(fā)射一次饲常。
public static void main(String... args) {
Observable.just(1,2,1,2)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 1</br>
i = 2</br>
DistinctUntilChanged
distinctUntilChanged()與distinct()相類似,不過distinctUntilChanged()是判斷當(dāng)前發(fā)射的值與前一個數(shù)據(jù)是否相同狼讨,在實際中贝淤,可以假設(shè)情形比如說UI根據(jù)獲取到的數(shù)據(jù)不同更新自身UI,但是如果數(shù)據(jù)內(nèi)容并沒有發(fā)生改變政供,出于不浪費資源的目的播聪,就不要發(fā)射數(shù)據(jù)朽基。
public static void main(String... args) {
Observable.just(1,2,2)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 1</br>
i = 2</br>
First
first()從Observable中只發(fā)射第一個元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的第一個數(shù)據(jù)項离陶。
public static void main(String... args) {
Observable.just(1,2,2)
.first()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 1
public static void main(String... args) {
Observable.just(1,2,2)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer == 2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 2</br>
Last
first()從Observable中只發(fā)射最后一個元素稼虎,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的最后一個數(shù)據(jù)項。代碼參考First招刨。
Skip
skip(int)可以讓我們忽略O(shè)bservable前n個元素渡蜻,而直接跳過這n個元素發(fā)射后面的元素。
public static void main(String... args) {
Observable.just(1,2,2)
.skip(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 2</br>
SkipLast
skipLast(int)則是忽略后n個元素的發(fā)射计济。
ElementAt
現(xiàn)在我們有了控制前后的元素過濾規(guī)則,那么自然會有一個問題排苍,如果我只想要觀測序列其中的一個元素該怎么辦呢沦寂,那么elementAt(int)就能實現(xiàn)。elementAt(int)
用來獲取元素Observable發(fā)射的事件序列中的第n項數(shù)據(jù)淘衙,并當(dāng)做唯一的數(shù)據(jù)發(fā)射出去传藏。
public static void main(String... args) {
Observable.just(1,2,3)
.elementAt(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 3</br>
同時還有一個拓展方法,如果想查找第六個元素彤守,但是可觀測序列只有三個元素怎么辦毯侦,可以用elementAtOrDefault(int index, T defaultValue)
,在第二個參數(shù)傳入一個默認(rèn)值具垫。
Sample
假如我們有一個溫度傳感器侈离,每秒鐘都會發(fā)射一次室內(nèi)溫度,然后UI根據(jù)溫度變化而更新筝蚕,但是有一個問題卦碾,我們認(rèn)為每秒鐘就獲取一次數(shù)據(jù)并更新相當(dāng)?shù)睦速M資源,再說溫度也不一定變化這么快起宽,那么我們就需要一個小小的發(fā)射間隔洲胖。sample()就能幫我們做到這一點,在Observable后面加一個sample(),將創(chuàng)建一個新的觀測序列坯沪,并且它會在指定的時間間隔里由Observable發(fā)射最近的一次數(shù)值绿映。
public static void main(String... args) {
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 50; i++) {
if (i % 10 ==0){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
});
observable.sample(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
先打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>
分析:這里我們先創(chuàng)建了一個Observable,主要就是一個for循環(huán)腐晾,依次發(fā)射0~50叉弦,為了驗證方便呢,就加了一個判斷藻糖,如果發(fā)射的是整數(shù)就線程休眠5s卸奉,為什么要這樣干呢,你想cpu多快啊颖御,才50個數(shù)不是一滋溜就發(fā)射完了么榄棵,那之后通過sample(2, TimeUnit.SECONDS)
設(shè)置的2s發(fā)射一個最近的值不是只有最后的一個值了么凝颇,打印的結(jié)果也就達(dá)不到驗證的目的了呀。這里再附上一張圖:
如果我們想讓它定時發(fā)射第一個元素而不是最近的一個元素疹鳄,我們可以使用throttleFirst()
拧略。
Timeout
有的時候我們在規(guī)定的時間內(nèi)必須要有一個數(shù)據(jù),就上文的溫度傳感器來說瘪弓,我們想讓它每隔兩秒至少發(fā)射一個垫蛆,那么我們就可以用timeout
函數(shù)來監(jiān)聽觀測序列,如果在我們設(shè)定的時間內(nèi)沒有得到一個值就發(fā)射一個錯誤腺怯。
public static void main(String... args) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 50; i++) {
if (i % 10 == 0){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
})
.timeout(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.println("Timeout error");
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
Timeout error</br>
可以看到袱饭,我們通過timeout(2, TimeUnit.SECONDS)
設(shè)置了2s的時間限制,而在Observable中讓線程休眠了2s,那么觸發(fā)了Timeout呛占,發(fā)射了一個錯誤虑乖。
Debounce
debounce()過濾掉了由Observable發(fā)射的速率過快的數(shù)據(jù),如果在一個指定的時間間隔過去了仍舊沒有發(fā)射一個,那么它將發(fā)射最后的那個晾虑。
public static void main(String... args) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 50; i++) {
if (i % 10 == 0){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
})
.debounce(2 , TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>
從打印的結(jié)果來看疹味,與之前的sample
一般無二,但是要理解意義的不同帜篇,sample
是在一條可觀測序列中糙捺,選擇指定時間段要發(fā)射的元素發(fā)射出來,而debounce
是指一段時間內(nèi)沒有新數(shù)據(jù)發(fā)射笙隙,那么就發(fā)射最后的那一個洪灯。