У меня есть код, который выполняет итерацию по списку, который был запрошен из базы данных, и делает HTTP-запрос для каждого элемента в этом списке. Иногда этот список может быть достаточно большим (тысячи), и я хотел бы убедиться, что я не попадаю на веб-сервер с тысячами одновременных HTTP-запросов.
Сокращенная версия этого кода в настоящее время выглядит примерно так ...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
Этот код работает на узле 4.3.2. Повторюсь, можно Promise.all
ли управлять таким образом, чтобы в любой момент времени выполнялось только определенное количество обещаний?
Promise.all
действительно управляет продвижением обещаний - обещания делают это сами,Promise.all
просто ожидая их.Promise
антипаттерна конструктора !Ответы:
Обратите внимание, что
Promise.all()
обещания не запускают свою работу, а само создание обещаний.Имея это в виду, одним из решений было бы проверять всякий раз, когда выполняется обещание, следует ли запускать новое обещание или вы уже достигли предела.
Однако изобретать велосипед здесь действительно не нужно. Одна из библиотек, которую вы можете использовать для этой цели, - это
es6-promise-pool
. Из их примеров:// On the Web, leave out this line and use the script tag above instead. var PromisePool = require('es6-promise-pool') var promiseProducer = function () { // Your code goes here. // If there is work left to be done, return the next work item as a promise. // Otherwise, return null to indicate that all promises have been created. // Scroll down for an example. } // The number of promises to process simultaneously. var concurrency = 3 // Create a pool. var pool = new PromisePool(promiseProducer, concurrency) // Start the pool. var poolPromise = pool.start() // Wait for the pool to settle. poolPromise.then(function () { console.log('All promises fulfilled') }, function (error) { console.log('Some promise rejected: ' + error.message) })
источник
P-предел
Я сравнил ограничение параллелизма обещаний с настраиваемым скриптом, bluebird, es6-prom-pool и p-limit. Я считаю, что p-limit имеет наиболее простую и урезанную реализацию для этой потребности. Смотрите их документацию .
Требования
Чтобы быть совместимым с async в примере
Мой пример
В этом примере нам нужно запустить функцию для каждого URL-адреса в массиве (например, запрос API). Вот это называется
fetchData()
. Если бы у нас был массив из тысяч элементов для обработки, параллелизм определенно был бы полезен для экономии ресурсов процессора и памяти.const pLimit = require('p-limit'); // Example Concurrency of 3 promise at once const limit = pLimit(3); let urls = [ "http://www.exampleone.com/", "http://www.exampletwo.com/", "http://www.examplethree.com/", "http://www.examplefour.com/", ] // Create an array of our promises using map (fetchData() returns a promise) let promises = urls.map(url => { // wrap the function we are calling in the limit function we defined above return limit(() => fetchData(url)); }); (async () => { // Only three promises are run at once (as defined above) const result = await Promise.all(promises); console.log(result); })();
Результатом журнала консоли является массив данных ответов ваших разрешенных обещаний.
источник
С помощью
Array.prototype.splice
while (funcs.length) { // 100 at at time await Promise.all( funcs.splice(0, 100).map(f => f()) ) }
источник
arr.splice(-100)
если доза заказа не совпадает, возможно, вы можете перевернуть массив: PЕсли вы знаете, как работают итераторы и как они потребляются, вам не понадобится дополнительная библиотека, так как можно очень легко создать свой собственный параллелизм самостоятельно. Позвольте мне продемонстрировать:
/* [Symbol.iterator]() is equivalent to .values() const iterator = [1,2,3][Symbol.iterator]() */ const iterator = [1,2,3].values() // loop over all items with for..of for (const x of iterator) { console.log('x:', x) // notices how this loop continues the same iterator // and consumes the rest of the iterator, making the // outer loop not logging any more x's for (const y of iterator) { console.log('y:', y) } }
Мы можем использовать один и тот же итератор и делиться им между воркерами.
Если бы вы использовали
.entries()
вместо этого,.values()
вы бы использовали 2D-массив, с[[index, value]]
которым я продемонстрирую ниже, с параллелизмом 2const sleep = t => new Promise(rs => setTimeout(rs, t)) async function doWork(iterator) { for (let [index, item] of iterator) { await sleep(1000) console.log(index + ': ' + item) } } const iterator = Array.from('abcdefghij').entries() const workers = new Array(2).fill(iterator).map(doWork) // ^--- starts two workers sharing the same iterator Promise.allSettled(workers).then(() => console.log('done'))
Преимущество этого заключается в том, что у вас может быть функция генератора вместо того, чтобы все было готово сразу.
Примечание: отличие от примера async-pool заключается в том, что он порождает двух рабочих, поэтому, если один рабочий выдает ошибку по какой-либо причине, скажем, с индексом 5, это не помешает другому рабочему выполнить остальное. Таким образом, вы переходите от выполнения 2 параллелизма к 1 (чтобы не останавливаться на достигнутом). Поэтому я советую выявлять все ошибки внутри
doWork
функции.источник
Bluebird в Promise.map может воспользоваться возможностью параллелизма , чтобы контролировать , сколько обещаний должны выполняться параллельно. Иногда это проще, чем
.all
потому, что вам не нужно создавать массив обещаний.const Promise = require('bluebird') function getCounts() { return Promise.map(users, user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }, {concurrency: 10}); // <---- at most 10 http requests at a time }
источник
Вместо использования обещаний для ограничения HTTP-запросов используйте встроенный узел http.Agent.maxSockets . Это устраняет необходимость использования библиотеки или написания собственного кода пула и дает дополнительное преимущество - больший контроль над тем, что вы ограничиваете.
Например:
var http = require('http'); var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin var request = http.request({..., agent: agent}, ...);
Если вы делаете несколько запросов к одному и тому же источнику, вам также может быть полезно установить
keepAlive
значение true (дополнительную информацию см. В документации выше).источник
Предлагаю библиотеку async-pool: https://github.com/rxaviers/async-pool
npm install tiny-async-pool
Описание:
Применение:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
источник
Это можно решить с помощью рекурсии.
Идея состоит в том, что изначально вы отправляете максимально допустимое количество запросов, и каждый из этих запросов должен рекурсивно продолжать отправлять себя по завершении.
function batchFetch(urls, concurrentRequestsLimit) { return new Promise(resolve => { var documents = []; var index = 0; function recursiveFetch() { if (index === urls.length) { return; } fetch(urls[index++]).then(r => { documents.push(r.text()); if (documents.length === urls.length) { resolve(documents); } else { recursiveFetch(); } }); } for (var i = 0; i < concurrentRequestsLimit; i++) { recursiveFetch(); } }); } var sources = [ 'http://www.example_1.com/', 'http://www.example_2.com/', 'http://www.example_3.com/', ... 'http://www.example_100.com/' ]; batchFetch(sources, 5).then(documents => { console.log(documents); });
источник
Вот мое решение ES7 для копирования и вставки с полной
Promise.all()
/map()
альтернативной функцией с ограничением параллелизма.Подобно тому, как
Promise.all()
он поддерживает порядок возврата, а также резерв для возвращаемых значений, не обещающих.Я также включил сравнение различных реализаций, поскольку оно иллюстрирует некоторые аспекты, которые упустили некоторые другие решения.
Применение
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay)); const args = [30, 20, 15, 10]; await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
Реализация
async function asyncBatch(args, fn, limit = 8) { // Copy arguments to avoid side effects args = [...args]; const outs = []; while (args.length) { const batch = args.splice(0, limit); const out = await Promise.all(batch.map(fn)); outs.push(...out); } return outs; } async function asyncPool(args, fn, limit = 8) { return new Promise((resolve) => { // Copy arguments to avoid side effect, reverse queue as // pop is faster than shift const argQueue = [...args].reverse(); let count = 0; const outs = []; const pollNext = () => { if (argQueue.length === 0 && count === 0) { resolve(outs); } else { while (count < limit && argQueue.length) { const index = args.length - argQueue.length; const arg = argQueue.pop(); count += 1; const out = fn(arg); const processOut = (out, index) => { outs[index] = out; count -= 1; pollNext(); }; if (typeof out === 'object' && out.then) { out.then(out => processOut(out, index)); } else { processOut(out, index); } } } }; pollNext(); }); }
Сравнение
// A simple async function that returns after the given delay // and prints its value to allow us to determine the response order const asyncFn = delay => new Promise(resolve => setTimeout(() => { console.log(delay); resolve(delay); }, delay)); // List of arguments to the asyncFn function const args = [30, 20, 15, 10]; // As a comparison of the different implementations, a low concurrency // limit of 2 is used in order to highlight the performance differences. // If a limit greater than or equal to args.length is used the results // would be identical. // Vanilla Promise.all/map combo const out1 = await Promise.all(args.map(arg => asyncFn(arg))); // prints: 10, 15, 20, 30 // total time: 30ms // Pooled implementation const out2 = await asyncPool(args, arg => asyncFn(arg), 2); // prints: 20, 30, 15, 10 // total time: 40ms // Batched implementation const out3 = await asyncBatch(args, arg => asyncFn(arg), 2); // prints: 20, 30, 20, 30 // total time: 45ms console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3 // Conclusion: Execution order and performance is different, // but return order is still identical
Вывод
asyncPool()
должен быть лучшим решением, поскольку он позволяет запускать новые запросы сразу после завершения любого предыдущего.asyncBatch()
включен для сравнения, так как его реализация проще для понимания, но должна быть медленнее по производительности, так как все запросы в одном пакете должны завершиться, чтобы начать следующий пакет.В этом надуманном примере ваниль без ограничений,
Promise.all()
конечно, самый быстрый, в то время как другие могут работать более желательно в сценарии перегрузки в реальном мире.Обновить
Библиотека async-pool, которую уже предлагали другие, вероятно, является лучшей альтернативой моей реализации, поскольку она работает почти идентично и имеет более краткую реализацию с умным использованием Promise.race (): https://github.com/rxaviers/ асинхронный пул / blob / master / lib / es7.js
Надеюсь, мой ответ по-прежнему будет иметь образовательную ценность.
источник
Вот базовый пример для потоковой передачи и p-limit. Он передает поток чтения http в mongo db.
const stream = require('stream'); const util = require('util'); const pLimit = require('p-limit'); const es = require('event-stream'); const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; const pipeline = util.promisify(stream.pipeline) const outputDBConfig = { dbURL: 'yr-db-url', collection: 'some-collection' }; const limit = pLimit(3); async yrAsyncStreamingFunction(readStream) => { const mongoWriteStream = streamToMongoDB(outputDBConfig); const mapperStream = es.map((data, done) => { let someDataPromise = limit(() => yr_async_call_to_somewhere()) someDataPromise.then( function handleResolve(someData) { data.someData = someData; done(null, data); }, function handleError(error) { done(error) } ); }) await pipeline( readStream, JSONStream.parse('*'), mapperStream, mongoWriteStream ); }
источник
Итак, я попытался заставить несколько показанных примеров работать для моего кода, но поскольку это было только для сценария импорта, а не производственного кода, использование пакета npm package batch-promises было, безусловно, самым простым путем для меня.
ПРИМЕЧАНИЕ. Требуется среда выполнения для поддержки Promise или для полифилла.
Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) Promise: Iteratee будет вызываться после каждого пакета.
Использование:
batch-promises Easily batch promises NOTE: Requires runtime to support Promise or to be polyfilled. Api batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) The Promise: Iteratee will be called after each batch. Use: import batchPromises from 'batch-promises'; batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { // The iteratee will fire after each batch resulting in the following behaviour: // @ 100ms resolve items 1 and 2 (first batch of 2) // @ 200ms resolve items 3 and 4 (second batch of 2) // @ 300ms resolve remaining item 5 (last remaining batch) setTimeout(() => { resolve(i); }, 100); })) .then(results => { console.log(results); // [1,2,3,4,5] });
источник
Рекурсия - это ответ, если вы не хотите использовать внешние библиотеки
downloadAll(someArrayWithData){ var self = this; var tracker = function(next){ return self.someExpensiveRequest(someArrayWithData[next]) .then(function(){ next++;//This updates the next in the tracker function parameter if(next < someArrayWithData.length){//Did I finish processing all my data? return tracker(next);//Go to the next promise } }); } return tracker(0); }
источник
Это то, что я использовал
Promise.race
в своем коде здесьconst identifyTransactions = async function() { let promises = [] let concurrency = 0 for (let tx of this.transactions) { if (concurrency > 4) await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) promises.push(tx.identifyTransaction()) concurrency++ } if (promises.length > 0) await Promise.race(promises) //resolve the rest }
Если вы хотите увидеть пример: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
источник
Promise.race
По возможности я стараюсь разрабатывать подобные вещи самостоятельно, а не в библиотеке. В конечном итоге вы узнаете множество концепций, которые раньше казались сложными.
Что вы, ребята, думаете об этой попытке:
(Я много думал об этом и думаю, что она работает, но укажите, если это не так или что-то в корне не так)
class Pool{ constructor(maxAsync) { this.maxAsync = maxAsync; this.asyncOperationsQueue = []; this.currentAsyncOperations = 0 } runAnother() { if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) { this.currentAsyncOperations += 1; this.asyncOperationsQueue.pop()() .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() }) } } add(f){ // the argument f is a function of signature () => Promise this.runAnother(); return new Promise((resolve, reject) => { this.asyncOperationsQueue.push( () => f().then(resolve).catch(reject) ) }) } } //####################################################### // TESTS //####################################################### function dbCall(id, timeout, fail) { return new Promise((resolve, reject) => { setTimeout(() => { if (fail) { reject(`Error for id ${id}`); } else { resolve(id); } }, timeout) } ) } const dbQuery1 = () => dbCall(1, 5000, false); const dbQuery2 = () => dbCall(2, 5000, false); const dbQuery3 = () => dbCall(3, 5000, false); const dbQuery4 = () => dbCall(4, 5000, true); const dbQuery5 = () => dbCall(5, 5000, false); const cappedPool = new Pool(2); const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
Этот подход предоставляет хороший API, похожий на пулы потоков в scala / java.
Создав один экземпляр пула с помощью
const cappedPool = new Pool(2)
, вы просто предоставляете ему обещанияcappedPool.add(() => myPromise)
.Мы должны позаботиться о том, чтобы обещание не запускалось немедленно, и поэтому мы должны «выполнять его лениво» с помощью функции.
Самое главное, обратите внимание, что результатом метода
add
является Promise, который будет завершен / разрешен со значением вашего исходного обещания ! Это делает использование очень интуитивным.const resultPromise = cappedPool.add( () => dbCall(...)) resultPromise .then( actualResult => { // Do something with the result form the DB } )
источник
К сожалению, с помощью встроенного Promise.all это невозможно сделать, поэтому нужно проявить творческий подход.
Это самый быстрый и краткий способ, который я мог найти без использования каких-либо сторонних библиотек.
Он использует новую функцию JavaScript, называемую итератором. Итератор в основном отслеживает, какие элементы были обработаны, а какие нет.
Чтобы использовать его в коде, вы создаете массив асинхронных функций. Каждая асинхронная функция запрашивает у одного и того же итератора следующий элемент, который необходимо обработать. Каждая функция асинхронно обрабатывает свой собственный элемент и по завершении запрашивает у итератора новый элемент. Когда в итераторе заканчиваются элементы, все функции завершаются.
Спасибо @Endless за вдохновение.
var items = [ "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", ]; var concurrency = 5 Array(concurrency).fill(items.entries()).map(async (cursor) => { for(let [index, url] of cursor){ console.log("getting url is ", index, url); // run your async task instead of this next line var text = await fetch(url).then(res => res.text()); console.log("text is", text.slice(0,20)); } })
источник
Так много хороших решений. Я начал с элегантного решения, опубликованного @Endless, и закончил этим небольшим методом расширения, который не использует никаких внешних библиотек и не запускается партиями (хотя предполагается, что у вас есть такие функции, как async и т. Д.):
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; };
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; }; const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => { let n = (i + 1) * 5; setTimeout(() => { console.log(`Did nothing for ${n} seconds`); resolve(n); }, n * 1000); })); var results = Promise.allWithLimit(demoTasks);
источник