這篇文章帶大家了解一下Node.js中的4種stream,看看怎麼解決爆緩衝區的「背壓」問題,有需要的朋友可以去學習了解一下~
把一個東西從A 搬到B 該怎麼搬呢?
抬起來,移動到目的地,放下不就行了麼。
那如果這個東西有一噸重呢?
那就一部分的搬家。
其實IO 也就是搬東西,包括網路的IO、檔案的IO,如果資料量少,那麼直接傳送全部內容就行了,但如果內容特別多,一次性載入到記憶體會崩潰,而且速度也慢,這時候就可以一部分一部分的處理,這就是流的想法。 【推薦學習:《nodejs 教學》】
各種語言基本上都實作了stream 的api,Node.js 也是,stream api 是比較常用的,下面我們就來探究一下stream。
本文會回答以下問題:
- Node.js 的4 種stream 是什麼
- 產生器如何與Readable Stream 結合
- stream的暫停與流動
- 什麼是背壓問題,如何解決
Node.js 的4種stream
##流的直觀感受
從一個地方流到另一個地方,顯然有流出的一方和流入的一方,流出的一方就是可讀流(readable),而流入的一方就是可寫流(writable)。流的api
Node.js 提供的stream 就是上面介紹的4 種:const stream = require('stream'); // 可读流 const Readable = stream.Readable; // 可写流 const Writable = stream.Writable; // 双工流 const Duplex = stream.Duplex; // 转换流 const Transform = stream.Transform;它們都有要實現的方法:
- Readable 需要實作_read 方法來傳回內容
- Writable 需要實作_write 方法來接受內容
- Duplex 需要實作_read 和_write 方法來接受和返回內容
- Transform 需要實作_transform 方法來把接受的內容轉換之後回傳
Readable
Readable 要實作_read 方法,透過push 傳回特定的資料。const Stream = require('stream'); const readableStream = Stream.Readable(); readableStream._read = function() { this.push('阿门阿前一棵葡萄树,'); this.push('阿东阿东绿的刚发芽,'); this.push('阿东背着那重重的的壳呀,'); this.push('一步一步地往上爬。') this.push(null); } readableStream.on('data', (data)=> { console.log(data.toString()) }); readableStream.on('end', () => { console.log('done~'); });當 push 一個 null 時,就代表結束流。 執行效果如下:
const Stream = require('stream'); class ReadableDong extends Stream.Readable { constructor() { super(); } _read() { this.push('阿门阿前一棵葡萄树,'); this.push('阿东阿东绿的刚发芽,'); this.push('阿东背着那重重的的壳呀,'); this.push('一步一步地往上爬。') this.push(null); } } const readableStream = new ReadableDong(); readableStream.on('data', (data)=> { console.log(data.toString()) }); readableStream.on('end', () => { console.log('done~'); });可讀流是產生內容的,那麼很自然可以和生成器結合:
const Stream = require('stream'); class ReadableDong extends Stream.Readable { constructor(iterator) { super(); this.iterator = iterator; } _read() { const next = this.iterator.next(); if(next.done) { return this.push(null); } else { this.push(next.value) } } } function *songGenerator() { yield '阿门阿前一棵葡萄树,'; yield '阿东阿东绿的刚发芽,'; yield '阿东背着那重重的的壳呀,'; yield '一步一步地往上爬。'; } const songIterator = songGenerator(); const readableStream = new ReadableDong(songIterator); readableStream.on('data', (data)=> { console.log(data.toString()) }); readableStream.on('end', () => { console.log('done~'); });這就是可讀流,透過實作_read 方法來傳回內容。
Writable
Writable 要實作 _write 方法,接收寫入的內容。const Stream = require('stream'); const writableStream = Stream.Writable(); writableStream._write = function (data, enc, next) { console.log(data.toString()); // 每秒写一次 setTimeout(() => { next(); }, 1000); } writableStream.on('finish', () => console.log('done~')); writableStream.write('阿门阿前一棵葡萄树,'); writableStream.write('阿东阿东绿的刚发芽,'); writableStream.write('阿东背着那重重的的壳呀,'); writableStream.write('一步一步地往上爬。'); writableStream.end();接收寫入的內容,列印出來,並且呼叫 next 來處理下一個寫入的內容,這裡呼叫 next 是非同步的,可以控制頻率。 跑了一下,確實可以正常的處理寫入的內容:
Duplex
Duplex 是可讀可寫,同時實作_read 和_write 就可以了const Stream = require('stream'); var duplexStream = Stream.Duplex(); duplexStream._read = function () { this.push('阿门阿前一棵葡萄树,'); this.push('阿东阿东绿的刚发芽,'); this.push('阿东背着那重重的的壳呀,'); this.push('一步一步地往上爬。') this.push(null); } duplexStream._write = function (data, enc, next) { console.log(data.toString()); next(); } duplexStream.on('data', data => console.log(data.toString())); duplexStream.on('end', data => console.log('read done~')); duplexStream.write('阿门阿前一棵葡萄树,'); duplexStream.write('阿东阿东绿的刚发芽,'); duplexStream.write('阿东背着那重重的的壳呀,'); duplexStream.write('一步一步地往上爬。'); duplexStream.end(); duplexStream.on('finish', data => console.log('write done~'));整合了Readable 流和Writable 流的功能,這就是雙工流Duplex。
Transform
Duplex 串流雖然可讀可寫,但是兩者之間沒啥關聯,而有的時候需要對流入的內容做轉換之後流出,這時候就需要轉換流Transform。 Transform 流要實作_transform 的api,我們實作下對內容做反轉的轉換流:const Stream = require('stream'); class TransformReverse extends Stream.Transform { constructor() { super() } _transform(buf, enc, next) { const res = buf.toString().split('').reverse().join(''); this.push(res) next() } } var transformStream = new TransformReverse(); transformStream.on('data', data => console.log(data.toString())) transformStream.on('end', data => console.log('read done~')); transformStream.write('阿门阿前一棵葡萄树'); transformStream.write('阿东阿东绿的刚发芽'); transformStream.write('阿东背着那重重的的壳呀'); transformStream.write('一步一步地往上爬'); transformStream.end() transformStream.on('finish', data => console.log('write done~'));跑了一下,效果如下:
##
流的暂停和流动
我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。
背压
但是 read 和 write 都是异步的,如果两者速率不一致呢?
如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。
而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。
这种读入速率大于写入速率的现象叫做“背压”
,或者“负压”
。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。
这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。
解决背压
怎么解决这种读写速率不一致的问题呢?
当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。
readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。
当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。
readableStream.readableFlowing = false; let data; while((data = readableStream.read()) != null) { console.log(data.toString()); }
但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。
当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:
- true: 数据已经写入目标
- false:目标不可写入,暂时放在缓冲区
我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:
const rs = fs.createReadStream(src); const ws = fs.createWriteStream(dst); rs.on('data', function (chunk) { if (ws.write(chunk) === false) { rs.pause(); } }); rs.on('end', function () { ws.end(); }); ws.on('drain', function () { rs.resume(); });
这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。
pipe 有背压问题么?
平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。
const rs = fs.createReadStream(src); const ws = fs.createWriteStream(dst); rs.pipe(ws);
总结
流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。
Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。
创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)
当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。
流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!
更多编程相关知识,请访问:编程入门!!
以上是深入了解Node.js中的4種 stream的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Python更適合初學者,學習曲線平緩,語法簡潔;JavaScript適合前端開發,學習曲線較陡,語法靈活。 1.Python語法直觀,適用於數據科學和後端開發。 2.JavaScript靈活,廣泛用於前端和服務器端編程。

Python和JavaScript在社區、庫和資源方面的對比各有優劣。 1)Python社區友好,適合初學者,但前端開發資源不如JavaScript豐富。 2)Python在數據科學和機器學習庫方面強大,JavaScript則在前端開發庫和框架上更勝一籌。 3)兩者的學習資源都豐富,但Python適合從官方文檔開始,JavaScript則以MDNWebDocs為佳。選擇應基於項目需求和個人興趣。

從C/C 轉向JavaScript需要適應動態類型、垃圾回收和異步編程等特點。 1)C/C 是靜態類型語言,需手動管理內存,而JavaScript是動態類型,垃圾回收自動處理。 2)C/C 需編譯成機器碼,JavaScript則為解釋型語言。 3)JavaScript引入閉包、原型鍊和Promise等概念,增強了靈活性和異步編程能力。

不同JavaScript引擎在解析和執行JavaScript代碼時,效果會有所不同,因為每個引擎的實現原理和優化策略各有差異。 1.詞法分析:將源碼轉換為詞法單元。 2.語法分析:生成抽象語法樹。 3.優化和編譯:通過JIT編譯器生成機器碼。 4.執行:運行機器碼。 V8引擎通過即時編譯和隱藏類優化,SpiderMonkey使用類型推斷系統,導致在相同代碼上的性能表現不同。

JavaScript在現實世界中的應用包括服務器端編程、移動應用開發和物聯網控制:1.通過Node.js實現服務器端編程,適用於高並發請求處理。 2.通過ReactNative進行移動應用開發,支持跨平台部署。 3.通過Johnny-Five庫用於物聯網設備控制,適用於硬件交互。

我使用您的日常技術工具構建了功能性的多租戶SaaS應用程序(一個Edtech應用程序),您可以做同樣的事情。 首先,什麼是多租戶SaaS應用程序? 多租戶SaaS應用程序可讓您從唱歌中為多個客戶提供服務

本文展示了與許可證確保的後端的前端集成,並使用Next.js構建功能性Edtech SaaS應用程序。 前端獲取用戶權限以控制UI的可見性並確保API要求遵守角色庫

JavaScript是現代Web開發的核心語言,因其多樣性和靈活性而廣泛應用。 1)前端開發:通過DOM操作和現代框架(如React、Vue.js、Angular)構建動態網頁和單頁面應用。 2)服務器端開發:Node.js利用非阻塞I/O模型處理高並發和實時應用。 3)移動和桌面應用開發:通過ReactNative和Electron實現跨平台開發,提高開發效率。


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

mPDF
mPDF是一個PHP庫,可以從UTF-8編碼的HTML產生PDF檔案。原作者Ian Back編寫mPDF以從他的網站上「即時」輸出PDF文件,並處理不同的語言。與原始腳本如HTML2FPDF相比,它的速度較慢,並且在使用Unicode字體時產生的檔案較大,但支援CSS樣式等,並進行了大量增強。支援幾乎所有語言,包括RTL(阿拉伯語和希伯來語)和CJK(中日韓)。支援嵌套的區塊級元素(如P、DIV),

SAP NetWeaver Server Adapter for Eclipse
將Eclipse與SAP NetWeaver應用伺服器整合。

WebStorm Mac版
好用的JavaScript開發工具

MinGW - Minimalist GNU for Windows
這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

VSCode Windows 64位元 下載
微軟推出的免費、功能強大的一款IDE編輯器