Building async and concurrent versions of forEach and map

如果我们仔细想想,我们每天编写的大部分代码实际上都是由一系列数据转换组成的。我们以一种形状或形式获取数据——无论是实际的模型数据、网络响应,还是像用户输入或其他事件之类的东西——然后在其上运行逻辑,最终得到某种形式的输出数据,然后保存、发送或显示给用户。

在本文中,让我们看看在使用forEach和map等函数执行此类数据转换时,如何利用Swift内置的并发系统。


1. Synchronous transformations

让我们先看一个例子,在这个例子中,我们构建了一个MovieListViewController,它允许用户将某些电影标记为收藏。发生这种情况时,我们的视图控制器使用标准库的forEach API迭代用户选择的索引路径,然后调用FavoritesManager,以便将与这些索引路径对应的电影标记为收藏夹:

class MovieListViewController: UIViewController {
    private var movies: [Movie]
    private let favoritesManager: FavoritesManager
    private lazy var tableView = UITableView()
    ...

    func markSelectedMoviesAsFavorites() {
        tableView.indexPathsForSelectedRows?.forEach { indexPath in
            let movie = movies[indexPath.row]
            favoritesManager.markMovieAsFavorite(movie)
        }
    }
}

除了允许将电影标记为收藏外,我们的FavoritesManager还有一个方法,允许我们加载用户先前添加到其收藏夹中的所有电影模型,该方法使用内置的映射方法,通过从数据库检索一组电影ID,将其转换为实际模型:

class FavoritesManager {
    private let database: Database
    private var favoriteIDs = Set<Movie.ID>()
    ...
    
    func loadFavorites() throws -> [Movie] {
        try favoriteIDs.map { id in
            try database.loadMovie(withID: id)
        }
    }

    func markMovieAsFavorite(_ movie: Movie) {
        ...
    }
}

到现在为止,一直都还不错。但是现在让我们看看,如果我们想开始使用Swift的新并发系统以异步、非阻塞的方式执行上述操作,将会发生什么。


2. Going asynchronous

首先,假设我们希望让FavoritesManagerDatabase类型异步执行工作,例如:

class FavoritesManager {
    ...
    
    func markMovieAsFavorite(_ movie: Movie) async {
        ...
    }

    func loadFavorites() async throws -> [Movie] {
        try await favoriteIDs.map { id in
            try await database.loadMovie(withID: id)
        }
    }
}

然而,事实证明,上面的实现实际上并没有编译,因为标准库的map版本(还)不支持异步闭包。

在Swift中,我们通常对各种集合(包括forEach、map、flatMap等API)执行的大多数操作都是使用序列协议扩展实现的,而序列协议扩展又是所有内置集合(如数组、集合和字典)都遵循的协议。这也是用来驱动Swift for循环的协议。

因此,要构建map的异步版本,我们需要做的就是自己扩展序列协议,以便添加新方法:

extension Sequence {
    func asyncMap<T>(
        _ transform: (Element) async throws -> T
    ) async rethrows -> [T] {
        var values = [T]()

        for element in self {
            try await values.append(transform(element))
        }

        return values
    }
}

注意上面的方法是如何用rethrows关键字标记的。这告诉Swift编译器,如果传递给它的闭包也抛出,则只将我们的新方法视为抛出,这使我们能够灵活地在需要时抛出错误,而不需要我们总是使用try调用我们的新方法,即使它的闭包没有抛出。

有了以上内容,我们现在可以返回并用asyncMap替换loadFavorites方法对map的使用,我们新的异步FavoritesManager实现现在已成功编译并准备就绪:

class FavoritesManager {
    ...

    func loadFavorites() async throws -> [Movie] {
        try await favoriteIDs.asyncMap { id in
            try await database.loadMovie(withID: id)
        }
    }
}

接下来,假设我们还希望将MovieListViewController中的forEach迭代也变成异步的,因为这将使我们的UI保持完全响应,而我们的FavoritesManager正忙于将用户选择的电影标记为收藏。

要做到这一点,我们还需要forEach的异步版本,它的实现方式与我们构建asyncMap的方式非常相似:

extension Sequence {
    func asyncForEach(
        _ operation: (Element) async throws -> Void
    ) async rethrows {
        for element in self {
            try await operation(element)
        }
    }
}

在我们开始在视图控制器中使用新的asyncForEach方法之前,有一件事我们需要考虑——因为我们将用户选择的电影标记为收藏的调用现在将异步执行,这意味着我们的电影阵列在执行迭代时可能会发生突变。

因此,为了防止在这种情况下发生bug或崩溃,让我们在调用asyncForEach时按值捕获我们的movies数组,以便在整个迭代过程中始终在同一数组上运行:

class MovieListViewController: UIViewController {
    ...

    func markSelectedMoviesAsFavorites() {
        Task {
            await tableView.indexPathsForSelectedRows?.asyncForEach { 
                [movies] indexPath in

                let movie = movies[indexPath.row]
                await favoritesManager.markMovieAsFavorite(movie)
            }
        }
    }
}

有了这些更改,我们现在已经成功地将两个可能很繁重的数据转换操作转换为异步、非阻塞调用——这反过来将使我们的UI保持响应,而不管这些调用需要多长时间才能完成。


3. Concurrency

不过,我们还可以做一件事来潜在地加速上述操作,那就是使它们并行执行,而不是按顺序执行。因为即使我们的操作现在相对于其他代码是异步的,实际上它们在执行方式上仍然是完全连续的——因为asyncForEach和asyncMap在继续下一个操作之前总是等待一个操作的结果。

因此,为了使我们能够以并行执行所有操作的方式执行给定的forEach调用,让我们编写一个新的并发版本,它利用内置的TaskGroup API

extension Sequence {
    func concurrentForEach(
        _ operation: @escaping (Element) async -> Void
    ) async {
        // A task group automatically waits for all of its
        // sub-tasks to complete, while also performing those
        // tasks in parallel:
        await withTaskGroup(of: Void.self) { group in
            for element in self {
                group.addTask {
                    await operation(element)
                }
            }
        }
    }
}

现在,我们肯定不想同时执行所有forEach调用(因为我们必须记住,在多个并发任务和线程上分散任务也会带来自身的开销和延迟)。然而,在MovieListViewController中将电影标记为收藏的情况下,如果用户最终选择了大量电影,我们可能会获得速度增益,现在我们可以同时迭代这些电影。

因此,让我们用调用新的concurrentForEach方法来代替以前使用的asyncForEach,如下所示:

class MovieListViewController: UIViewController {
    ...

    func markSelectedMoviesAsFavorites() {
        Task {
            await tableView.indexPathsForSelectedRows?.concurrentForEach {
                [movies, favoritesManager] indexPath in

                let movie = movies[indexPath.row]
                await favoritesManager.markMovieAsFavorite(movie)
            }
        }
    }
}

请注意,我们现在如何在闭包中捕获对FavoritesManager的引用。这是因为我们现在正在处理一个转义闭包,它要求我们将所有与自身相关的捕获都显式化。因此,我们不是捕获自我,而是只捕获执行操作所需的实际对象。

随着上述变化的实施,我们最喜欢的标记逻辑现在将在需要时在多部电影上并行运行,这可能会给我们带来显著的速度提升
当然(与大多数与性能相关的事情一样),我们是否要使用asyncForEach或concurrentForEach是我们必须根据每个特定情况下的实际性能指标来决定的,因为对于并发执行是否有好处,没有通用的规则。

另一件重要的事情是在执行给定的操作集之前要考虑的是,我们的基础数据是否支持并发访问。在接下来的文章中,我们将更深入地研究这个主题,包括如何使用Swift的actor类型强制对可变状态进行串行访问。

继续讨论我们的FavoritesManager中的映射操作,现在让我们看看如何使迭代也能够并发执行。这需要一个新的并发版本的map,我们称之为concurrentMap

然而,与concurrentForEach不同,这种新方法不会使用任务组,因为该API在完成顺序方面没有给我们任何保证,这意味着我们最终可能会得到一个与输入顺序不同的输出数组。因此,我们将从为每个元素创建一个异步任务开始,然后使用前面的asyncMap方法依次等待这些任务的结果:

extension Sequence {
    func concurrentMap<T>(
        _ transform: @escaping (Element) async throws -> T
    ) async throws -> [T] {
        let tasks = map { element in
            Task {
                try await transform(element)
            }
        }

        return try await tasks.asyncMap { task in
            try await task.value
        }
    }
}

实现上述目标的另一种方法是将任务组与跟踪输出顺序的字典结合使用。然而,这不仅会使代码更加复杂,而且与当前的实现相比,它也不会给我们带来任何显著的性能提升。

使用新的concurrentMap方法,我们现在可以让FavoritesManager以完全并发的方式加载所有收藏夹-这将使该操作在用户将越来越多的电影添加到收藏夹列表时能够非常好地扩展:

class FavoritesManager {
    ...

    func loadFavorites() async throws -> [Movie] {
        try await favoriteIDs.concurrentMap { [database] id in
            try await database.loadMovie(withID: id)
        }
    }
}

最后一部分就绪后,我们不仅对代码进行了异步改造,还使其同时执行——在此过程中,我们构建了四个完全可重用的序列API——asyncForEach、concurrentForEach、asyncMap和concurrentMap。


4. Conclusion

我希望本文为您提供了一些新的见解,了解如何使用Swift的新并发系统将某些Swift迭代转换为异步或并发操作。同样重要的是,考虑何时(何时不)部署并发,并尝试将这些决策基于实际的性能度量。毕竟,完全同步的代码可能始终是实现、调试和维护最简单的代码。

如果您对使用本文中介绍的新forEach和map变体以及compactMap和flatMap的类似版本感兴趣,请查看CollectionConcurrencyKit,这是一个新的开源Swift包,它将这些API添加到所有Swift集合中。