Async sequences, streams, and Combine
当使用标准for循环遍历任何Swift集合时,有两个关键组件决定将哪些元素传递到迭代代码中——序列和迭代器(a sequence, and an iterator)。
虽然在编写Swift代码时,我们经常直接与序列交互,但我们很少需要自己处理迭代器,因为每当我们使用for循环时,语言会自动为我们管理这些实例。
然而,由专用类型显式控制的迭代非常强大,因为这使我们能够编写自己的、完全定制的序列,这些序列可以以与循环内置类型(如Array、Dictionary和Set)相同的方式进行迭代。
在本文中,让我们看看这个系统是如何工作的,以及它是如何扩展到Swift并发世界的——使我们能够创建完全异步的序列和流,随时间传递它们的值。
1. Sequences and iterators
假设我们想要构建一个自定义序列,让我们可以从一系列url下载数据。为了实现这一点,我们首先需要实现一个符合Sequence协议的自定义类型,该类型会使用必需的makeIterator方法创建一个迭代器,如下所示:
struct RemoteDataSequence: Sequence {
var urls: [URL]
func makeIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
接下来,让我们实现上面的RemoteDataIterator类型,它通过每次系统请求新元素时递增索引属性来执行迭代:
struct RemoteDataIterator: IteratorProtocol {
var urls: [URL]
fileprivate var index = 0
mutating func next() -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
// If a download fails, we simply move on to
// the next URL in this case:
guard let data = try? Data(contentsOf: url) else {
return next()
}
return data
}
}
我们不需要担心管理多个并发迭代,因为每次启动新的for循环时,系统都会自动创建一个新的迭代器(使用序列的makeIterator方法)。
有了上面的内容,我们现在就可以用迭代数组时使用的语法来迭代所有下载的数据了:
for data in RemoteDataSequence(urls: urls) {
...
}
很酷的! 然而,像这样同步下载数据可能不是一个很好的主意(除非编写脚本或命令行工具),因为这样做将完全阻塞当前线程,直到所有下载完成。因此,让我们来探索一下如何将上述内容转换为异步序列。
2. Asynchronous iterations
Swift 5.5的新并发系统引入了异步序列和迭代器的概念,它们的定义方式几乎与同步序列完全相同。 所以,要使我们的RemoteDataSequence异步,我们所要做的就是使它符合AsyncSequence协议,并实现makeAsyncIterator方法——像这样:
struct RemoteDataSequence: AsyncSequence {
typealias Element = Data
var urls: [URL]
func makeAsyncIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
注意,上面的typealias实际上并不需要,因为编译器应该能够推断出序列的Element类型,但在Xcode 13 beta 5中似乎不是这样。
接下来,让我们给RemoteDataIterator一个异步改造——其中包括添加关键词async和throws(因为我们希望我们的迭代时能够产生错误数据下载失败),我们可以使用内置的URLSession网络API完全异步的下载我们的数据:
struct RemoteDataIterator: AsyncIteratorProtocol {
var urls: [URL]
fileprivate var urlSession = URLSession.shared
fileprivate var index = 0
mutating func next() async throws -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
let (data, _) = try await urlSession.data(from: url)
return data
}
}
有了上面的更改,我们的RemoteDataSequence现在已经变成了一个完全异步的序列,这要求我们在遍历其元素时使用await(在本例中是try)-因为我们的数据现在将在后台下载,并将提交到我们的for循环:
for try await data in RemoteDataSequence(urls: urls) {
...
}
事实上,异步迭代器可以抛出错误是非常强大的,因为这让我们在遇到错误时自动退出for循环,而不需要我们手动跟踪这些错误。 当然,这并不意味着所有异步序列都需要能够抛出。如果在声明迭代器的下一个方法时简单地省略了throws关键字,那么序列将被视为非抛出(并且在遍历其元素时不再需要使用try)。
3. Asynchronous streams
虽然能够定义完全自定义的异步序列非常强大,但标准库还提供了两种独立类型,使我们无需定义自己的任何类型就可以创建这样的序列。这些类型是AsyncStream和AsyncThrowingStream,前者让我们创建非抛出的异步序列,而后者给我们抛出错误的选项。
回到从一系列url下载数据的示例,让我们看看如何使用AsyncThrowingStream实现相同的功能,而不是声明自定义类型。这样做会涉及到启动一个async Task,在其中我们会生成所有下载的数据,然后我们通过报告遇到的任何错误来完成-像这样:
func remoteDataStream(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for url in urls {
let (data, _) = try await urlSession.data(from: url)
continuation.yield(data)
}
continuation.finish(throwing: nil)
} catch {
continuation.finish(throwing: error)
}
}
}
}
虽然上面是一个完美的实现,但实际上可以使用另一个AsyncThrowingStream初始化器来简化它——它给了我们一个已经标记为async的闭包,在这个闭包中我们可以专注于返回流中的下一个元素:
func remoteDataStream(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
var index = 0
return AsyncThrowingStream {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
let (data, _) = try await urlSession.data(from: url)
return data
}
}
有了上面两个实现中的任何一个,我们现在可以迭代新的异步流,就像我们之前循环自定义的AsyncSequence一样:
for try await data in remoteDataStream(forURLs: urls) {
...
}
因此,AsyncStream和AsyncThrowingStream可以被视为AsyncSequence协议的具体实现,就像Array是同步序列协议的具体实现一样。在大多数情况下,使用流可能是最直接的实现,但如果我们想获得对给定迭代的完全控制,那么构建自定义AsyncSequence可能是最好的方法。
4. How does all of this relate to Combine?
现在,如果您使用过苹果的Combine框架,那么您可能会问自己,这个新的异步序列api套件是如何与该框架相关联的,因为它们都使我们能够随时间发出和观察值。
好消息是,Combine现在已经完全兼容AsyncSequence,这使我们能够将任何Publisher转换为这样的异步值序列。
func remoteDataPublisher(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
urls.publisher
.setFailureType(to: URLError.self)
.flatMap(maxPublishers: .max(1)) {
urlSession.dataTaskPublisher(for: $0)
}
.map(\.data)
.eraseToAnyPublisher()
}
然后要将上述函数返回的AnyPublisher转换为AsyncSequence,我们所要做的就是访问它的values属性——系统会处理剩下的部分:
let publisher = remoteDataPublisher(forURLs: urls)
for try await data in publisher.values {
...
}
很整洁! 在现有的Combine代码库中,上面的API应该被证明是非常有用的,因为它允许我们继续使用那些代码(无需修改!),即使在采用Swift的新并发系统时也是如此。
5. Conclusion
我希望这篇文章能让您对Swift的新AsyncSequence和AsyncStream api的工作原理、它们如何用于实现各种异步序列以及这些新api如何与Combine相关有一些新的见解。