搜尋
首頁web前端js教程Nodejs Stream 資料流使用手冊_node.js

1、介紹

本文介紹了使用 node.js streams 開發程式的基本方法。

<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."
Doug McIlroy. October 11, 1964</code>

最早接觸Stream是從早期的unix開始的數十年的實踐證明Stream 思想可以很簡單的開發出一些龐大的系統。在unix裡,Stream是透過 |實現的;在node中,作為內建的stream模組,許多核心模組和三方模組都使用到。和unix一樣,node Stream主要的操作也是.pipe(),使用者可以使用反壓力機制來控制讀取和寫入的平衡。

Stream 可以提供開發者可以重複使用統一的接口,透過抽象的Stream介面來控制Stream之間的讀寫平衡。

2.為什麼要用Stream

node中的I/O是異步的,因此對磁碟和網路的讀寫需要透過回調函數來讀取數據,以下是一個檔案下載伺服器的簡單程式碼:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);</code>

這些程式碼可以實現需要的功能,但是服務在發送文件資料之前需要緩存整個文件資料到內存,如果"data.txt"文件很大且並發量很大的話,會浪費很多內存。因為用戶需要等到整個文件緩存到內存才能接受的文件數據,這樣導致用戶體驗相當不好。不過還好(req, res)兩個參數都是Stream,這樣我們可以用fs.createReadStream()來取代fs.readFile():

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);</code>

.pipe()方法監聽fs.createReadStream()的'data' 和'end'事件,這樣"data.txt"文件就不需要緩存整個文件,當客戶端連接完成之後馬上可以發送一個資料塊到客戶端。使用.pipe()另一個好處是可以解決當客戶端延遲非常大時導致的讀寫不平衡問題。如果想壓縮檔案再發送,可以使用三方模組實現:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);</code>

這樣檔案就會對支援gzip和deflate的瀏覽器進行壓縮。 oppressor 模組會處理所有的content-encoding。

Stream讓開發程式變得簡單。

3、基礎概念

有五種基本的Stream: readable, writable, transform, duplex, and”classic」.

3-1、pipe

所有類型的Stream收是使用 .pipe() 來建立一個輸入輸出對,接收一個可讀流src並將其資料輸出到可寫流dst,如下:

<code class="hljs perl">src.pipe(dst)</code>

.pipe( dst )方法為傳回dst流,這樣就可以接連使用多個.pipe(),如下:

<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>

功能與下面的程式碼相同:

<code class="hljs perl">a.pipe( b );
b.pipe( c );
c.pipe( d );</code>

3-2、readable streams

透過呼叫Readable streams的 .pipe()方法可以把Readable streams的資料寫入一個Writable , Transform, 或Duplex stream。

<code class="hljs perl">readableStream.pipe( dst )</code>

1>建立 readable stream

這裡我們建立一個readable stream!

<code class="hljs perl">var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
</code>

rs.push( null ) 通知資料接收者資料已經發送完畢.

注意到我們在將所有資料內容壓入可讀流之前並沒有呼叫rs.pipe(process.stdout);,但是我們壓入的所有資料內容還是完全的輸出了,這是因為可讀流在接收者沒有讀取資料之前,會快取所有壓入的資料。但是在很多情況下, 更好的方法是只有資料接收著請求資料的時候,才壓入資料到可讀流而不是快取整個資料。下面我們重寫 一下._read()函數:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);</code>
<code class="hljs bash">$ node read1.js
abcdefghijklmnopqrstuvwxyz</code>

上面的程式碼透過重寫_read()方法實現了只有在資料接受者請求資料才向可讀流中壓入資料。 _read()方法也可以接收一個size參數表示資料請求著請求的資料大小,但是可讀流可以根據需要忽略這個參數。

注意我們也可以用util.inherits()繼承可讀流。為了說明只有在資料接受者請求資料時_read()方法才被調用,我們在向可讀流壓入資料時做一個延時,如下:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);</code>

用下面的指令執行程式我們發現_read()方法只呼叫了5次:

<code class="hljs bash">$ node read2.js | head -c5
abcde
_read() called 5 times</code>

使用計時器的原因是系統需要時間來發送訊號來通知程式關閉管道。使用process.stdout.on('error', fn) 是為了處理系統因為header命令關閉管道而發送SIGPIPE訊號,因為這樣會導致process.stdout觸發EPIPE事件。如果想要建立一個的可以壓入任意形式資料的可讀流,只要在建立流的時候設定參數objectMode為true即可,例如:Readable({ objectMode: true })。

2>讀取readable stream資料

大部分情況下我們只要簡單的使用pipe方法將可讀流的資料重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取資料更有用。如下:

<code class="hljs php">process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<buffer 0a="" 61="" 62="" 63="">
<buffer 0a="" 64="" 65="" 66="">
<buffer 0a="" 67="" 68="" 69="">
null</buffer></buffer></buffer></code>

当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});</code>

如下运行程序发现,输出结果并不完全!

<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<buffer 61="" 62="" 63="">
<buffer 0a="" 64="" 65="">
<buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>

这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});</code>

这次运行结果如下:

<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<buffer 0a="" 64="" 65="">
<buffer 0a="" 68="" 69=""></buffer></buffer></code>

我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:

<code class="hljs javascript">var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'</code>

当然,有很多模块可以实现这个功能,如:split 。

3-3、writable streams

writable streams只可以作为.pipe()函数的目的参数。如下代码:

<code class="hljs perl">src.pipe( writableStream );</code>

1>创建 writable stream

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

<code class="hljs php">var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

<code class="hljs css">Writable({ objectMode: true })</code>

2>写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

<code class="hljs vala">process.stdout.write('beep boop\n');</code>

调用.end()方法通知writable stream 数据已经写入完成。

<code class="hljs javascript">var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js 
$ cat message.txt
beep boop</code>

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。

3-4、classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

1>classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

<code class="hljs javascript">var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ</code>

如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:

<code class="hljs php">process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:

<code class="hljs php">var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

或者也可以使用concat-stream来缓存整个流的内容:

<code class="hljs oxygene">var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }</code>

当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。

2>classic writable streams

Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

4、transform

transform是一个对读入数据过滤然输出的流。

5、duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>

以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
Python和JavaScript的未來:趨勢和預測Python和JavaScript的未來:趨勢和預測Apr 27, 2025 am 12:21 AM

Python和JavaScript的未來趨勢包括:1.Python將鞏固在科學計算和AI領域的地位,2.JavaScript將推動Web技術發展,3.跨平台開發將成為熱門,4.性能優化將是重點。兩者都將繼續在各自領域擴展應用場景,並在性能上有更多突破。

Python vs. JavaScript:開發環境和工具Python vs. JavaScript:開發環境和工具Apr 26, 2025 am 12:09 AM

Python和JavaScript在開發環境上的選擇都很重要。 1)Python的開發環境包括PyCharm、JupyterNotebook和Anaconda,適合數據科學和快速原型開發。 2)JavaScript的開發環境包括Node.js、VSCode和Webpack,適用於前端和後端開發。根據項目需求選擇合適的工具可以提高開發效率和項目成功率。

JavaScript是用C編寫的嗎?檢查證據JavaScript是用C編寫的嗎?檢查證據Apr 25, 2025 am 12:15 AM

是的,JavaScript的引擎核心是用C語言編寫的。 1)C語言提供了高效性能和底層控制,適合JavaScript引擎的開發。 2)以V8引擎為例,其核心用C 編寫,結合了C的效率和麵向對象特性。 3)JavaScript引擎的工作原理包括解析、編譯和執行,C語言在這些過程中發揮關鍵作用。

JavaScript的角色:使網絡交互和動態JavaScript的角色:使網絡交互和動態Apr 24, 2025 am 12:12 AM

JavaScript是現代網站的核心,因為它增強了網頁的交互性和動態性。 1)它允許在不刷新頁面的情況下改變內容,2)通過DOMAPI操作網頁,3)支持複雜的交互效果如動畫和拖放,4)優化性能和最佳實踐提高用戶體驗。

C和JavaScript:連接解釋C和JavaScript:連接解釋Apr 23, 2025 am 12:07 AM

C 和JavaScript通過WebAssembly實現互操作性。 1)C 代碼編譯成WebAssembly模塊,引入到JavaScript環境中,增強計算能力。 2)在遊戲開發中,C 處理物理引擎和圖形渲染,JavaScript負責遊戲邏輯和用戶界面。

從網站到應用程序:JavaScript的不同應用從網站到應用程序:JavaScript的不同應用Apr 22, 2025 am 12:02 AM

JavaScript在網站、移動應用、桌面應用和服務器端編程中均有廣泛應用。 1)在網站開發中,JavaScript與HTML、CSS一起操作DOM,實現動態效果,並支持如jQuery、React等框架。 2)通過ReactNative和Ionic,JavaScript用於開發跨平台移動應用。 3)Electron框架使JavaScript能構建桌面應用。 4)Node.js讓JavaScript在服務器端運行,支持高並發請求。

Python vs. JavaScript:比較用例和應用程序Python vs. JavaScript:比較用例和應用程序Apr 21, 2025 am 12:01 AM

Python更適合數據科學和自動化,JavaScript更適合前端和全棧開發。 1.Python在數據科學和機器學習中表現出色,使用NumPy、Pandas等庫進行數據處理和建模。 2.Python在自動化和腳本編寫方面簡潔高效。 3.JavaScript在前端開發中不可或缺,用於構建動態網頁和單頁面應用。 4.JavaScript通過Node.js在後端開發中發揮作用,支持全棧開發。

C/C在JavaScript口譯員和編譯器中的作用C/C在JavaScript口譯員和編譯器中的作用Apr 20, 2025 am 12:01 AM

C和C 在JavaScript引擎中扮演了至关重要的角色,主要用于实现解释器和JIT编译器。1)C 用于解析JavaScript源码并生成抽象语法树。2)C 负责生成和执行字节码。3)C 实现JIT编译器,在运行时优化和编译热点代码,显著提高JavaScript的执行效率。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

Atom編輯器mac版下載

Atom編輯器mac版下載

最受歡迎的的開源編輯器

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一個PHP/MySQL的Web應用程序,非常容易受到攻擊。它的主要目標是成為安全專業人員在合法環境中測試自己的技能和工具的輔助工具,幫助Web開發人員更好地理解保護網路應用程式的過程,並幫助教師/學生在課堂環境中教授/學習Web應用程式安全性。 DVWA的目標是透過簡單直接的介面練習一些最常見的Web漏洞,難度各不相同。請注意,該軟體中