首頁 >web前端 >js教程 >什麼是並發控制? JavaScript中如何實現並發控制?

什麼是並發控制? JavaScript中如何實現並發控制?

青灯夜游
青灯夜游轉載
2021-06-28 10:50:553087瀏覽

什麼是並發控制? JavaScript中如何實現並發控制?

在日常開發過程中,你可能會遇到並發控制的場景,例如控制請求並發數。那麼在 JavaScript 中如何實現並發控制呢?在回答這個問題之前,我們來先簡單介紹一下並發控制。

假設有 6 個待辦任務要執行,而我們希望限制同時執行的任務個數,也就是最多只有 2 個任務能同時執行。當正在執行任務清單 中的任何1 個任務完成後,程式會自動從待辦事項清單 中取得新的待辦任務並將該任務加入正在執行任務清單 中。為了讓大家更直觀地理解上述的過程,阿寶哥特意畫了以下3 張圖:

1.1 階段一

什麼是並發控制? JavaScript中如何實現並發控制?

##1.2 階段二

什麼是並發控制? JavaScript中如何實現並發控制?

#1.3 階段三

什麼是並發控制? JavaScript中如何實現並發控制?

好的,介紹完並發控制之後,阿寶哥將以Github 上

async-pool 這個函式庫來介紹一下非同步任務並發控制的具體實作。

async-pool:https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ ES7。

二、並發控制的實作

async-pool 這個函式庫提供了ES7 和ES6 兩個不同版本的實現,在分析其在具體實作之前,我們先來看看它如何使用。

2.1 asyncPool 的使用
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);

在上述程式碼中,我們使用

async-pool 這個函式庫提供的asyncPool函數來實現非同步任務的並發控制。 asyncPool 函數的簽章如下所示:

function asyncPool(poolLimit, array, iteratorFn){ ... }

此函數接收3 個參數:

  • poolLimit(數字型別):表示限制的並發數;
  • array(數組類型):表示任務數組;
  • iteratorFn(函數類型):表示迭代函數,用於實作對每個任務項目進行處理,函數會傳回一個Promise 物件或非同步函數。
對於上述範例來說,在使用了

asyncPool 函數之後,對應的執行過程如下所示:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

透過觀察以上的註解信息,我們可以大致了解

asyncPool 函數內部的控制流程。下面我們先來分析 asyncPool 函數的 ES7 實作。

關注「全棧修仙之路」閱讀阿寶哥原創的 4 本免費電子書(累計下載 3萬 )及 50 幾篇 TS 系列教學。

2.2 asyncPool ES7 實作

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存储所有的异步任务
  const executing = []; // 存储正在执行的异步任务
  for (const item of array) {
    // 调用iteratorFn函数创建异步任务
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的异步任务

    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在执行的异步任务
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待较快的任务执行完成
      }
    }
  }
  return Promise.all(ret);
}

在上述程式碼中,充分利用了

Promise.allPromise.race 函數特點,再結合ES7 中提供的async await 特性,最終實現了同時控制的功能。利用 await Promise.race(executing); 這行語句,我們會等待 正在執行任務清單 中較快的任務執行完成之後,才會繼續執行下一次迴圈。

asyncPool ES7 實作相對比較簡單,接下來我們來看看不使用

async await 特性要如何實現同樣的功能。

2.3 asyncPool ES6 實作

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // 存储所有的异步任务
  const executing = []; // 存储正在执行的异步任务
  const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // 获取新的任务项
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing); 
      }
    }
 
    // 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

在 ES6 的實作版本中,透過內部封裝的

enqueue 函式來實現核心的控制邏輯。當Promise.race(executing) 傳回的Promise 物件變成已完成狀態時,才會呼叫enqueue 函數,從array數組中取得新的待辦任務。

三、阿寶哥有話說

asyncPool 這個函式庫的ES7 和ES6 的具體實作中,我們都使用到了Promise.allPromise.race 函數。其中手寫 Promise.all 是一道常見的面試題。剛好趁著這個機會,阿寶哥跟大家一起來手寫簡易版的 Promise.allPromise.race 函數。

3.1 手写 Promise.all

Promise.all(iterable) 方法会返回一个 promise 对象,当输入的所有 promise 对象的状态都变成 resolved 时,返回的 promise 对象就会以数组的形式,返回每个 promise 对象 resolve 后的结果。当输入的任何一个 promise 对象状态变成 rejected 时,则返回的 promise 对象会 reject 对应的错误信息。

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // 计数器,用于判断所有任务是否执行完成
      let result = []; // 结果数组
      for (let i = 0; i < iterators.length; i++) {
        // 考虑到iterators[i]可能是普通对象,则统一包装为Promise对象
        Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // 按顺序保存对应的结果
            // 当所有任务都执行完成后,再统一返回结果
            if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // 任何一个Promise对象执行失败,则调用reject()方法
            return;
          }
        );
      }
    }
  });
};

需要注意的是对于 Promise.all 的标准实现来说,它的参数是一个可迭代对象,比如 Array、String 或 Set 等。

3.2 手写 Promise.race

Promise.race(iterable) 方法会返回一个 promise 对象,一旦迭代器中的某个 promise 对象 resolvedrejected,返回的 promise 对象就会 resolve 或 reject 相应的值。

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};

本文阿宝哥带大家详细分析了 async-pool 异步任务并发控制的具体实现,同时为了让大家能够更好地理解 async-pool 的核心代码。最后阿宝哥还带大家一起手写简易版的 Promise.allPromise.race 函数。其实除了 Promise.all 函数之外,还存在另一个函数 —— Promise.allSettled,该函数用于解决 Promise.all 存在的问题,感兴趣的小伙伴可以自行研究一下。

四、参考资源

更多编程相关知识,请访问:编程视频!!

以上是什麼是並發控制? JavaScript中如何實現並發控制?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:juejin.cn。如有侵權,請聯絡admin@php.cn刪除