乍一看,Apple的Combine框架似乎只是执行各种异步操作的抽象。然而,尽管这是一个关键的方面,但可以肯定的是,Combine的真正力量在于它使我们能够构建越来越复杂的数据管道,可以使用多个输入和转换来加载、准备和处理应用程序的数据。

本周,让我们来看看其中的一些功能,以及它们如何使我们能够以简洁而又非常健壮的方式解决现实世界中的问题。

Loading data from multiple sources

举个例子,假设我们正在开发一个任务管理应用程序,它允许用户创建包含各种任务和待办事项的组。 然后通过网络加载这些组,我们使用下面的taskgrouploader,它反过来使用Combine(以及URLSession和来自“创建通用网络api in Swift”的NetworkResponse包装器)来执行它的工作:

struct TaskGroupsLoader {
    var urlSession = URLSession.shared
    private let decoder = JSONDecoder()

    func loadGroupList() -> AnyPublisher<[Task.Group], Error> {
        urlSession
            .dataTaskPublisher(for: .taskGroups)
            .map(\.data)
            .decode(
                type: NetworkResponse<[Task.Group]>.self,
                decoder: decoder
            )
            .map(\.result)
            .eraseToAnyPublisher()
    }
}

之所以可以简单地使用. taskgroups来引用上面调用的URL,是因为我们已经使用了一系列静态api来扩展URL,这些api返回各种服务器URL。要了解更多关于这种方法的信息,以及一些更强大的替代方法,请查看“管理url和端点”Swift剪辑。

只要我们调用的URL返回客户端需要的所有的Task.Group的数据,上述方法就可以很好地工作。然而,当使用遵循流行REST约定的web api时,为了加载构建完整模型所需的所有数据,必须进行几个单独的调用是非常常见的。

例如,我们假设为了加载某些元数据,比如给定组中包含的任务数量,或者最后更新的时间,我们需要在每组的基础上调用另一个端点- 这意味着我们现在需要执行一系列嵌套的网络任务,以便能够形成完整的Task.Group模型数组。

为了实现这一点,让我们从重新定义task.Group作为一个结构体,它包含了我们在一系列网络调用中想要加载的所有数据:

extension Task {
    struct Group: Identifiable {
        let id: UUID
        var name: String
        var taskCount: Int
        var lastUpdated: Date
    }
}

请注意,我们不再使上面的模型可解码,因为我们不会直接从单个网络响应解码它的实例。 相反,我们将在taskgrouploader中定义两个部分模型,因为这两个模型都应该被视为加载器的私有实现细节,所以我们将使用私有扩展名将它们放在同一个文件中——像这样:

private extension Task.Group {
    struct ListEntry: Decodable {
        let id: Task.Group.ID
        var name: String
    }

    struct Metadata: Decodable {
        var taskCount: Int
        var lastUpdated: Date
    }
}

要了解更多关于以上使用扩展的方式,请查看上周的“Swift中扩展的力量”。

虽然像上面所做的那样使用嵌套类型是改善代码的整体语义和结构的好方法,但重复键入这些长名称可能会有点乏味-因此,让我们也创建两个类型别名,让我们把它们作为入口和元数据在我们的taskgrouploader实现:

private extension TaskGroupsLoader {
    typealias Entry = Task.Group.ListEntry
    typealias Metadata = Task.Group.Metadata
}

有了上面的模型代码,现在让我们来实现我们的嵌套网络调用。首先,我们将添加一个私有方法,用于将已加载的条目转换为完整的Task.Group模型。 为此,我们将再次使用URLSession来加载当前组的Metadata,然后将该操作的结果与传入的Entry结合起来——像这样:

private extension TaskGroupsLoader {
    func loadGroup(
        for entry: Entry
    ) -> AnyPublisher<Task.Group, Error> {
        let url = URL.metadataForTaskGroup(withID: entry.id)

        return urlSession.dataTaskPublisher(for: url)
            .map(\.data)
            .decode(
                type: NetworkResponse<Metadata>.self,
                decoder: decoder
            )
            .map(\.result)
            .map { metadata in
                // Forming our final model by combining the newly
                // loaded Metadata with the Entry that was passed in:
                Task.Group(
                    id: entry.id,
                    name: entry.name,
                    taskCount: metadata.taskCount,
                    lastUpdated: metadata.lastUpdated
                )
            }
            .eraseToAnyPublisher()
    }
}

接下来,让我们实现另一个私有方法,该方法允许我们将一个条目值数组转换为一个组合发布者,该发布者将发出最终的任务Task.Group模型。这样做需要以下三个步骤(不计算对eraseToAnyPublisher的类型擦除调用):

private extension TaskGroupsLoader {
    func loadGroups(
        for entries: [Entry]
    ) -> AnyPublisher<[Task.Group], Error> {
        // First, we convert our Entry array into a publisher:
        entries.publisher
                // Then, we use the flatMap operator to convert
                // each Entry element into a nested publisher using
                // the loadGroup method that we implemented earlier:
               .flatMap(loadGroup)
               // Finally, we collect the results from all of our
               // nested publishers into one final array of task groups:
               .collect()
               .eraseToAnyPublisher()
    }
}

在Combine中,map操作符允许我们同步地将输出值转换为新类型的值,而flatMap操作符则允许我们将输出值转换为新的发布者。

完成上述部分后,剩下的工作就是对原始的taskgrouploader实现做两个小的修改 -首先加载一个数组的条目值(而不是Task.Group)。然后再一次使用flatMap操作符,使用我们新添加的loadGroups方法来加载最终的模型数组:

struct TaskGroupsLoader {
    var urlSession = URLSession.shared
    private let decoder = JSONDecoder()

    func loadGroupList() -> AnyPublisher<[Task.Group], Error> {
        urlSession
            .dataTaskPublisher(for: .taskGroups)
            .map(\.data)
            .decode(
                type: NetworkResponse<[Entry]>.self,
                decoder: decoder
            )
            .map(\.result)
            .flatMap(loadGroups)
            .eraseToAnyPublisher()
    }
}

尽管把上面的代码归类为普遍简单的代码是不公平的,但与我们在没有Combine的情况下实现同种嵌套网络调用所需要做的事情相比,它肯定要简单得多。

通过使用Combine,我们能够将问题分解为几个原子操作链,然后将它们组合到最终的数据加载管道中(这就是框架的名字)——非常棒!

Not a silver bullet against race conditions

我们上面的实现确实有一个相当大的问题——这可能不是很明显,因为我们的代码现在几乎是同步的(尽管它在底层是高度异步和并行的)-这就是最终数组中的Task.Group模型可能会出现无序。

虽然Combine可以自动处理编写并行代码时涉及的许多复杂问题,当使用像flatMap这样的操作符一次执行多个异步操作时,它不会给我们任何输出值的顺序保证。

现在我们的taskgrouploader将发出的Task.Group数组的顺序由每个嵌套网络调用完成的时间决定,这给了我们一个相当可观的竞态条件在应用程序的那部分。

解决这个问题的一种方法是在发出最终输出数组之前显式地对它进行排序。 为了让它更容易做到这一点,让我们开始使用一个转换API(也被称为操作符)来扩展Combine的发布者协议,它可以对任何发布者发出的符合序列的值进行排序——像这样:

extension Publisher where Output: Sequence {
    typealias Sorter = (Output.Element, Output.Element) -> Bool
    func sort(
        by sorter: @escaping Sorter
    ) -> Publishers.Map<Self, [Output.Element]> {
        map { sequence in
            sequence.sorted(by: sorter)
        }
    }
}

然后,我们只需将新的排序操作符添加到loadGroups方法中的管道,以及最后的Task.Group值数组现在具有可预测的顺序。 在这种情况下,一个合理的方法可能是根据最近更新的组对组进行排序,将最近更新的组放在前面:

private extension TaskGroupsLoader {
    func loadGroups(
        for entries: [Entry]
    ) -> AnyPublisher<[Task.Group], Error> {
        entries.publisher
               .flatMap(loadGroup)
               .collect()
               .sort { $0.lastUpdated > $1.lastUpdated }
               .eraseToAnyPublisher()
    }
}

但是,如果我们没有可以用于排序的特定数据块-我们如何仍然可以确保一个稳定的输出顺序基于我们的初始数组的入口值?一种方法是在开始嵌套加载操作之前构造一个索引字典,然后基于这些索引进行最终排序——像这样:

private extension TaskGroupsLoader {
    func loadGroups(
        for entries: [Entry]
    ) -> AnyPublisher<[Task.Group], Error> {
        var indexes = [Task.Group.ID : Int]()

        for (index, entry) in entries.enumerated() {
            indexes[entry.id] = index
        }
        return entries.publisher
               .flatMap(loadGroup)
               .collect()
               .sort { a, b in
                   // Here we can safely force-unwrap both of
                   // our indexes, since we're dealing with local
                   // data that's under our complete control:
                   indexes[a.id]! < indexes[b.id]!
               }
               .eraseToAnyPublisher()
    }
}

有了上述排序策略中的任何一种,我们现在就能够从多个源加载数据,然后将该数据转换为单个的、可预测的输出值数组 - 这真的很好,但我们仍然只是触及表面的Combine的实际能力

Completely reactive pipelines

在下一个系列的例子中,我们将使用下面的searchresultloader,它允许我们使用基于字符串的查询来加载一个SearchResult值数组,以及一个可选的SearchFilter:

struct SearchResultsLoader {
    var urlSession = URLSession.shared
    private let decoder = JSONDecoder()

    func loadResults(
        forQuery query: String,
        filter: SearchFilter?
    ) -> AnyPublisher<[SearchResult], Error> {
        // When given a query that's less than 3 characters long,
        // we simply return an empty array as our result:
        guard query.count > 2 else {
            return Just([])
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }

        let url = URL.search(for: query, filter: filter)

        return urlSession.dataTaskPublisher(for: url)
            .map(\.data)
            .decode(
                type: NetworkResponse<[SearchResult]>.self,
                decoder: decoder
            )
            .map(\.result)
            .eraseToAnyPublisher()
    }
}

为了将上面的searchresultloader连接到我们的UI,我们将使用一个视图模型,让我们从一个SwiftUI视图或视图控制器观察一个Published-marked输出属性。为了使错误也能正确地传播给用户,我们将使output属性包含一个结果值——给我们以下类声明:

class SearchViewModel: ObservableObject {
    typealias Output = Result<[SearchResult], Error>

    @Published private(set) var output = Output.success([])

    var query = "" { didSet { loadResults() } }
    var filter: SearchFilter? { didSet { loadResults() } }

    private let loader: SearchResultsLoader

    init(loader: SearchResultsLoader = .init()) {
        self.loader = loader
    }
}

最后,让我们实现loadResults方法,当查询或过滤器发生更改时,我们将调用该方法。在该方法中,我们首先调用searchresultloader来获取一个发布者,该发布者发出一个SearchResult值数组。然后,我们将使用这个自定义操作符将发布者转换为一个会发出结果值(而不是单独的错误)的发布者,然后我们可以直接将这些结果值赋给视图模型的输出属性——像这样:

private extension SearchViewModel {
    func loadResults() {
        loader.loadResults(forQuery: query, filter: filter)
              .asResult()
              .receive(on: DispatchQueue.main)
              .assign(to: &$output)
    }
}

请注意我们是如何在执行赋值之前显式地跳转到主队列的,因为我们现在正在处理视图层中要使用的代码。

同样,我们有一个工作得相当不错的实现,但它肯定可以得到改进。 具体来说,如果我们的视图模型能够同时对我们的searchresultloader调用(以避免重复或冗余的网络请求,当它的查询被快速更改时),我们还应该确保一旦有新的网络调用启动,任何延迟的网络调用都会被丢弃。

值得庆幸的是,Combine为实现这类功能提供了完整的支持,但是这次让我们进一步讨论一下-通过使我们的视图模型的实现完全是响应式的,而不是要求我们在每次输入属性发生变化时手动调用loadResults。

为了让那种模式更容易实现,以一种完全兼容SwiftUI和UIKit的方式,让我们来介绍下面的属性包装器,它将允许我们以组合发布者的形式访问任何带有该包装器注释的属性:

@propertyWrapper
struct Input<Value> {
    var wrappedValue: Value {
        get { subject.value }
        set { subject.send(newValue) }
    }

    var projectedValue: AnyPublisher<Value, Never> {
        subject.eraseToAnyPublisher()
    }

    private let subject: CurrentValueSubject<Value, Never>

    init(wrappedValue: Value) {
        subject = CurrentValueSubject(wrappedValue)
    }
}

上面的Input类型与Published属性包装器的区别在于,它不会触发自动objectWillChange publisher,而SwiftUI则使用该publisher来连接observable对象类型到给定的视图体。这意味着我们可以自由地观察输入标记的属性,而不会导致任何不必要的SwiftUI视图更新,或者其他类型的对象会改变观察结果。

接下来,让我们更新初始的SearchViewModel声明,以便现在使用新的输入属性包装器。我们也会移除我们的didSet属性观察者,现在我们在初始化器中调用一个新的configureDataPipeline方法:

class SearchViewModel: ObservableObject {
    typealias Output = Result<[SearchResult], Error>

    @Published private(set) var output = Output.success([])
    @Input var query = ""
    @Input var filter: SearchFilter?

    private let loader: SearchResultsLoader

    init(loader: SearchResultsLoader = .init()) {
        self.loader = loader
        configureDataPipeline()
    }
}

现在,真正酷的部分来了。由于我们现在可以将查询和过滤器作为发布者来观察,所以我们实际上可以使用单个组合管道来构造我们的视图模型的所有内部逻辑。

为此,我们将从观察查询发布者开始,在对其发出的值进行分解和去重复处理之后,我们将使用combineLatest操作符将其与筛选发布者结合起来。然后,我们将使用这两个发布者的组合输出调用我们的searchresultloader,最后,我们将使用switchToLatest操作符总是发出为最新请求加载的结果——像这样:

private extension SearchViewModel {
    func configureDataPipeline() {
        $query
            .dropFirst()
            .debounce(for: 0.5, scheduler: DispatchQueue.main)
            .removeDuplicates()
            .combineLatest($filter)
            .map { [loader] query, filter in
                loader.loadResults(
                    forQuery: query,
                    filter: filter
                )
                .asResult()
            }
            .switchToLatest()
            .receive(on: DispatchQueue.main)
            .assign(to: &$output)
    }
}

我们通过调用dropFirst来启动上述管道的原因是,当订阅附加到一个currentvaluessubject(我们使用它来实现输入属性包装器)时,它会发出当前值。因为在本例中,它总是一个空查询字符串,所以我们直接忽略它。

上述抽象类型的美妙之处在于,它完全隐藏了处理多个输入所涉及的所有复杂性, 网络调用和JSON解码从我们的UI层 -特别是使用SwiftUI实现时,可以保持非常简单:

struct SearchView: View {
    @ObservedObject var viewModel: SearchViewModel
    var body: some View {
        VStack {
            // We'd probably want to use a more properly styled
            // search field here, for example by importing either
            // UISearchTextField or UISearchBar from UIKit:
            TextField("Search", text: $viewModel.query)

            switch viewModel.output {
            case .success(let results):
                List(results) { result in
                    SearchResultView(result: result)
                }
            case .failure(let error):
                ErrorView(error: error)
            }
        }
    }
}

我们的SearchViewModel也完全兼容uikit,因为我们可以手动为查询和过滤器分配新值,我们可以使用Combine的sink操作符来观察视图模型的输出属性以便将搜索结果绑定到UITableView或UICollectionView。更多关于如何在使用UIKit时设置这些类型的绑定,请查看“Swift中的Published properties”。

Conclusion

Combine确实名副其实,它为我们提供了一套强大的工具,让我们能够将多个发行商组合成一个单一的价值流 - 无论这些是UI指定的输入值,还是之前异步操作的输出值。

这通常使我们能够将复杂的异步任务(如嵌套的网络调用)分解为更小的、可组合的构建块-这确实有助于使这样的逻辑更容易阅读、调试和维护。

原文链接