在使用Combine编写异步代码时,有时我们可能希望共享给定的并发操作集的结果,而不是为每个操作执行重复的工作。让我们看看share操作符是如何让我们以一种非常简洁的方式做到这一点的。
1. Duplicate, concurrent network calls
举个例子,假设我们正在使用下面的ArticleLoader,它使用URLSession从给定的URL加载Article模型:
class ArticleLoader {
private let urlSession: URLSession
private let decoder: JSONDecoder
init(urlSession: URLSession = .shared,
decoder: JSONDecoder = .init()) {
self.urlSession = urlSession
self.decoder = decoder
}
func loadArticle(from url: URL) -> AnyPublisher<Article, Error> {
urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.eraseToAnyPublisher()
}
}
现在让我们假设我们期望以上loadArticle方法被多次调用,以并行或快速连续的方式,使用相同的URL—这目前会导致重复的网络请求,因为对我们方法的每次调用都会产生一个全新的发布者。
2. Reusing publishers
为了解决这个问题,让我们将创建的每个Publisher存储在一个字典中(以每个Publisher所对应的URL作为关键字),然后当我们接收到loadArticle调用时,我们将首先检查该字典是否包含可重用的已有Publisher——像这样:
class ArticleLoader {
typealias Publisher = AnyPublisher<Article, Error>
private let urlSession: URLSession
private let decoder: JSONDecoder
private var publishers = [URL: Publisher]()
...
func loadArticle(from url: URL) -> Publisher {
if let publisher = publishers[url] {
return publisher
}
let publisher = urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.handleEvents(receiveCompletion: { [weak self] _ in
self?.publishers[url] = nil
})
.eraseToAnyPublisher()
publishers[url] = publisher
return publisher
}
}
请注意,我们还将在每个publisher完成后从字典中删除它们,以避免将旧publisher保留在内存中。我们使用receiveCompletion,而不是receiveOutput,也可以在遇到错误时得到通知。
现在,看看上面的代码,乍一看似乎我们已经解决了问题。但是,如果我们查看网络日志(或者简单地在handleEvents闭包中放入一个*print(“Done”)*调用),就会发现我们实际上仍然在执行多个重复的操作。怎么可能呢?
事实证明,即使我们确实重用了publisher实例,也不能保证我们确实重用了那些publisher正在执行的工作。事实上,在默认情况下,每个publisher将通过连接到它的每个subscriber的整个数据管道运行。这一开始可能看起来相当奇怪,所以让我们从一个稍微不同的角度来检查这种行为。
3. New subscriber, new values
作为另一个简单的例子,这里我们创建了一个publisher,它使用Timer每秒钟发布一个新的随机数,然后我们将两个独立的subscriber附加到该publisher,这两个订阅者都只是打印它们收到的数字:
var cancellables = Set<AnyCancellable>()
let randomNumberGenerator = Timer
.publish(every: 1, on: .main, in: .common)
.autoconnect()
.map { _ in Int.random(in: 1...100) }
randomNumberGenerator
.sink { number in
print(number)
}
.store(in: &cancellables)
randomNumberGenerator
.sink { number in
print(number)
}
.store(in: &cancellables)
如果我们的两个subscriber每秒都得到相同的数字,这可能会有点奇怪,因为我们希望每个数字都是完全随机的(因此在某种程度上是“唯一的”)。因此,从这个角度来看,Combine出版商为每个订阅者产生独立的输出确实很有意义。
但是,回到我们的ArticleLoader,我们如何修改默认行为以防止执行重复的网络调用呢?
4. Using the share operator
好消息是,我们所要做的就是使用share操作符,它(顾名思义)修改给定的Combine管道,以便其工作结果在所有subscriber之间自动共享:
class ArticleLoader {
...
func loadArticle(from url: URL) -> Publisher {
if let publisher = publishers[url] {
return publisher
}
let publisher = urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.handleEvents(receiveCompletion: { [weak self] _ in
self?.publishers[url] = nil
})
.share()
.eraseToAnyPublisher()
publishers[url] = publisher
return publisher
}
}
有了这个小小的改变,我们现在完全解决了我们的问题。 现在,即使多个loadArticle调用快速连续发生,也只会执行一个网络调用,其结果将被报告给每个调用站点。
好吧,也许“完全解决”并不完全正确,因为我们的实现仍然有一个潜在的问题-它目前没有考虑这样一个事实,即我们的ArticleLoader很可能被调用在不同的线程上,而不是在URLSession返回它的数据任务输出的线程上。虽然这可能永远不会导致任何实际问题,但我们是否可以做一个快速的额外工作,使我们的实现完全线程安全呢?
为此,让我们对loadArticle实现做一些调整。首先,我们将基于输入URL创建Combine管道,然后立即跳转到内部DispatchQueue,当从某个publisher接收到完成事件时,我们也将使用这个DispatchQueue。通过这种方式,我们可以保证我们的publishers字典将始终在完全相同的队列上读取和写入:
class ArticleLoader {
...
private let queue = DispatchQueue(label: "ArticleLoader")
func loadArticle(from url: URL) -> Publisher {
Just(url)
.receive(on: queue)
.flatMap { [weak self, urlSession, queue, decoder] url -> Publisher in
if let publisher = self?.publishers[url] {
return publisher
}
let publisher = urlSession
.dataTaskPublisher(for: url)
.map(\.data)
.decode(type: Article.self, decoder: decoder)
.receive(on: queue)
.handleEvents(receiveCompletion: { [weak self] _ in
self?.publishers[url] = nil
})
.share()
.eraseToAnyPublisher()
self?.publishers[url] = publisher
return publisher
}
.eraseToAnyPublisher()
}
}
通过这些调整,我们现在有了一个完全线程安全的实现,它成功地重用了发布者,避免了执行任何重复的工作。下一步可能是在上述实现中添加缓存(我们目前只依赖于URLSession提供的默认缓存机制),如果我们认为这是有用的的话。
5. Conclusion
就是如何使用share操作符来避免在Combine管道中重复工作。