今天帶大家解讀下Subject相關(guān)類的源碼陵珍。
在我們之前講過的類中寝杖,有些類是觀察者,有些是被觀察者互纯,今天要說的Subject比較特殊瑟幕,它既是觀察者,又是被觀察者留潦,兼具兩者的特性只盹。
Subject相關(guān)類包括:PublishSubject, BehaviorSubject, AsyncSubject, ReplaySubject。
還是先了解基本用法兔院,請看下面的例子:
private let bag = DisposeBag()
private let subject = PublishSubject<Int>()
override func viewDidLoad() {
super.viewDidLoad()
subject.onNext(1)
subject.subscribe(onNext: { ele in
print("First ->",ele)
})
.disposed(by: bag)
subject.onNext(2)
subject.subscribe(onNext: { ele in
print("Second ->",ele)
})
.disposed(by: bag)
subject.onNext(3)
}
先定義一個subject = PublishSubject<Int>()
, subject既可以發(fā)送序列殖卑,也可以訂閱序列,它先后發(fā)送1秆乳,2懦鼠,3,同時訂閱兩次屹堰,打印結(jié)果為:
First -> 2
First -> 3
Second -> 3
我們看到1沒有打印肛冶,2打印一次 3打印兩次,為什么呢扯键?
PublishSubject只會接受訂閱之后的元素睦袖,1在訂閱之前發(fā)送,所以無法接受到元素荣刑。2在第一次訂閱之后發(fā)送馅笙,所以收到第一次訂閱的元素,3在兩次訂閱之后發(fā)送厉亏,所以兩次訂閱都接受到元素董习;我們通過源碼分析下內(nèi)部原理:
PublishSubject
因為Subject既是觀察者又是被觀察者,所以我們看看它的類的定義:
public final class PublishSubject<Element>
: Observable<Element>
, SubjectType
, Cancelable
, ObserverType
, SynchronizedUnsubscribeType
其他3種Subject也是一樣:首先繼承了Observable爱只,然后遵循了ObserverType皿淋,同時又遵循Cancelable,所以不是兩者是三者兼具恬试。另外還遵循了SubjectType窝趣,SubjectType繼承了ObservableType,它有個協(xié)議方法asObserver(), 返回一個ObserverType, 一般返回自身對象:
/// Returns observer interface for subject.
public func asObserver() -> PublishSubject<Element> {
self
}
PublishSubject作為Observer训柴,實現(xiàn)了on協(xié)議方法, 我們知道當observer調(diào)用onNext等方法時哑舒,會調(diào)用on方法:
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
dispatch(self.synchronized_on(event), event)
}
synchronizationTracker.register
和 synchronizationTracker.unregister
, 這兩個是為了保證不會出現(xiàn)重入問題,即在當前on方法執(zhí)行完成前幻馁,又執(zhí)行了on方法洗鸵,這會導(dǎo)致循環(huán)調(diào)用。
然后調(diào)用了synchronized_on
:
func synchronized_on(_ event: Event<Element>) -> Observers {
self.lock.lock(); defer { self.lock.unlock() }
switch event {
case .next:
if self.isDisposed || self.stopped {
return Observers()
}
return self.observers
case .completed, .error:
if self.stoppedEvent == nil {
self.stoppedEvent = event
self.stopped = true
let observers = self.observers
self.observers.removeAll()
return observers
}
return Observers()
}
}
這里是線程安全的宣赔,而且用的遞歸鎖實現(xiàn)的预麸,我想可能是為了保證鎖能嵌套,因為可以會出現(xiàn)方法嵌套調(diào)用儒将。
如果未釋放資源吏祸,或者未停止,則直接返回一個Observers()
钩蚊,否則返回當前observers贡翘,這個Observers
是重點,它其實是一個Bag:Bag<(Event<Element>) -> Void>
砰逻, item類型是一個閉包鸣驱。這個閉包是發(fā)送元素的閉包,Bag內(nèi)部保存了這些閉包蝠咆。Bag的內(nèi)部實現(xiàn)其實是這樣的:將第一個元素放在_key0 和 _value0中踊东,key是通過自增的方式產(chǎn)生唯一的key北滥。如果超過一個元素 則將其他元素存放在數(shù)組中,數(shù)組最大空間為30闸翅,超過30個元素則將多余的放在字典里再芋,所以這個bag內(nèi)部有一個數(shù)組和一個字典,這樣設(shè)計的好處是當元素個數(shù)較少時(少于30)坚冀,數(shù)組的查找效率不會低济赎,同時空間使用率較高,超過30個數(shù)組查找效率會變低適合用字典记某,而字典的查找效率可以達到O1的效率司训,但空間使用率較低,所以才這么設(shè)計液南,大家可以自行查看Bag結(jié)構(gòu)體的源碼外驱。我們可以把observers當成一個觀察者容器制妄,存放了所有觀察者跨晴。
所以返回Observers()
,相當于創(chuàng)建一個新Bag對象标锄,所以內(nèi)部沒有元素。
當synchronized_on 返回observers后譬涡,接著執(zhí)行dispatch方法:
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
這個方法目的就取出所有閉包闪幽,并執(zhí)行這些閉包,相當于發(fā)送序列涡匀。
這里先執(zhí)行第一個元素的閉包(如果有的話)盯腌,_onlyFastPath表示當元素只有一個時,則_onlyFastPath等于true陨瘩,就是說當只有一個元素時腕够,后面無需再執(zhí)行了,有多個元素時還需要遍歷數(shù)組和字典執(zhí)行閉包舌劳,發(fā)出序列帚湘。
如果事件類型是.completed, .error
, 那么將stopped 設(shè)置 true, stoppedEvent設(shè)置為event甚淡, 返回當前observers并清空observers大诸,后續(xù)當然無法再接收事件.
stoppedEvent 設(shè)置完后,后面再訂閱時會將這個event發(fā)出去贯卦,這個event一定是error或completed资柔,我們可以看看訂閱的代碼:
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self.stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self.isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
如果已經(jīng)釋放資源了,則發(fā)送error.
否則將observer的on方法放入observers撵割,緩存起來贿堰。等到發(fā)送序列時再把所有緩存的Observer發(fā)出去;每訂閱一次就會緩存一個啡彬。這就是為什么在訂閱之前發(fā)送序列無效羹与,因為沒有在緩存中故硅,只要在發(fā)送序列之前訂閱了,怎么這些訂閱都會收到這個序列纵搁。
然后再看看釋放資源的代碼:
public func dispose() {
self.lock.performLocked { self.synchronized_dispose() }
}
final func synchronized_dispose() {
self.disposed = true
self.observers.removeAll()
self.stoppedEvent = nil
}
很簡單先加鎖契吉,然后移除所有observers, disposed= true, stoppedEvent = nil;
所以釋放資源后,所有觀察者無法在接收到序列了诡渴。
現(xiàn)在試著在例子中加入error和complete事件:
subject.onNext(1)
subject.subscribe(onNext: { ele in
print("First ->",ele)
}, onError: { error in
print(error)
}, onCompleted: {
print("complete")
})
.disposed(by: bag)
subject.onNext(2)
subject.subscribe(onNext: { ele in
print("Second ->",ele)
}, onError: { error in
print(error)
}, onCompleted: {
print("complete")
}).disposed(by: bag)
subject.onNext(3)
subject.onError(RxError.unknown)
subject.onCompleted()
subject.subscribe(onNext: { ele in
print("Third ->",ele)
}, onError: { error in
print(error)
}, onCompleted: {
print("complete")
}).disposed(by: bag)
打印出:
First -> 2
First -> 3
Second -> 3
Unknown error occurred.
Unknown error occurred.
Unknown error occurred.
當訂閱error時,也會將發(fā)送序列的閉包緩存到observers中菲语,這樣在調(diào)用onError時妄辩,兩個觀察者都會收到error事件,所以打印前兩條error山上,之后在發(fā)送onCompleted事件時眼耀,因為在發(fā)送onError時會清空observers,所以這個時候觀察者什么都收不到佩憾,之后又訂閱了一次哮伟,這時因為stoppedEvent != nil, 所以直接調(diào)用on發(fā)送之前的error事件,所以會再打印一次error妄帘。
BehaviorSubject
BehaviorSubject的代碼跟PublishSubject基本相同楞黄,只是BehaviorSubject會保存最后一次發(fā)送的元素。
private var element: Element
BehaviorSubject初始化時抡驼,會給這個element初始化鬼廓。然后訂閱時插入observer.on的同時會把這個element發(fā)出去。
let key = self.observers.insert(observer.on)
observer.on(.next(self.element))
發(fā)送onNext時會更新這個element致盟。
case .next(let element):
self.element = element
AsyncSubject
AsyncSubject會發(fā)出Observable發(fā)出的最后一個值(也僅是最后一個值)碎税,并且只有在這個Observable完成之后;(如果Observable沒有發(fā)出任何值馏锡,那么AsyncSubject也會在沒有發(fā)出任何值的情況下完成雷蹂。),完成事件發(fā)出后杯道,后續(xù)無法再接受事件匪煌,error事件也算完成事件。
考慮下面的例子:
subject.onNext(1)
subject.subscribe(onNext: { ele in
print("First ->",ele)
}, onError: { error in
print("First ->", error)
}, onCompleted: {
print("complete")
})
.disposed(by: bag)
subject.onNext(2)
subject.subscribe(onNext: { ele in
print("Second ->",ele)
}, onError: { error in
print("Second ->", error)
}, onCompleted: {
print("complete")
}).disposed(by: bag)
subject.onNext(3)
// subject.onError(RxError.unknown)
subject.onCompleted()
subject.subscribe(onNext: { ele in
print("Third ->",ele)
}, onError: { error in
print("Third ->", error)
}, onCompleted: {
print("complete")
}).disposed(by: bag)
這時候只會發(fā)出3党巾,同時complete事件也會發(fā)出虐杯,打印結(jié)果:
First -> 3
Second -> 3
complete
complete
Third -> 3
complete
onCompleted之后再次訂閱,會收到最后的元素和完成事件昧港。
如果打開// subject.onError(RxError.unknown)的注釋擎椰,則只會打印error事件:
First -> Unknown error occurred.
Second -> Unknown error occurred.
Third -> Unknown error occurred.
error之后,事件已經(jīng)完成创肥,再發(fā)送onCompleted則無效达舒,所以不會打印complete值朋。
有一點跟BehaviorSubject很像就是在發(fā)出完成事件之后再訂閱依然可以收到最后一個元素和完成事件。
在源碼中subject保存了最后一個元素(lastElement)巩搏,發(fā)出error時昨登,會立刻向所有觀察者發(fā)出error事件,同時將stoppedEvent賦值為error事件贯底,之后再也無法發(fā)出事件:
if self.isStopped {
return (Observers(), .completed)
}
如果沒有發(fā)出error丰辣, 而是發(fā)出了complete事件,怎么判斷是否有l(wèi)astElement禽捆,如果有則向所有觀察者發(fā)出最后一個元素和complete事件笙什,如果沒有l(wèi)astElement,怎么只發(fā)出complete事件胚想,之后清空觀察者琐凭。stoppedEvent賦值為最后一個next事件。
在訂閱的代碼中:
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self.stoppedEvent {
switch stoppedEvent {
case .next:
observer.on(stoppedEvent)
observer.on(.completed)
case .completed:
observer.on(stoppedEvent)
case .error:
observer.on(stoppedEvent)
}
return Disposables.create()
}
let key = self.observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
如果已經(jīng)完成(前面已經(jīng)發(fā)出了error或complete)浊服,則發(fā)出最后一個error统屈,或最后一個元素和complete,或者只發(fā)出complete牙躺。
否則將觀察者插入到observers中愁憔。
ReplaySubject
我們把PublishSubject理解為沒有緩存的序列,BehaviorSuject緩存一個元素孽拷,而ReplaySubject可以緩存任意個元素惩淳,根據(jù)緩存策略,每項通知都會廣播給所有已訂閱和未來的觀察者乓搬。
實現(xiàn)方式是:ReplayOne緩存一個元素思犁,ReplayManyBase緩存多個元素,通過隊列維護這些元素进肯,發(fā)出緩存元素時激蹲,遍歷隊列發(fā)送元素。(先進先出)
override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
for item in self.queue {
observer.on(.next(item))
}
}
當buffer的數(shù)量超出設(shè)置的數(shù)量時江掩,進行出隊操作
override func trim() {
while self.queue.count > self.bufferSize {
_ = self.queue.dequeue()
}
}
ReplayManyBase繼承自ReplayBufferBase学辱,ReplayBufferBase中封裝了核心邏輯,發(fā)送數(shù)據(jù)時先緩存元素环形,再向觀察者發(fā)出元素策泣,訂閱時先重放緩存的元素,再保存觀察者抬吟。
總結(jié)
Suject因為既是Observable萨咕,ObserverType,同時又時Cancelable火本,核心邏輯基本都在一個類中危队,相對來說代碼比較簡單聪建。它實現(xiàn)了一個典型的觀察者模式(先保存所有的觀察者,發(fā)送事件時通知所有觀察者)茫陆。