Хотя в общих случаях они схожи («запустить и получить результаты для многих задач»), каждая функция имеет определенные функции для других случаев:
Возвращает экземпляр Future, позволяя группировать задачи на высоком уровне:
import asyncio
from pprint import pprint
import random
async def coro(tag):
print(">", tag)
await asyncio.sleep(random.uniform(1, 3))
print("<", tag)
return tag
loop = asyncio.get_event_loop()
group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])
all_groups = asyncio.gather(group1, group2, group3)
results = loop.run_until_complete(all_groups)
loop.close()
pprint(results)
Все задачи в группе можно отменить по телефону group2.cancel()
или даже all_groups.cancel()
. Также .gather(..., return_exceptions=True)
,
Поддерживает ожидание остановки после выполнения первой задачи или после указанного тайм-аута, что обеспечивает более низкий уровень точности операций:
import asyncio
import random
async def coro(tag):
print(">", tag)
await asyncio.sleep(random.uniform(0.5, 5))
print("<", tag)
return tag
loop = asyncio.get_event_loop()
tasks = [coro(i) for i in range(1, 11)]
print("Get first result:")
finished, unfinished = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
for task in finished:
print(task.result())
print("unfinished:", len(unfinished))
print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
asyncio.wait(unfinished, timeout=2))
for task in finished2:
print(task.result())
print("unfinished2:", len(unfinished2))
print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))
for task in finished3:
print(task.result())
loop.close()
asyncio.wait
уровень ниже чемasyncio.gather
.Как следует из названия,
asyncio.gather
основное внимание уделяется сбору результатов. он ожидает связки фьючерсов и возвращает их результаты в заданном порядке.asyncio.wait
просто ждет будущего. и вместо того, чтобы давать вам результаты напрямую, он показывает выполненные и незавершенные задачи. вы должны собирать значения вручную.Кроме того, вы можете указать дождаться завершения всех фьючерсов или только первого с
wait
.источник
it waits on a bunch of futures and return their results in a given order
. что, если у меня есть 10000000000000 задач, и все они возвращают большие данные? все результат сделает бум памяти?Я также заметил, что вы можете предоставить группу сопрограмм в wait (), просто указав список:
result=loop.run_until_complete(asyncio.wait([ say('first hello', 2), say('second hello', 1), say('third hello', 4) ]))
В то время как группировка в gather () выполняется простым указанием нескольких сопрограмм:
result=loop.run_until_complete(asyncio.gather( say('first hello', 2), say('second hello', 1), say('third hello', 4) ))
источник
gather()
, например:asyncio.gather(*task_list)
Когда дело доходит до исключений, очень важное различие, которое легко упустить, - это предпочтение по умолчанию для этих двух функций.
Я буду использовать этот пример для моделирования сопрограммы, которая будет вызывать исключения, иногда -
import asyncio import random async def a_flaky_tsk(i): await asyncio.sleep(i) # bit of fuzz to simulate a real-world example if i % 2 == 0: print(i, "ok") else: print(i, "crashed!") raise ValueError coros = [a_flaky_tsk(i) for i in range(10)]
await asyncio.gather(*coros)
выходы -0 ok 1 crashed! Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module> asyncio.run(main()) File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run return loop.run_until_complete(main) File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete return future.result() File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main await asyncio.gather(*coros) File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError
Как видите, сопряжение после индекса
1
никогда не выполнялось.Но
await asyncio.wait(coros)
продолжает выполнять задачи, даже если некоторые из них выходят из строя -0 ok 1 crashed! 2 ok 3 crashed! 4 ok 5 crashed! 6 ok 7 crashed! 8 ok 9 crashed! Task exception was never retrieved future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError
Конечно, это поведение можно изменить для обоих, используя -
asyncio.gather(..., return_exceptions=True)
или,
asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)
Но это еще не все!
Примечание:
Task exception was never retrieved
в журналах выше.asyncio.wait()
не будет повторно вызывать исключения из дочерних задач, пока вы не сделаетеawait
их индивидуально. (Трассировка стека в журналах - это просто сообщения, их невозможно перехватить!)done, pending = await asyncio.wait(coros) for tsk in done: try: await tsk except Exception as e: print("I caught:", repr(e))
Выход -
0 ok 1 crashed! 2 ok 3 crashed! 4 ok 5 crashed! 6 ok 7 crashed! 8 ok 9 crashed! I caught: ValueError() I caught: ValueError() I caught: ValueError() I caught: ValueError() I caught: ValueError()
С другой стороны, чтобы поймать исключения
asyncio.gather()
, вы должны -results = await asyncio.gather(*coros, return_exceptions=True) for result_or_exc in results: if isinstance(result_or_exc, Exception): print("I caught:", repr(result_or_exc))
(Тот же результат, что и раньше)
источник
В дополнение ко всем предыдущим ответам я хотел бы рассказать о различном поведении
gather()
иwait()
в случае их отмены .Собрать отмену
Если
gather()
отменяется, все отправленные ожидающие (которые еще не завершены) также отменяются .Подождите отмены
Если
wait()
задача отменяется, она просто бросает,CancelledError
а ожидаемые задачи остаются нетронутыми.Простой пример:
import asyncio async def task(arg): await asyncio.sleep(5) return arg async def cancel_waiting_task(work_task, waiting_task): await asyncio.sleep(2) waiting_task.cancel() try: await waiting_task print("Waiting done") except asyncio.CancelledError: print("Waiting task cancelled") try: res = await work_task print(f"Work result: {res}") except asyncio.CancelledError: print("Work task cancelled") async def main(): work_task = asyncio.create_task(task("done")) waiting = asyncio.create_task(asyncio.wait({work_task})) await cancel_waiting_task(work_task, waiting) work_task = asyncio.create_task(task("done")) waiting = asyncio.gather(work_task) await cancel_waiting_task(work_task, waiting) asyncio.run(main())
Выход:
Иногда возникает необходимость совместить
wait()
иgather()
функциональность. Например, мы хотим дождаться завершения хотя бы одной задачи и отменить остальные ожидающие задачи после этого, а еслиwaiting
сама была отменена , то также отменить все ожидающие задачи.В качестве реальных примеров предположим, что у нас есть событие отключения и рабочая задача. И мы хотим дождаться результатов рабочего задания, но если соединение было потеряно, то отменим его. Или мы сделаем несколько параллельных запросов, но по завершении хотя бы одного ответа отменим все остальные.
Это можно было сделать так:
import asyncio from typing import Optional, Tuple, Set async def wait_any( tasks: Set[asyncio.Future], *, timeout: Optional[int] = None, ) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]: tasks_to_cancel: Set[asyncio.Future] = set() try: done, tasks_to_cancel = await asyncio.wait( tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) return done, tasks_to_cancel except asyncio.CancelledError: tasks_to_cancel = tasks raise finally: for task in tasks_to_cancel: task.cancel() async def task(): await asyncio.sleep(5) async def cancel_waiting_task(work_task, waiting_task): await asyncio.sleep(2) waiting_task.cancel() try: await waiting_task print("Waiting done") except asyncio.CancelledError: print("Waiting task cancelled") try: res = await work_task print(f"Work result: {res}") except asyncio.CancelledError: print("Work task cancelled") async def check_tasks(waiting_task, working_task, waiting_conn_lost_task): try: await waiting_task print("waiting is done") except asyncio.CancelledError: print("waiting is cancelled") try: await waiting_conn_lost_task print("connection is lost") except asyncio.CancelledError: print("waiting connection lost is cancelled") try: await working_task print("work is done") except asyncio.CancelledError: print("work is cancelled") async def work_done_case(): working_task = asyncio.create_task(task()) connection_lost_event = asyncio.Event() waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait()) waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task})) await check_tasks(waiting_task, working_task, waiting_conn_lost_task) async def conn_lost_case(): working_task = asyncio.create_task(task()) connection_lost_event = asyncio.Event() waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait()) waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task})) await asyncio.sleep(2) connection_lost_event.set() # <--- await check_tasks(waiting_task, working_task, waiting_conn_lost_task) async def cancel_waiting_case(): working_task = asyncio.create_task(task()) connection_lost_event = asyncio.Event() waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait()) waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task})) await asyncio.sleep(2) waiting_task.cancel() # <--- await check_tasks(waiting_task, working_task, waiting_conn_lost_task) async def main(): print("Work done") print("-------------------") await work_done_case() print("\nConnection lost") print("-------------------") await conn_lost_case() print("\nCancel waiting") print("-------------------") await cancel_waiting_case() asyncio.run(main())
Выход:
Work done ------------------- waiting is done waiting connection lost is cancelled work is done Connection lost ------------------- waiting is done connection is lost work is cancelled Cancel waiting ------------------- waiting is cancelled waiting connection lost is cancelled work is cancelled
источник