上一節(jié)中我們說到了 Hot Observable
的一個實現(xiàn) ---- ConnectableObservable
劳吠。這一節(jié)中我們說一說 Hot Observable
的另一種實現(xiàn) ---- Subject
Subject
按照慣例,先來一段能跑的代碼
// 5.1.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(10, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>() // 注釋1
observable.subscribe(subject) // 描點1 Subject 充當 Observer 角色
subject.subscribe({ println("Received $it") }) // 描點2 Subject 充當 Observable 角色
Thread.sleep(60)
}
輸出
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
注釋1
我們可以用 PublishSubject.create()
來創(chuàng)建 PublishSubject
(PublishSubject
下方介紹)
-
PublishSubject
是Subject
的一種 -
Subject
是Hot Observable
的一種
(這里有空我補一個關(guān)系圖)
Subject
是 Observable
與 Observer
的組合體
-
Observable
的所有操作符
它都有(操作符
會在之后的章節(jié)介紹) - 它也可以像
Observer
一樣接收值 - 左耳朵進右耳朵出(對,我媽經(jīng)常這么說我)阅悍。如果你向它的
Observer
接口傳入值(描點1
), 它會把這些值選擇性地從Observable
接口處彈出去(描點2
)
(PublishSubject
會把所有從Observer
接口傳入的值按照時間順序全部傳出去)
這么做有什么用處,既然我們可以直接從 源Observable
訂閱,為什么要在中間加一層 PublishSubject
? 來看下一個例子
PublishSubject 的作用
// 5.2.kt
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
observable.subscribe({ println("Subscription 1 Received $it") })
Thread.sleep(200)
observable.subscribe({ println("Subscription 2 Received $it") })
Thread.sleep(300)
}
輸出
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 2 Received 0 // 注釋1
Subscription 1 Received 3
Subscription 2 Received 1
Subscription 1 Received 4
Subscription 2 Received 2
注釋1
訂閱2
從 0
開始接收消息(因為它訂閱的 observable
是一個 Cold Observable
,所以會從頭發(fā)送)
這里的輸出結(jié)果和下面對比一下
// 5.3.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
observable.subscribe(subject)
subject.subscribe({ println("Subscription 1 Received $it") })
Thread.sleep(300)
subject.subscribe({ println("Subscription 2 Received $it") })
Thread.sleep(200)
}
輸出
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 1 Received 3
Subscription 2 Received 3 // 注釋1
Subscription 1 Received 4
Subscription 2 Received 4
注釋1
訂閱2
從 3
開始接收消息(它錯過了 0
1
2
, 我們說過 Subject
是 Hot Observable
的一種)
在這里,我們通過 PublishSubject
把原來的 Cold
變成了 Hot
(上一節(jié)的 publish
也能實現(xiàn)此功能,只不過得到的是 ConnectableObservable
)
Subject 的各種實現(xiàn)
AsyncSubject
下面這張圖是為了闡述 ReactiveX 原理常用的 Marble Diagram
,我會在明天專門去說 Marble Diagram
如何看(之前我也是各種看不懂,捂臉)
(圖片來自 ReactiveX documentation)
AsyncSubject
會從 源Observable
(Subject
的 Observer
接口傳入值來自 源Observable
) 接收所有值,并把最后一個值從 Observable
接口處彈出去,看一個例子
// 5.4.kt
import io.reactivex.Observable
import io.reactivex.subjects.AsyncSubject
fun main(args: Array<String>) {
val observable = Observable.just(1, 2, 3, 4)
val subject = AsyncSubject.create<Int>()
observable.subscribe(subject)
subject.subscribe(observer)
}
輸出
New Subscription
Next 4
All Completed
我們可以不訂閱任何的 Observable
而直接調(diào)用 Subject
的 onNext
方法(Observer
接口)傳入值(其實上面 Subject
訂閱 Observable
的時候,Subject
會在內(nèi)部對每一個從 Observable
得到的值調(diào)用 onNext
方法)锰什。就像這個例子
// 5.5.kt
import io.reactivex.subjects.AsyncSubject
fun main(args: Array<String>) {
val subject = AsyncSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer) // 訂閱1
subject.onNext(3)
subject.subscribe(observer) // 訂閱2
subject.onNext(4)
subject.onComplete()
}
輸出
New Subscription
New Subscription
Next 4 // 訂閱1(我知道你要問為什么不輸出 2 而是 4,下面有解釋)
All Completed
Next 4 // 訂閱2
All Completed
AsyncSubject
當且僅當調(diào)用 onComplete
方法時才會彈出值(和( ConnectableObservable
與 connect
方法的關(guān)系)差不多)
所以 訂閱1
并沒有輸出 Next 2 而是輸出 Next 4浓体。
PublishSubject
PublishSubject
會把所有從 Observer
接口傳入的值按照時間順序全部傳出
(圖片來自 ReactiveX documentation)
BehaviorSubject
把 PublishSubject
與 AsyncSubject
組合在一起差不多就是 BehaviorSubject
。
BehaviorSubject
會彈出訂閱 BehaviorSubject
之前的最后一個值(AsyncSubject
的特性)和訂閱 BehaviorSubject
之后的所有值(PublishSubject
的特性)
// 5.6.kt
import io.reactivex.subjects.BehaviorSubject
fun main(args: Array<String>) {
val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer)
subject.onNext(3)
subject.subscribe(observer)
subject.onNext(4)
subject.onComplete()
}
輸出
/*
New Subscription
Next 2 // 訂閱1 獲取到了 `2` 而跳過了 `1`
Next 3 // 訂閱1 獲取到了訂閱之后的值
New Subscription
Next 3 // 訂閱2
Next 4 // 訂閱1
Next 4 // 訂閱2
All Completed
All Completed
*/
ReplaySubject
它和 Cold Observable
的性質(zhì)差不多(我還不知道它有什么用,麻煩哪位同學告訴我,我加在這里,先謝過了)
(圖片來自 ReactiveX documentation)
// 5.7.kt
import io.reactivex.subjects.ReplaySubject
fun main(args: Array<String>) {
val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer)
subject.onNext(3)
subject.subscribe(observer)
subject.onComplete()
}
輸出
/*
New Subscription
Next 1
Next 2
Next 3
New Subscription
Next 1
Next 2
Next 3
All Completed
All Completed
*/
這節(jié) OK 了,明天我們來一起學習一下看圖(Marble Diagram)識字....
RxKotlin 例子不超過15行教程 1----環(huán)境配置與初體驗
RxKotlin 例子不超過15行教程 2----Observable Observer 與 Subscribe 簡介
RxKotlin 例子不超過15行教程 3----Observable 的創(chuàng)建
RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable
RxKotlin 例子不超過15行教程 5----Subject
RxKotlin 例子不超過15行教程 6----Operator 與 Marble Diagram
RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡介