首页 >web前端 >js教程 >10需要知道的RXJS功能与示例

10需要知道的RXJS功能与示例

Joseph Gordon-Levitt
Joseph Gordon-Levitt原创
2025-02-17 10:08:10205浏览

10 Need-to-Know RxJS Functions with Examples

本文由 Florian Rappl 和 Moritz Kröger 共同评审。感谢所有 SitePoint 的同行评审员,使 SitePoint 的内容尽善尽美!

随着对函数式反应式编程 (FRP) 兴趣的增长,RxJS 已成为此范例中最流行的 JavaScript 库之一。在本文中,我们将探讨我认为 RxJS 中十大必知函数。

注意:本文假定您熟悉 RxJS 的基础知识,如文章《使用 RxJS 入门函数式反应式编程》中所述。

关键要点

  • RxJS 利用类似于随时间推移填充的数组的可观察对象来促进函数式反应式编程 (FRP),从而允许在应用程序中进行更声明式和强大的错误处理。
  • RxJS 中简单流的核心操作,例如 map()filter()reduce()take(),镜像数组操作,但应用于随时间发出值的数流。
  • flatMap()switch() 这样的专用函数对于分别处理复杂数据结构和管理多个流至关重要,这使得它们对于高级反应式编程任务至关重要。
  • 可以使用 concat()merge()combineLatest() 等运算符有效地组合多个流,每个运算符在流管理和数据同步中发挥不同的作用。
  • takeUntil() 函数提供了一种基于外部条件取消订阅的机制,这说明了 RxJS 在流控制和资源管理方面的灵活性。

反应式编程

反应式编程是一种编程范例,它将称为可观察对象的数据流作为其基本的编程单元。

流——或 RxJS 行话中的可观察对象——类似于事件监听器:两者都等待某些事情发生,并在发生时通知您。从 onClick 监听器获得的一系列异步通知是数据流的完美示例。

换句话说,可观察对象只不过是一个随时间推移填充的数组。

该数组的元素可以来自几乎任何地方:文件系统、DOM 事件、API 调用,甚至转换后的同步数据,如数组。从根本上说,反应式编程只不过是用可观察对象作为程序的构建块。

与数组的关系

数组很简单,因为它们的内容是最终的,除非明确更改。从这个意义上说,数组中没有什么本质上的时间性。

另一方面,可观察对象由时间定义。您最多只能知道流到目前为止已接收 [1, 2, 3]。您不能确定您是否会得到 4——或者不会——并且是数据源,而不是您的程序,决定了这一点。

流和数组之间的关系是如此深刻,以至于大多数反应式扩展都源于函数式编程的世界,其中列表操作是核心。

熟悉 RxJS

考虑一下常见的待办事项应用程序。让我们看看使用 RxJS 如何显示用户未完成任务的名称的问题:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

到目前为止,这只不过是数组扩展,但它演示了反应式编程的函数式风格。

通过添加更复杂、“现实世界”的功能,其声明性性质变得清晰。假设我们想要:

  • 响应用户选择查看已完成或未完成的任务来启动请求;
  • 每秒只发送对上次选择的请求一次,以免在用户快速更改选择时浪费带宽;
  • 最多重试三次失败的请求;以及
  • 只有当服务器发送与上次不同的响应时才重绘视图。
<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

逐步分解:

  • parameter_stream 告诉我们用户是否想要已完成或未完成的任务,并将选择存储在 parameter 中;
  • debounce() 确保我们每秒钟只关注最后一次按钮点击;
  • getTasks() 周围的部分与之前相同;
  • distinctUntilChanged() 确保我们只在服务器的响应与上次不同时才关注;以及
  • update() 负责更新 UI 以反映我们从服务器获得的内容。

在命令式、基于回调的样式中处理 debounce、retry 和“distinct until changed”逻辑是有效的,但它既脆弱又复杂。

关键在于,使用 RxJS 进行编程允许:

  1. 声明式程序;
  2. 可扩展的系统;以及
  3. 简单直接、强大的错误处理。

在浏览 RxJS 十大必知函数的过程中,我们将遇到上述示例中的每个函数。

简单流操作

简单流(发出简单值,如字符串的流)的基本函数包括:

  • map()
  • filter()
  • reduce()
  • take() / takeWhile()

除了 take() 和 takeWhile() 之外,这些都类似于 JavaScript 的高阶数组函数。

我们将通过解决一个示例问题来应用这些函数:查找数据库中所有具有 .com 或 .org 网站的用户,并计算其网站名称的平均长度。

JSONPlaceholder 将作为我们的用户来源。这是我们将使用的用户数据的 JSON 表示。

1. 使用 map() 转换数据

在可观察对象上使用 map() 与在数组上使用它相同。它:

  1. 接受回调作为参数;
  2. 在您调用的数组的每个元素上执行它;以及
  3. 返回一个新数组,其中原始数组的每个元素都被调用回调在其上产生的结果所替换。

在可观察对象上使用 map() 时,唯一的区别是:

  1. 它返回一个新的可观察对象,而不是一个新的数组;以及
  2. 它在可观察对象发出新项目时执行,而不是立即全部执行一次。

我们可以使用 map() 将我们的用户数据流转换为仅包含其网站名称的列表:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

在这里,我们使用 map 将传入流中的每个用户对象替换为每个用户的网站。

RxJS 还允许您调用 map() as select()。这两个名称都指代相同的函数。

2. 过滤结果

像 map() 一样,filter() 在可观察对象上的作用与在数组上的作用大致相同。要查找每个具有 .net 或 .org 网站地址的用户,我们可以这样写:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

这将只选择其网站以“net”或“org”结尾的用户。

filter() 也有别名 where()。

3. 使用 reduce() 收集结果

reduce() 允许我们使用所有单个值并将它们转换为单个结果。

reduce() 往往是最令人困惑的基本列表操作,因为与 filter() 或 map() 不同,它的行为因使用而异。

通常,reduce() 获取值的集合,并将其转换为单个数据点。在我们的例子中,我们将向它提供一个网站名称流,并使用 reduce() 将该流转换为一个对象,该对象计算我们找到的网站数量以及其名称长度的总和。

<code class="language-javascript">source.
  map((user) => user.website)</code>

在这里,我们将流简化为单个对象,它跟踪:

  1. 我们已经看到了多少个站点;以及
  2. 所有名称的总长度。

请记住,reduce() 只有在源可观察对象完成时才返回结果。如果您想在每次流接收新项目时都知道累加器的状态,请改用 scan()。

4. 使用 take() 限制结果

take() 和 takeWhile() 补充了简单流的基本函数。

take(n) 从流中读取 n 个值,然后取消订阅。

我们可以使用 scan() 在每次我们收到网站时发出我们的对象,并且只 take() 前两个值。

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})</code>

RxJS 还提供 takeWhile(),它允许您在某个布尔测试成立之前获取值。我们可以这样使用 takeWhile() 来编写上述流:

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 })</code>

高阶流操作

除了它们在可观察对象而不是数组上工作之外,这些函数几乎与熟悉的列表操作相同。

“[I]f you know how to program against Arrays using the Array#extras, then you already know how to use RxJS!” ~ RxJS 文档

正如数组可以包含比简单值(如数组或对象)更复杂的数据一样,可观察对象也可以发出高阶数据,如 Promise 或其他可观察对象。这就是更专业的工具发挥作用的地方。

5. 使用 flatMap() 压平流

……事实上,我们已经在使用了!

当我们定义源流时,我们调用了 fromPromise() 和 flatMap():

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        count       : data.count += 1,
        name_length : data.name_length += website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);</code>

这使用了三个新的机制:

  1. fromPromise;
  2. Rx.Observable.from;以及
  3. flatMap。

