RxKotlin 例子不超過(guò)15行教程 4----Observer Subscribe 與 Hot/Cold Observable

本節(jié)代碼中的 observer 就是第二節(jié)中的

Observer 與 Subscribe

按照慣例,先來(lái)看兩段能跑的代碼

兩段能跑的代碼

// 4.1.kt
import io.reactivex.Observable

fun main(args: Array<String>) {
    val observable: Observable<Int> = Observable.range(1, 3)
    observable.subscribe({  // 我知道你要問(wèn)我為什么 subscribe 后面還可以接三個(gè) Lambda,先看例子,下面說(shuō)
        //onNext method
        println("Next $it")
    }, {
        //onError Method
        println("Error ${it.message}")
    }, {
        //onComplete Method
        println("All Completed")
    })
}

輸出

Next 1
Next 2
Next 3
All Completed

再來(lái)一段(上一節(jié)用過(guò)的例子)

// 3.11.kt
import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.range(1, 3).subscribe(observer)
}

輸出

New Subscription  // 上面那個(gè)例子沒(méi)有這一行
Next 1
Next 2
Next 3
All Completed

上面代碼的主旨

Observer

之前(第二節(jié))我們說(shuō)過(guò),一個(gè) Observer 需要實(shí)現(xiàn)四個(gè)方法(它們的作用參見(jiàn)第二節(jié))

  • onNext
  • onComplete
  • onError
  • onSubscribe

當(dāng)我們把 Observable 連接到 Observer 上的時(shí)候,系統(tǒng)會(huì)調(diào)用這四個(gè)方法并把相應(yīng)的值傳給它們丰歌。

subscribe 的參數(shù)都能是什么

subscribe 在 ReactiveX 中有幾個(gè)重載方法,這里不列出翔脱∪鄄簦基本模式有這兩個(gè)

  • subscribe(onNext,onError,onComplete,onSubscribe)
    這幾個(gè)參數(shù)都可以省略,但是只能從后往前省略(這句是廢話(huà))
    是廢話(huà)也要說(shuō),因?yàn)?subscribe 是在 Java 文件中定義的,不能使用 Kotlin 的命名參數(shù)
    4.1.kt 中省略了 onSubscribe
  • subscribe(observer)
    3.11.kt 已經(jīng)很清晰,這里不展開(kāi)了

除了 subscribe 方法,還有 RxKotlin 提供的小語(yǔ)法糖 subscribeBy

這個(gè)函數(shù)是 RxKotlin 為 Observable (等可以 subscribe 的對(duì)象)定義的擴(kuò)展函數(shù),函數(shù)定義如下

fun <T : Any> Observable<T>.subscribeBy(
        onError: (Throwable) -> Unit = onErrorStub,
        onComplete: () -> Unit = onCompleteStub,
        onNext: (T) -> Unit = onNextStub
        ): Disposable = subscribe(onNext, onError, onComplete)  // 好的好的,我知道你要問(wèn) Disposable 是什么,稍等。

因?yàn)楸欢x在 Kotlin 文件中,它可以使用命名參數(shù)(例子見(jiàn) 第一節(jié) 1.kt)

Subscribe

從之前的例子可知,subscribe 可以連接 ObservableObserver涤姊。
它有兩種形式(上面說(shuō)過(guò),這里再概括一下)

  • onNext 等,以參數(shù)的形式傳進(jìn)去
  • 直接傳入一個(gè) Observer 對(duì)象

如果你選擇第一種形式,那么 subscribe 方法是有返回值的,返回值類(lèi)型是 Disposable (不要急,它的介紹馬上就到了)
如果你選擇第二種形式,那么 subscribe 方法是沒(méi)有返回值的
這兩種形式中的 onSubscribe 都是一個(gè) (d:Disposable):Unit 類(lèi)型的函數(shù)。
那么 Disposable 有什么用呢?

Disposable

disposable: 一次性的,可任意處理的; 用后就拋棄的; 免洗的; 可供使用的肋拔。講真,這幾個(gè)中文翻譯放在這里我覺(jué)得并不是很合適,我也沒(méi)有想到合適的中文翻譯(如果有合適的歡迎指出)。我就一直用英文了呀酸。
Disposable 對(duì)象的 dispose 方法可以停止本次訂閱
看一個(gè)例子
我保證這個(gè)例子是為數(shù)不多的長(zhǎng)例子之一,真的不能再精簡(jiǎn)了
下面示例用到了 lateinit 可以自行 Google 下,此處不介紹凉蜂。(如果有好的鏈接歡迎發(fā)給我,加在這里)

// 4.2.kt
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {

    val observale: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS)

    val observer: Observer<Long> = object : Observer<Long> {
        lateinit var disposable: Disposable

        override fun onSubscribe(d: Disposable) {
            disposable = d
        }

        override fun onNext(item: Long) {
            if (item >= 5 && !disposable.isDisposed) {
                disposable.dispose()
                println("Disposed")
            }
            println("Received $item")
        }

        override fun onError(e: Throwable) {
            println("Error ${e.message}")
        }

        override fun onComplete() {
            println("Complete")
        }

    }

    observale.subscribe(observer)
    Thread.sleep(1000)
}

輸出

Received 0
Received 1
Received 2
Received 3
Received 4
Disposed    // 注釋1
Received 5 // 注釋2
// 注釋3

注釋1
dispose 處理后不會(huì)執(zhí)行 observer 的 onComplete 方法(所以 Complete 沒(méi)有輸出)
注釋2
disposable.dispose() 之后,observer 不會(huì)再處理其它值(所以 Received 6 Received 7 等等并沒(méi)有輸出)
但是當(dāng)前值依然會(huì)繼續(xù)處理(所以 Received 5 依然被輸出)
總結(jié)一下, Disposable 是用來(lái)控制訂閱的

下面我們回到 Observable,看看它的分類(lèi)

Hot/Cold Observable

在本教程前面所有示例中,如果多次訂閱同一個(gè) Observable,則所有訂閱都會(huì)得到從一開(kāi)始的所有值性誉。
例子

// 4.3.kt
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable

// Cold Observables
fun main(args: Array<String>) {
    val observable: Observable<Int> = listOf(1, 2, 3, 4).toObservable()

    observable.subscribe(observer)

    observable.subscribe(observer)
}

輸出

New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed

我們可以看到每一個(gè) Observer 都被推送了了從 1-4 的所有值窿吩。
到目前為止,我們遇到的所有 Observable 都是這樣的。
這樣的 Observable 被稱(chēng)作 Cold Observable
我之前曾經(jīng)比喻 Observable 為電臺(tái)错览,這是有一些不恰當(dāng)?shù)娜已恪R驗(yàn)楫?dāng)你錯(cuò)過(guò)時(shí)間再打開(kāi)電臺(tái)會(huì)聽(tīng)不到原先的內(nèi)容。
Cold Observable 更像是光盤(pán)(容量可能無(wú)限),隨時(shí)打開(kāi)都能從頭開(kāi)始聽(tīng)倾哺。

電臺(tái)這個(gè)比喻更適合 Hot Observable,看下一個(gè)例子

// 4.4.kt
import io.reactivex.rxkotlin.toObservable

//Hot Observable
fun main(args: Array<String>) {
    val connectableObservable = listOf(1, 2, 3).toObservable().publish()  // 注釋1
    connectableObservable.subscribe({ println("Subscription 1: $it") })  // 描點(diǎn)1
    connectableObservable.subscribe({ println("Subscription 2: $it") })  // 描點(diǎn)2
    connectableObservable.connect()  // 注釋2
    connectableObservable.subscribe({ println("Subscription 3: $it") })  // 注釋3
}

輸出

Subscription 1: 1
Subscription 2: 1
Subscription 1: 2
Subscription 2: 2
Subscription 1: 3
Subscription 2: 3
// 并沒(méi)有輸出 Subscription 3

注釋1
我們用 publish 方法把 Cold Observable 變成 ConnectableObservable (ConnectableObservableHot Observable 的一種)
注釋2
ConnectableObservable描點(diǎn)1描點(diǎn)2 處都不會(huì)發(fā)送消息,它會(huì)在 注釋2 處(調(diào)用 connect 方法時(shí))開(kāi)始發(fā)送消息
Cold Observable 會(huì)在調(diào)用 subscribe 時(shí)開(kāi)始發(fā)送消息
如果訂閱晚了(如 注釋3),則會(huì)錯(cuò)過(guò)一些消息(在這里,注釋3 錯(cuò)過(guò)了所有消息(計(jì)算機(jī)速度太快....),接下來(lái)有其他例子,不要急)
注釋3
訂閱3不會(huì)收到任何信息

我們來(lái)看下一個(gè)例子,在這個(gè)例子中,調(diào)用 connect 方法后我們又增加了新的訂閱,這個(gè)訂閱會(huì)丟失部分消息

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val connectableObservable = Observable.interval(10, TimeUnit.MILLISECONDS).publish()
    connectableObservable.subscribe({ println("Subscription 1: $it") })
    connectableObservable.subscribe({ println("Subscription 2: $it") })
    connectableObservable.connect()  // ConnectableObservable 開(kāi)始發(fā)送消息
    println("Sleep 1 starts")
    Thread.sleep(20)
    println("Sleep 1 ends")
    connectableObservable.subscribe({ println("Subscription 3: $it") })  // 不用再次調(diào)用 connect 方法
    println("Sleep 2 starts")
    Thread.sleep(30)
    println("Sleep 2 ends")
}

輸出(有點(diǎn)長(zhǎng))

Sleep 1 starts
Subscription 1: 0
Subscription 2: 0
Subscription 1: 1
Subscription 2: 1 // 注釋1
Sleep 1 ends      // 開(kāi)始 訂閱3
Sleep 2 starts
Subscription 1: 2
Subscription 2: 2
Subscription 3: 2  // 注釋2
Subscription 1: 3
Subscription 2: 3
Subscription 3: 3
Subscription 1: 4
Subscription 2: 4
Subscription 3: 4
Sleep 2 ends

注釋1
到這里我們沒(méi)有開(kāi)始 訂閱3 所以沒(méi)有輸出任何 Subscription 3
注釋2
訂閱3 的輸出是從 2 開(kāi)始的,它錯(cuò)過(guò)了 01

這一節(jié)到這里就 OK 了,明天說(shuō) Subject

RxKotlin 例子不超過(guò)15行教程 1----環(huán)境配置與初體驗(yàn)

RxKotlin 例子不超過(guò)15行教程 2----Observable Observer 與 Subscribe 簡(jiǎn)介

RxKotlin 例子不超過(guò)15行教程 3----Observable 的創(chuàng)建

RxKotlin 例子不超過(guò)15行教程 4----Observer Subscribe 與 Hot/Cold Observable

RxKotlin 例子不超過(guò)15行教程 5----Subject

RxKotlin 例子不超過(guò)15行教程 6----Operator 與 Marble Diagram

RxKotlin 例子不超過(guò)15行教程 7----Backpressure Flowable 與 Subscriber 簡(jiǎn)介

RxKotlin 例子不超過(guò)15行教程 8----Error Handling

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末轧邪,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子羞海,更是在濱河造成了極大的恐慌忌愚,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件却邓,死亡現(xiàn)場(chǎng)離奇詭異硕糊,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)腊徙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門(mén)简十,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人撬腾,你說(shuō)我怎么就攤上這事勺远。” “怎么了时鸵?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵胶逢,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我饰潜,道長(zhǎng)初坠,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任彭雾,我火速辦了婚禮碟刺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘薯酝。我一直安慰自己半沽,他們只是感情好爽柒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著者填,像睡著了一般浩村。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上占哟,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天心墅,我揣著相機(jī)與錄音,去河邊找鬼榨乎。 笑死怎燥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蜜暑。 我是一名探鬼主播铐姚,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼肛捍!你這毒婦竟也來(lái)了谦屑?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤篇梭,失蹤者是張志新(化名)和其女友劉穎氢橙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體恬偷,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡悍手,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了袍患。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坦康。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖诡延,靈堂內(nèi)的尸體忽然破棺而出滞欠,到底是詐尸還是另有隱情,我是刑警寧澤肆良,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布筛璧,位于F島的核電站,受9級(jí)特大地震影響惹恃,放射性物質(zhì)發(fā)生泄漏夭谤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一巫糙、第九天 我趴在偏房一處隱蔽的房頂上張望朗儒。 院中可真熱鬧,春花似錦、人聲如沸醉锄。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)恳不。三九已至檩小,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間妆够,已是汗流浹背识啦。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工负蚊, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留神妹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓家妆,卻偏偏與公主長(zhǎng)得像鸵荠,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子伤极,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354