Массовая вставка с помощью SQLAlchemy ORM

132

Есть ли способ заставить SQLAlchemy выполнять массовую вставку вместо вставки каждого отдельного объекта. т.е.

делать:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

скорее, чем:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Я только что преобразовал некоторый код для использования sqlalchemy, а не raw sql, и хотя теперь работать с ним намного приятнее, теперь он кажется медленнее (до 10 раз), мне интересно, является ли это причиной.

Может быть, я смогу улучшить ситуацию с помощью сессий более эффективно. На данный момент у меня есть autoCommit=Falseи делаю session.commit()после того, как добавлю кое-что. Хотя это, кажется, приводит к тому, что данные становятся устаревшими, если БД изменена в другом месте, например, даже если я сделаю новый запрос, я все равно верну старые результаты?

Спасибо за вашу помощь!

Ник Холден
источник
1
Это может помочь: stackoverflow.com/questions/270879/…
Шон Виейра,
1
Ник, я так понимаю, это очень старый пост. Можно ли обновить заголовок на что-нибудь правильное, например «вставка нескольких записей с помощью SQLAlchemy ORM». Операторы вставки с несколькими записями, подобные предоставленному вами, сильно отличаются от операций массовой загрузки на уровне базы данных. Массовые вставки предназначены для загрузки более 1k данных, обычно из больших наборов данных и выполняются менеджерами приложений, а не операциями REST или кодом уровня приложения ... Давайте правильно воспользуемся нашей номенклатурой.
W4t3randWind 09
Для тех, кто наткнулся на этот вопрос при поиске информации о массовых операциях в sqlalchemy Core (не ORM), см. Мой ответ на другой вопрос .
Николай

Ответы:

176

SQLAlchemy представил это в версии 1.0.0:

Массовые операции - документация SQLAlchemy

С помощью этих операций теперь вы можете выполнять массовую вставку или обновление!

Например, вы можете:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Здесь будет сделана объемная вставка.

пьер
источник
30
Вам также понадобится s.commit () для фактического сохранения записей (мне потребовалось немного времени, чтобы понять это).
horcle_buzz
3
Я пробовал это с sqlachemy 1.0.11, и он по-прежнему делает 3 оператора вставки. Но это намного быстрее, чем обычные операции.
zidarsk8 07
3
хотя это не относится к вопросу OP, стоит упомянуть, что это нарушает некоторые функции ORM. docs.sqlalchemy.org/en/rel_1_0/orm/…
dangel
@dangel да, спасибо, что разместили это. Хотя заголовок OP касается «массовой загрузки», его вопрос об операторах вставки нескольких записей не имеет ничего общего с функцией массовой загрузки sqlalchemy.
W4t3randWind 09
По сравнению со вставкой тех же данных из CSV с \copyпомощью psql (от того же клиента на тот же сервер) я вижу огромную разницу в производительности на стороне сервера, что приводит к примерно в 10 раз большему количеству вставок / с. По-видимому, массовая загрузка с использованием \copy(или COPYна сервере) с использованием упаковки при обмене данными от клиента к серверу намного лучше, чем использование SQL через SQLAlchemy. Дополнительная информация: Большая объемная вставка разница в производительности PostgreSQL против ... .
gertvdijk
42

В документации sqlalchemy есть описание производительности различных методов, которые можно использовать для массовых вставок:

ORM в основном не предназначены для высокопроизводительных массовых вставок - это вся причина, по которой SQLAlchemy предлагает Core в дополнение к ORM в качестве первоклассного компонента.

В случае использования быстрой массовой вставки система генерации и выполнения SQL, которую ORM строит поверх, является частью ядра. Используя эту систему напрямую, мы можем создать INSERT, который конкурирует с прямым использованием необработанного API базы данных.

В качестве альтернативы SQLAlchemy ORM предлагает набор методов массовых операций, которые обеспечивают привязки к подразделам рабочего процесса для создания конструкций INSERT и UPDATE на уровне ядра с небольшой степенью автоматизации на основе ORM.

В приведенном ниже примере показаны основанные на времени тесты для нескольких различных методов вставки строк, начиная от наиболее автоматизированного до наименее автоматизированного. В cPython 2.7 наблюдаемое время выполнения:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Автор сценария:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Грант Хамфрис
источник
1
Спасибо. Действительно полезно и тщательно.
Стив Б.
Я видел другой пример с использованием bindparams. Синтаксис выглядит лаконичным, это хорошо?
Джей
35

Насколько мне известно, нет способа заставить ORM выполнять массовые вставки. Я считаю, что основная причина в том, что SQLAlchemy необходимо отслеживать идентичность каждого объекта (то есть новые первичные ключи), а массовая вставка мешает этому. Например, предположим, что ваша fooтаблица содержит idстолбец и сопоставлена ​​с Fooклассом:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Поскольку SQLAlchemy получил значение для x.idбез выдачи другого запроса, мы можем сделать вывод, что он получил значение непосредственно из INSERTоператора. Если вам не нужен последующий доступ к созданным объектам через те же экземпляры, вы можете пропустить слой ORM для своей вставки:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy не может сопоставить эти новые строки с какими-либо существующими объектами, поэтому вам придется запрашивать их заново для любых последующих операций.

Что касается устаревших данных, полезно помнить, что сеанс не имеет встроенного способа узнать, когда база данных изменяется вне сеанса. Чтобы получить доступ к измененным извне данным через существующие экземпляры, экземпляры должны быть помечены как просроченные . По умолчанию это происходит session.commit(), но это можно сделать вручную, позвонив session.expire_all()или session.expire(instance). Пример (SQL опущен):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()истекает x, поэтому первый оператор печати неявно открывает новую транзакцию и повторно запрашивает xатрибуты. Если вы закомментируете первый оператор печати, вы заметите, что второй теперь принимает правильное значение, потому что новый запрос не создается до тех пор, пока не будет выполнено обновление.

Это имеет смысл с точки зрения изоляции транзакций - вы должны учитывать только внешние модификации между транзакциями. Если это вызывает у вас проблемы, я бы посоветовал уточнить или переосмыслить границы транзакций вашего приложения вместо того, чтобы немедленно достигать их session.expire_all().

dhaffey
источник
Спасибо за ответ, я попробую. WRT истекающая проблема, то, что я видел, было не совсем то же самое. Я использую сессию с ограниченным обзором в турбогенераторах. Выполнение getSession (). Query (Foo) .filter .... all () возвращало разные вещи в зависимости от запроса, а также не возвращало обновленные записи, которые были в db, пока я не перезапустил его. Я исправил эту проблему, выполнив autocommit = True и добавив что-то, что .remove () d сеанс после завершения запроса (я так понимаю, вы все равно должны это сделать).
Ник Холден
Я предполагаю, что он возвращал разные вещи в зависимости от запроса, потому что у него был сеанс с ограниченной областью для каждого потока в пуле, и сеансы были в разных состояниях? Хотя это казалось немного странным, что sa не получит новые данные после нового запроса. Полагаю, я неправильно понимаю, что делает autocommit = False,
Ник Холден
С autocommit=False, я считаю , вы должны называть session.commit()по завершению запроса (я не знаком с TurboGears, так что игнорировать это , если это обрабатывается для вас на уровне каркаса). Помимо проверки внесения ваших изменений в базу данных, это приведет к истечению срока действия всего в сеансе. Следующая транзакция не начнется до следующего использования этого сеанса, поэтому будущие запросы в том же потоке не увидят устаревшие данные.
dhaffey
10
Альтернативный стиль:session.execute(Foo.__table__.insert(), values)
Joril 03
6
Обратите внимание, что в более новых версиях sqlalchemy есть возможности массовой вставки: docs.sqlalchemy.org/en/latest/orm/…
Уэйн Вернер,
18

Обычно я использую add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
источник
2
Вы уверены, что это работает? Это не просто эквивалентно .addих подключению к сеансу по одному?
Алек
Это было бы нелогично, учитывая имя метода, документация не вдавалась в подробности: есть Add the given collection of instances to this Session.ли у вас основания полагать, что он не выполняет массовую вставку?
reubano
3
Я не думаю, что это слишком нелогично - на самом деле он добавляет все, о чем вы просите. Кажется, что ничего о добавлении всего в сеанс не подразумевает, какие базовые операторы SQL будут выданы. Глядя на источник: github.com/zzzeek/sqlalchemy/blob/… на самом деле кажется, что .addкаждый элемент отдельно.
Алек
Он работает хорошо, по сравнению bulk_save_objects()с a flush(), мы можем получить идентификатор объекта, но bulk_save_objects()не можем (событие с flush()called).
coanor
14

Прямая поддержка была добавлена ​​в SQLAlchemy с версии 0.8

Согласно документам , connection.execute(table.insert().values(data))должно сработать. (Обратите внимание, что это не то же самое, connection.execute(table.insert(), data)что приводит к множеству вставок отдельных строк через вызов executemany). При любом подключении, кроме локального, разница в производительности может быть огромной.

user3805082
источник
Не могли бы вы уточнить, какой из них более производительный?
Джейкоб Ли
10

SQLAlchemy представил это в версии 1.0.0:

Массовые операции - документация SQLAlchemy

С помощью этих операций теперь вы можете выполнять массовую вставку или обновление!

Например (если вам нужны минимальные накладные расходы для простых таблиц INSERT), вы можете использовать Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Или, если хотите, пропустите loadmeкортежи и напишите словари прямо в них dicts(но мне легче убрать всю многословность из данных и загрузить список словарей в цикле).

juanitogan
источник
7

Ответ Пьера правильный, но одна проблема заключается в том, что bulk_save_objectsпо умолчанию не возвращаются первичные ключи объектов, если это вас беспокоит. Установите, return_defaultsчтобы Trueполучить такое поведение.

Документация здесь .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Мэтью Мойзен
источник
2
Следует соблюдать осторожность с флагом. Он будет вставлять по одному объекту последовательно, и значительного прироста производительности может не быть [1]. В моем случае производительность снизилась, как я подозревал, из-за накладных расходов. [1]: docs.sqlalchemy.org/en/13/orm/…
dhfromkorea
6

Все дороги ведут в Рим , но некоторые из них пересекают горы, требуются паромы, но если вы хотите добраться туда быстро, просто следуйте по автомагистрали.


В этом случае автомагистраль должна использовать функцию execute_batch () psycopg2 . Документация говорит об этом лучше всего:

Текущая реализация executemany()(с использованием крайне милосердных преуменьшений) не особенно эффективна. Эти функции можно использовать для ускорения повторного выполнения оператора с набором параметров. За счет уменьшения количества обращений к серверу производительность может быть на несколько порядков выше, чем при использовании executemany().

В моем тесте execute_batch()это примерно в два раза быстрее , как executemany()и дает возможность конфигурировать PAGE_SIZE для дальнейшей настройки (если вы хотите выжать последние 2-3% производительность из драйвера).

Эту же функцию можно легко включить, если вы используете SQLAlchemy, установив use_batch_mode=Trueв качестве параметра при создании экземпляра движка с помощьюcreate_engine()

chjortlund
источник
Примечание: при массовых вставках psycopg2 execute_valuesработает быстрее, чем psycopg2 execute_batch!
Fierr
5

Это способ:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Это будет вставлено так:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Ссылка: SQLAlchemy FAQ включает тесты для различных методов фиксации.

Eefret
источник
3

Лучший ответ, который я нашел до сих пор, был в документации sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Есть полный пример теста возможных решений.

Как показано в документации:

bulk_save_objects - не лучшее решение, но его производительность правильная.

Вторая лучшая реализация с точки зрения удобочитаемости, я думаю, была с ядром SQLAlchemy:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

Контекст этой функции приведен в статье документации.

lelabo_m
источник