-
- 1.1. FlatMap 函数签名
- 1.1.1. 官方定义
- 1.2. 一些基本概念
- 1.3. 摇摆起来
- 1.4. Events(事件)
- 1.5. PublishSubject
- 1.6. Subscription Chain
- 1.7. Send Event
- 1.8. Filter
- 1.9. Map
- 1.10. 执行流程
- 1.11. FlatMap
- 1.12. The Diagram
- 1.13. The Definition
- 1.14. Using FlatMap
- 1.15. Handling Errors
- 1.16. We’re Subscribing, We’re Subscribing…
- 1.17. FlatMapLatest
- 1.18. User Interactions
- 1.19. Completion Block
- 1.1. FlatMap 函数签名
1. RxSwift: FlatMap的实现细节
ReactiveX中最强大的操作符之一是FlatMap。但对很多人来说,它也是最难理解的操作符之一。
今天我们将深入了解FlatMap,当我们完成时,你将确切地知道它是如何工作的,何时使用它,以及如何避免它周围的一些陷阱。
为了做到这一点,我们将了解一些RxSwift操作符的内部。一旦您理解了一些实现细节,其余部分也会变得容易理解得多。
尽管Combine的实现可能有所不同,但一旦你理解了RxSwift版本背后的关键概念,你就可以在任何地方、任何平台上使用它。
让我们开始吧。
1.1. FlatMap 函数签名
The function signature isn’t much help.
extension ObservableType {
func flatMap<O:Observable>(_ selector:(E) -> O) -> Observable<O.E>
}
为了清晰起见,我稍微编辑了一下签名,但除非你是RxSwift和泛型方面的专家,否则它仍然相当密集。让我们试试官方的定义。
1.1.1. 官方定义
引用ReactiveX网站上的FlatMap文档:
The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.
This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.
我第一次看到它的时候,它读起来像古梵文,我的眼睛开始变得呆滞。再读一遍(第三遍、第四遍、第五遍)也无济于事。
本文开头所示的大理石图似乎也没有什么意义。3个项目进去了,6个项目出来了——但是它们都混在一起了。这是怎么呢?
但是今天,我能读懂它们并理解它们想要表达的意思,当我们完成时,你们也能。
1.2. 一些基本概念
我将从一些对今天的讨论很重要的基本原理开始。请通读一遍,以确定你和我的看法一致,我们在讨论同样的事情。
在RxSwift中,我们可以订阅一个Observable来接收未来的事件。这与我们在iOS中使用NotificationCenter的方式没有什么不同。
NotificationCenter.default
.addObserver(self, selector: #selector(onOrientationChange(_:)), name: UIDevice.orientationDidChangeNotification, object: nil)
func onOrientationChange(_ sender:AnyObject) {
// do something
}
我们将一个观察者添加到orientationDidChangeNotification,并告诉它我们希望在将来该事件发生时得到通知。因此,当用户旋转他们的设备时,我们的onOrientationChange委托将被调用。
在RxSpeak中,我们订阅了一个Observable来接收未来的事件。因此,在RxSwift中执行类似的操作也就不足为奇了。
NotificationCenter.default
.rx
.notification(UIDevice.orientationDidChangeNotification)
.subscribe(onNext: { _ in
// do something
})
.disposed(by: disposeBag)
唯一真正的区别是我们的订阅处理程序是一个闭包而不是委托函数,并且RxSwift使用了一个额外的dispose (by:)操作符来取消我们不再需要的订阅。
1.3. 摇摆起来
那么,如果我告诉您,我们需要一些代码来做类似于NotificationCenter的事情,但您实际上不能使用NotificationCenter(或Rx或Combine)呢?
如果您仔细考虑一下,您可能会意识到,NotificationCenter的核心实际上就是一个字典,它的键是通知名称,值是一个observer数组。也许让字符串作为键和闭包为订阅者只是为了让实现更简单。
当发送一个通知时,send函数所做的就是在字典中查找该通知,获取订阅数组,然后遍历数组,依次执行每个闭包。“Yo, guys. It’s time. Do your thing.
Something like this:
class MyNotificationCenter {
static var subscriptions: [String:[() -> Void]] = [:]
static func subscribe(name: String, block: @escaping ()->Void) {
var subs = subscriptions[name] ?? []
subs.append(block)
subscriptions[name] = subs
}
static func send(name: String) {
subscriptions[name]?.forEach { $0() }
}
}
然后,你可以这样做
MyNotificationCenter
.subscribe(name: "myEvent") {
// do something when myEvent occurs
}
MyNotificationCenter.send(name: "myEvent")
我想说的是,在这些华丽的行话背后,这些东西只是代码。
这并不是说代码不复杂。RxSwift在其Element类型和锁定中使用泛型,以确保它是线程安全的,并且可以一次性管理订阅生命周期。毫无疑问,所有这些都增加了复杂性。
但是核心概念已经足够简单明了了。
Observable是一个事件源。subscribe这些事件产生一个subscription。发送事件可以非常简单,只需遍历一组订阅者,并告诉每个订阅者他们感兴趣的事件已经发生。
1.4. Events(事件)
在RxSwift中,我们讨论了很多事件。下面是它的实际定义:
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
事件只是一个包含三种情况的枚举
-
next(Element)用于传递我们感兴趣的值。如果你创建了一个PublishSubject
,那么Element就是String。 -
error(Swift.Error)用于传递任何可能发生的错误。
-
“completed” 只是告诉我们我们已经完成了,并且不会有更多的事件发生。
请注意,如果订阅链生成错误或完成事件,则整个订阅链将关闭。
1.5. PublishSubject
你可能已经注意到,在创建MyNotificationCenter时,我们基本上构建了RxSwift 的PublishSubject的迷你版本。
let publisher = PublishSubject<Void>()
publisher
.subscribe(onNext: { _ in
print("We've got one!")
})
.disposed(by: disposeBag)
publisher.onNext(())
在RxSwift中,publishers使用onNext将下一个元素通过subscription chain向下发送给它的subscribers(对应于前面提到的Event枚举的下一部分)。
1.6. Subscription Chain
RxSwift和Combine都处理订阅链。通常,您有一个某种类型的发布者,它可能在链的中间有一个或两个(或三个或15个)操作符,所有这些操作符最后都以订阅或绑定告终。
let publisher = PublishSubject<Int>()
publisher
.map { $0 * 2.5 }
.filter { $0 > 10 }
.subscribe(onNext: { n in
print("We've got number \(n)!")
})
.disposed(by: disposeBag)
订阅链是在你订阅或绑定到一个可观察对象或一个可观察对象链时构建的。换句话说,这就是可观察链。
let chain = publisher
.map { $0 * 3 }
.filter { $0 > 10 }
订阅链是在代码订阅或绑定到该链的瞬间在后台构建的。
chain
.subscribe(onNext: { n in
print("We've got number \(n)!")
})
.disposed(by: disposeBag)
幕后背后发生的事情很迷人。
实际上,subscribe函数会把闭包藏起来,并订阅链中的最后一个Observable。这里它订阅了过滤器,它告诉它,“老兄,当你碰巧得到一个你想传递的事件时,把它委托给我。”
过滤器操作符依次对它的父操作符map执行相同的操作。“哟。把你收到的任何事件的结果发给我。当然,之后你要对他们做的事都结束了。”
最后,将订阅映射到链中的父节点,即publisher,正如我们已经看到的,publisher只是将subscription添加到它的subscribers列表中。
1.7. Send Event
现在,如果我们告诉发布者发送一个事件给它的观察者…
publisher.onNext(5)
…我们触发以下事件序列:
-
发布者打开订阅委托列表,取出第一个委托,并向其发送一个5。
-
map操作符接收一个5,将值传递给它的闭包,然后将结果发送给链中的下一个委托(3*5=15)。
-
filter操作符接收15并将值传递给它的比较闭包,在本例中该闭包返回true。因为它是true, filter将继续并将当前值发送给它的委托。
-
订阅(subscribe)接收15并将其传递给onNext处理程序,该处理程序打印“我们得到了数字15!”
-
订阅链就完成了。我们的所有嵌套调用都向上返回到发布者循环。
-
循环现在将onNext值5发送到其列表中的下一个订阅者,以此类推,直到发布者所知的每个订阅链都收到了新值的通知。
-
现在我们从onNext语句返回。
这里的要点是,Rx序列并不是黑魔法。告诉发布者onNext(5)启动一个调用委托的循环,该委托调用一个闭包,闭包的值被传递给链中的下一个委托。
这个过程会不断重复,根据操作符的不同,还会有不同的附加功能,直到我们完成。
1.8. Filter
为了更加清楚,让我们来看看过滤器运算符的一个假想实现。(这不是实际的RxSwift实现,但它确实演示了使用的实际机制。)
class FilterOperator: Observable<Element> {
var subscriber: Subscriber<Element>
var predicate: (_ value: Element) -> Bool
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
if predicate(value) {
subscriber.on(.next(value))
}
default:
subscriber.on(event)
}
}
...
}
正如上面所提到的,在这个过程中,令人惊讶的是缺乏黑魔法。
on(_ event: event
该函数switch事件类型。
在下一个事件中,使用提取的值调用存储的闭包(谓词),如果闭包返回true,则将相同的值传递给链中的下一个订阅者。
如果闭包返回false,则不会调用下面的委托,事件会从链中“神奇地”过滤掉。
注意,在此实现中,已完成事件和错误事件只是简单地沿链转发,跳过调用闭包的代码。
1.9. Map
现在让我们尝试Map操作符。
class MapOperator: Observable<Output> {
var subscriber: Subscriber<Output>
var transform: (_ value: Element) -> Output
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
let newValue = transform(value)
subscriber.on(.next(newValue))
default:
subscriber.on(event)
}
}
...
}
再一次,on(_ event: event
在函数内部,使用传递的值调用存储的转换闭包。该函数返回一个新的映射值,同样,它只是作为下一个值传递给链中的下一个订阅者。
1.10. 执行流程
filter和map都是操作的主要例子,它们以平滑、不间断的流操作和传递它们接收到的事件和值。
获得一个值。用这个值调用函数。得到的结果。将结果传递给链中的下一个委托。
Dead simple.
事实上,我们的代码不能以任何其他方式运行。在Swift中,调用一个函数,该函数必须返回一个值。
诚然,该函数可以在一个循环中将160亿个数字加在一起,但就执行流程而言,我们调用的任何函数都必须有一个值退出。
我们调用的任何函数都可以等待某个结果,但这样做会阻塞当前线程——因为该函数必须返回一个值。我是说,这是一个函数的定义。
但是如果我们现在没有这个值,如果我们真的,真的,真的不想阻止当前线程等待它,会发生什么呢?假设我们需要进行一个API调用,然后在服务器给我们一个结果时返回结果?
那么,当我对发送和接收未来事件感兴趣时,我在RxSwift中做什么?
我创建一个可观察对象并订阅它。
1.11. FlatMap
这又回到了我们来这里的原因。让我们再来看看FlatMap的功能签名。
extension ObservableType {
func flatMap<O:Observable>(_ selector:(E) -> O) -> Observable<O.E>
}
现在光线开始透过雾照射出来。FlatMap接受一个函数作为参数,这个函数接受一个值,然后返回一个类型为E的可观察对象。
基本上,你传递给flatMap的闭包是说,“好,我现在没有你想要的东西。但如果你观察Observable,我将返回你感兴趣的(或多个)值——它很快就会出现。"
一旦接收到,FlatMap将把该值(或多个值)传递给链中的下一个订阅者。
根据我们已经看到和完成的内容,我怀疑您可以自己编写基本的实现,但下面是我的示例。
class FlatMapOperator: Observable<Output> {
var subscriber: Subscriber<Output>
var transform: (_ value: Element) -> Observable<Output>
var subscriptions: [Disposable]
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
let disposable = transform(value)
.subscribe(onNext: { newValue in
self.subscriber.onNext(.next(newValue))
}, onError: { e in
self.subscriber.onNext(.error(e))
))
subscriptions.append(disposable)
default:
subscriber.onNext(event)
}
}
...
}
接收一个带有值的事件。将这个值传递给函数,并通过调用transform得到一个Observable作为结果。在内部订阅这个Observable,然后等待它发送给我们一些东西。
当该订阅接收到一个值时,将它传递给我们的委托,就像我们过去对过滤器和映射操作符所做的那样。
跟踪一次性订阅只是内务管理。当我们的运营商离开时,我们会想要关闭我们追踪的所有订阅。
1.12. The Diagram
让我们看看这个麻烦的图是否讲得通。

对于每个接收到的值,闭包{}将把这个值转换成一个可观察对象,这个可观察对象会随着时间的推移返回自己的两个值。(为了清晰起见,我在这些事件中添加了数字。)
我们得到一个红色的值并返回一个最终生成红色1和红色2的可观察对象。这些值沿流发送。然后这个Observable就会完成,flatMap会取消对它的订阅。
我们得到一个绿色的值,它返回一个可观察对象,这个可观察对象反过来产生绿色1,但是在它设法生成第二个值之前,我们收到一个蓝色的事件。它的Observable快速生成blue 1,并被发送到流中。
之后,我们的绿色和蓝色可观察对象都会生成它们自己的第二个事件,分别是绿色2和蓝色2,并将其传递下去。两者均已完成并取消订阅。
最后,我们的输入Observable本身完成了,这将关闭整个订阅链。
如果我们愿意,我们可以在RxSwift中编码这个序列:
let publisher = PublishSubject<String>()
publisher
.flatMap { (color) -> Observable<String> in
return .create { (observer) -> Disposable in
observer.onNext("\(color) 1")
delay(0.1) {
observer.onNext("\(color) 2")
observer.onCompleted()
}
return Disposables.create()
}
}
.subscribe(onNext: { (color) in
print(color)
})
.disposed(by: disposeBag)
delay(0.10) { publisher.onNext("red") }
delay(0.30) { publisher.onNext("green") }
delay(0.35) { publisher.onNext("blue") }
delay(0.90) { publisher.onCompleted() }
函数delay只是一个辅助函数,用于在指定的延迟之后将闭包放到DispatchQueue上。
func delay( _ t: TimeInterval, f: @escaping ()->Void ) {
DispatchQueue.main.asyncAfter(deadline: .now() + t, execute: f)
}
输出是您所期望的,并与上面的图相匹配。
red 1
red 2
green 1
blue 1
green 2
blue 2
1.13. The Definition
那我们最初的定义呢?
“The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.” — ReactiveX docs
这可能需要一些脑力锻炼,但根据你现在所知道的,至少你的眼睛不应该呆滞。
1.14. Using FlatMap
当我们快结束的时候,你可能会说,“这很好,但我实际上会用它做什么呢?”
FlatMap的一个非常常见的用例是将API调用链接在一起。考虑:
func login(_ credentials: Credentials) -> Observable<User> {
return SSO.knownEndpoint()
.flatMap { (endpoint) -> Observable<String> in
return SSO.authenticate(credentials, endpoint: endpoint)
}
.flatMap { (token) -> Observable<User> in
return Users.loadUser(forToken: token)
}
}
在这里,我调用API以获得授权端点,将token传递给身份验证API,然后将产生的token传递给另一个API,以加载当前用户的信息。
每个被调用的函数返回它自己的Observable,它被简单地传递给FlatMap。下面是调用Users.loadUser,它还演示了RxSwift 6的newdecode操作符。
class Users {
static func loadUser(forToken t: String) -> Observable<User> {
session.post(path: .user, parameters: ["token":t])
.decode(type: User.self, decoder: JSONDecoder())
}
...
}
如上所示,FlatMap可以显著改善你需要将API调用连接在一起的情况,简化代码并避免错误处理金字塔的毁灭。
1.15. Handling Errors
最后一点需要更多的解释。
请注意,如果FlatMap内部的任何订阅返回一个错误,该错误将被传播并发送到订阅链,最终终止对我们的数据的任何订阅。实际上,我们在flatMap订阅处理程序中看到了这种机制。
.subscribe(onNext: { newValue in
self.subscriber.onNext(.next(newValue))
}, onError: { e in
self.subscriber.onNext(.error(e))
))
在上面的登录示例中,这不是问题。我们的订阅链将在每次用户试图登录时重新构建并调用,实际上看到返回的错误正是我们想要的。
但这可能不是你在所有情况下都想要的行为。那么如何避免错误终止长期订阅呢?
一个简单的解决方案是在FlatMap订阅看到错误之前捕获并处理它们。
...
.flatMap { _ in
service.loadArticles()
.catchErrorJustReturn([])
}
...
在这里,我们试图加载一个文章列表,但如果加载失败,我们就打开并返回一个空数组。
我在前一篇文章“RxSwift: CompactMap更好的错误处理”中写过关于FlatMap中的错误处理。
最后一件值得强调的事情是,虽然错误会从FlatMap传播出去,但completions不会。从闭包返回.completed到包含的FlatMap只是一个订阅已经完成的信号,您不再需要监视它。
1.16. We’re Subscribing, We’re Subscribing…
请注意,FlatMap会生成一个新的Observable,因此,对于它接收到的每个事件值,都会产生一个对该Observable的新订阅。
这就是上述定义中的“FlatMap然后合并这些最终可观察对象的排放”部分。这是它的标准行为。这可能正是您想要的,但您应该知道还有另一个常见的FlatMap版本。
1.17. FlatMapLatest
FlatMapLatest做了很多它所宣传的事情,从最新生成的Observable中映射和返回结果。每次它接收到一个值,它就会取消先前订阅(如果有的话),然后订阅闭包返回的最新的Observable。
class FlatMapLatestOperator: Observable<Output> {
var subscriber: Subscriber<Output>
var transform: (_ value: Element) -> Observable<Output>
var disposable: Disposable?
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
disposable?.dispose()
disposable = transform(value)
.subscribe(onNext: { newValue in
self.subscriber.onNext(.next(newValue))
}, onError: { e in
self.subscriber.onNext(.error(e))
))
default:
subscriber.onNext(event)
}
}
...
}
如果我们在上面的颜色生成示例中使用.flatMapLatest而不是.flatMap,那么输出将如下所示.
red 1
red 2
green 1
blue 1
blue 2
注意,绿色的2不见了,因为它的订阅在它的可观察对象生成第二个事件之前就被丢弃了。
如果你的应用中有一个持续很长一段时间的订阅链,你可能会考虑使用.flatmaplatest。
如果您正在执行类似提前输入搜索字段的操作,并且不再关心以前API请求可能返回的任何结果,那么它也很有用。
1.18. User Interactions
FlatMap和FlatMapLatest都等待事件发生并传递,这并不局限于进行API调用。您还可以使用它们来处理用户交互,如确认对话框。
在这个例子中,FlatMapLatest是一个更好的选择,因为我只关心最近的操作。
button.rx.tap
.flatMapLatest { () -> Observable<Continue> in
return Dialogs.confirmSaveDialog()
}
.filter { $0 == .continue }
...
当点击按钮时,flatMapLatest会显示一个构造对话框,然后在继续序列之前等待响应。在这种情况下,将过滤接收到的响应,而序列的其余部分只有在用户点击Continue时才会执行。
这是一个非常强大的概念,我保证以后会在另一篇文章中访问它。
1.19. Completion Block
这就是FlatMap和它的近亲FlatMapLatest。
我已经尽力消除了围绕它的一些困惑,阐明了它在内部是如何工作的,甚至还为您提供了一些如何在代码中使用它的示例。
但是,我主要是试图提供我希望在第一次阅读ReactiveX定义时就能得到的定义和解释。
Enjoy.