本篇接著上篇函數(shù)響應(yīng)式編程思想 & RxSwift 核心邏輯(一)繼續(xù)詳細(xì)分析上篇沒有提到的地方。
序列繼承鏈
let ob = Observable<Any>.create
這句代碼創(chuàng)建的是類AnonymousObservable
匿名可觀察序列, 它的父類和層級(jí)關(guān)系如下:
類 AnonymousObservable --> 類 Producer --> 類 Observable --> 協(xié)議 ObservableType --> 協(xié)議 ObservableConvertibleType
先從上往下說茵乱,比較好理解一點(diǎn)茂洒。
協(xié)議ObservableConvertibleType
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<E>
}
看注釋可以明白,使用了關(guān)聯(lián)類型提供了一個(gè)可轉(zhuǎn)換為可觀察序列Observable的方法瓶竭,這個(gè)方法在遵守這個(gè)協(xié)議的類中實(shí)現(xiàn)即可督勺。
這是最底層的協(xié)議,即可滿足“萬物皆序列”的目的斤贰。例如:
UISwitch().rx.value.asObservable();
因?yàn)関alue是結(jié)構(gòu)體ControlProperty類型的智哀,而ControlProperty底層又遵守了ObservableConvertibleType協(xié)議,所以最后value可以被轉(zhuǎn)換為一個(gè)可觀察序列荧恍。
協(xié)議ObservableType
顧名思義盏触,這是一個(gè)可觀察序列協(xié)議,于是目前提供了一個(gè)每個(gè)可觀察序列都一定會(huì)有的訂閱方法:
- returns: Subscription for `observer` that can be used to cancel production of sequence elements and free resources.
*/
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
類Observable
遵守ObservableType協(xié)議的可觀察序列块饺,這是所有序列的基類赞辩。所有這里面實(shí)現(xiàn)的方法,都是所有序列都必須要實(shí)現(xiàn)的基本方法授艰。
public class Observable<Element> : ObservableType { // Observable 可觀察序列辨嗽,所有序列的基類
/// Type of elements in sequence.
public typealias E = Element // 起別名
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal() // 初始化的時(shí)候,類似于 引用計(jì)數(shù) +1
#endif
}
// 協(xié)議的實(shí)現(xiàn)
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod() // 抽象方法淮腾,父類不實(shí)現(xiàn)糟需,子類去實(shí)現(xiàn)
}
public func asObservable() -> Observable<E> { // 返回的是一個(gè) Observable對(duì)象
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal() // 銷毀的時(shí)候屉佳,類似于 引用計(jì)數(shù) -1 ; 從而根據(jù)進(jìn)入一個(gè)頁(yè)面前,再退出一個(gè)頁(yè)面后類似的操作洲押,總共引用計(jì)數(shù)的變化來判斷是否有內(nèi)存泄漏問題
#endif
}
// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ˉ\_(ツ)_/ˉ 想優(yōu)化協(xié)議擴(kuò)展武花,但是swift不支持,雖然不知道他想具體怎么優(yōu)化
/// Optimizations for map operator 優(yōu)化map函數(shù)
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
上面注釋寫的很清楚了杈帐,主要是三個(gè)功能:
- 便于內(nèi)存管理体箕,實(shí)現(xiàn)了當(dāng)Observable初始化和銷毀的時(shí)候,分別實(shí)現(xiàn)Resources.incrementTotal()方法和Resources.decrementTotal()方法
- 實(shí)現(xiàn)了遵守的協(xié)議方法subscribe抽象方法挑童,但是并沒有具體實(shí)現(xiàn)累铅,交給子類去實(shí)現(xiàn)
- 實(shí)現(xiàn)了協(xié)議方法asObservable(),使其子類調(diào)用這個(gè)方法都能返回一個(gè)可觀察序列Observable
類Producer
Producer繼承自O(shè)bservable站叼,主要是具體實(shí)現(xiàn)了父類的訂閱方法subscribe
娃兽,并且提供了一個(gè)run
方法,但是并沒有具體實(shí)現(xiàn)尽楔,交給子類去實(shí)現(xiàn)投储。
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired { // CurrentThreadScheduler 調(diào)度者
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer // 銷毀
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod() // 抽象方法,父類不實(shí)現(xiàn)阔馋,子類去實(shí)現(xiàn)
}
}
類AnonymousObservable
主要是保存了屬性_subscribeHandler玛荞,和具體實(shí)現(xiàn)了父類Producer中的run方法
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable // 起個(gè)別名為SubscribeHandler
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) { // 帶了一個(gè)SubscribeHandler參數(shù)的初始化
self._subscribeHandler = subscribeHandler // 保存了訂閱管理者
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel) // 這里傳進(jìn)去了 訂閱者 銷毀者
let subscription = sink.run(self) // 這里的self就是 AnonymousObservable 這里傳進(jìn)去了 可觀察序列,即 ob
return (sink: sink, subscription: subscription)
}
}
訂閱者繼承鏈和訂閱發(fā)送流程
訂閱者繼承鏈
在let _ = ob.subscribe(onNext: { (text) in
這句代碼里面創(chuàng)建了一個(gè)AnonymousObserver內(nèi)部訂閱者垦缅。它的繼承關(guān)系如下:
類AnonymousObserver --> 類ObserverBase --> 協(xié)議Disposable,協(xié)議ObserverType
還是從上往下說驹碍,好梳理一些:
協(xié)議ObserverType
顧名思義這是一個(gè)訂閱者的底層基本協(xié)議
public protocol ObserverType {
/// The type of elements in sequence that observer can observe.
associatedtype E
/// Notify observer about sequence event.
///
/// - parameter event: Event that occurred.
func on(_ event: Event<E>)
}
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element)) // 這里的on壁涎,是遵守了ObserverType協(xié)議的 AnonymousObservableSink的on方法 即管子來處理結(jié)果
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed) // 管子來處理結(jié)果
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error)) // 管子來處理結(jié)果
}
}
從上面代碼中可以看出,主要是提供了一個(gè)所有訂閱者都會(huì)有的事件處理on
方法志秃。以及擴(kuò)展的序列會(huì)出現(xiàn)的Next
怔球,Completed
和Error
三種狀態(tài)档玻,只不過這里on
的方法是遵守這個(gè)協(xié)議的類的子類實(shí)現(xiàn)的on
方法爱态。
協(xié)議Disposable
/// Represents a disposable resource.
public protocol Disposable {
/// Dispose resource.
func dispose()
}
提供了一個(gè)釋放資源鸟缕,即垃圾回收的方法净当。
類ObserverBase
顧名思義琉预,這是所有訂閱者的基類:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) { // 這里的類型必須是前面?zhèn)鬟M(jìn)來的類型尿褪,因?yàn)閟wift是一門強(qiáng)類型語言
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
rxAbstractMethod() // 抽象方法壤巷,交給子類具體實(shí)現(xiàn)
}
func dispose() {
fetchOr(self._isStopped, 1) // 當(dāng)計(jì)數(shù)為0的時(shí)候明也,就回收
}
}
從上面可以看出洼冻,主要是提供了下面三個(gè)功能:
- 提供了
onCore
方法崭歧,但是并沒有具體實(shí)現(xiàn),交給子類具體實(shí)現(xiàn) - 實(shí)現(xiàn)on方法撞牢,根據(jù)序列狀態(tài)
next
,error
,completed
率碾,調(diào)用onCore
方法叔营,并將事件傳過去,交由子類具體實(shí)現(xiàn) - 實(shí)現(xiàn)了垃圾回收的方法
類AnonymousObserver
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void // 起別名
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler // 保存了事件管理者所宰,保存的即是前面AnonymousObserver大括號(hào)內(nèi)的內(nèi)容
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event) //
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
從上面可以看出绒尊,主要實(shí)現(xiàn)的功能是:
- 在類初始化的時(shí)候,保存了_eventHandler對(duì)象
- 和Observable一樣仔粥,分別實(shí)現(xiàn)Resources.incrementTotal()方法和Resources.decrementTotal()方法來管理內(nèi)存
- 重寫onCore方法
訂閱和發(fā)送流程:
1婴谱、在訂閱的這句let _ = ob.subscribe(onNext: { (text) in
代碼中創(chuàng)建了訂閱者observer
2、接下來件炉,到這句
return Disposables.create(
self.asObservable().subscribe(observer), // self.asObservable() 這個(gè)即是序列ob勘究。這句代碼回到上面的 ob序列者的閉包
disposable
)
即走到了Producer
的subscribe
方法中的run
方法
3、來到了AnonymousObservable.run
方法中的sink.run
方法
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel) // 這里傳進(jìn)去了 訂閱者 銷毀者
let subscription = sink.run(self) // 這里的self就是 AnonymousObservable 這里傳進(jìn)去了 可觀察序列斟冕,即 ob
return (sink: sink, subscription: subscription)
}
4口糕、這時(shí)就來到了
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self)) // 這里的self是 AnonymousObservableSink
}
5、這時(shí)來到了AnyObserver的初始化方法
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on // 這個(gè)后面的observer是AnonymousObservableSink磕蛇,將AnonymousObservableSink的on函數(shù)保存在了常量 self.observer 中
}
6景描、這時(shí)來到了創(chuàng)建序列時(shí)的閉包
let ob = Observable<Any>.create { (obserber) -> Disposable in // 這里的obserber是AnyObserver類型
// 3:發(fā)送信號(hào)
obserber.onNext("框架班級(jí)") //這一句
obserber.onCompleted()
// obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
return Disposables.create()
}
中onNext這句
7、這時(shí)來到了AnyObserver的on方法
public func on(_ event: Event<Element>) {
return self.observer(event) // 這里的self.observer是上面保存的AnonymousObservableSink的on函數(shù)秀撇,并將event傳進(jìn)去
}
8超棺、最后來到了管道中的forwardOn方法
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event) // 這里的self._observer 即是訂閱者AnonymousObserver
}
9、最后就來到了AnonymousObserver的on方法的實(shí)現(xiàn)中
let observer = AnonymousObserver<E> { event in
//保存了事件呵燕,這整個(gè)大括號(hào)是作為一個(gè)逃逸閉包參數(shù)棠绘,初始化傳遞給了AnonymousObserver。并且將其賦值給新的常量observer再扭,保存起來
//最后傳給了管子
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
走到.next方法中氧苍,最后打印“訂閱到:框架班級(jí)”
總結(jié)的簡(jiǎn)單流程圖如下:
總的流程:
RxSwift設(shè)計(jì)思維之我想
- 最底層的是協(xié)議,每個(gè)協(xié)議都有自己獨(dú)有的方法泛范,可以遵守多個(gè)協(xié)議让虐,方便后期擴(kuò)展
- 使用了一層一層的繼承關(guān)系,最底層的基類實(shí)現(xiàn)所有類都要實(shí)現(xiàn)的基本方法罢荡,可以實(shí)現(xiàn)協(xié)議的方法赡突,但是只是一個(gè)抽象方法,具體方法到某個(gè)具體的子類去實(shí)現(xiàn)
- 使用了AnonymousObservableSink中間層來實(shí)現(xiàn)可觀察序列AnonymousObservable和訂閱者AnonymousObserver之間的通信区赵。類似于其他框架中的manager
- 可以在每個(gè)基類的初始化init和銷毀方法中deinit實(shí)現(xiàn)監(jiān)控內(nèi)存管理的方法
- 參數(shù)起名還是盡量準(zhǔn)確一點(diǎn)惭缰,很多地方observer表示的是Sink或別的,但是RxSwift還起名為observer笼才,不便于理解