關(guān)于合并數(shù)據(jù)源:之前使用了flatMap()以及concatMap()進(jìn)行嵌套調(diào)用入录,,注冊之后登陸
合并數(shù)據(jù)源2:合并數(shù)據(jù)(獲取圖書詳情以及評論)統(tǒng)一展示到客戶端:采用merge()或者zip()操作符
merge()例子:實(shí)現(xiàn)較為簡單的從(網(wǎng)絡(luò)+本地)獲取數(shù)據(jù)举塔,奋隶,統(tǒng)一展示
zip()例子:結(jié)合
Retrofit
以及Rxjava,實(shí)現(xiàn)多個(gè)網(wǎng)絡(luò)請求合并獲得數(shù)據(jù)俭驮,回溺,統(tǒng)一展示
二者區(qū)別為:merge()只添加被觀察者合并數(shù)據(jù)源的操作在observable觀察者的onnext()里面處理,進(jìn)行合并混萝,合并的結(jié)果在
onComplete()
處理遗遵,zip()可以直接添加發(fā)射者,再添加合并數(shù)據(jù)源的bean逸嘀,在轉(zhuǎn)主線程车要,訂閱,可以使用new Consumer<Bean>() )
里面處理合并結(jié)果
/**
* 合并發(fā)射者崭倘,按時(shí)間線執(zhí)行
* 合并事件翼岁,還是merge()比較方便好用
*/
String resultss = "數(shù)據(jù)源來自:";
private void merge() {
// Observable.merge(
// //延遲發(fā)送操作符
// //從0開始發(fā)送,發(fā)送3個(gè)數(shù)據(jù)司光,第一次發(fā)件延遲時(shí)間1秒琅坡。間隔時(shí)間1s
// //
// Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
// Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
// ).subscribe(aLong -> {
//
// });
Observable.merge(
Observable.just("網(wǎng)絡(luò)"),
Observable.just("本地文件")
).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
resultss += s;
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
KLog.d(TTAG, "接收完成統(tǒng)一處理事件:" + resultss);
}
});
}
下面使用zip操作:
/**
* 合并數(shù)據(jù)源
*/
private void zip() {
Observable.zip(
retrofitApi.getCall().subscribeOn(Schedulers.io()),
retrofitApi.getCall().subscribeOn(Schedulers.io()),
(translation, translation2) ->
translation.toString() + translation2.toString())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
KLog.d(TTAG, "合并的數(shù)據(jù)源是:" + s.toString());
}, throwable -> {
});
}
concat()實(shí)例
/**
* 該類型的操作符的作用 = 組合多個(gè)被觀察者
* 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行
* concat()
* concatArray()
*
* 實(shí)例:從內(nèi)存以及磁盤和網(wǎng)絡(luò)獲取緩存
*/
String memoryCache = null;
String diskCache = "磁盤緩存數(shù)據(jù)";
private void concat() {
Observable.concat(
Observable.create(emitter -> {
//判斷內(nèi)存是否含有緩存
if (null == memoryCache) {
emitter.onComplete();
} else {
emitter.onNext(memoryCache);
}
}),
Observable.create(emitter -> {
//判斷磁盤
if (null == diskCache) {
emitter.onComplete();
} else {
emitter.onNext(diskCache);
}
}),
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("從網(wǎng)絡(luò)獲取緩存數(shù)據(jù)");
})
//通過firstElement()残家,從串聯(lián)隊(duì)列中取出并發(fā)送第1個(gè)有效事件(Next事件)榆俺,即依次判斷檢查memory、disk坞淮、network
).firstElement()
// 即本例的邏輯為:
// a. firstElement()取出第1個(gè)事件 = memory茴晋,即先判斷內(nèi)存緩存中有無數(shù)據(jù)緩存;由于memoryCache = null回窘,即內(nèi)存緩存中無數(shù)據(jù)诺擅,所以發(fā)送結(jié)束事件(視為無效事件)
// b. firstElement()繼續(xù)取出第2個(gè)事件 = disk,即判斷磁盤緩存中有無數(shù)據(jù)緩存:由于diskCache ≠ null啡直,即磁盤緩存中有數(shù)據(jù)烁涌,所以發(fā)送Next事件(有效事件)
// c. 即firstElement()已發(fā)出第1個(gè)有效事件(disk事件)苍碟,所以停止判斷。
.subscribe(s -> {
KLog.d(TTAG, "緩存獲得路徑是:" + s.toString());
});
}
combineLatest()實(shí)例
進(jìn)行多個(gè)輸入框判斷烹玉,有一個(gè)為空時(shí)按鈕不可點(diǎn)擊驰怎,都不為空時(shí)才可以點(diǎn)擊(并且改變輸入框顏色)
/**
* 通過combineLatest()合并事件 & 聯(lián)合判斷
* <p>
* 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)送了數(shù)據(jù)后,
* 將先發(fā)送了數(shù)據(jù)的Observables 的最新(最后)一個(gè)數(shù)據(jù) 與
* 另外一個(gè)Observable發(fā)送的每個(gè)數(shù)據(jù)結(jié)合二打,最終基于該函數(shù)的結(jié)果發(fā)送數(shù)據(jù)
*/
private void init() {
nameObser = RxTextView.textChanges(name).skip(1);
ageObser = RxTextView.textChanges(age).skip(1);
jobObser = RxTextView.textChanges(job).skip(1);
Observable.combineLatest(nameObser, ageObser, jobObser,
(charSequence, charSequence2, charSequence3) -> {
boolean nameIsNOtEmpty = !TextUtils.isEmpty(name.getText());
// boolean nameIs = !TextUtils.isEmpty(name.getText()) && name.getText().toString().length() <= 10;
boolean ageIsNotEmpty = !TextUtils.isEmpty(age.getText());
boolean jobIsNotEmpty = !TextUtils.isEmpty(job.getText());
return nameIsNOtEmpty && ageIsNotEmpty && jobIsNotEmpty;
}
).subscribe(aBoolean -> {
KLog.d(TTAG, "點(diǎn)擊結(jié)果是:" + aBoolean);
push.setEnabled(aBoolean);
});
}
有條件的輪詢操作:
使用關(guān)鍵字:repeatWhen
// 設(shè)置變量 = 模擬輪詢服務(wù)器次數(shù)
private int i = 0 ;
/**
* 有條件的輪詢
* 使用操作符:repeatWhen
*/
private void init3() {
RetrofitApi retrofitApi = OkHttpUtils.newInstance().create(RetrofitApi.class);
retrofitApi.getCall()
.repeatWhen(objectObservable -> {
// 將原始 Observable 停止發(fā)送事件的標(biāo)識(Complete() / Error())轉(zhuǎn)換成1個(gè) Object 類型數(shù)據(jù)傳遞給1個(gè)新被觀察者(Observable)
// 以此決定是否重新訂閱 & 發(fā)送原來的 Observable,即輪詢
// 此處有2種情況:
// 1. 若返回1個(gè)Complete() / Error()事件掂榔,則不重新訂閱 & 發(fā)送原來的 Observable继效,即輪詢結(jié)束
// 2. 若返回其余事件,則重新訂閱 & 發(fā)送原來的 Observable装获,即繼續(xù)輪詢
return objectObservable.flatMap((Function<Object, ObservableSource<?>>) o -> {
// 加入判斷條件:當(dāng)輪詢次數(shù) = 5次后瑞信,就停止輪詢
if (i>3){
return Observable.error(new Throwable("輪詢結(jié)束"));
}
// 若輪詢次數(shù)<4次,則發(fā)送1Next事件以繼續(xù)輪詢
// 注:此處加入了delay操作符穴豫,作用 = 延遲一段時(shí)間發(fā)送(此處設(shè)置 = 2s)凡简,以實(shí)現(xiàn)輪詢間間隔設(shè)置
return Observable.just(1).delay(2000, TimeUnit.MILLISECONDS);
});
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
translation.show();
i++;
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
有條件的網(wǎng)絡(luò)請求出錯(cuò),重試精肃,可以設(shè)置條件
/**
* 請求出錯(cuò)去重復(fù)查詢秤涩,可以設(shè)置條件
* 使用操作符:retryWhen
* 發(fā)送網(wǎng)絡(luò)請求 & 通過retryWhen()進(jìn)行重試
* 主要異常才會回調(diào)retryWhen()進(jìn)行重試
* 參數(shù)Observable<Throwable>中的泛型 = 上游操作符拋出的異常,可通過該條件來判斷異常的類型
*/
// 設(shè)置變量
// 可重試次數(shù)
private int maxConnectCount = 10;
// 當(dāng)前已重試次數(shù)
private int currentRetryCount = 0;
// 重試等待時(shí)間
private int waitRetryTime = 0;
private void init4() {
retrofitApi.getCall()
.retryWhen(throwableObservable ->
throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {
if (throwable instanceof IOException) {
if (currentRetryCount < maxConnectCount) {
currentRetryCount++;
waitRetryTime = 1000 + currentRetryCount * 1000;
return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
} else {
return Observable.error(new Throwable("超過重試次數(shù):" + currentRetryCount));
}
} else {
return Observable.error(new Throwable("發(fā)生異常司抱,非網(wǎng)絡(luò)"));
}
}))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
translation.show();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
有關(guān)過濾操作符
ofType
/**
* 過濾操作符
*/
private void useOfType() {
Observable.just(1, "asd", 2, 3, 4, "qwe")
.ofType(Integer.class)
.subscribe(integer -> {
KLog.d(TTAG, "獲得的整型消息事件是:" + integer);
});
}
Skip筐眷,,习柠,SkipLast
/**
* 跳轉(zhuǎn)開頭和跳過結(jié)尾消息
*/
private void userSkipAndSkipLast() {
// 使用1:根據(jù)順序跳過數(shù)據(jù)項(xiàng)
Observable.just(1, 2, 3, 4, 5)
.skip(1) // 跳過正序的前1項(xiàng)
.skipLast(2) // 跳過正序的后2項(xiàng)
.subscribe(integer -> KLog.d(TTAG, "獲取到的整型事件元素是: " + integer));
// 使用2:根據(jù)時(shí)間跳過數(shù)據(jù)項(xiàng)
// 發(fā)送事件特點(diǎn):發(fā)送數(shù)據(jù)0-5匀谣,每隔1s發(fā)送一次,每次遞增1资溃;第1次發(fā)送延遲0s
Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
.skip(1, TimeUnit.SECONDS) // 跳過第1s發(fā)送的數(shù)據(jù)
.skipLast(1, TimeUnit.SECONDS) // 跳過最后1s發(fā)送的數(shù)據(jù)
.subscribe(along -> KLog.d(TTAG, "獲取到的整型事件元素是: " + along));
}
throttleFirst()武翎,,throttleLast()
在某段時(shí)間內(nèi)溶锭,只發(fā)送該段時(shí)間內(nèi)第1次事件 / 最后1次事件
<<- 在某段時(shí)間內(nèi)宝恶,只發(fā)送該段時(shí)間內(nèi)第1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// 隔段事件發(fā)送時(shí)間
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(400);
e.onNext(3);
Thread.sleep(300);
Thread.sleep(300);
e.onComplete();
}
}).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
<<- 在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)最后1次事件 ->>
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// 隔段事件發(fā)送時(shí)間
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(400);
Thread.sleep(300);
e.onComplete();
}
}).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
KLog.d(TTAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
KLog.d(TTAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
K Log.d(TTAG, "對Complete事件作出響應(yīng)");
}
});
實(shí)際應(yīng)用:規(guī)定時(shí)間內(nèi)暖途,多次點(diǎn)擊按鈕禁止多次操作使用throttleFirst卑惜,操作符
RxView.clicks(button)
.throttleFirst(2, TimeUnit.SECONDS) // 才發(fā)送 2s內(nèi)第1次點(diǎn)擊按鈕的事件
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Sample()實(shí)例應(yīng)用實(shí)時(shí)搜索
在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)最新(最后)1次事件,與 throttleLast() 操作符類似
throttleWithTimeout () / debounce()
發(fā)送數(shù)據(jù)事件時(shí)驻售,若2次發(fā)送事件的間隔<指定時(shí)間露久,就會丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒有新數(shù)據(jù)發(fā)射時(shí)才會發(fā)送后一次的數(shù)據(jù)
RxTextView.textChanges(ed)
.debounce(1, TimeUnit.SECONDS)
.skip(1) //跳過 第1次請求 = 初始輸入框的空字符狀態(tài)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CharSequence>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(CharSequence charSequence) {
tv.setText("發(fā)送給服務(wù)器的字符 = " + charSequence.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
firstElement() 欺栗,毫痕, lastElement()
僅選取第1個(gè)元素 征峦,,最后一個(gè)元素
// 獲取第1個(gè)元素
Observable.just(1, 2, 3, 4, 5)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
KLog.d(TTAG,"獲取到的第一個(gè)事件是: "+ integer);
}
});
// 獲取最后1個(gè)元素
Observable.just(1, 2, 3, 4, 5)
.lastElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept( Integer integer) throws Exception {
KLog.d(TTAG,"獲取到的最后1個(gè)事件是: "+ integer);
}
});
elementAt()
指定接收某個(gè)消息消请,根據(jù)索引栏笆,可以設(shè)置默認(rèn)消息
private void userEleMentAt() {
// 使用1:獲取位置索引 = 2的 元素
// 位置索引從0開始
Observable.just(1, 2, 3, 4, 5)
.elementAt(2)
.subscribe(integer -> KLog.d(TTAG,"獲取到的事件元素是: "+ integer));
// 使用2:獲取的位置索引 > 發(fā)送事件序列長度時(shí),設(shè)置默認(rèn)參數(shù)
Observable.just(1, 2, 3, 4, 5)
.elementAt(6,10)
.subscribe(integer -> KLog.d(TTAG,"獲取到的事件元素是: "+ integer));
}
elementAtOrError()
在elementAt()的基礎(chǔ)上臊泰,當(dāng)出現(xiàn)越界情況(即獲取的位置索引 > 發(fā)送事件序列長度)時(shí)蛉加,即拋出異常
private void userElementAtOrError() {
Observable.just(1, 2, 3, 4, 5)
.elementAtOrError(6)
.subscribe(integer -> KLog.d(TTAG,"獲取到的事件元素是: "+ integer));
}