Home  >  Article  >  Web Front-end  >  What is a writable stream in Nodejs? how to use

What is a writable stream in Nodejs? how to use

青灯夜游
青灯夜游forward
2020-11-23 17:49:303268browse

What is a writable stream in Nodejs? how to use

Related recommendations: "nodejs tutorial"

What is a writable stream

Writable A stream is an abstraction of data flowing to a device. It is used to consume data flowing from upstream. Data can be written to the device through a writable stream program. Commonly used are local disk files or network responses such as TCP and HTTP.

Look at an example used before

process.stdin.pipe(process.stdout);

*process.stdout* is a writable stream, and the program writes the data passed by the readable stream process.stdin input standard output device. It is very simple to understand the writable stream on the basis of understanding the readable stream. A stream is directional data, in which the readable stream is the data source, the writable stream is the destination, and the pipeline link in the middle is a bidirectional stream.

Writable streams use

Call the **write() ** method of the writable stream instance to write data to the writable stream

const fs = require('fs');
const rs = fs.createReadStream('./w.js');
const ws = fs.createWriteStream('./copy.js');

rs.setEncoding('utf-8');
rs.on('data', chunk => {
  ws.write(chunk);
});

As mentioned earlier, listening to the data event of the readable stream will cause the readable stream to enter flow mode. We call the write() method of the writable stream in the callback event, so that the data is written to the writable stream. In the abstract device, that is, the copy.js file in the current directory.

The write() method has three parameters

  • chunk {String| Buffer}, indicating the data to be written
  • encoding When the written data is a string, you can set the encoding
  • callback The callback function after the data is written

Custom writable streams

Similar to custom readable streams, a simple custom writable stream only requires two steps

  1. Inherit the stream module'sWritable Class
  2. Implementation_write() Method

Let’s implement a simple writable stream and convert the data passed into the writable stream into After capitalizing, output to the standard output device (a better example may be writing to a local disk file, but it involves too many fs operations, which is troublesome, so be lazy. Writing to the standard output device is also a writing behavior)

const Writable = require('stream').Writable

class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 转大写之后写入标准输出设备
        process.stdout.write(chunk.toString().toUpperCase());
        // 此处不严谨,应该是监听写完之后才调用 done
        process.nextTick(done);
    }
}

module.exports = OutputStream;

is the same as the write() method exposed by the final writable stream. The _write() method has three parameters and has a similar function.

  • chunk The written data is large. Part of the time it is buffer, unless decodeStrings is set to false
  • encoding If the data is a string, you can set the encoding, buffer or object mode will ignore it
  • callback The callback function after data is written can notify the stream of the next data; when an error occurs, you can also set an error parameter

Of course, there is also a _writev() method. Implementation, this method is only called by the stuck write queue and does not need to be implemented.

Instantiate a writable stream

After we have a writable stream class, we can instantiate it and use it. There are several options when instantiating a writable stream. Select, understanding it can help us understand the knowledge we will use later

  • objectModeThe default is false. After setting it to true, the writable.write() method does not only write string In addition to buffers, arbitrary JavaScript objects can be written. A very useful option. It will be introduced in detail later when we introduce the transform stream
  • highWaterMarkThe maximum amount of data written each time, the default value for Buffer is 16kb, objectMode The default value is 16
  • decodeStringsWhether to convert the incoming data into Buffer, the default is true

In this way we It will be clearer to know the meaning of the parameters passed in the _write() method, and it will be helpful for understanding the back pressure mechanism introduced later.

Events

Like readable streams, writable streams also have several commonly used events. With the foundation of readable streams, it is relatively simple to understand

  • pipe When the readable stream calls the pipe() method to transmit data to the writable stream, the pipe event of the writable stream will be triggered
  • unpipe When the readable stream calls the unpipe() method to remove the data transfer, the unpipe event of the writable stream will be triggered.

These two events are used to notify the writable stream data Will come and will be cut off, are rarely used in normal situations.

writeable.write() The method has a bool return value. As mentioned earlier highWaterMark, when the data required to be written is greater than the highWaterMark of the writable stream At this time, the data will not be written at once, and some data will be retained. At this time, writeable.write() will return false, and if it can be processed, it will return true

drain Triggered when there was stranded data before, that is, writeable.write() returned false. After a period of digestion, the backlog of data has been processed and new data can be continued to be written (the original intention of drain) That is drainage and depletion, quite vivid)

除了 write() 方法可写流还有一个常用的方法 end(),参数和 write() 方法相同,但也可以不传入参数,表示没有其它数据需要写入,可写流可以关闭了。

finish 当调用 writable.end() 方法,并且所有数据都被写入底层后会触发 finish 事件

同样出现错误后会触发 error 事件

back pressure

了解了这些事件,结合上之前提到的可读流的一些知识,我们就能探讨一些有意思的话题了。在最开始我们提到过用流相对于直接操作文件的好处之一是不会把内存压爆,那么流是怎么做到的呢?

最开始我们可能会想到因为流不是一次性把所有数据载入内存处理,而是一边读一边写。但我们知道一般读取的速度会远远快于写入的速度,那么 pipe()  方法是怎么做到供需平衡的呢?

回忆一些基础知识,我们自己来实现一下 pipe() 方法的核心原理

  1. 可读流有流动和暂停两种模式,可以通过 **pause() resume() **方法切换
  2. 可写流的 **write() **方法会返回是否能处理当前的数据,每次可以处理多少是 hignWatermark 决定的
  3. 当可写流处理完了积压数据会触发 drain 事件

我们可以利用这三点来做到数据读取和写入的同步,还是使用之前的例子,但为了使消费速度降下来,我们各一秒再通知完成

class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 转大写之后写入标准输出设备
        process.stdout.write(chunk.toString().toUpperCase());
        // 故意延缓通知继续传递数据的时间,造成写入速度慢的现象
        setTimeout(done, 1000);
    }
}

我们使用一下自定义的两个类

const RandomNumberStream = require('./RandomNumberStream');
const OutputStream = require('./OutputStream');

const rns = new RandomNumberStream(100);
const os = new OutputStream({
    highWaterMark: 8 // 把水位降低,默认16k还是挺大的
});

rns.on('data', chunk => {
    // 当待处理队列大于 highWaterMark 时返回 false
    if (os.write(chunk) === false) { 
        console.log('pause');
        rns.pause(); // 暂停数据读取
    }
});

// 当待处理队列小于 highWaterMark 时触发 drain 事件
os.on('drain', () => {
    console.log('drain')
    rns.resume(); // 恢复数据读取
});

结合前面的三点和注释很容易看懂上面代码,这就是 pipe() 方法起作用的核心原理。数据的来源的去向我们有了大概了解,后面可以开始介绍数据的加工

  • duplex
  • transform

更多编程相关知识,请访问:编程学习课程!!

The above is the detailed content of What is a writable stream in Nodejs? how to use. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:cnblogs.com. If there is any infringement, please contact admin@php.cn delete