苹果在2019年全球开发者大会(WWDC)上推出的Combine框架让我们可以将各种异步事件和操作建模为“随时间变化的值”。虽然这是一个在响应式编程世界中经常使用的短语,但作为构建逻辑的概念和方式,它最初可能很难完全掌握。

因此,在本文中,让我们来看看Combine的基础知识、响应式编程的一些核心原则是什么,以及它们在实践中如何真正发挥作用。

让我们从publisher开始,它是一种可观察对象,当给定的事件发生时,它就会发出值。Publishers可以无限期地处于活动状态,也可以最终完成,并且在遇到错误时也可以选择失败。

在引入Combine时,苹果还对一些核心库进行了改进,以支持Combine。例如,下面是我们如何使用Foundation的URLSession类型来创建一个发布者来对给定的URL发起网络请求:

let url = URL(string: "https://api.github.com/repos/johnsundell/publish")!
let publisher = URLSession.shared.dataTaskPublisher(for: url)

一旦我们创建了一个publisher,我们就可以给它附加subscriptions,例如通过使用sink API——它允许我们传递一个闭包,以便在接收到新值时调用它,也可以在发布者完成后调用它:

let cancellable = publisher.sink(
    receiveCompletion: { completion in
        // Called once, when the publisher was completed.
        print(completion)
    },
    receiveValue: { value in
        // Can be called multiple times, each time that a
        // new value was emitted by the publisher.
        print(value)
    }
)

注意上面对sink的调用是如何返回一个存储为可取消的值的。 在附加新subscriber时,Combine publisher总是返回一个符合Cancellable协议的对象,该对象充当新订阅的令牌。然后,只要我们希望订阅保持活动状态,我们就需要保留该令牌,因为一旦它被释放,我们的订阅将自动取消(我们也可以通过对令牌调用cancel()来手动取消它)。

接下来,让我们用更多的逻辑来填充上述闭包,从receiveCompletion的逻辑开始,它将传递一个包含两种情况的enum-一个表示遇到的任何错误,一个表示发布者成功完成:

let cancellable = publisher.sink(
    receiveCompletion: { completion in
        switch completion {
        case .failure(let error):
            print(error)
        case .finished:
            print("Success")
        }
    },
    receiveValue: { value in
        print(value)
    }
)

在开始填充receiveValue闭包之前,让我们定义一个简单的数据模型,我们将把下载的数据解码到这个模型中。 因为我们使用的URL指向一个GitHub API端点的存储库(确切地说,发布),让我们将我们的模型声明为一个可编码的结构体,它有两个属性,可以在JSON中找到,我们将下载:

struct Repository: Codable {
    var name: String
    var url: URL
}

有了上面的模型,现在让我们实现receiveValue逻辑——在这个逻辑中,我们将创建一个JSONDecoder来解码下载到存储库值中的数据,如下所示:

let cancellable = publisher.sink(
    receiveCompletion: { completion in
        switch completion {
        case .failure(let error):
            print(error)
        case .finished:
            print("Success")
        }
    },
    receiveValue: { value in
        let decoder = JSONDecoder()

        do {
            // Since each value passed into our closure will be a tuple
            // containing the downloaded data, as well as the network
            // response itself, we're accessing the 'data' property here:
            let repo = try decoder.decode(Repository.self, from: value.data)
            print(repo)
        } catch {
            print(error)
        }
    }
)

虽然上面的方法可以工作,但我们在某种程度上以与使用标准的基于闭包的API相同的方式编写代码——我们将逻辑嵌套在完成处理程序中。这并没有错,但是Combine(以及一般的响应式编程)的真正力量在于构建数据流通过的操作链。

首先,让我们看一下map操作符,它的工作方式与它在集合上的工作方式相同——它允许我们将发布者发出的每个值转换为新的形式。 这样的转换可以简单到访问每个值的属性—例如,这里我们通过提取每个网络结果值的data属性来转换它们,这就给了我们一个发布者来释放数据值:

let dataPublisher = publisher.map(\.data)

除了map, Combine还附带了许多其他操作符,我们可以使用它们以各种方式转换数据。它甚至包括一个操作符,可以让我们在链中直接解码数据——像这样:

let repoPublisher = publisher
    .map(\.data)
    .decode(
        type: Repository.self,
        decoder: JSONDecoder()
    )

虽然我们从发出(Data, URLResponse)值的发布者开始,但通过上面的链,我们现在已经将该发布者转换为直接发出存Repository值的发布者 -这让我们大大简化了订阅代码,因为我们不再需要在闭包中执行任何形式的数据解码:

let cancellable = repoPublisher.sink(
    receiveCompletion: { completion in
        switch completion {
        case .failure(let error):
            print(error)
        case .finished:
            print("Success")
        }
    },
    receiveValue: { repo in
        print(repo)
    }
)

一般的经验法则是尽量让所有的订阅闭包都尽可能的简单——而不是构造上述的反应链,我们的数据可以通过这些反应链来转换为最终的形式。

由于Combine主要用于处理异步事件和值,因此在使用它时经常会遇到线程问题——特别是当我们想在UI代码中使用接收到的值时。 由于苹果的UI框架(UIKit、AppKit、SwiftUI等)在大多数情况下只能从主线程更新,所以我们在编写代码时会遇到这样的问题:

// Two labels that we want to render our data using:
let nameLabel = UILabel()
let errorLabel = UILabel()

let cancellable = repoPublisher.sink(
    receiveCompletion: { completion in
        switch completion {
        case .failure(let error):
            // Rendering a description of the error that was encountered:
            errorLabel.text = error.localizedDescription
        case .finished:
            break
        }
    },
    receiveValue: { repo in
        // Rendering the downloaded repository's name:
        nameLabel.text = repo.name
    }
)

问题是,由于URLSession在后台线程上执行它的工作,我们的订阅在默认情况下也会在同一个后台线程上被触发——这反过来使我们违反了只在主线线程上执行UI更新的规则。

好消息是,使用Combine可以很容易地解决上述问题,因为它还包含了一个操作符,可以让我们切换发布者将在哪个线程(或DispatchQueue)上发送事件-在这种情况下,我们可以使用它跳转到主队列:

let repoPublisher = publisher
    .map(\.data)
    .decode(
        type: Repository.self,
        decoder: JSONDecoder()
    )
    .receive(on: DispatchQueue.main)

这就是使用Combine订阅发布者和使用操作符转换其值的基本知识。接下来,让我们着眼于如何创建自己的发行商,以及在此过程中需要牢记的一些内容。

假设我们正在处理一个简单的计数器类,它跟踪一个可以通过调用increment()方法递增的值——像这样:

class Counter {
    // Using 'private(set)', we ensure that our value can only
    // be modified within the Counter class itself, while still
    // enabling external code to read it:
    private(set) var value = 0

    func increment() {
        value += 1
    }
}

现在让我们使用Combine来订阅计数器值的更改。 首先,我们可以使用Combine的内置PassthroughSubject类型,它既是发布者,又是subject——一个可以通过以下方式发送新值的对象:

class Counter {
    let publisher = PassthroughSubject<Int, Never>()

    private(set) var value = 0 {
        // Whenever our property was set, we send its new value
        // to our subject/publisher:
        didSet { publisher.send(value) }
    }

    func increment() {
        value += 1
    }
}

我们使用Never作为发布者的错误类型,这意味着它永远不会抛出任何错误——这在本例中是完美的,因为我们只向它发送新的Int值。

有了上面的内容,我们现在就可以订阅新的发布者了,就像我们之前使用URLSession执行网络请求时所做的那样——例如:

let counter = Counter()

let cancellable = counter.publisher
    .filter { $0 > 2 }
    .sink { value in
        print(value)
    }

// Since we're filtering out all values below 3, only our final
// increment call will result in a value being printed:
counter.increment()
counter.increment()
counter.increment()

请注意,我们只是将一个闭包传递到上面对sink的调用中,因为我们的发布者不能抛出任何错误,这意味着我们不需要处理它的完成事件(如果我们不想的话)。

然而,尽管上述方法有效,它也有一个相当大的缺点。由于我们的PassthroughSubject既是发布者又是subject,所以任何代码都可以向它发送新值,即使该代码位于Counter类之外——只需调用send():

counter.publisher.send(17)

这不是很好,因为理想情况下,我们希望强制只有Counter可以发送新值——以避免多个真相来源。 值得庆幸的是,这很容易做到,只需创建两个单独的属性——一个只公开PassthroughSubject的publisher部分,另一个私有属性也允许我们作为subject访问它:

class Counter {
    var publisher: AnyPublisher<Int, Never> {
        // Here we're "erasing" the information of which type
        // that our subject actually is, only letting our outside
        // code know that it's a read-only publisher:
        subject.eraseToAnyPublisher()
    }

    private(set) var value = 0 {
        didSet { subject.send(value) }
    }

    // By storing our subject in a private property, we'll only
    // be able to send new values to it from within this class:
    private let subject = PassthroughSubject<Int, Never>()

    func increment() {
        value += 1
    }
}

好多了。现在,我们有了一个强有力的保证,即发布者将发出的值将始终与Counter类的实际状态完全同步。

另一个可以让我们达到同样目的的方法是使用@Published属性包装器

总结一下,以下是Combine整体术语的五个关键部分:

A publisher is an observable object that emits values over time, and that can also optionally complete either when no more values are available, or when it encountered an error.
发布者是一个可观察对象,它会随着时间的推移发出值,当没有更多可用值或遇到错误时,它也可以选择完成。

Objects or closures that are used to observe a publisher are referred to as subscribers.
用于观察发布者的对象或闭包被称为订阅者。

A subject is a mutable object that can be used to send new values through a publisher. Types like PassthroughSubject act as both publishers and subjects.
subject是一种可变对象,可用于通过发布者发送新值。像PassthroughSubject这样的类型同时充当发布者和主题。

Operators are used to build reactive chains or pipelines that our data can flow through, where each operator applies some form of transform to the data that was sent to it.
运营商被用来建立反应链或管道,我们的数据可以通过这些链或管道流动, 每个操作符对发送给它的数据应用某种形式的转换。

cancellable用于跟踪对给定发布者的订阅,只要我们希望该订阅保持活动状态,它就需要被保留。
A cancellable is used to keep track of a subscription to a given publisher, and needs to be retained for as long as we want that subscription to remain active.

Combine是一个令人兴奋的框架,它允许我们使用响应式编程的强大功能,而不需要引入任何第三方依赖——这反过来使我们能够构建逻辑,自动地对随时间变化的值作出反应。

原文链接