Встроенный итератор / генератор SqlAlchemy с эффективным использованием памяти?

91

У меня есть таблица MySQL с ~ 10M записями, с которой я взаимодействую с помощью SqlAlchemy. Я обнаружил, что запросы к большим подмножествам этой таблицы будут потреблять слишком много памяти, хотя я думал, что использую встроенный генератор, который интеллектуально извлекает небольшие фрагменты набора данных:

for thing in session.query(Things):
    analyze(thing)

Чтобы избежать этого, я считаю, что мне нужно создать свой собственный итератор, который откусывает кусками:

lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)

Это нормально или мне чего-то не хватает в отношении встроенных генераторов SA?

Ответ на этот вопрос, похоже, указывает на то, что потребление памяти не ожидается.

Павел
источник
У меня что-то очень похожее, разве что уступает "вещь". Работает лучше, чем все другие решения
iElectric
2
Разве это не Thing.id> lastThingID? А что такое «ряды»?
synergetic

Ответы:

118

Большинство реализаций DBAPI полностью буферизуют строки по мере их извлечения - поэтому обычно, прежде чем SQLAlchemy ORM даже получит один результат, весь набор результатов находится в памяти.

Но тогда способ Queryработы таков, что он полностью загружает данный набор результатов по умолчанию перед возвратом вам ваших объектов. Обоснование здесь касается запросов, которые представляют собой нечто большее, чем простые операторы SELECT. Например, в соединениях с другими таблицами, которые могут возвращать один и тот же идентификатор объекта несколько раз в одном наборе результатов (обычно при активной загрузке), полный набор строк должен находиться в памяти, чтобы можно было возвращать правильные результаты, в противном случае коллекции и т. может быть заселен только частично.

Так что Queryпредлагает возможность изменить это поведение через yield_per(). Этот вызов приведет к тому, что объект будет Queryвыдавать строки в пакетах, в которых вы указываете размер пакета. Как указано в документации, это уместно только в том случае, если вы не выполняете какую-либо активную загрузку коллекций, так что в основном, если вы действительно знаете, что делаете. Кроме того, если нижележащий DBAPI предварительно буферизует строки, эти накладные расходы на память все равно будут, поэтому подход масштабируется лишь немного лучше, чем его не использовать.

Я почти никогда не использую yield_per(); вместо этого я использую лучшую версию подхода LIMIT, который вы предлагаете выше, с использованием оконных функций. LIMIT и OFFSET имеют огромную проблему, заключающуюся в том, что очень большие значения OFFSET заставляют запрос становиться все медленнее и медленнее, поскольку OFFSET N заставляет его пролистывать N строк - это все равно, что выполнять один и тот же запрос пятьдесят раз вместо одного, каждый раз читая все большее и большее количество рядов. Используя подход с оконной функцией, я предварительно выбираю набор «оконных» значений, которые относятся к фрагментам таблицы, которые я хочу выбрать. Затем я испускаю отдельные операторы SELECT, которые каждый раз извлекают из одного из этих окон.

Подход с оконными функциями есть в вики, и я использую его с большим успехом.

Также обратите внимание: не все базы данных поддерживают оконные функции; вам нужен Postgresql, Oracle или SQL Server. ИМХО, использование хотя бы Postgresql того стоит - если вы используете реляционную базу данных, вы можете использовать лучшее.

Zzzeek
источник
Вы упомянули, что Query создает все для сравнения идентичностей. Можно ли этого избежать путем сортировки по первичному ключу и сравнения только последовательных результатов?
Tobu
проблема в том, что если вы передаете экземпляр с идентификатором X, приложение получает его, а затем принимает решения на основе этого объекта и, возможно, даже изменяет его. Позже, возможно (на самом деле обычно), даже в следующей строке, та же идентичность возвращается в результате, возможно, чтобы добавить больше содержимого в свои коллекции. Таким образом, приложение получило объект в незавершенном состоянии. сортировка здесь не помогает, потому что самая большая проблема - это работа с нетерпеливой загрузкой - и «объединенная», и «подзапросная» загрузка имеют разные проблемы.
zzzeek
Я понял, что «следующая строка обновляет коллекции», и в этом случае вам нужно только заглянуть вперед на одну строку db, чтобы узнать, когда коллекции будут завершены. Реализация активной загрузки должна взаимодействовать с сортировкой, чтобы обновления коллекции всегда выполнялись в соседних строках.
Tobu
опция yield_per () всегда доступна, когда вы уверены, что отправляемый вами запрос совместим с доставкой частичных наборов результатов. Я потратил марафон на несколько дней, пытаясь включить это поведение во всех случаях, всегда оставались неясными, то есть до тех пор, пока ваша программа не использовала один из них, края, которые терпели неудачу. В частности, нельзя полагаться на заказ. Как всегда, я приветствую фактический вклад в код.
zzzeek
1
Поскольку я использую postgres, похоже, можно использовать транзакцию Repeatable Read только для чтения и запускать все оконные запросы в этой транзакции.
schatten
25

Я не эксперт по базам данных, но при использовании SQLAlchemy в качестве простого уровня абстракции Python (т.е. без использования объекта ORM Query) я нашел удовлетворительное решение для запроса таблицы из 300 миллионов строк без увеличения использования памяти ...

Вот фиктивный пример:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

Затем я использую fetchmany()метод SQLAlchemy для перебора результатов в бесконечном whileцикле:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()

Этот метод позволил мне выполнять все виды агрегирования данных без каких-либо опасных накладных расходов на память.

NOTE stream_resultsработы с Postgres и pyscopg2адаптером, но я предполагаю , что это не будет работать с любой DBAPI, ни с каким - либо драйвером базы данных ...

В этом сообщении блога есть интересный пример использования, который вдохновил мой вышеупомянутый метод.

Эдуардтерон
источник
1
Если кто-то работает с postgres или mysql (with pymysql), это должен быть принятый ответ IMHO.
Юки Иноуэ
1
Спас мне жизнь, мои запросы выполнялись все медленнее и медленнее. Я использовал описанное выше на pyodbc (с сервера sql на postgres), и он работает как мечта.
Эд Бейкер
Для меня это был лучший подход. Поскольку я использую ORM, мне нужно было скомпилировать SQL для моего диалекта (Postgres), а затем выполнить его непосредственно из соединения (а не из сеанса), как показано выше. Компиляция "как" я нашел в этом другом вопросе stackoverflow.com/questions/4617291 . Увеличение скорости было большим. Переход с JOINS на SUBQUERIES также привел к значительному увеличению производительности. Также рекомендую использовать sqlalchemy_mixins, использование smart_query очень помогло построить наиболее эффективный запрос. github.com/absent1706/sqlalchemy-mixins
Густаво Гонсалвес
14

Я изучал эффективный обход / разбиение на страницы с помощью SQLAlchemy и хотел бы обновить этот ответ.

Я думаю, вы можете использовать вызов среза, чтобы правильно ограничить объем запроса, и вы можете эффективно использовать его повторно.

Пример:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1
Джоэл
источник
Это кажется очень простым и быстрым. Я не уверен, что .all()это необходимо. Я замечаю, что после первого звонка скорость значительно улучшилась.
hamx0r 09
@ hamx0r Я понимаю, что это старый комментарий, поэтому оставлю его потомкам. .all()Запрос без переменной things не поддерживает len ()
Дэвид
9

В духе ответа Джоэла я использую следующее:

WINDOW_SIZE = 1000
def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if len(things) == 0:
            break
        for thing in things:
            yield thing
        start += WINDOW_SIZE
Пьетро Баттистон
источник
things = query.slice (start, stop) .all () вернет [] в конце, а цикл while никогда не прервется
Мартин Регули
4

Использование LIMIT / OFFSET - это плохо, потому что вам нужно найти все столбцы {OFFSET} раньше, поэтому чем больше OFFSET, тем длиннее запрос вы получите. Использование оконного запроса для меня также дает плохие результаты для большой таблицы с большим объемом данных (вы слишком долго ждете первых результатов, что в моем случае для фрагментированного веб-ответа не годится).

Лучший подход, приведенный здесь, https://stackoverflow.com/a/27169302/450103 . В моем случае я решил проблему, просто используя индекс в поле datetime и получая следующий запрос с datetime> = previous_datetime. Глупо, потому что раньше я использовал этот индекс в разных случаях, но подумал, что для получения всех данных лучше использовать оконный запрос. В моем случае я ошибался.

Виктор Гавро
источник
3

AFAIK, первый вариант по-прежнему получает все кортежи из таблицы (с одним SQL-запросом), но при итерации создает представление ORM для каждой сущности. Таким образом, это более эффективно, чем создание списка всех сущностей перед итерацией, но вам все равно нужно получить все (необработанные) данные в память.

Таким образом, использование LIMIT на огромных таблицах кажется мне хорошей идеей.

Панкрат
источник