在項目開發(fā)中常常會在一個頁面中執(zhí)行多個任務(wù)扇售,多線程異步執(zhí)行任務(wù)時哪個任務(wù)先結(jié)束出結(jié)果這些并不好控制鸳谜,譬如要進(jìn)行幾個并發(fā)的網(wǎng)絡(luò)請求在都拿到結(jié)果后需要對數(shù)據(jù)進(jìn)行處理膝藕,希望的是用戶只感知一次數(shù)據(jù)加載,如果不能做到這樣咐扭,反饋到用戶界面的體驗也就不好了芭挽。通過 RxJava 的合并操作符我們能夠很方便的應(yīng)對這些情況懒棉。
常用的幾個合并操作符
我們先創(chuàng)建如下三個不同數(shù)據(jù)類型的 observable 作為示例數(shù)據(jù)
// 先創(chuàng)建三個不同類型的示例 observable
private val observable = Observable.fromArray(1, 2, 3)
.concatMap(object : Function<Int, ObservableSource<Int>> {
override fun apply(t: Int): ObservableSource<Int> {
return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
}
})
private val observable1 = Observable.just("a", "b", "c")
.concatMap(object : Function<String, ObservableSource<String>> {
override fun apply(t: String): ObservableSource<String> {
return Observable.just(t).delay(400, TimeUnit.MILLISECONDS)
}
})
private val observable2 = Observable.fromArray(1.0f, 2.0f, 3.0f)
.concatMap(object : Function<Float, ObservableSource<Float>> {
override fun apply(t: Float): ObservableSource<Float> {
return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
}
})
concat()
concat()
操作符將多個 Observable 按先后順序進(jìn)行合并,當(dāng)合并的 Observable<T> 泛型類型不一致時,事件流中的對象類型只能使用 Object
(java),Any
(kotlin)览绿。通過如下示例代碼可以直觀的看到 concat 后事件的發(fā)送順序:
Observable.concat(observable, observable1)
.subscribe(object : AppCallBack<Any>() {
override fun success(data: Any?) {
println("contact----->$data")
}
override fun fail(code: Long?, msg: String?) {
}
override fun finish(success: Boolean) {
merge()
}
})
得到的輸出結(jié)果為:
observable 發(fā)送 1、2穗慕、3 的時間間隔是大于 observable1發(fā)送 a饿敲、b、c 的間隔的逛绵,但輸出結(jié)果為 123 打印完畢后再打印 abc 怀各。這說明 concat 操作符是線性有序的,要等前一個 observable 發(fā)送完畢后才會處理下一個 observable术浪。當(dāng)我們需要多個接口的返回數(shù)據(jù)按順序進(jìn)行處理時可以使用 concat 操作符合并請求瓢对。
多于四個 observable 的合并:
兩種方式
-
concat()
傳入一個Iterable<? extends ObservableSource<? extends T>
對象 -
concatArray()
傳入一個 Observable 數(shù)組。
merge()
merge()
操作符將多個 Observable 無序合并胰苏,當(dāng)合并的 Observable<T> 泛型類型不一致時,事件流中的對象類型只能使用 Object
(java),Any
(kotlin)硕蛹。
Observable.merge(observable, observable1)
.subscribe(object : AppCallBack<Any>() {
override fun success(data: Any?) {
println("merge----->$data")
}
override fun fail(code: Long?, msg: String?) {
}
override fun finish(success: Boolean) {
zip()
}
})
得到的輸出結(jié)果為:
abc 與 123 是按照時間先后順序交錯進(jìn)行輸出的,說明
merge()
后事件的發(fā)送是并發(fā)的無序的硕并,先發(fā)送先處理
多于四個 observable 的合并:
兩種方式
-
merge()
傳入一個Iterable<? extends ObservableSource<? extends T>
對象 -
mergeArray()
傳入一個 Observable 數(shù)組法焰。
zip()
上面的兩種合并都是單個事件 item 訂閱監(jiān)聽,如果想合并的事件 item 都接收到數(shù)據(jù)時處理這兩個事件數(shù)據(jù)就需要使用 zip() 操作符倔毙。
Observable.zip(observable, observable1, object : BiFunction<Int, String, String> {
override fun apply(t1: Int, t2: String): String {
return "t1=$t1 t2=$t2"
}
}).subscribe(object : AppCallBack<String>() {
override fun success(data: String?) {
println("zip----->$data")
}
override fun fail(code: Long?, msg: String?) {
}
override fun finish(success: Boolean) {
}
})
得到的輸出結(jié)果為:
使用 zip 合并時埃仪,會等待每一次的合并項都發(fā)送完畢后再發(fā)送下一輪的事件。當(dāng)我們需要從兩個數(shù)據(jù)源拿數(shù)據(jù)陕赃,但是需要統(tǒng)一合并顯示時可以使用 zip 操作符對事件流進(jìn)行合并卵蛉。
多個 observable 的合并:
zip()
的多事件合并就有點厲害了,支持九個 Observable 按數(shù)據(jù)類型合并么库,除了兩個觀察者對象合并時 zipper 是 BiFunction
其他的為 FunctionX
,X 為合并個數(shù)傻丝。如果多于九個觀察者對象合并,與上面兩種合并一樣可以使用 zipArray()
進(jìn)行合并诉儒,但是合并后的觀察結(jié)果是一個 Object 數(shù)組對象桑滩,需要自己判斷數(shù)據(jù)類型
結(jié)語
除了以上的創(chuàng)建 Observable 對象級別的合并操作符,還有一些事件流中的操作符允睹,譬如第一段代碼中的 concatMap
运准,在事件流前面添加其他事件的 startWith()
等。還是比較完善的