这篇文章主要介绍了Node.js pipe实现源码解析,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
从前面两篇文章,我们了解到。想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable。换句话说,每次传递数据时,都需要写如下的模板代码
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据
readable.pipe(writable)
现在,就让我们来看看它是如何实现的吧
pipe
首先需要先调用 Readable 的 pipe() 方法
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 记录 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 触发 Writable 的 pipe 事件 dest.emit('pipe', src); // 将 Readable 改为 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
执行 pipe() 函数时,首先将 Writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 Readable 不是 flow 模式,就调用 resume() 将 Readable 改为 flow 模式
传递数据
Readable 从数据源获取到数据后,触发 data 事件,执行 ondata()
ondata() 相关代码:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 进入 pause 模式 src.pause(); } }
在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 Writable
此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitDrain 多次增加,不能清零,Readable 卡住
当不能再向 Writable 写入数据时,Readable 会进入 pause 模式,直到所有的 drain 事件触发
触发 drain 事件,执行 ondrain()
// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) { return function() { var state = src._readableState; debug('pipeOnDrain', state.awaitDrain); if (state.awaitDrain) state.awaitDrain--; // awaitDrain === 0,且有 data 监听器 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } }; }
每个 drain 事件触发时,都会减少 awaitDrain,直到 awaitDrain 为 0。此时,调用 flow(src),使 Readable 进入 flow 模式
到这里,整个数据传递循环已经建立,数据会顺着循环源源不断的流入 Writable,直到所有数据写入完成
unpipe
不管写入过程中是否出现错误,最后都会执行 unpipe()
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也没有 if (state.pipesCount === 0) return this; // 只有一个 if (state.pipesCount === 1) { if (dest && dest !== state.pipes) return this; // 没有指定就 unpipe 所有 if (!dest) dest = state.pipes; state.pipes = null; state.pipesCount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeInfo); return this; } // 没有指定就 unpipe 所有 if (!dest) { var dests = state.pipes; var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeInfo); return this; } // 找到指定 Writable,并 unpipe var index = state.pipes.indexOf(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this; };
Readable.prototype.unpipe() 函数会根据 state.pipes 属性和 dest 参数,选择执行策略。最后会触发 dest 的 unpipe 事件
unpipe 事件触发后,调用 onunpipe(),清理相关数据
// lib/_stream_readable.js function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; // 清理相关数据 cleanup(); } } }
End
在整个 pipe 的过程中,Readable 是主动方 ( 负责整个 pipe 过程:包括数据传递、unpipe 与异常处理 ),Writable 是被动方 ( 只需要触发 drain 事件 )
总结一下 pipe 的过程:
首先执行 readbable.pipe(writable),将 readable 与 writable 对接上
当 readable 中有数据时,readable.emit('data'),将数据写入 writable
如果 writable.write(chunk) 返回 false,则进入 pause 模式,等待 drain 事件触发
drain 事件全部触发后,再次进入 flow 模式,写入数据
不管数据写入完成或发生中断,最后都会调用 unpipe()
unpipe() 调用 Readable.prototype.unpipe(),触发 dest 的 unpipe 事件,清理相关数据
以上是Node.js pipe()方法介绍的详细内容。更多信息请关注PHP中文网其他相关文章!

不同JavaScript引擎在解析和执行JavaScript代码时,效果会有所不同,因为每个引擎的实现原理和优化策略各有差异。1.词法分析:将源码转换为词法单元。2.语法分析:生成抽象语法树。3.优化和编译:通过JIT编译器生成机器码。4.执行:运行机器码。V8引擎通过即时编译和隐藏类优化,SpiderMonkey使用类型推断系统,导致在相同代码上的性能表现不同。

JavaScript在现实世界中的应用包括服务器端编程、移动应用开发和物联网控制:1.通过Node.js实现服务器端编程,适用于高并发请求处理。2.通过ReactNative进行移动应用开发,支持跨平台部署。3.通过Johnny-Five库用于物联网设备控制,适用于硬件交互。

我使用您的日常技术工具构建了功能性的多租户SaaS应用程序(一个Edtech应用程序),您可以做同样的事情。 首先,什么是多租户SaaS应用程序? 多租户SaaS应用程序可让您从唱歌中为多个客户提供服务

本文展示了与许可证确保的后端的前端集成,并使用Next.js构建功能性Edtech SaaS应用程序。 前端获取用户权限以控制UI的可见性并确保API要求遵守角色库

JavaScript是现代Web开发的核心语言,因其多样性和灵活性而广泛应用。1)前端开发:通过DOM操作和现代框架(如React、Vue.js、Angular)构建动态网页和单页面应用。2)服务器端开发:Node.js利用非阻塞I/O模型处理高并发和实时应用。3)移动和桌面应用开发:通过ReactNative和Electron实现跨平台开发,提高开发效率。

JavaScript的最新趋势包括TypeScript的崛起、现代框架和库的流行以及WebAssembly的应用。未来前景涵盖更强大的类型系统、服务器端JavaScript的发展、人工智能和机器学习的扩展以及物联网和边缘计算的潜力。

JavaScript是现代Web开发的基石,它的主要功能包括事件驱动编程、动态内容生成和异步编程。1)事件驱动编程允许网页根据用户操作动态变化。2)动态内容生成使得页面内容可以根据条件调整。3)异步编程确保用户界面不被阻塞。JavaScript广泛应用于网页交互、单页面应用和服务器端开发,极大地提升了用户体验和跨平台开发的灵活性。

Python更适合数据科学和机器学习,JavaScript更适合前端和全栈开发。 1.Python以简洁语法和丰富库生态着称,适用于数据分析和Web开发。 2.JavaScript是前端开发核心,Node.js支持服务器端编程,适用于全栈开发。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver Mac版
视觉化网页开发工具

WebStorm Mac版
好用的JavaScript开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

mPDF
mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),