Создание асинхронной задачи во Flask

104

Я пишу приложение на Flask, которое работает очень хорошо, за исключением того, что WSGIявляется синхронным и блокирующим. В частности, у меня есть одна задача, которая обращается к стороннему API, и выполнение этой задачи может занять несколько минут. Я хотел бы сделать этот звонок (на самом деле это серия звонков) и позволить ему работать. при этом управление возвращается к Flask.

Моя точка зрения выглядит так:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Теперь то, что я хочу сделать, это иметь линию

final_file = audio_class.render_audio()

run и предоставить обратный вызов, который будет выполняться при возврате метода, в то время как Flask может продолжать обрабатывать запросы. Это единственная задача, которую мне нужно, чтобы Flask выполнялся асинхронно, и я хотел бы получить совет, как лучше всего это реализовать.

Я посмотрел на Twisted и Klein, но не уверен, что они излишни, так как, возможно, Threading будет достаточно. Или, может быть, сельдерей для этого подойдет?

Дарвин Тек
источник
Я обычно использую для этого сельдерей ... это может быть излишним, но многопоточность afaik не работает в веб-средах (iirc ...)
Джоран Бизли
Правильно. Да, я только что исследовал сельдерей. Это может быть хороший подход. Легко реализовать с помощью Flask?
Darwin Tech
хех, я также склонен использовать сокет-сервер (flask-socketio), и да, я думал, что это довольно просто ... самое сложное - установить все,
Джоран Бизли
5
Я бы порекомендовал это проверить . Этот парень пишет отличные учебники для Flask в целом, и этот отлично подходит для понимания того, как интегрировать асинхронные задачи в приложение Flask.
atlspin

Ответы:

106

Я бы использовал Celery для обработки асинхронной задачи за вас. Вам нужно будет установить брокера, который будет служить вашей очередью задач (рекомендуется RabbitMQ и Redis).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Запустите приложение Flask и запустите другой процесс для запуска вашего сельдерея.

$ celery worker -A app.celery --loglevel=debug

Я хотел бы также обратиться к Miguel Gringberg в подправить для более подробного руководства по использованию сельдерея с Колбой.

Конни
источник
Сельдерей - надежное решение, но оно не легкое и требует времени для настройки.
wobbily_col
34

Еще одно возможное решение - многопоточность. Хотя решение на основе Celery лучше для масштабируемых приложений, если вы не ожидаете слишком большого трафика на рассматриваемой конечной точке, потоки - жизнеспособная альтернатива.

Это решение основано на презентации PyCon 2016 Flask at Scale Мигеля Гринберга , в частности, на слайде 41 в его колоде слайдов. Его код также доступен на github для тех, кто интересуется первоисточником.

С точки зрения пользователя код работает следующим образом:

  1. Вы вызываете конечную точку, которая выполняет длительную задачу.
  2. Эта конечная точка возвращает 202 Accepted со ссылкой для проверки статуса задачи.
  3. Вызов ссылки статуса возвращает 202, пока таксы все еще выполняются, и возвращает 200 (и результат), когда задача завершена.

Чтобы преобразовать вызов api в фоновую задачу, просто добавьте декоратор @async_api.

Вот полный пример:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)

Юрген Стрыдом
источник
Когда я использую этот код, у меня появляется ошибка werkzeug.routing.BuildError: не удалось создать URL-адрес для конечной точки gettaskstatus со значениями ['task_id'] Я что-то упустил?
Николя Дюфаур,
15

Вы также можете попробовать использовать multiprocessing.Processwith daemon=True; process.start()метод не блокирует и вы можете вернуть ответ / статус сразу вызывающий абонент в то время как ваши дорогие функции выполняются в фоновом режиме.

У меня возникла аналогичная проблема при работе с фреймворком Falcon, и использование daemonпроцесса помогло.

Вам нужно будет сделать следующее:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

Вы должны получить ответ немедленно, а через 10 секунд вы должны увидеть распечатанное сообщение в консоли.

ПРИМЕЧАНИЕ. Помните, что daemonicпроцессам не разрешено порождать дочерние процессы.

Томаш Бартковяк
источник
асинхронный - это определенный тип параллелизма, который не является ни поточным, ни многопроцессорным. Однако
многопоточность
5
Я не понимаю твою точку зрения. Автор говорит об асинхронной задаче, которая выполняется «в фоновом режиме», так что вызывающая сторона не блокируется, пока не получит ответ. Создание процесса демона - пример того, где может быть достигнут такой асинхронизм.
Томаш Бартковяк,
что, если /render/<id>конечная точка чего-то ожидает в результате my_func()?
Уилл Гу
my_funcНапример, вы можете отправить ответ / сердцебиение на другую конечную точку. Или вы можете создать и совместно использовать некоторую очередь сообщений, с которой вы можете общатьсяmy_func
Томаш Бартковяк,