首先看一下RxSwift訂閱最簡單的代碼:代碼-001
//代碼-001
// 1.創(chuàng)建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//3.發(fā)送信號
observer.onNext("走起")
return Disposables.create()
}
// 2. ob(Observable類型) 訂閱序列
ob.subscribe(onNext: { (content) in
//4.響應(yīng)信
print("訂閱的內(nèi)容:\(content)")
}) {
print("銷毀")
}
從這段代碼我們可以看出它的流程是:
創(chuàng)建序列->訂閱序列->發(fā)送信號->響應(yīng)信號
那么它內(nèi)部的核心流程是怎樣的呼奢,下面我們一起來探討一下。
先來個流程圖切平,后面內(nèi)容不理解的可以跟著這張圖捋捋思路:一握础、創(chuàng)建序列
首先我們先進入create
函數(shù):代碼-002
//代碼-002
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
/// AnonymousObservable持有了逃逸性函數(shù)subscribe
return AnonymousObservable(subscribe)
}
我們發(fā)現(xiàn)create
函數(shù)內(nèi)部創(chuàng)建了一個AnonymousObservable
類,并且將subscribe
傳入了該類。
進入AnonymousObservable
類:代碼-003
//代碼-003
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
// 持有subscribeHandler
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
發(fā)現(xiàn)該類保存了subscribeHandler
悴品。 它的父類是Producer(類)
->Observable(類)
->ObservableType(協(xié)議)
->ObservableConvertibleType(協(xié)議)
弓候。
二、序列訂閱(綁定)
進入subscribe
函數(shù):代碼-004
//代碼-004
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
/// 初始化 AnonymousObserver
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
/// asObservable() 實例化 序列(Observable)
self.asObservable().subscribe(observer)
disposable
)
}
該函數(shù)內(nèi)創(chuàng)建了一個對象observer
監(jiān)聽者他匪,顧名思義該類就是用來監(jiān)聽信號的菇存,一旦有信號發(fā)出該對象就會做出響應(yīng)。
繼續(xù)看return
邦蜜,返回值暫時不用關(guān)心依鸥,關(guān)心的是返回值里的參數(shù)self.asObservable().subscribe(observer)
。代碼-005
//代碼-005
public class Observable<Element> : ObservableType {
public func asObservable() -> Observable<E> {
return self
}
}
asObservable()
就是當前序列(AnonymousObservable
類型)悼沈。
查看一開始貼出的AnonymousObservable
類的代碼(代碼-003)發(fā)現(xiàn)該類內(nèi)并沒有subscribe
贱迟,而是在他的父類producer
內(nèi),注意這里傳入的參數(shù)是observer
(AnonymousObserver
類型):代碼-006
//代碼-006
class Producer<Element> : Observable<Element> {
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
let sinkAndSubscription = self.run(observer, cancel: disposer)
}
}
在當前Producer
類subscribe
函數(shù)內(nèi)執(zhí)行了run
方法,而在一開始創(chuàng)建的Producer 的子類AnonymousObservable
類內(nèi)實現(xiàn)了該方法(查看代碼-003)絮供,在該實現(xiàn)里創(chuàng)建了一個AnonymousObservableSink
類型的對象sink
衣吠,sink
對象內(nèi)傳入了observer
(AnonymousObserver
類型),創(chuàng)建完之后執(zhí)行了run
函數(shù)壤靶。
AnonymousObservableSink
類中:代碼-007
//代碼-007
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
發(fā)現(xiàn)AnonymousObservableSink
類 實現(xiàn)了run
方法,并且繼承之Sink
類:代碼-008
//代碼-008
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
//持有了observer(AnonymousObserver類型)
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
}
從這段代碼的初始化方法里看出sink
對象持有了observer
對象(AnonymousObserver
類型)缚俏。
繼續(xù)看AnonymousObservableSink
類 內(nèi) 的run
函數(shù),parent
就是代碼-003中傳過來的self
(AnonymousObservable
類型的對象)贮乳,所以_subscribeHandler
就是一開始初始化序列時保存的subscribeHandler
忧换,終于發(fā)現(xiàn)subscribeHandler
是在這里被調(diào)用了,這喚醒了第三部:發(fā)送信號向拆,但是我們發(fā)現(xiàn)傳入的是AnyObserver(self)
而不是AnonymousObserver
,self
在這里是sink
亚茬。下面我們來看下AnyObserver
類:代碼-009
//代碼-009
public struct AnyObserver<Element> : ObserverType {
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
}
發(fā)現(xiàn)AnyObserver
是一個結(jié)構(gòu)體,遵循ObserverType
協(xié)議浓恳。該結(jié)構(gòu)體持有了observer.on
閉包函數(shù)(此處的observer
是sink
).
下面我們來看第三步:發(fā)送信號
三刹缝、發(fā)送信號
從上面的分析可以看出代碼-001的observer
就是AnonymousObservableSink
類(代碼-007)內(nèi)傳入的AnyObserver
,執(zhí)行了onNext
函數(shù),但是該結(jié)構(gòu)體并沒有實現(xiàn)onNext
函數(shù)颈将,而是在ObserverType
協(xié)議的擴展里面:代碼-010
//代碼-010
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
}
onNext
函數(shù)內(nèi)調(diào)用了on
并且傳入了發(fā)送信號的值梢夯。因為該self
就是AnyObserver
類型,所以走到了AnyObserver
結(jié)構(gòu)體(代碼-009)里面的on
函數(shù)吆鹤。
查看(代碼-009)突然發(fā)現(xiàn)該函數(shù)竟然調(diào)用了self.observer
厨疙,它不就是剛剛AnyObserver
持有的observer.on
(也就是sink.on
)的閉包函數(shù)嗎!所以又走到了AnonymousObservableSink
類的on
閉包函數(shù)(代碼-007)洲守。
所以執(zhí)行到了 self.forwardOn(event)
,forwardOn
的實現(xiàn)在它的父類Sink
中(代碼-008)疑务。
至此我們發(fā)現(xiàn)該實現(xiàn)中執(zhí)行了self._observer.on(event)
代碼沾凄。哇塞!該_observer
不就是Sink
持有的observer
(AnonymousObserver
類型)對象嗎知允!不就是我們在subscribe
(代碼-004)函數(shù)中創(chuàng)建的監(jiān)聽者observer
嗎撒蟀!666!!! 繞了一大圈終于回到了監(jiān)聽者的位置,進入AnonymousObserver
類代碼:代碼-011
//代碼-011
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
從這段代碼可以看到該類只實現(xiàn)了onCore
方法温鸽,再進入他的父類ObserverBase
,查看代碼:代碼-012
//代碼-012
class ObserverBase<ElementType> : Disposable, ObserverType {
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
rxAbstractMethod()
}
}
到這可以長舒一口氣了保屯!on
方法內(nèi)調(diào)用了onCore
,肯定進入到他的子類AnonymousObserver
中涤垫,看代碼 代碼-011 可以發(fā)現(xiàn) _eventHandler
閉包函數(shù)在此被調(diào)用姑尺,也就是 代碼-004 中 AnonymousObserver
的閉包得到執(zhí)行,執(zhí)行到了onNext
閉包函數(shù)蝠猬。
至此subscribe
訂閱的onNext
閉包得到了調(diào)用切蟋,喚起最后一步:響應(yīng)信號。
到此我們已經(jīng)了解了整個流程榆芦,下面我們來終結(jié)一下:
總結(jié)
了解一下用到了哪些類柄粹、協(xié)議、結(jié)構(gòu)體:
- 序列:
AnonymousObservable(類)
->Producer(類)
->Observable(類)
->ObservableType(協(xié)議)
->ObservableConvertibleType(協(xié)議)
匆绣。
AnonymousObservableSink(類)
->Sink(類)
->Disposable(協(xié)議)
- 訂閱者:
AnonymousObserver(類)
->ObserverBase(類)
->Disposable(協(xié)議)
驻右,ObserverType(協(xié)議)
AnyObserver(結(jié)構(gòu)體)
- >ObserverType(協(xié)議)