首頁  >  文章  >  web前端  >  什麼是流(Stream)?如何理解Nodejs中的流

什麼是流(Stream)?如何理解Nodejs中的流

青灯夜游
青灯夜游轉載
2022-05-06 19:27:262031瀏覽

什麼是流?如何理解流?以下這篇文章就來帶大家深入了解一下Node中的串流(Stream),希望對大家有幫助!

什麼是流(Stream)?如何理解Nodejs中的流

作者最近在開發中經常使用pipe 函數,只知道這是流的管道,卻不知道他是如何工作的,所以抱著一探究竟的心理乾脆就從流開始學起,隨便將看過的知識和源碼整理成一篇文章分享給大家。

串流(Stream)在 Nodejs 中是個十分基礎的概念,許多基礎模組都是基於流實現的,扮演著十分重要的角色。同時流也是一個十分難以理解的概念,這主要是相關的文檔比較缺少,對於NodeJs 初學者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對於大部分NodeJs使用者來說,只是用來開發Web 應用,對流的不充分認識並不影響使用。但是,理解流能夠對 NodeJs 中的其他模組有更好的理解,同時在某些情況下,使用流來處理資料會有更好的效果。 【相關教學推薦:nodejs影片教學

如何理解流

  • 對於串流的使用者來說,可以將流看作一個數組,我們只需要關注從中獲取(消費)和寫入(生產)就可以了。

  • 對於串流的開發者(使用stream模組建立一個新實例),專注的是如何實現流程中的一些方法,通常專注於兩點,目標資源是誰和如何操作目標資源,確定了後就需要根據流的不同狀態和事件來對目標資源進行操作

#快取池

##NodeJs 中所有的流都有緩衝池,緩衝池存在的目的是增加流的效率,當資料的生產和消費都需要時間時,我們可以在下一次消費前提前生產資料存放到緩衝池。但是緩衝池並不是隨時都處於使用狀態,例如快取池為空時,資料生產後就不會放入快取池而是直接消費。 。

如果資料生產的速度大於資料的消耗速度,多餘的資料會在某個地方等待。如果資料的生產速度小於進程資料的消費速度,那麼資料就會在某個地方累積到一定的數量,然後在進行消費。 (開發者無法控制數據的生產和消費速度,只能盡量在何時的時機生產數據或消費數據)

那個數據等待,累計數據,然後發生出去的地方。就是

緩衝池。緩衝池通常位於電腦的RAM(記憶體)。

舉一個常見的緩衝區的例子,我們在觀看線上影片的時候,如果你的網速很快,緩衝區總是會被立即填充,然後發送給系統播放,然後立即緩衝下一段影片。觀看的過程中,不會有卡頓。如果網路速度很慢,則會看到loading,表示緩衝區正在被填充,當填充完成後資料傳送給系統,才能看到這段影片。

NodeJs 流的快取池是一個 Buffer 鍊錶,每一次想快取池中加入資料都會重新建立一個 Buffer 節點插入到鍊錶尾部。

EventEmitter

NodeJs 中對 Stream 是一個實作了 EventEmitter 的抽象接口,所以我會先簡單的介紹一下 EventEmitter。

EventEmitter 是一個實現事件發布訂閱功能的類,其中常用的幾個方法(on, once, off, emit)相信大家都耳熟能詳了,就不一一介紹了。

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// 为 eventA 事件绑定处理函数
eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});

// 为 eventB 事件绑定处理函数
eventEmitter.on('eventB', () => {
    console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});

// 触发 eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2

值得注意的是,

EventEmitter 有兩個叫做 newListenerremoveListener 的事件,當你在一個事件物件中加入任何事件監聽函數後,都會觸發newListener(eventEmitter.emit('newListener')),當一個處理函數被移除時同理會觸發removeListener

還需要注意的是, once 綁定的處理函數只會執行一次,

removeListener 將在其執行前被觸發,這意味著once 綁定的監聽函數是先被移除才被觸發的。

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener)=>{
    console.log('newListener', event, listener)
})

eventEmitter.on('removeListener', (event, listener) => {
    console.log('removeListener', event, listener)
})
//newListener removeListener[Function(anonymous)]


eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});
//newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB[Function: listenerB]

不過這對我們後面的內容來說並不重要。

Stream

