Home  >  Article  >  Web Front-end  >  Asynchronous control use cases for Node.js

Asynchronous control use cases for Node.js

php中世界最好的语言
php中世界最好的语言Original
2018-04-17 10:08:241365browse

This time I will bring you the use case of Node.js asynchronous control. What are the precautions when using Node.js asynchronous control? The following is a practical case, let's take a look.

Without in-depth experience in using function callbacks, it is still a bit difficult to read these contents. Due to the unique asynchronous characteristics of Node.js, the problem of "callback hell" appeared. In this article, I recorded in more detail how to solve the asynchronous flow problem.

The article will be long, and this is an explanation of the asynchronous streaming pattern. This article will use a simple web spider example. Its function is to crawl the web content of the specified URL and save it in the project. At the end of the article, you can find the source code demo of the entire article.

1.NativeJavaScriptMode

This article is not aimed at beginners, so most of the basic content will be omitted:

(spider_v1.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);
  fs.exists(filename, exists => {
    if (!exists) {
      console.log(`Downloading ${url}`);
      request(url, (err, response, body) => {
        if (err) {
          callback(err);
        } else {
          mkdirp(path.dirname(filename), err => {
            if (err) {
              callback(err);
            } else {
              fs.writeFile(filename, body, err => {
                if (err) {
                  callback(err);
                } else {
                  callback(null, filename, true);
                }
              });
            }
          });
        }
      });
    } else {
      callback(null, filename, false);
    }
  });
}
spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

The flow of the above code is roughly like this:

  1. Convert url to filename

  2. Determine whether the file name exists, return directly if it exists, otherwise go to the next step

  3. Send a request and get the body

  4. Write body to file

This is a very simple version of the spider. It can only crawl the content of a URL. See how troublesome the callback above is. Then we start optimizing.

First of all, the if else method can be optimized. This is very simple. Needless to say, here is a comparison effect:

/// before
if (err) {
  callback(err);
} else {
  callback(null, filename, true);
}
/// after
if (err) {
  return callback(err);
}
callback(null, filename, true);

If the code is written like this, there will be one less layer of nesting, but experienced programmers will think that writing it this way emphasizes errors too much. The focus of our programming should be on processing the correct data, and this also exists in terms of readability. requirements.

Another optimization is function splitting. In the spider function in the above code, the downloaded file and the saved file can be split.

(spider_v2.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}
function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}
function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);
  fs.exists(filename, exists => {
    if (exists) {
      return callback(null, filename, false);
    }
    download(url, filename, err => {
      if (err) {
        return callback(err);
      }
      callback(null, filename, true);
    })
  });
}
spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

The above code is basically the result of native optimization, but the function of this spider is too simple. We now need to crawl all URLs in a certain web page, which will lead to serial and parallel issues.

(spider_v3.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}
function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}
/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }
  iterate(0);
}
function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }
    spiderLinks(url, body, nesting, callback);
  });
}
spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

The above code has two more core functions than the previous code. First, it obtains the links in a certain body through the auxiliary class:

const links = utilities.getPageLinks(currentUrl, body);

I won’t explain the internal implementation. The other core code is:

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }
  iterate(0);
}

It can be said that the small piece of code above is a native pattern for implementing asynchronous serialization. In addition to these, the concept of nesting is also introduced. Through this attribute, the crawling level can be controlled.

At this point we have completely implemented the serial function. Considering performance, we need to develop the parallel crawling function.

(spider_v4.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}
function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}
/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0, hasErrors = false;
  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }
    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }
  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}
const spidering = new Map();
function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }
  spidering.set(url, true);
  const filename = utilities.urlToFilename(url);
  /// In this pattern, there will be some issues.
  /// Possible problems to download the same url again and again。
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }
    spiderLinks(url, body, nesting, callback);
  });
}
spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

This code is also very simple and has two core contents. One is how to achieve concurrency:

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0, hasErrors = false;
  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }
    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }
  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

The above code can be said to be a pattern for achieving concurrency. This is achieved using loop traversal. Another core is that since it is concurrent, there will be problems using fs.exists, and the same file may be downloaded repeatedly. The solution here is:

  • Use Map to cache a certain url, the url should be used as the key

Now we have a new requirement to limit the maximum number of simultaneous concurrencies, so here we introduce a concept that I think is the most important: queue.

(task-Queue.js)

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  pushTask(task) {
    this.queue.push(task);
    this.next();
  }
  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}
module.exports = TaskQueue;

The above code is the implementation code of the queue. The core is the next() method. It can be seen that when the task is added to the queue, it will be executed immediately. This does not mean that the task must be executed immediately, but it means that next will be called immediately.

(spider_v5.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);
function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}
function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}
/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0, hasErrors = false;
  links.forEach(link => {
    /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
    /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        /// 这里表示,只要发生错误,队列就会退出
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }
        done();
      });
    });
  });
}
const spidering = new Map();
function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }
  spidering.set(url, true);
  const filename = utilities.urlToFilename(url);
  /// In this pattern, there will be some issues.
  /// Possible problems to download the same url again and again。
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }
    spiderLinks(url, body, nesting, callback);
  });
}
spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(`error: ${err}`);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

Therefore, in order to limit the number of concurrency, just put the task traversal into the queue in the spiderLinks method. This is relatively simple.

So far, we have used native JavaScript to implement a web spider with relatively complete functions, which can be both serial and concurrent, and can also control the number of concurrencies.

2. Use async library

把不同的功能放到不同的函数中,会给我们带来巨大的好处,async库十分流行,它的性能也不错,它内部基于callback。

(spider_v6.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");
function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  let body;
  series([
    callback => {
      request(url, (err, response, resBody) => {
        if (err) {
          return callback(err);
        }
        body = resBody;
        callback();
      });
    },
    mkdirp.bind(null, path.dirname(filename)),
    callback => {
      fs.writeFile(filename, body, callback);
    }
  ], err => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}
/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  eachSeries(links, (link, cb) => {
    "use strict";
    spider(link, nesting - 1, cb);
  }, callback);
}
const spidering = new Map();
function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }
  spidering.set(url, true);
  const filename = utilities.urlToFilename(url);
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }
    spiderLinks(url, body, nesting, callback);
  });
}
spider(process.argv[2], 1, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

在上边的代码中,我们只使用了async的三个功能:

const series = require("async/series"); // 串行
const eachSeries = require("async/eachSeries"); // 并行
const queue = require("async/queue"); // 队列

由于比较简单,就不做解释了。async中的队列的代码在(spider_v7.js)中,和上边我们自定义的队列很相似,也不做更多解释了。

3.Promise

Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。

其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格

(spider_v8.js)

const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");
function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}
function download(url, filename) {
  console.log(`Downloading ${url}`);
  let body;
  return request(url)
    .then(response => {
      "use strict";
      body = response.body;
      return mkdirp(path.dirname(filename));
    })
    .then(() => writeFile(filename, body))
    .then(() => {
      "use strict";
      console.log(`Downloaded adn saved: ${url}`);
      return body;
    });
}
/// promise编程的本质就是为了解决在函数中设置回调函数的问题
/// 通过中间层promise来实现异步函数同步化
function spiderLinks(currentUrl, body, nesting) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }
  const links = utilities.getPageLinks(currentUrl, body);
  links.forEach(link => {
    "use strict";
    promise = promise.then(() => spider(link, nesting - 1));
  });
  return promise;
}
function spider(url, nesting) {
  const filename = utilities.urlToFilename(url);
  return readFile(filename, "utf8")
    .then(
      body => spiderLinks(url, body, nesting),
      err => {
        "use strict";
        if (err.code !== 'ENOENT') {
          /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
          throw err;
        }
        return download(url, filename)
          .then(body => spiderLinks(url, body, nesting));
      }
    );
}
spider(process.argv[2], 1)
  .then(() => {
    "use strict";
    console.log('Download complete');
  })
  .catch(err => {
    "use strict";
    console.log(err);
  });

可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。

在设计api的时候,应该支持两种方式,及支持callback,又支持promise

function asyncpision(pidend, pisor, cb) {
  return new Promise((resolve, reject) => {
    "use strict";
    process.nextTick(() => {
      const result = pidend / pisor;
      if (isNaN(result) || !Number.isFinite(result)) {
        const error = new Error("Invalid operands");
        if (cb) {
          cb(error);
        }
        return reject(error);
      }
      if (cb) {
        cb(null, result);
      }
      resolve(result);
    });
  });
}
asyncpision(10, 2, (err, result) => {
  "use strict";
  if (err) {
    return console.log(err);
  }
  console.log(result);
});
asyncpision(22, 11)
  .then((result) => console.log(result))
  .catch((err) => console.log(err));

4.Generator

Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。

(spider_v9.js)

const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");
const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);
function* download(url, filename) {
  console.log(`Downloading ${url}`);
  const response = yield request(url);
  console.log(response);
  const body = response[1];
  yield mkdirp(path.dirname(filename));
  yield writeFile(filename, body);
  console.log(`Downloaded and saved ${url}`);
  return body;
}
function* spider(url, nesting) {
  const filename = utilities.urlToFilename(url);
  let body;
  try {
    body = yield readFile(filename, "utf8");
  } catch (err) {
    if (err.code !== 'ENOENT') {
      throw err;
    }
    body = yield download(url, filename);
  }
  yield spiderLinks(url, body, nesting);
}
function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  for (let i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}
/// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息
co(function* () {
  try {
    yield spider(process.argv[2], 1);
    console.log('Download complete');
  } catch (err) {
    console.log(err);
  }
});

相信看了本文案例你已经掌握了方法,更多精彩请关注php中文网其它相关文章!

推荐阅读:



The above is the detailed content of Asynchronous control use cases for Node.js. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn