-
- 1.1. What is a scheduler?
- 1.2. Demystifying the scheduler
- 1.3. Setting up the project
- 1.4. Switching schedulers
- 1.4.1. Using subscribeOn
- 1.4.2. Using observeOn
- 1.5. Pitfalls
- 1.6. Hot vs. cold
- 1.7. Best practices and built-in schedulers
- 1.7.1. Serial vs concurrent schedulers
- 1.7.2. MainScheduler
- 1.7.3. SerialDispatchQueueScheduler
- 1.7.4. ConcurrentDispatchQueueScheduler
- 1.7.5. OperationQueueScheduler
- 1.7.6. TestScheduler
- 1.8. Where to go from here?
1. Chapter 15: Intro to Schedulers
到目前为止,您已经设法使用调度程序,同时避免解释它们实际上是什么以及它们如何处理线程或并发性。在前面的章节中,您使用了隐式使用某种并发/线程级别的方法,例如buffer、delaySubscription或interval操作符。
您可能会觉得调度程序在内部具有某种魔力,但是在理解调度程序之前,您还需要理解observeOn操作符的全部内容。
本章将介绍调度器背后的美丽之处,您将了解为什么RxSwift抽象如此强大,以及为什么使用异步编程远没有使用锁或队列那么痛苦。
注意:创建自定义schedulers(调度器)程序超出了本书的范围。请记住,RxSwift、RxCocoa和RxBlocking提供的调度器和初始化器通常涵盖99%的情况。始终尝试使用内置schedulers。
1.1. What is a scheduler?
在动手研究调度程序之前,了解它们是什么以及不是什么是很重要的。总之,调度程序是一个过程发生的上下文。这个上下文可以是 thread, a dispatch queue or similar entities, or even an Operation used inside the OperationQueueScheduler。
下面是一个很好的例子来说明如何使用调度器:

在此图中,您了解了缓存操作符的概念。一个可观察对象向服务器发出请求并检索一些数据。该数据由一个名为cache的自定义操作符处理,该操作符将数据存储在某个地方。在此之后,数据被传递给不同调度程序上的所有订阅者,最有可能的是位于主线程之上的MainScheduler,这使得更新UI成为可能。
1.2. Demystifying the scheduler
关于调度器的一个常见误解是,调度器与线程是同等相关的。乍一看,这似乎是合乎逻辑的——毕竟,调度程序的工作方式与GCD的调度队列相似。
但事实并非如此。如果您正在编写自定义调度器(这也是不推荐的方法),您可以使用同一个线程创建多个调度器,或者在多个线程之上创建单个调度器。这可能会很奇怪——但它会起作用!

要记住的重要一点是,调度程序不是线程,它们与线程没有一对一的关系。总是检查调度程序在其中执行操作的上下文—而不是线程。在本章的后面,您将遇到一些很好的例子来帮助您理解这一点。
1.3. Setting up the project
是时候编写一些代码了! 在这个项目中,您将为macOS创建一个简单的命令行工具。为什么使用命令行工具? 因为你在处理线程和并发性,纯文本输出将比你在应用中创建的任何视觉元素更容易理解。
为本章的启动项目安装CocoaPods依赖项,如第一章“Hello RxSwift”所述。(到目前为止,你肯定知道该如何用心去做,但谁也不知道你跳过了多少章节。)完成后,打开工作区,构建并运行,调试器控制台应该显示如下内容:
===== Schedulers =====
00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
在继续之前,打开Utils.swift,看看dump()和dumpingSubscription()的实现。
第一个方法使用[E]前缀将元素和当前线程信息转储到do(onNext:)操作符中(用于“Emitted”)。第二个使用[S]前缀(用于“Subscription”)转储类似的信息。它订阅可观察对象,显示它在哪个线程上接收元素。两个函数都显示经过的时间,所以上面的00s表示“经过0秒
这些函数强调了向控制台打印信息的两种不同方式:
-
使用do(onNext:)让你在操作符链中注入副作用(执行不会改变可观察对象序列的" on the side "操作)。
-
订阅可观察序列并从那里打印。
现在,您已经有了在任何给定时间检查正在运行的线程的方法,您已经准备好了解一个可观察链在调度程序之间切换是多么容易了。
1.4. Switching schedulers
RxSwift中最重要的一点是能够在任何时候切换调度程序,除了产生事件的内部进程施加的限制之外,没有任何限制。有很好的理由可以说明为什么你想要控制操作符接收元素的scheduler:
- 在后台调度程序上执行昂贵的工作。
- 控制昂贵的工程是串行的还是并行的。
- 保证在主线程上交付用户界面更新。
注意:当使用允许切换调度程序的操作符时,请确保序列传输的元素是线程安全的。RwSwift本身就像苹果的调度框架:它允许你切换调度程序/线程,不管你的数据的线程安全。
为了理解调度器的行为,您将创建一个简单的可观察对象,它提供了一些结果。
将以下代码添加到main.swift的底部:
let fruit = Observable<String>.create { observer in
observer.onNext("[apple]")
sleep(2)
observer.onNext("[pineapple]")
sleep(2)
observer.onNext("[strawberry]")
return Disposables.create()
}
这个可观察的特征是睡眠功能。虽然这在实际应用程序中并不常见,但在本例中,它将帮助您理解订阅和观察是如何工作的。
添加以下代码来订阅你创建的可观察对象:
fruit
.dump()
.dumpingSubscription()
.disposed(by: bag)
构建并运行,并在控制台中检出日志:
===== Schedulers =====
00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
00s | [E] [apple] emitted on Main Thread
00s | [S] [apple] received on Main Thread
02s | [E] [pineapple] emitted on Main Thread
02s | [S] [pineapple] received on Main Thread
04s | [E] [strawberry] emitted on Main Thread
04s | [S] [strawberry] received on Main Thread
这里是最初的实验对象,接下来是每两秒钟一个水果。
水果是在主线程中生成的,但是最好将其移到后台线程中。要在后台线程中创建水果,必须使用subscribeOn。
注意:if the application doesn‘t compile, showing random errors related to missing derived data, clean it using Clean Build Folder under the Product menu and build again.
1.4.1. Using subscribeOn
在某些情况下,你可能想要改变可观察对象计算代码运行的调度程序——不是任何订阅操作符中的代码,而是实际发出可观察对象事件的代码。
对于你创建的自定义可观察对象来说,发出事件的代码是作为**observable.create{…}**的末尾闭包提供的。
为该计算代码设置调度程序的方法是使用subscribeOn。乍一看,这可能听起来有点违反直觉,但经过一段时间的思考,它就开始有意义了。
当你想要真正观察一个可观察对象时,你必须先订阅它。这决定了原始处理将在何处发生。如果subscribe没有被调用,RxSwift会自动使用当前线程:

该进程使用主调度程序在主线程上创建事件。MainScheduler位于主线程之上。希望在主线程上执行的所有任务都必须使用这个调度器,这就是为什么在前面的示例中使用UI时使用它的原因。要切换调度程序,您将使用subscribeOn。
在main.swift中,有一个预定义的名为globalScheduler的调度器,它使用一个后台队列。这个调度器是使用全局调度队列创建的,它是一个并发队列:
let globalScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
因此,正如类名所暗示的那样,这个调度器要计算的所有任务都将由全局调度队列分派和处理。
要使用这个调度程序,用这个新订阅替换之前创建的fruit订阅:
fruit
.subscribeOn(globalScheduler)
.dump()
.dumpingSubscription()
.disposed(by: bag)
现在将以下行添加到文件末尾:
RunLoop.main.run(until: Date(timeIntervalSinceNow: 13))
无可否认,这是一种黑客行为;一旦主线程上的所有操作都完成了,它将防止Terminal终止,这会杀死你的全局调度程序和可观察对象。在这种情况下,Terminal会存活13秒。
注意:对于这个例子来说,13秒可能太长了,但当你浏览整个章节时,你的应用需要这段时间才能完成。因此,当所有的可观察对象都完成时,请随时停止应用程序。
现在您的新调度程序已经就绪,构建并运行并检查结果:
00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
00s | [E] [apple] emitted on Anonymous Thread
00s | [S] [apple] received on Anonymous Thread
02s | [E] [pineapple] emitted on Anonymous Thread
02s | [S] [pineapple] received on Anonymous Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Anonymous Thread
全局队列使用一个没有名称的线程,因此在本例中,Anonymous Thread是global, concurrent dispatch queue的线程之一。
现在,emitter和subscriber都在同一个线程中处理数据。

这很酷,但是如果您想改变观察者执行操作符代码的位置,该怎么办呢?你必须使用observe。
1.4.2. Using observeOn
观察是Rx的三个基本概念之一。它包括产生事件的实体和这些事件的观察者。在本例中,与subscribeOn相反,observeOn操作符在观察发生的地方更改调度程序。
因此,一旦一个事件被Observable推入,这个操作符会确保订阅者在指定的调度程序上接收到这个事件。这还包括在observeOn之后添加的所有操作符。
要从当前全局调度器切换到主线程,您需要在订阅之前调用observeOn。再一次,更换你的水果订阅代码:
fruit
.subscribeOn(globalScheduler)
.dump()
.observeOn(MainScheduler.instance)
.dumpingSubscription()
.disposed(by: bag)
编译并运行,并再次检查控制台输出(您需要等待几秒钟,直到程序在控制台中停止打印):
00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
00s | [E] [apple] emitted on Anonymous Thread
00s | [S] [apple] received on Main Thread
02s | [E] [pineapple] emitted on Anonymous Thread
02s | [S] [pineapple] received on Main Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
您已经获得了想要的结果:所有事件现在都在正确的线程上处理。主要可观察对象在后台线程中处理和生成事件,订阅方在主线程中接收事件。

1.5. Pitfalls
切换调度器和线程的能力看起来很棒,但它也有一些缺陷。要了解原因,可以使用新线程将一些事件推送到subject。由于您需要跟踪计算发生在哪个线程上,一个好的解决方案是使用OS线程。
在可观察的fruit之后,添加以下代码来生成一些动物:
let animalsThread = Thread() {
sleep(3)
animal.onNext("[cat]")
sleep(3)
animal.onNext("[tiger]")
sleep(3)
animal.onNext("[fox]")
sleep(3)
animal.onNext("[leopard]")
}
接下来,给线程命名,这样你就能识别它,并启动它:
animalsThread.name = "Animals Thread"
animalsThread.start()
构建和运行;你应该看到你的新线程在运行:
...
03s | [E] [cat] emitted on Animals Thread
03s | [S] [cat] received on Animals Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
06s | [E] [tiger] emitted on Animals Thread
06s | [S] [tiger] received on Animals Thread
09s | [E] [fox] emitted on Animals Thread
09s | [S] [fox] received on Animals Thread
12s | [E] [leopard] emitted on Animals Thread
12s | [S] [leopard] received on Animals Thread
完美-你有动物创造在专用线程。下一步——在全局线程上处理结果。
注意:不断添加代码,然后用其他东西替换它,这似乎是重复的,但这里的目标是比较不同schedulers之间的差异。
用以下代码替换动物主题的原始订阅:
animal
.dump()
.observeOn(globalScheduler)
.dumpingSubscription()
.disposed(by: bag)
构建并运行,新的结果如下:
...
03s | [E] [cat] emitted on Animals Thread
03s | [S] [cat] received on Anonymous Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
06s | [E] [tiger] emitted on Animals Thread
06s | [S] [tiger] received on Anonymous Thread
09s | [E] [fox] emitted on Animals Thread
09s | [S] [fox] received on Anonymous Thread
12s | [E] [leopard] emitted on Animals Thread
12s | [S] [leopard] received on Anonymous Thread
现在,你正在切换线程,并接近13秒的限制!
如果你想在全局队列中生成动物,但在主线程中接收它们,该怎么办?对于第一种情况,observeOn已经是正确的,但是对于第二种情况,必须使用subscribeOn。
这次更换动物订阅,如下:
animal
.subscribeOn(MainScheduler.instance)
.dump()
.observeOn(globalScheduler)
.dumpingSubscription()
.disposed(by: bag)
构建并运行,您将得到以下结果:
03s | [E] [cat] emitted on Animals Thread
03s | [S] [cat] received on Anonymous Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
06s | [E] [tiger] emitted on Animals Thread
06s | [S] [tiger] received on Anonymous Thread
09s | [E] [fox] emitted on Animals Thread
09s | [S] [fox] received on Anonymous Thread
12s | [E] [leopard] emitted on Animals Thread
12s | [S] [leopard] received on Anonymous Thread
等待? 什么?为什么计算没有发生在正确的调度程序上?这是一个常见而危险的陷阱,它源于将RxSwift默认为异步或多线程的想法——事实并非如此。
RxSwift和一般抽象是自由线程的;在处理数据时没有发生神奇的线程切换。如果不指定,计算总是发生在原始线程上。
注意:任何线程切换都发生在程序员使用操作符subscribeOn和observeOn显式请求之后。
认为RxSwift默认会做一些线程处理是一个常见的陷阱。上面发生的是Subject的误用。原始的计算发生在一个特定的线程上,这些事件使用thread(){…}。由于Subject的本质,RxSwift没有能力切换原始的计算调度程序并移动到另一个线程,因为没有直接控制Subject推送到哪里。
为什么这对fruit thread起作用呢? 这是因为使用Observable.create(_:)可以让RxSwift控制Thread块内发生的事情,这样你就可以更精细地定制线程处理。
这个意想不到的结果通常被称为“冷热”观测问题。
在上面的例子中,你正在处理一个hot的可观测对象。这个可观察对象在订阅期间没有任何副作用,但是它有自己的上下文,在这个上下文中产生事件,RxSwift不能控制它(也就是说,它拥有自己的线程)。
相比之下,cold可观测对象在任何观察者订阅它之前都不会产生任何元素。这实际上意味着,在订阅之后,它创建一些上下文并开始生成元素之前,它没有自己的上下文。
1.6. Hot vs. cold
上面的部分涉及了冷热观测的主题。热观测和冷观测的话题是相当武断的,并产生了很多争论,所以让我们在这里简要地看看它。这个概念可以简化为一个非常简单的问题:

一些副作用的例子是:
- 向服务器发出请求。
- 编辑本地数据库。
- 写入文件系统。
- 发射火箭。
副作用的世界是无穷无尽的,所以你需要确定你的Observable实例在订阅后是否会执行副作用。如果您不能确定这一点,那么就执行更多的分析或进一步深入源代码。向每个订阅用户发射火箭可能不是你想要实现的目标……
另一种常见的描述方式是询问Observable是否共享副作用。如果您在订阅时执行副作用,这意味着副作用不被共享。否则,副作用将与所有订阅者共享。
这是一个相当普遍的规则,适用于任何ObservableType对象,比如subject 和相关的子类型。
正如你可能已经注意到的,到目前为止,我们在书中并没有过多讨论冷热观测。这在反应式编程中是一个常见的主题,但在RxSwift中,你只会在特定情况下遇到这个概念,比如上面的Thread例子,或者当你需要更大的控制时,比如运行测试时。
将本节作为参考点,以便在您需要根据热观察或冷观察来解决问题时,您可以快速打开这本书,并在这一点上刷新自己的概念。
1.7. Best practices and built-in schedulers
Schedulers是一个重要的主题,因此它们为最常见的用例提供了一些最佳实践。在本节中,您将快速介绍串行和并发调度器,了解它们如何处理数据,并了解哪种类型对特定上下文更有效。
1.7.1. Serial vs concurrent schedulers
考虑到调度程序只是一个上下文,它可以是任何东西(分派队列、线程、自定义上下文),并且所有转换序列的操作符都需要保留隐式保证,因此需要确保使用了正确的调度程序。
-
如果你使用的是串行调度器,RxSwift会串行地进行计算。对于串行分派队列,调度程序还能够在下面执行它们自己的优化。
-
在并发调度器中,RxSwift将尝试同时运行代码,但observeOn和subscribeOn将保持任务执行的顺序,并确保您的订阅代码最终在正确的调度器上。
1.7.2. MainScheduler
MainScheduler位于主线程之上。此调度器用于处理用户界面上的更改并执行其他高优先级任务。在iOS、tvOS或macOS上开发应用程序时,通常不应该使用此调度器执行长时间运行的任务,因此要避免服务器请求或其他繁重的任务。
此外,如果执行更新UI的副作用,则必须切换到MainScheduler以确保这些更新能够显示在屏幕上。
当使用大多数RxCocoa Traits时,MainScheduler也用于所有的观察,更具体地说,Driver and Signal。正如前面章节所讨论的,这些特征确保观察总是在MainScheduler中执行,从而让您能够将数据直接绑定到应用程序的用户界面。
1.7.3. SerialDispatchQueueScheduler
serialdispatchqueuesscheduler管理对串行DispatchQueue上的工作进行抽象。在使用observeOn时,这个调度器有几个优化的巨大优势。
您可以使用此调度器处理后台作业,这些作业最好以串行方式调度。例如,如果应用程序与服务器的单个端点通信(如在Firebase或GraphQL应用程序中),您可能希望避免同时分派多个请求,这将给接收端带来太大的压力。对于任何应该像串行任务队列一样前进的作业,这个调度器肯定是您希望使用的。
1.7.4. ConcurrentDispatchQueueScheduler
ConcurrentDispatchQueueScheduler,类似于SerialDispatchQueueScheduler,管理在DispatchQueue上抽象工作。这里的主要区别是,调度程序使用一个并发队列,而不是串行队列。
这种调度器在使用observeOn时并没有优化,所以在决定使用哪种调度器时要记住考虑到这一点。
对于需要同时结束的多个长时间运行的任务,并发调度程序可能是一个很好的选择。将多个可观察对象与一个阻塞操作符组合在一起,这样所有的结果都在准备好时被组合在一起,可以阻止串行调度器发挥它们的最佳性能。相反,并发调度可以执行多个并发任务并优化结果的收集。
1.7.5. OperationQueueScheduler
OperationQueueScheduler类似于ConcurrentDispatchQueueScheduler,但是它不是通过DispatchQueue抽象工作,而是通过OperationQueue执行工作。有时您需要对正在运行的并发作业进行更多的控制,而使用并发DispatchQueue无法做到这一点。
如果需要调优最大并发作业数,则使用该作业的调度器。您可以设置maxConcurrentOperationCount来限制并发操作的数量,以满足应用程序的需要。
1.7.6. TestScheduler
TestScheduler是一种特殊的野兽。它只用于测试,所以不应该在生产代码中使用它。这种特殊的调度程序简化了运算符测试;它是RxTest库的一部分。您将在关于测试的专门章节中了解如何使用这个调度程序,但是既然您正在进行RxSwift调度程序的伟大之旅,让我们快速浏览一下。
RxSwift测试套件提供了这个调度器的一个很好的用例。打开以下链接:https://bit.ly/2E0xVUq。你会找到一个专门用来测试delaySubscription操作符Observable+DelaySubscriptionTests.swift的文件,特别是一个名为testDelaySubscription_TimeSpan_Simple的测试用例。在这个测试用例中,你有调度程序的初始化:
let scheduler = TestScheduler(initialClock: 0)
在这个初始化之后,你就有了要测试的可观察对象的定义:
let xs = scheduler.createColdObservable([
next(50, 42),
next(60, 43),
completed(70)
])
在定义期望之前,你有如何得到结果的声明:
let res = scheduler.start {
xs.delaySubscription(30, scheduler: scheduler)
}
调度程序将使用之前定义的xs可观察对象创建Res。该结果包含有关发送的事件的所有信息,以及测试调度程序跟踪的时间。
有了这个,你可以像这样写一个测试用例:
XCTAssertEqual(res.events, [
next(280, 42),
next(290, 43),
completed(300)
])
想知道为什么事件发生在280,而不是80(考虑到最初的50,加上延迟的30)?这是由于testScheduler的特性,它会在200之后启动对ColdObservable的所有订阅。这个技巧确保冷观测不会在不可预知的时间开始——这会让测试变成一场噩梦!
同样的事情不适用于HotObservable,所以HotObservable会立即开始推事件。
在测试delaySubscription操作符时,仅发送事件及其时间的信息不足以处理这些事件。您将需要关于订阅时间的额外信息,以确保一切按预期运行。
xs.subscriptions,你可以获得订阅列表来进行测试的最后一部分:
XCTAssertEqual(xs.subscriptions, [
Subscription(230, 300)
])
第一个数字定义了第一次订阅的开始时间。第二个定义了何时释放订阅。在本例中,第二个数字与已完成的事件匹配,因为完成将释放所有订阅。
1.8. Where to go from here?
调度程序在RxSwift空间中是一个重要的话题;他们负责计算和执行RxSwift中的所有任务。调度的黄金法则是它可以是任何东西。记住这一点,在使用可观察对象以及使用和更改调度器时,您将能够很好地处理这些问题。
如前所述,调度可以位于DispatchQueue、OperationQueue、Thread的顶部,甚至可以在当前线程上立即执行任务。关于这一点没有硬性规则,所以确保你知道你正在使用什么调度器来处理手头的任务。有时,使用错误的调度器可能会对性能产生负面影响,而正确选择的调度器可能会获得很高的性能回报。
继续之前,花点时间研究一下当前示例,并测试一些调度器,看看它们对最终结果有什么影响。理解调度程序将使RxSwift的生活更轻松,并将提高您在使用subscribeOn和observeOn时的信心。