先來看看核心代碼subscribe
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
其實(shí)看代碼是有技巧的暮蹂,我一看到這個(gè)代碼我就知道這個(gè)if..else是跟線程安全有關(guān)的,如果你重點(diǎn)關(guān)注線程安全的話牧抽,那么你就要仔細(xì)看線程安全的邏輯鞠评,如果你關(guān)注的是實(shí)現(xiàn)原理那么你看的核心實(shí)現(xiàn)邏輯,看代碼的時(shí)候要選擇性的忽略掉一些細(xì)節(jié)胸懈,集中精力放在你關(guān)注的問題上面。我現(xiàn)在重點(diǎn)關(guān)注的是schedule 邏輯恰响。
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/// The singleton instance of the current thread scheduler.
public static let instance = CurrentThreadScheduler()
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer {
key.deallocate(capacity: 1)
}
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
}()
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
public static fileprivate(set) var isScheduleRequired: Bool {
get {
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
}
代碼有點(diǎn)長(zhǎng)趣钱,拆開一點(diǎn)點(diǎn)看
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
Box
:你可以理解為黑盒,里面裝著實(shí)物胚宦,做這個(gè)封裝是為了忽視細(xì)節(jié)首有。打個(gè)比方對(duì)于貨車司機(jī)來說他才不管你具體裝的什么貨呢?他只關(guān)心目的地在哪枢劝,只有發(fā)貨方和收貨方才需要關(guān)注具體貨物是什么井联,這樣做一層抽象有利于中間層的運(yùn)輸。類似的就要系統(tǒng)的Optional
類型
Queue
: 是一個(gè)可擴(kuò)展循環(huán)隊(duì)列您旁,遵循先進(jìn)先出原則烙常,add和remove算法復(fù)雜度都為O(1),可根據(jù)元素?cái)?shù)量自適應(yīng)大小,通過設(shè)置游標(biāo)達(dá)到訪問元素的目的鹤盒,具體參見Queue.Swift
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
defer {
key.deallocate(capacity: 1)
}
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
這是一個(gè)底層api蚕脏,為線程開辟一個(gè)私有空間存儲(chǔ)線程數(shù)據(jù)
相關(guān)文章: pthread_key_create
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
與上面的功能類似,不同的是使用的NSThread的API,相對(duì)上面的api要高級(jí)一點(diǎn)侦锯,為當(dāng)前線程開辟一片空間用來存儲(chǔ)queue類型數(shù)據(jù)
public static fileprivate(set) var isScheduleRequired: Bool {
get {
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
這個(gè)比較簡(jiǎn)單設(shè)置isScheduleRequired
{get , set}屬性注意這里的數(shù)據(jù)都是針對(duì)線程的不同于以往的數(shù)據(jù)屬于對(duì)象驼鞭。
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
這個(gè)是核心代碼schedule, 通過設(shè)置flag判斷當(dāng)前是否有task在執(zhí)行尺碰,如果有則將task加入到待處理的隊(duì)列中去挣棕。當(dāng)線程處理完當(dāng)前task后汇竭,會(huì)檢測(cè)是否還有為執(zhí)行的task,然后依次執(zhí)行為執(zhí)行的task
官方測(cè)試代碼
func testCurrentThreadScheduler_basicScenario() {
XCTAssertTrue(CurrentThreadScheduler.isScheduleRequired)
var messages = [Int]()
_ = CurrentThreadScheduler.instance.schedule(()) { s in
messages.append(1)
_ = CurrentThreadScheduler.instance.schedule(()) { s in
messages.append(3)
_ = CurrentThreadScheduler.instance.schedule(()) {
messages.append(5)
return Disposables.create()
}
messages.append(4)
return Disposables.create()
}
messages.append(2)
return Disposables.create()
}
XCTAssertEqual(messages, [1, 2, 3, 4, 5])
}