Async sequences, streams, and Combine

当使用标准for循环遍历任何Swift集合时,有两个关键组件决定将哪些元素传递到迭代代码中——序列和迭代器(a sequence, and an iterator)。

虽然在编写Swift代码时,我们经常直接与序列交互,但我们很少需要自己处理迭代器,因为每当我们使用for循环时,语言会自动为我们管理这些实例。

然而,由专用类型显式控制的迭代非常强大,因为这使我们能够编写自己的、完全定制的序列,这些序列可以以与循环内置类型(如Array、Dictionary和Set)相同的方式进行迭代。

在本文中,让我们看看这个系统是如何工作的,以及它是如何扩展到Swift并发世界的——使我们能够创建完全异步的序列和流,随时间传递它们的值。

1. Sequences and iterators

假设我们想要构建一个自定义序列,让我们可以从一系列url下载数据。为了实现这一点,我们首先需要实现一个符合Sequence协议的自定义类型,该类型会使用必需的makeIterator方法创建一个迭代器,如下所示:

struct RemoteDataSequence: Sequence {
    var urls: [URL]

    func makeIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

接下来,让我们实现上面的RemoteDataIterator类型,它通过每次系统请求新元素时递增索引属性来执行迭代:

struct RemoteDataIterator: IteratorProtocol {
    var urls: [URL]
    fileprivate var index = 0

    mutating func next() -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        // If a download fails, we simply move on to
        // the next URL in this case:
        guard let data = try? Data(contentsOf: url) else {
            return next()
        }

        return data
    }
}

我们不需要担心管理多个并发迭代,因为每次启动新的for循环时,系统都会自动创建一个新的迭代器(使用序列的makeIterator方法)。

有了上面的内容,我们现在就可以用迭代数组时使用的语法来迭代所有下载的数据了:

for data in RemoteDataSequence(urls: urls) {
    ...
}

很酷的! 然而,像这样同步下载数据可能不是一个很好的主意(除非编写脚本或命令行工具),因为这样做将完全阻塞当前线程,直到所有下载完成。因此,让我们来探索一下如何将上述内容转换为异步序列。

2. Asynchronous iterations

Swift 5.5的新并发系统引入了异步序列和迭代器的概念,它们的定义方式几乎与同步序列完全相同。 所以,要使我们的RemoteDataSequence异步,我们所要做的就是使它符合AsyncSequence协议,并实现makeAsyncIterator方法——像这样:

struct RemoteDataSequence: AsyncSequence {
    typealias Element = Data

    var urls: [URL]

    func makeAsyncIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

注意,上面的typealias实际上并不需要,因为编译器应该能够推断出序列的Element类型,但在Xcode 13 beta 5中似乎不是这样。

接下来,让我们给RemoteDataIterator一个异步改造——其中包括添加关键词asyncthrows(因为我们希望我们的迭代时能够产生错误数据下载失败),我们可以使用内置的URLSession网络API完全异步的下载我们的数据:

struct RemoteDataIterator: AsyncIteratorProtocol {
    var urls: [URL]
    fileprivate var urlSession = URLSession.shared
    fileprivate var index = 0

    mutating func next() async throws -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}

有了上面的更改,我们的RemoteDataSequence现在已经变成了一个完全异步的序列,这要求我们在遍历其元素时使用await(在本例中是try)-因为我们的数据现在将在后台下载,并将提交到我们的for循环:

for try await data in RemoteDataSequence(urls: urls) {
    ...
}

事实上,异步迭代器可以抛出错误是非常强大的,因为这让我们在遇到错误时自动退出for循环,而不需要我们手动跟踪这些错误。 当然,这并不意味着所有异步序列都需要能够抛出。如果在声明迭代器的下一个方法时简单地省略了throws关键字,那么序列将被视为非抛出(并且在遍历其元素时不再需要使用try)。

3. Asynchronous streams

虽然能够定义完全自定义的异步序列非常强大,但标准库还提供了两种独立类型,使我们无需定义自己的任何类型就可以创建这样的序列。这些类型是AsyncStream和AsyncThrowingStream,前者让我们创建非抛出的异步序列,而后者给我们抛出错误的选项。

回到从一系列url下载数据的示例,让我们看看如何使用AsyncThrowingStream实现相同的功能,而不是声明自定义类型。这样做会涉及到启动一个async Task,在其中我们会生成所有下载的数据,然后我们通过报告遇到的任何错误来完成-像这样:

func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        Task {
            do {
                for url in urls {
                    let (data, _) = try await urlSession.data(from: url)
                    continuation.yield(data)
                }

                continuation.finish(throwing: nil)
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

虽然上面是一个完美的实现,但实际上可以使用另一个AsyncThrowingStream初始化器来简化它——它给了我们一个已经标记为async的闭包,在这个闭包中我们可以专注于返回流中的下一个元素:

func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    var index = 0

    return AsyncThrowingStream {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}

有了上面两个实现中的任何一个,我们现在可以迭代新的异步流,就像我们之前循环自定义的AsyncSequence一样:

for try await data in remoteDataStream(forURLs: urls) {
    ...
}

因此,AsyncStream和AsyncThrowingStream可以被视为AsyncSequence协议的具体实现,就像Array是同步序列协议的具体实现一样。在大多数情况下,使用流可能是最直接的实现,但如果我们想获得对给定迭代的完全控制,那么构建自定义AsyncSequence可能是最好的方法。

4. How does all of this relate to Combine?

现在,如果您使用过苹果的Combine框架,那么您可能会问自己,这个新的异步序列api套件是如何与该框架相关联的,因为它们都使我们能够随时间发出和观察值。

好消息是,Combine现在已经完全兼容AsyncSequence,这使我们能够将任何Publisher转换为这样的异步值序列。

func remoteDataPublisher(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(1)) {
            urlSession.dataTaskPublisher(for: $0)
        }
        .map(\.data)
        .eraseToAnyPublisher()
}

然后要将上述函数返回的AnyPublisher转换为AsyncSequence,我们所要做的就是访问它的values属性——系统会处理剩下的部分:

let publisher = remoteDataPublisher(forURLs: urls)

for try await data in publisher.values {
    ...
}

很整洁! 在现有的Combine代码库中,上面的API应该被证明是非常有用的,因为它允许我们继续使用那些代码(无需修改!),即使在采用Swift的新并发系统时也是如此。

5. Conclusion

我希望这篇文章能让您对Swift的新AsyncSequence和AsyncStream api的工作原理、它们如何用于实现各种异步序列以及这些新api如何与Combine相关有一些新的见解。