首頁  >  文章  >  web前端  >  Node.js pipe()方法介紹

Node.js pipe()方法介紹

巴扎黑
巴扎黑原創
2017-08-15 10:14:5213404瀏覽

這篇文章主要介紹了Node.js pipe實作原始碼解析,小編覺得還挺不錯的,現在分享給大家,也給大家做個參考。一起跟著小編過來看看吧

從前面兩篇文章,我們了解到。想要把 Readable 的資料寫到 Writable,就必須先手動的將資料讀入內存,然後寫入 Writable。換句話說,每次傳遞資料時,都需要寫如下的模板程式碼


readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

為了方便使用,Node.js 提供了pipe() 方法,讓我們可以優雅的傳遞資料


readable.pipe(writable)

現在,就讓我們來看看它是如何實現的吧

pipe

首先需要先呼叫Readable 的pipe() 方法


// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};

執行pipe() 函數時,先將Writable 記錄到state.pipes 中,然後綁定相關事件,最後如果Readable 不是flow 模式,就呼叫resume() 將Readable 改為flow 模式

傳遞資料

Readable 從資料來源取得到資料後,觸發data 事件,執行ondata()

ondata() 相關程式碼:


// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) && 
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 进入 pause 模式
   src.pause();
  }
 }

在ondata(chunk) 函數內,透過dest.write(chunk)將資料寫入Writable

此時,在_write() 內部可能會呼叫src.push(chunk) 或使其unpipe,這會導致awaitDrain 多次增加,不能清除,Readable 卡住

當不能再寫入資料到Writable 時,Readable 會進入pause 模式,直到所有的drain 事件觸發

觸發drain 事件,執行ondrain()


#
// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
  return function() {
   var state = src._readableState;
   debug('pipeOnDrain', state.awaitDrain);
   if (state.awaitDrain)
    state.awaitDrain--;
   // awaitDrain === 0,且有 data 监听器
   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
    state.flowing = true;
    flow(src);
   }
  };
 }

每個drain 事件觸發時,都會減少awaitDrain,直到awaitDrain 為0。此時,呼叫flow(src),使Readable 進入flow 模式

到這裡,整個資料傳遞循環已經建立,資料會順著循環源源不斷的流入Writable,直到所有資料寫入完成

unpipe

不管寫入過程中是否出現錯誤,最後都會執行unpipe()


// lib/_stream_readable.js

// ...

 function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也没有
 if (state.pipesCount === 0)
  return this;

 // 只有一个
 if (state.pipesCount === 1) {
  if (dest && dest !== state.pipes)
   return this;
  // 没有指定就 unpipe 所有
  if (!dest)
   dest = state.pipes;

  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;
  if (dest)
   dest.emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 没有指定就 unpipe 所有
 if (!dest) {
  var dests = state.pipes;
  var len = state.pipesCount;
  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;

  for (var i = 0; i < len; i++)
   dests[i].emit(&#39;unpipe&#39;, this, unpipeInfo);
  return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
  return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
  state.pipes = state.pipes[0];

 dest.emit(&#39;unpipe&#39;, this, unpipeInfo);

 return this;
};

Readable.prototype .unpipe() 函數會依據state.pipes 屬性和dest 參數,選擇執行策略。最後會觸發dest 的unpipe 事件

unpipe 事件觸發後,呼叫onunpipe(),清理相關資料


##

// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
  debug(&#39;onunpipe&#39;);
  if (readable === src) {
   if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
    unpipeInfo.hasUnpiped = true;
    // 清理相关数据
    cleanup();
   }
  }
 }

#End

在整個pipe 的過程中,Readable 是主動方( 負責整個pipe 過程:包括資料傳遞、unpipe 與異常處理),Writable 是被動方( 只需要觸發drain 事件)

總結一下pipe 的過程:

  • 先執行readbable.pipe(writable),將readable 與writable 對接上

  • 當readable 中有資料時,readable.emit('data'),將資料寫入writable

  • #如果writable.write(chunk) 傳回false,則進入pause 模式,等待drain 事件觸發

  • drain 事件全部觸發後,再次進入flow 模式,寫入資料

  • #不管資料寫入完成或發生中斷,最後都會呼叫unpipe()

  • unpipe() 呼叫Readable.prototype.unpipe(),觸發dest 的unpipe 事件,清理相關資料

    ##

以上是Node.js pipe()方法介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn