Основная идея Parallel.ForEach()
заключается в том, что у вас есть набор потоков, и каждый поток обрабатывает часть коллекции. Как вы заметили, это не работает с async
- await
, где вы хотите освободить поток на время асинхронного вызова.
Вы можете «исправить» это, заблокировав ForEach()
потоки, но это разрушает весь смысл async
- await
.
То, что вы могли бы сделать, это использовать поток данных TPL вместо Parallel.ForEach()
, который Task
хорошо поддерживает асинхронные s.
В частности, ваш код может быть написан с использованием, TransformBlock
который преобразует каждый идентификатор в Customer
использование async
лямбда-выражения. Этот блок может быть настроен для параллельного выполнения. Вы бы связали этот блок с тем, ActionBlock
что записывает каждый Customer
на консоль. После того, как вы настроите блочную сеть, вы можете Post()
каждый идентификатор в TransformBlock
.
В коде:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Хотя вы, вероятно, хотите ограничить параллельность TransformBlock
некоторой небольшой константой. Кроме того, вы можете ограничить емкость TransformBlock
и добавлять элементы к нему асинхронно SendAsync()
, например, если коллекция слишком большая.
Дополнительным преимуществом по сравнению с вашим кодом (если он работал) является то, что запись начнется, как только закончится отдельный элемент, а не будет ждать завершения всей обработки.
Parallel.ForEach()
дляPost()
элементов параллельно не должно быть никакого реального эффекта.Ответ Свика (как обычно) превосходен.
Тем не менее, я считаю Dataflow более полезным, когда у вас есть большие объемы данных для передачи. Или когда вам нужна
async
-совместимая очередь.В вашем случае более простое решение состоит в том, чтобы просто использовать
async
-стиль параллелизм:источник
Parallel.ForEach()
). Но я думаю, что в настоящее время это лучший вариант для работы практически со всемиasync
коллекциями.ParallelOptions
собирается помочь? Это применимо только к томуParallel.For/ForEach/Invoke
, что в качестве установленного ФП здесь бесполезно.GetCustomer
метод возвращает aTask<T>
, следует ли использоватьSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
не поддерживаетasync
.Использование DataFlow, как предложил svick, может быть излишним, и ответ Стивена не предоставляет средств для контроля параллелизма операции. Однако это может быть достигнуто довольно просто:
Эти
ToArray()
вызовы могут быть оптимизированы с помощью массива вместо списка и заменить завершенные задачи, но я сомневаюсь , что это будет делать большую часть разницы в большинстве сценариев. Пример использования по вопросу ОП:Пользователь EDIT Fellow SO и мастер TPL Эли Арбель указали мне на соответствующую статью от Стивена Туба . Как обычно, его реализация элегантна и эффективна:
источник
Partitioner.Create
использует разбиение на фрагменты, которое предоставляет элементы динамически для различных задач, поэтому описанный вами сценарий не будет реализован . Также обратите внимание, что статическое (предопределенное) разбиение может быть быстрее в некоторых случаях из-за меньших издержек (особенно синхронизации). Для получения дополнительной информации см .: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) будет содержать исключение (внутриAggregateException
), и, следовательно, если упомянутый вызывающийawait
объект используется , исключение будет выброшено на сайт вызова. Тем не менее,Task.WhenAll
он по-прежнему будет ждать завершения всех задач иGetPartitions
будет динамически распределять элементы приpartition.MoveNext
вызове, пока не останется больше элементов для обработки. Это означает, что если вы не добавите свой собственный механизм для остановки обработки (напримерCancellationToken
), это не произойдет само по себе.var current = partition.Current
раньше,await body
а затем использоватьcurrent
в продолжении (ContinueWith(t => { ... }
).Вы можете сэкономить усилия с новым пакетом AsyncEnumerator NuGet , который не существовал 4 года назад, когда вопрос был первоначально опубликован. Позволяет контролировать степень параллелизма:
Отказ от ответственности: я являюсь автором библиотеки AsyncEnumerator, которая имеет открытый исходный код и распространяется под лицензией MIT, и я публикую это сообщение просто для того, чтобы помочь сообществу.
источник
AsyncStreams
и должен сказать, что она великолепна. Не могу порекомендовать эту библиотеку.Оберните
Parallel.Foreach
вTask.Run()
и вместоawait
использования ключевого слова[yourasyncmethod].Result
(вам нужно выполнить задачу Task.Run, чтобы не блокировать поток пользовательского интерфейса)
Что-то вроде этого:
источник
Parallel.ForEach
выполнить параллельную работу, которая блокирует, пока все не будет сделано, и затем перенести все это в фоновый поток, чтобы иметь отзывчивый интерфейс. Любые проблемы с этим? Может быть, это слишком много, но это короткий, читаемый код.Task.Run
когдаTaskCompletionSource
это предпочтительнее.TaskCompletionSource
предпочтительнее?await
может быть перемещен вперед, чтобы сохранить дополнительное имя переменной.Это должно быть довольно эффективно и проще, чем заставить работать весь поток данных TPL:
источник
await
как:var customers = await ids.SelectAsync(async i => { ... });
?Я немного опоздал на вечеринку, но вы можете рассмотреть возможность использования GetAwaiter.GetResult () для запуска вашего асинхронного кода в контексте синхронизации, но в параллельном порядке, как показано ниже;
источник
Метод расширения для этого, который использует SemaphoreSlim, а также позволяет установить максимальную степень параллелизма
Пример использования:
источник
После введения нескольких вспомогательных методов вы сможете запускать параллельные запросы с помощью этого простого синтаксиса:
Здесь происходит следующее: мы разбиваем исходную коллекцию на 10 частей (
.Split(DegreeOfParallelism)
), затем запускаем 10 задач, каждая из которых обрабатывает свои элементы по очереди (.SelectManyAsync(...)
), и объединяем их обратно в один список.Стоит отметить, что есть более простой подход:
Но для этого нужна предосторожность : если у вас слишком большая исходная коллекция, она запланирует
Task
для каждого элемента, что может привести к значительному снижению производительности.Методы расширения, использованные в приведенных выше примерах, выглядят следующим образом:
источник