// SinkDisposer
fileprivate final class SinkDisposer: Cancelable {
fileprivate enum DisposeState: UInt32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
// Jeej, swift API consistency rules
fileprivate enum DisposeStateInt32: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private var _state: AtomicInt = 0
private var _sink: Disposable? = nil
private var _subscription: Disposable? = nil
var isDisposed: Bool {
return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
}
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
_sink = sink
_subscription = subscription
let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
func dispose() {
let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = _sink else {
rxFatalError("Sink not set")
}
guard let subscription = _subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
}
}
首先我們要分析下需求库倘,哪些操作是需要保證線程安全的挫以。 顯然_state
的設(shè)置是要保證線程安全的诫惭, 那么與之相關(guān)的讀寫操作都是要加鎖的,因此 isDisposed
,setSinkAndSubscription
,dispose
都是需要加鎖的桩了。RX 通過Atomic Operation 保證操作的原子性附帽。這里值的注意的是通過or
設(shè)置具體標志位,通過&
操作檢測具體相應(yīng)標志位井誉。
以setSinkAndSubscription
為例子:
let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
現(xiàn)假設(shè)執(zhí)行setSinkAndSubscription
前_state = 0bxy
那么執(zhí)行 let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
后
previousState = 0bxy
_state = 0bxy | 0b10 = 0b1y
也就是說這個操作最終導(dǎo)致_state
的第二位置1
previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue
0bxy & 0b10 = 0bx0
若x = 1
則最終結(jié)果為0b10
,否則則為0b00
, 再通過if
語句即可檢測第二位是否為0.
這里還有一個小坑:
if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
print("triger disposed in setSinkAndSubscription function \(self) \n \(Thread.current)")
sink.dispose()
subscription.dispose()
_sink = nil
_subscription = nil
}
在檢測完setSinkAndSubscription flag之后蕉扮,立馬又檢測 disposed flag 如果為真則立即執(zhí)行dispose
操作,也就是說一旦set disposed flag,則再設(shè)置setSinkAndSubscription
則是無效操作,這里我不確定具體是什么情況會觸發(fā)這個操作颗圣,不過我跑了下單元測試喳钟,確實某些case是會觸發(fā)這種情況,以下便是一個觸發(fā)該case的單元測試:
func test1323() {
func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
_ = share(Observable<Int>.create({ observer in
observer.on(.next(1))
Thread.sleep(forTimeInterval: 0.1)
observer.on(.completed)
return Disposables.create()
})
.flatMap { (int) -> Observable<Int> in
return Observable.create { (observer) -> Disposable in
DispatchQueue.global().async {
observer.onNext(int)
observer.onCompleted()
}
return Disposables.create()
}
})
.subscribe { (e) in
}
}
for op in [
{ $0.share(replay: 0, scope: .whileConnected) },
{ $0.share(replay: 0, scope: .forever) },
{ $0.share(replay: 1, scope: .whileConnected) },
{ $0.share(replay: 1, scope: .forever) },
{ $0.share(replay: 2, scope: .whileConnected) },
{ $0.share(replay: 2, scope: .forever) },
] as [(Observable<Int>) -> Observable<Int>] {
performSharingOperatorsTest(share: op)
}
}
但是具體怎么原理還有待細究在岂。
相關(guān)資料: