RxJava2.0 操作符(1)—— Create 創(chuàng)建操作
Create 創(chuàng)建類(lèi)操作符
Create — 使用一個(gè)函數(shù)從頭創(chuàng)建一個(gè) Observable。
Defer — 只有當(dāng)訂閱者訂閱才創(chuàng)建 Observable疆前;為每個(gè)訂閱創(chuàng)建一個(gè)新的 Observable寒跳。
Empty — 創(chuàng)建一個(gè)什么都不做直接通知完成的 Observable。
Error — 創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的 Observable竹椒。
From — 將一個(gè) Iterable, 一個(gè) Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè) Observable童太。
Interval — 創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的 Observable。
Just — 將一個(gè)或多個(gè)對(duì)象轉(zhuǎn)換成發(fā)射這個(gè)或這些對(duì)象的一個(gè) Observable胸完。
Range — 創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的 Observable书释。
Repeat — 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的 Observable。
RepeatWhen — 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的 Observable赊窥,它依賴(lài)于另一個(gè) Observable 發(fā)射的數(shù)據(jù)爆惧。
Never — 創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)的 Observable。
Timer — 創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的 Observable锨能。
1.1 Create
使用一個(gè)函數(shù)從頭創(chuàng)建一個(gè) Observable扯再。
示例代碼:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
for (int i = 1; i < 5; i++) {
emitter.onNext(i+"");
}
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Sequence complete.");
}
輸出:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
1.2 defer
只有當(dāng)訂閱者訂閱才創(chuàng)建 Observable;才會(huì)為每個(gè)訂閱創(chuàng)建一個(gè)新的 Observable址遇。
示例代碼:
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("String");
}
});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
輸出:
String
1.3 Empty
創(chuàng)建一個(gè)什么都不做直接通知完成的 Observable熄阻。
示例代碼:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
for (int i = 1; i < 5; i++) {
emitter.onNext(i+"");
}
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Sequence complete.");
}
});
輸出結(jié)果:
onComplete
1.3 Error
創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的 Observable。
示例代碼:
Observable<String> observable = Observable.error(new Callable<Throwable>() {
@Override
public Throwable call() throws Exception {
return new NullPointerException();
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
輸出結(jié)果:
Error: null
1.4 From
將一個(gè) Iterable, 一個(gè) Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè) Observable倔约。
示例代碼1:
//1.遍歷集合
List<String> items = new ArrayList<>();
for (int i = 0; i < 3; i++) {
items.add(i + "");
}
Observable<String> observable = Observable.fromIterable(items);
//Observable<String> observable = Observable.fromArray(new String[]{"Hello", "world"});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("result:" + s);
}
});
輸出結(jié)果:
result:0
result:1
result:2
1.5 Interval
創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的 Observable秃殉。
示例代碼:
final CompositeDisposable disposable = new CompositeDisposable();
disposable.add(Observable.interval(1, TimeUnit.SECONDS).subscribeWith(new DisposableObserver<Long>() {
@Override
public void onNext(@NonNull Long aLong) {
System.out.println("Next: " + aLong);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}));
//5秒后取消訂閱
try {
Thread.sleep(4000);
//取消訂閱
disposable.dispose();
} catch (InterruptedException e) {
e.printStackTrace();
}
輸出結(jié)果:
Next: 0
Next: 1
Next: 2
Next: 3
1.6 Just
將一個(gè)或多個(gè)對(duì)象轉(zhuǎn)換成發(fā)射這個(gè)或這些對(duì)象的一個(gè) Observable。
示例代碼:
Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
輸出結(jié)果:
Next: 1
Next: 2
Next: 3
onComplete
1.7 Range
創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的 Observable浸剩。
RxJava 將這個(gè)操作符實(shí)現(xiàn)為 range 函數(shù)钾军,它接受兩個(gè)參數(shù),一個(gè)是范圍的起始值绢要,一個(gè)是范圍的數(shù)據(jù)的數(shù)目吏恭。如果你將第二個(gè)參數(shù)設(shè)為 0,將導(dǎo)致 Observable 不發(fā)射任何數(shù)據(jù)(如果設(shè)置為負(fù)數(shù)袖扛,會(huì)拋異常)
示例代碼:
// 依次發(fā)射 10砸泛、11十籍、12
Observable.range(10, 2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
輸出結(jié)果:
Next: 10
Next: 11
Next: 12
onComplete
1.8 Repeat
創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的 Observable。
示例代碼:
//重復(fù)三次唇礁,repeat()就是無(wú)限次
Observable.just("hello", "world").repeat(3).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext:" + o.toString());
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
輸出結(jié)果:
onNext:hello
onNext:world
onNext:hello
onNext:world
onNext:hello
onNext:world
onComplete
1.9 RepeatWhen
創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的 Observable勾栗,它依賴(lài)于另一個(gè) Observable 發(fā)射的數(shù)據(jù)。
1.10 Never
創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)的 Observable盏筐。
(ps:不太懂有何意義)
1.11 Timer
創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的 Observable围俘。
在 RxJava 1.0.0 及其之后的版本,官方已不再提倡使用 timer() 操作符琢融,因?yàn)?interval() 具有同樣的功能界牡。
示例代碼:
Observable.timer(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("result:" + aLong);
}
});
輸出結(jié)果:
result:0