搜尋
首頁web前端js教程深入理解Node.js 中的流(Stream)

深入理解Node.js 中的流(Stream)

Node.js 中的流(Stream)是出了名的難用甚至是難以理解。 【影片教學推薦:nodejs影片教學

用Dominic Tarr 的話來說:「流是Node 中最好的,也是最容易被誤解的想法。」即使是Redux 的創作者和React.js 的核心團隊成員Dan Abramov 也害怕Node 流。

深入理解Node.js 中的流(Stream)

本文將幫助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!

什麼是流(Stream)?

串流(Stream)是為 Node.js 應用提供動力的基本概念之一。它們是資料處理方法,用於將輸入的資料順序讀取或將資料寫入輸出。

流是一種以有效方式處理讀寫檔案、網路通訊或任何類型的端對端資訊交換的方式。

流的處理方式非常獨特,流不是像傳統方式那樣將文件一次全部讀取到記憶體中,而是逐段讀取資料塊並處理資料的內容,不將其全部保留在記憶體中。

這種方式使流在處理大量資料時非常強大,例如,檔案的大小可能大於可用的記憶體空間,因此無法將整個檔案讀入記憶體進行處理。那是流的用武之地!

既能用流來處理較小的資料塊,也可以讀取較大的檔案。

以 YouTube 或 Netflix 之類的「串流」服務為例:這些服務不會讓你你立即下載影片和音訊檔案。取而代之的是,你的瀏覽器以連續的塊流形式接收視頻,從而使接收者幾乎可以立即開始觀看和收聽。

但是,串流不僅涉及處理媒體和大數據。它們還在程式碼中賦予了我們「可組合性」的力量。考慮可組合性的設計意味著能夠以某種方式組合多個組件以產生相同類型的結果。在 Node.js 中,可以透過流在其他較小的程式碼段中傳遞數據,從而組成功能強大的程式碼段。

為什麼要使用流?

與其他資料處理方法相比,串流基本上有兩個主要優點:

  1. 記憶體效率:你不需要事先將大量資料載入記憶體即可進行處理
  2. 時間效率:取得資料後立即開始處所需的時間大幅減少,不必等到整個有效資料全部傳送完畢才開始處理

Node.js 中有4 種流:

  1. 可寫入流:可以向其中寫入資料的流。例如,fs.createWriteStream() 使我們可以使用流將資料寫入檔案。
  2. 可讀流:可從中讀取資料的流。例如:fs.createReadStream() 讓我們讀取檔案的內容。
  3. 雙工流(可讀寫的流):可讀和可寫的流。例如,net.Socket
  4. Transform:可在寫入和讀取時修改或轉換資料。例如在檔案壓縮的情況下,你可以在檔案中寫入壓縮數據,也可以從檔案讀取解壓縮的資料。

如果你已經使用過 Node.js,則可能會遇到過流。例如在基於 Node.js 的 HTTP 伺服器中,request 是可讀流,而 response 是可寫流。你可能用過 fs 模組,該模組可讓你用可讀和可寫入檔案流。每當使用Express 時,你都在使用流與客戶端進行交互,而且由於TCP 套接字、TLS棧和其他連接都基於Node.js,所以在每個可以使用的資料庫連接驅動的程式中使用流。

實例

如何建立可讀流?

首先需要可讀性流,然後將其初始化。

const Stream = require('stream')
const readableStream = new Stream.Readable()

現在,流已初始化,可以向其發送資料了:

readableStream.push('ping!')
readableStream.push('pong!')

#非同步迭代器

########### #強烈建議在使用流時配合非同步迭代器(async iterator)。 ###根據 ###Axel Rauschmayer### 博士的說法,非同步迭代是一種用於非同步檢索資料容器內容的協定(這意味著當前「任務」可以在檢索專案之前被暫停)。另外必須提的是,流非同步迭代器實作使用內部的 ###readable### 事件。 ######從可讀流讀取時,可以使用非同步迭代器:###
import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
#######也可以用字串收集可讀流的內容:#######
import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。

