搜索
首页web前端js教程10需要知道的RXJS功能与示例

10 Need-to-Know RxJS Functions with Examples

本文由 Florian Rappl 和 Moritz Kröger 共同评审。感谢所有 SitePoint 的同行评审员,使 SitePoint 的内容尽善尽美!

随着对函数式反应式编程 (FRP) 兴趣的增长,RxJS 已成为此范例中最流行的 JavaScript 库之一。在本文中,我们将探讨我认为 RxJS 中十大必知函数。

注意:本文假定您熟悉 RxJS 的基础知识,如文章《使用 RxJS 入门函数式反应式编程》中所述。

关键要点

  • RxJS 利用类似于随时间推移填充的数组的可观察对象来促进函数式反应式编程 (FRP),从而允许在应用程序中进行更声明式和强大的错误处理。
  • RxJS 中简单流的核心操作,例如 map()filter()reduce()take(),镜像数组操作,但应用于随时间发出值的数流。
  • flatMap()switch() 这样的专用函数对于分别处理复杂数据结构和管理多个流至关重要,这使得它们对于高级反应式编程任务至关重要。
  • 可以使用 concat()merge()combineLatest() 等运算符有效地组合多个流,每个运算符在流管理和数据同步中发挥不同的作用。
  • takeUntil() 函数提供了一种基于外部条件取消订阅的机制,这说明了 RxJS 在流控制和资源管理方面的灵活性。

反应式编程

反应式编程是一种编程范例,它将称为可观察对象的数据流作为其基本的编程单元。

流——或 RxJS 行话中的可观察对象——类似于事件监听器:两者都等待某些事情发生,并在发生时通知您。从 onClick 监听器获得的一系列异步通知是数据流的完美示例。

换句话说,可观察对象只不过是一个随时间推移填充的数组。

该数组的元素可以来自几乎任何地方:文件系统、DOM 事件、API 调用,甚至转换后的同步数据,如数组。从根本上说,反应式编程只不过是用可观察对象作为程序的构建块。

与数组的关系

数组很简单,因为它们的内容是最终的,除非明确更改。从这个意义上说,数组中没有什么本质上的时间性。

另一方面,可观察对象由时间定义。您最多只能知道流到目前为止已接收 [1, 2, 3]。您不能确定您是否会得到 4——或者不会——并且是数据源,而不是您的程序,决定了这一点。

流和数组之间的关系是如此深刻,以至于大多数反应式扩展都源于函数式编程的世界,其中列表操作是核心。

熟悉 RxJS

考虑一下常见的待办事项应用程序。让我们看看使用 RxJS 如何显示用户未完成任务的名称的问题:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

到目前为止,这只不过是数组扩展,但它演示了反应式编程的函数式风格。

通过添加更复杂、“现实世界”的功能,其声明性性质变得清晰。假设我们想要:

  • 响应用户选择查看已完成或未完成的任务来启动请求;
  • 每秒只发送对上次选择的请求一次,以免在用户快速更改选择时浪费带宽;
  • 最多重试三次失败的请求;以及
  • 只有当服务器发送与上次不同的响应时才重绘视图。
const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

逐步分解:

  • parameter_stream 告诉我们用户是否想要已完成或未完成的任务,并将选择存储在 parameter 中;
  • debounce() 确保我们每秒钟只关注最后一次按钮点击;
  • getTasks() 周围的部分与之前相同;
  • distinctUntilChanged() 确保我们只在服务器的响应与上次不同时才关注;以及
  • update() 负责更新 UI 以反映我们从服务器获得的内容。

在命令式、基于回调的样式中处理 debounce、retry 和“distinct until changed”逻辑是有效的,但它既脆弱又复杂。

关键在于,使用 RxJS 进行编程允许:

  1. 声明式程序;
  2. 可扩展的系统;以及
  3. 简单直接、强大的错误处理。

在浏览 RxJS 十大必知函数的过程中,我们将遇到上述示例中的每个函数。

简单流操作

简单流(发出简单值,如字符串的流)的基本函数包括:

  • map()
  • filter()
  • reduce()
  • take() / takeWhile()

除了 take() 和 takeWhile() 之外,这些都类似于 JavaScript 的高阶数组函数。

我们将通过解决一个示例问题来应用这些函数:查找数据库中所有具有 .com 或 .org 网站的用户,并计算其网站名称的平均长度。

JSONPlaceholder 将作为我们的用户来源。这是我们将使用的用户数据的 JSON 表示。

1. 使用 map() 转换数据

在可观察对象上使用 map() 与在数组上使用它相同。它:

  1. 接受回调作为参数;
  2. 在您调用的数组的每个元素上执行它;以及
  3. 返回一个新数组,其中原始数组的每个元素都被调用回调在其上产生的结果所替换。

在可观察对象上使用 map() 时,唯一的区别是:

  1. 它返回一个新的可观察对象,而不是一个新的数组;以及
  2. 它在可观察对象发出新项目时执行,而不是立即全部执行一次。

我们可以使用 map() 将我们的用户数据流转换为仅包含其网站名称的列表:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

在这里,我们使用 map 将传入流中的每个用户对象替换为每个用户的网站。

RxJS 还允许您调用 map() as select()。这两个名称都指代相同的函数。

2. 过滤结果

像 map() 一样,filter() 在可观察对象上的作用与在数组上的作用大致相同。要查找每个具有 .net 或 .org 网站地址的用户,我们可以这样写:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

这将只选择其网站以“net”或“org”结尾的用户。

filter() 也有别名 where()。

3. 使用 reduce() 收集结果

reduce() 允许我们使用所有单个值并将它们转换为单个结果。

reduce() 往往是最令人困惑的基本列表操作,因为与 filter() 或 map() 不同,它的行为因使用而异。

通常,reduce() 获取值的集合,并将其转换为单个数据点。在我们的例子中,我们将向它提供一个网站名称流,并使用 reduce() 将该流转换为一个对象,该对象计算我们找到的网站数量以及其名称长度的总和。

source.
  map((user) => user.website)

在这里,我们将流简化为单个对象,它跟踪:

  1. 我们已经看到了多少个站点;以及
  2. 所有名称的总长度。

请记住,reduce() 只有在源可观察对象完成时才返回结果。如果您想在每次流接收新项目时都知道累加器的状态,请改用 scan()。

4. 使用 take() 限制结果

take() 和 takeWhile() 补充了简单流的基本函数。

take(n) 从流中读取 n 个值,然后取消订阅。

我们可以使用 scan() 在每次我们收到网站时发出我们的对象,并且只 take() 前两个值。

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})

RxJS 还提供 takeWhile(),它允许您在某个布尔测试成立之前获取值。我们可以这样使用 takeWhile() 来编写上述流:

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 })

高阶流操作

除了它们在可观察对象而不是数组上工作之外,这些函数几乎与熟悉的列表操作相同。

“[I]f you know how to program against Arrays using the Array#extras, then you already know how to use RxJS!” ~ RxJS 文档

正如数组可以包含比简单值(如数组或对象)更复杂的数据一样,可观察对象也可以发出高阶数据,如 Promise 或其他可观察对象。这就是更专业的工具发挥作用的地方。

5. 使用 flatMap() 压平流

……事实上,我们已经在使用了!

当我们定义源流时,我们调用了 fromPromise() 和 flatMap():

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        count       : data.count += 1,
        name_length : data.name_length += website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);

这使用了三个新的机制:

  1. fromPromise;
  2. Rx.Observable.from;以及
  3. flatMap。

来自 promise 的可观察对象

Promise 代表我们将异步获得的单个未来值——例如,对服务器的调用的结果。

Promise 的一个定义特征是它只代表一个未来的值。它不能返回多个异步数据;这是可观察对象所做的,也是两者之间的一个根本区别。

这意味着,当我们使用 Rx.Observable.fromPromise() 时,我们得到一个可观察对象,它发出单个值——或者:

  1. Promise 解析到的值;或
  2. Promise 拒绝的值。

当 Promise 返回字符串或数字时,我们不需要做任何特殊的事情。但是,当它返回数组时(在我们的例子中就是这样),我们更希望创建一个可观察对象,该对象发出数组的内容,而不是数组本身作为单个值。

6. 使用 flatMap()

此过程称为扁平化,flatMap() 会处理它。它有很多重载,但我们只使用最简单和最常用的重载。

使用 flatMap() 时,我们:

  1. 在发出 Promise 的单值解析或拒绝的可观察对象上调用 flatMap();以及
  2. 传递一个函数来创建一个新的可观察对象。

在我们的例子中,我们传递 Rx.Observable.from(),它从数组的值创建一个序列:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

这涵盖了我们简短序言中的代码:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

RxJS 也为 flatMap() 提供了一个别名:selectMany()。

组合多个流

通常,我们将有多个需要组合的流。组合流的方法有很多,但有一些比其他的出现频率更高。

7. 使用 concat() 和 merge() 组合流

连接和合并是组合流的两种最常见方法。

连接通过发出第一个流的值直到它完成,然后发出第二个流的值来创建一个新流。

合并通过发出任何活动流的值来从多个流创建新流

想想在 Facebook Messenger 上同时与两个人交谈。concat() 是您从双方收到消息,但在回复另一个人之前完成与一个人的对话的情况。merge() 就像创建一个群聊并同时接收两条消息流。

source.
  map((user) => user.website)

concat() 流将首先打印 source1 的所有值,并且只有在 source1 完成后才开始打印 source2 的值。

merge() 流将根据接收到的顺序打印 source1 和 source2 的值:它不会等待第一个流完成,然后再发出第二个流的值。

8. 使用 switch()

通常,我们想监听发出可观察对象的可观察对象,但只关注来自源的最新发射。

为了进一步扩展 Facebook Messenger 的类比,switch() 是您……好吧,根据当前正在发送消息的人来切换您回复的人。

为此,RxJS 提供了 switch。

用户界面为 switch() 提供了几个很好的用例。如果我们的应用程序每次用户选择他们想要搜索的内容时都会发出请求,我们可以假设他们只想查看最新选择的結果。因此,我们使用 switch() 只监听最新选择的結果。

顺便说一下,我们应该确保不要浪费带宽,而只针对用户每秒进行的最后一次选择访问服务器。我们为此使用的函数称为 debounce()

如果您想朝另一个方向前进,并且只遵守第一次选择,则可以使用 throttle()。它具有相同的 API,但行为相反。

9. 协调流

如果我们想允许用户搜索具有特定 ID 的帖子或用户怎么办?

为了演示,我们将创建另一个下拉菜单,并允许用户选择他们想要检索的项目的 ID。

有两种情况。当用户:

  1. 更改任一选择;或
  2. 更改两个选择。

使用 combineLatest() 响应任一流的更改

在第一种情况下,我们需要创建一个流,该流使用以下内容启动网络请求:

  1. 用户最近选择的端点;以及
  2. 用户最近选择的 ID。

……并在用户更新任一选择时执行此操作。

这就是 combineLatest() 的用途:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

每当任一流发出值时,combineLatest() 都会获取发出的值并将其与其他流发出的最后一个项目配对,并将该对以数组的形式发出。

这在图表中更容易可视化:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

使用 zip 只响应两个流的更改

要等到用户更新其对 id 和端点字段的选择后,请将 combineLatest() 替换为 zip()。

同样,这在图表中更容易理解:

source.
  map((user) => user.website)

与 combineLatest() 不同,zip() 会等到两个可观察对象都发出新内容后才会发出其更新值的数组。

10. takeUntil

最后,takeUntil() 允许我们监听第一个流,直到第二个流开始发出值。

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})

当您需要协调流但不需要组合它们时,这很有用。

总结

仅仅向数组添加时间维度就开启了对程序进行全新思考的大门。

RxJS 的内容远不止我们在这里看到的这些,但这已经足够走得很远了。

从 RxJS Lite 开始,随时准备参考文档,并抽出时间动手实践。在您不知不觉中,一切都会看起来像一个流……因为一切都是。

关于 RxJS 函数的常见问题解答 (FAQ)

RxJS 与传统 JavaScript 的主要区别是什么?

RxJS 是一个使用可观察对象的反应式编程库,用于简化异步或基于回调的代码的组合。这与使用更命令式编程风格的传统 JavaScript 相比。关键区别在于它们如何处理数据——RxJS 将数据视为流,可以使用各种运算符对其进行操作和转换,而传统 JavaScript 则以更线性的方式处理数据。

如何在 RxJS 中创建可观察对象?

在 RxJS 中,您可以使用新的 Observable() 构造函数创建可观察对象。此构造函数将一个函数作为参数,称为订阅者函数,该函数在最初订阅可观察对象时执行。这是一个基本示例:

const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

RxJS 中的主要运算符是什么以及它们如何工作?

RxJS 具有广泛的运算符,可用于控制数据在可观察对象和观察者之间的流动方式。一些主要运算符包括 map()、filter()、reduce()、merge() 和 concat()。这些运算符中的每一个都以不同的方式操作数据流,例如转换数据、过滤掉某些值或组合多个流。

如何在 RxJS 中处理错误?

RxJS 提供了几个处理错误的运算符,例如 catchError()、retry() 和 retryWhen()。catchError() 运算符用于捕获可观察流上的错误并返回新的可观察对象或抛出错误。retry() 运算符可用于在发生错误时重新订阅可观察对象。retryWhen() 运算符类似,但它提供了对何时重试的更多控制。

如何取消 RxJS 中可观察对象的订阅?

当您订阅可观察对象时,您会收到一个 Subscription,它有一个 unsubscribe() 方法。您可以调用此方法来取消可观察对象的执行并清理正在使用的任何资源。这是一个示例:

const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

RxJS 中热可观察对象和冷可观察对象的区别是什么?

在 RxJS 中,可观察对象可以是热的或冷的。冷可观察对象在订阅时开始运行,而热可观察对象即使在订阅之前也会产生值。换句话说,冷可观察对象是惰性的,而热可观察对象不是。

如何在 RxJS 中组合多个可观察对象?

RxJS 提供了几个组合多个可观察对象的运算符,例如 merge()、concat()、combineLatest() 和 zip()。这些运算符中的每一个都以不同的方式组合数据流,具体取决于您的特定需求。

RxJS 中主题的目的是什么?

RxJS 中的主题是一种特殊类型的可观察对象,它允许将值多播到多个观察者。与普通可观察对象不同,主题维护许多监听器的注册表。

如何将 RxJS 与 Angular 一起使用?

Angular 内置支持 RxJS,并在内部将其用于各种功能。您也可以在自己的代码中使用 RxJS 来处理异步操作并实现自动完成、去抖动、节流、轮询等功能。

RxJS 的一些常见用例是什么?

RxJS 可用于需要处理异步数据的各种场景。一些常见的用例包括处理用户输入、发出 HTTP 请求、使用 WebSockets 和处理动画。

以上是10需要知道的RXJS功能与示例的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
node.js流带打字稿node.js流带打字稿Apr 30, 2025 am 08:22 AM

Node.js擅长于高效I/O,这在很大程度上要归功于流。 流媒体汇总处理数据,避免内存过载 - 大型文件,网络任务和实时应用程序的理想。将流与打字稿的类型安全结合起来创建POWE

Python vs. JavaScript:性能和效率注意事项Python vs. JavaScript:性能和效率注意事项Apr 30, 2025 am 12:08 AM

Python和JavaScript在性能和效率方面的差异主要体现在:1)Python作为解释型语言,运行速度较慢,但开发效率高,适合快速原型开发;2)JavaScript在浏览器中受限于单线程,但在Node.js中可利用多线程和异步I/O提升性能,两者在实际项目中各有优势。

JavaScript的起源:探索其实施语言JavaScript的起源:探索其实施语言Apr 29, 2025 am 12:51 AM

JavaScript起源于1995年,由布兰登·艾克创造,实现语言为C语言。1.C语言为JavaScript提供了高性能和系统级编程能力。2.JavaScript的内存管理和性能优化依赖于C语言。3.C语言的跨平台特性帮助JavaScript在不同操作系统上高效运行。

幕后:什么语言能力JavaScript?幕后:什么语言能力JavaScript?Apr 28, 2025 am 12:01 AM

JavaScript在浏览器和Node.js环境中运行,依赖JavaScript引擎解析和执行代码。1)解析阶段生成抽象语法树(AST);2)编译阶段将AST转换为字节码或机器码;3)执行阶段执行编译后的代码。

Python和JavaScript的未来:趋势和预测Python和JavaScript的未来:趋势和预测Apr 27, 2025 am 12:21 AM

Python和JavaScript的未来趋势包括:1.Python将巩固在科学计算和AI领域的地位,2.JavaScript将推动Web技术发展,3.跨平台开发将成为热门,4.性能优化将是重点。两者都将继续在各自领域扩展应用场景,并在性能上有更多突破。

Python vs. JavaScript:开发环境和工具Python vs. JavaScript:开发环境和工具Apr 26, 2025 am 12:09 AM

Python和JavaScript在开发环境上的选择都很重要。1)Python的开发环境包括PyCharm、JupyterNotebook和Anaconda,适合数据科学和快速原型开发。2)JavaScript的开发环境包括Node.js、VSCode和Webpack,适用于前端和后端开发。根据项目需求选择合适的工具可以提高开发效率和项目成功率。

JavaScript是用C编写的吗?检查证据JavaScript是用C编写的吗?检查证据Apr 25, 2025 am 12:15 AM

是的,JavaScript的引擎核心是用C语言编写的。1)C语言提供了高效性能和底层控制,适合JavaScript引擎的开发。2)以V8引擎为例,其核心用C 编写,结合了C的效率和面向对象特性。3)JavaScript引擎的工作原理包括解析、编译和执行,C语言在这些过程中发挥关键作用。

JavaScript的角色:使网络交互和动态JavaScript的角色:使网络交互和动态Apr 24, 2025 am 12:12 AM

JavaScript是现代网站的核心,因为它增强了网页的交互性和动态性。1)它允许在不刷新页面的情况下改变内容,2)通过DOMAPI操作网页,3)支持复杂的交互效果如动画和拖放,4)优化性能和最佳实践提高用户体验。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

功能强大的PHP集成开发环境

mPDF

mPDF

mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),

SecLists

SecLists

SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

将Eclipse与SAP NetWeaver应用服务器集成。

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。