創(chuàng)建一個(gè)被觀察者畅哑,發(fā)起一個(gè)網(wǎng)絡(luò)請(qǐng)求迁杨。
observable
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(Urls.baseUrl)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRequest request=retrofit.create(GetRequest.class);
Observable<Translation> observable=request.getCall1();
GetRequest代碼
public interface GetRequest {
@GET(Urls.test)
Observable<Translation> getCall();
@GET(Urls.test1)
Observable<Translation> getCall1();
Translation代碼
public class Translation {
private int status;
private content content;
private static class content{
private String from;
private String to;
private String vendor;
private String out;
private int errNo;
}
public String show(){
Log.e("yzh",content.out);
return content.out;
}
}
對(duì)retrofit不太了解的可以去看一下retrofit入門
1 模擬一次重復(fù)查詢的情況,使用了Rxjava的interval操作符
//最外面是一個(gè)循環(huán)的操作
Observable.interval(2,1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("yzh","第"+aLong+"次輪詢");
//請(qǐng)求在這里執(zhí)行湃缎,注意看到線程的切換
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
translation.show();
}
@Override
public void onError(Throwable e) {
Log.e("yzh","請(qǐng)求失敗");
}
@Override
public void onComplete() {
}
});
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
}
@Override
public void onError(Throwable e) {
Log.e("yzh","對(duì)Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.e("yzh","對(duì)Complete事件作出響應(yīng)");
}
});
打印結(jié)果
// 03-12 14:56:16.528 8733-8760/com.example.issuser.rxtest E/yzh: 第0次輪詢
// 03-12 14:56:16.886 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
// 03-12 14:56:17.528 8733-8760/com.example.issuser.rxtest E/yzh: 第1次輪詢
// 03-12 14:56:17.702 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
// 03-12 14:56:18.529 8733-8760/com.example.issuser.rxtest E/yzh: 第2次輪詢
// 03-12 14:56:18.675 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
// 03-12 14:56:19.528 8733-8760/com.example.issuser.rxtest E/yzh: 第3次輪詢
// 03-12 14:56:19.674 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
2.條件輪詢 枪萄,使用RxJava的repeatWhen
被觀察者的網(wǎng)絡(luò)請(qǐng)求同上
observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
//設(shè)置輪詢條件
if(i>3){
return Observable.error(new Throwable("輪詢結(jié)束"));
}else{
return Observable.just(1).delay(2, TimeUnit.SECONDS);
}
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("yzh","onSubscribe");
}
@Override
public void onNext(Translation translation) {
translation.show();
i++;
}
@Override
public void onError(Throwable e) {
Log.e("yzh","onError--"+e.toString());
}
@Override
public void onComplete() {
}
});
打印結(jié)果
03-12 15:25:00.524 12910-12910/com.example.issuser.rxtest E/yzh: onSubscribe
03-12 15:25:00.715 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:02.852 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:05.092 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:07.244 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:09.382 12910-12910/com.example.issuser.rxtest E/yzh: hi china
03-12 15:25:09.395 12910-12910/com.example.issuser.rxtest E/yzh: onError--java.lang.Throwable: 輪詢結(jié)束
3 網(wǎng)絡(luò)異常重連 這個(gè)是常見的情況但是不好模擬所以沒有打印結(jié)果
用到的是RxJava的retryWhen
observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Log.e("yzh","發(fā)生異常=="+throwable.toString());
if(throwable instanceof IOException){
Log.e("yzh","屬于IO異常决侈,重試");
if(currentRetryCount<maxConnectCount){
currentRetryCount++;
Log.e("yzh","重試的次數(shù)--"+currentRetryCount);
waitRetryTime=1000+currentRetryCount*1000;
Log.e("yzh","等待時(shí)間=="+waitRetryTime);
return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
}else{
return Observable.error(new Throwable("重試次數(shù)已超過設(shè)置次數(shù) = " +currentRetryCount + ",即 不再重試"));
}
}else{
return Observable.error(new Throwable("發(fā)生了非網(wǎng)絡(luò)異常(非I/O異常)"));
}
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
Log.e("yzh", "發(fā)送成功");
translation.show();
}
@Override
public void onError(Throwable e) {
Log.e("yzh",e.toString());
}
@Override
public void onComplete() {
}
});
4 對(duì)網(wǎng)絡(luò)請(qǐng)求返回的數(shù)據(jù)再做一次處理牵舱,用到RxJava的flatmap
常見的使用場(chǎng)景是注冊(cè)并且直接登錄這個(gè)場(chǎng)景串绩,注意線程的切換
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {
Log.e("yzh","doOnNextThread--"+Thread.currentThread().getName());
Log.e("yzh","第一次網(wǎng)絡(luò)請(qǐng)求成功");
translation.show();
}
})
//切換觀察者所在線程
.observeOn(Schedulers.io())
.flatMap(new Function<Translation, ObservableSource<Translation>>() {
@Override
public ObservableSource<Translation> apply(Translation translation) throws Exception {
Log.e("yzh","flatMapThread--"+Thread.currentThread().getName());
Log.e("yzh","flatmap--");
translation.show();
//另一個(gè)網(wǎng)絡(luò)請(qǐng)求,這里就不詳細(xì)列舉了
return observable1;
}
})
//切換觀察者所在線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {
Log.e("yzh","acceptThread--"+Thread.currentThread().getName());
Log.e("yzh","accept--");
translation.show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
打印結(jié)果 注意在第二次請(qǐng)求時(shí) 觀察者所在線程的切換
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: doOnNextThread--main
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 第一次網(wǎng)絡(luò)請(qǐng)求成功
03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 嗨世界
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatMapThread--RxCachedThreadScheduler-2
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatmap--
03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: 嗨世界
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: acceptThread--main
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: accept--
03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: hi china
5 聯(lián)合判斷多個(gè)條件 應(yīng)用于登錄注冊(cè)等需要填完多個(gè)信息
a. RxTextView.textChanges()監(jiān)聽控件的數(shù)據(jù)變化 芜壁,需要引入依賴:compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
b. skip(1) 跳過控件一開始無任何輸入值的情況
Observable<CharSequence> nameObservable = RxTextView.textChanges(et_name).skip(1);
Observable<CharSequence> ageObservable = RxTextView.textChanges(et_age).skip(1);
Observable<CharSequence> jobObservable = RxTextView.textChanges(et_job).skip(1);
Observable.combineLatest(nameObservable, ageObservable, jobObservable, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean apply(CharSequence charSequence, CharSequence charSequence2, CharSequence charSequence3) throws Exception {
boolean isUserNameValid= !TextUtils.isEmpty(et_name.getText().toString());
boolean isUserAgeValid=!TextUtils.isEmpty(et_age.getText().toString());
boolean isUserJobValid=!TextUtils.isEmpty(et_job.getText().toString());
return isUserNameValid&isUserAgeValid&isUserJobValid;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.e("yzh","提交按鈕是否可以點(diǎn)擊--"+aBoolean);
}
});
6 使用RxJava時(shí)要注意的內(nèi)存泄露礁凡,例如有時(shí)候頁面結(jié)束 異步操作并未完成
// 防止activity結(jié)束 出現(xiàn)問題
private final CompositeDisposable disposables = new CompositeDisposable();
disposables.add(observer);
@Override
protected void onDestroy() {
super.onDestroy();
// 將所有的 observer 取消訂閱
disposables.clear();
}