来自 promise 的可观察对象

Promise 代表我们将异步获得的单个未来值——例如,对服务器的调用的结果。

Promise 的一个定义特征是它只代表一个未来的值。它不能返回多个异步数据;这是可观察对象所做的,也是两者之间的一个根本区别。

这意味着,当我们使用 Rx.Observable.fromPromise() 时,我们得到一个可观察对象,它发出单个值——或者:

  1. Promise 解析到的值;或
  2. Promise 拒绝的值。

当 Promise 返回字符串或数字时,我们不需要做任何特殊的事情。但是,当它返回数组时(在我们的例子中就是这样),我们更希望创建一个可观察对象,该对象发出数组的内容,而不是数组本身作为单个值。

6. 使用 flatMap()

此过程称为扁平化,flatMap() 会处理它。它有很多重载,但我们只使用最简单和最常用的重载。

使用 flatMap() 时,我们:

  1. 在发出 Promise 的单值解析或拒绝的可观察对象上调用 flatMap();以及
  2. 传递一个函数来创建一个新的可观察对象。

在我们的例子中,我们传递 Rx.Observable.from(),它从数组的值创建一个序列:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

这涵盖了我们简短序言中的代码:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

RxJS 也为 flatMap() 提供了一个别名:selectMany()。

组合多个流

通常,我们将有多个需要组合的流。组合流的方法有很多,但有一些比其他的出现频率更高。

7. 使用 concat() 和 merge() 组合流

连接和合并是组合流的两种最常见方法。

连接通过发出第一个流的值直到它完成,然后发出第二个流的值来创建一个新流。

合并通过发出任何活动流的值来从多个流创建新流

想想在 Facebook Messenger 上同时与两个人交谈。concat() 是您从双方收到消息,但在回复另一个人之前完成与一个人的对话的情况。merge() 就像创建一个群聊并同时接收两条消息流。

<code class="language-javascript">source.
  map((user) => user.website)</code>

concat() 流将首先打印 source1 的所有值,并且只有在 source1 完成后才开始打印 source2 的值。

merge() 流将根据接收到的顺序打印 source1 和 source2 的值:它不会等待第一个流完成,然后再发出第二个流的值。

8. 使用 switch()

通常,我们想监听发出可观察对象的可观察对象,但只关注来自源的最新发射。

为了进一步扩展 Facebook Messenger 的类比,switch() 是您……好吧,根据当前正在发送消息的人来切换您回复的人。

为此,RxJS 提供了 switch。

用户界面为 switch() 提供了几个很好的用例。如果我们的应用程序每次用户选择他们想要搜索的内容时都会发出请求,我们可以假设他们只想查看最新选择的結果。因此,我们使用 switch() 只监听最新选择的結果。

顺便说一下,我们应该确保不要浪费带宽,而只针对用户每秒进行的最后一次选择访问服务器。我们为此使用的函数称为 debounce()

如果您想朝另一个方向前进,并且只遵守第一次选择,则可以使用 throttle()。它具有相同的 API,但行为相反。

9. 协调流

如果我们想允许用户搜索具有特定 ID 的帖子或用户怎么办?

为了演示,我们将创建另一个下拉菜单,并允许用户选择他们想要检索的项目的 ID。

有两种情况。当用户:

  1. 更改任一选择;或
  2. 更改两个选择。

使用 combineLatest() 响应任一流的更改

在第一种情况下,我们需要创建一个流,该流使用以下内容启动网络请求:

  1. 用户最近选择的端点;以及
  2. 用户最近选择的 ID。

……并在用户更新任一选择时执行此操作。

这就是 combineLatest() 的用途:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

每当任一流发出值时,combineLatest() 都会获取发出的值并将其与其他流发出的最后一个项目配对,并将该对以数组的形式发出。

这在图表中更容易可视化:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

使用 zip 只响应两个流的更改

要等到用户更新其对 id 和端点字段的选择后,请将 combineLatest() 替换为 zip()。

同样,这在图表中更容易理解:

<code class="language-javascript">source.
  map((user) => user.website)</code>

与 combineLatest() 不同,zip() 会等到两个可观察对象都发出新内容后才会发出其更新值的数组。

10. takeUntil

最后,takeUntil() 允许我们监听第一个流,直到第二个流开始发出值。

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})</code>

当您需要协调流但不需要组合它们时,这很有用。

总结

仅仅向数组添加时间维度就开启了对程序进行全新思考的大门。

RxJS 的内容远不止我们在这里看到的这些,但这已经足够走得很远了。

从 RxJS Lite 开始,随时准备参考文档,并抽出时间动手实践。在您不知不觉中,一切都会看起来像一个流……因为一切都是。

关于 RxJS 函数的常见问题解答 (FAQ)

RxJS 与传统 JavaScript 的主要区别是什么?

RxJS 是一个使用可观察对象的反应式编程库,用于简化异步或基于回调的代码的组合。这与使用更命令式编程风格的传统 JavaScript 相比。关键区别在于它们如何处理数据——RxJS 将数据视为流,可以使用各种运算符对其进行操作和转换,而传统 JavaScript 则以更线性的方式处理数据。

如何在 RxJS 中创建可观察对象?

在 RxJS 中,您可以使用新的 Observable() 构造函数创建可观察对象。此构造函数将一个函数作为参数,称为订阅者函数,该函数在最初订阅可观察对象时执行。这是一个基本示例:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

RxJS 中的主要运算符是什么以及它们如何工作?

RxJS 具有广泛的运算符,可用于控制数据在可观察对象和观察者之间的流动方式。一些主要运算符包括 map()、filter()、reduce()、merge() 和 concat()。这些运算符中的每一个都以不同的方式操作数据流,例如转换数据、过滤掉某些值或组合多个流。

如何在 RxJS 中处理错误?

RxJS 提供了几个处理错误的运算符,例如 catchError()、retry() 和 retryWhen()。catchError() 运算符用于捕获可观察流上的错误并返回新的可观察对象或抛出错误。retry() 运算符可用于在发生错误时重新订阅可观察对象。retryWhen() 运算符类似,但它提供了对何时重试的更多控制。

如何取消 RxJS 中可观察对象的订阅?

当您订阅可观察对象时,您会收到一个 Subscription,它有一个 unsubscribe() 方法。您可以调用此方法来取消可观察对象的执行并清理正在使用的任何资源。这是一个示例:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

RxJS 中热可观察对象和冷可观察对象的区别是什么?

在 RxJS 中,可观察对象可以是热的或冷的。冷可观察对象在订阅时开始运行,而热可观察对象即使在订阅之前也会产生值。换句话说,冷可观察对象是惰性的,而热可观察对象不是。

如何在 RxJS 中组合多个可观察对象?

RxJS 提供了几个组合多个可观察对象的运算符,例如 merge()、concat()、combineLatest() 和 zip()。这些运算符中的每一个都以不同的方式组合数据流,具体取决于您的特定需求。

RxJS 中主题的目的是什么?

RxJS 中的主题是一种特殊类型的可观察对象,它允许将值多播到多个观察者。与普通可观察对象不同,主题维护许多监听器的注册表。

如何将 RxJS 与 Angular 一起使用?

Angular 内置支持 RxJS,并在内部将其用于各种功能。您也可以在自己的代码中使用 RxJS 来处理异步操作并实现自动完成、去抖动、节流、轮询等功能。

RxJS 的一些常见用例是什么?

RxJS 可用于需要处理异步数据的各种场景。一些常见的用例包括处理用户输入、发出 HTTP 请求、使用 WebSockets 和处理动画。

以上是10需要知道的RXJS功能与示例的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn