相關概念
Android多線程編程的原則:
- 不要阻塞UI線程;
- 不要在UI線程之外訪問UI組件;
ReactiveX
- Reactive Extensions的縮寫,一般簡寫為Rx;
- 是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口,ReactiveX結合了觀察者模式、迭代器模式和函數(shù)式編程的精華;
RxJava
- Reactive Extensions for the JVM:寂玲,RxJava就是ReactiveX在JVM平臺的實現(xiàn);
- 基于事件流的鏈式調用呆馁,進行耗時任務,線程切換鹊漠,其本質是一個異步操作庫;
原理
- 基于觀察者模式, 一個方面的操作依賴于另一個方面的狀態(tài)變化主到,當一個對象必須通知其他對象茶行,又希望這個對象和其他被通知的對象是松散耦合的;
三個要素
- 被觀察者(Observable)登钥,觀察者(Subscriber)畔师,訂閱(subscribe);
被觀察者
1. Observable
- 可多次發(fā)送事件(onNext),直到 onComplete 或 onError 被調用結束訂閱;
- 不支持背壓:當被觀察者快速發(fā)送大量數(shù)據(jù)時牧牢,下游不會做其他處理看锉,即使數(shù)據(jù)大量堆積,調用鏈也不會報MissingBackpressureException,消耗內存過大只會OOM塔鳍。(官方給出以1000個事件為分界線作為參考);
val observable = Observable.create(ObservableOnSubscribe<Int> {
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
})
//Disposable通過 dispose() ?方法來讓上游停?止?工作伯铣,達到「丟棄」的效果
var disposable: Disposable? = null
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
//訂閱后發(fā)送數(shù)據(jù)之前, 回調這個方法,Disposable可用于取消訂閱
disposable = d
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onNext(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
}
//rxJava的整體結構是一條鏈轮纫,鏈的最上游是被觀察者observable腔寡,最下游是觀察者observer,
//鏈的中間各個節(jié)點掌唾,既是其下游的observable放前,又是其上游的observer
observable
//subscribeOn: 切換起源 Observable 的線程,
// 當多次調?用 subscribeOn() 的時候,只有最上?面的會對起源 Observable 起作?用糯彬,
// 原因subscribeOn底層通過新建observable實現(xiàn)
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
//observeOn指定的是它之后的操作所在的線程凭语,通過observeOn的多次調用,程序實現(xiàn)了線程的多次切換
//影響范圍observeOn是它下面的每個observer情连,除非又遇到新的observeOn
.observeOn(Schedulers.io())
.map {
LjyLogUtil.d("map:${Thread.currentThread().name}")
"num_$it"
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
2. Flowable:
- 和 Observable 一樣, 且支持Reactive-Streams和背壓;
- 上游的被觀察者會響應下游觀察者的數(shù)據(jù)請求叽粹,下游調用request(n)來告訴上游發(fā)送多少個數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調用鏈上却舀,使內存一直處于較低水平;
var sub: Subscription? = null
Flowable.create(FlowableOnSubscribe<Int> {
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
//使用create創(chuàng)建Flowable虫几,需要指定背壓策略
}, BackpressureStrategy.BUFFER)
Flowable.range(0, 5)
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription?) {
//當訂閱后,會首先調用這個方法挽拔,其實就相當于onStart()辆脸,
//傳入的Subscription s參數(shù)可以用于請求數(shù)據(jù)或者取消訂閱
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe start")
sub = s
sub?.request(1)
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe end")
}
override fun onNext(t: Int?) {
LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
sub?.request(1)
}
override fun onError(t: Throwable?) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${t?.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
})
3. Single
- 單次發(fā)送事件(onSuccess、onError)螃诅,發(fā)完即結束訂閱;
- 總是只發(fā)射一個值啡氢,或者一個錯誤通知,而不是發(fā)射一系列的值(當然就不存在背壓問題);
- 一般一個接口只是一次請求一次返回术裸,所以使用Single 與 Retrofit 配合是更為合理;
Single.just(1)
.map { "num_$it" }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { num ->
LjyLogUtil.d(num)
}
Single.create(SingleOnSubscribe<String> { emitter ->
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
emitter.onSuccess("str1")
emitter.onSuccess("str2")//錯誤寫法倘是,重復調用也不會處理,因為只會調用一次
}).subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
4. Completable
- 單次發(fā)送事件(onError袭艺、onComplete)搀崭,發(fā)完即結束訂閱;
- 如果你的觀察者連onNext事件都不關心,可以使用Completable猾编,它只有onComplete和onError兩個事件,要轉換成其他類型的被觀察者瘤睹,也是可以使用toFlowable()升敲、toObservable()等方法去轉換;
Completable.create { emitter ->
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
emitter.onComplete()//單一onComplete或者onError
}.subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
5. Maybe
- 單次發(fā)送事件(onSuccess、onError轰传、onComplete)驴党,發(fā)完即結束訂閱。相當于 Completable 和 Single 結合;
- 如果可能發(fā)送一個數(shù)據(jù)或者不會發(fā)送任何數(shù)據(jù)获茬,這時候你就需要Maybe港庄,它類似于Single和Completable的混合體;
- onSuccess和onComplete是互斥的存在;
Maybe.create(MaybeOnSubscribe<String> {
it.onSuccess("str1")
it.onComplete()
}).subscribe(object : MaybeObserver<String> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
})
- 五種被觀察者可通過toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉換;
觀察者
val observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onNext(t: Int) {
LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
}
線程調度器(Schedulers)
- 指定 被觀察者(Observable)/ 觀察者(Observer)的工作線程,簡化了異步操作;
- 默認在創(chuàng)建自身的線程;
- 配合操作符 subscribeOn , observeOn 使用;
AndroidSchedulers.mainThread()
- 需要引用RxAndroid, 切換到UI線程(Android的主線程), 為Android開發(fā)定制;
Schedulers.io()
- 用于IO密集型任務锦茁,如讀寫SD卡文件攘轩,查詢數(shù)據(jù)庫,訪問網(wǎng)絡等;
- 具有線程緩存機制码俩,默認是一個CacheThreadScheduler;
Schedulers.newThread()
- 為每一個任務創(chuàng)建一個新線程;
- 不具有線程緩存機制,雖然使用Schedulers.io的地方歼捏,都可以使用Schedulers.newThread稿存,但是,Schedulers.newThread的效率沒有Schedulers.io高;
Schedulers.computation()
- 用于CPU 密集型計算任務瞳秽,即不會被 I/O 等操作限制性能的耗時操作瓣履,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等练俐,具有固定的線程池袖迎,大小為CPU的核數(shù)。不可以用于I/O操作腺晾,因為I/O操作的等待時間會浪費CPU燕锥。
Schedulers.trampoline()
- 在當前線程立即執(zhí)行任務,如果當前線程有任務在執(zhí)行悯蝉,則會將其暫停归形,等插入進來的任務執(zhí)行完之后,再將未完成的任務接著執(zhí)行;
Schedulers.single()
- 擁有一個線程單例鼻由,所有的任務都在這一個線程中執(zhí)行暇榴,當此線程中有任務執(zhí)行時,其他任務將會按照先進先出的順序依次執(zhí)行;
Scheduler.from(executor)
- 指定一個線程調度器蕉世,由此調度器來控制任務的執(zhí)行策略;
背壓(Backpressure)
- 背壓是指在異步場景中蔼紧,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略;
- 支持背壓的被觀察者為Flowable狠轻;
- Android中很少用到奸例,除非在線視頻流,直播等場景哈误,當畫面卡頓已取得的數(shù)據(jù)失效了哩至,需要拋棄等;
背壓策略模式
BackpressureStrategy.MISSING
- 在此策略下躏嚎,通過Create方法創(chuàng)建的Flowable相當于沒有指定背壓策略,不會對通過onNext發(fā)射的數(shù)據(jù)做緩存或丟棄處理菩貌,需要下游通過背壓操作符處理
BackpressureStrategy.ERROR:
- 在此策略下卢佣,如果放入Flowable的異步緩存池中的數(shù)據(jù)超限了,則會拋出MissingBackpressureException異常;
BackpressureStrategy.BUFFER:
- 內部維護了一個緩存池SpscLinkedArrayQueue箭阶,其大小不限虚茶,此策略下,如果Flowable默認的異步緩存池滿了仇参,會通過此緩存池暫存數(shù)據(jù)嘹叫,它與Observable的異步緩存池一樣,可以無限制向里添加數(shù)據(jù)诈乒,不會拋出MissingBackpressureException異常罩扇,但會導致OOM;
- 當緩存區(qū)大小存滿(默認緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個事件時怕磨,將緩存區(qū)大小設置成無限大喂饥,被觀察者可無限發(fā)送事件 觀察者,但實際上是存放在緩存區(qū)肠鲫,但要注意內存情況员帮,防止出現(xiàn)OOM;
BackpressureStrategy.DROP
- 在此策略下,如果Flowable的異步緩存池滿了导饲,會丟掉上游發(fā)送的數(shù)據(jù);
BackpressureStrategy.LATEST
- 與Drop策略一樣捞高,如果緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù)渣锦,不同的是硝岗,不管緩存池的狀態(tài)如何,LATEST都會將最后一條數(shù)據(jù)強行放入緩存池中泡挺,來保證觀察者在接收到完成通知之前辈讶,能夠接收到Flowable最新發(fā)射的一條數(shù)據(jù);
- 即如果發(fā)送了150個事件,緩存區(qū)里會保存129個事件(第1-第128 + 第150事件);
RxJava 2.0內部提供 封裝了背壓策略模式的方法
- 默認采用BackpressureStrategy.ERROR模式;
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
Flowable.interval(1, TimeUnit.MILLISECONDS)
//添加背壓策略封裝好的方法娄猫,此處選擇Buffer模式贱除,即緩存區(qū)大小無限制
.onBackpressureBuffer()
.observeOn(Schedulers.newThread())
.subscribe(subscriber)
冷熱流
Cold Observable
- subscribe時才會發(fā)射數(shù)據(jù);
- 常見的工廠方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer();
- 當你有多個Subscriber的時候,他們的事件是獨立的,示例代碼如下:
val interval = Observable.interval(1, TimeUnit.SECONDS)
var disposable1: Disposable? = null
val observer1 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
disposable1 = d
}
override fun onNext(t: Long) {
LjyLogUtil.d("觀察者1:$t")
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
}
var disposable2: Disposable? = null
val observer2 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
disposable1 = d
}
override fun onNext(t: Long) {
LjyLogUtil.d("觀察者2:$t")
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
}
findViewById<Button>(R.id.btn_subscribe1)
.clicks().subscribe {
interval.subscribe(observer1)
}
findViewById<Button>(R.id.btn_dispose1)
.clicks().subscribe {
disposable1?.dispose()
}
findViewById<Button>(R.id.btn_subscribe2)
.clicks().subscribe {
interval.subscribe(observer2)
}
findViewById<Button>(R.id.btn_dispose2)
.clicks().subscribe {
disposable2?.dispose()
}
Hot Observable
- 對于Hot Observable的所有subscriber,他們會在同一時刻收到相同的數(shù)據(jù);
- 通常使用publish()操作符來將ColdObservable變?yōu)镠ot媳溺≡禄希或者 使用 Subjects 也是Hot Observable;
- 如果他開始傳輸數(shù)據(jù),你不主動喊停(dispose()/cancel())悬蔽,那么他就不會停扯躺,一直發(fā)射數(shù)據(jù),即使他已經(jīng)沒有Subscriber了;
val interval = Observable.interval(1, TimeUnit.SECONDS).publish()
var disposable: Disposable? = null
findViewById<Button>(R.id.btn_connect)
.clicks().subscribe {
disposable = interval.connect()
}
findViewById<Button>(R.id.btn_dispose)
.clicks().subscribe {
disposable?.dispose()
}
findViewById<Button>(R.id.btn_subscribe3)
.clicks().subscribe {
interval.subscribe {
LjyLogUtil.d("觀察者3:$it")
}
}
findViewById<Button>(R.id.btn_subscribe4)
.clicks().subscribe {
interval.subscribe {
LjyLogUtil.d("觀察者4:$it")
}
}
操作符
- 操作符很多,不用完全背下來录语,瀏覽一遍倍啥,用時知道在哪找(關注+收藏本文[狗頭])即可
線程切換
- subscribeOn:指定被觀察者Observable的工作線程;
- observeOn:指定觀察者的observer工作線程;
observable
//subscribeOn: 切換起源 Observable 的線程,
// 當多次調?用 subscribeOn() 的時候,只有最上?面的會對起源 Observable 起作?用澎埠,
// 原因subscribeOn底層通過新建observable實現(xiàn)
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
//observeOn指定的是它之后的操作所在的線程虽缕,通過observeOn的多次調用,程序實現(xiàn)了線程的多次切換
//影響范圍observeOn是它下面的每個observer蒲稳,除非又遇到新的observeOn
.observeOn(Schedulers.io())
.map {
LjyLogUtil.d("map:${Thread.currentThread().name}")
"num_$it"
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
創(chuàng)建操作
基本創(chuàng)建
create
- 通過調用觀察者的方法從頭創(chuàng)建一個Observable;
Observable.create(ObservableOnSubscribe<Int> {
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
})
快速創(chuàng)建
just
- 將對象或者對象集合轉換為一個會發(fā)射這些對象的Observable;
Observable.just(1, 2, 3, 4)
fromArray & fromIterable
- 將其它的對象或數(shù)據(jù)結構轉換為Observable;
Observable.fromArray(arrayOf(1,2,3))
Observable.fromIterable(listOf(4,5,6))
never
- 創(chuàng)建的被觀察者不發(fā)送任何事件, 觀察者接收后什么都不調用;
Observable.never<Int>().subscribe(observer)
empty
- 創(chuàng)建的被觀察者僅發(fā)送Complete事件氮趋,直接通知完成, 觀察者接收后會直接調用onCompleted;
Observable.empty<Int>().subscribe(observer)
error
- 創(chuàng)建的被觀察者僅發(fā)送Error事件,直接通知異常, 觀察者接收后會直接調用onError;
Observable.error<Int>(RuntimeException()).subscribe(observer)
延遲創(chuàng)建
defer:
- 在觀察者訂閱之前不創(chuàng)建這個Observable江耀,為每一個觀察者創(chuàng)建一個新的Observable;
var num=1
val observable=Observable.defer {Observable.just(num)}
num=2
observable.subscribe(observer)
timer
- 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable;
//延時3秒后剩胁,發(fā)送一個整數(shù)0
Observable.timer(3, TimeUnit.SECONDS)
interval & intervalRange
- 創(chuàng)建一個定時發(fā)射整數(shù)序列的Observable;
//初始延時1秒,每3秒發(fā)一個自增整數(shù)
Observable.interval(1, 3, TimeUnit.SECONDS)
//初始延時2秒祥国,后每1秒發(fā)一個從10開始的整數(shù)昵观,發(fā)5個(發(fā)到14)停止
Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS);
range & rangeLong
- 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable;
Observable.range(0, 5)
Observable.rangeLong(0, 5)
Repeat
- 創(chuàng)建重復發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable;
//一直重復
Observable.fromArray(1, 2, 3, 4).repeat()
//重復發(fā)送5次
Observable.fromArray(1, 2, 3, 4).repeat(5)
//重復發(fā)送直到符合條件時停止重復
Observable.fromArray(1, 2, 3, 4).repeatUntil { false }
//
Observable.just(1, 2, 3, 4)
.repeatWhen {
it.flatMap { obj ->
if (obj is NumberFormatException) {
Observable.error(Throwable("repeatWhen終止"))
} else {
Observable.just(5, 6, 7)
}
}
}.subscribe(observer)
2. 變換操作:
map
- 映射,通過對序列的每一項都應用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù)舌稀,實質是對序列中的每一項執(zhí)行一個函數(shù)索昂,函數(shù)的參數(shù)就是這個數(shù)據(jù)項;
Observable.just("1", "2", "3").map { it.toInt() }.subscribe(observer)
flatMap & concatMap
- 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合扩借,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,可以認為是一個將嵌套的數(shù)據(jù)結構展開的過程;
//concatMap與flatMap的區(qū)別: concatMap是有序的缤至,flatMap是無序的
Observable.just("A", "B", "C")
.flatMap { x ->
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { y ->
"($x,$y)"
}
}
Observable.just("A", "B", "C")
.concatMap { m ->
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { n ->
"($m,$n)"
}
}
groupBy
- 分組潮罪,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組领斥,每一個Observable發(fā)射一組不同的數(shù)據(jù);
Observable.just(
"Tiger",
"Elephant",
"Cat",
"Chameleon",
"Frog",
"Fish",
"Turtle",
"Flamingo"
)
.groupBy { it[0].uppercaseChar() }
.concatMapSingle { it.toList() }
.subscribe(observer4)
scan
- 掃描嫉到,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù),然后按順序依次發(fā)射這些值;
- 對發(fā)射的數(shù)據(jù)和上一輪發(fā)射的數(shù)據(jù)進行函數(shù)處理月洛,并返回的數(shù)據(jù)供下一輪使用何恶,持續(xù)這個過程來產生剩余的數(shù)據(jù)流。其應用場景有簡單的累加計算嚼黔,判斷所有數(shù)據(jù)的最小值等;
Observable.just(1, 2, 3, 4)
.scan { t1, t2 -> t1 + t2 }
.subscribe(observer)
buffer
- 緩存细层,定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射唬涧,而不是一次發(fā)射一個
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.buffer(3)
.subscribe(observer5)
Window
- 窗口疫赎,定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口碎节,而不是每次發(fā)射一項; 類似于Buffer捧搞,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集;
//window操作符和buffer操作符在功能上實現(xiàn)的效果是一樣的胎撇,但window操作符最大區(qū)別在于同樣是緩存一定數(shù)量的數(shù)據(jù)項介粘,
// window操作符最終發(fā)射出來的是新的事件流integerObservable,而buffer操作符發(fā)射出來的是新的數(shù)據(jù)流晚树,
// 也就是說姻采,window操作符發(fā)射出來新的事件流中的數(shù)據(jù)項,還可以經(jīng)過Rxjava其他操作符進行處理题涨。
Observable.just(1, 2, 3, 4)
.window(2, 1)
.subscribe(observer6)
3. 過濾操作:
Filter
- 過濾偎谁,過濾掉沒有通過謂詞測試的數(shù)據(jù)項,只發(fā)射通過測試的;
Observable.just(1, 2, 3, 4, 5)
.filter {
it % 2 == 0
}.subscribe(observer)
ofType
- 過濾特定類型的數(shù)據(jù);
Observable.just(1, 2.1, "3", 4L, 5.0)
.ofType(Int::class.java)
.subscribe(observer)
distinct & distinctUntilChanged
- 去重纲堵,過濾掉重復數(shù)據(jù)項;
Observable.just(1, 2, 3, 4, 3, 2, 1)
.distinct()
// .distinctUntilChanged()//去掉相鄰連續(xù)重復數(shù)據(jù)
.subscribe(observer)
skip & skipLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
// 跳過, 跳過前面的若干項數(shù)據(jù);
observable.skip(4).subscribe(observer)
//skipLast:跳過后面的若干項數(shù)據(jù)
observable.skipLast(4).subscribe(observer)
take & takeLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
//take:只保留前面的若干項數(shù)據(jù)
observable.take(4).subscribe(observer)
//takeLast:只保留后面的若干項數(shù)據(jù)
observable.takeLast(4).subscribe(observer)
debounce
- 只有在空閑了一段時間后才發(fā)射數(shù)據(jù)巡雨,通俗的說,就是如果一段時間沒有操作席函,就執(zhí)行一次操作;
val observable2 = Observable.create<Int> {
it.onNext(1)
Thread.sleep(400)
it.onNext(2)
Thread.sleep(1200)
it.onNext(3)
Thread.sleep(1000)
it.onNext(4)
Thread.sleep(800)
it.onNext(5)
Thread.sleep(2000)
it.onNext(6)
}
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
sample
- 取樣铐望,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣;
//sample:與debounce的區(qū)別是茂附,sample是以時間為周期的發(fā)射正蛙,一秒又一秒內的最新數(shù)據(jù)。而debounce是最后一個有效數(shù)據(jù)開始
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
- throttleFirst是指定周期內第一個數(shù)據(jù)营曼,throttleLast與sample一致乒验。throttleWithTimeout與debounce一致;
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
// .throttleLast(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
timeout
- 后一個數(shù)據(jù)發(fā)射未在前一個元素發(fā)射后規(guī)定時間內發(fā)射則返回超時異常;
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.timeout(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
elementAt
- 取值,取特定位置的數(shù)據(jù)項;
Observable.just(4, 3, 2, 1)
.elementAt(1)
//出現(xiàn)越界時蒂阱,拋出異常
// .elementAtOrError(1)
.subscribe(maybeObserver)
first
- 首項锻全,只發(fā)射滿足條件的第一條數(shù)據(jù);
Observable.just(1, 2, 3, 4, 5)
// .first(-1)
// .firstOrError()
.firstElement()
.subscribe(maybeObserver)
last
- 末項,只發(fā)射最后一條數(shù)據(jù);
Observable.just(1, 2, 3, 4, 5)
// .last(-1)
// .lastOrError()
.lastElement()
.subscribe(maybeObserver)
IgnoreElements
- 忽略所有的數(shù)據(jù)录煤,只保留終止通知(onError或onCompleted),ignoreElements 作用于Flowable鳄厌、Observable。ignoreElement作用于Maybe妈踊、Single;
Observable.just(1, 2, 3, 4, 5)
.ignoreElements()
.subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
4. 組合操作:
concat & concatArray
- 不交錯的連接多個Observable的數(shù)據(jù);
- 組合多個被觀察者一起發(fā)送數(shù)據(jù)了嚎,合并后 按發(fā)送順序串行執(zhí)行;
val just1 = Observable.just(1, 2, 3)
val just2 = Observable.just("A", "B", "C")
val just3 = Observable.just(4, 5, 6)
Observable.concat(just1, just2).subscribe(observer3)
//concat組合被觀察者數(shù)量<=4個,而concatArray則可>4個
Observable.concatArray(
Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9)
).subscribe(observer3)
merge & mergeArray
- 組合多個被觀察者一起發(fā)送數(shù)據(jù)廊营,合并后 按時間線并行執(zhí)行;
- merge和concat的區(qū)別: merge合并后發(fā)射的數(shù)據(jù)項是并行無序的歪泳,concat合并后發(fā)射的數(shù)據(jù)項是串行有序的;
Observable.merge(just1, just2).subscribe(observer3)
//mergeWith
just1.mergeWith(just3).subscribe(observer3)
zip
- 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起赘风,然后將這個函數(shù)的結果作為單項數(shù)據(jù)發(fā)射;
//zip操作符是將兩個數(shù)據(jù)流進行指定的函數(shù)規(guī)則合并
Observable.zip(just1, just2, { t1, t2 -> "${t1}_${t2}" })
.subscribe(observer3)
//zipWith
just1.zipWith(just2, { t1, t2 -> "${t1}_${t2}" })
.subscribe(observer3)
startWith & startWithArray
- 在發(fā)射原來的Observable的數(shù)據(jù)序列之前夹囚,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項;
Observable.just(1,2,3)
.startWith(Observable.just(4,5,6))
.startWithArray(7,8,9)
.subscribe(observer3)
join
- 無論何時,如果一個Observable發(fā)射了一個數(shù)據(jù)項邀窃,只要在另一個Observable發(fā)射的數(shù)據(jù)項定義的時間窗口內荸哟,就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射;
just1.join(just2,
//規(guī)定just2的過期期限
{ Observable.timer(3, TimeUnit.SECONDS) },
//規(guī)定just1的過期期限
{ Observable.timer(8, TimeUnit.SECONDS) },
//規(guī)定just1和just2的合并規(guī)則
{ t1, t2 -> "${t1}_${t2}" })
.subscribe(observer3)
combineLatest
- 當兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時假哎,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù)),然后發(fā)射這個函數(shù)的結果;
Observable.combineLatest(just1, just2,
{ t1, t2 -> "${t1}_${t2}" }).subscribe(observer3)
switch
- 將一個發(fā)射Observable序列的Observable轉換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù);
Observable.switchOnNext(ObservableSource<Observable<Int>> { }, 12)
collect
- 將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結構里;
Observable.just(1, 2, 3, 4, 5, 6)
.collect(
{ ArrayList() },
BiConsumer<ArrayList<Int?>, Int> { t1, t2 -> t1.add(t2) }
).subscribe(Consumer { LjyLogUtil.d("num: $it") })
count
- 計算Observable發(fā)射的數(shù)據(jù)個數(shù)鞍历,然后發(fā)射這個結果;
Observable.just(1, 2, 3)
.count()
.subscribe(object : SingleObserver<Long>{
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: Long) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
5. 錯誤處理:
cast
- 將數(shù)據(jù)元素轉型成其他類型,轉型失敗會拋出異常;
Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5)
.cast(Int::class.java)
.subscribe(observer)
onErrorReturn
- 調用數(shù)據(jù)源的onError函數(shù)后會回到該函數(shù)舵抹,可對錯誤進行處理,然后返回值劣砍,會調用觀察者onNext()繼續(xù)執(zhí)行惧蛹,執(zhí)行完調用onComplete()函數(shù)結束所有事件的發(fā)射;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.onErrorReturn {
if (it is NumberFormatException) {
0
} else {
throw IllegalArgumentException()
}
}.subscribe(observer)
onErrorReturnItem
- 與onErrorReturn類似,onErrorReturnItem不對錯誤進行處理刑枝,直接返回一個值;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.onErrorReturnItem(0)
.subscribe(observer)
onErrorResumeNext & onExceptionResumeNext
- 遇到錯誤時香嗓,發(fā)送1個新的Observable;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.onErrorResumeNext { Observable.just(5,6,7) }
.subscribe(observer)
retry
- 重試,如果Observable發(fā)射了一個錯誤通知装畅,重新訂閱它靠娱,期待它正常終止;
//retry:當發(fā)生錯誤時,數(shù)據(jù)源重復發(fā)射item掠兄,直到?jīng)]有異诚裨疲或者達到所指定的次數(shù)
Observable.just(1, 2, 3, 4)
.retry(3)
.subscribe(observer)
retryUntil
- 發(fā)生異常時,返回值是false表示繼續(xù)執(zhí)行(重復發(fā)射數(shù)據(jù))蚂夕,true不再執(zhí)行,但會調用onError方法;
var temp = 0
Observable.just(1, 2, 3, 4)
.map {
temp = it
it
}
.retryUntil {
temp > 3
}
.subscribe(observer)
retryWhen
- 遇到錯誤時迅诬,將發(fā)生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.retryWhen {
it.flatMap { throwable ->
if (throwable is NumberFormatException) {
Observable.error(Throwable("retryWhen終止"))
} else {
Observable.just(5, 6, 7)
}
}
}
.subscribe(observer)
6. 輔助操作:
delay
- 延遲一段時間發(fā)射結果數(shù)據(jù);
Observable.just(1, 2, 3)
.delay(1,TimeUnit.SECONDS)
.subscribe(observer)
do
- 在某個事件的生命周期中調用;
- doOnEach:數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù)婿牍,就調用一次;
- doOnNext:數(shù)據(jù)源每次調用onNext() 之前都會先回調該方法;
- doOnError:數(shù)據(jù)源每次調用onError() 之前會回調該方法;
- doOnComplete:數(shù)據(jù)源每次調用onComplete() 之前會回調該方法;
- doOnSubscribe:數(shù)據(jù)源每次調用onSubscribe() 之后會回調該方法;
- doOnDispose:數(shù)據(jù)源每次調用dispose() 之后會回調該方法;
Observable.just(1, 2, 3, 4)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { LjyLogUtil.d("${Thread.currentThread().name}_doOnSubscribe") }
.doOnEach { LjyLogUtil.d("${Thread.currentThread().name}_doOnEach:$it") }
.doOnNext { LjyLogUtil.d("${Thread.currentThread().name}_doOnNext:$it") }
.doOnError { LjyLogUtil.d("${Thread.currentThread().name}_doOnError:${it.localizedMessage}") }
.doOnComplete { LjyLogUtil.d("${Thread.currentThread().name}_doOnComplete") }
.doOnDispose { LjyLogUtil.d("${Thread.currentThread().name}_doOnDispose") }
.subscribe(observer)
7. 條件和布爾操作:
all
- 判斷Observable發(fā)射的所有的數(shù)據(jù)項是否都滿足某個條件;
Observable.just(1, -2, 3)
.all {
it > 0
}.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
takeWhile & skipWhile & takeUntil & skipUntil
// 從0開始每1s發(fā)送1個數(shù)據(jù)
Observable.interval(1, TimeUnit.SECONDS)
//條件滿足時侈贷,取數(shù)據(jù)
.takeWhile {
it<10
}
Observable.interval(1, TimeUnit.SECONDS)
//滿足條件時,跳過數(shù)據(jù)
.skipWhile {
it<10
}
Observable.interval(1, TimeUnit.SECONDS)
//取數(shù)據(jù)等脂,直到滿足條件
.takeUntil {
it>10
}
Observable.interval(1, TimeUnit.SECONDS)
//等待直到傳入的Observable開始發(fā)送數(shù)據(jù)
.skipUntil (Observable.timer(5, TimeUnit.SECONDS))
SequenceEqual:
- 判斷兩個Observable是否按相同的數(shù)據(jù)序列;
Observable.sequenceEqual(
Observable.just(4, 5, 6),
Observable.just(4, 5, 6)
).subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
contains
- 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項;
Observable.just(1, -2, 3)
.contains(2)
.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
isEmpty
- 判斷發(fā)送的數(shù)據(jù)是否為空;
Observable.just(1, -2, 3)
.isEmpty()
.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
amb
-給定多個Observable铐维,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù);
val list: MutableList<ObservableSource<Int>> = ArrayList()
list.add(Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS))
list.add(Observable.just(4, 5, 6))
Observable.amb(list).subscribe{
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
defaultIfEmpty
- 發(fā)射來自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù)慎菲,就發(fā)射一個默認數(shù)據(jù);
//在不發(fā)送onNext事件, 僅發(fā)送onComplete事件
Observable.empty<Int>()
.defaultIfEmpty(-1)
.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
- SkipUntil:丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個Observable發(fā)射了一個數(shù)據(jù)锨并,然后發(fā)射原始Observable的剩余數(shù)據(jù);
實戰(zhàn)
結合 RxBinding 使用
- RxBinding是對 Android View 事件的擴展, 它使得開發(fā)者可以對 View 事件使用 RxJava 的各種操作露该;
1. 添加依賴
//RxBinding
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
//Google 'material' library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-material:4.0.0'
//AndroidX library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-core:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-appcompat:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-drawerlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-leanback:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-recyclerview:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-slidingpanelayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-swiperefreshlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager2:4.0.0'
2. 按鈕防抖
val btn1 = findViewById<Button>(R.id.btn_1)
btn1.clicks()
.throttleFirst(2, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe {
LjyLogUtil.d("點擊按鈕")
}
3. editText輸入監(jiān)聽
//也可用作聯(lián)想搜索優(yōu)化
val et1 = findViewById<EditText>(R.id.et_1)
et1.textChanges()
.debounce(1, TimeUnit.SECONDS)
//跳過第1次請求 因為初始輸入框的空字符狀態(tài)
.skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe{
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
4. 聯(lián)合/表單判斷
val etName = findViewById<EditText>(R.id.et_name)
val etPwd = findViewById<EditText>(R.id.et_pwd)
val obName = etName.textChanges()
val obPwd = etPwd.textChanges()
Observable.combineLatest(
obName, obPwd, { name, pwd -> name == "ljy" && pwd == "123" })
//跳過第1次請求 因為初始輸入框的空字符狀態(tài)
.skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { isLogin -> LjyLogUtil.d(if (isLogin) "登錄成功" else "登錄失敗") }
5. 定時器任務
val time = 10L
val btnLogin = findViewById<Button>(R.id.btn_login)
btnLogin.clicks()
.throttleFirst(time, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext { btnLogin.isEnabled = false }
.subscribe {
LjyLogUtil.d("點擊登錄")
Observable.intervalRange(
0, time, 0, 1,
TimeUnit.SECONDS, AndroidSchedulers.mainThread()
)
.subscribe(
{ btnLogin.text = "剩余${time - it}秒" },
{ LjyLogUtil.e(it.message) },
{
btnLogin.text = "獲取驗證碼"
btnLogin.isEnabled = true
})
}
利用RxLifecycle解決內存泄漏問題
1. 添加依賴
//RxLifecycle
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
// If you want to bind to Android-specific lifecycles
implementation 'com.trello.rxlifecycle4:rxlifecycle-android:4.0.2'
// If you want pre-written Activities and Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-components-preference:4.0.2'
// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle:4.0.2'
// If you want to use Kotlin syntax
implementation 'com.trello.rxlifecycle4:rxlifecycle-kotlin:4.0.2'
// If you want to use Kotlin syntax with Android Lifecycle
implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2'
2. 綁定
Observable.intervalRange(0, 60, 0, 1, TimeUnit.SECONDS)
//利用RxLifecycle來解決內存泄漏問題
//bindToLifecycle的自動取消訂閱示例
//.compose(bindToLifecycle())
//手動設置在activity onPause的時候取消訂閱
.compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(
{ LjyLogUtil.d("剩余${60 - it}秒") },
{ LjyLogUtil.e(it.message) },
{ LjyLogUtil.d("完成") }
)
網(wǎng)絡請求
1. 網(wǎng)絡請求嵌套回調
//使用observeOn多次切換線程
apiService.searchRepo(emptyMap()) // 請求搜索列表
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
//展示列表
}
.observeOn(Schedulers.io())
.flatMap {
Observable.fromIterable(it.items).map { repo -> repo.id }
}
.flatMap {
apiService.getItem(it) // 請求詳情
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
//展示詳情
}
2. 網(wǎng)絡請求輪詢
//每60秒一次,無限輪詢
Observable.interval(60, TimeUnit.SECONDS)
.doOnNext {
apiService.searchRepo(emptyMap())
.subscribe(Consumer { LjyLogUtil.d("accept: ${it.items}") })
}.subscribe { LjyLogUtil.d("第 $it 次輪詢") }
//有條件輪詢
var count = 0
apiService.searchRepo(emptyMap())
.repeatWhen {
it.flatMap {
if (count > 3) {
Observable.error(Throwable("輪詢結束"))
} else {
LjyLogUtil.d("第 $count 次輪詢")
Observable.just(1).delay(60, TimeUnit.SECONDS)
}
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
LjyLogUtil.d("accept: ${it.items}")
count++
}, {
LjyLogUtil.d(it.message)
})
3. 網(wǎng)絡請求出錯重連
//最大重試次數(shù)
val maxConnectCount = 10
//已重試次數(shù)
var currentRetryCount = 0
apiService.searchRepo(emptyMap())
.retryWhen {
it.flatMap { throwable ->
//根據(jù)異常類型選擇是否重試
if (throwable is IOException) {
if (currentRetryCount < maxConnectCount) {
currentRetryCount++
//遇到的異常越多第煮,重試延遲間隔時間越長
Observable.just(1).delay(currentRetryCount * 60L, TimeUnit.SECONDS)
} else {
Observable.error(Throwable("重試結束"))
}
} else {
Observable.error(Throwable("發(fā)生了非網(wǎng)絡異常(非I/O異常)"))
}
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
LjyLogUtil.d("accept: ${it.items}")
}, {
LjyLogUtil.d(it.message)
})
4. 內存解幼,磁盤,網(wǎng)絡 三級緩存
var memoryCache: String? = null
var diskCache: String? = null
private fun test10() {
val memory = Observable.create<String> {
if (memoryCache != null) {
it.onNext(memoryCache)
} else {
it.onComplete()
}
}
val disk = Observable.create<String> {
if (diskCache != null) {
it.onNext(diskCache)
} else {
it.onComplete()
}
}.doOnNext {
memoryCache = "內存數(shù)據(jù)"
}
val net = Observable.just("網(wǎng)絡數(shù)據(jù)")
.doOnNext {
// memoryCache="內存數(shù)據(jù)"
diskCache = "磁盤數(shù)據(jù)"
}
//通過concat()合并memory包警、disk撵摆、network 3個被觀察者的事件(即檢查內存緩存、磁盤緩存 & 發(fā)送網(wǎng)絡請求)
Observable.concat(memory, disk, net)
//通過firstElement()害晦,從串聯(lián)隊列中取出并發(fā)送第1個有效事件(Next事件)特铝,即依次判斷檢查memory暑中、disk、network
.firstElement()
.subscribe {
LjyLogUtil.d("accept: $it")
}
}
5. 合并數(shù)據(jù)源 & 同時展示
var result = "數(shù)據(jù)來自:"
val net = Observable.just("網(wǎng)絡")
val disk = Observable.just("磁盤")
//使用merge鲫剿,從網(wǎng)絡和本地獲取數(shù)據(jù)并展示
Observable.merge(net, disk)
.subscribe({
result += "$it, "
}, {
}, {
LjyLogUtil.d("result: $result")
})
//使用zip,合并2個網(wǎng)絡請求向獲取數(shù)據(jù)并展示
val repo1 = apiService.getItem(1001).subscribeOn(Schedulers.io())
val repo2 = apiService.getItem(1002).subscribeOn(Schedulers.io())
Observable.zip(
repo1, repo2, { data1, data2 ->
val repoList = ArrayList<RepoDetail>()
repoList.add(data1)
repoList.add(data2)
repoList
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
for (repoDetail in it) {
LjyLogUtil.d("result: ${repoDetail.name}")
}
}
源碼解析
入口
- 以下面代碼為源碼閱讀入口鳄逾,Single是最簡單的被觀察者;
Single.just(1)
.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
被觀察者的創(chuàng)建
- 先看上面代碼中被觀察者的創(chuàng)建方法:Single.just()灵莲;
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");//判空
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
- 其中第一行為判空雕凹,第二行RxJavaPlugins.onAssembly為一個鉤子方法,方便添加一些額外操作,代碼如下:
@Nullable
static volatile Function<? super Single, ? extends Single> onSingleAssembly;
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
- 其中onSingleAssembly默認為空政冻,所以實際默認是直接返回source枚抵,也就是SingleJust;
- 所以Single.just就是直接創(chuàng)建了一個被觀察者的實現(xiàn)類SingleJust的實例并返回明场,那么我們看一下SingleJust的代碼汽摹;
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
- 很簡單吧,繼承了Single榕堰,并重寫了Single唯一的抽象方法subscribeActual竖慧,有個泛型變量value;
- Single的代碼很多,基本都是上面介紹的操作符方法的實現(xiàn)逆屡,用到時可以點進源碼看一看圾旨;
public abstract class Single<@NonNull T> implements SingleSource<T> {
...
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
...
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
...
}
}
- Single實現(xiàn)了SingleSource接口,并實現(xiàn)subscribe方法魏蔗;
public interface SingleSource<@NonNull T> {
/**
* Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.
* @param observer the {@code SingleObserver}, not {@code null}
* @throws NullPointerException if {@code observer} is {@code null}
*/
void subscribe(@NonNull SingleObserver<? super T> observer);
}
訂閱
- 下面來看看開頭例子中的第二行的subscribe方法:Single.subscribe()砍的;
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
//判空
Objects.requireNonNull(observer, "observer is null");
//鉤子方法,默認還是入?yún)⒌腟ingleObserver
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
- 可以看到訂閱方法subscribe中最有用的一行就是調用了subscribeActual莺治,而這里的subscribeActual的實現(xiàn)正式上面的SingleJust中的實現(xiàn)廓鞠;
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
- 可以看到SingleJust中的subscribeActual直接連著調用了observer的onSubscribe和onSuccess方法,并且onSubscribe的入?yún)橐粋€disposed的Disposable,是個已取消的Disposable谣旁,因為just是沒有延遲的床佳,無需取消和修改,而且onError是不會被調用的榄审,因為瞬發(fā)一個也不會有出錯的可能砌们;
操作符實現(xiàn)
map
- 以map為例,對之前的示例擴展一下搁进,增加個map操作符對數(shù)據(jù)進行轉換,map搞懂了浪感,其他操作符的主要思路也是一樣的;
Single.just(1)
.map { "num_$it" }
.subscribe { num ->
LjyLogUtil.d(num)
}
- 那么看一下map的實現(xiàn):
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
- map中也是有判空和鉤子饼问,創(chuàng)建了一個SingleMap并返回影兽,其入?yún)⒌谝淮螀?shù)為this,也就是just返回的SingleJust實例莱革,第二個參數(shù)為數(shù)據(jù)轉換的轉換器峻堰;
- 那么來看一下SingleMap:
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
...//省略內部類MapSingleObserver
}
- 可以看到SingleMap的subscribeActual中只有一行代碼讹开,就是調用source的訂閱方法subscribe,并傳入一個觀察者MapSingleObserver實例和SingleMap持有的數(shù)據(jù)轉換器茧妒,其中source也就是前一步的被觀察者SingleJust實例萧吠;
- 而觀察者MapSingleObserver則是負責對上下游數(shù)據(jù)進行轉換 和傳遞,其入?yún)⑾掠蝧ubscribe傳入的觀察者SingleObserver實例桐筏,來看一下它的實現(xiàn):
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
//mapper.apply數(shù)據(jù)轉換器對數(shù)據(jù)進行轉換
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
- 可以看到MapSingleObserver的onSuccess中調用數(shù)據(jù)轉換器mapper的apply方法對數(shù)據(jù)進行轉換纸型,
并調用構造函數(shù)傳入的SingleObserver的onSuccess將轉換后的數(shù)據(jù)傳遞過去,而onSubscribe梅忌,onError方法則是直接調用了SingleObserver的方法
小結
- 那么示例代碼中map前后整個流程串起來就是:
- Single.just創(chuàng)建了一個被觀察者SingleJust,負責發(fā)送原始數(shù)據(jù)狰腌;
- map創(chuàng)建了一個被觀察者SingleMap,構造函數(shù)入?yún)⑹巧弦徊降腟ingleJust和數(shù)據(jù)轉換器牧氮,也就是對SingleJust進行了包裝琼腔,或者說接管,如何接管的呢踱葛,SingleMap中調用了SingleJust的訂閱方法subscribe丹莲,并傳入觀察者MapSingleObserver,也就是對SingleJust說尸诽,小j啊甥材,你這項目給我(SingleMap)了,后面的客戶你不用跟了性含,把你手里的數(shù)據(jù)給我小弟MapSingleObserver就行了洲赵,以后需要啥他找你要;
- 后面再調用subscribe呢商蕴,實際是調用SingleMap的subscribe叠萍,也就是后面再有客戶其實都是跟SingleMap簽的單子了
- SingleMap收到單子呢轉手就給小弟MapSingleObserver了,反正SingleJust的數(shù)據(jù)也在你手里呢你看看咋給客戶服務吧
- MapSingleObserver就拿著SingleJust的數(shù)據(jù)進行轉換绪商,然后調用客戶(下游subscribe傳入的觀察者)的回調方法進行服務
- ps:
- MapSingleObserver:同樣是觀察者苛谷,我也太累了,又負責接收上游的數(shù)據(jù),又要進行處理晋柱,處理完還得反饋給下游的客戶;
- SingleJust:你累?同樣是被觀察者忿族,我辛辛苦苦做的項目,還不是被SingleMap拿走了句灌,還讓你監(jiān)視(觀察)我账胧,這點業(yè)績都被內部消化了啊,而且項目開始了還把我工號留給客戶( t.onSubscribe(d))落蝙,那客戶是說停就停啊织狐,我這出錯了你也是轉頭就給客戶說霸萦住( t.onError(e));
- 這哪是RxJava移迫,這分明是職場巴摇;
subscribeOn & observeOn
- 在對之前的示例擴展一下厨埋,加入線程切換邪媳,那就是用到了subscribeOn & observeOn 操作符了;
Single.just(1)
.map { "num_$it" }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { num ->
LjyLogUtil.d(num)
}
subscribeOn
- 先來看看subscribeOn的入?yún)chedulers.io():
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public final class IoScheduler extends Scheduler {
...
}
- 可以看到最后返回的是Scheduler的實現(xiàn)類IoScheduler實例荡陷;
- 再來看看subscribeOn方法的實現(xiàn):
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
- 還是判空和鉤子雨效,然后創(chuàng)建一個SingleSubscribeOn并返回,入?yún)⑹巧嫌蔚谋挥^察者(this)废赞,和線程調度器Scheduler徽龟;
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//創(chuàng)建一個觀察者SubscribeOnObserver的實例,傳入上游的被觀察者source
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
//調用下游的觀察者observer的onSubscribe唉地,通知其開始訂閱了据悔,取消的話可以找SubscribeOnObserver
observer.onSubscribe(parent);
//調用IoScheduler的scheduleDirect方法切換線程
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
...//省略內部類SubscribeOnObserver
}
- 上面subscribeActual中創(chuàng)建了一個被觀察者SubscribeOnObserver,并且調用scheduler.scheduleDirect切換線程耘沼;
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
//其中DisposeTask為Scheduler的內部類
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
...
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
try {
decoratedRun.run();
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(ex);
throw ex;
}
} finally {
dispose();
runner = null;
}
}
...
}
- 其中scheduleDirect的入?yún)⑹莻€Runnable极颓,而SubscribeOnObserver實現(xiàn)了Runnable接口和SingleObserver接口,構造方法入?yún)⑹窍掠蔚挠^察者和上游的被觀察者,在自己的onSuccess,onError中直接傳給下游的觀察者耕拷,在run方法中訂閱上游的被觀察者;
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
...
@Override
public void onSuccess(T value) {
downstream.onSuccess(value);
}
@Override
public void onError(Throwable e) {
downstream.onError(e);
}
...
@Override
public void run() {
source.subscribe(this);
}
}
observeOn
- 先來看看Android專用的線程調度器AndroidSchedulers.mainThread():
//通過靜態(tài)內部類提供HandlerScheduler的單例
public final class AndroidSchedulers {
//開放的入口方法
public static Scheduler mainThread() {
//鉤子
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
//私有的靜態(tài)變量
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
//私有的靜態(tài)內部類
private static final class MainHolder {
//本質還說離不開Handler,通過Looper.getMainLooper()切換到UI線程
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
//私有的構造方法讼昆,并拋出異常,因為這里是創(chuàng)建HandlerScheduler的單例骚烧,而不是AndroidSchedulers本身
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
...//省略判空和鉤子方法
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
//通過主線程的handler發(fā)送消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
if (disposed) {
return Disposable.disposed();
}
...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
...
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
handler.removeCallbacks(this);
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
- 然后來看observeOn方法:
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
- 本質還是創(chuàng)建了一個被觀察者:
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//調用上游被觀察者source的訂閱方法subscribe浸赫,傳入一個觀察者ObserveOnSingleObserver,其入?yún)⑹窍掠蔚挠^察者observer
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
...//省略內部類ObserveOnSingleObserver
}
- ObserveOnSingleObserver代碼如下, 在自己的onSubscribe調用下游觀察者的onSubscribe赃绊,在自己的onSuccess和onError中調用scheduler.scheduleDirect切換線程既峡,scheduleDirect的入?yún)⑹荝unnable,會調用自己的run方法碧查,而在run方法中調用了下游觀察者的onSuccess和onError傳遞結果运敢,以此實現(xiàn)改變下游觀察者的線程;
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
...
}
番外:Single.create
- 看了上面的Single.just, 可能有種被忽悠的感覺,也太low了吧忠售,這么簡單么传惠,那么我們不妨再來看看Single.create:
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
- 同樣的create方法中創(chuàng)建了一個被觀察者SingleCreate并返回:
public final class SingleCreate<T> extends Single<T> {
final SingleOnSubscribe<T> source;
public SingleCreate(SingleOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
Emitter<T> parent = new Emitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...//省略了內部類Emitter
}
- 看了上面代碼,create和just有什么區(qū)別呢稻扬,just{}中的代碼是直接賦值給value的卦方,不需要下游subscribe就會執(zhí)行,而create是通過匿名內部類傳入一個SingleOnSubscribe的實例,并在實現(xiàn)的subscribe方法中產生數(shù)據(jù)泰佳,在SingleCreate.subscribeActual中調用SingleOnSubscribe.subscribe盼砍,SingleCreate.subscribeActual則是在下游subscribe時才會執(zhí)行尘吗,也就是Single.create在下游訂閱后才開始產生數(shù)據(jù);那么顯然浇坐,Observable.just()不適合封裝網(wǎng)絡數(shù)據(jù)睬捶,因為我們通常不想在subscribe之前做網(wǎng)絡請求。
參考
- Rxjava的源碼
- 給 Android 開發(fā)者的 RxJava 詳解
- 扔物線.henCoder-RxJava原理完全解析
- Carson帶你學Android:RxJava
- 擁抱 RxJava(三):關于 Observable 的冷熱近刘,常見的封裝方式以及誤區(qū)
- Rxjava3文檔級教程