首先我們創(chuàng)建兩個Observable1 Observable2來模擬網(wǎng)絡(luò)請求,代碼如下:
//每個子Observable都要加subscribeOn(Schedulers.io())指定為異步線程 不同線程來處理請求達到并發(fā)
val observable1: Observable<String> = Observable.create(ObservableOnSubscribe<String> {
Log.e("asd" , "observable1---"+Thread.currentThread().name)
Thread.sleep(5000)
it.onNext("1111")
it.onComplete()
}).subscribeOn(Schedulers.io())
val observable2: Observable<String> = Observable.create(ObservableOnSubscribe<String>{
Log.e("asd" , "observable2---"+Thread.currentThread().name)
Thread.sleep(10000)
it.onNext("2222")
it.onComplete()
}).subscribeOn(Schedulers.io())
observable1中第一行打印了線程的名字,當(dāng)執(zhí)行到這里的時候代表該請求已經(jīng)在執(zhí)行了洛心,然后讓線程休眠5秒鐘模擬耗時,休眠過后發(fā)送一個111字符串出去調(diào)用onComplete完成本此請求
observable2中類似和observable1類似题篷,只是休眠時間變成10秒词身,10秒過后請求完成發(fā)送數(shù)據(jù),因為兩個請求由于返回數(shù)據(jù)和網(wǎng)絡(luò)狀態(tài)番枚,等等原因法严,他們所消耗的時間肯定不一樣的。
這里需要注意 每個子的Observable必須要單獨指定線程subscribeOn(Schedulers.io())户辫,如果不加這個渐夸,那么兩個請求還是在一個線程中執(zhí)行,必定會有先后渔欢,達不到并發(fā)的效果墓塌,這里后面會展示不并發(fā)的效果
接下來吧請求合并一起使用zip關(guān)鍵字代碼如下:
val start = System.currentTimeMillis()
Observable.zip(observable1, observable2,BiFunction<String, String, String> { t1, t2 ->
Log.e("asd" , "zip:"+Thread.currentThread().name)
//這里是兩個請求都返回了之后 執(zhí)行到這里 把數(shù)據(jù)封裝到一個類中返回,這里只是簡單的吧他們加起來返回了
t1 + t2
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
val end = System.currentTimeMillis() - start
Log.e("asd" , "請求的數(shù)據(jù):${it}---耗時:${end}---observable:"+Thread.currentThread().name)
}
看下打印的日志奥额,分析一下
2019-10-25 17:18:39.143 19436-19772/com.joyrun.study E/asd: observable2---RxCachedThreadScheduler-3
2019-10-25 17:18:39.143 19436-19771/com.joyrun.study E/asd: observable1---RxCachedThreadScheduler-2
2019-10-25 17:18:49.144 19436-19772/com.joyrun.study E/asd: zip:RxCachedThreadScheduler-3
2019-10-25 17:18:49.145 19436-19436/com.joyrun.study E/asd: 請求的數(shù)據(jù):11112222---耗時:10004---observable:main
17:18:39 同一時間打印了observable1 observable2苫幢,說明這兩個請求已經(jīng)并發(fā)執(zhí)行了
17:18:49 10秒鐘過后兩個請求都完成了,同一時間把數(shù)據(jù)返回主線程垫挨,打印出請求的數(shù)據(jù):11112222---耗時:10004---observable:main
總共耗時10004 10秒 也就是說兩個請求并發(fā)了韩肝,最后拿到數(shù)據(jù)的消耗時間是倆個請求中最長的耗時時間,如果你想先執(zhí)行完 就先處理的話 使用merge操作符就可以了,最后面展示用法
下面看下 不并發(fā)的情況九榔,也就是每個子Observable不單獨指定線程哀峻,那么他們還是在同一線程中執(zhí)行,打印日志中可以看到哲泊,所以必定是有一個先一個后執(zhí)行的剩蟀,達不到并發(fā)效果
val observable1: Observable<String> = Observable.create {
Log.e("asd" , "observable1:"+Thread.currentThread().name)
Thread.sleep(5000)
it.onNext("1111")
it.onComplete()
}
val observable2: Observable<String> = Observable.create {
Log.e("asd" , "observable2:"+Thread.currentThread().name)
Thread.sleep(10000)
it.onNext("2222")
it.onComplete()
}
解釋:使用Observable.zip將兩個Observable合并成一個指定子線程,然后在BiFunction的接口函數(shù)中就可以拿到這兩個請求返回的數(shù)據(jù)切威,主意執(zhí)行到這里一定是兩個請求都成功返回數(shù)據(jù)了育特,看下打印的日志
2019-10-25 17:03:46.903 11409-11465/com.joyrun.study E/asd: observable1:RxCachedThreadScheduler-1
2019-10-25 17:03:51.905 11409-11465/com.joyrun.study E/asd: observable2:RxCachedThreadScheduler-1
2019-10-25 17:04:01.906 11409-11465/com.joyrun.study E/asd: zip:RxCachedThreadScheduler-1
2019-10-25 17:04:01.907 11409-11409/com.joyrun.study E/asd: 請求的數(shù)據(jù):11112222---耗時:15006---observable:main
17:03:46 開始了第一個請求
17:03:51 五秒過后執(zhí)行了第二個請求
17:04:01 也就是10秒過后第二個請求執(zhí)行完成 就會調(diào)用到BiFunction的接口函數(shù)中做兩個請求數(shù)據(jù)的處理并返回
17:04:01 同一時間將數(shù)據(jù)返回到了主線程
打兩個請求合并之后的請求總時間15006 15秒鐘 一個5秒一個10秒,雖然請求合并了 但是沒有并發(fā) 請求所消耗的時間 還是兩個請求消耗時間的總和
如果說想并發(fā)請求先朦,那個返回數(shù)據(jù) 就處理那個的話就是用merge操作符即可低嗎如下
Observable.merge(observable1, observable2).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe {
Log.e("asd" , "請求的數(shù)據(jù):${it}---observable:"+Thread.currentThread().name)
}
打印日志
2019-10-25 17:28:48.586 25014-25108/com.joyrun.study E/asd: observable2:RxCachedThreadScheduler-3
2019-10-25 17:28:48.587 25014-25107/com.joyrun.study E/asd: observable1:RxCachedThreadScheduler-2
2019-10-25 17:28:53.589 25014-25014/com.joyrun.study E/asd: 請求的數(shù)據(jù):1111---observable:main
2019-10-25 17:28:58.587 25014-25014/com.joyrun.study E/asd: 請求的數(shù)據(jù):2222---observable:main
通過打印日志看出兩個請求并發(fā)執(zhí)行缰冤,并且在不同線程中
17:28:53 5秒后執(zhí)行完第一個請求拿到返回數(shù)據(jù)進行處理
17:28:58 10秒后執(zhí)行完第二個請求犬缨,拿到返回數(shù)據(jù)進行處理,注意10秒后 是針對兩個請求并發(fā)處理之后的10秒 并不是第一個請求完了 后10秒
測試中完整代碼如下:
//----------------------------------------------------------
// val observable1: Observable<String> = Observable.create {
//// Log.e("asd" , "observable1:"+Thread.currentThread().name)
//// Thread.sleep(5000)
//// it.onNext("1111")
//// it.onComplete()
//// }
//// val observable2: Observable<String> = Observable.create {
//// Log.e("asd" , "observable2:"+Thread.currentThread().name)
//// Thread.sleep(10000)
//// it.onNext("2222")
//// it.onComplete()
//// }
//----------------------------------------------------------
//**********************************************************
//每個子Observable都要加subscribeOn(Schedulers.io())指定為異步線程 不同線程來處理請求達到并發(fā)
val observable1: Observable<String> = Observable.create(ObservableOnSubscribe<String> {
Log.e("asd" , "observable1:"+Thread.currentThread().name)
Thread.sleep(5000)
it.onNext("1111")
it.onComplete()
}).subscribeOn(Schedulers.io())
val observable2: Observable<String> = Observable.create(ObservableOnSubscribe<String>{
Log.e("asd" , "observable2:"+Thread.currentThread().name)
Thread.sleep(10000)
it.onNext("2222")
it.onComplete()
}).subscribeOn(Schedulers.io())
//**********************************************************
//zip
// val start = System.currentTimeMillis()
// Observable.zip(observable1, observable2,BiFunction<String, String, String> { t1, t2 ->
// Log.e("asd" , "zip:"+Thread.currentThread().name)
// //這里是兩個請求都返回了之后 執(zhí)行到這里 把數(shù)據(jù)封裝到一個類中返回棉浸,這里只是簡單的吧他們加起來返回了
// t1 + t2
// }).subscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
// .subscribe {
// val end = System.currentTimeMillis() - start
// Log.e("asd" , "請求的數(shù)據(jù):${it}---耗時:${end}---observable:"+Thread.currentThread().name)
// }
//merge
Observable.merge(observable1, observable2).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe {
Log.e("asd" , "請求的數(shù)據(jù):${it}---observable:"+Thread.currentThread().name)
}