Как на самом деле работает asyncio?

123

Этот вопрос мотивирован моим другим вопросом: как ждать в cdef?

В сети есть масса статей и сообщений в блогах asyncio, но все они очень поверхностны. Я не смог найти никакой информации о том, как asyncioэто на самом деле реализовано, и что делает ввод-вывод асинхронным. Я пытался прочитать исходный код, но это тысячи строк кода C не самого высокого уровня, многие из которых имеют дело с вспомогательными объектами, но, что наиболее важно, трудно связать синтаксис Python и то, какой код C он будет переводить в.

Собственная документация Asycnio еще менее полезна. Там нет информации о том, как это работает, только некоторые инструкции о том, как его использовать, которые также иногда вводят в заблуждение / очень плохо написаны.

Я знаком с реализацией сопрограмм в Go и как бы надеялся, что Python сделает то же самое. Если бы это было так, код, который я написал в сообщении, указанном выше, работал бы. Поскольку этого не произошло, теперь я пытаюсь понять, почему. Мое лучшее предположение таково, пожалуйста, поправьте меня, где я ошибаюсь:

  1. Определения процедур формы async def foo(): ...фактически интерпретируются как методы наследника класса coroutine.
  2. Возможно, async defфактически разделен на несколько методов awaitоператорами, где объект, для которого эти методы вызываются, может отслеживать прогресс, достигнутый им в выполнении на данный момент.
  3. Если вышесказанное верно, то, по сути, выполнение сопрограммы сводится к вызову методов объекта сопрограммы каким-либо глобальным менеджером (циклом?).
  4. Глобальный менеджер каким-то образом (как?) Знает, когда операции ввода-вывода выполняются кодом Python (только?), И может выбрать один из ожидающих методов сопрограммы для выполнения после того, как текущий выполняющийся метод отказался от управления (нажмите на awaitоператор ).

Другими словами, вот моя попытка «десугарировать» некоторый asyncioсинтаксис во что-то более понятное:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Если моя догадка окажется верной: тогда у меня проблема. Как на самом деле происходит ввод-вывод в этом сценарии? В отдельной ветке? Приостановлен ли весь интерпретатор, а ввод-вывод происходит вне интерпретатора? Что именно подразумевается под вводом-выводом? Если моя процедура python вызвала процедуру C open(), а она, в свою очередь, отправила прерывание ядру, передав ему управление, как интерпретатор Python знает об этом и может продолжить выполнение некоторого другого кода, в то время как код ядра выполняет фактический ввод-вывод и до тех пор, пока он пробуждает процедуру Python, которая изначально отправила прерывание? Как интерпретатор Python в принципе может знать об этом?

wvxvw
источник
2
Большая часть логики обрабатывается реализацией цикла событий. Посмотрите, как BaseEventLoopреализован CPython : github.com/python/cpython/blob/…
Blender
@Blender хорошо, я думаю, что наконец нашел то, что хотел, но теперь я не понимаю, почему код был написан таким, каким он был. Почему _run_once, которая на самом деле является единственной полезной функцией во всем этом модуле, сделана «частной»? Реализация ужасная, но это меньшая проблема. Почему единственная функция, которую вы когда-либо хотели бы вызвать в цикле событий, помечена как «не звоните мне»?
wvxvw
Это вопрос для списка рассылки. Какой вариант использования потребует от вас прикосновения _run_onceв первую очередь?
Blender
8
Однако это не совсем ответ на мой вопрос. Как бы вы решили любую полезную задачу, используя только _run_once? asyncioявляется сложным и имеет свои недостатки, но, пожалуйста, продолжайте обсуждение вежливо. Не ругайте разработчиков, стоящих за кодом, которого вы сами не понимаете.
Blender
1
@ user8371915 Если вы считаете, что есть что-то, что я не освещал, вы можете добавить или прокомментировать мой ответ.
Bharel

Ответы:

206

Как работает asyncio?

Прежде чем ответить на этот вопрос, нам нужно понять несколько основных терминов, пропустите их, если вы уже знаете какие-либо из них.

Генераторы

Генераторы - это объекты, которые позволяют нам приостановить выполнение функции Python. Созданные пользователем генераторы реализуются с использованием ключевого слова yield. Создавая обычную функцию, содержащую yieldключевое слово, мы превращаем эту функцию в генератор:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Как видите, вызов next()генератора заставляет интерпретатор загружать фрейм теста и возвращать yieldзначение ed. При next()повторном вызове заставляет фрейм снова загружаться в стек интерпретатора и продолжать yieldввод другого значения.

К третьему next()вызову наш генератор закончился, и StopIterationего бросили.

Общение с генератором

Менее известной особенностью генераторов является тот факт, что вы можете общаться с ними двумя способами: send()и throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

После вызова gen.send()значение передается как возвращаемое значение из yieldключевого слова.

gen.throw()с другой стороны, позволяет генерировать исключения внутри генераторов, за исключением случаев, когда они yieldвызываются в том же месте .

Возврат значений от генераторов

Возврат значения от генератора приводит к тому, что значение помещается в StopIterationисключение. Позже мы можем восстановить значение из исключения и использовать его для наших нужд.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Вот новое ключевое слово: yield from

Python 3.4 пришел с добавлением нового ключевого слова: yield from. То , что это ключевое слово позволяет нам сделать, это пройти по любому next(), send()и throw()в внутреннепризматический наиболее вложенный генератор. Если внутренний генератор возвращает значение, оно также является возвращаемым значением yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

Я написал статью, чтобы подробнее остановиться на этой теме.

Собираем все вместе

После введения нового ключевого слова yield fromв Python 3.4 мы теперь могли создавать генераторы внутри генераторов, которые, как туннель, передают данные туда и обратно от самого внутреннего к самому внешнему генератору. Это породило новое значение для генераторов - сопрограмм .

Сопрограммы - это функции, которые можно останавливать и возобновлять во время работы. В Python они определяются с помощью async defключевого слова. Так же, как генераторы, они также используют свою собственную форму , yield fromкоторая await. До asyncи awaitбыли введены в Python 3.5, мы создали сопрограммы в точно так же, генераторах были созданы (с yield fromвместо await).

async def inner():
    return 1

async def outer():
    await inner()

Как и любой итератор или генератор, реализующий __iter__()метод, сопрограммы реализуют, __await__()что позволяет им продолжать работу каждый раз при await coroвызове.

Внутри документации Python есть хорошая диаграмма последовательности, которую вы должны проверить.

В asyncio, помимо функций сопрограмм, у нас есть 2 важных объекта: задачи и фьючерсы .

Фьючерсы

Фьючерсы - это объекты, в которых __await__()реализован метод, и их задача - сохранять определенное состояние и результат. Состояние может быть одним из следующих:

  1. ОЖИДАНИЕ - future не имеет результата или исключений.
  2. ОТМЕНЕНО - будущее было отменено с помощью fut.cancel()
  3. ЗАВЕРШЕНО - будущее было завершено либо с использованием набора результатов, либо с помощью набора fut.set_result()исключений с использованиемfut.set_exception()

Результатом, как вы уже догадались, может быть либо объект Python, который будет возвращен, либо исключение, которое может возникнуть.

Другой важной особенностью futureобъектов является то, что они содержат метод с именем add_done_callback(). Этот метод позволяет вызывать функции, как только задача будет выполнена - независимо от того, вызвало ли она исключение или завершилось.

Задачи

Объекты задач - это специальные фьючерсы, которые охватывают сопрограммы и взаимодействуют с самыми внутренними и самыми внешними сопрограммами. Каждый раз, когда сопрограмма awaitпередает future, будущее полностью передается задаче (как в примере yield from), и задача получает его.

Далее задача привязывается к будущему. Он делает это, обращаясь add_done_callback()к будущему. С этого момента, если будущее когда-либо будет выполнено, либо путем отмены, либо передачи исключения, либо в результате передачи объекта Python, будет вызван обратный вызов задачи, и он вернется к существованию.

Asyncio

Последний животрепещущий вопрос, на который мы должны ответить, - как реализован ввод-вывод?

Глубоко внутри asyncio у нас есть цикл обработки событий. Цикл событий задач. Задача цикла обработки событий - вызывать задачи каждый раз, когда они готовы, и координировать все эти усилия на одной рабочей машине.

Часть IO цикла событий построена на единственной вызываемой критически важной функции select. Select - это функция блокировки, реализуемая операционной системой ниже и позволяющая ожидать на сокетах входящих или исходящих данных. После получения данных он просыпается и возвращает сокеты, которые получили данные, или сокеты, готовые к записи.

Когда вы пытаетесь получить или отправить данные через сокет через asyncio, на самом деле ниже происходит то, что сокет сначала проверяется, есть ли в нем какие-либо данные, которые можно немедленно прочитать или отправить. Если его .send()буфер заполнен или .recv()буфер пуст, сокет регистрируется в selectфункции (путем простого добавления его в один из списков rlistдля recvи wlistдля send), а соответствующая функция представляет собой awaitвновь созданный futureобъект, связанный с этим сокетом.

