>  기사  >  웹 프론트엔드  >  Node.js의 스트림에 대한 심층적인 이해

Node.js의 스트림에 대한 심층적인 이해

青灯夜游
青灯夜游앞으로
2020-08-13 17:30:503389검색

Node.js의 스트림에 대한 심층적인 이해

Node.js의 스트림은 사용하거나 이해하기가 매우 어렵습니다. [동영상 튜토리얼 추천: nodejs 동영상 튜토리얼 ]

Dominic Tarr의 말에 따르면: "스트림은 Node에서 가장 훌륭하지만 가장 오해받는 아이디어입니다." 심지어 Redux의 창시자이자 React.js 팀원인 Dan Abramov도 그렇습니다. 노드 흐름도 두려워합니다.

Node.js의 스트림에 대한 심층적인 이해

이 글은 스트림과 스트림 사용 방법을 이해하는 데 도움이 될 것입니다. 두려워하지 마세요. 완전히 알아낼 수 있습니다!

스트림이란 무엇인가요?

스트림은 Node.js 애플리케이션을 구동하는 기본 개념 중 하나입니다. 입력 데이터를 순차적으로 읽거나 출력에 데이터를 쓰는 데이터 처리 방법입니다.

스트리밍은 파일 읽기 및 쓰기, 네트워크 통신 또는 모든 유형의 엔드투엔드 정보 교환을 효율적인 방식으로 처리하는 방법입니다.

스트림이 처리되는 방식은 매우 독특합니다. 기존 방식처럼 파일을 한 번에 메모리로 읽는 대신 스트림은 데이터 블록을 하나씩 읽어 메모리에 모두 보관하지 않고 데이터 내용을 처리합니다.

이 접근 방식은 대량의 데이터를 처리할 때 스트림을 매우 강력하게 만듭니다. 예를 들어 파일 크기가 사용 가능한 메모리 공간보다 클 수 있으므로 처리를 위해 전체 파일을 메모리로 읽어들이는 것이 불가능할 수 있습니다. 그것이 바로 흐름이 들어오는 곳입니다!

스트림을 사용하여 더 작은 데이터 블록을 처리하고 더 큰 파일을 읽을 수 있습니다.

YouTube나 Netflix와 같은 "스트리밍" 서비스를 예로 들어 보겠습니다. 이러한 서비스에서는 비디오 및 오디오 파일을 즉시 다운로드할 수 없습니다. 대신, 브라우저는 연속적인 청크 스트림으로 비디오를 수신하므로 수신자는 거의 즉시 시청하고 듣기 시작할 수 있습니다.

하지만 스트리밍은 단순히 미디어와 빅데이터를 처리하는 것만이 아닙니다. 또한 코드에 "구성성"의 힘을 부여합니다. 구성 가능성을 염두에 두고 디자인한다는 것은 동일한 유형의 결과를 생성하기 위해 어떤 방식으로든 여러 구성 요소를 결합할 수 있다는 것을 의미합니다. Node.js에서는 스트림을 통해 다른 작은 조각 내에서 데이터를 전달하여 강력한 조각을 형성할 수 있습니다.

스트리밍을 사용하는 이유는 무엇인가요?

스트리밍은 기본적으로 다른 데이터 처리 방법에 비해 두 가지 주요 장점이 있습니다.

  1. 메모리 효율성: 처리하기 위해 미리 많은 양의 데이터를 메모리에 로드할 필요가 없습니다.
  2. 시간 효율성: 시간 데이터를 받은 후 즉시 시작해야 하는 시간이 크게 줄어듭니다. Node.js에는 4가지 유형의 스트림이 있습니다.

쓰기 가능한 스트림: 예 데이터가 기록되는 스트림입니다. 예를 들어 fs.createWriteStream()을 사용하면 스트림을 사용하여 파일에 데이터를 쓸 수 있습니다.

    읽기 가능한 스트림:
  1. 데이터를 읽을 수 있는 스트림입니다. 예: fs.createReadStream() 파일의 내용을 읽어 보겠습니다. fs.createWriteStream() 使我们可以使用流将数据写入文件。
  2. 可读流:可从中读取数据的流。例如:fs.createReadStream() 让我们读取文件的内容。
  3. 双工流(可读写的流):可读和可写的流。例如,net.Socket
  4. Transform:可在写入和读取时修改或转换数据。例如在文件压缩的情况下,你可以在文件中写入压缩数据,也可以从文件中读取解压缩的数据。

如果你已经使用过 Node.js,则可能遇到过流。例如在基于 Node.js 的 HTTP 服务器中,request 是可读流,而 response 是可写流。你可能用过 fs 模块,该模块可让你用可读和可写文件流。每当使用 Express 时,你都在使用流与客户端进行交互,而且由于 TCP 套接字、TLS栈和其他连接都基于 Node.js,所以在每个可以使用的数据库连接驱动的程序中使用流。

实例

如何创建可读流?

首先需要可读性流,然后将其初始化。

const Stream = require('stream')
const readableStream = new Stream.Readable()

现在,流已初始化,可以向其发送数据了:

readableStream.push('ping!')
readableStream.push('pong!')

异步迭代器

强烈建议在使用流时配合异步迭代器(async iterator)。根据 Axel Rauschmayer 博士的说法,异步迭代是一种用于异步检索数据容器内容的协议(这意味着当前“任务”可以在检索项目之前被暂停)。另外必须提及的是,流异步迭代器实现使用内部的 readable이중 스트림(읽기 및 쓰기 가능 스트림):

읽기 및 쓰기 가능 스트림. 예를 들어 net.Socket

Transform: 은 데이터를 쓰거나 읽을 때 데이터를 수정하거나 변환할 수 있습니다. 예를 들어 파일 압축의 경우 압축된 데이터를 파일에 쓰고 파일에서 압축이 풀린 데이터를 읽을 수 있습니다.

🎜Node.js를 사용해 본 적이 있다면 스트리밍을 접했을 수도 있습니다. 예를 들어 Node.js 기반 HTTP 서버에서 요청은 읽기 가능한 스트림이고 응답은 쓰기 가능한 스트림입니다. 읽기 및 쓰기 가능한 파일 스트림을 사용할 수 있게 해주는 fs 모듈을 사용했을 수도 있습니다. Express를 사용할 때마다 스트림을 사용하여 클라이언트와 상호 작용하며, TCP 소켓, TLS 스택 및 기타 연결은 모두 Node.js를 기반으로 하므로 사용할 수 있는 모든 데이터베이스 연결 기반 프로그램에서 스트림을 사용합니다. 🎜

인스턴스

🎜🎜🎜읽을 수 있는 스트림을 만드는 방법은 무엇입니까? 🎜🎜🎜🎜 먼저 가독성 스트림이 필요하고 초기화됩니다. 🎜
import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
🎜이제 스트림이 초기화되었으며 데이터를 보낼 수 있습니다. 🎜
import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');
🎜🎜🎜Async iterator🎜🎜🎜🎜🎜스트림을 사용할 때 비동기 반복기를 사용하는 것이 좋습니다. 🎜 비동기 반복은 데이터 컨테이너의 콘텐츠를 비동기적으로 검색하기 위한 프로토콜입니다. 항목을 검색하기 전에 현재 "작업"을 일시 중지할 수 있습니다. 또한 스트림 비동기 반복기 구현은 내부 읽기 가능 이벤트를 사용한다는 점도 언급해야 합니다. 🎜🎜읽기 가능한 스트림에서 읽을 때 비동기 반복자를 사용할 수 있습니다: 🎜
const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});
🎜🎜문자열을 사용하여 읽기 가능한 스트림의 콘텐츠를 수집할 수도 있습니다: 🎜🎜
import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。

请切记不要将异步功能与 EventEmitter 混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。 这个 pull request 旨在解决一旦其落在 Node 核心上产生的问题。

要了解有关异步迭代的 Node.js 流的更多信息,请查看这篇很棒的文章

Readable.from():从可迭代对象创建可读流

stream.Readable.from(iterable, [options])  这是一种实用方法,用于从迭代器中创建可读流,该迭代器保存可迭代对象中包含的数据。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,除其他作用外,还可以用于指定文本编码。

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});

两种读取模式

根据 Streams API,可读流有效地以两种模式之一运行:flowingpaused。可读流可以处于对象模式,无论处于 flowing 模式还是 paused 模式。

  • 流模式下,将自动从底层系统读取数据,并通过 EventEmitter 接口使用事件将其尽快提供给程序。
  • paused 模式下,必须显式调用 stream.read() 方法以从流中读取数据块。

在 flowing 模式中,要从流中读取数据,可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件,并执行你的回调。看下面的代码片段:

var fs = require("fs");
var data = '';

var readerStream = fs.createReadStream('file.txt'); //Create a readable stream

readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 

// Handle stream events --> data, end, and error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function() {
   console.log(data);
});

readerStream.on('error', function(err) {
   console.log(err.stack);
});

console.log("Program Ended");

函数调用 fs.createReadStream() 给你一个可读流。最初流处于静态状态。一旦你侦听数据事件并附加了回调,它就会开始流动。之后将读取大块数据并将其传递给你的回调。流实现者决定发送数据事件的频率。例如,每当有几 KB 的数据被读取时,HTTP 请求就可能发出一个数据事件。当从文件中读取数据时,你可能会决定读取一行后就发出数据事件。

当没有更多数据要读取(结束)时,流将发出结束事件。在以上代码段中,我们监听此事件以在结束时得到通知。

另外,如果有错误,流将发出并通知错误。

在 paused 模式下,你只需在流实例上重复调用 read(),直到读完所有数据块为止,如以下示例所示:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data)
});

read() 函数从内部缓冲区读取一些数据并将其返回。当没有内容可读取时返回 null。所以在 while 循环中,我们检查是否为 null 并终止循环。请注意,当可以从流中读取大量数据时,将会发出可读事件。

所有 Readable 流均以 paused 模式开始,但可以通过以下方式之一切换为 flowing 模式

  • 添加一个 'data' 事件处理。
  • 调用 stream.resume() 方法。
  • 调用 stream.pipe() 方法将数据发送到可写对象。

Readable 可以使以下方法之一切换回 paused 模式:

  • 如果没有管道目标,则通过调用 stream.pause() 方法。
  • 如果有管道目标,请删除所有管道目标。可以通过调用 stream.unpipe() 方法来删除多个管道目标。

一个需要记住的重要概念是,除非提供了一种用于消耗或忽略该数据的机制,否则 Readable 将不会生成数据。如果使用机制被禁用或取消,则 Readable 将会试图停止生成数据。添加 readable 事件处理会自动使流停止 flowing,并通过 read.read() 得到数据。如果删除了 readable 事件处理,那么如果存在 'data' 事件处理,则流将再次开始 flowing。

如何创建可写流?

要将数据写入可写流,你需要在流实例上调用 write()。如以下示例所示:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});

上面的代码很简单。它只是简单地从输入流中读取数据块,并使用 write() 写入目的地。该函数返回一个布尔值,指示操作是否成功。如果为 true,则写入成功,你可以继续写入更多数据。如果返回 false,则表示出了点问题,你目前无法写任何内容。可写流将通过发出 drain 事件来通知你什么时候可以开始写入更多数据。

调用 writable.end() 方法表示没有更多数据将被写入 Writable。如果提供,则可选的回调函数将作为 finish 事件的侦听器附加。

// Write 'hello, ' and then end with 'world!'.
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!

你可以用可写流从可读流中读取数据:

const Stream = require('stream')

const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

readableStream.pipe(writableStream)

readableStream.push('ping!')
readableStream.push('pong!')

writableStream.end()

还可以用异步迭代器来写入可写流,建议使用

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() 的默认版本是基于回调的,但是可以通过 util.promisify() 转换为基于 Promise 的版本(A行)。

在此例中,使用以下两种模式:

Writing to a writable stream while handling backpressure (line B):
在处理 backpressure 时写入可写流(B行):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

关闭可写流,并等待写入完成(C行):

writable.end();
await finished(writable);

pipeline()

pipeline(管道)是一种机制,可以将一个流的输出作为另一流的输入。它通常用于从一个流中获取数据并将该流的输出传递到另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流数据。

在 Node 10.x 中引入了 stream.pipeline()。这是一种模块方法,用于在流转发错误和正确清理之间进行管道传输,并在管道完成后提供回调。

这是使用管道的例子:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用 pipeline API 可以轻松将一系列流
// 通过管道传输在一起,并在管道完全完成后得到通知。
// 一个有效地用 gzip压缩巨大视频文件的管道:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

由于pipe 不安全,应使用 pipeline 代替 pipe

流模块

Node.js 流模块 提供了构建所有流 API 的基础。

Stream 模块是 Node.js 中默认提供的原生模块。 Stream 是 EventEmitter 类的实例,该类在 Node 中异步处理事件。因此流本质上是基于事件的。

要访问流模块:

const stream = require('stream');

stream 模块对于创建新型流实例非常有用。通常不需要使用 stream 模块来消耗流。

流驱动的 Node API

由于它们的优点,许多 Node.js 核心模块提供了原生流处理功能,最值得注意的是:

  • net.Socket 是流所基于的主 API 节点,它是以下大多数 API 的基础
  • process.stdin 返回连接到 stdin 的流
  • process.stdout 返回连接到 stdout 的流
  • process.stderr 返回连接到 stderr 的流
  • fs.createReadStream() 创建一个可读的文件流
  • fs.createWriteStream() 创建可写的文件流
  • net.connect() 启动基于流的连接
  • http.request() 返回 http.ClientRequest 类的实例,它是可写流
  • zlib.createGzip() 使用gzip(一种压缩算法)将数据压缩到流中
  • zlib.createGunzip() 解压缩 gzip 流。
  • zlib.createDeflate() deflate(压缩算法)将数据压缩到流中
  • zlib.createInflate() 解压缩一个deflate流

流 备忘单:

Node.js의 스트림에 대한 심층적인 이해

Node.js의 스트림에 대한 심층적인 이해

Node.js의 스트림에 대한 심층적인 이해

Node.js의 스트림에 대한 심층적인 이해

Node.js의 스트림에 대한 심층적인 이해

查看更多:Node.js 流速查表

以下是与可写流相关的一些重要事件:

  • error – 파이프라인을 쓰거나 구성하는 동안 오류가 발생했음을 나타냅니다. error –表示在写或配置管道时发生了错误。
  • pipeline – 当把可读流传递到可写流中时,该事件由可写流发出。
  • unpipe
  • 파이프라인 – 이 이벤트는 읽기 가능한 스트림을 쓰기 가능한 스트림으로 전달할 때 쓰기 가능한 스트림에서 발생합니다.

unpipe – 읽을 수 있는 스트림에서 unpipe를 호출하고 대상 스트림으로의 파이프 연결을 중지할 때 발생합니다.

결론

이것이 스트리밍에 대한 모든 기본 사항입니다. 스트림, 파이프 및 체인은 Node.js의 핵심이자 가장 강력한 기능입니다. 스트림은 I/O를 수행하기 위한 간결하고 효율적인 코드를 작성하는 데 실제로 도움이 됩니다. 또한 BOB

이라는
BOB

이라는 기대되는

Node.js 전략 계획

이 있습니다. 이는 Node.js의 내부 데이터 흐름과 미래 Node.js의 역할을 할 공개 API를 개선하는 것을 목표로 합니다. 스트리밍 데이터 인터페이스.

영어 원본 주소: https://nodesource.com/blog/understanding-streams-in-nodejs

저자: Liz Parody번역: Crazy Technology House

🎜🎜관련 추천: 🎜nodejs 튜토리얼🎜🎜

위 내용은 Node.js의 스트림에 대한 심층적인 이해의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제