>  기사  >  웹 프론트엔드  >  Node.js 파이프() 메소드 소개

Node.js 파이프() 메소드 소개

巴扎黑
巴扎黑원래의
2017-08-15 10:14:5213363검색

이 글은 주로 Node.js 파이프의 소스 ​​코드 분석을 소개합니다. 편집자는 이것이 꽤 좋다고 생각합니다. 이제 여러분과 공유하고 참고할 것입니다. 편집자를 따라가서 살펴보겠습니다

이전 두 기사에서 우리는 배웠습니다. Readable 데이터를 Writable에 쓰려면 먼저 수동으로 데이터를 메모리로 읽은 다음 Writable에 써야 합니다. 즉, 데이터를 전달할 때마다 다음 템플릿 코드를 작성해야 합니다.

이제 어떻게 구현되었는지 살펴보겠습니다


pipe

먼저 Readable의 Pipe() 메소드를 호출해야 합니다

readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

pipe() 함수를 실행할 때 먼저 Writable to를 기록합니다. .pipes를 상태화한 다음 관련 이벤트를 바인딩합니다. 마지막으로 Readable이 흐름 모드가 아닌 경우 이력서()를 호출하여 Readable을 흐름 모드로 변경합니다.

데이터 전달

Readable이 데이터에서 데이터를 얻은 후. 소스에서 데이터 이벤트를 발생시키고 ondata()

ondata() 관련 코드를 실행합니다:

readable.pipe(writable)
ondata(chunk) 함수에서 dest.write(chunk)

를 통해 Writable에 데이터를 씁니다. 시간이 지나면 _write() 내부에 src.push(chunk)를 호출하거나 파이프를 해제할 수 있으며 이로 인해 waitDrain이 여러 번 증가하고 지울 수 없으며 Readable이 중단됩니다

Writable, Readable에 더 이상 데이터를 쓸 수 없는 경우 모든 배수 이벤트가

드레인 이벤트를 트리거하고 ondrain()

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);
 
 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);
 
 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};

을 실행할 때까지 일시 중지 모드로 들어갑니다. 각 배수 이벤트가 트리거되면 waitDrain은 waitDrain이 0이 될 때까지 감소됩니다. 이때 Readable을 flow 모드로 진입시키기 위한 call flow(src)

이 시점에서 전체 데이터 전송 주기가 성립되었으며, 모든 데이터 쓰기가 완료될 때까지 해당 주기를 따라 데이터가 지속적으로 Writable로 흘러 들어갑니다.

unpipe


작성 과정에서 오류가 발생했는지 여부에 관계없이 unpipe()

// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) && 
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 进入 pause 模式
   src.pause();
  }
 }

Readable.prototype.unpipe() 함수는 state.pipes 속성과 대상을 기반으로 실행 전략을 선택합니다. 매개변수. 마지막으로 dest의 unpipe 이벤트가 트리거됩니다. unpipe 이벤트가 트리거된 후 관련 데이터를 정리하기 위해 onunpipe()가 호출됩니다. 데이터 전송, 언파이프 및 예외 처리를 포함하여 Writable은 수동적 당사자입니다(트리거만 필요함). 배수 이벤트)파이프라인 프로세스를 요약하려면:

먼저 readbable.pipe(writable)를 실행하여 읽기 가능 및 쓰기 가능을 연결합니다

읽기 가능한 데이터가 있는 경우 읽기 가능한.emit('data') 쓰기 데이터를 쓰기 가능으로


writable.write(chunk)가 false를 반환하면 일시 중지 모드로 들어가서 드레인 이벤트가 트리거될 때까지 기다립니다

모든 드레인 이벤트가 트리거된 후 다시 플로우 모드로 들어가 데이터를 씁니다

데이터 쓰기가 완료되거나 중단이 발생하더라도 마지막에는 unpipe()가 호출됩니다

  • unpipe() 및 Readable.prototype.unpipe()가 호출되어 dest cleans의 unpipe 이벤트가 트리거됩니다. 관련 데이터

위 내용은 Node.js 파이프() 메소드 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.