Home > Article > Web Front-end > What is concurrency control? How to implement concurrency control in JavaScript?
In the daily development process, you may encounter concurrency control scenarios, such as controlling the number of concurrent requests. So how to implement concurrency control in JavaScript? Before answering this question, let's briefly introduce concurrency control.
Suppose there are 6 to-do tasks to be executed, and we want to limit the number of tasks that can be executed at the same time, that is, at most 2 tasks can be executed at the same time. When any task in Executing Task List is completed, the program will automatically obtain a new to-do task from To-Do Task List and add the task to Executing task list . In order to allow everyone to understand the above process more intuitively, Abago specially drew the following 3 pictures:
Okay, after introducing concurrency control, Brother Abao will use the async-pool library on Github to introduce the specific implementation of asynchronous task concurrency control.
async-pool:https://github.com/rxaviers/async-pool
Run multiple promise-returning & async functions with limited concurrency using native ES6/ ES7.
async-pool This library provides two different versions of implementation, ES7 and ES6. After analyzing the Before the specific implementation, let's take a look at how to use it.
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
In the above code, we use the async-pool provided by this libraryasyncPool
Function to implement concurrency control of asynchronous tasks. asyncPool
The signature of the function is as follows:
function asyncPool(poolLimit, array, iteratorFn){ ... }
The function receives 3 parameters:
(numeric type): represents The number of limited concurrency;
(array type): represents the task array;
(function type): represents the iteration function, used To implement processing of each task item, this function will return a Promise object or an asynchronous function.
asyncPool function, the corresponding execution process is as follows:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.By observing the above annotation information, We can roughly understand the control flow inside the
asyncPool function. Let's first analyze the ES7 implementation of the
asyncPool function.
Follow "Full Stack Road to Immortality" and read 4 free e-books originally created by Brother A Bao (a total of 30,000 downloads) and more than 50 TS series tutorials.2.2 asyncPool ES7 implementation
async function asyncPool(poolLimit, array, iteratorFn) { const ret = []; // 存储所有的异步任务 const executing = []; // 存储正在执行的异步任务 for (const item of array) { // 调用iteratorFn函数创建异步任务 const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); // 保存新的异步任务 // 当poolLimit值小于或等于总任务个数时,进行并发控制 if (poolLimit <= array.length) { // 当任务完成后,从正在执行的任务数组中移除已完成的任务 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); // 保存正在执行的异步任务 if (executing.length >= poolLimit) { await Promise.race(executing); // 等待较快的任务执行完成 } } } return Promise.all(ret); }
Promise.all and
Promise.race are fully utilized. Combined with the
async await feature provided in ES7, the concurrency control function is finally realized. Using the
await Promise.race(executing); line of statement, we will wait for the faster task in the
executing task list to complete before continuing to execute the next loop.
async await feature.
function asyncPool(poolLimit, array, iteratorFn) { let i = 0; const ret = []; // 存储所有的异步任务 const executing = []; // 存储正在执行的异步任务 const enqueue = function () { if (i === array.length) { return Promise.resolve(); } const item = array[i++]; // 获取新的任务项 const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); let r = Promise.resolve(); // 当poolLimit值小于或等于总任务个数时,进行并发控制 if (poolLimit <= array.length) { // 当任务完成后,从正在执行的任务数组中移除已完成的任务 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); if (executing.length >= poolLimit) { r = Promise.race(executing); } } // 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务 return r.then(() => enqueue()); }; return enqueue().then(() => Promise.all(ret)); }
enqueue function. When the
Promise object returned by
Promise.race(executing) becomes completed, the
enqueue function will be called, from
array Get new to-do tasks from the array.
asyncPool, we have used
Promise.all and
Promise.race functions. Among them, handwritten
Promise.all is a common interview question. Just taking advantage of this opportunity, Brother A Bao and everyone came to handwrite simple versions of the
Promise.all and
Promise.race functions.
Promise.all(iterable)
方法会返回一个 promise 对象,当输入的所有 promise 对象的状态都变成 resolved
时,返回的 promise 对象就会以数组的形式,返回每个 promise 对象 resolve 后的结果。当输入的任何一个 promise 对象状态变成 rejected
时,则返回的 promise 对象会 reject 对应的错误信息。
Promise.all = function (iterators) { return new Promise((resolve, reject) => { if (!iterators || iterators.length === 0) { resolve([]); } else { let count = 0; // 计数器,用于判断所有任务是否执行完成 let result = []; // 结果数组 for (let i = 0; i < iterators.length; i++) { // 考虑到iterators[i]可能是普通对象,则统一包装为Promise对象 Promise.resolve(iterators[i]).then( (data) => { result[i] = data; // 按顺序保存对应的结果 // 当所有任务都执行完成后,再统一返回结果 if (++count === iterators.length) { resolve(result); } }, (err) => { reject(err); // 任何一个Promise对象执行失败,则调用reject()方法 return; } ); } } }); };
需要注意的是对于 Promise.all
的标准实现来说,它的参数是一个可迭代对象,比如 Array、String 或 Set 等。
Promise.race(iterable)
方法会返回一个 promise 对象,一旦迭代器中的某个 promise 对象 resolved 或 rejected,返回的 promise 对象就会 resolve 或 reject 相应的值。
Promise.race = function (iterators) { return new Promise((resolve, reject) => { for (const iter of iterators) { Promise.resolve(iter) .then((res) => { resolve(res); }) .catch((e) => { reject(e); }); } }); };
本文阿宝哥带大家详细分析了 async-pool 异步任务并发控制的具体实现,同时为了让大家能够更好地理解 async-pool 的核心代码。最后阿宝哥还带大家一起手写简易版的 Promise.all
和 Promise.race
函数。其实除了 Promise.all
函数之外,还存在另一个函数 —— Promise.allSettled
,该函数用于解决 Promise.all
存在的问题,感兴趣的小伙伴可以自行研究一下。
更多编程相关知识,请访问:编程视频!!
The above is the detailed content of What is concurrency control? How to implement concurrency control in JavaScript?. For more information, please follow other related articles on the PHP Chinese website!