>웹 프론트엔드 >JS 튜토리얼 >Node Stream의 작동 메커니즘 설명(예제 포함)

Node Stream의 작동 메커니즘 설명(예제 포함)

不言
不言앞으로
2018-10-23 16:07:242756검색

이 글은 Node Stream의 작동 메커니즘에 대한 설명을 제공합니다(예제 포함). 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.

Node를 배우고 있다면 스트림은 반드시 마스터해야 할 개념일 것입니다. 노드 마스터가 되려면 무술 비법 중 흐름이 빼놓을 수 없는 부분이겠죠.

Stream-Handbook에서 인용함. 이는 Node.js의 심층 학습을 위한 스트림의 중요성을 보여줍니다.

플로우란 무엇인가요?

스트리밍을 전송 기능으로 이해할 수 있습니다. 스트림을 통해 데이터는 부작용 없이 원활하게 대상으로 전송될 수 있습니다. Node에서는 Node Stream으로 생성된 스트림이 String과 Buffer 전용으로 사용됩니다. Stream은 전송 능력을 나타내고, Buffer는 콘텐츠 전송을 위한 전달자(이렇게 이해될 수 있습니다. Stream: 테이크아웃 형제, Buffer: your takeaway). 스트림을 생성할 때 ObjectMode를 true로 설정합니다. Stream은 모든 유형의 JS 객체를 전송할 수도 있습니다(스트림에서 특수하게 사용되는 null 제외).

스트림을 사용하는 이유는 무엇인가요?

이제 요구 사항이 있습니다. 대용량 파일을 클라이언트에 전송해야 합니다. 요청을 받을 때마다

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);

방법을 사용한다면 이 대용량 파일을 메모리로 읽어온 후 클라이언트로 전송해야 합니다. 이런 방식으로 다음 세 가지 결과가 발생할 수 있습니다.

  • 메모리 고갈

  • 다른 프로세스 속도 저하

  • 가비지 수집기의 부하 증가

따라서 이 방법은 대용량 파일을 전송할 때 매우 유용합니다. 경우에는 좋은 해결책이 아닙니다. 동시성 양이 크고 수백 건의 요청으로 인해 메모리가 쉽게 소모될 수 있습니다.

스트리밍을 사용하면 어떨까요?

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

이 방법을 사용하면 메모리를 많이 차지하지 않고 조금만 읽고 전송하면 됩니다. 전체 프로세스가 원활하게 진행되고 매우 우아합니다. 압축, 암호화 등 전송 과정에서 파일을 처리하고 싶다면 확장도 쉽습니다(자세한 내용은 추후 소개하겠습니다).

스트림은 Node.js 어디에나 있습니다. 아래 그림에서 볼 수 있듯이 :

Node Stream의 작동 메커니즘 설명(예제 포함)

Stream 분류

스트림은 4 가지 주요 범주로 나뉩니다. Duplex(이중 스트림)

  • Transform(변환 스트림)

  • Readable

  • 읽기 가능한 스트림의 데이터는 다음 두 가지 모드로 데이터를 생성할 수 있습니다.
  • 흐르는 모드
  • 비흐르는 모드

두 모드에서는 발동 방식과 소비 방식이 다릅니다.

    플로우 모드: 데이터가 지속적으로 생성되어 "플로우" 현상을 형성합니다. 이 모드는 스트림의 data 이벤트를 수신하여 들어갈 수 있습니다.
  • 비 흐름 모드: 데이터를 얻으려면 read() 메서드를 명시적으로 호출해야 합니다.

  • 두 모드를 서로 변환할 수 있습니다
  • Node Stream의 작동 메커니즘 설명(예제 포함)스트림의 초기 상태는 Null입니다. data 이벤트 또는 pipe 메서드를 수신하여 를 호출합니다. 재개흐름을 흐름 모드 상태로 전환하는 방법입니다. 흐름을 비 흐름 모드 상태로 설정하려면 흐름 모드 상태에서 pause 메서드를 호출하세요. 비 흐름 모드 상태에서 resume 메서드를 호출하면 흐름을 흐름 모드 상태로 설정할 수도 있습니다.
다음은 두 가지 모드에서 Readable 스트림의 작동 메커니즘을 자세히 소개합니다.

Flowing 모드

data事件便可进入该模式。

Non-Flowing Mode下:需要显示地调用read()方法,才能获取数据。

两种模式可以互相转换

Node Stream의 작동 메커니즘 설명(예제 포함)

流的初始状态是Null,通过监听data事件,或者pipe方法,调用resume方法,将流转为Flowing Mode状态。Flowing Mode状态下调用pause方法,将流置为Non-Flowing Mode状态。Non-Flowing Mode状态下调用resume方法,同样可以将流置为Flowing ModeFlowing 모드 상태에서는 생성된 myReadable 읽기 스트림이 데이터 이벤트를 직접 모니터링하고 데이터가 지속적으로 흘러 소비됩니다.

myReadable.on('data',function(chunk){
      consume(chunk);//消费流
})

데이터 이벤트가 모니터링되면 Readable의 내부 프로세스는 아래 그림과 같습니다

핵심 방법은 스트림 내부의 읽기 방법으로, 매개변수 n이 a일 때 다른 작업을 트리거합니다. 다른 가치. 아래 설명의

hightwatermark는 스트림 내부의 버퍼 풀 크기를 나타냅니다. Node Stream의 작동 메커니즘 설명(예제 포함)

n=undefine(데이터를 소비하고 읽기 가능한 스트림 트리거)

    n=0(읽기 가능한 스트림을 트리거하지만 소비하지 않음)
  • n>hightwatermark(hightwatermark 값 수정)
  • n

    n>버퍼(버퍼에 있는 null 또는 모든 데이터(당시 마지막 읽기)를 반환할 수 있음)

图中黄色标识的_read(),是用户实现流所需要自己实现的方法,这个方法就是实际读取流的方式(可以这样理解,外卖平台给你提供外卖的能力,那_read()方法就相当于你下单点外卖)。后面会详细介绍如何实现_read方法。

以上的流程可以描述为:监听data方法,Readable内部就会调用read方法,来进行触发读流操作,通过判断是同步还是异步读取,来决定读取的数据是否放入缓冲区。如果为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数判定是否emit('data')来消费流,循环读取。如果是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。

从使用者的角度来看,在这种模式下,你可以通过下面的方式来使用流

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(chunk){
      writeFile1.write(chunk);
})
Non-Flowing Mode

相对于Flowing mode,Non-Flowing Mode要相对简单很多。

消费该模式下的流,需要使用下面的方式

myReadable.on(‘readable’,function(){
     const chunk = myReadable.read()
     consume(chunk);//消费流
})

在Non-Flowing Mode下,Readable内部的流程如下图:

Node Stream의 작동 메커니즘 설명(예제 포함)

从这个图上看出,你要实现该模式的读流,同样要实现一个_read方法。

整个流程如下:监听readable方法,Readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,然后发送emit readable事件,通知用户端消费。

从使用者的角度来看,你可以通过下面的方式来使用该模式下的流

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('readable',function(chunk) {
    while (null !== (chunk = myReadable.read())) {
        writeFile.write(chunk);
    }
});

Writable

相对于读流,写流的机制就更容易理解了。

写流使用下面的方式进行数据写入

myWrite.write(chunk);

调用write后,内部Writable的流程如下图所示

Node Stream의 작동 메커니즘 설명(예제 포함)

类似于读流,实现一个写流,同样需要用户实现一个_write方法。

整个流程是这样的:调用write之后,会首先判定是否要写入缓冲区。如果不需要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。

从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('data',function(chunk) {
    writeFile.write(chunk);
})

可以看到,使用写流是非常简单的。

我们先讲解一下如何实现一个读流和写流,再来看Duplex和Transform是什么,因为了解了如何实现一个读流和写流,再来理解Duplex和Transform就非常简单了。

实现自定义的Readable

实现自定义的Readable,只需要实现一个_read方法即可,需要在_read方法中调用push方法来实现数据的生产。如下面的代码所示:

const Readable = require('stream').Readable;

class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        });
    }
}

// 模拟资源池
const dataSource = {
    data: new Array(10).fill('-'),
    makeData() {
        if (!dataSource.data.length) return null;
        return dataSource.data.pop();
    }
};

const myReadable = new MyReadable(dataSource,);

myReadable.on('readable', () => {
    let chunk;
    while (null !== (chunk = myReadable.read())) {
        console.log(chunk);
    }
});

实现自定义的writable

实现自定义的writable,只需要实现一个_write方法即可。在_write中消费chunk写入到相应地方,并且调用callback回调。如下面代码所示:

const Writable = require('stream').Writable;
class Mywritable extends  Writable{
    constuctor(options){
        super(options);
    }
    _write(chunk,endcoding,callback){
        console.log(chunk);
        callback && callback();
    }
}
const myWritable = new Mywritable();

Duplex

双工流:简单理解,就是讲一个Readable流和一个Writable流绑定到一起,它既可以用来做读流,又可以用来做写流。

实现一个Duplex流,你需要同时实现_read_write方法。

有一点需要注意的是:它所包含的 Readable流和Writable流是完全独立,互不影响的两个流,两个流使用的不是同一个缓冲区。通过下面的代码可以验证

// 模拟资源池1
const dataSource1 = {
    data: new Array(10).fill('a'),
    makeData() {
        if (!dataSource1.data.length) return null;
        return dataSource1.data.pop();
    }
};
// 模拟资源池2
const dataSource2 = {
    data: new Array(10).fill('b'),
    makeData() {
        if (!dataSource2.data.length) return null;
        return dataSource2.data.pop();
    }
};

const Readable = require('stream').Readable;
class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })

    }
}

const Writable = require('stream').Writable;
class MyWritable extends Writable{
    constructor(options){
        super(options);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const Duplex = require('stream').Duplex;
class MyDuplex extends Duplex{
    constructor(dataSource,options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const myWritable = new MyWritable();
const myReadable = new MyReadable(dataSource1);
const myDuplex = new MyDuplex(dataSource1);
myReadable.pipe(myDuplex).pipe(myWritable);

打印的结果是

abababababababababab

从这个结果可以看出,myReadable.pipe(myDuplex),myDuplex充当的是写流,写入的内容是a;myDuplex.pipe(myWritable),myDuplex充当的是读流,往myWritable写的却是b;所以说它所包含的 Readable流和Writable流是完全独立的。

Transform

理解了Duplex,就更好理解Transform了。Transform是一个转换流,它既有读的功能又有写的功能,但是它和Duplex不同的是,它的读流和写流共用同一个缓冲区;也就是说,通过它读入什么,那它就能写入什么。

实现一个Transform,你只需要实现一个_transform方法。比如最简单的Transform:PassThrough,其源代码如下所示

Node Stream의 작동 메커니즘 설명(예제 포함)

PassThrough就是一个Transform,但是这个转换流,什么也没做,相当于一个透明的转换流。可以看到_transform中什么都没有,只是简单的将数据进行回调。

如果我们在这个环节做些扩展,只需要在_transform中直接扩展就行了。比如我们可以对流进行压缩,加密,混淆等等操作。

BackPress

最后介绍一个流中非常重要的一个概念:背压。要了解这个,我们首先来看下pipehighWaterMaker是什么。

pipe

首先看下下面的代码

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.pipe(writeFile);

上面的代码和下面是等价的

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(data){
    var flag = ws.write(data);
    if(!flag){ // 当前写流缓冲区已满,暂停读数据
        readFile.pause();
    }
})
writeFile.on('drain',function()){
    readFile.resume();// 当前写流缓冲区已清空,重新开始读流
}
readFile.on('end',function(data){
    writeFile.end();//将写流缓冲区的数据全部写入,并且关闭写入的文件
})

pipe所做的操作就是相当于为写流和读流自动做了速度的匹配。

读写流速度不匹配的情况下,一般情况下不会造成什么问题,但是会造成内存增加。内存消耗增加,就有可能会带来一系列的问题。所以在使用的流的时候,强烈推荐使用pipe

highWaterMaker

highWaterMaker说白了,就是定义缓冲区的大小。

  • 默认16Kb(Readable最大8M)

  • 可以自定义

背压的概念可以理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highWaterMaker设置的大小。

如上面的代码 var flag = ws.write(data);,一旦写流的缓冲区满了,那flag就会置为false,反向促进读流的速度调整。

Stream的应用场景

主要有以下场景

  1. 文件操作(复制,压缩,解压,加密等)

下面的就很容易就实现了文件复制的功能。

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big_copy.file');
readFile.pipe(writeFile);

那我们想在复制的过程中对文件进行压缩呢?

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big.gz');
const zlib = require('zlib');
readFile.pipe(zlib.createGzip()).pipe(writeFile);

实现解压、加密也是类似的。

  1. 静态文件服务器

比如需要返回一个html,可以使用如下代码。

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.createReadStream('./a.html').pipe(res);
}).listen(8000);

위 내용은 Node Stream의 작동 메커니즘 설명(예제 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 segmentfault.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제