>  기사  >  웹 프론트엔드  >  Node.js의 4가지 스트림 유형에 대한 심층적인 이해

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

青灯夜游
青灯夜游앞으로
2021-08-30 10:18:511947검색

이 기사에서는 Node.js의 4가지 유형의 스트림을 이해하고 버퍼 버퍼의 "배압" 문제를 해결하는 방법을 살펴보겠습니다. 필요한 친구는 배우고 이해할 수 있습니다~

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

하나 넣어보세요. A에서 B로 물건을 옮기는 방법은 무엇입니까?

들어올려 목적지까지 이동한 후 내려놓으면 됩니다.

이 물건의 무게가 1톤이라면 어떨까요?

그럼 파트별로 옮겨보세요.

사실 IO는 네트워크 IO, 파일 IO 등을 옮기는 것을 의미합니다. 데이터 양이 적다면 모든 콘텐츠를 직접 전송할 수 있지만, 콘텐츠가 많으면 한꺼번에 메모리에 로드하는 것입니다. 시간이 충돌하고 속도가 느려질 것입니다. 이때 부분적으로 처리하면 됩니다. [추천 학습: "nodejs Tutorial"]

다양한 언어에서는 기본적으로 스트림 API를 구현했으며, Node.js도 스트림 API를 구현했습니다. 아래에서 스트림 API를 더 살펴보겠습니다.

이 기사는 다음 질문에 답할 것입니다:

  • Node.js의 4가지 스트림은 무엇입니까
  • 생성기는 읽기 가능한 스트림과 어떻게 결합합니까?
  • 스트림의 일시 중지 및 흐름
  • 배압 문제는 무엇이며 어떻게 발생합니까?

Node.js의 네 가지 유형의 스트림

스트림의 직관적인 느낌

은 분명히 유출과 유입이 있습니다. (읽기 가능), 유입되는 한쪽은 쓰기 가능한 스트림(writable)입니다.

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

물론 들어오고 나가는 스트림도 있습니다. 이를 이중 흐름이라고 합니다.

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

들어오고 나가는 콘텐츠를 변환할 수 있나요? , 이러한 종류의 흐름을 변환 흐름(transform)

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

이중 흐름이라고 합니다. 이중 흐름의 유입 및 유출 내용은 관련될 필요가 없지만 변환 흐름의 유입 및 유출은 관련됩니다. 둘의 차이점입니다.

Stream api

Node.js에서 제공하는 스트림은 위에 소개된 4가지 유형입니다.

const stream = require('stream');

// 可读流
const Readable = stream.Readable;
// 可写流
const Writable = stream.Writable;
// 双工流
const Duplex = stream.Duplex;
// 转换流
const Transform = stream.Transform;

모두 구현해야 할 메서드가 있습니다.

  • Readable은 콘텐츠를 반환하기 위해 _read 메서드를 구현해야 합니다.
  • Writable이 필요합니다. 콘텐츠를 허용하는 _write 메서드 구현
  • Duplex는 콘텐츠를 허용하고 반환하기 위해 _read 및 _write 메서드를 구현해야 합니다
  • Transform은 수신된 콘텐츠를 변환하고 반환하기 위해 _transform 메서드를 구현해야 합니다

각각 살펴보겠습니다.

Readable

Readable은 _read 메서드를 구현하고 푸시를 통해 특정 데이터를 반환해야 합니다.

const Stream = require('stream');

const readableStream = Stream.Readable();

readableStream._read = function() {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done~');
});

Null을 푸시하면 스트림의 끝을 의미합니다.

실행 효과는 다음과 같습니다.

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

읽기 가능 생성은 상속을 통해서도 수행할 수 있습니다.

const Stream = require('stream');

class ReadableDong extends Stream.Readable {

    constructor() {
        super();
    }

    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }

}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done~');
});

읽기 가능 스트림은 콘텐츠를 생성하므로 생성기와 자연스럽게 결합될 수 있습니다.

const Stream = require('stream');

class ReadableDong extends Stream.Readable {

    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }

}

function *songGenerator() {
    yield '阿门阿前一棵葡萄树,';
    yield '阿东阿东绿的刚发芽,';
    yield '阿东背着那重重的的壳呀,';
    yield '一步一步地往上爬。';
}

const songIterator = songGenerator();

const readableStream = new ReadableDong(songIterator);

readableStream.on('data', (data)=> {
    console.log(data.toString())
});

readableStream.on('end', () => {
    console.log('done~');
});

읽기 가능 스트림입니다. , _read 메소드를 구현하여 콘텐츠를 반환합니다.

Writable

Writable _write 메서드를 구현하려면 작성된 내용을 받습니다.

const Stream = require('stream');

const writableStream = Stream.Writable();

writableStream._write = function (data, enc, next) {
   console.log(data.toString());
   // 每秒写一次
   setTimeout(() => {
       next();
   }, 1000);
}

writableStream.on('finish', () => console.log('done~'));

writableStream.write('阿门阿前一棵葡萄树,');
writableStream.write('阿东阿东绿的刚发芽,');
writableStream.write('阿东背着那重重的的壳呀,');
writableStream.write('一步一步地往上爬。');
writableStream.end();

작성된 내용을 받아 인쇄하고, 다음으로 작성된 내용을 처리하기 위해 next를 호출합니다. 여기서 다음 호출은 비동기식이며 빈도를 제어할 수 있습니다.

한 동안 실행한 후 작성된 내용은 실제로 정상적으로 처리될 수 있습니다.

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

이것은 쓰기 가능한 스트림이며 작성된 내용은 _write 메소드를 구현하여 처리됩니다.

Duplex

Duplex는 읽기 및 쓰기가 가능합니다. _read와 _write를 동시에 구현하세요

const Stream = require('stream');

var duplexStream = Stream.Duplex();

duplexStream._read = function () {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}

duplexStream._write = function (data, enc, next) {
    console.log(data.toString());
    next();
}

duplexStream.on('data', data => console.log(data.toString()));
duplexStream.on('end', data => console.log('read done~'));

duplexStream.write('阿门阿前一棵葡萄树,');
duplexStream.write('阿东阿东绿的刚发芽,');
duplexStream.write('阿东背着那重重的的壳呀,');
duplexStream.write('一步一步地往上爬。');
duplexStream.end();

duplexStream.on('finish', data => console.log('write done~'));

Readable 스트림과 Writable 스트림의 기능을 통합합니다. 이것이 이중 스트림 Duplex입니다.

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

Transform

이중 스트림은 읽고 쓸 수 있지만 때로는 들어오는 콘텐츠를 변환한 다음 외부로 내보내야 합니다. 변신하다.

Transform 스트림은 _transform API를 구현해야 합니다. 우리는 콘텐츠를 반전시키는 변환 스트림을 구현합니다:

const Stream = require('stream');

class TransformReverse extends Stream.Transform {

  constructor() {
    super()
  }

  _transform(buf, enc, next) {
    const res = buf.toString().split('').reverse().join('');
    this.push(res)
    next()
  }
}

var transformStream = new TransformReverse();

transformStream.on('data', data => console.log(data.toString()))
transformStream.on('end', data => console.log('read done~'));

transformStream.write('阿门阿前一棵葡萄树');
transformStream.write('阿东阿东绿的刚发芽');
transformStream.write('阿东背着那重重的的壳呀');
transformStream.write('一步一步地往上爬');
transformStream.end()

transformStream.on('finish', data => console.log('write done~'));

잠시 동안 실행하면 효과는 다음과 같습니다:

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

流的暂停和流动

我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

背压

但是 read 和 write 都是异步的,如果两者速率不一致呢?

如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

而如果  Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

Node.js의 4가지 스트림 유형에 대한 심층적인 이해

解决背压

怎么解决这种读写速率不一致的问题呢?

当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

readableStream.readableFlowing = false;

let data;
while((data = readableStream.read()) != null) {
    console.log(data.toString());
}

但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

  • true: 数据已经写入目标
  • false:目标不可写入,暂时放在缓冲区

我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

rs.on('data', function (chunk) {
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});

rs.on('end', function () {
    ws.end();
});

ws.on('drain', function () {
    rs.resume();
});

这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

pipe 有背压问题么?

平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

rs.pipe(ws);

总结

流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!

更多编程相关知识,请访问:编程入门!!

위 내용은 Node.js의 4가지 스트림 유형에 대한 심층적인 이해의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 juejin.cn에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제