Python sqlite3 и параллелизм

87

У меня есть программа Python, в которой используется модуль «threading». Каждую секунду моя программа запускает новый поток, который извлекает некоторые данные из Интернета и сохраняет эти данные на моем жестком диске. Я хотел бы использовать sqlite3 для хранения этих результатов, но я не могу заставить его работать. Кажется, проблема в следующей строке:

conn = sqlite3.connect("mydatabase.db")
  • Если я помещаю эту строку кода в каждый поток, я получаю OperationalError, сообщающую мне, что файл базы данных заблокирован. Я предполагаю, что это означает, что другой поток открыл mydatabase.db через соединение sqlite3 и заблокировал его.
  • Если я помещаю эту строку кода в основную программу и передаю объект соединения (conn) каждому потоку, я получаю ProgrammingError, говорящую, что объекты SQLite, созданные в потоке, могут использоваться только в том же потоке.

Раньше я сохранял все свои результаты в файлах CSV, и у меня не было ни одной из этих проблем с блокировкой файлов. Надеюсь, это станет возможным с sqlite. Любые идеи?

RexE
источник
5
Я хотел бы отметить, что более свежие версии Python включают более новые версии sqlite3, которые должны решить эту проблему.
Райан Фуггер
@RyanFugger, знаете ли вы, какая самая ранняя версия поддерживает это? Я использую 2.7
notbad.jpeg
@RyanFugger AFAIK нет предварительно созданной версии, которая содержит более новую версию SQLite3, в которой это исправлено. Однако вы можете построить его самостоятельно.
shezi 08

Ответы:

44

Вы можете использовать шаблон потребитель-производитель. Например, вы можете создать очередь, разделяемую между потоками. Первый поток, который получает данные из Интернета, помещает эти данные в общую очередь. Другой поток, которому принадлежит соединение с базой данных, удаляет данные из очереди и передает их в базу данных.

Евгений Лазин
источник
8
FWIW: более поздние версии sqlite утверждают, что вы можете обмениваться соединениями и объектами между потоками (кроме курсоров), но на практике я обнаружил иное.
Ричард Левассер,
Вот пример того, о чем говорил выше Евгений Лазин.
dugres
4
Скрытие вашей базы данных за общей очередью - действительно плохое решение этого вопроса, потому что SQL в целом и SQLite в частности уже имеют встроенные механизмы блокировки, которые, вероятно, намного более совершенны, чем все, что вы можете создать самостоятельно.
shezi 08
1
Вопрос нужно прочитать, на тот момент не было встроенных запорных механизмов. Во многих современных встроенных базах данных этот механизм отсутствует из-за соображений производительности (например, LevelDB).
Евгений Лазин
180

Вопреки распространенному мнению, новые версии sqlite3 сделать поддержку доступа из нескольких потоков.

Это можно включить с помощью необязательного аргумента ключевого слова check_same_thread:

sqlite.connect(":memory:", check_same_thread=False)
Иеремия Роуз
источник
4
Я столкнулся с непредсказуемыми исключениями, и даже Python вылетает с этой опцией (Python 2.7 в Windows 32).
reclosedev
4
Согласно документации , в многопоточном режиме ни одно соединение с базой данных не может использоваться в нескольких потоках. Также есть сериализованный режим
Casebash
1
Неважно, только что нашел: http://sqlite.org/compile.html#threadsafe
Медейрос
1
@FrEaKmAn, извините, это было давно, тоже нет: memory: database. После этого я не использовал соединение sqlite в нескольких потоках.
reclosedev
2
@FrEaKmAn, я столкнулся с этим с дампом ядра процесса python при многопоточном доступе. Поведение было непредсказуемым, и никаких исключений зарегистрировано не было. Если я правильно помню, это было верно как для чтения, так и для записи. Это единственное, что я видел до сих пор, приводя к сбою python: D. Я не пробовал это с sqlite, скомпилированным в потокобезопасном режиме, но в то время у меня не было свободы перекомпилировать sqlite системы по умолчанию. Я закончил тем, что сделал что-то похожее на то, что предложил Эрик, и отключил совместимость потоков
verboze
17

Следующее найдено на mail.python.org.pipermail.1239789

Я нашел решение. Я не знаю, почему в документации Python нет ни слова об этой опции. Итак, мы должны добавить новый аргумент ключевого слова в функцию соединения, и мы сможем создавать из него курсоры в другом потоке. Так что используйте:

sqlite.connect(":memory:", check_same_thread = False)

отлично работает для меня. Конечно, с этого момента мне нужно позаботиться о безопасном многопоточном доступе к БД. В любом случае спасибо всем за попытку помочь.

Роберт Кролик
источник
(С GIL действительно не так много возможностей для настоящего многопоточного доступа к базе данных, что я видел)
Эрик Аронести,
ПРЕДУПРЕЖДЕНИЕ : питон документы были это сказать о check_same_threadопции: «При использовании нескольких потоков с тем же соединением операций записи должно быть сериализовать пользователь коррупции избежать данных.» Итак, да, вы можете использовать SQLite с несколькими потоками, если ваш код гарантирует, что только один поток может писать в базу данных в любой момент времени. В противном случае вы можете повредить свою базу данных.
Ajedi32
14

Переключитесь на многопроцессорность . Он намного лучше, хорошо масштабируется, может выходить за рамки использования нескольких ядер за счет использования нескольких процессоров, а интерфейс такой же, как и при использовании модуля потоковой передачи Python.

Или, как предложил Али, просто используйте механизм объединения потоков SQLAlchemy . Он сделает все за вас автоматически и имеет множество дополнительных функций, просто процитирую некоторые из них:

  1. SQLAlchemy включает диалекты для SQLite, Postgres, MySQL, Oracle, MS-SQL, Firebird, MaxDB, MS Access, Sybase и Informix; IBM также выпустила драйвер DB2. Таким образом, вам не нужно переписывать свое приложение, если вы решите отказаться от SQLite.
  2. Система Unit Of Work, центральная часть Object Relational Mapper (ORM) SQLAlchemy, организует отложенные операции создания / вставки / обновления / удаления в очереди и сбрасывает их все в одном пакете. Для этого он выполняет топологическую «сортировку зависимостей» всех измененных элементов в очереди, чтобы соблюдать ограничения внешнего ключа, и группирует избыточные операторы вместе, где они иногда могут быть дополнительно объединены. Это обеспечивает максимальную эффективность и безопасность транзакций и сводит к минимуму вероятность возникновения тупиковых ситуаций.
носкло
источник
12

Вам вообще не следует использовать для этого потоки. Это тривиальная задача для Twisted, и она, скорее всего, в любом случае приведет вас значительно дальше.

Используйте только один поток, и завершение запроса инициирует событие для выполнения записи.

twisted позаботится о расписании, обратных вызовах и т. д. за вас. Это будет передать вам весь результат в виде строки, или вы можете запустить его через потоковый процессор ( у меня есть твиттер API и FriendFeed API , что и огонь от событий , вызывающих абонентов как результаты еще загружаются).

В зависимости от того, что вы делаете со своими данными, вы можете просто выгружать полный результат в sqlite по мере его завершения, готовить его и выгружать или готовить, пока он читается, и выгружать его в конце.

У меня очень простое приложение, которое делает что-то похожее на то, что вы хотите от github. Я называю это pfetch (параллельная выборка). Он захватывает различные страницы по расписанию, передает результаты в файл и, при необходимости, запускает сценарий после успешного завершения каждой из них. Он также выполняет некоторые причудливые вещи, такие как условные GET, но все же может быть хорошей базой для всего, что вы делаете.

Дастин
источник
7

Или, если вы ленивы, как я, можете использовать SQLAlchemy . Он будет обрабатывать потоки за вас ( используя локальный поток и некоторый пул соединений ), и способ, которым он это делает, даже настраивается .

В качестве дополнительного бонуса, если / когда вы поймете / решите, что использование Sqlite для любого параллельного приложения будет катастрофой, вам не придется менять свой код для использования MySQL, Postgres или чего-либо еще. Вы можете просто переключиться.

Али Афшар
источник
1
Почему нигде на официальном сайте не указана версия Python?
Отображаемое имя
3

Вам необходимо использовать session.close()после каждой транзакции с базой данных, чтобы использовать один и тот же курсор в одном потоке, не используя один и тот же курсор в многопоточности, которая вызывает эту ошибку.

Хазем Халед
источник
1

Используйте threading.Lock ()

