Home >Web Front-end >JS Tutorial >10 Need-to-Know RxJS Functions with Examples

10 Need-to-Know RxJS Functions with Examples

Joseph Gordon-Levitt
Joseph Gordon-LevittOriginal
2025-02-17 10:08:10211browse

10 Need-to-Know RxJS Functions with Examples

This article was reviewed by Florian Rappl and Moritz Kröger. Thanks to all the peer reviewers at SitePoint for making SitePoint’s content perfect!

As interest in functional reactive programming (FRP) grows, RxJS has become one of the most popular JavaScript libraries in this paradigm. In this article, we will explore the top ten must-know functions in RxJS.

Note: This article assumes that you are familiar with the basics of RxJS, as described in the article "Beginning with Functional Reactive Programming with RxJS".

Key Points

  • RxJS leverages observable objects similar to arrays filled over time to facilitate functional reactive programming (FRP), allowing for more declarative and powerful error handling in applications.
  • The core operations of simple streams in RxJS, such as map(), filter(), reduce(), and take(), mirror array operations, but are applied to streams of numbers that emit values ​​over time.
  • Special functions like flatMap() and switch() are critical to handling complex data structures and managing multiple streams separately, making them critical for advanced reactive programming tasks.
  • Operators such as concat(), merge() and combineLatest() can be used to effectively combine multiple streams, each of which plays a different role in stream management and data synchronization.
  • The
  • takeUntil() function provides a mechanism based on external conditions to unsubscribe, which illustrates the flexibility of RxJS in flow control and resource management.

Reactive Programming

Reactive programming is a programming paradigm that takes the data stream called observable objects as its basic programming unit.

Stream—or observable objects in RxJS jargon—alike event listeners: both wait for something to happen and notify you when it happens. A series of asynchronous notifications obtained from the onClick listener are a perfect example of data flow.

In other words, the observable object is nothing more than an array filled over time.

Elements of this array can come from almost anywhere: file system, DOM events, API calls, and even converted synchronous data such as arrays. Fundamentally, reactive programming is nothing more than using observable objects as building blocks of programs.

Relationship with array

Arrays are simple because their content is final unless explicitly changed. In this sense, there is no essential temporality in an array.

On the other hand, observable objects are defined by time. At most you can know that the stream has received so far [1, 2, 3]. You can't be sure if you're going to get 4—or not—and it's the data source, not your program, that determines that.

The relationship between streams and arrays is so profound that most reactive extensions originate from the world of functional programming, where list operations are core.

Familiar with RxJS

Consider common to-do apps. Let's see the question of how to display the name of a user's unfinished task using RxJS:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

So far, this is just an array extension, but it demonstrates the functional style of reactive programming.

The declarative nature becomes clear by adding more complex, "real world" functions. Suppose we want:

  • Initiate the request in response to the user's choice to view completed or unfinished tasks;
  • Send only once a second request to the last selection to avoid wasting bandwidth when the user quickly changes the selection;
  • Retry up to three failed requests; and
  • Repaint the view only if the server sends a different response than the last time.
<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

Steply decompose:

  • parameter_stream tells us whether the user wants completed or unfinished tasks and stores the selection in the parameter;
  • debounce() Make sure we only focus on the last button click per second;
  • The part around getTasks() is the same as before;
  • distinctUntilChanged() ensures that we only follow the server's response differently than the last time; and
  • update() is responsible for updating the UI to reflect what we get from the server.

Handling debounce, retry, and "distinct until changed" logic in imperative, callback-based styles is effective, but it is both fragile and complex.

The key is that programming using RxJS allows:

  1. Declarational Program;
  2. Scalable system; and
  3. Simple, direct and powerful error handling.

In the process of browsing the top ten must-know functions of RxJS, we will encounter each of the functions in the above examples.

Simple flow operation

The basic functions of a simple stream (a stream that emits simple values, such as a string) include:

  • map()
  • filter()
  • reduce()
  • take() / takeWhile()

Apart from take() and takeWhile(), these are similar to JavaScript's higher-order array functions.

We will apply these functions by solving an example problem: find all users in the database with .com or .org websites and calculate the average length of their website names.

JSONPlaceholder will serve as our source of users. This is a JSON representation of the user data we will use.

1. Use map() to convert data

Using map() on observable objects is the same as using it on an array. It:

  1. Accept callback as parameter;
  2. Execute it on each element of the array you call; and
  3. Returns a new array where each element of the original array is replaced by the result produced by the callback on it.

The only difference when using map() on observable objects is:

  1. It returns a new observable object, not a new array; and
  2. It executes when the observable object issues a new project, rather than all of it immediately.

We can use map() to convert our user data streams into a list that contains only their website names:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

