首頁 >web前端 >js教程 >聊聊Node.js stream 模組,看看如何建立高效能的應用

聊聊Node.js stream 模組,看看如何建立高效能的應用

青灯夜游
青灯夜游轉載
2022-03-28 20:25:382571瀏覽

這篇文章帶大家了解 Node stream 模組,介紹如何使用 Stream 建立高效能的 Node.js 應用,希望對大家有幫助!

聊聊Node.js stream 模組,看看如何建立高效能的應用

當你在鍵盤上輸入字符,從磁碟讀取檔案或在網路上下載檔案時,一股訊息流(bits)在流經不同的裝置和應用。

如果你學會處理這些位元組流,你將能建立高效能且有價值的應用程式。例如,試想一下當你在 YouTube 觀看影片時,你不需要一直等待直到完整的影片下載完。一旦有一個小緩衝,影片就會開始播放,而剩下的會在你觀看時繼續下載。

Nodejs 包含一個內建模組 stream 可以讓我們處理流資料。在這篇文章中,我們將透過幾個簡單的範例來講解stream 的用法,我們也會描述在面對複雜案例建立高效能應用時,應該如何建立管道去合併不同的流。

在我們深入理解應用程式建置之前,理解 Node.js stream 模組提供的特性很重要。

讓我們開始吧!

Node.js 流的型別

Node.js stream 提供了四種類型的流

  • #可讀流(Readable Streams)
  • 可寫入流(Writable Streams)
  • 雙工流(Duplex Streams)
  • 轉換流(Transform Streams)

更多詳情請查看Node.js 官方文件

https://nodejs.org/api/stream.html#stream_types_of_streams

##讓我們在高層面來看看每一種流類型吧。

可讀流

可讀流可以從一個特定的資料來源讀取數據,最常見的是從一個檔案系統讀取。 Node.js 應用中其他常見的可讀流用法有:

  • process.stdin -透過 stdin  在終端應用中讀取使用者輸入。
  • http.IncomingMessage - 在 HTTP 服務中讀取傳入的請求內容或在 HTTP 用戶端中讀取伺服器的 HTTP 回應。

可寫入流

你可以使用可寫入流將來自應用程式的資料寫入到特定的地方,例如一個檔案。

process.stdout  可以用來將資料寫成標準輸出且被 console.log 內部使用。

接下來是雙工流和轉換流,可以被定義為基於可讀流和可寫流的混合流類型。

雙工流

雙工流是可讀流和可寫流的結合,它既可以將資料寫入到特定的地方也可以從數據來源讀取資料。最常見的雙工流程案例是

net.Socket,它被用來從 socket 讀寫資料。

有一點很重要,雙工流中的可讀端和可寫端的操作是相互獨立的,資料不會從一端流向另一端。

轉換流

轉換流與雙工流略有相似,但在轉換流中,可讀端和可寫端是相關聯的。

crypto.Cipher 類別就是一個很好的例子,它實作了加密流。透過 crypto.Cipher 流,應用可以往流的可寫端寫入純文字資料並從流的可讀端讀取加密後的密文。之所以將這種類型的流稱之為轉換流就是因為其轉換性質。

附註:另一個轉換流是 stream.PassThroughstream.PassThrough 從可寫入端傳遞資料到可讀端,沒有任何轉換。這聽起來可能有點多餘,但 Passthrough 流對建立自訂流以及流管道非常有幫助。 (例如創建一個流的資料的多個副本)

從可讀的Node.js 流讀取資料

一旦可讀流連接到生產資料的源頭,例如一個文件,就可以用幾種方法透過該流讀取資料。

首先,先創建一個名為

myfile  的簡單的text 文件,85 個位元組大小,包含以下字串:

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.

現在,我們看下從可讀流讀取資料的兩種不同方式。

1. 監聽data 事件

#從可讀流讀取資料最常見的方式是監聽流發出的

data 事件。以下程式碼示範了這種方式:

const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

readable.on('data', (chunk) => {
    console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
})

highWaterMark 屬性作為一個選項傳遞給 fs.createReadStream,用於決定該流中有多少資料緩衝。然後資料被沖到讀取機制(在這個案例中,是我們的 data 處理程序)。預設情況下,可讀 fs 流的 highWaterMark 值是 64kb。我們刻意重寫該值為 20 位元組用於觸發多個 data 事件。

如果你运行上述程序,它会在五个迭代内从 myfile 中读取 85 个字节。你会在 console 看到以下输出:

Read 20 bytes
"Lorem ipsum dolor si"

Read 20 bytes
"t amet, consectetur "

Read 20 bytes
"adipiscing elit. Cur"

Read 20 bytes
"abitur nec mauris tu"

Read 5 bytes
"rpis."

2. 使用异步迭代器

从可读流中读取数据的另一种方法是使用异步迭代器:

const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

(async () => {
    for await (const chunk of readable) {
        console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`);
    }
})()

如果你运行这个程序,你会得到和前面例子一样的输出。

可读 Node.js 流的状态

当一个监听器监听到可读流的 data 事件时,流的状态会切换成”流动”状态(除非该流被显式的暂停了)。你可以通过流对象的 readableFlowing  属性检查流的”流动”状态

我们可以稍微修改下前面的例子,通过 data 处理器来示范:

const fs = require('fs')
const readable = fs.createReadStream('./myfile', { highWaterMark: 20 });

let bytesRead = 0

console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
readable.on('data', (chunk) => {
    console.log(`Read ${chunk.length} bytes`);
    bytesRead += chunk.length

    // 在从可读流中读取 60 个字节后停止阅读
    if (bytesRead === 60) {
        readable.pause()
        console.log(`after pause() call. is flowing: ${readable.readableFlowing}`);

        // 在等待 1 秒后继续读取
        setTimeout(() => {
            readable.resume()
            console.log(`after resume() call. is flowing: ${readable.readableFlowing}`);
        }, 1000)
    }
})
console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);

在这个例子中,我们从一个可读流中读取  myfile,但在读取 60 个字节后,我们临时暂停了数据流 1 秒。我们也在不同的时间打印了 readableFlowing 属性的值去理解他是如何变化的。

如果你运行上述程序,你会得到以下输出:

before attaching 'data' handler. is flowing: null
after attaching 'data' handler. is flowing: true
Read 20 bytes
Read 20 bytes
Read 20 bytes
after pause() call. is flowing: false
after resume() call. is flowing: true
Read 20 bytes
Read 5 bytes

我们可以用以下来解释输出:

  • 当我们的程序开始时,readableFlowing 的值是 null,因为我们没有提供任何消耗流的机制。

  • 在连接到 data 处理器后,可读流变为“流动”模式,readableFlowing 变为 true

  • 一旦读取 60 个字节,通过调用 pause()来暂停流,readableFlowing 也转变为 false

  • 在等待 1 秒后,通过调用 resume(),流再次切换为“流动”模式,readableFlowing 改为 `true'。然后剩下的文件内容在流中流动。

通过 Node.js 流处理大量数据

因为有流,应用不需要在内存中保留大型的二进制对象:小型的数据块可以接收到就进行处理。

在这部分,让我们组合不同的流来构建一个可以处理大量数据的真实应用。我们会使用一个小型的工具程序来生成一个给定文件的 SHA-256。

但首先,我们需要创建一个大型的 4GB 的假文件来测试。你可以通过一个简单的 shell 命令来完成:

  • On macOS: mkfile -n 4g 4gb_file
  • On Linux: xfs_mkfile 4096m 4gb_file

在我们创建了假文件 4gb_file 后,让我们在不使用 stream 模块的情况下来生成来文件的 SHA-256 hash。

const fs = require("fs");
const crypto = require("crypto");

fs.readFile("./4gb_file", (readErr, data) => {
  if (readErr) return console.log(readErr)
  const hash = crypto.createHash("sha256").update(data).digest("base64");
  fs.writeFile("./checksum.txt", hash, (writeErr) => {
    writeErr && console.error(err)
  });
});

如果你运行以上代码,你可能会得到以下错误:

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB
    at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) {
  code: 'ERR_FS_FILE_TOO_LARGE'
}

以上报错之所以发生是因为 JavaScript 运行时无法处理随机的大型缓冲。运行时可以处理的最大尺寸的缓冲取决于你的操作系统结构。你可以通过使用内建的 buffer 模块里的 buffer.constants.MAX_LENGTH 变量来查看你操作系统缓存的最大尺寸。

即使上述报错没有发生,在内存中保留大型文件也是有问题的。我们所拥有的可用的物理内存会限制我们应用能使用的内存量。高内存使用率也会造成应用在 CPU 使用方面性能低下,因为垃圾回收会变得昂贵。

使用  pipeline() 减少 APP 的内存占用

现在,让我们看看如何修改应用去使用流且避免遇到这个报错:

const fs = require("fs");
const crypto = require("crypto");
const { pipeline } = require("stream");

const hashStream = crypto.createHash("sha256");
hashStream.setEncoding('base64')

const inputStream = fs.createReadStream("./4gb_file");
const outputStream = fs.createWriteStream("./checksum.txt");

pipeline(
    inputStream,
    hashStream,
    outputStream,
    (err) => {
        err && console.error(err)
    }
)

在这个例子中,我们使用 crypto.createHash 函数提供的流式方法。它返回一个“转换”流对象 hashStream,为随机的大型文件生成 hash。

为了将文件内容传输到这个转换流中,我们使用 fs.createReadStream4gb_file 创建了一个可读流 inputStream。我们将 hashStream  转换流的输出传递到可写流 outputStream 中,而 checksum.txt 通过 fs.createWriteStream 创建的。

如果你运行以上程序,你将看见在 checksum.txt 文件中看见 4GB 文件的 SHA-256 hash。

对流使用 pipeline()pipe() 的对比

在前面的案例中,我们使用 pipeline 函数来连接多个流。另一种常见的方法是使用 .pipe() 函数,如下所示:

inputStream
  .pipe(hashStream)
  .pipe(outputStream)

但这里有几个原因,所以并不推荐在生产应用中使用 .pipe()。如果其中一个流被关闭或者出现报错,pipe() 不会自动销毁连接的流,这会导致应用内存泄露。同样的,pipe() 不会自动跨流转发错误到一个地方处理。

因为这些问题,所以就有了 pipeline(),所以推荐你使用 pipeline() 而不是 pipe() 来连接不同的流。 我们可以重写上述的 pipe() 例子来使用  pipeline() 函数,如下:

pipeline(
    inputStream,
    hashStream,
    outputStream,
    (err) => {
        err && console.error(err)
    }
)

pipeline() 接受一个回调函数作为最后一个参数。任何来自被连接的流的报错都将触发该回调函数,所以可以很轻松的在一个地方处理报错。

总结:使用 Node.js 流降低内存并提高性能

在 Node.js 中使用流有助于我们构建可以处理大型数据的高性能应用。

在这篇文章中,我们覆盖了:

  • 四种类型的 Node.js 流(可读流、可写流、双工流以及转换流)。
  • 如何通过监听 data 事件或者使用异步迭代器来从可读流中读取数据。
  • 通过使用 pipeline 连接多个流来减少内存占用。

一个简短的警告:你很可能不会遇到太多必须使用流的场景,而基于流的方案会提高你的应用的复杂性。务必确保使用流的好处胜于它所带来的复杂性。

更多node相关知识,请访问:nodejs 教程

以上是聊聊Node.js stream 模組,看看如何建立高效能的應用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:juejin.cn。如有侵權,請聯絡admin@php.cn刪除