PublishSubject繼承類和協(xié)議
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType
Observable
: 被觀察者Cancelable
: 可以被dispose
SynchronizedUnsubscribeType
可以取消訂閱ObserverType
: 觀察者-
SubjectType
:asObserver
subscribe
這個協(xié)議很好的描述對象即可以作為Observer
,也可以作為 Observerable
的特性
Observers Type
要理解PublishSubject
的核心朝抖,得先理解Observers
這個類型
typealias Observers = AnyObserver<Element>.s
extension AnyObserver {
/// Collection of `AnyObserver`s
typealias s = Bag<(Event<Element>) -> ()>
}
about Bag
Bag
: 是一個自定義容器般妙,類似于Dictionary
饭望, 之所以自定義實現(xiàn)主要是出于性能考慮蜗字,有興趣的可以研究下蛤铜,這里先留個坑
總的來說Observers 是一個 Bag 容器, 元素的類型為 (Event<Element>) -> ()
訂閱
public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
_lock.lock()
let subscription = _synchronized_subscribe(observer)
_lock.unlock()
return subscription
}
func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if _isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = _observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
一個同步鎖婆殿, 幾個異常處理诈乒, 剩下的就是核心代碼:
let key = _observers.insert(observer.on)
: 將 Observer.on
函數(shù)塞入Bag
容器
return SubscriptionDisposable(owner: self, key: key)
返回SubscriptionDisposable
struct SubscriptionDisposable<T: SynchronizedUnsubscribeType> : Disposable {
private let _key: T.DisposeKey
private weak var _owner: T?
init(owner: T, key: T.DisposeKey) {
_owner = owner
_key = key
}
func dispose() {
_owner?.synchronizedUnsubscribe(_key)
}
}
SubscriptionDisposable
功能比較簡單罩扇,只是對其進行簡單的封裝婆芦,那么這里為什么不讓PublishSubject
直接支持Disposable
協(xié)議呢怕磨?這是為了保證設(shè)計的一致性,只有被訂閱的Observable
才能被dispose
, 如果讓PublishSubject
直接支持Disposable
協(xié)議就有損這個設(shè)計原則消约,固這里選擇返回SubscriptionDisposable
on 事件
public func on(_ event: Event<Element>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
dispatch(_synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<E>) -> Observers {
_lock.lock(); defer { _lock.unlock() }
switch event {
case .next(_):
if _isDisposed || _stopped {
return Observers()
}
return _observers
case .completed, .error:
if _stoppedEvent == nil {
_stoppedEvent = event
_stopped = true
let observers = _observers
_observers.removeAll()
return observers
}
return Observers()
}
}
@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
if bag._onlyFastPath {
bag._value0?(event)
return
}
let value0 = bag._value0
let dictionary = bag._dictionary
if let value0 = value0 {
value0(event)
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = dictionary {
for element in dictionary.values {
element(event)
}
}
}
這段邏輯復(fù)雜一點肠鲫,先講一下Bag容器
- 數(shù)據(jù)量為1 的時候 使用,直接使用
value0
存儲元素 - 數(shù)據(jù)量在(1,30)的時候或粮, 使用
ContiguousArray
存儲元素导饲,value0
仍然存儲0號元素 - 超過30的數(shù)據(jù),使用
Dictionary
存儲超出的元素
理解了這些就明白dispatch
干了些啥了氯材, 簡單說就是
Bag.foreach { $0(event) }
一個PublishSubject
可以被多次訂閱渣锦,每次訂閱的時候?qū)?code>Observer.on 塞入Bag
容器,在執(zhí)行PublishSubject.on
事件的時候氢哮, 會執(zhí)行所有Observer
的on
事件袋毙。
Unsubscribe
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
_lock.lock()
_synchronized_unsubscribe(disposeKey)
_lock.unlock()
}
func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
_ = _observers.removeKey(disposeKey)
}
這個就很簡單,unsubscribe
的時候冗尤,將Observer
從Bag
容器中移除就可以听盖。