Heim  >  Artikel  >  Web-Frontend  >  Lesbare Streams in NodeJS verstehen

Lesbare Streams in NodeJS verstehen

青灯夜游
青灯夜游nach vorne
2020-11-20 17:45:579347Durchsuche

Lesbare Streams in NodeJS verstehen

Verwandte Empfehlungen: „node js Tutorial

Was ist ein lesbarer Stream?

Ein lesbarer Stream ist ein Stream, der „Daten erzeugt“ für den Programmverbrauch. Zu unseren gängigen Datenproduktionsmethoden gehören das Lesen von Festplattendateien, das Lesen von Netzwerkanforderungsinhalten usw. Schauen Sie sich das vorherige Beispiel für die Stream-Nutzung an:

const rs = fs.createReadStream(filePath);
rs ist ein lesbarer Stream, und seine Art der Datenerzeugung besteht darin, Festplattendateien zu lesen , unser gemeinsamer Konsolenprozess.stdin ist auch ein lesbarer Stream:

process.stdin.pipe(process.stdout);

Sie können die Eingabe der Konsole mit einem einfachen Satz ausdrucken. Die Art und Weise, wie Process.stdin Daten erzeugt, besteht darin, die Eingaben des Benutzers auf der Konsole zu lesen.

Lassen Sie uns einen Blick zurück auf unsere Definition von lesbaren Streams werfen: Lesbare Streams sind Streams, die Daten für den Programmverbrauch erzeugen.

Benutzerdefinierter lesbarer Stream

Zusätzlich zu dem, was uns das System bietet

, verwenden wir häufig auch die von gulp oder vinyl-fs bereitgestellte src-Methode

gulp.src(['*.js', 'dist/**/*.scss'])
fs.CreateReadStreamWenn wir Daten auf eine bestimmte Art und Weise erzeugen möchten, überlassen wir es dem das Programm Verbrauch, also wie fange ich an?

Nur zwei einfache Schritte

Erben Sie die
    Readable
  1. -Klasseüberschreiben Sie die
  2. _read
  3. -Methode des Stream-Moduls, rufen Sie this.push auf, um die erzeugten Daten in die Warteschlange zum Lesen zu stellen
  4. Readable-Klasse Der größte Teil der Arbeit, die der lesbare Stream leisten muss, ist abgeschlossen. Wir müssen ihn nur erben und dann die Methode zum Erzeugen von Daten in die _read-Methode schreiben, um einen benutzerdefinierten lesbaren Stream zu implementieren.

Wenn wir einen Stream implementieren möchten, der alle 100 Millisekunden eine Zufallszahl generiert (nicht sehr nützlich)

const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
    constructor(max) {
        super()
    }

    _read() {
        const ctx = this;
        setTimeout(() => {
            const randomNumber = parseInt(Math.random() * 10000);

            // 只能 push 字符串或 Buffer,为了方便显示打一个回车
            ctx.push(`${randomNumber}\n`);
        }, 100);
    }
}

module.exports = RandomNumberStream;

Der Klassenvererbungsteil des Codes ist sehr einfach. Schauen Sie sich hauptsächlich die Implementierung der _read-Methode an. Es gibt mehrere Dinge Bemerkenswert

Readable Es gibt standardmäßig eine Implementierung der _read-Methode in der Klasse, es wird jedoch nichts unternommen. Die _read-Methode verfügt über eine Parametergröße, mit der die Datenmenge angegeben wird sollte gelesen und an die Lesemethode zurückgegeben werden, aber es handelt sich nur um Referenzdaten. Viele Implementierungen ignorieren diesen Parameter, wir ignorieren ihn auch hier, er wird später im Detail erwähnt
  1. Daten über this.push in den Puffer übertragen. Das Pufferkonzept wird später erwähnt und vorübergehend als in die Wasserleitung gepresst und zum Verzehr bereit verstanden. Der Inhalt von Push kann nur eine Zeichenfolge oder ein Puffer sein, keine Zahl. Die Push-Methode verfügt über eine zweite Parametercodierung wird verwendet, um die Codierung anzugeben, wenn der erste Parameter eine Zeichenfolge ist. Führen Sie ihn aus, um den Effekt zu sehen. Da die Zahlen kontinuierlich auf der Konsole angezeigt werden, haben wir einen lesbaren Stream implementiert, der Zufallszahlen generiert Es müssen noch ein paar kleine Probleme gelöst werden lesen?
  2. Schieben Sie einfach eine Null in den Puffer. Ändern wir den Code, damit Verbraucher definieren können, wie viele Zufallszahlen benötigt werden:
  3. const RandomNumberStream = require('./RandomNumberStream');
    
    const rns = new RandomNumberStream();
    
    rns.pipe(process.stdout);
  4. Wir verwenden einen maximalen Bezeichner, um Verbrauchern die Angabe der Anzahl der erforderlichen Zeichen zu ermöglichen, die während der Instanziierung angegeben werden kann
  5. const Readable = require('stream').Readable;
    
    class RandomNumberStream extends Readable {
        constructor(max) {
            super()
            this.max = max;
        }
    
        _read() {
            const ctx = this;
    
            setTimeout(() => {
                if (ctx.max) {
                    const randomNumber = parseInt(Math.random() * 10000);
    
                    // 只能 push 字符串或 Buffer,为了方便显示打一个回车
                    ctx.push(`${randomNumber}\n`);
                    ctx.max -= 1;
                } else {
                    ctx.push(null);
                }
            }, 100);
        }
    }
    
    module.exports = RandomNumberStream;
  6. Wie Sie in der Konsole sehen können Es wurden nur 5 Zeichen gedruckt die Produktion verzögerte sich und die Produktion wurde nicht wiederholt, aber das Ergebnis stimmt?
  7. Dafür ist es erforderlich, die beiden Funktionsweisen von Streams zu verstehen.

Fließmodus: Die Daten werden vom zugrunde liegenden System ausgelesen und der Anwendung so schnell wie möglich bereitgestellt.

Pausemodus: Die Methode read() muss zum Lesen explizit aufgerufen werden Mehrere Datenblöcke

Streams befinden sich standardmäßig im Pausenmodus, was bedeutet, dass das Programm die Methode read() explizit aufrufen muss. In unserem Beispiel können die Daten jedoch abgerufen werden, ohne sie aufzurufen, da unser Stream Pipe() übergibt. Die Methode wechselt in den Fließmodus, sodass unsere _read()-Methode automatisch wiederholt aufgerufen wird, bis die Daten gelesen sind. Wir müssen die Daten also nur einmal in jeder _read()-Methode lesen.

Umschalten zwischen Flow-Modus und Pause-Modus

Flow kann auf folgende Weise vom Standard-Pause-Modus in den Flow-Modus umgeschaltet werden:

Starten Sie die Datenüberwachung durch Hinzufügen eines Datenereignis-Listeners.

Rufen Sie die Methode „resume()“ auf Um den Datenstrom zu startenruft die Methode „pipe()“ auf, um die Daten an einen anderen beschreibbaren Stream zu übertragen

Es gibt zwei Möglichkeiten, vom Flussmodus in den Pausenmodus zu wechseln:

  1. 在流没有 pipe() 时,调用 pause() 方法可以将流暂停
  2. pipe() 时,需要移除所有 data 事件的监听,再调用 unpipe() 方法

data 事件

使用了 pipe() 方法后数据就从可读流进入了可写流,但对我们好像是个黑盒,数据究竟是怎么流向的呢?我们看到切换流动模式和暂停模式的时候有两个重要的名词

  1. 流动模式对应的 data 事件
  2. 暂停模式对应的 read() 方法

这两个机制是我们能够驱动数据流动的原因,先来看一下流动模式 data 事件,一旦我们监听了可读流的 data 时、事件,流就进入了流动模式,我们可以改写一下上面调用流的代码

const RandomNumberStream = require('./RandomNumberStream');

const rns = new RandomNumberStream(5);

rns.on('data', chunk => {
  console.log(chunk);
});

这样我们可以看到控制台打印出了类似下面的结果

<Buffer 39 35 37 0a>
<Buffer 31 30 35 37 0a>
<Buffer 38 35 31 30 0a>
<Buffer 33 30 35 35 0a>
<Buffer 34 36 34 32 0a>

当可读流生产出可供消费的数据后就会触发 data 事件,data 事件监听器绑定后,数据会被尽可能地传递。data 事件的监听器可以在第一个参数收到可读流传递过来的 Buffer 数据,这也就是我们打印的 chunk,如果想显示为数字,可以调用 Buffer 的 toString() 方法。

当数据处理完成后还会触发一个 end 事件,应为流的处理不是同步调用,所以如果我们希望完事后做一些事情就需要监听这个事件,我们在代码最后追加一句:

rns.on('end', () => {
  console.log('done');
});

这样可以在数据接收完了显示 'done'

当然数据处理过程中出现了错误会触发 error 事件,我们同样可以监听,做异常处理:

rns.on('error', (err) => {
  console.log(err);
});

read(size)

流在暂停模式下需要程序显式调用 read() 方法才能得到数据。read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null。

使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据。

现在有一个矛盾了,在流动模式下流生产出了数据,然后触发 data 事件通知给程序,这样很方便。在暂停模式下需要程序去读取,那么就有一种可能是读取的时候还没生产好,如果我们才用轮询的方式未免效率有些低。

NodeJS 为我们提供了一个 readable 的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知又数据了我们再去读取就好了:

const rns = new RandomNumberStream(5);

rns.on('readable', () => {
  let chunk;
  while((chunk = rns.read()) !== null){
    console.log(chunk);
  }
});

这样我们同样可以读取到数据,值得注意的一点是并不是每次调用 read() 方法都可以返回数据,前面提到了如果可用的数据没有达到 size 那么返回 null,所以我们在程序中加了个判断。

数据会不会漏掉

开始使用流动模式的时候我经常会担心一个问题,上面代码中可读流在创建好的时候就生产数据了,那么会不会在我们绑定 readable 事件之前就生产了某些数据,触发了 readable 事件,我们还没有绑定,这样不是极端情况下会造成开头数据的丢失嘛

可事实并不会,按照 NodeJS event loop 我们创建流和调用事件监听在一个事件队列里面,儿生产数据由于涉及到异步操作,已经处于了下一个事件队列,我们监听事件再慢也会比数据生产块,数据不会丢失。

看到这里,大家其实对 data事件、readable事件触发时机, read() 方法每次读多少数据,什么时候返回 null 还有又一定的疑问,因为到现在为止我们接触到的仍然是一个黑盒,后面我们介绍了可写流后会在 back pressure 机制部分对这些内部细节结合源码详细讲解,且听下回分解吧。

更多编程相关知识,请访问:编程入门!!

Das obige ist der detaillierte Inhalt vonLesbare Streams in NodeJS verstehen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:cnblogs.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen