搜索
首页web前端js教程浅谈Nodejs中的可读流,可读流如何实现?

本篇文章给大家介绍一下Nodejs中的流(stream),看看Node可读流的实现方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。

浅谈Nodejs中的可读流,可读流如何实现?

stream的概念

流(stream)是 Node.js 中处理流式数据的抽象接口。 stream 模块用于构建实现了流接口的对象。【推荐学习:《nodejs 教程》】

stream的作用

读写大文件的过程中,不会一次性的读写到内存中。可以控制每次读写的个数

stream的分类

1、可读流-Readable

例:fs.createReadStream;

源码位置:lib/_stream_readable.js

2、可写流-Writable

例:fs.createWriteStream;

源码位置:lib/_stream_writable.js

3、双工流-Duplex:满足读写的功能

例:net.Socket();

源码位置:lib/_stream_duplex.js

4、转化流-Transform:用途:压缩,转码

例:

const { Transform } = require('stream');
Transform.call(this, '要转换的数据');//具体的使用详情 见node官网

-源码位置:lib/_stream_tranform.js

可读流读取文件的过程

  • 读取文件代码过程
const path = require("path");
const aPath = path.join(__dirname, "a.txt");//需要读取的文件
const fs = require("fs");
let rs = fs.createReadStream(aPath, {
  flags: "r",
  encoding: null,//默认编码格式是buffer,深挖buffer又要学习字符编码,留个坑 到时候写一个编码规范的学习整理
  autoClose: true,//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
  start: 0,
  highWaterMark: 3,//每次读取的个数 默认是64*1024个字节
});

rs.on("open", function (fd) {
  // fd  number类型
  console.log("fd", fd);
});
// 他会监听用户,绑定了data事件,就会触发对应的回调,不停的触发
rs.on("data", function (chunk) {
//这里会打印的是ascII 值 ,所以可以toString查看详情自己看得懂的样子
  console.log({ chunk }, "chunk.toString", chunk.toString()); 
  //如果想每一段事件 读一点 可以用rs.pause() 做暂停,然后计时器 里rs.resume()再次触发data事件
  rs.pause();//暂停读取
});
rs.on("close", function () {
  //当文件读取完毕后 会 触发 end事件
  console.log("close");
});
setInterval(() => {
  rs.resume(); //再次触发data,直到读完数据为止
}, 1000);
  • 题外话:想说下 文件流和普通可读流的区别

1、open 和close是文件流独有,支持open和close便是文件流

2、可读流都具备 (on('data'),on('end'),on('error'),resume,pause;所以只要支持这些方法就是可读流

可写流写入文件的过程

  • 写入文件代码过程
const fs = require("fs");
const path = require("path");
const bPath = path.join(__dirname, "b.txt");
let ws = fs.createWriteStream(bPath, {
//参数和可读流的类似
  flags: "w",
  encoding: "utf-8",
  autoClose: true,
  start: 0,
  highWaterMark: 3,
});
ws.on("open", function (fd) {
  console.log("open", fd);
});
ws.on("close", function () {
  console.log("close");
});

//write的参数string 或者buffer,ws.write 还有一个boolea的返回值表示是真实写入文件还是放入缓存中
ws.write("1");
let flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//false

双工流的写入和读取过程

  • 写一个本地服务 做例子

1、server(服务器代码)实现

const net = require("net"); //net 模块是 node自己封装的tcp层
//socket 就是双工流 能读能写  http源码就是用net模块写的 基于tcp
const server = net.createServer(function (socket) {
  socket.on("data", function (data) {//监听客户端发来的消息
    console.log(data.toString)
    socket.write("server:hello");//写入server:hello
  });
  socket.on("end", function () {
    console.log("客户端关闭");
  });
});
server.on("err", function (err) {
  console.log(err);
});
server.listen(8080);//服务端监听8080端口

2、client(客户端) 实现

const net = require("net"); //net 模块是 node自己封装的tcp层
const socket = new net.Socket(); //
socket.connect(8080, "localhost"); //  表示链接服务器本地8080端口
socket.on("connect", function (data) {
  //和服务器建立链接后
  socket.write("connect server");
});
socket.on("data", function (data) {
  //监听数据,读取服务器传来的数据
  console.log(data.toString());
  socket.destroy()
});
socket.write('ok')
socket.on("error", function (err) {
  console.log(err);
});

3.题外话 如果想看tcp的三次握手和四次挥手 可以 通过我上述代码 用wireshark(一个抓包工具)看实际过程

转化流 transform过程

转化流是双工流的一种, 允许实现输入,并在对数据执行某些操作后返回输出,两者有依赖关系

const stream = require('stream')
let c = 0;
const readable = stream.Readable({
  highWaterMark: 2,
  read: function () {
    let data = c < 26 ? Number(c++ + 97) : null;
    console.log(&#39;push&#39;, data);
    this.push( String.fromCharCode(data));
}
})

const transform = stream.Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log(&#39;transform&#39;, buf.toString());
    next(null, buf);
  }
})

readable.pipe(transform);
  • 打印结果

1.gif

可读流的实现

跟着断点先了解 可读流的调用过程

就前面可读流文件的读取过程的代码为例子 打断点

rs.on('open')

rs.on('open')为断点入口进入

2.gif

1、通过Stream.prototype.on.call 继承Stream类

源文件位置:no dlib/_stream_readable.js(我是通过断点点到这里 直接找,我也没找到)

3.gif

  • 再点进去 发现 Stream 是EventEmitter的子类 那么 可读流也可以支持发布订阅

4.gif

2、监听的事件类型是否是data和readable任意一个 不是 继续 下一个事件的监听

5.gif

rs.on('data')

6.gif

  • data的部分做两件事

    1、判断flowing(默认值是null)不为false 就自动resume方法执行继续 文件读取(这里我的案例是rs.pause();手动将flowing 值为false了所以不会继续调用)

    2、那如果我没有调用rs.pause() 会继续调用resume 看看resume里做了什么

7.gif

2.1 最终调用了 stream.read()继续读取文件;直到文件读取结束依次去emit end 和close事件

  小结:所以data默认是会不断的读取文件直到文件读取完毕 ,如果想要文件读取变可控可以和我一样用rs.pause()

自己实现

实现思路

继承EventEmitter发布订阅管理我们的事件

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {

}
module.exports = ReadStream;

数据初始化

constructor(path, options = {}) {
    super();
    //参考fs 写实例需要用到的参数
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;//默认编码格式是buffer
    this.autoClose = options.autoClose || true;//相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
    this.start = options.start || 0;//数据读取的开始位置
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;//默认一次读取64个字节的数据 
    this.offset = this.start;//fs.read的偏移量
    this.fd = undefined; //初始化fd 用于 open成功后的fd做赋值  供 read里使用
    this.flowing = false;//实现pause和resume备用,设置flag,当监听到data事件的时候 改 flowing为true,
    this.open(); //初始化的时候就要调用open
    this.on("readStreamListener", function (type) {
      // console.log(type)//这里打印就能看到 实例上所有 通过on 绑定的事件名称
      if (type === "data") {
      //监听到data事件的时候 改 flowing为true
        this.flowing = true;
        this.read();
      }
    });
    }

文件读取方法read,pause,resume,open和destroy的实现

open()

 open() {
 // 调用fs.open 读取目标文件 
    fs.open(this.path, this.flags, (err, fd) => { 
      this.fd = fd; //赋值一个fd 供后面的 read()方式使用,文件读取成功,fd是返回一个数字
      this.emit("open", fd);
    });

read()

 read() {
   // console.log("一开始read里的", this.fd); //但是这样依旧拿不到 open后的fd,用 发布订阅 通过on来获取 绑定的事件type
    //这里要做一个容错处理 ,因为open是异步读取文件,read里无法马上拿到open结果
  if (typeof this.fd !== "number") {
      //订阅open,给绑定一个回调事件read 直到this.fd有值
      return this.once("open", () => this.read());
    }
 }
  //fd打开后 调用fs.read
  //实例上的start值是未知number,存在实际剩余的可读的文件大小<highWaterMar的情况 ,用howMuchToRead 替换highWaterMark 去做fs.read的每次读取buffer的大小
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
  //定义一个用户 传进来的highWaterMark 大小的buffer对象
    const buffer = Buffer.alloc(this.highWaterMark);
       //读取文件中的内容fd给buffer 从0位置开始,每次读取howMuchToRead个。插入数据,同时更新偏移量
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          // 每读完一次,偏移量=已经读到的数量
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          //写到这里实例上的data 已经可以打印出数据了 但是 继续读取 调用this.read() 直到bytesRead不存在 说明数据读取完毕了 走else
          //回调 this.read();时候判断 this.flowing 是否为true
          //pause调用后this.flowing将为false
          if (this.flowing) {
            this.read();
          }
        } else {
          // 执行到这 bytesRead不存在说明  文件数据读取完毕了已经 触发end
          this.emit("end");//emit 实例上绑定的end事件
          //destroy 还没写到 稍等 马上后面就实现...
          this.destroy();
        }
      }
    );

resume()

文件读取不去data事件,会触发对应的回调,不停的触发 所以想要变可控可以手动调用 resume()& pause()

  • pause的实现,调用的时候设置 this.flowing=false,打断 read()
  pause() {
    this.flowing = false;
  }

pause()

  • pause 打断 read()多次读取,可以使用resume 打开 this.flowing=true 并调用read
resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }

destroy()

  • 文件open不成功时候抛错时调用
  • 文件读取完毕后&&this.autoClose===true ,read()里文件读取end的时候 就执行close
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    // 把close放destroy里 并 在read里调用
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }

完整代码

  • 实现代码
/**
 *实现简单的可读流
 */

const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    //参考fs 写实例需要用到的参数
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding - options.encoding || null;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.fd = undefined;
    this.offset = this.start;
    this.flowing = false;
    this.open(); 
    this.on("newListener", function (type) {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  destroy(err) {
    if (err) {
      this.emit("error");
    }
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.destroy(err);
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }
  resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
  pause() {
    this.flowing = false;
  }

  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read());
    }
    let howMuchToRead = this.end
      ? Math.min(this.end - this.offset + 1, this.highWaterMark)
      : this.highWaterMark;
    const buffer = Buffer.alloc(this.highWaterMark);
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead,
      this.offset,
      (err, bytesRead) => {
        if (bytesRead) {
          this.offset += bytesRead;
          this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
            this.read();
          }
        } else {
          this.emit("end");
          this.destroy();
        }
      }
    );
  }
}

module.exports = ReadStream;
  • 调用代码
const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
  flags: "r",
  encoding: null, //默认编码格式是buffer
  autoClose: true, //相当于需要调用close方法,如果为false  文件读取end的时候 就不会执行 close
  start: 0,
  highWaterMark: 3, //每次读取的个数 默认是64*1024个字节
});

可写流的实现

待续...

更多编程相关知识,请访问:编程视频!!

以上是浅谈Nodejs中的可读流,可读流如何实现?的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文转载于:掘金社区。如有侵权,请联系admin@php.cn删除
C和JavaScript:连接解释C和JavaScript:连接解释Apr 23, 2025 am 12:07 AM

C 和JavaScript通过WebAssembly实现互操作性。1)C 代码编译成WebAssembly模块,引入到JavaScript环境中,增强计算能力。2)在游戏开发中,C 处理物理引擎和图形渲染,JavaScript负责游戏逻辑和用户界面。

从网站到应用程序:JavaScript的不同应用从网站到应用程序:JavaScript的不同应用Apr 22, 2025 am 12:02 AM

JavaScript在网站、移动应用、桌面应用和服务器端编程中均有广泛应用。1)在网站开发中,JavaScript与HTML、CSS一起操作DOM,实现动态效果,并支持如jQuery、React等框架。2)通过ReactNative和Ionic,JavaScript用于开发跨平台移动应用。3)Electron框架使JavaScript能构建桌面应用。4)Node.js让JavaScript在服务器端运行,支持高并发请求。

Python vs. JavaScript:比较用例和应用程序Python vs. JavaScript:比较用例和应用程序Apr 21, 2025 am 12:01 AM

Python更适合数据科学和自动化,JavaScript更适合前端和全栈开发。1.Python在数据科学和机器学习中表现出色,使用NumPy、Pandas等库进行数据处理和建模。2.Python在自动化和脚本编写方面简洁高效。3.JavaScript在前端开发中不可或缺,用于构建动态网页和单页面应用。4.JavaScript通过Node.js在后端开发中发挥作用,支持全栈开发。

C/C在JavaScript口译员和编译器中的作用C/C在JavaScript口译员和编译器中的作用Apr 20, 2025 am 12:01 AM

C和C 在JavaScript引擎中扮演了至关重要的角色,主要用于实现解释器和JIT编译器。 1)C 用于解析JavaScript源码并生成抽象语法树。 2)C 负责生成和执行字节码。 3)C 实现JIT编译器,在运行时优化和编译热点代码,显着提高JavaScript的执行效率。

JavaScript在行动中:现实世界中的示例和项目JavaScript在行动中:现实世界中的示例和项目Apr 19, 2025 am 12:13 AM

JavaScript在现实世界中的应用包括前端和后端开发。1)通过构建TODO列表应用展示前端应用,涉及DOM操作和事件处理。2)通过Node.js和Express构建RESTfulAPI展示后端应用。

JavaScript和Web:核心功能和用例JavaScript和Web:核心功能和用例Apr 18, 2025 am 12:19 AM

JavaScript在Web开发中的主要用途包括客户端交互、表单验证和异步通信。1)通过DOM操作实现动态内容更新和用户交互;2)在用户提交数据前进行客户端验证,提高用户体验;3)通过AJAX技术实现与服务器的无刷新通信。

了解JavaScript引擎:实施详细信息了解JavaScript引擎:实施详细信息Apr 17, 2025 am 12:05 AM

理解JavaScript引擎内部工作原理对开发者重要,因为它能帮助编写更高效的代码并理解性能瓶颈和优化策略。1)引擎的工作流程包括解析、编译和执行三个阶段;2)执行过程中,引擎会进行动态优化,如内联缓存和隐藏类;3)最佳实践包括避免全局变量、优化循环、使用const和let,以及避免过度使用闭包。

Python vs. JavaScript:学习曲线和易用性Python vs. JavaScript:学习曲线和易用性Apr 16, 2025 am 12:12 AM

Python更适合初学者,学习曲线平缓,语法简洁;JavaScript适合前端开发,学习曲线较陡,语法灵活。1.Python语法直观,适用于数据科学和后端开发。2.JavaScript灵活,广泛用于前端和服务器端编程。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

VSCode Windows 64位 下载

VSCode Windows 64位 下载

微软推出的免费、功能强大的一款IDE编辑器

MinGW - 适用于 Windows 的极简 GNU

MinGW - 适用于 Windows 的极简 GNU

这个项目正在迁移到osdn.net/projects/mingw的过程中,你可以继续在那里关注我们。MinGW:GNU编译器集合(GCC)的本地Windows移植版本,可自由分发的导入库和用于构建本地Windows应用程序的头文件;包括对MSVC运行时的扩展,以支持C99功能。MinGW的所有软件都可以在64位Windows平台上运行。

Dreamweaver Mac版

Dreamweaver Mac版

视觉化网页开发工具

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中