Asynchronous flow control in Node.js

小云云
2018-01-03 11:26:17

If you do not have much experience with function callbacks, this material can still be somewhat hard to read. Because of the asynchronous nature of Node.js, it is easy to end up in "callback hell". This article records in some detail how to solve the asynchronous flow control problem in Node.js, and I hope it serves as a useful reference.

Preface

This article is long. It explains asynchronous flow-control patterns using a simple web spider as the running example: the spider crawls the content of a specified URL and saves it inside the project. At the end of the article, you can find the source code for the whole demo.

1. Native JavaScript mode

This article is not aimed at beginners, so most of the basics are omitted:

(spider_v1.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);

  fs.exists(filename, exists => {
    if (!exists) {
      console.log(`Downloading ${url}`);

      request(url, (err, response, body) => {
        if (err) {
          callback(err);
        } else {
          mkdirp(path.dirname(filename), err => {
            if (err) {
              callback(err);
            } else {
              fs.writeFile(filename, body, err => {
                if (err) {
                  callback(err);
                } else {
                  callback(null, filename, true);
                }
              });
            }
          });
        }
      });
    } else {
      callback(null, filename, false);
    }
  });
}

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

The process of the above code is roughly like this:

  1. Convert url to filename

  2. Check whether the file already exists. If it does, return immediately; otherwise, go to the next step.

  3. Send a request and get the body

  4. Write the body to the file

This is a very simple version of the spider: it can only crawl the content of a single URL. The nested callbacks above are already a headache to read, so let's start optimizing.

First, the if/else branches can be flattened with early returns. This is simple enough that a before/after comparison says it all:


/// before
if (err) {
  callback(err);
} else {
  callback(null, filename, true);
}

/// after
if (err) {
  return callback(err);
}
callback(null, filename, true);

Written this way, the code has one less level of nesting. Experienced programmers may object that this style puts too much emphasis on errors, since the focus of our code should be on processing the correct data, and readability calls for the same.

Another optimization is to split up functions. In the spider function above, downloading the file and saving the file can be split into separate functions.

(spider_v2.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);

  fs.exists(filename, exists => {
    if (exists) {
      return callback(null, filename, false);
    }
    download(url, filename, err => {
      if (err) {
        return callback(err);
      }
      callback(null, filename, true);
    })
  });
}

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

The code above is about as far as the native optimizations go, but this spider is still too simple. Now we want to crawl all the URLs found in a page, which raises the question of serial versus parallel execution.

(spider_v3.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// The key insight here is how to iterate over an array asynchronously
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);

  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }

  iterate(0);
}

function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);

  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

// spider() only reports an error (or nothing) in this version,
// so the final callback no longer receives filename/downloaded.
spider(process.argv[2], 2, err => {
  if (err) {
    console.log(err);
  } else {
    console.log("Download complete");
  }
});

Compared with the previous version, the code above adds two core pieces. The first uses the utilities helper module to extract the links from a page body:


const links = utilities.getPageLinks(currentUrl, body);

Its internal implementation is not explained here. The other core piece is:


/// The key insight here is how to iterate over an array asynchronously
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);

  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }

  iterate(0);
}

This small piece of code implements asynchronous sequential iteration using only native patterns. It also introduces the nesting parameter, which controls how many levels deep the spider crawls.
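The sequential iteration pattern can be pulled out into a reusable helper. The following is only a minimal sketch: iterateSeries and fakeSpider are hypothetical names that do not appear in the article's source, and setTimeout stands in for the real download.

```javascript
// Hypothetical generic helper: process the items of a collection one at a time.
// iteratorCallback(item, done) must call done(err) exactly once.
function iterateSeries(collection, iteratorCallback, finalCallback) {
  function iterate(index) {
    if (index === collection.length) {
      return finalCallback(null); // every item has been processed
    }
    iteratorCallback(collection[index], err => {
      if (err) {
        return finalCallback(err); // stop at the first error
      }
      iterate(index + 1); // start the next item only after this one ends
    });
  }

  iterate(0);
}

// Simulated asynchronous task (assumption: stands in for spider()).
const visited = [];
function fakeSpider(link, done) {
  setTimeout(() => {
    visited.push(link);
    done();
  }, 10);
}

iterateSeries(["a", "b", "c"], fakeSpider, err => {
  if (err) {
    return console.log(err);
  }
  console.log(`Visited in order: ${visited.join(", ")}`);
});
```

Because the next item is only started inside the callback of the previous one, the items are handled strictly in array order, whatever each task's duration.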

At this point the serial version is complete. For better performance, we now need to develop a parallel version.

(spider_v4.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// The key insight here is how to iterate over an array asynchronously
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }

    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }

  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  /// Without the spidering map above, this pattern has a problem:
  /// the same url could be downloaded again and again.
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

// spider() only reports an error (or nothing) in this version,
// so the final callback no longer receives filename/downloaded.
spider(process.argv[2], 2, err => {
  if (err) {
    console.log(err);
  } else {
    console.log("Download complete");
  }
});

This code is also straightforward and has two core ideas. The first is how to achieve concurrency:


/// The key insight here is how to iterate over an array asynchronously
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }

    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }

  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

The code above is a general pattern for concurrency: it starts every task in a loop and counts the completions. The second core idea is that, because the tasks run concurrently, checking the file system (with fs.exists, or fs.readFile as used here) to decide whether a url still needs downloading is unreliable, and the same file may be downloaded repeatedly. The solution here is:

  • Use a Map to cache each url that has already been started, with the url as the key
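Both ideas can be tried in isolation. The sketch below is an illustration under stated assumptions, not the article's code: runParallel and makeTask are hypothetical helpers, setTimeout simulates the download, and, unlike the done function above, the final callback is guarded so that it fires only once even if several tasks fail.

```javascript
// Hypothetical helper: start every task at once and call finalCallback once.
function runParallel(tasks, finalCallback) {
  if (tasks.length === 0) {
    return process.nextTick(finalCallback);
  }

  let completed = 0;
  let failed = false;

  tasks.forEach(task => {
    task(err => {
      if (failed) {
        return; // an earlier error has already been reported
      }
      if (err) {
        failed = true;
        return finalCallback(err);
      }
      if (++completed === tasks.length) {
        finalCallback();
      }
    });
  });
}

// Cache of urls that have been started, keyed by url as in spider_v4.js.
const started = new Map();

// Assumption: a fake download that simply waits 10 ms.
function makeTask(url) {
  return done => {
    if (started.has(url)) {
      return process.nextTick(done); // already in progress, skip it
    }
    started.set(url, true);
    setTimeout(() => {
      console.log(`Fetched ${url}`);
      done();
    }, 10);
  };
}

runParallel(
  ["/a", "/b", "/a"].map(makeTask),
  err => console.log(err ? `Failed: ${err.message}` : "All tasks finished")
);
```

The url is recorded in the Map before the asynchronous work begins, so a second task for the same url sees the entry even though the first one has not finished yet.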

Now we have a new requirement: limit the maximum number of tasks running at the same time. This brings in what I consider the most important concept: the queue.

(task-Queue.js)


class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}

module.exports = TaskQueue;

The code above implements the queue, and its core is the next() method. When a task is added to the queue, next() is called immediately. This does not mean the task itself must run immediately; it only runs once the number of running tasks is below the concurrency limit.
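To see the limit in action, the queue can be fed a few simulated tasks. This is a rough sketch: the class is copied from above so that the snippet runs on its own, while makeTask, active and maxActive are hypothetical names introduced only for the demonstration.

```javascript
// Copy of the TaskQueue class above, so this sketch is self-contained.
class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}

const queue = new TaskQueue(2);
let active = 0;    // tasks currently in progress
let maxActive = 0; // highest value of active observed so far

// Assumption: each fake task takes 20 ms, then signals completion via done().
function makeTask(id) {
  return done => {
    active++;
    maxActive = Math.max(maxActive, active);
    console.log(`Task ${id} started (${active} running)`);
    setTimeout(() => {
      active--;
      done();
    }, 20);
  };
}

for (let id = 1; id <= 5; id++) {
  queue.pushTask(makeTask(id));
}
```

All five tasks are pushed synchronously, but only two start right away; each of the others starts when a running task calls done().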

(spider_v5.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// The key insight here is how to iterate over an array asynchronously
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  links.forEach(link => {
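    /// Pass a task to the queue. A task is a function that accepts one parameter.
    /// When the queue runs the task, it passes in a function that the task calls when it finishes.
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        /// As soon as an error occurs, the queue exits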
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }

        done();
      });
    });

  });
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  /// Without the spidering map above, this pattern has a problem:
  /// the same url could be downloaded again and again.
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

// spider() only reports an error (or nothing) in this version,
// so the final callback no longer receives filename/downloaded.
spider(process.argv[2], 2, err => {
  if (err) {
    console.log(`error: ${err}`);
  } else {
    console.log("Download complete");
  }
});

So, to limit the number of concurrent tasks, all we need to do is push each task onto the queue inside the loop in spiderLinks. That is relatively simple.

So far, we have used native JavaScript to implement a fairly complete web spider that can run serially or concurrently and can also limit the level of concurrency.

2. Use async library

Splitting different responsibilities into different functions pays off handsomely. The async library is very popular and performs well; internally it is based on callbacks.

(spider_v6.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  let body;

  series([
    callback => {
      request(url, (err, response, resBody) => {
        if (err) {
          return callback(err);
        }
        body = resBody;
        callback();
      });
    },
    mkdirp.bind(null, path.dirname(filename)),
    callback => {
      fs.writeFile(filename, body, callback);
    }
  ], err => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}

/// The key insight here is how to iterate over an array asynchronously
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  eachSeries(links, (link, cb) => {
    "use strict";
    spider(link, nesting - 1, cb);
  }, callback);
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

// spider() only reports an error (or nothing) in this version,
// so the final callback no longer receives filename/downloaded.
spider(process.argv[2], 1, err => {
  if (err) {
    console.log(err);
  } else {
    console.log("Download complete");
  }
});

The code above uses only two functions from async, series and eachSeries. Together with queue, which is used in spider_v7.js, that makes three:


const series = require("async/series"); // run a list of tasks in series
const eachSeries = require("async/eachSeries"); // iterate over a collection in series
const queue = require("async/queue"); // task queue with a concurrency limit

Because they are relatively simple, I won't explain them. The code that uses the async queue is in spider_v7.js; it is very similar to our custom queue above, so I won't go into it either.

3. Promise

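Promise is a specification, and many libraries implement it; here we use the ES6 implementation. Put simply, a promise is a contract: if the operation completes, its resolve function is called, and if it fails, its reject function is called. A promise also implements a then method, and then itself returns a promise, which is what makes call chains possible.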

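There is actually a lot more to promises. In practice, the key question is how to promisify ordinary callback-based functions. I won't cover that here, as I am not qualified to explain it in depth.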
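For orientation only, here is a minimal sketch of what promisifying a Node-style function can look like. It is an assumption about the general technique, not the implementation of utilities.promisify used below; delayedDouble is a hypothetical function made up for the demonstration, and this version resolves with the first result value only.

```javascript
// Minimal promisify sketch: wraps a function whose last argument is a
// Node-style callback (err, result) and returns a promise-returning function.
function promisify(callbackBasedFn) {
  return function promisified(...args) {
    return new Promise((resolve, reject) => {
      callbackBasedFn.call(this, ...args, (err, result) => {
        if (err) {
          return reject(err); // failure: reject the promise
        }
        resolve(result); // success: resolve with the first result value
      });
    });
  };
}

// Hypothetical callback-style function used only for this demonstration.
function delayedDouble(value, callback) {
  setTimeout(() => {
    if (typeof value !== "number") {
      return callback(new TypeError("value must be a number"));
    }
    callback(null, value * 2);
  }, 10);
}

const delayedDoubleAsync = promisify(delayedDouble);

delayedDoubleAsync(21)
  .then(result => console.log(`Result: ${result}`))
  .catch(err => console.log(err));
```

Node.js also ships a built-in util.promisify (since Node 8) that covers the common case, so a hand-written helper like this is mainly useful for understanding the idea.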

(spider_v8.js)


const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");


function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename) {
  console.log(`Downloading ${url}`);

  let body;

  return request(url)
    .then(response => {
      "use strict";
      body = response.body;
      return mkdirp(path.dirname(filename));
    })
    .then(() => writeFile(filename, body))
    .then(() => {
      "use strict";
      console.log(`Downloaded and saved: ${url}`);
      return body;
    });
}

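/// At its core, promise-based programming removes the need to pass callbacks into functions.
/// The promise acts as an intermediate layer that lets asynchronous code be written in a sequential style.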
function spiderLinks(currentUrl, body, nesting) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }

  const links = utilities.getPageLinks(currentUrl, body);

  links.forEach(link => {
    "use strict";
    promise = promise.then(() => spider(link, nesting - 1));
  });

  return promise;
}

function spider(url, nesting) {
  const filename = utilities.urlToFilename(url);

  return readFile(filename, "utf8")
    .then(
      body => spiderLinks(url, body, nesting),
      err => {
        "use strict";
        if (err.code !== 'ENOENT') {
          /// Rethrow the error so that the catch at the end of the whole chain can handle it
          throw err;
        }
        return download(url, filename)
          .then(body => spiderLinks(url, body, nesting));
      }
    );
}

spider(process.argv[2], 1)
  .then(() => {
    "use strict";
    console.log('Download complete');
  })
  .catch(err => {
    "use strict";
    console.log(err);
  });

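As you can see, none of the functions in the code above take a callback; errors only need to be caught once, at the end of the chain.

When designing an API, it is a good idea to support both styles: callbacks as well as promises.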


function asyncDivision(dividend, divisor, cb) {
  return new Promise((resolve, reject) => {
    "use strict";
    process.nextTick(() => {
      const result = dividend / divisor;
      if (isNaN(result) || !Number.isFinite(result)) {
        const error = new Error("Invalid operands");
        if (cb) {
          cb(error);
        }
        return reject(error);
      }

      if (cb) {
        cb(null, result);
      }
      resolve(result);
    });
  });
}

asyncDivision(10, 2, (err, result) => {
  "use strict";
  if (err) {
    return console.log(err);
  }
  console.log(result);
});

asyncDivision(22, 11)
  .then((result) => console.log(result))
  .catch((err) => console.log(err));

4. Generator

Generators are interesting: they let a function be paused and resumed. With the help of two libraries, thunkify and co, the code below turns out very neat.
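To make the pause-and-resume idea concrete, here is a rough sketch of a tiny runner in the spirit of thunkify and co. It is not how those libraries are implemented: toThunk, run and delayedAdd are hypothetical names, the runner only understands yielded thunks, and it passes a single result value back rather than an array of callback arguments.

```javascript
// Turn a callback-style function into a thunk factory: calling it with the
// regular arguments returns a function that only expects the callback.
function toThunk(fn) {
  return (...args) => callback => fn(...args, callback);
}

// Minimal runner (hypothetical): drives a generator that yields thunks.
function run(generatorFn, finalCallback) {
  const generator = generatorFn();

  function step(err, value) {
    let state;
    try {
      // Resume the paused generator: throw the error into it, or hand back
      // the result of the thunk it was waiting for.
      state = err ? generator.throw(err) : generator.next(value);
    } catch (thrown) {
      return finalCallback(thrown);
    }
    if (state.done) {
      return finalCallback(null, state.value);
    }
    state.value(step); // the yielded thunk calls step when it completes
  }

  step();
}

// Hypothetical asynchronous function for the demonstration.
function delayedAdd(a, b, callback) {
  setTimeout(() => callback(null, a + b), 10);
}
const add = toThunk(delayedAdd);

run(function* () {
  const three = yield add(1, 2);   // pauses here until the thunk finishes
  const six = yield add(three, 3); // resumes with the previous result
  return six;
}, (err, result) => {
  console.log(err || `Result: ${result}`);
});
```

Each yield hands a thunk to the runner and suspends the generator; the runner resumes it with the result, so the body reads like synchronous code and errors can be handled with an ordinary try/catch.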

(spider_v9.js)


const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");

const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);

function* download(url, filename) {
  console.log(`Downloading ${url}`);

  const response = yield request(url);
  console.log(response);

  const body = response[1];
  yield mkdirp(path.dirname(filename));

  yield writeFile(filename, body);

  console.log(`Downloaded and saved ${url}`);
  return body;
}

function* spider(url, nesting) {
  const filename = utilities.urlToFilename(url);

  let body;

  try {
    body = yield readFile(filename, "utf8");
  } catch (err) {
    if (err.code !== 'ENOENT') {
      throw err;
    }
    body = yield download(url, filename);
  }

  yield spiderLinks(url, body, nesting);
}

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }

  const links = utilities.getPageLinks(currentUrl, body);

  for (let i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}

/// co handles the callbacks automatically: each yield returns the callback's arguments collected into an array, with the err argument removed
co(function* () {
  try {
    yield spider(process.argv[2], 1);
    console.log('Download complete');
  } catch (err) {
    console.log(err);
  }
});
