今天绰播,筆者與大家分析一下骄噪,RxSwift中另一個(gè)特別重要,甚至是最重要之一的操作符:flatMap幅垮。
如果大家沒有讀過本系列的第一篇腰池,建議先閱讀一下再來看本篇內(nèi)容。
在RxSwift進(jìn)行開發(fā)的過程中忙芒,有一個(gè)非常常見的場景:點(diǎn)擊某個(gè)按鈕示弓,然后發(fā)送請求,之后處理請求結(jié)果呵萨。代碼大致如下:
myBtn
.rx
.tap
.flatMap { _ in
performRequest()
}
.map {
processResponse($0)
}
.subScribe(
onNext: { ... }
)
.disposed(by: bag)
在這個(gè)場景中奏属,所要使用的最核心的一個(gè)操作符便是flatMap,它的作用是將myBtn產(chǎn)生的點(diǎn)擊事件轉(zhuǎn)換成另一個(gè)事件序列潮峦,并將該事件序列產(chǎn)生的事件向下傳遞囱皿。代碼如下:
extension ObservableType {
public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
-> Observable<Source.Element> {
return FlatMap(source: self.asObservable(), selector: selector)
}
}
該方法接收一個(gè)(Element) throws -> Source的closure勇婴,Element就是自身的元素類型,而返回值類型Source是一個(gè)泛型參數(shù)嘱腥,并且是遵循ObservableConvertibleType的耕渴。也就是說,這個(gè)closuer會(huì)把上游產(chǎn)生的事件轉(zhuǎn)換成一個(gè)事件序列齿兔。方法實(shí)現(xiàn)中返回了一個(gè)FlatMap類型的對象橱脸,這個(gè)對象持有了self.asObservable(),以及傳入的closure.
接下來分苇,咱們看一下這個(gè)FlatMap的源碼:
final private class FlatMap<SourceElement, SourceSequence: ObservableConvertibleType>: Producer<SourceSequence.Element> {
typealias Selector = (SourceElement) throws -> SourceSequence
private let source: Observable<SourceElement>
private let selector: Selector
init(source: Observable<SourceElement>, selector: @escaping Selector) {
self.source = source
self.selector = selector
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
let sink = FlatMapSink(selector: self.selector, observer: observer, cancel: cancel)
let subscription = sink.run(self.source)
return (sink: sink, subscription: subscription)
}
}
FlatMap是一個(gè)Producer的子類添诉,在它的run方法中通過self.selector和observer生成了一個(gè)FlatMapSink類的對象。這個(gè)selector就是在調(diào)用flatMap時(shí)傳入的closure, observer其實(shí)就是在subscribe方法中創(chuàng)建的observer(參見系列第一篇)医寿。然后調(diào)用了FlatMapSink的run方法栏赴。但是我們再FlatMapSink里并沒有看到run方法,因?yàn)镕latMapSink是MergeSink的子類靖秩,它的很多邏輯都在它的父類MergeSink里面:
private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>
: Sink<Observer>
, ObserverType where Observer.Element == SourceSequence.Element {
typealias ResultType = Observer.Element
typealias Element = SourceElement
let lock = RecursiveLock()
var subscribeNext: Bool {
true
}
// state
let group = CompositeDisposable()
let sourceSubscription = SingleAssignmentDisposable()
var activeCount = 0
var stopped = false
......
func run(_ source: Observable<SourceElement>) -> Disposable {
_ = self.group.insert(self.sourceSubscription)
let subscription = source.subscribe(self)
self.sourceSubscription.setDisposable(subscription)
return self.group
}
}
在源碼的最后面须眷,我們看到了run方法,只有四行盆偿,我們需要關(guān)注的是第二行:
let subscription = source.subscribe(self)
也就是說讓self訂閱了傳入的Observable柒爸,而這個(gè)傳入的Observable就是FlatMap.source,也就是上游序列事扭。這樣一來大致流程就清楚了:
通過Obsevable.flatMap,讓FlatMap類持有了上游Obsevable乐横,并通過FlatMapSink訂閱了這個(gè)上游Obsevable求橄。接下來上游Obsevable的所有事件都會(huì)被傳遞給FlatMapSink的on方法(原理參見系列第一篇)。
接下來的重點(diǎn)葡公,我們需要看一下這個(gè)FlatMapSink的on方法做了什么罐农,還是在父類MergeSink里面。
func on(_ event: Event<SourceElement>) {
switch event {
case .next(let element):
if let value = self.nextElementArrived(element: element) {
self.subscribeInner(value.asObservable())
}
case .error(let error):
self.lock.performLocked {
self.forwardOn(.error(error))
self.dispose()
}
case .completed:
self.lock.performLocked {
self.stopped = true
self.sourceSubscription.dispose()
self.checkCompleted()
}
}
}
error事件會(huì)直接調(diào)用forwardOn催什,將事件傳遞給下游observer涵亏,completed事件則是調(diào)用checkCompleted方法,在checkCompleted方法中判斷是否需要將completed事件傳遞下去蒲凶。這是因?yàn)镸ergeSink可能會(huì)訂閱多個(gè)事件序列气筋,對這些事件進(jìn)行一系列變換,直到所有的事件序列都complete之后才會(huì)傳遞旋圆,但這個(gè)不是今天的重點(diǎn)宠默,所以只要知道個(gè)大概就可以了,具體的咱們在后續(xù)的文章中再介紹灵巧。
接下來看next事件搀矫,在next事件中會(huì)調(diào)用self.nextElementArrived(element: element):
final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
self.lock.performLocked {
if !self.subscribeNext {
return nil
}
do {
let value = try self.performMap(element)
self.activeCount += 1
return value
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
return nil
}
}
}
這個(gè)方法會(huì)首先加鎖抹沪,然后執(zhí)行self.performMap(element)方法,這個(gè)方法在子類(FlatMapSink)中有實(shí)現(xiàn),其實(shí)就是調(diào)用self.selector:
override func performMap(_ element: SourceElement) throws -> SourceSequence {
try self.selector(element)
}
而這個(gè)selector其實(shí)就是在最初調(diào)用Observable.flatMap時(shí)傳入的closure瓤球,所以到這里融欧,myBtn的點(diǎn)擊事件就會(huì)被轉(zhuǎn)換成了一個(gè)事件序列。獲取這個(gè)返回值之后卦羡,on方法會(huì)調(diào)用subscribeInner方法:
func subscribeInner(_ source: Observable<Observer.Element>) {
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = self.group.insert(iterDisposable) {
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter)
iterDisposable.setDisposable(subscription)
}
}
在這里蹬癌,主要是創(chuàng)建了一個(gè)MergeSinkIter對象,并讓這個(gè)MergeSinkIter對象訂閱了傳入的Observable虹茶。根據(jù)MergeSinkIter的on方法:
func on(_ event: Event<Element>) {
self.parent.lock.performLocked {
switch event {
case .next(let value):
self.parent.forwardOn(.next(value))
case .error(let error):
self.parent.forwardOn(.error(error))
self.parent.dispose()
case .completed:
self.parent.group.remove(for: self.disposeKey)
self.parent.activeCount -= 1
self.parent.checkCompleted()
}
}
}
我們可以看見它實(shí)際就是將事件傳遞給了parent逝薪,也就是MergeSink。但是注意蝴罪,這里是直接調(diào)用的forwardOn方法董济,所以會(huì)直接把事件傳遞給下游observer,而不會(huì)觸發(fā)performMap方法(當(dāng)然也沒法直接調(diào)用on方法要门,因?yàn)槎呓邮盏膮?shù)泛型通常是不一樣的)虏肾。
這樣一來,flatMap的完整流程就結(jié)束了欢搜。
總結(jié)一下:
1.通過flatMap創(chuàng)建了一個(gè)FlatMap對象封豪,并通過FlatMapSink訂閱了上游Observable
2.當(dāng)上游Observable產(chǎn)生一個(gè)事件時(shí)FlatMapSink通過調(diào)用FlatMap.performMap方法觸發(fā)傳入的closure,產(chǎn)生了新的事件序列炒瘟。
3.通過MergeSinkIter將新的事件序列的事件傳遞個(gè)下游的observer吹埠,從而實(shí)現(xiàn)了Event->Observable->Event的傳遞。
它的流程和RxSwift中大部分操作符的核心邏輯差不多疮装,都是通過持有上游observable缘琅,創(chuàng)建一個(gè)Sink的子類去訂閱這個(gè)上游observable,然后將事件傳遞個(gè)下游的observer廓推。只不過因?yàn)檫@里產(chǎn)生的是事件序列刷袍,所以又多出了一個(gè)MergeSinkIter來處理這個(gè)事件序列的事件并進(jìn)行傳遞。
當(dāng)然樊展,今天只是介紹了FlatMap的具體流程呻纹,MergeSink的相關(guān)細(xì)節(jié)我們都忽略了,這個(gè)我們后面再說专缠。
碼字不易雷酪,若有錯(cuò)漏,歡迎指正藤肢。若有助益太闺,煩請點(diǎn)贊。^ _ ^