Использование async / await для нескольких задач

406

Я использую клиент API, который является полностью асинхронным, то есть каждая операция либо возвращает, Taskлибо Task<T>, например:

static async Task DoSomething(int siteId, int postId, IBlogClient client)
{
    await client.DeletePost(siteId, postId); // call API client
    Console.WriteLine("Deleted post {0}.", siteId);
}

Используя асинхронные / ожидающие операторы C # 5, каков правильный / наиболее эффективный способ запуска нескольких задач и ожидания их выполнения:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

или:

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

Поскольку клиент API использует HttpClient для внутреннего использования, я ожидаю, что он немедленно выдаст 5 HTTP-запросов, записывая их на консоль по завершении каждого из них.

Бен Фостер
источник
И в чем проблема?
Сергей Шевченко,
1
@SergShevchenko Проблема в том, что его Parallel.ForEach выполнен некорректно (см. Ответы) - он спрашивает, правильны ли его попытки запустить асинхронный код параллельно, предлагая две попытки решения, и лучше ли одна попытка решения другой (и, вероятно, почему так) ).
AnorZaken

Ответы:

572
int[] ids = new[] { 1, 2, 3, 4, 5 };
Parallel.ForEach(ids, i => DoSomething(1, i, blogClient).Wait());

Хотя вы выполняете операции параллельно с приведенным выше кодом, этот код блокирует каждый поток, в котором выполняется каждая операция. Например, если сетевой вызов занимает 2 секунды, каждый поток зависает на 2 секунды без каких-либо действий, кроме ожидания.

int[] ids = new[] { 1, 2, 3, 4, 5 };
Task.WaitAll(ids.Select(i => DoSomething(1, i, blogClient)).ToArray());

С другой стороны, приведенный выше код WaitAllтакже блокирует потоки, и ваши потоки не смогут свободно обрабатывать любую другую работу до завершения операции.

Рекомендуемый подход

Я бы предпочел, WhenAllкоторый будет выполнять ваши операции асинхронно в параллель.

public async Task DoWork() {

    int[] ids = new[] { 1, 2, 3, 4, 5 };
    await Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

На самом деле, в приведенном выше случае вам даже не нужно await, вы можете просто напрямую вернуться из метода, поскольку у вас нет никаких продолжений:

public Task DoWork() 
{
    int[] ids = new[] { 1, 2, 3, 4, 5 };
    return Task.WhenAll(ids.Select(i => DoSomething(1, i, blogClient)));
}

Чтобы поддержать это, вот подробное сообщение в блоге, в котором рассматриваются все альтернативы и их преимущества / недостатки: как и где одновременный асинхронный ввод-вывод с ASP.NET Web API

tugberk
источник
31
«приведенный выше код WaitAllтакже блокирует потоки» - разве он не блокирует только один поток, который вызвал WaitAll?
Роулинг
5
@ Документация гласит, что «Тип: System.Threading.Tasks.Task [] Массив экземпляров Task, которые нужно ждать.». Таким образом, он блокирует все потоки.
Mixxiphoid
30
@Mixxiphoid: бит, который вы цитировали, не означает, что он блокирует все потоки. Он блокирует только вызывающий поток, пока выполняются поставленные задачи. То, как эти задачи выполняются, зависит от планировщика. Обычно после завершения каждой задачи поток, в котором она выполнялась, возвращается в пул. Каждый поток не будет заблокирован, пока другие не будут завершены.
Мусуль
3
@ tugberk, Насколько я понимаю, единственное различие между «классическими» методами Task и аналогами Async заключается в том, как они взаимодействуют с потоками между моментом, когда задание начинает выполняться и заканчивается. Классический метод в планировщике по умолчанию будет захватывать поток в течение этого периода (даже если он «спит»), а асинхронные - нет. Никакой разницы за пределами этого периода, то есть задача запланирована, но не запущена, и когда она завершена, но ее вызывающая сторона все еще ждет.
Musaul
3
@tugberk См. stackoverflow.com/a/6123432/750216 Разница в том, заблокирован ли вызывающий поток или нет, остальные - то же самое. Вы можете отредактировать ответ, чтобы уточнить.
Разван Флавиус Панда
45

Мне было любопытно увидеть результаты методов, представленных в вопросе, а также принятый ответ, поэтому я проверил его.

Вот код:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public async Task DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart-testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd-workerStart).TotalSeconds.ToString("F2"), (workerEnd-testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart).Wait());
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWork(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWork(testStart)));
        }
    }
}

И полученный результат:

Starting test: Parallel.ForEach...
Worker 1 started on thread 1, beginning 0.21 seconds after test start.
Worker 4 started on thread 5, beginning 0.21 seconds after test start.
Worker 2 started on thread 3, beginning 0.21 seconds after test start.
Worker 5 started on thread 6, beginning 0.21 seconds after test start.
Worker 3 started on thread 4, beginning 0.21 seconds after test start.
Worker 1 stopped; the worker took 1.90 seconds, and it finished 2.11 seconds after the test start.
Worker 2 stopped; the worker took 3.89 seconds, and it finished 4.10 seconds after the test start.
Worker 3 stopped; the worker took 5.89 seconds, and it finished 6.10 seconds after the test start.
Worker 4 stopped; the worker took 5.90 seconds, and it finished 6.11 seconds after the test start.
Worker 5 stopped; the worker took 8.89 seconds, and it finished 9.10 seconds after the test start.
Test finished after 9.10 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 1, beginning 0.01 seconds after test start.
Worker 2 started on thread 1, beginning 0.01 seconds after test start.
Worker 3 started on thread 1, beginning 0.01 seconds after test start.
Worker 4 started on thread 1, beginning 0.01 seconds after test start.
Worker 5 started on thread 1, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 1, beginning 0.00 seconds after test start.
Worker 2 started on thread 1, beginning 0.00 seconds after test start.
Worker 3 started on thread 1, beginning 0.00 seconds after test start.
Worker 4 started on thread 1, beginning 0.00 seconds after test start.
Worker 5 started on thread 1, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.00 seconds after the test start.
Test finished after 5.00 seconds.
RiaanDP
источник
2
Если вы потратите время на каждый из этих результатов, это будет более полезно
Серж Саган
8
@SerjSagan Моя первоначальная идея состояла в том, чтобы просто убедиться, что рабочие запускаются одновременно в каждом случае, но я добавил временные метки, чтобы улучшить ясность теста. Спасибо за предложение.
RiaanDP
Спасибо за тест. Однако немного странно, что вы запускаете thread.sleep в потоке, отдельном от «рабочего потока». Не то, чтобы это имело значение в этом случае, но не имеет ли это больше смысла для Task.Run рабочих потоков, если мы моделируем вычислительную работу, или просто Task.Delay вместо sleep, если мы моделируем ввод / вывод? Просто проверяю ваши мысли на этот счет.
AnorZaken
24

Поскольку вызываемый API является асинхронным, Parallel.ForEachверсия не имеет особого смысла. Вы не должны использовать .Waitв WaitAllверсии, так как это потеряло бы параллелизм. Другая альтернатива, если вызывающий объект асинхронный, использует Task.WhenAllпосле выполнения Selectи ToArrayдля генерации массива задач. Второй альтернативой является использование Rx 2.0

Джеймс Мэннинг
источник
10

Вы можете использовать Task.WhenAllфункцию, которую вы можете передать n задач; Task.WhenAllвернет задачу, которая выполняется до завершения, когда все задачи, которые вы передали для Task.WhenAllзавершения. Вы должны ждать асинхронно, Task.WhenAllчтобы не блокировать поток пользовательского интерфейса:

   public async Task DoSomeThing() {

       var Task[] tasks = new Task[numTasks];
       for(int i = 0; i < numTask; i++)
       {
          tasks[i] = CallSomeAsync();
       }
       await Task.WhenAll(tasks);
       // code that'll execute on UI thread
   }
Ахмед Васим
источник
8

Parallel.ForEachтребует список определенных пользователем рабочих и не асинхронный Action для выполнения с каждым рабочим.

Task.WaitAllи Task.WhenAllтребуют List<Task>, которые по определению являются асинхронными.

Я нашел RiaanDP «S ответ очень полезно , чтобы понять разницу, но она нуждается в коррекции для Parallel.ForEach. Недостаточно репутации, чтобы ответить на его комментарий, следовательно, мой собственный ответ.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTest
{
    class Program
    {
        class Worker
        {
            public int Id;
            public int SleepTimeout;

            public void DoWork(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                Thread.Sleep(SleepTimeout);
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }

            public async Task DoWorkAsync(DateTime testStart)
            {
                var workerStart = DateTime.Now;
                Console.WriteLine("Worker {0} started on thread {1}, beginning {2} seconds after test start.",
                    Id, Thread.CurrentThread.ManagedThreadId, (workerStart - testStart).TotalSeconds.ToString("F2"));
                await Task.Run(() => Thread.Sleep(SleepTimeout));
                var workerEnd = DateTime.Now;
                Console.WriteLine("Worker {0} stopped; the worker took {1} seconds, and it finished {2} seconds after the test start.",
                   Id, (workerEnd - workerStart).TotalSeconds.ToString("F2"), (workerEnd - testStart).TotalSeconds.ToString("F2"));
            }
        }

        static void Main(string[] args)
        {
            var workers = new List<Worker>
            {
                new Worker { Id = 1, SleepTimeout = 1000 },
                new Worker { Id = 2, SleepTimeout = 2000 },
                new Worker { Id = 3, SleepTimeout = 3000 },
                new Worker { Id = 4, SleepTimeout = 4000 },
                new Worker { Id = 5, SleepTimeout = 5000 },
            };

            var startTime = DateTime.Now;
            Console.WriteLine("Starting test: Parallel.ForEach...");
            PerformTest_ParallelForEach(workers, startTime);
            var endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WaitAll...");
            PerformTest_TaskWaitAll(workers, startTime);
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            startTime = DateTime.Now;
            Console.WriteLine("Starting test: Task.WhenAll...");
            var task = PerformTest_TaskWhenAll(workers, startTime);
            task.Wait();
            endTime = DateTime.Now;
            Console.WriteLine("Test finished after {0} seconds.\n",
                (endTime - startTime).TotalSeconds.ToString("F2"));

            Console.ReadKey();
        }

        static void PerformTest_ParallelForEach(List<Worker> workers, DateTime testStart)
        {
            Parallel.ForEach(workers, worker => worker.DoWork(testStart));
        }

        static void PerformTest_TaskWaitAll(List<Worker> workers, DateTime testStart)
        {
            Task.WaitAll(workers.Select(worker => worker.DoWorkAsync(testStart)).ToArray());
        }

        static Task PerformTest_TaskWhenAll(List<Worker> workers, DateTime testStart)
        {
            return Task.WhenAll(workers.Select(worker => worker.DoWorkAsync(testStart)));
        }
    }
}

Результирующий вывод ниже. Время исполнения сопоставимо. Я запустил этот тест, пока мой компьютер еженедельно проверял антивирус. Изменение порядка тестов изменило время их выполнения.

Starting test: Parallel.ForEach...
Worker 1 started on thread 9, beginning 0.02 seconds after test start.
Worker 2 started on thread 10, beginning 0.02 seconds after test start.
Worker 3 started on thread 11, beginning 0.02 seconds after test start.
Worker 4 started on thread 13, beginning 0.03 seconds after test start.
Worker 5 started on thread 14, beginning 0.03 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.02 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.02 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.03 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.03 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.03 seconds after the test start.
Test finished after 5.03 seconds.

Starting test: Task.WaitAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.01 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.01 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.01 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.01 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.01 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.

Starting test: Task.WhenAll...
Worker 1 started on thread 9, beginning 0.00 seconds after test start.
Worker 2 started on thread 9, beginning 0.00 seconds after test start.
Worker 3 started on thread 9, beginning 0.00 seconds after test start.
Worker 4 started on thread 9, beginning 0.00 seconds after test start.
Worker 5 started on thread 9, beginning 0.00 seconds after test start.
Worker 1 stopped; the worker took 1.00 seconds, and it finished 1.00 seconds after the test start.
Worker 2 stopped; the worker took 2.00 seconds, and it finished 2.00 seconds after the test start.
Worker 3 stopped; the worker took 3.00 seconds, and it finished 3.00 seconds after the test start.
Worker 4 stopped; the worker took 4.00 seconds, and it finished 4.00 seconds after the test start.
Worker 5 stopped; the worker took 5.00 seconds, and it finished 5.01 seconds after the test start.
Test finished after 5.01 seconds.
JPortillo
источник