本節(jié)代碼中的 observer 就是第二節(jié)中的
Observer 與 Subscribe
按照慣例,先來(lái)看兩段能跑的代碼
兩段能跑的代碼
// 4.1.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.range(1, 3)
observable.subscribe({ // 我知道你要問(wèn)我為什么 subscribe 后面還可以接三個(gè) Lambda,先看例子,下面說(shuō)
//onNext method
println("Next $it")
}, {
//onError Method
println("Error ${it.message}")
}, {
//onComplete Method
println("All Completed")
})
}
輸出
Next 1
Next 2
Next 3
All Completed
再來(lái)一段(上一節(jié)用過(guò)的例子)
// 3.11.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.range(1, 3).subscribe(observer)
}
輸出
New Subscription // 上面那個(gè)例子沒(méi)有這一行
Next 1
Next 2
Next 3
All Completed
上面代碼的主旨
Observer
之前(第二節(jié))我們說(shuō)過(guò),一個(gè) Observer
需要實(shí)現(xiàn)四個(gè)方法(它們的作用參見(jiàn)第二節(jié))
- onNext
- onComplete
- onError
- onSubscribe
當(dāng)我們把 Observable 連接到 Observer 上的時(shí)候,系統(tǒng)會(huì)調(diào)用這四個(gè)方法并把相應(yīng)的值傳給它們丰歌。
subscribe 的參數(shù)都能是什么
subscribe 在 ReactiveX 中有幾個(gè)重載方法,這里不列出翔脱∪鄄簦基本模式有這兩個(gè)
-
subscribe(onNext,onError,onComplete,onSubscribe)
這幾個(gè)參數(shù)都可以省略,但是只能從后往前省略(這句是廢話(huà))
是廢話(huà)也要說(shuō),因?yàn)?subscribe 是在 Java 文件中定義的,不能使用 Kotlin 的命名參數(shù)
4.1.kt
中省略了onSubscribe
-
subscribe(observer)
3.11.kt
已經(jīng)很清晰,這里不展開(kāi)了
除了 subscribe
方法,還有 RxKotlin 提供的小語(yǔ)法糖 subscribeBy
這個(gè)函數(shù)是 RxKotlin 為 Observable (等可以 subscribe 的對(duì)象)定義的擴(kuò)展函數(shù),函數(shù)定義如下
fun <T : Any> Observable<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable = subscribe(onNext, onError, onComplete) // 好的好的,我知道你要問(wèn) Disposable 是什么,稍等。
因?yàn)楸欢x在 Kotlin 文件中,它可以使用命名參數(shù)(例子見(jiàn) 第一節(jié) 1.kt
)
Subscribe
從之前的例子可知,subscribe
可以連接 Observable
與 Observer
涤姊。
它有兩種形式(上面說(shuō)過(guò),這里再概括一下)
- 把
onNext
等,以參數(shù)的形式傳進(jìn)去 - 直接傳入一個(gè)
Observer
對(duì)象
如果你選擇第一種形式,那么 subscribe
方法是有返回值的,返回值類(lèi)型是 Disposable
(不要急,它的介紹馬上就到了)
如果你選擇第二種形式,那么 subscribe
方法是沒(méi)有返回值的
這兩種形式中的 onSubscribe
都是一個(gè) (d:Disposable):Unit
類(lèi)型的函數(shù)。
那么 Disposable
有什么用呢?
Disposable
disposable: 一次性的,可任意處理的; 用后就拋棄的; 免洗的; 可供使用的肋拔。講真,這幾個(gè)中文翻譯放在這里我覺(jué)得并不是很合適,我也沒(méi)有想到合適的中文翻譯(如果有合適的歡迎指出)。我就一直用英文了呀酸。
Disposable
對(duì)象的 dispose
方法可以停止本次訂閱
看一個(gè)例子
我保證這個(gè)例子是為數(shù)不多的長(zhǎng)例子之一,真的不能再精簡(jiǎn)了
下面示例用到了 lateinit
可以自行 Google 下,此處不介紹凉蜂。(如果有好的鏈接歡迎發(fā)給我,加在這里)
// 4.2.kt
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observale: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS)
val observer: Observer<Long> = object : Observer<Long> {
lateinit var disposable: Disposable
override fun onSubscribe(d: Disposable) {
disposable = d
}
override fun onNext(item: Long) {
if (item >= 5 && !disposable.isDisposed) {
disposable.dispose()
println("Disposed")
}
println("Received $item")
}
override fun onError(e: Throwable) {
println("Error ${e.message}")
}
override fun onComplete() {
println("Complete")
}
}
observale.subscribe(observer)
Thread.sleep(1000)
}
輸出
Received 0
Received 1
Received 2
Received 3
Received 4
Disposed // 注釋1
Received 5 // 注釋2
// 注釋3
注釋1
dispose 處理后不會(huì)執(zhí)行 observer 的 onComplete
方法(所以 Complete 沒(méi)有輸出)
注釋2
disposable.dispose()
之后,observer 不會(huì)再處理其它值(所以 Received 6 Received 7 等等并沒(méi)有輸出)
但是當(dāng)前值依然會(huì)繼續(xù)處理(所以 Received 5 依然被輸出)
總結(jié)一下, Disposable
是用來(lái)控制訂閱的
下面我們回到 Observable
,看看它的分類(lèi)
Hot/Cold Observable
在本教程前面所有示例中,如果多次訂閱同一個(gè) Observable,則所有訂閱都會(huì)得到從一開(kāi)始的所有值性誉。
例子
// 4.3.kt
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable
// Cold Observables
fun main(args: Array<String>) {
val observable: Observable<Int> = listOf(1, 2, 3, 4).toObservable()
observable.subscribe(observer)
observable.subscribe(observer)
}
輸出
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
我們可以看到每一個(gè) Observer 都被推送了了從 1-4 的所有值窿吩。
到目前為止,我們遇到的所有 Observable
都是這樣的。
這樣的 Observable
被稱(chēng)作 Cold Observable
我之前曾經(jīng)比喻 Observable
為電臺(tái)错览,這是有一些不恰當(dāng)?shù)娜已恪R驗(yàn)楫?dāng)你錯(cuò)過(guò)時(shí)間再打開(kāi)電臺(tái)會(huì)聽(tīng)不到原先的內(nèi)容。
Cold Observable
更像是光盤(pán)(容量可能無(wú)限),隨時(shí)打開(kāi)都能從頭開(kāi)始聽(tīng)倾哺。
電臺(tái)這個(gè)比喻更適合 Hot Observable
,看下一個(gè)例子
// 4.4.kt
import io.reactivex.rxkotlin.toObservable
//Hot Observable
fun main(args: Array<String>) {
val connectableObservable = listOf(1, 2, 3).toObservable().publish() // 注釋1
connectableObservable.subscribe({ println("Subscription 1: $it") }) // 描點(diǎn)1
connectableObservable.subscribe({ println("Subscription 2: $it") }) // 描點(diǎn)2
connectableObservable.connect() // 注釋2
connectableObservable.subscribe({ println("Subscription 3: $it") }) // 注釋3
}
輸出
Subscription 1: 1
Subscription 2: 1
Subscription 1: 2
Subscription 2: 2
Subscription 1: 3
Subscription 2: 3
// 并沒(méi)有輸出 Subscription 3
注釋1
我們用 publish
方法把 Cold Observable
變成 ConnectableObservable
(ConnectableObservable
是 Hot Observable
的一種)
注釋2
ConnectableObservable
在 描點(diǎn)1
和 描點(diǎn)2
處都不會(huì)發(fā)送消息,它會(huì)在 注釋2
處(調(diào)用 connect
方法時(shí))開(kāi)始發(fā)送消息
而 Cold Observable
會(huì)在調(diào)用 subscribe
時(shí)開(kāi)始發(fā)送消息
如果訂閱晚了(如 注釋3
),則會(huì)錯(cuò)過(guò)一些消息(在這里,注釋3
錯(cuò)過(guò)了所有消息(計(jì)算機(jī)速度太快....),接下來(lái)有其他例子,不要急)
注釋3
訂閱3
不會(huì)收到任何信息
我們來(lái)看下一個(gè)例子,在這個(gè)例子中,調(diào)用 connect
方法后我們又增加了新的訂閱,這個(gè)訂閱會(huì)丟失部分消息
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val connectableObservable = Observable.interval(10, TimeUnit.MILLISECONDS).publish()
connectableObservable.subscribe({ println("Subscription 1: $it") })
connectableObservable.subscribe({ println("Subscription 2: $it") })
connectableObservable.connect() // ConnectableObservable 開(kāi)始發(fā)送消息
println("Sleep 1 starts")
Thread.sleep(20)
println("Sleep 1 ends")
connectableObservable.subscribe({ println("Subscription 3: $it") }) // 不用再次調(diào)用 connect 方法
println("Sleep 2 starts")
Thread.sleep(30)
println("Sleep 2 ends")
}
輸出(有點(diǎn)長(zhǎng))
Sleep 1 starts
Subscription 1: 0
Subscription 2: 0
Subscription 1: 1
Subscription 2: 1 // 注釋1
Sleep 1 ends // 開(kāi)始 訂閱3
Sleep 2 starts
Subscription 1: 2
Subscription 2: 2
Subscription 3: 2 // 注釋2
Subscription 1: 3
Subscription 2: 3
Subscription 3: 3
Subscription 1: 4
Subscription 2: 4
Subscription 3: 4
Sleep 2 ends
注釋1
到這里我們沒(méi)有開(kāi)始 訂閱3
所以沒(méi)有輸出任何 Subscription 3
注釋2
訂閱3
的輸出是從 2 開(kāi)始的,它錯(cuò)過(guò)了 0 和 1
這一節(jié)到這里就 OK 了,明天說(shuō) Subject
RxKotlin 例子不超過(guò)15行教程 1----環(huán)境配置與初體驗(yàn)
RxKotlin 例子不超過(guò)15行教程 2----Observable Observer 與 Subscribe 簡(jiǎn)介
RxKotlin 例子不超過(guò)15行教程 3----Observable 的創(chuàng)建
RxKotlin 例子不超過(guò)15行教程 4----Observer Subscribe 與 Hot/Cold Observable
RxKotlin 例子不超過(guò)15行教程 5----Subject
RxKotlin 例子不超過(guò)15行教程 6----Operator 與 Marble Diagram
RxKotlin 例子不超過(guò)15行教程 7----Backpressure Flowable 與 Subscriber 簡(jiǎn)介