Александр
источник
1
Укажите, что делает следующий код и где его следует использовать.
Али Ахтари
0

Мне нравится ответ Евгения: очереди - это вообще лучший способ реализовать межпотоковое взаимодействие. Для полноты картины вот еще несколько вариантов:

  • Закройте соединение с БД, когда порожденные потоки закончат его использовать. Это исправит ваши проблемы OperationalError, но открытие и закрытие таких соединений, как правило, недопустимо из-за накладных расходов на производительность.
  • Не используйте дочерние потоки. Если задача, выполняемая один раз в секунду, достаточно легкая, вам может сойти с рук выполнение выборки и сохранения, а затем сон до подходящего момента. Это нежелательно, так как операции выборки и сохранения могут занимать> 1 секунды, и вы теряете преимущества мультиплексированных ресурсов, которые у вас есть при многопоточном подходе.
Джеймс Брэди
источник
0

Вам необходимо спроектировать параллелизм для вашей программы. SQLite имеет четкие ограничения, и вы должны их соблюдать, см. FAQ (также следующий вопрос).

iny
источник
0

Scrapy кажется потенциальным ответом на мой вопрос. Его домашняя страница точно описывает мою задачу. (Хотя я еще не уверен, насколько стабилен код.)

RexE
источник
0

Я бы посмотрел на модуль Python y_serial для сохранения данных: http://yserial.sourceforge.net

который обрабатывает проблемы взаимоблокировки, связанные с одной базой данных SQLite. Если спрос на параллелизм становится большим, можно легко настроить класс Farm для многих баз данных, чтобы распределить нагрузку в течение стохастического времени.

Надеюсь, это поможет вашему проекту ... он должен быть достаточно простым, чтобы реализовать его за 10 минут.

code43
источник
0

Я не смог найти никаких тестов ни в одном из приведенных выше ответов, поэтому я написал тест для тестирования всего.

Пробовал 3 подхода

  1. Последовательное чтение и запись из базы данных SQLite
  2. Использование ThreadPoolExecutor для чтения / записи
  3. Использование ProcessPoolExecutor для чтения / записи

Результаты и выводы теста следующие.

  1. Последовательные чтения / записи работают лучше всего
  2. Если вы должны обрабатывать параллельно, используйте ProcessPoolExecutor для параллельного чтения
  3. Не выполняйте никаких операций записи ни с помощью ThreadPoolExecutor, ни с помощью ProcessPoolExecutor, поскольку вы столкнетесь с ошибками блокировки базы данных, и вам придется повторить попытку вставки фрагмента снова.

Вы можете найти код и полное решение для тестов в моем SO-ответе ЗДЕСЬ. Надеюсь, что это поможет!

PirateApp
источник
-1

Наиболее вероятная причина возникновения ошибок с заблокированными базами данных заключается в том, что вы должны выполнить

conn.commit()

после завершения операции с базой данных. Если вы этого не сделаете, ваша база данных будет заблокирована от записи и останется такой же. Остальные потоки, ожидающие записи, будут отключены через некоторое время (по умолчанию установлено 5 секунд, см. Http://docs.python.org/2/library/sqlite3.html#sqlite3.connect подробности ) .

Пример правильной и одновременной вставки будет следующим:

import threading, sqlite3
class InsertionThread(threading.Thread):

    def __init__(self, number):
        super(InsertionThread, self).__init__()
        self.number = number

    def run(self):
        conn = sqlite3.connect('yourdb.db', timeout=5)
        conn.execute('CREATE TABLE IF NOT EXISTS threadcount (threadnum, count);')
        conn.commit()

        for i in range(1000):
            conn.execute("INSERT INTO threadcount VALUES (?, ?);", (self.number, i))
            conn.commit()

# create as many of these as you wish
# but be careful to set the timeout value appropriately: thread switching in
# python takes some time
for i in range(2):
    t = InsertionThread(i)
    t.start()

Если вам нравится SQLite или у вас есть другие инструменты, которые работают с базами данных SQLite, или вы хотите заменить файлы CSV файлами SQLite db, или вам нужно сделать что-то редкое, например межплатформенный IPC, то SQLite - отличный инструмент и очень подходит для этой цели. Не позволяйте заставлять себя использовать другое решение, если оно вам не подходит!

Shezi
источник