RxKotlin 例子不超過15行教程 5----Subject

上一節(jié)中我們說到了 Hot Observable 的一個實現(xiàn) ---- ConnectableObservable劳吠。這一節(jié)中我們說一說 Hot Observable 的另一種實現(xiàn) ---- Subject

Subject

按照慣例,先來一段能跑的代碼

// 5.1.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val observable = Observable.interval(10, TimeUnit.MILLISECONDS)
    val subject = PublishSubject.create<Long>()  // 注釋1

    observable.subscribe(subject)  // 描點1 Subject 充當 Observer 角色
    subject.subscribe({ println("Received $it") })  // 描點2 Subject 充當 Observable 角色
    Thread.sleep(60)
}

輸出

Received 0
Received 1
Received 2
Received 3
Received 4
Received 5

注釋1
我們可以用 PublishSubject.create() 來創(chuàng)建 PublishSubject (PublishSubject 下方介紹)

  • PublishSubjectSubject 的一種
  • SubjectHot Observable 的一種

(這里有空我補一個關(guān)系圖)

SubjectObservableObserver 的組合體

  • Observable 的所有 操作符 它都有(操作符 會在之后的章節(jié)介紹)
  • 它也可以像 Observer 一樣接收值
  • 左耳朵進右耳朵出(對,我媽經(jīng)常這么說我)阅悍。如果你向它的 Observer 接口傳入值(描點1), 它會把這些值選擇性地Observable 接口處彈出去(描點2)
    (PublishSubject 會把所有從 Observer 接口傳入的值按照時間順序全部傳出去)

這么做有什么用處,既然我們可以直接從 源Observable 訂閱,為什么要在中間加一層 PublishSubject? 來看下一個例子

PublishSubject 的作用

// 5.2.kt
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)

    observable.subscribe({ println("Subscription 1 Received $it") })
    Thread.sleep(200)
    observable.subscribe({ println("Subscription 2 Received $it") })
    Thread.sleep(300)
}

輸出

Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 2 Received 0  // 注釋1
Subscription 1 Received 3
Subscription 2 Received 1
Subscription 1 Received 4
Subscription 2 Received 2

注釋1
訂閱20 開始接收消息(因為它訂閱的 observable 是一個 Cold Observable,所以會從頭發(fā)送)
這里的輸出結(jié)果和下面對比一下

// 5.3.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val subject = PublishSubject.create<Long>()

    observable.subscribe(subject)

    subject.subscribe({ println("Subscription 1 Received $it") })
    Thread.sleep(300)
    subject.subscribe({ println("Subscription 2 Received $it") })
    Thread.sleep(200)
}

輸出

Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 1 Received 3
Subscription 2 Received 3  // 注釋1
Subscription 1 Received 4
Subscription 2 Received 4

注釋1
訂閱23 開始接收消息(它錯過了 0 1 2, 我們說過 SubjectHot Observable 的一種)
在這里,我們通過 PublishSubject 把原來的 Cold 變成了 Hot(上一節(jié)的 publish 也能實現(xiàn)此功能,只不過得到的是 ConnectableObservable)

Subject 的各種實現(xiàn)

AsyncSubject

下面這張圖是為了闡述 ReactiveX 原理常用的 Marble Diagram ,我會在明天專門去說 Marble Diagram 如何看(之前我也是各種看不懂,捂臉)

AsyncSubject

(圖片來自 ReactiveX documentation)
AsyncSubject 會從 源Observable(SubjectObserver 接口傳入值來自 源Observable) 接收所有值,并把最后一個值Observable 接口處彈出去,看一個例子

// 5.4.kt
import io.reactivex.Observable
import io.reactivex.subjects.AsyncSubject

fun main(args: Array<String>) {
    val observable = Observable.just(1, 2, 3, 4)
    val subject = AsyncSubject.create<Int>()
    observable.subscribe(subject)
    subject.subscribe(observer)
}

輸出

New Subscription
Next 4
All Completed

我們可以不訂閱任何的 Observable 而直接調(diào)用 SubjectonNext 方法(Observer 接口)傳入值(其實上面 Subject 訂閱 Observable 的時候,Subject 會在內(nèi)部對每一個從 Observable 得到的值調(diào)用 onNext 方法)锰什。就像這個例子

// 5.5.kt
import io.reactivex.subjects.AsyncSubject

fun main(args: Array<String>) {
    val subject = AsyncSubject.create<Int>()
    subject.onNext(1)
    subject.onNext(2)
    subject.subscribe(observer)  // 訂閱1
    subject.onNext(3)
    subject.subscribe(observer)  // 訂閱2
    subject.onNext(4)
    subject.onComplete()
}

輸出

New Subscription
New Subscription
Next 4  // 訂閱1(我知道你要問為什么不輸出 2 而是 4,下面有解釋)
All Completed
Next 4  // 訂閱2
All Completed

AsyncSubject 當且僅當調(diào)用 onComplete 方法時才會彈出值(和( ConnectableObservableconnect 方法的關(guān)系)差不多)
所以 訂閱1 并沒有輸出 Next 2 而是輸出 Next 4浓体。

PublishSubject

PublishSubject 會把所有從 Observer 接口傳入的值按照時間順序全部傳出

PublishSubject

(圖片來自 ReactiveX documentation)

BehaviorSubject

PublishSubjectAsyncSubject 組合在一起差不多就是 BehaviorSubject
BehaviorSubject 會彈出訂閱 BehaviorSubject 之前的最后一個值(AsyncSubject 的特性)和訂閱 BehaviorSubject 之后的所有值(PublishSubject 的特性)

// 5.6.kt
import io.reactivex.subjects.BehaviorSubject

fun main(args: Array<String>) {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(1)
    subject.onNext(2)
    subject.subscribe(observer)
    subject.onNext(3)
    subject.subscribe(observer)
    subject.onNext(4)
    subject.onComplete()
}

輸出

/*
New Subscription
Next 2  // 訂閱1 獲取到了 `2`  而跳過了 `1`
Next 3  // 訂閱1 獲取到了訂閱之后的值
New Subscription
Next 3  // 訂閱2
Next 4  // 訂閱1
Next 4  // 訂閱2
All Completed
All Completed
 */

ReplaySubject

它和 Cold Observable 的性質(zhì)差不多(我還不知道它有什么用,麻煩哪位同學告訴我,我加在這里,先謝過了)

ReplaySubject

(圖片來自 ReactiveX documentation)

// 5.7.kt
import io.reactivex.subjects.ReplaySubject

fun main(args: Array<String>) {
    val subject = ReplaySubject.create<Int>()
    subject.onNext(1)
    subject.onNext(2)
    subject.subscribe(observer)
    subject.onNext(3)
    subject.subscribe(observer)
    subject.onComplete()
}

輸出

/*
New Subscription
Next 1
Next 2
Next 3
New Subscription
Next 1
Next 2
Next 3
All Completed
All Completed
 */

這節(jié) OK 了,明天我們來一起學習一下看圖(Marble Diagram)識字....

RxKotlin 例子不超過15行教程 1----環(huán)境配置與初體驗

RxKotlin 例子不超過15行教程 2----Observable Observer 與 Subscribe 簡介

RxKotlin 例子不超過15行教程 3----Observable 的創(chuàng)建

RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable

RxKotlin 例子不超過15行教程 5----Subject

RxKotlin 例子不超過15行教程 6----Operator 與 Marble Diagram

RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡介

RxKotlin 例子不超過15行教程 8----Error Handling

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末桶唐,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子频祝,更是在濱河造成了極大的恐慌泌参,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,464評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件常空,死亡現(xiàn)場離奇詭異及舍,居然都是意外死亡,警方通過查閱死者的電腦和手機窟绷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咐柜,“玉大人兼蜈,你說我怎么就攤上這事∽居眩” “怎么了为狸?”我有些...
    開封第一講書人閱讀 169,078評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長遗契。 經(jīng)常有香客問我辐棒,道長,這世上最難降的妖魔是什么牍蜂? 我笑而不...
    開封第一講書人閱讀 59,979評論 1 299
  • 正文 為了忘掉前任漾根,我火速辦了婚禮,結(jié)果婚禮上鲫竞,老公的妹妹穿的比我還像新娘辐怕。我一直安慰自己,他們只是感情好从绘,可當我...
    茶點故事閱讀 69,001評論 6 398
  • 文/花漫 我一把揭開白布寄疏。 她就那樣靜靜地躺著,像睡著了一般僵井。 火紅的嫁衣襯著肌膚如雪陕截。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,584評論 1 312
  • 那天批什,我揣著相機與錄音农曲,去河邊找鬼。 笑死渊季,一個胖子當著我的面吹牛朋蔫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播却汉,決...
    沈念sama閱讀 41,085評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼驯妄,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了合砂?” 一聲冷哼從身側(cè)響起青扔,我...
    開封第一講書人閱讀 40,023評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后微猖,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谈息,經(jīng)...
    沈念sama閱讀 46,555評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,626評論 3 342
  • 正文 我和宋清朗相戀三年凛剥,在試婚紗的時候發(fā)現(xiàn)自己被綠了侠仇。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,769評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡犁珠,死狀恐怖逻炊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情犁享,我是刑警寧澤余素,帶...
    沈念sama閱讀 36,439評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站炊昆,受9級特大地震影響桨吊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜凤巨,卻給世界環(huán)境...
    茶點故事閱讀 42,115評論 3 335
  • 文/蒙蒙 一视乐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧磅甩,春花似錦炊林、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,601評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至僧叉,卻和暖如春奕枝,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瓶堕。 一陣腳步聲響...
    開封第一講書人閱讀 33,702評論 1 274
  • 我被黑心中介騙來泰國打工隘道, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人郎笆。 一個月前我還...
    沈念sama閱讀 49,191評論 3 378
  • 正文 我出身青樓谭梗,卻偏偏與公主長得像,于是被迫代替她去往敵國和親宛蚓。 傳聞我的和親對象是個殘疾皇子激捏,可洞房花燭夜當晚...
    茶點故事閱讀 45,781評論 2 361

推薦閱讀更多精彩內(nèi)容