探索Android開源框架 - 3. RxJava使用及源碼解析

相關概念

Android多線程編程的原則:

  1. 不要阻塞UI線程;
  2. 不要在UI線程之外訪問UI組件;

ReactiveX

  • Reactive Extensions的縮寫,一般簡寫為Rx;
  • 是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口,ReactiveX結合了觀察者模式、迭代器模式和函數(shù)式編程的精華;

RxJava

  • Reactive Extensions for the JVM:寂玲,RxJava就是ReactiveX在JVM平臺的實現(xiàn);
  • 基于事件流的鏈式調用呆馁,進行耗時任務,線程切換鹊漠,其本質是一個異步操作庫;

原理

  • 基于觀察者模式, 一個方面的操作依賴于另一個方面的狀態(tài)變化主到,當一個對象必須通知其他對象茶行,又希望這個對象和其他被通知的對象是松散耦合的;

三個要素

  • 被觀察者(Observable)登钥,觀察者(Subscriber)畔师,訂閱(subscribe);

被觀察者

1. Observable
  • 可多次發(fā)送事件(onNext),直到 onComplete 或 onError 被調用結束訂閱;
  • 不支持背壓:當被觀察者快速發(fā)送大量數(shù)據(jù)時牧牢,下游不會做其他處理看锉,即使數(shù)據(jù)大量堆積,調用鏈也不會報MissingBackpressureException,消耗內存過大只會OOM塔鳍。(官方給出以1000個事件為分界線作為參考);
val observable = Observable.create(ObservableOnSubscribe<Int> {
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
})
//Disposable通過 dispose() ?方法來讓上游停?止?工作伯铣,達到「丟棄」的效果
var disposable: Disposable? = null
val observer = object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        //訂閱后發(fā)送數(shù)據(jù)之前, 回調這個方法,Disposable可用于取消訂閱
        disposable = d
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onNext(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }
}

//rxJava的整體結構是一條鏈轮纫,鏈的最上游是被觀察者observable腔寡,最下游是觀察者observer,
//鏈的中間各個節(jié)點掌唾,既是其下游的observable放前,又是其上游的observer
observable
    //subscribeOn: 切換起源 Observable 的線程,
    // 當多次調?用 subscribeOn() 的時候,只有最上?面的會對起源 Observable 起作?用糯彬,
    // 原因subscribeOn底層通過新建observable實現(xiàn)
    .subscribeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    //observeOn指定的是它之后的操作所在的線程凭语,通過observeOn的多次調用,程序實現(xiàn)了線程的多次切換
    //影響范圍observeOn是它下面的每個observer情连,除非又遇到新的observeOn
    .observeOn(Schedulers.io())
    .map {
        LjyLogUtil.d("map:${Thread.currentThread().name}")
        "num_$it"
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
2. Flowable:
  • 和 Observable 一樣, 且支持Reactive-Streams和背壓;
  • 上游的被觀察者會響應下游觀察者的數(shù)據(jù)請求叽粹,下游調用request(n)來告訴上游發(fā)送多少個數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調用鏈上却舀,使內存一直處于較低水平;
var sub: Subscription? = null
Flowable.create(FlowableOnSubscribe<Int> {
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    //使用create創(chuàng)建Flowable虫几,需要指定背壓策略
}, BackpressureStrategy.BUFFER)

Flowable.range(0, 5)
    .subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription?) {
            //當訂閱后,會首先調用這個方法挽拔,其實就相當于onStart()辆脸,
            //傳入的Subscription s參數(shù)可以用于請求數(shù)據(jù)或者取消訂閱
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe start")
            sub = s
            sub?.request(1)
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe end")
        }

        override fun onNext(t: Int?) {
            LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
            sub?.request(1)
        }

        override fun onError(t: Throwable?) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${t?.message}")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }

    })
3. Single
  • 單次發(fā)送事件(onSuccess、onError)螃诅,發(fā)完即結束訂閱;
  • 總是只發(fā)射一個值啡氢,或者一個錯誤通知,而不是發(fā)射一系列的值(當然就不存在背壓問題);
  • 一般一個接口只是一次請求一次返回术裸,所以使用Single 與 Retrofit 配合是更為合理;
Single.just(1)
    .map { "num_$it" }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
Single.create(SingleOnSubscribe<String> { emitter ->
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    emitter.onSuccess("str1")
    emitter.onSuccess("str2")//錯誤寫法倘是,重復調用也不會處理,因為只會調用一次
}).subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onSuccess(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

})
4. Completable
  • 單次發(fā)送事件(onError袭艺、onComplete)搀崭,發(fā)完即結束訂閱;
  • 如果你的觀察者連onNext事件都不關心,可以使用Completable猾编,它只有onComplete和onError兩個事件,要轉換成其他類型的被觀察者瘤睹,也是可以使用toFlowable()升敲、toObservable()等方法去轉換;
Completable.create { emitter ->
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    emitter.onComplete()//單一onComplete或者onError
}.subscribe(object : CompletableObserver {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

})
5. Maybe
  • 單次發(fā)送事件(onSuccess、onError轰传、onComplete)驴党,發(fā)完即結束訂閱。相當于 Completable 和 Single 結合;
  • 如果可能發(fā)送一個數(shù)據(jù)或者不會發(fā)送任何數(shù)據(jù)获茬,這時候你就需要Maybe港庄,它類似于Single和Completable的混合體;
  • onSuccess和onComplete是互斥的存在;
Maybe.create(MaybeOnSubscribe<String> {
    it.onSuccess("str1")
    it.onComplete()
}).subscribe(object : MaybeObserver<String> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onSuccess(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }

})
  • 五種被觀察者可通過toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉換;

觀察者

val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onNext(t: Int) {
            LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }
    }

線程調度器(Schedulers)

  • 指定 被觀察者(Observable)/ 觀察者(Observer)的工作線程,簡化了異步操作;
  • 默認在創(chuàng)建自身的線程;
  • 配合操作符 subscribeOn , observeOn 使用;
AndroidSchedulers.mainThread()
  • 需要引用RxAndroid, 切換到UI線程(Android的主線程), 為Android開發(fā)定制;
Schedulers.io()
  • 用于IO密集型任務锦茁,如讀寫SD卡文件攘轩,查詢數(shù)據(jù)庫,訪問網(wǎng)絡等;
  • 具有線程緩存機制码俩,默認是一個CacheThreadScheduler;
Schedulers.newThread()
  • 為每一個任務創(chuàng)建一個新線程;
  • 不具有線程緩存機制,雖然使用Schedulers.io的地方歼捏,都可以使用Schedulers.newThread稿存,但是,Schedulers.newThread的效率沒有Schedulers.io高;
Schedulers.computation()
  • 用于CPU 密集型計算任務瞳秽,即不會被 I/O 等操作限制性能的耗時操作瓣履,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等练俐,具有固定的線程池袖迎,大小為CPU的核數(shù)。不可以用于I/O操作腺晾,因為I/O操作的等待時間會浪費CPU燕锥。
Schedulers.trampoline()
  • 在當前線程立即執(zhí)行任務,如果當前線程有任務在執(zhí)行悯蝉,則會將其暫停归形,等插入進來的任務執(zhí)行完之后,再將未完成的任務接著執(zhí)行;
Schedulers.single()
  • 擁有一個線程單例鼻由,所有的任務都在這一個線程中執(zhí)行暇榴,當此線程中有任務執(zhí)行時,其他任務將會按照先進先出的順序依次執(zhí)行;
Scheduler.from(executor)
  • 指定一個線程調度器蕉世,由此調度器來控制任務的執(zhí)行策略;

背壓(Backpressure)

  • 背壓是指在異步場景中蔼紧,被觀察者發(fā)送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略;
  • 支持背壓的被觀察者為Flowable狠轻;
  • Android中很少用到奸例,除非在線視頻流,直播等場景哈误,當畫面卡頓已取得的數(shù)據(jù)失效了哩至,需要拋棄等;
背壓策略模式
BackpressureStrategy.MISSING
  • 在此策略下躏嚎,通過Create方法創(chuàng)建的Flowable相當于沒有指定背壓策略,不會對通過onNext發(fā)射的數(shù)據(jù)做緩存或丟棄處理菩貌,需要下游通過背壓操作符處理
BackpressureStrategy.ERROR:
  • 在此策略下卢佣,如果放入Flowable的異步緩存池中的數(shù)據(jù)超限了,則會拋出MissingBackpressureException異常;
BackpressureStrategy.BUFFER:
  • 內部維護了一個緩存池SpscLinkedArrayQueue箭阶,其大小不限虚茶,此策略下,如果Flowable默認的異步緩存池滿了仇参,會通過此緩存池暫存數(shù)據(jù)嘹叫,它與Observable的異步緩存池一樣,可以無限制向里添加數(shù)據(jù)诈乒,不會拋出MissingBackpressureException異常罩扇,但會導致OOM;
  • 當緩存區(qū)大小存滿(默認緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個事件時怕磨,將緩存區(qū)大小設置成無限大喂饥,被觀察者可無限發(fā)送事件 觀察者,但實際上是存放在緩存區(qū)肠鲫,但要注意內存情況员帮,防止出現(xiàn)OOM;
BackpressureStrategy.DROP
  • 在此策略下,如果Flowable的異步緩存池滿了导饲,會丟掉上游發(fā)送的數(shù)據(jù);
BackpressureStrategy.LATEST
  • 與Drop策略一樣捞高,如果緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù)渣锦,不同的是硝岗,不管緩存池的狀態(tài)如何,LATEST都會將最后一條數(shù)據(jù)強行放入緩存池中泡挺,來保證觀察者在接收到完成通知之前辈讶,能夠接收到Flowable最新發(fā)射的一條數(shù)據(jù);
  • 即如果發(fā)送了150個事件,緩存區(qū)里會保存129個事件(第1-第128 + 第150事件);
RxJava 2.0內部提供 封裝了背壓策略模式的方法
  • 默認采用BackpressureStrategy.ERROR模式;
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()

Flowable.interval(1, TimeUnit.MILLISECONDS)
    //添加背壓策略封裝好的方法娄猫,此處選擇Buffer模式贱除,即緩存區(qū)大小無限制
    .onBackpressureBuffer()
    .observeOn(Schedulers.newThread())
    .subscribe(subscriber)

冷熱流

Cold Observable

  • subscribe時才會發(fā)射數(shù)據(jù);
  • 常見的工廠方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer();
  • 當你有多個Subscriber的時候,他們的事件是獨立的,示例代碼如下:
val interval = Observable.interval(1, TimeUnit.SECONDS)
var disposable1: Disposable? = null
val observer1 = object : Observer<Long> {
    override fun onSubscribe(d: Disposable) {
        disposable1 = d
    }

    override fun onNext(t: Long) {
        LjyLogUtil.d("觀察者1:$t")
    }

    override fun onError(e: Throwable) {

    }

    override fun onComplete() {

    }
}

var disposable2: Disposable? = null
val observer2 = object : Observer<Long> {
    override fun onSubscribe(d: Disposable) {
        disposable1 = d
    }

    override fun onNext(t: Long) {
        LjyLogUtil.d("觀察者2:$t")
    }

    override fun onError(e: Throwable) {

    }

    override fun onComplete() {

    }
}

findViewById<Button>(R.id.btn_subscribe1)
    .clicks().subscribe {
        interval.subscribe(observer1)
    }
findViewById<Button>(R.id.btn_dispose1)
    .clicks().subscribe {
        disposable1?.dispose()
    }


findViewById<Button>(R.id.btn_subscribe2)
    .clicks().subscribe {
        interval.subscribe(observer2)
    }
findViewById<Button>(R.id.btn_dispose2)
    .clicks().subscribe {
        disposable2?.dispose()
    }

Hot Observable

  • 對于Hot Observable的所有subscriber,他們會在同一時刻收到相同的數(shù)據(jù);
  • 通常使用publish()操作符來將ColdObservable變?yōu)镠ot媳溺≡禄希或者 使用 Subjects 也是Hot Observable;
  • 如果他開始傳輸數(shù)據(jù),你不主動喊停(dispose()/cancel())悬蔽,那么他就不會停扯躺,一直發(fā)射數(shù)據(jù),即使他已經(jīng)沒有Subscriber了;
val interval = Observable.interval(1, TimeUnit.SECONDS).publish()
var disposable: Disposable? = null
findViewById<Button>(R.id.btn_connect)
    .clicks().subscribe {
        disposable = interval.connect()
    }
findViewById<Button>(R.id.btn_dispose)
    .clicks().subscribe {
        disposable?.dispose()
    }
findViewById<Button>(R.id.btn_subscribe3)
    .clicks().subscribe {
        interval.subscribe {
            LjyLogUtil.d("觀察者3:$it")
        }
    }
findViewById<Button>(R.id.btn_subscribe4)
    .clicks().subscribe {
        interval.subscribe {
            LjyLogUtil.d("觀察者4:$it")
        }
    }

操作符

  • 操作符很多,不用完全背下來录语,瀏覽一遍倍啥,用時知道在哪找(關注+收藏本文[狗頭])即可

線程切換

  • subscribeOn:指定被觀察者Observable的工作線程;
  • observeOn:指定觀察者的observer工作線程;
observable
    //subscribeOn: 切換起源 Observable 的線程,
    // 當多次調?用 subscribeOn() 的時候,只有最上?面的會對起源 Observable 起作?用澎埠,
    // 原因subscribeOn底層通過新建observable實現(xiàn)
    .subscribeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    //observeOn指定的是它之后的操作所在的線程虽缕,通過observeOn的多次調用,程序實現(xiàn)了線程的多次切換
    //影響范圍observeOn是它下面的每個observer蒲稳,除非又遇到新的observeOn
    .observeOn(Schedulers.io())
    .map {
        LjyLogUtil.d("map:${Thread.currentThread().name}")
        "num_$it"
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)

創(chuàng)建操作

基本創(chuàng)建
create
  • 通過調用觀察者的方法從頭創(chuàng)建一個Observable;
Observable.create(ObservableOnSubscribe<Int> {
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
})
快速創(chuàng)建
just
  • 將對象或者對象集合轉換為一個會發(fā)射這些對象的Observable;
Observable.just(1, 2, 3, 4)
fromArray & fromIterable
  • 將其它的對象或數(shù)據(jù)結構轉換為Observable;
Observable.fromArray(arrayOf(1,2,3))

Observable.fromIterable(listOf(4,5,6))
never
  • 創(chuàng)建的被觀察者不發(fā)送任何事件, 觀察者接收后什么都不調用;
Observable.never<Int>().subscribe(observer)
empty
  • 創(chuàng)建的被觀察者僅發(fā)送Complete事件氮趋,直接通知完成, 觀察者接收后會直接調用onCompleted;
Observable.empty<Int>().subscribe(observer)
error
  • 創(chuàng)建的被觀察者僅發(fā)送Error事件,直接通知異常, 觀察者接收后會直接調用onError;
Observable.error<Int>(RuntimeException()).subscribe(observer)
延遲創(chuàng)建
defer:
  • 在觀察者訂閱之前不創(chuàng)建這個Observable江耀,為每一個觀察者創(chuàng)建一個新的Observable;
var num=1
val observable=Observable.defer {Observable.just(num)}
num=2
observable.subscribe(observer)
timer
  • 創(chuàng)建在一個指定的延遲之后發(fā)射單個數(shù)據(jù)的Observable;
//延時3秒后剩胁,發(fā)送一個整數(shù)0
Observable.timer(3, TimeUnit.SECONDS)
interval & intervalRange
  • 創(chuàng)建一個定時發(fā)射整數(shù)序列的Observable;
//初始延時1秒,每3秒發(fā)一個自增整數(shù)
Observable.interval(1, 3, TimeUnit.SECONDS)

//初始延時2秒祥国,后每1秒發(fā)一個從10開始的整數(shù)昵观,發(fā)5個(發(fā)到14)停止
Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS);
range & rangeLong
  • 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable;
Observable.range(0, 5)

Observable.rangeLong(0, 5)
Repeat
  • 創(chuàng)建重復發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable;
//一直重復
Observable.fromArray(1, 2, 3, 4).repeat()
//重復發(fā)送5次
Observable.fromArray(1, 2, 3, 4).repeat(5)
//重復發(fā)送直到符合條件時停止重復
Observable.fromArray(1, 2, 3, 4).repeatUntil { false }
//
Observable.just(1, 2, 3, 4)
    .repeatWhen {
        it.flatMap { obj ->
            if (obj is NumberFormatException) {
                Observable.error(Throwable("repeatWhen終止"))
            } else {
                Observable.just(5, 6, 7)
            }
        }
    }.subscribe(observer)

2. 變換操作:

map
  • 映射,通過對序列的每一項都應用一個函數(shù)變換Observable發(fā)射的數(shù)據(jù)舌稀,實質是對序列中的每一項執(zhí)行一個函數(shù)索昂,函數(shù)的參數(shù)就是這個數(shù)據(jù)項;
Observable.just("1", "2", "3").map { it.toInt() }.subscribe(observer)
flatMap & concatMap
  • 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合扩借,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,可以認為是一個將嵌套的數(shù)據(jù)結構展開的過程;
//concatMap與flatMap的區(qū)別: concatMap是有序的缤至,flatMap是無序的
Observable.just("A", "B", "C")
    .flatMap { x ->
        Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { y ->
            "($x,$y)"
        }
    }
Observable.just("A", "B", "C")
    .concatMap { m ->
        Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { n ->
            "($m,$n)"
        }
    }
groupBy
  • 分組潮罪,將原來的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組领斥,每一個Observable發(fā)射一組不同的數(shù)據(jù);
Observable.just(
    "Tiger",
    "Elephant",
    "Cat",
    "Chameleon",
    "Frog",
    "Fish",
    "Turtle",
    "Flamingo"
)
    .groupBy { it[0].uppercaseChar() }
    .concatMapSingle { it.toList() }
    .subscribe(observer4)
scan
  • 掃描嫉到,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù),然后按順序依次發(fā)射這些值;
  • 對發(fā)射的數(shù)據(jù)和上一輪發(fā)射的數(shù)據(jù)進行函數(shù)處理月洛,并返回的數(shù)據(jù)供下一輪使用何恶,持續(xù)這個過程來產生剩余的數(shù)據(jù)流。其應用場景有簡單的累加計算嚼黔,判斷所有數(shù)據(jù)的最小值等;
Observable.just(1, 2, 3, 4)
    .scan { t1, t2 -> t1 + t2 }
    .subscribe(observer)
buffer
  • 緩存细层,定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射唬涧,而不是一次發(fā)射一個
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    .buffer(3)
    .subscribe(observer5)
Window
  • 窗口疫赎,定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口碎节,而不是每次發(fā)射一項; 類似于Buffer捧搞,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個Observable發(fā)射原始Observable的數(shù)據(jù)的一個子集;
//window操作符和buffer操作符在功能上實現(xiàn)的效果是一樣的胎撇,但window操作符最大區(qū)別在于同樣是緩存一定數(shù)量的數(shù)據(jù)項介粘,
// window操作符最終發(fā)射出來的是新的事件流integerObservable,而buffer操作符發(fā)射出來的是新的數(shù)據(jù)流晚树,
// 也就是說姻采,window操作符發(fā)射出來新的事件流中的數(shù)據(jù)項,還可以經(jīng)過Rxjava其他操作符進行處理题涨。
Observable.just(1, 2, 3, 4)
    .window(2, 1)
    .subscribe(observer6)

3. 過濾操作:

Filter
  • 過濾偎谁,過濾掉沒有通過謂詞測試的數(shù)據(jù)項,只發(fā)射通過測試的;
Observable.just(1, 2, 3, 4, 5)
    .filter {
            it % 2 == 0
    }.subscribe(observer)
ofType
  • 過濾特定類型的數(shù)據(jù);
Observable.just(1, 2.1, "3", 4L, 5.0)
    .ofType(Int::class.java)
    .subscribe(observer)
distinct & distinctUntilChanged
  • 去重纲堵,過濾掉重復數(shù)據(jù)項;
Observable.just(1, 2, 3, 4, 3, 2, 1)
    .distinct()
//  .distinctUntilChanged()//去掉相鄰連續(xù)重復數(shù)據(jù)
    .subscribe(observer)
skip & skipLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
// 跳過, 跳過前面的若干項數(shù)據(jù);
observable.skip(4).subscribe(observer)
//skipLast:跳過后面的若干項數(shù)據(jù)
observable.skipLast(4).subscribe(observer)
take & takeLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
//take:只保留前面的若干項數(shù)據(jù)
observable.take(4).subscribe(observer)
//takeLast:只保留后面的若干項數(shù)據(jù)
observable.takeLast(4).subscribe(observer)
debounce
  • 只有在空閑了一段時間后才發(fā)射數(shù)據(jù)巡雨,通俗的說,就是如果一段時間沒有操作席函,就執(zhí)行一次操作;
val observable2 = Observable.create<Int> {
    it.onNext(1)
    Thread.sleep(400)
    it.onNext(2)
    Thread.sleep(1200)
    it.onNext(3)
    Thread.sleep(1000)
    it.onNext(4)
    Thread.sleep(800)
    it.onNext(5)
    Thread.sleep(2000)
    it.onNext(6)
}
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .debounce(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
sample
  • 取樣铐望,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣;
//sample:與debounce的區(qū)別是茂附,sample是以時間為周期的發(fā)射正蛙,一秒又一秒內的最新數(shù)據(jù)。而debounce是最后一個有效數(shù)據(jù)開始
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .sample(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
  • throttleFirst是指定周期內第一個數(shù)據(jù)营曼,throttleLast與sample一致乒验。throttleWithTimeout與debounce一致;
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .throttleFirst(1, TimeUnit.SECONDS)
//  .throttleLast(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
timeout
  • 后一個數(shù)據(jù)發(fā)射未在前一個元素發(fā)射后規(guī)定時間內發(fā)射則返回超時異常;
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .timeout(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
elementAt
  • 取值,取特定位置的數(shù)據(jù)項;
Observable.just(4, 3, 2, 1)
    .elementAt(1)
//出現(xiàn)越界時蒂阱,拋出異常
//  .elementAtOrError(1)
    .subscribe(maybeObserver)
first
  • 首項锻全,只發(fā)射滿足條件的第一條數(shù)據(jù);
Observable.just(1, 2, 3, 4, 5)
//  .first(-1)
//  .firstOrError()
    .firstElement()
    .subscribe(maybeObserver)
last
  • 末項,只發(fā)射最后一條數(shù)據(jù);
 Observable.just(1, 2, 3, 4, 5)
//  .last(-1)
//  .lastOrError()
    .lastElement()
    .subscribe(maybeObserver)
IgnoreElements
  • 忽略所有的數(shù)據(jù)录煤,只保留終止通知(onError或onCompleted),ignoreElements 作用于Flowable鳄厌、Observable。ignoreElement作用于Maybe妈踊、Single;
Observable.just(1, 2, 3, 4, 5)
    .ignoreElements()
    .subscribe(object : CompletableObserver {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }
    })

4. 組合操作:

concat & concatArray
  • 不交錯的連接多個Observable的數(shù)據(jù);
  • 組合多個被觀察者一起發(fā)送數(shù)據(jù)了嚎,合并后 按發(fā)送順序串行執(zhí)行;
val just1 = Observable.just(1, 2, 3)
val just2 = Observable.just("A", "B", "C")
val just3 = Observable.just(4, 5, 6)
Observable.concat(just1, just2).subscribe(observer3)

//concat組合被觀察者數(shù)量<=4個,而concatArray則可>4個
Observable.concatArray(
    Observable.just(1, 2, 3),
    Observable.just(4, 5, 6),
    Observable.just(7, 8, 9)
).subscribe(observer3)
merge & mergeArray
  • 組合多個被觀察者一起發(fā)送數(shù)據(jù)廊营,合并后 按時間線并行執(zhí)行;
  • merge和concat的區(qū)別: merge合并后發(fā)射的數(shù)據(jù)項是并行無序的歪泳,concat合并后發(fā)射的數(shù)據(jù)項是串行有序的;
Observable.merge(just1, just2).subscribe(observer3)
//mergeWith
just1.mergeWith(just3).subscribe(observer3)
zip
  • 打包,使用一個指定的函數(shù)將多個Observable發(fā)射的數(shù)據(jù)組合在一起赘风,然后將這個函數(shù)的結果作為單項數(shù)據(jù)發(fā)射;
//zip操作符是將兩個數(shù)據(jù)流進行指定的函數(shù)規(guī)則合并
Observable.zip(just1, just2, { t1, t2 -> "${t1}_${t2}" })
    .subscribe(observer3)
//zipWith
just1.zipWith(just2, { t1, t2 -> "${t1}_${t2}" })
    .subscribe(observer3)
startWith & startWithArray
  • 在發(fā)射原來的Observable的數(shù)據(jù)序列之前夹囚,先發(fā)射一個指定的數(shù)據(jù)序列或數(shù)據(jù)項;
Observable.just(1,2,3)
    .startWith(Observable.just(4,5,6))
    .startWithArray(7,8,9)
    .subscribe(observer3)
join
  • 無論何時,如果一個Observable發(fā)射了一個數(shù)據(jù)項邀窃,只要在另一個Observable發(fā)射的數(shù)據(jù)項定義的時間窗口內荸哟,就將兩個Observable發(fā)射的數(shù)據(jù)合并發(fā)射;
just1.join(just2,
    //規(guī)定just2的過期期限
    { Observable.timer(3, TimeUnit.SECONDS) },
    //規(guī)定just1的過期期限
    { Observable.timer(8, TimeUnit.SECONDS) },
    //規(guī)定just1和just2的合并規(guī)則
    { t1, t2 -> "${t1}_${t2}" })
    .subscribe(observer3)
combineLatest
  • 當兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時假哎,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù)),然后發(fā)射這個函數(shù)的結果;
Observable.combineLatest(just1, just2,
    { t1, t2 -> "${t1}_${t2}" }).subscribe(observer3)
switch
  • 將一個發(fā)射Observable序列的Observable轉換為這樣一個Observable:它逐個發(fā)射那些Observable最近發(fā)射的數(shù)據(jù);
Observable.switchOnNext(ObservableSource<Observable<Int>> { }, 12)
collect
  • 將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結構里;
Observable.just(1, 2, 3, 4, 5, 6)
    .collect(
        { ArrayList() },
        BiConsumer<ArrayList<Int?>, Int> { t1, t2 -> t1.add(t2) }
    ).subscribe(Consumer { LjyLogUtil.d("num: $it") })
count
  • 計算Observable發(fā)射的數(shù)據(jù)個數(shù)鞍历,然后發(fā)射這個結果;
Observable.just(1, 2, 3)
    .count()
    .subscribe(object : SingleObserver<Long>{
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onSuccess(t: Long) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }

    })

5. 錯誤處理:

cast
  • 將數(shù)據(jù)元素轉型成其他類型,轉型失敗會拋出異常;
Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5)
    .cast(Int::class.java)
    .subscribe(observer)
onErrorReturn
  • 調用數(shù)據(jù)源的onError函數(shù)后會回到該函數(shù)舵抹,可對錯誤進行處理,然后返回值劣砍,會調用觀察者onNext()繼續(xù)執(zhí)行惧蛹,執(zhí)行完調用onComplete()函數(shù)結束所有事件的發(fā)射;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .onErrorReturn {
        if (it is NumberFormatException) {
            0
        } else {
            throw IllegalArgumentException()
        }
    }.subscribe(observer)
onErrorReturnItem
  • 與onErrorReturn類似,onErrorReturnItem不對錯誤進行處理刑枝,直接返回一個值;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .onErrorReturnItem(0)
    .subscribe(observer)
onErrorResumeNext & onExceptionResumeNext
  • 遇到錯誤時香嗓,發(fā)送1個新的Observable;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .onErrorResumeNext { Observable.just(5,6,7) }
    .subscribe(observer)
retry
  • 重試,如果Observable發(fā)射了一個錯誤通知装畅,重新訂閱它靠娱,期待它正常終止;
//retry:當發(fā)生錯誤時,數(shù)據(jù)源重復發(fā)射item掠兄,直到?jīng)]有異诚裨疲或者達到所指定的次數(shù)
Observable.just(1, 2, 3, 4)
    .retry(3)
    .subscribe(observer)
retryUntil
  • 發(fā)生異常時,返回值是false表示繼續(xù)執(zhí)行(重復發(fā)射數(shù)據(jù))蚂夕,true不再執(zhí)行,但會調用onError方法;
var temp = 0
Observable.just(1, 2, 3, 4)
    .map {
        temp = it
        it
    }
    .retryUntil {
        temp > 3
    }
    .subscribe(observer)
retryWhen
  • 遇到錯誤時迅诬,將發(fā)生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .retryWhen {
        it.flatMap { throwable ->
            if (throwable is NumberFormatException) {
                Observable.error(Throwable("retryWhen終止"))
            } else {
                Observable.just(5, 6, 7)
            }
        }
    }
    .subscribe(observer)

6. 輔助操作:

delay
  • 延遲一段時間發(fā)射結果數(shù)據(jù);
Observable.just(1, 2, 3)
    .delay(1,TimeUnit.SECONDS)
    .subscribe(observer)
do
  • 在某個事件的生命周期中調用;
  • doOnEach:數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù)婿牍,就調用一次;
  • doOnNext:數(shù)據(jù)源每次調用onNext() 之前都會先回調該方法;
  • doOnError:數(shù)據(jù)源每次調用onError() 之前會回調該方法;
  • doOnComplete:數(shù)據(jù)源每次調用onComplete() 之前會回調該方法;
  • doOnSubscribe:數(shù)據(jù)源每次調用onSubscribe() 之后會回調該方法;
  • doOnDispose:數(shù)據(jù)源每次調用dispose() 之后會回調該方法;
Observable.just(1, 2, 3, 4)
    .observeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { LjyLogUtil.d("${Thread.currentThread().name}_doOnSubscribe") }
    .doOnEach { LjyLogUtil.d("${Thread.currentThread().name}_doOnEach:$it") }
    .doOnNext { LjyLogUtil.d("${Thread.currentThread().name}_doOnNext:$it") }
    .doOnError { LjyLogUtil.d("${Thread.currentThread().name}_doOnError:${it.localizedMessage}") }
    .doOnComplete { LjyLogUtil.d("${Thread.currentThread().name}_doOnComplete") }
    .doOnDispose { LjyLogUtil.d("${Thread.currentThread().name}_doOnDispose") }
    .subscribe(observer)

7. 條件和布爾操作:

all
  • 判斷Observable發(fā)射的所有的數(shù)據(jù)項是否都滿足某個條件;
Observable.just(1, -2, 3)
    .all {
        it > 0
    }.subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
takeWhile & skipWhile & takeUntil & skipUntil
// 從0開始每1s發(fā)送1個數(shù)據(jù)
Observable.interval(1, TimeUnit.SECONDS)
    //條件滿足時侈贷,取數(shù)據(jù)
    .takeWhile {
        it<10
    }
Observable.interval(1, TimeUnit.SECONDS)
    //滿足條件時,跳過數(shù)據(jù)
    .skipWhile {
        it<10
    }
Observable.interval(1, TimeUnit.SECONDS)
   //取數(shù)據(jù)等脂,直到滿足條件
    .takeUntil {
        it>10
    }
Observable.interval(1, TimeUnit.SECONDS)
    //等待直到傳入的Observable開始發(fā)送數(shù)據(jù)
    .skipUntil (Observable.timer(5, TimeUnit.SECONDS))
SequenceEqual:
  • 判斷兩個Observable是否按相同的數(shù)據(jù)序列;
Observable.sequenceEqual(
    Observable.just(4, 5, 6),
    Observable.just(4, 5, 6)
).subscribe { it ->
    LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
contains
  • 判斷Observable是否會發(fā)射一個指定的數(shù)據(jù)項;
Observable.just(1, -2, 3)
    .contains(2)
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
isEmpty
  • 判斷發(fā)送的數(shù)據(jù)是否為空;
Observable.just(1, -2, 3)
    .isEmpty()
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
amb

-給定多個Observable铐维,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù);

val list: MutableList<ObservableSource<Int>> = ArrayList()
list.add(Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS))
list.add(Observable.just(4, 5, 6))
Observable.amb(list).subscribe{
    LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
defaultIfEmpty
  • 發(fā)射來自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù)慎菲,就發(fā)射一個默認數(shù)據(jù);
//在不發(fā)送onNext事件, 僅發(fā)送onComplete事件
Observable.empty<Int>()
    .defaultIfEmpty(-1)
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
  • SkipUntil:丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個Observable發(fā)射了一個數(shù)據(jù)锨并,然后發(fā)射原始Observable的剩余數(shù)據(jù);

實戰(zhàn)

結合 RxBinding 使用

  • RxBinding是對 Android View 事件的擴展, 它使得開發(fā)者可以對 View 事件使用 RxJava 的各種操作露该;
1. 添加依賴
//RxBinding
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
//Google 'material' library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-material:4.0.0'
//AndroidX library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-core:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-appcompat:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-drawerlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-leanback:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-recyclerview:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-slidingpanelayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-swiperefreshlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager2:4.0.0'
2. 按鈕防抖
val btn1 = findViewById<Button>(R.id.btn_1)
btn1.clicks()
    .throttleFirst(2, TimeUnit.SECONDS)
    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribe {
        LjyLogUtil.d("點擊按鈕")
    }
3. editText輸入監(jiān)聽
//也可用作聯(lián)想搜索優(yōu)化
val et1 = findViewById<EditText>(R.id.et_1)
et1.textChanges()
    .debounce(1, TimeUnit.SECONDS)
    //跳過第1次請求 因為初始輸入框的空字符狀態(tài)
    .skip(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe{
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
4. 聯(lián)合/表單判斷
val etName = findViewById<EditText>(R.id.et_name)
val etPwd = findViewById<EditText>(R.id.et_pwd)
val obName = etName.textChanges()
val obPwd = etPwd.textChanges()
Observable.combineLatest(
    obName, obPwd, { name, pwd -> name == "ljy" && pwd == "123" })
    //跳過第1次請求 因為初始輸入框的空字符狀態(tài)
    .skip(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { isLogin -> LjyLogUtil.d(if (isLogin) "登錄成功" else "登錄失敗") }
5. 定時器任務
val time = 10L
val btnLogin = findViewById<Button>(R.id.btn_login)
btnLogin.clicks()
    .throttleFirst(time, TimeUnit.SECONDS)
    .subscribeOn(AndroidSchedulers.mainThread())
    .doOnNext { btnLogin.isEnabled = false }
    .subscribe {
        LjyLogUtil.d("點擊登錄")
        Observable.intervalRange(
            0, time, 0, 1,
            TimeUnit.SECONDS, AndroidSchedulers.mainThread()
        )
            .subscribe(
                { btnLogin.text = "剩余${time - it}秒" },
                { LjyLogUtil.e(it.message) },
                {
                    btnLogin.text = "獲取驗證碼"
                    btnLogin.isEnabled = true
                })
    }

利用RxLifecycle解決內存泄漏問題

1. 添加依賴
//RxLifecycle
    implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
// If you want to bind to Android-specific lifecycles
    implementation 'com.trello.rxlifecycle4:rxlifecycle-android:4.0.2'
// If you want pre-written Activities and Fragments you can subclass as providers
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
// If you want pre-written support preference Fragments you can subclass as providers
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components-preference:4.0.2'
// If you want to use Android Lifecycle for providers
    implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle:4.0.2'
// If you want to use Kotlin syntax
    implementation 'com.trello.rxlifecycle4:rxlifecycle-kotlin:4.0.2'
// If you want to use Kotlin syntax with Android Lifecycle
    implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2'
2. 綁定
Observable.intervalRange(0, 60, 0, 1, TimeUnit.SECONDS)
    //利用RxLifecycle來解決內存泄漏問題
    //bindToLifecycle的自動取消訂閱示例
    //.compose(bindToLifecycle())
    //手動設置在activity onPause的時候取消訂閱
    .compose(this.bindUntilEvent(ActivityEvent.PAUSE))
    .subscribe(
        { LjyLogUtil.d("剩余${60 - it}秒") },
        { LjyLogUtil.e(it.message) },
        { LjyLogUtil.d("完成") }
    )

網(wǎng)絡請求

1. 網(wǎng)絡請求嵌套回調
//使用observeOn多次切換線程
apiService.searchRepo(emptyMap()) // 請求搜索列表
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        //展示列表
    }
    .observeOn(Schedulers.io())
    .flatMap {
        Observable.fromIterable(it.items).map { repo -> repo.id }
    }
    .flatMap {
        apiService.getItem(it) // 請求詳情
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        //展示詳情
    }
2. 網(wǎng)絡請求輪詢
//每60秒一次,無限輪詢
Observable.interval(60, TimeUnit.SECONDS)
    .doOnNext {
        apiService.searchRepo(emptyMap())
            .subscribe(Consumer { LjyLogUtil.d("accept: ${it.items}") })
    }.subscribe { LjyLogUtil.d("第 $it 次輪詢") }
//有條件輪詢
var count = 0
apiService.searchRepo(emptyMap())
    .repeatWhen {
        it.flatMap {
            if (count > 3) {
                Observable.error(Throwable("輪詢結束"))
            } else {
                LjyLogUtil.d("第 $count 次輪詢")
                Observable.just(1).delay(60, TimeUnit.SECONDS)
            }
        }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        LjyLogUtil.d("accept: ${it.items}")
        count++
    }, {
        LjyLogUtil.d(it.message)
    })
3. 網(wǎng)絡請求出錯重連
//最大重試次數(shù)
val maxConnectCount = 10
//已重試次數(shù)
var currentRetryCount = 0
apiService.searchRepo(emptyMap())
    .retryWhen {
        it.flatMap { throwable ->
            //根據(jù)異常類型選擇是否重試
            if (throwable is IOException) {
                if (currentRetryCount < maxConnectCount) {
                    currentRetryCount++
                    //遇到的異常越多第煮,重試延遲間隔時間越長
                    Observable.just(1).delay(currentRetryCount * 60L, TimeUnit.SECONDS)
                } else {
                    Observable.error(Throwable("重試結束"))
                }
            } else {
                Observable.error(Throwable("發(fā)生了非網(wǎng)絡異常(非I/O異常)"))
            }
        }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        LjyLogUtil.d("accept: ${it.items}")
    }, {
        LjyLogUtil.d(it.message)
    })
4. 內存解幼,磁盤,網(wǎng)絡 三級緩存
var memoryCache: String? = null
var diskCache: String? = null
private fun test10() {
    val memory = Observable.create<String> {
        if (memoryCache != null) {
            it.onNext(memoryCache)
        } else {
            it.onComplete()
        }
    }

    val disk = Observable.create<String> {
        if (diskCache != null) {
            it.onNext(diskCache)
        } else {
            it.onComplete()
        }
    }.doOnNext {
        memoryCache = "內存數(shù)據(jù)"
    }

    val net = Observable.just("網(wǎng)絡數(shù)據(jù)")
        .doOnNext {
            // memoryCache="內存數(shù)據(jù)"
            diskCache = "磁盤數(shù)據(jù)"
        }
    //通過concat()合并memory包警、disk撵摆、network 3個被觀察者的事件(即檢查內存緩存、磁盤緩存 & 發(fā)送網(wǎng)絡請求)
    Observable.concat(memory, disk, net)
        //通過firstElement()害晦,從串聯(lián)隊列中取出并發(fā)送第1個有效事件(Next事件)特铝,即依次判斷檢查memory暑中、disk、network
        .firstElement()
        .subscribe {
            LjyLogUtil.d("accept: $it")
        }
}
5. 合并數(shù)據(jù)源 & 同時展示
var result = "數(shù)據(jù)來自:"
val net = Observable.just("網(wǎng)絡")
val disk = Observable.just("磁盤")
//使用merge鲫剿,從網(wǎng)絡和本地獲取數(shù)據(jù)并展示
Observable.merge(net, disk)
    .subscribe({
        result += "$it, "
    }, {

    }, {
        LjyLogUtil.d("result: $result")
    })
//使用zip,合并2個網(wǎng)絡請求向獲取數(shù)據(jù)并展示
val repo1 = apiService.getItem(1001).subscribeOn(Schedulers.io())
val repo2 = apiService.getItem(1002).subscribeOn(Schedulers.io())
Observable.zip(
    repo1, repo2, { data1, data2 ->
        val repoList = ArrayList<RepoDetail>()
        repoList.add(data1)
        repoList.add(data2)
        repoList
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        for (repoDetail in it) {
            LjyLogUtil.d("result: ${repoDetail.name}")
        }
    }

源碼解析

入口

  • 以下面代碼為源碼閱讀入口鳄逾,Single是最簡單的被觀察者;
Single.just(1)
    .subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onSuccess(t: String) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }
    })

被觀察者的創(chuàng)建

  • 先看上面代碼中被觀察者的創(chuàng)建方法:Single.just()灵莲;
public static <@NonNull T> Single<T> just(T item) {
    Objects.requireNonNull(item, "item is null");//判空
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
  • 其中第一行為判空雕凹,第二行RxJavaPlugins.onAssembly為一個鉤子方法,方便添加一些額外操作,代碼如下:
@Nullable
static volatile Function<? super Single, ? extends Single> onSingleAssembly;

public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
    Function<? super Single, ? extends Single> f = onSingleAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

  • 其中onSingleAssembly默認為空政冻,所以實際默認是直接返回source枚抵,也就是SingleJust;
  • 所以Single.just就是直接創(chuàng)建了一個被觀察者的實現(xiàn)類SingleJust的實例并返回明场,那么我們看一下SingleJust的代碼汽摹;
public final class SingleJust<T> extends Single<T> {
    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }
}
  • 很簡單吧,繼承了Single榕堰,并重寫了Single唯一的抽象方法subscribeActual竖慧,有個泛型變量value;
  • Single的代碼很多,基本都是上面介紹的操作符方法的實現(xiàn)逆屡,用到時可以點進源碼看一看圾旨;
public abstract class Single<@NonNull T> implements SingleSource<T> {
     ...
     protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
     ...
      public final void subscribe(@NonNull SingleObserver<? super T> observer) {
         ...
      }
}
  • Single實現(xiàn)了SingleSource接口,并實現(xiàn)subscribe方法魏蔗;
public interface SingleSource<@NonNull T> {

