>  기사  >  웹 프론트엔드  >  Node.js의 비동기 흐름 제어

Node.js의 비동기 흐름 제어

小云云
小云云원래의
2018-01-03 11:26:171103검색

함수 콜백 사용에 대한 깊은 경험이 없으면 아직 이 내용을 읽기가 조금 어렵습니다. Node.js 특유의 비동기적 특성으로 인해 '콜백 지옥' 문제가 등장했습니다. 이번 글에서는 비동기 흐름 문제를 해결하는 방법을 좀 더 자세히 기록했습니다. 이번 글에서는 Node.js의 비동기 흐름 제어에 대한 간략한 논의를 주로 소개하고 있는데, 편집자는 꽤 괜찮다고 생각해서 지금부터 공유하고 참고용으로 올려드리겠습니다. 편집자를 따라 살펴보겠습니다. 모두에게 도움이 되기를 바랍니다.

머리말

글이 매우 길어질 예정이며, 이번 글은 비동기 스트리밍 모드에 대한 설명입니다. 이 기사에서는 간단한 웹 스파이더 예제를 사용합니다. 해당 기능은 지정된 URL의 웹 콘텐츠를 크롤링하고 이를 프로젝트에 저장하는 것입니다. 기사 끝 부분에서 전체 기사의 소스 코드 데모를 찾을 수 있습니다.

1. 네이티브 JavaScript 모드

이 글은 초보자를 위한 글이 아니기 때문에 대부분의 기본 내용은 생략하겠습니다:

(spider_v1.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);

  fs.exists(filename, exists => {
    if (!exists) {
      console.log(`Downloading ${url}`);

      request(url, (err, response, body) => {
        if (err) {
          callback(err);
        } else {
          mkdirp(path.dirname(filename), err => {
            if (err) {
              callback(err);
            } else {
              fs.writeFile(filename, body, err => {
                if (err) {
                  callback(err);
                } else {
                  callback(null, filename, true);
                }
              });
            }
          });
        }
      });
    } else {
      callback(null, filename, false);
    }
  });
}

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

위 코드의 과정은 아마도 다음과 같습니다.

  1. URL을 파일 이름으로 변환

  2. 파일 이름이 존재하는지 확인하고, 없으면 직접 반환하고, 그렇지 않으면 다음 단계로 이동하세요.

  3. 요청을 보내고 본문을 가져옵니다

  4. 쓰기 파일에 본문이 있습니다.

이것은 매우 간단한 스파이더 버전입니다. 위의 콜백이 얼마나 번거로운지 확인하세요. 그런 다음 최적화를 시작합니다.

먼저 if else 메소드를 최적화할 수 있습니다. 말할 필요도 없이 비교 효과는 다음과 같습니다.


/// before
if (err) {
  callback(err);
} else {
  callback(null, filename, true);
}

/// after
if (err) {
  return callback(err);
}
callback(null, filename, true);

코드를 이렇게 작성하면 레이어가 하나 줄어듭니다. 중첩되지만 숙련된 프로그래머라면 이러한 방식으로 오류를 지나치게 강조한다고 생각할 수도 있습니다. 우리 프로그래밍의 초점은 올바른 데이터를 처리하는 것이어야 합니다. 가독성 측면에서도 그러한 요구 사항이 있습니다.

또 다른 최적화는 함수 분할입니다. 위 코드의 스파이더 함수에서는 다운로드한 파일과 저장된 파일을 분할할 수 있습니다.

(spider_v2.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);

  fs.exists(filename, exists => {
    if (exists) {
      return callback(null, filename, false);
    }
    download(url, filename, err => {
      if (err) {
        return callback(err);
      }
      callback(null, filename, true);
    })
  });
}

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

위 코드는 기본적으로 네이티브 최적화의 결과이지만 이 스파이더의 기능은 이제 너무 간단하므로 특정 웹페이지의 모든 URL을 크롤링해야 합니다. 직렬 및 병렬 문제가 발생할 수 있습니다.

(spider_v3.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);

  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }

  iterate(0);
}

function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);

  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

이전 코드와 비교하여 위 코드에는 두 가지 핵심 기능이 더 있습니다. 첫 번째는 보조 클래스를 통해 특정 본문의 링크를 얻는 것입니다.


const links = utilities.getPageLinks(currentUrl, body);

내부 구현 더 이상 설명하지 않겠습니다. 또 다른 핵심 코드는 다음과 같습니다.


/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);

  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }

  iterate(0);
}

위의 작은 코드 조각은 비동기 직렬화를 구현하기 위한 기본 패턴이라고 할 수 있습니다. 이 외에도 중첩 개념도 도입되었으며, 이 속성을 통해 크롤링 수준을 제어할 수 있습니다.

이제 성능을 고려하여 직렬 기능을 완전히 구현했으며 병렬 크롤링 기능을 개발해야 합니다.

(spider_v4.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }

    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }

  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  /// In this pattern, there will be some issues.
  /// Possible problems to download the same url again and again。
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

이 코드도 매우 간단하고 핵심 내용이 두 개 있습니다. 하나는 동시성을 달성하는 방법입니다.


/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }

    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }

  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

위 코드는 동시성을 달성하기 위한 패턴이라고 할 수 있습니다. 이는 루프 순회를 사용하여 달성됩니다. 또 다른 핵심은 동시성이므로 fs.exists를 사용하는 데 문제가 있을 수 있으며 동일한 파일이 반복적으로 다운로드될 수 있다는 것입니다. 여기서 해결 방법은 다음과 같습니다.

  • Map을 사용하여 특정 URL을 캐시하면 URL은 Used as the key

이제 최대 동시 동시성 수를 제한해야 하는 새로운 요구 사항이 있으므로 여기서 가장 중요하다고 생각하는 개념인 대기열을 소개합니다.

(task-Queue.js)


class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}

module.exports = TaskQueue;

위 코드는 큐의 구현 코드입니다. 핵심은 태스크가 큐에 추가될 때 나타나는 코드입니다. 이는 작업이 즉시 실행되어야 한다는 의미는 아니지만, next가 즉시 호출된다는 의미입니다.

(spider_v5.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  links.forEach(link => {
    /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
    /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        /// 这里表示,只要发生错误,队列就会退出
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }

        done();
      });
    });

  });
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  /// In this pattern, there will be some issues.
  /// Possible problems to download the same url again and again。
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== &#39;ENOENT&#39;) {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(`error: ${err}`);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

따라서 동시성 수를 제한하려면 spiderLinks 메서드의 대기열에 작업 순회를 넣기만 하면 됩니다. 이것은 비교적 간단합니다.

지금까지 우리는 기본 JavaScript를 사용하여 직렬 및 동시 기능이 모두 가능하고 동시성 수를 제어할 수도 있는 비교적 완전한 기능을 갖춘 웹 스파이더를 구현했습니다.

2. 비동기 라이브러리 사용

비동기 라이브러리는 매우 인기가 높으며 내부적으로도 콜백을 기반으로 합니다.

(spider_v6.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  let body;

  series([
    callback => {
      request(url, (err, response, resBody) => {
        if (err) {
          return callback(err);
        }
        body = resBody;
        callback();
      });
    },
    mkdirp.bind(null, path.dirname(filename)),
    callback => {
      fs.writeFile(filename, body, callback);
    }
  ], err => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  eachSeries(links, (link, cb) => {
    "use strict";
    spider(link, nesting - 1, cb);
  }, callback);
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== &#39;ENOENT&#39;) {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 1, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

위 코드에서는 async의 세 가지 기능만 사용했습니다.


const series = require("async/series"); // 串行
const eachSeries = require("async/eachSeries"); // 并行
const queue = require("async/queue"); // 队列

상대적으로 간단하므로 설명하지 않겠습니다. 비동기 대기열의 코드는 (spider_v7.js)에 있는데, 이는 위의 사용자 정의 대기열과 매우 유사하므로 더 이상 설명하지 않겠습니다.

3.약속

Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。

其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格

(spider_v8.js)


const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");


function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename) {
  console.log(`Downloading ${url}`);

  let body;

  return request(url)
    .then(response => {
      "use strict";
      body = response.body;
      return mkdirp(path.dirname(filename));
    })
    .then(() => writeFile(filename, body))
    .then(() => {
      "use strict";
      console.log(`Downloaded adn saved: ${url}`);
      return body;
    });
}

/// promise编程的本质就是为了解决在函数中设置回调函数的问题
/// 通过中间层promise来实现异步函数同步化
function spiderLinks(currentUrl, body, nesting) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }

  const links = utilities.getPageLinks(currentUrl, body);

  links.forEach(link => {
    "use strict";
    promise = promise.then(() => spider(link, nesting - 1));
  });

  return promise;
}

function spider(url, nesting) {
  const filename = utilities.urlToFilename(url);

  return readFile(filename, "utf8")
    .then(
      body => spiderLinks(url, body, nesting),
      err => {
        "use strict";
        if (err.code !== &#39;ENOENT&#39;) {
          /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
          throw err;
        }
        return download(url, filename)
          .then(body => spiderLinks(url, body, nesting));
      }
    );
}

spider(process.argv[2], 1)
  .then(() => {
    "use strict";
    console.log(&#39;Download complete&#39;);
  })
  .catch(err => {
    "use strict";
    console.log(err);
  });

可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。

在设计api的时候,应该支持两种方式,及支持callback,又支持promise


function asyncpision(pidend, pisor, cb) {
  return new Promise((resolve, reject) => {
    "use strict";
    process.nextTick(() => {
      const result = pidend / pisor;
      if (isNaN(result) || !Number.isFinite(result)) {
        const error = new Error("Invalid operands");
        if (cb) {
          cb(error);
        }
        return reject(error);
      }

      if (cb) {
        cb(null, result);
      }
      resolve(result);
    });
  });
}

asyncpision(10, 2, (err, result) => {
  "use strict";
  if (err) {
    return console.log(err);
  }
  console.log(result);
});

asyncpision(22, 11)
  .then((result) => console.log(result))
  .catch((err) => console.log(err));

4.Generator

Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。

(spider_v9.js)


const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");

const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);

function* download(url, filename) {
  console.log(`Downloading ${url}`);

  const response = yield request(url);
  console.log(response);

  const body = response[1];
  yield mkdirp(path.dirname(filename));

  yield writeFile(filename, body);

  console.log(`Downloaded and saved ${url}`);
  return body;
}

function* spider(url, nesting) {
  const filename = utilities.urlToFilename(url);

  let body;

  try {
    body = yield readFile(filename, "utf8");
  } catch (err) {
    if (err.code !== &#39;ENOENT&#39;) {
      throw err;
    }
    body = yield download(url, filename);
  }

  yield spiderLinks(url, body, nesting);
}

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }

  const links = utilities.getPageLinks(currentUrl, body);

  for (let i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}

/// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息
co(function* () {
  try {
    yield spider(process.argv[2], 1);
    console.log(&#39;Download complete&#39;);
  } catch (err) {
    console.log(err);
  }
});

相关推荐:

Node.js之关于异步流控制的简单介绍

微信小程序中Promise进行异步流程处理的实现过程

小程序开发之利用co处理异步流程的实例教程

위 내용은 Node.js의 비동기 흐름 제어의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.