1媚创、filter First
StartWith操作符會(huì)在源Observable發(fā)射的數(shù)據(jù)前面插上一些數(shù)據(jù)戚篙。
Observable.range(1, 2).startWith(0).subscribe(consumer);
//Observable.range(1, 2).startWithArray(-1,0).subscribe(consumer);
12-29 13:27:50.680 6062-6062/com.rxjavademo E/tag: 0
12-29 13:27:50.686 6062-6062/com.rxjavademo E/tag: 1
12-29 13:27:50.689 6062-6062/com.rxjavademo E/tag: 2
First 操作符返回第一條數(shù)據(jù)或者返回滿足條件的第一條數(shù)據(jù)。
Last操作符返回最后一條數(shù)據(jù)或者滿足條件的最后一條數(shù)據(jù)。
2哩牍、elementAt
elementAt(index) 發(fā)送腳標(biāo)為index的數(shù)據(jù)
//在這里發(fā)送0错忱、1界阁、2侯繁、3、4泡躯,腳標(biāo)為3的數(shù)據(jù)為3
Observable.range(0, 5).elementAt(3).subscribe(consumer);
12-29 12:01:41.090 461-461/com.rxjavademo E/tag: 3
3贮竟、distinct
distinct 去重
//
Observable.just(1, 1, 2, 2, 2, 3).distinct().subscribe(consumer);
12-29 12:05:34.576 461-461/com.rxjavademo E/tag: 1
12-29 12:05:34.586 461-461/com.rxjavademo E/tag: 2
12-29 12:05:34.590 461-461/com.rxjavademo E/tag: 3
distinctUntilChanged操作符用來(lái)過(guò)濾掉連續(xù)的重復(fù)數(shù)據(jù)。
//
Observable.just(1, 2, 2, 3,1).distinctUntilChanged().subscribe(consumer);
12-29 13:20:37.298 5516-5516/com.rxjavademo E/tag: 1
12-29 13:20:37.308 5516-5516/com.rxjavademo E/tag: 2
12-29 13:20:37.311 5516-5516/com.rxjavademo E/tag: 3
12-29 13:20:37.314 5516-5516/com.rxjavademo E/tag: 1
4精续、skip
前N項(xiàng)去除掉
//
Observable.range(1, 5).skip(2).subscribe(consumer);
12-29 12:11:53.557 461-461/com.rxjavademo E/tag: 3
12-29 12:11:53.563 461-461/com.rxjavademo E/tag: 4
12-29 12:11:53.568 461-461/com.rxjavademo E/tag: 5
5坝锰、take
發(fā)送的元素只取前N項(xiàng)
//
Observable.range(0, 5).take(2).subscribe(consumer);
12-29 12:19:15.257 1642-1642/com.rxjavademo E/tag: 0
12-29 12:19:15.265 1642-1642/com.rxjavademo E/tag: 1
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).take(3).subscribe(consumer);
12-29 12:21:33.071 2269-2269/com.rxjavademo E/tag: 0
12-29 12:21:34.071 2269-2269/com.rxjavademo E/tag: 1
12-29 12:21:35.071 2269-2269/com.rxjavademo E/tag: 2
6、ignoreElements
ignoreElements操作符忽略所有源Observable產(chǎn)生的結(jié)果重付,只會(huì)執(zhí)行onCpmpleted()或者onError()方法
Observable.range(0, 5).ignoreElements().subscribe(new Action() {
@Override
public void run() throws Exception {
Log.e("tag","onCompleted()" );
}
}, consumer);
12-29 12:39:59.976 3700-3700/com.rxjavademo E/tag: onCompleted()
7、throttleFirst
throttleFirst操作符凫乖,會(huì)定期發(fā)送這個(gè)時(shí)間段里源Observable發(fā)送的第一個(gè)數(shù)據(jù)
Observable
.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
//線程休眠100毫秒
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
})
.throttleFirst(200, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
12-29 13:09:25.066 3700-3700/com.rxjavademo E/tag: 0
12-29 13:09:25.258 3700-3700/com.rxjavademo E/tag: 2
12-29 13:09:25.459 3700-3700/com.rxjavademo E/tag: 4
12-29 13:09:25.662 3700-3700/com.rxjavademo E/tag: 6
12-29 13:09:25.862 3700-3700/com.rxjavademo E/tag: 8
6确垫、throttleWithTimeout
如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時(shí)間,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒(méi)有新數(shù)據(jù)發(fā)射時(shí)才進(jìn)行發(fā)射
Observable
.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
emitter.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
emitter.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
emitter.onComplete();
}
})
.throttleWithTimeout(800, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
12-29 13:10:12.382 4834-4834/com.rxjavademo E/tag: 3
12-29 13:10:12.582 4834-4834/com.rxjavademo E/tag: 6