Когда все доступные задачи ожидают фьючерсов, цикл событий вызывает selectи ждет. Когда в один из сокетов поступают входящие данные или его sendбуфер исчерпан, asyncio проверяет будущий объект, привязанный к этому сокету, и устанавливает его в состояние «Готово».

Теперь все волшебство происходит. Будущее настроено на выполненное, задача, которая добавлялась раньше с помощью add_done_callback(), возвращается к жизни и вызывает .send()сопрограмму, которая возобновляет внутреннюю сопрограмму (из-за awaitцепочки), и вы читаете недавно полученные данные из ближайшего буфера, который он пролился на.

Снова цепочка методов в случае recv():

  1. select.select ждет.
  2. Возвращается готовый сокет с данными.
  3. Данные из сокета перемещаются в буфер.
  4. future.set_result() называется.
  5. Задача, которая была добавлена, add_done_callback()теперь просыпается.
  6. Задача вызывает .send()сопрограмму, которая полностью входит в самую внутреннюю сопрограмму и пробуждает ее.
  7. Данные считываются из буфера и возвращаются нашему скромному пользователю.

Таким образом, asyncio использует возможности генератора, которые позволяют приостанавливать и возобновлять функции. Он использует yield fromвозможности, которые позволяют передавать данные туда и обратно от самого внутреннего генератора к самому внешнему. Он использует все это, чтобы остановить выполнение функции, пока он ожидает завершения ввода-вывода (с помощью selectфункции ОС ).

А лучше всего? Пока одна функция приостановлена, другая может работать и чередоваться с деликатной тканью, что является асинхронным.

Bharel
источник
12
Если требуются дополнительные объяснения, не стесняйтесь комментировать. Кстати, я не совсем уверен, должен ли я писать это как статью в блоге или как ответ в stackoverflow. На этот вопрос нужно долго отвечать.
Bharel
1
В асинхронном сокете при попытке отправить или получить данные сначала проверяется буфер ОС. Если вы пытаетесь получить, а в буфере нет данных, основная функция приема вернет значение ошибки, которое будет распространяться как исключение в Python. То же самое с отправкой и полным буфером. Когда возникает исключение, Python, в свою очередь, отправляет эти сокеты функции выбора, которая приостанавливает процесс. Но дело не в том, как работает asyncio, а в том, как работают select и сокеты, что также сильно зависит от ОС.
Bharel 01
2
@ user8371915 Всегда здесь, чтобы помочь :-) Имейте в виду, что для того, чтобы понять Asyncio, вы должны знать, как генераторы, как yield fromработают генераторы . Однако я заметил вверху, что его можно пропустить, если читатель уже знает об этом :-) Что еще, по вашему мнению, я должен добавить?
Bharel 02
2
Вещи перед разделом Asyncio , пожалуй, самые важные, поскольку это единственное, что язык действительно делает сам по себе. Это также selectможет быть квалифицировано, поскольку именно так работают в ОС системные вызовы неблокирующего ввода-вывода. Фактические asyncioконструкции и цикл событий - это просто код уровня приложения, созданный из этих вещей.
MisterMiyagi
3
В этом посте есть информация об основах асинхронного ввода-вывода в Python. Спасибо за такое любезное объяснение.
mjkim
83

Речь async/awaitи asyncioне об одном и том же. Первая - это фундаментальная низкоуровневая конструкция (сопрограммы), а вторая - это библиотека, использующая эти конструкции. И наоборот, нет однозначного ответа.

Ниже приводится общее описание того, как работают async/awaitи- asyncioподобные библиотеки. То есть, сверху могут быть другие уловки (есть ...), но они несущественны, если вы сами их не построите. Разница должна быть незначительной, если вы уже не знаете достаточно, чтобы не задавать такой вопрос.

1. Сопрограммы против подпрограмм в скорлупе

Точно так же, как подпрограммы (функции, процедуры, ...), сопрограммы (генераторы, ...) представляют собой абстракцию стека вызовов и указателя инструкций: существует стек выполняемых частей кода, и каждая из них находится в определенной инструкции.

Различие между defпротивопоставлением async defиспользуется просто для ясности. Фактическая разница - returnпротив yield. От этого awaitили yield fromвзять разницу от отдельных вызовов до целых стеков.

1.1. Подпрограммы

Подпрограмма представляет новый уровень стека для хранения локальных переменных и однократный обход ее инструкций для достижения конца. Рассмотрим такую ​​подпрограмму:

def subfoo(bar):
     qux = 3
     return qux * bar

Когда вы его запускаете, это означает

  1. выделить пространство стека для barиqux
  2. рекурсивно выполнить первый оператор и перейти к следующему оператору
  3. один раз returnпередайте его значение в стек вызовов
  4. очистить стек (1.) и указатель инструкции (2.)

Примечательно, что 4. означает, что подпрограмма всегда запускается в одном и том же состоянии. После завершения все, что относится только к самой функции, теряется. Функцию нельзя возобновить, даже если после нее есть инструкции return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Сопрограммы как постоянные подпрограммы

Сопрограмма похожа на подпрограмму, но может завершиться без разрушения своего состояния. Рассмотрим такую ​​сопрограмму:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

Когда вы его запускаете, это означает

  1. выделить пространство стека для barиqux
  2. рекурсивно выполнить первый оператор и перейти к следующему оператору
    1. один раз в a yield, поместите его значение в вызывающий стек, но сохраните указатель стека и инструкции
    2. после вызова yield, восстановить стек и указатель инструкции и передать аргументы вqux
  3. один раз returnпередайте его значение в стек вызовов
  4. очистить стек (1.) и указатель инструкции (2.)

Обратите внимание на добавление 2.1 и 2.2 - сопрограмма может быть приостановлена ​​и возобновлена ​​в заранее определенных точках. Это похоже на то, как подпрограмма приостанавливается во время вызова другой подпрограммы. Разница в том, что активная сопрограмма не привязана строго к своему стеку вызовов. Вместо этого приостановленная сопрограмма является частью отдельного изолированного стека.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

Это означает, что подвешенные сопрограммы можно свободно хранить или перемещать между стеками. Любой стек вызовов, имеющий доступ к сопрограмме, может решить возобновить ее.

1.3. Обход стека вызовов

Пока наша сопрограмма идет вниз по стеку вызовов только с yield. Подпрограмма может перемещаться вниз и вверх по стеку вызовов с помощью returnи (). Для полноты сопрограмм также нужен механизм для подъема по стеку вызовов. Рассмотрим такую ​​сопрограмму:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

Когда вы его запускаете, это означает, что он по-прежнему выделяет стек и указатель инструкций как подпрограмму. Когда он приостанавливается, это по-прежнему похоже на сохранение подпрограммы.

Однако yield fromделает и то , и другое . Он приостанавливает wrap и запускает указатель стека и инструкции cofoo. Обратите внимание, что wrapостается приостановленным до cofooполного завершения. Всякий раз, когда cofooприостанавливается или что-то отправляется, cofooон напрямую подключается к вызывающему стеку.

1.4. Все сопрограммы вниз

Как установлено, yield fromпозволяет соединить два прицела через другой промежуточный. При рекурсивном применении это означает, что вершина стека может быть соединена с нижней частью стека.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Учтите, что rootи coro_bдруг о друге не знают. Это делает сопрограммы намного чище, чем обратные вызовы: сопрограммы по-прежнему построены на соотношении 1: 1, как подпрограммы. Сопрограммы приостанавливают и возобновляют весь свой существующий стек выполнения до обычной точки вызова.

Примечательно, что rootможно было возобновить произвольное количество сопрограмм. Тем не менее, он никогда не может возобновиться более чем по одному одновременно. Сопрограммы одного и того же корня параллельны, но не параллельны!

1.5. Python asyncиawait

Объяснение до сих пор явно используется yieldи yield fromсловарный запас генераторов - лежащий в основе функциональность та же. Новый синтаксис Python3.5 asyncи awaitсуществует в основном для ясности.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async forИ async withутверждения необходимы , потому что вы бы разорвать yield from/awaitцепь с голым forи withотчетностью.

2. Анатомия простого цикла событий

Сама по себе сопрограмма не имеет понятия о передаче управления другой сопрограмме. Он может передать управление только вызывающей стороне в нижней части стека сопрограмм. Затем этот вызывающий может переключиться на другую сопрограмму и запустить ее.

Этот корневой узел нескольких сопрограмм обычно представляет собой цикл событий : при приостановке сопрограмма выдает событие, на котором она хочет возобновить. В свою очередь, цикл событий может эффективно ожидать возникновения этих событий. Это позволяет ему решать, какую сопрограмму запускать следующей или как подождать перед возобновлением.

Такой дизайн подразумевает наличие набора заранее определенных событий, которые понимает цикл. Несколько сопрограмм awaitдруг друга, пока, наконец, не будет awaitобработано событие . Это событие может напрямую связываться с циклом событий посредством yieldуправления.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

Ключ в том, что приостановка сопрограмм позволяет циклу событий и событиям напрямую взаимодействовать. Промежуточный стек сопрограмм не требует каких-либо знаний о том, в каком цикле он выполняется, или о том, как работают события.

2.1.1. События во времени

Самое простое событие для обработки - достижение определенного момента времени. Это также фундаментальный блок многопоточного кода: поток повторяется sleepдо тех пор, пока условие не станет истинным. Однако обычное sleepвыполнение блоков само по себе - мы хотим, чтобы другие сопрограммы не блокировались. Вместо этого мы хотим сообщить циклу событий, когда он должен возобновить текущий стек сопрограмм.

2.1.2. Определение события

Событие - это просто значение, которое мы можем идентифицировать - будь то через перечисление, тип или другой идентификатор. Мы можем определить это с помощью простого класса, который хранит наше целевое время. Помимо хранения информации о событии, мы можем awaitнапрямую разрешить класс.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

Этот класс только сохраняет событие - он не говорит, как на самом деле его обработать.

Единственная особенность __await__- это то, что awaitищет ключевое слово. Фактически, это итератор, но он недоступен для обычного итерационного механизма.

2.2.1. В ожидании события

Теперь, когда у нас есть событие, как на него реагируют сопрограммы? Мы должны иметь возможность выразить эквивалент sleepпосредством awaitнашего события. Чтобы лучше увидеть, что происходит, мы ждем дважды половину времени:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

Мы можем напрямую создать и запустить эту сопрограмму. Как и в случае с генератором, при использовании coroutine.sendсопрограмма запускается до тех пор, пока не будет yieldполучен результат.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

Это дает нам два AsyncSleepсобытия, а затем, StopIterationкогда сопрограмма завершена. Обратите внимание, что единственная задержка - это от time.sleepцикла! Каждый AsyncSleepхранит только смещение от текущего времени.

2.2.2. Событие + Сон

На данный момент в нашем распоряжении есть два отдельных механизма:

  • AsyncSleep События, которые могут быть получены из сопрограммы
  • time.sleep что может ждать, не влияя на сопрограммы

Примечательно, что эти двое ортогональны: ни один не влияет на другой и не запускает его. В результате мы можем придумать нашу собственную стратегию, sleepчтобы противостоять задержке файла AsyncSleep.

2.3. Наивный цикл событий

Если у нас несколько сопрограмм, каждая из них может сказать нам, когда она хочет, чтобы ее разбудили. Затем мы можем дождаться возобновления первого из них, затем следующего и так далее. Примечательно, что на каждом этапе нам важно только то, что будет следующим .

Это упрощает планирование:

  1. сортировать сопрограммы по желаемому времени пробуждения
  2. выбрать первого, кто хочет проснуться
  3. подожди до этого момента
  4. запустить эту сопрограмму
  5. повторять от 1.

Тривиальная реализация не требует каких-либо сложных концепций. A listпозволяет сортировать сопрограммы по дате. Ожидание обычное time.sleep. Запуск сопрограмм работает так же, как и раньше, с coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Конечно, здесь есть много возможностей для улучшения. Мы можем использовать кучу для очереди ожидания или таблицу отправки для событий. Мы также можем получить возвращаемые значения из StopIterationи назначить их сопрограмме. Однако основной принцип остается прежним.

2.4. Кооперативное ожидание

AsyncSleepСобытие и runцикл обработки событий является полностью работоспособно осуществлением своевременных мероприятий.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

При этом происходит совместное переключение между каждой из пяти сопрограмм, каждая из которых приостанавливается на 0,1 секунды. Несмотря на то, что цикл обработки событий является синхронным, он по-прежнему выполняет работу за 0,5 секунды вместо 2,5 секунд. Каждая сопрограмма хранит состояние и действует независимо.

3. Цикл событий ввода-вывода

Поддерживаемый цикл событий sleepподходит для опроса . Однако ожидание ввода-вывода для дескриптора файла может быть выполнено более эффективно: операционная система реализует ввод-вывод и, таким образом, знает, какие дескрипторы готовы. В идеале цикл событий должен поддерживать явное событие «готовность к вводу-выводу».

3.1. selectвызов

У Python уже есть интерфейс для запроса в ОС дескрипторов ввода-вывода для чтения. Когда вызывается с дескрипторами для чтения или записи, он возвращает дескрипторы, готовые к чтению или записи:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

Например, мы можем openфайл для записи и ждать, пока он будет готов:

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

После возврата select writeableсодержит наш открытый файл.

3.2. Базовое событие ввода / вывода

Подобно AsyncSleepзапросу, нам нужно определить событие для ввода-вывода. Согласно базовой selectлогике событие должно относиться к читаемому объекту, например к openфайлу. Кроме того, мы храним, сколько данных нужно прочитать.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

Как и в случае с, AsyncSleepмы в основном просто сохраняем данные, необходимые для основного системного вызова. На этот раз __await__его можно возобновлять несколько раз - пока желаемое amountне будет прочитано. Кроме того, мы получаем returnрезультат ввода-вывода, а не просто возобновляем.

3.3. Дополнение цикла событий чтением ввода-вывода

Основа для нашего цикла событий по-прежнему runопределена ранее. Во-первых, нам нужно отслеживать запросы на чтение. Это больше не отсортированное расписание, мы сопоставляем только запросы чтения с сопрограммами.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Поскольку select.selectпринимает параметр тайм-аута, мы можем использовать его вместо time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

Это дает нам все читаемые файлы - если они есть, мы запускаем соответствующую сопрограмму. Если таковых нет, значит, мы достаточно долго ждали запуска нашей текущей сопрограммы.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Наконец, мы должны фактически прослушивать запросы на чтение.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Собираем все вместе

Вышесказанное было небольшим упрощением. Нам нужно кое-что переключить, чтобы не истощать спящие сопрограммы, если мы всегда можем читать. Нам нужно справиться с тем, что нечего читать или нечего ждать. Однако конечный результат по-прежнему соответствует 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. Кооперативный ввод / вывод

Реализации AsyncSleep, AsyncReadи runтеперь полностью функциональны для сна и / или чтения. Как и в случае sleepy, мы можем определить помощника для проверки чтения:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = return await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

Запустив это, мы видим, что наш ввод-вывод чередуется с ожидающей задачей:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. Неблокирующий ввод-вывод

Хотя ввод-вывод в файлах передает эту концепцию, он не совсем подходит для библиотеки, например asyncio: selectвызов всегда возвращается для файлов , и оба openи readмогут блокироваться на неопределенное время . Это блокирует все сопрограммы цикла обработки событий - что плохо. Такие библиотеки, как aiofilesиспользование потоков и синхронизации для имитации неблокирующего ввода-вывода и событий в файле.

Однако сокеты допускают неблокирующий ввод-вывод, а присущая им задержка делает его гораздо более важным. При использовании в цикле событий ожидание данных и повторная попытка могут быть заключены в оболочку, ничего не блокируя.

4.1. Неблокирующее событие ввода / вывода

Подобно нашему AsyncRead, мы можем определить событие приостановки и чтения для сокетов. Вместо файла мы берем сокет, который должен быть неблокирующим. Кроме того, мы __await__используем socket.recvвместо file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

В отличие от AsyncRead, __await__выполняет действительно неблокирующий ввод-вывод. Когда данные доступны, они всегда читаются. Когда данные недоступны, он всегда приостанавливается. Это означает, что цикл обработки событий блокируется только на время выполнения полезной работы.

4.2. Разблокирование цикла событий

Что касается цикла событий, то здесь особо ничего не меняется. Событие для прослушивания остается таким же, как и для файлов - файловый дескриптор, отмеченный как готовый select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

На этом этапе должно быть очевидно, что AsyncReadи AsyncRecvявляются событиями того же типа. Мы могли бы легко реорганизовать их в одно событие с заменяемым компонентом ввода-вывода. По сути, цикл событий, сопрограммы и события четко разделяют планировщик, произвольный промежуточный код и фактический ввод-вывод.

4.3. Уродливая сторона неблокирующего ввода-вывода

В принципе, что вы должны сделать в этой точке повторить логику readкак recvдля AsyncRecv. Однако сейчас это намного уродливее - вам нужно обрабатывать ранние возвраты, когда функции блокируются внутри ядра, но передают вам управление. Например, открытие соединения по сравнению с открытием файла намного дольше:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

Короче говоря, осталось несколько десятков строк обработки исключений. На этом этапе события и цикл событий уже работают.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Дополнение

Пример кода на github

Мистер Мияги
источник
Использование yield selfв AsyncSleep дает мне Task got back yieldошибку, почему? Я вижу, что код asyncio.Futures использует это. Использование чистого урожая отлично работает.
Рон Серруя
1
Циклы событий обычно ожидают только свои собственные события. Обычно нельзя смешивать события и циклы событий в библиотеках; показанные здесь события работают только с показанным циклом событий. В частности, asyncio использует только None (т.е. чистый доход) в качестве сигнала для цикла событий. События напрямую взаимодействуют с объектом цикла событий для регистрации пробуждений.
MisterMiyagi
12

Ваше coroудаление сахара концептуально правильно, но немного неполно.

awaitне приостанавливает безоговорочно, но только если встречает блокирующий вызов. Как узнать, что звонок заблокирован? Это определяется ожидаемым кодом. Например, ожидаемая реализация чтения сокета может быть уменьшена до:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

В реальном asyncio эквивалентный код изменяет состояние a Futureвместо того, чтобы возвращать магические значения, но концепция остается той же. При соответствующей адаптации к объекту, подобному генератору, приведенный выше код можно awaitредактировать.

На стороне вызывающего, когда ваша сопрограмма содержит:

data = await read(sock, 1024)

Он превращает сахар во что-то близкое к:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

Люди, знакомые с генераторами, склонны описывать вышесказанное в терминах yield fromавтоматической приостановки.

Цепочка приостановки продолжается вплоть до цикла событий, который замечает, что сопрограмма приостановлена, удаляет ее из рабочего набора и продолжает выполнять сопрограммы, которые запускаются, если таковые имеются. Если сопрограммы не запускаются, цикл ожидает, select()пока дескриптор файла, который интересует сопрограмму, не станет готовым для ввода-вывода. (Цикл событий поддерживает отображение дескриптора файла в сопрограмму.)

В приведенном выше примере, как только select()сообщается, что цикл событий доступен для sockчтения, он будет повторно добавлен coroк запускаемому набору, поэтому он будет продолжен с точки приостановки.

Другими словами:

  1. По умолчанию все происходит в одном потоке.

  2. Цикл событий отвечает за планирование сопрограмм и их пробуждение, когда то, чего они ждали (обычно вызов ввода-вывода, который обычно блокируется, или тайм-аут) становится готовым.

Чтобы получить представление о циклах событий, управляющих сопрограммами, я рекомендую этот доклад Дейва Бизли, в котором он демонстрирует кодирование цикла событий с нуля перед живой аудиторией.

пользователь4815162342
источник
Спасибо, это ближе к тому, что мне нужно, но это все еще не объясняет, почему async.wait_for()не делает то, что должен ... Почему такая большая проблема - добавить обратный вызов в цикл событий и сообщить об этом обработать необходимое количество обратных вызовов, в том числе только что добавленный? Мое разочарование asyncioотчасти вызвано тем фактом, что основная концепция очень проста, и, например, Emacs Lisp имел реализацию на протяжении многих лет, без использования модных словечек ... (то есть create-async-processи accept-process-output- и это все, что нужно ... (продолжение)
wvxvw
10
@wvxvw Я сделал все, что мог, чтобы ответить на ваш вопрос, насколько это вообще возможно, учитывая, что только последний абзац содержит шесть вопросов. И поэтому мы продолжаем - дело не в том, что wait_for оно не выполняет то, что должно (оно работает, это сопрограмма, которую вы должны ждать), а в том, что ваши ожидания не соответствуют тому, для чего была разработана и реализована система. Я думаю, ваша проблема могла бы быть сопоставлена ​​с asyncio, если бы цикл событий выполнялся в отдельном потоке, но я не знаю подробностей вашего варианта использования, и, честно говоря, ваше отношение не делает его очень интересным, чтобы помочь вам.
user4815162342
5
@wvxvw My frustration with asyncio is in part due to the fact that the underlying concept is very simple, and, for example, Emacs Lisp had implementation for ages, without using buzzwords...- Тогда ничто не мешает вам реализовать эту простую концепцию без модных словечек для Python :) Почему вы вообще используете этот уродливый asyncio? Реализуйте собственное с нуля. Например, вы можете начать с создания собственной async.wait_for()функции, которая выполняет именно то, что от нее требуется.
Михаил Герасимов
1
@MikhailGerasimov вы, кажется, думаете, что это риторический вопрос. Но я бы хотел развеять для вас тайну. Язык предназначен для общения с другими. Я не могу выбирать для других, на каком языке они говорят, даже если я считаю, что язык, на котором они говорят, - мусор, лучшее, что я могу сделать, - это попытаться убедить их, что это так. Другими словами, если бы я был свободен выбирать, я бы никогда не выбрал Python для начала, не говоря уже о том asyncio. Но, в принципе, это не мое решение. Меня принуждают использовать язык мусора через en.wikipedia.org/wiki/Ultimatum_game .
wvxvw
4

Все сводится к двум основным проблемам, которые решает asyncio:

  • Как выполнить несколько операций ввода-вывода в одном потоке?
  • Как реализовать совместную многозадачность?

