首頁 >web前端 >js教程 >Node Stream的運作機制解說(附範例)

Node Stream的運作機制解說(附範例)

不言
不言轉載
2018-10-23 16:07:242768瀏覽

這篇文章帶給大家的內容是關於Node Stream的運作機制講解(附範例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。

如果你正在學習Node,那麼流一定是一個你需要掌握的概念。如果你想成為一個Node高手,那麼流一定是武功秘籍中不可缺少的一個部分。

引用自Stream-Handbook。由此可見,流對於深入學習Node的重要性。

流是什麼?

你可以把流理解成一種傳輸的能力。透過串流,可以以平緩的方式,無副作用的將資料傳輸到目的地。在Node中,Node Stream建立的流都是專用於String和Buffer上的,一般情況下會使用Buffer。 Stream表示的是一種傳輸能力,Buffer是傳輸內容的載體 (可以這樣理解,Stream:外帶小哥哥, Buffer:你的外帶)。創建流的時候將ObjectMode設定true ,Stream同樣可以傳輸任意類型的JS物件(除了null,null在流中有特殊用途)。

為什麼要使用流?

現在有個需求,我們要傳送一個大檔案到客戶端。如果採用下面的方式

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
    res.end(data);
  });
});

server.listen(8000);

每次接收一個請求,就要把這個大檔案讀入內存,然後再傳輸給客戶端。透過這種方式可能會產生以下三種後果:

  • 記憶體耗盡

  • #拖慢其他行程

  • ##增加垃圾回收器的負載

所以這種方式在傳輸大檔案的情況下,不是一個好的方案。並發量一大,幾百個請求過來很容易就將記憶體耗盡。

如果採用流呢?

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);
採用這種方式,不會佔用太多內存,讀取一點就傳輸一點,整個過程平緩進行,非常優雅。如果想在傳輸的過程中,想對檔案進行處理,例如壓縮、加密等等,也很好擴充(後面會具體介紹)。

流在Node中無所不在。從下圖可以看出:

Node Stream的運作機制解說(附範例)

Stream分類

Stream分成四大類:

  • #Readable(可讀流)

Writable (可寫入流)

  • Duplex (雙工流)

  • Transform (轉換流)

Readable

可讀流中的數據,在以下兩種模式下都能產生數據。

Flowing Mode

Non-Flowing Mode

Node Stream的運作機制解說(附範例)兩種模式下,觸發的方式以及消耗的方式不一樣。

Flowing Mode:資料會源源不絕地生產出來,形成「流動」現象。監聽流的

data事件便可進入該模式。 Non-Flowing Mode下:需要顯示地呼叫read()方法,才能取得資料。 兩種模式可以互相轉換#串流的初始狀態是Null,透過監聽data事件,或pipe方法,呼叫resume方法,將流轉為

Flowing Mode

狀態。

Flowing Mode
狀態下呼叫
pause

方法,將流置為

Non-Flowing Mode

狀態。

Non-Flowing Mode

狀態下呼叫resumeNode Stream的運作機制解說(附範例)方法,同樣可以將流置為Flowing Mode

狀態。

下面詳細介紹下兩種模式下,Readable流的運作機制。

Flowing Mode
  • 在Flowing Mode狀態下,建立的myReadable讀流,直接監聽data事件,資料就源源不斷的流出來進行消費了。

    myReadable.on('data',function(chunk){
          consume(chunk);//消费流
    })
    一旦監聽data事件之後,Readable內部的流程如下圖所示
  • #核心的方法是流內部的read方法,它在參數n為不同值時,分別觸發不同的操作。下面描述中的

    hightwatermark
  • 表示的是流內部的緩衝池的大小。
  • n=undefined(消費性數據,並觸發一次可讀流)
  • ###n=0(觸發一次可讀流,但不會消費)############n>hightwatermark(修改hightwatermark的值)############nbuffer (可以傳回null,也可以傳回buffer所有的資料(當時最後一次讀取))###

图中黄色标识的_read(),是用户实现流所需要自己实现的方法,这个方法就是实际读取流的方式(可以这样理解,外卖平台给你提供外卖的能力,那_read()方法就相当于你下单点外卖)。后面会详细介绍如何实现_read方法。

以上的流程可以描述为:监听data方法,Readable内部就会调用read方法,来进行触发读流操作,通过判断是同步还是异步读取,来决定读取的数据是否放入缓冲区。如果为异步的,那么就要调用flow方法,来继续触发read方法,来读取流,同时根据size参数判定是否emit('data')来消费流,循环读取。如果是同步的,那就emit('data')来消费流,同时继续触发read方法,来读取流。一旦push方法传入的是null,整个流就结束了。

从使用者的角度来看,在这种模式下,你可以通过下面的方式来使用流

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(chunk){
      writeFile1.write(chunk);
})
Non-Flowing Mode

相对于Flowing mode,Non-Flowing Mode要相对简单很多。

消费该模式下的流,需要使用下面的方式

myReadable.on(‘readable’,function(){
     const chunk = myReadable.read()
     consume(chunk);//消费流
})

在Non-Flowing Mode下,Readable内部的流程如下图:

Node Stream的運作機制解說(附範例)

从这个图上看出,你要实现该模式的读流,同样要实现一个_read方法。

整个流程如下:监听readable方法,Readable内部就会调用read方法。调用用户实现的_read方法,来push数据到缓冲池,然后发送emit readable事件,通知用户端消费。

从使用者的角度来看,你可以通过下面的方式来使用该模式下的流

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('readable',function(chunk) {
    while (null !== (chunk = myReadable.read())) {
        writeFile.write(chunk);
    }
});

Writable

相对于读流,写流的机制就更容易理解了。

写流使用下面的方式进行数据写入

myWrite.write(chunk);

调用write后,内部Writable的流程如下图所示

Node Stream的運作機制解說(附範例)

类似于读流,实现一个写流,同样需要用户实现一个_write方法。

整个流程是这样的:调用write之后,会首先判定是否要写入缓冲区。如果不需要,那就调用用户实现的_write方法,将流写入到相应的地方,_write会调用一个writeable内部的一个回调函数。

从使用者的角度来看,使用一个写流,采用下面的代码所示的方式。

const fs = require('fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');

readFile.on('data',function(chunk) {
    writeFile.write(chunk);
})

可以看到,使用写流是非常简单的。

我们先讲解一下如何实现一个读流和写流,再来看Duplex和Transform是什么,因为了解了如何实现一个读流和写流,再来理解Duplex和Transform就非常简单了。

实现自定义的Readable

实现自定义的Readable,只需要实现一个_read方法即可,需要在_read方法中调用push方法来实现数据的生产。如下面的代码所示:

const Readable = require('stream').Readable;

class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        });
    }
}

// 模拟资源池
const dataSource = {
    data: new Array(10).fill('-'),
    makeData() {
        if (!dataSource.data.length) return null;
        return dataSource.data.pop();
    }
};

const myReadable = new MyReadable(dataSource,);

myReadable.on('readable', () => {
    let chunk;
    while (null !== (chunk = myReadable.read())) {
        console.log(chunk);
    }
});

实现自定义的writable

实现自定义的writable,只需要实现一个_write方法即可。在_write中消费chunk写入到相应地方,并且调用callback回调。如下面代码所示:

const Writable = require('stream').Writable;
class Mywritable extends  Writable{
    constuctor(options){
        super(options);
    }
    _write(chunk,endcoding,callback){
        console.log(chunk);
        callback && callback();
    }
}
const myWritable = new Mywritable();

Duplex

双工流:简单理解,就是讲一个Readable流和一个Writable流绑定到一起,它既可以用来做读流,又可以用来做写流。

实现一个Duplex流,你需要同时实现_read_write方法。

有一点需要注意的是:它所包含的 Readable流和Writable流是完全独立,互不影响的两个流,两个流使用的不是同一个缓冲区。通过下面的代码可以验证

// 模拟资源池1
const dataSource1 = {
    data: new Array(10).fill('a'),
    makeData() {
        if (!dataSource1.data.length) return null;
        return dataSource1.data.pop();
    }
};
// 模拟资源池2
const dataSource2 = {
    data: new Array(10).fill('b'),
    makeData() {
        if (!dataSource2.data.length) return null;
        return dataSource2.data.pop();
    }
};

const Readable = require('stream').Readable;
class MyReadable extends Readable {
    constructor(dataSource, options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })

    }
}

const Writable = require('stream').Writable;
class MyWritable extends Writable{
    constructor(options){
        super(options);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const Duplex = require('stream').Duplex;
class MyDuplex extends Duplex{
    constructor(dataSource,options) {
        super(options);
        this.dataSource = dataSource;
    }
    _read() {
        const data = this.dataSource.makeData();
        setTimeout(()=>{
            this.push(data);
        })
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback && callback();
    }
}

const myWritable = new MyWritable();
const myReadable = new MyReadable(dataSource1);
const myDuplex = new MyDuplex(dataSource1);
myReadable.pipe(myDuplex).pipe(myWritable);

打印的结果是

abababababababababab

从这个结果可以看出,myReadable.pipe(myDuplex),myDuplex充当的是写流,写入的内容是a;myDuplex.pipe(myWritable),myDuplex充当的是读流,往myWritable写的却是b;所以说它所包含的 Readable流和Writable流是完全独立的。

Transform

理解了Duplex,就更好理解Transform了。Transform是一个转换流,它既有读的功能又有写的功能,但是它和Duplex不同的是,它的读流和写流共用同一个缓冲区;也就是说,通过它读入什么,那它就能写入什么。

实现一个Transform,你只需要实现一个_transform方法。比如最简单的Transform:PassThrough,其源代码如下所示

Node Stream的運作機制解說(附範例)

PassThrough就是一个Transform,但是这个转换流,什么也没做,相当于一个透明的转换流。可以看到_transform中什么都没有,只是简单的将数据进行回调。

如果我们在这个环节做些扩展,只需要在_transform中直接扩展就行了。比如我们可以对流进行压缩,加密,混淆等等操作。

BackPress

最后介绍一个流中非常重要的一个概念:背压。要了解这个,我们首先来看下pipehighWaterMaker是什么。

pipe

首先看下下面的代码

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.pipe(writeFile);

上面的代码和下面是等价的

const fs = require('./fs');
const readFile = fs.createReadStream('./big.file');
const writeFile = fs.createWriteStream('./writeFile.js');
readFile.on('data',function(data){
    var flag = ws.write(data);
    if(!flag){ // 当前写流缓冲区已满,暂停读数据
        readFile.pause();
    }
})
writeFile.on('drain',function()){
    readFile.resume();// 当前写流缓冲区已清空,重新开始读流
}
readFile.on('end',function(data){
    writeFile.end();//将写流缓冲区的数据全部写入,并且关闭写入的文件
})

pipe所做的操作就是相当于为写流和读流自动做了速度的匹配。

读写流速度不匹配的情况下,一般情况下不会造成什么问题,但是会造成内存增加。内存消耗增加,就有可能会带来一系列的问题。所以在使用的流的时候,强烈推荐使用pipe

highWaterMaker

highWaterMaker说白了,就是定义缓冲区的大小。

  • 默认16Kb(Readable最大8M)

  • 可以自定义

背压的概念可以理解为:为了防止读写流速度不匹配而产生的一种调整机制;背压该调整机制的触发时机,受限于highWaterMaker设置的大小。

如上面的代码 var flag = ws.write(data);,一旦写流的缓冲区满了,那flag就会置为false,反向促进读流的速度调整。

Stream的应用场景

主要有以下场景

  1. 文件操作(复制,压缩,解压,加密等)

下面的就很容易就实现了文件复制的功能。

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big_copy.file');
readFile.pipe(writeFile);

那我们想在复制的过程中对文件进行压缩呢?

const fs = require('fs');
const readFile = fs.createReadStream('big.file');
const writeFile = fs.createWriteStream('big.gz');
const zlib = require('zlib');
readFile.pipe(zlib.createGzip()).pipe(writeFile);

实现解压、加密也是类似的。

  1. 静态文件服务器

比如需要返回一个html,可以使用如下代码。

var http = require('http');
var fs = require('fs');
http.createServer(function(req,res){
    fs.createReadStream('./a.html').pipe(res);
}).listen(8000);

以上是Node Stream的運作機制解說(附範例)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:segmentfault.com。如有侵權,請聯絡admin@php.cn刪除