Как сделать UPSERT (MERGE, INSERT ... ON DUPLICATE UPDATE) в PostgreSQL?

268

Очень часто задаваемый вопрос здесь - как сделать upsert, что вызывает MySQL INSERT ... ON DUPLICATE UPDATEи поддерживает стандарт как часть MERGEоперации.

Учитывая, что PostgreSQL не поддерживает его напрямую (до pg 9.5), как вы это делаете? Учтите следующее:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Теперь представьте , что вы хотите «upsert» кортежи (2, 'Joe'), (3, 'Alan')так что новое содержимое таблицы будет выглядеть так :

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

Это то, о чем говорят люди, когда обсуждают upsert. Важно отметить, что любой подход должен быть безопасным при наличии нескольких транзакций, работающих на одной и той же таблице - либо с помощью явной блокировки, либо иным образом защищаясь от возникающих условий гонки.

Эта тема широко обсуждается на Вставке, на дубликате обновления в PostgreSQL? , но это об альтернативах синтаксису MySQL, и с течением времени он вырос довольно много не связанных деталей. Я работаю над окончательными ответами.

Эти методы также полезны для «вставить, если не существует, в противном случае ничего не делать», то есть «вставить ... при игнорировании дубликата ключа».

Крейг Рингер
источник
1
возможный дубликат вставки, при дублировании обновления в PostgreSQL?
Майкл Хэмптон
8
@MichaelHampton цель здесь состояла в том, чтобы создать окончательную версию, которая не смущена множеством устаревших ответов - и заблокирована, так что никто не может ничего с этим поделать. Я не согласен с closevote.
Крейг Рингер
Почему, тогда это скоро станет устаревшим - и заблокированным, так что никто ничего не сможет с этим поделать.
Майкл Хэмптон
2
@MichaelHampton Если вы обеспокоены, возможно, вы могли бы отметить тот, с которым вы связаны, и попросить разблокировать его, чтобы его можно было почистить, тогда мы можем объединить это. Мне просто надоело иметь единственное очевидное as-dup для upsert, такой запутанный и неправильный беспорядок.
Крейг Рингер
1
Этот Q & A не заблокирован!
Майкл Хэмптон

Ответы:

396

9.5 и новее:

PostgreSQL 9.5 и более новая поддержка INSERT ... ON CONFLICT UPDATEON CONFLICT DO NOTHING), т.е. upsert.

Сравнение сON DUPLICATE KEY UPDATE .

Быстрое объяснение .

Для использования см. Руководство - в частности, предложение конфликта_на синтаксической диаграмме и пояснительный текст .

В отличие от решений для 9.4 и старше, приведенных ниже, эта функция работает с несколькими конфликтующими строками и не требует монопольной блокировки или повторного цикла.

Коммит, добавляющий функцию, находится здесь и обсуждение ее разработки здесь. .


Если вы используете 9.5 и не нуждаетесь в обратной совместимости, вы можете прекратить чтение сейчас .


9.4 и старше:

PostgreSQL не имеет встроенного UPSERT(илиMERGE ) средств, и сделать это эффективно при одновременном использовании очень сложно.

Эта статья обсуждает проблему в деталях .

В общем, вы должны выбрать один из двух вариантов:

  • Отдельные операции вставки / обновления в цикле повтора; или
  • Блокировка таблицы и выполнение пакетного слияния

Индивидуальная петля повторения ряда

Использование отдельных загрузчиков строк в цикле повторов является разумным вариантом, если вы хотите, чтобы много соединений одновременно пытались выполнить вставки.

Документация PostgreSQL содержит полезную процедуру, которая позволит вам делать это в цикле внутри базы данных . Это защищает от потерянных обновлений и вставки рас, в отличие от большинства наивных решений. Это будет работать только вREAD COMMITTED режиме и безопасен только в том случае, если это единственное, что вы делаете в транзакции. Функция не будет работать правильно, если триггеры или вторичные уникальные ключи вызывают уникальные нарушения.

Эта стратегия очень неэффективна. Когда это целесообразно, вы должны поставить работу в очередь и выполнить массовую загрузку, как описано ниже.

Многие попытки решить эту проблему не учитывают откатов, поэтому они приводят к неполным обновлениям. Две транзакции мчатся друг с другом; один из них успешно INSERTс; другой получает ошибку дублирующего ключа и UPDATEвместо этого делает . В UPDATEблоках ожидая INSERTоткат или фиксации. Когда он откатывается, UPDATEповторная проверка условия соответствует нулю строк, так что даже еслиUPDATE коммиты на самом деле не выполнили ожидаемый вами переход. Вы должны проверить количество строк результата и при необходимости повторить попытку.

Некоторые попытки решения также не учитывают гонки SELECT. Если вы попробуете очевидное и простое:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

затем, когда два запускаются одновременно, есть несколько режимов отказа. Одним из них является уже обсуждаемая проблема с перепроверкой обновлений. Другой - где оба UPDATEодновременно совпадают с нулевыми строками и продолжают. Затем они оба делают EXISTSиспытание, которое происходит доINSERT того . Оба получают ноль строк, поэтому оба делают INSERT. Один не удается с ошибкой дубликата ключа.

Вот почему вам нужен повторный цикл. Вы можете подумать, что вы можете предотвратить повторяющиеся ошибки ключа или потерянные обновления с помощью умного SQL, но вы не можете. Вам необходимо проверить количество строк или обработать дубликаты ошибок ключа (в зависимости от выбранного подхода) и повторить попытку.

Пожалуйста, не катите свое собственное решение для этого. Как и в случае с очередями сообщений, это, вероятно, неправильно.

Массовый уперт с замком

Иногда вы хотите выполнить массовое обновление, где у вас есть новый набор данных, который вы хотите объединить в более старый существующий набор данных. Это значительно эффективнее, чем отдельные аппроциклы, и должно быть предпочтительным, когда это целесообразно.

В этом случае вы обычно выполняете следующий процесс:

  • CREATETEMPORARYстол

  • COPY или массово вставить новые данные во временную таблицу

  • LOCKцелевой стол IN EXCLUSIVE MODE. Это разрешает другие транзакции SELECT, но не вносит никаких изменений в таблицу.

  • Сделайте UPDATE ... FROMиз существующих записей, используя значения во временной таблице;

  • Сделайте INSERTиз строк, которые еще не существуют в целевой таблице;

  • COMMIT, открыв замок.

Например, для примера, приведенного в вопросе, используя многозначное значение INSERTдля заполнения временной таблицы:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Связанное чтение

Что о MERGE ?

SQL-стандарт MERGE самом деле имеет плохо определенную семантику параллелизма и не подходит для загрузки без предварительной блокировки таблицы.

Это действительно полезный оператор OLAP для объединения данных, но на самом деле он не является полезным решением для безопасного параллелизма. Есть много советов людям, использующим другие СУБД для использованияMERGE в upserts, но это на самом деле неправильно.

Другие БД:

Крейг Рингер
источник
В массовом порядке, есть ли возможность удаления из нововведений, а не фильтрации вставки? Например, с upd AS (UPDATE ... RETURNING newvals.id) УДАЛИТЬ ИЗ newvals С помощью upd WHERE newvals.id = upd.id, за которым следует пустая INSERT INTO testtable SELECT * FROM newvals? Моя идея с этим: вместо фильтрации дважды в INSERT (для JOIN / WHERE и для уникального ограничения), повторно используйте результаты проверки существования из UPDATE, которые уже находятся в RAM, и могут быть намного меньше. Это может быть выигрыш, если несколько строк совпадают и / или новые значения намного меньше, чем тестируемый.
Гуннлаугур Брим
1
Есть еще нерешенные проблемы, и для других поставщиков не ясно, что работает, а что нет. 1. Как отмечалось, решение Postgres для циклов не работает в случае нескольких уникальных ключей. 2. На дубликате ключа для MySQL также не работает для нескольких уникальных ключей. 3. Работают ли другие решения для MySQL, SQL Server и Oracle, опубликованные выше? Возможны ли исключения в этих случаях, и нужно ли нам их зацикливать?
Дан б
@danb Это действительно только о PostgreSQL. Не существует кросс-вендорного решения. Решение для PostgreSQL не работает для нескольких строк, к сожалению, вы должны выполнить одну транзакцию на строку. Как MERGEотмечалось выше, «решения», которые используются для SQL Server и Oracle, являются неправильными и подвержены условиям гонки. Вам нужно будет изучить каждую СУБД, чтобы выяснить, как с ними обращаться, я могу только дать совет по PostgreSQL. Единственный способ сделать безопасное многострочное upsert на PostgreSQL - это добавить поддержку основного upsert на главный сервер.
Крейг Рингер
Даже для PostGresQL решение не работает в случае, когда таблица имеет несколько уникальных ключей (обновление только одной строки). В этом случае вам необходимо указать, какой ключ обновляется. Например, может быть решение для нескольких поставщиков, использующее jdbc.
Дан б
2
Postgres теперь поддерживает UPSERT
Крис
32

Я пытаюсь внести свой вклад в другое решение для одной проблемы вставки с версиями PostgreSQL до 9.5. Идея состоит в том, чтобы просто попытаться сначала выполнить вставку, и, если запись уже существует, обновить ее:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Обратите внимание, что это решение может быть применено, только если нет удаления строк таблицы .

Я не знаю об эффективности этого решения, но оно кажется мне достаточно разумным.

Renzo
источник
3
Спасибо, это именно то, что я искал. Не могу понять, почему это было так трудно найти.
Исапир
4
Ага. Это упрощение работает тогда и только тогда, когда нет удалений.
Крейг Рингер,
@CraigRinger Можете ли вы объяснить, что именно произойдет, если были удалены?
turbanoff
@turbanoff Вставка может завершиться неудачно, поскольку запись уже существует, затем она удаляется одновременно, и обновление затем затрагивает ноль строк, поскольку строка была удалена.
Крейг Рингер
@CraigRinger Так. Удаление происходит одновременно . Каковы возможные outways , если это является отлично работает? Если удаление работает одновременно - тогда оно может быть выполнено сразу после нашего блока. То, что я пытаюсь сказать - если у нас есть одновременное удаление - тогда этот код работает точно так же , как и правильноinsert on update
turbanoff
30

Вот несколько примеров insert ... on conflict ...( стр 9.5+ ):

  • Вставь, на конфликт - ничего не делай .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
  • Вставить, при конфликте - выполнить обновление , указать цель конфликта через столбец .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
  • Вставить, при конфликте - выполнить обновление , указать цель конфликта через имя ограничения .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
Эрик Ван
источник
отличный ответ - вопрос: почему или в какой ситуации следует использовать целевую спецификацию через имя столбца или ограничения? Есть ли преимущества / недостатки для различных вариантов использования?
Натан Бентон
1
@NathanBenton Я думаю, что есть как минимум 2 различия: (1) имя столбца задается программистом, а имя ограничения может быть либо задано программистом, либо сгенерировано базой данных в соответствии с именами таблиц / столбцов. (2) каждый столбец может иметь несколько ограничений. Тем не менее, это зависит от вашего случая, чтобы выбрать, какой из них использовать.
Эрик Ван
8

Поддержка SQLAlchemy для Postgres> = 9.5

Поскольку большой пост, описанный выше, охватывает множество различных подходов SQL для версий Postgres (не только не-9.5, как в вопросе), я хотел бы добавить, как это сделать в SQLAlchemy, если вы используете Postgres 9.5. Вместо того, чтобы реализовывать свой собственный эффект, вы также можете использовать функции SQLAlchemy (которые были добавлены в SQLAlchemy 1.1). Лично я бы порекомендовал использовать их, если это возможно. Не только из-за удобства, но и потому, что он позволяет PostgreSQL обрабатывать любые условия гонки, которые могут возникнуть.

Перекрестная публикация из другого ответа, который я дал вчера ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy ON CONFLICTтеперь поддерживает два метода on_conflict_do_update()иon_conflict_do_nothing() :

Копирование из документации:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

PR
источник
4
Python и SQLAlchemy не упоминаются в этом вопросе.
Александр Емельянов
Я часто использую Python в решениях, которые я пишу. Но я не изучал SQLAlchemy (или знал об этом). Это кажется элегантным вариантом. Спасибо. Если это подтвердится, я представлю это своей организации.
Роберт
3
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Протестировано на Postgresql 9.3

Aristar
источник
@CraigRinger: не могли бы вы уточнить это? не атомный?
Париж
2
@parisni Нет. Каждый термин CTE получает свой собственный снимок, если он выполняет запись. Кроме того, нет никакой блокировки предикатов, выполняемой для строк, которые не были найдены, поэтому они все еще могут быть созданы одновременно другим сеансом. Если вы используете SERIALIZABLEизоляцию, вы получите прерывание с ошибкой сериализации, иначе вы, вероятно, получите уникальное нарушение. Не изобретай заново, повторное изобретение будет ошибочным. Использование INSERT ... ON CONFLICT .... Если ваш PostgreSQL слишком старый, обновите его.
Крейг Рингер
@CraigRinger INSERT ... ON CLONFLICT ...не предназначен для массовой загрузки. Из вашего поста, LOCK TABLE testtable IN EXCLUSIVE MODE;внутри CTE есть обходной путь для получения атомарных вещей. Нет?
Париж
@parisni Это не предназначено для массовой загрузки? Говорит кто? postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT . Конечно, это намного медленнее, чем массовая загрузка без поведения, подобного upsert, но это очевидно и будет иметь место независимо от того, что вы делаете. Это намного быстрее, чем использование субтранзакций, это точно. Самый быстрый подход состоит в том, чтобы заблокировать целевую таблицу, а затем, конечно, выполнить insert ... where not exists ...аналогичное действие.
Крейг Рингер
1

Поскольку этот вопрос был закрыт, я публикую здесь о том, как вы делаете это с помощью SQLAlchemy. Через рекурсию он повторяет массовую вставку или обновление для борьбы с условиями гонки и ошибками проверки.

Сначала импорт

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Теперь пара вспомогательных функций

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

И, наконец, функция upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Вот как ты это используешь

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

Преимущество этого в bulk_save_objectsтом, что он может обрабатывать отношения, проверку ошибок и т. Д. При вставке (в отличие от массовых операций ).

reubano
источник
Это также выглядит неправильно для меня. Что если параллельный сеанс вставляет строку после сбора списка идентификаторов? Или удаляет один?
Крейг Рингер,
Хороший вопрос @CraigRinger Я делаю что-то похожее на это, но выполняю только 1 сеанс. Каков наилучший способ обработки нескольких сессий тогда? Транзакция возможно?
Reubano
Транзакции не являются волшебным решением всех проблем параллелизма. Вы можете использовать SERIALIZABLE транзакции и обрабатывать ошибки сериализации, но это медленно. Вам нужна обработка ошибок и повторный цикл. См. Мой ответ и раздел «связанное чтение».
Крейг Рингер,
@CraigRinger понял. Я фактически реализовал цикл повтора в моем собственном случае из-за других ошибок проверки. Я обновлю этот ответ соответственно.
Реубано