Вложенности ждут в Parallel.ForEach

183

В приложении метро мне нужно выполнить несколько вызовов WCF. Необходимо сделать значительное количество вызовов, поэтому мне нужно делать их в параллельном цикле. Проблема заключается в том, что параллельный цикл завершается до завершения всех вызовов WCF.

Как бы вы рефакторинг это работать так, как ожидалось?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
Darthg8r
источник

Ответы:

172

Основная идея Parallel.ForEach()заключается в том, что у вас есть набор потоков, и каждый поток обрабатывает часть коллекции. Как вы заметили, это не работает с async- await, где вы хотите освободить поток на время асинхронного вызова.

Вы можете «исправить» это, заблокировав ForEach()потоки, но это разрушает весь смысл async- await.

То, что вы могли бы сделать, это использовать поток данных TPL вместо Parallel.ForEach(), который Taskхорошо поддерживает асинхронные s.

В частности, ваш код может быть написан с использованием, TransformBlockкоторый преобразует каждый идентификатор в Customerиспользование asyncлямбда-выражения. Этот блок может быть настроен для параллельного выполнения. Вы бы связали этот блок с тем, ActionBlockчто записывает каждый Customerна консоль. После того, как вы настроите блочную сеть, вы можете Post()каждый идентификатор в TransformBlock.

В коде:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Хотя вы, вероятно, хотите ограничить параллельность TransformBlockнекоторой небольшой константой. Кроме того, вы можете ограничить емкость TransformBlockи добавлять элементы к нему асинхронно SendAsync(), например, если коллекция слишком большая.

Дополнительным преимуществом по сравнению с вашим кодом (если он работал) является то, что запись начнется, как только закончится отдельный элемент, а не будет ждать завершения всей обработки.

svick
источник
2
Очень краткий обзор асинхронных, реактивных расширений, TPL и TPL DataFlow - vantsuyoshi.wordpress.com/2012/01/05/… для таких как я, которым может потребоваться некоторая ясность.
Норман Х
1
Я почти уверен, что этот ответ НЕ распараллеливает обработку. Я считаю, что вам нужно сделать Parallel.ForEach поверх идентификаторов и опубликовать их в getCustomerBlock. По крайней мере, это то, что я нашел, когда проверил это предложение.
JasonLind
4
@JasonLind Это действительно так. Использование Parallel.ForEach()для Post()элементов параллельно не должно быть никакого реального эффекта.
svick
1
@svick Хорошо, я нашел это, ActionBlock также должен быть в Параллеле. Я делал это немного по-другому, мне не нужно было преобразование, поэтому я просто использовал буферный блок и выполнял свою работу в ActionBlock. Я запутался в другом ответе на паутину.
JasonLind
2
Под этим я подразумеваю указание MaxDegreeOfParallelism для ActionBlock, как вы делаете для TransformBlock в своем примере
JasonLind
125

Ответ Свика (как обычно) превосходен.

Тем не менее, я считаю Dataflow более полезным, когда у вас есть большие объемы данных для передачи. Или когда вам нужна async-совместимая очередь.

В вашем случае более простое решение состоит в том, чтобы просто использовать async-стиль параллелизм:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Стивен Клири
источник
14
Если вы хотите вручную ограничить параллелизм (что вы, скорее всего, делаете в этом случае), сделать это таким образом будет сложнее.
свик
1
Но вы правы, что Dataflow может быть довольно сложным (например, по сравнению с Parallel.ForEach()). Но я думаю, что в настоящее время это лучший вариант для работы практически со всеми asyncколлекциями.
svick
1
@JamesManning, как ParallelOptionsсобирается помочь? Это применимо только к тому Parallel.For/ForEach/Invoke, что в качестве установленного ФП здесь бесполезно.
Охад Шнайдер
1
@StephenCleary Если GetCustomerметод возвращает a Task<T>, следует ли использовать Select(async i => { await repo.GetCustomer(i);});?
Shyju
5
@batmaci: Parallel.ForEachне поддерживает async.
Стивен Клири
81

Использование DataFlow, как предложил svick, может быть излишним, и ответ Стивена не предоставляет средств для контроля параллелизма операции. Однако это может быть достигнуто довольно просто:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

Эти ToArray()вызовы могут быть оптимизированы с помощью массива вместо списка и заменить завершенные задачи, но я сомневаюсь , что это будет делать большую часть разницы в большинстве сценариев. Пример использования по вопросу ОП:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

Пользователь EDIT Fellow SO и мастер TPL Эли Арбель указали мне на соответствующую статью от Стивена Туба . Как обычно, его реализация элегантна и эффективна:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}
Охад Шнайдер
источник
1
@RichardPierre на самом деле эта перегрузка Partitioner.Createиспользует разбиение на фрагменты, которое предоставляет элементы динамически для различных задач, поэтому описанный вами сценарий не будет реализован . Также обратите внимание, что статическое (предопределенное) разбиение может быть быстрее в некоторых случаях из-за меньших издержек (особенно синхронизации). Для получения дополнительной информации см .: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .
Охад Шнайдер
1
@OhadSchneider В // наблюдаем исключения, если это вызывает исключение, будет ли оно всплывать до вызывающей стороны? Например, если бы я хотел, чтобы все перечислимые прекратили обработку / потерпели неудачу, если какая-либо его часть потерпела неудачу?
Терри
3
@Terry будет пузыриться для вызывающей стороны в том смысле, что самая верхняя задача (созданная Task.WhenAll) будет содержать исключение (внутри AggregateException), и, следовательно, если упомянутый вызывающий awaitобъект используется , исключение будет выброшено на сайт вызова. Тем не менее, Task.WhenAllон по-прежнему будет ждать завершения всех задач и GetPartitionsбудет динамически распределять элементы при partition.MoveNextвызове, пока не останется больше элементов для обработки. Это означает, что если вы не добавите свой собственный механизм для остановки обработки (например CancellationToken), это не произойдет само по себе.
Охад Шнайдер
1
@gibbocool Я все еще не уверен, что следую. Предположим, у вас есть 7 задач с параметрами, указанными в вашем комментарии. Далее предположим, что первая партия занимает 5-секундное задание и три 1-секундных задания. Примерно через секунду 5-секундная задача все еще будет выполняться, тогда как три 1-секундных задачи будут завершены. В этот момент начнут выполняться оставшиеся три 1-секундные задачи (они будут переданы разделителем в три «свободных» потока).
Охад Шнайдер
2
@MichaelFreidgeim вы можете сделать что-то, как var current = partition.Currentраньше, await bodyа затем использовать currentв продолжении ( ContinueWith(t => { ... }).
Охад Шнайдер
43

Вы можете сэкономить усилия с новым пакетом AsyncEnumerator NuGet , который не существовал 4 года назад, когда вопрос был первоначально опубликован. Позволяет контролировать степень параллелизма:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Отказ от ответственности: я являюсь автором библиотеки AsyncEnumerator, которая имеет открытый исходный код и распространяется под лицензией MIT, и я публикую это сообщение просто для того, чтобы помочь сообществу.

Серж Семенов
источник
11
Сергей, тебе следует раскрыть, что ты автор библиотеки
Михаэль
5
хорошо, добавил отказ от ответственности. Я не ищу никакой выгоды от рекламы этого, просто хочу помочь людям;)
Серж Семенов
Ваша библиотека не совместима с .NET Core.
Корниэль Нобель
2
@CornielNobel, он совместим с .NET Core - исходный код на GitHub имеет тестовое покрытие как для .NET Framework, так и для .NET Core.
Сергей Семенов
1
@SergeSemenov Я много использовал твою библиотеку AsyncStreamsи должен сказать, что она великолепна. Не могу порекомендовать эту библиотеку.
WBuck
16

Оберните Parallel.Foreachв Task.Run()и вместо awaitиспользования ключевого слова[yourasyncmethod].Result

(вам нужно выполнить задачу Task.Run, чтобы не блокировать поток пользовательского интерфейса)

Что-то вроде этого:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
ofcoursedude
источник
3
В чем проблема с этим? Я бы сделал это в точности так. Позвольте Parallel.ForEachвыполнить параллельную работу, которая блокирует, пока все не будет сделано, и затем перенести все это в фоновый поток, чтобы иметь отзывчивый интерфейс. Любые проблемы с этим? Может быть, это слишком много, но это короткий, читаемый код.
ygoe
@LonelyPixel Моя единственная проблема заключается в том, что он вызывает, Task.Runкогда TaskCompletionSourceэто предпочтительнее.
Гусдор
1
@Gusdor Любопытно - почему TaskCompletionSourceпредпочтительнее?
Морская рыба
@Seafish Хороший вопрос, на который я бы хотел ответить. Должно быть, это был
тяжелый
Просто короткое обновление. Я искал именно это сейчас, прокрутил вниз, чтобы найти самое простое решение, и снова нашел свой комментарий. Я использовал именно этот код, и он работает как положено. Это только предполагает, что в цикле есть версия Sync исходных асинхронных вызовов. awaitможет быть перемещен вперед, чтобы сохранить дополнительное имя переменной.
ygoe
7

Это должно быть довольно эффективно и проще, чем заставить работать весь поток данных TPL:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
Джон Гитцен
источник
Не следует ли пример использования использования awaitкак: var customers = await ids.SelectAsync(async i => { ... });?
Paccc
5

Я немного опоздал на вечеринку, но вы можете рассмотреть возможность использования GetAwaiter.GetResult () для запуска вашего асинхронного кода в контексте синхронизации, но в параллельном порядке, как показано ниже;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
Теоман Шипахи
источник
5

Метод расширения для этого, который использует SemaphoreSlim, а также позволяет установить максимальную степень параллелизма

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Пример использования:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Джей шах
источник
5

После введения нескольких вспомогательных методов вы сможете запускать параллельные запросы с помощью этого простого синтаксиса:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

Здесь происходит следующее: мы разбиваем исходную коллекцию на 10 частей ( .Split(DegreeOfParallelism)), затем запускаем 10 задач, каждая из которых обрабатывает свои элементы по очереди (.SelectManyAsync(...) ), и объединяем их обратно в один список.

Стоит отметить, что есть более простой подход:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

Но для этого нужна предосторожность : если у вас слишком большая исходная коллекция, она запланируетTask для каждого элемента, что может привести к значительному снижению производительности.

Методы расширения, использованные в приведенных выше примерах, выглядят следующим образом:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
Виталий Улантиков
источник