
A brief discussion of Node.js: understanding streams

高洛峰 · Original · 2016-12-28

Stream is an abstract interface in Node.js. It is based on EventEmitter and builds on Buffer to process streaming data. The stream module provides a variety of APIs that make streams easy to use.

Streams come in four types:

Readable, a readable stream

Writable, a writable stream

Duplex, a stream that is both readable and writable

Transform, an extension of Duplex that can modify data as it is written and read

1. Readable stream

A readable stream is created through stream.Readable. It has two modes: paused and flowing.

In flowing mode, data is read from the underlying system automatically and delivered through 'data' events; in paused mode, the stream.read() method must be called explicitly to read data and trigger a 'data' event.

All readable streams start in paused mode and can be switched to flowing mode in any of the following ways:

Adding a 'data' event listener

Calling the stream.resume() method

Calling the stream.pipe() method to send the data to a Writable stream

Similarly, a stream can be switched back to paused mode in two ways:

If there is no pipe destination, call the stream.pause() method.

If there are pipe destinations, remove all 'data' listeners and call the stream.unpipe() method.

The Readable object has an internal _readableState object through which you can tell which mode the stream is currently in, as shown below:

readable._readableState.flowing === null: there is no data consumer, and the stream produces no data

readable._readableState.flowing === true: flowing mode

readable._readableState.flowing === false: paused mode
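
For example, the value of flowing can be observed changing as a consumer is attached and the stream is paused. The following is a minimal sketch that relies on the internal _readableState object described above; the file name './app.js' is only a placeholder:

const fs = require('fs');
 
const rs = fs.createReadStream('./app.js');
console.log(rs._readableState.flowing);// null: no data consumer attached yet
rs.on('data',(chunk)=>{});// adding a 'data' listener switches the stream to flowing mode
console.log(rs._readableState.flowing);// true: flowing mode
rs.pause();// pause() switches it back
console.log(rs._readableState.flowing);// false: paused mode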

Why use streams to read data

For small files, reading with the fs.readFile() method is convenient. But for large files, say several gigabytes in size, this approach consumes a lot of memory and may even crash the program. In that case streams are more appropriate: reading in segments avoids the memory 'explosion' problem.
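
As an illustration, the following is a minimal sketch assuming a large file named './bigfile.iso' (the file names here are hypothetical). fs.readFile() would load the whole file into memory at once, while a stream copies it chunk by chunk and keeps memory usage flat:

const fs = require('fs');
 
// fs.readFile() reads the entire file into one Buffer before anything can be written out:
// fs.readFile('./bigfile.iso',(err,data)=>{
//   if(err) throw err;
//   fs.writeFile('./bigfile-copy.iso',data,()=>{});
// });
 
// a stream moves the file through a small internal buffer instead:
fs.createReadStream('./bigfile.iso').pipe(fs.createWriteStream('./bigfile-copy.iso'));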

data event

The 'data' event is emitted whenever the stream hands a chunk of data to the consumer. This may happen when the stream switches to flowing mode, or when readable.read() is called and a valid chunk is available. Usage is as follows:

const fs = require('fs');
 
const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen+=chunk.length;
});
rs.on('end',()=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});

readable event

The 'readable' event is emitted when there is data available to be read from the stream, or when the end of the stream has been reached. In the former case stream.read() returns the available data; in the latter it returns null. Usage is as follows:

const rs = fs.createReadStream('./appbak.js');
var chunkArr = [],
  chunkLen = 0;
 
rs.on('readable',()=>{
  var chunk = null;
  // keep reading until read() returns null, which means the buffer is drained or the end of the stream has been reached
  while((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen+=chunk.length;
  }
});
rs.on('end',()=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});

pause and resume methods

The stream.pause() method switches the stream into paused mode and stops 'data' events from being emitted. The stream.resume() method switches the stream back into flowing mode and resumes 'data' events. Together they can be used to throttle the rate at which the data is consumed, as shown below:

const rs = fs.createReadStream('./下载.png');
rs.on('data',(chunk)=>{
  console.log(`Received ${chunk.length} bytes of data...`);
  rs.pause();
  console.log(`Data reception will pause for 1 second.`);
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',()=>{
  console.log(`Data reception complete.`);
});

pipe(destination[, options]) method

The pipe() method binds a writable stream to the readable stream and automatically switches it into flowing mode, outputting all of its data to the writable stream. Flow control is managed for you, so no data is lost. Usage is as follows:

const rs = fs.createReadStream('./app.js');
rs.pipe(process.stdout);

Several ways of consuming a readable stream have been introduced above, but for any given readable stream it is best to pick just one of them; the pipe() method is recommended.

2. Writable stream

All writable streams are created based on the stream.Writable class. After creation, data can be written to the stream.

write(chunk[, encoding][, callback]) method

The write() method writes data to the writable stream. Its parameters are:

chunk, a string or Buffer

encoding, the character encoding if chunk is a string

callback, invoked once the current chunk of data has been flushed

The method returns a boolean: false means that the chunk was buffered and the internal buffer now exceeds the highWaterMark threshold; otherwise it returns true.

Usage is as follows:

const fs = require('fs');
 
const ws = fs.createWriteStream('./test.txt');
ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
ws.end('done.');

Backpressure mechanism

If the writable stream cannot keep up with the readable stream, the data passed to write() is buffered and keeps growing, eventually occupying a large amount of memory. What we want is to consume one chunk and then read the next, so that memory usage stays level. How can this be done? Use the return value of write() to check the state of the writable stream's buffer, together with the 'drain' event, and switch the readable stream's mode at the right moment, as shown below:

const fs = require('fs');
const path = require('path');
 
function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('Copying...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(false === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`Done, time elapsed: ${(etime-stime)/1000}s`);
    ws.end();
  });
}
copy('./CSS权威指南 第3版.pdf','./javascript.pdf');

drain event

After Writable.write() returns false, the 'drain' event is emitted once it is safe to write again; the backpressure mechanism above already makes use of this event.

finish event

This event is emitted after the stream.end() method has been called and all buffered data has been flushed to the underlying system, as shown below:

const fs = require('fs');
 
const ws = fs.createWriteStream('./alphabet.txt');
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split('')){
  ws.write(letter);
}
ws.end();// must be called, otherwise 'finish' never fires

end([chunk][, encoding][, callback]) method

Once the end() method has been called, the stream.write() method can no longer be used to write data; doing so will raise an error.
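
The following is a small sketch of this behaviour; the file name './end-demo.txt' is only a placeholder:

const fs = require('fs');
 
const ws = fs.createWriteStream('./end-demo.txt');
ws.on('error',(err)=>{
  console.log(err.message);// write after end
});
ws.end('last chunk');
ws.write('too late');// writing after end() emits an 'error' event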

3. Duplex read-write stream

A Duplex stream implements both the Readable and Writable interfaces; it is at once a readable stream and a writable stream. For example, 'zlib streams', 'crypto streams' and 'TCP sockets' are all Duplex streams.
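
For instance, a TCP socket can be read from and written to at the same time. The following is a minimal echo-server sketch; the port number 3000 is an arbitrary choice for the example:

const net = require('net');
 
const server = net.createServer((socket)=>{
  socket.pipe(socket);// read from the socket's readable side and write back to its writable side
});
server.listen(3000);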

4. Transform stream

A Transform stream is an extension of Duplex; the difference is that a Transform stream automatically transforms the data written to its writable side and makes the result available on its readable side. For example, 'zlib streams' and 'crypto streams' are Transform streams.
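
As an illustration, zlib.createGzip() returns a built-in Transform stream, so it can sit in the middle of a pipe chain; the file name './app.js' below is only a placeholder:

const fs = require('fs');
const zlib = require('zlib');
 
fs.createReadStream('./app.js')
  .pipe(zlib.createGzip())// data written to the gzip stream is transformed (compressed)
  .pipe(fs.createWriteStream('./app.js.gz'));// and the compressed data is read out on the other side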

5. Implementing the four kinds of streams

The API provided by the stream module makes it straightforward to implement a stream. The module is loaded with require('stream'); we only need to inherit from one of the four base classes (stream.Writable, stream.Readable, stream.Duplex, or stream.Transform) and implement its interface. The methods to implement are listed below:

| Use-case | Class | Method(s) to implement |
| ------------- | ------------- | ----- |
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Implementing a Readable stream

As shown above, we only need to inherit the Readable class and implement the _read method, as follows:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split('');
/*function AbReadable(){
  if(!(this instanceof AbReadable)){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};
 
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/
 
/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/
 
const abReadable = Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
}
abReadable.pipe(process.stdout);

The code above shows four ways of creating a Readable stream. Each one must implement the _read() method and uses the readable.push() method, which appends the given data to the internal read queue.

Implementing a Writable stream

We only need to inherit the Writable class and implement the _write or _writev method, as shown below (only two of the approaches are used):

const Writable = require('stream').Writable;
 
/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk);
    callback();
  }
});
myWritable.on('finish',()=>{
  process.stdout.write('done');
})
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();

Implementing a Duplex stream

To implement a Duplex stream, inherit the Duplex class and implement both the _read and _write methods, as follows:

const Duplex = require('stream').Duplex;
 
class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = [];
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}
 
const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  process.stdout.write('write done.')
});
myDuplex.on('end',()=>{
  process.stdout.write('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.end('b\n');
myDuplex.pipe(process.stdout);

The code above implements the _read() method, so the object can be used as a readable stream, and it also implements the _write() method, so it can be used as a writable stream.

Implementing a Transform stream

To implement a Transform stream, inherit the Transform class and implement the _transform method, as follows:

const Transform = require('stream').Transform;
 
class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout);

In the _transform() method above, the first argument passed to callback is either an error or null, and the second argument is forwarded automatically to readable.push(), so the method could also be written as follows:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase()
  this.push(chunk)
  callback();
}

Object mode streams

As we know, data in a stream is a Buffer by default: data entering a readable stream is converted into a Buffer before being consumed, and data written to a writable stream is likewise converted into a Buffer by the underlying calls. However, if the objectMode option of the constructor is set to true, the data is passed through as-is, as shown below:

const Readable = require('stream').Readable;
 
const rs = Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on('data',(chunk)=>{console.log(chunk);});// <Buffer 61> and <Buffer 62>
 
const rs1 = Readable({objectMode:true});
rs1.push('a');
rs1.push('b');
rs1.push(null);
rs1.on('data',(chunk)=>{console.log(chunk);});// a and b

Below, a Transform stream is used to implement a simple CSS minification tool, as follows:

const fs = require('fs');
const Transform = require('stream').Transform;
 
function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,''));
    }
  });
  fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify('./reset.css','./reset.min.css');

That is all for this article. I hope it helps with your study, and I also hope you will continue to support PHP中文网.

For more articles on understanding streams in Node.js, please follow PHP中文网!
