1. Controlling the timing of a Combine pipeline
通常在编写 Combine-powered 数据管道时,我们希望这些管道在每个操作完成后尽快发出值。然而,有时我们可能还希望引入某些延迟,以防止执行不必要的工作,或者能够在一段时间后重试失败的操作。
1.1. Debouncing
当我们的操作基于某种自由形式的用户输入时,我们可能希望在触发给定管道之前等待一小段时间。
例如,假设我们正在构建一个控制器,该控制器管理从数据库加载的items列表,用户可以选择对加载的item应用基于字符串的过滤器。
为了防止在用户快速输入我们的过滤器值所连接的文本字段时执行过多的数据库查询,我们可以在执行数据库调用之前应用debounce操作符。这样,只有在一定时间内(在本例中为0.3秒)没有新值时,Combine才会继续执行我们的管道:
final class ItemListController: ObservableObject {
@Published private(set) var items = [Item]()
@Published var filter = ""
init(database: Database) {
$filter
.removeDuplicates()
.debounce(for: 0.3, scheduler: DispatchQueue.main)
.map(database.loadItemsMatchingFilter)
.switchToLatest()
.assign(to: &$items)
}
}
了解关于上述技术的更多信息,包括为什么使用switchToLatest操作符,请查看“Connecting and merger Combine publisher in Swift”,它详细介绍了这个主题。
1.2. Delayed retries
顾名思义,Combine的内置重试操作符允许我们在遇到错误时自动重试管道的操作。当使用它时,我们只需指定我们想要执行的最大重试次数,Combine会处理剩下的。这里,我们在标准的组合式URLSession数据任务API的自定义版本中使用了这个操作符,它将允许我们在一定时间内自动重试任何失败的网络请求:
extension URLSession {
func decodedDataTaskPublisher<T: Decodable>(
for url: URL,
retryCount: Int = 3,
decodingResultAs resultType: T.Type = T.self,
decoder: JSONDecoder = .init(),
returnQueue: DispatchQueue = .main
) -> AnyPublisher<T, Error> {
dataTaskPublisher(for: url)
.map(\.data)
.decode(type: T.self, decoder: decoder)
.retry(retryCount)
.receive(on: returnQueue)
.eraseToAnyPublisher()
}
}
然而,使用上面的实现,每次重试都将在遇到错误时立即执行,但是如果我们希望在每个重试操作之间应用某种延迟,该怎么办呢?
为此,让我们转向Combine的delay运算符,它允许我们在两个操作之间引入固定的延迟量。 因为在本例中我们只希望延迟重试,所以我们将使用catch操作符创建一个单独的管道,将我们想要的延迟应用于一个常量Void输出值,然后调用flatMap再次触发我们的上游管道,如下所示:
extension Publisher {
func retry<T: Scheduler>(
_ retries: Int,
delay: T.SchedulerTimeType.Stride,
scheduler: T
) -> AnyPublisher<Output, Failure> {
self.catch { _ in
Just(())
.delay(for: delay, scheduler: scheduler)
.flatMap { _ in self }
.retry(retries > 0 ? retries - 1 : 0)
}
.eraseToAnyPublisher()
}
}
注意,我们必须从传递的重试次数中减去1,因为我们的flatMap操作符总是至少运行一次。然而,我们也必须小心不要将该数字变为负数,因为这将导致Combine执行无限次重试。
有了上面的内容,我们就可以像这样更新我们的自定义数据任务API:
extension URLSession {
func decodedDataTaskPublisher<T: Decodable>(
for url: URL,
retryCount: Int = 3,
decodingResultAs resultType: T.Type = T.self,
decoder: JSONDecoder = .init(),
returnQueue: DispatchQueue = .main
) -> AnyPublisher<T, Error> {
dataTaskPublisher(for: url)
.map(\.data)
.decode(type: T.self, decoder: decoder)
.retry(retryCount, delay: 3, scheduler: returnQueue)
.receive(on: returnQueue)
.eraseToAnyPublisher()
}
}
现在在每次重试之间将应用3秒的延迟,这在本例中特别有用,因为当我们第一次启动网络调用时,用户的连接可能暂时处于离线状态
然而,值得指出的是,上面的代码示例并不意味着是一个完整的随时可用的网络实现,因为我们可能只希望在遇到某些错误时重试。 例如,如果我们正在执行一个经过验证的网络调用,而用户的访问令牌已经过时,那么用相同的参数重新尝试这样的调用只会浪费电池和带宽。
要实现这种per-error逻辑,我们可以使用tryCatch操作符,然后抛出我们不希望执行重试的错误。当这样做时,我们的管道将立即失败,并触发我们在调用站点添加的任何错误处理。
像我们有时可能希望在某些管道中引入人为的延迟,也有一些情况下,我们可能希望完全推迟发布者的执行,直到订阅方附加到它。 这正是特殊的Deferred publisher 允许我们做的,当使用Combine的Future类型时,这通常变得特别有用。
例如,假设我们目前正在使用Future来改进一个支持Combine的featureditemloader——通过将传入Future的promise闭包作为一个完成处理器发送给我们之前的基于闭包的API:
extension FeaturedItemsLoader {
var itemsPublisher: Future<[Item], Error> {
Future { [weak self] promise in
self?.loadItems(then: promise)
}
}
}
如果我们希望每个加载操作都立即启动,如果我们不想重试这样的操作,那么上面的方法就可以很好地工作。然而,像下面这样做实际上不会像预期的那样工作:
featuredItemsLoader.itemsPublisher
.retry(5)
.replaceError(with: [])
.sink { items in
// Handle items
...
}
上面的不工作的原因是Future总是只运行一次,然后无论成功还是失败了缓存其结果,这意味着即使上述管道的确会重试5次如果遇到一个错误,这些重试将从我们Future-based itemsPublisher得到相同的输出。
为了解决这个问题,让我们包装我们的Future创建代码在一个Deferred publisher——它将推迟我们潜在的publisher的创建,直到subscriber开始请求值,也会让我们正确重试管道,因为这样做将导致为每个重试创建一个新的Future的实例:
extension FeaturedItemsLoader {
var itemsPublisher: AnyPublisher<[Item], Error> {
Deferred {
Future { [weak self] promise in
self?.loadItems(then: promise)
}
}
.eraseToAnyPublisher()
}
}
然而,虽然Deferred非常有用,但我们不应该在每次使用Future时都使用它。将Deferred看作是lazy属性的Combine等价物——让每个属性都成为lazy属性没有意义,但当我们需要lazy求值所提供的特征时,它是工具箱中一个有用的工具。
1.3. Conclusion
就像Combine的面向管道的设计在设置反应性数据流和观察时非常出色一样,它还可以使实现精确计时和重试等内容变得有点挑战性,但希望本文为您提供了一些如何实现这一点的见解。