>  기사  >  웹 프론트엔드  >  Node.js 스트림 모듈에 대해 이야기하고 고성능 애플리케이션을 구축하는 방법을 살펴보겠습니다.

Node.js 스트림 모듈에 대해 이야기하고 고성능 애플리케이션을 구축하는 방법을 살펴보겠습니다.

青灯夜游
青灯夜游앞으로
2022-03-28 20:25:382516검색

이 글에서는 Node 스트림 모듈을 소개하고 Stream을 사용하여 고성능 Node.js 애플리케이션을 구축하는 방법을 소개합니다. 모든 사람에게 도움이 되기를 바랍니다!

Node.js 스트림 모듈에 대해 이야기하고 고성능 애플리케이션을 구축하는 방법을 살펴보겠습니다.

키보드에서 문자를 입력하거나, 디스크에서 파일을 읽거나, 인터넷에서 파일을 다운로드할 때 정보(비트)의 흐름이 다양한 장치와 애플리케이션을 통해 흐릅니다.

이러한 바이트 스트림을 처리하는 방법을 배우면 고성능의 가치 있는 애플리케이션을 구축할 수 있습니다. 예를 들어 YouTube에서 동영상을 볼 때 전체 동영상이 다운로드될 때까지 기다릴 필요가 없다고 상상해 보세요. 작은 버퍼가 있으면 비디오가 재생되기 시작하고 시청하는 동안 나머지는 계속 다운로드됩니다.

Nodejs에는 스트리밍 데이터를 처리할 수 있는 내장 모듈 stream이 포함되어 있습니다. 이 글에서는 몇 가지 간단한 예시를 통해 stream의 사용법을 설명하고, 복잡한 흐름에서 고성능 애플리케이션을 구축할 때 서로 다른 스트림을 병합하는 파이프라인을 구축하는 방법도 설명합니다. stream 可以让我们处理流数据。在这篇文章中,我们将通过几个简单的示例来讲解 stream 的用法,我们也会描述在面对复杂案例构建高性能应用时,应该如何构建管道去合并不同的流。

在我们深入理解应用构建前,理解 Node.js stream 模块提供的特性很重要。

让我们开始吧!

Node.js 流的类型

Node.js stream 提供了四种类型的流

  • 可读流(Readable Streams)
  • 可写流(Writable Streams)
  • 双工流(Duplex Streams)
  • 转换流(Transform Streams)

更多详情请查看 Node.js 官方文档

https://nodejs.org/api/stream.html#stream_types_of_streams

让我们在高层面来看看每一种流类型吧。

可读流

可读流可以从一个特定的数据源中读取数据,最常见的是从一个文件系统中读取。Node.js 应用中其他常见的可读流用法有:

  • process.stdin -通过 stdin  在终端应用中读取用户输入。
  • http.IncomingMessage - 在 HTTP 服务中读取传入的请求内容或者在 HTTP 客户端中读取服务器的 HTTP 响应。

可写流

你可以使用可写流将来自应用的数据写入到特定的地方,比如一个文件。

process.stdout  可以用来将数据写成标准输出且被 console.log 内部使用。

接下来是双工流和转换流,可以被定义为基于可读流和可写流的混合流类型。

双工流

双工流是可读流和可写流的结合,它既可以将数据写入到特定的地方也可以从数据源读取数据。最常见的双工流案例是 net.Socket,它被用来从 socket 读写数据。

有一点很重要,双工流中的可读端和可写端的操作是相互独立的,数据不会从一端流向另一端。

转换流

转换流与双工流略有相似,但在转换流中,可读端和可写端是相关联的。

crypto.Cipher 类是一个很好的例子,它实现了加密流。通过 crypto.Cipher 流,应用可以往流的可写端写入纯文本数据并从流的可读端读取加密后的密文。之所以将这种类型的流称之为转换流就是因为其转换性质。

附注:另一个转换流是 stream.PassThroughstream.PassThrough 从可写端传递数据到可读端,没有任何转换。这听起来可能有点多余,但 Passthrough 流对构建自定义流以及流管道非常有帮助。(比如创建一个流的数据的多个副本)

从可读的 Node.js 流读取数据

一旦可读流连接到生产数据的源头,比如一个文件,就可以用几种方法通过该流读取数据。

首先,先创建一个名为 myfile  的简单的 text 文件,85 字节大小,包含以下字符串:

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.

现在,我们看下从可读流读取数据的两种不同方式。

1. 监听 data 事件

从可读流读取数据的最常见方式是监听流发出的 data 事件。以下代码演示了这种方式:

const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

readable.on('data', (chunk) => {
    console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
})

highWaterMark 属性作为一个选项传递给 fs.createReadStream,用于决定该流中有多少数据缓冲。然后数据被冲到读取机制(在这个案例中,是我们的 data 处理程序)。默认情况下,可读 fs 流的 highWaterMark 值是 64kb。我们刻意重写该值为 20 字节用于触发多个 data

애플리케이션 구축을 이해하기 전에 Node.js stream 모듈에서 제공하는 기능을 이해하는 것이 중요합니다. 🎜🎜 시작해 보세요! 🎜

Node.js 스트림 유형

🎜Node.js stream은 네 가지 유형의 스트림을 제공합니다🎜
  • 읽기 가능한 스트림(Readable Streams)
  • 쓰기 가능한 스트림(Writable Streams)
  • 이중 스트림(Duplex Streams)
  • 변환 스트림(Transform Streams)
  • li>
🎜자세한 내용은 Node.js 공식 문서를 확인하세요🎜🎜https://nodejs.org/api/stream.html#stream_types_of_streams🎜
🎜각각 살펴보겠습니다. 높은 수준의 흐름 유형. 🎜

읽기 가능한 스트림

🎜읽기 가능한 스트림은 특정 데이터 소스, 가장 일반적으로 시스템에서 읽은 파일에서 데이터를 읽을 수 있습니다. Node.js 애플리케이션에서 읽기 가능한 스트림의 다른 일반적인 용도는 다음과 같습니다. 🎜
  • process.stdin - stdin을 통해 터미널 애플리케이션에서 사용자 입력을 읽습니다.
  • http.IncomingMessage - HTTP 서버에서 들어오는 요청의 내용을 읽거나 HTTP 클라이언트에서 서버의 HTTP 응답을 읽습니다.

쓰기 가능한 스트림

🎜쓰기 가능한 스트림을 사용하여 애플리케이션의 데이터를 특정 위치에 쓸 수 있습니다. , 파일 등. 🎜🎜process.stdout는 데이터를 표준 출력에 쓰는 데 사용할 수 있으며 console.log에서 내부적으로 사용됩니다. 🎜🎜다음은 이중 스트림과 변환 스트림으로, 읽기 가능한 스트림과 쓰기 가능한 스트림을 기반으로 혼합 스트림 유형으로 정의할 수 있습니다. 🎜

이중 스트림

🎜이중 스트림은 읽기 가능한 스트림과 쓰기 가능한 스트림의 조합입니다. 특정 위치에 데이터를 쓸 수도 있고 읽을 수도 있습니다. 데이터 소스에서. 이중 스트림의 가장 일반적인 예는 소켓에서 데이터를 읽고 쓰는 데 사용되는 net.Socket입니다. 🎜🎜이중 스트림의 읽기 가능한 끝과 쓰기 가능한 끝의 작업은 서로 독립적이며 데이터가 한 끝에서 다른 끝으로 흐르지 않는다는 점에 유의하는 것이 중요합니다. 🎜

변환 스트림

🎜변환 스트림은 이중 스트림과 약간 유사하지만 변환 스트림에는 읽을 수 있는 끝과 쓰기 가능한 끝은 관련되어 있습니다. 🎜🎜crypto.Cipher 클래스가 좋은 예이며 암호화된 스트림을 구현합니다. crypto.Cipher 스트림을 통해 애플리케이션은 쓰기 가능한 스트림 끝 부분에 일반 텍스트 데이터를 쓰고 읽기 가능한 스트림 끝 부분에서 암호화된 암호 텍스트를 읽을 수 있습니다. 이러한 유형의 흐름은 변환 속성 때문에 변환 흐름이라고 합니다. 🎜🎜추신: 또 다른 변환 스트림은 stream.PassThrough입니다. stream.PassThrough는 변환 없이 쓰기 가능한 쪽에서 읽기 가능한 쪽으로 데이터를 전달합니다. 중복된 것처럼 들릴 수도 있지만 패스스루 스트림은 사용자 지정 스트림과 스트림 파이프라인을 구축하는 데 매우 유용합니다. (예: 스트림 데이터의 여러 복사본 생성) 🎜

