Как лучше всего ограничить параллелизм при использовании ES6 Promise.all ()?

98

У меня есть код, который выполняет итерацию по списку, который был запрошен из базы данных, и делает HTTP-запрос для каждого элемента в этом списке. Иногда этот список может быть достаточно большим (тысячи), и я хотел бы убедиться, что я не попадаю на веб-сервер с тысячами одновременных HTTP-запросов.

Сокращенная версия этого кода в настоящее время выглядит примерно так ...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

Этот код работает на узле 4.3.2. Повторюсь, можно Promise.allли управлять таким образом, чтобы в любой момент времени выполнялось только определенное количество обещаний?

Крис
источник
1
возможный дубликат Limit concurrency of обещания выполняется
Берги
3
Не забывайте, что это Promise.allдействительно управляет продвижением обещаний - обещания делают это сами, Promise.allпросто ожидая их.
Bergi

Ответы:

51

Обратите внимание, что Promise.all()обещания не запускают свою работу, а само создание обещаний.

Имея это в виду, одним из решений было бы проверять всякий раз, когда выполняется обещание, следует ли запускать новое обещание или вы уже достигли предела.

Однако изобретать велосипед здесь действительно не нужно. Одна из библиотек, которую вы можете использовать для этой цели, - этоes6-promise-pool . Из их примеров:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
Тимо
источник
25
К сожалению, es6-prom-pool заново изобретает Promise вместо того, чтобы использовать их. Вместо этого я предлагаю это краткое решение (если вы уже используете ES6 или ES7) github.com/rxaviers/async-pool
Rafael Xavier
3
Взглянул на оба, async-pool выглядит лучше! Более простой и легкий.
Endless
2
Я также обнаружил, что p-limit - самая простая реализация. См. Мой пример ниже. stackoverflow.com/a/52262024/8177355
Matthew Rideout
2
Я думаю, что tiny-asyc-pool намного лучшее, ненавязчивое и довольно естественное решение для ограничения параллелизма обещаний.
Sunny Tambi
73

P-предел

Я сравнил ограничение параллелизма обещаний с настраиваемым скриптом, bluebird, es6-prom-pool и p-limit. Я считаю, что p-limit имеет наиболее простую и урезанную реализацию для этой потребности. Смотрите их документацию .

Требования

Чтобы быть совместимым с async в примере

Мой пример

В этом примере нам нужно запустить функцию для каждого URL-адреса в массиве (например, запрос API). Вот это называется fetchData(). Если бы у нас был массив из тысяч элементов для обработки, параллелизм определенно был бы полезен для экономии ресурсов процессора и памяти.

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

Результатом журнала консоли является массив данных ответов ваших разрешенных обещаний.

Мэтью Райдаут
источник
4
Спасибо за это! Этот намного проще
Джон
3
Это была, безусловно, лучшая библиотека для ограничения одновременных запросов, которую я когда-либо видел. И отличный пример, спасибо!
Крис Ливдал
2
Спасибо за сравнение. Вы сравнивали с github.com/rxaviers/async-pool ?
ахонг
1
Простота использования, отличный выбор.
drmrbrewer
22

С помощью Array.prototype.splice

while (funcs.length) {
  // 100 at at time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
замедленная икра
источник
2
Это недооцененное решение. Любите простоту.
Браннон,
8
Это запускает функции пакетами, а не пулом, где одна функция вызывается сразу после завершения другой.
cltsang
Очень понравилось это решение!
прасун
потребовалась секунда, чтобы понять, что он делает, из-за отсутствия большего контекста вокруг него, например, это была партия, а не пул. Вы меняете порядок массива каждый раз, когда выполняете стыковку с начала или посередине. (браузер должен переиндексировать все элементы) теоретическая производительность лучше альтернатива - брать материал с конца, вместо этого, arr.splice(-100)если доза заказа не совпадает, возможно, вы можете перевернуть массив: P
Бесконечный
Очень полезно для работы партиями. Примечание: следующая партия не начнется, пока текущая партия не будет завершена на 100%.
Кейси Дуэйн,
20

Если вы знаете, как работают итераторы и как они потребляются, вам не понадобится дополнительная библиотека, так как можно очень легко создать свой собственный параллелизм самостоятельно. Позвольте мне продемонстрировать:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

Мы можем использовать один и тот же итератор и делиться им между воркерами.

Если бы вы использовали .entries()вместо этого, .values()вы бы использовали 2D-массив, с [[index, value]]которым я продемонстрирую ниже, с параллелизмом 2

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

Преимущество этого заключается в том, что у вас может быть функция генератора вместо того, чтобы все было готово сразу.


Примечание: отличие от примера async-pool заключается в том, что он порождает двух рабочих, поэтому, если один рабочий выдает ошибку по какой-либо причине, скажем, с индексом 5, это не помешает другому рабочему выполнить остальное. Таким образом, вы переходите от выполнения 2 параллелизма к 1 (чтобы не останавливаться на достигнутом). Поэтому я советую выявлять все ошибки внутри doWorkфункции.

Бесконечный
источник
Это круто! Спасибо бесконечное!
user3413723
Это определенно крутой подход! Просто убедитесь, что ваш параллелизм не превышает длину вашего списка задач (если вы все равно заботитесь о результатах), так как вы можете получить дополнительные возможности!
Крис Ойе
Что-то, что может быть круче позже, - это когда Streams получат поддержку Readable.from (итератор) . Chrome уже сделал потоки переносимыми . чтобы вы могли создавать читаемые потоки и отправлять их веб-воркерам, и все они в конечном итоге использовали бы один и тот же базовый итератор.
Бесконечный
16

Bluebird в Promise.map может воспользоваться возможностью параллелизма , чтобы контролировать , сколько обещаний должны выполняться параллельно. Иногда это проще, чем .allпотому, что вам не нужно создавать массив обещаний.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Цзиншао Чен
источник
bluebird отлично подходит, если вам нужны более быстрые обещания и ~ 18 Кб дополнительного мусора, если вы используете его только для одной цели;)
Endless
1
Все зависит от того, насколько важно для вас одно и есть ли другой, более быстрый / простой способ лучше. Типичный компромисс. Я выберу простоту использования и функциональность в несколько килобайт, но не YMMV.
Jingshao Chen
11

Вместо использования обещаний для ограничения HTTP-запросов используйте встроенный узел http.Agent.maxSockets . Это устраняет необходимость использования библиотеки или написания собственного кода пула и дает дополнительное преимущество - больший контроль над тем, что вы ограничиваете.

agent.maxSockets

По умолчанию установлено на бесконечность. Определяет, сколько одновременных сокетов может быть открыто агентом для каждого источника. Источник - это комбинация «хост: порт» или «хост: порт: localAddress».

Например:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

Если вы делаете несколько запросов к одному и тому же источнику, вам также может быть полезно установить keepAliveзначение true (дополнительную информацию см. В документации выше).

tcooc
источник
11
Тем не менее, немедленное создание тысяч замыканий и объединение сокетов не кажется очень эффективным?
Bergi
3

Предлагаю библиотеку async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Описание:

Запускать несколько функций, возвращающих обещание, и асинхронных функций с ограниченным параллелизмом, используя собственный ES6 / ES7.

asyncPool выполняет несколько функций, возвращающих обещание, и асинхронных функций в ограниченном пуле параллелизма. Он сразу же отвергает, как только одно из обещаний отклоняется. Он разрешается, когда все обещания выполняются. Он вызывает функцию итератора как можно скорее (при ограничении параллелизма).

Применение:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
Venryx
источник
1
Работает для меня. Спасибо. Это отличная библиотека.
Sunny Tambi
2

Это можно решить с помощью рекурсии.

Идея состоит в том, что изначально вы отправляете максимально допустимое количество запросов, и каждый из этих запросов должен рекурсивно продолжать отправлять себя по завершении.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++]).then(r => {
                documents.push(r.text());
                if (documents.length === urls.length) {
                    resolve(documents);
                } else {
                    recursiveFetch();
                }
            });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});
Антон Фил
источник
2

Вот мое решение ES7 для копирования и вставки с полной Promise.all()/ map()альтернативной функцией с ограничением параллелизма.

Подобно тому, как Promise.all()он поддерживает порядок возврата, а также резерв для возвращаемых значений, не обещающих.

Я также включил сравнение различных реализаций, поскольку оно иллюстрирует некоторые аспекты, которые упустили некоторые другие решения.

Применение

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Реализация

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Сравнение

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance is different,
// but return order is still identical

Вывод

asyncPool() должен быть лучшим решением, поскольку он позволяет запускать новые запросы сразу после завершения любого предыдущего.

asyncBatch() включен для сравнения, так как его реализация проще для понимания, но должна быть медленнее по производительности, так как все запросы в одном пакете должны завершиться, чтобы начать следующий пакет.

В этом надуманном примере ваниль без ограничений, Promise.all()конечно, самый быстрый, в то время как другие могут работать более желательно в сценарии перегрузки в реальном мире.

Обновить

Библиотека async-pool, которую уже предлагали другие, вероятно, является лучшей альтернативой моей реализации, поскольку она работает почти идентично и имеет более краткую реализацию с умным использованием Promise.race (): https://github.com/rxaviers/ асинхронный пул / blob / master / lib / es7.js

Надеюсь, мой ответ по-прежнему будет иметь образовательную ценность.

Adelost
источник
1

Вот базовый пример для потоковой передачи и p-limit. Он передает поток чтения http в mongo db.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) {

                            data.someData = someData;    
                            done(null, data);
                        },
                        function handleError(error) {
                            done(error)
                        }
                    );
                })

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        }
gosuer1921
источник
0

Итак, я попытался заставить несколько показанных примеров работать для моего кода, но поскольку это было только для сценария импорта, а не производственного кода, использование пакета npm package batch-promises было, безусловно, самым простым путем для меня.

ПРИМЕЧАНИЕ. Требуется среда выполнения для поддержки Promise или для полифилла.

Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee будет вызываться после каждого пакета.

Использование:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

Агусти Фернандес Пардо
источник
0

Рекурсия - это ответ, если вы не хотите использовать внешние библиотеки

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
Хуан
источник
0

Это то, что я использовал Promise.raceв своем коде здесь

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

Если вы хотите увидеть пример: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

Alex
источник
2
Я бы не стал называть это параллелизмом ... Это больше похоже на пакетное выполнение ... Вы выполняете 4 задачи, ждете, пока все закончатся, а затем выполняете следующие 4. Если одна из них решается раньше, вы все равно ждете завершения остальных 3 , вам следует использоватьPromise.race
Endless
2
что-то вроде этого: github.com/rxaviers/async-pool/blob/master/lib/es7.js
Endless
0
  • Ответ @tcooc был довольно крутым. Не знал об этом и буду использовать это в будущем.
  • Мне также понравился ответ @MatthewRideout , но он использует внешнюю библиотеку !!

По возможности я стараюсь разрабатывать подобные вещи самостоятельно, а не в библиотеке. В конечном итоге вы узнаете множество концепций, которые раньше казались сложными.

Что вы, ребята, думаете об этой попытке:
(Я много думал об этом и думаю, что она работает, но укажите, если это не так или что-то в корне не так)

 class Pool{
        constructor(maxAsync) {
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        }

        runAnother() {
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
                this.currentAsyncOperations += 1;
                this.asyncOperationsQueue.pop()()
                    .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
            }
        }

        add(f){  // the argument f is a function of signature () => Promise
            this.runAnother();
            return new Promise((resolve, reject) => {
                this.asyncOperationsQueue.push(
                    () => f().then(resolve).catch(reject)
                )
            })
        }
    }

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
                resolve(id);
            }
        }, timeout)
    }
    )
}


const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

Этот подход предоставляет хороший API, похожий на пулы потоков в scala / java.
Создав один экземпляр пула с помощью const cappedPool = new Pool(2), вы просто предоставляете ему обещания cappedPool.add(() => myPromise).
Мы должны позаботиться о том, чтобы обещание не запускалось немедленно, и поэтому мы должны «выполнять его лениво» с помощью функции.

Самое главное, обратите внимание, что результатом метода add является Promise, который будет завершен / разрешен со значением вашего исходного обещания ! Это делает использование очень интуитивным.

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
   // Do something with the result form the DB
  }
)
Карлос Тейшейра
источник
0

К сожалению, с помощью встроенного Promise.all это невозможно сделать, поэтому нужно проявить творческий подход.

Это самый быстрый и краткий способ, который я мог найти без использования каких-либо сторонних библиотек.

Он использует новую функцию JavaScript, называемую итератором. Итератор в основном отслеживает, какие элементы были обработаны, а какие нет.

Чтобы использовать его в коде, вы создаете массив асинхронных функций. Каждая асинхронная функция запрашивает у одного и того же итератора следующий элемент, который необходимо обработать. Каждая функция асинхронно обрабатывает свой собственный элемент и по завершении запрашивает у итератора новый элемент. Когда в итераторе заканчиваются элементы, все функции завершаются.

Спасибо @Endless за вдохновение.

var items = [
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
];

var concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
    for(let [index, url] of cursor){
        console.log("getting url is ", index, url);
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text());
        console.log("text is", text.slice(0,20));
    }
})

user3413723
источник
Любопытно, почему это было снижено. Это очень похоже на то, что я придумал.
Крис Ой,
0

Так много хороших решений. Я начал с элегантного решения, опубликованного @Endless, и закончил этим небольшим методом расширения, который не использует никаких внешних библиотек и не запускается партиями (хотя предполагается, что у вас есть такие функции, как async и т. Д.):

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                        entry = iterator.next();
                    }
                    catch (err) {
                        results[index] = err;
                    }
                }
                // No more work to do
                resolve(true); 
            }
            catch (err) {
                // This worker is dead
                reject(err);
            }
        }));

    await Promise.all(workerThreads);
    return results;
};

    Promise.allWithLimit = async (taskList, limit = 5) => {
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => {
                try {
                    let entry = iterator.next();
                    while (!entry.done) {
                        let [index, promise] = entry.value;
                        try {
                            results[index] = await promise;
                            entry = iterator.next();
                        }
                        catch (err) {
                            results[index] = err;
                        }
                    }
                    // No more work to do
                    resolve(true); 
                }
                catch (err) {
                    // This worker is dead
                    reject(err);
                }
            }));
    
        await Promise.all(workerThreads);
        return results;
    };

    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
       let n = (i + 1) * 5;
       setTimeout(() => {
          console.log(`Did nothing for ${n} seconds`);
          resolve(n);
       }, n * 1000);
    }));

    var results = Promise.allWithLimit(demoTasks);

Крис Ой
источник