The flow mode of the readable stream. This flow mode will have a "switch", and every time the "switch" is turned on When the flow mode takes effect, if this "switch" is set to pause, then this readable stream will not
This series of actions are all based on events, and we all know that events in node are implemented in a publish-subscribe model.
Let’s take a look at how node uses readable streams to read the contents of files?
First we create a readable stream through the fs module. The readable stream accepts two parameters:
The following table illustrates that different symbols represent different meanings:
Symbol | Meaning |
r | Reading the file, the file does not exist and an error is reported |
r | Read and write, the file does not exist and an error is reported |
rs | Read the file synchronously and ignore the cache |
w | Write the file, create it if it does not exist, clear it if it exists |
wx | Write the file exclusively |
w | Read and write the file, create it if it does not exist, clear it if it exists |
wx | Similar to w, exclusive mode Open |
a | Append writing |
ax | Similar to a, write exclusively |
a | Read and append writing, create if it does not exist |
ax | The role is the same as a Similar, but opens the file exclusively |
autoClose: This parameter is mainly used to control the closing of files. If an error occurs during the file reopening or other operations, the file needs to be closed. Then this parameter is the function to set whether the file is automatically closed.
encoding: Use buffer in node to read binary data for file operations. When these data are displayed, we will see a bunch of gibberish, so we need to specify a specific encoding format for this data. The data will then be encoded and converted so that the converted data is data we can understand.
starts: This parameter is mainly used to specify the position from which to start reading the contents of the file. The default is to start from zero.
ends: This parameter is mainly used to specify the specific length of data to be read from the file. It needs to be explained here that this parameter includes its own position, which is the so-called package front And after the package.
Let’s take a look at specific examples of readable streams:
let fs = require("fs");
let rs = fs.createReadStream("./a.js", {
highWaterMark: 3,
encoding: "utf8",
autoClose: true,
start: 0,
end: 9
});
rs.on("open", () => {console.log("open");});
rs.on("close", () => {console.log("close");});
rs.on("data", data => {
console.log(data);
rs.pause();//暂停读取 此时流动模式为暂停模式
});
setInterval(() => {
rs.resume();//重新设置为流动模式,开始读取数据
}, 1000);
rs.on("end", () => { console.log("end"); });
rs.on("error", err => { console.log(err); });
The first step of handwritten readable streams
We said above that the readable stream of node is based on the core module events of node, so we need to inherit the events module when implementing our own readable stream. The code is as follows:
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
}
Inheriting the EventEmitter class, we can use various methods in the EventEmitter class, and also use the publish-subscribe model to process events.
Step 2: Process the parameters of the readable stream configuration
We mentioned above that you can configure this when creating a readable stream in node Specific parameters of stream configuration, such as
let rs = fs.createReadStream("./a.js", {
highWaterMark: 3,
encoding: "utf8",
autoClose: true,
start: 0,
end: 9
});
So for these parameters, the readable stream class we implement ourselves also needs to process these parameters, so how should these parameters be processed?
constructor(path, options = {}) {
super();
this.path = path; //指定要读取的文件地址
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true; //是否自动关闭文件
this.start = options.start || 0; //从文件哪个位置开始读取
this.pos = this.start; // pos会随着读取的位置改变
this.end = options.end || null; // null表示没传递
this.encoding = options.encoding || null;// buffer编码
this.flags = options.flags || 'r';
this.flowing = null; // 模式开关
this.buffer = Buffer.alloc(this.highWaterMark);// 根据设置创建一个buffer存储读出来的数
this.open();
}
The usual configuration principle is based on the parameters configured by the user. If the user does not set this parameter, the default configuration will be used.
The third step to implement a readable stream: open the file
The principle here is to use the open method in the node module fs. First, let's review the use of the fs.open() method.
fs.open(filename,flags,[mode],callback);
//实例
fs.open('./1,txt','r',function(err,fd){});
Need to explain here, Callback functionThere are 2 parameters in callback:
The first one is error, and all asynchronous callbacks in node will A parameter returned to describe the specific error message
The second parameter is fd, which is the file descriptor used to identify the file, which is equivalent to The first parameter of the open function
Okay, now let’s take a look at how to implement the open method of our own readable stream:
open() {
fs.open(this.path, this.flags, (err, fd) => {
//fd标识的就是当前this.path这个文件,从3开始(number类型)
if (err) {
if (this.autoClose) { // 如果需要自动关闭则去关闭文件
this.destroy(); // 销毁(关闭文件,触发关闭事件)
}
this.emit('error', err); // 如果有错误触发error事件
return;
}
this.fd = fd; // 保存文件描述符
this.emit('open', this.fd); // 触发文件的打开的方法
});
}
From the code We can see from the above:
fs.open function is an asynchronous function, which means that the callback is executed asynchronously. When the file is successfully opened, the fd attribute is also obtained asynchronously. This needs to be noted. .
Another important point is that if an error occurs when opening the file, it means that the file opening failed, then the file needs to be closed at this time.
The fourth step to implement readable streams: read the file content
We mentioned in detail above that the readable stream itself defines a " switch", when we want to read the content in the file, we need to turn on this "switch", so how does the node readable stream itself turn on this "switch"?
Listen to data events
The node readable stream realizes the opening of this "switch" by listening to the data event:
rs.on("data", data => {
console.log(data);
});
When the user listens to the data event When , the "switch" is turned on and the content is continuously read from the file. So how does node listen to data events?
The answer is the newListener of the event module
This is because the node readable stream is based on events, and in the event, the server can listen to all events from the user through the newListener event. Each Events have corresponding types. When the user listens to the data event, we can obtain it and then read the content in the file. So how can we implement our own readable stream?
// 监听newListener事件,看是否监听了data事件,如果监听了data事件的话,就开始启动流动模式,读取文件中的内容
this.on("newListener", type => {
if (type === "data") {
// 开启流动模式,开始读取文件中的内容
this.flowing = true;
this.read();
}
});
好了,知道了这个"开关"是如何打开的,那么这个时候就到了真正读取文件中内容的关键时候了,先上代码先:
read() {
// 第一次读取文件的话,有可能文件是还没有打开的,此时this.fd可能还没有值
if (typeof this.fd !== "number") {
// 如果此时文件还是没有打开的话,就触发一次open事件,这样文件就真的打开了,然后再读取
return this.once("open", () => this.read());
}
// 具体每次读取多少个字符,需要进行计算,因为最后一次读取倒的可能比highWaterMark小
let howMuchRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, howMuchRead, this.pos, (err, byteRead) => {
// this.pos 是每次读取文件读取的位置,是一个偏移量,每次读取会发生变化
this.pos += byteRead;
// 将读取到的内容转换成字符串串,然后通过data事件,将内容发布出去
let srr = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
// 将内容通过data事件发布出去
this.emit("data", srr);
// 当读取到到内容长度和设置的highWaterMark一致的话,并且还是流动模式的话,就继续读取
if ((byteRead === this.highWaterMark) && this.flowing) {
return this.read();
}
// 没有更多的内容了,此时表示文件中的内容已经读取完毕
if (byteRead < this.highWaterMark) {
// 读取完成,发布end方法,并关闭文件
this.emit("end");
this.destory();
}
});
}
这里我们特别要注意的是:
文件是否已经打开,是否获取到fd,如果没有打开的话,则再次触发open方法
分批次读取文件内容,每次读取的内容是变化的,所以位置和偏移量是要动态计算的
控制读取停止的条件。
实现可读流第五步:关闭文件
好了,到现在,基础的读取工作已经完成,那么就需要将文件关闭了,上面的open和read方法里面都调用了一个方法:destory,没错,这个就是关闭文件的方法,好了,那么我们来看看这个方法该如何实现吧
destory() {
if (typeof this.fd !== "number") {
// 发布close事件
return this.emit("close");
}
// 将文件关闭,发布close事件
fs.close(this.fd, () => {
this.emit("close");
});
}
当然这块的原理就是调用fs模块的close方法啦。
实现可读流第六步:暂停和恢复
既然都说了,node可读流有一个神奇的"开关",就像大坝的阀门一样,可以控制水的流动,同样也可以控制水的暂停啦。当然在node可读流中的暂停是停止对文件的读取,恢复就是将开关打开,继续读取文件内容,那么这两个分别对应的方法就是pause()和resume()方法。
那么我们自己的可读流类里面该如何实现这两个方法的功能呢?非常简单:
我们在定义类的私有属性的时候,定义了这样一个属性flowing,当它的值为true时表示开关打开,反之关闭。
pause() {
this.flowing = false;// 将流动模式设置成暂停模式,不会读取文件
}
resume() {
this.flowing = true;//将模式设置成流动模式,可以读取文件
this.read();// 重新开始读取文件
}
相信看了本文案例你已经掌握了方法,更多精彩请关注php中文网其它相关文章!
推荐阅读:
怎样利用JS做出引用传递与值传递
使用JS实做出加密解密操作
The above is the detailed content of How to use node to create a readable flow pattern. For more information, please follow other related articles on the PHP Chinese website!