検索
ホームページウェブフロントエンドjsチュートリアルNodejs ストリーム データ ストリーム ユーザーマニュアル_node.js

1. はじめに

この記事では、node.js ストリームを使用したプログラム開発の基本的な方法を紹介します。

<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."
Doug McIlroy. October 11, 1964</code>

Stream に最初に触れたのは、Unix の初期の頃でした。数十年にわたる実践により、Stream のアイデアがいくつかの巨大なシステムを簡単に開発できることが証明されました。 UNIX では、Stream はノード内の |; を介して実装され、組み込みのストリーム モジュールとして、多くのコア モジュールとサードパーティ モジュールが使用されます。 Unix と同様に、ノード Stream の主な操作も .pipe() です。ユーザーは、アンチプレッシャー メカニズムを使用して読み取りと書き込みのバランスを制御できます。

Stream は、再利用可能な統合インターフェイスを開発者に提供し、抽象的な Stream インターフェイスを通じてストリーム間の読み取りと書き込みのバランスを制御できます。

2. ストリームを使用する理由

ノードの I/O は非同期であるため、ディスクとネットワークへの読み取りと書き込みにはデータを読み取るためのコールバック関数が必要です。以下はファイル ダウンロード サーバーの簡単なコードです:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);</code>

これらのコードは必要な機能を実現できますが、「data.txt」ファイルが大きく、同時実行の量が多い場合、サービスはファイル データ全体をメモリにキャッシュする必要があります。メモリが無駄になります。ユーザーはファイル データを受け入れる前にファイル全体がメモリにキャッシュされるまで待つ必要があるため、ユーザー エクスペリエンスはかなり低下します。しかし幸いなことに、両方のパラメータ (req、res) は Stream なので、 fs.readFile():

の代わりに fs.createReadStream() を使用できます。
<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);</code>

.pipe() メソッドは fs.createReadStream() の「data」イベントと「end」イベントをリッスンするため、「data.txt」ファイルはファイル全体をキャッシュする必要がなく、データ ブロックはクライアント接続が完了した直後に送信されます。 .pipe() を使用するもう 1 つの利点は、クライアントの遅延が非常に大きい場合に発生する読み取りと書き込みの不均衡の問題を解決できることです。送信する前にファイルを圧縮したい場合は、サードパーティのモジュールを使用できます:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);</code>

この方法では、ファイルは gzip と deflate をサポートするブラウザによって圧縮されます。 oppressor モジュールはすべてのコンテンツ エンコーディングを処理します。

Stream によりプログラムの開発が簡単になります。

3. 基本概念

読み取り可能、書き込み可能、​​変換、二重、および「クラシック」という 5 つの基本的なストリームがあります。

3-1、パイプ

すべてのタイプのストリーム コレクションは、次のように .pipe() を使用して入出力ペアを作成し、読み取り可能なストリーム src を受け取り、そのデータを書き込み可能なストリーム dst に出力します。

<code class="hljs perl">src.pipe(dst)</code>

.pipe(dst) メソッドは dst ストリームを返すため、次のように複数の .pipe() を連続して使用できます。

<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>
次のコードと同じ機能:

<code class="hljs perl">a.pipe( b );
b.pipe( c );
c.pipe( d );</code>

3-2、読み取り可能なストリーム

Readable ストリームの .pipe() メソッドを呼び出すことにより、Readable ストリームのデータを Writable、Transform、または Duplex ストリームに書き込むことができます。

<code class="hljs perl">readableStream.pipe( dst )</code>
1>読み取り可能なストリームを作成する

ここでは、読み取り可能なストリームを作成します!

<code class="hljs perl">var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
</code>
rs.push( null ) は、データが送信されたことをデータ受信者に通知します。

すべてのデータ コンテンツを読み取り可能なストリームにプッシュする前に rs.pipe(process.stdout); を呼び出していないことに注意してください。これは、読み取り可能なストリームがすべてプッシュされたためです。データは、受信者がデータを読み取るまでキャッシュされます。ただし、多くの場合、データ全体をキャッシュするのではなく、データを受信したときにのみ読み取り可能なストリームにデータをプッシュする方が適切です。 ._read() 関数を書き直してみましょう:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);</code>
<code class="hljs bash">$ node read1.js
abcdefghijklmnopqrstuvwxyz</code>
上記のコードは、データ受信者がデータを要求した場合にのみ、_read() メソッドをオーバーライドして読み取り可能なストリームにデータをプッシュすることで実現します。 _read() メソッドは、データ要求によって要求されたデータ サイズを示すサイズ パラメーターも受け取ることができますが、読み取り可能なストリームは必要に応じてこのパラメーターを無視できます。

util.inherits() を使用して読み取り可能なストリームを継承することもできることに注意してください。 _read() メソッドがデータ受信者がデータを要求したときにのみ呼び出されることを示すために、次のように、読み取り可能なストリームにデータをプッシュするときに遅延を設けます。

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);</code>
次のコマンドを使用してプログラムを実行すると、_read() メソッドが 5 回しか呼び出されないことがわかります。

<code class="hljs bash">$ node read2.js | head -c5
abcde
_read() called 5 times</code>
タイマーを使用する理由は、システムがプログラムにパイプを閉じるように通知する信号を送信する時間が必要であるためです。 Process.stdout.on('error', fn) は、ヘッダー コマンドがパイプを閉じるため、SIGPIPE シグナルを送信するシステムを処理するために使用されます。これにより、process.stdout が EPIPE イベントをトリガーすることになります。任意の形式でデータをプッシュできる読み取り可能なストリームを作成する場合は、ストリームの作成時にパラメーター objectMode を true に設定します (例: Readable({ objectMode: true }))。

2> 読み取り可能なストリームデータを読み取ります

ほとんどの場合、パイプ メソッドを使用して、読み取り可能なストリームから別の形式のストリームにデータをリダイレクトしますが、場合によっては、読み取り可能なストリームから直接データを読み取る方が便利な場合もあります。以下のように:

当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});</code>

如下运行程序发现,输出结果并不完全!

<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<buffer 61="" 62="" 63="">
<buffer 0a="" 64="" 65="">
<buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>

这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});</code>

这次运行结果如下:

<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<buffer 0a="" 64="" 65="">
<buffer 0a="" 68="" 69=""></buffer></buffer></code>

我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:

<code class="hljs javascript">var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'</code>

当然,有很多模块可以实现这个功能,如:split 。

3-3、writable streams

writable streams只可以作为.pipe()函数的目的参数。如下代码:

<code class="hljs perl">src.pipe( writableStream );</code>

1>创建 writable stream

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

<code class="hljs php">var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

<code class="hljs css">Writable({ objectMode: true })</code>

2>写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

<code class="hljs vala">process.stdout.write('beep boop\n');</code>

调用.end()方法通知writable stream 数据已经写入完成。

<code class="hljs javascript">var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js 
$ cat message.txt
beep boop</code>

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。

3-4、classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

1>classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

<code class="hljs javascript">var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ</code>

如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:

<code class="hljs php">process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:

<code class="hljs php">var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

或者也可以使用concat-stream来缓存整个流的内容:

<code class="hljs oxygene">var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }</code>

当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。

2>classic writable streams

Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

4、transform

transform是一个对读入数据过滤然输出的流。

5、duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>

以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
JavaScriptの文字列文字を交換しますJavaScriptの文字列文字を交換しますMar 11, 2025 am 12:07 AM

JavaScript文字列置換法とFAQの詳細な説明 この記事では、javaScriptの文字列文字を置き換える2つの方法について説明します:内部JavaScriptコードとWebページの内部HTML。 JavaScriptコード内の文字列を交換します 最も直接的な方法は、置換()メソッドを使用することです。 str = str.replace( "find"、 "置換"); この方法は、最初の一致のみを置き換えます。すべての一致を置き換えるには、正規表現を使用して、グローバルフラグGを追加します。 str = str.replace(/fi

独自のAjax Webアプリケーションを構築します独自のAjax Webアプリケーションを構築しますMar 09, 2025 am 12:11 AM

それで、あなたはここで、Ajaxと呼ばれるこのことについてすべてを学ぶ準備ができています。しかし、それは正確には何ですか? Ajaxという用語は、動的でインタラクティブなWebコンテンツを作成するために使用されるテクノロジーのゆるいグループ化を指します。 Ajaxという用語は、もともとJesse Jによって造られました

独自のJavaScriptライブラリを作成および公開するにはどうすればよいですか?独自のJavaScriptライブラリを作成および公開するにはどうすればよいですか?Mar 18, 2025 pm 03:12 PM

記事では、JavaScriptライブラリの作成、公開、および維持について説明し、計画、開発、テスト、ドキュメント、およびプロモーション戦略に焦点を当てています。

ブラウザでのパフォーマンスのためにJavaScriptコードを最適化するにはどうすればよいですか?ブラウザでのパフォーマンスのためにJavaScriptコードを最適化するにはどうすればよいですか?Mar 18, 2025 pm 03:14 PM

この記事では、ブラウザでJavaScriptのパフォーマンスを最適化するための戦略について説明し、実行時間の短縮、ページの負荷速度への影響を最小限に抑えることに焦点を当てています。

ブラウザ開発者ツールを使用してJavaScriptコードを効果的にデバッグするにはどうすればよいですか?ブラウザ開発者ツールを使用してJavaScriptコードを効果的にデバッグするにはどうすればよいですか?Mar 18, 2025 pm 03:16 PM

この記事では、ブラウザ開発者ツールを使用した効果的なJavaScriptデバッグについて説明し、ブレークポイントの設定、コンソールの使用、パフォーマンスの分析に焦点を当てています。

jQueryマトリックス効果jQueryマトリックス効果Mar 10, 2025 am 12:52 AM

マトリックスの映画効果をあなたのページにもたらしましょう!これは、有名な映画「The Matrix」に基づいたクールなJQueryプラグインです。プラグインは、映画の古典的な緑色のキャラクター効果をシミュレートし、画像を選択するだけで、プラグインはそれを数値文字で満たされたマトリックススタイルの画像に変換します。来て、それを試してみてください、それはとても面白いです! それがどのように機能するか プラグインは画像をキャンバスにロードし、ピクセルと色の値を読み取ります。 data = ctx.getimagedata(x、y、settings.greasize、settings.greasize).data プラグインは、写真の長方形の領域を巧みに読み取り、jQueryを使用して各領域の平均色を計算します。次に、使用します

シンプルなjQueryスライダーを構築する方法シンプルなjQueryスライダーを構築する方法Mar 11, 2025 am 12:19 AM

この記事では、jQueryライブラリを使用してシンプルな画像カルーセルを作成するように導きます。 jQuery上に構築されたBXSLiderライブラリを使用し、カルーセルをセットアップするために多くの構成オプションを提供します。 今日、絵のカルーセルはウェブサイトで必須の機能になっています - 1つの写真は千の言葉よりも優れています! 画像カルーセルを使用することを決定した後、次の質問はそれを作成する方法です。まず、高品質の高解像度の写真を収集する必要があります。 次に、HTMLとJavaScriptコードを使用して画像カルーセルを作成する必要があります。ウェブ上には、さまざまな方法でカルーセルを作成するのに役立つ多くのライブラリがあります。オープンソースBXSLiderライブラリを使用します。 BXSLiderライブラリはレスポンシブデザインをサポートしているため、このライブラリで構築されたカルーセルは任意のものに適合させることができます

Angularを使用してCSVファイルをアップロードおよびダウンロードする方法Angularを使用してCSVファイルをアップロードおよびダウンロードする方法Mar 10, 2025 am 01:01 AM

データセットは、APIモデルとさまざまなビジネスプロセスの構築に非常に不可欠です。これが、CSVのインポートとエクスポートが頻繁に必要な機能である理由です。このチュートリアルでは、Angular内でCSVファイルをダウンロードおよびインポートする方法を学びます

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

mPDF

mPDF

mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

Dreamweaver Mac版

Dreamweaver Mac版

ビジュアル Web 開発ツール

EditPlus 中国語クラック版

EditPlus 中国語クラック版

サイズが小さく、構文の強調表示、コード プロンプト機能はサポートされていません

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。