이 글은 Node Stream의 작동 메커니즘에 대한 설명을 제공합니다(예제 포함). 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.
Node를 배우고 있다면 스트림은 반드시 마스터해야 할 개념일 것입니다. 노드 마스터가 되려면 무술 비법 중 흐름이 빼놓을 수 없는 부분이겠죠.
Stream-Handbook에서 인용함. 이는 Node.js의 심층 학습을 위한 스트림의 중요성을 보여줍니다.
스트리밍을 전송 기능으로 이해할 수 있습니다. 스트림을 통해 데이터는 부작용 없이 원활하게 대상으로 전송될 수 있습니다. Node에서는 Node Stream으로 생성된 스트림이 String과 Buffer 전용으로 사용됩니다. Stream은 전송 능력을 나타내고, Buffer는 콘텐츠 전송을 위한 전달자(이렇게 이해될 수 있습니다. Stream: 테이크아웃 형제, Buffer: your takeaway). 스트림을 생성할 때 ObjectMode를 true로 설정합니다. Stream은 모든 유형의 JS 객체를 전송할 수도 있습니다(스트림에서 특수하게 사용되는 null 제외).
이제 요구 사항이 있습니다. 대용량 파일을 클라이언트에 전송해야 합니다. 요청을 받을 때마다
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
방법을 사용한다면 이 대용량 파일을 메모리로 읽어온 후 클라이언트로 전송해야 합니다. 이런 방식으로 다음 세 가지 결과가 발생할 수 있습니다.
메모리 고갈
다른 프로세스 속도 저하
가비지 수집기의 부하 증가
따라서 이 방법은 대용량 파일을 전송할 때 매우 유용합니다. 경우에는 좋은 해결책이 아닙니다. 동시성 양이 크고 수백 건의 요청으로 인해 메모리가 쉽게 소모될 수 있습니다.
스트리밍을 사용하면 어떨까요?
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
이 방법을 사용하면 메모리를 많이 차지하지 않고 조금만 읽고 전송하면 됩니다. 전체 프로세스가 원활하게 진행되고 매우 우아합니다. 압축, 암호화 등 전송 과정에서 파일을 처리하고 싶다면 확장도 쉽습니다(자세한 내용은 추후 소개하겠습니다).
스트림은 Node.js 어디에나 있습니다. 아래 그림에서 볼 수 있듯이 :
스트림은 4 가지 주요 범주로 나뉩니다. Duplex(이중 스트림)
Transform(변환 스트림)
Readable
두 모드에서는 발동 방식과 소비 방식이 다릅니다.
data
이벤트를 수신하여 들어갈 수 있습니다. 비 흐름 모드: 데이터를 얻으려면 read()
메서드를 명시적으로 호출해야 합니다.
data
이벤트 또는 pipe
메서드를 수신하여 를 호출합니다. 재개
흐름을 흐름 모드
상태로 전환하는 방법입니다. 흐름을 비 흐름 모드
상태로 설정하려면 흐름 모드
상태에서 pause
메서드를 호출하세요. 비 흐름 모드
상태에서 resume
메서드를 호출하면 흐름을 흐름 모드
상태로 설정할 수도 있습니다. data
事件便可进入该模式。
Non-Flowing Mode下:需要显示地调用read()
方法,才能获取数据。
两种模式可以互相转换
流的初始状态是Null,通过监听data
事件,或者pipe
方法,调用resume
方法,将流转为Flowing Mode
状态。Flowing Mode
状态下调用pause
方法,将流置为Non-Flowing Mode
状态。Non-Flowing Mode
状态下调用resume
方法,同样可以将流置为Flowing Mode
Flowing 모드 상태에서는 생성된 myReadable 읽기 스트림이 데이터 이벤트를 직접 모니터링하고 데이터가 지속적으로 흘러 소비됩니다.
myReadable.on('data',function(chunk){ consume(chunk);//消费流 })
데이터 이벤트가 모니터링되면 Readable의 내부 프로세스는 아래 그림과 같습니다
핵심 방법은 스트림 내부의 읽기 방법으로, 매개변수 n이 a일 때 다른 작업을 트리거합니다. 다른 가치. 아래 설명의hightwatermark는 스트림 내부의 버퍼 풀 크기를 나타냅니다.
n=undefine(데이터를 소비하고 읽기 가능한 스트림 트리거)
图中黄色标识的_read(),是用户实现流所需要自己实现的方法,这个方法就是实际读取流的方式(可以这样理解,外卖平台给你提供外卖的能力,那_read()方法就相当于你下单点外卖)。后面会详细介绍如何实现_read方法。
以上的流程可以描述为:监听data方法,Readable内部就会调用read方法,来进行触发读流操作,通过判断是同步还是异步读取,来决定读取的数据是否放入缓冲区。如果为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数判定是否emit('data')来消费流,循环读取。如果是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。
从使用者的角度来看,在这种模式下,你可以通过下面的方式来使用流
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk){ writeFile1.write(chunk); })
相对于Flowing mode,Non-Flowing Mode要相对简单很多。
消费该模式下的流,需要使用下面的方式
myReadable.on(‘readable’,function(){ const chunk = myReadable.read() consume(chunk);//消费流 })
在Non-Flowing Mode下,Readable内部的流程如下图:
从这个图上看出,你要实现该模式的读流,同样要实现一个_read方法。
整个流程如下:监听readable方法,Readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,然后发送emit readable事件,通知用户端消费。
从使用者的角度来看,你可以通过下面的方式来使用该模式下的流
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('readable',function(chunk) { while (null !== (chunk = myReadable.read())) { writeFile.write(chunk); } });
相对于读流,写流的机制就更容易理解了。
写流使用下面的方式进行数据写入
myWrite.write(chunk);
调用write后,内部Writable的流程如下图所示
类似于读流,实现一个写流,同样需要用户实现一个_write方法。
整个流程是这样的:调用write之后,会首先判定是否要写入缓冲区。如果不需要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。
从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。
const fs = require('fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(chunk) { writeFile.write(chunk); })
可以看到,使用写流是非常简单的。
我们先讲解一下如何实现一个读流和写流,再来看Duplex和Transform是什么,因为了解了如何实现一个读流和写流,再来理解Duplex和Transform就非常简单了。
实现自定义的Readable,只需要实现一个_read方法即可,需要在_read方法中调用push方法来实现数据的生产。如下面的代码所示:
const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }); } } // 模拟资源池 const dataSource = { data: new Array(10).fill('-'), makeData() { if (!dataSource.data.length) return null; return dataSource.data.pop(); } }; const myReadable = new MyReadable(dataSource,); myReadable.on('readable', () => { let chunk; while (null !== (chunk = myReadable.read())) { console.log(chunk); } });
实现自定义的writable,只需要实现一个_write方法即可。在_write中消费chunk写入到相应地方,并且调用callback回调。如下面代码所示:
const Writable = require('stream').Writable; class Mywritable extends Writable{ constuctor(options){ super(options); } _write(chunk,endcoding,callback){ console.log(chunk); callback && callback(); } } const myWritable = new Mywritable();
双工流:简单理解,就是讲一个Readable流和一个Writable流绑定到一起,它既可以用来做读流,又可以用来做写流。
实现一个Duplex流,你需要同时实现_read和_write方法。
有一点需要注意的是:它所包含的 Readable流和Writable流是完全独立,互不影响的两个流,两个流使用的不是同一个缓冲区。通过下面的代码可以验证
// 模拟资源池1 const dataSource1 = { data: new Array(10).fill('a'), makeData() { if (!dataSource1.data.length) return null; return dataSource1.data.pop(); } }; // 模拟资源池2 const dataSource2 = { data: new Array(10).fill('b'), makeData() { if (!dataSource2.data.length) return null; return dataSource2.data.pop(); } }; const Readable = require('stream').Readable; class MyReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } } const Writable = require('stream').Writable; class MyWritable extends Writable{ constructor(options){ super(options); } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const Duplex = require('stream').Duplex; class MyDuplex extends Duplex{ constructor(dataSource,options) { super(options); this.dataSource = dataSource; } _read() { const data = this.dataSource.makeData(); setTimeout(()=>{ this.push(data); }) } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback && callback(); } } const myWritable = new MyWritable(); const myReadable = new MyReadable(dataSource1); const myDuplex = new MyDuplex(dataSource1); myReadable.pipe(myDuplex).pipe(myWritable);
打印的结果是
abababababababababab
从这个结果可以看出,myReadable.pipe(myDuplex)
,myDuplex充当的是写流,写入的内容是a;myDuplex.pipe(myWritable)
,myDuplex充当的是读流,往myWritable写的却是b;所以说它所包含的 Readable流和Writable流是完全独立的。
理解了Duplex,就更好理解Transform了。Transform是一个转换流,它既有读的功能又有写的功能,但是它和Duplex不同的是,它的读流和写流共用同一个缓冲区;也就是说,通过它读入什么,那它就能写入什么。
实现一个Transform,你只需要实现一个_transform方法。比如最简单的Transform:PassThrough,其源代码如下所示
PassThrough就是一个Transform,但是这个转换流,什么也没做,相当于一个透明的转换流。可以看到_transform中什么都没有,只是简单的将数据进行回调。
如果我们在这个环节做些扩展,只需要在_transform中直接扩展就行了。比如我们可以对流进行压缩,加密,混淆等等操作。
最后介绍一个流中非常重要的一个概念:背压。要了解这个,我们首先来看下pipe和highWaterMaker是什么。
首先看下下面的代码
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.pipe(writeFile);
上面的代码和下面是等价的
const fs = require('./fs'); const readFile = fs.createReadStream('./big.file'); const writeFile = fs.createWriteStream('./writeFile.js'); readFile.on('data',function(data){ var flag = ws.write(data); if(!flag){ // 当前写流缓冲区已满,暂停读数据 readFile.pause(); } }) writeFile.on('drain',function()){ readFile.resume();// 当前写流缓冲区已清空,重新开始读流 } readFile.on('end',function(data){ writeFile.end();//将写流缓冲区的数据全部写入,并且关闭写入的文件 })
pipe所做的操作就是相当于为写流和读流自动做了速度的匹配。
读写流速度不匹配的情况下,一般情况下不会造成什么问题,但是会造成内存增加。内存消耗增加,就有可能会带来一系列的问题。所以在使用的流的时候,强烈推荐使用pipe。
highWaterMaker说白了,就是定义缓冲区的大小。
默认16Kb(Readable最大8M)
可以自定义
背压的概念可以理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highWaterMaker设置的大小。
如上面的代码 var flag = ws.write(data);
,一旦写流的缓冲区满了,那flag
就会置为false,反向促进读流的速度调整。
主要有以下场景
文件操作(复制,压缩,解压,加密等)
下面的就很容易就实现了文件复制的功能。
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big_copy.file'); readFile.pipe(writeFile);
那我们想在复制的过程中对文件进行压缩呢?
const fs = require('fs'); const readFile = fs.createReadStream('big.file'); const writeFile = fs.createWriteStream('big.gz'); const zlib = require('zlib'); readFile.pipe(zlib.createGzip()).pipe(writeFile);
实现解压、加密也是类似的。
静态文件服务器
比如需要返回一个html,可以使用如下代码。
var http = require('http'); var fs = require('fs'); http.createServer(function(req,res){ fs.createReadStream('./a.html').pipe(res); }).listen(8000);
위 내용은 Node Stream의 작동 메커니즘 설명(예제 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!