Stream 是 Node.js 中處理流資料的抽象介面。 Stream 並不是實際的接口,而是對所有流的一種統稱。實際的介面有 ReadableStream、 WritableStream、ReadWriteStream 這幾個。

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }

可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類別的介面(ts中介面是可以繼承類別的,因為他們只是在進行類型的合併)。

上面這些介面對應的實作類別分別是 Readable、Writable 和 Duplex

NodeJs中的流有4種:

  • Readable 可讀流(實作ReadableStream)
  • Writable 可寫流(實作WritableStream)
  • Duplex 可讀可寫流(繼承Readable後實作WritableStream)
  • Transform 轉換流(繼承Duplex)

背壓問題

磁碟寫入資料的速度是遠低於記憶體的,我們想像記憶體和磁碟之間有一個“管道”,“管道”中是“流”,內存的數據流入管道是非常快的,當管道塞滿時,內存中就會產生數據背壓,數據積壓在內存中,佔用資源。

什麼是流(Stream)?如何理解Nodejs中的流

NodeJs Stream 的解決方法是為每個流的快取池(就是圖中寫入佇列)設定一個浮標值,當其中數據量達到這個浮標值後,往快取池再次push 資料時就會回傳false,表示目前流中快取池內容已經達到浮標值,不希望再有資料寫入了,這時我們應該立即停止資料的生產,防止快取池過大產生背壓。

Readable

可讀流(Readable)是流的一種類型,他有兩種模式三種狀態

#兩種讀取模式:

  • 流動模式:資料會從底層系統讀取寫入到緩衝區,當緩衝區被寫滿後自動透過EventEmitter 盡快的將資料傳遞給所註冊的事件處理程序中

  • 暫停模式:在這種模式下將不會主動觸發EventEmitter 傳輸數據,必須顯示的呼叫Readable.read() 方法來從緩衝區讀取數據,read 會觸發回應到EventEmitter 事件。

三種狀態:

  • readableFlowing === null(初始狀態)

  • readableFlowing === false(暫停模式)

  • readableFlowing === true(流動模式)

初始時流的readable. readableFlowingnull

新增data事件後變成true 。呼叫pause()unpipe()、或接收到背壓或新增readable 事件,則readableFlowing 會被設為false ,在這個狀態下,為data 事件綁定監聽器不會使readableFlowing 切換到 true

呼叫resume() 可以讓可讀流的readableFlowing 切換到 true

移除所有的readable 事件是讓 readableFlowing變成null 的唯一方法。

事件名稱 說明
readable 當緩衝區有新的可讀取資料時觸發(每一個想快取池插入節點都會觸發)
#data 每一次消費資料後都會觸發)
#data 每次消費資料後都會觸發,參數是本次消費的資料
close #流關閉時觸發
error流發生錯誤時觸發方法名稱
說明
# #######read(size)######消費長度為size的數據,傳回null表示目前資料不足size,否則傳回本次消費的資料。 size不傳遞時表示消費緩存池中所有資料#############
const fs = require(&#39;fs&#39;);

const readStreams = fs.createReadStream(&#39;./EventEmitter.js&#39;, {
    highWaterMark: 100// 缓存池浮标值
})

readStreams.on(&#39;readable&#39;, () => {
    console.log(&#39;缓冲区满了&#39;)
    readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件
})


readStreams.on(&#39;data&#39;, (data) => {
    console.log(&#39;data&#39;)
})

https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

当 size 为 0 会触发 readable 事件。

当缓存池中的数据长度达到浮标值 highWaterMark 后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据

暂停状态的流如果不调用 read 来消费数据时,后续也不会在触发 datareadable,当调用 read 消费时会先判断本次消费后剩余的数据长度是否低于 浮标值,如果低于 浮标值 就会在消费前请求生产数据。这样在 read 后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因

流动状态下的流有两种情况

  • 生产速度慢于消费速度时:这种情况下每一个生产数据后一般缓存池中都不会有剩余数据,直接将本次生产的数据传递给 data 事件即可(因为没有进入缓存池,所以也不用调用 read 来消费),然后立即开始生产新数据,待上一次数据消费完后新数据才生产好,再次触发 data ,一只到流结束。
  • 生产速度快于消费速度时:此时每一次生产完数据后一般缓存池都还存在未消费的数据,这种情况一般会在消费数据时开始生产下一次消费的数据,待旧数据消费完后新数据已经生产完并且放入缓存池

他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。

值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null

暂停模式

