Разница между сопрограммой и будущим / задачей в Python 3.5?

102

Допустим, у нас есть фиктивная функция:

async def foo(arg):
    result = await some_remote_call(arg)
    return result.upper()

Какая разница между:

import asyncio    

coros = []
for i in range(5):
    coros.append(foo(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coros))

И:

import asyncio

futures = []
for i in range(5):
    futures.append(asyncio.ensure_future(foo(i)))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(futures))

Примечание . Пример возвращает результат, но вопрос не в этом. Когда значение возврата имеет значение, используйтеgather() вместо wait().

Независимо от возвращаемого значения, я ищу ясности ensure_future(). wait(coros)иwait(futures) оба запускают сопрограммы, поэтому когда и почему сопрограмму следует обернуть вensure_future ?

В принципе, каков правильный способ (TM) запускать кучу неблокирующих операций с использованием Python 3.5 async ?

Что делать, если я хочу группировать звонки в качестве дополнительного кредита? Например, мне нужно позвонить some_remote_call(...)1000 раз, но я не хочу раздавить веб-сервер / базу данных и т. Д. 1000 одновременных подключений. Это можно сделать с потоком или пулом процессов, но есть ли способ сделать это с помощью asyncio?

Обновление 2020 (Python 3.7+) : не используйте эти фрагменты. Вместо этого используйте:

import asyncio

async def do_something_async():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(foo(i)))
    await asyncio.gather(*tasks)

def do_something():
    asyncio.run(do_something_async)

Также рассмотрите возможность использования Trio , надежной сторонней альтернативы asyncio.

вязать
источник

Ответы:

96

Сопрограмма - это функция генератора, которая может как выдавать значения, так и принимать значения извне. Преимущество использования сопрограммы заключается в том, что мы можем приостановить выполнение функции и возобновить его позже. В случае сетевой операции имеет смысл приостановить выполнение функции, пока мы ждем ответа. Мы можем использовать время для выполнения некоторых других функций.

Будущее похоже на Promiseобъекты из Javascript. Это похоже на заполнитель для значения, которое материализуется в будущем. В вышеупомянутом случае, ожидая сетевого ввода-вывода, функция может предоставить нам контейнер, обещание, что она заполнит контейнер значением после завершения операции. Мы держимся за будущий объект, и когда он будет выполнен, мы можем вызвать для него метод, чтобы получить фактический результат.

Прямой ответ: не нужны, ensure_futureесли не нужны результаты. Они хороши, если вам нужны результаты или извлечение произошедших исключений.

Дополнительные кредиты: я бы выбрал run_in_executorи передал Executorэкземпляр, чтобы контролировать количество максимальных рабочих.

Пояснения и примеры кодов

В первом примере вы используете сопрограммы. waitФункция принимает кучу сопрограмм и объединяет их вместе. Итак, wait()завершается, когда все сопрограммы исчерпаны (завершено / завершено с возвратом всех значений).

loop = get_event_loop() # 
loop.run_until_complete(wait(coros))

Этот run_until_completeметод будет следить за тем, чтобы цикл был активен, пока выполнение не будет завершено. Обратите внимание, что в этом случае вы не получаете результатов асинхронного выполнения.

Во втором примере вы используете ensure_futureфункцию, чтобы обернуть сопрограмму и вернуть Taskобъект, который является разновидностью Future. При вызове сопрограммы запланировано выполнение в основном цикле событий ensure_future. Возвращенный объект future / task еще не имеет значения, но со временем, когда сетевые операции завершатся, будущий объект будет содержать результат операции.

from asyncio import ensure_future

futures = []
for i in range(5):
    futures.append(ensure_future(foo(i)))

loop = get_event_loop()
loop.run_until_complete(wait(futures))

Итак, в этом примере мы делаем то же самое, за исключением того, что используем фьючерсы, а не просто сопрограммы.

Давайте посмотрим на пример использования asyncio / coroutines / futures:

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

Здесь мы использовали create_taskметод loopобъекта. ensure_futureзапланировал бы задачу в основном цикле событий. Этот метод позволяет нам запланировать сопрограмму в выбранном нами цикле.

Мы также видим концепцию добавления обратного вызова с использованием add_done_callbackметода объекта задачи.

A Task- это doneкогда сопрограмма возвращает значение, вызывает исключение или отменяется. Есть способы проверить эти инциденты.

Я написал несколько сообщений в блогах по этим темам, которые могут помочь:

Конечно, вы можете найти более подробную информацию в официальном руководстве: https://docs.python.org/3/library/asyncio.html

Маснун
источник
3
Я обновил свой вопрос, чтобы он стал более ясным - если мне не нужен результат сопрограммы, нужно ли мне его использовать ensure_future()? И если мне нужен результат, нельзя ли просто использовать run_until_complete(gather(coros))?
knite
1
ensure_futureпланирует выполнение сопрограммы в цикле событий. Так что я бы сказал, да, это необходимо. Но, конечно, вы можете запланировать сопрограммы, используя и другие функции / методы. Да, вы можете использовать gather()- но gather будет ждать, пока соберутся все ответы.
masnun
5
@AbuAshrafMasnun @knite gatherи waitфактически обернуть данные сопрограммы как задачи с использованием ensure_future(см. Источники здесь и здесь ). Так что нет никакого смысла использовать ensure_futureзаранее, и это не имеет никакого отношения к получению результата или нет.
Винсент
8
@AbuAshrafMasnun @knite Кроме того , ensure_futureесть loopаргумент, поэтому нет никаких оснований для использования loop.create_taskболее ensure_future. И run_in_executorне будет работать с сопрограммами, вместо этого следует использовать семафор .
Винсент
2
@vincent есть причина использовать create_taskover ensure_future, см. docs . Цитатаcreate_task() (added in Python 3.7) is the preferable way for spawning new tasks.
masi
24

Простой ответ

  • Вызов функции сопрограммы ( async def) НЕ запускает ее. Он возвращает объекты сопрограммы, например, функция генератора возвращает объекты генератора.
  • await извлекает значения из сопрограмм, т.е. "вызывает" сопрограмму
  • eusure_future/create_task запланировать запуск сопрограммы в цикле событий на следующей итерации (не дожидаясь их завершения, как поток демона).

Некоторые примеры кода

Сначала проясним некоторые термины:

  • функция сопрограммы, та, которую вы имеете async def;
  • объект сопрограммы, что вы получаете, когда "вызываете" функцию сопрограммы;
  • task, объект, обернутый вокруг объекта сопрограммы для запуска в цикле событий.

Случай 1, awaitв сопрограмме

Мы создаем две сопрограммы, awaitодну и используем create_taskдля запуска другой.

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

вы получите результат:

1539486251.7055213 - await
1539486251.7055705 - create_task

Объясните:

task1 выполнялась напрямую, а task2 выполнялась на следующей итерации.

Случай 2, передача управления циклу событий

Если мы заменим основную функцию, мы увидим другой результат:

async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

вы получите результат:

-> % python coro.py
1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

Объясните:

При вызове asyncio.sleep(1)элемент управления возвращался в цикл событий, и цикл проверяет наличие задач для запуска, а затем запускает задачу, созданную с помощью create_task.

Обратите внимание, что мы сначала вызываем функцию сопрограммы, но не awaitее, поэтому мы просто создали одну сопрограмму, а не запускали ее. Затем мы снова вызываем функцию сопрограммы и оборачиваем ее в create_taskвызов, creat_task фактически планирует запуск сопрограммы на следующей итерации. Итак, в результате create taskвыполняется доawait .

Фактически, суть здесь в том, чтобы вернуть управление циклу, которое вы могли бы использовать asyncio.sleep(0)для получения того же результата.

Под капотом

loop.create_taskсобственно звонки asyncio.tasks.Task(), которые будут звонить loop.call_soon. И loop.call_soonпоставлю задачу loop._ready. Во время каждой итерации цикла он проверяет все обратные вызовы в loop._ready и запускает его.

asyncio.wait, asyncio.ensure_futureи asyncio.gatherфактически звонят loop.create_taskпрямо или косвенно.

Также обратите внимание на документы :

Обратные вызовы вызываются в том порядке, в котором они зарегистрированы. Каждый обратный вызов будет вызываться ровно один раз.

паук
источник
1
Спасибо за чистое объяснение! Надо сказать, довольно ужасный дизайн. В высокоуровневом API происходит утечка абстракции низкого уровня, что чрезмерно усложняет API.
Борис Бурков
1
посмотрите проект curio, который хорошо продуман
ospider
Хорошее объяснение! Думаю, эффект от await task2звонка можно прояснить. В обоих примерах вызов loop.create_task () - это то, что планирует task2 в цикле событий. Таким образом, в обоих exs вы можете удалить, await task2и в конечном итоге задача2 будет запущена. В ex2 поведение будет идентичным, поскольку await task2я считаю, что это просто планирование уже завершенной задачи (которая не будет запускаться во второй раз), тогда как в ex1 поведение будет немного другим, так как task2 не будет выполняться до тех пор, пока main не будет завершен. Чтобы увидеть разницу, добавьте print("end of main")в конце основной строки ex1
Эндрю
11

Комментарий Винсента, связанный с https://github.com/python/asyncio/blob/master/asyncio/tasks.py#L346 , показывает, что wait()сопрограммы обертывают ensure_future()за вас!

Другими словами, нам действительно нужно будущее, и сопрограммы будут незаметно преобразованы в них.

Я обновлю этот ответ, когда найду исчерпывающее объяснение того, как пакетировать сопрограммы / фьючерсы.

вязать
источник
Означает ли это , что для сопрограммного объекта c, await cэквивалентно await create_task(c)?
Алексей
3

Из BDFL [2013]

Задачи

  • Это сопрограмма, обернутая в будущее
  • класс Task является подклассом класса Future
  • Так что это тоже работает с ожиданием !

  • Чем он отличается от простой сопрограммы?
  • Он может прогрессировать, не дожидаясь этого
    • Пока вы ждете чего-то другого, т.е.
      • жду [something_else]

Имея это в виду, ensure_futureимеет смысл использовать имя для создания Задачи, поскольку результат Будущего будет вычислен независимо от того, ожидаете вы его или нет (пока вы чего-то ждете). Это позволяет циклу событий завершить вашу задачу, пока вы ждете других вещей. Обратите внимание, что Python 3.7 create_taskявляется предпочтительным способом обеспечения будущего .

Примечание: я изменил «yield from» на слайдах Guido на «await» для современности.

crizCraig
источник