请切记不要将异步功能与 EventEmitter 混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。 这个 pull request 旨在解决一旦其落在 Node 核心上产生的问题。

要了解有关异步迭代的 Node.js 流的更多信息,请查看这篇很棒的文章

Readable.from():从可迭代对象创建可读流

stream.Readable.from(iterable, [options])  这是一种实用方法,用于从迭代器中创建可读流,该迭代器保存可迭代对象中包含的数据。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,除其他作用外,还可以用于指定文本编码。

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});

两种读取模式

根据 Streams API,可读流有效地以两种模式之一运行:flowingpaused。可读流可以处于对象模式,无论处于 flowing 模式还是 paused 模式。

  • 流模式下,将自动从底层系统读取数据,并通过 EventEmitter 接口使用事件将其尽快提供给程序。
  • paused 模式下,必须显式调用 stream.read() 方法以从流中读取数据块。

在 flowing 模式中,要从流中读取数据,可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件,并执行你的回调。看下面的代码片段:

var fs = require("fs");
var data = '';

var readerStream = fs.createReadStream('file.txt'); //Create a readable stream

readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 

// Handle stream events --> data, end, and error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function() {
   console.log(data);
});

readerStream.on('error', function(err) {
   console.log(err.stack);
});

console.log("Program Ended");

函数调用 fs.createReadStream() 给你一个可读流。最初流处于静态状态。一旦你侦听数据事件并附加了回调,它就会开始流动。之后将读取大块数据并将其传递给你的回调。流实现者决定发送数据事件的频率。例如,每当有几 KB 的数据被读取时,HTTP 请求就可能发出一个数据事件。当从文件中读取数据时,你可能会决定读取一行后就发出数据事件。

当没有更多数据要读取(结束)时,流将发出结束事件。在以上代码段中,我们监听此事件以在结束时得到通知。

另外,如果有错误,流将发出并通知错误。

在 paused 模式下,你只需在流实例上重复调用 read(),直到读完所有数据块为止,如以下示例所示:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data)
});

read() 函数从内部缓冲区读取一些数据并将其返回。当没有内容可读取时返回 null。所以在 while 循环中,我们检查是否为 null 并终止循环。请注意,当可以从流中读取大量数据时,将会发出可读事件。

所有 Readable 流均以 paused 模式开始,但可以通过以下方式之一切换为 flowing 模式

  • 添加一个 'data' 事件处理。
  • 调用 stream.resume() 方法。
  • 调用 stream.pipe() 方法将数据发送到可写对象。

Readable 可以使以下方法之一切换回 paused 模式:

  • 如果没有管道目标,则通过调用 stream.pause() 方法。
  • 如果有管道目标,请删除所有管道目标。可以通过调用 stream.unpipe() 方法来删除多个管道目标。

一个需要记住的重要概念是,除非提供了一种用于消耗或忽略该数据的机制,否则 Readable 将不会生成数据。如果使用机制被禁用或取消,则 Readable 将会试图停止生成数据。添加 readable 事件处理会自动使流停止 flowing,并通过 read.read() 得到数据。如果删除了 readable 事件处理,那么如果存在 'data' 事件处理,则流将再次开始 flowing。

如何创建可写流?

要将数据写入可写流,你需要在流实例上调用 write()。如以下示例所示:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});

上面的代码很简单。它只是简单地从输入流中读取数据块,并使用 write() 写入目的地。该函数返回一个布尔值,指示操作是否成功。如果为 true,则写入成功,你可以继续写入更多数据。如果返回 false,则表示出了点问题,你目前无法写任何内容。可写流将通过发出 drain 事件来通知你什么时候可以开始写入更多数据。

调用 writable.end() 方法表示没有更多数据将被写入 Writable。如果提供,则可选的回调函数将作为 finish 事件的侦听器附加。

// Write 'hello, ' and then end with 'world!'.
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!

你可以用可写流从可读流中读取数据:

const Stream = require('stream')

const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

readableStream.pipe(writableStream)

readableStream.push('ping!')
readableStream.push('pong!')

writableStream.end()

还可以用异步迭代器来写入可写流,建议使用

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() 的默认版本是基于回调的,但是可以通过 util.promisify() 转换为基于 Promise 的版本(A行)。

在此例中,使用以下两种模式:

Writing to a writable stream while handling backpressure (line B):
在处理 backpressure 时写入可写流(B行):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

关闭可写流,并等待写入完成(C行):

writable.end();
await finished(writable);

pipeline()

pipeline(管道)是一种机制,可以将一个流的输出作为另一流的输入。它通常用于从一个流中获取数据并将该流的输出传递到另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流数据。

在 Node 10.x 中引入了 stream.pipeline()。这是一种模块方法,用于在流转发错误和正确清理之间进行管道传输,并在管道完成后提供回调。

这是使用管道的例子:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用 pipeline API 可以轻松将一系列流
// 通过管道传输在一起,并在管道完全完成后得到通知。
// 一个有效地用 gzip压缩巨大视频文件的管道:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

由于pipe 不安全,应使用 pipeline 代替 pipe

流模块

Node.js 流模块 提供了构建所有流 API 的基础。

Stream 模块是 Node.js 中默认提供的原生模块。 Stream 是 EventEmitter 类的实例,该类在 Node 中异步处理事件。因此流本质上是基于事件的。

要访问流模块:

const stream = require('stream');

stream 模块对于创建新型流实例非常有用。通常不需要使用 stream 模块来消耗流。

流驱动的 Node API

由于它们的优点,许多 Node.js 核心模块提供了原生流处理功能,最值得注意的是:

  • net.Socket 是流所基于的主 API 节点,它是以下大多数 API 的基础
  • process.stdin 返回连接到 stdin 的流
  • process.stdout 返回连接到 stdout 的流
  • process.stderr 返回连接到 stderr 的流
  • fs.createReadStream() 创建一个可读的文件流
  • fs.createWriteStream() 创建可写的文件流
  • net.connect() 启动基于流的连接
  • http.request() 返回 http.ClientRequest 类的实例,它是可写流
  • zlib.createGzip() 使用gzip(一种压缩算法)将数据压缩到流中
  • zlib.createGunzip() 解压缩 gzip 流。
  • zlib.createDeflate() deflate(压缩算法)将数据压缩到流中
  • zlib.createInflate() 解压缩一个deflate流

流 备忘单:

深入理解Node.js 中的流(Stream)

深入理解Node.js 中的流(Stream)

深入理解Node.js 中的流(Stream)

深入理解Node.js 中的流(Stream)

深入理解Node.js 中的流(Stream)

查看更多:Node.js 流速查表

以下是与可写流相关的一些重要事件:

  • error –表示在寫入或配置管道時發生了錯誤。
  • pipeline – 當可讀流傳遞到可寫流中時,該事件由可寫流發出。
  • unpipe – 當你在可讀流上呼叫 unpipe 並停止將其傳送到目標流中時發出。

結論

這就是所有關於流的基礎知識。流、管道和鍊是 Node.js 的核心和最強大的功能。流確實可以幫你寫簡潔而有效率的程式碼來執行 I/O。

另外,還有一個值得期待的Node.js 戰略計劃,稱為BOB,旨在改善Node.js 的內部資料流以及希望作為未來Node.js 流資料介面的公共API 的。

英文原文網址:https://nodesource.com/blog/understanding-streams-in-nodejs

#作者:Liz Parody

翻譯:瘋狂的科技宅

相關推薦:nodejs 教學

以上是深入理解Node.js 中的流(Stream)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述
本文轉載於:segmentfault。如有侵權,請聯絡admin@php.cn刪除
超越瀏覽器:現實世界中的JavaScript超越瀏覽器:現實世界中的JavaScriptApr 12, 2025 am 12:06 AM

JavaScript在現實世界中的應用包括服務器端編程、移動應用開發和物聯網控制:1.通過Node.js實現服務器端編程,適用於高並發請求處理。 2.通過ReactNative進行移動應用開發,支持跨平台部署。 3.通過Johnny-Five庫用於物聯網設備控制,適用於硬件交互。

使用Next.js(後端集成)構建多租戶SaaS應用程序使用Next.js(後端集成)構建多租戶SaaS應用程序Apr 11, 2025 am 08:23 AM

我使用您的日常技術工具構建了功能性的多租戶SaaS應用程序(一個Edtech應用程序),您可以做同樣的事情。 首先,什麼是多租戶SaaS應用程序? 多租戶SaaS應用程序可讓您從唱歌中為多個客戶提供服務

如何使用Next.js(前端集成)構建多租戶SaaS應用程序如何使用Next.js(前端集成)構建多租戶SaaS應用程序Apr 11, 2025 am 08:22 AM

本文展示了與許可證確保的後端的前端集成,並使用Next.js構建功能性Edtech SaaS應用程序。 前端獲取用戶權限以控制UI的可見性並確保API要求遵守角色庫

JavaScript:探索網絡語言的多功能性JavaScript:探索網絡語言的多功能性Apr 11, 2025 am 12:01 AM

JavaScript是現代Web開發的核心語言,因其多樣性和靈活性而廣泛應用。 1)前端開發:通過DOM操作和現代框架(如React、Vue.js、Angular)構建動態網頁和單頁面應用。 2)服務器端開發:Node.js利用非阻塞I/O模型處理高並發和實時應用。 3)移動和桌面應用開發:通過ReactNative和Electron實現跨平台開發,提高開發效率。

JavaScript的演變:當前的趨勢和未來前景JavaScript的演變:當前的趨勢和未來前景Apr 10, 2025 am 09:33 AM

JavaScript的最新趨勢包括TypeScript的崛起、現代框架和庫的流行以及WebAssembly的應用。未來前景涵蓋更強大的類型系統、服務器端JavaScript的發展、人工智能和機器學習的擴展以及物聯網和邊緣計算的潛力。

神秘的JavaScript:它的作用以及為什麼重要神秘的JavaScript:它的作用以及為什麼重要Apr 09, 2025 am 12:07 AM

JavaScript是現代Web開發的基石,它的主要功能包括事件驅動編程、動態內容生成和異步編程。 1)事件驅動編程允許網頁根據用戶操作動態變化。 2)動態內容生成使得頁面內容可以根據條件調整。 3)異步編程確保用戶界面不被阻塞。 JavaScript廣泛應用於網頁交互、單頁面應用和服務器端開發,極大地提升了用戶體驗和跨平台開發的靈活性。

Python還是JavaScript更好?Python還是JavaScript更好?Apr 06, 2025 am 12:14 AM

Python更适合数据科学和机器学习,JavaScript更适合前端和全栈开发。1.Python以简洁语法和丰富库生态著称,适用于数据分析和Web开发。2.JavaScript是前端开发核心,Node.js支持服务器端编程,适用于全栈开发。

如何安裝JavaScript?如何安裝JavaScript?Apr 05, 2025 am 12:16 AM

JavaScript不需要安裝,因為它已內置於現代瀏覽器中。你只需文本編輯器和瀏覽器即可開始使用。 1)在瀏覽器環境中,通過標籤嵌入HTML文件中運行。 2)在Node.js環境中,下載並安裝Node.js後,通過命令行運行JavaScript文件。

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
4 週前By尊渡假赌尊渡假赌尊渡假赌

熱工具

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SecLists

SecLists

SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

PhpStorm Mac 版本

PhpStorm Mac 版本

最新(2018.2.1 )專業的PHP整合開發工具

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強大的PHP整合開發環境

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版