Observable 是 RxSwift 的基礎掀抹,但它們本質(zhì)上是只讀(read-only)的。你只能通過訂閱 observable,來獲得它們產(chǎn)生的新事件的通知盖奈。
舉個栗子:
Observable.create { observer in
cache.calculateDiskStorageSize { result in
switch result {
case .success(let size):
observer.onNext(Int64(size))
case .failure(let error):
observer.error(error)
log.error("calculate cache failed with error:\\(error)")
}
observer.onCompleted()
}
return Disposables.create()
}
從上述代碼中可以看出宪塔,Observable 在創(chuàng)建的時候磁奖,就已經(jīng)確定了通過某種固定的邏輯,去發(fā)出事件某筐,從而產(chǎn)生事件流比搭。
上面這個例子中的固定邏輯,指的就是計算 cache 大小南誊。通過 cache 計算結(jié)果身诺,來決定發(fā)出 onNext
, error
或者 completable
事件蜜托。
而在日常開發(fā)中,我們通常需要根據(jù)不同的邏輯霉赡,來決定發(fā)出事件橄务。抽象來說,就是需要一個既能作為 Observable 又能作為 Observer 的東西同廉。這種東西稱為 Subject:
let subject = PublishSubject<String>()
subject.on(.next("Is anyone listening?"))
let subscriptionOne = subject
.subscribe(onNext: { string in
print(string)
})
上述代碼創(chuàng)建了一個 PublishSubject, 它的名字很貼切:就像一個報紙出版商一樣仪糖,它接收信息,然后發(fā)布給訂閱者迫肖。
執(zhí)行上述代碼锅劝,會發(fā)現(xiàn)控制臺中沒有打印任何東西,這是因為:PublishSubject 只給當前的訂閱者發(fā)出事件蟆湖,如果一個 observer 是在事件發(fā)出之后才訂閱的故爵,那么將不會收到任何事件。
什么是 Subject隅津?
Subject 既是一個 observable, 又是一個 observer诬垂。在上面的例子中,可以看到 subject 既可以接收事件伦仍,又可以被訂閱结窘。
Observable 和 Subject 的區(qū)別,除了 Subject 既可以作為 Observable充蓝,又可以作為 Observer以外隧枫,也可以這么理解:
Observable 已經(jīng)把各種事件都定好了,比如發(fā)送網(wǎng)絡請求谓苟,然后 Observer 在 subscribe 的時候官脓,就觸發(fā)這個網(wǎng)絡請求,然后發(fā)送各種事件涝焙。
Subject 則是事件沒有定好卑笨,可以靈活地根據(jù)業(yè)務需求去進行觸發(fā),比如選擇相片仑撞,比如發(fā)送網(wǎng)絡請求赤兴,然后發(fā)送各種事件。
在 RxSwift 中隧哮,有四種類型的 subject:
- PublishSubject: 初始時是空的桶良,只向訂閱者發(fā)出新元素。
- BehaviorSubject: 有一個初始值近迁,并將其初始值和最新的元素發(fā)送給新的訂閱者。
- ReplaySubject: 初始化時需要有一個緩沖區(qū)大小簸州,并將維持一個該大小的緩沖區(qū)鉴竭,緩沖區(qū)內(nèi)的元素都會發(fā)送給新的訂閱者歧譬。
- AsyncSubject: 只發(fā)出序列中的最后一個 next 事件,并且只在 subject 接收到 completed 事件時才發(fā)出搏存。這是一種很少使用的 subject瑰步。
RxSwift 中還提供了一種叫做 Relay 的概念(在使用是需要 import RxRelay
),RxSwift 中提供了兩種 relay:
- PublishRelay
- BehaviorRelay
這兩種 relay 包含著對應的 subject, 但只能接收和轉(zhuǎn)發(fā) next 事件璧眠,不能添加 completed 或者 error 事件缩焦,所以它們對于非終止序列來說是非常友好的。
Relay 只能 accept 事件责静,不能發(fā)送 completed 或者 error 等終止事件袁滥,因此沒有結(jié)束的概念。如果要正確釋放 replay, 需要把它添加到 disposeBag 中灾螃。
使用 PublishSubject
let subject = PublishSubject<String>()
subject.on(.next("Is anyone listening?"))
let subscriptionOne = subject
.subscribe(onNext: { string in
print(string)
})
let subscriptionTwo = subject
.subscribe { event in
print("2)", event.element ?? event)
}
subject.onNext("3")
/**
output:
3
2) 3
**/
subscriptionOne.dispose()
subject.onNext("4")
/**
output:
2) 4
**/
當一個 publish subject 收到一個 completed 或 errror 事件题翻,也就是終止事件,它將向新的訂閱者發(fā)出該終止事件腰鬼,它將不再發(fā)出 next 事件嵌赠。
而且,它將向后來的訂閱者重新發(fā)出其終止事件熄赡。(Subject 被終止后姜挺,如果還有 observer 去 subscribe 它,那么 subject 會重復給這些 observers 發(fā)送終止事件)
// 1
subject.onCompleted()
// 2
subject.onNext("5")
// 3
subscriptionTwo.dispose()
let disposeBag = DisposeBag()
// 4
subject
.subscribe {
print("3)", $0.element ?? $0)
}
.disposed(by: disposeBag)
subject.onNext("?")
/**
output:
2) completed
3) completed
**/
使用 BehaviorSubject
頂部的第一行是 subject彼硫。
第二行的第一個訂閱者在 1 之后但在 2 之前訂閱炊豪,所以它在訂閱后立即收到 1,然后在主體發(fā)出 2 和 3 的時候收到乌助。
同樣地溜在,第二個訂閱者在 2 之后但在 3 之前訂閱,所以它在訂閱后立即收到 2他托,然后在 3 被發(fā)出時收到掖肋。
// 1
enum MyError: Error {
case anError
}
// 2
func print<T: CustomStringConvertible>(label: String, event: Event<T>) {
print(label, (event.element ?? event.error) ?? event)
}
// 3
example(of: "BehaviorSubject") {
// 4
let subject = BehaviorSubject(value: "Initial value")
let disposeBag = DisposeBag()
subject
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag
/**
1) Initial value
**/
subject.onNext("X")
/**
1) X
**/
// 1
subject.onError(MyError.anError)
// 2
subject
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
/**
1) anError
2) anError
**/
}
BehaviorSubject 向新的訂閱者重放他們的最新值。這使得它們很適合用來模擬各種狀態(tài)的轉(zhuǎn)移赏参,比如“請求正在加載中”→“請求完成”志笼。
那如果想要顯示比上一個值還更多的內(nèi)容呢,比如在搜索框上把篓,需要顯示最近使用的五個搜索值纫溃,這個時候就要用到 ReplaySubject 了。
使用 ReplaySubject
ReplaySubject 將暫時緩存韧掩、或緩沖他們發(fā)出的最新元素紊浩,直到達到你選擇的指定大小。然后,他們將向新的訂閱者重新發(fā)出該緩沖區(qū)內(nèi)的元素坊谁。
下面的大理石圖描述了一個緩沖區(qū)大小為2的重放主體:
第一個訂閱者(中間一行)已經(jīng)訂閱了 replay subject(最上面一行)费彼,所以它在元素被發(fā)射出來的時候得到了元素。第二個訂閱者(底線)在 2 之后訂閱了口芍,所以它得到了 1 和 2 的重放箍铲。
請記住,當使用一個 replay subject 時鬓椭,這個緩沖區(qū)是在內(nèi)存中保存的颠猴,所以很有可能會導致太高的內(nèi)存占用。比如你為某種類型的 replay subject 設置一個大的緩沖區(qū)大小小染,而這種類型的實例都會占用大量的內(nèi)存翘瓮,比如圖像。
另一件需要注意的事情是創(chuàng)建一個數(shù)組類型的 replay subject氧映。每個發(fā)射的元素將是一個數(shù)組春畔,所以緩沖區(qū)的大小將緩沖那么多數(shù)組。如果不小心的話岛都,也很容易在這里產(chǎn)生內(nèi)存壓力律姨。
example(of: "ReplaySubject") {
// 1
let subject = ReplaySubject<String>.create(bufferSize: 2)
let disposeBag = DisposeBag()
// 2
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
// 3
subject
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag)
subject
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
/**
--- Example of: ReplaySubject ---
1) 2
1) 3
2) 2
2) 3
**/
subject.onNext("4")
subject.onError(MyError.anError)
subject
.subscribe {
print(label: "3)", event: $0)
}
.disposed(by: disposeBag)
/**
前兩個訂閱者將正常接收當前元素,因為當新元素被添加到主題時臼疫,他們已經(jīng)被訂閱了择份,而新的第三個訂閱者將得到最后兩個緩沖的元素重放給它。
雖然最后訂閱流中發(fā)出了一個 error 事件烫堤,但是緩沖區(qū)還在內(nèi)存中荣赶,所以它還會把緩沖區(qū)之前的元素發(fā)給訂閱者。
1) 4
2) 4
1) anError
2) anError
3) 3
3) 4
3) anError
**/
}
subject.dispose()
// 因為 subject 在前面已經(jīng)發(fā)出了 error 事件鸽斟,所以它被終止并且釋放了拔创,這里再調(diào)用 dispose 會報錯
// 3) Object `RxSwift...ReplayMany<Swift.String>` was already disposed.
使用 Relay
在前面的介紹中,我們知道:Relay 實際上是對應 Subject 的一層封裝——PublishRelay 是 PublishSubject 的封裝富蓄,BehaviorRelay 是 BehaviorSubject 的封裝剩燥。它和 Subject 不一樣的地方在于:它只能通過 accept(_:)
方法接收并發(fā)出事件,它不能使用 onNext(_:)
發(fā)出事件立倍,也不能使用 onCompleted()
或者 onError(_:)
去終止訂閱流灭红,因此,Relay 保證了永遠不會終止口注。
let relay = PublishRelay<String>()
let disposeBag = DisposeBag()
relay.accept("Knock knock, anyone home?")
relay
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
relay.accept("1")
// output: 1
relay.accept(MyError.anError)
relay.onCompleted()
// compile error