今天心肪,來學習RxJava中的組合/合并操作符,并完成實例递雀。
一泵喘、作用
組合多個被觀察者&合并需要發(fā)送的事件
二众眨、類型
常見的組合/合并操作符有:
1.組合多個被觀察者
a.按發(fā)送順序:concat()、concatArray()
b.按時間:merge()阻逮、mergeArray()
c.錯誤處理:concatDelayError()粱快、mergeDelayError()
2.合并多個事件
a.按數(shù)量:Zip()
b.按時間:combineLatest()、combineLatestDelayError()
c.合并成1個事件發(fā)送:reduce()叔扼、collect()
3.發(fā)送事件前追加發(fā)送事件:startWith()事哭、startWithArray()
4.統(tǒng)計發(fā)送事件數(shù)量:count()
三、操作符及應用介紹
首先在項目中添加依賴:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
1.組合多個被觀察者
concat()/concatArray()
作用:組合多個被觀察者發(fā)送數(shù)據(jù)瓜富,合并后按發(fā)送順序串行執(zhí)行
二者區(qū)別:組合被觀察者的數(shù)量鳍咱,即concat()組合被觀察者數(shù)量≤4個,而concatArray()可以大于4個
具體使用:
//concat()
:組合多個被觀察者
(≤4
個
)
一起發(fā)送數(shù)據(jù)
//
注:串行執(zhí)行
Observable
.
concat
(
Observable
.
just
(
1
,
2
,
3
),
Observable
.
just
(
4
,
5
,
6
),
Observable
.
just
(
7
,
8
,
9
),
Observable
.
just
(
10
,
11
,
12
))
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer integer) {
Log.
e
(
tag
,
"
接收到了事件
"
+integer);
}
@Override
public void
onError(Throwable e) {
Log.
e
(
tag
,
""
);
}
@Override
public void
onComplete() {
Log.
e
(
tag
,
""
);
}
});
//concat()
:組合多個被觀察者
(≤4
個
)
一起發(fā)送數(shù)據(jù)
//
注:串行執(zhí)行
Observable
.
concatArray
(
Observable
.
just
(
1
,
2
,
3
),
Observable
.
just
(
4
,
5
,
6
),
Observable
.
just
(
7
,
8
,
9
),
Observable
.
just
(
10
,
11
,
12
),
Observable.just(13,14,15))
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer integer) {
Log.
e
(
tag
,
"
接收到了事件
"
+integer);
}
@Override
public void
onError(Throwable e) {
Log.
e
(
tag
,
""
);
}
@Override
public void
onComplete() {
Log.
e
(
tag
,
""
);
}
});
merge()/mergeArray
作用:
組合多個被觀察者一起發(fā)送數(shù)據(jù)与柑,合并后按時間線并行執(zhí)行
二者區(qū)別:組合被觀察者的數(shù)量谤辜,即merge()組合被觀察者數(shù)量≤4個,而mergeArray()則可>4個
-
區(qū)別上述concat()操作符:同樣是組合多個被觀察者一起發(fā)送數(shù)據(jù)价捧,但concat()操作符合并后是按發(fā)送順序串行執(zhí)行
具體使用:
//merge()組合多個被觀察者(<4個)一起發(fā)送數(shù)據(jù)
// 注:合并后按照時間線并行執(zhí)行
Observable.merge(Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.e(tag,"接收到了事件"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e(tag,"");
}
@Override
public void onComplete() {
Log.e(tag,"complete");
}
});
//mergeArray()組合多個被觀察者(>4個)一起發(fā)送數(shù)據(jù)
// 注:合并后按照時間線并行執(zhí)行
Observable.mergeArray(Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS)丑念,
Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS),
Observable.intervalRange(1,2,1,1, TimeUnit.SECONDS),
Observable.intervalRange(1,3,10,1,TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.e(tag,"接收到了事件"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e(tag,"");
}
@Override
public void onComplete() {
Log.e(tag,"complete");
}
});
concatDelayError()/mergeDelayError()
作用:
[圖片上傳失敗...(image-4c0cf4-1512547073721)]
具體使用:
Observable
.
concatArrayDelayError
(
Observable
.
create
(
new
ObservableOnSubscribe<Integer>() {
@Override
public void
subscribe(ObservableEmitter<Integer> emitter)
throws
Exception {
emitter.onNext(
1
);
emitter.onNext(
2
);
emitter.onNext(
3
);
emitter.onError(
new
NullPointerException());
//
發(fā)送
Error
事件干旧,因為使用了
concatDelayError
渠欺,所以第
2
個
Observable
將會發(fā)送事件,等發(fā)送完畢后椎眯,再發(fā)送錯誤事件
emitter.onComplete();
}
}),
Observable
.
just
(
4
,
5
,
6
))
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer value) {
Log.
d
(
tag
,
"
接收到了事件
"
-
value );
}
@Override
public void
onError(Throwable e) {
Log.
d
(
tag
,
"
對
Error
事件作出響應
"
);
}
@Override
public void
onComplete() {
Log.
d
(
tag
,
"
對
Complete
事件作出響應
"
);
}
});
2.合并多個事件
該類型操作符主要是對多個被觀察者中的事件進行合并處理挠将。
Zip()
作用:合并多個被觀察者發(fā)送的事件胳岂,生成一個新的事件序列,最終發(fā)送舔稀。
原理:
[圖片上傳失敗...(image-55bcd1-1512547073721)]
注意:事件組合方式=嚴格按照原先的事件序列進行對位合并
最終合并的事件數(shù)量=
多個被觀察者中數(shù)量最少的數(shù)量
combineLatest()
作用:當兩個Observables中的任何一個發(fā)送了數(shù)據(jù)后乳丰,將先發(fā)送了數(shù)據(jù)的Observables的最新(最后)一個數(shù)據(jù)與另外一個Observable發(fā)送的每個數(shù)據(jù)結合,最終基于該函數(shù)的結果發(fā)送數(shù)據(jù)内贮。
與Zip()的區(qū)別:
zip()=按個數(shù)合并产园,即1對1合并;CombineLatest()=按時間合并夜郁,即在同一個時間點上合并
combineLatestDelayError()
作用類似于
concatDelayError()
/
mergeDelayError()
什燕,即錯誤處理,此處不作過多描述
reduce()
作用:把被觀察者要發(fā)送的事件聚合成1個事件&發(fā)送
collect()
作用:將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結構里
startWith()/startWithArray()
作用:在一個被觀察者發(fā)送事件前竞端,追加發(fā)送一些數(shù)據(jù)/一個新的被觀察者屎即。
Observable.
just
(
4
,
5
,
6
)
.startWith(
0
)
//
追加單個數(shù)據(jù)
= startWith()
.
startWithArray
(
1
,
2
,
3
)
//
追加多個數(shù)據(jù)
= startWithArray()
.subscribe(
new
Observer<Integer>() {
@Override
public void
onSubscribe(Disposable d) {
}
@Override
public void
onNext(Integer value) {
Log.
d
(
tag
,
"
接收到了事件
"
-
value );
} @Override public void
onError(Throwable e) {
Log.
d
(
tag
,
"
對
Error
事件作出響應
"
);
}
@Override
public void
onComplete() {
Log.
d
(
tag
,
"
對
Complete
事件作出響應
"
);
}
});
count()統(tǒng)計發(fā)送事件數(shù)量