읽기 가능한 Node.js 스트림에서 데이터 읽기

🎜사용 가능한 경우 읽기 스트림 파일과 같은 프로덕션 데이터의 소스에 연결되며 여러 가지 방법으로 스트림을 통해 데이터를 읽을 수 있습니다. 🎜🎜먼저 다음 문자열을 포함하는 85바이트 크기의 myfile이라는 이름의 간단한 텍스트 파일을 만듭니다. 🎜
Read 20 bytes
"Lorem ipsum dolor si"

Read 20 bytes
"t amet, consectetur "

Read 20 bytes
"adipiscing elit. Cur"

Read 20 bytes
"abitur nec mauris tu"

Read 5 bytes
"rpis."
🎜이제, 읽을 수 있는 스트림에서 데이터를 읽는 두 가지 방법을 살펴보겠습니다. 🎜

1. 데이터 이벤트 수신

🎜읽기 가능한 스트림에서 데이터를 읽는 가장 일반적인 방법은 다음과 같습니다. 스트림에서 발생하는 data 이벤트를 수신합니다. 다음 코드는 이 접근 방식을 보여줍니다. 🎜
const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

(async () => {
    for await (const chunk of readable) {
        console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
    }
})()
🎜 highWaterMark 속성은 스트림에서 버퍼링되는 데이터의 양을 결정하는 옵션으로 fs.createReadStream에 전달됩니다. 그런 다음 데이터는 읽기 메커니즘(이 경우 data 핸들러)으로 플러시됩니다. 기본적으로 읽기 가능한 fs 스트림의 highWaterMark 값은 64kb입니다. 여러 data 이벤트를 트리거하기 위해 의도적으로 이 값을 20바이트로 다시 작성합니다. 🎜

如果你运行上述程序,它会在五个迭代内从 myfile 中读取 85 个字节。你会在 console 看到以下输出:

Read 20 bytes
"Lorem ipsum dolor si"

Read 20 bytes
"t amet, consectetur "

Read 20 bytes
"adipiscing elit. Cur"

Read 20 bytes
"abitur nec mauris tu"

Read 5 bytes
"rpis."

2. 使用异步迭代器

从可读流中读取数据的另一种方法是使用异步迭代器:

const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

(async () => {
    for await (const chunk of readable) {
        console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
    }
})()

如果你运行这个程序,你会得到和前面例子一样的输出。

可读 Node.js 流的状态

当一个监听器监听到可读流的 data 事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的 readableFlowing  属性检查流的”流动”状态

我们可以稍微修改下前面的例子,通过 data 处理器来示范:

const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

let bytesRead = 0

console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
readable.on('data', (chunk) => {
    console.log(`Read ${chunk.length} bytes`);
    bytesRead += chunk.length

    // 在从可读流中读取 60 个字节后停止阅读
    if (bytesRead === 60) {
        readable.pause()
        console.log(`after pause() call. is flowing: ${readable.readableFlowing}`);

        // 在等待 1 秒后继续读取
        setTimeout(() => {
            readable.resume()
            console.log(`after resume() call. is flowing: ${readable.readableFlowing}`);
        }, 1000)
    }
})
console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);

在这个例子中,我们从一个可读流中读取  myfile,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了 readableFlowing 属性的值去理解他是如何变化的。

如果你运行上述程序,你会得到以下输出:

before attaching 'data' handler. is flowing: null
after attaching 'data' handler. is flowing: true
Read 20 bytes
Read 20 bytes
Read 20 bytes
after pause() call. is flowing: false
after resume() call. is flowing: true
Read 20 bytes
Read 5 bytes

我们可以用以下来解释输出:

  • 当我们的程序开始时,readableFlowing 的值是 null,因为我们没有提供任何消耗流的机制。

  • 在连接到 data 处理器后,可读流变为“流动”模式,readableFlowing 变为 true

  • 一旦读取 60 个字节,通过调用 pause()来暂停流,readableFlowing 也转变为 false

  • 在等待 1 秒后,通过调用 resume(),流再次切换为“流动”模式,readableFlowing 改为 `true'。然后剩下的文件内容在流中流动。

通过 Node.js 流处理大量数据

因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。

在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。

但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:

  • On macOS: mkfile -n 4g 4gb_file
  • On Linux: xfs_mkfile 4096m 4gb_file

在我们创建了假文件 4gb_file 后,让我们在不使用 stream 模块的情况下来生成来文件的 SHA-256 hash。

const fs = require("fs");
const crypto = require("crypto");

fs.readFile("./4gb_file", (readErr, data) => {
  if (readErr) return console.log(readErr)
  const hash = crypto.createHash("sha256").update(data).digest("base64");
  fs.writeFile("./checksum.txt", hash, (writeErr) => {
    writeErr && console.error(err)
  });
});

如果你运行以上代码,你可能会得到以下错误:

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB
    at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) {
  code: 'ERR_FS_FILE_TOO_LARGE'
}

以上报错之所以发生是因为 JavaScript 运行时无法处理随机的大型缓冲。运行时可以处理的最大尺寸的缓冲取决于你的操作系统结构。你可以通过使用内建的 buffer 模块里的 buffer.constants.MAX_LENGTH 变量来查看你操作系统缓存的最大尺寸。

即使上述报错没有发生,在内存中保留大型文件也是有问题的。我们所拥有的可用的物理内存会限制我们应用能使用的内存量。高内存使用率也会造成应用在 CPU 使用方面性能低下,因为垃圾回收会变得昂贵。

使用  pipeline() 减少 APP 的内存占用

现在,让我们看看如何修改应用去使用流且避免遇到这个报错:

const fs = require("fs");
const crypto = require("crypto");
const { pipeline } = require("stream");

const hashStream = crypto.createHash("sha256");
hashStream.setEncoding('base64')

const inputStream = fs.createReadStream("./4gb_file");
const outputStream = fs.createWriteStream("./checksum.txt");

pipeline(
    inputStream,
    hashStream,
    outputStream,
    (err) => {
        err && console.error(err)
    }
)

在这个例子中,我们使用 crypto.createHash 函数提供的流式方法。它返回一个“转换”流对象 hashStream,为随机的大型文件生成 hash。

为了将文件内容传输到这个转换流中,我们使用 fs.createReadStream4gb_file 创建了一个可读流 inputStream。我们将 hashStream  转换流的输出传递到可写流 outputStream 中,而 checksum.txt 通过 fs.createWriteStream 创建的。

如果你运行以上程序,你将看见在 checksum.txt 文件中看见 4GB 文件的 SHA-256 hash。

对流使用 pipeline()pipe() 的对比

在前面的案例中,我们使用 pipeline 函数来连接多个流。另一种常见的方法是使用 .pipe() 函数,如下所示:

inputStream
  .pipe(hashStream)
  .pipe(outputStream)

但这里有几个原因,所以并不推荐在生产应用中使用 .pipe()。如果其中一个流被关闭或者出现报错,pipe() 不会自动销毁连接的流,这会导致应用内存泄露。同样的,pipe() 不会自动跨流转发错误到一个地方处理。

因为这些问题,所以就有了 pipeline(),所以推荐你使用 pipeline() 而不是 pipe() 来连接不同的流。 我们可以重写上述的 pipe() 例子来使用  pipeline() 函数,如下:

pipeline(
    inputStream,
    hashStream,
    outputStream,
    (err) => {
        err && console.error(err)
    }
)

pipeline() 接受一个回调函数作为最后一个参数。任何来自被连接的流的报错都将触发该回调函数,所以可以很轻松的在一个地方处理报错。

总结:使用 Node.js 流降低内存并提高性能

在 Node.js 中使用流有助于我们构建可以处理大型数据的高性能应用。

在这篇文章中,我们覆盖了:

  • 四种类型的 Node.js 流(可读流、可写流、双工流以及转换流)。
  • 如何通过监听 data 事件或者使用异步迭代器来从可读流中读取数据。
  • 通过使用 pipeline 连接多个流来减少内存占用。

一个简短的警告:你很可能不会遇到太多必须使用流的场景,而基于流的方案会提高你的应用的复杂性。务必确保使用流的好处胜于它所带来的复杂性。

更多node相关知识,请访问:nodejs 教程

위 내용은 Node.js 스트림 모듈에 대해 이야기하고 고성능 애플리케이션을 구축하는 방법을 살펴보겠습니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제