04. RxSwift源碼解讀:Subject

今天帶大家解讀下Subject相關(guān)類的源碼陵珍。
在我們之前講過的類中寝杖,有些類是觀察者,有些是被觀察者互纯,今天要說的Subject比較特殊瑟幕,它既是觀察者,又是被觀察者留潦,兼具兩者的特性只盹。

Subject相關(guān)類包括:PublishSubject, BehaviorSubject, AsyncSubject, ReplaySubject

還是先了解基本用法兔院,請看下面的例子:

    private let bag = DisposeBag()
    private let subject = PublishSubject<Int>()
    override func viewDidLoad() {
        super.viewDidLoad()
        subject.onNext(1)
        subject.subscribe(onNext: { ele in
            print("First ->",ele)
        })
        .disposed(by: bag)
        subject.onNext(2)
        subject.subscribe(onNext: { ele in
            print("Second ->",ele)
        })
        .disposed(by: bag)
        subject.onNext(3)
}

先定義一個subject = PublishSubject<Int>(), subject既可以發(fā)送序列殖卑,也可以訂閱序列,它先后發(fā)送1秆乳,2懦鼠,3,同時訂閱兩次屹堰,打印結(jié)果為:

First -> 2
First -> 3
Second -> 3

我們看到1沒有打印肛冶,2打印一次 3打印兩次,為什么呢扯键?
PublishSubject只會接受訂閱之后的元素睦袖,1在訂閱之前發(fā)送,所以無法接受到元素荣刑。2在第一次訂閱之后發(fā)送馅笙,所以收到第一次訂閱的元素,3在兩次訂閱之后發(fā)送厉亏,所以兩次訂閱都接受到元素董习;我們通過源碼分析下內(nèi)部原理:

PublishSubject

因為Subject既是觀察者又是被觀察者,所以我們看看它的類的定義:

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType

其他3種Subject也是一樣:首先繼承了Observable爱只,然后遵循了ObserverType皿淋,同時又遵循Cancelable,所以不是兩者是三者兼具恬试。另外還遵循了SubjectType窝趣,SubjectType繼承了ObservableType,它有個協(xié)議方法asObserver(), 返回一個ObserverType, 一般返回自身對象:

    /// Returns observer interface for subject.
    public func asObserver() -> PublishSubject<Element> {
        self
    }

PublishSubject作為Observer训柴,實現(xiàn)了on協(xié)議方法, 我們知道當observer調(diào)用onNext等方法時哑舒,會調(diào)用on方法:

    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        dispatch(self.synchronized_on(event), event)
    }

synchronizationTracker.registersynchronizationTracker.unregister, 這兩個是為了保證不會出現(xiàn)重入問題,即在當前on方法執(zhí)行完成前幻馁,又執(zhí)行了on方法洗鸵,這會導(dǎo)致循環(huán)調(diào)用。
然后調(diào)用了synchronized_on:

    func synchronized_on(_ event: Event<Element>) -> Observers {
        self.lock.lock(); defer { self.lock.unlock() }
        switch event {
        case .next:
            if self.isDisposed || self.stopped {
                return Observers()
            }
            
            return self.observers
        case .completed, .error:
            if self.stoppedEvent == nil {
                self.stoppedEvent = event
                self.stopped = true
                let observers = self.observers
                self.observers.removeAll()
                return observers
            }

            return Observers()
        }
    }

這里是線程安全的宣赔,而且用的遞歸鎖實現(xiàn)的预麸,我想可能是為了保證鎖能嵌套,因為可以會出現(xiàn)方法嵌套調(diào)用儒将。
如果未釋放資源吏祸,或者未停止,則直接返回一個Observers()钩蚊,否則返回當前observers贡翘,這個Observers是重點,它其實是一個Bag:Bag<(Event<Element>) -> Void>砰逻, item類型是一個閉包鸣驱。這個閉包是發(fā)送元素的閉包,Bag內(nèi)部保存了這些閉包蝠咆。Bag的內(nèi)部實現(xiàn)其實是這樣的:將第一個元素放在_key0 和 _value0中踊东,key是通過自增的方式產(chǎn)生唯一的key北滥。如果超過一個元素 則將其他元素存放在數(shù)組中,數(shù)組最大空間為30闸翅,超過30個元素則將多余的放在字典里再芋,所以這個bag內(nèi)部有一個數(shù)組和一個字典,這樣設(shè)計的好處是當元素個數(shù)較少時(少于30)坚冀,數(shù)組的查找效率不會低济赎,同時空間使用率較高,超過30個數(shù)組查找效率會變低適合用字典记某,而字典的查找效率可以達到O1的效率司训,但空間使用率較低,所以才這么設(shè)計液南,大家可以自行查看Bag結(jié)構(gòu)體的源碼外驱。我們可以把observers當成一個觀察者容器制妄,存放了所有觀察者跨晴。

所以返回Observers(),相當于創(chuàng)建一個新Bag對象标锄,所以內(nèi)部沒有元素。

當synchronized_on 返回observers后譬涡,接著執(zhí)行dispatch方法:

func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

這個方法目的就取出所有閉包闪幽,并執(zhí)行這些閉包,相當于發(fā)送序列涡匀。
這里先執(zhí)行第一個元素的閉包(如果有的話)盯腌,_onlyFastPath表示當元素只有一個時,則_onlyFastPath等于true陨瘩,就是說當只有一個元素時腕够,后面無需再執(zhí)行了,有多個元素時還需要遍歷數(shù)組和字典執(zhí)行閉包舌劳,發(fā)出序列帚湘。
如果事件類型是.completed, .error, 那么將stopped 設(shè)置 true, stoppedEvent設(shè)置為event甚淡, 返回當前observers并清空observers大诸,后續(xù)當然無法再接收事件.
stoppedEvent 設(shè)置完后,后面再訂閱時會將這個event發(fā)出去贯卦,這個event一定是error或completed资柔,我們可以看看訂閱的代碼:

      func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self.stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self.isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self.observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

如果已經(jīng)釋放資源了,則發(fā)送error.
否則將observer的on方法放入observers撵割,緩存起來贿堰。等到發(fā)送序列時再把所有緩存的Observer發(fā)出去;每訂閱一次就會緩存一個啡彬。這就是為什么在訂閱之前發(fā)送序列無效羹与,因為沒有在緩存中故硅,只要在發(fā)送序列之前訂閱了,怎么這些訂閱都會收到這個序列纵搁。

然后再看看釋放資源的代碼:

    public func dispose() {
        self.lock.performLocked { self.synchronized_dispose() }
    }

    final func synchronized_dispose() {
        self.disposed = true
        self.observers.removeAll()
        self.stoppedEvent = nil
    }

很簡單先加鎖契吉,然后移除所有observers, disposed= true, stoppedEvent = nil;
所以釋放資源后,所有觀察者無法在接收到序列了诡渴。

現(xiàn)在試著在例子中加入error和complete事件:

        subject.onNext(1)
        subject.subscribe(onNext: { ele in
            print("First ->",ele)
        }, onError: { error in
            print(error)
        }, onCompleted: {
            print("complete")
        })
        .disposed(by: bag)
        subject.onNext(2)
        subject.subscribe(onNext: { ele in
            print("Second ->",ele)
        }, onError: { error in
            print(error)
        }, onCompleted: {
            print("complete")
        }).disposed(by: bag)
        
        subject.onNext(3)
        subject.onError(RxError.unknown)
        subject.onCompleted()
        subject.subscribe(onNext: { ele in
            print("Third ->",ele)
        }, onError: { error in
            print(error)
        }, onCompleted: {
            print("complete")
        }).disposed(by: bag)

打印出:

First -> 2
First -> 3
Second -> 3
Unknown error occurred.
Unknown error occurred.
Unknown error occurred.

當訂閱error時,也會將發(fā)送序列的閉包緩存到observers中菲语,這樣在調(diào)用onError時妄辩,兩個觀察者都會收到error事件,所以打印前兩條error山上,之后在發(fā)送onCompleted事件時眼耀,因為在發(fā)送onError時會清空observers,所以這個時候觀察者什么都收不到佩憾,之后又訂閱了一次哮伟,這時因為stoppedEvent != nil, 所以直接調(diào)用on發(fā)送之前的error事件,所以會再打印一次error妄帘。

BehaviorSubject

BehaviorSubject的代碼跟PublishSubject基本相同楞黄,只是BehaviorSubject會保存最后一次發(fā)送的元素。

 private var element: Element

BehaviorSubject初始化時抡驼,會給這個element初始化鬼廓。然后訂閱時插入observer.on的同時會把這個element發(fā)出去。

        let key = self.observers.insert(observer.on)
        observer.on(.next(self.element))

發(fā)送onNext時會更新這個element致盟。

         case .next(let element):
            self.element = element

AsyncSubject

AsyncSubject會發(fā)出Observable發(fā)出的最后一個值(也僅是最后一個值)碎税,并且只有在這個Observable完成之后;(如果Observable沒有發(fā)出任何值馏锡,那么AsyncSubject也會在沒有發(fā)出任何值的情況下完成雷蹂。),完成事件發(fā)出后杯道,后續(xù)無法再接受事件匪煌,error事件也算完成事件。
考慮下面的例子:

subject.onNext(1)
        subject.subscribe(onNext: { ele in
            print("First ->",ele)
        }, onError: { error in
            print("First ->", error)
        }, onCompleted: {
            print("complete")
        })
        .disposed(by: bag)
        subject.onNext(2)
        subject.subscribe(onNext: { ele in
            print("Second ->",ele)
        }, onError: { error in
            print("Second ->", error)
        }, onCompleted: {
            print("complete")
        }).disposed(by: bag)
        
        subject.onNext(3)
//        subject.onError(RxError.unknown)
        subject.onCompleted()
        
        subject.subscribe(onNext: { ele in
            print("Third ->",ele)
        }, onError: { error in
            print("Third ->", error)
        }, onCompleted: {
            print("complete")
        }).disposed(by: bag)

這時候只會發(fā)出3党巾,同時complete事件也會發(fā)出虐杯,打印結(jié)果:

First -> 3
Second -> 3
complete
complete
Third -> 3
complete

onCompleted之后再次訂閱,會收到最后的元素和完成事件昧港。
如果打開// subject.onError(RxError.unknown)的注釋擎椰,則只會打印error事件:

First -> Unknown error occurred.
Second -> Unknown error occurred.
Third -> Unknown error occurred.

error之后,事件已經(jīng)完成创肥,再發(fā)送onCompleted則無效达舒,所以不會打印complete值朋。
有一點跟BehaviorSubject很像就是在發(fā)出完成事件之后再訂閱依然可以收到最后一個元素和完成事件。
在源碼中subject保存了最后一個元素(lastElement)巩搏,發(fā)出error時昨登,會立刻向所有觀察者發(fā)出error事件,同時將stoppedEvent賦值為error事件贯底,之后再也無法發(fā)出事件:

        if self.isStopped {
            return (Observers(), .completed)
        }

如果沒有發(fā)出error丰辣, 而是發(fā)出了complete事件,怎么判斷是否有l(wèi)astElement禽捆,如果有則向所有觀察者發(fā)出最后一個元素和complete事件笙什,如果沒有l(wèi)astElement,怎么只發(fā)出complete事件胚想,之后清空觀察者琐凭。stoppedEvent賦值為最后一個next事件。
在訂閱的代碼中:

    func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self.stoppedEvent {
            switch stoppedEvent {
            case .next:
                observer.on(stoppedEvent)
                observer.on(.completed)
            case .completed:
                observer.on(stoppedEvent)
            case .error:
                observer.on(stoppedEvent)
            }
            return Disposables.create()
        }

        let key = self.observers.insert(observer.on)

        return SubscriptionDisposable(owner: self, key: key)
    }

如果已經(jīng)完成(前面已經(jīng)發(fā)出了error或complete)浊服,則發(fā)出最后一個error统屈,或最后一個元素和complete,或者只發(fā)出complete牙躺。
否則將觀察者插入到observers中愁憔。

ReplaySubject

我們把PublishSubject理解為沒有緩存的序列,BehaviorSuject緩存一個元素孽拷,而ReplaySubject可以緩存任意個元素惩淳,根據(jù)緩存策略,每項通知都會廣播給所有已訂閱和未來的觀察者乓搬。
實現(xiàn)方式是:ReplayOne緩存一個元素思犁,ReplayManyBase緩存多個元素,通過隊列維護這些元素进肯,發(fā)出緩存元素時激蹲,遍歷隊列發(fā)送元素。(先進先出)

    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        for item in self.queue {
            observer.on(.next(item))
        }
    }

當buffer的數(shù)量超出設(shè)置的數(shù)量時江掩,進行出隊操作

    override func trim() {
        while self.queue.count > self.bufferSize {
            _ = self.queue.dequeue()
        }
    }

ReplayManyBase繼承自ReplayBufferBase学辱,ReplayBufferBase中封裝了核心邏輯,發(fā)送數(shù)據(jù)時先緩存元素环形,再向觀察者發(fā)出元素策泣,訂閱時先重放緩存的元素,再保存觀察者抬吟。

總結(jié)

Suject因為既是Observable萨咕,ObserverType,同時又時Cancelable火本,核心邏輯基本都在一個類中危队,相對來說代碼比較簡單聪建。它實現(xiàn)了一個典型的觀察者模式(先保存所有的觀察者,發(fā)送事件時通知所有觀察者)茫陆。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末金麸,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子簿盅,更是在濱河造成了極大的恐慌挥下,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件桨醋,死亡現(xiàn)場離奇詭異棚瘟,居然都是意外死亡,警方通過查閱死者的電腦和手機讨盒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來步责,“玉大人返顺,你說我怎么就攤上這事÷希” “怎么了遂鹊?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蔗包。 經(jīng)常有香客問我秉扑,道長,這世上最難降的妖魔是什么调限? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任舟陆,我火速辦了婚禮,結(jié)果婚禮上耻矮,老公的妹妹穿的比我還像新娘秦躯。我一直安慰自己,他們只是感情好裆装,可當我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布踱承。 她就那樣靜靜地躺著,像睡著了一般哨免。 火紅的嫁衣襯著肌膚如雪茎活。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天琢唾,我揣著相機與錄音载荔,去河邊找鬼。 笑死采桃,一個胖子當著我的面吹牛身辨,可吹牛的內(nèi)容都是我干的丐谋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼煌珊,長吁一口氣:“原來是場噩夢啊……” “哼号俐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起定庵,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤吏饿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蔬浙,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體猪落,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年畴博,在試婚紗的時候發(fā)現(xiàn)自己被綠了笨忌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡俱病,死狀恐怖官疲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情亮隙,我是刑警寧澤途凫,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站溢吻,受9級特大地震影響维费,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜促王,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一犀盟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蝇狼,春花似錦且蓬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至豹障,卻和暖如春冯事,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背血公。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工昵仅, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓摔笤,卻偏偏與公主長得像够滑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子吕世,可洞房花燭夜當晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容