RxSwift-02:Subject

1.PublishSubject

        let publishSub = PublishSubject<Int>() 
        publishSub.onNext(1)
        publishSub.subscribe { print("訂閱到了:",$0)}
            .disposed(by: disposbag)
        publishSub.subscribe { print("訂閱到2:",$0)}
            .disposed(by: disposbag)
        publishSub.onNext(2)
        publishSub.onNext(3)

打印結(jié)果是

訂閱到了: next(2)
訂閱到2: next(2)
訂閱到了: next(3)
訂閱到2: next(3)

訂閱后發(fā)送的信號才能被訂閱者收到旬迹,每一次訂閱都會有被保存起來,當(dāng)有信號發(fā)出時,會想每個訂閱者發(fā)送該信號待秃。

1.1訂閱

    public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }
    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

Subject被訂閱時,會保存到容器中self._observers痹屹。

1.2.保存訂閱者的容器self._observers 本質(zhì)為Bag

插入訂閱者

 mutating func insert(_ element: T) -> BagKey {
        let key = _nextKey

        _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)

        if _key0 == nil {
            _key0 = key
            _value0 = element
            return key
        }

        _onlyFastPath = false

        if _dictionary != nil {
            _dictionary![key] = element
            return key
        }

        if _pairs.count < arrayDictionaryMaxSize {
            _pairs.append((key: key, value: element))
            return key
        }
        
        _dictionary = [key: element]
        
        return key
    }

1.第1個訂閱者章郁,用屬性保存_key0,_value0
2.第2個到第訂31閱者保存到_pairs數(shù)組中
3.大于31個訂閱者則保存到dictionary中

1.3.可觀察者發(fā)送信號

    public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }
    func _synchronized_on(_ event: Event<E>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

_synchronized_on返回訂閱者序列志衍,也就是保存訂閱者的容器bag暖庄。
dispatch想各個訂閱者發(fā)送信號。發(fā)送的順序先value0楼肪,再_pairs數(shù)組培廓,最后Dictionary

2.BehaviorSubject

     let behaviorSub = BehaviorSubject.init(value: 100)
        // 2:發(fā)送信號
        behaviorSub.onNext(2)
        behaviorSub.onNext(3)
        // 3:訂閱序列
        behaviorSub.subscribe{ print("訂閱到1:",$0)}
            .disposed(by: disposbag)
        // 再次發(fā)送
        behaviorSub.onNext(4)
        behaviorSub.onNext(5)
        // 再次訂閱
        behaviorSub.subscribe{ print("訂閱到2:",$0)}
            .disposed(by: disposbag)
//打印結(jié)果
訂閱到1: next(3)
訂閱到1: next(4)
訂閱到1: next(5)
訂閱到2: next(5)

BehaviorSubject和PublishSubject差不多,唯一區(qū)別當(dāng)BehaviorSubject被訂閱者春叫,訂閱者會收到訂閱者之前BehaviorSubject發(fā)送的最后的一個信號肩钠。

    func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        observer.on(.next(self._element))
    
        return SubscriptionDisposable(owner: self, key: key)
    }

在_synchronized_subscribe中俘侠,我們可以看到observer.on(.next(self._element))。
self._element是發(fā)送信號的最新的值蔬将。

3.ReplaySubject

    let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
        // let replaySub = ReplaySubject<Int>.createUnbounded()

        // 2:發(fā)送信號
        replaySub.onNext(1)
        replaySub.onNext(2)
        replaySub.onNext(3)
        replaySub.onNext(4)

        // 3:訂閱序列
        replaySub.subscribe{ print("訂閱者1:",$0)}
            .disposed(by: disposbag)
        // 再次發(fā)送
        replaySub.onNext(7)
        replaySub.onNext(8)
        replaySub.onNext(9)
        replaySub.subscribe{ print("訂閱者2:",$0)}
            .disposed(by: disposbag)
//打印結(jié)果
訂閱者1: next(3)
訂閱者1: next(4)
訂閱者1: next(7)
訂閱者1: next(8)
訂閱者1: next(9)
訂閱者2: next(8)
訂閱者2: next(9)

有一個queue保存發(fā)送的信號爷速。

4.AsyncSubject

      let asynSub = AsyncSubject<Int>.init()
        // 2:發(fā)送信號
        asynSub.onNext(1)
        asynSub.onNext(2)
        // 3:訂閱序列
        asynSub.subscribe{ print("訂閱到了:",$0)}
            .disposed(by: disposbag)
        // 再次發(fā)送
        asynSub.onNext(3)
        asynSub.onNext(4)
        asynSub.onCompleted()
        asynSub.subscribe{ print("訂閱到2:",$0)}
            .disposed(by: disposbag)
//打印日志
訂閱到了: next(4)
訂閱到了: completed
訂閱到2: next(4)
訂閱到2: completed

為什么呢?我們看源碼分析一下

 public func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        let (observers, event) = self._synchronized_on(event)
        switch event {
        case .next:
            dispatch(observers, event)
            dispatch(observers, .completed)
        case .completed:
            dispatch(observers, event)
        case .error:
            dispatch(observers, event)
        }
    }

    func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isStopped {
            return (Observers(), .completed)
        }
        switch event {
        case .next(let element):
            self._lastElement = element
            return (Observers(), .completed)
        case .error:
            self._stoppedEvent = event
            let observers = self._observers
            self._observers.removeAll()
            return (observers, event)
        case .completed:
            let observers = self._observers
            self._observers.removeAll()
            if let lastElement = self._lastElement {
                self._stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                self._stoppedEvent = event
                return (observers, .completed)
            }
        }
    }

我們看一下_synchronized_on霞怀,我們看switch event
.next惫东,用self._lastElement記錄傳來的數(shù)據(jù),但是在return觀察者是重新初始化的毙石。所以next事件只記錄數(shù)據(jù)廉沮,不像觀察者發(fā)信號。
completed事件時徐矩,先用一臨時變量記錄觀察者滞时,把我們保存的觀察者remove調(diào),保存_stoppedEvent滤灯,并向觀察者發(fā)送信號坪稽。如果重新訂閱subject,則會收到最后一個最后一個信號和完成鳞骤。
error事件窒百,記錄_stoppedEvent,并向觀察者分發(fā)豫尽。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末篙梢,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子美旧,更是在濱河造成了極大的恐慌渤滞,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件榴嗅,死亡現(xiàn)場離奇詭異妄呕,居然都是意外死亡,警方通過查閱死者的電腦和手機录肯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門趴腋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人论咏,你說我怎么就攤上這事优炬。” “怎么了厅贪?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵蠢护,是天一觀的道長。 經(jīng)常有香客問我养涮,道長葵硕,這世上最難降的妖魔是什么眉抬? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮懈凹,結(jié)果婚禮上蜀变,老公的妹妹穿的比我還像新娘。我一直安慰自己介评,他們只是感情好库北,可當(dāng)我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著们陆,像睡著了一般寒瓦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上坪仇,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天杂腰,我揣著相機與錄音,去河邊找鬼椅文。 笑死喂很,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的雾袱。 我是一名探鬼主播恤筛,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼芹橡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起望伦,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤林说,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后屯伞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體腿箩,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年劣摇,在試婚紗的時候發(fā)現(xiàn)自己被綠了珠移。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡末融,死狀恐怖钧惧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情勾习,我是刑警寧澤浓瞪,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站巧婶,受9級特大地震影響乾颁,放射性物質(zhì)發(fā)生泄漏涂乌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一英岭、第九天 我趴在偏房一處隱蔽的房頂上張望湾盒。 院中可真熱鬧,春花似錦诅妹、人聲如沸罚勾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荧库。三九已至,卻和暖如春赵刑,著一層夾襖步出監(jiān)牢的瞬間分衫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工般此, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蚪战,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓铐懊,卻偏偏與公主長得像邀桑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子科乎,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容