ホームページ > 記事 > ウェブフロントエンド > ストリームとは何ですか? Nodejs のストリームを理解する方法
ストリームとは何ですか?流れをどう理解するか? Node のストリーム (Stream) については、次の記事で詳しく説明していますので、お役に立てれば幸いです。
筆者は最近開発でパイプ関数をよく使っているのですが、ストリームパイプということだけは知っていて、仕組みがよく分からないので、まずは流れから学び始め、読んだ知識やソースコードを記事にまとめてみんなで共有しましょう。
ストリームは Nodejs の非常に基本的な概念であり、多くの基本モジュールがストリームに基づいて実装され、非常に重要な役割を果たします。同時に、フローは理解するのが非常に難しい概念でもあります。これは主に、関連ドキュメントが不足していることが原因です。NodeJ の初心者にとって、この概念を真に習得する前に、フローを理解するのに多くの時間がかかります。幸いなことに、ほとんどの NodeJ では、ユーザーにとっては Web アプリケーションの開発にのみ使用され、ストリームの理解が不十分でも使用には影響しません。ただし、ストリームを理解すると、NodeJ の他のモジュールをより深く理解できるようになり、場合によっては、ストリームを使用してデータを処理した方が良い結果が得られることがあります。 [関連チュートリアルの推奨事項: nodejs ビデオ チュートリアル ]
ストリームのユーザーは、次のことができます。ストリームを配列として考えると、ストリームからの取得 (消費) と書き込み (生成) だけに集中する必要があります。
ストリーム開発者 (ストリーム モジュールを使用して新しいインスタンスを作成する) は、ストリームにいくつかのメソッドを実装する方法に焦点を当てます。通常は、ターゲットが誰であるかという 2 つの点に焦点を当てます。リソースとその操作方法。ターゲット リソースが決定したら、ストリームのさまざまな状態やイベントに従ってターゲット リソース上で操作する必要があります。
NodeJs のすべて すべてのストリームにはバッファー プールがあります。バッファー プールの目的は、ストリームの効率を高めることです。データの生成と消費に時間がかかる場合は、事前にデータを生成し、次の消費の前にバッファプールに保存します。ただし、バッファ プールは常に使用されているわけではなく、たとえば、キャッシュ プールが空の場合、データは本番後にキャッシュ プールに置かれず、直接消費されます。 。
データ生成の速度がデータ消費の速度よりも速い場合、余ったデータはどこかで待機します。データ生成速度がプロセスデータ消費速度よりも遅い場合、データはどこかに一定量蓄積されてから消費されます。 (開発者はデータの生成と消費の速度を制御できません。適切なタイミングでデータの生成または消費を試みることしかできません。)
データが待機し、データが蓄積され、その後発生する場所。 バッファプールです。バッファ プールは通常、コンピュータの RAM (メモリ) にあります。
一般的なバッファの例を挙げると、オンライン ビデオを視聴する場合、インターネット速度が非常に速い場合、バッファは常にすぐにいっぱいになり、再生のためにシステムに送信され、すぐにバッファされます。 。視聴中に遅延が発生することはありません。ネットワーク速度が非常に遅い場合は、バッファがいっぱいになっていることを示す読み込み中が表示されます。いっぱいになると、データがシステムに送信され、このビデオが表示されます。
NodeJs ストリームのキャッシュ プールはバッファ リンク リストであり、キャッシュ プールにデータを追加するたびにバッファ ノードが再作成され、リンク リストの最後に挿入されます。
NodeJs の Stream は EventEmitter を実装した抽象インターフェイスなので、最初に EventEmitter について簡単に紹介します。
EventEmitter はイベントの発行とサブスクリプションの機能を実装するクラスで、一般的に使用されるいくつかのメソッド (on、once、off、emit) は誰もがよく知っていると思われるので、個別に紹介することはしません。
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() // 为 eventA 事件绑定处理函数 eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); // 为 eventB 事件绑定处理函数 eventEmitter.on('eventB', () => { console.log('eventB active 1'); }); eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // 触发 eventA eventEmitter.emit('eventA') // eventA active 1 // eventA active 2
EventEmitter
には、newListener
と removeListener
という 2 つのイベントがあることに注意してください。イベントリスニング関数がトリガーされると、newListener
(eventEmitter.emit('newListener')) がトリガーされ、ハンドラー関数が削除されると、同様に removeListener
がトリガーされます。
また、一度バインドされた処理関数は 1 回だけ実行され、removeListener
はその実行前にトリガーされることにも注意してください。これは、once
バインディングを意味します。トリガーされる前に、まず削除されます。
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() eventEmitter.on('newListener', (event, listener)=>{ console.log('newListener', event, listener) }) eventEmitter.on('removeListener', (event, listener) => { console.log('removeListener', event, listener) }) //newListener removeListener[Function(anonymous)] eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); //newListener eventA [Function (anonymous)] function listenerB() { console.log('eventB active 1'); } eventEmitter.on('eventB', listenerB); // newListener eventB [Function (anonymous)] eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // newListener eventA [Function (anonymous)] eventEmitter.emit('eventA') // eventA active 1 // removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] } // eventA active 2 eventEmitter.off('eventB', listenerB) // removeListener eventB[Function: listenerB]
しかし、これは後続のコンテンツにとっては重要ではありません。
Stream は、Node.js でストリーミング データを処理するための抽象インターフェイスです。ストリームは実際のインターフェイスではなく、すべてのストリームの総称です。実際のインターフェイスは、ReadableStream、WritableStream、および ReadWriteStream です。
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>; } interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this; } interface ReadWriteStream extends ReadableStream, WritableStream { }
ReadableStream と WritableStream はどちらも EventEmitter クラスを継承するインターフェイスであることがわかります (ts のインターフェイスは型をマージするだけであるため、クラスを継承できます)。
上記のインターフェイスに対応する実装クラスは、Readable、Writable、Duplex です。
NodeJ には 4 種類のストリームがあります。
ディスクのデータ書き込み速度はメモリの速度よりもはるかに遅いです。メモリとディスクの間にはギャップがあると考えられます。 「パイプライン」、「パイプライン」とは「流れ」を意味します。メモリ内のデータは非常に速くパイプに流れ込みます。パイプがいっぱいになると、メモリ内にデータ バック プレッシャーが発生し、データがメモリ内にバックログされて占有されます。リソース。
NodeJs ストリームの解決策は、各ストリームの バッファ プール
(つまり、図の書き込みキュー) に float 値を設定することです。その中のデータ量がこの float 値に達した後、push
データがキャッシュ プールに再度プッシュされると false が返され、現在のストリームのキャッシュ プールの内容が float 値に達したことを示します。 、これ以上データが書き込まれる予定はありません。現時点では、大きすぎるキャッシュ プールによって引き起こされるバック プレッシャーを防ぐために、データの生成をただちに停止する必要があります。
読み取り可能なストリーム (Readable) はストリームの一種で、2 つのモードと 3 つの状態があります。
2 つの読み取りモード:
フロー モード: データは基盤となるシステムからバッファーに読み書きされます。バッファーがいっぱいになると、データは EventEmitter を通じてできるだけ早く登録されたイベント ハンドラーに自動的に渡されます。
一時停止モード: このモードでは、EventEmitter はデータ送信のためにアクティブにトリガーされず、バッファからデータを読み取るために Readable.read()
メソッドを明示的に呼び出す必要があります。 EventEmitter イベントへの応答をトリガーします。
3 つの状態:
readableFlowing === null (初期状態)
readableFlowing = == false (一時停止モード)
readableFlowing === true (フロー モード)
初期フロー readable.readableFlowing
null
のデータ イベントを追加すると true になります。 pause()
、unpipe()
が呼び出されるか、バック プレッシャーが受信されるか、readable
イベントが追加されると、readableFlowing
が実行されます。 false に設定します。この状態では、リスナーをデータ イベントにバインドしても、readableFlowing が true に切り替わりません。
resume()
を呼び出して、読み取り可能なストリームの readableFlowing
を true に切り替えます
すべての読み取り可能なイベントを削除して、readableFlowing を有効にしますnullになる方法。
説明 | |
---|---|
バッファーに新しい読み取り可能なデータがあるときにトリガーされます (ノードがキャッシュ プールに挿入されるたびにトリガーされます) | |
その後毎回トリガーされますデータの消費 トリガーされる、パラメータは今回消費されるデータです | |
ストリームが閉じられたときにトリガーされます | |
ストリームでエラーが発生したときにトリガー |
以上がストリームとは何ですか? Nodejs のストリームを理解する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。