Maison >interface Web >js tutoriel >Présentation de la méthode Node.js pipe()

Présentation de la méthode Node.js pipe()

巴扎黑
巴扎黑original
2017-08-15 10:14:5213477parcourir

Cet article présente principalement l'analyse du code source du tube Node.js. L'éditeur pense que c'est assez bon, je vais donc le partager avec vous maintenant et le donner comme référence. Suivons l'éditeur pour y jeter un œil

Des deux articles précédents, nous avons appris. Si vous souhaitez écrire des données lisibles dans Writable, vous devez d'abord lire manuellement les données en mémoire, puis les écrire dans Writable. En d'autres termes, chaque fois que vous transmettez des données, vous devez écrire le code de modèle suivant


readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

Pour faciliter l'utilisation, Node.js fournit la méthode pipe() , afin que nous puissions transmettre les données avec élégance


readable.pipe(writable)

Maintenant, voyons comment cela est implémenté

pipe

Vous devez d'abord appeler la méthode pipe() de Readable


// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};

Lors de l'exécution de la fonction pipe(), enregistrez d'abord le Writable dans state.pipes , et puis liez les événements pertinents. Enfin, si le Readable n'est pas en mode flux, appelez CV () pour changer le Readable en mode flux

Transmettre les données

Readable from. la source de données Après avoir obtenu les données, déclenchez l'événement data et exécutez ondata()

ondata() Code associé :


// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) && 
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 进入 pause 模式
   src.pause();
  }
 }

dans le. fonction ondata(chunk), écrivez les données dans Writable

via dest.write(chunk). À ce stade, src.push(chunk) peut être appelé ou supprimé dans _write(), ce qui entraînera une augmentation de waitDrain. plusieurs fois. Ne peut pas être effacé, Readable est bloqué

Lorsque plus aucune donnée ne peut être écrite sur l'inscriptible, le Readable entrera en mode pause jusqu'à ce que tous les événements de vidange soient déclenchés

Déclenchez l'événement de vidange et exécuter ondrain()


// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
  return function() {
   var state = src._readableState;
   debug('pipeOnDrain', state.awaitDrain);
   if (state.awaitDrain)
    state.awaitDrain--;
   // awaitDrain === 0,且有 data 监听器
   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
    state.flowing = true;
    flow(src);
   }
  };
 }

Lorsque chaque événement de drainage est déclenché, waitDrain sera réduit jusqu'à ce que waitDrain soit égal à 0. À ce stade, appelez flow(src) pour faire passer le Readable en mode flux

À ce stade, l'ensemble du cycle de transfert de données a été établi et les données circuleront dans le Writable en continu tout au long du cycle jusqu'à ce que tout soit terminé. les données sont écrites

unpipe

Peu importe si une erreur se produit pendant le processus d'écriture, unpipe() sera exécuté à la fin


// lib/_stream_readable.js

// ...

 function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也没有
 if (state.pipesCount === 0)
  return this;

 // 只有一个
 if (state.pipesCount === 1) {
  if (dest && dest !== state.pipes)
   return this;
  // 没有指定就 unpipe 所有
  if (!dest)
   dest = state.pipes;

  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;
  if (dest)
   dest.emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 没有指定就 unpipe 所有
 if (!dest) {
  var dests = state.pipes;
  var len = state.pipesCount;
  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;

  for (var i = 0; i < len; i++)
   dests[i].emit(&#39;unpipe&#39;, this, unpipeInfo);
  return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
  return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
  state.pipes = state.pipes[0];

 dest.emit(&#39;unpipe&#39;, this, unpipeInfo);

 return this;
};

Lisible La fonction .prototype.unpipe() sélectionne une stratégie d'exécution basée sur la propriété state.pipes et le paramètre dest. Enfin, l'événement unpipe de dest sera déclenché

Une fois l'événement unpipe déclenché, appelez onunpipe() pour nettoyer les données pertinentes


// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
  debug(&#39;onunpipe&#39;);
  if (readable === src) {
   if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
    unpipeInfo.hasUnpiped = true;
    // 清理相关数据
    cleanup();
   }
  }
 }

Fin

Dans l'ensemble du processus de pipeline, Readable est la partie active (responsable de l'ensemble du processus de pipeline : y compris le transfert de données, le dépipe et la gestion des exceptions), et Writable est la partie passive ( il suffit de déclencher l'événement de drainage)

Pour résumer le processus de pipe :

  • Exécutez d'abord readbable.pipe(writable) pour connecter les éléments lisibles et inscriptibles

  • Lorsque readable a Lors de la lecture des données, readable.emit('data') écrit les données dans writable

  • Si writable.write(chunk) renvoie false, entrez en mode pause et attendez que l'événement de vidange se déclenche

  • Une fois tous les événements de vidange déclenchés, entrez à nouveau en mode débit et écrivez les données

  • Que l'écriture des données soit terminée ou qu'une interruption se produise, unpipe( sera appelé à la fin)

  • unpipe() appelle Readable.prototype.unpipe(), déclenche l'événement unpipe de destination et nettoie les données associées

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn