RxJava2 + Retrofit2 結合使用詳解

不講 rxjava 和 retrofit 而是直接上手 2 了狂魔,因為 2 封裝的更好用的更多。

1. 觀察者模式

常見的 button 點擊事件為例,button 是被觀察者说贝,listener 是觀察者,setOnClickListener 過程是訂閱慎颗,有了訂閱關系后在 button 被點擊的時候乡恕,監(jiān)聽者 listener 就可以響應事件。

這里的button.setOnClickListener(listener)看上去意思是被觀察者訂閱了觀察者(雜志訂閱了讀者)俯萎,邏輯上不符合日常生活習慣傲宜。其實這是設計模式的習慣,不必糾結夫啊,習慣了這種模式就利于理解觀察者模式了函卒。

2. RxJava 中的觀察者模式

  • Observable:被觀察者(ble 結尾的單詞一般表示 可...的,可觀察的)
  • Observer:觀察者(er 結尾的單詞一般表示 ...者撇眯,...人)
  • subscribe:訂閱

首先創(chuàng)建 Observable 和 Observer报嵌,然后 observable.subscribe(observer),這樣 Observable 發(fā)出的事件就會被 Observer 響應熊榛。一般我們不手動創(chuàng)建 Observable锚国,而是由 Retrofit 返回給我們,我們拿到 Observable 之后只需關心如何操作 Observer 中的數(shù)據(jù)即可玄坦。
不過為了由淺入深的演示血筑,還是手動創(chuàng)建 Observable 來講解。

2.1 創(chuàng)建 Observable

常見的幾種方式营搅,不常用的不寫了云挟,因為我覺得這個模塊不是重點。

  • var observable=Observable.create(ObservableOnSubscribe<String> {...})
  • var observable=Observable.just(...)
  • var observable = Observable.fromIterable(mutableListOf(...))

2.1.1 create()

var observable2=Observable.create(object :ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("Hello ")
                emitter.onNext("RxJava ")
                emitter.onNext("GoodBye ")
                emitter.onComplete()            }

        })

ObservableOnSubscribeObservableEmitter都是陌生人转质,這個要是詳細講涉及到源碼分析园欣,東西可就多了(主要是我不熟悉),所以可以理解成 ObservableOnSubscribe 是用來幫助創(chuàng)建 Observable 的休蟹,ObservableEmitter 是用來發(fā)出事件的(這些事件在觀察者 Observer 中可以響應處理)沸枯。
emitter 一次發(fā)射了三個事件,然后調用了 onComplete() 這些在下面講觀察者 Observer 時還會提到赂弓,一并講解绑榴。

2.1.2 just

var observable=Observable.just("Hello","RxJava","GoodBye")

這句的效果等同于上面用 create 創(chuàng)建 observable,即 調用 3 次 onNext 后再調 onComplete盈魁。

2.1.3 fromIterable

var observable = Observable.fromIterable(mutableListOf("Hello","RxJava","GoodBye"))

這句的效果等同于上面用 create 創(chuàng)建 observable翔怎,即 調用 3 次 onNext 后再調 onComplete。

2.2 創(chuàng)建 Observer

val observer = object : Observer<String> {
            override fun onComplete() {
                Log.e("abc", "-----onComplete-----")
            }

            override fun onSubscribe(d: Disposable) {
                Log.e("abc", "-----onSubscribe-----")
            }

            override fun onNext(t: String) {
                Log.e("abc", "-----onNext-----$t")
            }

            override fun onError(e: Throwable) {
                Log.e("abc", "-----onError-----$e")
            }
        }
//訂閱
observable.subscribe(observer)

log 打印情況:

-----onSubscribe-----
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
-----onComplete-----

可以看到,先是建立訂閱關系赤套,然后根據(jù)前面 observable 的發(fā)射順序來打印 onNext飘痛,參數(shù)通過 onNext(t: String) 傳進來,最后調用 onComplete容握,多說一句宣脉,在 just 和 fromIterable 的情況下,沒有手動調用 Emitter剔氏,但是仍會先調用 onNext塑猖,最后調用 onComplete

2.3 Consumer 和 Action

這兩個詞意思分別是消費者(可以理解為消費被觀察者發(fā)射出來的事件)和行為(可以理解為響應被觀察者的行為)。對于 Observer 中的 4 個回調方法谈跛,我們未必都能用得到羊苟,如果只需要用到其中的一部分,就需要 Consumer 和 Action 上場了币旧。

有參數(shù)的onSubscribe践险、onNextonError我們用 Consumer 來代替吹菱,無參的onComplete用 Action 代替:

2.3.1 subscribe(Consumer<? super T> onNext)

observable.subscribe(object :Consumer<String>{
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        })
//打印
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye

說明一下,如果 subscribe 中我們只傳一個對象參數(shù)彭则,那只能是subscribe(Consumer<? super T> onNext)(onNext 方法)鳍刷,不能是 Action 或 Consumer<? super Throwable> onError、Consumer<? super Disposable> onSubscribe

==注意==:Consumer 中的回調方法名稱是 accept俯抖,區(qū)別于前面的 onNext

2.3.2 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

帶有兩個 Consumer 參數(shù)输瓜,分別負責 onNext 和 onError 的回調。

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        })

如果想要一個帶有兩個 Consumer 但是不是這種搭配(比如subscribe(Consumer<? super T> onNext, Consumer<? super Disposable> onSubscribe))芬萍,可以嗎尤揣?答案是:不行

2.3.3 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

帶有三個參數(shù),分別負責onNext柬祠、onError和onComplete的回調北戏。

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        }, object : Action {
            override fun run() {
                Log.e("abc", "-----onComplete-----")
            }
        })

同樣,三個參數(shù)只能有這一種搭配

==注意==:Action 中的回調方法名稱是 run漫蛔,區(qū)別于前面的 onComplete

2.3.4 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)

這種情況和直接 new 出來的 Observer 效果一樣嗜愈。

observable2.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        }, object : Action {
            override fun run() {
                Log.e("abc", "-----onComplete-----")
            }
        },object : Consumer<Disposable>{
            override fun accept(t: Disposable?) {
                Log.e("abc", "-----onSubscribe-----")            }
        })

3. 變換

在上面的例子中,Observable 發(fā)送的都是 String 類型的數(shù)據(jù)莽龟,所以在 Observer 中接收的也都是 String蠕嫁,現(xiàn)實開發(fā)中的數(shù)據(jù)多種多樣,而且有時候 Observable 提供的數(shù)據(jù)不是我們理想的情況毯盈,這種情況下就需要用到轉換操作符剃毒。
同樣我們只講常用的:

3.1 map

比如我們想把上游的 Int 類型的數(shù)據(jù)轉換成 String 可以這樣操作:

Observable.fromIterable(mutableListOf<Int>(1, 3, 5, 7, 8))
                .map(object : Function<Int, String> {
                    override fun apply(t: Int): String {
                        return "zb$t"
                    }
                })
                .subscribe(object : Consumer<String> {
                    override fun accept(t: String?) {
                        Log.e("abc","-- $t --")
                    }
                })
//Log日志
-- zb1 --
-- zb3 --
-- zb5 --
-- zb7 --
-- zb8 --

通過map操作符,Int 類型數(shù)據(jù),到 Consumer 里已經(jīng)成了 String(這里為了簡單的只看數(shù)據(jù)就沒用 Observer 而改用 Consumer赘阀,兩者都可以)陪拘。這里面用到了Function,它的第一個泛型是 Observable 中發(fā)射的數(shù)據(jù)類型纤壁,第二個泛型是我們想要裝換之后的數(shù)據(jù)類型左刽,在 Function 的 apply 方法中手動完成數(shù)據(jù)的轉化。
示意圖:map 把圓的變成了方的酌媒。

map

3.2 flatMap

與 map 相似欠痴,不過 flatMap 返回的是一個 Observable,也就是說 Function 的第二個泛型固定了,就是 Observable晚碾,這樣說不太好理解周霉,看個例子:
假如現(xiàn)在有多個學生,每個學生有多個科目菩咨,每個科目考了多次試,現(xiàn)在要打印所有的分數(shù)陡厘。單單只用 map 就不能直接搞定抽米,試試吧

class Course(var name: String, var scores: MutableList<Int>)
class Student(var name: String, var courses: MutableList<Course>)

var stu1Course1 = Course("體育",mutableListOf(80, 81, 82))
var stu1Course2 = Course("美術",mutableListOf(63, 62, 60))
var stu1 = Student("StuA", mutableListOf(stu1Course1, stu1Course2))
var stu2Course1 = Course("音樂",mutableListOf(74, 77, 79))
var stu2Course2 = Course("希臘語",mutableListOf(90, 90, 91))
var stu2 = Student("StuB", mutableListOf(stu2Course1, stu2Course2))

Observable.just(stu1,stu2)
                .map(object :Function<Student,MutableList<Course>>{
                    override fun apply(t: Student): MutableList<Course> {
                        return t.courses
                    }
                })
                .subscribe(object :Consumer<MutableList<Course>>{
                    override fun accept(t: MutableList<Course>?) {
                        for (item in t!!){
                            for (i in item.scores){
                                Log.e("abc","--->$i")
                            }
                        }
                    }
                })

通過兩層 for 循環(huán)可以打印,這也是沒辦法的事糙置,因為在 map 里面只能拿到 Course 集合云茸。使用 flatMap 的情況是這樣的:

Observable.just(stu1, stu2)
                .flatMap(object : Function<Student, ObservableSource<Course>> {
                    override fun apply(t: Student): ObservableSource<Course> {
                        return Observable.fromIterable(t.courses)
                    }
                })
                .flatMap(object : Function<Course, ObservableSource<Int>> {
                    override fun apply(t: Course): ObservableSource<Int> {
                        return Observable.fromIterable(t.scores)
                    }

                })
                .subscribe(object : Consumer<Int> {
                    override fun accept(t: Int?) {
                        Log.e("abc", "---> $t")
                    }
                })
// log 打印
    ---> 80
    ---> 81
    ---> 82
    ---> 63
    ---> 62
    ---> 60
    ---> 74
    ---> 77
    ---> 79
    ---> 90
    ---> 90
    ---> 91

用了兩次 flatMap,鏈式調用比縮進式更清晰谤饭。這里面的 flatMap 返回值類型的是 ObservableSource 并不是我們在前面提到的 Observable标捺,查看 Observable 源碼可以看到,它繼承了 ObservableSource揉抵,所以這種多態(tài)用法是可以的亡容。
另外在 apply 中返回的Observable.fromIterable(t.courses)這一句不就是我們創(chuàng)建 Observable 的方式嗎?
簡單的說冤今,map 是把 Observable 發(fā)射的數(shù)據(jù)變換一下類型闺兢,flatMap 是把數(shù)據(jù)中集合/數(shù)組中的每個元素再次通過 Observable 發(fā)射。
示意圖:faltMap 把一系列圓的通過一系列的 Observable 變成了一系列方的辟汰。

flatMap

圖雖然畫的丑列敲,但是我想意思比較明白了。

3.3 filter

filter是過濾的意思帖汞,通過判斷是否符合我們想要的邏輯戴而,來決定是否發(fā)射事件,只有返回 true 的事件才被發(fā)射翩蘸,其他的被拋棄所意。還以上面的例子為例,假如我們只想看 80 分以上的成績可以這樣過濾:

Observable.just(stu1, stu2)
                .flatMap(object : Function<Student, ObservableSource<Course>> {
                    override fun apply(t: Student): ObservableSource<Course> {
                        return Observable.fromIterable(t.courses)
                    }
                })
                .flatMap(object : Function<Course, ObservableSource<Int>> {
                    override fun apply(t: Course): ObservableSource<Int> {
                        return Observable.fromIterable(t.scores)
                    }

                })
                .filter(object :Predicate<Int>{
                    override fun test(t: Int): Boolean {
                        return t > 80
                    }

                })
                .subscribe(object : Consumer<Int> {
                    override fun accept(t: Int?) {
                        Log.e("abc", "---> $t")
                    }
                })
// log 打印
    ---> 81
    ---> 82
    ---> 90
    ---> 90
    ---> 91

注意,filter 里面不是用 Function 了扶踊,而是 Predicate泄鹏,這個單詞是“基于...”的意思,基于 t > 80秧耗,也就是選擇大于 80 分的成績备籽。

4. 結合 Retrofit 使用

前面 3 小節(jié)講了很多,都是為了講清楚 RxJava 的整個工作流程分井,還沒涉及到線程切換〕碘現(xiàn)實開發(fā)中更多的時候 Observable 是通過 Retrofit 返回給我們的。Retrofit 是一個網(wǎng)絡請求框架尺锚,它基于 OkHttp3珠闰,做了更好的封裝,結合 RxJava 用慣了的話可以大大提到開發(fā)效率瘫辩。還是一樣伏嗜,我們只看怎么用,不涉及源碼解讀伐厌。

4.1 Retrofit 進行簡單的 Get 請求

implementation 'com.squareup.retrofit2:retrofit:2.6.2'
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'

先引入依賴承绸,然后我們請求一個知乎日報的新聞數(shù)據(jù)(點擊查看數(shù)據(jù):https://news-at.zhihu.com/api/4/news/latest):

// ZhEntity
class ZhEntity {
    var date: String? = null
    var stories: MutableList<StoriesBean>? = null
    var top_stories: MutableList<TopStoriesBean>? = null

    class StoriesBean {
        var image_hue: String? = null
        var title: String? = null
        var url: String? = null
        var hint: String? = null
        var ga_prefix: String? = null
        var type: Int = 0
        var id: Int = 0
        var images: MutableList<String>? = null
    }

    class TopStoriesBean {
        var image_hue: String? = null
        var hint: String? = null
        var url: String? = null
        var image: String? = null
        var title: String? = null
        var ga_prefix: String? = null
        var type: Int = 0
        var id: Int = 0
    }
}
// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
    @GET("news/latest")
    fun getLatestNews(): Call<ZhEntity>
}
// 調用
val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://news-at.zhihu.com/api/4/")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val call: Call<ZhEntity> = service.getLatestNews()
        call.enqueue(object : Callback<ZhEntity> {
            override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) {
                Log.e("abc", "--> $t")
            }

            override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) {
                Log.e("abc", "-->${Gson().toJson(response?.body())}")
            }
        })

代碼有點多,分別解釋一下弧械,ZhEntity 是實體類八酒,ApiService 是一個接口,里面用注解的方式定義了一個方法 getLatestNews刃唐,@GET表示 Get 請求,由此可以想象肯定有@POST界轩,@GET里面還有參數(shù)画饥,這是請求地址 BaseUrl 后面的子文件夾。

getLatestNews 函數(shù)返回類型是 Call浊猾,這個是 Retrofit 定義用來請求網(wǎng)絡的抖甘。
第三段代碼,現(xiàn)實創(chuàng)建了一個 Retrofit 對象葫慎,addConverterFactory(GsonConverterFactory.create())是把接口返回的 json 類型的數(shù)據(jù)轉換成實體類的類型衔彻,這個東西在implementation 'com.squareup.retrofit2:converter-gson:2.6.2'時被引入。

然后是一系列的 Call 調用 qnqueue 操作什么的偷办,看得出艰额,沒有用 Rxjava 一樣可以完成網(wǎng)絡請求,而且代碼不復雜椒涯,好了柄沮,本文到此結束。

好吧,我在扯淡祖搓。繼續(xù)講狱意,有人說不喜歡 url 被截成兩段,可以這樣修改拯欧,效果完全相同:

// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
    @GET
    fun getLatestNews(@Url url:String): Call<ZhEntity>
}
// 調用
val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://www.baidu.com")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val call: Call<ZhEntity> = service.getLatestNews("https://news-at.zhihu.com/api/4/news/latest")
        call.enqueue(object : Callback<ZhEntity> {
            override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) {
                Log.e("abc", "--> $t")
            }

            override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) {
                Log.e("abc", "-->${Gson().toJson(response?.body())}")
            }
        })

baseUrl 還是要的详囤,不過設置成其他值無所謂了,因為不會被請求镐作。

4.2 Retrofit 結合 RxJava

啰嗦了這么多藏姐,才講到這里。抱歉水平有限滑肉,沒辦法用簡單的語言說清復雜的問題包各。
首先,引入依賴時多加一句對 RxJava 的支持:

implementation 'com.squareup.retrofit2:retrofit:2.6.2'
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.2'

然后靶庙,我們的 getLatestNews 就可以直接返回一個 Observable 了问畅!

import io.reactivex.Observable
import retrofit2.http.GET

interface ApiService {
    @GET("news/latest")
    fun getLatestNews(): Observable<ZhEntity>
}

放心寫,不會報錯六荒,有了 Observable护姆,就好辦了,輕車熟路:

val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .baseUrl("https://news-at.zhihu.com/api/4/")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val observable = service.getLatestNews()
        observable.subscribeOn(Schedulers.newThread())
                .subscribe(object : Observer<ZhEntity> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: ZhEntity) {
                Log.e("abc","-->${Gson().toJson(t)}")
            }

            override fun onError(e: Throwable) {
                Log.e("abc","-->$e")
            }
        })

除了 Observable 來源變了掏击,其他與本文最早講的 RxJava 沒什么不同卵皂。非要說不同,有一點砚亭,多了一句subscribeOn(Schedulers.newThread())灯变,下面講講這個。

4.3 線程切換

  • subscribeOn:定義 Observable 發(fā)射事件所處的線程
  • observeOn:定義轉換/響應事件所處的線程(map捅膘、flatMap添祸、Observer 等),可多次切換

線程切換比較常見寻仗,比如 子線程請求網(wǎng)絡數(shù)據(jù)主線程更新 UI刃泌,subscribeOnobserveOn有哪些線程可以選擇?它們又是怎樣使用的署尤?我們先看一個例子:

Thread(object : Runnable {
            override fun run() {
                Log.e("abc","Thread當前線程:${Thread.currentThread().name}")
                observable.subscribeOn(Schedulers.newThread())
                        .doOnNext(object :Consumer<ZhEntity>{
                            override fun accept(t: ZhEntity?) {
                                Log.e("abc","doOnNext當前線程:${Thread.currentThread().name}")
                            }
                        })
                        .observeOn(Schedulers.io())
                        .flatMap(object :Function<ZhEntity,ObservableSource<ZhEntity.StoriesBean>>{
                            override fun apply(t: ZhEntity): ObservableSource<ZhEntity.StoriesBean> {
                                Log.e("abc","flatMap當前線程:${Thread.currentThread().name}")
                                return Observable.fromIterable(t.stories)
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(object : Observer<ZhEntity.StoriesBean> {
                            override fun onComplete() {
                            }

                            override fun onSubscribe(d: Disposable) {
                                Log.e("abc","onSubscribe當前線程:${Thread.currentThread().name}")
                            }

                            override fun onNext(t: ZhEntity.StoriesBean) {
                                Log.e("abc","Observer當前線程:${Thread.currentThread().name}")
                                Log.e("abc", "-->${Gson().toJson(t)}")
                            }

                            override fun onError(e: Throwable) {
                                Log.e("abc", "-->$e")
                            }
                        })
            }
        }).start()
// log 打印
Thread當前線程:Thread-4
onSubscribe當前線程:Thread-4
doOnNext當前線程:RxNewThreadScheduler-1
flatMap當前線程:RxCachedThreadScheduler-1
Observer當前線程:main
Observer當前線程:main
Observer當前線程:main

這里面只有doOnNext沒講過耙替,現(xiàn)在說說:每發(fā)送 onNext() 之前都會先回調這個方法,所以 doOnNext 和 Observable 的 subscribe(發(fā)射事件的方法)處于同一個線程曹体。
從這個例子可以看出:

  1. Observable 和 Observer 建立訂閱關系是在當前線程中(Thread-4)
  2. subscribeOn決定 Observable 發(fā)射事件所處的線程(即 Retrofit 請求網(wǎng)絡所在線程)
  3. 第一次observeOn決定 flatMap 所在的線程(RxCachedThreadScheduler-1)
  4. 再次observeOn決定 Observer 所在線程(Android 主線程 main)

所以每次調用observeOn就會切換線程俗扇,并且決定的是接下來的變換/響應的線程。多說一句混坞,多次設置 subscribeOn狐援,只有第一次生效钢坦。

線程可選值

線 程 名 稱 說明
Schedulers.immediate() 默認的 Scheduler,直接在當前線程運行,相當于不指定線程
Schedulers.newThread() 啟用新線程啥酱,并在新線程執(zhí)行操作
Schedulers.io() I/O 操作(讀寫文件爹凹、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler镶殷。行為模式和 newThread() 差不多禾酱,區(qū)別在于 io() 的內部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程绘趋,因此多數(shù)情況下 io() 比 newThread() 更有效率颤陶。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程
Schedulers.computation() 計算所使用的 Scheduler陷遮。這個計算指的是 CPU 密集型計算滓走,即不會被 I/O 等操作限制性能的操作,例如圖形的計算帽馋。這個 Scheduler 使用的固定的線程池搅方,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中绽族,否則 I/O 操作的等待時間會浪費 CPU
AndroidSchedulers.mainThread() Android 主線程

4.4 Disposable 和 CompositeDisposable

最后介紹一下這兩個類姨涡,Disposable前文出現(xiàn)過,在 Observer 的 onSubscribe 函數(shù)中吧慢,有一個 Disposable 類型的參數(shù):override fun onSubscribe(d: Disposable) {}涛漂,通過前面介紹我們知道,Observable 和 Observer 建立訂閱關系時會調用 onSubscribe 方法检诗,但是沒有說這個參數(shù)的作用匈仗。

4.4.1 DisPosable

Disposable 的 dispose() 函數(shù)可以用來解除訂閱,這樣就不會收到 Observable 發(fā)射的事件:

var dis ?= null
val observable = Observable.fromIterable(mutableListOf("Hello", "RxJava", "GoodBye"))
        val observer = object : Observer<String> {
            override fun onComplete() {
            }
            override fun onSubscribe(d: Disposable) {
                dis=d
                Log.e("abc", "-----onSubscribe-----$d")
            }

            override fun onNext(t: String) {
                if (t=="Hello") dis.dispose()
                Log.e("abc", "-----onNext-----$t")
            }

            override fun onError(e: Throwable) {
            }
        }
observable.subscribe(observer)
// log 打印
-----onNext-----Hello

可以看到逢慌,調用dis.dispose()后锚沸,就不在打印上游發(fā)射的"RxJava"和"GoodBye"了。

4.4.2 CompositeDisposable

CompositeDisposable 可以用來管理多個 Disposable涕癣,通過add()方法添加 Disposable 對象,然后在 onDestroy 方法里面調用clear()或者dispose()來清除所有的 Disposable前标,這樣可以防止內存泄漏坠韩。

val cDis = CompositeDisposable()
// ...代碼省略
override fun onSubscribe(d: Disposable) {
                cDis.add(d)
            }
// ...代碼省略
override fun onDestroy() {
        super.onDestroy()
        cDis.clear()
    }

多說一句,通過查看RxJava2CallAdapterFactory.create()源碼可知炼列,dispose()方法能主動斷開 Observable 和 Observer 之間的連接只搁,還能取消 Retrofit 的網(wǎng)絡請求,所以放心的用吧俭尖。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末氢惋,一起剝皮案震驚了整個濱河市洞翩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌焰望,老刑警劉巖骚亿,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異熊赖,居然都是意外死亡来屠,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門震鹉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來俱笛,“玉大人,你說我怎么就攤上這事传趾∮ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵浆兰,是天一觀的道長磕仅。 經(jīng)常有香客問我,道長镊讼,這世上最難降的妖魔是什么宽涌? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮蝶棋,結果婚禮上卸亮,老公的妹妹穿的比我還像新娘。我一直安慰自己玩裙,他們只是感情好兼贸,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著吃溅,像睡著了一般溶诞。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上决侈,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天螺垢,我揣著相機與錄音,去河邊找鬼赖歌。 笑死枉圃,一個胖子當著我的面吹牛,可吹牛的內容都是我干的庐冯。 我是一名探鬼主播孽亲,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼展父!你這毒婦竟也來了返劲?” 一聲冷哼從身側響起玲昧,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎篮绿,沒想到半個月后孵延,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡搔耕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年隙袁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弃榨。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡菩收,死狀恐怖,靈堂內的尸體忽然破棺而出鲸睛,到底是詐尸還是另有隱情娜饵,我是刑警寧澤,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布官辈,位于F島的核電站箱舞,受9級特大地震影響,放射性物質發(fā)生泄漏拳亿。R本人自食惡果不足惜晴股,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望肺魁。 院中可真熱鬧电湘,春花似錦、人聲如沸鹅经。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瘾晃。三九已至贷痪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蹦误,已是汗流浹背劫拢。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留强胰,地道東北人尚镰。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像哪廓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子初烘,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348