Asyncio.gather против asyncio.wait

168

asyncio.gatherи, asyncio.waitпохоже, имеют схожие применения: у меня есть куча асинхронных вещей, которые я хочу выполнить / подождать (не обязательно ждать, пока один из них закончится, прежде чем начнется следующий). Они используют другой синтаксис и отличаются в некоторых деталях, но мне кажется очень непитоничным иметь 2 функции, которые имеют такое огромное перекрытие по функциональности. Что мне не хватает?

Клод
источник

Ответы:

196

Хотя в общих случаях они схожи («запустить и получить результаты для многих задач»), каждая функция имеет определенные функции для других случаев:

asyncio.gather()

Возвращает экземпляр Future, позволяя группировать задачи на высоком уровне:

import asyncio
from pprint import pprint

import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

loop.close()

pprint(results)

Все задачи в группе можно отменить по телефону group2.cancel()или даже all_groups.cancel(). Также .gather(..., return_exceptions=True),

asyncio.wait()

Поддерживает ожидание остановки после выполнения первой задачи или после указанного тайм-аута, что обеспечивает более низкий уровень точности операций:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))

for task in finished3:
    print(task.result())

loop.close()
Уди
источник
6
«Форма с одной звездочкой (* args) используется для передачи списка аргументов переменной длины без ключевых слов, а форма с двумя звездочками - для передачи списка аргументов переменной длины с
ключевыми словами
47

asyncio.waitуровень ниже чем asyncio.gather.

Как следует из названия, asyncio.gatherосновное внимание уделяется сбору результатов. он ожидает связки фьючерсов и возвращает их результаты в заданном порядке.

asyncio.waitпросто ждет будущего. и вместо того, чтобы давать вам результаты напрямую, он показывает выполненные и незавершенные задачи. вы должны собирать значения вручную.

Кроме того, вы можете указать дождаться завершения всех фьючерсов или только первого с wait.

паук
источник
1
Вы говорите: it waits on a bunch of futures and return their results in a given order. что, если у меня есть 10000000000000 задач, и все они возвращают большие данные? все результат сделает бум памяти?
Kingname
1
@Kingname ..wat
Мэтт Джойнер
18

Я также заметил, что вы можете предоставить группу сопрограмм в wait (), просто указав список:

result=loop.run_until_complete(asyncio.wait([
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ]))

В то время как группировка в gather () выполняется простым указанием нескольких сопрограмм:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
Джонни Эбанат
источник
20
Списки также можно использовать gather(), например:asyncio.gather(*task_list)
tehfink
1
Генераторы тоже могут
Джаб
Как можно использовать эту сборку, не блокируя остальную часть скрипта?
thebeancounter
4

Когда дело доходит до исключений, очень важное различие, которое легко упустить, - это предпочтение по умолчанию для этих двух функций.


Я буду использовать этот пример для моделирования сопрограммы, которая будет вызывать исключения, иногда -

import asyncio
import random


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError

coros = [a_flaky_tsk(i) for i in range(10)]

await asyncio.gather(*coros) выходы -

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Как видите, сопряжение после индекса 1никогда не выполнялось.


Но await asyncio.wait(coros)продолжает выполнять задачи, даже если некоторые из них выходят из строя -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Конечно, это поведение можно изменить для обоих, используя -

asyncio.gather(..., return_exceptions=True)

или,

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)


Но это еще не все!

Примечание: Task exception was never retrieved в журналах выше.

asyncio.wait()не будет повторно вызывать исключения из дочерних задач, пока вы не сделаете awaitих индивидуально. (Трассировка стека в журналах - это просто сообщения, их невозможно перехватить!)

done, pending = await asyncio.wait(coros)
for tsk in done:
    try:
        await tsk
    except Exception as e:
        print("I caught:", repr(e))

Выход -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

С другой стороны, чтобы поймать исключения asyncio.gather(), вы должны -

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Тот же результат, что и раньше)

Дев Аггарвал
источник
0

В дополнение ко всем предыдущим ответам я хотел бы рассказать о различном поведении gather()и wait()в случае их отмены .

Собрать отмену

Если gather()отменяется, все отправленные ожидающие (которые еще не завершены) также отменяются .

Подождите отмены

Если wait()задача отменяется, она просто бросает, CancelledErrorа ожидаемые задачи остаются нетронутыми.

Простой пример:

import asyncio


async def task(arg):
    await asyncio.sleep(5)
    return arg


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def main():
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.create_task(asyncio.wait({work_task}))
    await cancel_waiting_task(work_task, waiting)

    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.gather(work_task)
    await cancel_waiting_task(work_task, waiting)


asyncio.run(main())

Выход:

asyncio.wait()
Waiting task cancelled
Work result: done
----------------
asyncio.gather()
Waiting task cancelled
Work task cancelled

Иногда возникает необходимость совместить wait()и gather()функциональность. Например, мы хотим дождаться завершения хотя бы одной задачи и отменить остальные ожидающие задачи после этого, а если waitingсама была отменена , то также отменить все ожидающие задачи.

В качестве реальных примеров предположим, что у нас есть событие отключения и рабочая задача. И мы хотим дождаться результатов рабочего задания, но если соединение было потеряно, то отменим его. Или мы сделаем несколько параллельных запросов, но по завершении хотя бы одного ответа отменим все остальные.

Это можно было сделать так:

import asyncio
from typing import Optional, Tuple, Set


async def wait_any(
        tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    tasks_to_cancel: Set[asyncio.Future] = set()
    try:
        done, tasks_to_cancel = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        return done, tasks_to_cancel
    except asyncio.CancelledError:
        tasks_to_cancel = tasks
        raise
    finally:
        for task in tasks_to_cancel:
            task.cancel()


async def task():
    await asyncio.sleep(5)


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
    try:
        await waiting_task
        print("waiting is done")
    except asyncio.CancelledError:
        print("waiting is cancelled")

    try:
        await waiting_conn_lost_task
        print("connection is lost")
    except asyncio.CancelledError:
        print("waiting connection lost is cancelled")

    try:
        await working_task
        print("work is done")
    except asyncio.CancelledError:
        print("work is cancelled")


async def work_done_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def conn_lost_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    connection_lost_event.set()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def cancel_waiting_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    waiting_task.cancel()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def main():
    print("Work done")
    print("-------------------")
    await work_done_case()
    print("\nConnection lost")
    print("-------------------")
    await conn_lost_case()
    print("\nCancel waiting")
    print("-------------------")
    await cancel_waiting_case()


asyncio.run(main())

Выход:

Work done
-------------------
waiting is done
waiting connection lost is cancelled
work is done

Connection lost
-------------------
waiting is done
connection is lost
work is cancelled

Cancel waiting
-------------------
waiting is cancelled
waiting connection lost is cancelled
work is cancelled
Алекс Нонам
источник