    /**
     * Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.
     * @param observer the {@code SingleObserver}, not {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    void subscribe(@NonNull SingleObserver<? super T> observer);
}

訂閱

  • 下面來看看開頭例子中的第二行的subscribe方法:Single.subscribe()砍的;
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
    //判空
    Objects.requireNonNull(observer, "observer is null");
    //鉤子方法,默認還是入?yún)⒌腟ingleObserver
    observer = RxJavaPlugins.onSubscribe(this, observer);
    //判空
    Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

    try {
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}
  • 可以看到訂閱方法subscribe中最有用的一行就是調用了subscribeActual莺治,而這里的subscribeActual的實現(xiàn)正式上面的SingleJust中的實現(xiàn)廓鞠;
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposable.disposed());
    observer.onSuccess(value);
}
  • 可以看到SingleJust中的subscribeActual直接連著調用了observer的onSubscribe和onSuccess方法,并且onSubscribe的入?yún)橐粋€disposed的Disposable,是個已取消的Disposable谣旁,因為just是沒有延遲的床佳,無需取消和修改,而且onError是不會被調用的榄审,因為瞬發(fā)一個也不會有出錯的可能砌们;

操作符實現(xiàn)

map

  • 以map為例,對之前的示例擴展一下搁进,增加個map操作符對數(shù)據(jù)進行轉換,map搞懂了浪感,其他操作符的主要思路也是一樣的;
Single.just(1)
    .map { "num_$it" }
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
  • 那么看一下map的實現(xiàn):
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
  • map中也是有判空和鉤子饼问,創(chuàng)建了一個SingleMap并返回影兽,其入?yún)⒌谝淮螀?shù)為this,也就是just返回的SingleJust實例莱革,第二個參數(shù)為數(shù)據(jù)轉換的轉換器峻堰;
  • 那么來看一下SingleMap:
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
    ...//省略內部類MapSingleObserver
}
  • 可以看到SingleMap的subscribeActual中只有一行代碼讹开,就是調用source的訂閱方法subscribe,并傳入一個觀察者MapSingleObserver實例和SingleMap持有的數(shù)據(jù)轉換器茧妒,其中source也就是前一步的被觀察者SingleJust實例萧吠;
  • 而觀察者MapSingleObserver則是負責對上下游數(shù)據(jù)進行轉換 和傳遞,其入?yún)⑾掠蝧ubscribe傳入的觀察者SingleObserver實例桐筏,來看一下它的實現(xiàn):
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
    final SingleObserver<? super R> t;
    final Function<? super T, ? extends R> mapper;

    MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
        this.t = t;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Disposable d) {
        t.onSubscribe(d);
    }

    @Override
    public void onSuccess(T value) {
        R v;
        try {
            //mapper.apply數(shù)據(jù)轉換器對數(shù)據(jù)進行轉換
            v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            onError(e);
            return;
        }

        t.onSuccess(v);
    }

    @Override
    public void onError(Throwable e) {
        t.onError(e);
    }
}
  • 可以看到MapSingleObserver的onSuccess中調用數(shù)據(jù)轉換器mapper的apply方法對數(shù)據(jù)進行轉換纸型,
    并調用構造函數(shù)傳入的SingleObserver的onSuccess將轉換后的數(shù)據(jù)傳遞過去,而onSubscribe梅忌,onError方法則是直接調用了SingleObserver的方法
小結
  • 那么示例代碼中map前后整個流程串起來就是:
  1. Single.just創(chuàng)建了一個被觀察者SingleJust,負責發(fā)送原始數(shù)據(jù)狰腌;
  2. map創(chuàng)建了一個被觀察者SingleMap,構造函數(shù)入?yún)⑹巧弦徊降腟ingleJust和數(shù)據(jù)轉換器牧氮,也就是對SingleJust進行了包裝琼腔,或者說接管,如何接管的呢踱葛,SingleMap中調用了SingleJust的訂閱方法subscribe丹莲,并傳入觀察者MapSingleObserver,也就是對SingleJust說尸诽,小j啊甥材,你這項目給我(SingleMap)了,后面的客戶你不用跟了性含,把你手里的數(shù)據(jù)給我小弟MapSingleObserver就行了洲赵,以后需要啥他找你要;
  3. 后面再調用subscribe呢商蕴,實際是調用SingleMap的subscribe叠萍,也就是后面再有客戶其實都是跟SingleMap簽的單子了
  4. SingleMap收到單子呢轉手就給小弟MapSingleObserver了,反正SingleJust的數(shù)據(jù)也在你手里呢你看看咋給客戶服務吧
  5. MapSingleObserver就拿著SingleJust的數(shù)據(jù)進行轉換绪商,然后調用客戶(下游subscribe傳入的觀察者)的回調方法進行服務
  • ps:
    • MapSingleObserver:同樣是觀察者苛谷,我也太累了,又負責接收上游的數(shù)據(jù),又要進行處理晋柱,處理完還得反饋給下游的客戶;
    • SingleJust:你累?同樣是被觀察者忿族,我辛辛苦苦做的項目,還不是被SingleMap拿走了句灌,還讓你監(jiān)視(觀察)我账胧,這點業(yè)績都被內部消化了啊,而且項目開始了還把我工號留給客戶( t.onSubscribe(d))落蝙,那客戶是說停就停啊织狐,我這出錯了你也是轉頭就給客戶說霸萦住( t.onError(e));
    • 這哪是RxJava移迫,這分明是職場巴摇;

subscribeOn & observeOn

  • 在對之前的示例擴展一下厨埋,加入線程切換邪媳,那就是用到了subscribeOn & observeOn 操作符了;
Single.just(1)
    .map { "num_$it" }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
subscribeOn
  • 先來看看subscribeOn的入?yún)chedulers.io():
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

public final class IoScheduler extends Scheduler {
    ...
}
  • 可以看到最后返回的是Scheduler的實現(xiàn)類IoScheduler實例荡陷;
  • 再來看看subscribeOn方法的實現(xiàn):
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
  • 還是判空和鉤子雨效,然后創(chuàng)建一個SingleSubscribeOn并返回,入?yún)⑹巧嫌蔚谋挥^察者(this)废赞,和線程調度器Scheduler徽龟;
public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //創(chuàng)建一個觀察者SubscribeOnObserver的實例,傳入上游的被觀察者source
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        //調用下游的觀察者observer的onSubscribe唉地,通知其開始訂閱了据悔,取消的話可以找SubscribeOnObserver
        observer.onSubscribe(parent);
        //調用IoScheduler的scheduleDirect方法切換線程
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }
    ...//省略內部類SubscribeOnObserver
}
  • 上面subscribeActual中創(chuàng)建了一個被觀察者SubscribeOnObserver,并且調用scheduler.scheduleDirect切換線程耘沼;
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

//其中DisposeTask為Scheduler的內部類
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    ...
    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            try {
                decoratedRun.run();
            } catch (Throwable ex) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(ex);
                throw ex;
            }
        } finally {
            dispose();
            runner = null;
        }
    }
    ...
}
  • 其中scheduleDirect的入?yún)⑹莻€Runnable极颓,而SubscribeOnObserver實現(xiàn)了Runnable接口和SingleObserver接口,構造方法入?yún)⑹窍掠蔚挠^察者和上游的被觀察者,在自己的onSuccess,onError中直接傳給下游的觀察者耕拷,在run方法中訂閱上游的被觀察者;
static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    ...
    SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
        this.downstream = actual;
        this.source = source;
        this.task = new SequentialDisposable();
    }
    ...
    @Override
    public void onSuccess(T value) {
        downstream.onSuccess(value);
    }

    @Override
    public void onError(Throwable e) {
        downstream.onError(e);
    }
    ...
    @Override
    public void run() {
        source.subscribe(this);
    }
}
observeOn
  • 先來看看Android專用的線程調度器AndroidSchedulers.mainThread():
//通過靜態(tài)內部類提供HandlerScheduler的單例
public final class AndroidSchedulers {
    //開放的入口方法
    public static Scheduler mainThread() {
        //鉤子
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    //私有的靜態(tài)變量
    private static final Scheduler MAIN_THREAD =
          RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
    //私有的靜態(tài)內部類
    private static final class MainHolder {
        //本質還說離不開Handler,通過Looper.getMainLooper()切換到UI線程
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }
    //私有的構造方法讼昆,并拋出異常,因為這里是創(chuàng)建HandlerScheduler的單例骚烧,而不是AndroidSchedulers本身
    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        ...//省略判空和鉤子方法
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        //通過主線程的handler發(fā)送消息
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...
            if (disposed) {
                return Disposable.disposed();
            }
            ...
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
            ...
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposable.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
  • 然后來看observeOn方法:
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
  • 本質還是創(chuàng)建了一個被觀察者:
public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //調用上游被觀察者source的訂閱方法subscribe浸赫,傳入一個觀察者ObserveOnSingleObserver,其入?yún)⑹窍掠蔚挠^察者observer
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }
    ...//省略內部類ObserveOnSingleObserver
}
  • ObserveOnSingleObserver代碼如下, 在自己的onSubscribe調用下游觀察者的onSubscribe赃绊,在自己的onSuccess和onError中調用scheduler.scheduleDirect切換線程既峡,scheduleDirect的入?yún)⑹荝unnable,會調用自己的run方法碧查,而在run方法中調用了下游觀察者的onSuccess和onError傳遞結果运敢,以此實現(xiàn)改變下游觀察者的線程;
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    ...
    ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
        this.downstream = actual;
        this.scheduler = scheduler;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void onError(Throwable e) {
        this.error = e;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void run() {
        Throwable ex = error;
        if (ex != null) {
            downstream.onError(ex);
        } else {
            downstream.onSuccess(value);
        }
    }
   ...
}

番外:Single.create

  • 看了上面的Single.just, 可能有種被忽悠的感覺,也太low了吧忠售,這么簡單么传惠,那么我們不妨再來看看Single.create:
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
  • 同樣的create方法中創(chuàng)建了一個被觀察者SingleCreate并返回:
public final class SingleCreate<T> extends Single<T> {

    final SingleOnSubscribe<T> source;

    public SingleCreate(SingleOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        Emitter<T> parent = new Emitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...//省略了內部類Emitter
}
  • 看了上面代碼,create和just有什么區(qū)別呢稻扬,just{}中的代碼是直接賦值給value的卦方,不需要下游subscribe就會執(zhí)行,而create是通過匿名內部類傳入一個SingleOnSubscribe的實例,并在實現(xiàn)的subscribe方法中產生數(shù)據(jù)泰佳,在SingleCreate.subscribeActual中調用SingleOnSubscribe.subscribe盼砍,SingleCreate.subscribeActual則是在下游subscribe時才會執(zhí)行尘吗,也就是Single.create在下游訂閱后才開始產生數(shù)據(jù);那么顯然浇坐,Observable.just()不適合封裝網(wǎng)絡數(shù)據(jù)睬捶,因為我們通常不想在subscribe之前做網(wǎng)絡請求。

參考

我是今陽擒贸,如果想要進階和了解更多的干貨,歡迎關注微信公眾號 “今陽說” 接收我的最新文章

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末跌宛,一起剝皮案震驚了整個濱河市酗宋,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌疆拘,老刑警劉巖蜕猫,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異哎迄,居然都是意外死亡回右,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門漱挚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來翔烁,“玉大人,你說我怎么就攤上這事旨涝〉乓伲” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵白华,是天一觀的道長慨默。 經(jīng)常有香客問我,道長弧腥,這世上最難降的妖魔是什么厦取? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮管搪,結果婚禮上虾攻,老公的妹妹穿的比我還像新娘。我一直安慰自己更鲁,他們只是感情好霎箍,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著澡为,像睡著了一般漂坏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天樊拓,我揣著相機與錄音,去河邊找鬼塘慕。 笑死筋夏,一個胖子當著我的面吹牛,可吹牛的內容都是我干的图呢。 我是一名探鬼主播条篷,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蛤织!你這毒婦竟也來了赴叹?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤指蚜,失蹤者是張志新(化名)和其女友劉穎乞巧,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體摊鸡,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡绽媒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了免猾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片是辕。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖猎提,靈堂內的尸體忽然破棺而出获三,到底是詐尸還是另有隱情,我是刑警寧澤锨苏,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布疙教,位于F島的核電站,受9級特大地震影響蚓炬,放射性物質發(fā)生泄漏松逊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一肯夏、第九天 我趴在偏房一處隱蔽的房頂上張望经宏。 院中可真熱鬧,春花似錦驯击、人聲如沸烁兰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽沪斟。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間主之,已是汗流浹背择吊。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留槽奕,地道東北人几睛。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像粤攒,于是被迫代替她去往敵國和親所森。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內容