就像序列代码一样,并发代码可以有许多不同的形状和形式。取决于我们试图实现的目标——无论是异步获取数据块、从磁盘加载大量文件,还是执行一组相关操作——将被证明是最适合的抽象可能因用例不同而有很大差异。

这样的并发编程抽象之一就是任务。虽然乍一看,任务可能与Futures & promise和Foundation的操作API非常相似,但在它们的行为方式以及它们给API用户的控制级别上有一些明显的不同。

本周,让我们看看其中的一些区别,以及一些任务可以变得非常有用的场景。

The task at hand

假设我们正在构建一个社交网络应用程序,并且我们向用户提供了在发布帖子时附加照片和视频的选项。目前,当用户点击“Publish”时,我们调用以下函数,它上传所有附加的媒体,也上传帖子本身的数据,如下所示:

func publish(_ post: Post) {
    for photo in post.photos {
        upload(photo)
    }

    for video in post.videos {
        upload(video)
    }

    upload(post)
}

虽然上面的代码非常简单,但是有一些问题。 首先,当发布完成时,我们无法得到通知。我们也不做任何错误处理,这意味着如果照片或视频上传失败 -我们将继续上传post数据,这并不理想。

有相当多的方法,我们可以着手解决上述问题。一种想法可能是使用内置的Operation和OperationQueue类型来顺序执行所有操作 - 但这需要我们要么让我们的网络代码同步,或子类化Operation以创建非常自定义的解决方案。这两种选择都是有效的,但可能感觉有点“严厉”,因为它们需要对原始代码进行很大的修改。

幸运的是,为了获得我们需要的控制,我们所要做的就是深入堆栈一层, 并使用了Operation和OperationQueue基于的框架——GCD。

就像我们在“Swift中央调度的深入研究”中看到的,GCD让我们可以很容易地将一系列操作组合在一起, 并在完成所有任务后得到通知。为了实现这一点,我们将对之前的上传功能做一些微调-所以他们现在给我们提供了一种使用闭包来观察他们完成的方法, 并平衡呼叫进入和离开一个调度组,以便在所有媒体上传完成时得到通知:

func publish(_ post: Post) {
    let group = DispatchGroup()
    for photo in post.photos {
        group.enter()
        upload(photo) {
            group.leave()
        }
    }
    for video in post.videos {
        group.enter()
        upload(video) {
            group.leave()
        }
    }
    // Calling ‘notify’ allows us to observe whenever the whole
    // group was finished using a closure.
    group.notify(queue: .main) {
        upload(post)
    }
}

上面的代码运行得非常好,给我们留下了非常整洁的代码。但是我们仍然没有解决缺乏错误处理的问题-无论媒体上传成功与否,我们还是会盲目上传帖子。

为了解决这个问题,让我们添加一个可选的Error变量,我们将使用它来跟踪发生的任何错误。我们将对上传函数做另一个调整,让它们将一个可选的错误参数传递给它们的完成处理器, 并使用它来捕获遇到的第一个错误-像这样:

func publish(_ post: Post) {
    let group = DispatchGroup()
    var anyError: Error?
    for photo in post.photos {
        group.enter()
        upload(photo) { error in
            anyError = anyError ?? error
            group.leave()
        }
    }
    for video in post.videos {
        group.enter()
        upload(video) { error in
            anyError = anyError ?? error
            group.leave()
        }
    }
    group.notify(queue: .main) {
        // If an error was encountered while uploading the
        // post’s media, we’ll call an error handling function
        // instead of proceeding with the post data upload.
        if let error = anyError {
            return handle(error)
        }
        upload(post)
    }
}

我们现在已经修复了原始代码的所有正确性问题,但在这个过程中,我们也让它变得更加复杂和难以阅读。 我们的新解决方案也相当繁琐(有一个需要跟踪的本地错误变量和调度组调用),这在这里可能不是问题,但只要我们开始移动更多的异步代码来使用这个模式-事情会很快变得更难维护。

It’s abstraction time!

让我们看看是否可以通过引入在GCD之上引入一个基于任务的瘦抽象层来简化上述操作.

首先,我们将创建一个名为Task的类型,它本质上是一个闭包的封装包,它将访问控制任务流的控制器:

struct Task {
    typealias Closure = (Controller) -> Void
    private let closure: Closure
    init(closure: @escaping Closure) {
        self.closure = closure
    }
}

Controller类型反过来提供了完成或失败与其关联的任务的方法, 并通过调用一个处理器来报告任务的结果:

extension Task {
    struct Controller {
        fileprivate let queue: DispatchQueue
        fileprivate let handler: (Outcome) -> Void
        func finish() {
            handler(.success)
        }
        func fail(with error: Error) {
            handler(.failure(error))
        }
    }
}

正如上面的代码所示,Result有两种情况——.success和.failure——这使得它非常类似于带有泛型Void的Result类型。 实际上,我们可以选择将Result实现为它自己的enum类型,或者简单地使用“Swift类型别名的力量”中的一种技术,使其成为Result的通用简写:

extension Task {
    enum Outcome {
        case success
        case failure(Error)
    }
}
extension Task {
    typealias Outcome = Result<Void>
}

最后,我们需要一种方法来实际执行我们将定义的任务。 为此,让我们在Task上添加一个perform方法,它将接受一个显式的DispatchQueue来执行任务-或者简单地检索任何全局处理程序-以及在任务执行完成后调用的处理程序。在内部,我们将使用给定的DispatchQueue来异步执行任务, 通过创建一个控制器并将它传递到任务的闭包中——像这样:

extension Task {
    func perform(on queue: DispatchQueue = .global(),
                 then handler: @escaping (Outcome) -> Void) {
        queue.async {
            let controller = Controller(
                queue: queue,
                handler: handler
            )
            self.closure(controller)
        }
    }
}

有了上面的内容,我们的任务API的初始版本就完成了,我们可以开始使用它了。让我们从定义一个任务开始,它将取代之前的照片上传功能 -它调用一个底层的Uploader类,然后使用任务的控制器通知我们结果:

extension Task {
    static func uploading(_ photo: Photo,
                          using uploader: Uploader) -> Task {
        return Task { controller in
            uploader.upload(photo.data, to: photo.url) { error in
                if let error = error {
                    controller.fail(with: error)
                } else {
                    controller.finish()
                }
            }
        }
    }
}

现在我们的第一个任务可用了-让我们看看在调用站点使用它是什么样子的:

for photo in photos {
    let task = Task.uploading(photo, using: uploader)

    task.perform { outcome in
        // Handle outcome
    }
}

很酷!😎然而,当我们开始对任务进行分组和排序时,任务的力量才真正开始显现,这将让我们以一种非常优雅的方式解决我们最初的问题。所以让我们继续!

Grouping

在前面,为了跟踪一组操作何时完成,我们使用了一个输入和离开的DispatchGroup,因此让我们将该逻辑移植到新的任务系统中。为此,我们将在Task上添加一个静态方法,它将接受一个任务数组并将它们分组在一起。 在底层,我们仍将使用与之前完全相同的分派组逻辑——但现在封装在一个更好的API中:

extension Task {
    static func group(_ tasks: [Task]) -> Task {
        return Task { controller in
            let group = DispatchGroup()

            // To avoid race conditions with errors, we set up a private
            // queue to sync all assignments to our error variable
            let errorSyncQueue = DispatchQueue(label: "Task.ErrorSync")
            var anyError: Error?

            for task in tasks {
                group.enter()

                // It’s important to make the sub-tasks execute
                // on the same DispatchQueue as the group, since
                // we might cause unexpected threading issues otherwise.
                task.perform(on: controller.queue) { outcome in
                    switch outcome {
                    case .success:
                        break
                    case .failure(let error):
                        errorSyncQueue.sync {
                            anyError = anyError ?? error
                        }
                    }
                    group.leave()
                }
            }
            group.notify(queue: controller.queue) {
                if let error = anyError {
                    controller.fail(with: error)
                } else {
                    controller.finish()
                }
            }
        }
    }
}

有了上面的代码,我们就可以大大简化以前的媒体上传代码。我们现在所要做的就是将每个媒体片段映射到一个任务中,然后将这些任务的组合数组传递到我们新的group API中,将它们都分组到一个单一的任务中——像这样:

let photoTasks = post.photos.map { photo in
    return Task.uploading(photo, using: uploader)
}
let videoTasks = post.videos.map { video in
    return Task.uploading(video, using: uploader)
}
let mediaGroup = Task.group(photoTasks + videoTasks)

我们不依赖于完成分组任务的顺序时,分组是很好的,但并不总是这样。回到我们最初的问题,没有上传一个帖子的数据,直到我们确定所有媒体都成功上传-这就是一个例子。理想情况下,我们希望能够将媒体上传操作与完成post上传的操作进行链结或排序。

让我们看看如何扩展Task来支持它

Sequencing

与其使用DispatchGroup(它对我们的操作顺序没有任何意见),不如通过跟踪当前任务的索引来实现排序,然后在前一个任务完成后继续执行下一个任务。一旦我们到达了任务列表的末尾,我们将考虑完成序列:

extension Task {
    static func sequence(_ tasks: [Task]) -> Task {
        var index = 0

        func performNext(using controller: Controller) {
            guard index < tasks.count else {
                // We’ve reached the end of our array of tasks,
                // time to finish the sequence.
                controller.finish()
                return
            }
            let task = tasks[index]
            index += 1
            task.perform(on: controller.queue) { outcome in
                switch outcome {
                case .success:
                    performNext(using: controller)
                case .failure(let error):
                    // As soon as an error was occurred, we’ll
                    // fail the entire sequence.
                    controller.fail(with: error)
                }
            }
        }
        return Task(closure: performNext)
    }
}

我们不简单地使用串行调度队列来实现排序的原因是,我们不能假设我们的序列总是在串行队列上被调度 - API用户可以选择在任何类型的队列上执行它。

上面我们利用了Swift同时支持第一类函数和内联函数定义的事实——因为我们将performNext函数作为一个闭包传递来为我们的序列创建一个任务。

就是这样——信不信由你,但我们实际上已经从头开始构建了一个完整的基于任务的并发系统!😀

Putting all the pieces together

所有的部分就绪之后,让我们最后更新原始的邮政发布代码,以充分利用我们的新系统所提供的一切。 我们不必跟踪错误,也不必因为网络调用的不可预测性而遇到bug。现在,我们可以简单地将一组媒体上传操作与一个post上传任务结合起来,形成一个序列,就像这样:

func publish(_ post: Post,
             then handler: @escaping (Outcome) -> Void) {
    let photoTasks = post.photos.map { photo in
        return Task.uploading(photo, using: uploader)
    }

    let videoTasks = post.videos.map { video in
        return Task.uploading(video, using: uploader)
    }

    let sequence = Task.sequence([
        .group(photoTasks + videoTasks),
        .uploading(post, using: uploader)
    ])

    sequence.perform(then: handler)
}

上述解决方案的美妙之处在于,所有内容都是并发执行的——使用分派队列-在幕后,但作为一个API用户,我们所要做的就是创建一些任务,并告诉系统我们想要如何组合它们。由于我们的抽象是如此的薄弱,如果我们遇到了一个问题或意想不到的行为,我们只需要降低一级来调试。

Conclusion

任务可以是抽象大量并发代码的好方法,这些代码要么应该以尽可能多的并行性执行,要么等待前一个任务完成后再继续执行。它们本质上提供了一种方法,可以在GCD之上创建一个简单、薄的层——这让我们能够以一种非常好的方式利用它的所有功能。

正如本文前面提到的,当然还有许多其他方法可以在Swift中实现或使用并发性. Futures & promises使得隐藏正在进行的并发很容易,但代价是控制量稍低——而像RxSwift这样的框架使构建更复杂的执行链成为可能,但使用了更重的抽象。

我的建议是在Swift中尝试多种不同类型的并发编程——看看哪种(或多种)最适合你、你的团队和你的项目。希望本文能让您对快速实现任务的一种方法有一些了解,以及与社区中常用的其他解决方案相比,任务的实现方式如何。如果是这样的话,我认为我的使命完成了😀。

原文链接