什麼是流(Stream)?如何理解Nodejs中的流

暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read 方法,把数据从数据源 push 到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable " 事件,告诉消费者有数据已经准备好了,可以继续消费。

一般来说, 'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 'readable' 事件;

消费者 " readable " 事件的处理函数中,通过 stream.read(size) 主动消费缓冲池中的数据。

const { Readable } = require(&#39;stream&#39;)

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    // 参数的 read 方法会作为流的 _read 方法,用于获取源数据
    read(size) {
        // 假设我们的源数据上 1000 个1
        let chunk = null
        // 读取数据的过程一般是异步的,例如IO操作
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = &#39;1&#39;.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})
// 每一次成功 push 数据到缓存池后都会触发 readable
myReadable.on(&#39;readable&#39;, () => {
    const chunk = myReadable.read()//消费当前缓存池中所有数据
    console.log(chunk.toString())
})

值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size

//  hwm 不会大于 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB限制
    n = MAX_HWM;
  } else {
    //取下一个2最高幂,以防止过度增加hwm
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

流动模式

什麼是流(Stream)?如何理解Nodejs中的流

所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:

  • 添加 " data " 事件句柄;
  • 调用 “ resume ”方法;
  • 使用 " pipe " 方法把数据发送到可写流

流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read 方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null) );

可读流可以通过以下方式切换回暂停模式:

  • 如果没有管道目标,则调用 stream.pause()
  • 如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。
const { Readable } = require(&#39;stream&#39;)

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    read(size) {
        let chunk = null
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = &#39;1&#39;.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

myReadable.on(&#39;data&#39;, data => {
    console.log(data.toString())
})

Writable

相对可读流来说,可写流要简单一些。

什麼是流(Stream)?如何理解Nodejs中的流

当生产者调用 write(chunk) 时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk) 后会返回 false,这时候生产者应该停止继续写入。

那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write 之后,清空了缓冲队列后会触发 drain 事件,这时候生产者可以继续写入数据。

当生产者需要结束写入数据时,需要调用 stream.end 方法通知可写流结束。

const { Writable, Duplex } = require(&#39;stream&#39;)
let fileContent = &#39;&#39;
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {// 会作为_write方法
        setTimeout(()=>{
            fileContent += chunk
            callback()// 写入结束后调用
        }, 500)
    }
})

myWritable.on(&#39;close&#39;, ()=>{
    console.log(&#39;close&#39;, fileContent)
})
myWritable.write(&#39;123123&#39;)// true
myWritable.write(&#39;123123&#39;)// false
myWritable.end()

注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此

const { Writable } = require(&#39;stream&#39;)


let fileContent = &#39;&#39;
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {
        setTimeout(()=>{
            fileContent += chunk
            console.log(&#39;消费&#39;, chunk.toString())
            callback()// 写入结束后调用
        }, 100)
    }
})

myWritable.on(&#39;close&#39;, ()=>{
    console.log(&#39;close&#39;, fileContent)
})

let count = 0
function productionData(){
    let flag = true
    while (count <= 20 && flag){
        flag = myWritable.write(count.toString())
        count++
    }
    if(count > 20){
        myWritable.end()
    }
}
productionData()
myWritable.on(&#39;drain&#39;, productionData)

上述是一个浮标值为 10 的可写流,现在数据源是一个 0——20 到连续的数字字符串,productionData 用于写入数据。

  • 首先第一次调用 myWritable.write("0") 时,因为缓存池不存在数据,所以 "0" 不进入缓存池,而是直接交给 _wirtemyWritable.write("0") 返回值为 true

  • 当执行 myWritable.write("1") 时,因为 _wirtecallback 还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1" 加入缓存池中。后面 2-9 都是如此

  • 当执行 myWritable.write("10") 时,此时缓冲区长度为 9(1-9),还未到达浮标值, "10" 继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11,所以 myWritable.write("1") 返回 false,这意味着缓冲区的数据已经足够,我们需要等待 drain 事件通知时再生产数据。

  • 100ms过后,_write("0", encoding, callback)callback 被调用,表明 "0" 已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read 消费缓存池的头节点("1"),然后继续重复这个过程直到缓存池为空后触发 drain 事件,再次执行 productionData

  • 调用 myWritable.write("11"),触发第1步开始的过程,直到流结束。

Duplex

在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。

什麼是流(Stream)?如何理解Nodejs中的流

Duplex 流需要同时实现下面两个方法

  • 实现 _read() 方法,为可读流生产数据

  • 实现 _write() 方法,为可写流消费数据

上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样

以 NodeJs 的标准输入输出流为例:

  • 当我们在控制台输入数据时会触发其 data 事件,这证明他有可读流的功能,每一次用户键入回车相当于调用可读的 push 方法推送生产的数据。
  • 当我们调用其 write 方法时也可以向控制台输出内容,但是不会触发 data 事件,这说明他有可写流的功能,而且有独立的缓冲区,_write 方法的实现内容就是让控制台展示文字。
// 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性
process.stdin.on(&#39;data&#39;, data=>{
    process.stdin.write(data);
})

// 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data
setInterval(()=>{
    process.stdin.write(&#39;不是用户控制台输入的数据&#39;)
}, 1000)

Transform

什麼是流(Stream)?如何理解Nodejs中的流

可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。

                             Duplex Stream
                          ------------------|
                    Read                 External Sink
                          ------------------|

Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------

对于创建 Transform 流,最重要的是要实现 _transform 方法而不是 _write 或者 _read_transform 中对可写流写入的数据做处理(消费)然后为可读流生产数据。

转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
const { write } = require(&#39;fs&#39;)
const { Transform, PassThrough } = require(&#39;stream&#39;)

const reurce = &#39;1312123213124341234213423428354816273513461891468186499126412&#39;

const transform = new Transform({
    highWaterMark: 10,
    transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池
        this.push(chunk.toString().replace(&#39;1&#39;, &#39;@&#39;))
        callback()
    },
    flush(callback){// end触发前执行
        this.push(&#39;<<<&#39;)
        callback()
    }
})


// write 不断写入数据
let count = 0
transform.write(&#39;>>>&#39;)
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = transform.write(count.toString())
        count++
    }
    if (count > 20) {
        transform.end()
    }
}
productionData()
transform.on(&#39;drain&#39;, productionData)


let result = &#39;&#39;
transform.on(&#39;data&#39;, data=>{
    result += data.toString()
})
transform.on(&#39;end&#39;, ()=>{
    console.log(result)
    // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})

Pipe

管道是将上一个程序的输出作为下一个程序的输入,这是管道在 Linux 中管道的作用。NodeJs 中的管道其实也类似,它管道用于连接两个流,上游的流的输出会作为下游的流的输入。

什麼是流(Stream)?如何理解Nodejs中的流

管道 sourec.pipe(dest, options) 要求 sourec 是可读的,dest是可写的。其返回值是 dest。

对于处于管道中间的流既是下一个流的上游也是上一个流的下游,所以其需要时一个可读可写的双工流,一般我们会使用转换流来作为管道中间的流。

https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33

Stream.prototype.pipe = function(dest, options) {
  const source = this;

  function ondata(chunk) {
    if (dest.writable && dest.write(chunk) === false && source.pause) {
      source.pause();
    }
  }

  source.on(&#39;data&#39;, ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on(&#39;drain&#39;, ondrain);
  // ...后面的代码省略
}

pipe 的实现非常清晰,当上游的流发出 data 事件时会调用下游流的 write 方法写入数据,然后立即调用 source.pause() 使得上游变为暂停状态,这主要是为了防止背压。

当下游的流将数据消费完成后会调用 source.resume() 使上游再次变为流动状态。

我们实现一个将 data 文件中所有 1 替换为 @ 然后输出到 result 文件到管道。

const { Transform } = require(&#39;stream&#39;)
const { createReadStream, createWriteStream } = require(&#39;fs&#39;)

// 一个位于管道中的转换流
function createTransformStream(){
    return new Transform({
        transform(chunk, encoding, callback){
            this.push(chunk.toString().replace(/1/g, &#39;@&#39;))
            callback()
        }
    })
}
createReadStream(&#39;./data&#39;)
.pipe(createTransformStream())
.pipe(createWriteStream(&#39;./result&#39;))

在管道中只存在两个流时,其功能和转换流有点类似,都是将一个可读流与一个可写流串联起来,但是管道可以串联多个流。

原文地址:https://juejin.cn/post/7077511716564631566

作者:月夕

更多node相关知识,请访问:nodejs 教程

以上是什麼是流(Stream)?如何理解Nodejs中的流的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:juejin.cn。如有侵權,請聯絡admin@php.cn刪除