Heim >Web-Frontend >js-Tutorial >Was ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?

Was ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?

青灯夜游
青灯夜游nach vorne
2021-06-28 10:50:553113Durchsuche

Was ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?

Im täglichen Entwicklungsprozess können Parallelitätskontrollszenarien auftreten, z. B. die Steuerung der Anzahl gleichzeitiger Anforderungen. Wie implementiert man also die Parallelitätskontrolle in JavaScript? Bevor wir diese Frage beantworten, stellen wir kurz die Parallelitätskontrolle vor.

Angenommen, es gibt 6 zu erledigende Aufgaben, die ausgeführt werden müssen, und wir möchten die Anzahl der Aufgaben begrenzen, die gleichzeitig ausgeführt werden können, d. h. es können höchstens 2 Aufgaben gleichzeitig ausgeführt werden. Wenn eine Aufgabe in der „Liste der ausgeführten Aufgaben“ abgeschlossen ist, ruft das Programm automatisch eine neue zu erledigende Aufgabe aus der „Liste der zu erledigenden Aufgaben“ ab und fügt die Aufgabe der „Liste der ausgeführten Aufgaben“ hinzu. Damit jeder den obigen Prozess intuitiver verstehen kann, zeichnete Abago absichtlich die folgenden 3 Bilder: 1.1 Phase 1.2 Phase zwei

1.3 Phase drei

Was ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?

Okay, nach der Einführung der Parallelitätskontrolle wird Bruder Abao die spezifische Implementierung der Parallelitätskontrolle asynchroner Aufgaben mithilfe der Bibliothek „async-pool“ auf Github vorstellen.

Was ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?async-pool: https://github.com/rxaviers/async-pool

Führen Sie mehrere Promise-Returning- und Async-Funktionen mit begrenzter Parallelität mit nativem ES6/ES7 aus.

Was ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?

2. Implementierung der Parallelitätskontrolle

async-pool Diese Bibliothek bietet zwei verschiedene Versionen der Implementierung, ES7 und ES6, bevor wir ihre spezifische Implementierung analysieren.

2.1 Verwendung von asyncPool

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
Im obigen Code verwenden wir die Funktion asyncPool, die von der Bibliothek async-pool

bereitgestellt wird, um die Parallelitätssteuerung asynchroner Aufgaben zu implementieren. Die Signatur der Funktion asyncPool lautet wie folgt:
function asyncPool(poolLimit, array, iteratorFn){ ... }

Die Funktion empfängt 3 Parameter:
  • poolLimit (numerischer Typ): stellt die Anzahl dar Parallelitätsgrenzen ;
  • array (Array-Typ): stellt das Aufgabenarray dar;
  • iteratorFn (Funktionstyp): stellt das dar Iterationsfunktion, die zum Implementieren der Verarbeitung jedes Aufgabenelements verwendet wird. Diese Funktion gibt ein Promise-Objekt oder eine asynchrone Funktion zurück.
Für das obige Beispiel sieht der entsprechende Ausführungsprozess nach Verwendung der Funktion asyncPool wie folgt aus:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

Durch Beobachtung der obigen Anmerkungsinformationen können wir das grob verstehen Kontrollfluss innerhalb der Funktion asyncPool. Lassen Sie uns zunächst die ES7-Implementierung der Funktion asyncPool analysieren.

Folgen Sie „Full Stack Road to Immortality“ und lesen Sie 4 kostenlose E-Books, die ursprünglich von Bruder A Bao erstellt wurden (insgesamt über 30.000 Downloads) und mehr als 50 Tutorials zur TS-Serie.

2.2 asyncPool ES7-Implementierung

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存储所有的异步任务
  const executing = []; // 存储正在执行的异步任务
  for (const item of array) {
    // 调用iteratorFn函数创建异步任务
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的异步任务

    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在执行的异步任务
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待较快的任务执行完成
      }
    }
  }
  return Promise.all(ret);
}
Im obigen Code werden Promise.all und Promise.race vollständig genutzt Funktionsmerkmale realisieren in Kombination mit der in ES7 bereitgestellten Funktion async waiting schließlich die Parallelitätskontrollfunktion. Mit der Anweisungszeile await Promise.race(executing); warten wir auf den Abschluss der schnelleren Aufgabe in der Liste der ausgeführten AufgabenasyncPool 函数来实现异步任务的并发控制。 asyncPool 函数的签名如下所示:

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // 存储所有的异步任务
  const executing = []; // 存储正在执行的异步任务
  const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // 获取新的任务项
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing); 
      }
    }
 
    // 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

该函数接收 3 个参数:

  • poolLimit(数字类型):表示限制的并发数;
  • array(数组类型):表示任务数组;
  • iteratorFn(函数类型):表示迭代函数,用于实现对每个任务项进行处理,该函数会返回一个 Promise 对象或异步函数。

对于以上示例来说,在使用了 asyncPool 函数之后,对应的执行过程如下所示:

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // 计数器,用于判断所有任务是否执行完成
      let result = []; // 结果数组
      for (let i = 0; i < iterators.length; i++) {
        // 考虑到iterators[i]可能是普通对象,则统一包装为Promise对象
        Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // 按顺序保存对应的结果
            // 当所有任务都执行完成后,再统一返回结果
            if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // 任何一个Promise对象执行失败,则调用reject()方法
            return;
          }
        );
      }
    }
  });
};

通过观察以上的注释信息,我们可以大致地了解 asyncPool 函数内部的控制流程。下面我们先来分析 asyncPool 函数的 ES7 实现。

关注「全栈修仙之路」阅读阿宝哥原创的 4 本免费电子书(累计下载 3万+)及 50 几篇 TS 系列教程。

2.2 asyncPool ES7 实现

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};

在以上代码中,充分利用了 Promise.allPromise.race 函数特点,再结合 ES7 中提供的 async await 特性,最终实现了并发控制的功能。利用 await Promise.race(executing); 这行语句,我们会等待 正在执行任务列表 中较快的任务执行完成之后,才会继续执行下一次循环。

asyncPool ES7 实现相对比较简单,接下来我们来看一下不使用 async await 特性要如何实现同样的功能。

2.3 asyncPool ES6 实现

rrreee

在 ES6 的实现版本中,通过内部封装的 enqueue 函数来实现核心的控制逻辑。当 Promise.race(executing) 返回的 Promise 对象变成已完成状态时,才会调用 enqueue 函数,从 array 数组中获取新的待办任务。

三、阿宝哥有话说

asyncPool 这个库的 ES7 和 ES6 的具体实现中,我们都使用到了 Promise.allPromise.race 函数。其中手写 Promise.all 是一道常见的面试题。刚好趁着这个机会,阿宝哥跟大家一起来手写简易版的 Promise.allPromise.race, bevor wir mit der Ausführung der nächsten Schleife fortfahren.

🎜Die Implementierung von asyncPool ES7 ist relativ einfach. Schauen wir uns als Nächstes an, wie wir dieselbe Funktion erreichen können, ohne die Funktion async waiting zu verwenden. 🎜

2.3 asyncPool ES6-Implementierung🎜rrreee🎜In der ES6-Implementierungsversion wird die Kernsteuerungslogik durch die intern gekapselte enqueue-Funktion implementiert. Wenn das von Promise.race(executing) zurückgegebene Promise-Objekt abgeschlossen ist, wird die Funktion enqueue aus dem array Holen Sie sich neue Aufgaben aus dem Array. 🎜<h3 data-id="heading-8">3. Bruder Abao hat etwas zu sagen🎜🎜In der spezifischen Implementierung von ES7 und ES6 der <code>asyncPool-Bibliothek verwenden wir alle Promise .all- und Promise.race-Funktionen. Unter ihnen ist die Handschrift Promise.all eine häufige Frage in Vorstellungsgesprächen. Bruder A Bao nutzte diese Gelegenheit und schrieb gemeinsam mit allen handschriftlich einfache Versionen der Funktionen Promise.all und Promise.race. 🎜

3.1 手写 Promise.all

Promise.all(iterable) 方法会返回一个 promise 对象,当输入的所有 promise 对象的状态都变成 resolved 时,返回的 promise 对象就会以数组的形式,返回每个 promise 对象 resolve 后的结果。当输入的任何一个 promise 对象状态变成 rejected 时,则返回的 promise 对象会 reject 对应的错误信息。

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // 计数器,用于判断所有任务是否执行完成
      let result = []; // 结果数组
      for (let i = 0; i < iterators.length; i++) {
        // 考虑到iterators[i]可能是普通对象,则统一包装为Promise对象
        Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // 按顺序保存对应的结果
            // 当所有任务都执行完成后,再统一返回结果
            if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // 任何一个Promise对象执行失败,则调用reject()方法
            return;
          }
        );
      }
    }
  });
};

需要注意的是对于 Promise.all 的标准实现来说,它的参数是一个可迭代对象,比如 Array、String 或 Set 等。

3.2 手写 Promise.race

Promise.race(iterable) 方法会返回一个 promise 对象,一旦迭代器中的某个 promise 对象 resolvedrejected,返回的 promise 对象就会 resolve 或 reject 相应的值。

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};

本文阿宝哥带大家详细分析了 async-pool 异步任务并发控制的具体实现,同时为了让大家能够更好地理解 async-pool 的核心代码。最后阿宝哥还带大家一起手写简易版的 Promise.allPromise.race 函数。其实除了 Promise.all 函数之外,还存在另一个函数 —— Promise.allSettled,该函数用于解决 Promise.all 存在的问题,感兴趣的小伙伴可以自行研究一下。

四、参考资源

更多编程相关知识,请访问:编程视频!!

Das obige ist der detaillierte Inhalt vonWas ist Parallelitätskontrolle? Wie implementiert man die Parallelitätskontrolle in JavaScript?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:juejin.cn. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen