在使用Combine编写异步代码时,有时我们可能希望共享给定的并发操作集的结果,而不是为每个操作执行重复的工作。让我们看看share操作符是如何让我们以一种非常简洁的方式做到这一点的。

1. Duplicate, concurrent network calls

举个例子,假设我们正在使用下面的ArticleLoader,它使用URLSession从给定的URL加载Article模型:

class ArticleLoader {
    private let urlSession: URLSession
    private let decoder: JSONDecoder

    init(urlSession: URLSession = .shared,
         decoder: JSONDecoder = .init()) {
        self.urlSession = urlSession
        self.decoder = decoder
    }

    func loadArticle(from url: URL) -> AnyPublisher<Article, Error> {
        urlSession
            .dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: Article.self, decoder: decoder)
            .eraseToAnyPublisher()
    }
}

现在让我们假设我们期望以上loadArticle方法被多次调用,以并行或快速连续的方式,使用相同的URL—这目前会导致重复的网络请求,因为对我们方法的每次调用都会产生一个全新的发布者。

2. Reusing publishers

为了解决这个问题,让我们将创建的每个Publisher存储在一个字典中(以每个Publisher所对应的URL作为关键字),然后当我们接收到loadArticle调用时,我们将首先检查该字典是否包含可重用的已有Publisher——像这样:

class ArticleLoader {
    typealias Publisher = AnyPublisher<Article, Error>

    private let urlSession: URLSession
    private let decoder: JSONDecoder
    private var publishers = [URL: Publisher]()
    ...

    func loadArticle(from url: URL) -> Publisher {
        if let publisher = publishers[url] {
            return publisher
        }

        let publisher = urlSession
            .dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: Article.self, decoder: decoder)
            .handleEvents(receiveCompletion: { [weak self] _ in
                self?.publishers[url] = nil
            })
            .eraseToAnyPublisher()

        publishers[url] = publisher
        return publisher
    }
}

请注意,我们还将在每个publisher完成后从字典中删除它们,以避免将旧publisher保留在内存中。我们使用receiveCompletion,而不是receiveOutput,也可以在遇到错误时得到通知。

现在,看看上面的代码,乍一看似乎我们已经解决了问题。但是,如果我们查看网络日志(或者简单地在handleEvents闭包中放入一个*print(“Done”)*调用),就会发现我们实际上仍然在执行多个重复的操作。怎么可能呢?

事实证明,即使我们确实重用了publisher实例,也不能保证我们确实重用了那些publisher正在执行的工作。事实上,在默认情况下,每个publisher将通过连接到它的每个subscriber的整个数据管道运行。这一开始可能看起来相当奇怪,所以让我们从一个稍微不同的角度来检查这种行为。

3. New subscriber, new values

作为另一个简单的例子,这里我们创建了一个publisher,它使用Timer每秒钟发布一个新的随机数,然后我们将两个独立的subscriber附加到该publisher,这两个订阅者都只是打印它们收到的数字:

var cancellables = Set<AnyCancellable>()

let randomNumberGenerator = Timer
        .publish(every: 1, on: .main, in: .common)
        .autoconnect()
        .map { _ in Int.random(in: 1...100) }

randomNumberGenerator
    .sink { number in
        print(number)
    }
    .store(in: &cancellables)

randomNumberGenerator
    .sink { number in
        print(number)
    }
    .store(in: &cancellables)

如果我们的两个subscriber每秒都得到相同的数字,这可能会有点奇怪,因为我们希望每个数字都是完全随机的(因此在某种程度上是“唯一的”)。因此,从这个角度来看,Combine出版商为每个订阅者产生独立的输出确实很有意义。

但是,回到我们的ArticleLoader,我们如何修改默认行为以防止执行重复的网络调用呢?

4. Using the share operator

好消息是,我们所要做的就是使用share操作符,它(顾名思义)修改给定的Combine管道,以便其工作结果在所有subscriber之间自动共享:

class ArticleLoader {
    ...

    func loadArticle(from url: URL) -> Publisher {
        if let publisher = publishers[url] {
            return publisher
        }

        let publisher = urlSession
            .dataTaskPublisher(for: url)
            .map(\.data)
            .decode(type: Article.self, decoder: decoder)
            .handleEvents(receiveCompletion: { [weak self] _ in
                self?.publishers[url] = nil
            })
            .share()
            .eraseToAnyPublisher()

        publishers[url] = publisher
        return publisher
    }
}

有了这个小小的改变,我们现在完全解决了我们的问题。 现在,即使多个loadArticle调用快速连续发生,也只会执行一个网络调用,其结果将被报告给每个调用站点。

好吧,也许“完全解决”并不完全正确,因为我们的实现仍然有一个潜在的问题-它目前没有考虑这样一个事实,即我们的ArticleLoader很可能被调用在不同的线程上,而不是在URLSession返回它的数据任务输出的线程上。虽然这可能永远不会导致任何实际问题,但我们是否可以做一个快速的额外工作,使我们的实现完全线程安全呢?

为此,让我们对loadArticle实现做一些调整。首先,我们将基于输入URL创建Combine管道,然后立即跳转到内部DispatchQueue,当从某个publisher接收到完成事件时,我们也将使用这个DispatchQueue。通过这种方式,我们可以保证我们的publishers字典将始终在完全相同的队列上读取和写入:

class ArticleLoader {
    ...
    private let queue = DispatchQueue(label: "ArticleLoader")

    func loadArticle(from url: URL) -> Publisher {
        Just(url)
            .receive(on: queue)
            .flatMap { [weak self, urlSession, queue, decoder] url -> Publisher in
                if let publisher = self?.publishers[url] {
                    return publisher
                }

                let publisher = urlSession
                    .dataTaskPublisher(for: url)
                    .map(\.data)
                    .decode(type: Article.self, decoder: decoder)
                    .receive(on: queue)
                    .handleEvents(receiveCompletion: { [weak self] _ in
                        self?.publishers[url] = nil
                    })
                    .share()
                    .eraseToAnyPublisher()

                self?.publishers[url] = publisher
                return publisher
            }
            .eraseToAnyPublisher()
    }
}

通过这些调整,我们现在有了一个完全线程安全的实现,它成功地重用了发布者,避免了执行任何重复的工作。下一步可能是在上述实现中添加缓存(我们目前只依赖于URLSession提供的默认缓存机制),如果我们认为这是有用的的话。

5. Conclusion

就是如何使用share操作符来避免在Combine管道中重复工作。