文章系列:
Async Squences/Streams
在介紹Concurrency Framework中的Async Squences/Streams焕参,我們先回顧一下swift的集合中的 Sequence和Iterators丈氓。
Swift集合中的Sqeuence
swift集合中的Sequence是一系列相同類型值的集合就珠,并提供了對(duì)這些值的迭代能力。
for element in someSequence {
doSomething(with: element)
}
Sequence 協(xié)議的定義:
protocol Sequence {
associatedtype Iterator: IteratorProtocol
func makeIterator() -> Iterator
}
Sequence 協(xié)議需要實(shí)現(xiàn)makeIterator方法符欠,并返回一個(gè)Iterator闯估,Iterator遵循IteratorProtocol:
public protocol IteratorProtocol {
associatedtype Element
public mutating func next() -> Self.Element?
}
IteratorProtocol需要實(shí)現(xiàn)next方法,返回存儲(chǔ)的值對(duì)象。當(dāng)沒有下一個(gè)元素返回nil昔头。
我們以下載一系列的url的任務(wù)為例子,使用同步Squence方式:
struct RemoteDataSequence: Sequence {
var urls: [URL]
func makeIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
為了返回?cái)?shù)據(jù)影兽,我們需要實(shí)現(xiàn)RemoteDataIterator類型揭斧,設(shè)計(jì)上我們使用index
跟蹤下一個(gè)待下載的urls數(shù)組索引
struct RemoteDataIterator: IteratorProtocol {
var urls: [URL]
fileprivate var index = 0
mutating func next() -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
// If a download fails, we simply move on to
// the next URL in this case:
guard let data = try? Data(contentsOf: url) else {
return next()
}
return data
}
}
我們現(xiàn)在可以通過for循環(huán)來(lái)遍歷訪問下載的所有圖片數(shù)據(jù)
for data in RemoteDataSequence(urls: urls) {
...
}
雖然我們通過Sqeuence實(shí)現(xiàn)了一個(gè)簡(jiǎn)潔的批量下載器,但是批量下載使用同步的方式顯然比較難于接受峻堰,這樣會(huì)完全阻塞線程讹开。接下來(lái)我們通過使用asynchronous sequence來(lái)達(dá)到我們的要求。
Asynchronous iterations
Swift 5.5中Concurrency為了方便并行任務(wù)的開發(fā)捐名,提供了AsyncSequence旦万,使用方式類似同步版本的Sequence。針對(duì)批量下載器我們可以這樣改造一下:
struct RemoteDataSequence: AsyncSequence {
typealias Element = Data
var urls: [URL]
func makeAsyncIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
AsyncSequence重要實(shí)現(xiàn)其實(shí)是在RemoteDataIterator中镶蹋,Concurrency為RemoteDataIterator的next
返回方法添加了async
成艘。通過URLSession
的async-wait API,我們可以異步下載我們的數(shù)據(jù):
struct RemoteDataIterator: AsyncIteratorProtocol {
var urls: [URL]
fileprivate var urlSession = URLSession.shared
fileprivate var index = 0
mutating func next() async throws -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
let (data, _) = try await urlSession.data(from: url)
return data
}
}
通過AsyncSequence
的改造贺归,現(xiàn)在我們的批量下載器已經(jīng)可以全異步執(zhí)行淆两,不過在我們?cè)L問數(shù)據(jù)時(shí)還是需要調(diào)用await
和try
,數(shù)據(jù)會(huì)通過后臺(tái)線程下載并允許我們使用for循環(huán)來(lái)遍歷訪問
for try await data in RemoteDataSequence(urls: urls) {
...
}
在for循環(huán)中拂酣,如果一個(gè)步驟拋出了異常則循環(huán)會(huì)中止秋冰,這樣有利于簡(jiǎn)化異常捕獲的處理。如果不想要異常導(dǎo)致循環(huán)中斷踱葛,也可以實(shí)現(xiàn)無(wú)異常的方法丹莲。
Asynchronous streams
通過實(shí)現(xiàn)AsyncIteratorProtocol有時(shí)候還是稍嫌麻煩(需要自定義 AsyncIteratorProtocol 的類型),Concurrency提供了AsyncStream
和AsyncThrowingStream
尸诽。在AsyncStream
和AsyncThrowingStream
構(gòu)造閉包函數(shù)中甥材,需要使用Task
來(lái)執(zhí)行異步任務(wù),使用yield
方法來(lái)返回?cái)?shù)據(jù)性含,同時(shí)調(diào)用finish
來(lái)告知是否存在異常洲赵。上面的例子可以改造為:
func remoteDataStream(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for url in urls {
let (data, _) = try await urlSession.data(from: url)
continuation.yield(data)
}
continuation.finish(throwing: nil)
} catch {
continuation.finish(throwing: error)
}
}
}
}
現(xiàn)在我們可以同樣使用for來(lái)遍歷我們的下載數(shù)據(jù):
for try await data in remoteDataStream(forURLs: urls) {
...
}
AsyncStream
和AsyncThrowingStream
可以認(rèn)為是AsyncSequence
協(xié)議的具體實(shí)現(xiàn),相當(dāng)于Array
是Sequence
的具體實(shí)現(xiàn)商蕴。在開發(fā)中使用stream可以簡(jiǎn)化我們的異步程序編寫叠萍。
在Apple的響應(yīng)式框架Combine
也提供了對(duì)AsyncSequence
的兼容,可以輕松地將任何publisher
都轉(zhuǎn)換為AsyncSequence
的值對(duì)象绪商。上面的下載器可以使用Combine來(lái)改寫:
func remoteDataPublisher(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
urls.publisher
.setFailureType(to: URLError.self)
.flatMap(maxPublishers: .max(1)) {
urlSession.dataTaskPublisher(for: $0)
}
.map(\.data)
.eraseToAnyPublisher()
}
將AnyPublisher
轉(zhuǎn)換為AsyncSequence
苛谷,我們只需要訪問publisher的values屬性:
let publisher = remoteDataPublisher(forURLs: urls)
for try await data in publisher.values {
...
}