Ответ на первый вопрос существует уже давно и называется циклом выбора . В python это реализовано в модуле селекторов .

Второй вопрос связан с концепцией сопрограммы , то есть функций, которые могут останавливать свое выполнение и впоследствии восстанавливаться. В python сопрограммы реализуются с использованием генераторов и оператора yield from . Это то, что скрывается за синтаксисом async / await .

Дополнительные ресурсы в этом ответе .


РЕДАКТИРОВАТЬ: обращение к вашему комментарию о горутинах:

Ближайший эквивалент горутины в asyncio на самом деле не сопрограмма, а задача (см. Разницу в документации ). В python сопрограмма (или генератор) ничего не знает о концепциях цикла событий или ввода-вывода. Это просто функция, которая может остановить свое выполнение с yieldсохранением текущего состояния, чтобы его можно было восстановить позже. yield fromСинтаксис позволяет для построения цепочки их прозрачным образом.

Теперь в задаче asyncio сопрограмма в самом низу цепочки всегда возвращает будущее . Это будущее затем всплывает в цикл событий и интегрируется во внутренний механизм. Когда будущее установлено на выполнение каким-либо другим внутренним обратным вызовом, цикл событий может восстановить задачу, отправив будущее обратно в цепочку сопрограмм.


РЕДАКТИРОВАТЬ: отвечая на некоторые вопросы в вашем сообщении:

Как на самом деле происходит ввод-вывод в этом сценарии? В отдельной ветке? Приостановлен ли весь интерпретатор, а ввод-вывод происходит вне интерпретатора?

Нет, в ветке ничего не происходит. Ввод-вывод всегда управляется циклом событий, в основном через файловые дескрипторы. Однако регистрация этих файловых дескрипторов обычно скрыта сопрограммами высокого уровня, что делает грязную работу за вас.

Что именно подразумевается под вводом-выводом? Если моя процедура Python вызвала процедуру C open (), и она, в свою очередь, отправила прерывание ядру, передав ему управление, как интерпретатор Python знает об этом и может продолжить выполнение некоторого другого кода, в то время как код ядра выполняет фактический ввод / О, и пока она не разбудит процедуру Python, которая изначально отправила прерывание? Как интерпретатор Python в принципе может знать об этом?

Ввод / вывод - это любой блокирующий вызов. В asyncio все операции ввода-вывода должны проходить через цикл событий, потому что, как вы сказали, цикл событий не может знать, что блокирующий вызов выполняется в каком-то синхронном коде. Это означает, что вы не должны использовать синхронный openкод в контексте сопрограммы. Вместо этого используйте специальную библиотеку, такую ​​как aiofiles, которая предоставляет асинхронную версию open.

Винсент
источник
Сказать, что сопрограммы реализованы с использованием yield from, на самом деле ничего не говорит. yield fromэто просто синтаксическая конструкция, а не фундаментальный строительный блок, который могут выполнять компьютеры. Аналогично для цикла выбора. Да, сопрограммы в Go также используют цикл выбора, но то, что я пытался сделать, будет работать в Go, но не в Python. Мне нужны более подробные ответы, чтобы понять, почему это не сработало.
wvxvw
Извини ... нет, не совсем. «будущее», «задача», «прозрачный путь», «выход из» - это просто модные словечки, это не объекты из области программирования. в программировании есть переменные, процедуры и структуры. Таким образом, утверждение, что «горутина - это задача», - это просто круговое заявление, которое вызывает вопрос. В конечном счете, объяснение того, что asyncioдля меня действительно, будет сводиться к коду C, который иллюстрирует, во что был переведен синтаксис Python.
wvxvw
Чтобы еще больше объяснить, почему ваш ответ не отвечает на мой вопрос: со всей предоставленной вами информацией я понятия не имею, почему моя попытка из кода, который я опубликовал в связанном вопросе, не сработала. Я абсолютно уверен, что смогу написать цикл событий таким образом, чтобы этот код работал. Фактически, это был бы способ написать цикл обработки событий, если бы мне пришлось его написать.
wvxvw
7
@wvxvw Я не согласен. Это не «модные словечки», а высокоуровневые концепции, реализованные во многих библиотеках. Например, задача asyncio, гринлет gevent и горутина - все соответствуют одному и тому же: исполнительному модулю, который может работать одновременно в одном потоке. Также я не думаю, что C вообще нужен для понимания asyncio, если вы не хотите разбираться во внутренней работе генераторов Python.
Винсент
@wvxvw См. мою вторую правку. Это должно устранить некоторые заблуждения.
Винсент