Я пишу приложение на Flask, которое работает очень хорошо, за исключением того, что WSGI
является синхронным и блокирующим. В частности, у меня есть одна задача, которая обращается к стороннему API, и выполнение этой задачи может занять несколько минут. Я хотел бы сделать этот звонок (на самом деле это серия звонков) и позволить ему работать. при этом управление возвращается к Flask.
Моя точка зрения выглядит так:
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)
Теперь то, что я хочу сделать, это иметь линию
final_file = audio_class.render_audio()
run и предоставить обратный вызов, который будет выполняться при возврате метода, в то время как Flask может продолжать обрабатывать запросы. Это единственная задача, которую мне нужно, чтобы Flask выполнялся асинхронно, и я хотел бы получить совет, как лучше всего это реализовать.
Я посмотрел на Twisted и Klein, но не уверен, что они излишни, так как, возможно, Threading будет достаточно. Или, может быть, сельдерей для этого подойдет?
источник
Ответы:
Я бы использовал Celery для обработки асинхронной задачи за вас. Вам нужно будет установить брокера, который будет служить вашей очередью задач (рекомендуется RabbitMQ и Redis).
app.py
:from flask import Flask from celery import Celery broker_url = 'amqp://guest@localhost' # Broker URL for RabbitMQ task queue app = Flask(__name__) celery = Celery(app.name, broker=broker_url) celery.config_from_object('celeryconfig') # Your celery configurations in a celeryconfig.py @celery.task(bind=True) def some_long_task(self, x, y): # Do some long task ... @app.route('/render/<id>', methods=['POST']) def render_script(id=None): ... data = json.loads(request.data) text_list = data.get('text_list') final_file = audio_class.render_audio(data=text_list) some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables return Response( mimetype='application/json', status=200 )
Запустите приложение Flask и запустите другой процесс для запуска вашего сельдерея.
Я хотел бы также обратиться к Miguel Gringberg в подправить для более подробного руководства по использованию сельдерея с Колбой.
источник
Еще одно возможное решение - многопоточность. Хотя решение на основе Celery лучше для масштабируемых приложений, если вы не ожидаете слишком большого трафика на рассматриваемой конечной точке, потоки - жизнеспособная альтернатива.
Это решение основано на презентации PyCon 2016 Flask at Scale Мигеля Гринберга , в частности, на слайде 41 в его колоде слайдов. Его код также доступен на github для тех, кто интересуется первоисточником.
С точки зрения пользователя код работает следующим образом:
Чтобы преобразовать вызов api в фоновую задачу, просто добавьте декоратор @async_api.
Вот полный пример:
from flask import Flask, g, abort, current_app, request, url_for from werkzeug.exceptions import HTTPException, InternalServerError from flask_restful import Resource, Api from datetime import datetime from functools import wraps import threading import time import uuid tasks = {} app = Flask(__name__) api = Api(app) @app.before_first_request def before_first_request(): """Start a background thread that cleans up old tasks.""" def clean_old_tasks(): """ This function cleans up old tasks from our in-memory data structure. """ global tasks while True: # Only keep tasks that are running or that finished less than 5 # minutes ago. five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60 tasks = {task_id: task for task_id, task in tasks.items() if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago} time.sleep(60) if not current_app.config['TESTING']: thread = threading.Thread(target=clean_old_tasks) thread.start() def async_api(wrapped_function): @wraps(wrapped_function) def new_function(*args, **kwargs): def task_call(flask_app, environ): # Create a request context similar to that of the original request # so that the task can have access to flask.g, flask.request, etc. with flask_app.request_context(environ): try: tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs) except HTTPException as e: tasks[task_id]['return_value'] = current_app.handle_http_exception(e) except Exception as e: # The function raised an exception, so we set a 500 error tasks[task_id]['return_value'] = InternalServerError() if current_app.debug: # We want to find out if something happened so reraise raise finally: # We record the time of the response, to help in garbage # collecting old tasks tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow()) # close the database session (if any) # Assign an id to the asynchronous task task_id = uuid.uuid4().hex # Record the task, and then launch it tasks[task_id] = {'task_thread': threading.Thread( target=task_call, args=(current_app._get_current_object(), request.environ))} tasks[task_id]['task_thread'].start() # Return a 202 response, with a link that the client can use to # obtain task status print(url_for('gettaskstatus', task_id=task_id)) return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)} return new_function class GetTaskStatus(Resource): def get(self, task_id): """ Return status about an asynchronous task. If this request returns a 202 status code, it means that task hasn't finished yet. Else, the response from the task is returned. """ task = tasks.get(task_id) if task is None: abort(404) if 'return_value' not in task: return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)} return task['return_value'] class CatchAll(Resource): @async_api def get(self, path=''): # perform some intensive processing print("starting processing task, path: '%s'" % path) time.sleep(10) print("completed processing task, path: '%s'" % path) return f'The answer is: {path}' api.add_resource(CatchAll, '/<path:path>', '/') api.add_resource(GetTaskStatus, '/status/<task_id>') if __name__ == '__main__': app.run(debug=True)
источник
Вы также можете попробовать использовать
multiprocessing.Process
withdaemon=True
;process.start()
метод не блокирует и вы можете вернуть ответ / статус сразу вызывающий абонент в то время как ваши дорогие функции выполняются в фоновом режиме.У меня возникла аналогичная проблема при работе с фреймворком Falcon, и использование
daemon
процесса помогло.Вам нужно будет сделать следующее:
from multiprocessing import Process @app.route('/render/<id>', methods=['POST']) def render_script(id=None): ... heavy_process = Process( # Create a daemonic process with heavy "my_func" target=my_func, daemon=True ) heavy_process.start() return Response( mimetype='application/json', status=200 ) # Define some heavy function def my_func(): time.sleep(10) print("Process finished")
Вы должны получить ответ немедленно, а через 10 секунд вы должны увидеть распечатанное сообщение в консоли.
ПРИМЕЧАНИЕ. Помните, что
daemonic
процессам не разрешено порождать дочерние процессы.источник
/render/<id>
конечная точка чего-то ожидает в результатеmy_func()
?my_func
Например, вы можете отправить ответ / сердцебиение на другую конечную точку. Или вы можете создать и совместно использовать некоторую очередь сообщений, с которой вы можете общатьсяmy_func