// Documentation URL: https://mcxiaoke.gitbooks.io/rxdocs/content/
private void repeat(){
    /*
     * Demonstrates repeat(): the two-item range (1, 2) is replayed five times.
     *
     * Output recorded with the original sample (thread name, then value),
     * as seen when the method runs on the "main" thread:
     *   main 1
     *   main 2
     * ...with that pair printed five times in total.
     */
    final Observable<Integer> repeatedRange = Observable.range(1, 2).repeat(5);

    final Observer<Integer> printer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Intentionally empty: the subscription handle is not kept.
        }

        @Override
        public void onNext(Integer value) {
            // Log which thread delivered the value, followed by the value.
            final String threadName = Thread.currentThread().getName();
            Log.e("qwer", threadName + " " + value);
        }

        @Override
        public void onError(Throwable error) {
            // Intentionally empty in this sample.
        }

        @Override
        public void onComplete() {
            // Intentionally empty in this sample.
        }
    };

    repeatedRange.subscribe(printer);
}
private void range(){
    // Demonstrates range(): subscribes to Observable.range(1, 10) and logs
    // each delivered value together with the delivering thread's name.
    final Observer<Integer> valueLogger = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Nothing to do: the subscription handle is not kept.
        }

        @Override
        public void onNext(Integer item) {
            final Thread current = Thread.currentThread();
            Log.e("qwer", current.getName() + " " + item);
        }

        @Override
        public void onError(Throwable error) {
            // Intentionally empty in this sample.
        }

        @Override
        public void onComplete() {
            // Intentionally empty in this sample.
        }
    };

    final Observable<Integer> numbers = Observable.range(1, 10);
    numbers.subscribe(valueLogger);
}
private void just(){
    // Demonstrates just() with items of mixed types.

    // Two items, subscribed without any callbacks.
    Observable.<Object>just(1, "df").subscribe();

    // Four items of unrelated types, consumed by an observer whose
    // callbacks are all empty.
    final Observer<Object> noOpObserver = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable disposable) {
        }

        @Override
        public void onNext(Object item) {
        }

        @Override
        public void onError(Throwable error) {
        }

        @Override
        public void onComplete() {
        }
    };
    final Observable<Object> mixedItems =
            Observable.<Object>just(1, "df", 99L, new Object());
    mixedItems.subscribe(noOpObserver);
}
private void timer_interval(){
    // Demonstrates timer(), interval(), take() and delay().
    //
    // NOTE(review): the same observer instance is subscribed to all five
    // sources below, and its onSubscribe() discards the Disposable, so the
    // two unbounded interval() subscriptions are never disposed by this
    // method - confirm that this is acceptable for the sample.
    final Observer<Long> sink = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable disposable) {
        }

        @Override
        public void onNext(Long tick) {
        }

        @Override
        public void onError(Throwable error) {
        }

        @Override
        public void onComplete() {
        }
    };

    // Run one task after a 1 s delay, then finish. Subscription happens on
    // a new thread and the result is observed on the Android main thread.
    // (An earlier variant of this sample subscribed on Schedulers.io().)
    final Observable<Long> oneShot = Observable.timer(1000, TimeUnit.MILLISECONDS);
    oneShot
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(sink);

    // Run a task every 1 s, with a 1 s gap before the first run; repeats
    // indefinitely.
    final Observable<Long> delayedTicks = Observable.interval(1000, TimeUnit.MILLISECONDS);
    delayedTicks.subscribeOn(Schedulers.io()).subscribe(sink);

    // Run a task every 1 s, with the first run immediately; repeats
    // indefinitely.
    final Observable<Long> immediateTicks = Observable.interval(0, 1000, TimeUnit.MILLISECONDS);
    immediateTicks.subscribeOn(Schedulers.io()).subscribe(sink);

    // Run a task every 1 s, with the first run immediately; only five runs.
    final Observable<Long> fiveTicks =
            Observable.interval(0, 1000, TimeUnit.MILLISECONDS).take(5);
    fiveTicks.subscribe(sink);

    // Run a first task, wait 1 s, then run another task, then finish.
    final Consumer<Long> firstTask = new Consumer<Long>() {
        @Override
        public void accept(Long value) throws Exception {
            Log.d("qwer", "執(zhí)行第一個任務(wù)");
        }
    };
    Observable.just(0L)
            .doOnNext(firstTask)
            .delay(1000, TimeUnit.MILLISECONDS)
            .subscribe(sink);
}
private void from(){
    // Demonstrates fromIterable() with several subscription styles.

    // Source data: the strings "1" through "4".
    final List<String> data = new ArrayList<>();
    for (int n = 1; n <= 4; n++) {
        data.add(String.valueOf(n));
    }
    final Observable<String> source = Observable.fromIterable(data);

    // 1) A doOnNext() hook that does nothing, subscribed without callbacks.
    final Consumer<String> ignoreItem = new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
        }
    };
    source.doOnNext(ignoreItem).subscribe();

    // 2) A Consumer-only subscription; the returned Disposable is disposed
    //    right after subscribe() returns.
    final Disposable subscription = source.subscribe(new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
            Log.e("qwer","accept : " + item);
        }
    });
    subscription.dispose();

    // 3) A full Observer that logs every callback.
    final Observer<String> verboseObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.e("qwer","onSubscribe");
        }

        @Override
        public void onNext(String item) {
            Log.e("qwer","onNext" + item);
        }

        @Override
        public void onError(Throwable error) {
            Log.e("qwer","onError:" + error.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e("qwer","onComplete");
        }
    };
    source.subscribe(verboseObserver);

    /*
     * 4) doOnNext() combined with an Observer. Output recorded with the
     *    original sample - the hook's line precedes the observer's line
     *    for each item:
     *      Observable 1
     *      onNext 1
     *      Observable 2
     *      onNext 2
     *      Observable 3
     *      onNext 3
     *      Observable 4
     *      onNext 4
     */
    final Consumer<String> logBeforeDelivery = new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
            Log.e("qwer"," Observable " + item);
        }
    };
    final Observer<String> itemLogger = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable disposable) {
        }

        @Override
        public void onNext(String item) {
            Log.e("qwer"," onNext " + item);
        }

        @Override
        public void onError(Throwable error) {
        }

        @Override
        public void onComplete() {
        }
    };
    source.doOnNext(logBeforeDelivery).subscribe(itemLogger);
}
private void create(){
    // Demonstrates create() together with onErrorReturn(): the source emits
    // 0..9 and then fails; onErrorReturn() maps that failure to the value 100,
    // and the observer logs every callback it receives.
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            for (int i = 0; i < 13; i++) {
                if (i == 10) {
                    // onError() is a terminal event. Stop emitting right away
                    // instead of continuing to call onNext()/onComplete() and
                    // relying on the emitter to drop those calls.
                    emitter.onError(
                            new IllegalStateException("simulated failure at i = " + i));
                    return;
                }
                emitter.onNext(i);
            }
            // Only reached if the loop finishes without signalling an error.
            emitter.onComplete();
        }
    }).onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            // Fallback item emitted in place of the upstream error.
            return 100;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("qwer","onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.e("qwer","onNext" + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e("qwer","onError:" + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e("qwer","onComplete");
        }
    });
}