Maison  >  Article  >  interface Web  >  Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

不言
不言avant
2018-10-23 16:07:242671parcourir

Cet article vous apporte une explication du mécanisme de fonctionnement de Node Stream (avec des exemples). Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer.

Si vous apprenez Node, les flux doivent être un concept que vous devez maîtriser. Si vous voulez devenir un maître Node, alors le flow doit être un élément indispensable des secrets des arts martiaux.

Cité de Stream-Handbook. Cela montre l’importance des flux pour un apprentissage approfondi de Node.

Qu’est-ce que le flux ?

Vous pouvez considérer le streaming comme une capacité de transmission. Grâce aux flux, les données peuvent être transférées vers la destination de manière fluide et sans effets secondaires. Dans Node, les flux créés par Node Stream sont dédiés à String et Buffer. Généralement, Buffer est utilisé. Stream représente une capacité de transmission, et Buffer est le support de transmission du contenu ( peut être compris de cette façon, Stream : le frère à emporter, Buffer : votre plat à emporter ). Définissez ObjectMode sur true lors de la création d'un flux. Stream peut également transmettre tout type d'objet JS (sauf null, qui a des utilisations spéciales dans les flux).

Pourquoi utiliser les flux ?

Maintenant, il y a une exigence, nous devons transférer un gros fichier au client. Si la méthode suivante est utilisée

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);

Chaque fois qu'une demande est reçue, le gros fichier doit être lu en mémoire puis transféré au client. Les trois conséquences suivantes peuvent ainsi se produire :

  • Épuisement de la mémoire

  • Ralentissement d'autres processus

  • Augmentez la charge du garbage collector

Cette méthode n'est donc pas une bonne solution lors du transfert de fichiers volumineux. Le degré de concurrence est important et des centaines de requêtes peuvent facilement épuiser la mémoire.

Et si on utilisait le streaming ?

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

De cette façon, cela ne prendra pas trop de mémoire. Il suffit de lire et de transférer un peu. L'ensemble du processus se déroule sans problème et est très élégant. Si vous souhaitez traiter les fichiers pendant le processus de transmission, comme la compression, le cryptage, etc., il est également facile de les développer (ce sera présenté en détail plus tard).

Les streams sont partout dans Node. Comme le montre l'image ci-dessous :

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

Classification des flux

Le flux est divisé en quatre grandes catégories :

  • Lisible (flux lisible)

  • Écrit (flux inscriptible)

  • Duplex (flux duplex)

  • Transformation (flux de conversion)

Lisible

Les données du flux lisible peuvent être générées dans les deux modes de données suivants.

  • Mode fluide

  • Mode non fluide

Méthodes de déclenchement dans les deux modes Et le le mode de consommation est différent.

Mode fluide : les données seront produites en continu, formant un phénomène « fluide ». Ce mode peut être accédé en écoutant l'événement data du flux.

En mode non fluide : vous devez appeler explicitement la méthode read() pour obtenir des données.

Les deux modes peuvent être convertis entre eux

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

L'état initial du flux est Null, en écoutant le data événement, ou pipe méthode, appelle la méthode resume pour transférer le flux vers l'état Flowing Mode. Appelez la méthode Flowing Mode dans l'état pause pour définir le flux sur l'état Non-Flowing Mode. L’appel de la méthode Non-Flowing Mode dans l’état resume peut également définir le flux sur l’état Flowing Mode.

Ce qui suit est une introduction détaillée au mécanisme de fonctionnement du flux Readable dans les deux modes.

Mode fluide

Dans l'état Mode fluide, le flux de lecture myReadable créé surveille directement l'événement de données et les données sortent en continu pour être consommées.

myReadable.on('data',function(chunk){
      consume(chunk);//消费流
})

Une fois l'événement de données écouté, le processus interne de Readable est comme indiqué ci-dessous

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

La méthode de base est à l'intérieur du flux La méthode read déclenche différentes opérations lorsque le paramètre n est une valeur différente. Le hightwatermark dans la description ci-dessous représente la taille du pool tampon à l'intérieur du flux.

  • n=indéfini (consommer des données et déclencher un flux lisible)

  • n=0 (déclencher un flux lisible mais ne consommera pas)

  • n>hightwatermark (modifier la valeur de hightwatermark)

  • n

  • n>buffer (peut renvoyer null ou toutes les données du tampon (la dernière lecture à ce moment-là))

图中黄色标识的_read(),是用户实现流所需要自己实现的方法,这个方法就是实际读取流的方式(可以这样理解,外卖平台给你提供外卖的能力,那_read()方法就相当于你下单点外卖)。后面会详细介绍如何实现_read方法。

以上的流程可以描述为:监听data方法,Readable内部就会调用read方法,来进行触发读流操作,通过判断是同步还是异步读取,来决定读取的数据是否放入缓冲区。如果为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数判定是否emit('data')来消费流,循环读取。如果是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。

从使用者的角度来看,在这种模式下,你可以通过下面的方式来使用流

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(chunk){
      writeFile1.write(chunk);
})
Non-Flowing Mode

相对于Flowing mode,Non-Flowing Mode要相对简单很多。

消费该模式下的流,需要使用下面的方式

myReadable.on(‘readable’,function(){
     const chunk = myReadable.read()
     consume(chunk);//消费流
})

在Non-Flowing Mode下,Readable内部的流程如下图:

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

从这个图上看出,你要实现该模式的读流,同样要实现一个_read方法。

整个流程如下:监听readable方法,Readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,然后发送emit readable事件,通知用户端消费。

从使用者的角度来看,你可以通过下面的方式来使用该模式下的流

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('readable',function(chunk) {
    while (null !== (chunk = myReadable.read())) {
        writeFile.write(chunk);
    }
});

Writable

相对于读流,写流的机制就更容易理解了。

写流使用下面的方式进行数据写入

myWrite.write(chunk);

调用write后,内部Writable的流程如下图所示

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

类似于读流,实现一个写流,同样需要用户实现一个_write方法。

整个流程是这样的:调用write之后,会首先判定是否要写入缓冲区。如果不需要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。

从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('data',function(chunk) {
    writeFile.write(chunk);
})

可以看到,使用写流是非常简单的。

我们先讲解一下如何实现一个读流和写流,再来看Duplex和Transform是什么,因为了解了如何实现一个读流和写流,再来理解Duplex和Transform就非常简单了。

实现自定义的Readable

实现自定义的Readable,只需要实现一个_read方法即可,需要在_read方法中调用push方法来实现数据的生产。如下面的代码所示:

const Readable = require('stream').Readable;

class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        });
    }
}

// 模拟资源池
const dataSource = {
    data: new Array(10).fill('-'),
    makeData() {
        if (!dataSource.data.length) return null;
        return dataSource.data.pop();
    }
};

const myReadable = new MyReadable(dataSource,);

myReadable.on('readable', () => {
    let chunk;
    while (null !== (chunk = myReadable.read())) {
        console.log(chunk);
    }
});

实现自定义的writable

实现自定义的writable,只需要实现一个_write方法即可。在_write中消费chunk写入到相应地方,并且调用callback回调。如下面代码所示:

const Writable = require('stream').Writable;
class Mywritable extends  Writable{
    constuctor(options){
        super(options);
    }
    _write(chunk,endcoding,callback){
        console.log(chunk);
        callback && callback();
    }
}
const myWritable = new Mywritable();

Duplex

双工流:简单理解,就是讲一个Readable流和一个Writable流绑定到一起,它既可以用来做读流,又可以用来做写流。

实现一个Duplex流,你需要同时实现_read_write方法。

有一点需要注意的是:它所包含的 Readable流和Writable流是完全独立,互不影响的两个流,两个流使用的不是同一个缓冲区。通过下面的代码可以验证

// 模拟资源池1
const dataSource1 = {
    data: new Array(10).fill('a'),
    makeData() {
        if (!dataSource1.data.length) return null;
        return dataSource1.data.pop();
    }
};
// 模拟资源池2
const dataSource2 = {
    data: new Array(10).fill('b'),
    makeData() {
        if (!dataSource2.data.length) return null;
        return dataSource2.data.pop();
    }
};

const Readable = require('stream').Readable;
class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })

    }
}

const Writable = require('stream').Writable;
class MyWritable extends Writable{
    constructor(options){
        super(options);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const Duplex = require('stream').Duplex;
class MyDuplex extends Duplex{
    constructor(dataSource,options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const myWritable = new MyWritable();
const myReadable = new MyReadable(dataSource1);
const myDuplex = new MyDuplex(dataSource1);
myReadable.pipe(myDuplex).pipe(myWritable);

打印的结果是

abababababababababab

从这个结果可以看出,myReadable.pipe(myDuplex),myDuplex充当的是写流,写入的内容是a;myDuplex.pipe(myWritable),myDuplex充当的是读流,往myWritable写的却是b;所以说它所包含的 Readable流和Writable流是完全独立的。

Transform

理解了Duplex,就更好理解Transform了。Transform是一个转换流,它既有读的功能又有写的功能,但是它和Duplex不同的是,它的读流和写流共用同一个缓冲区;也就是说,通过它读入什么,那它就能写入什么。

实现一个Transform,你只需要实现一个_transform方法。比如最简单的Transform:PassThrough,其源代码如下所示

Explication du mécanisme de fonctionnement de Node Stream (avec exemples)

PassThrough就是一个Transform,但是这个转换流,什么也没做,相当于一个透明的转换流。可以看到_transform中什么都没有,只是简单的将数据进行回调。

如果我们在这个环节做些扩展,只需要在_transform中直接扩展就行了。比如我们可以对流进行压缩,加密,混淆等等操作。

BackPress

最后介绍一个流中非常重要的一个概念:背压。要了解这个,我们首先来看下pipehighWaterMaker是什么。

pipe

首先看下下面的代码

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.pipe(writeFile);

上面的代码和下面是等价的

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(data){
    var flag = ws.write(data);
    if(!flag){ // 当前写流缓冲区已满,暂停读数据
        readFile.pause();
    }
})
writeFile.on('drain',function()){
    readFile.resume();// 当前写流缓冲区已清空,重新开始读流
}
readFile.on('end',function(data){
    writeFile.end();//将写流缓冲区的数据全部写入,并且关闭写入的文件
})

pipe所做的操作就是相当于为写流和读流自动做了速度的匹配。

读写流速度不匹配的情况下,一般情况下不会造成什么问题,但是会造成内存增加。内存消耗增加,就有可能会带来一系列的问题。所以在使用的流的时候,强烈推荐使用pipe

highWaterMaker

highWaterMaker说白了,就是定义缓冲区的大小。

  • 默认16Kb(Readable最大8M)

  • 可以自定义

背压的概念可以理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highWaterMaker设置的大小。

如上面的代码 var flag = ws.write(data);,一旦写流的缓冲区满了,那flag就会置为false,反向促进读流的速度调整。

Stream的应用场景

主要有以下场景

  1. 文件操作(复制,压缩,解压,加密等)

下面的就很容易就实现了文件复制的功能。

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big_copy.file');
readFile.pipe(writeFile);

那我们想在复制的过程中对文件进行压缩呢?

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big.gz');
const zlib = require('zlib');
readFile.pipe(zlib.createGzip()).pipe(writeFile);

实现解压、加密也是类似的。

  1. 静态文件服务器

比如需要返回一个html,可以使用如下代码。

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.createReadStream('./a.html').pipe(res);
}).listen(8000);

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer