讀 RxSiwft

當程序員原來越浮躁了,項目做多了大都是雷同的, 對技術(shù)沒啥幫助,讀一些牛逼的第三方框架,有助于提升,
關(guān)于RxSwift,作為ReactiveX的成員框架之一奠旺,它有著血脈相承的語法劣砍,語言和哲學(xué)上的一致性使得即便之后轉(zhuǎn)向其他的平臺我們也能很快的上手Rx对粪。其次仿吞,通過閱讀源碼,我們也可以看到淳地,其實有的時候大神的代碼也沒有那么的完美怖糊,我們也可以在一些地方看到妥協(xié)和失誤

本篇目標

本篇的目標就是了解下面這段代碼(來自Rx.playgroud)的實現(xiàn):

example("just") {
    let disposeBag = DisposeBag()
    
    Observable.just("??")
        .subscribe { event in
            print(event)
        }
        .disposed(by: disposeBag)
}

這寥寥數(shù)行代碼,我想但凡RxSwift入門的同學(xué)都知道它的用處:創(chuàng)建一個單值的可觀察序列颇象,并且打印出它的所有序列伍伤。恩...沒毛病,但是本篇想要知道的是:

序列是如何創(chuàng)建的遣钳?它的結(jié)構(gòu)是怎么樣的扰魂?
序列是如何被觀察者訂閱的?
so...let's go !

眾所周知,自Swift誕生以來,蘋果爸爸就一直在推崇*** 面向協(xié)議編程(POP) *** 阅爽,而RxSwift也是同樣的,遵循了從一個協(xié)議開始荐开,而不是從一個類開始付翁。但是我并不想從協(xié)議講起,因為雖然從協(xié)議講起最具邏輯性晃听,但是從文章的角度來說并不好理解和閱讀百侧。所以本文將以示例代碼為切入點,自上而下的閱讀能扒,以求簡單清晰易懂佣渴。

Observable

在開篇的示例代碼中,首先映入我們眼簾的是Observable初斑,Observable調(diào)用了just方法辛润。其實Observable是一個遵守ObservableType的類,實現(xiàn)代碼如下

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
 #if TRACE_RESOURCES
        let _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
  #if TRACE_RESOURCES
        let _ = Resources.decrementTotal()
#endif
    }

    // this is kind of ugly I know :(
    // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ˉ\_(ツ)_/ˉ

    /// Optimizations for map operator
    internal func composeMap<R>(_ selector: @escaping (Element) throws -> R) -> Observable<R> {
        return Map(source: self, transform: selector)
    }
}

首先這是一個用Public修飾的抽象類见秤,它是直接面向RxSwift使用者的砂竖。在這個類當中,使用了E的別名來充當序列值的泛型類型鹃答。在Init方法中我們可以看到一個Resource的結(jié)構(gòu)體乎澄,順便提一句,這是一個用來“追蹤計數(shù)”RxSwift引用資源的测摔,每當init一個資源計數(shù)就+1置济,deinit的時候就總數(shù)-1,以此來追蹤全局的資源使用锋八。

除此之外浙于,我們還可以看到有兩個方法:

    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E 
    public func asObservable() -> Observable<E>

這兩個方法都在ObservableType這個協(xié)議中有所定義,前者定義了Observable能夠被訂閱的行為挟纱,后者則是定義了可以將自身“轉(zhuǎn)換”成“Observable實體”的功能路媚。這里可能會有一些Confuse,其實前面已經(jīng)提到過,單純的Observable類并不能作為序列被直接訂閱使用樊销,只有Observable的實體子類才可以被實例化使用叽躯。

所以,我們也可以看到何暮,subscribe函數(shù)的實現(xiàn)也只是簡單的fatalError彼乌,并沒有實際的邏輯操作:

// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
    rxFatalError("Abstract method", file: file, line: line)
}

-> Observable - > ObservableType

現(xiàn)在我們再來看一下ObservableType:

public protocol ObservableType : ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<E> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}

這兩個方法的作用前面已經(jīng)講過,在這里我們可以看到通過Protocol ExtensionRxSwift為ObservableType提供了默認的asObservable的實現(xiàn)剂府,那么ObservableConvertibleType是一個什么協(xié)議呢拧揽,是不是有它從根源上定義了asObservable方法呢?我們來看一下ObservableConvertibleType的定義:

/// Type that can be converted to observable sequence (`Observer<E>`).
public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<E>
}

果然如此,那么至此為止淤袜,Observable之前的協(xié)議繼承體系我們已經(jīng)明了痒谴,畫成圖大概是這樣的:



很遺憾,至此為止我們并沒有看到太多的實現(xiàn)邏輯铡羡,但是我們看到了一系列Observable的Protocol根基积蔚。那么具體的Observable實體應(yīng)該是怎么樣的呢?我們從just身上來找到答案烦周。

在Observable+Creation.swift文件中尽爆,我們找到了關(guān)于just的定義:

Observable+Creation.swift是一個Observable的一個拓展(extension),文件中我們可以看到很多關(guān)于構(gòu)建Observable實體的方法,諸如create, empty, never等等,本篇以just作為切入點读慎,其實其他的公開的Creation函數(shù)也是類似的邏輯漱贱,所以不會一一介紹了。
    public static func just(_ element: E) -> Observable<E> {
        return Just(element: element)
    }

我們可以看到夭委,這是一個public修飾的暴露在外的Observable的靜態(tài)方法幅狮,返回的也是Observable類型。那么這個Just是什么呢株灸?

final class Just<Element> : Producer<Element> {
    private let _element: Element
    
    init(element: Element) {
        _element = element
    }
    
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.next(_element))
        observer.on(.completed)
        return Disposables.create()
    }
}

我們可以看到彪笼,這是一個繼承自Producer的一個類,OK蚂且,我們先不去管這個Just,先去看看Producer是一個什么東西:

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }
    
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
    
    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

看到這里我們清楚了配猫,Producer是繼承自O(shè)bservable的一個抽象類,結(jié)合前面的Just杏死,于是我們的圖可以畫成這樣了:

在Observable的子類Producer我們可以看到該基類實現(xiàn)subscribe的基礎(chǔ)方法泵肄,這里用到了RxSwift當中的另外一個概念--Scheduler,但是它不是本文的重點,我們將在接下來的文章里面去集中討論它淑翼,這里只是做一些簡單的解讀(感覺給自己埋了坑)腐巢。

在Producer中,subscribe主要做了以下幾件事情:

(1)創(chuàng)建一個SinkDisposer玄括。

(2)判斷是否需要Scheduler來進行切換線程的調(diào)用冯丙,如果需要那么就在指定的線程中操作。

(4)調(diào)用run方法遭京,將observer和剛剛創(chuàng)建的SinkDisposer作為入?yún)⑽赶В玫揭粋€Sink和Subscription的一個元組。這里的Sink和Subscription都是遵守Disposable的類哪雕。

(4).SinkDisposer對傳入之前的Sink和Subscription執(zhí)行setSinkAndSubscription方法船殉。

(5).將執(zhí)行完setSinkAndSubscription方法的disposer作為返回值返回。
這里的相關(guān)操作其實都容易理解,首先看看這個run:

func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }

好斯嚎,既然這是一個抽象方法利虫,那么我們暫且先不去管它挨厚,今晚不是它的輪次(最近??殺玩多了)。除去這個方法糠惫,最讓人疑惑的就是這個setSinkAndSubscription方法,那么它的作用是什么呢疫剃?

我們先來談一談SinkDisposer類,但是再談它之前我們需要先知道它所遵守的協(xié)議硼讽。SinkDisposer是一個遵守Cancelable協(xié)議的類巢价,那么這個Cancelable是何方神圣呢?

這一切都要從Disposable說起理郑。

Disposable

Disposable只是一個簡單的協(xié)議蹄溉,其中只有一個dispose方法咨油,定義了釋放資源的統(tǒng)一行為您炉。

/// Respresents a disposable resource.
public protocol Disposable {
    /// Dispose resource.
    func dispose()
}

OK,這個很簡單Cancelable呢役电?

/// Represents disposable resource with state tracking.
public protocol Cancelable : Disposable {
    /// Was resource disposed.
    var isDisposed: Bool { get }
}

沒錯赚爵,Cancelable只是一個繼承自Disposable的一個協(xié)議,其中定義了一個Bool類型的isDisposed標識法瑟,用來標識是否該序列已經(jīng)被釋放冀膝。

SinkDisposer

OK,現(xiàn)在我們終于來到了SinkDisposer類霎挟,先上源碼:

fileprivate final class SinkDisposer: Cancelable {
    fileprivate enum DisposeState: UInt32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    // Jeej, swift API consistency rules
    fileprivate enum DisposeStateInt32: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }
    
    private var _state: AtomicInt = 0
    private var _sink: Disposable? = nil
    private var _subscription: Disposable? = nil

    var isDisposed: Bool {
        return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
    }

    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        _sink = sink
        _subscription = subscription

        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }
    
    func dispose() {
        let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = _sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = _subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            _sink = nil
            _subscription = nil
        }
    }
}

在這里我們看到的一些以Atomic開頭的方法都是在OSAtomic.h中所定義的自動讀取和更新指定值方法窝剖,詳細的使用方法可以點 這里的官方文檔 。這里使用的Atomic,方法是為了區(qū)分以下幾種可能性:

case1 - 第一次執(zhí)行

(1)將sink和subscription賦值給自身的私有變量酥夭。

(2)通過Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法赐纱,將_state的值更新為2,并且返回值previousState為0熬北。

(3)previousState和DisposeStateInt32. sinkAndSubscriptionSet.rawValue做邏輯與運算疙描,得值為0所以不執(zhí)行if里面的邏輯。

(4)previousState和DisposeStateInt32.disposed.rawValue 做邏輯與運算讶隐,的值為0起胰,所以也不執(zhí)行if里面的邏輯。

(5)結(jié)束巫延。

case2 - 再次執(zhí)行

1.由于沒有執(zhí)行過dispose方法效五,所以自從第一次執(zhí)行setSinkAndSubscription之后,_state的值一直為2炉峰。當執(zhí)行previousState和DisposeStateInt32.disposed.rawValue的時候火俄,的值為2,所以執(zhí)行xFatalError("Sink and subscription were already set")程序中止運行讲冠。

case3 - 先執(zhí)行過dispose,然后第一次執(zhí)行

(1)由于執(zhí)行過dispose 方法瓜客,所以_state的值為1。

(2)通過Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,將_state的值更新為2谱仪,并且返回值previousState為1玻熙。

(3)previousState和DisposeStateInt32. sinkAndSubscriptionSet.rawValue做邏輯與運算,得值為0所以不執(zhí)行if里面的邏輯疯攒。

(4)previousState和DisposeStateInt32.disposed.rawValue 做邏輯與運算嗦随,的值為1,所以執(zhí)行if內(nèi)的操作敬尺,將sink和subscription分別執(zhí)行dispose操作枚尼,并且將兩個私有變量置nil,打破引用環(huán)。

我們可以看到砂吞,通過一個_state和OSAtomic的方法署恍,RxSwift非常優(yōu)雅的解決了上述三種場景,非常值得借鑒蜻直。而本類中的dispose方法其實也是類似的處理方法盯质,來保證只有一次有效的dispose操作,本文就不再贅述

Observer

接下來我們來講講RxSwift中的另外一個角色概而,Observer(觀察者),這次我們從觀察者的基類ObserverBase談起:

ObserverBase是一個遵守了Disposable和ObserverType協(xié)議的一個抽象類呼巷,實現(xiàn)了on和dispose。值得注意的是赎瑰,在ObserverBase中有一個私有變量:

接下來我們來講講RxSwift中的另外一個角色王悍,Observer(觀察者),這次我們從觀察者的基類ObserverBase談起:

ObserverBase是一個遵守了Disposable和ObserverType協(xié)議的一個抽象類,實現(xiàn)了on和dispose餐曼。值得注意的是压储,在ObserverBase中有一個私有變量:

private var _isStopped: AtomicInt = 0

_isStopped是一個哨兵,用來標記所觀察的序列是否已經(jīng)停止了晋辆,那么什么時候需要標記為Stop呢渠脉?我們來看這段代碼:

func on(_ event: Event<E>) {
    switch event {
    case .next:
        if _isStopped == 0 {
            onCore(event)
        }
    case .error, .completed:
        if !AtomicCompareAndSwap(0, 1, &_isStopped) {
            return
        }
        onCore(event)
    }
}

只要_isStopped不為0,那么就允許“發(fā)射”.next事件瓶佳,也就是執(zhí)行onCore方法芋膘。
當?shù)谝淮巍鞍l(fā)射”.error或者.completed時,執(zhí)行一次onCore霸饲,并且將_isStopped設(shè)為1为朋。
因為所有的Observer類在事件發(fā)射的邏輯上面都相同,所以統(tǒng)一在ObserverBase中作了處理厚脉,這也是典型的OOP思想习寸。老鐵,沒毛病~

值得一提的是傻工,我們可以看到這里使用了一個AtomicCompareAndSwap的方法霞溪,這個方法是做什么的呢孵滞?在Platform.Darwin.swift中,我們可以看到關(guān)于這個方法的定義:

typealias AtomicInt = Int32

    let AtomicCompareAndSwap = OSAtomicCompareAndSwap32Barrier
    let AtomicIncrement = OSAtomicIncrement32Barrier
    let AtomicDecrement = OSAtomicDecrement32Barrier

我們可以看到鸯匹,AtomicCompareAndSwap其實就是OSAtomic庫中所定義的一個全局方法:

/*! @abstract Compare and swap for 32-bit values with barrier.
    @discussion
    This function compares the value in <code>__oldValue</code> to the value
    in the memory location referenced by <code>__theValue</code>.  If the values
    match, this function stores the value from <code>__newValue</code> into
    that memory location atomically.

    This function is equivalent to {@link OSAtomicCompareAndSwap32}
    except that it also introduces a barrier.
    @result Returns TRUE on a match, FALSE otherwise.
 */
@available(iOS 2.0, *)
@available(iOS, deprecated: 10.0, message: "Use atomic_compare_exchange_strong() from <stdatomic.h> instead")
public func OSAtomicCompareAndSwap32Barrier(_ __oldValue: Int32, _ __newValue: Int32, _ __theValue: UnsafeMutablePointer<Int32>!) -> Bool

簡單的來說坊饶,該方法傳入三個參數(shù):__oldValue,__newValue和__theValue殴蓬,前兩個參數(shù)都是Int32類型的匿级,后一個是UnsafeMutablePointer<Int32>的可變指針。當__oldValue的值和指針所指向的內(nèi)存地址的變量的值相等時染厅,返回true否則為false,于此同時痘绎,如果__newValue和當前的值不相等,那么就賦值肖粮,使得__theValue的值為新值孤页。偽代碼如下:

f (*pointer == oldvalue) {  
    *pointer = newvalue;  
    return 1;  
} else {  
    return 0;  
} 
為了達到最佳性能,編譯器通常會對匯編基本的指令進行重新排序來盡可能保持處理器的指令流水線尿赚。作為優(yōu)化的一部分散庶,編譯器有可能對訪問主內(nèi)存的指令蕉堰,如果它認為這有可能產(chǎn)生不正確的數(shù)據(jù)時凌净,將會對指令進行重新排序。不幸的是屋讶,靠編譯器檢測到所有可能內(nèi)存依賴的操作幾乎總是不太可能的冰寻。如果看似獨立的變量實際上是相互影響,那么編譯器優(yōu)化有可能把這些變量更新位錯誤的順序皿渗,導(dǎo)致潛在不不正確結(jié)果斩芭。
內(nèi)存屏障(memory barrier)是一個使用來確保內(nèi)存操作按照正確的順序工作的非阻塞的同步工具。內(nèi)存屏障的作用就像一個柵欄乐疆,迫使處理器來完成位于障礙前面的任何加載和存儲操作划乖,才允許它執(zhí)行位于屏障之后的加載和存儲操作。內(nèi)存屏障同樣使用來確保一個線程(但對另外一個線程可見)的內(nèi)存操作總是按照預(yù)定的順序完成挤土。如果在這些地方缺少內(nèi)存屏障有可能讓其他線程看到看似不可能的結(jié)果琴庵。為了使用一個內(nèi)存屏障,你只要在你代碼里面需要的地方簡單的調(diào)用OSMemoryBarrier函數(shù)仰美。
####匿名觀察者
看完了ObserverBase現(xiàn)在我們來看一下AnonymousObserver:

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType

typealias EventHandler = (Event<Element>) -> Void

private let _eventHandler : EventHandler

init(_ eventHandler: @escaping EventHandler) {

if TRACE_RESOURCES

    let _ = Resources.incrementTotal()

endif

    _eventHandler = eventHandler
}

override func onCore(_ event: Event<Element>) {
    return _eventHandler(event)
}

if TRACE_RESOURCES

deinit {
    let _ = Resources.decrementTotal()
}

endif

}

我們可以看到迷殿,在這個匿名觀察者中,它主要做的事情就是將基類ObserverBase所沒有實現(xiàn)的onCore方法實現(xiàn)了咖杂,將觀察者構(gòu)造方法時傳入的EventHandler在onCore方法中執(zhí)行庆寺。這也就是觀察者受到序列事件的動作。
####訂閱過程
在我們對Observable诉字、Observer和Disposeable有了一定的認知之后懦尝,我們可以來認識一下最為關(guān)鍵的一步知纷,subscribe也就是訂閱。

在ObservableType+Extensions.swift中我們可以看到相關(guān)的實現(xiàn):

/**
Subscribes an event handler to an observable sequence.

- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (Event<E>) -> Void)
    -> Disposable {
    let observer = AnonymousObserver { e in
        on(e)
    }
    return self.subscribeSafe(observer)
}
所謂的subscribe其實只是做了兩個事情陵霉。首先是構(gòu)造了一個匿名觀察者屈扎,將on也就是(Event<E>) -> Void類型的閉包作為參數(shù),每次在匿名觀察者有新的事件的時候調(diào)用撩匕,這里也用到了尾隨閉包的語法糖鹰晨,提高閱讀性。其次止毕,將剛剛構(gòu)造的匿名觀察者模蜡,通過subscribeSafe函數(shù)來完成訂閱。那么subscribeSafe究竟做了一些什么事情呢?

extension ObservableType {
func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
return self.asObservable().subscribe(observer)
}
}

subscribeSafe是一個內(nèi)部的方法扁凛,所有內(nèi)部的訂閱操作全部通過該方法來完成忍疾,一般最后都是通過subscribe方法的多態(tài)性來完成最終的訂閱,那么回想一下之前Just的subscribe方法我們就可以知道谨朝,一旦調(diào)用subscribe方法卤妒,Just立刻給匿名觀察者發(fā)送一個包裹了初始值的.next事件和一個.completed事件,最后返回一個NopDisposable類型的“存根”,NopDisposable是一個在執(zhí)行dispose操作時不進行任何操作的存根字币。然后整個訂閱過程就結(jié)束了则披。

DisposeBag

對于開頭的代碼,我們現(xiàn)在唯一還沒有講到的就是addDisposableTo這個方法洗出,我們都知道士复,當一個序列執(zhí)行subscribe之后我們會得到一個遵守Disposable的存根。那么根據(jù)方法名翩活,我們也可以猜到這個一個將存根添加到一個地方的方法阱洪,那么它是要將存根添加到哪里呢?

沒錯菠镇!就是我們天天在寫的DisposeBag冗荸。在DisposeBag.swift中我們可以找到該方法的定義:

extension Disposable {
    /// Adds `self` to `bag`.
    ///
    /// - parameter bag: `DisposeBag` to add `self` to.
    public func addDisposableTo(_ bag: DisposeBag) {
        bag.insert(self)
    }
}

那么DisposeBag到底是個什么東西呢?talk is cheap,我們直接來看源碼:

public final class DisposeBag: DisposeBase {
    
    private var _lock = SpinLock()
    
    // state
    private var _disposables = [Disposable]()
    private var _isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }
    
    /// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        _insert(disposable)?.dispose()
    }
    
    private func _insert(_ disposable: Disposable) -> Disposable? {
        _lock.lock(); defer { _lock.unlock() }
        if _isDisposed {
            return disposable
        }

        _disposables.append(disposable)

        return nil
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        let oldDisposables = _dispose()

        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose() -> [Disposable] {
        _lock.lock(); defer { _lock.unlock() }

        let disposables = _disposables
        
        _disposables.removeAll(keepingCapacity: false)
        _isDisposed = true
        
        return disposables
    }
    
    deinit {
        dispose()
    }
}

其實DisposeBag這個類設(shè)計的還是非常的簡單明了的利耍,暴露給外部的只有一個insert方法,將需要被管理的Dispose交給這個Bag蚌本,當該Bag執(zhí)行deinit方法的時候執(zhí)行dispose,將所持有的所有Disposable遍歷一遍,同時挨個dispose,值得注意的是堂竟,該類內(nèi)部使用了一個鎖:

    private var _lock = SpinLock()

這個SpinLock其實就是一個NSRecursiveLock的遞歸鎖魂毁,該??的作用就是為了保證_disposables的數(shù)組線程安全,之所以用遞歸鎖是因為有可能會出現(xiàn)在相同的線程多次調(diào)用insert的而引發(fā)死鎖出嘹。

正常情況下席楚,執(zhí)行insert方法,首先會執(zhí)行加鎖操作税稼,然后Bag會將該Disposable加入到_disposables這個數(shù)組中烦秩,最后解鎖垮斯。但是還有一種情況,那就是當執(zhí)行insert操作的時候只祠,該Bag已經(jīng)被析構(gòu)了兜蠕,那么我們就不需要再將其加入數(shù)組,直接將該Disposable釋放掉就可以了抛寝。

QA

當一個序列構(gòu)造完畢的之后熊杨,調(diào)用subscribe方法會進行SubscribeHandler,也就是進行訂閱的相關(guān)操作。具體來說盗舰,對于Just這個序列晶府,SubscribeHandler指的是就是發(fā)送一個.next(element)事件和一個.completed事件;對于NeverProducer這個序列钻趋,SubscribeHandler指的是單單只發(fā)送一個.completed事件川陆;所以對于不同的SubscribeHandler會有不同的訂閱操作,總的來說是根據(jù)序列的特性來發(fā)送給觀察者不同的事件流蛮位。

值得一提的是在RxSwift中還有一個很重要的概念Sink较沪,關(guān)于它的解釋可以參考一下這個issueSink相當與一個加工者失仁,可以將源事件流轉(zhuǎn)換成一個新的事件流尸曼,如果講事件流比作水流,事件的傳遞過程比作水管陶因,那么Sink就相當于水管中的一個轉(zhuǎn)換頭骡苞。關(guān)于Sink我們會在之后的文章中詳細的講述垂蜗。

我們分析了在RxSwift中的整個訂閱流程楷扬。在開講變換操作之前,首先要弄清楚Sink的概念贴见,不清楚的同學(xué)可以翻看上一篇的分析烘苹。簡單的來說,在每一次訂閱操作之前都會進行一次Sink對流的操作片部。如果把Rx中的流當做水镣衡,那么Sink就相當于每個水管水龍頭的濾網(wǎng),在出水之前進行最后的加工档悠。

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

通過上面的源碼我們可以發(fā)現(xiàn)廊鸥,每當一個Observable被訂閱,那么該Observable一定會執(zhí)行run方法辖所,而run方法中做的事情就是Sink的相關(guān)處理操作惰说。

簡單的來說Sink主要做兩件事情:

對Next、Complete缘回、Error事件的轉(zhuǎn)發(fā);

對流轉(zhuǎn)發(fā)之前的預(yù)先變化吆视。

而我們的變換操作基本上都是在各種各樣的Sink中操作的典挑,為什么說是基本上呢?因為在一些高階變化(嵌套變換)的情況之下啦吧,Sink并不是發(fā)生變換的地方您觉,具體的情況在下文會慢慢說到。

例子

Observable.of(1, 2, 3)
    .map { $0 * $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

我們可以在map方法之上卡一個斷點授滓,程序運行之后我們可以看到停在了下面的方法定義

extension ObservableType {
    public func map<R>(_ transform: @escaping (E) throws -> R)
        -> Observable<R> {
        return self.asObservable().composeMap(transform)
    }
}

我們可以看到琳水,這里做了兩件事情,首先確保把調(diào)用者轉(zhuǎn)化成Observable般堆,因為符合ObservableType的對象有可能是ControlEvent炫刷,ControlProperty之類的東西。然后調(diào)用composeMap方法郁妈,將我們所期望的變換操作的閉包傳入浑玛。
OK,我們再進一層噩咪,來看看composeMap做了什么:

internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }

我們可以看到顾彰,在這里Observable調(diào)用了自身的_map私有方法:

internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
    return Map(source: source, transform: transform)
}
final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType
    private let _source: Observable<SourceType>
    private let _transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        _source = source
        _transform = transform
    }
    override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
        let originalSelector = _transform
        return Map<SourceType, R>(source: _source, transform: { (s: SourceType) throws -> R in
            let r: ResultType = try originalSelector(s)
            return try selector(r)
        })
    }
    
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
        let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
        let subscription = _source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

我們可以看到,所謂的_map實際上又返回了一個基于Producer類(Producer繼承自O(shè)bservable胃碾,而Observable類中又是最開始定義composeMap的地方涨享,這個集成鏈對于接下來的理解很重要)的Map對象。這里主要做了三件事情:

(1)首先把通過構(gòu)造器把“可觀察序列”和“變換操作”保存起來備用仆百。

(2)重寫父類的composeMap厕隧,從原來的直接使用傳入的“變換操

作”(transform)構(gòu)造Map對象變成了先使用Map對象自帶的“變換操作”進行一次變換,再使用傳入的“變換操作”進行一次變換俄周。這樣的遞歸處理方式就可以達到嵌套處理map操作的目的吁讨,就像這樣:Observable<Int>.of(1, 3, 5).map(+).map(+)。

(3)重寫父類的run方法峦朗,就像前文中說的那樣建丧,run方法會在訂閱之前執(zhí)行,并且使用各類的Sink在傳遞數(shù)據(jù)時對“數(shù)據(jù)源”進行各類的加工處理波势。而在這個例子中翎朱,這個Sink就是MapSink,這個MapSink在每次的Next事件的時候,使用傳入的transform對數(shù)據(jù)源進行加工尺铣,然后再將加工后的數(shù)據(jù)源傳出拴曲。

至此所有的map操作已經(jīng)全部完成。我們可以看到凛忿,map的操作其實是“惰性”的澈灼,也就是說,當你使用了map操作除非你使用了嵌套map或者對觀察序列進行了訂閱侄非,否則他們都不會立刻執(zhí)行變換操作蕉汪。

生產(chǎn)者-消費者模式

在RxSwift的設(shè)計實現(xiàn)過程中流译,其實也是對生產(chǎn)者-消費者模式(Producer–consumer pattern)實際應(yīng)用。在RxSwift中者疤,所有的可觀察序列都充當著生產(chǎn)者的作用福澡,所以我們可以變換操作最后返回的都是一個繼承自Producer類的一個子類(除了一些Subject,Subject比較特殊驹马,之后會好好討論一下)革砸。

生產(chǎn)者繼承概覽.png

上面的腦圖大概展示了Producer所派生的子類,我們可以看到糯累,無論是我們常用的“初始化”方法:just算利、of、from,還是我們常用的變換方法:map泳姐,flatMap,merge效拭,他們所對應(yīng)的實現(xiàn)都是一種Producer。

我們可以看到胖秒,也正是得益于生產(chǎn)者-消費者模式的實現(xiàn)缎患,使得RxSwift在可觀察序列如同工廠里的流水線一樣,可以在每一條流水線結(jié)束之前進行自定義的加工阎肝。

總結(jié)

接下來我們可以俯瞰一下RxSwift對于事件變換的操作挤渔,以下做一些邏輯上的梳理工作,希望大家可以看的更加清楚

1. 協(xié)議拓展
從一個協(xié)議開始风题。 ---- WWDC 2015

我們知道判导,RxSwift的可觀察序列都是基于ObservableType,所以當我們需要給所有的可觀察序列添加一個變換操作的時候沛硅,我們只需要通過extension來添加一個公開的方法眼刃,然后去實現(xiàn)它。

public func map<R>(_ transform: @escaping (E) throws -> R)
        -> Observable<R> {
        return self.asObservable().composeMap(transform)
    }

    public func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
        -> Observable<O.E> {
            return FlatMap(source: asObservable(), selector: selector)
    }

    public func concat<O: ObservableConvertibleType>(_ second: O) -> Observable<E> where O.E == E {
        return Observable.concat([self.asObservable(), second.asObservable()])
    }

    public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2)
            -> Observable<(O1.E, O2.E)> {
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: { ($0, $1) }
        )
    }

    // More and more ....
}

上面我所列出來的代碼是我為了集中展示所以放在同一個extension中稽鞭,在實際的源碼中他們都是分散在不同的swift文件中的鸟整。所以我們知道,所有我們所使用的變換操作朦蕴,都是通過extension拓展到ObservableType協(xié)議當中的。

通過翻看源碼我們可以看到弟头,上述的變換操作其實都做了一件事情吩抓,那就是返回一個Producer的具體子類。比如map返回的是Map類的實例對象赴恨,combineLatest返回的是CombineLatest2類的實例對象疹娶。

2. 具象化的Producer

那么通過拓展方法所返回的Producer的子類又是做了一些什么事情呢?

首先伦连,具象化的Producer一定會重寫override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Accumulate方法雨饺,在該方法中钳垮,RxSwift通過具象化的Sink來對數(shù)據(jù)源進行處理,然后讓源可觀察序列執(zhí)行訂閱额港。

其次饺窿,Producer在初始化的時候會至少接收兩個參數(shù):一個參數(shù)是所傳遞的可觀察序列,另外一個參數(shù)是所進行變換操作的閉包移斩。當然肚医,有些變換操作可能由于操作的特性而需要三個的參數(shù)。比如Scan操作向瓷,不僅僅需要閉包accumulator,而且還需要一個seed肠套,這也是由Scan操作的特性所決定了,在這里不多加贅述猖任。當Producer保存了這些變換所必要的參數(shù)之后你稚,在run方法中的sink就能夠在訂閱輸出之前執(zhí)行這些變換,然后輸出給訂閱者了朱躺。

值得注意的是入宦,由于run方法和subscribe方法之間的遞歸調(diào)用,所以這樣的實現(xiàn)模式也天然的支持嵌套的變換操作

3. "苦力"Sink

所以變換的閉包的執(zhí)行都是在各類的Sink當中室琢,比如MapSink:

func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
/// 進行變換操作
let mappedElement = try _selector(element, try incrementChecked(&_index))
/// 將變換操作之后的事件轉(zhuǎn)發(fā)給原來的觀察者
forwardOn(.next(mappedElement))
}
catch let e {
forwardOn(.error(e))
dispose()
}
case .error(let error):
forwardOn(.error(error))
dispose()
case .completed:
forwardOn(.completed)
dispose()
}
}

我們可以看到乾闰,在這里我們終于進行了變換操作,并且變換操作之后將結(jié)果轉(zhuǎn)發(fā)給了觀察者盈滴。

至此涯肩,整條變換鏈都轉(zhuǎn)換完畢。

設(shè)計的遺憾

在composeMap的定義方法之上巢钓,我們可以看到如下的一段注釋:

    // this is kind of ugly I know :(
    // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ˉ\_(ツ)_/ˉ
    /// Optimizations for map operator

在上一節(jié)的總結(jié)中我們知道病苗,在RxSwift中的變換操作的嵌套是通過run方法和subscribe方法的遞歸調(diào)用來解決的。但是這里存在問題症汹,比如硫朦,當你嵌套10個map方法的時候,每次發(fā)生onNext都會導(dǎo)致10次的變換操作的遞歸調(diào)用背镇,然后再生成最后的值傳遞給訂閱者咬展。用簡單的函數(shù)式的表達就像這樣:

10(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1) = 20

那么,我們?yōu)槭裁床豢梢灾苯舆@樣呢瞒斩?

10(+10) = 20

基于這樣的考慮破婆,我們可以看到map的默認實現(xiàn)比較特殊,它并不是直接返回一個Map對象胸囱,而是通過composeMap返回一個Map對象祷舀,然后再在Map對象中重寫composeMap以達到當發(fā)生嵌套調(diào)用的時候可以優(yōu)化函數(shù)式調(diào)用

final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {

    // ....

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
        let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
        let subscription = _source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

也正是為了這樣的一個優(yōu)化,導(dǎo)致似乎看起來很ugly,這也是設(shè)計上的遺憾吧裳扯。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末抛丽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子饰豺,更是在濱河造成了極大的恐慌亿鲜,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哟忍,死亡現(xiàn)場離奇詭異狡门,居然都是意外死亡,警方通過查閱死者的電腦和手機锅很,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門其馏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人爆安,你說我怎么就攤上這事叛复。” “怎么了扔仓?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵褐奥,是天一觀的道長。 經(jīng)常有香客問我翘簇,道長撬码,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任版保,我火速辦了婚禮呜笑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘彻犁。我一直安慰自己叫胁,他們只是感情好,可當我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布汞幢。 她就那樣靜靜地躺著驼鹅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪森篷。 梳的紋絲不亂的頭發(fā)上输钩,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機與錄音疾宏,去河邊找鬼张足。 笑死,一個胖子當著我的面吹牛坎藐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼岩馍,長吁一口氣:“原來是場噩夢啊……” “哼碉咆!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蛀恩,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤疫铜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后双谆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體壳咕,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年顽馋,在試婚紗的時候發(fā)現(xiàn)自己被綠了谓厘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡寸谜,死狀恐怖竟稳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情熊痴,我是刑警寧澤他爸,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布铃在,位于F島的核電站瓜饥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏产舞。R本人自食惡果不足惜巾陕,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一讨跟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧惜论,春花似錦许赃、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至乾巧,卻和暖如春句喜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背沟于。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工咳胃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人旷太。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓展懈,卻偏偏與公主長得像销睁,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子存崖,可洞房花燭夜當晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容