首頁 >web前端 >js教程 >聊聊Nodejs中的核心模組:stream流模組(看看如何使用)

聊聊Nodejs中的核心模組:stream流模組(看看如何使用)

青灯夜游
青灯夜游轉載
2021-12-20 11:13:082357瀏覽

本篇文章帶大家詳細理解一下Nodejs中的stream流模組,介紹一下stream流概念及用法,希望對大家有幫助!

聊聊Nodejs中的核心模組:stream流模組(看看如何使用)

stream流模組,是Node中非常核心的一個模組,其它模組如fs、http等都基於流stream模組的實例。

而對於大多前端小白在剛入門Node的學習過程中,對於流的概念及使用還是不太好清晰的理解,因為在前端的工作中似乎很少有過關於"流"處理相關的應用。

1. 流,是什麼?

單純「流」這個字,我們很容易產生水流,流動等的概念。

官方定義:流,是用於在Node.js 中處理流資料的抽象介面

#從官方的定義中,我們可以看出:

  • 流,是Node提供的一種處理資料的工具
  • #流,是Node中的一種抽象介面

準確的理解,流,可以理解為資料流,它是一種用來傳輸資料的手段,在一個應用程式中,流,是一種有序的,有起點和終點的資料流。

造成我們對stream流不太好的理解的主要原因就是,它是一種抽象的概念。

2. 流,的具體使用場景

為了讓我們能夠清楚的理解stream模組,我們首先來以具體的應用場景來說明stream模組有哪些實際應用之處。

stream流,在Node中主要應用在大量資料處理的需求上,如fs對大檔案的讀取和寫入、http請求回應、檔案的壓縮、資料的加密/解密等應用程式。

聊聊Nodejs中的核心模組:stream流模組(看看如何使用)

我們以上面的圖片說明流的使用,水桶可以理解為資料來源,池可以理解為資料目標 ,中間連接的管道,我們可以理解為資料流,透過資料流管道,資料從資料來源流向資料目標。

3. 流的分類

在Node中,流被分為4類:可讀流,可寫流,雙工流,轉換流。

  • Writable: 可以寫入資料的流
  • Readable: 可以從中讀取資料的流
  • Duplex#Readable 和 Writable 的流
  • #Transform: 可以在寫入和讀取資料時修改或轉換資料的 Duplex 流

所有的流都是 EventEmitter 的實例。即我們可以透過事件機制監聽資料流的變化。

4. 資料模式與快取區

在深入學習4類流的具體使用之前,我們需要先理解兩個概念資料模式快取區,有助於我們在接下來流的學習中更好的理解。

4.1 資料模式

Node.js API 所建立的所有流都只對字串#和  Buffer(或 Uint8Array)物件進行操作。

4.2 快取區域

Writable和 Readable 流都會將資料儲存在內部緩衝區(buffer)中。

可緩衝的資料量取決於傳給流的建構函數的 highWaterMark 選項, 對於普通的流,highWaterMark 選項指定位元組的總數 ;對於在物件模式下操作的流,highWaterMark選項指定物件的總數。

highWaterMark 選項是閾值,而不是限制:它規定了流在停止要求更多資料之前緩衝的資料量。

當實作呼叫 stream.push(chunk)# 時,資料在 Readable# 流中快取資料。如果流的消費者沒有呼叫 stream.read(),則資料會一直駐留在內部佇列中,直到被消費為止。

一旦內部讀取緩衝區的總大小達到 highWaterMark 指定的閾值,則流將暫時停止從底層資源讀取數據,直到可以消費當前緩衝的數據

當重複呼叫 writable.write(chunk) 方法時,資料會被快取在 Writable 流中。

5. 可讀流

5.1 流讀取的流與暫停

#Readable流以兩種模式之一有效地運行:流動和暫停。

  • 流動模式:從系統底層讀取資料並push()到快取區,達到highWaterMark後push() 會回傳false,資源停止流向快取區,並觸發data事件消費數據。

  • 暫停模式:所有的Readable流都是以Paused暫停模式開始,必須明確地呼叫stream.read()方法來從流中讀取資料。每一次資料達到快取區都會觸發一次 readable 事件,也就是每一次 push() 都會觸發 readable。

  • 暫停模式切換到流動模式的方式:

    • 新增data事件句柄
    • 呼叫stream.resume()方法
    • 呼叫stream.pipe()方法將資料傳送到Writable
  • #流動模式切換到暫停模式的方式:

    • 如果沒有管道目標,則透過呼叫 stream.pause() 方法。
    • 如果有管道目標,則刪除所有管道目標。可以透過呼叫 stream.unpipe()方法刪除多個管道目標。

5.2 可讀流常用範例

import path from 'path';
import fs, { read } from 'fs';

const filePath = path.join(path.resolve(), 'files', 'text.txt');

const readable = fs.createReadStream(filePath);
// 如果使用 readable.setEncoding() 方法为流指定了默认编码,则监听器回调将把数据块作为字符串传入;否则数据将作为 Buffer 传入。
readable.setEncoding('utf8');
let str = '';

readable.on('open', (fd) => {
  console.log('开始读取文件')
})
// 每当流将数据块的所有权移交给消费者时,则会触发 'data' 事件
readable.on('data', (data) => {
  str += data;
  console.log('读取到数据')
})
// 方法将导致处于流动模式的流停止触发 'data' 事件,切换到暂停模式。 任何可用的数据都将保留在内部缓冲区中。
readable.pause();
// 方法使被显式暂停的 Readable 流恢复触发 'data' 事件,将流切换到流动模式。
readable.resume();
// 当调用 stream.pause() 并且 readableFlowing 不是 false 时,则会触发 'pause' 事件。
readable.on('pause', () => {
  console.log('读取暂停')
})
// 当调用 stream.resume() 并且 readableFlowing 不是 true 时,则会触发 'resume' 事件。
readable.on('resume', () => {
  console.log('重新流动')
})
// 当流中没有更多数据可供消费时,则会触发 'end' 事件。
readable.on('end', () => {
  console.log('文件读取完毕');
})
// 当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 'close' 事件。
readable.on('close', () => {
  console.log('关闭文件读取')
})
// 将 destWritable 流绑定到 readable,使其自动切换到流动模式并将其所有数据推送到绑定的 Writable。 数据流将被自动管理
readable.pipe(destWriteable)
// 如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效数据块时,可能会发生这种情况。
readable.on('error', (err) => {
  console.log(err)
  console.log('文件读取发生错误')
})

6. 可寫流

#6.1 可寫流的流動與暫停

writeable流與readable流是比較相似的,資料流過來的時候,會直接寫入到快取區,當寫入速度比較緩慢或寫入暫停時,資料流會在快取區快取起來;

當生產者寫入速度過快,把佇列池裝滿了之後,就會出現「背壓」,這個時候是需要告訴生產者暫停生產的,當隊列釋放之後,writable流會給生產者發送一個drain 訊息,讓它恢復生產。

6.2 可寫流程範例

import path from 'path';
import fs, { read } from 'fs';

const filePath = path.join(path.resolve(), 'files', 'text.txt');
const copyFile = path.join(path.resolve(), 'files', 'copy.txt');

let str = '';
// 创建可读流
const readable = fs.createReadStream(filePath);
// 如果使用 readable.setEncoding() 方法为流指定了默认编码
readable.setEncoding('utf8');

// 创建可写流
const wirteable = fs.createWriteStream(copyFile);
// 编码
wirteable.setDefaultEncoding('utf8');

readable.on('open', (fd) => {
  console.log('开始读取文件')
})
// 每当流将数据块的所有权移交给消费者时,则会触发 'data' 事件
readable.on('data', (data) => {
  str += data;
  console.log('读取到数据');

  // 写入
  wirteable.write(data, 'utf8');
})

wirteable.on('open', () => {
  console.log('开始写入数据')
})
// 如果对 stream.write(chunk) 的调用返回 false,则 'drain' 事件将在适合继续将数据写入流时触发。
// 即生产数据的速度大于写入速度,缓存区装满之后,会暂停生产着从底层读取数据
// writeable缓存区释放之后,会发送一个drain事件让生产者继续读取
wirteable.on('drain', () => {
  console.log('继续写入')
})
// 在调用 stream.end() 方法之后,并且所有数据都已刷新到底层系统,则触发 'finish' 事件。
wirteable.on('finish', () => {
  console.log('数据写入完毕')
})

readable.on('end', () => {
  // 数据读取完毕通知可写流
  wirteable.end()
})
// 当在可读流上调用 stream.pipe() 方法将此可写流添加到其目标集时,则触发 'pipe' 事件。
// readable.pipe(destWriteable)
wirteable.on('pipe', () => {
  console.log('管道流创建')
})

wirteable.on('error', () => {
  console.log('数据写入发生错误')
})

更多node相關知識,請造訪:nodejs 教學! !

以上是聊聊Nodejs中的核心模組:stream流模組(看看如何使用)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:juejin.cn。如有侵權,請聯絡admin@php.cn刪除