1.PublishSubject
let publishSub = PublishSubject<Int>()
publishSub.onNext(1)
publishSub.subscribe { print("訂閱到了:",$0)}
.disposed(by: disposbag)
publishSub.subscribe { print("訂閱到2:",$0)}
.disposed(by: disposbag)
publishSub.onNext(2)
publishSub.onNext(3)
打印結(jié)果是
訂閱到了: next(2)
訂閱到2: next(2)
訂閱到了: next(3)
訂閱到2: next(3)
訂閱后發(fā)送的信號才能被訂閱者收到旬迹,每一次訂閱都會有被保存起來,當(dāng)有信號發(fā)出時,會想每個訂閱者發(fā)送該信號待秃。
1.1訂閱
public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
Subject被訂閱時,會保存到容器中self._observers痹屹。
1.2.保存訂閱者的容器self._observers 本質(zhì)為Bag
插入訂閱者
mutating func insert(_ element: T) -> BagKey {
let key = _nextKey
_nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
_onlyFastPath = false
if _dictionary != nil {
_dictionary![key] = element
return key
}
if _pairs.count < arrayDictionaryMaxSize {
_pairs.append((key: key, value: element))
return key
}
_dictionary = [key: element]
return key
}
1.第1個訂閱者章郁,用屬性保存_key0,_value0
2.第2個到第訂31閱者保存到_pairs數(shù)組中
3.大于31個訂閱者則保存到dictionary中
1.3.可觀察者發(fā)送信號
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
func dispatch<E>(_ bag: Bag<(Event<E>) -> Void>, _ event: Event<E>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
_synchronized_on返回訂閱者序列志衍,也就是保存訂閱者的容器bag暖庄。
dispatch想各個訂閱者發(fā)送信號。發(fā)送的順序先value0楼肪,再_pairs數(shù)組培廓,最后Dictionary
2.BehaviorSubject
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:發(fā)送信號
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:訂閱序列
behaviorSub.subscribe{ print("訂閱到1:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次訂閱
behaviorSub.subscribe{ print("訂閱到2:",$0)}
.disposed(by: disposbag)
//打印結(jié)果
訂閱到1: next(3)
訂閱到1: next(4)
訂閱到1: next(5)
訂閱到2: next(5)
BehaviorSubject和PublishSubject差不多,唯一區(qū)別當(dāng)BehaviorSubject被訂閱者春叫,訂閱者會收到訂閱者之前BehaviorSubject發(fā)送的最后的一個信號肩钠。
func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
let key = self._observers.insert(observer.on)
observer.on(.next(self._element))
return SubscriptionDisposable(owner: self, key: key)
}
在_synchronized_subscribe中俘侠,我們可以看到observer.on(.next(self._element))。
self._element是發(fā)送信號的最新的值蔬将。
3.ReplaySubject
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()
// 2:發(fā)送信號
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:訂閱序列
replaySub.subscribe{ print("訂閱者1:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
replaySub.subscribe{ print("訂閱者2:",$0)}
.disposed(by: disposbag)
//打印結(jié)果
訂閱者1: next(3)
訂閱者1: next(4)
訂閱者1: next(7)
訂閱者1: next(8)
訂閱者1: next(9)
訂閱者2: next(8)
訂閱者2: next(9)
有一個queue保存發(fā)送的信號爷速。
4.AsyncSubject
let asynSub = AsyncSubject<Int>.init()
// 2:發(fā)送信號
asynSub.onNext(1)
asynSub.onNext(2)
// 3:訂閱序列
asynSub.subscribe{ print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
asynSub.onNext(3)
asynSub.onNext(4)
asynSub.onCompleted()
asynSub.subscribe{ print("訂閱到2:",$0)}
.disposed(by: disposbag)
//打印日志
訂閱到了: next(4)
訂閱到了: completed
訂閱到2: next(4)
訂閱到2: completed
為什么呢?我們看源碼分析一下
public func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
let (observers, event) = self._synchronized_on(event)
switch event {
case .next:
dispatch(observers, event)
dispatch(observers, .completed)
case .completed:
dispatch(observers, event)
case .error:
dispatch(observers, event)
}
}
func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
self._lock.lock(); defer { self._lock.unlock() }
if self._isStopped {
return (Observers(), .completed)
}
switch event {
case .next(let element):
self._lastElement = element
return (Observers(), .completed)
case .error:
self._stoppedEvent = event
let observers = self._observers
self._observers.removeAll()
return (observers, event)
case .completed:
let observers = self._observers
self._observers.removeAll()
if let lastElement = self._lastElement {
self._stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
self._stoppedEvent = event
return (observers, .completed)
}
}
}
我們看一下_synchronized_on霞怀,我們看switch event
.next惫东,用self._lastElement記錄傳來的數(shù)據(jù),但是在return觀察者是重新初始化的毙石。所以next事件只記錄數(shù)據(jù)廉沮,不像觀察者發(fā)信號。
completed事件時徐矩,先用一臨時變量記錄觀察者滞时,把我們保存的觀察者remove調(diào),保存_stoppedEvent滤灯,并向觀察者發(fā)送信號坪稽。如果重新訂閱subject,則會收到最后一個最后一個信號和完成鳞骤。
error事件窒百,記錄_stoppedEvent,并向觀察者分發(fā)豫尽。