ホームページ  >  記事  >  ウェブフロントエンド  >  Node.js でのストリーム読み取り可能なストリームの使用

Node.js でのストリーム読み取り可能なストリームの使用

hzc
hzc転載
2020-06-17 09:26:041917ブラウズ

読み取り可能なストリームとは、プログラムで使用するためのデータ を生成するストリームです。一般的なデータ生成方法には、ディスク ファイルの読み取り、ネットワーク リクエスト コンテンツの読み取りなどが含まれます。ストリームとは何かについての前の例を見てください。

const rs = fs.createReadStream(filePath);
rs は読み取り可能なストリームであり、そのデータ生成方法は次のとおりです。 read ディスク ファイル、コンソール process.stdin も読み取り可能なストリームです:

process.stdin.pipe(process.stdout);
簡単な文でコンソール入力を出力できます。 process.stdin がデータを生成する方法は、コンソールでのユーザーの入力を読み取ることです。の入力です。

読み取り可能なストリームの定義を振り返ってください。

読み取り可能なストリームは、プログラムで使用するデータを生成するストリームです。

カスタム読み取り可能なストリーム


提供されるシステムに加えて

fs.CreateReadStream

使用される gulp またはヴァイナル-fs によって提供される src メソッドも読み取り可能なストリームを使用します

gulp.src(['*.js', 'dist/**/*.scss'])
特定の方法でデータを生成し、それをプログラムに渡して消費したい場合、どのように始めればよいでしょうか?

これは 2 つの簡単な手順で実行できます。

    ストリーム モジュールの
  1. Readable

    クラスを継承します。
  2. Rewrite
  3. _read

    メソッド、
    this.push

    を呼び出して、生成されたデータをキューに入れて読み取ります
  4. #Readable クラスは、readable ストリームで行うべき作業のほとんどが完了しているので、それを継承し、_read メソッドにデータを生成するメソッドを記述するだけで、カスタム readable ストリームを実装できます。

例: 100 ミリ秒ごとに乱数を生成するストリームを実装します (役に立ちません)

const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
    constructor(max) {
        super()
    }
    _read() {
        const ctx = this;
        setTimeout(() => {
            const randomNumber = parseInt(Math.random() * 10000);
            // 只能 push 字符串或 Buffer,为了方便显示打一个回车
            ctx.push(`${randomNumber}\n`);
        }, 100);
    }
}
module.exports = RandomNumberStream;

コードのクラス継承部分は非常に単純です。主に、 _read メソッドには、いくつかの注意すべき点があります。

Readable クラスにはデフォルトで _read メソッドが実装されていますが、何も行われません。 #_read メソッドにはパラメータ size があり、どれだけのデータを読み取って read メソッドに返すかを指定するために使用されますが、これは単なる参照データです。多くの実装ではこのパラメータが無視され、ここでも無視されます。詳細は後述
  1. これを介してバッファにプッシュする データをプッシュする、バッファの概念は後述しますが、とりあえず水道管に押し込んで消費できることが分かります
  2. プッシュのコンテンツは文字列またはバッファのみであり、数値は使用できません。
  3. プッシュ メソッドには次のパラメータがあります。最初のパラメータが文字列の場合、2 番目のパラメータ エンコーディングはエンコーディングを指定するために使用されます。
  4. 実行して効果を確認してください
  5. const RandomNumberStream = require('./RandomNumberStream');
    const rns = new RandomNumberStream();
    rns.pipe(process.stdout);
  6. このように、コントロールに数値が連続的に表示されていることがわかりますプラットフォーム上では、乱数を生成する読み取り可能なストリームが実装されています。解決すべき小さな問題がいくつかあります

停止方法

100 ミリ秒ごとに数値をバッファにプッシュすると、たとえば、ローカル ファイルを読み取る場合、常に終了します。データが読み取られたことを示しますか?

null をバッファにプッシュするだけです。消費者が必要な乱数の数を定義できるようにコードを変更します:

const Readable = require('stream').Readable;
class RandomNumberStream extends Readable {
    constructor(max) {
        super()
        this.max = max;
    }
    _read() {
        const ctx = this;
        setTimeout(() => {
            if (ctx.max) {
                const randomNumber = parseInt(Math.random() * 10000);
                // 只能 push 字符串或 Buffer,为了方便显示打一个回车
                ctx.push(`${randomNumber}\n`);
                ctx.max -= 1;
            } else {
                ctx.push(null);
            }
        }, 100);
    }
}
module.exports = RandomNumberStream;

コードは最大識別子を使用して、消費者がニーズを指定できるようにします。インスタンス化時に文字数を指定できます

const RandomNumberStream = require('./');
const rns = new RandomNumberStream(5);
rns.pipe(process.stdout);

このようにすると、コンソールに 5 文字しか出力されないことがわかります

なぜ setInterval ではなく setTimeout なのでしょうか

注意してください学生は、100 ミリ秒ごとの乱数の生成では setInterval を呼び出すのではなく、setTimeout を使用することに気づいたかもしれませんが、結果が正しいのに、遅延するだけで繰り返されないのはなぜでしょうか。

#これには、ストリームが機能する 2 つの方法を理解する必要があります

#フロー モード: データは基礎となるシステムによって読み取られ、できるだけ早くアプリケーションに提供されます

一時停止モード: 複数のデータ ブロックを読み取るには、read() メソッドを明示的に呼び出す必要があります

    ストリームはデフォルトで一時停止モードになっており、プログラムは明示的に read() メソッドを呼び出す必要があります。ただし、上記の例では、pipe() メソッドによってストリームがフロー モードに切り替わるため、呼び出すことなくデータを取得できます。そのため、データが読み込まれるまで _read() メソッドが自動的に繰り返し呼び出されるので、そのたびに_read() メソッドでデータを読み取る必要があるのは 1 回だけです。
  1. フロー モードと一時停止モードの切り替え
  2. 次のメソッドを使用して、ストリームをデフォルトの一時停止モードからフローに切り替えることができます。 mode:

データ イベント リスナーを追加してデータ監視を開始する

resume() メソッドを呼び出してデータ フローを開始する

pipe() メソッドを呼び出して、データを別の書き込み可能なストリームに転送します
  1. フロー モードから一時停止モードに切り替える方法は 2 つあります。
  2. ストリームに Pipe() がない場合、 stop() メソッドはストリームを一時停止できます
pipe() すべてのデータ イベント リスナーを削除し、unpipe() メソッドを呼び出します

#data event
  1. 使用後Pipe() メソッドを使用すると、データは読み取り可能なストリームからストリーミングされます 書き込み可能なストリームに入りましたが、ユーザーにとってはブラック ボックスに見えます データはどのように流れるのでしょうか?フロー モードと一時停止モードを切り替えるときに重要な用語が 2 つあります。
  2. #フロー モードに対応するデータ イベント
一時停止モードに対応する read() メソッド

これら 2 つのメカニズムは、プログラムがデータ フローを駆動できる理由です。最初にフロー モードのデータ イベントを見てみましょう。読み取り可能なストリームのデータ イベントが監視されると、ストリームはフロー モードに入ります。上記のストリームを呼び出すコードを書き換えることができます。

const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);
rns.on('data', chunk => {
  console.log(chunk);
});

这样可以看到控制台打印出了类似下面的结果

<Buffer 39 35 37 0a>
<Buffer 31 30 35 37 0a>
<Buffer 38 35 31 30 0a>
<Buffer 33 30 35 35 0a>
<Buffer 34 36 34 32 0a>

当可读流生产出可供消费的数据后就会触发 data 事件,data 事件监听器绑定后,数据会被尽可能地传递。data 事件的监听器可以在第一个参数收到可读流传递过来的 Buffer 数据,这也就是控制台打印的 chunk,如果想显示为数字,可以调用 Buffer 的 toString() 方法

当数据处理完成后还会触发一个

end

事件,因为流的处理不是同步调用,所以如果希望完事后做一些事情就需要监听这个事件,在代码最后追加一句:

rns.on('end', () => {
  console.log('done');
});复制代码

这样可以在数据接收完了显示 done ,当然数据处理过程中出现了错误会触发 error 事件,可以监听做异常处理:

rns.on('error', (err) => {
  console.log(err);
});复制代码

read(size)

流在暂停模式下需要程序显式调用 read() 方法才能得到数据,read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null

使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据

现在有一个矛盾,在流动模式下流生产出了数据,然后触发 data 事件通知给程序,这样很方便。在暂停模式下需要程序去读取,那么就有一种可能是读取的时候还没生产好,如果使用轮询的方式未免效率有些低

NodeJS 提供了一个

readable的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知有数据了再去读取就好了:

const rns = new RandomNumberStream(5);
rns.on('readable', () => {
  let chunk;
  while((chunk = rns.read()) !== null){
    console.log(chunk);
  }
});

这样可以读取到数据,值得注意的一点是并不是每次调用 read() 方法都可以返回数据,前面提到了如果可用的数据没有达到 size 那么返回 null,所以在程序中加了个判断

数据会不会漏掉

const stream = fs.createReadStream('/dev/input/event0');
stream.on('readable', callback);复制代码

在流动模式会不会有这样的问题:可读流在创建好的时候就生产数据了,如果在绑定 readable 事件之前就生产了某些数据,触发了 readable 事件,在极端情况下会造成数据丢失吗?

事实并不会,按照 NodeJS event loop 程序创建流和调用事件监听在一个事件队列里面,生产数据和事件监听都是异步操作,而 on 监听事件使用了 process.nextTick 会保证在数据生产之前被绑定好,相关知识可以看定时器章节中对 event loop 的解读

到这里可能对 data事件、readable事件触发时机, read() 方法每次读多少数据,什么时候返回 null 还有一定的疑问,在后续可写流章节会在 back pressure 部分结合源码介绍相关机制

推荐教程:《JS教程

以上がNode.js でのストリーム読み取り可能なストリームの使用の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はjuejin.cnで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。