Here, we use map to replace each user object in the incoming stream with each user's website.

RxJS also allows you to call map() as select(). Both names refer to the same function.

2. Filter results

Like map() , filter() does roughly the same role on observable objects as on arrays. To find every user with a .net or .org website address, we can write this:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

This will select only users whose website ends in "net" or "org".

filter() also has an alias where().

3. Use reduce() to collect results

reduce() allows us to use all single values ​​and convert them into a single result.

reduce() is often the most confusing basic list operation, because unlike filter() or map(), its behavior varies by use.

Usually, reduce() takes a collection of values ​​and converts them into a single data point. In our example, we will provide it with a website name stream and use reduce() to convert that stream into an object that calculates the sum of the number of websites we found and its name length.

<code class="language-javascript">source.
  map((user) => user.website)</code>

Here, we simplify the stream to a single object, which tracks:

  1. How many sites have we seen; and
  2. Total length of all names.

Remember that reduce() returns the result only when the source observable object is completed. If you want to know the status of the accumulator every time the stream receives a new item, use scan() instead.

4. Use take() to limit the results

take() and takeWhile() complement the basic functions of simple streams.

take(n) Read n values ​​from the stream and unsubscribe.

We can use scan() to emit our object every time we receive the website, and only take() the first two values.

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})</code>

RxJS also provides takeWhile(), which allows you to get values ​​before a certain boolean test is established. We can use takeWhile() to write the above stream like this:

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 })</code>

High-order flow operation

These functions are almost the same as familiar list operations except that they work on observable objects rather than arrays.

"[I]f you know how to program against Arrays using the Array#extras, then you already know how to use RxJS!" ~ RxJS Documentation

Just as an array can contain data that is more complex than simple values ​​(such as an array or object), observable objects can also emit higher-order data, such as Promise or other observable objects. This is where more professional tools come into play.

5. Use flatMap() to press advection

…In fact, we are already using it!

When we define the source stream, we call fromPromise() and flatMap():

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        count       : data.count += 1,
        name_length : data.name_length += website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);</code>

This uses three new mechanisms:

  1. fromPromise;
  2. Rx.Observable.from; and
  3. flatMap.

Observable object from promise

Promise represents a single future value that we will get asynchronously—for example, the result of a call to the server.

A defining feature of

Promise is that it represents only a future value. It cannot return multiple asynchronous data; this is what the observable object does and is a fundamental difference between the two.

This means that when we use Rx.Observable.fromPromise() we get an observable object that emits a single value—or:

  1. The value parsed to Promise; or
  2. Promise Rejected Value.

When Promise returns a string or a number, we don't need to do anything special. However, when it returns an array (which is what it is in our case), we prefer to create an observable object that emits the contents of the array rather than the array itself as a single value.

6. Use flatMap()

This process is called flattening, which flatMap() processes. It has a lot of overloads, but we only use the simplest and most commonly used overloads.

When using flatMap(), we:

  1. Call flatMap() on observable objects that issue a single-value resolution or rejection of the Promise; and
  2. Pass a function to create a new observable object.

In our example, we pass Rx.Observable.from(), which creates a sequence from the values ​​of the array:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

This covers the code in our short preface:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

RxJS also provides an alias for flatMap(): selectMany().

Combining multiple streams

Usually, we will have multiple streams that need to be combined. There are many ways to combine streams, but some appear more frequently than others.

7. Use concat() and merge() to combine streams

Connection and merge are the two most common ways to combine streams.

Connection creates a new stream by emitting the value of the first stream until it is finished and then emitting the value of the second stream.

Merge creates new streams from multiple streams by emitting the value of any active stream

Think about talking to two people at the same time on Facebook Messenger. concat() is a situation where you receive a message from both parties but complete a conversation with one person before replying to another person. merge() is like creating a group chat and receiving two message streams at the same time.

<code class="language-javascript">source.
  map((user) => user.website)</code>
The

concat() stream will first print all values ​​of source1 and will only start printing the value of source2 after source1 is finished.

merge() The stream will print the values ​​of source1 and source2 according to the received order: it does not wait for the first stream to complete before emitting the value of the second stream.

8. Use switch()

Usually, we want to listen for observable objects that emit observable objects, but only focus on the latest emissions from the source.

To further expand Facebook Messenger’s analogy, switch() is you…well, toggle the person you reply based on who is currently sending the message.

For this purpose, RxJS provides switch.

The user interface provides several good use cases for switch(). If our application makes a request every time the user selects what they want to search for, we can assume that they just want to see the results of the latest selection. Therefore, we use switch() to listen only for the latest selection results.

By the way, we should make sure not to waste bandwidth and only select access to the server for the last time a user makes every second. The function we use for this is called debounce()

If you want to go in the other direction and only follow the first choice, you can use throttle(). It has the same API, but behaves the opposite.

9. Coordinated flow

What if we want to allow users to search for posts or users with a specific ID?

For demonstration, we will create another drop-down menu and allow users to select the ID of the item they want to retrieve.

There are two situations. When user:

  1. Change any choice; or
  2. Change two options.

Respond to any level of changes using combineLatest()

In the first case, we need to create a stream that starts a network request using the following:

  1. The user's recent selection of endpoint; and
  2. The ID recently selected by the user.

…and do this when the user updates any selection.

This is what combineLatest() is for:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

Whenever any stream emits a value, combineLatest() takes the emitted value and pairs it with the last item emitted by the other streams and issues the pair as an array.

This is easier to visualize in the chart:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

Use zip to respond only to changes in two streams

So wait until the user updates his selection of id and endpoint fields, replace combineLatest() with zip().

Again, this is easier to understand in the chart:

<code class="language-javascript">source.
  map((user) => user.website)</code>

Unlike combineLatest() , zip() will wait until both observable objects emit new content before sending out an array of their updated values.

10. takeUntil

Finally, takeUntil() allows us to listen for the first stream until the second stream starts to emit values.

<code class="language-javascript">source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})</code>

This is useful when you need to coordinate streams but don't need to combine them.

Summary

Just adding time dimensions to arrays opens the door to new thinking about programs.

RxJS is much more than what we see here, but that's enough to go a long way.

Start with RxJS Lite, be ready to refer to the documentation and take time to do it. Before you know it, everything will look like a stream…because everything is.

FAQs about RxJS functions (FAQ)

What is the main difference between RxJS and traditional JavaScript?

RxJS is a reactive programming library using observable objects to simplify the combination of asynchronous or callback-based code. This is compared to using traditional JavaScript with a more imperative programming style. The key difference is how they process data—RxJS treats data as a stream, which can be operated and transformed using various operators, while traditional JavaScript processes data in a more linear way.

How to create observable objects in RxJS?

In RxJS, you can create observable objects using the new Observable() constructor. This constructor takes a function as an argument, called a subscriber function, which is executed when initially subscribed to an observable object. Here is a basic example:

<code class="language-javascript">const task_stream =
  // 创建所有数据库中任务的流
  getTasks().
    // 只获取此用户的任务
    filter((task) => task.user_id == user_id).
    // 获取未完成的任务
    filter((task) => !task.completed).
    // 只获取任务名称
    map((task) => task.name)

/* 任务如下所示:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */</code>

What are the main operators in RxJS and how they work?

RxJS has a wide range of operators that can be used to control how data flows between observable objects and observers. Some of the main operators include map(), filter(), reduce(), merge(), and concat(). Each of these operators operates a stream of data in different ways, such as converting data, filtering out certain values, or combining multiple streams.

How to handle errors in RxJS?

RxJS provides several operators that handle errors, such as catchError(), retry(), and retryWhen(). The catchError() operator is used to catch errors on observable streams and to return a new observable object or throw an error. The retry() operator can be used to resubscribe to observable objects in the event of an error. The retryWhen() operator is similar, but it provides more control over when to retry.

How to cancel the subscription of observable objects in RxJS?

When you subscribe to an observable, you receive a Subscription that has an unsubscribe() method. You can call this method to cancel the execution of the observable object and clean up any resources that are being used. Here is an example:

<code class="language-javascript">const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()</code>

What is the difference between a medium-heat observable object and a cold observable object?

In RxJS, the observable object can be hot or cold. Cold observables start running when subscribed, while hot observables produce values ​​even before subscribe. In other words, cold observable objects are inert, while hot observable objects are not.

How to combine multiple observable objects in RxJS?

RxJS provides several operators that combine multiple observable objects, such as merge(), concat(), combineLatest(), and zip(). Each of these operators combines data streams in different ways, depending on your specific needs.

What is the purpose of the theme in RxJS?

The topic in RxJS is a special type of observable object that allows multicasting of values ​​to multiple observers. Unlike ordinary observable objects, topics maintain registries for many listeners.

How to use RxJS with Angular?

Angular supports RxJS in-built and uses it internally for various functions. You can also use RxJS in your own code to handle asynchronous operations and implement functions such as automatic completion, de-jitter, throttling, polling, etc.

What are some common use cases for RxJS?

RxJS can be used in various scenarios where asynchronous data is required. Some common use cases include handling user input, making HTTP requests, using WebSockets, and handling animations.

The above is the detailed content of 10 Need-to-Know RxJS Functions with Examples. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn