首頁 >web前端 >js教程 >Node.js異步流控制詳解

Node.js異步流控制詳解

小云云
小云云原創
2018-01-22 10:11:591223瀏覽

在沒有深度使用函數回呼的經驗的時候,去看這些內容還是有一點吃力的。由於Node.js獨特的非同步特性,才出現了「回調地獄」的問題,在這篇文章中,我比較詳細的記錄瞭如何解決異步流問題。本文主要介紹了淺談Node.js之非同步流控制,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟著小編過來看看吧,希望能幫助大家。

文章會很長,而且這篇是對非同步流模式的解釋。文中會使用一個簡單的網路蜘蛛的例子,它的作用是抓取指定URL的網頁內容並保存在專案中,在文章的最後,可以找到整篇文章中的原始碼demo。

1.原生JavaScript模式

本篇不針對初學者,因此會省略掉大部分的基礎內容的講解:

(spider_v1 .js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);

  fs.exists(filename, exists => {
    if (!exists) {
      console.log(`Downloading ${url}`);

      request(url, (err, response, body) => {
        if (err) {
          callback(err);
        } else {
          mkdirp(path.dirname(filename), err => {
            if (err) {
              callback(err);
            } else {
              fs.writeFile(filename, body, err => {
                if (err) {
                  callback(err);
                } else {
                  callback(null, filename, true);
                }
              });
            }
          });
        }
      });
    } else {
      callback(null, filename, false);
    }
  });
}

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

上邊的程式碼的流程大概是這樣的:

  1. 把url轉換成filename

  2. 判斷該檔案名稱是否存在,若存在直接返回,否則進入下一步

  3. #發送請求,取得body


##把body寫入檔案

這是一個非常簡單版本的蜘蛛,他只能抓取一個url的內容,看到上邊的回呼多麼令人頭疼。那我們開始進行優化。

首先,if else 這種方式可以進行最佳化,這個很簡單,不用多說,放一個對比效果:


/// before
if (err) {
  callback(err);
} else {
  callback(null, filename, true);
}

/// after
if (err) {
  return callback(err);
}
callback(null, filename, true);

程式碼這麼寫,嵌套就會少一層,但經驗豐富的程式設計師會認為,這樣寫過重強調了error,我們程式設計的重點應該放在處理正確的資料上,在可讀性上也存在這樣的要求。

另一個優化是函式拆分,上邊程式碼中的spider函數中,可以把下載檔和儲存檔拆分出去。


(spider_v2.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  console.log(`filename: ${filename}`);

  fs.exists(filename, exists => {
    if (exists) {
      return callback(null, filename, false);
    }
    download(url, filename, err => {
      if (err) {
        return callback(err);
      }
      callback(null, filename, true);
    })
  });
}

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

上邊的程式碼基本上是採用原生最佳化後的結果,但這個蜘蛛的功能太簡單,我們現在需要抓取某個網頁中的所有url,這樣才會引申出串列和並行的問題。

(spider_v3.js)


const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);

  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }

  iterate(0);
}

function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);

  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

上邊的程式碼相比之前的程式碼多了兩個核心功能,首先是透過輔助類別取得到了某個body中的links:

const links = utilities.getPageLinks(currentUrl, body);

內部實作就不解釋了,另一個核心程式碼就是:


##
/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);

  function iterate(index) {
    if (index === links.length) {
      return callback();
    }
    spider(links[index], nesting - 1, err => {
      if (err) {
        return callback(err);
      }
      iterate((index + 1));
    })
  }

  iterate(0);
}

可以說上邊這一小段程式碼,就是採用原生實作非同步串列的pattern了。除了這些之外,還引入了nesting的概念,透過這是這個屬性,可以控制抓取層次。

到這裡我們就完整的實現了串行的功能,考慮到性能,我們要開發並行抓取的功能。

(spider_v4.js)


    const request = require("request");
    const fs = require("fs");
    const mkdirp = require("mkdirp");
    const path = require("path");
    const utilities = require("./utilities");
    
    function saveFile(filename, contents, callback) {
      mkdirp(path.dirname(filename), err => {
        if (err) {
          return callback(err);
        }
        fs.writeFile(filename, contents, callback);
      });
    }
    
    function download(url, filename, callback) {
      console.log(`Downloading ${url}`);
    
      request(url, (err, response, body) => {
        if (err) {
          return callback(err);
        }
        saveFile(filename, body, err => {
          if (err) {
            return callback(err);
          }
          console.log(`Downloaded and saved: ${url}`);
          callback(null, body);
        });
      })
    }
    
    /// 最大的启发是实现了如何异步循环遍历数组
    function spiderLinks(currentUrl, body, nesting, callback) {
      if (nesting === 0) {
        return process.nextTick(callback);
      }
    
      const links = utilities.getPageLinks(currentUrl, body);
      if (links.length === 0) {
        return process.nextTick(callback);
      }
    
      let completed = 0, hasErrors = false;
    
      function done(err) {
        if (err) {
          hasErrors = true;
          return callback(err);
        }
    
        if (++completed === links.length && !hasErrors) {
          return callback();
        }
      }
    
      links.forEach(link => {
        spider(link, nesting - 1, done);
      });
    }
    
    const spidering = new Map();
    
    function spider(url, nesting, callback) {
      if (spidering.has(url)) {
        return process.nextTick(callback);
      }
    
      spidering.set(url, true);
    
      const filename = utilities.urlToFilename(url);
    
      /// In this pattern, there will be some issues.
      /// Possible problems to download the same url again and again。
      fs.readFile(filename, "utf8", (err, body) => {
        if (err) {
          if (err.code !== 'ENOENT') {
            return callback(err);
          }
          return download(url, filename, (err, body) => {
            if (err) {
              return callback(err);
            }
            spiderLinks(url, body, nesting, callback);
          });
        }
    
        spiderLinks(url, body, nesting, callback);
      });
    }
    
    spider(process.argv[2], 2, (err, filename, downloaded) => {
      if (err) {
        console.log(err);
      } else if (downloaded) {
        console.log(`Completed the download of ${filename}`);
      } else {
        console.log(`${filename} was already downloaded`);
      }
    });
  • 這段程式碼同樣很簡單,也有兩個核心內容。一個是如何實現並發:

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }

    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }

  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

上邊的程式碼可以說是實作並發的一個pattern。利用迴圈遍歷來實現。另一個核心是,既然是並發的,那麼利用fs.exists 就會存在問題,可能會重複下載同一文件,這裡的解決方案是:


使用Map快取某一url,url應該作為key

現在我們又有了新的需求,要求限制同時並發的最大數,那麼在這裡就引進了一個我認為最重要的概念:隊列。

(task-Queue.js)

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}

module.exports = TaskQueue;

上邊的程式碼就是佇列的實作程式碼,核心是next() 方法,可以看出,當task加入佇列後,會立刻執行,這不是說這個任務一定馬上執行,而是指的是next會立刻呼叫。

(spider_v5.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  })
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  let completed = 0, hasErrors = false;

  links.forEach(link => {
    /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
    /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        /// 这里表示,只要发生错误,队列就会退出
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }

        done();
      });
    });

  });
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  /// In this pattern, there will be some issues.
  /// Possible problems to download the same url again and again。
  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== &#39;ENOENT&#39;) {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 2, (err, filename, downloaded) => {
  if (err) {
    console.log(`error: ${err}`);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});

因此,為了限制並發的個數,只需在spiderLinks 方法中,把task遍歷放入佇列就可以了。這相對來說很簡單。

到這裡為止,我們使用原生JavaScript實作了一個有相對完整功能的網路蜘蛛,既能串行,也能並發,還可以控制並發個數。


2.使用async函式庫

把不同的功能放到不同的函式中,會為我們帶來巨大的好處,async函式庫十分流行,它的性能也不錯,它內部基於callback。

(spider_v6.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);

  let body;

  series([
    callback => {
      request(url, (err, response, resBody) => {
        if (err) {
          return callback(err);
        }
        body = resBody;
        callback();
      });
    },
    mkdirp.bind(null, path.dirname(filename)),
    callback => {
      fs.writeFile(filename, body, callback);
    }
  ], err => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}

/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }

  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }

  eachSeries(links, (link, cb) => {
    "use strict";
    spider(link, nesting - 1, cb);
  }, callback);
}

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }

  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);

  fs.readFile(filename, "utf8", (err, body) => {
    if (err) {
      if (err.code !== &#39;ENOENT&#39;) {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

spider(process.argv[2], 1, (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of ${filename}`);
  } else {
    console.log(`${filename} was already downloaded`);
  }
});
在上邊的程式碼中,我們只使用了async的三個功能:

####
const series = require("async/series"); // 串行
const eachSeries = require("async/eachSeries"); // 并行
const queue = require("async/queue"); // 队列
###由於比較簡單,就不會解釋了。 async中的佇列的程式碼在(spider_v7.js)中,和上邊我們自訂的佇列很相似,也不做更多解釋了。 #########3.Promise#########Promise是一個協議,有很多函式庫實作了這個協議,我們用的是ES6的實作。簡單來說promise就是一個約定,如果完成了,就呼叫它的resolve方法,失敗了就呼叫它的reject方法。它內有實作了then方法,then回傳promise本身,這樣就形成了呼叫鏈。 ###

其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格

(spider_v8.js)


const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");


function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

function download(url, filename) {
  console.log(`Downloading ${url}`);

  let body;

  return request(url)
    .then(response => {
      "use strict";
      body = response.body;
      return mkdirp(path.dirname(filename));
    })
    .then(() => writeFile(filename, body))
    .then(() => {
      "use strict";
      console.log(`Downloaded adn saved: ${url}`);
      return body;
    });
}

/// promise编程的本质就是为了解决在函数中设置回调函数的问题
/// 通过中间层promise来实现异步函数同步化
function spiderLinks(currentUrl, body, nesting) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }

  const links = utilities.getPageLinks(currentUrl, body);

  links.forEach(link => {
    "use strict";
    promise = promise.then(() => spider(link, nesting - 1));
  });

  return promise;
}

function spider(url, nesting) {
  const filename = utilities.urlToFilename(url);

  return readFile(filename, "utf8")
    .then(
      body => spiderLinks(url, body, nesting),
      err => {
        "use strict";
        if (err.code !== &#39;ENOENT&#39;) {
          /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
          throw err;
        }
        return download(url, filename)
          .then(body => spiderLinks(url, body, nesting));
      }
    );
}

spider(process.argv[2], 1)
  .then(() => {
    "use strict";
    console.log(&#39;Download complete&#39;);
  })
  .catch(err => {
    "use strict";
    console.log(err);
  });

可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。

在设计api的时候,应该支持两种方式,及支持callback,又支持promise


function asyncpision(pidend, pisor, cb) {
  return new Promise((resolve, reject) => {
    "use strict";
    process.nextTick(() => {
      const result = pidend / pisor;
      if (isNaN(result) || !Number.isFinite(result)) {
        const error = new Error("Invalid operands");
        if (cb) {
          cb(error);
        }
        return reject(error);
      }

      if (cb) {
        cb(null, result);
      }
      resolve(result);
    });
  });
}

asyncpision(10, 2, (err, result) => {
  "use strict";
  if (err) {
    return console.log(err);
  }
  console.log(result);
});

asyncpision(22, 11)
  .then((result) => console.log(result))
  .catch((err) => console.log(err));

4.Generator

Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。

(spider_v9.js)


const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");

const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);

function* download(url, filename) {
  console.log(`Downloading ${url}`);

  const response = yield request(url);
  console.log(response);

  const body = response[1];
  yield mkdirp(path.dirname(filename));

  yield writeFile(filename, body);

  console.log(`Downloaded and saved ${url}`);
  return body;
}

function* spider(url, nesting) {
  const filename = utilities.urlToFilename(url);

  let body;

  try {
    body = yield readFile(filename, "utf8");
  } catch (err) {
    if (err.code !== &#39;ENOENT&#39;) {
      throw err;
    }
    body = yield download(url, filename);
  }

  yield spiderLinks(url, body, nesting);
}

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }

  const links = utilities.getPageLinks(currentUrl, body);

  for (let i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}

/// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息
co(function* () {
  try {
    yield spider(process.argv[2], 1);
    console.log(&#39;Download complete&#39;);
  } catch (err) {
    console.log(err);
  }
});

总结

我并没有写promise和generator并发的代码。以上这些内容来自于这本书nodejs-design-patterns 。

相关推荐:

Node.js之关于异步流控制的简单介绍

微信小程序中Promise进行异步流程处理的实现过程

小程序开发之利用co处理异步流程的实例教程

以上是Node.js異步流控制詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn